IdempotencyKit
IdempotencyKit helps you make mutation endpoints safe to retry.
It is built for Plug/Phoenix apps with Ecto/Postgres.
Installation
def deps do
[
{:idempotency_kit, "~> 0.1.0"}
]
endWhat it does
For the same (user_id, scope, idempotency_key):
- first request claims execution
-
duplicate while first is running returns
processing - after completion, duplicate replays stored response
-
same key + different payload returns
payload_mismatch
This prevents accidental duplicate creates when clients retry after lost responses.
How it works
-
payload is hashed deterministically (
sha256) -
claim row is stored in DB with unique key
(user_id, scope, idempotency_key) - action executes once
- final HTTP status/body is persisted
- later duplicates replay that persisted response
-
stale
processingrows can be reclaimed after a timeout
Payload hashing caveat:
- hashing is deterministic for the exact Elixir term shape
"1"and1hash differently%{"a" => 1}and%{a: 1}hash differently- normalize payloads before hashing if your client shapes vary
Important caveat (read this)
This is a toolkit, not a full drop-in service.
You still need to provide:
- your own Ecto schema + migration for idempotency rows
- your own Repo module
-
a small app adapter module implementing
IdempotencyKit.Store - a scheduled cleanup job for old rows
- client-side key generation and retry behavior
So yes, it is app-independent, but host apps must wire the infrastructure around it.
Real integration examples (YourApp)
Below are concrete examples using a placeholder app name (YourApp). This maps
directly to the 5 required host responsibilities above.
1) Ecto schema + migration
Paths in YourApp:
server/lib/your_app/idempotency/request.exserver/priv/repo/migrations/<timestamp>_create_idempotency_requests.exs
# server/lib/your_app/idempotency/request.ex
defmodule YourApp.Idempotency.Request do
use Ecto.Schema
import Ecto.Changeset
@all_statuses ["processing", "succeeded", "failed"]
@completed_statuses ["succeeded", "failed"]
schema "idempotency_requests" do
field :scope, :string
field :idempotency_key, :string
field :request_hash, :string
field :status, :string, default: "processing"
field :response_status, :integer
field :response_body, :map
field :completed_at, :utc_datetime
belongs_to :user, YourApp.Accounts.User
timestamps()
end
def create_changeset(request, attrs) do
request
|> cast(attrs, [:user_id, :scope, :idempotency_key, :request_hash, :status])
|> validate_required([:user_id, :scope, :idempotency_key, :request_hash, :status])
|> validate_length(:scope, min: 1, max: 120)
|> validate_length(:idempotency_key, min: 1, max: 255)
|> validate_length(:request_hash, is: 64)
|> validate_inclusion(:status, @all_statuses)
|> unique_constraint(:idempotency_key, name: :idempotency_requests_user_scope_key_idx)
end
def completion_changeset(request, attrs) do
request
|> cast(attrs, [:status, :response_status, :response_body, :completed_at])
|> validate_required([:status, :response_status, :response_body, :completed_at])
|> validate_inclusion(:status, @completed_statuses)
|> validate_number(:response_status, greater_than_or_equal_to: 100, less_than: 600)
end
end# server/priv/repo/migrations/20260517162721_create_idempotency_requests.exs
def change do
create table(:idempotency_requests) do
add :user_id, references(:users, on_delete: :delete_all), null: false
add :scope, :string, null: false
add :idempotency_key, :string, null: false
add :request_hash, :string, null: false
add :status, :string, null: false, default: "processing"
add :response_status, :integer
add :response_body, :map
add :completed_at, :utc_datetime
timestamps()
end
create constraint(:idempotency_requests, :idempotency_requests_status_check,
check: "status IN ('processing', 'succeeded', 'failed')"
)
create unique_index(:idempotency_requests, [:user_id, :scope, :idempotency_key],
name: :idempotency_requests_user_scope_key_idx
)
create index(:idempotency_requests, [:inserted_at],
name: :idempotency_requests_inserted_at_idx
)
end2) Repo module
Path in YourApp:
server/lib/your_app/repo.ex
defmodule YourApp.Repo do
use Ecto.Repo,
otp_app: :your_app,
adapter: Ecto.Adapters.Postgres
end
3) App adapter module implementing IdempotencyKit.Store
Paths in YourApp:
server/lib/your_app/idempotency/store/ecto.exserver/lib/your_app/idempotency.ex
# server/lib/your_app/idempotency/store/ecto.ex
defmodule YourApp.Idempotency.Store.Ecto do
@behaviour IdempotencyKit.Store
alias YourApp.Idempotency.Request
alias YourApp.Repo
alias IdempotencyKit.Store.Ecto, as: KitEctoStore
# Deterministically hash payloads before claim/mismatch checks.
defdelegate request_hash(payload), to: KitEctoStore
# Optional read-only exact-retry pre-check. Useful for app policy, such as
# avoiding a rate-limit debit before the real claim path runs.
def replay_candidate?(user_id, scope, idempotency_key, request_payload) do
KitEctoStore.replay_candidate?(
Repo,
Request,
user_id,
scope,
idempotency_key,
request_payload
)
end
# Main claim state machine for execute/processing/replay/mismatch outcomes.
def claim_request(user_id, scope, idempotency_key, request_hash) do
KitEctoStore.claim_request(
Repo,
Request,
user_id,
scope,
idempotency_key,
request_hash,
Application.get_env(:your_app, :idempotency, [])
)
end
# Persist the terminal response used for future idempotent replays.
def complete_request(request, status, response_status, response_body) do
KitEctoStore.complete_request(
Repo,
Request,
request,
status,
response_status,
response_body
)
end
# Retention cleanup for old idempotency records.
def purge_stale_requests do
KitEctoStore.purge_stale_requests(
Repo,
Request,
Application.get_env(:your_app, :idempotency, [])
)
end
end
# server/lib/your_app/idempotency.ex
defmodule YourApp.Idempotency do
alias YourApp.Idempotency.Store.Ecto, as: EctoStore
defdelegate request_hash(payload), to: EctoStore
# Expose the optional exact-retry pre-check for controllers/plugs that need it.
defdelegate replay_candidate?(user_id, scope, idempotency_key, request_payload),
to: EctoStore
defdelegate claim_request(user_id, scope, idempotency_key, request_hash),
to: EctoStore
defdelegate complete_request(request, status, response_status, response_body),
to: EctoStore
defdelegate purge_stale_requests(), to: EctoStore
end4) Scheduled cleanup job
Paths in YourApp:
server/lib/your_app/scheduler.exserver/config/config.exs
# server/lib/your_app/scheduler.ex
defmodule YourApp.Scheduler do
use Quantum, otp_app: :your_app
end# server/config/config.exs
config :your_app, YourApp.Scheduler,
timezone: "Etc/UTC",
jobs: [
cleanup_idempotency_requests: [
schedule: "@daily",
task: {YourApp.Idempotency, :purge_stale_requests, []}
]
]5) Client-side key generation and retry behavior
Path in YourApp:
client/api.ts
function createIdempotencyKey(prefix: string): string {
const normalizedPrefix = prefix.trim() || "request";
const randomPart = createRandomId();
return `${normalizedPrefix}-${randomPart}`;
}
async function requestWithIdempotentProcessingRetry<T>(
path: string,
options: RequestInit,
retryOptions: {
keyPrefix?: string;
processingErrorCode: string;
maxPollAttempts?: number;
pollDelayMs?: number;
networkRetryAttempts?: number;
}
): Promise<T> {
// Keep one key for one logical submission and all immediate retries.
const idempotencyKey = createIdempotencyKey(retryOptions.keyPrefix || "request");
const maxPollAttempts = retryOptions.maxPollAttempts ?? 48;
const pollDelayMs = retryOptions.pollDelayMs ?? 2500;
const networkRetryAttempts = retryOptions.networkRetryAttempts ?? 3;
let attempt = 0;
while (true) {
try {
return await request<T>(
path,
{
...options,
headers: {
...(options.headers as Record<string, string> | undefined),
"Idempotency-Key": idempotencyKey
}
},
{ networkRetryAttempts }
);
} catch (error) {
const isProcessingError =
error instanceof ApiRequestError &&
error.status === 409 &&
error.code === retryOptions.processingErrorCode;
if (!isProcessingError || attempt >= maxPollAttempts) {
throw error;
}
attempt += 1;
await wait(pollDelayMs);
}
}
}Main modules
IdempotencyKit.Core- orchestration helpersIdempotencyKit.Store- store behaviourIdempotencyKit.Store.Ecto- generic Ecto/Postgres implementation helpersIdempotencyKit.Phoenix.Action- controller adapter
Required storage fields
Your idempotency table should have:
user_idscopeidempotency_keyrequest_hash(64-char sha256 hex)status(processing|succeeded|failed)response_statusresponse_bodycompleted_at- timestamps
Recommended indexes/constraints:
-
unique index on
(user_id, scope, idempotency_key) - status check constraint
-
index on
inserted_atfor cleanup
Schema integration note:
-
by default,
IdempotencyKit.Store.Ectowill callYourSchema.create_changeset(struct(YourSchema), attrs)if it exists -
if your schema does not expose
create_changeset/2, passcreate_changeset_fun: fn schema_module, attrs -> ... endin store options
Copy-paste schema and migration
Example schema:
defmodule MyApp.Idempotency.Request do
use Ecto.Schema
import Ecto.Changeset
schema "idempotency_requests" do
field :scope, :string
field :idempotency_key, :string
field :request_hash, :string
field :status, :string, default: "processing"
field :response_status, :integer
field :response_body, :map
field :completed_at, :utc_datetime
belongs_to :user, MyApp.Accounts.User
timestamps()
end
def create_changeset(request, attrs) do
request
|> cast(attrs, [:user_id, :scope, :idempotency_key, :request_hash, :status])
|> validate_required([:user_id, :scope, :idempotency_key, :request_hash, :status])
|> validate_inclusion(:status, ["processing", "succeeded", "failed"])
|> unique_constraint(:idempotency_key, name: :idempotency_requests_user_scope_key_idx)
end
endExample migration:
def change do
create table(:idempotency_requests) do
add :user_id, references(:users, on_delete: :delete_all), null: false
add :scope, :string, null: false
add :idempotency_key, :string, null: false
add :request_hash, :string, null: false
add :status, :string, null: false, default: "processing"
add :response_status, :integer
add :response_body, :map
add :completed_at, :utc_datetime
timestamps()
end
create unique_index(
:idempotency_requests,
[:user_id, :scope, :idempotency_key],
name: :idempotency_requests_user_scope_key_idx
)
create constraint(
:idempotency_requests,
:idempotency_requests_status_check,
check: "status IN ('processing', 'succeeded', 'failed')"
)
create index(:idempotency_requests, [:inserted_at])
endCleanup
The library does not schedule cleanup for you.
Run a periodic job (for example daily) that calls your store purge function.
Controller usage example
defmodule MyAppWeb.EntityController do
use MyAppWeb, :controller
alias IdempotencyKit.Phoenix.Action, as: IdempotentAction
@idempotency_scope "entity_create"
def create(
conn, # Plug.Conn for the incoming HTTP request.
params # Request payload; this is hashed for idempotency matching.
) do
user = conn.assigns.current_user
execute_fun = fn idempotent_conn ->
# Runs only when claim result is :execute.
create_entity(idempotent_conn, user, params)
end
case IdempotentAction.maybe_run_for_user(
conn, # incoming conn
user.id, # unique partition by user
@idempotency_scope, # action scope
params, # payload used to compute request hash
execute_fun, # mutation closure
idempotency_opts(user.id) # adapter options
) do
{:handled, handled_conn} ->
# Already handled by idempotency adapter (execute/replay/error).
handled_conn
{:no_key, plain_conn} ->
# No key header: fallback non-idempotent path.
create_entity(plain_conn, user, params)
end
end
defp idempotency_opts(user_id) do
[
# Required: module exporting request_hash/1, claim_request/4, complete_request/4.
idempotency_module: MyApp.Idempotency,
# Optional: your standardized API error renderer.
render_error_fun: &MyAppWeb.ApiHelpers.render_error/4,
# Optional: log prefix.
log_context: "Entity controller user=#{user_id}"
]
end
endIdempotency-Key header
Default header name is Idempotency-Key (read as idempotency-key in Plug).
You can override the header per endpoint with opts[:header], but using the
default is recommended.
Typical client behavior
For mutation endpoints:
- generate a new key per user action
-
send it in
Idempotency-Key - retry network failures with the same key
- if you retry with edited payload, use a new key
Package integration tests (Postgres)
The package includes Postgres-backed integration tests for the real Ecto lifecycle semantics:
- claim -> processing -> complete -> replay
- payload mismatch conflicts
- stale processing reclaim
- retention purge
- concurrent identical claims
By default these tests are excluded.
Run them by setting:
IDEMPOTENCY_KIT_TEST_DATABASE_URL
Example:
IDEMPOTENCY_KIT_TEST_DATABASE_URL=postgres://postgres:postgres@localhost:5432/idempotency_kit_test \
mix test --include integration test/idempotency_kit/store_ecto_integration_test.exsReplacing DB storage with Redis
You can use Redis instead of the Ecto/Postgres helper.
The callback interface is currently (v0.1.0):
request_hash/1replay_candidate?/4claim_request/4complete_request/4purge_stale_requests/0
To do this, implement your own store module with @behaviour IdempotencyKit.Store:
defmodule MyApp.Idempotency.RedisStore do
@behaviour IdempotencyKit.Store
# Deterministically hash payload. Same logical payload must hash the same way.
def request_hash(payload), do: ...
# Optional read-only pre-check for callers that want to detect an exact retry
# before claim. Return true only when key + payload hash matches an existing
# idempotency record. Still call claim_request/4 for the authoritative result.
def replay_candidate?(user_id, scope, idempotency_key, request_payload), do: ...
# Main claim state machine:
# - first request => {:execute, request}
# - duplicate in-flight => {:processing, request}
# - completed request with same hash => {:replay, request}
# - same key + different hash => {:error, :payload_mismatch}
def claim_request(user_id, scope, idempotency_key, request_hash), do: ...
# Persist terminal outcome for a claimed request.
# `request` should be what you returned from claim_request/4.
# Replayed records must include response_status + response_body.
def complete_request(request, status, response_status, response_body), do: ...
# Retention cleanup for old rows/keys.
def purge_stale_requests(), do: ...
end
Then pass your Redis-backed module as idempotency_module in controller options.
Note: replay_candidate?/4 is part of the IdempotencyKit.Store behaviour,
but the Phoenix adapter itself only requires:
request_hash/1, claim_request/4, and complete_request/4.
Important requirements for Redis:
claim_request/4must be atomic (Lua script is recommended).-
Keep the same lifecycle semantics:
-
first request =>
{:execute, request} -
duplicate in-flight =>
{:processing, request} -
same key + different payload =>
{:error, :payload_mismatch} -
completed request =>
{:replay, request} -
replay request should include
response_statusandresponse_body(atom or string keys) so the Phoenix adapter can render it
-
first request =>
- Support stale processing reclaim (or enforce a bounded processing TTL).
- Set retention/TTL for old completed requests.
Note: Redis can work very well here, but make sure your durability settings (AOF/RDB/replication) match your reliability expectations.