BulkUpsert
Warning
This is a very early release. It works but has some rough edges, and shouldn't be considered production-ready for most use cases.
Upsert multiple Ecto schema structs, along with their nested associations, to the database with a single function call.
Unlike a plain insert_all/3, this package passes each list of attrs through Ecto changesets. This lets it validate your data and upsert a parent and its children across multiple tables in one call.
Supported features:
- Nested associations: upsert a parent and its has_many, has_one, and many_to_many associations across multiple tables from a single list of attrs (embedded schemas are stored inline on the parent)
- Validation and data processing (via Ecto changesets)
- Custom values for autogenerated fields (e.g. insert/update timestamps)
For more information, see this project's documentation.
Getting started
Installation
Add this package to your list of dependencies in mix.exs, then run mix deps.get:
def deps do
[
{:bulk_upsert, "0.2.0"}
]
end
Usage
After the package has been installed, you can call the BulkUpsert.bulk_upsert/4 function directly, or create a wrapper function to use in your context modules:
lib/your_project/repo.ex
defmodule YourProject.Repo do
use Ecto.Repo,
otp_app: :your_project,
adapter: Ecto.Adapters.Postgres
@doc "Wraps `BulkUpsert.bulk_upsert/4`."
def bulk_upsert(schema_module, attrs_list, opts \\ []),
do: BulkUpsert.bulk_upsert(__MODULE__, schema_module, attrs_list, opts)
end
Basic working example
Here is a contrived migration and schema that we can work with:
priv/repo/migrations/0001_create_persons.exs
defmodule YourProject.Repo.Migrations.CreatePersons do
use Ecto.Migration
def change do
create table(:persons) do
add :name, :string
end
end
end
lib/your_project/persons/person.ex
defmodule YourProject.Persons.Person do
use Ecto.Schema
import Ecto.Changeset
schema "persons" do
field :name, :string
end
def changeset(person \\ %__MODULE__{}, attrs) do
person
|> cast(attrs, [:id, :name])
|> validate_required([:id, :name])
end
end
Now, after running the migrations with mix ecto.reset, we can enter an IEx shell with iex -S mix and make sure everything works:
Interactive Elixir (1.18.3) - press Ctrl+C to exit (type h() ENTER for help)
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Person)
[
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 1,
name: "Alice"
},
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 2,
name: "Bob"
}
]
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [%{id: 1, name: "Alicia"}, %{id: 2, name: "Bobby"}]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Person)
[
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 1,
name: "Alicia"
},
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 2,
name: "Bobby"
}
]
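To make the validation step more concrete, here is a minimal sketch of running a list of attrs through a changeset and keeping only the rows that pass. It uses a schemaless changeset so that it runs as a standalone script without a database (Mix.install needs network access to fetch Ecto). The ChangesetSketch module and its valid_rows/1 function are illustrative names, not part of BulkUpsert, and silently dropping invalid rows is an assumption made for this sketch.

```elixir
Mix.install([{:ecto, "~> 3.11"}])

defmodule ChangesetSketch do
  import Ecto.Changeset

  # Field types for a schemaless changeset (no database table needed).
  @types %{id: :integer, name: :string}

  # Casts and validates a single map of attrs.
  def changeset(attrs) do
    {%{}, @types}
    |> cast(attrs, Map.keys(@types))
    |> validate_required([:id, :name])
  end

  # Keeps only the rows whose changeset is valid and returns the cast data.
  def valid_rows(attrs_list) do
    attrs_list
    |> Enum.map(&changeset/1)
    |> Enum.filter(& &1.valid?)
    |> Enum.map(&apply_changes/1)
  end
end

# The first row is cast ("1" becomes 1); the second lacks a name and is dropped.
rows = ChangesetSketch.valid_rows([%{id: "1", name: "Alice"}, %{id: 2}])
IO.inspect(rows)
```

Only rows that survive this kind of check would go on to a bulk insert, which is the practical difference from handing raw maps to insert_all/3.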
Working with nested associations
The main reason to reach for this package over a plain insert_all/3 is that it can upsert a parent and its children at the same time, from a single list of attrs. The parent and each association are upserted into their own tables, all within one transaction.
Here we extend the Person example with a has_many :pets association:
priv/repo/migrations/0002_create_pets.exs
defmodule YourProject.Repo.Migrations.CreatePets do
use Ecto.Migration
def change do
create table(:pets) do
add :person_id, references(:persons)
add :name, :string
end
end
end
lib/your_project/persons/pet.ex
defmodule YourProject.Persons.Pet do
use Ecto.Schema
import Ecto.Changeset
schema "pets" do
field :person_id, :integer
field :name, :string
end
def changeset(pet \\ %__MODULE__{}, attrs) do
pet
|> cast(attrs, [:id, :person_id, :name])
|> validate_required([:id, :person_id, :name])
end
end
lib/your_project/persons/person.ex
defmodule YourProject.Persons.Person do
use Ecto.Schema
import Ecto.Changeset
schema "persons" do
field :name, :string
has_many :pets, YourProject.Persons.Pet
end
def changeset(person \\ %__MODULE__{}, attrs) do
person
|> cast(attrs, [:id, :name])
|> validate_required([:id, :name])
|> cast_assoc(:pets)
end
end
Note
Each child's foreign key (here, person_id) must be present in its own attrs. Associations are upserted via insert_all/3, so the foreign key is not inferred from the parent.
Now a single call upserts both the persons and their pets across both tables:
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [
...> %{id: 1, name: "Alice", pets: [
...> %{id: 10, person_id: 1, name: "Rex"},
...> %{id: 11, person_id: 1, name: "Whiskers"}
...> ]},
...> %{id: 2, name: "Bob", pets: [
...> %{id: 20, person_id: 2, name: "Buddy"}
...> ]}
...> ]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Pet)
[
%YourProject.Persons.Pet{id: 10, person_id: 1, name: "Rex"},
%YourProject.Persons.Pet{id: 11, person_id: 1, name: "Whiskers"},
%YourProject.Persons.Pet{id: 20, person_id: 2, name: "Buddy"}
]
Running the same call again with changed pet names upserts the existing rows in place, exactly like the top-level structs.
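Conceptually, the nested attrs end up as one list of rows per table. The following is a rough sketch of that idea in plain Elixir, not the package's actual implementation. SplitSketch and split/1 are hypothetical names, and the sketch assumes atom-keyed maps with an optional :pets key.

```elixir
defmodule SplitSketch do
  @doc """
  Splits nested person attrs into `{person_rows, pet_rows}`,
  one list per target table.
  """
  def split(persons_attrs) do
    # Parent rows: the same maps without the nested association.
    persons = Enum.map(persons_attrs, &Map.delete(&1, :pets))

    # Child rows: every nested pet, flattened into a single list.
    pets = Enum.flat_map(persons_attrs, &Map.get(&1, :pets, []))

    {persons, pets}
  end
end

{persons, pets} =
  SplitSketch.split([
    %{id: 1, name: "Alice", pets: [%{id: 10, person_id: 1, name: "Rex"}]},
    %{id: 2, name: "Bob"}
  ])

IO.inspect(persons, label: "persons rows")
IO.inspect(pets, label: "pets rows")
```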
has_one and many_to_many associations work the same way: cast them in the changeset and include them in the attrs. For has_many and has_one, each child must carry its own foreign key (as shown above with person_id). For many_to_many, the associated records and the join table rows are both upserted for you, and duplicate records and links are removed automatically. Embedded schemas (embeds_one, embeds_many) have no table of their own, so they are stored inline on the parent row.
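Since the foreign key is not inferred from the parent, it can be convenient to fill it in before calling bulk_upsert. Here is a small helper sketch for the Person and Pet example. ForeignKeySketch and put_person_ids/1 are hypothetical names (not part of BulkUpsert), and the helper assumes atom-keyed attrs.

```elixir
defmodule ForeignKeySketch do
  @doc """
  Copies each person's `:id` into the `:person_id` of its nested pets.
  Persons without an `:id` or a `:pets` key are returned unchanged.
  """
  def put_person_ids(persons_attrs) do
    Enum.map(persons_attrs, fn
      %{id: person_id, pets: pets} = person ->
        %{person | pets: Enum.map(pets, &Map.put(&1, :person_id, person_id))}

      person ->
        person
    end)
  end
end

attrs =
  ForeignKeySketch.put_person_ids([
    %{id: 1, name: "Alice", pets: [%{id: 10, name: "Rex"}]},
    %{id: 2, name: "Bob"}
  ])

IO.inspect(attrs)
```

The resulting list could then be passed to YourProject.Repo.bulk_upsert/3 as in the example above.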
Working with autogenerated timestamps
Ecto's built-in insert_all/3 function does not support autogenerated fields such as timestamps. Therefore, if your project has Ecto schemas that use autogenerated timestamp fields, you will need to ensure that these values are present during the bulk upsert process.
The simplest way is the :placeholders option, which sets fields from shared values (sent to the database once) after changeset validation:
# Capture the time once so both fields receive exactly the same value
now = DateTime.utc_now()
YourProject.Repo.bulk_upsert(
YourProject.Persons.Person,
[%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}],
placeholders: %{
YourProject.Persons.Person => %{inserted_at: now, updated_at: now}
}
)
Placeholder fields bypass the changeset, so they are not cast or validated. Do not mark a placeholder field as required in your changeset (its value is absent during validation, which would mark the row invalid and skip it).
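The pitfall with required placeholder fields can be reproduced without a database. This sketch uses a schemaless changeset and runs as a standalone script (Mix.install needs network access to fetch Ecto). PlaceholderPitfall and its strict/1 and lenient/1 functions are illustrative names, not part of BulkUpsert.

```elixir
Mix.install([{:ecto, "~> 3.11"}])

defmodule PlaceholderPitfall do
  import Ecto.Changeset

  @types %{id: :integer, name: :string, inserted_at: :utc_datetime_usec}

  # Requires the timestamp, which is absent while the changeset runs.
  def strict(attrs), do: attrs |> base() |> validate_required([:id, :name, :inserted_at])

  # Leaves the timestamp optional, so it can be filled in afterwards.
  def lenient(attrs), do: attrs |> base() |> validate_required([:id, :name])

  defp base(attrs), do: cast({%{}, @types}, attrs, [:id, :name, :inserted_at])
end

# The attrs carry no timestamp, just as they would before a placeholder is applied.
attrs = %{id: 1, name: "Alice"}

IO.inspect(PlaceholderPitfall.strict(attrs).valid?, label: "strict")
IO.inspect(PlaceholderPitfall.lenient(attrs).valid?, label: "lenient")
```

The strict changeset is invalid because inserted_at is missing at validation time, while the lenient one passes.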
If you need more control, you can instead supply a custom insert_all function. The following example shows one way to do this:
lib/your_project/repo.ex
defmodule YourProject.Repo do
use Ecto.Repo,
otp_app: :your_project,
adapter: Ecto.Adapters.Postgres
require Logger
@doc "Wraps `BulkUpsert.bulk_upsert/4`."
def bulk_upsert(schema_module, attrs_list, opts \\ []) do
base_opts = [
# Use a custom function that accepts the same arguments as `insert_all/3`
insert_all_function_atom: :insert_all_with_autogenerated_timestamps,
# Do not overwrite the initial insert timestamp
replace_all_except: [:inserted_at]
]
opts = Keyword.merge(base_opts, opts)
BulkUpsert.bulk_upsert(__MODULE__, schema_module, attrs_list, opts)
end
@doc """
Extend `YourProject.Repo.insert_all/3` to automatically generate current insert and update
timestamps when performing bulk insert operations.
> #### Info {: .info}
>
> Unlike `YourProject.Repo.insert_all/3`, this function will only accept a schema as the first
> argument (not a source), and a list of entries as the second argument (not a query).
## Examples
iex> YourProject.Repo.insert_all_with_autogenerated_timestamps(
...> YourProject.Persons.Person,
...> _attrs_list = [%{...}, %{...}]
...> )
{2, nil}
"""
def insert_all_with_autogenerated_timestamps(schema_module, entries, opts \\ []) do
placeholders = Keyword.get(opts, :placeholders, %{})
# Build timestamp placeholders and attrs
inserted_at_field = :inserted_at
updated_at_field = :updated_at
current_timestamp = DateTime.utc_now()
timestamp_placeholders =
Map.new([{inserted_at_field, current_timestamp}, {updated_at_field, current_timestamp}])
# Reject timestamp fields that are not defined in the given `schema_module`
|> Map.reject(fn {field, _value} -> schema_module.__schema__(:type, field) |> is_nil() end)
if Enum.empty?(timestamp_placeholders) do
Logger.debug("""
The #{inspect(schema_module)} schema does not use any configured insert or update \
timestamp fields. Falling back to `#{inspect(__MODULE__)}.insert_all/3`...\
""")
__MODULE__.insert_all(schema_module, entries, opts)
else
Logger.debug("Performing bulk insert with autogenerated timestamps...")
timestamp_placeholder_attrs =
timestamp_placeholders
|> Map.new(fn {field, _value} -> {field, {:placeholder, field}} end)
# Merge the timestamp attrs and placeholders into an `insert_all/3` function call
entries = entries |> Enum.map(fn attrs -> Map.merge(attrs, timestamp_placeholder_attrs) end)
placeholders = placeholders |> Map.merge(timestamp_placeholders)
opts = opts |> Keyword.put(:placeholders, placeholders)
__MODULE__.insert_all(schema_module, entries, opts)
end
end
end
priv/repo/migrations/0001_create_persons.exs
defmodule YourProject.Repo.Migrations.CreatePersons do
use Ecto.Migration
def change do
create table(:persons) do
add :name, :string
timestamps(type: :utc_datetime_usec)
end
end
end
lib/your_project/persons/person.ex
defmodule YourProject.Persons.Person do
use Ecto.Schema
import Ecto.Changeset
schema "persons" do
field :name, :string
timestamps(type: :utc_datetime_usec)
end
def changeset(person \\ %__MODULE__{}, attrs) do
person
|> cast(attrs, [:id, :name])
|> validate_required([:id, :name])
end
end
Now, after running the migrations with mix ecto.reset, we can enter an IEx shell with iex -S mix and make sure everything still works:
Interactive Elixir (1.18.3) - press Ctrl+C to exit (type h() ENTER for help)
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [%{id: 1, name: "Alice"}, %{id: 2, name: "Bob"}]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Person)
[
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 1,
name: "Alice",
inserted_at: ~U[2025-04-29 07:01:10.180490Z],
updated_at: ~U[2025-04-29 07:01:10.180490Z]
},
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 2,
name: "Bob",
inserted_at: ~U[2025-04-29 07:01:10.180490Z],
updated_at: ~U[2025-04-29 07:01:10.180490Z]
}
]
iex> YourProject.Repo.bulk_upsert(
...> YourProject.Persons.Person,
...> [%{id: 1, name: "Alicia"}, %{id: 2, name: "Bobby"}]
...> )
:ok
iex> YourProject.Repo.all(YourProject.Persons.Person)
[
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 1,
name: "Alicia",
inserted_at: ~U[2025-04-29 07:01:10.180490Z],
updated_at: ~U[2025-04-29 07:01:19.549929Z]
},
%YourProject.Persons.Person{
__meta__: #Ecto.Schema.Metadata<:loaded, "persons">,
id: 2,
name: "Bobby",
inserted_at: ~U[2025-04-29 07:01:10.180490Z],
updated_at: ~U[2025-04-29 07:01:19.549929Z]
}
]
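As you can see, the custom logic generates the timestamps for us: inserted_at keeps its original value across upserts (because of replace_all_except), while updated_at is refreshed on each call.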
For more information, see this project's documentation on HexDocs.
This project is made possible by Interline Travel and Tour Inc.