Fivetrex

Hex.pmDocumentation

Elixir client library for the Fivetran REST API.

Fivetrex provides a powerful, idiomatic Elixir interface for managing Fivetran resources including Groups, Connectors, and Destinations. Built on Req, it offers streaming pagination, structured error handling, and a clean functional API.

Features

API Coverage

Fivetrex covers the core Fivetran API resources needed for managing data pipelines:

Fivetran API ResourceStatusFunctions
Groups✅ Fulllist, stream, get, create, update, delete
Connectors✅ Fulllist, stream, get, create, update, delete, sync, resync!, pause, resume, get_state, get_sync_status, get_schema_config, update_schema_config, reload_schema_config, get_table_columns, set_sync_frequency
Destinations✅ Fullget, create, update, delete, test
Webhooks✅ Fulllist, stream, get, create_account, create_group, update, delete, test
UsersNot implemented
TeamsNot implemented
RolesNot implemented
TransformationsNot implemented
CertificatesNot implemented
Log ServicesNot implemented

Note: The implemented resources (Groups, Connectors, Destinations, Webhooks) cover the most commonly used Fivetran functionality for managing data pipelines programmatically.

Installation

Add fivetrex to your list of dependencies in mix.exs:

def deps do
[
{:fivetrex, "~> 0.2.0"}
]
end

Quick Start

Creating a Client

All API operations require a client configured with your Fivetran API credentials:

# Create a client with explicit credentials
client = Fivetrex.client(
api_key: "your_api_key",
api_secret: "your_api_secret"
)
# Or use environment variables
client = Fivetrex.client(
api_key: System.get_env("FIVETRAN_API_KEY"),
api_secret: System.get_env("FIVETRAN_API_SECRET")
)

Basic Operations

# List all groups
{:ok, %{items: groups, next_cursor: _}} = Fivetrex.Groups.list(client)
# Get a specific group
{:ok, group} = Fivetrex.Groups.get(client, "group_id")
# Create a new group
{:ok, group} = Fivetrex.Groups.create(client, %{name: "My Data Warehouse"})
# List connectors in a group
{:ok, %{items: connectors}} = Fivetrex.Connectors.list(client, group.id)
# Trigger a sync
{:ok, _} = Fivetrex.Connectors.sync(client, "connector_id")
# Pause and resume connectors
{:ok, _} = Fivetrex.Connectors.pause(client, "connector_id")
{:ok, _} = Fivetrex.Connectors.resume(client, "connector_id")

Streaming

Fivetrex uses Elixir Streams to handle Fivetran's cursor-based pagination transparently. This allows you to iterate over thousands of resources without loading them all into memory:

# Stream all groups
client
|> Fivetrex.Groups.stream()
|> Enum.each(fn group ->
IO.puts("Group: #{group.name}")
end)
# Find all syncing connectors across all groups
syncing_connectors =
client
|> Fivetrex.Groups.stream()
|> Stream.flat_map(fn group ->
Fivetrex.Connectors.stream(client, group.id)
end)
|> Stream.filter(&Fivetrex.Models.Connector.syncing?/1)
|> Enum.to_list()
# Take only the first 10 broken connectors
broken =
Fivetrex.Connectors.stream(client, "group_id")
|> Stream.filter(fn c -> c.status["sync_state"] == "broken" end)
|> Enum.take(10)

Working with Connectors

Creating a Connector

{:ok, connector} = Fivetrex.Connectors.create(client, %{
group_id: "group_id",
service: "postgres",
config: %{
host: "db.example.com",
port: 5432,
database: "production",
user: "fivetran_user",
password: "secret"
}
})

Sync Operations

# Trigger an incremental sync
{:ok, _} = Fivetrex.Connectors.sync(client, connector.id)
# Get current sync state
{:ok, state} = Fivetrex.Connectors.get_state(client, connector.id)
# Historical resync (DANGEROUS - requires confirmation)
# This wipes all data and re-imports from scratch
{:ok, _} = Fivetrex.Connectors.resync!(client, connector.id, confirm: true)

Connector Helper Functions

alias Fivetrex.Models.Connector
# Check connector status
Connector.syncing?(connector) # => true/false
Connector.paused?(connector) # => true/false
Connector.sync_state(connector) # => "scheduled" | "syncing" | "paused" | nil

Working with Destinations

