NsaiWork

NsaiWork

Unified job scheduler for the North-Shore-AI platform

Hex.pmDocumentation

NsaiWork provides a protocol-first, multi-tenant job scheduling system with priority queues, resource-aware scheduling, and pluggable backend execution.

Features

Installation

Add nsai_work to your list of dependencies in mix.exs:

def deps do
  [
    {:nsai_work, "~> 0.1.0"}
  ]
end

Quick Start

# Start the application
{:ok, _} = Application.ensure_all_started(:nsai_work)

# Create a job
job = NsaiWork.Job.new(
  kind: :tool_call,
  tenant_id: "acme",
  namespace: "default",
  priority: :interactive,
  payload: %{tool: "calculator", args: [2, 2]},
  resources: NsaiWork.Resources.new(cpu: 1.0, memory_mb: 512)
)

# Submit for execution
{:ok, submitted} = NsaiWork.submit(job)

# Check status
{:ok, job} = NsaiWork.get(submitted.id)
IO.inspect(job.status)  # => :running or :succeeded

# Get statistics
stats = NsaiWork.stats()
IO.inspect(stats)

Examples

Runnable examples are in the examples/ directory:

Run any example with:

mix run examples/basic_job.exs

Or run all examples:

./examples/run_all.sh

Architecture

NsaiWork consists of several cooperating components:

┌─────────────────────────────────────────────────┐
│               Application Layer                 │
│     (Crucible, Synapse, CNS, ALTAR, etc.)       │
└───────────────────────┬─────────────────────────┘
                        │ submit jobs
                        ▼
┌─────────────────────────────────────────────────┐
│               NsaiWork.Scheduler                │
│  - Admission control                            │
│  - Priority queue routing                       │
│  - Resource-aware dispatch                      │
└────────────────────┬────────────────────────────┘
                     │
        ┌────────────┼────────────┐
        ▼            ▼            ▼
   [realtime]  [interactive]  [batch]  [offline]
        │            │            │         │
        └────────────┴────────────┴─────────┘
                     │
                     ▼
┌─────────────────────────────────────────────────┐
│                NsaiWork.Executor                │
│  - Backend selection                            │
│  - Job execution                                │
│  - Retry handling                               │
└────────────────────┬────────────────────────────┘
                     │
        ┌────────────┼────────────┐
        ▼            ▼            ▼
    [Local]      [ALTAR]      [Modal]  [Custom]

Components

Job Lifecycle

Jobs move through the following states:

pending -> queued -> running -> {succeeded | failed | canceled | timeout}
                                       ↓
                                  (retry) -> queued
  1. pending: Job created, waiting for admission
  2. queued: Admitted and waiting in priority queue
  3. running: Executing on a backend
  4. succeeded: Completed successfully
  5. failed: Execution failed (may retry)
  6. canceled: Explicitly canceled
  7. timeout: Exceeded time limit

Priority Levels

Jobs are scheduled based on four priority levels:

Resource Requirements

Specify resource requirements for intelligent scheduling:

resources = NsaiWork.Resources.new(
  cpu: 4.0,                    # CPU cores
  memory_mb: 8192,             # Memory in MB
  gpu: "A100",                 # GPU type
  gpu_count: 1,                # Number of GPUs
  timeout_ms: 300_000,         # 5 minute timeout
  max_cost_usd: 1.0            # Cost cap
)

job = NsaiWork.Job.new(
  kind: :training_step,
  tenant_id: "acme",
  namespace: "ml",
  payload: %{model: "llama-3.1-8b"},
  resources: resources
)

Retry Policies

Configure retry behavior per job:

constraints = NsaiWork.Constraints.new(
  retry_policy: %{
    max_attempts: 5,
    backoff: :exponential,
    base_delay_ms: 1000,
    max_delay_ms: 60_000,
    jitter: true
  }
)

job = NsaiWork.Job.new(
  kind: :tool_call,
  tenant_id: "acme",
  namespace: "default",
  payload: %{},
  constraints: constraints
)

Backends

NsaiWork ships with three built-in backends:

Local Backend

Direct BEAM process execution (development, lightweight tasks):

