Streaming
liblab-generated SDKs provide built-in support for streaming responses, enabling your applications to consume real-time data from APIs that support Server-Sent Events (SSE) or other streaming protocols. This allows you to process data incrementally as it arrives, rather than waiting for the entire response to complete.
What is Streaming?
Streaming is a technique where data is sent from the server to the client in chunks over a persistent connection, rather than as a single complete response. This is particularly useful for:
- Real-time AI responses: Large Language Models (LLMs) that stream tokens as they're generated
- Long-running operations: Progress updates for tasks that take time to complete
- Live data feeds: Continuous data streams like logs, metrics, or events
- Improved user experience: Users see results immediately rather than waiting for complete responses
How Streaming Works
When an API endpoint supports streaming, the server keeps the connection open and sends data incrementally. The liblab-generated SDK handles the complexity of:
- Establishing and maintaining the persistent connection
- Parsing streaming data formats (like Server-Sent Events)
- Deserializing each chunk into strongly-typed objects
- Providing an idiomatic interface for your programming language
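For example, a text/event-stream response body is a series of data: lines separated by blank lines, each carrying one JSON chunk that the SDK parses and deserializes for you. An illustrative payload looks roughly like this:
data: {"model": "gpt-4o", "response": "Hello", "created_at": "2023-01-01T00:00:00Z"}

data: {"model": "gpt-4o", "response": " world", "created_at": "2023-01-01T00:00:00Z"}

Each data: line becomes one strongly-typed chunk in your application code.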
Configuring Streaming Endpoints
liblab automatically generates streaming methods in your SDK when it detects streaming capabilities in your OpenAPI specification. An operation will be converted to a streaming method when either of the following conditions is met:
Content Type Detection
The operation's response includes the text/event-stream content type:
paths:
/api/generate:
post:
responses:
'200':
description: Streaming response
content:
text/event-stream:
schema:
$ref: '#/components/schemas/GenerateResponse'
Explicit Streaming Annotation
You can explicitly mark an operation as streaming using the x-liblab-streaming extension at the operation level:
paths:
/api/generate:
post:
x-liblab-streaming: true
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/GenerateResponse'
This annotation is useful when your API returns application/json but still streams data incrementally, or when you want to ensure streaming behavior regardless of the content type.
Language-Specific Implementations
- TypeScript
- Python
- Java
- Go
- PHP
- C#
TypeScript Implementation
TypeScript SDKs generated by liblab use async generators (AsyncGenerator) for streaming responses. This provides type-safe iteration over streaming data with native JavaScript async/await support.
Basic Streaming Usage with for-await-of
Consume streaming responses using JavaScript's for await...of loop:
import { StreamingSdk } from 'streaming-sdk';
// Initialize the client
const sdk = new StreamingSdk({
baseUrl: 'https://api.example.com',
token: 'your-api-token'
});
// Create a request
const request = {
model: 'gpt-4o',
prompt: 'Say hello world'
};
// Process streaming responses
const results = [];
for await (const response of sdk.api.generate(request)) {
// Each response.data is a fully deserialized model object
console.log('Model:', response.data.model);
console.log('Response:', response.data.response);
console.log('Created at:', response.data.createdAt);
results.push(response.data);
}
console.log(`Received ${results.length} streaming chunks`);
Manual Iterator Control
For more granular control, use the iterator protocol directly:
// Get the stream
const stream = sdk.api.generate({
model: 'gpt-4o',
prompt: 'Say hello world'
});
// Manually advance the iterator
let iterator = await stream.next();
if (!iterator.done) {
console.log('First chunk:', iterator.value.data);
}
iterator = await stream.next();
if (!iterator.done) {
console.log('Second chunk:', iterator.value.data);
}
iterator = await stream.next();
if (iterator.done) {
console.log('Stream completed');
}
Handling Different Response Types
The SDK automatically handles both single-chunk responses (application/json) and multi-chunk streaming responses (text/event-stream):
// Single application/json response
// Server sends: {"model": "gpt-4o", "response": "Hello, world!", "created_at": "2023-01-01T00:00:00Z"}
for await (const response of sdk.api.generate(request)) {
// Yields once with the complete response
console.log(response.data.response); // "Hello, world!"
}
// Multiple text/event-stream chunks
// Server sends:
// data: {"model": "gpt-4o", "response": "Hello", "created_at": "2023-01-01T00:00:00Z"}
// data: {"model": "gpt-4o", "response": "World", "created_at": "2023-01-01T00:00:00Z"}
for await (const response of sdk.api.generate(request)) {
// Yields twice - once per chunk
process.stdout.write(response.data.response + ' '); // "Hello " then "World "
}
Real-time Processing
Process streaming data in real-time as it arrives:
console.log('Generating response...');
for await (const chunk of sdk.api.generate(request)) {
// Process each chunk immediately as it arrives
process.stdout.write(chunk.data.response); // Display token by token
// Perform additional processing
if (chunk.data.response.length > 100) {
console.log('\n[Long response detected]');
}
}
console.log('\nGeneration complete!');
Configuring Retry Behavior
Configure retry attempts for streaming requests:
const stream = sdk.api.generate(
{
model: 'gpt-4o',
prompt: 'Say hello world'
},
{
retry: {
attempts: 3,
delayMs: 1000
}
}
);
for await (const response of stream) {
console.log(response.data.response);
}
Generated Service with Streaming
When your OpenAPI specification defines streaming endpoints, liblab generates service methods that return AsyncGenerator objects. Each yielded item is automatically deserialized into the appropriate model class and wrapped in an HttpResponse object.
import { BaseService } from '../base-service';
import { HttpResponse, RequestConfig } from '../../http/types';
import { RequestBuilder } from '../../http/request-builder';
import { ContentType } from '../../http/content-type';
import { Environment } from '../../http/environment';
import { z } from 'zod';
import { StatusOkResponse } from './models/status-ok-response';
import { GenerateRequest } from './models/generate-request';
import { GenerateResponse } from './models/generate-response';
import {
statusOkResponseResponse,
generateRequestRequest,
generateResponseResponse
} from './schemas';
export class ApiService extends BaseService {
/**
* Get the status of a LLM.
* @param {RequestConfig} [requestConfig] - The request configuration for retry and validation.
* @returns {AsyncGenerator<HttpResponse<StatusOkResponse>>} - OK
*/
public async *status(
requestConfig?: RequestConfig,
): AsyncGenerator<HttpResponse<StatusOkResponse>> {
const request = new RequestBuilder()
.setBaseUrl(
requestConfig?.baseUrl ||
this.config.baseUrl ||
this.config.environment ||
Environment.DEFAULT,
)
.setConfig(this.config)
.setMethod('GET')
.setPath('/api/generate')
.setRequestSchema(z.any())
.setRequestContentType(ContentType.Json)
.addResponse({
schema: statusOkResponseResponse,
contentType: ContentType.Json,
status: 200,
})
.setRetryAttempts(this.config, requestConfig)
.setRetryDelayMs(this.config, requestConfig)
.setResponseValidation(this.config, requestConfig)
.build();
yield* this.client.stream<StatusOkResponse>(request);
}
/**
* Send a prompt to a LLM.
* @param {RequestConfig} [requestConfig] - The request configuration for retry and validation.
* @returns {AsyncGenerator<HttpResponse<GenerateResponse>>} - OK
*/
public async *generate(
body: GenerateRequest,
requestConfig?: RequestConfig,
): AsyncGenerator<HttpResponse<GenerateResponse>> {
const request = new RequestBuilder()
.setBaseUrl(
requestConfig?.baseUrl ||
this.config.baseUrl ||
this.config.environment ||
Environment.DEFAULT,
)
.setConfig(this.config)
.setMethod('POST')
.setPath('/api/generate')
.setRequestSchema(generateRequestRequest)
.setRequestContentType(ContentType.Json)
.addResponse({
schema: generateResponseResponse,
contentType: ContentType.Json,
status: 200,
})
.setRetryAttempts(this.config, requestConfig)
.setRetryDelayMs(this.config, requestConfig)
.setResponseValidation(this.config, requestConfig)
.addHeaderParam({ key: 'Content-Type', value: 'application/json' })
.addBody(body)
.build();
yield* this.client.stream<GenerateResponse>(request);
}
}
Key Features
- Type Safety: Full TypeScript type support with AsyncGenerator<HttpResponse<T>>
- Native Async/Await: Works seamlessly with JavaScript's async/await and for-await-of syntax
- Automatic Parsing: The SDK handles SSE format parsing, including incomplete chunks and multiple data lines
- Flexible Response Handling: Works with both application/json (single response) and text/event-stream (multiple chunks)
- Retry Support: Built-in retry mechanism for failed streaming requests
- Memory Efficient: Async generators don't load the entire dataset into memory
Best Practices
- Use for-await-of for simplicity: The for await...of loop provides the cleanest syntax for consuming streams
- Handle connection errors: Wrap streaming operations in try-catch blocks to handle network interruptions
- Configure timeouts: Set appropriate timeout values in your request configuration
- Test both response types: Test your implementation with both single-chunk and multi-chunk responses
- Process incrementally: Take advantage of streaming by processing data as it arrives (see the sketch after this list)
- Use retry configuration: Configure retry attempts for production reliability
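As a concrete sketch of incremental processing (and of wrapping the loop in try-catch), the example below streams tokens to the terminal while also accumulating the full response text. It reuses the sdk and request objects from the TypeScript examples above:
let fullText = '';
try {
  for await (const chunk of sdk.api.generate(request)) {
    // Display each token as soon as it arrives
    process.stdout.write(chunk.data.response);
    // Accumulate the complete response for later use
    fullText += chunk.data.response;
  }
} catch (error) {
  console.error('\nStream interrupted:', error);
}
console.log(`\nAccumulated ${fullText.length} characters`);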
Common Use Cases
AI and LLM Integration
Streaming is essential for AI applications where responses are generated incrementally:
// Stream AI-generated content
for await (const chunk of sdk.ai.generateText(prompt)) {
process.stdout.write(chunk.data.text); // Display each token as it's generated
}
Progress Monitoring
Track long-running operations in real-time:
for await (const update of sdk.jobs.process(jobId)) {
console.log(`Progress: ${update.data.percentComplete}%`);
if (update.data.status === 'completed') {
console.log('Job finished!');
break;
}
}
Live Data Feeds
Process continuous data streams:
for await (const logEntry of sdk.monitoring.streamLogs()) {
if (logEntry.data.level === 'ERROR') {
// Handle errors immediately
alertOnError(logEntry.data);
}
}
Error Handling
Handle errors during streaming operations appropriately:
try {
for await (const chunk of sdk.api.stream(request)) {
processChunk(chunk.data);
}
} catch (error) {
if (error.name === 'NetworkError') {
console.error('Connection lost:', error.message);
} else if (error.name === 'ApiError') {
console.error('API error:', error.message);
}
}
Python Implementation
Python SDKs generated by liblab use Python's Generator pattern for streaming responses. This provides memory-efficient iteration over streaming data without loading the entire response into memory.
Basic Streaming Usage
Consume streaming responses using Python's for loop:
from streaming_test import StreamingTest
from streaming_test.models import GenerateRequest
# Initialize the client
sdk = StreamingTest(base_url="https://api.example.com")
# Create a request
body = GenerateRequest(model="llama3", prompt="Say hello world")
# Process streaming responses
results = []
for result in sdk.api.generate(body):
# Each result is a fully deserialized model object
print(f"Model: {result.model}")
print(f"Response: {result.response}")
print(f"Created at: {result.created_at}")
results.append(result)
print(f"Received {len(results)} streaming chunks")
Handling Different Response Types
The SDK automatically handles both single-chunk responses (application/json) and multi-chunk streaming responses (text/event-stream):
# Single application/json response
# Server sends: {"model": "llama3", "created_at": "2024-08-12", "response": "Hello"}
for result in sdk.api.generate(body):
# Yields once with the complete response
print(result.response) # "Hello"
# Multiple text/event-stream chunks
# Server sends:
# data: {"model": "llama3", "created_at": "2024-08-12", "response": "Hello"}
# data: {"model": "llama3", "created_at": "2024-08-12", "response": "World"}
for result in sdk.api.stream(body):
# Yields twice - once per chunk
print(result.response, end=" ") # "Hello " then "World "
Real-time Processing
Process streaming data in real-time as it arrives:
print("Generating response...")
for chunk in sdk.api.generate(body):
# Process each chunk immediately as it arrives
print(chunk.response, end="", flush=True) # Display token by token
# Perform additional processing
if len(chunk.response) > 100:
print("\n[Long response detected]")
print("\nGeneration complete!")
Authentication with Streaming
Use bearer token authentication with streaming endpoints:
# Initialize with authentication
sdk = StreamingTest(
base_url="https://api.example.com",
access_token="your-bearer-token"
)
body = GenerateRequest(model="llama3", prompt="Say hello world")
for result in sdk.api.generate(body):
print(result.response)
Retry Behavior
The SDK automatically retries failed streaming requests:
# The SDK will automatically retry on failures
# Default retry behavior is built-in
for result in sdk.api.generate(body):
print(result.response)
# If all retries fail, an ApiError is raised
from streaming_test.net.transport.api_error import ApiError
try:
for result in sdk.api.generate(body):
print(result.response)
except ApiError as e:
print(f"Streaming failed after retries: {e}")
Generated Service with Streaming
When your OpenAPI specification defines streaming endpoints, liblab generates service methods that return Generator objects. Each yielded item is automatically deserialized into the appropriate model class.
from typing import Generator
from .utils.validator import Validator
from .utils.base_service import BaseService
from ..net.transport.serializer import Serializer
from ..net.environment.environment import Environment
from ..models.utils.cast_models import cast_models
from ..models import (
GenerateRequest,
GenerateResponse,
RefreshTokenOkResponse,
RefreshTokenRequest,
StatusOkResponse,
)
class ApiService(BaseService):
@cast_models
def status(self) -> Generator[StatusOkResponse, None, None]:
"""Get the status of a LLM.
...
:raises RequestError: Raised when a request fails, with optional HTTP status code and details.
...
:return: The parsed response data.
:rtype: Generator[StatusOkResponse, None, None]
"""
serialized_request = (
Serializer(
f"{self.base_url or Environment.DEFAULT.url}/api/generate",
)
.serialize()
.set_method("GET")
)
for response, _, _ in self.stream_request(serialized_request):
yield StatusOkResponse._unmap(response)
@cast_models
def generate(
self, request_body: GenerateRequest = None
) -> Generator[GenerateResponse, None, None]:
"""Send a prompt to a LLM.
:param request_body: The request body., defaults to None
:type request_body: GenerateRequest, optional
...
:raises RequestError: Raised when a request fails, with optional HTTP status code and details.
...
:return: The parsed response data.
:rtype: Generator[GenerateResponse, None, None]
"""
Validator(GenerateRequest).is_optional().validate(request_body)
serialized_request = (
Serializer(
f"{self.base_url or Environment.DEFAULT.url}/api/generate",
)
.serialize()
.set_method("POST")
.set_body(request_body)
)
for response, _, _ in self.stream_request(serialized_request):
yield GenerateResponse._unmap(response)
Key Features
- Memory Efficient: Using Python Generators, streaming responses don't load the entire dataset into memory
- Type Safety: Each streamed chunk is automatically deserialized into strongly-typed model objects with full type hints
- Automatic Parsing: The SDK handles SSE format parsing, including event fields and multiple data lines
- Flexible Response Handling: Works with both application/json (single response) and text/event-stream (multiple chunks)
- Built-in Retry: Automatic retry mechanism with configurable attempts (default: 4 attempts)
- Decorator Support: Uses the @cast_models decorator for automatic model casting
Best Practices
- Use for loops for simplicity: Python's for loop provides the cleanest syntax for consuming generators
- Handle exceptions: Wrap streaming operations in try-except blocks to handle ApiError exceptions
- Flush output when needed: Use flush=True with print() when displaying streaming data in real-time
- Test both response types: Test your implementation with both single-chunk and multi-chunk responses
- Validate inputs: The SDK automatically validates request bodies using the Validator class
- Process incrementally: Take advantage of streaming by processing data as it arrives
Common Use Cases
AI and LLM Integration
Streaming is essential for AI applications where responses are generated incrementally:
# Stream AI-generated content
for chunk in sdk.api.generate_text(prompt):
print(chunk.text, end="", flush=True) # Display each token as it's generated
Progress Monitoring
Track long-running operations in real-time:
for update in sdk.jobs.process(job_id):
print(f"Progress: {update.percent_complete}%")
if update.status == 'completed':
print("Job finished!")
break
Live Data Feeds
Process continuous data streams:
for log_entry in sdk.monitoring.stream_logs():
if log_entry.level == 'ERROR':
# Handle errors immediately
alert_on_error(log_entry)
Error Handling
Handle errors during streaming operations appropriately:
from streaming_test.net.transport.api_error import ApiError
try:
for chunk in sdk.api.stream(request):
process_chunk(chunk)
except ApiError as e:
# Handle API errors (includes network errors and retries)
print(f"API error: {e}")
Java Implementation
Java SDKs generated by liblab use Java's Stream<T> API for streaming responses. This provides a functional, type-safe way to process streaming data with support for both synchronous and asynchronous operations.
Basic Synchronous Streaming
Consume streaming responses using Java Streams API:
import com.example.streamingsdk.StreamingSdk;
import com.example.streamingsdk.models.GenerateRequest;
import com.example.streamingsdk.models.GenerateResponse;
import java.util.stream.Stream;
// Initialize the client
StreamingSdk client = new StreamingSdk();
// Create a request
GenerateRequest request = GenerateRequest.builder()
.model("gpt-4o")
.prompt("Say hello world")
.build();
// Process streaming responses synchronously
Stream<GenerateResponse> responseStream = client.api.generate(request);
// Iterate over the stream
responseStream.forEach(response -> {
System.out.println("Model: " + response.getModel());
System.out.println("Response: " + response.getResponse());
System.out.println("Created at: " + response.getCreatedAt());
});
Asynchronous Streaming
Use CompletableFuture for non-blocking streaming operations:
import java.util.concurrent.CompletableFuture;
// Process streaming responses asynchronously
CompletableFuture<Stream<GenerateResponse>> futureStream =
client.api.generateAsync(request);
// Wait for the stream and process it
futureStream.thenAccept(stream -> {
stream.forEach(response -> {
System.out.println("Response: " + response.getResponse());
});
}).join();
Using Iterator for Manual Control
For more granular control over stream consumption:
import java.util.Iterator;
Stream<GenerateResponse> response = client.api.generate(request);
Iterator<GenerateResponse> iterator = response.iterator();
// Get first chunk
if (iterator.hasNext()) {
GenerateResponse firstChunk = iterator.next();
System.out.println("First chunk: " + firstChunk.getResponse());
}
// Get second chunk
if (iterator.hasNext()) {
GenerateResponse secondChunk = iterator.next();
System.out.println("Second chunk: " + secondChunk.getResponse());
}
// Check if stream is complete
if (!iterator.hasNext()) {
System.out.println("Stream completed");
}
Collecting Stream Results
Collect all streaming chunks into a list:
import java.util.List;
import java.util.stream.Collectors;
Stream<GenerateResponse> response = client.api.generate(request);
// Collect all chunks
List<GenerateResponse> chunks = response.collect(Collectors.toList());
System.out.println("Received " + chunks.size() + " chunks");
chunks.forEach(chunk -> System.out.println(chunk.getResponse()));
Real-time Processing with Stream Operations
Use Java Stream operations for real-time data processing:
// Filter and process specific chunks
client.api.generate(request)
.filter(chunk -> chunk.getResponse().length() > 0)
.map(chunk -> chunk.getResponse().toUpperCase())
.forEach(System.out::println);
// Find first chunk matching a condition
GenerateResponse firstMatch = client.api.generate(request)
.filter(chunk -> chunk.getResponse().contains("important"))
.findFirst()
.orElse(null);
// Limit the number of chunks processed
client.api.generate(request)
.limit(5)
.forEach(chunk -> System.out.println(chunk.getResponse()));
Handling Different Response Types
The SDK automatically handles both single-chunk responses (application/json) and multi-chunk streaming responses (text/event-stream):
// Single application/json response
// Server returns one complete object
Stream<GenerateResponse> singleResponse = client.api.generate(request);
singleResponse.forEach(response -> {
// Yields once with the complete response
System.out.println(response.getResponse());
});
// Multiple text/event-stream chunks
// Server sends:
// data: {"model": "gpt-4o", "response": "Hello", "created_at": "2023-04-20T13:37:00Z"}
// data: {"model": "gpt-4o", "response": "World", "created_at": "2023-04-20T13:37:00Z"}
Stream<GenerateResponse> multiChunkResponse = client.api.stream(request);
multiChunkResponse.forEach(response -> {
// Yields twice - once per chunk
System.out.print(response.getResponse() + " ");
});
Generated Service with Streaming
When your OpenAPI specification defines streaming endpoints, liblab generates service methods that return Stream<T> objects for synchronous operations and CompletableFuture<Stream<T>> for asynchronous operations.
package com.example.javasdk.services;
import com.example.javasdk.config.JavaSdkConfig;
import com.example.javasdk.exceptions.ApiError;
import com.example.javasdk.http.Environment;
import com.example.javasdk.http.HttpMethod;
import com.example.javasdk.http.ModelConverter;
import com.example.javasdk.http.util.RequestBuilder;
import com.example.javasdk.models.GenerateRequest;
import com.example.javasdk.models.GenerateResponse;
import com.example.javasdk.models.StatusOkResponse;
import com.example.javasdk.utils.StreamUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import lombok.NonNull;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
/**
* ApiService Service
*/
public class ApiService extends BaseService {
public ApiService(@NonNull OkHttpClient httpClient, JavaSdkConfig config) {
super(httpClient, config);
}
/**
* Get the status of a LLM.
*
* @return response of {@code Stream<StatusOkResponse>}
*/
public Stream<StatusOkResponse> status() throws ApiError {
Request request = this.buildStatusRequest();
Response response = this.execute(request);
return StreamUtils.streamResponseData(response).map((data) ->
ModelConverter.convert(data, new TypeReference<StatusOkResponse>() {})
);
}
/**
* Get the status of a LLM.
*
* @return response of {@code CompletableFuture<Stream<StatusOkResponse>>}
*/
public CompletableFuture<Stream<StatusOkResponse>> statusAsync() throws ApiError {
Request request = this.buildStatusRequest();
CompletableFuture<Response> futureResponse = this.executeAsync(request);
return futureResponse.thenApplyAsync((response) -> {
return StreamUtils.streamResponseData(response).map((data) ->
ModelConverter.convert(data, new TypeReference<StatusOkResponse>() {})
);
});
}
/**
* Send a prompt to a LLM.
*
* @param generateRequest {@link GenerateRequest} Request Body
* @return response of {@code Stream<GenerateResponse>}
*/
public Stream<GenerateResponse> generate(@NonNull GenerateRequest generateRequest)
throws ApiError {
Request request = this.buildGenerateRequest(generateRequest);
Response response = this.execute(request);
return StreamUtils.streamResponseData(response).map((data) ->
ModelConverter.convert(data, new TypeReference<GenerateResponse>() {})
);
}
/**
* Send a prompt to a LLM.
*
* @param generateRequest {@link GenerateRequest} Request Body
* @return response of {@code CompletableFuture<Stream<GenerateResponse>>}
*/
public CompletableFuture<Stream<GenerateResponse>> generateAsync(
@NonNull GenerateRequest generateRequest
) throws ApiError {
Request request = this.buildGenerateRequest(generateRequest);
CompletableFuture<Response> futureResponse = this.executeAsync(request);
return futureResponse.thenApplyAsync((response) -> {
return StreamUtils.streamResponseData(response).map((data) ->
ModelConverter.convert(data, new TypeReference<GenerateResponse>() {})
);
});
}
private Request buildGenerateRequest(@NonNull GenerateRequest generateRequest) {
return new RequestBuilder(
HttpMethod.POST,
Optional.ofNullable(this.config.getBaseUrl()).orElse(Environment.DEFAULT.getUrl()),
"api/generate"
)
.setJsonContent(generateRequest)
.build();
}
}
Key Features
- Type Safety: Full type safety with Java generics, using Stream<T> and CompletableFuture<Stream<T>>
- Dual Mode: Support for both synchronous and asynchronous streaming operations
- Stream API Integration: Seamless integration with the Java Streams API for functional programming
- Lazy Evaluation: Streams are lazily evaluated, processing data only when consumed
- Automatic Parsing: The SDK handles SSE format parsing, including event fields and multiple data lines
- Flexible Response Handling: Works with both application/json and text/event-stream content types
- Builder Pattern: Uses Lombok's @Builder for clean, fluent request construction
Best Practices
- Close streams properly: Streams should be closed after use to free resources, or use try-with-resources (see the sketch after this list)
- Choose sync vs async appropriately: Use async methods for non-blocking operations in concurrent applications
- Use Stream operations: Leverage filter, map, limit, and other Stream operations for data processing
- Handle ApiError: Wrap streaming operations in try-catch blocks to handle ApiError exceptions
- Test both response types: Test your implementation with both single-chunk and multi-chunk responses
- Consider memory usage: Be cautious with collect() operations on large streams; prefer forEach() for processing
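Because the generated streaming methods return a standard java.util.stream.Stream, which implements AutoCloseable, try-with-resources is a simple way to make sure the stream (and, assuming the SDK registers a close handler, the underlying connection) is released. A minimal sketch using the generate method and request from the examples above:
// Close the stream automatically when processing finishes or an error is thrown
try (Stream<GenerateResponse> stream = client.api.generate(request)) {
    stream.forEach(chunk -> System.out.print(chunk.getResponse()));
} catch (ApiError e) {
    System.err.println("Streaming failed: " + e.getMessage());
}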
Common Use Cases
AI and LLM Integration
Streaming is essential for AI applications where responses are generated incrementally:
// Stream AI-generated content
client.ai.generateText(prompt).forEach(chunk -> {
System.out.print(chunk.getText()); // Display each token as it's generated
});
Progress Monitoring
Track long-running operations in real-time:
client.jobs.process(jobId).forEach(update -> {
System.out.println("Progress: " + update.getPercentComplete() + "%");
if ("completed".equals(update.getStatus())) {
System.out.println("Job finished!");
}
});
Live Data Feeds
Process continuous data streams:
client.monitoring.streamLogs()
.filter(logEntry -> "ERROR".equals(logEntry.getLevel()))
.forEach(logEntry -> {
// Handle errors immediately
alertOnError(logEntry);
});
Error Handling
Handle errors during streaming operations appropriately:
try {
client.api.stream(request).forEach(chunk -> {
processChunk(chunk);
});
} catch (ApiError e) {
System.err.println("API error: " + e.getMessage());
} catch (NetworkException e) {
System.err.println("Connection lost: " + e.getMessage());
}
Go Implementation
Go SDKs generated by liblab use a custom Stream type that provides iterator-style access to streaming responses. This approach integrates naturally with Go's error handling and context patterns.
Basic Streaming Usage
Consume streaming responses using the iterator pattern:
package main
import (
"context"
"fmt"
"log"
"github.com/streaming/pkg/api"
"github.com/streaming/pkg/streamingsdk"
"github.com/streaming/pkg/streamingsdkconfig"
)
func main() {
// Initialize the SDK
config := streamingsdkconfig.NewConfig()
sdk := streamingsdk.NewStreamingSdk(config)
// Create a request
request := api.GenerateRequest{}
request.SetModel("gpt-4o")
request.SetPrompt("Say hello world")
// Get the stream
stream, err := sdk.Api.Generate(context.Background(), request)
if err != nil {
log.Fatal(err)
}
// Iterate over chunks using Next()
for {
chunk, streamErr := stream.Next()
if streamErr != nil {
// Stream closed or error occurred
break
}
// Access the deserialized data
fmt.Printf("Model: %s\n", *chunk.Data.Model)
fmt.Printf("Response: %s\n", *chunk.Data.Response)
fmt.Printf("Created at: %s\n", *chunk.Data.CreatedAt)
// Access metadata
fmt.Printf("Status Code: %d\n", chunk.Metadata.StatusCode)
fmt.Printf("Content-Type: %s\n", chunk.Metadata.Headers["Content-Type"])
}
}
Collecting All Chunks
Collect all streaming chunks into a slice:
var chunks []api.GenerateResponse
stream, err := sdk.Api.Generate(context.Background(), request)
if err != nil {
log.Fatal(err)
}
for {
chunk, streamErr := stream.Next()
if streamErr != nil {
break
}
chunks = append(chunks, chunk.Data)
}
fmt.Printf("Received %d chunks\n", len(chunks))
for _, chunk := range chunks {
fmt.Println(*chunk.Response)
}
Context Cancellation
Use Go contexts to cancel streaming operations:
import (
"context"
"time"
)
// Create a context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := sdk.Api.Generate(ctx, request)
if err != nil {
log.Fatal(err)
}
for {
chunk, streamErr := stream.Next()
if streamErr != nil {
if ctx.Err() == context.DeadlineExceeded {
fmt.Println("Streaming timed out")
}
break
}
fmt.Println(*chunk.Data.Response)
}
Authentication with Streaming
Use bearer token authentication:
config := streamingsdkconfig.NewConfig()
config.SetAccessToken("your-bearer-token")
sdk := streamingsdk.NewStreamingSdk(config)
request := api.GenerateRequest{}
request.SetModel("gpt-4o")
request.SetPrompt("Say hello world")
stream, err := sdk.Api.Generate(context.Background(), request)
if err != nil {
log.Fatal(err)
}
for {
chunk, streamErr := stream.Next()
if streamErr != nil {
break
}
fmt.Println(*chunk.Data.Response)
}
Accessing Raw Response Data
Access the raw bytes alongside deserialized data:
stream, err := sdk.Api.Generate(context.Background(), request)
if err != nil {
log.Fatal(err)
}
for {
chunk, streamErr := stream.Next()
if streamErr != nil {
break
}
// Access deserialized data
fmt.Printf("Parsed: %s\n", *chunk.Data.Response)
// Access raw bytes
fmt.Printf("Raw length: %d bytes\n", len(chunk.Raw))
}
Handling Different Response Types
The SDK automatically handles both single-chunk responses (application/json) and multi-chunk streaming responses (text/event-stream):
// Single application/json response
// Server sends: {"model": "gpt-4o", "response": "Hello, world!", "created_at": "2023-01-01T00:00:00Z"}
stream, err := sdk.Api.Generate(context.Background(), request)
if err != nil {
log.Fatal(err)
}
chunk, streamErr := stream.Next()
if streamErr == nil {
// One complete response
fmt.Println(*chunk.Data.Response)
}
// Multiple text/event-stream chunks
// Server sends:
// data: {"model":"gpt-4o","response":"Hello","created_at":"2023-01-01T00:00:00Z"}
// data: {"model":"gpt-4o","response":"World","created_at":"2023-01-01T00:00:00Z"}
stream, err = sdk.Api.Generate(context.Background(), request)
if err != nil {
log.Fatal(err)
}
for {
chunk, streamErr := stream.Next()
if streamErr != nil {
break
}
// Yields twice - once per chunk
fmt.Printf("%s ", *chunk.Data.Response) // "Hello " then "World "
}
Generated Service with Streaming
When your OpenAPI specification defines streaming endpoints, liblab generates service methods that return *httptransport.Stream[T] for successful responses and *httptransport.ErrorResponse[T] for errors.
package api
import (
"context"
restClient "github.com/liblaber/go-sdk/internal/clients/rest"
"github.com/liblaber/go-sdk/internal/clients/rest/httptransport"
"github.com/liblaber/go-sdk/internal/configmanager"
"github.com/liblaber/go-sdk/pkg/shared"
"github.com/liblaber/go-sdk/pkg/testconfig"
"time"
)
type ApiService struct {
manager *configmanager.ConfigManager
}
func NewApiService() *ApiService {
return &ApiService{
manager: configmanager.NewConfigManager(testconfig.Config{}),
}
}
// Get the status of a LLM.
func (api *ApiService) Status(ctx context.Context) (*httptransport.Stream[StatusOkResponse], *httptransport.ErrorResponse[StatusOkResponse]) {
config := *api.getConfig()
request := httptransport.NewRequestBuilder().WithContext(ctx).
WithMethod("GET").
WithPath("/api/generate").
WithConfig(config).
WithContentType(httptransport.ContentTypeJson).
WithResponseContentType(httptransport.ContentTypeJson).
Build()
client := restClient.NewRestClient[StatusOkResponse](config)
stream, err := client.Stream(*request)
if err != nil {
return nil, err
}
return stream, nil
}
// Send a prompt to a LLM.
func (api *ApiService) Generate(ctx context.Context, generateRequest GenerateRequest) (*httptransport.Stream[GenerateResponse], *httptransport.ErrorResponse[GenerateResponse]) {
config := *api.getConfig()
request := httptransport.NewRequestBuilder().WithContext(ctx).
WithMethod("POST").
WithPath("/api/generate").
WithConfig(config).
WithBody(generateRequest).
AddHeader("CONTENT-TYPE", "application/json").
WithContentType(httptransport.ContentTypeJson).
WithResponseContentType(httptransport.ContentTypeJson).
Build()
client := restClient.NewRestClient[GenerateResponse](config)
stream, err := client.Stream(*request)
if err != nil {
return nil, err
}
return stream, nil
}
Key Features
- Iterator Pattern: Use the Next() method to iterate through chunks with explicit error handling
- Type Safety: Fully typed responses using Go generics
- Context Support: Native Go context integration for cancellation and timeouts
- Metadata Access: Access HTTP metadata (status codes, headers) alongside data
- Raw Data: Access both deserialized objects and raw response bytes
- Automatic Parsing: Handles SSE format parsing, including incomplete chunks
- Built-in Retry: Automatic retry mechanism for failed requests
- Flexible Response Handling: Works with both application/json and text/event-stream
Best Practices
- Always check errors: Check the error return from Next() to detect stream end or errors
- Use contexts: Pass contexts with timeouts or cancellation to all API calls
- Handle stream exhaustion: When Next() returns an error, the stream is closed
- Defer cleanup: Although the stream handles cleanup, use defer with context cancellation
- Test with httptest: Use Go's httptest package for reliable streaming tests
- Check metadata: Use chunk metadata to access HTTP status codes and headers
- Configure timeouts: Set appropriate timeouts in your config for streaming operations
Common Use Cases
AI and LLM Integration
Streaming is essential for AI applications where responses are generated incrementally:
// Stream AI-generated content
stream, err := sdk.Ai.GenerateText(context.Background(), prompt)
if err != nil {
log.Fatal(err)
}
for {
chunk, streamErr := stream.Next()
if streamErr != nil {
break
}
fmt.Print(*chunk.Data.Text) // Display each token as it's generated
}
Progress Monitoring
Track long-running operations in real-time:
stream, err := sdk.Jobs.Process(context.Background(), jobId)
if err != nil {
log.Fatal(err)
}
for {
update, streamErr := stream.Next()
if streamErr != nil {
break
}
fmt.Printf("Progress: %d%%\n", *update.Data.PercentComplete)
if *update.Data.Status == "completed" {
fmt.Println("Job finished!")
break
}
}
Live Data Feeds
Process continuous data streams:
stream, err := sdk.Monitoring.StreamLogs(context.Background())
if err != nil {
log.Fatal(err)
}
for {
logEntry, streamErr := stream.Next()
if streamErr != nil {
break
}
if *logEntry.Data.Level == "ERROR" {
// Handle errors immediately
alertOnError(logEntry.Data)
}
}
Error Handling
Handle errors during streaming operations appropriately:
import (
"context"
"errors"
"io"
"log"
)
stream, err := sdk.Api.Stream(context.Background(), request)
if err != nil {
log.Printf("Failed to start stream: %v", err)
return
}
for {
chunk, streamErr := stream.Next()
if streamErr != nil {
// Check for normal stream completion using io.EOF
if errors.Is(streamErr, io.EOF) {
// Normal stream completion
break
}
// Handle actual errors
log.Printf("Stream error: %v", streamErr)
break
}
processChunk(chunk.Data)
}
PHP Implementation
PHP SDKs generated by liblab use PHP's Generator pattern for streaming responses. This provides memory-efficient iteration over streaming data without loading the entire response into memory.
Basic Streaming Usage
Consume streaming responses using PHP's foreach loop:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use StreamingClient\Client;
use StreamingClient\Models\GenerateRequest;
// Initialize the client
$client = new Client('your-api-token');
// Create a request
$request = new GenerateRequest('gpt-4o', 'Say hello world');
// Process streaming responses
$results = [];
foreach ($client->api->generate($request) as $response) {
// Each $response is a fully deserialized Model object
echo "Model: " . $response->model . "\n";
echo "Response: " . $response->response . "\n";
echo "Created at: " . $response->createdAt . "\n";
$results[] = $response;
}
echo "Received " . count($results) . " streaming chunks\n";
Accessing Response Metadata
When the responseHeaders flag is set to true in your liblab config, the response structure includes additional metadata. In this case, you access the deserialized data through the data property and metadata through the metadata property:
// With responseHeaders: true in liblab config
foreach ($client->api->generate($request) as $response) {
// Access the deserialized data via the data property
echo "Model: " . $response->data->model . "\n";
echo "Response: " . $response->data->response . "\n";
echo "Created at: " . $response->data->createdAt . "\n";
// Access metadata (headers, status code, etc.)
echo "Status Code: " . $response->metadata->statusCode . "\n";
echo "Content-Type: " . $response->metadata->headers['Content-Type'] . "\n";
}
Without the responseHeaders flag, you access fields directly on the response object as shown in the basic examples above.
Handling Different Response Types
The SDK automatically handles both single-chunk responses (application/json) and multi-chunk streaming responses (text/event-stream):
// Single application/json response
// Server sends: {"model": "gpt-4o", "response": "Hello, world!", "created_at": "2023-01-01T00:00:00Z"}
foreach ($client->api->generate($request) as $response) {
// Yields once with the complete response
echo $response->response; // "Hello, world!"
}
// Multiple text/event-stream chunks
// Server sends:
// data: {"model": "gpt-4o", "response": "Hello", "created_at": "2023-01-01T00:00:00Z"}
// data: {"model": "gpt-4o", "response": "World", "created_at": "2023-01-01T00:00:00Z"}
foreach ($client->api->generate($request) as $response) {
// Yields twice - once per chunk
echo $response->response . " "; // "Hello " then "World "
}
Real-time Processing
Process streaming data in real-time as it arrives:
echo "Generating response...\n";
foreach ($client->api->generate($request) as $chunk) {
// Process each chunk immediately as it arrives
echo $chunk->response; // Display token by token
flush(); // Ensure output is sent immediately
// Perform additional processing
if (strlen($chunk->response) > 100) {
// Handle long responses differently
echo "\n[Long response detected]\n";
}
}
echo "\nGeneration complete!\n";
Generated Service with Streaming
When your OpenAPI specification defines streaming endpoints, liblab generates service methods that return \Generator objects. Each yielded item is automatically deserialized into the appropriate model class.
namespace PhpSdk\Services;
use PhpSdk\Utils\Serializer;
use PhpSdk\Models;
class Api extends BaseService
{
/**
* Get the status of a LLM.
* @return \Generator<Models\StatusOkResponse>
*/
public function status(): \Generator
{
$stream = $this->sendStreamingRequest('get', '/api/generate', []);
foreach ($stream as $data) {
yield Serializer::deserialize($data, Models\StatusOkResponse::class);
}
}
/**
* Send a prompt to a LLM.
* @return \Generator<Models\GenerateResponse>
*/
public function generate(?Models\GenerateRequest $input = null): \Generator
{
$stream = $this->sendStreamingRequest('post', '/api/generate', [
'json' => Serializer::serialize($input)
]);
foreach ($stream as $data) {
yield Serializer::deserialize($data, Models\GenerateResponse::class);
}
}
}
Key Features
- Memory Efficient: Using PHP Generators, streaming responses don't load the entire dataset into memory
- Type Safety: Each streamed chunk is automatically deserialized into strongly-typed model objects
- Automatic Parsing: The SDK handles SSE format parsing, including incomplete chunks and multiple data lines
- Flexible Response Handling: Works with both application/json (single response) and text/event-stream (multiple chunks)
- Error Handling: Standard PHP exception handling applies to streaming operations
Best Practices
- Process incrementally: Take advantage of streaming by processing data as it arrives, rather than collecting all chunks first
- Handle connection errors: Wrap streaming operations in try-catch blocks to handle network interruptions
- Set appropriate timeouts: Configure HTTP client timeouts based on expected stream duration
- Test both formats: Test your implementation with both single-chunk and multi-chunk responses
- Flush output: When displaying streaming data to users, remember to flush output buffers
Common Use Cases
AI and LLM Integration
Streaming is essential for AI applications where responses are generated incrementally:
// Stream AI-generated content
foreach ($client->ai->generateText($prompt) as $chunk) {
echo $chunk->text; // Display each token as it's generated
flush();
}
Progress Monitoring
Track long-running operations in real-time:
foreach ($client->jobs->process($jobId) as $update) {
echo "Progress: {$update->percentComplete}%\n";
if ($update->status === 'completed') {
echo "Job finished!\n";
break;
}
}
Live Data Feeds
Process continuous data streams:
foreach ($client->monitoring->streamLogs() as $logEntry) {
if ($logEntry->level === 'ERROR') {
// Handle errors immediately
alertOnError($logEntry);
}
}
Error Handling
Handle errors during streaming operations appropriately:
try {
foreach ($client->api->stream($request) as $chunk) {
processChunk($chunk);
}
} catch (NetworkException $e) {
// Handle connection errors
echo "Connection lost: " . $e->getMessage();
} catch (ApiException $e) {
// Handle API errors
echo "API error: " . $e->getMessage();
}
C# Implementation
C# SDKs generated by liblab use IAsyncEnumerable<T> for streaming responses. This provides native async/await support with automatic cancellation token propagation, making it ideal for modern .NET applications.
Basic Streaming with await foreach
Consume streaming responses using C#'s await foreach:
using StreamingSdk;
using StreamingSdk.Models;
// Initialize the client
using var client = new StreamingSdkClient();
// Create a request
var request = new GenerateRequest("gpt-4o", "Say hello world");
// Process streaming responses
await foreach (var response in client.Api.GenerateAsync(request))
{
Console.WriteLine($"Model: {response.Model}");
Console.WriteLine($"Response: {response.Response_}");
Console.WriteLine($"Created at: {response.CreatedAt}");
}
Accessing Response Metadata
When the responseHeaders flag is set to true in your liblab config, the response structure includes additional metadata. In this case, you access the deserialized data through the Data property and metadata through the Metadata property:
// With responseHeaders: true in liblab config
await foreach (var response in client.Api.GenerateAsync(request))
{
// Access the deserialized data via the Data property
Console.WriteLine($"Model: {response.Data.Model}");
Console.WriteLine($"Response: {response.Data.Response_}");
Console.WriteLine($"Created at: {response.Data.CreatedAt}");
// Access metadata (headers, status code, etc.)
Console.WriteLine($"Status Code: {response.Metadata.StatusCode}");
Console.WriteLine($"Content-Type: {response.Metadata.Headers["Content-Type"]}");
}
Without the responseHeaders flag, you access fields directly on the response object as shown in the basic examples above.
Manual Enumeration with IAsyncEnumerator
For more granular control over stream consumption:
var responseStream = client.Api.GenerateAsync(request);
var enumerator = responseStream.GetAsyncEnumerator();
try
{
// Get first chunk
if (await enumerator.MoveNextAsync())
{
var firstChunk = enumerator.Current;
Console.WriteLine($"First chunk: {firstChunk.Response_}");
}
// Get second chunk
if (await enumerator.MoveNextAsync())
{
var secondChunk = enumerator.Current;
Console.WriteLine($"Second chunk: {secondChunk.Response_}");
}
// Check if stream is complete
if (!await enumerator.MoveNextAsync())
{
Console.WriteLine("Stream completed");
}
}
finally
{
await enumerator.DisposeAsync();
}
Collecting Stream Results
Collect all streaming chunks into a list:
var chunks = new List<GenerateResponse>();
await foreach (var response in client.Api.GenerateAsync(request))
{
chunks.Add(response);
}
Console.WriteLine($"Received {chunks.Count} chunks");
foreach (var chunk in chunks)
{
Console.WriteLine(chunk.Response_);
}
Cancellation Token Support
Cancel streaming operations gracefully:
using var cts = new CancellationTokenSource();
// Cancel after 5 seconds
cts.CancelAfter(TimeSpan.FromSeconds(5));
try
{
await foreach (var response in client.Api.GenerateAsync(request, cts.Token))
{
Console.WriteLine(response.Response_);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Streaming was cancelled");
}
Real-time Processing with LINQ
Use LINQ operations on async streams:
using System.Linq;
// Take only the first 5 chunks
await foreach (var response in client.Api.GenerateAsync(request).Take(5))
{
Console.WriteLine(response.Response_);
}
// Filter chunks
await foreach (var response in client.Api.GenerateAsync(request)
.Where(r => r.Response_.Length > 0))
{
Console.WriteLine(response.Response_);
}
Handling Different Response Types
The SDK automatically handles both single-chunk responses (application/json) and multi-chunk streaming responses (text/event-stream):
// Single application/json response
// Server returns one complete object
await foreach (var response in client.Api.GenerateAsync(request))
{
// Yields once with the complete response
Console.WriteLine(response.Response_);
}
// Multiple text/event-stream chunks
// Server sends:
// data: {"model": "gpt-4o", "response": "Hello", "created_at": "2023-04-20T13:37:00Z"}
// data: {"model": "gpt-4o", "response": "World", "created_at": "2023-04-20T13:37:00Z"}
await foreach (var response in client.Api.StreamAsync(request))
{
// Yields twice - once per chunk
Console.Write(response.Response_ + " ");
}
Generated Service with Streaming
When your OpenAPI specification defines streaming endpoints, liblab generates service methods that return IAsyncEnumerable<T> objects with full CancellationToken support.
using System.Net.Http.Json;
using Test.Http;
using Test.Http.Exceptions;
using Test.Http.Extensions;
using Test.Http.Serialization;
using Test.Models;
using Test.Validation;
using Test.Validation.Extensions;
namespace Test.Services;
public class ApiService : BaseService
{
internal ApiService(HttpClient httpClient)
: base(httpClient) { }
/// <summary>Get the status of a LLM.</summary>
public IAsyncEnumerable<StatusOkResponse> StatusAsync(
CancellationToken cancellationToken = default
)
{
var validationResults = new List<FluentValidation.Results.ValidationResult> { };
var combinedFailures = validationResults.SelectMany(result => result.Errors).ToList();
if (combinedFailures.Any())
{
throw new Http.Exceptions.ValidationException(combinedFailures);
}
var request = new RequestBuilder(HttpMethod.Get, "api/generate").Build();
return ExecuteStreamAsync<StatusOkResponse>(request, cancellationToken);
}
/// <summary>Send a prompt to a LLM.</summary>
public IAsyncEnumerable<GenerateResponse> GenerateAsync(
GenerateRequest input,
CancellationToken cancellationToken = default
)
{
ArgumentNullException.ThrowIfNull(input, nameof(input));
var validationResults = new List<FluentValidation.Results.ValidationResult> { };
var validator = new GenerateRequestValidator();
var validationResult = validator.Validate(input);
validationResults.Add(validationResult);
var combinedFailures = validationResults.SelectMany(result => result.Errors).ToList();
if (combinedFailures.Any())
{
throw new Http.Exceptions.ValidationException(combinedFailures);
}
var request = new RequestBuilder(HttpMethod.Post, "api/generate")
.SetContentAsJson(input, _jsonSerializerOptions)
.Build();
return ExecuteStreamAsync<GenerateResponse>(request, cancellationToken);
}
}
The base service provides the streaming infrastructure:
using System.Text.Json;
using Test.Http.Extensions;
using Test.Utils;
namespace Test.Services;
public class BaseService
{
protected readonly HttpClient _httpClient;
protected readonly JsonSerializerOptions _jsonSerializerOptions;
public BaseService(HttpClient httpClient)
{
_httpClient = httpClient;
_jsonSerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);
}
protected async IAsyncEnumerable<T> ExecuteStreamAsync<T>(
HttpRequestMessage request,
[System.Runtime.CompilerServices.EnumeratorCancellation]
CancellationToken cancellationToken = default
)
{
var response = await _httpClient
.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken)
.ConfigureAwait(false);
response.EnsureSuccessfulResponse();
await foreach (var line in StreamUtils.StreamResponseDataAsync(response, cancellationToken))
{
if (string.IsNullOrWhiteSpace(line))
continue;
T? item = default;
try
{
item = JsonSerializer.Deserialize<T>(line, _jsonSerializerOptions);
}
catch (JsonException)
{
// Skip invalid JSON lines
continue;
}
if (item != null)
{
yield return item;
}
}
}
}
Key Features
- Native Async Streams: Uses C# 8.0+ IAsyncEnumerable<T> for idiomatic async streaming
- Cancellation Support: Full CancellationToken support with the EnumeratorCancellation attribute
- Automatic Validation: Built-in FluentValidation support for request validation
- Error Handling: Graceful handling of invalid JSON lines and empty responses
- Type Safety: Full type safety with nullable reference types
- HTTP Optimization: Uses HttpCompletionOption.ResponseHeadersRead for efficient streaming
- Flexible Response Handling: Works with both application/json and text/event-stream content types
Best Practices
- Use await foreach: The await foreach syntax provides the cleanest way to consume async streams
- Always use using statements: Wrap clients in using statements to ensure proper disposal
- Leverage cancellation tokens: Pass cancellation tokens to enable graceful cancellation
- Handle OperationCanceledException: Catch this exception when using cancellation tokens
- Dispose enumerators: When manually using enumerators, always dispose them with DisposeAsync()
- Use LINQ for async streams: Take advantage of System.Linq.Async for filtering and transforming streams
- Validate inputs: The SDK automatically validates request objects using FluentValidation
Common Use Cases
AI and LLM Integration
Streaming is essential for AI applications where responses are generated incrementally:
// Stream AI-generated content
await foreach (var chunk in client.Ai.GenerateTextAsync(prompt))
{
Console.Write(chunk.Text); // Display each token as it's generated
}
Progress Monitoring
Track long-running operations in real-time:
await foreach (var update in client.Jobs.ProcessAsync(jobId))
{
Console.WriteLine($"Progress: {update.PercentComplete}%");
if (update.Status == "completed")
{
Console.WriteLine("Job finished!");
break;
}
}
Live Data Feeds
Process continuous data streams:
await foreach (var logEntry in client.Monitoring.StreamLogsAsync())
{
if (logEntry.Level == "ERROR")
{
// Handle errors immediately
await AlertOnErrorAsync(logEntry);
}
}
Error Handling
Handle errors during streaming operations appropriately:
try
{
await foreach (var chunk in client.Api.StreamAsync(request))
{
ProcessChunk(chunk);
}
}
catch (HttpRequestException e)
{
Console.WriteLine($"Connection lost: {e.Message}");
}
catch (ValidationException e)
{
Console.WriteLine($"Validation error: {e.Message}");
}
Performance Considerations
- Memory Usage: Streaming responses are memory-efficient because data is processed incrementally
- Latency: The first chunk typically arrives much sooner than a complete, buffered response would (see the sketch below)
- Timeouts: Configure appropriate timeouts for streaming connections
- Connection Management: The SDK handles connection lifecycle automatically
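To observe the latency benefit directly, you can time the arrival of the first chunk against completion of the whole stream. A minimal TypeScript sketch, reusing the sdk and request objects from the TypeScript examples above:
const start = Date.now();
let firstChunkMs: number | null = null;
let chunkCount = 0;
for await (const chunk of sdk.api.generate(request)) {
  if (firstChunkMs === null) {
    // Time to first chunk is usually far lower than total stream time
    firstChunkMs = Date.now() - start;
  }
  chunkCount++;
}
console.log(`First chunk after ${firstChunkMs} ms; ${chunkCount} chunks in ${Date.now() - start} ms total`);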
Next Steps
- Learn about Authentication in liblab SDKs
- Explore Hooks for customizing request/response handling
- Check out Language Design & Methodology for more details