Architecture Layers
Domain Layer
The domain layer contains pure business logic with zero I/O dependencies. Everything here is deterministic and testable without mocks.
Value Objects
Immutable types that carry protocol semantics:
// @nekte/core - Domain Value Objectsexport interface TokenBudget { readonly max_tokens: number; readonly detail_level: 'minimal' | 'compact' | 'full';}
export interface CapabilityRef { readonly id: string; readonly cat: string; readonly h: string; // version hash}# nekte.core - Domain Value Objectsfrom pydantic import BaseModel
class TokenBudget(BaseModel, frozen=True): max_tokens: int detail_level: Literal["minimal", "compact", "full"]
class CapabilityRef(BaseModel, frozen=True): id: str cat: str h: str # version hashAggregate Root: TaskEntry
TaskEntry is the central aggregate that enforces task lifecycle rules. It validates every state transition and rejects illegal ones.
// Valid transitions defined as const for exhaustive checkingconst VALID_TRANSITIONS = { pending: ['accepted', 'cancelled', 'failed'], accepted: ['running', 'cancelled', 'failed'], running: ['completed', 'suspended', 'cancelled', 'failed'], suspended: ['running', 'cancelled', 'failed'],} as const;
export class TaskEntry { readonly id: string; private _status: TaskStatus = 'pending';
transition(to: TaskStatus): void { const allowed = VALID_TRANSITIONS[this._status]; if (!allowed?.includes(to)) { throw new TaskTransitionError(this._status, to); } this._status = to; }}VALID_TRANSITIONS = { "pending": ["accepted", "cancelled", "failed"], "accepted": ["running", "cancelled", "failed"], "running": ["completed", "suspended", "cancelled", "failed"], "suspended": ["running", "cancelled", "failed"],}
class TaskEntry: def __init__(self, task_id: str): self.id = task_id self._status = "pending"
def transition(self, to: str) -> None: allowed = VALID_TRANSITIONS.get(self._status, []) if to not in allowed: raise TaskTransitionError(self._status, to) self._status = toPure Algorithms
Domain algorithms have no side effects. They take data in, return data out.
// Budget resolution - pure functionexport function resolveBudget( requested: TokenBudget | undefined, serverDefault: TokenBudget,): TokenBudget { if (!requested) return serverDefault; return { max_tokens: Math.min(requested.max_tokens, serverDefault.max_tokens), detail_level: requested.detail_level, };}
// Version hash - deterministicexport function canonicalize(schema: unknown): string { const json = JSON.stringify(schema, Object.keys(schema as object).sort()); return hash(json).slice(0, 8);}Ports Layer
Ports are interfaces (TypeScript) or protocols (Python) that define contracts without implementation. They live between the domain and the outside world.
Transport Port
The core outbound port for client communication:
export interface Transport { rpc(method: string, params: unknown): Promise<JsonRpcResponse>; stream(method: string, params: unknown): AsyncIterable<SseEvent>; get(path: string): Promise<unknown>; close(): Promise<void>;}from typing import Protocol, AsyncIterator
class Transport(Protocol): async def rpc(self, method: str, params: Any) -> JsonRpcResponse: ... def stream(self, method: str, params: Any) -> AsyncIterator[SseEvent]: ... async def get(self, path: str) -> Any: ... async def close(self) -> None: ...DelegateHandler Port
The inbound port for handling delegated tasks on the server:
export type DelegateHandler = ( task: DelegateTask, stream: StreamWriter, context: HandlerContext, signal: AbortSignal, // required -- every handler gets a signal) => Promise<void>;StreamWriter Port
Defines how streaming output is written, regardless of transport:
export interface StreamWriter { progress(processed: number, total: number, message?: string): void; partial(data: unknown, resolvedLevel?: string): void; complete(taskId: string, out: MultiLevelResult): void; error(taskId: string, error: NekteProtocolError): void; cancelled(taskId: string, previousStatus: string, reason: string): void;}Application Layer
The application layer orchestrates domain logic through ports. It never touches infrastructure directly.
NekteClient
export class NekteClient { // Injected via constructor -- depends on Transport PORT, not HttpTransport constructor( endpoint: string, options?: { transport?: Transport; headers?: Record<string, string> }, ) {}
// Uses Transport.rpc() -- doesn't know if it's HTTP or gRPC async catalog(): Promise<DiscoverResult> { return this.transport.rpc('nekte.discover', { level: 0 }); }
// Delegates to CapabilityCache (application service) for caching async invoke(cap: string, opts: InvokeOptions): Promise<InvokeResult> { const cached = this.cache.get(cap); // ... orchestration logic }}NekteServer
export class NekteServer { private capabilities = new CapabilityRegistry(); // Domain Service private tasks = new TaskRegistry(); // Domain Service + Repository
capability(id: string, config: CapabilityConfig): void { this.capabilities.register(id, config); // Domain operation }
onDelegate(handler: DelegateHandler): void { this.delegateHandler = handler; // Store the port implementation }}Adapters Layer
Adapters implement ports using real infrastructure. They are the only layer that touches the network, filesystem, or external services.
HttpTransport (Outbound Adapter)
export class HttpTransport implements Transport { async rpc(method: string, params: unknown): Promise<JsonRpcResponse> { const res = await fetch(this.endpoint, { method: 'POST', headers: { 'Content-Type': 'application/json', ...this.headers }, body: JSON.stringify({ jsonrpc: '2.0', method, params, id: this.nextId() }), }); return res.json(); }
async *stream(method: string, params: unknown): AsyncIterable<SseEvent> { // SSE implementation using EventSource or fetch streaming }}GrpcTransport (Outbound Adapter)
export class GrpcTransport implements Transport { async rpc(method: string, params: unknown): Promise<JsonRpcResponse> { // Uses @grpc/grpc-js client to make unary RPC return new Promise((resolve, reject) => { this.client.rpc({ method, params: JSON.stringify(params) }, (err, response) => err ? reject(err) : resolve(JSON.parse(response.result)) ); }); }}SseStreamWriter (Inbound Adapter)
export class SseStreamWriter implements StreamWriter { constructor(private res: ServerResponse) { res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive', }); }
progress(processed: number, total: number, message?: string): void { this.res.write(`event: progress\ndata: ${JSON.stringify({ processed, total, message })}\n\n`); }
complete(taskId: string, out: MultiLevelResult): void { this.res.write(`event: complete\ndata: ${JSON.stringify({ task_id: taskId, out })}\n\n`); this.res.end(); }}How to Add a New Transport
Adding a transport means implementing the Transport port:
- Create a class that implements
Transport(the port interface) - Implement
rpc(),stream(),get(),close() - Pass it to
NekteClientvia thetransportconstructor option - Do not modify domain or application code
// Example: NATS transport adapterexport class NatsTransport implements Transport { constructor(private nc: NatsConnection) {}
async rpc(method: string, params: unknown): Promise<JsonRpcResponse> { const msg = await this.nc.request('nekte.rpc', JSON.stringify({ method, params })); return JSON.parse(msg.data.toString()); }
// ... implement stream(), get(), close()}
// Usage -- NekteClient doesn't changeconst client = new NekteClient('nats://localhost:4222', { transport: new NatsTransport(nc),});