# Get a destination
{:ok, destination} = Fivetrex.Destinations.get(client, "destination_id")
# Create a Snowflake destination
{:ok, destination} = Fivetrex.Destinations.create(client, %{
group_id: "group_id",
service: "snowflake",
region: "US",
time_zone_offset: "-5",
config: %{
host: "account.snowflakecomputing.com",
port: 443,
database: "ANALYTICS",
auth: "PASSWORD",
user: "FIVETRAN_USER",
password: "secret"
}
})
# Test destination connectivity
{:ok, result} = Fivetrex.Destinations.test(client, destination.id)

Working with Webhooks

Webhooks provide real-time notifications about Fivetran events like sync starts and completions.

Creating Webhooks

# Create an account-level webhook (receives events for all connectors)
{:ok, webhook} = Fivetrex.Webhooks.create_account(client, %{
url: "https://example.com/fivetran/webhook",
events: ["sync_start", "sync_end"],
active: true,
secret: "my_webhook_secret"
})
# Create a group-level webhook (receives events for connectors in that group)
{:ok, webhook} = Fivetrex.Webhooks.create_group(client, "group_id", %{
url: "https://example.com/fivetran/webhook",
events: ["sync_end"],
active: true
})
# List all webhooks
{:ok, %{items: webhooks}} = Fivetrex.Webhooks.list(client)
# Test a webhook
{:ok, _} = Fivetrex.Webhooks.test(client, webhook.id)

Handling Incoming Webhooks

Fivetrex includes a Plug for Phoenix/Bandit applications that handles signature verification automatically:

# In your Phoenix controller
defmodule MyAppWeb.FivetranWebhookController do
use MyAppWeb, :controller
plug Fivetrex.WebhookPlug,
secret: {MyApp.Config, :fivetran_webhook_secret, []}
def receive(conn, _params) do
event = conn.assigns.fivetran_event
case event.event do
"sync_end" -> handle_sync_completion(event)
"sync_start" -> handle_sync_start(event)
_ -> :ok
end
json(conn, %{status: "ok"})
end
end

For manual signature verification:

# Verify webhook signature
case Fivetrex.WebhookSignature.verify(raw_body, signature, secret) do
:ok -> process_webhook(payload)
{:error, :invalid_signature} -> reject_request()
{:error, :missing_signature} -> reject_request()
end

Schema Metadata

Query and configure which schemas, tables, and columns are synced.

# Get schema configuration for a connector
{:ok, config} = Fivetrex.Connectors.get_schema_config(client, "connector_id")
# Iterate through schemas and tables
for {schema_name, schema} <- config.schemas, schema.enabled do
IO.puts("Schema: #{schema_name}")
for {table_name, table} <- schema.tables, table.enabled do
IO.puts(" Table: #{table_name} (#{table.sync_mode})")
end
end
# Get columns for a specific table
{:ok, columns} = Fivetrex.Connectors.get_table_columns(
client,
"connector_id",
"schema_name",
"table_name"
)
# Update schema configuration
{:ok, updated} = Fivetrex.Connectors.update_schema_config(client, "connector_id", %{
schemas: %{
"public" => %{
enabled: true,
tables: %{
"users" => %{enabled: true},
"logs" => %{enabled: false}
}
}
}
})
# Reload schema (detect new tables/columns from source)
{:ok, config} = Fivetrex.Connectors.reload_schema_config(client, "connector_id")

Sync Status and Frequency

# Get current sync status
{:ok, status} = Fivetrex.Connectors.get_sync_status(client, "connector_id")
if Fivetrex.Models.SyncStatus.syncing?(status) do
IO.puts("Sync in progress...")
end
IO.puts("Last successful sync: #{status.succeeded_at}")
# Set sync frequency (in minutes)
{:ok, connector} = Fivetrex.Connectors.set_sync_frequency(client, "connector_id", 60)

Error Handling

All API functions return {:ok, result} on success or {:error, %Fivetrex.Error{}} on failure. Errors are structured for easy pattern matching:

case Fivetrex.Connectors.get(client, "connector_id") do
{:ok, connector} ->
# Success - connector is a %Fivetrex.Models.Connector{}
IO.puts("Found connector: #{connector.id}")
{:error, %Fivetrex.Error{type: :not_found}} ->
# 404 - Resource doesn't exist
IO.puts("Connector not found")
{:error, %Fivetrex.Error{type: :unauthorized}} ->
# 401 - Invalid API credentials
IO.puts("Check your API key and secret")
{:error, %Fivetrex.Error{type: :rate_limited, retry_after: seconds}} ->
# 429 - Too many requests
IO.puts("Rate limited, retry after #{seconds} seconds")
Process.sleep(seconds * 1000)
# Retry...
{:error, %Fivetrex.Error{type: :server_error, status: status}} ->
# 5xx - Fivetran server error
IO.puts("Server error: #{status}")
{:error, %Fivetrex.Error{message: message}} ->
# Catch-all for other errors
IO.puts("Error: #{message}")
end

