pkg/gateway/proxy/testdata/graphql_stream_server.go
Source
- Package: testdata
- File: pkg/gateway/proxy/testdata/graphql_stream_server.go
- GitHub: https://github.com/theroutercompany/api_router/blob/main/pkg/gateway/proxy/testdata/graphql_stream_server.go
Overview
What:
Test-only HTTP handlers used by the proxy package to simulate streaming upstream behavior.
Specifically, this file provides a handler that emits chunked JSON responses over time to mimic GraphQL subscription streams or deferred payloads.
Why:
Proxy streaming behavior is easy to regress:
- buffering responses accidentally breaks streaming semantics
- not flushing promptly increases latency for clients
- cancellation may not propagate correctly
This handler exists so tests can:
- verify the reverse proxy forwards chunked responses incrementally
- verify upstream cancellation is observed
- avoid depending on an external upstream server for streaming test coverage
How:
- NewGraphQLStreamHandler() constructs an instance with a cancellation channel.
- ServeHTTP checks that the response writer supports flushing, sets streaming-friendly headers, then loops on a ticker, emitting newline-delimited JSON payloads until the request context is cancelled.
- When the request is cancelled, it closes cancelCh exactly once using sync.Once.
- Cancelled() exposes a channel that tests can wait on to confirm cancellation propagation.
Notes:
This handler is in testdata and is not intended for production use.
It intentionally writes small payloads frequently to exercise flush and streaming behavior.
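The flow described above can be exercised end to end with only the standard library. The following is a minimal sketch, not the repository's actual test: streamHandler is a condensed stand-in for GraphQLStreamHandler so the block compiles on its own, streamThroughProxy is a hypothetical helper name, and the use of httputil.ReverseProxy with FlushInterval set to -1 is an assumption about how a proxy under test might be configured.

```go
package main

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
	"sync"
	"time"
)

// streamHandler is a condensed stand-in for GraphQLStreamHandler (assumption:
// copied here only so the sketch is self-contained).
type streamHandler struct {
	onCancel sync.Once
	cancelCh chan struct{}
}

func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	ticker := time.NewTicker(40 * time.Millisecond)
	defer ticker.Stop()
	for i := 1; ; i++ {
		select {
		case <-r.Context().Done():
			h.onCancel.Do(func() { close(h.cancelCh) })
			return
		case <-ticker.C:
			data, _ := json.Marshal(map[string]any{"data": map[string]any{"message": i}})
			w.Write(append(data, '\n'))
			flusher.Flush()
		}
	}
}

// streamThroughProxy (hypothetical helper) reads the first streamed line through
// a reverse proxy, cancels the client request, and reports whether the upstream
// handler observed the cancellation within two seconds.
func streamThroughProxy() (firstLine string, cancelled bool, err error) {
	h := &streamHandler{cancelCh: make(chan struct{})}
	upstream := httptest.NewServer(h)
	defer upstream.Close()

	target, err := url.Parse(upstream.URL)
	if err != nil {
		return "", false, err
	}
	proxy := httputil.NewSingleHostReverseProxy(target)
	proxy.FlushInterval = -1 // negative interval: flush after every write
	front := httptest.NewServer(proxy)
	defer front.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, front.URL, nil)
	if err != nil {
		return "", false, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", false, err
	}
	defer resp.Body.Close()

	// The first line should arrive while the stream is still open.
	line, err := bufio.NewReader(resp.Body).ReadString('\n')
	if err != nil {
		return "", false, err
	}

	cancel() // client goes away; the proxy should cancel the upstream request
	select {
	case <-h.cancelCh:
		return line, true, nil
	case <-time.After(2 * time.Second):
		return line, false, nil
	}
}

func main() {
	line, cancelled, err := streamThroughProxy()
	fmt.Printf("first=%q cancelled=%v err=%v\n", line, cancelled, err)
}
```

In a real test the two-second timeout would typically become a test failure, and the proxy under test would replace the stock httputil.ReverseProxy used here.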
Contents
Imports
import block 1
import (
	"encoding/json"
	"net/http"
	"sync"
	"time"
)
Types
type block 1
type GraphQLStreamHandler struct {
	onCancel sync.Once
	cancelCh chan struct{}
}
GraphQLStreamHandler
What: Stateful streaming handler that tracks cancellation.
Why: Holds the synchronization and channel state needed to signal cancellation safely across goroutines while the handler streams payloads.
How: Uses onCancel sync.Once to guard a close(cancelCh) call, preventing panics from double-closing.
Notes: The fields are not exported because this is a test helper, not a stable API surface.
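The sync.Once guard can be illustrated in isolation. This is a small sketch under stated assumptions: signal, fire, and fireConcurrently are hypothetical names that mirror the onCancel and cancelCh fields, not code from the repository.

```go
package main

import (
	"fmt"
	"sync"
)

// signal is a hypothetical stand-in for the handler's cancellation state.
type signal struct {
	once sync.Once
	ch   chan struct{}
}

// fire closes the channel at most once, no matter how many callers race on it.
func (s *signal) fire() { s.once.Do(func() { close(s.ch) }) }

// fireConcurrently calls fire from n goroutines and reports whether the
// channel ended up closed.
func fireConcurrently(n int) bool {
	s := &signal{ch: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.fire() // a bare close(s.ch) here would panic on the second call
		}()
	}
	wg.Wait()
	select {
	case <-s.ch:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(fireConcurrently(8)) // prints "true"
}
```

The guard matters because one handler instance can serve more than one request, and each request's ServeHTTP call would otherwise attempt its own close on cancellation.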
Functions and Methods
NewGraphQLStreamHandler
What: Constructs a new streaming handler instance.
Why: Ensures each test gets an isolated handler with its own cancellation channel.
How: Returns a GraphQLStreamHandler with cancelCh initialized.
func NewGraphQLStreamHandler() *GraphQLStreamHandler {
	return &GraphQLStreamHandler{cancelCh: make(chan struct{})}
}
Walkthrough
The list below documents the statements inside the function body, including nested blocks and inline closures.
- L17: return &GraphQLStreamHandler{cancelCh: make(chan struct{})}
  - What: Returns a pointer to a new handler whose cancelCh is an open, unbuffered channel.
  - Why: Gives the caller a ready-to-use handler; the zero value of onCancel needs no initialization.
  - How: Executes a return statement with a composite literal.
(*GraphQLStreamHandler).ServeHTTP
What: Implements http.Handler by streaming newline-delimited JSON payloads.
Why: Simulates GraphQL streaming responses so proxy tests can validate flush/cancellation behavior.
How: Verifies http.Flusher support, sets headers, ticks periodically, and on each tick marshals a small payload and writes it followed by a newline and a flush. On context cancellation, closes cancelCh once and returns.
Notes: This handler intentionally does not terminate on its own; tests should cancel the request context to stop it.
func (h *GraphQLStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Cache-Control", "no-store")
	ticker := time.NewTicker(40 * time.Millisecond)
	defer ticker.Stop()
	type payload struct {
		Data map[string]any `json:"data"`
	}
	index := 0
	for {
		select {
		case <-r.Context().Done():
			h.onCancel.Do(func() { close(h.cancelCh) })
			return
		case <-ticker.C:
			index++
			body := payload{Data: map[string]any{"message": index}}
			data, _ := json.Marshal(body)
			w.Write(data)
			w.Write([]byte("\n"))
			flusher.Flush()
		}
	}
}
Walkthrough
The list below documents the statements inside the function body, including nested blocks and inline closures.
- L21: flusher, ok := w.(http.Flusher)
  - What: Type-asserts the response writer to http.Flusher, defining flusher and ok.
  - Why: Each payload must reach the client immediately, which requires a writer that supports explicit flushing.
  - How: Uses the two-value assertion form, so ok is false (rather than a panic) when the writer does not implement http.Flusher.
- L22: if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return }
  - What: Branches on whether flushing is supported.
  - Why: Short-circuits early when the streaming precondition is not met.
  - How: Evaluates !ok and runs the error branch when the assertion failed.
  - Nested steps:
    - L23: http.Error(w, "streaming unsupported", http.StatusInternalServerError)
      - What: Calls http.Error.
      - Why: Tells the client that streaming is unsupported.
      - How: Writes a 500 Internal Server Error response with the given message.
    - L24: return
      - What: Returns from ServeHTTP.
      - Why: Stops before any streaming work begins.
      - How: Executes a return statement with no values.
- L27: w.Header().Set("Content-Type", "application/json")
  - What: Calls w.Header().Set.
  - Why: Declares the response body as JSON.
  - How: Sets the header before the first write, which is when headers are sent.
- L28: w.Header().Set("Cache-Control", "no-store")
  - What: Calls w.Header().Set.
  - Why: Tells clients and intermediaries not to store the streamed response.
  - How: Sets the header before the first write.
- L30: ticker := time.NewTicker(40 * time.Millisecond)
  - What: Defines ticker.
  - Why: Paces payload emission at one message every 40 milliseconds.
  - How: Creates a time.Ticker that delivers a value on ticker.C at each interval.
- L31: defer ticker.Stop()
  - What: Defers a call for cleanup.
  - Why: Ensures the ticker is released when the handler returns.
  - How: Schedules the call to run when the surrounding function returns.
- L33: type payload struct { Data map[string]any `json:"data"` }
  - What: Declares the local type payload.
  - Why: Gives each message a top-level "data" field.
  - How: Executes a Go type declaration inside the function body, with a struct tag that controls the JSON field name.
- L37: index := 0
  - What: Defines index.
  - Why: Tracks how many payloads have been emitted; its value becomes each message's content.
  - How: Uses a short variable declaration with an initial value of 0.
- L38: for { select { case <-r.Context().Done(): h.onCancel.Do(func() { close(h.cancelCh) }) return case <-ticker.C: index++ body := payload{Data:…
  - What: Runs an unconditional loop.
  - Why: Keeps streaming until the request is cancelled.
  - How: Executes a for loop with no condition; the only exit is the return in the cancellation case.
  - Nested steps:
    - L39: select { case <-r.Context().Done(): h.onCancel.Do(func() { close(h.cancelCh) }) return case <-ticker.C: index++ body := payload{Data: map[s…
      - What: Selects among concurrent operations.
      - Why: Waits for whichever happens first: request cancellation or the next tick.
      - How: Executes a select statement and runs one ready case.
      - Nested steps:
        - L40: case <-r.Context().Done():
          - What: Select-case branch for request cancellation.
          - Why: Lets the handler stop as soon as the client or proxy abandons the request.
          - How: Runs this case body when the request context's Done channel is closed.
          - Nested steps:
            - L41: h.onCancel.Do(func() { close(h.cancelCh) })
              - What: Calls h.onCancel.Do.
              - Why: Signals observers of Cancelled() that cancellation was seen.
              - How: sync.Once runs the supplied function only on the first call, so later calls are no-ops.
              - Nested steps:
                - L41: func() { close(h.cancelCh) }
                  - What: Defines an inline function (closure).
                  - Why: Supplies the one-time action to sync.Once and captures h from the surrounding scope.
                  - How: Declares a func literal and passes it as a value.
                  - Nested steps:
                    - L41: close(h.cancelCh)
                      - What: Calls close.
                      - Why: A closed channel unblocks every receiver, so any number of waiters are notified.
                      - How: Executes the expression statement.
            - L42: return
              - What: Returns from ServeHTTP.
              - Why: Ends the stream; the deferred ticker.Stop() runs at this point.
              - How: Executes a return statement with no values.
        - L43: case <-ticker.C:
          - What: Select-case branch for the next tick.
          - Why: Emits one payload per tick.
          - How: Runs this case body when the ticker delivers a value.
          - Nested steps:
            - L44: index++
              - What: Updates a counter.
              - Why: Gives each message a distinct, increasing value, starting at 1.
              - How: Executes an increment statement.
            - L45: body := payload{Data: map[string]any{"message": index}}
              - What: Defines body.
              - Why: Builds the message to send for this tick.
              - How: Constructs a payload literal whose Data map holds the current index under "message".
            - L46: data, _ := json.Marshal(body)
              - What: Defines data and discards the error.
              - Why: Serializes the payload to JSON bytes.
              - How: Calls json.Marshal; the error is ignored because the payload contains only a string key and an int value.
            - L47: w.Write(data)
              - What: Calls w.Write.
              - Why: Sends the JSON payload to the client.
              - How: Executes the expression statement; the return values are ignored.
            - L48: w.Write([]byte("\n"))
              - What: Calls w.Write.
              - Why: Terminates the payload with a newline so clients can split the stream by line.
              - How: Executes the expression statement; the return values are ignored.
            - L49: flusher.Flush()
              - What: Calls flusher.Flush.
              - Why: Pushes the buffered payload to the client immediately instead of waiting for more data.
              - How: Executes the expression statement.
(*GraphQLStreamHandler).Cancelled
What: Returns a channel that is closed when the client request context is cancelled.
Why: Tests need a reliable signal that the upstream handler observed cancellation.
How: Exposes the internal cancelCh as a receive-only channel.
Notes: The channel is closed once using sync.Once in ServeHTTP.
func (h *GraphQLStreamHandler) Cancelled() <-chan struct{} {
	return h.cancelCh
}
Walkthrough
The list below documents the statements inside the function body, including nested blocks and inline closures.
- L56: return h.cancelCh
  - What: Returns the handler's cancellation channel.
  - Why: Lets tests block until ServeHTTP has observed cancellation.
  - How: Executes a return statement; the declared result type converts the channel to receive-only, so callers cannot close or send on it.