# Automatically selected for simple jobs
job = NsaiWork.Job.new(
  kind: :tool_call,
  tenant_id: "test",
  namespace: "default",
  payload: %{tool: "echo"}
)

ALTAR Backend

Executes tool calls via ALTAR's LATER runtime with ADM function declarations:

# Enable ALTAR in config
config :nsai_work,
  enable_altar: true,
  altar_registry: NsaiWork.AltarRegistry

# Register a tool
NsaiWork.AltarTools.register(
  "proposer_extract",
  "Extract claims from document",
  %{
    type: :OBJECT,
    properties: %{
      "doc_id" => %{type: :STRING, description: "Document ID"},
      "max_claims" => %{type: :NUMBER, description: "Max claims"}
    },
    required: ["doc_id"]
  },
  &MyApp.CNS.Proposer.extract/1
)

# Submit a tool call job
job = NsaiWork.Job.new(
  kind: :tool_call,
  tenant_id: "acme",
  namespace: "cns",
  payload: %{
    tool_name: "proposer_extract",
    args: %{doc_id: "doc_123", max_claims: 10}
  }
)

{:ok, job} = NsaiWork.submit(job)

See the ALTAR Integration section for more details.

Mock Backend

Testing backend with configurable behavior:

# Configure mock to fail
NsaiWork.Backends.Mock.configure(behavior: {:fail, "Simulated failure"})

# Or delay execution
NsaiWork.Backends.Mock.configure(delay_ms: 500)

# Check execution history
history = NsaiWork.Backends.Mock.history()

Custom Backends

Implement the NsaiWork.Backend behaviour:

defmodule MyApp.Backend.Custom do
  @behaviour NsaiWork.Backend

  @impl true
  def execute(job) do
    # Your execution logic
    {:ok, result}
  end

  @impl true
  def supports?(job) do
    job.kind == :custom_work
  end
end

# Configure executor with custom backend
NsaiWork.Executor.start_link(
  backends: [
    local: NsaiWork.Backends.Local,
    custom: MyApp.Backend.Custom
  ]
)

Telemetry

NsaiWork emits telemetry events for observability:

# Attach a handler
:telemetry.attach(
  "nsai-work-metrics",
  [:nsai_work, :job, :completed],
  &MyApp.Metrics.handle_event/4,
  nil
)

# Or use the built-in console logger
NsaiWork.Telemetry.attach_console_logger()

Events

Multi-Tenancy

All jobs belong to a tenant and namespace:

job = NsaiWork.Job.new(
  tenant_id: "customer-123",
  namespace: "production",
  # ...
)

# List jobs for a tenant
jobs = NsaiWork.list("customer-123")

# Filter by namespace
jobs = NsaiWork.list("customer-123", namespace: "production")

# Filter by status
running = NsaiWork.list("customer-123", status: :running)

ALTAR Integration

ALTAR (Adaptive Language Tool and Action Runtime) provides a local execution runtime for tool calls with ADM (function declaration) support. The ALTAR backend enables NsaiWork to execute tool calls through ALTAR's LATER runtime.

Setup

  1. Add ALTAR to your dependencies (already included with NsaiWork):
{:altar, "~> 0.2.0"}
  1. Enable ALTAR in your application config:
# config/config.exs
config :nsai_work,
  enable_altar: true,
  altar_registry: NsaiWork.AltarRegistry

Registering Tools

Use NsaiWork.AltarTools to register tools with ALTAR:

# Simple tool
NsaiWork.AltarTools.register(
  "calculator_add",
  "Add two numbers",
  %{
    type: :OBJECT,
    properties: %{
      "a" => %{type: :NUMBER, description: "First number"},
      "b" => %{type: :NUMBER, description: "Second number"}
    },
    required: ["a", "b"]
  },
  fn %{"a" => a, "b" => b} -> {:ok, a + b} end
)

# Complex tool with nested objects
NsaiWork.AltarTools.register(
  "search_documents",
  "Search document corpus with filters",
  %{
    type: :OBJECT,
    properties: %{
      "query" => %{type: :STRING, description: "Search query"},
      "limit" => %{type: :NUMBER, description: "Max results"},
      "filters" => %{
        type: :OBJECT,
        properties: %{
          "category" => %{type: :STRING},
          "date_range" => %{
            type: :OBJECT,
            properties: %{
              "start" => %{type: :STRING},
              "end" => %{type: :STRING}
            }
          }
        }
      }
    },
    required: ["query"]
  },
  &MyApp.Search.search/1
)

