Codat

Hex.pm | Docs | CI | Coverage | License: MIT

Production-grade Elixir client for the Codat API — financial data connectivity for fintechs, lenders, and embedded finance platforms.

Codat normalises accounting, banking, and commerce data from 50+ integrations (QuickBooks, Xero, Sage, NetSuite, and more) into a single, consistent API.


Features


Installation

Add codat to your mix.exs dependencies:

def deps do
  [
    {:codat, "~> 1.0"}
  ]
end

Configuration

Global (recommended)

# config/runtime.exs
import Config

config :codat,
  api_key: System.fetch_env!("CODAT_API_KEY"),
  http_timeout: 30_000,
  max_retries: 3

Per-client (multi-tenant)

client = Codat.Client.new(api_key: tenant_api_key)
Codat.Platform.Companies.list(client)

Environment variable

export CODAT_API_KEY="your-api-key"

All config options:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| api_key | string | nil | Your Codat API key |
| base_url | string | "https://api.codat.io" | API base URL |
| http_timeout | ms | 30_000 | Response timeout |
| http_connect_timeout | ms | 5_000 | TCP connect timeout |
| max_retries | integer | 3 | Max retry attempts for transient errors |
| retry_base_delay | ms | 500 | Base delay for exponential backoff |
| retry_max_delay | ms | 10_000 | Cap on backoff delay |
| rate_limit_retry_delay | ms | 60_000 | Wait time on 429 before retrying |
| pool_size | integer | 10 | Finch connection pool size |
| pool_count | integer | 1 | Number of Finch connection pools |

Quick Start

# Create a client (or use global config)
client = Codat.client()

# Create a company when a customer onboards
{:ok, company} = Codat.Platform.Companies.create(client, %{
  name: "Acme Corp",
  tags: %{"tenant_id" => "t_123", "region" => "us-east-1"}
})

# Redirect the user to authorize their accounting software
redirect_to(company["redirect"])

# Once linked, list their invoices
{:ok, page} = Codat.Accounting.Invoices.list(client, company["id"])
page.results    # => [%{"id" => "...", "status" => "Open", ...}, ...]
page.total      # => 342
page.has_more?  # => true

Platform API

Companies

# List with pagination
{:ok, page} = Codat.Platform.Companies.list(client, page: 1, page_size: 50)

# Filter by tag
{:ok, page} = Codat.Platform.Companies.list(client, tags: "region:us-east-1")

# Get a single company
{:ok, company} = Codat.Platform.Companies.get(client, "company-id")

# Create
{:ok, company} = Codat.Platform.Companies.create(client, %{
  name: "Acme Corp",
  tags: %{"tenant_id" => "t_123"}
})

# Update (always pass all tags you want to keep)
{:ok, company} = Codat.Platform.Companies.update(client, "company-id", %{
  name: "Acme Corp v2",
  tags: %{"tenant_id" => "t_123", "plan" => "enterprise"}
})

# Delete permanently
{:ok, nil} = Codat.Platform.Companies.delete(client, "company-id")

# Get access token for Link embed
{:ok, %{"accessToken" => token}} =
  Codat.Platform.Companies.get_access_token(client, "company-id")

Connections

# List connections for a company
{:ok, page} = Codat.Platform.Connections.list(client, "company-id")

# Filter by status
{:ok, page} = Codat.Platform.Connections.list(client, "company-id", query: "status=Linked")

# Create a new connection (Xero = "gbol"; QuickBooks Online would be "qhyg")
{:ok, conn} = Codat.Platform.Connections.create(client, "company-id", "gbol")
redirect_to(conn["linkUrl"])

# Unlink (keep data, stop syncs)
{:ok, conn} = Codat.Platform.Connections.unlink(client, "company-id", "conn-id")

# Re-authorization URL for deauthorized connections
{:ok, %{"url" => url}} =
  Codat.Platform.Connections.get_authorization_url(client, "company-id", "conn-id")

