CrucibleTelemetry

Research-grade instrumentation and metrics collection for AI/ML experiments in Elixir.

Overview

CrucibleTelemetry provides specialized observability for rigorous scientific experimentation. It goes beyond standard production telemetry with features designed for AI/ML research.

Why CrucibleTelemetry?

Standard production telemetry libraries focus on monitoring and alerting, but research experiments have different requirements:

| Production Telemetry     | Research Telemetry (this library)             |
|--------------------------|-----------------------------------------------|
| Real-time dashboards     | Statistical analysis and exports              |
| Sampling for efficiency  | Complete capture for reproducibility          |
| Fixed metrics            | Rich, experiment-specific metadata            |
| Single workload tracking | Multiple concurrent experiments               |
| JSON/logs output         | CSV, JSON Lines, Parquet for analysis tools   |

Installation

Add crucible_telemetry to your list of dependencies in mix.exs:

def deps do
  [
    {:crucible_telemetry, "~> 0.2.0"}
  ]
end

Or install from GitHub:

def deps do
  [
    {:crucible_telemetry, github: "nshkrdotcom/elixir_ai_research", sparse: "apps/telemetry_research"}
  ]
end

Quick Start

# 1. Start an experiment
{:ok, experiment} = CrucibleTelemetry.start_experiment(
  name: "ensemble_vs_single",
  hypothesis: "5-model ensemble achieves >99% reliability",
  condition: "treatment",
  tags: ["accuracy", "reliability"]
)

# 2. Run your AI workload - events are automatically collected
# Your existing code with :telemetry.execute() calls works unchanged

# 3. Stop and analyze
{:ok, experiment} = CrucibleTelemetry.stop_experiment(experiment.id)
metrics = CrucibleTelemetry.calculate_metrics(experiment.id)
# => %{
#   latency: %{mean: 150.5, p95: 250.0, ...},
#   cost: %{total: 0.025, mean_per_request: 0.0025, ...},
#   reliability: %{success_rate: 0.99, ...}
# }

# 4. Export for analysis
{:ok, path} = CrucibleTelemetry.export(experiment.id, :csv)
# Now analyze in Python: pd.read_csv(path)

Core Concepts

Experiments

An experiment is an isolated collection session with its own name, hypothesis, condition, tags, and metadata:

{:ok, experiment} = CrucibleTelemetry.start_experiment(
  name: "gpt4_baseline",
  hypothesis: "Single GPT-4 achieves 90% accuracy on benchmark",
  condition: "control",
  tags: ["h1", "baseline", "gpt4"],
  sample_size: 1000,
  metadata: %{
    researcher: "alice",
    benchmark: "mmlu",
    version: "v1"
  }
)

Event Collection

CrucibleTelemetry automatically attaches to standard telemetry events:

Events are enriched with:

Storage

Events are stored in ETS tables for fast in-memory access:

# Query events by filters
events = CrucibleTelemetry.Store.query(experiment.id, %{
  event_name: [:req_llm, :request, :stop],
  success: true,
  time_range: {start_time, end_time}
})

ETS storage is ideal for experiments with <1M events. For longer experiments or persistent storage, PostgreSQL backend support is planned.
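
To make the ETS-backed design concrete, here is a minimal sketch of how events might be kept in an ordered ETS table and queried by time range. It is not CrucibleTelemetry's actual Store implementation: the module name EtsEventStoreSketch, the key layout, and the event shape (a map with :timestamp and :success) are assumptions for illustration. Only the :ets calls are standard OTP.

```elixir
defmodule EtsEventStoreSketch do
  @moduledoc "Illustrative only; not the library's Store module."

  # An :ordered_set keeps entries sorted by key, so range queries
  # come back oldest first.
  def new do
    :ets.new(:events_sketch, [:ordered_set, :public])
  end

  # Key on {timestamp, unique_integer} so two events that share a
  # timestamp do not overwrite each other.
  def insert(table, %{timestamp: ts} = event) do
    key = {ts, System.unique_integer([:monotonic])}
    true = :ets.insert(table, {key, event})
    :ok
  end

  # Return events with t_start <= timestamp <= t_end, optionally
  # keeping only those for which `filter` returns true.
  def query(table, {t_start, t_end}, filter \\ fn _event -> true end) do
    match_spec = [
      {{{:"$1", :_}, :"$2"},
       [{:>=, :"$1", t_start}, {:"=<", :"$1", t_end}],
       [:"$2"]}
    ]

    table
    |> :ets.select(match_spec)
    |> Enum.filter(filter)
  end
end

table = EtsEventStoreSketch.new()
EtsEventStoreSketch.insert(table, %{timestamp: 30, success: true})
EtsEventStoreSketch.insert(table, %{timestamp: 10, success: false})
EtsEventStoreSketch.insert(table, %{timestamp: 20, success: true})

# Only successful events between t=0 and t=100
EtsEventStoreSketch.query(table, {0, 100}, & &1.success)
```

Because the table lives in memory and is owned by the process that created it, its contents disappear when that process exits, which is one reason a persistent backend matters for long-running experiments.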

Metrics & Analysis

Calculate comprehensive metrics automatically:

metrics = CrucibleTelemetry.calculate_metrics(experiment.id)

# Latency metrics
metrics.latency.mean                 # Average latency
metrics.latency.median               # Median latency
metrics.latency.p50                  # 50th percentile
metrics.latency.p95                  # 95th percentile
metrics.latency.p99                  # 99th percentile
metrics.latency.std_dev              # Standard deviation

# Cost metrics
metrics.cost.total                   # Total cost in USD
metrics.cost.mean_per_request        # Average cost per request
metrics.cost.cost_per_1k_requests    # Projected cost for 1K requests
metrics.cost.cost_per_1m_requests    # Projected cost for 1M requests

# Reliability metrics
metrics.reliability.success_rate     # Success rate (0.0-1.0)
metrics.reliability.successful       # Count of successful requests
metrics.reliability.failed           # Count of failed requests
metrics.reliability.sla_99           # Meets 99% SLA?
metrics.reliability.sla_999          # Meets 99.9% SLA?

# Token metrics (if available)
metrics.tokens.total_prompt          # Total prompt tokens
metrics.tokens.total_completion      # Total completion tokens
metrics.tokens.mean_total            # Average tokens per request
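
For readers who want to sanity-check the latency figures by hand, the following sketch shows one common way such summaries can be derived from a list of raw latencies. It assumes the nearest-rank percentile definition and the sample (n - 1) standard deviation; which definitions the library actually uses is not stated here, and LatencySummarySketch is a hypothetical module name.

```elixir
defmodule LatencySummarySketch do
  # Nearest-rank percentile on a sorted, non-empty list.
  # `p` is an integer percentage such as 50, 95, or 99.
  def percentile(sorted, p) when p > 0 and p <= 100 do
    rank = ceil(p * length(sorted) / 100)
    Enum.at(sorted, max(rank - 1, 0))
  end

  def summarize(latencies) when latencies != [] do
    sorted = Enum.sort(latencies)
    n = length(sorted)
    mean = Enum.sum(sorted) / n

    # Sample variance (n - 1 denominator); zero for a single value.
    variance =
      if n > 1 do
        Enum.reduce(sorted, 0.0, fn x, acc -> acc + (x - mean) * (x - mean) end) / (n - 1)
      else
        0.0
      end

    %{
      mean: mean,
      p50: percentile(sorted, 50),
      p95: percentile(sorted, 95),
      p99: percentile(sorted, 99),
      std_dev: :math.sqrt(variance)
    }
  end
end

LatencySummarySketch.summarize([120, 95, 310, 150, 101])
```

Note that a nearest-rank p50 always returns an observed value, so on an even-sized sample it can differ from a median computed by averaging the two middle values.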

Export Formats

Export data for analysis in your preferred tool:

CSV (Excel, pandas, R)

{:ok, path} = CrucibleTelemetry.export(experiment.id, :csv,
  path: "results/experiment.csv"
)

# Then in Python:
# import pandas as pd
# df = pd.read_csv("results/experiment.csv")
# df.groupby('condition')['latency_ms'].describe()

JSON Lines (streaming, jq)

{:ok, path} = CrucibleTelemetry.export(experiment.id, :jsonl,
  path: "results/experiment.jsonl"
)

# Then with jq:
# cat results/experiment.jsonl | jq '.latency_ms' | jq -s 'add/length'

Real-Time Streaming Metrics

Streaming metrics are auto-started when an experiment begins and update on every collected event. You can query or reset them at any time without waiting for the experiment to stop:

# Grab live metrics (mean/min/max/std for latency & cost, success rate, event counts)
metrics = CrucibleTelemetry.StreamingMetrics.get_metrics(experiment.id)
# Reset the streaming accumulators
:ok = CrucibleTelemetry.StreamingMetrics.reset(experiment.id)
# Stop streaming metrics explicitly (cleanup/stop also stops it)
:ok = CrucibleTelemetry.StreamingMetrics.stop(experiment.id)

Streaming metrics use Welford’s online algorithm for exact mean/variance with constant memory. If the server is not running, get_metrics/1 will start it automatically.
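
As a reference for how Welford's recurrence works, here is a small standalone sketch. It is a textbook version of the algorithm, not the code inside StreamingMetrics; the module name WelfordSketch and the accumulator shape are assumptions for illustration.

```elixir
defmodule WelfordSketch do
  # Accumulator: observation count, running mean, and the running
  # sum of squared deviations from the mean (m2).
  def new, do: %{count: 0, mean: 0.0, m2: 0.0}

  # Fold one observation into the accumulator in O(1) time and memory.
  def update(%{count: n, mean: mean, m2: m2}, x) do
    count = n + 1
    delta = x - mean
    new_mean = mean + delta / count
    # The second factor uses the *updated* mean; this is the step that
    # keeps the recurrence numerically stable.
    %{count: count, mean: new_mean, m2: m2 + delta * (x - new_mean)}
  end

  # Sample variance; nil when fewer than two observations exist.
  def variance(%{count: n}) when n < 2, do: nil
  def variance(%{count: n, m2: m2}), do: m2 / (n - 1)

  def std_dev(acc) do
    case variance(acc) do
      nil -> nil
      v -> :math.sqrt(v)
    end
  end
end

# Feed latencies one at a time, as a streaming collector would.
acc = Enum.reduce([120, 95, 310, 150], WelfordSketch.new(), &WelfordSketch.update(&2, &1))
{acc.mean, WelfordSketch.std_dev(acc)}
```

Only three numbers are kept per metric regardless of how many events arrive, which is what makes the constant-memory claim possible.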

Time-Window Queries & Windowed Metrics

Use CrucibleTelemetry.Store.query_window/3 to pull only the data you need:

alias CrucibleTelemetry.Store

# Last 5 minutes
recent = Store.query_window(experiment.id, {:last, 5, :minutes})

# Last 200 events
tail = Store.query_window(experiment.id, {:last_n, 200})

# Specific time range with an additional filter
events = Store.query_window(experiment.id, {:range, t_start, t_end}, fn e -> e.success end)

Compute sliding window rollups with windowed_metrics/3 (window and step in microseconds):

# 5-minute windows stepping every 1 minute
windows = Store.windowed_metrics(experiment.id, 5 * 60_000_000, 60_000_000)
# => [%{window_start: ..., window_end: ..., event_count: 42, mean_latency: 123.4, total_cost: 0.12, ...}, ...]
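
The relationship between window size and step is easier to see in code. The sketch below generates overlapping windows over a list of events and summarizes each one. It is an illustration under stated assumptions, not the implementation of windowed_metrics/3: the module name SlidingWindowSketch, the event shape (maps with :timestamp in microseconds and :latency_ms), and the choice to include a window's start but exclude its end are all hypothetical.

```elixir
defmodule SlidingWindowSketch do
  # Build {window_start, window_end} pairs beginning at t_min, each
  # `window` microseconds long and advancing by `step` microseconds,
  # until the window start passes t_max.
  def windows(t_min, t_max, window, step) when window > 0 and step > 0 do
    t_min
    |> Stream.iterate(&(&1 + step))
    |> Stream.take_while(&(&1 <= t_max))
    |> Enum.map(&{&1, &1 + window})
  end

  # Summarize events per window. When step < window, windows overlap
  # and a single event is counted in more than one of them.
  def rollup(events, window, step) when events != [] do
    {t_min, t_max} = events |> Enum.map(& &1.timestamp) |> Enum.min_max()

    for {w_start, w_end} <- windows(t_min, t_max, window, step) do
      in_window = Enum.filter(events, &(&1.timestamp >= w_start and &1.timestamp < w_end))
      count = length(in_window)
      # mean_latency is nil for an empty window.
      mean = if count > 0, do: Enum.sum(Enum.map(in_window, & &1.latency_ms)) / count

      %{window_start: w_start, window_end: w_end, event_count: count, mean_latency: mean}
    end
  end
end

events = [
  %{timestamp: 0, latency_ms: 100},
  %{timestamp: 1_000_000, latency_ms: 200},
  %{timestamp: 2_500_000, latency_ms: 300}
]

# 2-second windows stepping every 1 second
SlidingWindowSketch.rollup(events, 2_000_000, 1_000_000)
```

Choosing a step smaller than the window smooths the series at the cost of correlated neighbouring windows; setting step equal to window gives non-overlapping (tumbling) buckets.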

Pause and Resume Experiments

You can temporarily pause data collection without tearing down storage:

{:ok, paused} = CrucibleTelemetry.pause_experiment(experiment.id)
# ... perform maintenance or hold traffic ...
{:ok, resumed} = CrucibleTelemetry.resume_experiment(experiment.id)
CrucibleTelemetry.is_paused?(experiment.id) # => true/false

Pausing detaches telemetry handlers; resuming reattaches them and keeps your experiment state and data intact.

Use Cases

1. A/B Testing

Compare two approaches side-by-side:

# Control: Single model
{:ok, control} = CrucibleTelemetry.start_experiment(
  name: "control_single_model",
  condition: "control",
  tags: ["ab_test"]
)

# Treatment: Ensemble
{:ok, treatment} = CrucibleTelemetry.start_experiment(
  name: "treatment_ensemble",
  condition: "treatment",
  tags: ["ab_test"]
)

# ... run workloads ...

# Compare results
comparison = CrucibleTelemetry.Analysis.compare_experiments([
  control.id,
  treatment.id
])
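
The return shape of compare_experiments/1 is not documented in this section. As an independent illustration of the kind of comparison an A/B test calls for, the sketch below computes Welch's t statistic for two latency samples. Whether the library uses this test is an assumption not confirmed by the source, and WelchSketch is a hypothetical module name.

```elixir
defmodule WelchSketch do
  defp mean(xs), do: Enum.sum(xs) / length(xs)

  # Sample variance with an n - 1 denominator.
  defp sample_variance(xs) do
    m = mean(xs)
    Enum.reduce(xs, 0.0, fn x, acc -> acc + (x - m) * (x - m) end) / (length(xs) - 1)
  end

  # Welch's t statistic with Welch-Satterthwaite degrees of freedom.
  # Requires at least two observations per group and raises
  # ArithmeticError if both groups have zero variance.
  def t_test(a, b) when length(a) > 1 and length(b) > 1 do
    se_a = sample_variance(a) / length(a)
    se_b = sample_variance(b) / length(b)
    diff = mean(a) - mean(b)

    t = diff / :math.sqrt(se_a + se_b)

    df =
      (se_a + se_b) * (se_a + se_b) /
        (se_a * se_a / (length(a) - 1) + se_b * se_b / (length(b) - 1))

    %{mean_diff: diff, t: t, df: df}
  end
end

control_latencies = [150.0, 162.0, 148.0, 171.0, 155.0]
treatment_latencies = [131.0, 140.0, 128.0, 149.0, 137.0]
WelchSketch.t_test(control_latencies, treatment_latencies)
```

Welch's variant does not assume the two conditions have equal variance, which is usually the safer default when comparing different model configurations. Turning t and df into a p-value requires a t-distribution table or a statistics package.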

2. Performance Benchmarking

Track performance over time:

{:ok, exp} = CrucibleTelemetry.start_experiment(
  name: "gemini_2_flash_benchmark",
  tags: ["benchmark", "latency", "2024-12"]
)

# Run benchmark suite
Enum.each(benchmark_queries, fn _query ->
  # Make LLM calls here; they are tracked automatically
  :ok
end)

{:ok, _} = CrucibleTelemetry.stop_experiment(exp.id)

# Export for historical tracking
CrucibleTelemetry.export(exp.id, :csv,
  path: "benchmarks/gemini_2_flash_#{Date.utc_today()}.csv"
)

3. Hypothesis Testing

Test specific hypotheses about your system:

{:ok, exp} = CrucibleTelemetry.start_experiment(
  name: "ensemble_reliability",
  hypothesis: "5-model ensemble achieves >99% reliability",
  condition: "ensemble_5x",
  tags: ["h1", "reliability"],
  sample_size: 1000
)

# ... collect 1000 samples ...
metrics = CrucibleTelemetry.calculate_metrics(exp.id)

# Test hypothesis
hypothesis_confirmed = metrics.reliability.success_rate > 0.99
IO.puts("Hypothesis #{if hypothesis_confirmed, do: "CONFIRMED", else: "REJECTED"}")
IO.puts("Success rate: #{metrics.reliability.success_rate * 100}%")
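
Comparing the observed success rate directly against 0.99 ignores sampling uncertainty. One way to account for it, sketched here as a suggestion rather than something the library provides, is to compute a Wilson score interval from the successful and total counts and check whether its lower bound clears the threshold. WilsonSketch is a hypothetical module name.

```elixir
defmodule WilsonSketch do
  # Wilson score interval for a binomial proportion.
  # z = 1.96 corresponds to a two-sided 95% confidence level.
  def interval(successes, n, z \\ 1.96)
      when n > 0 and successes >= 0 and successes <= n do
    p_hat = successes / n
    z2 = z * z
    denom = 1 + z2 / n
    center = (p_hat + z2 / (2 * n)) / denom
    margin = z * :math.sqrt(p_hat * (1 - p_hat) / n + z2 / (4 * n * n)) / denom
    {center - margin, center + margin}
  end
end

# 995 successes out of 1000 requests
{lower, upper} = WilsonSketch.interval(995, 1000)
supported? = lower > 0.99
IO.puts("95% interval: #{Float.round(lower, 4)}..#{Float.round(upper, 4)}, supported: #{supported?}")
```

With 995 successes in 1000 trials the point estimate exceeds 0.99, yet the lower bound of the 95% interval falls below it, so the sample alone does not establish the hypothesis at that confidence level. A larger sample_size narrows the interval.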

4. Cost Analysis

Track and optimize costs:

{:ok, exp} = CrucibleTelemetry.start_experiment(
  name: "cost_optimization",
  tags: ["cost", "optimization"]
)

# ... run workload ...
metrics = CrucibleTelemetry.calculate_metrics(exp.id)
IO.puts("Total cost: $#{metrics.cost.total}")
IO.puts("Cost per 1M requests: $#{metrics.cost.cost_per_1m_requests}")

# Identify expensive requests
expensive_events =
  exp.id
  |> CrucibleTelemetry.Store.query(%{})
  |> Enum.filter(&(&1.cost_usd > 0.01))
  |> Enum.sort_by(& &1.cost_usd, :desc)

API Reference

CrucibleTelemetry

Main module with convenience functions.

CrucibleTelemetry.Experiment

Experiment lifecycle management.

CrucibleTelemetry.Store

Data storage and querying.

CrucibleTelemetry.Export

Export to various formats.

CrucibleTelemetry.Analysis

Statistical analysis and metrics.

Examples

See the examples/ directory for complete examples:

Run examples with:

cd apps/telemetry_research
mix run examples/basic_usage.exs

Testing

Run the test suite:

cd apps/telemetry_research
mix test

Run with coverage:

mix test --cover

Architecture

┌─────────────────────────────────────────────────┐
│                CrucibleTelemetry                │
│                                                 │
│ ┌────────────┐   ┌──────────┐   ┌─────────────┐ │
│ │ Experiment │   │ Handler  │   │    Store    │ │
│ │  Manager   │   │ Pipeline │   │     ETS     │ │
│ └─────┬──────┘   └────┬─────┘   └──────┬──────┘ │
│       └───────────────┼────────────────┘        │
│              ┌────────┴───────┐                 │
│              ▼                ▼                 │
│        ┌────────────┐   ┌────────────┐          │
│        │   Export   │   │  Analysis  │          │
│        │  CSV/JSON  │   │  Metrics   │          │
│        └────────────┘   └────────────┘          │
└─────────────────────────────────────────────────┘

Telemetry Events

CrucibleTelemetry listens for these standard events:

req_llm Events

Ensemble Events

Hedging Events

Causal Trace Events

Altar Tool Events

Performance

CrucibleTelemetry is designed for minimal overhead:

Roadmap

License

MIT License - see LICENSE file for details