Error Types

TypeHTTP StatusDescription
:unauthorized401Invalid or missing API credentials
:not_found404Resource does not exist
:rate_limited429Too many requests (check retry_after)
:server_error5xxFivetran server error
:unknownOtherUnexpected error

API Reference

Groups

FunctionDescription
Fivetrex.Groups.list/2List all groups with pagination
Fivetrex.Groups.stream/2Stream all groups (handles pagination)
Fivetrex.Groups.get/2Get a group by ID
Fivetrex.Groups.create/2Create a new group
Fivetrex.Groups.update/3Update a group
Fivetrex.Groups.delete/2Delete a group

Connectors

FunctionDescription
Fivetrex.Connectors.list/3List connectors in a group
Fivetrex.Connectors.stream/3Stream all connectors in a group
Fivetrex.Connectors.get/2Get a connector by ID
Fivetrex.Connectors.create/2Create a new connector
Fivetrex.Connectors.update/3Update a connector
Fivetrex.Connectors.delete/2Delete a connector
Fivetrex.Connectors.sync/2Trigger an incremental sync
Fivetrex.Connectors.resync!/3Trigger a historical resync (destructive!)
Fivetrex.Connectors.get_state/2Get connector sync state
Fivetrex.Connectors.pause/2Pause a connector
Fivetrex.Connectors.resume/2Resume a paused connector
Fivetrex.Connectors.get_sync_status/2Get sync status summary
Fivetrex.Connectors.set_sync_frequency/3Set sync frequency in minutes
Fivetrex.Connectors.get_schema_config/2Get schema/table/column configuration
Fivetrex.Connectors.update_schema_config/3Update schema configuration
Fivetrex.Connectors.reload_schema_config/2Reload schema from source
Fivetrex.Connectors.get_table_columns/4Get columns for a specific table

Destinations

FunctionDescription
Fivetrex.Destinations.get/2Get a destination by ID
Fivetrex.Destinations.create/2Create a new destination
Fivetrex.Destinations.update/3Update a destination
Fivetrex.Destinations.delete/2Delete a destination
Fivetrex.Destinations.test/2Run destination connection tests

Webhooks

FunctionDescription
Fivetrex.Webhooks.list/2List all webhooks
Fivetrex.Webhooks.stream/2Stream all webhooks
Fivetrex.Webhooks.get/2Get a webhook by ID
Fivetrex.Webhooks.create_account/2Create an account-level webhook
Fivetrex.Webhooks.create_group/3Create a group-level webhook
Fivetrex.Webhooks.update/3Update a webhook
Fivetrex.Webhooks.delete/2Delete a webhook
Fivetrex.Webhooks.test/2Send a test event to a webhook

Webhook Handling

Function/ModuleDescription
Fivetrex.WebhookPlugPlug for Phoenix webhook endpoints
Fivetrex.WebhookSignature.verify/3Verify webhook signature
Fivetrex.WebhookSignature.compute_signature/2Compute HMAC-SHA256 signature

Configuration

Runtime Configuration

Fivetrex is designed for runtime configuration. Create clients with credentials at runtime rather than compile-time:

# In your application code
defmodule MyApp.Fivetran do
def client do
Fivetrex.client(
api_key: Application.get_env(:my_app, :fivetran_api_key),
api_secret: Application.get_env(:my_app, :fivetran_api_secret)
)
end
end
# In config/runtime.exs
config :my_app,
fivetran_api_key: System.get_env("FIVETRAN_API_KEY"),
fivetran_api_secret: System.get_env("FIVETRAN_API_SECRET")

Testing with Custom Base URL

For testing, you can override the base URL:

client = Fivetrex.client(
api_key: "test",
api_secret: "test",
base_url: "http://localhost:4000"
)

Development

# Run all checks (format, credo, compile, test)
mix precommit
# Run CI checks (check-formatted, credo, compile, test + integration)
mix ci
# Run tests
mix test
# Run tests with coverage
mix test --cover

Fivetrex uses Bypass for unit tests. Integration tests run against the real Fivetran API and require credentials in .env.

Documentation

Generate documentation locally:

mix docs
open doc/index.html

License

MIT License. See LICENSE for details.