The Go Reference

Concurrency pattern · Intermediate

Fan-out / Fan-in

Distribute work across multiple goroutines (fan-out) and merge their results back into one stream (fan-in).


🛒 Analogy

A supermarket opens extra checkout lanes when the queue grows (fan-out) — shoppers split across cashiers and get through faster. At the end of the day, every lane’s takings are merged into one ledger (fan-in). More lanes, more throughput; the receipts just aren’t in arrival order anymore.

The problem

One stage of your pipeline is the bottleneck — say a CPU-heavy transform. Running it single-file wastes the other cores. Fan-out runs several copies of that stage reading the same input; fan-in merges their outputs back into one channel so the rest of the pipeline is unchanged.

Structure

graph LR
IN["input chan"] --> W1["worker 1"]
IN --> W2["worker 2"]
IN --> W3["worker 3"]
W1 --> FI["fan-in<br/>merge"]
W2 --> FI
W3 --> FI
FI --> OUT["output chan"]

Idiomatic Go

Three workers read the same input channel (fan-out); fanIn merges their outputs, using a WaitGroup to close the merged channel only after every worker's output has drained.

fan_out_in.go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// gen emits nums on a channel and closes it when done.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// a stage that does some "expensive" work
func worker(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// fanIn merges several channels into one.
func fanIn(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() { wg.Wait(); close(out) }() // close once all sources drain
	return out
}

func main() {
	in := gen(1, 2, 3, 4, 5, 6, 7, 8)

	// fan-out: three workers share one input channel
	w1, w2, w3 := worker(in), worker(in), worker(in)

	// fan-in: merge their results
	var results []int
	for v := range fanIn(w1, w2, w3) {
		results = append(results, v)
	}

	sort.Ints(results) // order is not guaranteed with fan-out
	fmt.Println(results) // [1 4 9 16 25 36 49 64]
}

🐹 Two things that make it correct

  • Many readers, one channel: several goroutines can safely receive from the same channel, and Go hands each value to exactly one of them, so no item is processed twice.
  • Close after all: the merged channel must be closed only once every forwarding goroutine in fanIn has finished, which is exactly what the wg.Wait()-then-close goroutine guarantees. Closing it any earlier would make a forwarder that is still sending panic with "send on closed channel".
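A quick way to convince yourself of the first point: in the sketch below, several goroutines range over one channel and record how often each value arrives. The function name deliveries and the sizes (1000 values, 4 receivers) are arbitrary choices for illustration. Every value should be seen exactly once.

```go
package main

import (
	"fmt"
	"sync"
)

// deliveries sends n values on one channel that `receivers` goroutines all
// range over. It returns how many distinct values arrived and how many of
// them were received more than once (expected: n and 0).
func deliveries(n, receivers int) (distinct, dupes int) {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()

	var mu sync.Mutex
	seen := make(map[int]int) // value -> number of receivers that got it
	var wg sync.WaitGroup
	for w := 0; w < receivers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in { // all receivers share the same channel
				mu.Lock()
				seen[v]++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	for _, count := range seen {
		if count != 1 {
			dupes++
		}
	}
	return len(seen), dupes
}

func main() {
	distinct, dupes := deliveries(1000, 4)
	fmt.Println(distinct, dupes) // 1000 0
}
```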

Restoring order

Fan-out trades ordering for throughput. When you need the original sequence back, carry an index and reassemble — the work still runs in parallel:

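type item struct {
	idx int // position in the original input
	val int
}

// Tag each input with its index before fan-out; workers must copy idx
// unchanged into every result they emit.
// Assumes fanIn works on chan item (e.g. a generic fanIn[T any]),
// workers is a []<-chan item, and n is the number of inputs.
results := make([]int, n)
for r := range fanIn(workers...) { // r is an item
	results[r.idx] = r.val // slot each result into its original position
}
// results is now in input order, no sort needed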

Fan-out vs Worker Pool

They’re close cousins. Fan-out spins up a goroutine per “copy” of a stage; a Worker Pool keeps a fixed number of long-lived workers pulling from a queue. Use fan-out to parallelize a pipeline stage; use a pool when you need to bound how many run at once (see also Semaphore and errgroup.SetLimit).

Pitfalls

⚠️ Don't fan out unbounded over a huge input

Three workers is deliberate. Fanning out one-goroutine-per-item over a million inputs can exhaust memory or thrash the scheduler. When the input is large or the work hits a limited resource (DB, API), bound the parallelism with a worker pool or a semaphore.

When to use it — and when not

✅ Reach for it when

  • A pipeline stage is slow or CPU-bound and can be parallelized across cores.
  • Work items are independent and order doesn't matter (or can be restored).
  • You want to scale throughput by adding more workers to one stage.

⛔ Think twice when

  • Strict ordering is required and you can't re-sort afterward.
  • The work is tiny — channel and goroutine overhead outweighs the gain.
  • Work is I/O-bound with a resource cap — a bounded worker pool or semaphore fits better.

Check your understanding


1. What is 'fan-out'?

Multiple goroutines can safely receive from one channel; each grabs the next item, spreading the work — that's fan-out.

2. Why does fan-in need a sync.WaitGroup?

Each source feeds the merged channel in its own goroutine; a WaitGroup lets a closer goroutine wait for all of them before closing out, so the consumer's range ends cleanly.

3. What property do you lose when you fan a stage out?

Parallel workers finish at different times, so output order is nondeterministic. If you need order, attach an index and sort, or don't fan out.

4. You need the results in input order after fanning out. What's the fix?

Tag each item with its original index, fan out, then either sort the collected results by index or write each into results[i]. The parallelism stays; ordering is reconstructed at the merge.

5. Why merge sources with a WaitGroup-of-goroutines rather than a single select over all channels?

select with N cases is fine for a small, fixed N, but doesn't scale to a variable number of sources. The standard fan-in spawns one forwarding goroutine per source and a wg.Wait()-then-close, which works for any N and terminates cleanly.
