Architecture Layers

Domain Layer

The domain layer contains pure business logic with zero I/O dependencies. Everything here is deterministic and testable without mocks.

Value Objects

Immutable types that carry protocol semantics:

// @nekte/core - Domain Value Objects
export interface TokenBudget {
  readonly max_tokens: number;
  readonly detail_level: 'minimal' | 'compact' | 'full';
}

export interface CapabilityRef {
  readonly id: string;
  readonly cat: string;
  readonly h: string; // version hash
}
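Interfaces with readonly fields are only enforced at compile time. The following is a minimal sketch, assuming a hypothetical factory `makeTokenBudget` and comparison helper `sameBudget` (neither is part of @nekte/core), of how a value object could also be validated and frozen at runtime:

```typescript
// Local copy of the TokenBudget shape shown above.
interface TokenBudget {
  readonly max_tokens: number;
  readonly detail_level: 'minimal' | 'compact' | 'full';
}

// Hypothetical factory: validates the input and freezes the result so the
// object stays immutable at runtime, not only in the type system.
function makeTokenBudget(
  max_tokens: number,
  detail_level: TokenBudget['detail_level'],
): TokenBudget {
  if (!Number.isInteger(max_tokens) || max_tokens <= 0) {
    throw new RangeError(`max_tokens must be a positive integer, got ${max_tokens}`);
  }
  return Object.freeze({ max_tokens, detail_level });
}

// Value objects compare by content, not by identity.
function sameBudget(a: TokenBudget, b: TokenBudget): boolean {
  return a.max_tokens === b.max_tokens && a.detail_level === b.detail_level;
}
```

Two budgets built from the same inputs are distinct objects but compare equal under `sameBudget`, which is the usual meaning of value semantics.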

Aggregate Root: TaskEntry

TaskEntry is the central aggregate that enforces task lifecycle rules. It validates every state transition and rejects illegal ones.

// Valid transitions, typed against TaskStatus so every entry is checked.
// A status with no entry in the table is terminal: nothing may follow it.
const VALID_TRANSITIONS: Partial<Record<TaskStatus, readonly TaskStatus[]>> = {
  pending: ['accepted', 'cancelled', 'failed'],
  accepted: ['running', 'cancelled', 'failed'],
  running: ['completed', 'suspended', 'cancelled', 'failed'],
  suspended: ['running', 'cancelled', 'failed'],
};

export class TaskEntry {
  readonly id: string;
  private _status: TaskStatus = 'pending';

  constructor(id: string) {
    this.id = id;
  }

  transition(to: TaskStatus): void {
    const allowed = VALID_TRANSITIONS[this._status];
    if (!allowed?.includes(to)) {
      throw new TaskTransitionError(this._status, to);
    }
    this._status = to;
  }
}
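To make the lifecycle rules concrete, here is a small table-driven sketch. The `TaskStatus` union is an assumption assembled from the statuses named in the transition table, and `canTransition` and `firstIllegalStep` are hypothetical helpers, not part of the library:

```typescript
// Assumed union: the statuses that appear in the transition table above.
type TaskStatus =
  | 'pending' | 'accepted' | 'running' | 'suspended'
  | 'completed' | 'cancelled' | 'failed';

// Same table as above; statuses without an entry are treated as terminal.
const TRANSITIONS: Partial<Record<TaskStatus, readonly TaskStatus[]>> = {
  pending: ['accepted', 'cancelled', 'failed'],
  accepted: ['running', 'cancelled', 'failed'],
  running: ['completed', 'suspended', 'cancelled', 'failed'],
  suspended: ['running', 'cancelled', 'failed'],
};

// Hypothetical helper: true if moving from `from` to `to` is legal.
function canTransition(from: TaskStatus, to: TaskStatus): boolean {
  return (TRANSITIONS[from] ?? []).includes(to);
}

// Hypothetical helper: replays a status history and returns the index of the
// first illegal step, or -1 if the whole history is legal.
function firstIllegalStep(history: readonly TaskStatus[]): number {
  let previous: TaskStatus | undefined;
  let index = 0;
  for (const status of history) {
    if (previous !== undefined && !canTransition(previous, status)) return index;
    previous = status;
    index += 1;
  }
  return -1;
}
```

A task may be suspended and resumed any number of times, but a history such as pending followed directly by running is rejected at step 1, and nothing can follow completed.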

Pure Algorithms

Domain algorithms have no side effects. They take data in, return data out.

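// Budget resolution - pure function
export function resolveBudget(
  requested: TokenBudget | undefined,
  serverDefault: TokenBudget,
): TokenBudget {
  if (!requested) return serverDefault;
  return {
    // A client may request less than the server default, never more
    max_tokens: Math.min(requested.max_tokens, serverDefault.max_tokens),
    detail_level: requested.detail_level,
  };
}

// Version hash - deterministic
// Keys are sorted at every nesting level. Passing Object.keys(schema).sort()
// as the JSON.stringify replacer would act as an allowlist and silently drop
// nested keys that do not also appear at the top level.
function sortKeys(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(sortKeys);
  if (value === null || typeof value !== 'object') return value;
  const source = value as Record<string, unknown>;
  return Object.fromEntries(
    Object.keys(source).sort().map((key) => [key, sortKeys(source[key])]),
  );
}

export function canonicalize(schema: unknown): string {
  const json = JSON.stringify(sortKeys(schema));
  return hash(json).slice(0, 8); // hash(): hashing helper, not shown here
}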
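One subtlety of deterministic serialization is worth seeing concretely: when `JSON.stringify` receives an array as its second argument, that array is an allowlist applied at every nesting level, not a sort order. The sketch below contrasts that behavior with a recursive key sort. `sortKeys` and `versionHash` are illustrative names, and SHA-256 from `node:crypto` is an assumed stand-in for the unspecified `hash` helper:

```typescript
import { createHash } from 'node:crypto';

// Recursively rebuild objects with keys in sorted order; arrays keep their order.
function sortKeys(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(sortKeys);
  if (value === null || typeof value !== 'object') return value;
  const source = value as Record<string, unknown>;
  const sorted: Record<string, unknown> = {};
  for (const key of Object.keys(source).sort()) sorted[key] = sortKeys(source[key]);
  return sorted;
}

// Assumed stand-in for hash(): first 8 hex characters of a SHA-256 digest.
function versionHash(schema: unknown): string {
  const json = JSON.stringify(sortKeys(schema));
  return createHash('sha256').update(json).digest('hex').slice(0, 8);
}

const schema = { b: { y: 1, x: 2 }, a: 3 };

// Top-level keys used as a replacer: nested keys "x" and "y" are filtered out.
const lossy = JSON.stringify(schema, Object.keys(schema).sort());
// lossy === '{"a":3,"b":{}}'

// Recursive sort: every key survives, in a stable order.
const canonical = JSON.stringify(sortKeys(schema));
// canonical === '{"a":3,"b":{"x":2,"y":1}}'
```

With the recursive sort, two schemas that differ only in key order produce the same version hash, while a change to any nested value produces a different one.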

Ports Layer

Ports are interfaces (TypeScript) or protocols (Python) that define contracts without implementation. They live between the domain and the outside world.

Transport Port

The core outbound port for client communication:

export interface Transport {
  rpc(method: string, params: unknown): Promise<JsonRpcResponse>;
  stream(method: string, params: unknown): AsyncIterable<SseEvent>;
  get(path: string): Promise<unknown>;
  close(): Promise<void>;
}
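Because the port is a plain interface, application code can be exercised against an in-memory implementation with no network involved. The following is a minimal sketch: the `JsonRpcResponse` and `SseEvent` shapes are assumptions (the real types are not shown here), and `InMemoryTransport` is a hypothetical test double:

```typescript
// Assumed shapes; the real definitions are not shown in this document.
interface JsonRpcResponse {
  jsonrpc: '2.0';
  id: number;
  result?: unknown;
  error?: { code: number; message: string };
}
interface SseEvent { event: string; data: unknown }

interface Transport {
  rpc(method: string, params: unknown): Promise<JsonRpcResponse>;
  stream(method: string, params: unknown): AsyncIterable<SseEvent>;
  get(path: string): Promise<unknown>;
  close(): Promise<void>;
}

// Hypothetical test double: answers from a canned table and records every call.
class InMemoryTransport implements Transport {
  readonly calls: Array<{ method: string; params: unknown }> = [];
  private readonly results: Map<string, unknown>;
  private nextId = 1;

  constructor(results: Map<string, unknown>) {
    this.results = results;
  }

  async rpc(method: string, params: unknown): Promise<JsonRpcResponse> {
    this.calls.push({ method, params });
    const id = this.nextId++;
    if (!this.results.has(method)) {
      // -32601 is the JSON-RPC 2.0 "Method not found" code.
      return { jsonrpc: '2.0', id, error: { code: -32601, message: `Method not found: ${method}` } };
    }
    return { jsonrpc: '2.0', id, result: this.results.get(method) };
  }

  async *stream(method: string, params: unknown): AsyncIterable<SseEvent> {
    this.calls.push({ method, params });
    // Emit the canned result as a single terminal event.
    yield { event: 'complete', data: this.results.get(method) };
  }

  async get(path: string): Promise<unknown> {
    return this.results.get(path);
  }

  async close(): Promise<void> {
    // Nothing to release for an in-memory transport.
  }
}
```

Anything written against `Transport` accepts this class unchanged, which is what lets the application layer be tested without HTTP or gRPC.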

DelegateHandler Port

The inbound port for handling delegated tasks on the server:

export type DelegateHandler = (
  task: DelegateTask,
  stream: StreamWriter,
  context: HandlerContext,
  signal: AbortSignal, // required -- every handler gets a signal
) => Promise<void>;
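Since every handler receives an `AbortSignal`, a handler is expected to check it between units of work. Here is a minimal sketch of a cooperative handler. The `DelegateTask` and `HandlerContext` shapes are assumptions, `StreamWriter` is reduced to the two members the sketch uses, and `uppercaseAll` is a hypothetical handler:

```typescript
// Assumed shapes; the real types are not shown in this document.
interface DelegateTask { id: string; items: readonly string[] }
interface HandlerContext { requestId: string }

// Only the StreamWriter members this sketch uses.
interface StreamWriter {
  progress(processed: number, total: number, message?: string): void;
  cancelled(taskId: string, previousStatus: string, reason: string): void;
}

type DelegateHandler = (
  task: DelegateTask,
  stream: StreamWriter,
  context: HandlerContext,
  signal: AbortSignal,
) => Promise<void>;

// Hypothetical handler: processes items one at a time and checks the signal
// before each item, so cancellation takes effect between units of work.
const uppercaseAll: DelegateHandler = async (task, stream, _context, signal) => {
  let processed = 0;
  for (const item of task.items) {
    if (signal.aborted) {
      stream.cancelled(task.id, 'running', 'aborted by caller');
      return;
    }
    await Promise.resolve(item.toUpperCase()); // stand-in for real async work
    processed += 1;
    stream.progress(processed, task.items.length);
  }
};
```

Checking the signal at the top of the loop keeps cancellation latency bounded by the cost of one item. Longer-running steps would pass the signal down to whatever they await.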

StreamWriter Port

Defines how streaming output is written, regardless of transport:

export interface StreamWriter {
  progress(processed: number, total: number, message?: string): void;
  partial(data: unknown, resolvedLevel?: string): void;
  complete(taskId: string, out: MultiLevelResult): void;
  error(taskId: string, error: NekteProtocolError): void;
  cancelled(taskId: string, previousStatus: string, reason: string): void;
}
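Because the port says nothing about SSE or sockets, it can also be implemented by a writer that only records what it is told. The sketch below assumes placeholder shapes for `MultiLevelResult` and `NekteProtocolError`. `RecordingStreamWriter` is a hypothetical test double, and its rule that nothing may be written after a terminal event is an assumption of this sketch, not a documented protocol rule:

```typescript
// Placeholder shapes; the real types are not shown in this document.
type MultiLevelResult = Record<string, unknown>;
interface NekteProtocolError { code: number; message: string }

interface StreamWriter {
  progress(processed: number, total: number, message?: string): void;
  partial(data: unknown, resolvedLevel?: string): void;
  complete(taskId: string, out: MultiLevelResult): void;
  error(taskId: string, error: NekteProtocolError): void;
  cancelled(taskId: string, previousStatus: string, reason: string): void;
}

// Hypothetical test double: stores events in memory instead of sending them.
class RecordingStreamWriter implements StreamWriter {
  readonly events: Array<{ kind: string; payload: unknown }> = [];
  private ended = false;

  // Assumption: complete, error, and cancelled each end the stream.
  private record(kind: string, payload: unknown, terminal = false): void {
    if (this.ended) throw new Error(`cannot write '${kind}' after the stream has ended`);
    this.events.push({ kind, payload });
    if (terminal) this.ended = true;
  }

  progress(processed: number, total: number, message?: string): void {
    this.record('progress', { processed, total, message });
  }
  partial(data: unknown, resolvedLevel?: string): void {
    this.record('partial', { data, resolvedLevel });
  }
  complete(taskId: string, out: MultiLevelResult): void {
    this.record('complete', { task_id: taskId, out }, true);
  }
  error(taskId: string, error: NekteProtocolError): void {
    this.record('error', { task_id: taskId, error }, true);
  }
  cancelled(taskId: string, previousStatus: string, reason: string): void {
    this.record('cancelled', { task_id: taskId, previousStatus, reason }, true);
  }
}
```

A handler under test can be given this writer in place of a transport-backed one, and the test then asserts on `events`.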

Application Layer

The application layer orchestrates domain logic through ports. It never touches infrastructure directly.

NekteClient

export class NekteClient {
  // Injected via constructor -- depends on the Transport PORT, not HttpTransport
  constructor(
    endpoint: string,
    options?: { transport?: Transport; headers?: Record<string, string> },
  ) {}

  // Uses Transport.rpc() -- doesn't know if it's HTTP or gRPC
  async catalog(): Promise<DiscoverResult> {
    const response = await this.transport.rpc('nekte.discover', { level: 0 });
    // Unwrap the JSON-RPC envelope; assumes the standard `result` member
    return response.result as DiscoverResult;
  }

  // Delegates to CapabilityCache (application service) for caching
  async invoke(cap: string, opts: InvokeOptions): Promise<InvokeResult> {
    const cached = this.cache.get(cap);
    // ... orchestration logic
  }
}

NekteServer

export class NekteServer {
  private capabilities = new CapabilityRegistry(); // Domain Service
  private tasks = new TaskRegistry(); // Domain Service + Repository
  private delegateHandler?: DelegateHandler; // Unset until onDelegate() is called

  capability(id: string, config: CapabilityConfig): void {
    this.capabilities.register(id, config); // Domain operation
  }

  onDelegate(handler: DelegateHandler): void {
    this.delegateHandler = handler; // Store the port implementation
  }
}

Adapters Layer

Adapters implement ports using real infrastructure. They are the only layer that touches the network, filesystem, or external services.

HttpTransport (Outbound Adapter)

export class HttpTransport implements Transport {
  async rpc(method: string, params: unknown): Promise<JsonRpcResponse> {
    const res = await fetch(this.endpoint, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json', ...this.headers },
      body: JSON.stringify({ jsonrpc: '2.0', method, params, id: this.nextId() }),
    });
    return res.json();
  }

  async *stream(method: string, params: unknown): AsyncIterable<SseEvent> {
    // SSE implementation using EventSource or fetch streaming
  }

  // ... endpoint, headers, nextId(), get(), and close() omitted here
}

GrpcTransport (Outbound Adapter)

export class GrpcTransport implements Transport {
  async rpc(method: string, params: unknown): Promise<JsonRpcResponse> {
    // Uses @grpc/grpc-js client to make unary RPC
    return new Promise((resolve, reject) => {
      this.client.rpc(
        { method, params: JSON.stringify(params) },
        (err, response) => (err ? reject(err) : resolve(JSON.parse(response.result))),
      );
    });
  }
}

SseStreamWriter (Inbound Adapter)

export class SseStreamWriter implements StreamWriter {
  constructor(private res: ServerResponse) {
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    });
  }

  progress(processed: number, total: number, message?: string): void {
    this.res.write(`event: progress\ndata: ${JSON.stringify({ processed, total, message })}\n\n`);
  }

  complete(taskId: string, out: MultiLevelResult): void {
    this.res.write(`event: complete\ndata: ${JSON.stringify({ task_id: taskId, out })}\n\n`);
    this.res.end(); // complete is terminal: close the HTTP response
  }

  // ... partial(), error(), and cancelled() omitted here
}
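The wire format written above is the standard Server-Sent Events framing: an `event:` line, a `data:` line, and a blank line that ends the frame. The following sketch isolates that framing in two pure functions. `formatSseEvent` and `parseSseFrames` are hypothetical helpers, and the parser is deliberately simplified (no `id:` or `retry:` fields, no comment lines):

```typescript
// Build one SSE frame. JSON.stringify never emits raw newlines, so the
// payload always fits on a single data: line.
function formatSseEvent(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

// Simplified parser for frames produced by formatSseEvent.
function parseSseFrames(chunk: string): Array<{ event: string; data: unknown }> {
  const frames: Array<{ event: string; data: unknown }> = [];
  for (const raw of chunk.split('\n\n')) {
    let event = 'message'; // SSE default when no event: line is present
    const dataLines: string[] = [];
    for (const line of raw.split('\n')) {
      if (line.startsWith('event:')) event = line.slice('event:'.length).trim();
      else if (line.startsWith('data:')) dataLines.push(line.slice('data:'.length).trimStart());
    }
    if (dataLines.length === 0) continue; // skip empty trailing segments
    frames.push({ event, data: JSON.parse(dataLines.join('\n')) });
  }
  return frames;
}
```

Keeping the framing in pure functions like these would let an adapter's output be checked in a unit test without opening an HTTP response.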

How to Add a New Transport

Adding a transport means implementing the Transport port:

  1. Create a class that implements Transport (the port interface)
  2. Implement rpc(), stream(), get(), close()
  3. Pass it to NekteClient via the transport constructor option
  4. Do not modify domain or application code

// Example: NATS transport adapter
export class NatsTransport implements Transport {
  constructor(private nc: NatsConnection) {}

  async rpc(method: string, params: unknown): Promise<JsonRpcResponse> {
    // NATS payloads are bytes, so encode the request and decode the reply explicitly
    const payload = new TextEncoder().encode(JSON.stringify({ method, params }));
    const msg = await this.nc.request('nekte.rpc', payload);
    return JSON.parse(new TextDecoder().decode(msg.data));
  }

  // ... implement stream(), get(), close()
}

// Usage -- NekteClient doesn't change
const client = new NekteClient('nats://localhost:4222', {
  transport: new NatsTransport(nc),
});