Tool Function Requirements

Tool functions must:

Example:

defmodule MyApp.CNS.Proposer do
  def extract(%{"doc_id" => doc_id} = args) do
    max_claims = Map.get(args, "max_claims", 10)

    case fetch_document(doc_id) do
      {:ok, document} ->
        claims = extract_claims(document, max_claims)
        {:ok, %{claims: claims, doc_id: doc_id}}

      {:error, reason} ->
        {:error, "Failed to extract claims: #{reason}"}
    end
  end
end

Submitting Tool Call Jobs

Create and submit jobs with the :tool_call kind:

job = NsaiWork.Job.new(
  kind: :tool_call,
  tenant_id: "acme",
  namespace: "default",
  priority: :interactive,
  payload: %{
    tool_name: "proposer_extract",
    args: %{
      doc_id: "doc_123",
      max_claims: 5
    }
  }
)

{:ok, submitted_job} = NsaiWork.submit(job)

# Wait for completion
case NsaiWork.get(submitted_job.id) do
  {:ok, %{status: :succeeded, result: result}} ->
    IO.inspect(result)

  {:ok, %{status: :failed, error: error}} ->
    IO.puts("Job failed: #{error.message}")
end

Tool Management

# Check if tool exists
if NsaiWork.AltarTools.registered?("calculator_add") do
  IO.puts("Tool is available")
end

CNS Integration Example

For CNS (Critic-Network Synthesis) agents:

# Register Proposer agent tool
NsaiWork.AltarTools.register(
  "cns_proposer",
  "Extract structured claims from documents",
  %{
    type: :OBJECT,
    properties: %{
      "document" => %{type: :STRING, description: "Source document"},
      "schema" => %{type: :STRING, description: "Target schema"}
    },
    required: ["document"]
  },
  &CNS.Agents.Proposer.extract/1
)

# Register Antagonist agent tool
NsaiWork.AltarTools.register(
  "cns_antagonist",
  "Find contradictions in claim networks",
  %{
    type: :OBJECT,
    properties: %{
      "sno_graph" => %{type: :OBJECT, description: "SNO graph"},
      "beta1_threshold" => %{type: :NUMBER, description: "β₁ threshold"}
    },
    required: ["sno_graph"]
  },
  &CNS.Agents.Antagonist.critique/1
)

# Execute dialectical workflow
proposer_job = NsaiWork.Job.new(
  kind: :tool_call,
  tenant_id: "research",
  namespace: "cns",
  priority: :batch,
  payload: %{
    tool_name: "cns_proposer",
    args: %{document: document_text, schema: "scifact"}
  }
)

{:ok, proposer_result} = NsaiWork.submit(proposer_job)

Integration Examples

Crucible Experiments

defmodule Crucible.Pipeline.Runner do
  def run_stage(experiment, stage, context) do
    job = NsaiWork.Job.new(
      kind: :experiment_step,
      tenant_id: context[:tenant_id],
      namespace: experiment.id,
      priority: :batch,
      payload: %{
        experiment_id: experiment.id,
        stage_name: stage.name,
        input_context: context
      },
      constraints: NsaiWork.Constraints.new(
        concurrency_group: "experiment:#{experiment.id}"
      )
    )

    NsaiWork.submit(job)
  end
end

ALTAR Tool Calls

defmodule ALTAR.GRID do
  def dispatch(tool, input, opts \\ []) do
    job = NsaiWork.Job.new(
      kind: :tool_call,
      tenant_id: opts[:tenant_id] || "default",
      namespace: opts[:namespace] || "default",
      priority: opts[:priority] || :interactive,
      payload: %{
        tool_id: tool.id,
        function_name: tool.name,
        arguments: input
      },
      resources: translate_resources(tool.resources)
    )

    NsaiWork.submit(job)
  end
end

Roadmap

Contributing

This is part of the North-Shore-AI platform. Contributions welcome!

License

MIT License - see LICENSE file for details

Links