The Go Reference

Concurrency pattern · Intermediate

Pipeline

Process a stream of data through a series of stages connected by channels, where each stage is a goroutine.


🏭 Analogy

A factory assembly line: one station stamps the part, the next paints it, the next boxes it. Each station does one job and hands the result to the next, all working at the same time on different items. If boxing slows down, the line naturally backs up — nobody piles parts on the floor. That’s a pipeline.

The problem

You have a stream of data and several transformations to apply. Doing them all in one big loop is simple, but the steps run serially and are tangled together. A pipeline gives each stage its own goroutine, connected by channels, so the stages run concurrently and stay decoupled. The channels give you back-pressure for free, and a shared done channel is all it takes to add clean cancellation.

Structure

graph LR
G["generator<br/>goroutine"] -->|chan int| S1["square<br/>goroutine"]
S1 -->|chan int| S2["square<br/>goroutine"]
S2 -->|chan int| M["main<br/>range loop"]
D["done chan"] -.cancels.-> G
D -.cancels.-> S1
D -.cancels.-> S2

Each stage follows the same shape: take an inbound channel, return a new outbound channel, run a goroutine that reads → transforms → sends, and close the output when the input is exhausted or the pipeline is cancelled.

Idiomatic Go

A generator turns values into a stream; each stage reads one channel and writes the next; main ranges over the final channel. The done channel lets everything cancel cleanly.

pipeline.go
package main

import "fmt"

// generator turns a list of ints into a stream.
func generator(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // this stage owns out, so it closes out
		for _, n := range nums {
			select {
			case <-done:
				return
			case out <- n:
			}
		}
	}()
	return out
}

// square is a stage: read ints, emit their squares.
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-done:
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

func main() {
	done := make(chan struct{})
	defer close(done) // cancels every stage on return

	// Compose stages: numbers -> square -> square
	stream := square(done, square(done, generator(done, 1, 2, 3, 4)))

	for n := range stream {
		fmt.Println(n) // 1, 16, 81, 256
	}
}

Generators & take

A more composable style, popularized by Concurrency in Go, uses a repeatFn that produces an infinite stream and a take that pulls just the first N — both honoring done:

done := make(chan any)
defer close(done)

repeat := repeatFn(done, func() any { return rand.Int() }) // infinite
for n := range take(done, repeat, 10) {                    // bounded
	fmt.Println(n)
}

Because every stage selects on <-done, closing done unwinds even the infinite generator — no leaked goroutines.
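The snippet above assumes two helpers, and this is a minimal sketch of them. The names `repeatFn` and `take` follow the snippet, but the bodies are an assumed implementation and are not quoted from the book. `take` receives and sends in two separate selects so that both steps stay cancellable. A counter stands in for `rand.Int` so that the output is predictable.

```go
package main

import "fmt"

// repeatFn calls fn over and over, emitting each result until done closes.
func repeatFn(done <-chan any, fn func() any) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for {
			select {
			case <-done:
				return
			case out <- fn():
			}
		}
	}()
	return out
}

// take forwards at most num values from in, then closes its output.
func take(done <-chan any, in <-chan any, num int) <-chan any {
	out := make(chan any)
	go func() {
		defer close(out)
		for i := 0; i < num; i++ {
			var v any
			select { // receive step: cancellable
			case <-done:
				return
			case x, ok := <-in:
				if !ok {
					return // upstream ended before num values arrived
				}
				v = x
			}
			select { // send step: cancellable
			case <-done:
				return
			case out <- v:
			}
		}
	}()
	return out
}

func main() {
	done := make(chan any)
	defer close(done) // unwinds repeatFn once main is finished

	n := 0
	next := func() any { n++; return n } // deterministic stand-in for rand.Int
	for v := range take(done, repeatFn(done, next), 5) {
		fmt.Println(v) // 1, 2, 3, 4, 5
	}
}
```

Only the `repeatFn` goroutine ever calls `next`, so the captured counter needs no locking.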

🐹 The three rules that make pipelines safe

1. The stage that writes a channel closes it (defer close(out)), never the reader.
2. Every send also selects on <-done (or ctx.Done()), and so does any receive whose sender might never close its channel. That way a cancelled stage can never stay blocked forever.
3. Let channels carry the back-pressure. Unbuffered (or small-buffered) channels keep a fast stage from racing ahead of a slow one, so memory stays bounded without any manual throttling.
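You can check the back-pressure rule empirically. In this sketch, `maxLead` is a hypothetical helper that pairs a fast producer with a deliberately slow consumer and records how far completed sends ever get ahead of receives. The producer counts a send only after it completes, so that lead can never exceed the buffer size: it is 0 for an unbuffered channel. The example omits a done channel because the consumer always drains the stream. It uses `atomic.Int64`, which needs Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// maxLead (hypothetical helper) streams items through a channel with the given
// buffer size to a slow consumer and returns the largest observed gap between
// completed sends and completed receives.
func maxLead(buffer, items int) int64 {
	ch := make(chan int, buffer)
	var sent atomic.Int64 // written by the producer, read by the consumer

	go func() {
		defer close(ch)
		for i := 0; i < items; i++ {
			ch <- i     // blocks whenever the buffer is full
			sent.Add(1) // counted only after the send has completed
		}
	}()

	var received, lead int64
	for range ch {
		received++
		time.Sleep(time.Millisecond) // give the producer time to run ahead
		if d := sent.Load() - received; d > lead {
			lead = d
		}
	}
	return lead
}

func main() {
	for _, buf := range []int{0, 4} {
		fmt.Printf("buffer=%d max lead=%d\n", buf, maxLead(buf, 20))
	}
}
```

The exact lead you observe depends on scheduling, but it is bounded by the buffer size no matter how slow the consumer is.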

From done channel to context

In production you’ll usually swap the done channel for a context.Context — the select pattern is identical, but you also get timeouts, deadlines, and propagation across API boundaries:

func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done(): // timeout, deadline, or cancel — all in one
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

Where it leads

The pipeline is the foundation for the rest of Go’s concurrency toolkit:

Pitfalls

⚠️ Leaks happen when a consumer stops early

If main breaks out of the range loop before the stream is drained, upstream stages can block forever on a send nobody receives — a goroutine leak. The done channel (closed via defer) is what saves you: it signals every stage to abandon its pending send and return. Never start a pipeline stage without a cancellation path.
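This sketch makes that rescue observable. `counter`, `square`, and `firstSquares` are hypothetical names, and the `sync.WaitGroup` exists only for the demonstration. The consumer breaks out early and closes done, and `wg.Wait()` returns only once every stage goroutine has actually exited. If you remove the `close(done)` line, the stages stay stuck on their sends and `wg.Wait()` never returns. In a program this small, the runtime then reports a deadlock.

```go
package main

import (
	"fmt"
	"sync"
)

// counter (hypothetical) emits 1, 2, 3, ... until done is closed.
func counter(done <-chan struct{}, wg *sync.WaitGroup) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()  // runs last: the goroutine has fully exited
		defer close(out) // runs first
		for i := 1; ; i++ {
			select {
			case <-done:
				return
			case out <- i:
			}
		}
	}()
	return out
}

// square mirrors the earlier stage, plus WaitGroup bookkeeping.
func square(done <-chan struct{}, wg *sync.WaitGroup, in <-chan int) <-chan int {
	out := make(chan int)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(out)
		for n := range in {
			select {
			case <-done:
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

// firstSquares stops reading as soon as a square exceeds limit.
func firstSquares(limit int) []int {
	var wg sync.WaitGroup
	done := make(chan struct{})

	var got []int
	for n := range square(done, &wg, counter(done, &wg)) {
		got = append(got, n)
		if n > limit {
			break // upstream stages may still be blocked mid-send here
		}
	}

	close(done) // tell every stage to abandon its pending send
	wg.Wait()   // returns only after all stage goroutines have exited
	return got
}

func main() {
	fmt.Println(firstSquares(20)) // [1 4 9 16 25]
}
```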

When to use it — and when not

✅ Reach for it when

  • You transform a stream in distinct steps (read → parse → filter → write) and want them to run concurrently.
  • You want natural back-pressure and bounded memory, with stages decoupled from each other.
  • You need clean cancellation so partial work shuts down without leaking goroutines.

⛔ Think twice when

  • It's a single, cheap transformation — a plain loop is simpler and faster.
  • Stages are so tiny that channel overhead dominates the actual work.
  • You need strict global ordering after fanning a stage out to many workers.

Check your understanding


1. In a Go pipeline, who closes a stage's output channel?

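Each stage owns its output channel and closes it when it has finished writing. If a receiver closed it instead, the writer's next send would panic with "send on closed channel". When the writer closes it, a downstream range loop simply ends once the remaining values are drained.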
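This minimal sketch shows what goes wrong when ownership is reversed. `sendAfterReaderClose` is a hypothetical name. A single goroutine plays both roles so that the result is deterministic, and `recover` turns the panic into a message you can print.

```go
package main

import "fmt"

// sendAfterReaderClose (hypothetical) closes a channel from the "reader" side,
// then sends on it as the writer would, and reports what happened.
func sendAfterReaderClose() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r) // the runtime's panic message
		}
	}()
	ch := make(chan int, 1)
	close(ch) // the "reader" closes a channel it does not own
	ch <- 1   // the writer's next send panics
	return "no panic"
}

func main() {
	fmt.Println(sendAfterReaderClose()) // send on closed channel
}
```

Receiving from the closed channel would have been harmless. The send is what panics, which is why only the writer should close.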

2. What is the `done` channel for?

Each stage selects on <-done alongside its send/receive. Closing done unblocks them all so they return — preventing goroutines stuck forever on a channel nobody reads.

3. Where does back-pressure come from?

With unbuffered (or small-buffered) channels, a producer can't outrun its consumer — it blocks on send until the consumer receives, keeping memory bounded automatically.

4. What does replacing the done channel with context.Context add?

A done channel signals 'stop.' A context does that (ctx.Done()) plus timeouts (WithTimeout), deadlines, and value propagation, and it threads naturally through function signatures across packages — which is why production pipelines take a ctx.

5. After you fan a stage out to several workers, is the original item order preserved downstream?

Fan-out trades ordering for throughput: results arrive as workers finish, not in input order. If you need the original sequence, carry an index with each item and reassemble, or don't fan that stage out.
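Here is a sketch of the "carry an index and reassemble" fix. The names `indexed`, `emit`, `worker`, `merge`, `reorder`, and `squaresInOrder` are hypothetical. `merge` is a conventional fan-in built on `sync.WaitGroup`. `reorder` holds early arrivals in a map until the next expected index shows up.

```go
package main

import (
	"fmt"
	"sync"
)

// indexed pairs a value with its position in the original input.
type indexed struct{ idx, val int }

// emit (hypothetical) tags each input with its index.
func emit(done <-chan struct{}, nums []int) <-chan indexed {
	out := make(chan indexed)
	go func() {
		defer close(out)
		for i, n := range nums {
			select {
			case <-done:
				return
			case out <- indexed{i, n}:
			}
		}
	}()
	return out
}

// worker squares values; several workers read the same channel (fan-out).
func worker(done <-chan struct{}, in <-chan indexed) <-chan indexed {
	out := make(chan indexed)
	go func() {
		defer close(out)
		for r := range in {
			select {
			case <-done:
				return
			case out <- indexed{r.idx, r.val * r.val}:
			}
		}
	}()
	return out
}

// merge combines the worker outputs into one channel (fan-in).
func merge(done <-chan struct{}, cs ...<-chan indexed) <-chan indexed {
	out := make(chan indexed)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan indexed) {
			defer wg.Done()
			for r := range c {
				select {
				case <-done:
					return
				case out <- r:
				}
			}
		}(c)
	}
	go func() { wg.Wait(); close(out) }() // close once every worker is drained
	return out
}

// reorder holds early arrivals until the next expected index shows up.
func reorder(in <-chan indexed) []int {
	pending := map[int]int{}
	var ordered []int
	next := 0
	for r := range in {
		pending[r.idx] = r.val
		for v, ok := pending[next]; ok; v, ok = pending[next] {
			ordered = append(ordered, v) // release every consecutive result we now have
			delete(pending, next)
			next++
		}
	}
	return ordered
}

func squaresInOrder(nums []int, workers int) []int {
	done := make(chan struct{})
	defer close(done)
	src := emit(done, nums)
	outs := make([]<-chan indexed, workers)
	for i := range outs {
		outs[i] = worker(done, src) // fan-out: all workers share src
	}
	return reorder(merge(done, outs...))
}

func main() {
	fmt.Println(squaresInOrder([]int{1, 2, 3, 4, 5, 6}, 3)) // [1 4 9 16 25 36]
}
```

The map only holds results that arrived ahead of a slower item. However, it can grow large if one early item is much slower than the rest, and that buffering is the price of restoring order.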
