Skip to main content

pkg/gateway/proxy/testdata/graphql_stream_server.go

Source

Overview

What:

Test-only HTTP handlers used by the proxy package to simulate streaming upstream behavior.

Specifically, this file provides a handler that emits chunked JSON responses over time to mimic GraphQL subscription streams or deferred payloads.

Why:

Proxy streaming behavior is easy to regress:

  • buffering responses accidentally breaks streaming semantics
  • not flushing promptly increases latency for clients
  • cancellation may not propagate correctly

This handler exists so tests can:

  • verify the reverse proxy forwards chunked responses incrementally
  • verify upstream cancellation is observed
  • avoid depending on an external upstream server for streaming test coverage

How:

  • NewGraphQLStreamHandler() constructs an instance with a cancellation channel.
  • ServeHTTP checks that the response writer supports flushing, sets streaming-friendly headers, then loops on a ticker, emitting newline-delimited JSON payloads until the request context is cancelled.
  • When the request is cancelled, it closes cancelCh exactly once using sync.Once.
  • Cancelled() exposes a channel that tests can wait on to confirm cancellation propagation.

Notes:

This handler is in testdata and is not intended for production use. It intentionally writes small payloads frequently to exercise flush and streaming behavior.

Contents

Imports

import block 1

pkg/gateway/proxy/testdata/graphql_stream_server.go#L3
import (
"encoding/json"
"net/http"
"sync"
"time"
)

Types

type block 1

pkg/gateway/proxy/testdata/graphql_stream_server.go#L11
type GraphQLStreamHandler struct {
onCancel sync.Once
cancelCh chan struct{}
}

GraphQLStreamHandler

What: Stateful streaming handler that tracks cancellation.

Why: Holds the synchronization and channel state needed to emit and to signal cancellation safely across goroutines.

How: Uses onCancel sync.Once to guard a close(cancelCh) call, preventing panics from double-closing.

Notes: The fields are not exported because this is a test helper, not a stable API surface.

Functions and Methods

NewGraphQLStreamHandler

What: Constructs a new streaming handler instance.

Why: Ensures each test gets an isolated handler with its own cancellation channel.

How: Returns a GraphQLStreamHandler with cancelCh initialized.

pkg/gateway/proxy/testdata/graphql_stream_server.go#L16
func NewGraphQLStreamHandler() *GraphQLStreamHandler {
return &GraphQLStreamHandler{cancelCh: make(chan struct{})}
}

Walkthrough

The list below documents the statements inside the function body, including nested blocks and inline closures.

  • L17: return &GraphQLStreamHandler{cancelCh: make(chan struct{})}
    • What: Returns from the current function.
    • Why: Ends the current execution path and hands control back to the caller.
    • How: Executes a return statement (possibly returning values).

(*GraphQLStreamHandler).ServeHTTP

What: Implements http.Handler by streaming newline-delimited JSON payloads.

Why: Simulates GraphQL streaming responses so proxy tests can validate flush/cancellation behavior.

How: Verifies http.Flusher support, sets headers, ticks periodically, and on each tick marshals a small payload and writes it followed by a newline and a flush. On context cancellation, closes cancelCh once and returns.

Notes: This handler intentionally does not terminate on its own; tests should cancel the request context to stop it.

pkg/gateway/proxy/testdata/graphql_stream_server.go#L20
func (h *GraphQLStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.Header().Set("Cache-Control", "no-store")

ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()

type payload struct {
Data map[string]any `json:"data"`
}

index := 0
for {
select {
case <-r.Context().Done():
h.onCancel.Do(func() { close(h.cancelCh) })
return
case <-ticker.C:
index++
body := payload{Data: map[string]any{"message": index}}
data, _ := json.Marshal(body)
w.Write(data)
w.Write([]byte("\n"))
flusher.Flush()
}
}
}

Walkthrough

