src/sydra/query/operator.zig
Purpose
Implements the physical execution pipeline as a tree of operators producing rows.
This is the core runtime layer used by executor.zig.
See also
- Executor
- Physical plan builder
- Expression evaluation and `Value` representation
- Engine (the scan operator delegates to `Engine.queryRange`)
Definition index (public)
pub const Value
Alias of `value.Value` from `src/sydra/query/value.zig`.
pub const ExecuteError
Composite error set:
- `std.mem.Allocator.Error` – allocations for operator buffers / owned rows / group state
- `expression.EvalError` – expression evaluation inside operators
- `QueryRangeError` – derived from the `Engine.queryRange` return type (scan I/O)
- plus: `UnsupportedPlan`, `UnsupportedAggregate`
```zig
const QueryRangeError = @typeInfo(@typeInfo(@TypeOf(engine_mod.Engine.queryRange)).@"fn".return_type.?).error_union.error_set;

pub const ExecuteError = std.mem.Allocator.Error || expression.EvalError || QueryRangeError || error{
    UnsupportedPlan,
    UnsupportedAggregate,
};
```
pub const Row
Row returned by `Operator.next()`:
- `schema: []const plan.ColumnInfo` – borrowed schema slice
- `values: []Value` – values aligned with `schema`
Lifetime notes:
- Most operators reuse an internal buffer (`scan`, `project`, `aggregate`), so `values` is only valid until the next `next()` call.
- `filter` and `limit` pass through child row buffers without copying.
- `sort` returns owned row copies valid until the sort operator is destroyed.
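Callers that need to retain a row across `next()` calls must therefore copy it first. A minimal caller-side sketch, assuming only that the caller holds an allocator (`std.mem.Allocator.dupe` stands in for whatever copy helper the caller prefers):

```zig
// Hedged sketch: `op` is any *Operator whose output buffer is reused.
if (try op.next()) |row| {
    // Shallow copy: .string values still borrow their underlying bytes.
    const kept = try allocator.dupe(Value, row.values);
    defer allocator.free(kept);
    // `kept` stays valid across later op.next() calls; `row.values` may not.
}
```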
pub const Operator
An operator is a heap-allocated struct with:
- `next()` – returns the next row or `null` for end-of-stream
- `destroy()` – frees the operator and its internal payload
- `collectStats(list)` – collects operator stats snapshots from the pipeline
Operator types (payload variants):
- `scan`
- `filter`
- `project`
- `aggregate`
- `sort`
- `limit`
- `test_source` (test-only helper)
Important fields (public + runtime internals):
- `allocator: std.mem.Allocator`
- `schema: []const plan.ColumnInfo`
- `next_fn`, `destroy_fn` – function pointers implementing the operator behavior
- `stats: Stats` – name, elapsed_us, rows_out
Nested public types:
- `Stats` – live counters
- `StatsSnapshot` – copyable view collected by `collectStats`
Behavior notes:
- `Operator.next()` wraps `next_fn` to measure elapsed time (`std.time.microTimestamp`) and increments `rows_out` when a row is produced.
- `Operator.destroy()` calls the payload destroy routine, then frees the operator allocation.
```zig
pub fn next(self: *Operator) ExecuteError!?Row {
    const start = std.time.microTimestamp();
    const result = self.next_fn(self);
    const elapsed = std.time.microTimestamp() - start;
    self.stats.elapsed_us += @as(u64, @intCast(elapsed));
    const maybe_row = result catch |err| return err;
    if (maybe_row) |_| {
        self.stats.rows_out += 1;
    }
    return maybe_row;
}
```
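For orientation, a typical caller drains a pipeline like this (a hypothetical sketch; the `name` field on `plan.ColumnInfo` is assumed for illustration):

```zig
const root = try buildPipeline(allocator, engine, plan_root);
defer root.destroy();

while (try root.next()) |row| {
    for (row.schema, row.values) |column, val| {
        std.debug.print("{s}={any} ", .{ column.name, val }); // column.name assumed
    }
    std.debug.print("\n", .{});
}
```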
pub fn buildPipeline(allocator, engine, node: *physical.Node) ExecuteError!*Operator
Builds an operator pipeline from a physical plan node.
Notable constraints (as implemented):
- Scan currently supports only `series_ref.by_id` selectors; name-based selection returns `UnsupportedPlan`.
- Time bounds are taken from the physical scan node (`TimeBounds`) and passed into `Engine.queryRange`. The current implementation uses `min`/`max` only; inclusive flags are ignored at execution time.
- When bounds are absent, it queries `[minInt(i64), maxInt(i64)]`, as sketched below.
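A sketch of that bound defaulting, assuming the scan node carries an optional `bounds` and that `TimeBounds` exposes `min`/`max` fields:

```zig
// Hypothetical shape of the bound resolution inside scan construction.
const start_ts: i64 = if (scan_node.bounds) |b| b.min else std.math.minInt(i64);
const end_ts: i64 = if (scan_node.bounds) |b| b.max else std.math.maxInt(i64);
// Inclusive flags on TimeBounds are ignored at execution time.
try engine.queryRange(series_id, start_ts, end_ts, &points);
```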
Physical-to-operator mapping:
- `scan` → `scan`
- `filter` → `filter` (recursively builds child)
- `project` → `project` (may be elided when the schema is reusable)
- `aggregate` → `aggregate` (recursively builds child)
- `sort` → `sort` (materializes child rows, then sorts)
- `limit` → `limit`
- Special case: `limit(child=sort)` becomes a sort with a `limit_hint` (top-k-ish behavior); see the sketch below.
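That special case amounts to a pattern match during pipeline construction; roughly (node shapes and the `createSortOperator` helper are assumed for illustration):

```zig
// Hypothetical fusion: a limit directly above a sort becomes one sort
// operator carrying a limit hint instead of two pipeline stages.
if (node.* == .limit and node.limit.child.* == .sort) {
    return createSortOperator(allocator, engine, node.limit.child, .{
        .offset = node.limit.offset,
        .take = node.limit.take,
    });
}
```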
Observability
Each operator tracks:
- `name` (e.g. `"scan"`)
- `elapsed_us` (accumulated time spent in `next()`)
- `rows_out`

These stats are exposed via `ExecutionCursor.collectOperatorStats()`.
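A hedged usage sketch, assuming `collectStats` appends `StatsSnapshot` entries into a caller-provided `std.ArrayList`:

```zig
var snapshots = std.ArrayList(Operator.StatsSnapshot).init(allocator);
defer snapshots.deinit();
try root.collectStats(&snapshots);
for (snapshots.items) |s| {
    // Field names mirror Stats (name, elapsed_us, rows_out) per this page.
    std.debug.print("{s}: {d} rows, {d} us\n", .{ s.name, s.rows_out, s.elapsed_us });
}
```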
Operator payloads (implementation details)
This section documents the concrete payload structs inside `Operator.Payload`. These types are not `pub`, but they define the runtime semantics and ownership rules.
scan
Backs scan plans by materializing points from storage:
- Creation (`createScanOperator`):
  - Requires `physical.Scan.selector != null`; otherwise `UnsupportedPlan`
  - Only supports `selector.series.by_id`; `.name` returns `UnsupportedPlan`
  - Executes a single `engine.queryRange(series_id, start_ts, end_ts, &points)` during construction
  - Allocates a `buffer: []Value` sized to the output schema and reuses it per row
- Row production (`scanNext`):
  - Only supports identifier columns named `time` and `value` (case-insensitive)
  - Any non-identifier output column or unknown identifier name returns `UnsupportedPlan`
- Destruction (`scanDestroy`): frees points and buffer
```zig
for (op.schema, 0..) |column, idx| {
    if (column.expr.* != .identifier) return error.UnsupportedPlan;
    const name = column.expr.identifier.value;
    if (namesEqual(name, "time")) {
        payload.buffer[idx] = Value{ .integer = point.ts };
    } else if (namesEqual(name, "value")) {
        payload.buffer[idx] = Value{ .float = point.value };
    } else {
        return error.UnsupportedPlan;
    }
}
```
filter
Streams child rows and keeps only those matching a boolean predicate:
- Payload: `{ child, predicate }`
- Row production (`filterNext`):
  - loops `child.next()` until the predicate evaluates to true
  - returns the child row values without copying
- Destruction (`filterDestroy`): destroys child
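In sketch form (payload access and the `expression.evaluate` signature are assumed):

```zig
// Hedged sketch of filterNext: pull child rows until the predicate holds.
fn filterNext(op: *Operator) ExecuteError!?Row {
    const payload = &op.payload.filter; // payload layout assumed
    while (try payload.child.next()) |row| {
        const v = try expression.evaluate(payload.predicate, row); // signature assumed
        if (v == .boolean and v.boolean) return row; // pass through, no copy
    }
    return null;
}
```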
project
Computes a new schema by evaluating expressions per incoming row:
`buildProjectOperator` may return the child operator directly when:
- `physical.Project.reuse_child_schema == true`, or
- the child schema already matches the requested schema (names + expression equality)

Otherwise:
- Payload: `{ child, buffer }`
- Row production (`projectNext`):
  - reads one child row
  - evaluates each output column expression with `expression.evaluate`
  - writes into `buffer` and returns it
- Destruction (`projectDestroy`): destroys child, frees buffer
aggregate
Grouping/aggregation implementation that materializes all groups before producing output.
Constraints:
- Output columns must be either a grouping expression (structural match) or an aggregate call: `avg`, `sum`, `count` (case-insensitive).
- Other output column forms return `UnsupportedAggregate`.
Key payload fields:
- `child`, `group_exprs`, `aggregates`, `column_meta`
- `groups`, `key_buffer`, `output_buffer`
- `initialized`, `index`
Initialization and execution:
- The first `next()` triggers `materializeGroups`, which:
  - iterates every child row
  - evaluates group keys into `key_buffer`
  - finds/creates a `GroupState` by linear scan (`valuesEqual`); see the sketch below
  - updates per-group aggregate states (0 args means `count(*)`-like)
- After materialization, emits one row per group using key values and `finalizeState`.
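The find-or-create step is roughly the following (the `GroupState` shape, its default fields, and the `Value.copySlice` signature are assumed):

```zig
// Hedged sketch of the valuesEqual linear scan inside materializeGroups.
var state: ?*GroupState = null;
for (payload.groups.items) |*g| {
    if (valuesEqual(g.key, payload.key_buffer)) {
        state = g;
        break;
    }
}
if (state == null) {
    // New group: take an owned copy of the key (see the ownership notes below).
    const key_copy = try Value.copySlice(op.allocator, payload.key_buffer);
    try payload.groups.append(.{ .key = key_copy });
    state = &payload.groups.items[payload.groups.items.len - 1];
}
// state.? now receives this row's per-group aggregate updates.
```

The linear scan keeps the implementation simple but makes grouping O(rows × groups); a hash map would be the usual upgrade path.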
Ownership notes:
- Group key `[]Value` arrays are owned by the aggregate operator (`Value.copySlice`), but `.string` values still borrow their underlying bytes.
- `output_buffer` is reused per emitted row.
sort
Materializes all rows from its child, computes ordering keys, sorts, then streams the sorted results.
Key points:
- The child is always destroyed after materialization (`defer child.destroy()`).
- Each row is copied into an `OwnedRow`:
  - `values` is an owned copy of the `[]Value` array (shallow copy; `.string` bytes are not duplicated)
  - `keys` are computed by evaluating ordering expressions against the copied values
- Sort order (sketched below):
  - `null` sorts first
  - numeric-ish (`integer`, `float`, `boolean`) are compared as floats
  - strings compare lexicographically
  - `DESC` is implemented by inverting the ordering
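The per-key comparison looks roughly like this (`Value` tag names and the float coercion helper are assumed for illustration):

```zig
// Hedged sketch of the ordering rule; DESC keys simply invert this result.
fn keyLessThan(a: Value, b: Value) bool {
    if (a == .@"null") return b != .@"null"; // null sorts first
    if (b == .@"null") return false;
    if (a == .string and b == .string)
        return std.mem.order(u8, a.string, b.string) == .lt;
    return toFloat(a) < toFloat(b); // integer/float/boolean compared as floats
}

fn toFloat(v: Value) f64 {
    return switch (v) {
        .integer => |i| @floatFromInt(i),
        .float => |f| f,
        .boolean => |b| if (b) 1.0 else 0.0,
        else => 0.0, // remaining variants; the real code may differ
    };
}
```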
LIMIT hint optimization:
- With `limit_hint = { offset, take }`, the sort operator caps memory to `offset + take` rows while scanning and evicts the current "worst" row; see the sketch below.
- After sorting, it drops the first `offset` rows and truncates to `take`.
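A sketch of that bounded materialization (the comparison and cleanup helpers, `ownedRowLessThan` and `freeOwnedRow`, are hypothetical):

```zig
// Hedged sketch: keep at most offset + take rows while scanning, replacing
// the current worst row whenever a better-ordered row arrives.
const cap = hint.offset + hint.take;
if (rows.items.len < cap) {
    try rows.append(owned_row);
} else {
    var worst: usize = 0;
    for (rows.items, 0..) |candidate, i| {
        if (ownedRowLessThan(rows.items[worst], candidate)) worst = i;
    }
    if (ownedRowLessThan(owned_row, rows.items[worst])) {
        freeOwnedRow(allocator, rows.items[worst]); // evict the worst row
        rows.items[worst] = owned_row;
    } else {
        freeOwnedRow(allocator, owned_row);
    }
}
```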
Lifetime notes:
- Returned `Row.values` remain valid until sort operator destruction (owned `[]Value` arrays), but `.string` values still borrow their underlying bytes from upstream storage (often the query arena).
limit
Offset + take streaming wrapper:
- Payload: `{ child, offset, remaining }`
- Row production discards `offset` rows, then emits up to `remaining` (sketched below).
- Destruction destroys child.
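In sketch form (payload layout assumed):

```zig
// Hedged sketch of limitNext: skip `offset` rows, then emit up to `remaining`.
fn limitNext(op: *Operator) ExecuteError!?Row {
    const payload = &op.payload.limit; // payload layout assumed
    while (payload.offset > 0) : (payload.offset -= 1) {
        _ = (try payload.child.next()) orelse return null;
    }
    if (payload.remaining == 0) return null;
    const row = (try payload.child.next()) orelse return null;
    payload.remaining -= 1;
    return row;
}
```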
test_source (test-only)
Simple row source used by inline tests: returns a pre-baked list of value slices sequentially without copying.