🏭 Analogy
A factory assembly line: one station stamps the part, the next paints it, the next boxes it. Each station does one job and hands the result to the next, all working at the same time on different items. If boxing slows down, the line naturally backs up — nobody piles parts on the floor. That’s a pipeline.
The problem
You have a stream of data and several transformations to apply. Doing them in one big loop is simple, but it is serial and it mixes the steps together. A pipeline gives each stage its own goroutine, connected by channels, so stages run concurrently and stay decoupled. Back-pressure comes from the channels themselves, and a shared done channel gives every stage a cancellation path.
Structure
graph LR
  G["generator<br/>goroutine"] -->|chan int| S1["square<br/>goroutine"]
  S1 -->|chan int| S2["square<br/>goroutine"]
  S2 -->|chan int| M["main<br/>range loop"]
  D["done chan"] -.cancels.-> G
  D -.cancels.-> S1
  D -.cancels.-> S2
Each stage follows the same shape: take an inbound channel, return a new outbound channel, run a goroutine that reads → transforms → sends, and close the output when the input is exhausted.
Idiomatic Go
A generator turns values into a stream; each stage reads one channel and writes the next; main ranges the final channel. The done channel lets everything cancel cleanly.
package main

import "fmt"

// generator turns a list of ints into a stream.
func generator(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // this stage owns out, so it closes out
		for _, n := range nums {
			select {
			case <-done:
				return
			case out <- n:
			}
		}
	}()
	return out
}

// square is a stage: read ints, emit their squares.
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-done:
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	defer close(done) // cancels every stage on return

	// Compose stages: numbers -> square -> square
	stream := square(done, square(done, generator(done, 1, 2, 3, 4)))
	for n := range stream {
		fmt.Println(n) // 1, 16, 81, 256
	}
}
Generators & take
A more composable style, popularized by Concurrency in Go, uses a repeatFn that produces an infinite stream and a take that pulls just the first N — both honoring done:
done := make(chan any)
defer close(done)
repeat := repeatFn(done, func() any { return rand.Int() }) // infinite
for n := range take(done, repeat, 10) { // bounded
	fmt.Println(n)
}
Because every stage selects on <-done, closing done unwinds even the infinite generator — no leaked goroutines.
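The snippet above uses repeatFn and take without defining them. A minimal sketch of how they might look follows; the bodies are an assumption written for illustration, not the book's exact code, and the counter closure is a hypothetical stand-in for rand.Int so the output is predictable.

```go
package main

import "fmt"

// repeatFn calls fn over and over and streams the results until done is closed.
// Note that fn may be evaluated once more than the number of values consumed.
func repeatFn(done <-chan any, fn func() any) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for {
			select {
			case <-done:
				return
			case out <- fn():
			}
		}
	}()
	return out
}

// take forwards at most n values from in, then closes its output.
func take(done <-chan any, in <-chan any, n int) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case <-done:
				return
			case v, ok := <-in:
				if !ok {
					return // upstream finished early
				}
				select {
				case <-done:
					return
				case out <- v:
				}
			}
		}
	}()
	return out
}

func main() {
	done := make(chan any)
	defer close(done) // unwinds the infinite repeatFn goroutine

	next := 0
	counter := func() any { next++; return next } // hypothetical deterministic source
	for v := range take(done, repeatFn(done, counter), 3) {
		fmt.Println(v) // 1, 2, 3
	}
}
```

take stops after three values, and the deferred close(done) is what releases repeatFn, which would otherwise stay blocked on its next send.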
🐹 The three rules that make pipelines safe
1. The stage that writes a channel closes it (defer close(out)), never the reader.
2. Every send selects on <-done (or ctx.Done()), and so does any receive from a channel that might never be closed, so cancellation can't deadlock.
3. Let channels carry the back-pressure: unbuffered channels keep a fast stage from racing ahead of a slow one, so memory stays bounded without any manual throttling.
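Rule 3 can be observed directly. The sketch below is an illustration under assumptions: sentAfter is a hypothetical helper that starts a producer, receives a fixed number of values, pauses briefly, and reports how many sends have completed. With an unbuffered channel that count can never exceed the number of receives; with a buffer it can exceed it by at most the buffer size.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// sentAfter starts a producer on a channel with the given buffer size,
// receives exactly `consumed` values, waits briefly, and reports how many
// sends have completed by then.
func sentAfter(buf, consumed int) int64 {
	ch := make(chan int, buf)
	done := make(chan struct{})
	defer close(done) // stop the producer when we return

	var sent atomic.Int64
	go func() {
		for i := 0; ; i++ {
			select {
			case <-done:
				return
			case ch <- i:
				sent.Add(1)
			}
		}
	}()

	for i := 0; i < consumed; i++ {
		<-ch
	}
	time.Sleep(20 * time.Millisecond) // give the producer time to run ahead if it can
	return sent.Load()
}

func main() {
	// Upper bounds: consumed + buf. The exact numbers depend on scheduling.
	fmt.Println("unbuffered, sends completed:", sentAfter(0, 2))
	fmt.Println("buffer of 100, sends completed:", sentAfter(100, 2))
}
```

The unbuffered producer is stuck on its third send until someone receives, which is the back-pressure the rule describes; the buffered one is free to run up to 100 items ahead.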
From done channel to context
In production you’ll usually swap the done channel for a context.Context — the select pattern is identical, but you also get timeouts, deadlines, and propagation across API boundaries:
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done(): // timeout, deadline, or cancel, all in one
				return
			case out <- n * n:
			}
		}
	}()
	return out
}
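One way to drive the context-based stage is sketched below. It assumes two hypothetical helpers that are not part of the original: naturals, an infinite generator, and firstSquares, which reads a fixed number of results under a one-second timeout.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// naturals is a hypothetical infinite generator: 1, 2, 3, ... until ctx ends.
func naturals(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := 1; ; n++ {
			select {
			case <-ctx.Done():
				return
			case out <- n:
			}
		}
	}()
	return out
}

// square is the context-aware stage from the text.
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done():
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

// firstSquares reads up to k squares, giving up if the timeout fires first.
func firstSquares(k int) []int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel() // releases the timer and stops both stages on return

	out := square(ctx, naturals(ctx))
	var got []int
	for len(got) < k {
		v, ok := <-out
		if !ok {
			break // the deadline hit before k values arrived
		}
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(firstSquares(3)) // [1 4 9], unless the one-second deadline fires first
}
```

The deferred cancel plays the role that defer close(done) played earlier: returning from firstSquares stops the infinite generator and the square stage.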
Where it leads
The pipeline is the foundation for the rest of Go’s concurrency toolkit:
- Parallelize a slow stage with Fan-out / Fan-in.
- Bound how many items are in flight with a Worker Pool.
- Replace the done channel with context.Context for deadlines and request scope.
- Wrap a stream so it always stops on cancel with the Or-done channel.
Pitfalls
⚠️ Leaks happen when a consumer stops early
If main breaks out of the range loop before the stream is drained, upstream stages can block forever on a send nobody receives — a goroutine leak. The done channel (closed via defer) is what saves you: it signals every stage to abandon its pending send and return. Never start a pipeline stage without a cancellation path.
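The difference is easy to see in a small experiment. The sketch below is illustrative only: counter and stops are hypothetical helpers, and the extra exited channel exists purely so the test can observe whether the producer goroutine returned. Passing a nil done channel models a stage with no cancellation path, because a receive from a nil channel never becomes ready.

```go
package main

import (
	"fmt"
	"time"
)

// counter streams 0, 1, 2, ... until done is closed.
// exited is closed when the producer goroutine returns.
func counter(done <-chan struct{}) (<-chan int, <-chan struct{}) {
	out := make(chan int)
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		defer close(out)
		for i := 0; ; i++ {
			select {
			case <-done:
				return
			case out <- i:
			}
		}
	}()
	return out, exited
}

// stops reads one value, walks away (like a break out of a range loop),
// and reports whether the producer goroutine shut down.
func stops(done chan struct{}) bool {
	out, exited := counter(done)
	<-out
	if done != nil {
		close(done)
	}
	select {
	case <-exited:
		return true
	case <-time.After(100 * time.Millisecond):
		return false // still blocked on a send nobody will receive
	}
}

func main() {
	fmt.Println("with done:", stops(make(chan struct{}))) // with done: true
	fmt.Println("without done:", stops(nil))              // without done: false (this goroutine leaks on purpose)
}
```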
When to use it — and when not
✅ Reach for it when
- You transform a stream in distinct steps (read → parse → filter → write) and want them to run concurrently.
- You want natural back-pressure and bounded memory, with stages decoupled from each other.
- You need clean cancellation so partial work shuts down without leaking goroutines.
⛔ Think twice when
- It's a single, cheap transformation — a plain loop is simpler and faster.
- Stages are so tiny that channel overhead dominates the actual work.
- You need strict global ordering after fanning a stage out to many workers.
Related patterns
- Fan-out / Fan-in: Distribute work across multiple goroutines (fan-out) and merge their results back into one stream (fan-in).
- Worker Pool: Bound concurrency by feeding jobs to a fixed number of long-lived worker goroutines.
- Generator: Produce a stream of values from a goroutine over a channel, lazily and on demand.
- Context & Cancellation: Propagate cancellation, deadlines, and request-scoped values across API boundaries and goroutine trees with context.Context.
Check your understanding
1. In a Go pipeline, who closes a stage's output channel?
Each stage owns its output channel and closes it when done. If the receiver closed it instead, the producer's next send would panic, and Go rejects close on a receive-only channel at compile time. Once the owner closes the channel, a downstream range loop ends cleanly.
2. What is the `done` channel for?
Each stage selects on <-done alongside its send/receive. Closing done unblocks them all so they return — preventing goroutines stuck forever on a channel nobody reads.
3. Where does back-pressure come from?
With unbuffered (or small-buffered) channels, a producer can't outrun its consumer — it blocks on send until the consumer receives, keeping memory bounded automatically.
4. What does replacing the done channel with context.Context add?
A done channel signals 'stop.' A context does that (ctx.Done()) plus timeouts (WithTimeout), deadlines, and value propagation, and it threads naturally through function signatures across packages — which is why production pipelines take a ctx.
5. After you fan a stage out to several workers, is the original item order preserved downstream?
Fan-out trades ordering for throughput: results arrive as workers finish, not in input order. If you need the original sequence, carry an index with each item and reassemble, or don't fan that stage out.
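The "carry an index and reassemble" option might look like the sketch below. The indexed type and the squareAll function are hypothetical names chosen for illustration, and the sketch assumes workers is at least 1. It has no done channel because the caller always drains every result.

```go
package main

import (
	"fmt"
	"sync"
)

// indexed pairs a value with its position in the input.
type indexed struct {
	idx int
	val int
}

// squareAll fans squaring out to `workers` goroutines and restores input order.
// Assumes workers >= 1.
func squareAll(nums []int, workers int) []int {
	jobs := make(chan indexed)
	results := make(chan indexed)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- indexed{j.idx, j.val * j.val}
			}
		}()
	}

	// Feed the workers, then close jobs so their range loops end.
	go func() {
		for i, n := range nums {
			jobs <- indexed{i, n}
		}
		close(jobs)
	}()

	// Close results once every worker has finished (fan-in).
	go func() {
		wg.Wait()
		close(results)
	}()

	// Results arrive in completion order; the index puts each one back in place.
	out := make([]int, len(nums))
	for r := range results {
		out[r.idx] = r.val
	}
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}, 3)) // [1 4 9 16]
}
```

Writing into a preallocated slice by index works when the input length is known up front; for an unbounded stream, a reordering buffer keyed by index would be needed instead.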