The Langfuse adapter pulls data from your Langfuse deployment and converts it to EvaluationRow format for use in evaluation pipelines, so you can evaluate production conversations and tool calling traces directly.

Installation

To use the Langfuse adapter, you need to install the Langfuse dependencies:
pip install 'eval-protocol[langfuse]'

Basic Usage

"""
Example for using Langfuse with the aha judge.
"""

from datetime import datetime
import os
import pytest

from eval_protocol import (
    evaluation_test,
    aha_judge,
    multi_turn_assistant_to_ground_truth,
    EvaluationRow,
    SingleTurnRolloutProcessor,
    create_langfuse_adapter,
    DynamicDataLoader,
)

def langfuse_data_generator() -> list[EvaluationRow]:
    """Fetch trace summaries then sample and fetch details from Langfuse."""
    adapter = create_langfuse_adapter()
    return adapter.get_evaluation_rows(
        to_timestamp=datetime(2025, 9, 12, 0, 11, 18),
        limit=711,
        sample_size=50,
        sleep_between_gets=3.0,
        max_retries=5,
    )

@pytest.mark.parametrize(
    "completion_params",
    [
        {"model": "gpt-4.1"},
        {
            "max_tokens": 131000,
            "extra_body": {"reasoning_effort": "medium"},
            "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b",
        },
        {
            "max_tokens": 131000,
            "extra_body": {"reasoning_effort": "low"},
            "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b",
        },
    ],
)
@evaluation_test(
    data_loaders=DynamicDataLoader(
        generators=[langfuse_data_generator],
    ),
    rollout_processor=SingleTurnRolloutProcessor(),
    preprocess_fn=multi_turn_assistant_to_ground_truth,
    max_concurrent_evaluations=2,
)
async def test_llm_judge(row: EvaluationRow) -> EvaluationRow:
    return await aha_judge(row)

Configuration

The adapter uses the Langfuse client configuration. Set up your Langfuse credentials using environment variables:
export LANGFUSE_PUBLIC_KEY="your_public_key"
export LANGFUSE_SECRET_KEY="your_secret_key" 
export LANGFUSE_HOST="https://your-langfuse-deployment.com"  # Optional, defaults to Langfuse cloud
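
You can also set the same variables in code before creating the adapter (a minimal sketch; the values are placeholders and the variable names match the standard Langfuse client configuration above):
import os

# Standard Langfuse client environment variables (placeholder values)
os.environ["LANGFUSE_PUBLIC_KEY"] = "your_public_key"
os.environ["LANGFUSE_SECRET_KEY"] = "your_secret_key"
os.environ["LANGFUSE_HOST"] = "https://your-langfuse-deployment.com"

from eval_protocol import create_langfuse_adapter

adapter = create_langfuse_adapter()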

API Reference

LangfuseAdapter

The main adapter class for pulling data from Langfuse.

get_evaluation_rows()

Pull traces from Langfuse and convert to EvaluationRow format.
def get_evaluation_rows(
    self,
    limit: int = 100,
    sample_size: Optional[int] = None,
    tags: Optional[List[str]] = None,
    user_id: Optional[str] = None,
    session_id: Optional[str] = None,
    name: Optional[str] = None,
    environment: Optional[str] = None,
    version: Optional[str] = None,
    release: Optional[str] = None,
    fields: Optional[str] = None,
    hours_back: Optional[int] = None,
    from_timestamp: Optional[datetime] = None,
    to_timestamp: Optional[datetime] = None,
    include_tool_calls: bool = True,
    sleep_between_gets: float = 2.5,
    max_retries: int = 3,
    span_name: Optional[str] = None,
    converter: Optional[TraceConverter] = None,
    metadata: Optional[Dict[str, Any]] = None,
    requester_metadata: Optional[Dict[str, Any]] = None,
    requester_metadata_contains: Optional[str] = None,
) -> List[EvaluationRow]
Parameters:
  • limit - Max number of trace summaries to collect via pagination
  • sample_size - Optional number of traces to randomly sample (if None, process all)
  • tags - Filter by specific tags
  • user_id - Filter by user ID
  • session_id - Filter by session ID
  • name - Filter by trace name
  • environment - Filter by environment (e.g., production, staging, development)
  • version - Filter by trace version
  • release - Filter by trace release
  • fields - Comma-separated list of fields to include (e.g., 'core,scores,metrics')
  • hours_back - Filter traces from this many hours ago
  • from_timestamp - Explicit start time (overrides hours_back)
  • to_timestamp - Explicit end time (overrides hours_back)
  • include_tool_calls - Whether to include tool calling traces
  • sleep_between_gets - Sleep time between individual trace.get() calls (2.5s for 30 req/min limit)
  • max_retries - Maximum retries for rate limit errors
  • span_name - If provided, extract messages from generations within this named span
  • converter - Optional custom converter implementing TraceConverter protocol
  • metadata - Filter by exact metadata match (dict)
  • requester_metadata - Filter by exact requester metadata match (dict)
  • requester_metadata_contains - Filter by substring in requester metadata values

get_evaluation_rows_by_ids()

Get specific traces by their IDs and convert to EvaluationRow format.
def get_evaluation_rows_by_ids(
    self,
    trace_ids: List[str],
    include_tool_calls: bool = True,
    span_name: Optional[str] = None,
    converter: Optional[TraceConverter] = None,
) -> List[EvaluationRow]
Parameters:
  • trace_ids - List of trace IDs to fetch
  • include_tool_calls - Whether to include tool calling traces
  • span_name - If provided, extract messages from generations within this named span
  • converter - Optional custom converter implementing TraceConverter protocol
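
For example, to re-evaluate a handful of known traces (the trace IDs below are placeholders):
adapter = create_langfuse_adapter()
rows = adapter.get_evaluation_rows_by_ids(
    trace_ids=["trace_abc123", "trace_def456"],  # placeholder trace IDs
    include_tool_calls=True,
)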

upload_scores()

Upload evaluation scores back to Langfuse traces.
def upload_scores(
    self,
    rows: List[EvaluationRow], 
    model_name: str, 
    mean_score: float
) -> None
Parameters:
  • rows - List of EvaluationRow objects with session_data containing trace IDs
  • model_name - Name of the model (used as the score name in Langfuse)
  • mean_score - The calculated mean score to push to Langfuse
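
For example, after running an evaluation you might push the aggregate score back to the source traces. A minimal sketch, assuming each evaluated row exposes a numeric score as row.evaluation_result.score (the exact attribute may differ in your eval-protocol version):
rows = adapter.get_evaluation_rows(limit=20)
# ... run your evaluation so each row carries a score ...
scores = [row.evaluation_result.score for row in rows if row.evaluation_result]
adapter.upload_scores(
    rows=rows,
    model_name="gpt-4.1",
    mean_score=sum(scores) / len(scores),
)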

Factory Function

For convenience, you can use the factory function:
from eval_protocol.integrations.langfuse_adapter import create_langfuse_adapter

adapter = create_langfuse_adapter()
rows = adapter.get_evaluation_rows(limit=10)

Source Code

The complete implementation is available on GitHub: eval_protocol/adapters/langfuse.py

Filtering Examples

Filter by Tags

# Get production conversations
rows = adapter.get_evaluation_rows(
    limit=50,
    tags=["production", "experiment_v2"]
)

Filter by User

# Get conversations for specific user
rows = adapter.get_evaluation_rows(
    limit=50,
    user_id="user_123"
)

Filter by Time Range

# Get conversations from last 24 hours
rows = adapter.get_evaluation_rows(
    limit=200,        # Collect up to 200 trace summaries
    sample_size=50,   # Sample 50 for full processing
    hours_back=24
)

# Or use explicit timestamps
from datetime import datetime, timedelta
rows = adapter.get_evaluation_rows(
    limit=100,
    sample_size=30,
    from_timestamp=datetime(2024, 1, 1),
    to_timestamp=datetime(2024, 1, 31)
)

Filter by Metadata

# Filter by exact metadata match
rows = adapter.get_evaluation_rows(
    limit=100,
    metadata={"model": "gpt-4", "version": "v1.0"},
    requester_metadata={"user_type": "premium"}
)

# Filter by substring in requester metadata values
rows = adapter.get_evaluation_rows(
    limit=100,
    requester_metadata_contains="experiment_123"
)

Combined Filters with Rate Limiting

# Complex filtering with rate limit handling
rows = adapter.get_evaluation_rows(
    limit=500,                    # Collect many trace summaries
    sample_size=100,              # Sample subset for processing
    tags=["production"],
    user_id="user_123", 
    hours_back=24,
    include_tool_calls=True,
    sleep_between_gets=2.0,       # 2 second delay between API calls
    max_retries=5,                # Retry up to 5 times on rate limits
)

Tool Calling Support

The adapter automatically handles tool calling traces from Langfuse:
# Include tool calls (default behavior)
rows = adapter.get_evaluation_rows(
    limit=10,
    include_tool_calls=True
)

# Exclude tool calls for simpler evaluation
rows = adapter.get_evaluation_rows(
    limit=10,
    include_tool_calls=False
)
Tool calls are preserved in the Message format with tool_calls, tool_call_id, and function_call fields as appropriate.
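
A quick way to inspect what tool activity survived conversion, assuming Message mirrors the OpenAI chat format (role, and tool_calls entries with function.name and function.arguments):
rows = adapter.get_evaluation_rows(limit=10, include_tool_calls=True)
for row in rows:
    for message in row.messages:
        if message.role == "assistant" and message.tool_calls:
            # Each tool call records the function name and its JSON-encoded arguments
            for call in message.tool_calls:
                print(call.function.name, call.function.arguments)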

Sampling and Rate Limiting

The adapter includes intelligent sampling and rate limiting to work efficiently with Langfuse’s API limits:

Two-Stage Process

  1. Collect trace summaries - Fast pagination to gather up to limit trace IDs
  2. Sample and fetch details - Randomly sample sample_size traces for full processing
This approach lets you:
  • Survey large datasets efficiently without hitting rate limits
  • Get representative samples from your trace population
  • Control API usage while still getting meaningful evaluation data
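
Conceptually, the two-stage flow looks roughly like the sketch below. It is an illustration of the idea only, not the adapter's implementation; list_trace_summaries and get_trace_details stand in for the underlying Langfuse list/get calls:
import random
import time

def two_stage_fetch(list_trace_summaries, get_trace_details,
                    limit=1000, sample_size=50, sleep_between_gets=2.5):
    # Stage 1: cheap pagination over trace summaries (IDs only)
    summaries = list_trace_summaries(limit=limit)
    # Stage 2: randomly sample and fetch full details, throttled to respect rate limits
    sampled = random.sample(summaries, min(sample_size, len(summaries)))
    details = []
    for summary in sampled:
        details.append(get_trace_details(summary.id))
        time.sleep(sleep_between_gets)
    return details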

Rate Limit Handling

# Default settings work with Langfuse free tier (30 req/min)
rows = adapter.get_evaluation_rows(
    limit=1000,               # Survey 1000 traces
    sample_size=50,           # Process 50 in detail
    sleep_between_gets=2.5,   # 2.5s delay = ~24 req/min
    max_retries=3,            # Retry on 429 errors
)
For higher rate limits, you can reduce the sleep time:
# For paid plans with higher limits
rows = adapter.get_evaluation_rows(
    sleep_between_gets=0.5,   # Faster processing
    max_retries=5,
)

Data Conversion

The adapter converts Langfuse traces to EvaluationRow format with intelligent handling of different input formats:

Supported Trace Formats

  • Dict Format
  • List Format
  • String Format
The dict format, for example, can take any of the following shapes:
# OpenAI-style messages
{
    "messages": [
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"}
    ],
    "tools": [
        {
            "type": "function",
            "function": {
                "name": "get_weather",
                "description": "Get current weather",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {"type": "string"}
                    }
                }
            }
        }
    ]
}

# Single message object
{
    "role": "user",
    "content": "What is the weather?"
}

# Simple prompt format
{
    "prompt": "Explain quantum computing"
}

Metadata Preservation

The adapter stores the original Langfuse trace ID in the evaluation row metadata:
for row in rows:
    trace_id = row.input_metadata.session_data.get("langfuse_trace_id")
    print(f"Processing trace: {trace_id}")

Environment and Release Filtering

Filter traces by deployment environment and release versions:
# Get traces from production environment only
rows = adapter.get_evaluation_rows(
    limit=200,
    environment="production",
    version="v2.1.0",
    release="stable"
)

Advanced Features

Span-Based Message Extraction

Extract messages from specific spans within traces. This is useful for multi-agent workflows where different subagents use different LLMs: specify the span name to evaluate a particular subagent's LLM performance in isolation.
# Extract messages from a specific span (e.g., "chat_completion")
rows = adapter.get_evaluation_rows(
    limit=100,
    span_name="judge_subagent",  # Only get messages from this span
    tags=["production"]
)

Custom Trace Conversion

If your traces follow a particular pattern, you can also implement custom trace-to-EvaluationRow conversion logic using the TraceConverter protocol:
from typing import Optional

from eval_protocol import EvaluationRow
# InputMetadata, dict_to_message, and Langfuse's TraceWithFullDetails type are also
# required; their exact import paths depend on your eval-protocol and langfuse versions.

def converter(trace: TraceWithFullDetails, include_tool_calls: bool, span_name: Optional[str]) -> Optional[EvaluationRow]:
    messages = []
    tools = []

    if trace.input:
        for d in trace.input:
            if d["role"] == "tool":
                tools.append(d["content"])
            else:
                messages.append(dict_to_message(d, include_tool_calls))
    if trace.output:
        messages.append(dict_to_message(trace.output, include_tool_calls))
    
    return EvaluationRow(
        messages=messages,
        tools=tools,
        input_metadata=InputMetadata(
            session_data={
                "langfuse_trace_id": trace.id,  # Store the trace ID here
            }
        ),
    )

rows = adapter.get_evaluation_rows(
    limit=100,
    converter=converter
)

Complete Example

For a production-ready example that integrates Langfuse with Arena-Hard-Auto evaluation, see llm_judge_langfuse.py. This example demonstrates large-scale data collection, rate limit handling, multi-model comparison, and automated Arena-Hard-Auto judging to create model leaderboards without writing custom evaluation logic.