33 — Streaming & Cancellation
End-to-end protocol for streaming inference results from llama.cpp/MLX through the Rust runtime, across the Tauri IPC boundary, and into the frontend, with clean cancellation at any point.
Real chat apps stand or fall on this. Get it wrong and tokens stutter, cancellation hangs, or memory leaks accumulate.
Surface
The SDK exposes streaming via async generators:
import { llm } from '@locara/sdk'

const stream = llm.chatStream({
  model: 'qwen2.5-3b-q4',
  messages: [...],
  signal: abortController.signal, // optional
})

for await (const delta of stream) {
  console.log(delta.content)
  // delta: { content?: string, tool_calls?: ToolCall[], done?: boolean, usage?: {...} }
}
Three other modalities use the same shape:
- transcribe.live(...): STT stream
- embed.create({ stream: true }): chunked embedding generation
- tts.synthesize(...): audio chunks
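As an illustration of how a consumer might fold the delta shape shown above into a single result, here is a minimal sketch. The `collectDeltas` helper, the `ToolCall` field names, and `fakeChatStream` are all hypothetical; only the `TokenDelta` fields come from the comment in the example.

```typescript
// ToolCall is a placeholder shape; the SDK's real type is not specified here.
type ToolCall = { name: string; arguments: string };

interface TokenDelta {
  content?: string;
  tool_calls?: ToolCall[];
  done?: boolean;
  usage?: Record<string, number>;
}

interface Collected {
  text: string;
  toolCalls: ToolCall[];
  usage?: Record<string, number>;
}

// Fold a delta stream into one result (hypothetical helper, not an SDK export).
async function collectDeltas(stream: AsyncIterable<TokenDelta>): Promise<Collected> {
  const result: Collected = { text: "", toolCalls: [] };
  for await (const delta of stream) {
    if (delta.content) result.text += delta.content;
    if (delta.tool_calls) result.toolCalls.push(...delta.tool_calls);
    if (delta.usage) result.usage = delta.usage;
    if (delta.done) break; // the final delta carries done: true
  }
  return result;
}

// Stand-in for llm.chatStream so the sketch runs without the SDK.
async function* fakeChatStream(): AsyncGenerator<TokenDelta> {
  yield { content: "Hel" };
  yield { content: "lo" };
  yield { done: true, usage: { completion_tokens: 2 } };
}
```

Because the helper only depends on `AsyncIterable<TokenDelta>`, the same loop would work against the real stream or a test double.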
Layers (top to bottom)
[App frontend (TS, async generator)]
↑ ReadableStream chunks
│
[@locara/sdk (TS shim)]
↑ Tauri Channel<Chunk>
│
[locara-runtime (Rust, Tauri command)]
↑ tokio::sync::mpsc
│
[locara-core (inference loop, llama.cpp/MLX FFI)]
↑ per-token callback
│
[llama.cpp (C, native callback)]
Each transition is one hop. Errors and cancellation propagate cleanly through every hop.
Tauri Channel (the IPC primitive)
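Tauri 2.0 provides Channel<T> for streaming data from a command to the frontend. The frontend creates the Channel and passes it as a command argument; the Rust side receives the matching handle and sends messages through it, and the frontend's onmessage callback receives them.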
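// Rust side
// ChatArgs, TokenDelta, and Error are the runtime's own types.
use tauri::{ipc::Channel, AppHandle};
use tokio_util::sync::CancellationToken;

#[tauri::command]
async fn locara_llm_chat_stream(
    app: AppHandle,
    args: ChatArgs,
    channel: Channel<TokenDelta>,
) -> Result<(), Error> {
    // Token that locara_cancel_stream signals via the stream registry
    let cancel = CancellationToken::new();
    // ... spawn inference task that sends to channel
    // ... return when stream completes or cancellation requested
    Ok(())
}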
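// TS side (inside @locara/sdk)
import { invoke, Channel } from '@tauri-apps/api/core'

async function* chatStream(args: ChatArgs, signal?: AbortSignal): AsyncGenerator<TokenDelta> {
  if (signal?.aborted) throw new DOMException('Stream aborted', 'AbortError')

  const channel = new Channel<TokenDelta>()
  const queue: TokenDelta[] = []
  let done = false
  let failure: unknown

  // Bridge channel events to async generator
  channel.onmessage = (delta) => {
    queue.push(delta)
    if (delta.done) done = true
  }

  // A rejected command ends the loop instead of leaving it polling forever
  const promise = invoke('locara_llm_chat_stream', { args, channel })
    .catch((err) => { failure = err; done = true })

  // Wire up cancellation
  const onAbort = () => { void invoke('locara_cancel_stream', { streamId: channel.id }) }
  signal?.addEventListener('abort', onAbort, { once: true })

  try {
    // Keep draining after the final delta arrives so queued items are not dropped
    while (!done || queue.length > 0) {
      if (signal?.aborted) throw new DOMException('Stream aborted', 'AbortError')
      if (queue.length === 0) {
        await new Promise(resolve => setTimeout(resolve, 16))
        continue
      }
      yield queue.shift()!
    }
    await promise
    if (failure !== undefined) throw failure
  } finally {
    signal?.removeEventListener('abort', onAbort)
  }
}
(Pseudocode; the production version uses proper async semaphores, not setTimeout polling.)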
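The note above says the production shim does not poll. One way to get there, shown as a minimal sketch rather than the SDK's actual code, is a push-based queue that wakes the consumer as soon as a message arrives. `AsyncQueue` and its method names are hypothetical, and the sketch assumes a single consumer.

```typescript
// Hypothetical helper: a queue that a channel's onmessage callback can feed.
class AsyncQueue<T> implements AsyncIterable<T> {
  private items: T[] = [];
  private waiters: Array<() => void> = [];
  private closed = false;
  private failed = false;
  private error: unknown = undefined;

  // Called by the producer for every incoming chunk.
  push(item: T): void {
    if (this.closed) return;
    this.items.push(item);
    this.wake();
  }

  // Normal end of stream.
  close(): void {
    this.closed = true;
    this.wake();
  }

  // End of stream with an error; buffered items are still delivered first.
  fail(err: unknown): void {
    this.failed = true;
    this.error = err;
    this.closed = true;
    this.wake();
  }

  private wake(): void {
    const waiters = this.waiters;
    this.waiters = [];
    for (const resume of waiters) resume();
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T> {
    while (true) {
      while (this.items.length > 0) yield this.items.shift() as T;
      if (this.failed) throw this.error;
      if (this.closed) return;
      // Sleep until push(), close(), or fail() wakes us; no timer involved.
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    }
  }
}
```

In the shim, `channel.onmessage` would call `push(delta)` and then `close()` when `delta.done` is set, while a rejected `invoke` would call `fail(err)`.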
Cancellation propagation
The contract: when a frontend AbortController fires, inference stops in <100ms.
Mechanism
- Frontend calls controller.abort().
- SDK shim invokes locara_cancel_stream with the channel ID.
- Runtime’s stream registry looks up the inference task and signals its CancellationToken.
- Inference loop checks the token at every token generation (cheap atomic load).
- Loop returns early; releases KV cache; closes the channel.
- Frontend’s async generator throws AbortError.
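The registry lookup and per-token check can be modelled in a few lines. This is a TypeScript model of the flow for illustration only; the real registry and `CancellationToken` live in the Rust runtime, and `CancelFlag`, `registry`, `cancelStream`, and `runInference` are hypothetical names.

```typescript
// Stands in for the Rust CancellationToken.
class CancelFlag {
  cancelled = false;
}

// Hypothetical stream registry keyed by stream (channel) ID.
const registry = new Map<number, CancelFlag>();

// What a cancel command would do: look up the stream and signal its flag.
function cancelStream(streamId: number): boolean {
  const flag = registry.get(streamId);
  if (!flag) return false; // unknown or already-finished stream
  flag.cancelled = true;
  return true;
}

async function runInference(
  streamId: number,
  tokens: string[],
  emit: (token: string) => void,
): Promise<"completed" | "cancelled"> {
  const flag = new CancelFlag();
  registry.set(streamId, flag);
  try {
    for (const token of tokens) {
      if (flag.cancelled) return "cancelled"; // checked once per token
      await new Promise<void>((r) => setTimeout(r, 1)); // stand-in for decoding one token
      emit(token);
    }
    return "completed";
  } finally {
    registry.delete(streamId); // cleanup runs on every exit path
  }
}
```

Putting the cleanup in `finally` means the completed and cancelled paths release the registry entry the same way.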
Latency target: <100ms
Achievable because:
- Token generation takes ~30-100ms per token and the cancellation token is checked once per token, so the loop notices a cancellation within one token interval.
- Cancellation token check is one atomic load (nanoseconds).
- Channel close + cleanup is microseconds.
If the inference loop has not stopped within 100ms, the runtime forcibly drops the channel. The inference task is left to finish, but its output is discarded: CPU work is wasted, with no effect on semantic correctness.
Cleanup invariants
After cancellation:
- KV cache freed.
- Token output discarded (not delivered to frontend).
- Model stays loaded (not unloaded — the model itself is fine; only the inflight request is dropped).
- No outstanding promises in the SDK.
- No leaked Tauri channels.
Error propagation
Errors at any layer must surface to the SDK consumer cleanly:
| Error origin | Becomes | Frontend sees |
|---|---|---|
| Inference engine error | Rust Result::Err | Promise rejection from async generator |
| Capability denied | CapabilityDeniedError | Throw with capability name |
| Model not loaded | ResourceNotAvailableError | Throw with model + memory info |
| Channel closed unexpectedly | StreamError | Throw |
| Frontend AbortController | AbortError | Throw AbortError (DOM standard) |
| App quit mid-stream | (channel closes silently) | Promise rejects |
Errors are typed; consumers can try/catch and discriminate.
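A consumer-side sketch of that discrimination follows. The class names come from the table above, but their fields and constructors are assumptions made for illustration, as is the `describeFailure` helper.

```typescript
// Hypothetical class shapes; only the names appear in the table.
class CapabilityDeniedError extends Error {
  capability: string;
  constructor(capability: string) {
    super("capability denied: " + capability);
    this.name = "CapabilityDeniedError";
    this.capability = capability;
  }
}

class ResourceNotAvailableError extends Error {
  model: string;
  constructor(model: string) {
    super("model not available: " + model);
    this.name = "ResourceNotAvailableError";
    this.model = model;
  }
}

class StreamError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "StreamError";
  }
}

// Map each failure to what the UI should do about it.
function describeFailure(err: unknown): string {
  if (err instanceof CapabilityDeniedError) return "request capability " + err.capability;
  if (err instanceof ResourceNotAvailableError) return "load model " + err.model;
  if (err instanceof StreamError) return "retry stream";
  // AbortError is identified by name, following the DOM convention.
  if (err instanceof Error && err.name === "AbortError") return "cancelled by user";
  return "unknown failure";
}
```

A `for await` loop over a stream would typically sit inside `try` and pass whatever it catches to a function like this.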
Backpressure
The frontend may be slower than the inference loop (e.g., rendering markdown is non-trivial). The protocol handles backpressure:
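- The queue feeding the Tauri Channel is bounded (default ~256 items). In the layer diagram this is the tokio::sync::mpsc hop; Tauri's own Channel::send is synchronous and does not wait for the frontend, so the bound has to sit in front of it, and carrying frontend slowness back across the IPC hop additionally requires the SDK to acknowledge consumed chunks.
- When the queue is full, the Rust inference loop blocks on send().await.
- This naturally pauses inference until the consumer catches up.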
Effective rate is min(inference speed, frontend consumption speed). No memory blowup.
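The blocking-send behaviour can be demonstrated with a small bounded queue. This is a TypeScript model chosen for consistency with the other examples; in the runtime the equivalent would be a bounded Rust channel. `BoundedQueue`, `send`, `recv`, and `maxDepth` are hypothetical names, and the sketch assumes one producer and one consumer.

```typescript
// Hypothetical bounded queue: send() waits while the buffer is full.
class BoundedQueue<T> {
  private readonly capacity: number;
  private buffer: T[] = [];
  private spaceWaiters: Array<() => void> = [];
  private itemWaiters: Array<() => void> = [];
  maxDepth = 0; // high-water mark, kept only to make the bound observable

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Producer side: suspends until there is room, which is the backpressure.
  async send(item: T): Promise<void> {
    while (this.buffer.length >= this.capacity) {
      await new Promise<void>((resolve) => this.spaceWaiters.push(resolve));
    }
    this.buffer.push(item);
    this.maxDepth = Math.max(this.maxDepth, this.buffer.length);
    this.itemWaiters.shift()?.();
  }

  // Consumer side: suspends until an item is available.
  async recv(): Promise<T> {
    while (this.buffer.length === 0) {
      await new Promise<void>((resolve) => this.itemWaiters.push(resolve));
    }
    const item = this.buffer.shift() as T;
    this.spaceWaiters.shift()?.(); // free slot: let a blocked producer continue
    return item;
  }
}
```

With a fast producer and a slow consumer, the buffer depth never exceeds `capacity`, so memory stays bounded and the producer runs at the consumer's pace.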
Multi-stream concurrency within an app
An app may run multiple streams in parallel (e.g., chat response while live transcription is ongoing). Each stream:
- Has its own channel.
- Has its own cancellation token.
- Has its own inference task.
- Shares the model (one in-memory copy).
Concurrency is bounded by the runtime’s parallel-inference policy (see 32-resource-policy.md).
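To make the per-stream isolation concrete, the sketch below runs two fake streams side by side and cancels only one of them. `fakeStream`, `drain`, and `demo` are stand-ins invented for this example; they are not SDK functions.

```typescript
// Fake stream that honours an AbortSignal, checking it once per item.
async function* fakeStream(label: string, count: number, signal: AbortSignal): AsyncGenerator<string> {
  for (let i = 0; i < count; i++) {
    if (signal.aborted) {
      const err = new Error("aborted");
      err.name = "AbortError";
      throw err;
    }
    await new Promise<void>((r) => setTimeout(r, 1)); // stand-in for one chunk of work
    yield label + i;
  }
}

// Consume a stream to the end, reporting whether it was cancelled.
async function drain(stream: AsyncIterable<string>): Promise<{ items: string[]; cancelled: boolean }> {
  const items: string[] = [];
  try {
    for await (const item of stream) items.push(item);
    return { items, cancelled: false };
  } catch (err) {
    if (err instanceof Error && err.name === "AbortError") return { items, cancelled: true };
    throw err;
  }
}

async function demo() {
  const chat = new AbortController(); // one controller per stream
  const stt = new AbortController();
  setTimeout(() => chat.abort(), 5); // cancel only the chat stream
  return Promise.all([
    drain(fakeStream("chat", 100, chat.signal)),
    drain(fakeStream("stt", 10, stt.signal)),
  ]);
}
```

Because each stream owns its controller, aborting the chat stream leaves the transcription stream to run to completion.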
Stream lifecycle states
pending → streaming → (completed | cancelled | errored)
Once in a terminal state, the stream is closed and the channel cleaned up. Reuse is not supported; each call creates a new stream.
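The lifecycle can be encoded as a small transition table, sketched below. `StreamLifecycle` and `advance` are hypothetical names. The table follows the diagram literally, so a stream that fails before its first chunk would need an extra `pending` edge that the diagram does not draw.

```typescript
type StreamState = "pending" | "streaming" | "completed" | "cancelled" | "errored";

// Allowed next states, taken directly from the diagram above.
const transitions: Record<StreamState, readonly StreamState[]> = {
  pending: ["streaming"],
  streaming: ["completed", "cancelled", "errored"],
  completed: [],
  cancelled: [],
  errored: [],
};

class StreamLifecycle {
  private current: StreamState = "pending";

  get state(): StreamState {
    return this.current;
  }

  // Terminal states have no outgoing transitions, so a stream cannot be reused.
  get isTerminal(): boolean {
    return transitions[this.current].length === 0;
  }

  advance(next: StreamState): void {
    if (!transitions[this.current].includes(next)) {
      throw new Error("illegal transition " + this.current + " -> " + next);
    }
    this.current = next;
  }
}
```

Rejecting illegal transitions at this level turns a double-close or a reuse attempt into an immediate error rather than a silent leak.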
SDK conveniences
The raw async generator is the foundation. The SDK adds conveniences:
// Collect all tokens into a single string
const fullText = await llm.chat({ ... }) // non-streaming variant

// React-friendly hook (in @locara/components)
function ChatComponent() {
  const { tokens, isStreaming, cancel } = useStreamedChat({ ... })
  // ...
}

// Tee a stream into multiple consumers
const [a, b] = teeStream(stream)
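For readers wondering what teeing an async generator involves, here is one possible shape. It is a sketch under assumptions, not the SDK's implementation: the source is pulled once per item and each item is copied into a per-branch buffer.

```typescript
// Split one async iterable into two independent branches (illustrative only).
function teeStream<T>(source: AsyncIterable<T>): [AsyncGenerator<T>, AsyncGenerator<T>] {
  const iterator = source[Symbol.asyncIterator]();
  const buffers: [T[], T[]] = [[], []];
  let finished = false;
  let pending: Promise<void> | null = null;

  // Pull one item from the source and copy it into both buffers.
  // Concurrent callers share the same in-flight pull.
  const pull = (): Promise<void> => {
    if (!pending) {
      pending = iterator.next().then((result) => {
        if (result.done) {
          finished = true;
        } else {
          buffers[0].push(result.value);
          buffers[1].push(result.value);
        }
        pending = null;
      });
    }
    return pending;
  };

  async function* branch(buffer: T[]): AsyncGenerator<T> {
    while (true) {
      if (buffer.length > 0) yield buffer.shift() as T;
      else if (finished) return;
      else await pull();
    }
  }

  return [branch(buffers[0]), branch(buffers[1])];
}
```

One design consequence worth noting: if one branch is consumed much more slowly than the other, its buffer grows without bound, so a production version would likely need a cap or a drop policy.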
Streaming for non-LLM modalities
transcribe.live
const stream = transcribe.live({ model: 'whisper-large-v3-q4' })
for await (const segment of stream) {
  // segment: { text, start, end, isFinal }
}
Same protocol; chunks are partial transcription segments. STT differs from LLM in that segments may be revised (e.g., live Whisper can update earlier segments as more context arrives). The isFinal flag distinguishes settled segments from ones that may still change.
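A consumer therefore has to replace provisional text rather than append it. The sketch below shows one way to do that, assuming that a revision reuses the `start` time of the segment it replaces; that keying rule and the `Transcript` class are assumptions, not documented behaviour.

```typescript
interface Segment {
  text: string;
  start: number;
  end: number;
  isFinal: boolean;
}

// Hypothetical reducer that keeps the latest version of each segment.
class Transcript {
  private segments = new Map<number, Segment>();

  apply(segment: Segment): void {
    const existing = this.segments.get(segment.start);
    if (existing && existing.isFinal) return; // never overwrite settled text
    this.segments.set(segment.start, segment);
  }

  // Render segments in time order.
  text(): string {
    return Array.from(this.segments.values())
      .sort((a, b) => a.start - b.start)
      .map((s) => s.text)
      .join(" ");
  }
}
```

A UI could render non-final segments in a dimmed style and switch them to normal text once the final version arrives.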
embed.create({ stream: true })
For batches of texts, stream embeddings as they’re computed:
const stream = embed.create({ texts, stream: true })
for await (const item of stream) {
  // item: { index, embedding }
}
Useful when embedding millions of documents and progress UI matters.
tts.synthesize
Streams audio chunks:
const stream = tts.synthesize({ text, model: 'kokoro-tts' })
for await (const chunk of stream) {
  audioPlayer.appendBuffer(chunk.audioBytes)
}
Critical for low-latency voice apps; first audio chunk arrives in <200ms.
Resource accounting during streams
While a stream is active:
- KV cache memory counts toward the app’s RAM budget.
- The model stays “in use” (not eligible for eviction).
- The runtime’s resource monitor shows “active inference.”
When the stream closes, all resources are released.
Failure modes + UX
Network not involved (typical case)
Streams are fully local. Network outages don’t affect them. This is one of the privacy-thesis benefits — your transcription doesn’t stop when WiFi drops.
Model corruption mid-stream
If the model produces bad tokens (rare but possible), the inference engine returns an error. SDK throws; consumer can retry.
Resource exhaustion mid-stream
If memory pressure forces eviction during a stream:
- Active streams are protected (their model isn’t evicted).
- New streams may fail to start with ResourceNotAvailableError.
- Already-running streams complete normally.
This protects the user from “I asked it to translate this paragraph and it stopped halfway.”
App crash mid-stream
If the app process dies:
- Tauri channel closes.
- Runtime detects closure, cancels the inference task.
- All resources cleaned up.
No orphan inference processes.
Testing
Streaming + cancellation is heavily covered by 30-testing-strategy.md:
- Unit tests for the SDK shim (mock Tauri channel).
- Integration tests for the Rust runtime (real channel, fake inference engine).
- E2E tests with real models and real cancellation.
- Property tests: random cancellation timings should never leak resources.
- Stress tests: many parallel streams + cancellations.
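The property test in the list above might look roughly like the following. It is a sketch against a fake engine: `liveKvCaches`, `fakeInference`, and `cancellationNeverLeaks` are hypothetical, and the real test would run against the Rust runtime rather than this stand-in.

```typescript
// Stand-in for resources the runtime must release (for example KV caches).
let liveKvCaches = 0;

// Fake engine: acquires a resource, produces tokens until done or aborted.
async function fakeInference(tokens: number, signal: AbortSignal): Promise<number> {
  liveKvCaches++;
  let produced = 0;
  try {
    while (produced < tokens && !signal.aborted) {
      await new Promise<void>((r) => setTimeout(r, 0)); // one token of work
      produced++;
    }
    return produced;
  } finally {
    liveKvCaches--; // released on completion and on cancellation alike
  }
}

// Property: whatever the cancellation timing, no resource stays allocated.
async function cancellationNeverLeaks(runs: number): Promise<boolean> {
  const jobs = Array.from({ length: runs }, () => {
    const controller = new AbortController();
    const timer = setTimeout(() => controller.abort(), Math.random() * 10);
    return fakeInference(20, controller.signal).finally(() => clearTimeout(timer));
  });
  await Promise.all(jobs);
  return liveKvCaches === 0;
}
```

Running the jobs in parallel also exercises the stress-test case of many streams being cancelled at overlapping times.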
Open questions
- (open) Should streams be resumable (e.g., reconnect after a tab refresh)? Probably not v1; stateless model. Resumable streams would require server-side state.
- (open) WebSocket-style binary streaming for TTS? Tauri Channel handles binary fine; just need to confirm performance.
- (open) Backpressure tuning — default 256-item channel might be too small for fast inference; profile and adjust.
Cross-references
- SDK API surface: 05-sdk.md
- Runtime architecture: 07-runtime.md
- Resource policy (memory during streams): 32-resource-policy.md
- Testing this: 30-testing-strategy.md
- Tauri Channel docs: https://v2.tauri.app/develop/calling-frontend/#channels