The @evaluation_test decorator is the core component for creating pytest-based evaluation tests in the Evaluation Protocol. It enables you to evaluate AI models by running rollouts and applying evaluation criteria to measure performance.

Key Concepts

Before diving into the API, it’s important to understand the terminology used in the Evaluation Protocol:
  • Invocation: A single execution of a test function that can generate 1 or more experiments
  • Experiment: A group of runs for a single combination of parameters (multiple experiments when multiple parameter combinations are provided)
  • Run: A group of rollouts (multiple run IDs if num_runs > 1)
  • Rollout: The execution/process that produces a trajectory
  • Trajectory: The result produced by a rollout — a list of OpenAI Chat Completion messages
  • Row: Both input and output of an evaluation (e.g., a task within a dataset)
  • Dataset: A collection of rows (List[EvaluationRow])
  • Eval: A rubric implemented in the test function body that produces a score from 0 to 1
Each of these entities has a unique ID for easy grouping and identification.

Basic Usage

from typing import List
from eval_protocol.pytest import evaluation_test
from eval_protocol.models import EvaluationRow

@evaluation_test(
    completion_params=[
        {"model": "openai/gpt-4o", "temperature": 0.1},
        {"model": "openai/gpt-3.5-turbo", "temperature": 0.1},
    ],
    input_dataset=["path/to/dataset.jsonl"],
    passed_threshold=0.8,
    mode="all"
)
def test_math_reasoning(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    """Evaluate mathematical reasoning capabilities."""
    for row in rows:
        # Your evaluation logic here
        score = evaluate_math_reasoning(row.messages)
        row.evaluation_result.score = score
    
    return rows

Parameters

No single parameter is strictly required. Provide completion_params whenever your rollout processor performs model calls (e.g., SingleTurnRolloutProcessor).
completion_params
List[dict]
required
Generation parameters for the rollout. The required fields depend on the rollout processor used.
For SingleTurnRolloutProcessor and AgentRolloutProcessor:
  • Must include a model field using a LiteLLM-compatible provider route (e.g., openai/gpt-4o, anthropic/claude-3-sonnet, fireworks_ai/*)
  • Optional: temperature, max_tokens, extra_body, etc.
  • See the LiteLLM providers list for supported prefixes and models: https://docs.litellm.ai/docs/providers
For PydanticAgentRolloutProcessor:
  • Must include model field (the canonical way to pass model names to LLM clients)
  • Optional provider field (defaults to “openai” if not specified)
  • Example: {"model": "accounts/fireworks/models/kimi-k2-instruct", "provider": "fireworks"}
  • The agent factory uses the model field to create the appropriate Pydantic AI model
For MCPGymRolloutProcessor:
  • Must include a model field using a LiteLLM-compatible provider route
  • Used to create the policy for environment interaction
For NoOpRolloutProcessor:
  • Can be any value (not used for actual model calls)
  • Often set to {"model": "not-used-offline"} for clarity
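Example (illustrative values drawn from the requirements above; the model names are placeholders):
# completion_params sketches per rollout processor
single_turn_or_agent = [{"model": "openai/gpt-4o", "temperature": 0.1, "max_tokens": 1024}]
pydantic_agent = [{"model": "accounts/fireworks/models/kimi-k2-instruct", "provider": "fireworks"}]
mcp_gym = [{"model": "openai/gpt-4o", "temperature": 0.0}]
no_op = [{"model": "not-used-offline"}]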
data_loaders
Optional[Sequence[EvaluationDataLoader] | EvaluationDataLoader]
Data loaders to produce evaluation rows. Preferred for reusable, parameterized inputs. Each loader may emit multiple variants; rows inherit metadata describing the loader, variant ID, and preprocessing state. Cannot be combined with input_dataset, input_messages, or input_rows. See Data Loader for details.
input_messages
Optional[List[InputMessagesParam]]
Messages to send to the model. Useful when you don’t have a dataset but can hard-code messages. Will be passed as “input_dataset” to the test function.
input_dataset
Optional[List[DatasetPathParam]]
Paths to JSONL datasets that will be loaded using load_jsonl(). Each path can be either a local file path or an HTTP/HTTPS URL. Provide a dataset_adapter to convert the raw JSONL data to EvaluationRows.
Behavior:
  • Files are loaded using load_jsonl() which reads JSONL format (one JSON object per line)
  • Local and remote files: Both local file paths and HTTP/HTTPS URLs are supported
  • Robust parsing: Automatically skips blank or whitespace-only lines to handle trailing newlines gracefully
  • Error handling: Provides detailed error messages including line numbers and row IDs when JSON parsing fails
  • Timeout support: HTTP requests have a 30-second timeout
  • When multiple paths are provided and combine_datasets=True (default), files are concatenated into one dataset
  • When combine_datasets=False, each path is parameterized into separate test invocations
  • Raw JSONL data is passed to the dataset_adapter function for conversion to EvaluationRow format
Supported formats:
  • Local files: "path/to/dataset.jsonl"
  • HTTP URLs: "http://example.com/dataset.jsonl"
  • HTTPS URLs: "https://example.com/dataset.jsonl"
Example:
@evaluation_test(
    input_dataset=[
        "path/to/local_dataset.jsonl",
        "https://example.com/remote_dataset.jsonl"
    ],
    dataset_adapter=my_adapter,
    completion_params=[{"model": "gpt-4"}]
)
input_rows
Optional[List[EvaluationRow]]
Pre-constructed EvaluationRow objects to use directly. Useful when you already have messages and/or metadata prepared. Will be passed as “input_dataset” to the test function. Note: cannot be combined with data_loaders.
dataset_adapter
Callable[[List[Dict[str, Any]]], Dataset]
Function to convert input dataset to a list of EvaluationRows. Defaults to default_dataset_adapter.
rollout_processor
RolloutProcessor
Function used to perform the rollout. Defaults to NoOpRolloutProcessor().
evaluation_test_kwargs
Optional[List[EvaluationInputParam]]
Additional keyword arguments for the evaluation function.
rollout_processor_kwargs
Optional[RolloutProcessorInputParam]
Additional keyword arguments for the rollout processor.
aggregation_method
AggregationMethod
How to aggregate scores across runs. One of: “mean”, “max”, “min”. Defaults to “mean”.
preprocess_fn
Optional[Callable[[List[EvaluationRow]], List[EvaluationRow]]]
Optional preprocessing function applied to rows before rollout. Use this to expand multi-turn conversations (e.g., multi_turn_assistant_to_ground_truth) or filter/transform rows. Note: when using data_loaders, pass preprocess_fn to the loader itself (e.g., DynamicDataLoader(preprocess_fn=...)). When data_loaders is provided, the decorator-level preprocess_fn is not applied to avoid double-processing.
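Example (a minimal sketch; the dataset path, model name, and filter criterion are placeholders):
from typing import List
from eval_protocol.models import EvaluationRow
from eval_protocol.pytest import evaluation_test

def drop_empty_rows(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    # Hypothetical preprocessing step: keep only rows that already contain messages
    return [row for row in rows if row.messages]

@evaluation_test(
    input_dataset=["dataset.jsonl"],
    completion_params=[{"model": "openai/gpt-4o"}],
    preprocess_fn=drop_empty_rows,
    mode="pointwise",
)
def test_with_preprocessing(row: EvaluationRow) -> EvaluationRow:
    return row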
passed_threshold
Optional[Union[EvaluationThreshold, float, dict]]
Threshold configuration for test success. Can be a float or EvaluationThreshold object. Success rate must be above success, and if set, standard error must be below standard_error.
num_runs
int
Number of times to repeat the rollout and evaluations. Defaults to 1.
max_dataset_rows
Optional[int]
Limit dataset to the first N rows.
mcp_config_path
Optional[str]
Path to MCP config file that follows MCPMultiClientConfiguration schema.
max_concurrent_rollouts
int
Maximum number of concurrent rollouts to run in parallel. Defaults to 8.
max_concurrent_evaluations
int
Maximum number of concurrent evaluations to run in parallel. Defaults to 64.
server_script_path
Optional[str]
Path to the MCP server script to run. Defaults to “examples/tau2_mcp/server.py”.
steps
int
Number of rollout steps to execute. Defaults to 30.
mode
EvaluationTestMode
Evaluation mode. “pointwise” (default) applies test function to each row individually. “groupwise” applies test function to a group of rollout results from the same original row (for use cases such as DPO/GRPO). “all” applies test function to the whole dataset.
combine_datasets
bool
Whether to combine multiple datasets. Defaults to True.
logger
Optional[DatasetLogger]
DatasetLogger to use for logging. If not provided, a default logger will be used.
exception_handler_config
Optional[ExceptionHandlerConfig]
Configuration for exception handling and backoff retry logic. If not provided, a default configuration will be used with common retryable exceptions. See the ExceptionHandlerConfig section below for detailed configuration options.

ExceptionHandlerConfig

The ExceptionHandlerConfig parameter allows you to customize exception handling and retry logic for your evaluation tests. This configuration is defined in eval_protocol/pytest/exception_config.ExceptionHandlerConfig.

Key Features

  • Retryable Exceptions: Configure which exceptions should trigger retry attempts
  • Backoff Strategies: Choose between exponential or constant backoff with configurable delays
  • Environment Variable Overrides: Automatically respect EP_MAX_RETRY and EP_FAIL_ON_MAX_RETRY settings
  • Custom Giveup Logic: Define custom conditions for when to stop retrying

Configuration Classes

ExceptionHandlerConfig

The main configuration class that controls exception handling behavior:
@dataclass
class ExceptionHandlerConfig:
    # Exceptions that should be retried using backoff
    retryable_exceptions: Set[Type[Exception]] = DEFAULT_RETRYABLE_EXCEPTIONS
    
    # Backoff configuration
    backoff_config: BackoffConfig = BackoffConfig()

BackoffConfig

Controls the retry backoff behavior:
@dataclass
class BackoffConfig:
    strategy: str = "expo"           # "expo" or "constant"
    base_delay: float = 1.0         # Base delay in seconds
    max_delay: float = 60.0         # Maximum delay in seconds
    max_tries: int = 3              # Maximum number of retry attempts
    jitter: Union[None, Callable] = None  # Jitter function for randomization
    factor: float = 2.0             # Factor for exponential backoff
    raise_on_giveup: bool = True    # Whether to raise exception when giving up
    giveup_func: Callable[[Exception], bool] = lambda e: False  # Custom giveup logic

Default Configuration

By default, the following exceptions are considered retryable:
  • Standard library exceptions: ConnectionError, TimeoutError, OSError
  • Requests library exceptions: requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.HTTPError, requests.exceptions.RequestException
  • HTTPX library exceptions: httpx.ConnectError, httpx.TimeoutException, httpx.NetworkError, httpx.RemoteProtocolError
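To retry on an application-specific error in addition to these defaults, you can extend the set. A sketch, assuming DEFAULT_RETRYABLE_EXCEPTIONS (referenced in the dataclass above) is importable from eval_protocol.pytest.exception_config:
from eval_protocol.pytest.exception_config import (
    ExceptionHandlerConfig,
    BackoffConfig,
    DEFAULT_RETRYABLE_EXCEPTIONS,  # assumed export; adjust the import if it lives elsewhere
)

class TransientBackendError(Exception):
    """Hypothetical application-specific transient failure."""

config = ExceptionHandlerConfig(
    # Union of the library defaults with one extra exception type
    retryable_exceptions=set(DEFAULT_RETRYABLE_EXCEPTIONS) | {TransientBackendError},
    backoff_config=BackoffConfig(strategy="expo", base_delay=1.0, max_tries=3),
)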

Backoff Strategies

Exponential Backoff (Default)

  • Starts with base_delay and multiplies by factor each retry
  • Good for transient failures that may resolve quickly
  • Example: 1s → 2s → 4s → 8s → 16s (capped at max_delay)

Constant Backoff

  • Uses the same delay (base_delay) for all retries
  • Good for predictable, consistent retry timing
  • Example: 2s → 2s → 2s → 2s

Environment Variable Integration

The configuration automatically respects these environment variables:
  • EP_MAX_RETRY: Overrides max_tries in BackoffConfig
  • EP_FAIL_ON_MAX_RETRY: Controls raise_on_giveup behavior

Example Usage

Basic Custom Configuration

from eval_protocol.pytest.exception_config import ExceptionHandlerConfig, BackoffConfig

# Custom exception handling configuration
custom_config = ExceptionHandlerConfig(
    backoff_config=BackoffConfig(
        strategy="expo",
        base_delay=2.0,
        max_delay=120.0,
        max_tries=5,
        jitter=None
    )
)

@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    exception_handler_config=custom_config
)
def test_with_custom_retry_logic(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    # Your evaluation logic here
    return rows

Aggressive Retry Strategy

# Aggressive retry for unreliable networks
aggressive_config = ExceptionHandlerConfig(
    backoff_config=BackoffConfig(
        strategy="expo",
        base_delay=0.5,    # Start with 0.5s delay
        max_delay=30.0,    # Cap at 30s
        max_tries=10,      # Try up to 10 times
        jitter=None        # No jitter for predictable timing
    )
)

Conservative Retry Strategy

# Conservative retry for stable networks
conservative_config = ExceptionHandlerConfig(
    backoff_config=BackoffConfig(
        strategy="constant",
        base_delay=5.0,    # 5 second constant delay
        max_tries=3,       # Only 3 attempts
        jitter=None
    )
)

Custom Exception Handling

from typing import Set, Type

# Only retry on specific exceptions
custom_exceptions: Set[Type[Exception]] = {
    ConnectionError,
    TimeoutError,
    # Add your custom exceptions here
}

custom_config = ExceptionHandlerConfig(
    retryable_exceptions=custom_exceptions,
    backoff_config=BackoffConfig(
        strategy="expo",
        base_delay=1.0,
        max_tries=3
    )
)

Evaluation Modes

Pointwise Mode (Default)

In pointwise mode, your test function processes each row individually, enabling pipelined evaluation:
@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    input_dataset=["dataset.jsonl"],
    mode="pointwise"
)
def test_pointwise_evaluation(row: EvaluationRow) -> EvaluationRow:
    """Process each row individually."""
    # Evaluate single row
    score = evaluate_single_row(row)
    row.evaluation_result.score = score
    
    return row
Requirements:
  • Function must have a parameter named row of type EvaluationRow
  • Function must return EvaluationRow

Groupwise Mode

In groupwise mode, your test function processes groups of rollout results from the same original row, useful for comparing different models or parameters:
@evaluation_test(
    completion_params=[
        {"model": "gpt-4", "temperature": 0.1},
        {"model": "gpt-3.5-turbo", "temperature": 0.1},
    ],
    input_dataset=["dataset.jsonl"],
    mode="groupwise"
)
def test_groupwise_evaluation(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    """Process groups of rows from the same original input."""
    # Compare results across different models/parameters
    scores = compare_model_outputs(rows)
    for i, row in enumerate(rows):
        row.evaluation_result.score = scores[i]
    
    return rows
Requirements:
  • Function must have a parameter named rows of type List[EvaluationRow]
  • Function must return List[EvaluationRow]
  • Must provide at least 2 completion parameters

All Mode

In all mode, your test function receives the entire dataset and processes all rows together:
@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    input_dataset=["dataset.jsonl"],
    mode="all"
)
def test_all_evaluation(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    """Process all rows together."""
    # Access to full dataset for cross-row analysis
    for row in rows:
        # Evaluate each row
        score = evaluate_single_row(row)
        row.evaluation_result.score = score
    
    return rows
Requirements:
  • Function must have a parameter named rows of type List[EvaluationRow]
  • Function must return List[EvaluationRow]

Threshold Configuration

You can set thresholds for test success using the passed_threshold parameter:
# Simple threshold (just success rate)
@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    passed_threshold=0.8
)

# Advanced threshold with standard error
@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    passed_threshold={
        "success": 0.8,
        "standard_error": 0.05
    }
)

# Using EvaluationThreshold object
from eval_protocol.models import EvaluationThreshold

@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    passed_threshold=EvaluationThreshold(success=0.8, standard_error=0.05)
)

Multiple Runs and Aggregation

Set num_runs > 1 to run multiple evaluations and aggregate results:
@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    input_dataset=["dataset.jsonl"],
    num_runs=5,
    aggregation_method="mean"
)
def test_with_multiple_runs(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    # This function will be called 5 times
    # Results will be aggregated using the mean
    return rows

Environment Variables

The decorator supports several environment variables for configuration:
  • EP_MAX_DATASET_ROWS: Override max_dataset_rows parameter. Applies to both datasets and input_messages (slices to first N rows).
  • EP_NUM_RUNS: Override the number of runs for evaluation_test.
  • EP_MAX_CONCURRENT_ROLLOUTS: Override the maximum number of concurrent rollouts.
  • EP_INPUT_PARAMS_JSON: JSON object deep-merged into completion_params. Example: {"temperature":0,"extra_body":{"reasoning":{"effort":"low"}}}.
  • EP_PRINT_SUMMARY: Set to “1” to print a one-line evaluation summary to stdout.
  • EP_SUMMARY_JSON: File or directory path to write a JSON summary artifact. See “Summary artifacts” for naming behavior.
  • Retry-related environment variables are documented in the Retries and failure policy section.

Return Values

Your test function must return the appropriate type based on the mode:
  • Pointwise mode: EvaluationRow
  • Groupwise mode: List[EvaluationRow]
  • All mode: List[EvaluationRow]
Each returned row should have:
  • evaluation_result.score: A float between 0 and 1
  • Optional evaluation_result.metrics: Additional metric scores
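For example, a minimal way to populate a row's result (score and reason values are placeholders):
from eval_protocol.models import EvaluateResult, EvaluationRow

def finalize(row: EvaluationRow) -> EvaluationRow:
    # Overall score must be a float between 0 and 1; a reason is optional but recommended.
    # Per-metric scores can additionally be attached via evaluation_result.metrics
    # (see the Multi-Model Comparison example below).
    row.evaluation_result = EvaluateResult(score=0.75, reason="partial credit")
    return row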

Dataset loading and input formats

  • Data loaders (data_loaders): Preferred for reusable and parameterized inputs. Accepts one or more EvaluationDataLoader instances (e.g., DynamicDataLoader, InlineDataLoader). Each loader can emit multiple variants and apply preprocess_fn internally. Cannot be combined with input_dataset, input_messages, or input_rows.
  • Datasets (input_dataset): You can pass a single path or a list of paths to JSONL files. Files are loaded using load_jsonl() which supports both local files and HTTP/HTTPS URLs. The function reads JSONL format (one JSON object per line) with robust error handling, automatically skips blank lines, and provides detailed error messages with line numbers and row IDs. When a list is provided and combine_datasets=True (default), files are concatenated into one dataset; when combine_datasets=False, each path is parameterized into separate test invocations.
  • Input messages (input_messages): Accepts either a single row as List[Message] or many rows as List[List[Message]]. When EP_MAX_DATASET_ROWS is set, the list is sliced before parameterization.
  • Input rows (input_rows): Similar to input_messages, when EP_MAX_DATASET_ROWS is set, the list is sliced before parameterization.
  • Dataset adapter (dataset_adapter): Receives raw JSONL rows (as loaded by load_jsonl()) and must return List[EvaluationRow].

Error Handling

The decorator handles errors gracefully:
  • Failed rollouts are still evaluated (you can choose to give them a score of 0)
  • Assertion errors are logged with status “finished”
  • Other exceptions are logged with status “error”
  • Summary generation failures don’t cause test failures
  • For retry behavior and configuration, see ExceptionHandlerConfig and Retries and failure policy.

Row IDs and metadata

  • Stable row_id values are generated for rows missing row.input_metadata.row_id, using a deterministic hash of row content. This ensures consistent IDs across processes and runs.
  • EvalMetadata is created for each evaluation with: name (test function name), description (docstring), num_runs, aggregation_method, and threshold info. Its status transitions from “running” to “finished” or “error”.
  • completion_params used for a row are recorded in row.input_metadata.completion_params.
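For example, both fields can be read inside an evaluation (a minimal sketch):
from eval_protocol.models import EvaluationRow

def inspect_metadata(row: EvaluationRow) -> None:
    # Both fields are populated by the decorator before your evaluation runs
    print(row.input_metadata.row_id)             # deterministic, content-hash-based ID
    print(row.input_metadata.completion_params)  # params used for this row's rollout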

Dataset combination and parameterization

  • Parameter combinations are generated across data_loaders, input_dataset, completion_params, input_messages, input_rows, and evaluation_test_kwargs.
  • Pytest parameter names (in order when present): dataset_path, completion_params, input_messages, input_rows, data_loaders, evaluation_test_kwargs.
  • Set combine_datasets=False to parameterize each dataset path separately. With True (default), multiple paths are combined into a single logical dataset per invocation.

Summary artifacts

When EP_SUMMARY_JSON is set:
  • If a directory or a non-.json path is provided, a file is written inside with the base name: "{suite}__{model}__{mode}__runs{num_runs}.json", where suite is the test function name and model is a sanitized slug.
  • If a file path is provided, it writes that file. If an “effort” tag is detected in completion_params (e.g., reasoning_effort), a variant suffixed with __{effort} is written instead.
  • The summary includes: suite, model, agg_score, num_runs, rows, optional 95% CI (agg_ci_low, agg_ci_high) when aggregation_method="mean", and a timestamp.
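Illustratively, the artifact contains a small JSON object along these lines (values are placeholders, shown as a Python dict for readability; the timestamp field is omitted because its key name is not specified above):
summary = {
    "suite": "test_math_reasoning",   # test function name
    "model": "openai-gpt-4o",         # sanitized model slug
    "agg_score": 0.82,
    "num_runs": 5,
    "rows": 100,
    "agg_ci_low": 0.78,               # present when aggregation_method="mean"
    "agg_ci_high": 0.86,
}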

Retries and failure policy

  • Rollouts are retried up to EP_MAX_RETRY times using the rollout_processor_with_retry wrapper.
  • Permanent failures are, by default, raised immediately to fail the test. Override with EP_FAIL_ON_MAX_RETRY=false to continue and include errored rows (you can score them as 0 in your evaluation).
  • Exception handling and retry logic can be customized via exception_handler_config.

Environment Variables

The following environment variables control retry behavior:
  • EP_MAX_RETRY: Maximum number of retry attempts (default: 0, meaning no retries)
  • EP_FAIL_ON_MAX_RETRY: Whether to fail the test after max retries (default: “true”)

Retry Implementation Details

The retry logic is implemented in the rollout_processor_with_retry function which:
  • Wraps the rollout processor with configurable backoff retry
  • Handles both retryable and non-retryable exceptions
  • Uses the Python backoff library for exponential/constant backoff strategies
  • Processes rows concurrently while handling retries transparently
  • Logs all results (success or failure) through the configured logger

Custom Retry Configuration

For advanced retry logic, you can provide a custom ExceptionHandlerConfig:
from eval_protocol.pytest.exception_config import ExceptionHandlerConfig, BackoffConfig

# Aggressive retry strategy for unreliable networks
aggressive_retry = ExceptionHandlerConfig(
    backoff_config=BackoffConfig(
        strategy="expo",
        base_delay=0.5,  # Start with 0.5s delay
        max_delay=30.0,  # Cap at 30s
        max_tries=10,    # Try up to 10 times
        jitter=None      # No jitter for predictable timing
    )
)

@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    exception_handler_config=aggressive_retry
)
def test_with_aggressive_retries(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    # Your evaluation logic here
    return rows

Rollout processors

  • A rollout processor turns input rows into completed rows (e.g., by calling a model). The decorator passes a RolloutProcessorConfig containing completion_params, mcp_config_path, server_script_path, max_concurrent_rollouts, and steps.
  • Built-ins include:
    • NoOpRolloutProcessor(): passes rows through unchanged (useful for offline evaluation of pre-generated outputs).
    • SingleTurnRolloutProcessor(): performs a single chat completion via LiteLLM and appends the assistant message.
    • AgentRolloutProcessor(): runs multi-turn agent loops with MCP tool calling.
    • PydanticAgentRolloutProcessor(): runs Pydantic AI agents with structured tool calling.
    • MCPGymRolloutProcessor(): runs interactive environments via MCP servers.
  • All processors are wrapped with rollout_processor_with_retry for automatic retry handling.

RolloutProcessorConfig

The RolloutProcessorConfig is passed to all rollout processors and contains the configuration needed to execute rollouts. It’s defined in eval_protocol/pytest/types.py.

Configuration Fields

completion_params
CompletionParams
required
Model and generation parameters for the rollout. The structure and required fields depend on the rollout processor.
SingleTurnRolloutProcessor & AgentRolloutProcessor:
  • Must include model field with LiteLLM-compatible provider route
  • Supports standard LiteLLM parameters: temperature, max_tokens, extra_body, etc.
PydanticAgentRolloutProcessor:
  • Must include model field (canonical way to pass model names)
  • Optional provider field (defaults to “openai” if not specified)
  • Used to create Pydantic AI model instances via the agent factory
MCPGymRolloutProcessor:
  • Must include model field for environment policy creation
  • Additional parameters passed to the policy constructor
NoOpRolloutProcessor:
  • Can contain any values (not used for actual model calls)
  • Often set to placeholder values for clarity
mcp_config_path
str
required
Path to an MCP client configuration file that follows the MCPMultiClientConfiguration schema. Used by agent and tool-based rollout processors to enumerate available tools and capabilities.
semaphore
asyncio.Semaphore
required
Shared semaphore for unified concurrency control across all rollout processors. Controls the maximum number of concurrent rollouts that can run simultaneously.
server_script_path
Optional[str]
Path to an MCP server script to run. Used by gym-like processors (e.g., MCPGymRolloutProcessor) to launch interactive environments. Defaults to None.
steps
int
Maximum number of rollout steps to execute. Used by multi-turn processors to limit the length of agent conversations. Defaults to 30.
logger
DatasetLogger
Logger to use for capturing mid-rollout logs and debugging information. Defaults to default_logger.
kwargs
dict[str, Any]
Additional keyword arguments specific to the rollout processor. This is where processor-specific configuration is passed, such as:
  • usage_limits for Pydantic AI agents
  • agent for pre-configured agents
  • Custom tool configurations
  • Environment-specific settings
exception_handler_config
Optional[ExceptionHandlerConfig]
Configuration for exception handling and backoff retry logic. If not provided, a default configuration will be used with common retryable exceptions. See the ExceptionHandlerConfig section for detailed configuration options.

Usage in Custom Rollout Processors

When implementing custom rollout processors, you can access these configuration values:
from eval_protocol.pytest.rollout_processor import RolloutProcessor
from eval_protocol.pytest.types import RolloutProcessorConfig
from eval_protocol.models import EvaluationRow
import asyncio

class CustomRolloutProcessor(RolloutProcessor):
    def __call__(self, rows: list[EvaluationRow], config: RolloutProcessorConfig) -> list[asyncio.Task[EvaluationRow]]:
        # Access model configuration
        model = config.completion_params.get("model")
        temperature = config.completion_params.get("temperature", 0.0)
        
        # Access concurrency control
        semaphore = config.semaphore
        
        # Access custom configuration
        custom_setting = config.kwargs.get("custom_setting", "default_value")
        
        # Access MCP configuration
        mcp_config_path = config.mcp_config_path
        
        # Access step limits
        max_steps = config.steps
        
        # Access logger
        logger = config.logger
        
        # Your rollout logic here
        async def process_row(row: EvaluationRow) -> EvaluationRow:
            async with semaphore:
                # Process the row
                return row
        
        return [asyncio.create_task(process_row(row)) for row in rows]

Environment Variable Integration

Several configuration values can be overridden at runtime using environment variables:
  • EP_MAX_CONCURRENT_ROLLOUTS: Overrides the semaphore limit
  • EP_NUM_RUNS: Affects the number of runs for evaluation_test
  • EP_MAX_RETRY: Controls retry behavior via exception_handler_config
  • EP_FAIL_ON_MAX_RETRY: Controls failure behavior after max retries

Processor-Specific Configuration

Different rollout processors use the kwargs field for their specific needs:

AgentRolloutProcessor

config = RolloutProcessorConfig(
    completion_params={
        "model": "openai/gpt-4",
        "temperature": 0.1,
        "max_tokens": 1000
    },
    mcp_config_path="./mcp_config.json",
    semaphore=asyncio.Semaphore(8),
    kwargs={
        "custom_tool_config": {...},
        "agent_instructions": "You are a helpful assistant"
    }
)

PydanticAgentRolloutProcessor

config = RolloutProcessorConfig(
    completion_params={
        "model": "accounts/fireworks/models/kimi-k2-instruct",
        "provider": "fireworks"  # Optional: defaults to "openai"
    },
    mcp_config_path="./mcp_config.json", 
    semaphore=asyncio.Semaphore(8),
    kwargs={
        "agent": my_pydantic_agent,
        "usage_limits": UsageLimits(max_tokens=1000)
    }
)

MCPGymRolloutProcessor

config = RolloutProcessorConfig(
    completion_params={
        "model": "openai/gpt-4",
        "temperature": 0.0
    },
    mcp_config_path="./mcp_config.json",
    semaphore=asyncio.Semaphore(8),
    server_script_path="./gym_server.py",
    kwargs={
        "environment_config": {...},
        "gym_timeout": 300
    }
)

Direct invocation (dual-mode)

Decorated functions can be called directly in addition to running under pytest:
  • Pointwise mode: await test_fn(row) or await test_fn(row=...)
  • Groupwise mode: await test_fn(rows) or await test_fn(rows=[...])
  • All mode: await test_fn(rows) or await test_fn(rows=[...])
When using data_loaders, direct invocation works the same way; the decorator resolves loaders into rows before calling your function. If a decorated function is called directly with row/rows arguments, those are used as-is.
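For instance, a pointwise test can be invoked from a plain script (a minimal sketch; my_pointwise_test stands in for any function decorated with mode="pointwise", and asyncio.run assumes you are not already inside an event loop):
import asyncio
from eval_protocol.models import EvaluationRow, Message

row = EvaluationRow(messages=[Message(role="user", content="What is 2 + 2?")])
# my_pointwise_test is a placeholder for your @evaluation_test(mode="pointwise") function
result = asyncio.run(my_pointwise_test(row=row))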

Examples

Basic Math Evaluation (Pointwise Mode)

@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    input_messages=[
        [Message(role="user", content="What is 2 + 2?")]
    ],
    passed_threshold=0.9,
    mode="pointwise"
)
def test_basic_math(row: EvaluationRow) -> EvaluationRow:
    # Simple correctness check
    response = row.messages[-1].content
    if "4" in response:
        row.evaluation_result.score = 1.0
    else:
        row.evaluation_result.score = 0.0
    
    return row

Multi-Model Comparison (All Mode)

@evaluation_test(
    completion_params=[
        {"model": "gpt-4", "temperature": 0.1},
        {"model": "gpt-3.5-turbo", "temperature": 0.1},
        {"model": "claude-3-sonnet", "temperature": 0.1},
    ],
    input_dataset=["reasoning_tasks.jsonl"],
    passed_threshold=0.7,
    num_runs=3,
    mode="all"
)
def test_reasoning_capabilities(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    for row in rows:
        # Complex evaluation logic
        score = evaluate_reasoning_quality(row.messages)
        row.evaluation_result.score = score
        
        # Add additional metrics
        row.evaluation_result.metrics = {
            "clarity": evaluate_clarity(row.messages),
            "correctness": evaluate_correctness(row.messages)
        }
    
    return rows

Groupwise Evaluation for Model Comparison

@evaluation_test(
    completion_params=[
        {"model": "gpt-4", "temperature": 0.1},
        {"model": "gpt-3.5-turbo", "temperature": 0.1},
    ],
    input_dataset=["comparison_tasks.jsonl"],
    mode="groupwise"
)
def test_model_comparison(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    """Compare outputs from different models on the same input."""
    # Group rows by their original input
    for row in rows:
        # Evaluate relative to other models or absolute quality
        score = evaluate_model_output(row.messages, row.input_metadata.completion_params)
        row.evaluation_result.score = score
    
    return rows

Pointwise Evaluation with Custom Dataset

def custom_dataset_adapter(data: List[Dict[str, Any]]) -> List[EvaluationRow]:
    """Convert custom format to EvaluationRows."""
    rows = []
    for item in data:
        messages = [
            Message(role="user", content=item["question"]),
            Message(role="assistant", content=item["answer"])
        ]
        row = EvaluationRow(messages=messages)
        rows.append(row)
    return rows

@evaluation_test(
    completion_params=[{"model": "gpt-4"}],
    input_dataset=["custom_format.jsonl"],
    dataset_adapter=custom_dataset_adapter,
    mode="pointwise"
)
def test_custom_format(row: EvaluationRow) -> EvaluationRow:
    # Process individual row
    score = evaluate_custom_metric(row.messages)
    row.evaluation_result.score = score
    return row

Complete runnable example (offline, no model calls)

This example evaluates pre-generated assistant messages using the no-op rollout processor.
from typing import Any, Dict, List
from eval_protocol.models import EvaluationRow, Message, EvaluateResult
from eval_protocol.pytest.evaluation_test import evaluation_test
from eval_protocol.pytest.default_no_op_rollout_processor import NoOpRolloutProcessor

def adapter(json_rows: List[Dict[str, Any]]) -> List[EvaluationRow]:
    rows: List[EvaluationRow] = []
    for r in json_rows:
        # Expect fields: question, model_answer, ground_truth
        rows.append(
            EvaluationRow(
                messages=[
                    Message(role="user", content=str(r["question"])) ,
                    Message(role="assistant", content=str(r["model_answer"]))
                ],
                ground_truth=str(r.get("ground_truth", ""))
            )
        )
    return rows

@evaluation_test(
    input_dataset=["offline_answers.jsonl"],
    dataset_adapter=adapter,
    completion_params=[{"model": "not-used-offline"}],
    rollout_processor=NoOpRolloutProcessor(),
    mode="all"
)
def test_offline_eval(rows: List[EvaluationRow]) -> List[EvaluationRow]:
    for row in rows:
        pred = (row.get_assistant_messages()[-1].content or "").strip()
        gt = (row.ground_truth or "").strip()
        score = 1.0 if pred == gt else 0.0
        row.evaluation_result = EvaluateResult(score=score, reason="exact match")
    return rows

Complete runnable example (single-turn online via LiteLLM)

Requires pip install litellm and provider credentials configured.
from typing import List
from eval_protocol.models import EvaluationRow, Message, EvaluateResult
from eval_protocol.pytest.evaluation_test import evaluation_test
from eval_protocol.pytest.default_single_turn_rollout_process import SingleTurnRolloutProcessor

@evaluation_test(
    input_messages=[[Message(role="user", content="What is 2 + 2?")]],
    completion_params=[{"model": "openai/gpt-4o-mini", "temperature": 0}],
    rollout_processor=SingleTurnRolloutProcessor(),
    passed_threshold=0.8,
    mode="pointwise"
)
def test_online_math(row: EvaluationRow) -> EvaluationRow:
    answer = (row.get_assistant_messages()[-1].content or "").strip()
    score = 1.0 if "4" in answer else 0.0
    row.evaluation_result = EvaluateResult(score=score, reason="contains 4")
    return row

Using data_loaders with DynamicDataLoader

from eval_protocol import evaluation_test, DynamicDataLoader, SingleTurnRolloutProcessor
from eval_protocol.adapters.langfuse import create_langfuse_adapter

def langfuse_data_generator():
    adapter = create_langfuse_adapter()
    return adapter.get_evaluation_rows(limit=20, sample_size=5)

@evaluation_test(
    data_loaders=DynamicDataLoader(generators=[langfuse_data_generator]),
    completion_params=[{"model": "openai/gpt-4o"}],
    rollout_processor=SingleTurnRolloutProcessor(),
    mode="pointwise",
)
def test_with_loader(row: EvaluationRow) -> EvaluationRow:
    # Evaluate row here
    return row

Integration with pytest

The decorator automatically creates pytest-compatible test functions:
# Run all evaluation tests
pytest test_file.py

# Run specific test
pytest test_file.py::test_math_reasoning

# Run with specific parameters
pytest "test_file.py::test_math_reasoning[dataset_path0-completion_params0]"

Programmatic Usage

Decorated functions can be called directly in addition to running under pytest. See Direct invocation (dual-mode) for patterns by mode.

Best Practices

  1. Clear Documentation: Always include docstrings explaining what your evaluation measures
  2. Error Handling: Handle edge cases gracefully and provide meaningful scores for failed rollouts
  3. Metric Design: Design metrics that are objective and reproducible
  4. Reason: Include a reason field in the evaluation_result to explain the score
  5. Threshold Setting: Set realistic thresholds based on your use case
  6. Multiple Runs: Use num_runs > 1 for more reliable results when possible
  7. Resource Management: Consider max_concurrent_rollouts and max_concurrent_evaluations based on your system capabilities
  8. Mode Selection: Choose the appropriate mode for your evaluation needs:
    • Use “pointwise” for simple per-row evaluation
    • Use “groupwise” for comparing multiple models/parameters on the same inputs
    • Use “all” for batch processing with cross-row analysis

Troubleshooting

Common Issues

  • “No combinations of parameters found”: Ensure you provide completion_params together with one of data_loaders, input_dataset, input_messages, or input_rows
  • “No model provided”: Check that your CompletionParams includes a model field
  • Signature validation errors: Ensure your function signature matches the mode requirements:
    • Pointwise mode: def func(row: EvaluationRow) -> EvaluationRow
    • Groupwise mode: def func(rows: List[EvaluationRow]) -> List[EvaluationRow]
    • All mode: def func(rows: List[EvaluationRow]) -> List[EvaluationRow]
  • Return type errors: Verify you’re returning the correct type based on your mode
  • “In groupwise mode, you must provide at least 2 completion parameters”: Groupwise mode requires multiple completion parameters to compare

Debug Tips

  • Set EP_PRINT_SUMMARY=1 to see evaluation results in console
  • Use EP_SUMMARY_JSON to save detailed results to a file
  • Check the generated pytest parameterization for complex setups
  • Use max_dataset_rows to limit dataset size during development
  • Monitor max_concurrent_rollouts and max_concurrent_evaluations for performance tuning