Accounting API

Querying

Use Codat.QueryBuilder for typed, composable filter expressions:

import Codat.QueryBuilder

query =
  where("status", :eq, "Open")
  |> and_where("amountDue", :gt, 0)
  |> and_where("currency", :eq, "GBP")
  |> to_query_string()
# => "status=Open&&amountDue>0&&currency=GBP"

{:ok, page} = Codat.Accounting.Invoices.list(client, company_id,
  query: query,
  order_by: "-issueDate",
  page_size: 50
)

Reading data

# Single resource
{:ok, invoice} = Codat.Accounting.Invoices.get(client, company_id, invoice_id)

# All supported data types
Codat.Accounting.Bills.list(client, company_id)
Codat.Accounting.Customers.list(client, company_id)
Codat.Accounting.Suppliers.list(client, company_id)
Codat.Accounting.Accounts.list(client, company_id)
Codat.Accounting.Payments.list(client, company_id)
Codat.Accounting.BankAccounts.list(client, company_id)
Codat.Accounting.JournalEntries.list(client, company_id)
# ... and many more

Financial statements

# Balance sheet for the last 12 months (monthly periods)
{:ok, bs} = Codat.Accounting.BalanceSheet.get(client, company_id,
  period_length: 1,
  periods_to_compare: 12,
  start_month: "2024-01"
)

# Profit & loss
{:ok, pl} = Codat.Accounting.ProfitAndLoss.get(client, company_id,
  period_length: 3,
  periods_to_compare: 4
)

# Cash flow statement
{:ok, cf} = Codat.Accounting.CashFlowStatement.get(client, company_id)

Lazy streaming (memory-efficient)

# Stream all open invoices without loading everything into memory
Codat.Accounting.Invoices.stream(client, company_id,
  query: "status=Open",
  page_size: 100
)
|> Stream.filter(&(&1["amountDue"] > 1000))
|> Stream.map(&enrich_invoice/1)
|> Stream.each(&MyApp.Repo.insert!/1)
|> Stream.run()

Concurrent fetch all pages

# Fetch all pages concurrently and return a flat list
{:ok, all_invoices} = Codat.Accounting.Invoices.fetch_all(client, company_id,
  max_concurrency: 5,
  page_size: 100
)

Write operations (async)

All write operations are async and return a push operation:

# 1. Check the model to understand required fields
{:ok, model} = Codat.Accounting.Invoices.get_create_model(client, company_id,
  connection_id: conn_id
)

# 2. Create (returns immediately with Pending status)
{:ok, push_op} = Codat.Accounting.Invoices.create(client, company_id, conn_id, %{
  issueDate: "2024-01-15",
  dueDate: "2024-02-15",
  customerRef: %{id: "customer-id"},
  lineItems: [
    %{
      description: "Consulting",
      quantity: 1,
      unitAmount: 1000.00,
      accountRef: %{id: "account-id"}
    }
  ]
})
key = push_op["pushOperationKey"]

# 3a. Poll for completion
{:ok, done} = Codat.Platform.PushOperations.poll_until_done(client, company_id, key,
  poll_interval: 2_000,
  timeout: 60_000
)
IO.puts(done["status"])  # => "Success"

# 3b. Or subscribe to webhook (recommended for production)
# See the Webhooks section below

Webhooks

Receiving webhooks in Phoenix

Step 1: Add the body reader to your endpoint (preserves raw body for signature verification):

# lib/my_app_web/endpoint.ex
plug Plug.Parsers,
  parsers: [:urlencoded, :multipart, :json],
  pass: ["*/*"],
  body_reader: {Codat.Webhooks.BodyReader, :read_body, []},
  json_decoder: Jason

Step 2: Mount the webhook plug in your router:

# lib/my_app_web/router.ex
forward "/webhooks/codat", Codat.Webhooks.Plug,
  secret: System.get_env("CODAT_WEBHOOK_SECRET"),
  handler: MyApp.CodatWebhookHandler

