Normandy

A powerful Elixir library for building AI agents with structured schemas, memory management, and tool calling capabilities.

Features

Installation

Add normandy to your list of dependencies in mix.exs:

def deps do
  [
    {:normandy, "~> 0.1.0"}
  ]
end

Quick Start

Defining Schemas

defmodule User do
  use Normandy.Schema

  schema do
    field(:name, :string, required: true)
    field(:age, :integer, default: 0)
    field(:email, :string, required: true)
  end
end

user = %User{name: "Alice", email: "alice@example.com"}

Creating an Agent with Claudio (Recommended)

Normandy includes a ready-to-use adapter for Claudio, an Elixir client for the Anthropic API:

# Use the built-in Claudio adapter
client = %Normandy.LLM.ClaudioAdapter{
  api_key: System.get_env("ANTHROPIC_API_KEY"),
  options: %{
    timeout: 60_000,
    enable_caching: true  # Up to 90% cost reduction on repeated prompts
  }
}

# Initialize an agent
config = %{
  client: client,
  model: "claude-3-5-sonnet-20241022",
  temperature: 0.7
}

agent = Normandy.Agents.BaseAgent.init(config)

# Run a conversation
{updated_agent, response} = Normandy.Agents.BaseAgent.run(agent, %{chat_message: "Hello!"})

Creating a Custom Agent

You can also implement the Normandy.Agents.Model protocol for any LLM provider:

defmodule MyLLMClient do
  use Normandy.Schema

  schema do
    field(:api_key, :string)
  end

  defimpl Normandy.Agents.Model do
    def completitions(_client, _model, _temperature, _max_tokens, _messages, response_model) do
      # Legacy API - return response_model
      response_model
    end

    def converse(client, model, temperature, max_tokens, messages, response_schema, opts \\ []) do
      # Your LLM API call here
      # Return structured response matching response_schema
    end
  end
end

# Initialize an agent
config = %{
  client: %MyLLMClient{api_key: "..."},
  model: "your-model",
  temperature: 0.7
}

agent = Normandy.Agents.BaseAgent.init(config)

Using Tools

# Define a tool
defmodule CalculatorTool do
  defstruct [:operation, :a, :b]

  defimpl Normandy.Tools.BaseTool do
    def tool_name(_), do: "calculator"

    def tool_description(_) do
      "Performs basic arithmetic operations"
    end

    def input_schema(_) do
      %{
        type: "object",
        properties: %{
          operation: %{type: "string", enum: ["add", "subtract", "multiply", "divide"]},
          a: %{type: "number"},
          b: %{type: "number"}
        },
        required: ["operation", "a", "b"]
      }
    end

    def run(%{operation: "add", a: a, b: b}), do: {:ok, a + b}
    def run(%{operation: "subtract", a: a, b: b}), do: {:ok, a - b}
    def run(%{operation: "multiply", a: a, b: b}), do: {:ok, a * b}
    def run(%{operation: "divide", a: _a, b: 0}), do: {:error, "Cannot divide by zero"}
    def run(%{operation: "divide", a: a, b: b}), do: {:ok, a / b}
  end
end

# Register tools with agent
calc = %CalculatorTool{operation: "add", a: 0, b: 0}
agent = Normandy.Agents.BaseAgent.register_tool(agent, calc)

# Run with tool calling enabled
{updated_agent, response} = Normandy.Agents.BaseAgent.run_with_tools(
  agent,
  %{chat_message: "What is 15 + 27?"}
)

Memory Management

# Access conversation history
history = Normandy.Components.AgentMemory.history(agent.memory)

# Reset memory to initial state
agent = Normandy.Agents.BaseAgent.reset_memory(agent)

# Save and load memory
dump = Normandy.Components.AgentMemory.dump(agent.memory)
loaded_memory = Normandy.Components.AgentMemory.load(dump)

Streaming Responses

Normandy supports real-time streaming of LLM responses with callbacks for processing events as they arrive:

