Slither

Bounded, observable BEAM<->Python pipelines with shared state on ETS


Slither is a low-level concurrency substrate for BEAM + Python systems, built on top of SnakeBridge and Snakepit.

It gives you three primitives:

- Pipe: declarative pipelines that compose :beam, :python, and :router stages
- Dispatch: batched, bounded calls into a Python worker pool
- Store: shared state on ETS, exposed to Python through views

Why use Slither

Use Slither when you want Python to keep doing compute, while BEAM owns concurrency, state, and coordination.

Common wins:

- Bounded, batched Python calls via batch_size and max_in_flight, instead of unbounded fan-out
- Shared state on ETS that Python can read through store views
- Telemetry events for observability across the pipeline

Installation

defp deps do
  [
    {:slither, "~> 0.1.0"}
  ]
end

Then:

mix deps.get
mix compile

Quick Start

1) Configure a Python worker pool

# config/runtime.exs
import Config

SnakeBridge.ConfigHelper.configure_snakepit!(pool_size: 2)

2) Define a pipeline

defmodule MyApp.ScorePipe do
  use Slither.Pipe

  pipe :score do
    stage :prepare, :beam,
      handler: fn item, _ctx ->
        %{"text" => item.payload}
      end

    stage :predict, :python,
      executor: Slither.Dispatch.Executors.SnakeBridge,
      module: "my_model",
      function: "predict_batch",
      pool: :default,
      batch_size: 32,
      max_in_flight: 4

    stage :route, :router,
      routes: [
        {fn item -> item.payload["score"] >= 0.8 end, :accept},
        {fn item -> item.payload["score"] >= 0.4 end, :review}
      ]

    output :accept
    output :review
    output :default

    on_error :predict, :skip
    on_error :*, :halt
  end
end

3) Run it

{:ok, outputs} = Slither.run_pipe(MyApp.ScorePipe, ["first", "second", "third"])

accepted_payloads = Enum.map(outputs.accept, & &1.payload)
review_payloads = Enum.map(outputs.review, & &1.payload)
outputs is a map keyed by the declared output names; items that match no router route fall through to outputs.default.

Run the built-in demos

mix slither.example
mix slither.example text_analysis
mix slither.example batch_stats
mix slither.example data_etl
mix slither.example ml_scoring
mix slither.example image_pipeline
mix slither.example --all
mix slither.example --no-baseline

ml_scoring and image_pipeline auto-install Python deps via Snakepit/uv.

Core APIs

Pipe (orchestration)

Define pipelines with use Slither.Pipe: compose :beam, :python, and :router stages, declare named outputs, and set per-stage error policies with on_error (see the Quick Start above for a full example).

Dispatch (batched Python calls)

Dispatch sends wrapped items to a Python pool in batches, with a bound on how many batches are in flight at once:

items = Slither.Item.wrap_many([1, 2, 3, 4])

{:ok, results} =
  Slither.dispatch(items,
    executor: Slither.Dispatch.Executors.SnakeBridge,
    module: "my_model",
    function: "predict_batch",
    pool: :default,
    batch_size: 64,
    max_in_flight: 8,
    ordering: :preserve,
    on_error: :halt
  )

For large workloads, use Slither.Dispatch.stream/2.
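
As a sketch only, a streaming version of the call above might look like this. It assumes Slither.Dispatch.stream/2 accepts the same options as Slither.dispatch/2 and returns a lazy stream of result items, so batches are pulled on demand rather than materialized up front; check the HexDocs for the exact contract.

```elixir
# Assumption: stream/2 mirrors dispatch/2's options and yields results lazily.
1..100_000
|> Enum.to_list()
|> Slither.Item.wrap_many()
|> Slither.Dispatch.stream(
  executor: Slither.Dispatch.Executors.SnakeBridge,
  module: "my_model",
  function: "predict_batch",
  pool: :default,
  batch_size: 64,
  max_in_flight: 8
)
|> Stream.each(&IO.inspect(&1.payload))
|> Stream.run()
```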

Store (shared state on ETS)

defmodule MyApp.FeatureStore do
  @behaviour Slither.Store

  @impl true
  def tables do
    [
      %{name: :features, type: :set, read_concurrency: true}
    ]
  end

  @impl true
  def views do
    [
      %{
        name: :lookup_feature,
        mode: :scalar,
        scope: :session,
        handler: fn %{"key" => key}, _ctx ->
          case Slither.Store.Server.get(__MODULE__, :features, key) do
            nil -> %{"error" => "not_found"}
            value -> %{"value" => value}
          end
        end,
        timeout_ms: 5_000
      }
    ]
  end

  @impl true
  def load(tables) do
    :ets.insert(tables[:features], {"user:42", %{tier: "gold"}})
    :ok
  end
end

Start store processes by listing modules in config:

config :slither,
  stores: [MyApp.FeatureStore]

Read/write API:

Slither.Store.Server.get(MyApp.FeatureStore, :features, "user:42")
Slither.Store.Server.put(MyApp.FeatureStore, :features, "user:99", %{tier: "silver"})

Configuration knobs

config :slither,
  stores: [],
  dispatch: [
    default_batch_size: 64,
    default_max_in_flight: 8,
    default_ordering: :preserve,
    default_on_error: :halt
  ],
  bridge: [
    default_scope: :session
  ]

Telemetry events

Slither emits under [:slither, ...].
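
You can observe these with the standard :telemetry API. The event names below are placeholders (see the HexDocs for Slither's actual event list); the attach call itself is the stock :telemetry.attach_many/4 from the telemetry library:

```elixir
# Hypothetical event names; Slither's real events live under [:slither, ...].
events = [
  [:slither, :dispatch, :stop],
  [:slither, :pipe, :stage, :stop]
]

:telemetry.attach_many(
  "slither-logger",
  events,
  fn event, measurements, metadata, _config ->
    # Log every matching event with its measurements and metadata.
    IO.inspect({event, measurements, metadata}, label: "slither telemetry")
  end,
  nil
)
```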

Troubleshooting

Guides

License

MIT