Step 3: Implement your handler:

defmodule MyApp.CodatWebhookHandler do
  use Codat.Webhooks.Handler

  require Codat.Webhooks.EventTypes
  import Codat.Webhooks.EventTypes

  @impl true
  def handle_event(event_type, payload, _meta)
      when event_type == invoices_write_successful() do
    company_id  = payload["companyId"]
    push_op_key = payload["data"]["pushOperationKey"]
    MyApp.InvoiceSync.handle_success(company_id, push_op_key)
    :ok
  end

  @impl true
  def handle_event(event_type, payload, _meta)
      when event_type == invoices_write_unsuccessful() do
    company_id = payload["companyId"]
    error_msg  = payload["data"]["errorMessage"]
    MyApp.InvoiceSync.handle_failure(company_id, error_msg)
    :ok
  end

  @impl true
  def handle_event(_event_type, _payload, _meta), do: :ok
end

Dispatching to multiple handlers

defmodule MyApp.CodatWebhooks do
  use Codat.Webhooks.Dispatcher

  on ["invoices.write.successful", "invoices.write.unsuccessful"],
     to: MyApp.InvoiceSyncHandler

  on ~r/^bills\.write\./,
     to: MyApp.BillSyncHandler

  on ~r/^client\.rateLimit\./,
     to: MyApp.OpsAlertsHandler

  on :all, to: MyApp.AuditLogHandler
end

Standalone signature verification

case Codat.Webhooks.Verifier.verify(secret, raw_body, conn.req_headers) do
  :ok ->
    payload = Jason.decode!(raw_body)
    process_event(payload)

  {:error, :invalid_signature} ->
    conn |> send_resp(401, "Invalid signature")

  {:error, :expired} ->
    conn |> send_resp(401, "Webhook expired")
end

Bank Feeds

# Create source accounts
{:ok, account} = Codat.BankFeeds.SourceAccounts.create(
  client, company_id, conn_id,
  %{
    accountName: "Business Checking",
    accountNumber: "12345678",
    accountType: "Checking",
    currency: "USD",
    balance: 10_000.00
  }
)

# Get account mapping options (available GL accounts)
{:ok, options} = Codat.BankFeeds.AccountMapping.get_options(
  client, company_id, conn_id, account["id"]
)

# Map the source account to a GL account
{:ok, _} = Codat.BankFeeds.AccountMapping.set(
  client, company_id, conn_id, account["id"],
  %{targetAccountId: "gl-account-id", feedStartDate: "2024-01-01"}
)

# Push transactions
{:ok, push_op} = Codat.BankFeeds.Transactions.create(
  client, company_id, conn_id,
  %{
    accountId: account["id"],
    transactions: [
      %{
        id: "txn-001",
        date: "2024-01-15",
        description: "Stripe payout",
        amount: 5_000.00,
        balance: 15_000.00
      }
    ]
  }
)

Lending

# Enhanced accounts receivable
{:ok, invoices} = Codat.Lending.AccountsReceivable.invoices(client, company_id)
{:ok, customers} = Codat.Lending.AccountsReceivable.customers(client, company_id)

# Enhanced financial statements
{:ok, bs} = Codat.Lending.FinancialStatements.balance_sheet(client, company_id,
  period_length: 1,
  periods_to_compare: 12
)

# Data integrity cross-referencing
{:ok, status} = Codat.Lending.DataIntegrity.status(client, company_id)
{:ok, details} = Codat.Lending.DataIntegrity.details(client, company_id, "invoices",
  query: "status=Unmatched"
)

# Generate and download Excel audit report
{:ok, _} = Codat.Lending.ExcelReports.generate(client, company_id, "audit")
{:ok, status} = Codat.Lending.ExcelReports.status(client, company_id, "audit")
{:ok, bytes} = Codat.Lending.ExcelReports.download(client, company_id, "audit")
File.write!("audit-report.xlsx", bytes)