# Stream a response with real-time processing
callback = fn
  :text_delta, text ->
    IO.write(text)  # Display text as it arrives

  :tool_use_start, tool ->
    IO.puts("\nCalling tool: #{tool["name"]}")

  :message_start, message ->
    IO.puts("Starting response from #{message["model"]}")

  :message_stop, _ ->
    IO.puts("\nResponse complete")

  _, _ ->
    :ok  # Ignore other events
end

{updated_agent, response} = Normandy.Agents.BaseAgent.stream_response(
  agent,
  %{chat_message: "Hello!"},
  callback
)

# Stream with tool calling support
{updated_agent, response} = Normandy.Agents.BaseAgent.stream_with_tools(
  agent,
  %{chat_message: "What is 15 + 27?"},
  callback
)

Streaming Event Types

Implementing Streaming for Custom Clients

To add streaming support to a custom LLM client, implement the stream_converse/7 function in your Model protocol implementation:

defimpl Normandy.Agents.Model do
  def converse(client, model, temperature, max_tokens, messages, response_schema, opts \\ []) do
    # Non-streaming implementation
  end

  def stream_converse(client, model, temperature, max_tokens, messages, _response_model, opts \\ []) do
    callback = Keyword.get(opts, :callback)

    # Return a stream of Server-Sent Events
    {:ok, stream} = your_llm_client.stream(...)

    # Parse events and invoke callback for each
    event_stream = Stream.map(stream, fn event ->
      # Invoke callback based on event type
      case event do
        %{type: "content_block_delta", delta: %{"type" => "text_delta", "text" => text}} ->
          if callback, do: callback.(:text_delta, text)
        %{type: "message_start", message: message} ->
          if callback, do: callback.(:message_start, message)
        _ -> :ok
      end

      event
    end)

    {:ok, event_stream}
  end
end

Prompt Caching

Normandy supports Anthropic's prompt caching feature, which can reduce costs by up to 90% for repeated prompts. When caching is enabled, system prompts and tool definitions are automatically cached.

# Enable caching in the Claudio adapter
client = %Normandy.LLM.ClaudioAdapter{
  api_key: System.get_env("ANTHROPIC_API_KEY"),
  options: %{
    enable_caching: true  # Enable prompt caching
  }
}

agent = Normandy.Agents.BaseAgent.init(%{
  client: client,
  model: "claude-3-5-sonnet-20241022",
  temperature: 0.7
})

# System prompts and tools are now automatically cached
{updated_agent, response} = Normandy.Agents.BaseAgent.run(agent, %{chat_message: "Hello!"})

What Gets Cached:

Cache Benefits:

When to Use Caching:

Note: Caching requires a minimum content length (1024 tokens for system prompts). Shorter content won't benefit from caching.

Resilience & Error Handling

Normandy includes built-in resilience patterns to handle transient failures and prevent cascading errors in production systems.

Retry Mechanism

Automatically retry failed LLM calls with exponential backoff and jitter:

# Use a preset retry configuration
agent = Normandy.Agents.BaseAgent.init(%{
  client: client,
  model: "claude-3-5-sonnet-20241022",
  temperature: 0.7,
  retry_options: Normandy.Resilience.Retry.preset(:standard)
})

# Custom retry configuration
agent = Normandy.Agents.BaseAgent.init(%{
  client: client,
  model: "claude-3-5-sonnet-20241022",
  temperature: 0.7,
  retry_options: [
    max_attempts: 5,
    base_delay: 1000,      # 1 second
    max_delay: 30_000,     # 30 seconds
    backoff_factor: 2.0,   # Exponential backoff
    jitter: true           # Add randomness to delays
  ]
})

# Now agent calls will automatically retry on transient failures
{updated_agent, response} = Normandy.Agents.BaseAgent.run(agent, %{chat_message: "Hello!"})

Available Retry Presets:

Retry Features:

Circuit Breaker

Prevent cascading failures by failing fast once the failure threshold is reached:

