🌐 Web: WHATWG Extension to BEAM

A protocol-agnostic, zero-buffer suite of Web Standard APIs for Elixir.



🚀 Why Web?

Most Elixir networking libraries buffer data into memory by default. Web is built for Zero-Buffer Streaming, ensuring your applications remain responsive and low-memory even when handling gigabytes of data.

By implementing WHATWG standards as Native Process-backed entities (:gen_statem), Web provides a consistent, predictable, and backpressure-aware interface for HTTP, TCP, and custom protocols.

⚡ Performance Notes

The current responsive-core work added ref-tagged control receives, local priority signaling, and the :message_queue_data process flag set to :off_heap for stream engines. On the benchmark machine used for the table below, those changes materially improved control-plane latency and heap usage under mailbox pressure.

| Benchmark | Before (fa74d7f) | After (9babea0) | Result |
| --- | --- | --- | --- |
| Selective receive at 100k mailbox depth | 1.76 s | 2.78 ms | 99.84% faster |
| Selective receive at 10k mailbox depth | 123.00 ms | 0.79 ms | 99.36% faster |
| 100 MiB burst final heap size | 225,340 words | 4,264 words | 98.11% lower |
| 100 MiB burst peak queue length | 1,415 | 905 | 36.04% lower |

For the compression pipeline, CompressionStream averaged 263.68 ms for 32 MiB of input on this machine, which is about 121.36 MiB/s.
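As a quick check on the units, throughput is input size divided by elapsed time. The sketch below uses only the two numbers quoted above; the variable names are illustrative.

```elixir
# Throughput check using the figures quoted above (32 MiB in 263.68 ms).
input_bytes = 32 * 1024 * 1024
elapsed_s = 263.68 / 1000

# Binary units: 1 MiB = 1_048_576 bytes
mib_per_s = input_bytes / (1024 * 1024) / elapsed_s

# Decimal units: 1 MB = 1_000_000 bytes
mb_per_s = input_bytes / 1_000_000 / elapsed_s

IO.puts("#{Float.round(mib_per_s, 2)} MiB/s")
IO.puts("#{Float.round(mb_per_s, 2)} MB/s")
```

This prints 121.36 MiB/s and 127.25 MB/s, so the quoted 121.36 figure corresponds to binary units (MiB/s).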

The abort-latency benchmark remains noisy and is documented in BENCHMARKS.md as a caveat instead of a headline claim.
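To make "ref-tagged control receives" concrete, here is a minimal sketch in plain Elixir of the general technique, not Web's actual implementation. The RefTaggedCall module and its message shapes are hypothetical. It relies on the receive optimization in recent OTP releases, which can skip messages that were already queued when the reference was created, provided every receive clause matches on that reference.

```elixir
defmodule RefTaggedCall do
  # Hypothetical helper for illustration; not part of Web's API.

  # Starts a process that answers :ping requests. Its message queue is kept
  # off-heap, so queued messages are not part of the process heap that
  # garbage collection has to traverse.
  def start_echo do
    spawn(fn ->
      Process.flag(:message_queue_data, :off_heap)
      echo_loop()
    end)
  end

  defp echo_loop do
    receive do
      {:call, from, ref, :ping} ->
        send(from, {ref, :pong})
        echo_loop()
    end
  end

  # The monitor reference doubles as the reply tag. Every clause matches
  # on ^ref, so unrelated messages never match this receive.
  def call(server, request, timeout \\ 5_000) do
    ref = Process.monitor(server)
    send(server, {:call, self(), ref, request})

    receive do
      {^ref, reply} ->
        Process.demonitor(ref, [:flush])
        {:ok, reply}

      {:DOWN, ^ref, :process, _pid, reason} ->
        {:error, reason}
    after
      timeout ->
        Process.demonitor(ref, [:flush])
        {:error, :timeout}
    end
  end
end

# Fill the caller's mailbox with unrelated messages, then make a call.
for i <- 1..10_000, do: send(self(), {:noise, i})
server = RefTaggedCall.start_echo()
IO.inspect(RefTaggedCall.call(server, :ping))
```

The call still returns {:ok, :pong} with 10,000 unrelated messages queued ahead of the reply. How much time the reference tag saves depends on the OTP version and mailbox depth.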


🛠 The "Web-First" DSL

If you've used modern Web APIs in a browser, you already know how to use this library. We've mapped those standards to idiomatic Elixir.

defmodule GitHub do
  use Web

  def repositories(query \\ "elixir") do
    # 1. Standard URL manipulation
    url = URL.new("https://api.github.com/search/repositories")
    
    params = 
      URL.search_params(url)
      |> URLSearchParams.set("q", query)
      |> URLSearchParams.append("sort", "stars")

    url = URL.search(url, URLSearchParams.to_string(params))

    # 2. Construct a Request with automatic header inference
    request = Request.new(url, 
      method: "GET",
      headers: %{"Accept" => "application/vnd.github.v3+json"}
    )

    # 3. Fetch via Promise and await the response
    await fetch(request)
  end
end

response = GitHub.repositories()

# Zero-buffer streaming: chunks are written to stdout as they arrive
response.body 
|> Stream.take(5) 
|> Enum.each(&IO.write/1)

Web.Promise and await/1

Async APIs now return %Web.Promise{} values so you can compose fetches, body reads, and stream writes before awaiting the final result.

use Web

response = await fetch("https://api.github.com/zen")

text = await Response.text(response)

pair =
  await Promise.all([
    Promise.resolve(:ok),
    Promise.resolve(text)
  ])

📖 API Usage & Examples

Web.Promise: Async Composition

Web.Promise gives you a browser-style promise API for composing async work before you call await/1.

# Resolve and reject explicit values
ok = Promise.resolve("ready")
value = await(ok)
# => "ready"

fallback =
  Promise.reject(:boom)
  |> Promise.catch(fn reason -> "recovered: #{reason}" end)
  |> await()
# => "recovered: boom"

# Wait for every promise to resolve
results =
  Promise.all([
    Promise.resolve(1),
    Promise.resolve(2)
  ])
  |> await()
# => [1, 2]

# Collect fulfillment and rejection outcomes together
settled =
  Promise.all_settled([
    Promise.resolve("ok"),
    Promise.reject(:nope)
  ])
  |> await()
# => [%{status: "fulfilled", value: "ok"}, %{status: "rejected", reason: :nope}]

# Return the first fulfilled promise
winner =
  Promise.any([
    Promise.reject(:first_failed),
    Promise.resolve("winner")
  ])
  |> await()
# => "winner"

# Return the first settled promise
raced =
  Promise.race([
    Promise.resolve("fast"),
    Promise.resolve("slow")
  ])
  |> await()
# => "fast"

Web.ReadableStream: The Source

The source of every streaming pipeline. A ReadableStream is a managed process that provides data to consumers, handling Backpressure (throttling producers when consumers are slow), Locking (ensuring exclusive access), and Teeing (splitting streams for multiple consumers) in a zero-copy, process-safe way.

# Create a stream from any enumerable (List, File.stream, etc.)
stream = ReadableStream.from(["chunk1", "chunk2"])

# Split one stream into two independent branches (Zero-copy)
{branch_a, branch_b} = ReadableStream.tee(stream)

# process_data/1 and log_data/1 stand in for your own functions
Task.start(fn -> Enum.each(branch_a, &process_data/1) end)
Task.start(fn -> Enum.each(branch_b, &log_data/1) end)
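The backpressure idea can be sketched with plain processes: the producer emits a chunk only when the consumer asks for one, so a slow consumer slows the producer down automatically. This is a minimal illustration under that assumption, not Web's :gen_statem engine. The PullSource module and its messages are hypothetical.

```elixir
defmodule PullSource do
  # Hypothetical helper for illustration; not part of Web's API.

  # Starts a producer process that holds the remaining chunks.
  def start(chunks), do: spawn(fn -> loop(chunks) end)

  # The producer blocks in receive until a consumer pulls. Nothing is
  # pushed ahead of demand, so there is no unbounded buffering.
  defp loop([]) do
    receive do
      {:pull, from, ref} -> send(from, {ref, :done})
    end
  end

  defp loop([chunk | rest]) do
    receive do
      {:pull, from, ref} ->
        send(from, {ref, {:chunk, chunk}})
        loop(rest)
    end
  end

  # Requests exactly one chunk and waits for the tagged reply.
  def read(source) do
    ref = make_ref()
    send(source, {:pull, self(), ref})

    receive do
      {^ref, reply} -> reply
    end
  end

  # Exposes the producer as a lazy Elixir stream.
  def to_stream(source) do
    Stream.resource(
      fn -> source end,
      fn src ->
        case read(src) do
          {:chunk, chunk} -> {[chunk], src}
          :done -> {:halt, src}
        end
      end,
      # Stop the producer if the consumer halts early.
      fn src -> Process.exit(src, :shutdown) end
    )
  end
end

["chunk1", "chunk2", "chunk3"]
|> PullSource.start()
|> PullSource.to_stream()
|> Enum.each(&IO.puts/1)
```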

Web.WritableStream: The Sink

Writable streams are endpoints for consuming data, supporting backpressure and lock management. Use them to write data from readable streams or directly from your application.

# Create a writable stream (in practice it may come from a custom stream or a network response)
writable = WritableStream.new()
writer = WritableStream.get_writer(writable)

# Write chunks to the stream
:ok = await(WritableStreamDefaultWriter.write(writer, "hello "))
:ok = await(WritableStreamDefaultWriter.write(writer, "world!"))

# Close and release
:ok = await(WritableStreamDefaultWriter.close(writer))
:ok = WritableStreamDefaultWriter.release_lock(writer)

Web.TransformStream โ€” Stream Processing

Transform streams allow you to process, modify, or filter data as it flows through a pipeline. They are ideal for tasks like compression, encryption, or counting bytes.

transform = TransformStream.new(%{
  transform: fn chunk, controller ->
    # Example: uppercase transformation
    upper = String.upcase(IO.iodata_to_binary(chunk))
    ReadableStreamDefaultController.enqueue(controller, upper)
  end,
  flush: fn controller ->
    ReadableStreamDefaultController.close(controller)
  end
})

source = ReadableStream.from(["foo", "bar"])
upper = ReadableStream.pipe_through(source, transform)
output = await(Response.text(Response.new(body: upper)))
# => "FOOBAR"

Web.ByteLengthQueuingStrategy and Web.CountQueuingStrategy

Use queuing strategies to control how stream backpressure is measured. CountQueuingStrategy counts each chunk as 1, while ByteLengthQueuingStrategy measures buffered chunks by their byte size.

count_strategy = Web.CountQueuingStrategy.new(16)
byte_strategy = Web.ByteLengthQueuingStrategy.new(1024)

stream = ReadableStream.new(%{strategy: count_strategy})
sink = WritableStream.new(%{high_water_mark: 16})
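The difference between the two strategies shows up in the "desired size" calculation from the WHATWG Streams model: the high-water mark minus the total size of queued chunks. The sketch below is plain Elixir for illustration. QueueSizing and its function are hypothetical names, not Web's API.

```elixir
defmodule QueueSizing do
  # Hypothetical helper for illustration; not part of Web's API.

  # Desired size = high-water mark - sum of chunk sizes in the queue.
  # A value of zero or less signals the producer to stop enqueuing.
  def desired_size(queue, high_water_mark, size_fun) do
    high_water_mark - Enum.sum(Enum.map(queue, size_fun))
  end
end

queue = ["hello", "world!"]

# Count strategy: every chunk has size 1
by_count = QueueSizing.desired_size(queue, 16, fn _chunk -> 1 end)

# Byte-length strategy: every chunk is measured in bytes
by_bytes = QueueSizing.desired_size(queue, 16, &byte_size/1)

IO.inspect({by_count, by_bytes})
```

With the same high-water mark of 16, the count strategy leaves room for 14 more chunks, while the byte-length strategy leaves room for only 5 more bytes.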

ReadableStream.pipe_to/3 & ReadableStream.pipe_through/3

Use pipe_to/3 to connect a source to any writable sink with backpressure-awareness, and pipe_through/3 to attach transforms while returning the next readable stage.

source = ReadableStream.from(["hello", " world"])

transform = TransformStream.new(%{
  transform: fn chunk, controller ->
    ReadableStreamDefaultController.enqueue(controller, String.upcase(chunk))
  end,
  flush: fn controller ->
    ReadableStreamDefaultController.close(controller)
  end
})

# pipe_through returns the transformed readable side
upper = ReadableStream.pipe_through(source, transform)
output = await(Response.text(Response.new(body: upper)))
# => "HELLO WORLD"

# pipe_to returns a %Web.Promise{} you can await
sink = WritableStream.new(%{})
pipe = ReadableStream.pipe_to(ReadableStream.from(["a"]), sink)
:ok = await(pipe)

Web.TextEncoder, Web.TextDecoder, and Their Stream Variants

Use the text encoding helpers to move between Elixir strings and UTF-8 byte views while keeping streamed decoding safe across chunk boundaries.

encoder = TextEncoder.new()
bytes = TextEncoder.encode(encoder, "Hello, 🌍")
Web.Uint8Array.to_binary(bytes)
# => "Hello, 🌍"

decoder = TextDecoder.new("utf-8", %{fatal: false})
TextDecoder.decode(decoder, bytes)
# => "Hello, 🌍"

The stream wrappers compose directly with ReadableStream.pipe_through/3 pipelines.

source = ReadableStream.from(["Hello, ", "🌍"])

encoded =
  source
  |> ReadableStream.pipe_through(TextEncoderStream.new())
  |> ReadableStream.pipe_through(TextDecoderStream.new())

await(Response.text(Response.new(body: encoded)))
# => "Hello, 🌍"
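To show what "safe across chunk boundaries" means, the sketch below decodes chunks with Erlang's :unicode module and carries any incomplete trailing bytes into the next chunk. It illustrates the idea only and is not how TextDecoderStream is implemented. ChunkedUtf8 is a hypothetical module, and unlike the fatal: false mode above it raises on invalid bytes.

```elixir
defmodule ChunkedUtf8 do
  # Hypothetical helper for illustration; not Web's TextDecoderStream.

  # Decodes a list of binary chunks, carrying incomplete trailing bytes
  # over to the next chunk instead of emitting a broken character.
  def decode_chunks(chunks) do
    {decoded, pending} =
      Enum.reduce(chunks, {[], <<>>}, fn chunk, {decoded, pending} ->
        case :unicode.characters_to_binary(pending <> chunk) do
          text when is_binary(text) ->
            {[decoded, text], <<>>}

          {:incomplete, text, rest} ->
            {[decoded, text], rest}

          {:error, _text, _rest} ->
            raise ArgumentError, "invalid UTF-8 in input"
        end
      end)

    # Bytes still pending at end of input can never complete: emit U+FFFD.
    tail = if pending == <<>>, do: "", else: "\uFFFD"
    IO.iodata_to_binary([decoded, tail])
  end
end

# The four bytes of the globe emoji are split across two chunks.
chunks = [<<"Hello, ", 0xF0, 0x9F>>, <<0x8C, 0x8D>>]
IO.puts(ChunkedUtf8.decode_chunks(chunks))
```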

Web.CompressionStream and Web.DecompressionStream

Use the compression stream wrappers to compress or decompress byte streams inside the same ReadableStream.pipe_through/3 pipelines.

source = ReadableStream.from(["hello world"])

compressed =
  source
  |> ReadableStream.pipe_through(CompressionStream.new("gzip"))

round_tripped =
  compressed
  |> ReadableStream.pipe_through(DecompressionStream.new("gzip"))

await(Response.text(Response.new(body: round_tripped)))
# => "hello world"
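For comparison, chunk-by-chunk gzip compression can be sketched with Erlang's built-in :zlib module. This is a standard-library illustration of incremental compression, not a description of how CompressionStream works internally. GzipChunks is a hypothetical module.

```elixir
defmodule GzipChunks do
  # Hypothetical helper for illustration; not Web's CompressionStream.

  # Compresses a list of chunks incrementally and returns one gzip binary.
  def compress(chunks) do
    z = :zlib.open()
    # windowBits 31 = 15 (largest window) + 16 (write a gzip header and trailer)
    :ok = :zlib.deflateInit(z, :default, :deflated, 31, 8, :default)

    try do
      # Each call may return an empty iolist while zlib buffers input.
      body = Enum.map(chunks, fn chunk -> :zlib.deflate(z, chunk) end)
      # :finish flushes whatever is still buffered and writes the trailer.
      last = :zlib.deflate(z, [], :finish)
      :ok = :zlib.deflateEnd(z)
      IO.iodata_to_binary([body, last])
    after
      :zlib.close(z)
    end
  end
end

gz = GzipChunks.compress(["hello ", "world"])
IO.puts(:zlib.gunzip(gz))
```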

Web.Request & Web.Response

First-class containers for network data with high-level factories and standard body readers.

# High-level factories for common responses
res = Response.json(%{status: "ok"})
redirect = Response.redirect("https://elixir-lang.org")

# Automatic status and body readers
# (bind the result of `if`; a variable assigned inside the block is not visible outside it)
data =
  if res.ok do
    await(Response.json(res))
  end

# Multi-protocol support (HTTP/TCP)
req = Request.new("tcp://localhost:8080", method: "SEND", body: "ping")

Web.URL & Web.URLSearchParams

Pure, immutable URL parsing and ordered query parameter management.

# URL parsing
url = URL.new("https://user:pass@example.com:8080/p/a/t/h?query=string#hash")
url.port # => 8080

# Params management
params = URLSearchParams.new("foo=bar&foo=baz")
URLSearchParams.get_all(params, "foo") # => ["bar", "baz"]
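If ordered, repeatable parameters are unfamiliar, Elixir's standard URI module shows the same idea: decoding to a list of pairs preserves both order and duplicates, which a plain map would lose. This is a standard-library comparison only and does not use Web.URLSearchParams.

```elixir
# Standard-library comparison only; this does not use Web.URLSearchParams.
pairs =
  "foo=bar&foo=baz&q=elixir"
  |> URI.query_decoder()
  |> Enum.to_list()

# Collect every value for a repeated key, in original order
foo_values = for {"foo", value} <- pairs, do: value

# Append a parameter and serialize back to a query string
query = URI.encode_query(pairs ++ [{"sort", "stars"}])

IO.inspect(pairs)
IO.inspect(foo_values)
IO.puts(query)
```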

Web.Headers: Security-First

Case-insensitive, enumerable header management with built-in protection against credential leakage.

headers = Headers.new(%{"Content-Type" => "text/plain"})
headers = Headers.append(headers, "Set-Cookie", "id=123")

# Automatic Redaction in logs/IEx
IO.inspect(Headers.set(headers, "Authorization", "Bearer secret"))
# => %Web.Headers{"authorization" => "[REDACTED]", ...}
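Redaction on inspect is typically built on a custom Inspect protocol implementation. The sketch below shows the pattern with a hypothetical RedactedHeaders struct. The list of sensitive header names is an assumption for illustration and may differ from what Web.Headers actually redacts.

```elixir
defmodule RedactedHeaders do
  # Hypothetical struct for illustration; not Web.Headers itself.
  defstruct entries: %{}

  # Assumed list of sensitive names; the real library's list may differ.
  @sensitive ~w(authorization proxy-authorization cookie set-cookie)

  # Names are lowercased on write and on read, so lookups ignore case.
  def set(%__MODULE__{entries: entries} = headers, name, value) do
    %{headers | entries: Map.put(entries, String.downcase(name), value)}
  end

  def get(%__MODULE__{entries: entries}, name) do
    Map.get(entries, String.downcase(name))
  end

  def sensitive?(name), do: String.downcase(name) in @sensitive
end

defimpl Inspect, for: RedactedHeaders do
  import Inspect.Algebra

  # Only the printed form is redacted; the stored value is untouched.
  def inspect(%RedactedHeaders{entries: entries}, opts) do
    visible =
      Map.new(entries, fn {name, value} ->
        if RedactedHeaders.sensitive?(name), do: {name, "[REDACTED]"}, else: {name, value}
      end)

    concat(["#RedactedHeaders<", to_doc(visible, opts), ">"])
  end
end

headers =
  %RedactedHeaders{}
  |> RedactedHeaders.set("Content-Type", "text/plain")
  |> RedactedHeaders.set("Authorization", "Bearer secret")

IO.inspect(headers)
IO.puts(RedactedHeaders.get(headers, "AUTHORIZATION"))
```

The real value stays available to code that asks for it explicitly, while logs and IEx sessions only ever see the placeholder.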

Web.AbortController & Web.AbortSignal

A unified mechanism for cancelling any asynchronous operation.

controller = AbortController.new()
signal = AbortController.signal(controller)

# Pass the signal to a fetch or any async task
Task.start(fn -> 
  # Logic that listens for AbortSignal.aborted?(signal)
end)

# Trigger cancellation
AbortController.abort(controller, "Too slow!")
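The cooperative pattern behind this kind of cancellation can be sketched with plain message passing: the worker checks for an abort request between units of work and stops cleanly when one arrives. CancellableWorker and its message shapes are hypothetical and only illustrate the idea. They are not how AbortSignal is implemented.

```elixir
defmodule CancellableWorker do
  # Hypothetical helper for illustration; not Web's AbortSignal.

  # Starts a worker that keeps going until it is told to abort.
  def start(parent), do: spawn(fn -> loop(parent, 0) end)

  defp loop(parent, steps) do
    receive do
      # An abort request wins over doing more work.
      {:abort, reason} -> send(parent, {:aborted, reason, steps})
    after
      0 ->
        # Simulate one unit of work, then check for abort again.
        Process.sleep(5)
        loop(parent, steps + 1)
    end
  end
end

worker = CancellableWorker.start(self())
Process.sleep(50)
send(worker, {:abort, "Too slow!"})

result =
  receive do
    {:aborted, reason, steps} -> {:aborted, reason, steps}
  after
    1_000 -> :timeout
  end

IO.inspect(result)
```

The number of completed steps varies with timing, but the worker always reports the abort reason it was given.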

Web.Blob, Web.ArrayBuffer, & Web.Uint8Array

Immutable data types for efficient binary handling without premature memory flattening.

# Build a Blob from multiple parts lazily (a part can be a binary or another Blob)
some_other_blob = Blob.new(["part2"], type: "text/plain")
blob = Blob.new(["part1", some_other_blob], type: "text/plain")

# Standard byte views
buffer = ArrayBuffer.new(1024)
view = Uint8Array.new(buffer, 10, 100) # Offset 10, Length 100

📦 Features at a Glance


🧪 Industrial-Grade Testing

Reliability is a core requirement. The test suite covers stream state transitions, body consumption, and redirect handling. To run it with a coverage report:

mix test --cover

Built with ❤️ for the Elixir community.