API Reference
Complete API documentation for @selfagency/llm-stream-parser. All exports are available from the root or via subpath imports.
Module Exports
Root export
import * as llmStreamParser from '@selfagency/llm-stream-parser';
Subpath exports
import { ThinkingParser } from '@selfagency/llm-stream-parser/thinking';
import { createXmlStreamFilter, XmlStreamFilter } from '@selfagency/llm-stream-parser/xml-filter';
import { extractXmlToolCalls, buildXmlToolSystemPrompt } from '@selfagency/llm-stream-parser/tool-calls';
import {
splitLeadingXmlContextBlocks,
dedupeXmlContextBlocksByTag,
stripXmlContextTags,
} from '@selfagency/llm-stream-parser/context';
import {
parseJson,
validateJsonSchema,
buildFormatInstructions,
buildRepairPrompt,
pipe,
streamJson,
zodToJsonSchema,
validateWithZod,
repairWithLLM,
} from '@selfagency/llm-stream-parser/structured';
import { sanitizeNonStreamingModelOutput, formatXmlLikeResponseForDisplay } from '@selfagency/llm-stream-parser/formatting';
import { LLMStreamProcessor } from '@selfagency/llm-stream-parser/processor';
import { appendToBlockquote } from '@selfagency/llm-stream-parser/markdown';
import { processStream } from '@selfagency/llm-stream-parser/adapters';
Thinking Extraction
Subpath:
@selfagency/llm-stream-parser/thinking
Streaming-first parser for extracting reasoning sections from LLM responses.
ThinkingParser
export interface ThinkingParserOptions {
openingTag?: string; // Default: '<think>'
closingTag?: string; // Default: '</think>'
}
export class ThinkingParser {
constructor(options?: ThinkingParserOptions);
// Process a chunk, returning [thinkingContent, regularContent]
addContent(chunk: string): [string, string];
// Flush any buffered content
flush(): [string, string];
// Reset parser state
reset(): void;
// Static factory for model-specific tag detection
static forModel(modelId: string, thinkingTagMap?: Map<string, [string, string]>): ThinkingParser;
// Access tags
readonly openingTag: string;
readonly closingTag: string;
}
Example:
const parser = new ThinkingParser({ openingTag: '<think>', closingTag: '</think>' });
for await (const chunk of stream) {
const [thinking, content] = parser.addContent(chunk);
displayThinking(thinking);
displayContent(content);
}
const [finalThinking, finalContent] = parser.flush();
Model detection:
// Automatically detect thinking tags for common models
const parser = ThinkingParser.forModel('deepseek'); // <think></think>
const parser2 = ThinkingParser.forModel('granite'); // <|thinking|></|thinking|>
const parser3 = ThinkingParser.forModel('claude-opus'); // <think></think>
// Custom model mappings
const customMap = new Map([['my-model', ['<reasoning>', '</reasoning>']]]);
const parser4 = ThinkingParser.forModel('my-model', customMap);
XML/Context Filtering
Subpath:
@selfagency/llm-stream-parser/xml-filter and @selfagency/llm-stream-parser/context
Stream-safe XML context block filtering and deduplication.
XmlStreamFilter (xml-filter)
export interface XmlStreamFilterOptions {
extraScrubTags?: Set<string>;
overrideScrubTags?: Set<string>;
enforcePrivacyTags?: boolean; // Default: true
maxXmlNestingDepth?: number;
onWarning?: (message: string, context?: Record<string, unknown>) => void;
}
export interface XmlStreamFilter {
// Write a chunk and get scrubbed output
write(chunk: string): string;
// Finalize and flush any buffered content
end(): string;
}
export function createXmlStreamFilter(options?: XmlStreamFilterOptions): XmlStreamFilter;
Example:
const filter = createXmlStreamFilter({
enforcePrivacyTags: true,
maxXmlNestingDepth: 10,
});
for await (const chunk of stream) {
const filtered = filter.write(chunk);
output.write(filtered);
}
const final = filter.end();
output.write(final);
Context Functions (context)
// Split leading context blocks from content
function splitLeadingXmlContextBlocks(input: string): {
contextBlocks: string[];
remaining: string;
};
// Deduplicate context blocks by tag (accepts array of XML block strings)
function dedupeXmlContextBlocksByTag(blocks: string[]): string[];
// Remove all XML context tags
function stripXmlContextTags(input: string): string;
Example:
const { contextBlocks, remaining } = splitLeadingXmlContextBlocks(response);
const deduplicated = dedupeXmlContextBlocksByTag(contextBlocks);
const stripped = stripXmlContextTags(remaining);
Tool-Call Extraction
Subpath:
@selfagency/llm-stream-parser/tool-calls
Extract structured tool calls from XML format in responses.
Types
export interface XmlToolCall {
name: string;
parameters: Record<string, unknown>;
format: 'bare-xml' | 'json-wrapped';
}
Functions
export interface XmlToolInfo {
name: string;
description?: string;
inputSchema?: {
properties?: Record<string, { description?: string; type?: string }>;
required?: string[];
};
}
// Extract tool calls from response text
function extractXmlToolCalls(input: string, knownTools: Set<string>): XmlToolCall[];
// Build system prompt for tool use
function buildXmlToolSystemPrompt(tools: readonly XmlToolInfo[]): string;
Example:
const toolCalls = extractXmlToolCalls(response, new Set(['search', 'edit_file']));
for (const call of toolCalls) {
console.log(`Calling ${call.name} with:`, call.parameters);
const result = await executeToolInHost(call.name, call.parameters);
}
// Generate system prompt for models
const tools: XmlToolInfo[] = [
{
name: 'search',
description: 'Search the web',
inputSchema: { properties: { query: { type: 'string' } }, required: ['query'] },
},
{ name: 'edit_file', description: 'Edit a file' },
{ name: 'run_tests', description: 'Run tests' },
];
const systemPrompt = buildXmlToolSystemPrompt(tools);
Structured Output Parsing
Subpath:
@selfagency/llm-stream-parser/structured
JSON parsing with schema validation, repair prompts, and composable pipelines.
parseJson
export interface ParseJsonOptions {
selectMostComprehensive?: boolean; // Default: true
repairIncomplete?: boolean;
maxJsonDepth?: number; // Default: 64
maxJsonKeys?: number; // Default: 10_000
}
function parseJson(text: string, options?: ParseJsonOptions): unknown | null;
Returns the parsed JSON value, or null if parsing fails.
Example:
const data = parseJson('{"name": "test"}');
if (data !== null) {
console.log(data);
}
// With limits
const limitedData = parseJson(text, {
maxJsonDepth: 10,
maxJsonKeys: 100,
selectMostComprehensive: true,
});
validateJsonSchema
export interface ValidateJsonSchemaOptions extends ParseJsonOptions {
validator?: JsonSchemaValidator;
validatorTimeoutMs?: number; // Reserved — not currently enforced
}
function validateJsonSchema<T = unknown>(
text: string,
schema: Record<string, unknown>,
options?: ValidateJsonSchemaOptions,
): { success: true; data: T } | { success: false; errors: string[] };
Example:
const schema = {
type: 'object',
properties: {
name: { type: 'string' },
age: { type: 'number' },
},
};
const result = validateJsonSchema(response, schema);
if (result.success) {
console.log('Valid:', result.data);
} else {
console.error('Invalid:', result.errors);
}
buildFormatInstructions
function buildFormatInstructions(schema: Record<string, unknown>): string;
Generate instructions for models to format output according to the schema.
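The exact instruction text is produced by the library; the stand-in below only sketches the general shape such schema-format instructions take, and is illustrative rather than the library's actual output.

```typescript
// Illustrative stand-in for buildFormatInstructions — NOT the library's real
// output, just the general shape schema-format instructions usually take.
function sketchFormatInstructions(schema: Record<string, unknown>): string {
  return [
    'Respond with a single JSON value that conforms to this JSON Schema:',
    JSON.stringify(schema, null, 2),
    'Do not include any text outside the JSON.',
  ].join('\n');
}

// Typical placement: appended to the system prompt before calling the model.
const schema = { type: 'object', properties: { name: { type: 'string' } } };
const instructions = sketchFormatInstructions(schema);
```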
buildRepairPrompt
function buildRepairPrompt(options: {
failedOutput: string;
error: string;
schema?: Record<string, unknown>;
originalPrompt?: string;
}): string;
Example:
let result = validateJsonSchema(response, schema);
if (!result.success) {
const repairPrompt = buildRepairPrompt({
failedOutput: response,
error: result.errors[0],
schema,
originalPrompt: originalUserPrompt,
});
// Ask model to fix
response = await chat([...messages, { role: 'user', content: repairPrompt }]);
result = validateJsonSchema(response, schema);
}
pipe
function pipe<T>(...fns: Array<(input: T) => T>): (input: T) => T;
Compose multiple parser functions.
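From the signature, pipe composes same-typed transforms into one function. The local reimplementation below is an illustration only — the library's pipe may differ in details such as application order, which is assumed to be left to right here:

```typescript
// Local reimplementation matching the declared signature, for illustration
// only. Left-to-right application order is an assumption.
function pipeLocal<T>(...fns: Array<(input: T) => T>): (input: T) => T {
  return (input: T) => fns.reduce((acc, fn) => fn(acc), input);
}

const trim = (s: string) => s.trim();
const collapseWhitespace = (s: string) => s.replace(/\s+/g, ' ');
const normalize = pipeLocal(trim, collapseWhitespace);
normalize('  hello   world  '); // 'hello world'
```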
Example:
const parsed = parseJson(response);
if (parsed === null) throw new Error('Parse failed');
const result = validateJsonSchema(JSON.stringify(parsed), schema);
if (!result.success) throw new Error('Validation failed');
const data = result.data;
streamJson
export interface StreamJsonOptions extends ParseJsonOptions {
emitPartials?: boolean; // Default: true
}
export interface StreamJsonResult<T = unknown> {
value: T;
isPartial: boolean;
}
async function* streamJson<T = unknown>(
source: AsyncIterable<string>,
options?: StreamJsonOptions,
): AsyncGenerator<StreamJsonResult<T>>;
Incrementally parse JSON from a text stream, yielding partial and complete objects as chunks arrive. Only emits when the parsed value changes.
Example:
import { streamJson } from '@selfagency/llm-stream-parser/structured';
for await (const result of streamJson<{ name: string }>(textStream)) {
console.log(result.isPartial ? '(partial)' : '(complete)', result.value);
}
repairWithLLM
export interface AutoRepairOptions extends ValidateJsonSchemaOptions {
maxAttempts?: number; // Default: 3
originalPrompt?: string;
}
export interface AutoRepairResult<T = unknown> {
success: boolean;
data?: T;
errors?: string[];
attempts: number;
}
async function repairWithLLM<T = unknown>(
initialOutput: string,
schema: Record<string, unknown>,
callLLM: (repairPrompt: string) => Promise<string>,
options?: AutoRepairOptions,
): Promise<AutoRepairResult<T>>;
Automatically retries parsing and validation by sending repair prompts to the LLM. Follows the LangChain OutputFixingParser / RetryParser pattern.
Example:
import { repairWithLLM } from '@selfagency/llm-stream-parser/structured';
const result = await repairWithLLM(llmOutput, schema, async prompt => await callModel(prompt), {
maxAttempts: 3,
originalPrompt: 'Return a person object',
});
if (result.success) {
console.log('Parsed:', result.data);
}
Zod Integration
Optional Zod schema support. Requires zod and zod-to-json-schema as peer dependencies.
export async function zodToJsonSchema(zodSchema: ZodLike): Promise<Record<string, unknown>>;
export async function validateWithZod<T = unknown>(
text: string,
zodSchema: ZodLike,
options?: ValidateJsonSchemaOptions,
): Promise<{ success: true; data: T } | { success: false; errors: string[] }>;
Example:
import { z } from 'zod';
import { validateWithZod } from '@selfagency/llm-stream-parser/structured';
const PersonSchema = z.object({ name: z.string(), age: z.number() });
const result = await validateWithZod(response, PersonSchema);
Stream Processing
Subpath:
@selfagency/llm-stream-parser/processor
Orchestrate multiple parsers to process complete LLM stream responses.
Types
export interface StreamChunk {
content?: string;
thinking?: string;
tool_calls?: Array<{ function?: { name?: string; arguments?: unknown } }>;
done?: boolean;
}
export interface ProcessorOptions {
parseThinkTags?: boolean; // Default: true
scrubContextTags?: boolean; // Default: true
extraScrubTags?: Set<string>;
overrideScrubTags?: Set<string>;
enforcePrivacyTags?: boolean; // Default: true
knownTools?: Set<string>;
modelId?: string;
thinkingOpenTag?: string;
thinkingCloseTag?: string;
thinkingTagMap?: Map<string, [string, string]>;
onWarning?: (message: string, context?: Record<string, unknown>) => void;
maxInputLength?: number; // Default: 256 KB
maxToolCallsPerMessage?: number; // Default: 64
maxToolArgumentBytes?: number; // Default: 128 KB
maxWarnings?: number; // Default: 100
maxXmlNestingDepth?: number; // Default: 64
}
export interface ProcessedOutput {
thinking: string;
content: string;
toolCalls: XmlToolCall[];
done: boolean;
parts: Array<
{ type: 'text'; text: string } | { type: 'thinking'; text: string } | { type: 'tool_call'; call: XmlToolCall }
>;
}
export type StreamEventMap = {
text: (delta: string) => void;
thinking: (delta: string) => void;
tool_call: (call: XmlToolCall) => void;
done: () => void;
warning: (message: string, context?: Record<string, unknown>) => void;
};
LLMStreamProcessor
export class LLMStreamProcessor {
constructor(options?: ProcessorOptions);
// Process a single chunk
process(chunk: StreamChunk): ProcessedOutput;
// Process a complete response (calls process + flush)
processComplete(response: StreamChunk): ProcessedOutput;
// Flush any buffered content
flush(): ProcessedOutput;
// Reset processor state
reset(): void;
// Get accumulated thinking
get accumulatedThinking(): string;
// Get accumulated message
get accumulatedMessage(): {
thinking: string;
content: string;
toolCalls: XmlToolCall[];
};
// Subscribe to events
on<K extends keyof StreamEventMap>(event: K, listener: StreamEventMap[K]): this;
// Unsubscribe from events
off<K extends keyof StreamEventMap>(event: K, listener: StreamEventMap[K]): this;
}
Example:
const processor = new LLMStreamProcessor({
parseThinkTags: true,
knownTools: new Set(['search', 'edit']),
modelId: 'claude-opus',
});
// Subscribe to events
processor.on('thinking', delta => console.log('[thinking]', delta));
processor.on('text', delta => console.log('[text]', delta));
processor.on('tool_call', call => console.log('[tool]', call.name));
// Process stream
for await (const chunk of apiStream) {
const output = processor.process({
content: chunk.content,
thinking: chunk.thinking,
done: chunk.done,
});
if (output.done) {
console.log('Final thinking:', processor.accumulatedThinking);
break;
}
}
// Or process complete response
const result = processor.processComplete({
content: fullResponse,
done: true,
});
Formatting & Helpers
Subpath:
@selfagency/llm-stream-parser/formatting
Output normalization and display formatting.
// Normalize a non-streaming response for consistency
function sanitizeNonStreamingModelOutput(input: string): string;
// Format XML-like responses for display
function formatXmlLikeResponseForDisplay(input: string): string;
Example:
const normalized = sanitizeNonStreamingModelOutput(response);
const formatted = formatXmlLikeResponseForDisplay(normalized);
console.log(formatted);
Markdown Processing
Subpath:
@selfagency/llm-stream-parser/markdown
Utilities for working with markdown content.
// Format text as a markdown blockquote, prefixing lines with '> '
function appendToBlockquote(text: string, atLineStart: boolean): string;
Example:
// atLineStart=true adds '> ' prefix at the beginning
const quoted = appendToBlockquote('some text\nmore text', true);
// Result: '> some text\n> more text'
Adapters
Subpath:
@selfagency/llm-stream-parser/adapters
Pre-built adapters for common streaming patterns.
Generic Adapter
export async function* processStream(
source: AsyncIterable<StreamChunk>,
options?: ProcessorOptions
): AsyncGenerator<ProcessedOutput>;
Example:
import { processStream } from '@selfagency/llm-stream-parser/adapters';
for await (const output of processStream(apiStream, options)) {
console.log('Thinking:', output.thinking);
console.log('Content:', output.content);
console.log('Tool calls:', output.toolCalls);
}
Callback-Based Adapter
export interface GenericAdapterCallbacks {
onThinking?: (text: string) => void | Promise<void>;
onContent?: (text: string) => void | Promise<void>;
onToolCall?: (call: XmlToolCall) => void | Promise<void>;
onDone?: () => void | Promise<void>;
}
export interface GenericAdapterOptions extends ProcessorOptions {
showThinking?: boolean; // Default: true
}
function createGenericAdapter(
callbacks: GenericAdapterCallbacks,
options?: GenericAdapterOptions,
): { write(chunk: StreamChunk): Promise<void>; end(): Promise<void> };
Environment-agnostic callback adapter. Use for HTTP SSE, WebSocket, CLI, or any non-VS Code environment.
Example:
import { createGenericAdapter } from '@selfagency/llm-stream-parser/adapters';
const adapter = createGenericAdapter({
onContent: text => process.stdout.write(text),
onToolCall: call => handleTool(call),
onDone: () => console.log('\n[Done]'),
});
for await (const chunk of apiStream) {
await adapter.write(chunk);
}
await adapter.end();
Error Handling
All parsing functions handle errors gracefully:
- parseJson returns null if parsing fails
- validateJsonSchema returns { success: false; errors } on validation failure
- extractXmlToolCalls returns an empty array if no valid calls are found
- Streaming processors emit warning events for non-critical issues
Example:
const data = parseJson(response);
if (data === null) {
console.error('Failed to parse JSON');
}
const validation = validateJsonSchema(response, schema);
if (!validation.success) {
console.error('Validation failed:', validation.errors);
}
Performance & Safety
Limits
All parsers enforce configurable limits to prevent DoS:
- maxJsonDepth: Maximum nesting depth (default: 64)
- maxJsonKeys: Maximum object keys (default: 10,000)
- maxInputLength: Maximum input size (default: 256 KB)
- maxXmlNestingDepth: Maximum XML nesting (default: 64)
- maxWarnings: Maximum warnings emitted per processor lifetime (default: 100)
- maxToolCallsPerMessage: Maximum tool calls (default: 64)
- maxToolArgumentBytes: Maximum tool argument size (default: 128 KB)
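To see why a depth cap matters, nesting depth of a parsed value can be measured roughly as below. This is an illustrative sketch, not the library's internal check — the library enforces maxJsonDepth during parsing itself:

```typescript
// Rough depth measure for an already-parsed JSON value. Illustrative only;
// the library enforces maxJsonDepth internally while parsing.
function jsonDepth(value: unknown, depth = 1): number {
  if (value === null || typeof value !== 'object') return depth;
  const children = Object.values(value as Record<string, unknown>);
  if (children.length === 0) return depth;
  return Math.max(...children.map(child => jsonDepth(child, depth + 1)));
}

jsonDepth(42);                // 1
jsonDepth({ a: { b: [1] } }); // 4: object -> object -> array -> number
```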
ReDoS Protection
Schema regex patterns longer than 1024 characters are rejected during validation to prevent Regular Expression Denial of Service attacks. This applies to pattern properties in JSON Schema validated via validateJsonSchema.
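The guard can be pictured as a simple length check applied before a pattern is compiled. This is a sketch only; how the library reports the rejection (a thrown error versus a validation failure) is not specified here:

```typescript
// Sketch of the 1024-character schema pattern guard described above.
// Illustrative only — the library's exact rejection behavior may differ.
const MAX_SCHEMA_PATTERN_LENGTH = 1024;

function isSchemaPatternAllowed(pattern: string): boolean {
  return pattern.length <= MAX_SCHEMA_PATTERN_LENGTH;
}

isSchemaPatternAllowed('^[a-z]+$');          // true
isSchemaPatternAllowed('(a+)+'.repeat(300)); // false: 1500 characters
```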
Privacy
Context scrubbing is enabled by default. Privacy-sensitive tags are always enforced even when overrideScrubTags is provided (unless enforcePrivacyTags is set to false). A warning is emitted when an unsafe override is corrected.
const processor = new LLMStreamProcessor({
scrubContextTags: true, // Enabled by default
enforcePrivacyTags: true, // Enforce privacy markers
extraScrubTags: new Set([...]), // Add custom tags to scrub
});
Type Safety
Full TypeScript support with strict types:
import type { XmlToolCall, ProcessedOutput, StreamChunk } from '@selfagency/llm-stream-parser';
const output: ProcessedOutput = processor.process(chunk);
const calls: XmlToolCall[] = output.toolCalls;