# Enable circuit breaker with defaults
agent = Normandy.Agents.BaseAgent.init(%{
  client: client,
  model: "claude-3-5-sonnet-20241022",
  temperature: 0.7,
  enable_circuit_breaker: true
})

# Custom circuit breaker configuration
agent = Normandy.Agents.BaseAgent.init(%{
  client: client,
  model: "claude-3-5-sonnet-20241022",
  temperature: 0.7,
  enable_circuit_breaker: true,
  circuit_breaker_options: [
    failure_threshold: 5,      # Open after 5 failures
    success_threshold: 2,      # Close after 2 successes in half-open
    timeout: 60_000,           # Try half-open after 60 seconds
    half_open_max_calls: 1     # Allow 1 test call in half-open state
  ]
})

# Circuit breaker will automatically manage state
{updated_agent, response} = Normandy.Agents.BaseAgent.run(agent, %{chat_message: "Hello!"})

Circuit Breaker States:

State Transitions:

Closed ──(failures > threshold)──> Open
  ↑                                   │
  │                                   │
  └──(success)── Half-Open ←─(timeout)┘

Combining Retry and Circuit Breaker

Both patterns work together for comprehensive protection:

agent = Normandy.Agents.BaseAgent.init(%{
  client: client,
  model: "claude-3-5-sonnet-20241022",
  temperature: 0.7,
  # Retry handles transient errors
  retry_options: [
    max_attempts: 3,
    base_delay: 1000
  ],
  # Circuit breaker prevents cascading failures
  enable_circuit_breaker: true,
  circuit_breaker_options: [
    failure_threshold: 5,
    timeout: 60_000
  ]
})

How They Work Together:

  1. Retry wraps the LLM call and attempts to recover from transient failures
  2. Circuit Breaker wraps the retry logic and sees the aggregate result
  3. Transient errors are handled by retry without affecting circuit breaker
  4. Persistent failures eventually open the circuit to prevent cascading issues

Using Retry Directly

You can also use the retry mechanism for any function:

alias Normandy.Resilience.Retry

# Basic retry
{:ok, result} = Retry.with_retry(fn ->
  {:ok, perform_risky_operation()}
end)

# With custom configuration
{:ok, result} = Retry.with_retry(
  fn -> {:ok, api_call()} end,
  max_attempts: 5,
  base_delay: 500,
  retry_if: fn
    {:error, %{status: status}} when status >= 500 -> true
    {:error, :network_error} -> true
    _ -> false
  end
)

# Using presets
Retry.with_retry(fn -> {:ok, slow_operation()} end, Retry.preset(:patient))

Using Circuit Breaker Directly

You can also use the circuit breaker as a standalone GenServer:

alias Normandy.Resilience.CircuitBreaker

# Start a circuit breaker
{:ok, cb} = CircuitBreaker.start_link(
  name: :api_breaker,
  failure_threshold: 5,
  timeout: 60_000
)

# Execute protected calls
case CircuitBreaker.call(cb, fn ->
  {:ok, MyAPI.risky_operation()}
end) do
  {:ok, result} -> handle_success(result)
  {:error, :open} -> handle_circuit_open()
  {:error, reason} -> handle_failure(reason)
end

# Check state
CircuitBreaker.state(cb)  #=> :closed | :open | :half_open

# Get metrics
CircuitBreaker.metrics(cb)
#=> %{
#     state: :closed,
#     failure_count: 2,
#     success_count: 100,
#     opened_at: nil
#   }

# Manual control
CircuitBreaker.reset(cb)  # Force close
CircuitBreaker.trip(cb)   # Force open

Batch Processing

Process multiple inputs concurrently with configurable concurrency, progress tracking, and error handling:

alias Normandy.Batch.Processor

# Simple batch processing
inputs = [
  %{chat_message: "Hello"},
  %{chat_message: "How are you?"},
  %{chat_message: "Goodbye"}
]

{:ok, results} = Processor.process_batch(agent, inputs)

# With concurrency control and progress tracking
{:ok, results} = Processor.process_batch(
  agent,
  inputs,
  max_concurrency: 5,
  on_progress: fn completed, total ->
    IO.puts("Progress: #{completed}/#{total}")
  end
)