Expenses

# 1. Initialize a sync session
{:ok, sync} = Codat.Expenses.Sync.initialize(client, company_id, %{dataType: "expense"})
sync_id = sync["id"]

# 2. Push expense transactions
{:ok, _} = Codat.Expenses.Transactions.create(client, company_id, sync_id, [
  %{
    id: "txn-001",
    type: "payment",
    issueDate: "2024-01-15T00:00:00Z",
    currency: "USD",
    currencyRate: 1.0,
    contactRef: %{id: "supplier-id", type: "Supplier"},
    lineItems: [
      %{
        accountRef: %{id: "account-id"},
        description: "Office supplies",
        netAmount: 99.99,
        taxAmount: 8.00,
        taxRateRef: %{id: "tax-rate-id"}
      }
    ]
  }
])

# 3. Complete the sync (writes to accounting platform)
{:ok, _} = Codat.Expenses.Sync.complete(client, company_id, sync_id)
# Subscribe to expenses.sync.successful / expenses.sync.unsuccessful webhooks

Error Handling

case Codat.Accounting.Invoices.list(client, company_id) do
  {:ok, page} ->
    page.results

  {:error, %Codat.Error{type: :unauthorized}} ->
    # Check API key configuration
    {:error, :invalid_api_key}

  {:error, %Codat.Error{type: :not_found}} ->
    {:error, :company_not_found}

  {:error, %Codat.Error{type: :rate_limited, retry_after: ms}} ->
    Process.sleep(ms)
    retry()

  {:error, %Codat.Error{type: :server_error, correlation_id: id}} ->
    Logger.error("Codat error, report this correlation_id to support: #{id}")
    {:error, :codat_unavailable}

  {:error, %Codat.Error{type: :payment_required}} ->
    # Free tier limit — upgrade plan or reduce company count
    {:error, :plan_limit_exceeded}
end

Telemetry

# Attach the built-in logger (optional)
Codat.Telemetry.attach_default_logger(:debug)

# Or attach your own handler
:telemetry.attach(
  "my-codat-handler",
  [:codat, :request, :stop],
  fn _event, %{duration: duration, status_code: code}, %{operation: op}, _config ->
    ms = System.convert_time_unit(duration, :native, :millisecond)
    MyApp.Metrics.histogram("codat.request.duration_ms", ms, tags: [operation: op])
  end,
  nil
)

# With telemetry_metrics
[
  Telemetry.Metrics.counter("codat.request.stop",
    tags: [:operation, :result]
  ),
  Telemetry.Metrics.summary("codat.request.stop.duration",
    unit: {:native, :millisecond},
    tags: [:api_module, :operation]
  ),
  Telemetry.Metrics.counter("codat.rate_limit.hit",
    tags: []
  )
]

Testing

The package includes full test support:

# Run tests
mix test

# With coverage
mix test.coverage

# Linting
mix check

In your own application tests, use Bypass to mock the Codat API:

setup do
  bypass = Bypass.open()
  client = Codat.Client.new(
    api_key: "test-key",
    base_url: "http://localhost:#{bypass.port}"
  )
  %{bypass: bypass, client: client}
end

test "processes new invoices", %{bypass: bypass, client: client} do
  Bypass.expect_once(bypass, "GET", "/companies/co-1/data/invoices", fn conn ->
    conn
    |> Plug.Conn.put_resp_content_type("application/json")
    |> Plug.Conn.send_resp(200, Jason.encode!(%{
      "results" => [%{"id" => "inv-1", "status" => "Open"}],
      "pageNumber" => 1,
      "pageSize" => 100,
      "totalResults" => 1
    }))
  end)

  assert {:ok, page} = Codat.Accounting.Invoices.list(client, "co-1")
  assert page.total == 1
end

License

MIT — see LICENSE for details.

Links