Streaming

liblab-generated SDKs provide built-in support for streaming responses, enabling your applications to consume real-time data from APIs that support Server-Sent Events (SSE) or other streaming protocols. This allows you to process data incrementally as it arrives, rather than waiting for the entire response to complete.

What is Streaming?

Streaming is a technique where data is sent from the server to the client in chunks over a persistent connection, rather than as a single complete response. This is particularly useful for:

  • Real-time AI responses: Large Language Models (LLMs) that stream tokens as they're generated
  • Long-running operations: Progress updates for tasks that take time to complete
  • Live data feeds: Continuous data streams like logs, metrics, or events
  • Improved user experience: Users see results immediately rather than waiting for complete responses

How Streaming Works

When an API endpoint supports streaming, the server keeps the connection open and sends data incrementally. The liblab-generated SDK handles the complexity of:

  1. Establishing and maintaining the persistent connection
  2. Parsing streaming data formats (like Server-Sent Events)
  3. Deserializing each chunk into strongly-typed objects
  4. Providing an idiomatic interface for your programming language
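
For intuition, the parsing step (2) roughly amounts to buffering the response body, splitting it on blank lines into SSE events, and JSON-decoding each data: payload. The following is a simplified, hand-written sketch of that idea, not the code liblab generates; it assumes JSON payloads and \n line endings:

// Simplified SSE parsing sketch (illustrative only, not the SDK's implementation).
// Assumes each event carries a single JSON payload on a `data:` line.
async function* parseSse<T>(response: Response): AsyncGenerator<T> {
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE events are separated by a blank line; keep any partial event in the buffer
    const events = buffer.split('\n\n');
    buffer = events.pop() ?? '';

    for (const event of events) {
      for (const line of event.split('\n')) {
        if (line.startsWith('data:')) {
          yield JSON.parse(line.slice(5).trim()) as T;
        }
      }
    }
  }
}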

Configuring Streaming Endpoints

liblab automatically generates streaming methods in your SDK when it detects streaming capabilities in your OpenAPI specification. An operation will be converted to a streaming method when either of the following conditions is met:

Content Type Detection

The operation's response includes the text/event-stream content type:

paths:
  /api/generate:
    post:
      responses:
        '200':
          description: Streaming response
          content:
            text/event-stream:
              schema:
                $ref: '#/components/schemas/GenerateResponse'

Explicit Streaming Annotation

You can explicitly mark an operation as streaming using the x-liblab-streaming extension at the operation level:

paths:
  /api/generate:
    post:
      x-liblab-streaming: true
      responses:
        '200':
          description: OK
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/GenerateResponse'

This annotation is useful when your API returns application/json but still streams data incrementally, or when you want to ensure streaming behavior regardless of the content type.

Language-Specific Implementations

TypeScript Implementation

TypeScript SDKs generated by liblab use async generators (AsyncGenerator) for streaming responses. This provides type-safe iteration over streaming data with native JavaScript async/await support.

Basic Streaming Usage with for-await-of

Consume streaming responses using JavaScript's for await...of loop:

import { StreamingSdk } from 'streaming-sdk';

// Initialize the client
const sdk = new StreamingSdk({
  baseUrl: 'https://api.example.com',
  token: 'your-api-token'
});

// Create a request
const request = {
  model: 'gpt-4o',
  prompt: 'Say hello world'
};

// Process streaming responses
const results = [];
for await (const response of sdk.api.generate(request)) {
  // Each response.data is a fully deserialized model object
  console.log('Model:', response.data.model);
  console.log('Response:', response.data.response);
  console.log('Created at:', response.data.createdAt);

  results.push(response.data);
}

console.log(`Received ${results.length} streaming chunks`);

Manual Iterator Control

For more granular control, use the iterator protocol directly:

// Get the stream
const stream = sdk.api.generate({
  model: 'gpt-4o',
  prompt: 'Say hello world'
});

// Manually advance the iterator; each call returns an IteratorResult ({ value, done })
let result = await stream.next();
if (!result.done) {
  console.log('First chunk:', result.value.data);
}

result = await stream.next();
if (!result.done) {
  console.log('Second chunk:', result.value.data);
}

result = await stream.next();
if (result.done) {
  console.log('Stream completed');
}

Handling Different Response Types

The SDK automatically handles both single-chunk responses (application/json) and multi-chunk streaming responses (text/event-stream):

// Single application/json response
// Server sends: {"model": "gpt-4o", "response": "Hello, world!", "created_at": "2023-01-01T00:00:00Z"}
for await (const response of sdk.api.generate(request)) {
  // Yields once with the complete response
  console.log(response.data.response); // "Hello, world!"
}

// Multiple text/event-stream chunks
// Server sends:
// data: {"model": "gpt-4o", "response": "Hello", "created_at": "2023-01-01T00:00:00Z"}
// data: {"model": "gpt-4o", "response": "World", "created_at": "2023-01-01T00:00:00Z"}
for await (const response of sdk.api.generate(request)) {
  // Yields twice - once per chunk
  process.stdout.write(response.data.response + ' '); // "Hello " then "World "
}

Real-time Processing

Process streaming data in real-time as it arrives:

console.log('Generating response...');

for await (const chunk of sdk.api.generate(request)) {
  // Process each chunk immediately as it arrives
  process.stdout.write(chunk.data.response); // Display token by token

  // Perform additional processing
  if (chunk.data.response.length > 100) {
    console.log('\n[Long response detected]');
  }
}

console.log('\nGeneration complete!');

Configuring Retry Behavior

Configure retry attempts for streaming requests:

const stream = sdk.api.generate(
  {
    model: 'gpt-4o',
    prompt: 'Say hello world'
  },
  {
    retry: {
      attempts: 3,
      delayMs: 1000
    }
  }
);

for await (const response of stream) {
  console.log(response.data.response);
}

Generated Service with Streaming

When your OpenAPI specification defines streaming endpoints, liblab generates service methods that return AsyncGenerator objects. Each yielded item is automatically deserialized into the appropriate model class and wrapped in an HttpResponse object.

import { BaseService } from '../base-service';
import { HttpResponse, RequestConfig } from '../../http/types';
import { RequestBuilder } from '../../http/request-builder';
import { ContentType } from '../../http/content-type';
import { Environment } from '../../http/environment';
import { z } from 'zod';
import { StatusOkResponse } from './models/status-ok-response';
import { GenerateRequest } from './models/generate-request';
import { GenerateResponse } from './models/generate-response';
import {
  statusOkResponseResponse,
  generateRequestRequest,
  generateResponseResponse
} from './schemas';

export class ApiService extends BaseService {
  /**
   * Get the status of an LLM.
   * @param {RequestConfig} [requestConfig] - The request configuration for retry and validation.
   * @returns {AsyncGenerator<HttpResponse<StatusOkResponse>>} - OK
   */
  public async *status(
    requestConfig?: RequestConfig,
  ): AsyncGenerator<HttpResponse<StatusOkResponse>> {
    const request = new RequestBuilder()
      .setBaseUrl(
        requestConfig?.baseUrl ||
          this.config.baseUrl ||
          this.config.environment ||
          Environment.DEFAULT,
      )
      .setConfig(this.config)
      .setMethod('GET')
      .setPath('/api/generate')
      .setRequestSchema(z.any())
      .setRequestContentType(ContentType.Json)
      .addResponse({
        schema: statusOkResponseResponse,
        contentType: ContentType.Json,
        status: 200,
      })
      .setRetryAttempts(this.config, requestConfig)
      .setRetryDelayMs(this.config, requestConfig)
      .setResponseValidation(this.config, requestConfig)
      .build();
    yield* this.client.stream<StatusOkResponse>(request);
  }

  /**
   * Send a prompt to an LLM.
   * @param {GenerateRequest} body - The prompt payload.
   * @param {RequestConfig} [requestConfig] - The request configuration for retry and validation.
   * @returns {AsyncGenerator<HttpResponse<GenerateResponse>>} - OK
   */
  public async *generate(
    body: GenerateRequest,
    requestConfig?: RequestConfig,
  ): AsyncGenerator<HttpResponse<GenerateResponse>> {
    const request = new RequestBuilder()
      .setBaseUrl(
        requestConfig?.baseUrl ||
          this.config.baseUrl ||
          this.config.environment ||
          Environment.DEFAULT,
      )
      .setConfig(this.config)
      .setMethod('POST')
      .setPath('/api/generate')
      .setRequestSchema(generateRequestRequest)
      .setRequestContentType(ContentType.Json)
      .addResponse({
        schema: generateResponseResponse,
        contentType: ContentType.Json,
        status: 200,
      })
      .setRetryAttempts(this.config, requestConfig)
      .setRetryDelayMs(this.config, requestConfig)
      .setResponseValidation(this.config, requestConfig)
      .addHeaderParam({ key: 'Content-Type', value: 'application/json' })
      .addBody(body)
      .build();
    yield* this.client.stream<GenerateResponse>(request);
  }
}

Key Features

  • Type Safety: Full TypeScript type support with AsyncGenerator<HttpResponse<T>>
  • Native Async/Await: Works seamlessly with JavaScript's async/await and for-await-of syntax
  • Automatic Parsing: The SDK handles SSE format parsing, including incomplete chunks and multiple data lines
  • Flexible Response Handling: Works with both application/json (single response) and text/event-stream (multiple chunks)
  • Retry Support: Built-in retry mechanism for failed streaming requests
  • Memory Efficient: Async generators don't load the entire dataset into memory

Best Practices

  1. Use for-await-of for simplicity: The for await...of loop provides the cleanest syntax for consuming streams
  2. Handle connection errors: Wrap streaming operations in try-catch blocks to handle network interruptions (see the combined sketch after this list)
  3. Configure timeouts: Set appropriate timeout values in your request configuration
  4. Test both response types: Test your implementation with both single-chunk and multi-chunk responses
  5. Process incrementally: Take advantage of streaming by processing data as it arrives
  6. Use retry configuration: Configure retry attempts for production reliability
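
Several of these practices can be combined in a single loop. The sketch below reuses the sdk.api.generate call and the retry options from the earlier examples; the surrounding function and logging are illustrative, not part of the generated SDK:

// Combined sketch: retry configuration, incremental processing, and error handling.
// Reuses the sdk client, request shape, and retry options shown earlier.
async function generateWithHandling() {
  try {
    const stream = sdk.api.generate(
      { model: 'gpt-4o', prompt: 'Say hello world' },
      { retry: { attempts: 3, delayMs: 1000 } }
    );

    for await (const chunk of stream) {
      // Process each chunk as soon as it arrives
      process.stdout.write(chunk.data.response);
    }
  } catch (error) {
    // Network interruptions surface here once retries are exhausted
    console.error('Streaming failed:', error);
  }
}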

Common Use Cases

AI and LLM Integration

Streaming is essential for AI applications where responses are generated incrementally:

// Stream AI-generated content
for await (const chunk of sdk.ai.generateText(prompt)) {
  process.stdout.write(chunk.data.text); // Display each token as it's generated
}

Progress Monitoring

Track long-running operations in real-time:

for await (const update of sdk.jobs.process(jobId)) {
  console.log(`Progress: ${update.data.percentComplete}%`);
  if (update.data.status === 'completed') {
    console.log('Job finished!');
    break;
  }
}

Live Data Feeds

Process continuous data streams:

for await (const logEntry of sdk.monitoring.streamLogs()) {
  if (logEntry.data.level === 'ERROR') {
    // Handle errors immediately
    alertOnError(logEntry.data);
  }
}

Error Handling

Handle errors during streaming operations appropriately:

try {
  for await (const chunk of sdk.api.stream(request)) {
    processChunk(chunk.data);
  }
} catch (error) {
  if (error.name === 'NetworkError') {
    console.error('Connection lost:', error.message);
  } else if (error.name === 'ApiError') {
    console.error('API error:', error.message);
  }
}

Performance Considerations

  • Memory Usage: Streaming responses are memory-efficient because data is processed incrementally
  • Latency: The first chunk arrives sooner than a complete buffered response would
  • Timeouts: Configure appropriate timeouts for streaming connections
  • Connection Management: The SDK handles connection lifecycle automatically
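
A practical consequence of the generator-based design: if you only need part of a stream, you can exit the loop early. When a for await...of loop is exited via break or an exception, JavaScript calls the generator's return() method, which gives the SDK the opportunity to release the underlying connection. A minimal sketch, reusing the earlier sdk and request names:

// Stop consuming after the first few chunks; breaking out of the loop
// triggers the generator's return() so the connection can be cleaned up.
let received = 0;
for await (const chunk of sdk.api.generate(request)) {
  process.stdout.write(chunk.data.response);
  if (++received >= 5) {
    break; // early termination
  }
}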

Next Steps