# Get detailed statistics
{:ok, stats} = Processor.process_batch_with_stats(agent, inputs)
#=> %{
#     success: [result1, result2],
#     errors: [{error1, input1}],
#     total: 3,
#     success_count: 2,
#     error_count: 1
#   }

# Process large batches in chunks
{:ok, results} = Processor.process_batch_chunked(
  agent,
  large_input_list,
  chunk_size: 50,
  chunk_delay: 1000,  # 1 second between chunks
  max_concurrency: 5
)

# Error handling with callbacks
{:ok, results} = Processor.process_batch(
  agent,
  inputs,
  on_error: fn input, error ->
    Logger.error("Failed to process #{inspect(input)}: #{inspect(error)}")
  end
)

Batch Processing Options:

Using Batch Processing with BaseAgent:

# Directly on agent
{:ok, results} = Normandy.Agents.BaseAgent.process_batch(agent, inputs)

# With statistics
{:ok, stats} = Normandy.Agents.BaseAgent.process_batch_with_stats(agent, inputs)

Context Window Management

Normandy provides utilities for managing context window limits with automatic truncation to prevent token limit errors.

alias Normandy.Context.WindowManager

# Create a window manager for a specific model
manager = WindowManager.for_model("claude-3-5-sonnet-20241022")

# Check if conversation is within limits
{:ok, within_limit?} = WindowManager.within_limit?(manager, agent)

# Automatically truncate if needed
{:ok, updated_agent} = WindowManager.ensure_within_limit(agent, manager)

# Estimate token usage
tokens = WindowManager.estimate_conversation_tokens(agent.memory)
IO.puts("Current conversation uses ~#{tokens} tokens")

Token Estimation:

# Estimate tokens for text
tokens = WindowManager.estimate_tokens("Hello, world!")
#=> ~4 tokens (rough estimate: 1 token ≈ 4 characters)

# Estimate tokens for entire conversation
total = WindowManager.estimate_conversation_tokens(agent.memory)

Truncation Strategies:

# Oldest-first strategy (default) - removes oldest messages
manager = WindowManager.new(
  max_tokens: 100_000,
  reserved_tokens: 4096,
  strategy: :oldest_first
)

# Sliding window - keeps most recent messages
manager = WindowManager.new(strategy: :sliding_window)

# Ensure conversation stays within limit
{:ok, updated_agent} = WindowManager.ensure_within_limit(agent, manager)

Token Counting API:

For accurate token counts, use the Anthropic API:

alias Normandy.Context.TokenCounter

# Count tokens for a message
{:ok, count} = TokenCounter.count_message(client, "Hello, world!")
#=> {:ok, %{"input_tokens" => 4}}

# Count tokens for entire conversation
{:ok, count} = TokenCounter.count_conversation(client, agent)
#=> {:ok, %{"input_tokens" => 1234}}

# Get detailed breakdown
{:ok, details} = TokenCounter.count_detailed(client, agent)
#=> {:ok, %{
#     total_tokens: 1234,
#     system_tokens: 100,
#     message_tokens: 1134,
#     messages: [...]
#   }}

Model Context Limits:

# Automatically configured for known models
manager = WindowManager.for_model("claude-3-5-sonnet-20241022")
#=> 200,000 token limit

# Supported models:
# - claude-3-5-sonnet-20241022: 200K tokens
# - claude-3-5-haiku-20241022: 200K tokens
# - claude-3-opus-20240229: 200K tokens
# - claude-3-sonnet-20240229: 200K tokens
# - claude-3-haiku-20240307: 200K tokens

Multi-Agent Coordination

Normandy provides powerful patterns for coordinating multiple agents concurrently with different execution strategies.

Reactive Patterns

Execute multiple agents with different completion strategies:

alias Normandy.Coordination.Reactive

# Race: Return first successful result (fastest response)
agents = [research_agent, cached_agent, search_agent]
{:ok, fastest_result} = Reactive.race(agents, "What is the capital of France?")

