Streaming Delegation

Task delegation lets one agent hand off a complex task to another, with real-time streaming of progress, partial results, and lifecycle control (cancel, suspend, resume).

Quick Example

// Client: delegate and stream results
const stream = client.delegateStream({
  id: 'task-001',
  desc: 'Analyze 500 customer reviews',
  timeout_ms: 30_000,
  budget: { max_tokens: 500, detail_level: 'compact' },
});
for await (const event of stream.events) {
  switch (event.event) {
    case 'progress':
      console.log(`${event.data.processed}/${event.data.total}`);
      break;
    case 'partial':
      console.log('Partial:', event.data.out);
      break;
    case 'complete':
      console.log('Result:', event.data.out);
      break;
    case 'cancelled':
      console.log('Cancelled:', event.data.reason);
      break;
  }
}

Server Handler

The delegate handler receives four arguments:

server.onDelegate(async (task, stream, context, signal) => {
  // task    - TaskEntry with id, desc, budget
  // stream  - StreamWriter for sending events
  // context - HandlerContext with optional context envelope
  // signal  - AbortSignal for cooperative cancellation
});

Full Example

server.onDelegate(async (task, stream, context, signal) => {
  const reviews = await loadReviews(context.data?.reviews_url);
  for (let i = 0; i < reviews.length; i++) {
    // Check for cancellation at each iteration
    if (signal.aborted) {
      stream.cancelled(task.id, 'running', 'Client cancelled');
      return;
    }
    // Send progress updates
    stream.progress(i + 1, reviews.length, `Processing review ${i + 1}`);
    await analyzeReview(reviews[i]);
  }
  // Send the final result with multi-level compression.
  // The counts below are hard-coded for illustration and assume 500 reviews.
  stream.complete(task.id, {
    minimal: `${reviews.length} reviews analyzed, 72% positive`,
    compact: {
      total: reviews.length,
      positive: 360,
      negative: 90,
      neutral: 50,
      score: 0.72,
    },
  });
});

Stream Events

| Event | When | Data |
| --- | --- | --- |
| progress | Periodic updates | { processed, total, message } |
| partial | Intermediate results | { out, resolved_level } |
| complete | Task finished | { task_id, status, out, meta } |
| cancelled | Task cancelled | { task_id, reason, previous_status } |
| suspended | Task suspended | { task_id, checkpoint_available } |
| resumed | Task resumed | { task_id, from_checkpoint } |
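
For TypeScript clients, the event table can be mirrored as a discriminated union so that each case of a switch sees the right payload shape. The sketch below is inferred from the table only: the type names, the field types, and the choice of which events count as terminal are assumptions, not the library's published typings.

```typescript
// Hypothetical typings inferred from the event table (not the library's own).
type DelegateEvent =
  | { event: 'progress'; data: { processed: number; total: number; message?: string } }
  | { event: 'partial'; data: { out: unknown; resolved_level: string } }
  | { event: 'complete'; data: { task_id: string; status: string; out: unknown; meta?: unknown } }
  | { event: 'cancelled'; data: { task_id: string; reason: string; previous_status: string } }
  | { event: 'suspended'; data: { task_id: string; checkpoint_available: boolean } }
  | { event: 'resumed'; data: { task_id: string; from_checkpoint: unknown } };

// Assumption: 'complete' and 'cancelled' end the stream; the other events do not.
function isTerminal(e: DelegateEvent): boolean {
  return e.event === 'complete' || e.event === 'cancelled';
}

// Narrowing on `e.event` gives typed access to `e.data` in each branch.
function summarize(e: DelegateEvent): string {
  switch (e.event) {
    case 'progress':
      return `${e.data.processed}/${e.data.total}`;
    case 'partial':
      return `partial result at level ${e.data.resolved_level}`;
    case 'complete':
      return `task ${e.data.task_id} finished with status ${e.data.status}`;
    case 'cancelled':
      return `task ${e.data.task_id} cancelled: ${e.data.reason}`;
    case 'suspended':
      return `task ${e.data.task_id} suspended`;
    case 'resumed':
      return `task ${e.data.task_id} resumed`;
  }
}
```

Because the switch covers every member of the union, the compiler can flag a missing case if a new event type is added to the union later.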

Cancel a Running Task

The client can cancel at any time. Cancellation is cooperative: the handler's AbortSignal fires, and the handler is responsible for stopping gracefully.

const stream = client.delegateStream({ id: 'task-002', desc: 'Big analysis' });
for await (const event of stream.events) {
  if (event.event === 'progress' && event.data.processed > 100) {
    await stream.cancel('Got enough data');
    break;
  }
}

You can also cancel by task ID without a stream reference:

await client.taskCancel('task-002', 'Budget exceeded');
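
To make "cooperative" concrete, here is a minimal sketch that uses only the standard AbortController and AbortSignal, with no delegation library involved. The function processItems is a hypothetical stand-in for a handler: nothing interrupts it from outside, and it stops only because it checks the signal between units of work.

```typescript
// Hypothetical stand-in for a long-running handler.
async function processItems(
  items: number[],
  signal: AbortSignal,
): Promise<{ done: number; cancelled: boolean }> {
  let done = 0;
  for (let i = 0; i < items.length; i++) {
    // The only place cancellation is noticed: an explicit check per iteration.
    if (signal.aborted) {
      return { done, cancelled: true };
    }
    // Simulated work; awaiting a timer also yields to the event loop,
    // which gives a pending abort() call the chance to run.
    await new Promise<void>((resolve) => setTimeout(resolve, 0));
    done++;
  }
  return { done, cancelled: false };
}

// Usage: abort shortly after starting; the loop notices at its next check.
async function demo(): Promise<void> {
  const controller = new AbortController();
  const items = Array.from({ length: 1000 }, (_, i) => i);
  const work = processItems(items, controller.signal);
  setTimeout(() => controller.abort(), 10);
  const result = await work;
  console.log('cancelled:', result.cancelled, 'items done:', result.done);
}
```

A handler that never checks the signal, or that blocks for a long time between checks, will keep running after the client cancels, which is why the check belongs inside the loop.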

Passing Context

Delegate requests can include a context envelope with permissions and TTL:

const stream = client.delegateStream({
  id: 'task-003',
  desc: 'Translate with user preferences',
  context: {
    data: { lang: 'es', tone: 'formal' },
    permissions: { forward: false, persist: false, derive: true },
    ttl_s: 3600,
  },
});

The handler receives this context:

server.onDelegate(async (task, stream, context, signal) => {
  const lang = context.data?.lang ?? 'en';
  // context.permissions enforced by the server
});
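
The server enforces permissions and TTL itself, but it can help to see what such checks might look like. The sketch below is illustrative only: the ContextEnvelope type and the helpers isExpired and may are hypothetical, and two semantics are assumed rather than documented here, namely that ttl_s counts seconds from the moment the envelope is received and that a missing permission is treated as denied.

```typescript
// Hypothetical shape, inferred from the request example above.
interface ContextEnvelope {
  data?: Record<string, unknown>;
  permissions?: { forward: boolean; persist: boolean; derive: boolean };
  ttl_s?: number;
}

// Assumption: the TTL is measured in seconds from the time of receipt.
function isExpired(ctx: ContextEnvelope, receivedAtMs: number, nowMs: number): boolean {
  if (ctx.ttl_s === undefined) return false; // no TTL means no expiry in this sketch
  return nowMs - receivedAtMs > ctx.ttl_s * 1000;
}

// Assumption: an absent permissions block denies every action.
function may(ctx: ContextEnvelope, action: 'forward' | 'persist' | 'derive'): boolean {
  return ctx.permissions?.[action] ?? false;
}

// Usage with the envelope from the example above.
const envelope: ContextEnvelope = {
  data: { lang: 'es', tone: 'formal' },
  permissions: { forward: false, persist: false, derive: true },
  ttl_s: 3600,
};
const receivedAt = 0;
console.log(may(envelope, 'derive'), may(envelope, 'forward'));
console.log(isExpired(envelope, receivedAt, 3_600_001));
```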

Transport Behavior

| Transport | Streaming Mechanism | Notes |
| --- | --- | --- |
| HTTP | Server-Sent Events (SSE) | Default. Connection stays open until terminal event. |
| gRPC | Server-streaming RPC | Delegate() returns stream DelegateEvent. |
| WebSocket | Messages on open connection | Events sent as JSON frames. |

The DelegateStream API is identical regardless of transport:

// Works the same over HTTP, gRPC, or WebSocket
for await (const event of stream.events) { /* ... */ }
await stream.cancel('reason');
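
The client library hides the wire format, but for debugging the HTTP transport it can be useful to read raw SSE output. The sketch below parses standard Server-Sent Events text into event objects. It is not the library's parser: parseSseFrame and parseSseStream are hypothetical names, and it assumes every data payload is JSON.

```typescript
interface ParsedEvent {
  event: string;
  data: unknown;
}

// Parse one SSE frame (the lines between two blank lines).
function parseSseFrame(frame: string): ParsedEvent | null {
  let event = 'message'; // SSE default when no "event:" field is present
  const dataLines: string[] = [];
  for (const line of frame.split(/\r?\n/)) {
    if (line === '' || line.startsWith(':')) continue; // blank line or comment
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip one leading space
    if (field === 'event') event = value;
    else if (field === 'data') dataLines.push(value);
  }
  if (dataLines.length === 0) return null;
  // Multiple data lines are joined with newlines before decoding.
  return { event, data: JSON.parse(dataLines.join('\n')) };
}

// Split a complete response body into frames and parse each one.
function parseSseStream(text: string): ParsedEvent[] {
  return text
    .split(/\r?\n\r?\n/)
    .map((frame) => parseSseFrame(frame))
    .filter((e): e is ParsedEvent => e !== null);
}

// Usage: a two-event body in the shape the HTTP transport is described as emitting.
const body =
  'event: progress\ndata: {"processed":1,"total":2}\n\n' +
  'event: complete\ndata: {"task_id":"task-001","status":"done","out":"ok"}\n\n';
for (const e of parseSseStream(body)) {
  console.log(e.event, e.data);
}
```

This version expects the whole body at once. A real streaming client would buffer incoming chunks and parse each frame as soon as its terminating blank line arrives.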

Error Handling

If the handler throws, the server emits an error event and transitions the task to the failed state:

server.onDelegate(async (task, stream, context, signal) => {
  throw new Error('Out of memory');
  // → event: error
  // → data: {"task_id":"...","error":"Out of memory"}
  // → task status: "failed"
});
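
Conceptually, the server wraps each handler call so that a thrown error becomes a stream event instead of an unhandled rejection. The sketch below shows one way that wrapping could work. It is an illustration rather than the server's actual code: runHandler and Emit are hypothetical names, and the 'completed' status string is an assumption (only "failed" appears in the example above).

```typescript
// Hypothetical callback that writes one event to the stream.
type Emit = (event: string, data: Record<string, unknown>) => void;

async function runHandler(
  taskId: string,
  handler: () => Promise<void>,
  emit: Emit,
): Promise<'completed' | 'failed'> {
  try {
    await handler();
    return 'completed'; // assumed name for the success status
  } catch (err) {
    // Non-Error throws (strings, objects) are stringified for the payload.
    const message = err instanceof Error ? err.message : String(err);
    emit('error', { task_id: taskId, error: message });
    return 'failed';
  }
}

// Usage: a handler that throws produces one error event and a failed status.
async function demoFailure(): Promise<void> {
  const status = await runHandler(
    'task-004',
    async () => {
      throw new Error('Out of memory');
    },
    (event, data) => console.log(event, JSON.stringify(data)),
  );
  console.log('task status:', status);
}
```

On the client side, this means the event loop should handle the error event alongside complete and cancelled, since it also ends the task.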

Best Practices

  1. Check signal.aborted in every loop iteration. Cancellation is cooperative.
  2. Send progress frequently. Clients use progress events to show UI updates and decide whether to cancel.
  3. Use partial for intermediate results. If a long task produces useful intermediate output, send it as partial so the client can use it even if the task is later cancelled.
  4. Set realistic timeouts. The timeout_ms on the delegate request prevents runaway tasks.
  5. Use multi-level results. Always provide at least minimal and compact in stream.complete() so the client’s budget is respected.
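
Practice 5 can be illustrated with a small sketch of how a multi-level result might be matched against the client's detail_level. This is an assumption about the mechanism, not the library's implementation: resolveLevel is a hypothetical helper, only the two levels shown in this guide are modeled, and falling back to the next less detailed level is an assumed policy.

```typescript
// Levels ordered from least to most detailed (only those shown in this guide).
const LEVELS = ['minimal', 'compact'] as const;
type Level = (typeof LEVELS)[number];
type MultiLevelResult = Partial<Record<Level, unknown>>;

// Return the requested level if the handler provided it; otherwise fall back
// to the nearest less detailed level. Returns null when nothing fits.
function resolveLevel(
  out: MultiLevelResult,
  requested: Level,
): { out: unknown; resolved_level: Level } | null {
  const candidates = LEVELS.slice(0, LEVELS.indexOf(requested) + 1).reverse();
  for (const level of candidates) {
    if (out[level] !== undefined) {
      return { out: out[level], resolved_level: level };
    }
  }
  return null;
}

// Usage: the handler supplied both levels; the client asked for 'compact'.
const result: MultiLevelResult = {
  minimal: '500 reviews analyzed, 72% positive',
  compact: { total: 500, positive: 360, negative: 90, neutral: 50, score: 0.72 },
};
console.log(resolveLevel(result, 'compact')?.resolved_level);
console.log(resolveLevel({ minimal: 'summary only' }, 'compact')?.resolved_level);
```

Under this policy a handler that supplies only compact has nothing to offer a client that asked for minimal, which is one reason to provide both levels.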