The list below documents the statements inside the function body, including nested blocks and inline closures.

  • L21: flusher, ok := w.(http.Flusher)
    • What: Defines flusher, ok.
    • Why: Keeps intermediate state available for later steps in the function.
    • How: Evaluates the right-hand side expressions and stores results in the left-hand variables.
  • L22: if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) return }
    • What: Branches conditionally.
    • Why: Short-circuits early when a precondition is not met or an error/edge case is detected.
    • How: Evaluates the condition and executes the matching branch.
    • Nested steps:
      • L23: http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        • What: Calls http.Error.
        • Why: Performs side effects or delegates work to a helper.
        • How: Executes the expression statement.
      • L24: return
        • What: Returns from the current function.
        • Why: Ends the current execution path and hands control back to the caller.
        • How: Executes a return statement (possibly returning values).
  • L27: w.Header().Set("Content-Type", "application/json")
    • What: Calls w.Header().Set.
    • Why: Performs side effects or delegates work to a helper.
    • How: Executes the expression statement.
  • L28: w.Header().Set("Cache-Control", "no-store")
    • What: Calls w.Header().Set.
    • Why: Performs side effects or delegates work to a helper.
    • How: Executes the expression statement.
  • L30: ticker := time.NewTicker(40 * time.Millisecond)
    • What: Defines ticker.
    • Why: Keeps intermediate state available for later steps in the function.
    • How: Evaluates the right-hand side expressions and stores results in the left-hand variables.
  • L31: defer ticker.Stop()
    • What: Defers a call for cleanup.
    • Why: Ensures the deferred action runs even on early returns.
    • How: Schedules the call to run when the surrounding function returns.
  • L33: type payload struct { Data map[string]any `json:"data"` }
    • What: Declares local names.
    • Why: Introduces variables or types used later in the function.
    • How: Executes a Go declaration statement inside the function body.
  • L37: index := 0
    • What: Defines index.
    • Why: Keeps intermediate state available for later steps in the function.
    • How: Evaluates the right-hand side expressions and stores results in the left-hand variables.
  • L38: for { select { case <-r.Context().Done(): h.onCancel.Do(func() { close(h.cancelCh) }) return case <-ticker.C: index++ body := payload{Data:…
    • What: Runs a loop.
    • Why: Repeats logic until a condition is met or the loop terminates.
    • How: Executes a for loop statement.
    • Nested steps:
      • L39: select { case <-r.Context().Done(): h.onCancel.Do(func() { close(h.cancelCh) }) return case <-ticker.C: index++ body := payload{Data: map[s…
        • What: Selects among concurrent operations.
        • Why: Coordinates channel operations without blocking incorrectly.
        • How: Executes a select statement and runs one ready case.
        • Nested steps:
          • L40: case <-r.Context().Done():
            • What: Selects a select-case branch.
            • Why: Coordinates concurrent operations without blocking incorrectly.
            • How: Runs this case body when its channel operation is ready (or runs default immediately).
            • Nested steps:
              • L41: h.onCancel.Do(func() { close(h.cancelCh) })
                • What: Calls h.onCancel.Do.
                • Why: Performs side effects or delegates work to a helper.
                • How: Executes the expression statement.
                • Nested steps:
                  • L41: func() { close(h.cancelCh) }
                    • What: Defines an inline function (closure).
                    • Why: Encapsulates callback logic and may capture variables from the surrounding scope.
                    • How: Declares a func literal and uses it as a value (for example, as an HTTP handler or callback).
                    • Nested steps:
                      • L41: close(h.cancelCh)
                        • What: Calls close.
                        • Why: Performs side effects or delegates work to a helper.
                        • How: Executes the expression statement.
              • L42: return
                • What: Returns from the current function.
                • Why: Ends the current execution path and hands control back to the caller.
                • How: Executes a return statement (possibly returning values).
          • L43: case <-ticker.C:
            • What: Selects a select-case branch.
            • Why: Coordinates concurrent operations without blocking incorrectly.
            • How: Runs this case body when its channel operation is ready (or runs default immediately).
            • Nested steps:
              • L44: index++
                • What: Updates a counter.
                • Why: Maintains an index or tally used by subsequent logic.
                • How: Executes an increment/decrement statement.
              • L45: body := payload{Data: map[string]any{"message": index}}
                • What: Defines body.
                • Why: Keeps intermediate state available for later steps in the function.
                • How: Evaluates the right-hand side expressions and stores results in the left-hand variables.
              • L46: data, _ := json.Marshal(body)
                • What: Defines data, _.
                • Why: Keeps intermediate state available for later steps in the function.
                • How: Evaluates the right-hand side expressions and stores results in the left-hand variables.
              • L47: w.Write(data)
                • What: Calls w.Write.
                • Why: Performs side effects or delegates work to a helper.
                • How: Executes the expression statement.
              • L48: w.Write([]byte("\n"))
                • What: Calls w.Write.
                • Why: Performs side effects or delegates work to a helper.
                • How: Executes the expression statement.
              • L49: flusher.Flush()
                • What: Calls flusher.Flush.
                • Why: Performs side effects or delegates work to a helper.
                • How: Executes the expression statement.

(*GraphQLStreamHandler).Cancelled

What: Returns a channel that is closed when the client request context is cancelled.

Why: Tests need a reliable signal that the upstream handler observed cancellation.

How: Exposes the internal cancelCh as a receive-only channel.

Notes: The channel is closed once using sync.Once in ServeHTTP.

pkg/gateway/proxy/testdata/graphql_stream_server.go#L55
func (h *GraphQLStreamHandler) Cancelled() <-chan struct{} {
return h.cancelCh
}

Walkthrough

The list below documents the statements inside the function body, including nested blocks and inline closures.

  • L56: return h.cancelCh
    • What: Returns from the current function.
    • Why: Ends the current execution path and hands control back to the caller.
    • How: Executes a return statement (possibly returning values).

Architecture

Guides

Neighboring source