# All: Wait for all agents to complete (ensemble)
{:ok, all_results} = Reactive.all(agents, "Analyze this data")
#=> {:ok, %{
#     "agent_0" => {:ok, result1},
#     "agent_1" => {:ok, result2},
#     "agent_2" => {:ok, result3}
#   }}

# Some: Wait for N successful results (quorum)
{:ok, quorum_results} = Reactive.some(agents, "Is this safe?", count: 2)
#=> {:ok, %{"agent_0" => result1, "agent_2" => result3}}

Reactive Pattern Options:

# Race with timeout and callbacks
Reactive.race(agents, input,
  timeout: 5000,
  on_complete: fn agent_id, result ->
    Logger.info("Agent #{agent_id} completed: #{inspect(result)}")
  end
)

# All with concurrency control and fail-fast
Reactive.all(agents, input,
  max_concurrency: 5,
  fail_fast: true,
  timeout: 10_000,
  on_complete: fn agent_id, result ->
    update_progress(agent_id, result)
  end
)

# Some with callbacks
Reactive.some(agents, input,
  count: 3,
  timeout: 15_000,
  on_complete: fn agent_id, result ->
    Logger.debug("Got result from #{agent_id}")
  end
)

Transform Agent Results:

# Map: Transform successful results
result = Reactive.map(agent, input, fn
  {:ok, %{confidence: c}} when c > 0.8 -> {:ok, :high_confidence}
  {:ok, %{confidence: c}} when c < 0.5 -> {:ok, :low_confidence}
  {:ok, _} -> {:ok, :medium_confidence}
  error -> error
end)

# when_result: Conditional execution based on results
Reactive.when_result(agent, input) do
  {:ok, %{needs_review: true}} ->
    AgentProcess.run(review_agent, "Please review")

  {:ok, %{confidence: c}} when c < 0.5 ->
    AgentProcess.run(fallback_agent, "Use fallback")

  {:ok, result} ->
    {:ok, result}

  error ->
    error
end

Agent Pooling

Efficiently manage pools of identical agents with automatic lifecycle management:

alias Normandy.Coordination.AgentPool

# Start a pool of agents
{:ok, pool} = AgentPool.start_link(
  name: :research_pool,
  agent_config: %{
    client: client,
    model: "claude-3-5-sonnet-20241022",
    temperature: 0.7
  },
  size: 10,              # Fixed pool size
  max_overflow: 5,       # Allow 5 overflow agents
  strategy: :fifo        # :fifo or :lifo checkout
)

# Use transaction for automatic checkout/checkin
{:ok, result} = AgentPool.transaction(pool, fn agent_pid ->
  AgentProcess.run(agent_pid, "Analyze this data")
end)

# Manual checkout/checkin for more control
{:ok, agent_pid} = AgentPool.checkout(pool)
result = AgentProcess.run(agent_pid, input)
:ok = AgentPool.checkin(pool, agent_pid)

# Get pool statistics
stats = AgentPool.stats(pool)
#=> %{
#     size: 10,
#     available: 7,
#     in_use: 3,
#     overflow: 0,
#     max_overflow: 5,
#     waiting: 0
#   }

Pool Configuration Options:

AgentPool.start_link(
  name: :my_pool,
  agent_config: agent_config,
  size: 10,              # Base pool size
  max_overflow: 5,       # Max overflow agents when pool exhausted
  strategy: :lifo        # :lifo (stack) or :fifo (queue)
)

# Non-blocking checkout
case AgentPool.checkout(pool, block: false) do
  {:ok, agent_pid} -> use_agent(agent_pid)
  {:error, :no_agents} -> handle_pool_exhausted()
end

# Checkout with timeout
{:ok, agent_pid} = AgentPool.checkout(pool, timeout: 10_000)

Pool Features:

Agent Processes

Long-running agent processes with GenServer lifecycle:

alias Normandy.Coordination.{AgentProcess, AgentSupervisor}

