
Event Queuing/Batching

Langfuse's client SDKs and integrations queue events and send them in batches in the background, which reduces the number of API calls and the time spent on network requests. A batch is sent based on a combination of time and size (the number of events and the size of the batch).
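The time-and-size rule can be illustrated with a toy queue. This is a minimal sketch, not Langfuse's actual implementation: the class BatchQueue and the send callback are hypothetical, the time check only runs when an event is added (a real SDK would typically use a background worker), and the byte size of a batch is ignored. The parameter names flush_at and flush_interval mirror the options described under Configuration.

```python
import time
from typing import Any, Callable, List


class BatchQueue:
    """Toy event queue that flushes on a size or time threshold."""

    def __init__(
        self,
        send: Callable[[List[Any]], None],
        flush_at: int,
        flush_interval: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._send = send                      # hypothetical transport callback
        self._flush_at = flush_at              # max events per batch
        self._flush_interval = flush_interval  # max seconds between sends
        self._clock = clock
        self._events: List[Any] = []
        self._last_flush = clock()

    def add(self, event: Any) -> None:
        """Queue an event and send the batch if either threshold is reached."""
        self._events.append(event)
        too_many = len(self._events) >= self._flush_at
        too_old = self._clock() - self._last_flush >= self._flush_interval
        if too_many or too_old:
            self.flush()

    def flush(self) -> None:
        """Send whatever is queued, even if no threshold has been reached."""
        if self._events:
            batch, self._events = self._events, []
            self._send(batch)
        self._last_flush = self._clock()


sent: List[List[Any]] = []
queue = BatchQueue(send=sent.append, flush_at=3, flush_interval=60.0)
for i in range(7):
    queue.add(i)
# Two full batches have gone out; event 6 is still waiting in the queue.
queue.flush()  # a manual flush sends the remainder
```

After the loop, the seventh event would be lost if the process stopped before the final flush, which is why manual flushing matters in short-lived environments.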

Configuration

All integrations have a sensible default configuration, but you can customize the batching behaviour to suit your needs.

Option (Python) [SDK constructor, Environment] | Option (JS) | Description
flush_at, LANGFUSE_FLUSH_AT | flushAt | The maximum number of events to batch up before sending.
flush_interval, LANGFUSE_FLUSH_INTERVAL (s) | flushInterval (seconds) | The maximum time to wait before sending a batch in seconds.

For example, set flushAt=1 to send every event immediately, or flushInterval=1 to send a batch every second.
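In Python, the same settings can be supplied through the environment variables listed above. The following is a minimal sketch: the values are examples only, batching_options_from_env is a hypothetical helper (not part of the SDK), and it assumes the variables are set before the client is created.

```python
import os
from typing import Dict, Union

# Variable names come from the options table; the values are only examples.
os.environ["LANGFUSE_FLUSH_AT"] = "1"        # send each event as its own batch
os.environ["LANGFUSE_FLUSH_INTERVAL"] = "1"  # wait at most one second


def batching_options_from_env() -> Dict[str, Union[int, float]]:
    """Hypothetical helper: collect the batching options that are set."""
    options: Dict[str, Union[int, float]] = {}
    if "LANGFUSE_FLUSH_AT" in os.environ:
        options["flush_at"] = int(os.environ["LANGFUSE_FLUSH_AT"])
    if "LANGFUSE_FLUSH_INTERVAL" in os.environ:
        options["flush_interval"] = float(os.environ["LANGFUSE_FLUSH_INTERVAL"])
    return options


options = batching_options_from_env()
print(options)  # {'flush_at': 1, 'flush_interval': 1.0}
```

The resulting dictionary uses the Python option names, so it could also be passed as keyword arguments to the SDK constructor if you prefer explicit configuration over environment variables.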

Manual flushing

In short-lived environments like serverless functions (e.g., Vercel Functions, AWS Lambda), you should explicitly flush the traces before the process exits or the runtime environment is frozen. If you do not flush the client, you may lose events.

If you want to send a batch immediately, call the flush method on the client. In case of network issues, flush logs an error and retries the batch; it never throws an exception.

from langfuse import get_client

# access the client directly
langfuse = get_client()

# Flush all pending observations
langfuse.flush()
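In a serverless handler, one way to make sure the flush happens on every exit path is to call it in a finally block. The sketch below is illustrative only: with_flush, Flushable, FakeClient, and my_handler are hypothetical names, and FakeClient stands in for the object returned by get_client() so that the example runs without network access.

```python
from typing import Any, Callable, Dict, Protocol


class Flushable(Protocol):
    """Anything with a flush() method, e.g. the client from get_client()."""

    def flush(self) -> None: ...


def with_flush(
    client: Flushable,
    handler: Callable[[Dict[str, Any], Any], Any],
) -> Callable[[Dict[str, Any], Any], Any]:
    """Wrap a handler so client.flush() runs on every exit path."""

    def wrapped(event: Dict[str, Any], context: Any) -> Any:
        try:
            return handler(event, context)
        finally:
            # Runs on normal return and when the handler raises.
            client.flush()

    return wrapped


class FakeClient:
    """Stand-in client that only counts flush calls."""

    def __init__(self) -> None:
        self.flush_calls = 0

    def flush(self) -> None:
        self.flush_calls += 1


def my_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    # ... your application logic ...
    return {"statusCode": 200, "body": event["name"]}


client = FakeClient()
handler = with_flush(client, my_handler)
response = handler({"name": "demo"}, None)
```

In real code you would pass the actual client instead of FakeClient. Because the flush sits in a finally block, pending events are sent even if the handler raises.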

When your application exits, call the shutdown method to make sure all queued requests are flushed and pending requests are awaited before the process exits. Once shutdown completes successfully, no more events are sent to the Langfuse API.

from langfuse import get_client

langfuse = get_client()

langfuse.shutdown()

In JS/TS, the LangfuseSpanProcessor buffers events and sends them in batches, so a final flush before the process exits ensures no data is lost.

You can export the processor from your OTEL SDK setup file.

import { NodeSDK } from "@opentelemetry/sdk-node";
import { LangfuseSpanProcessor } from "@langfuse/otel";

// Export the processor to be able to flush it
export const langfuseSpanProcessor = new LangfuseSpanProcessor();

const sdk = new NodeSDK({
  spanProcessors: [langfuseSpanProcessor],
});

sdk.start();

Then, in your serverless function handler, call forceFlush() before the function exits.

import { langfuseSpanProcessor } from "./instrumentation";

export async function handler(event, context) {
  // ... your application logic ...

  // Flush before exiting
  await langfuseSpanProcessor.forceFlush();
}

Python (handler):

# access the client directly
langfuse_handler.client.flush()

JS/TS (handler):

await langfuseHandler.flushAsync();

When your application exits, call the shutdownAsync method to make sure all queued requests are flushed and pending requests are awaited before the process exits.

await langfuseHandler.shutdownAsync();
