tools/bench_alloc.zig
Purpose
Runs a concurrent ingest workload against an in-process `engine.Engine` instance and reports:
- throughput (ops/sec)
- end-to-end ingest call latency distribution
- engine queue drain behavior
- flush statistics (counts, time, points)
- queue lock contention metrics
- allocator stats (when built with the `small_pool` allocator mode)
This tool is intended to compare allocator modes (`default` vs `mimalloc` vs `small_pool`) and to surface queue/flush bottlenecks.
Imports
- `build_options` – exposes the `allocator_mode` string at build time.
- `sydra_tooling` – a tooling module providing:
  - `alloc` (src/sydra/alloc.zig)
  - `config` (src/sydra/config.zig)
  - `engine` (src/sydra/engine.zig)
  - `types` (src/sydra/types.zig)
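A plausible import block matching this layout; the local aliases `alloc_mod` and `cfg` match identifiers used later in this document, while the `tooling` alias is an assumption:

```zig
const std = @import("std");
// Exposed by the build script; provides the allocator_mode string.
const build_options = @import("build_options");
// Tooling module re-exporting the engine internals listed above.
const tooling = @import("sydra_tooling");
const alloc_mod = tooling.alloc; // src/sydra/alloc.zig
const cfg = tooling.config; // src/sydra/config.zig
const engine = tooling.engine; // src/sydra/engine.zig
const types = tooling.types; // src/sydra/types.zig
```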
CLI flags
Parsed by `parseArgs`:
- `--ops N` (default `200000`)
- `--concurrency N` (default `4`)
- `--series N` (default `128`)
- `--drain-timeout-ms N` (default `60000`); `0` disables the timeout (wait indefinitely for queue drain).
- `--poll-ms N` (default `5`) – poll interval while waiting for the writer thread to drain.
- `--flush-ms N` (default `200`) – engine flush interval.
- `--memtable-mb N` (default `32`) – memtable size limit.
- `--stress-seconds N` (default `0`) – enables a sustained stress loop when > 0.
- `--stress-ops N` (default `10000`) – ops per stress batch per thread.
- `--help` / `-h`
Validation:
- Rejects `concurrency == 0` and `series == 0`.
- Rejects `total_ops == 0` unless `stress_seconds > 0`.
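Collected as a struct, the documented defaults look like this (a sketch; the struct name and the exact integer types are assumptions):

```zig
const Args = struct {
    ops: usize = 200_000,
    concurrency: usize = 4,
    series: usize = 128,
    drain_timeout_ms: u64 = 60_000, // 0 disables the drain timeout
    poll_ms: u64 = 5,
    flush_ms: u64 = 200,
    memtable_mb: usize = 32,
    stress_seconds: u64 = 0, // > 0 enables the stress loop
    stress_ops: usize = 10_000,
};
```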
Data directory lifecycle
`main` creates a per-run data directory in the current working directory:
- name pattern: `bench-data-{timestamp_ms}`
It is deleted via `deleteTree` in a `defer` block that runs after `eng.deinit()`.
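A minimal sketch of that lifecycle (variable names are illustrative):

```zig
const ts_ms = std.time.milliTimestamp();
const data_dir = try std.fmt.allocPrint(allocator, "bench-data-{d}", .{ts_ms});
defer allocator.free(data_dir);

try std.fs.cwd().makeDir(data_dir);
// Because defers unwind LIFO and this one is registered before the
// engine is created, the tree is removed after eng.deinit() has run.
defer std.fs.cwd().deleteTree(data_dir) catch {};
```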
Config construction
`fn makeConfig(alloc, data_dir, flush_interval, memtable_max) !cfg.Config`
Builds a `cfg.Config` with:
- `data_dir` duplicated into allocator-owned memory
- `http_port = 0`
- `fsync = .none`
- `flush_interval_ms = flush_interval`
- `memtable_max_bytes = memtable_max`
- `retention_days = 0`
- `auth_token = ""` (duplicated)
- `enable_influx = false`
- `enable_prom = false`
- `mem_limit_bytes = 512 MiB`
- `retention_ns = StringHashMap(u32).init(alloc)`
The returned config must be deinit’d by the caller when no longer needed.
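Under the field list above, the constructor is roughly the following sketch; the exact field types on `cfg.Config` are assumptions:

```zig
fn makeConfig(
    alloc: std.mem.Allocator,
    data_dir: []const u8,
    flush_interval: u64,
    memtable_max: usize,
) !cfg.Config {
    return .{
        // Both strings are duplicated so the config owns its memory.
        .data_dir = try alloc.dupe(u8, data_dir),
        .http_port = 0,
        .fsync = .none,
        .flush_interval_ms = flush_interval,
        .memtable_max_bytes = memtable_max,
        .retention_days = 0,
        .auth_token = try alloc.dupe(u8, ""),
        .enable_influx = false,
        .enable_prom = false,
        .mem_limit_bytes = 512 * 1024 * 1024,
        .retention_ns = std.StringHashMap(u32).init(alloc),
    };
}
```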
Workload generator
`const ProducerContext`
Per-thread parameters:
- `engine: *engine.Engine`
- `series_ids: []const types.SeriesId`
- `ops: usize`
- `series_offset: usize`
- `ts_base: i64`
- `thread_id: usize`
- `latencies: ?*std.ArrayList(u64) = null` – optional per-op latency sink (nanoseconds)
- `stress_result: ?*ThreadStressResult = null`
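As a Zig struct, the same shape (field types as documented; `ThreadStressResult` is defined elsewhere in the tool):

```zig
const ProducerContext = struct {
    engine: *engine.Engine,
    series_ids: []const types.SeriesId,
    ops: usize,
    series_offset: usize,
    ts_base: i64,
    thread_id: usize,
    // Optional per-op latency sink, in nanoseconds.
    latencies: ?*std.ArrayList(u64) = null,
    // Optional aggregate sink used by stress mode.
    stress_result: ?*ThreadStressResult = null,
};
```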
`fn producer(ctx: ProducerContext) void`
For `ctx.ops` iterations:
- Picks a series id: `sid = series_ids[(series_offset + i) % series_ids.len]`
- Builds a point: `ts = ts_base + i`, `value = float(ts)`
- Calls `eng.ingest(Engine.IngestItem{ series_id, ts, value, tags_json = "{}" })`.
- Measures latency around the `ingest` call via `std.time.nanoTimestamp()`.
- Writes `latency_ns` to `latencies` when provided.
- Accumulates totals into `ThreadStressResult` when provided.
On ingest error, prints `ingest error on thread {thread_id}: {errorName}` and terminates the thread early.
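A sketch of that loop; the `Engine.IngestItem` field names follow the text above, and the managed `std.ArrayList` API is assumed:

```zig
fn producer(ctx: ProducerContext) void {
    var i: usize = 0;
    while (i < ctx.ops) : (i += 1) {
        const sid = ctx.series_ids[(ctx.series_offset + i) % ctx.series_ids.len];
        const ts = ctx.ts_base + @as(i64, @intCast(i));
        const start = std.time.nanoTimestamp();
        ctx.engine.ingest(.{
            .series_id = sid,
            .ts = ts,
            .value = @as(f64, @floatFromInt(ts)),
            .tags_json = "{}",
        }) catch |err| {
            // Early exit on error, as documented above.
            std.debug.print("ingest error on thread {d}: {s}\n", .{ ctx.thread_id, @errorName(err) });
            return;
        };
        const latency_ns: u64 = @intCast(std.time.nanoTimestamp() - start);
        if (ctx.latencies) |list| list.append(latency_ns) catch {};
    }
}
```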
Latency statistics
`fn percentile(sorted: []u64, ratio: f64) u64`
Computes a percentile using linear interpolation between adjacent elements of a sorted sample array.
`fn printLatencySummary(latencies: []u64) void`
- Sorts `latencies` in place (ascending).
- Prints `p50`, `p95`, `p99`, and `p999` in microseconds.
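A sketch of the interpolation, using the standard `(len - 1) * ratio` index form; the tool's exact rounding may differ:

```zig
fn percentile(sorted: []u64, ratio: f64) u64 {
    if (sorted.len == 0) return 0;
    if (sorted.len == 1) return sorted[0];
    const pos = ratio * @as(f64, @floatFromInt(sorted.len - 1));
    const lo: usize = @intFromFloat(@floor(pos));
    const hi = @min(lo + 1, sorted.len - 1);
    const frac = pos - @floor(pos);
    const a: f64 = @floatFromInt(sorted[lo]);
    const b: f64 = @floatFromInt(sorted[hi]);
    // Linear interpolation between the two neighboring samples.
    return @intFromFloat(a + (b - a) * frac);
}
```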
Drain + metrics loop
After the producers join, `main` waits for the engine's writer thread to drain:
- Reads `eng.metrics.ingest_total` (atomic) and `eng.queue.len()`.
- Exits when `ingested >= ops_total` and `pending == 0`.
- Sleeps `poll_ms` between polls.
- Times out after `drain_timeout_ms` unless the timeout is disabled.
The loop also records:
- `max_pending`
- `avg_pending` (via `pending_sum / samples`)
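Shaped as code, the wait loop looks roughly like this; the atomic-load ordering and timer details are assumptions:

```zig
var timer = try std.time.Timer.start();
var max_pending: usize = 0;
var pending_sum: usize = 0;
var samples: usize = 0;
var timed_out = false;
while (true) {
    const ingested = eng.metrics.ingest_total.load(.monotonic);
    const pending = eng.queue.len();
    max_pending = @max(max_pending, pending);
    pending_sum += pending;
    samples += 1;
    if (ingested >= ops_total and pending == 0) break;
    if (drain_timeout_ms != 0 and timer.read() >= drain_timeout_ms * std.time.ns_per_ms) {
        timed_out = true;
        break;
    }
    sleepMs(poll_ms);
}
// samples >= 1 here, so the average is safe to compute.
const avg_pending = pending_sum / samples;
```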
Output summary
`main` prints several `std.debug.print` lines, including:
- overall throughput and flush summary
- queue drain stats (pending samples, timeout flag)
- queue metrics: `queue_pop_total` and `queue_wait_ns_total`, plus the derived average wait time and average queue length
- queue lock stats:
- average push/pop lock wait/hold times
- contention counts
`small_pool`-only reporting
When compiled with `alloc_mod.mode == "small_pool"`, the tool prints:
- shard allocator stats (if enabled): hits/misses, deferred totals, epoch range
- fallback allocator counters and size histogram
- per-bucket usage and lock timing/contended acquisition counts
It also advances/leaves an epoch before sampling stats to encourage garbage collection of deferred frees.
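The gate is compile-time, so non-`small_pool` builds carry none of this code. A sketch of the shape only; `epochAdvance`, `epochLeave`, and `shardStats` are hypothetical names standing in for the real epoch/stats API:

```zig
if (comptime alloc_mod.is_small_pool) {
    // Advance and leave an epoch first so deferred frees become
    // reclaimable before the stats snapshot is taken.
    alloc_mod.epochAdvance(); // hypothetical helper
    alloc_mod.epochLeave(); // hypothetical helper
    const stats = alloc_mod.shardStats(); // hypothetical helper
    std.debug.print("shard hits={d} misses={d} deferred={d}\n", .{
        stats.hits, stats.misses, stats.deferred_total,
    });
}
```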
Stress mode
`fn runStress(allocator, handle, eng, series_ids, threads, contexts, stress_ops, stress_seconds) !void`
Runs for roughly `stress_seconds`, spawning repeated batches:
- Each batch spawns `threads.len` producer threads.
- Each producer runs `stress_ops` ingest calls.
- Per-thread latencies are recorded in `ThreadStressResult` (not in per-op arrays).
After each batch:
- aggregates total ops and total/max latency
- when `alloc_mod.is_small_pool`, tracks the maximum observed `shard_deferred_total`
Prints a final summary line:
`stress_summary batches=... total_ops=... avg_latency_us=... max_latency_us=... max_deferred=...`
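A sketch of the batching loop; the aggregation is compressed and the `ThreadStressResult` field names are assumptions, but the thread-spawn call is the standard `std.Thread.spawn`:

```zig
var timer = try std.time.Timer.start();
var batches: u64 = 0;
var total_ops: u64 = 0;
while (timer.read() < stress_seconds * std.time.ns_per_s) {
    // One batch: spawn a producer per context, then wait for all of them.
    for (threads, contexts) |*t, *ctx| {
        ctx.ops = stress_ops;
        t.* = try std.Thread.spawn(.{}, producer, .{ctx.*});
    }
    for (threads) |t| t.join();
    batches += 1;
    // Aggregate per-thread totals (field name `ops` is an assumption).
    for (contexts) |ctx| {
        if (ctx.stress_result) |res| total_ops += res.ops;
    }
}
```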
Timing helpers
`fn sleepMs(ms: u64) void`
Uses `std.time.sleep` if available, otherwise falls back to `std.Thread.sleep`.
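A sketch of the fallback; `@hasDecl` performs the availability check at compile time, which is how the tool can build against std versions that have either sleep API:

```zig
fn sleepMs(ms: u64) void {
    const ns = ms * std.time.ns_per_ms;
    if (comptime @hasDecl(std.time, "sleep")) {
        std.time.sleep(ns); // older std versions
    } else {
        std.Thread.sleep(ns); // newer std versions
    }
}
```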