# Start a supervised agent process
{:ok, supervisor} = AgentSupervisor.start_link()
{:ok, agent_pid} = AgentSupervisor.start_agent(supervisor, agent: agent)

# Or start standalone
{:ok, agent_pid} = AgentProcess.start_link(agent: agent)

# Run agent
{:ok, response} = AgentProcess.run(agent_pid, "Hello!")

# Get agent state
agent = AgentProcess.get_agent(agent_pid)

# Update agent configuration
:ok = AgentProcess.update_agent(agent_pid, fn agent ->
  %{agent | temperature: 0.9}
end)

# Stop gracefully
:ok = AgentProcess.stop(agent_pid)

Use Cases:

Conversation Summarization

When conversations grow too long, Normandy can automatically summarize old messages using an LLM to preserve context while reducing token usage.

alias Normandy.Context.Summarizer

# Summarize specific messages
messages = AgentMemory.history(agent.memory)
{:ok, summary} = Summarizer.summarize_messages(client, agent, messages)
#=> "User discussed project requirements, assistant provided implementation suggestions"

# Compress conversation by replacing old messages with summary
{:ok, updated_agent} = Summarizer.compress_conversation(
  client,
  agent,
  keep_recent: 10  # Keep 10 most recent messages
)

# The updated agent now has:
# - A summary message replacing old messages
# - 10 most recent messages preserved
# - Significantly reduced token count

Summarization Options:

{:ok, updated_agent} = Summarizer.compress_conversation(
  client,
  agent,
  keep_recent: 10,              # Messages to keep (default: 10)
  summary_role: "system",       # Role for summary message (default: "system")
  max_tokens: 500,              # Max tokens for summary (default: 500)
  prompt: "Custom prompt..."    # Custom summarization prompt
)

Automatic Summarization with WindowManager:

The :summarize strategy automatically summarizes old messages when approaching token limits:

# Use summarization strategy
manager = WindowManager.new(
  max_tokens: 100_000,
  strategy: :summarize
)

# Automatically summarizes when needed
{:ok, updated_agent} = WindowManager.ensure_within_limit(agent, manager)

# The agent now has:
# - Old messages summarized into a single system message
# - Recent messages preserved for context
# - Token count reduced to stay within limits

Estimate Token Savings:

# Estimate how many tokens you'll save
messages = AgentMemory.history(agent.memory)
{:ok, savings} = Summarizer.estimate_savings(messages, summary_tokens: 200)
#=> %{
#     original: 1500,
#     summary: 200,
#     savings: 1300,
#     savings_percent: 86.7
#   }

IO.puts("Summarization will save ~#{savings.savings_percent}% tokens")

How It Works:

  1. Split Conversation - Splits history into old (to summarize) and recent (to keep)
  2. Generate Summary - Uses LLM to create concise summary of old messages
  3. Replace Messages - Replaces old messages with single summary message
  4. Preserve Context - Keeps recent messages intact for immediate context

Benefits:

Validation

defmodule UserInput do
  use Normandy.Schema

  schema do
    field(:name, :string)
    field(:age, :integer)
    field(:email, :string)
  end
end

# Validate input
changeset = Normandy.Validate.cast(%UserInput{}, params, [:name, :age, :email])
  |> Normandy.Validate.validate_required([:name, :email])
  |> Normandy.Validate.validate_format(:email, ~r/@/)
  |> Normandy.Validate.validate_number(:age, greater_than_or_equal_to: 0)

if changeset.valid? do
  user = Normandy.Validate.apply_changes(changeset)
else
  errors = changeset.errors
end

Architecture

Core Components

Agent System

Streaming System

Resilience System

Batch Processing System

Context Management System

Multi-Agent Coordination System

Tool System

Testing

Run the test suite:

mix test

Run with coverage:

mix test --cover

Run Dialyzer for type checking:

mix dialyzer

Example Tools

Normandy includes several example tools to demonstrate the tool system:

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

[Add your license here]

Documentation

Full documentation can be generated with:

mix docs

Then open doc/index.html in your browser.