{} The Go Reference

Concurrency pattern · Intermediate

Pub/Sub

Broadcast events to many dynamic subscribers through a broker, decoupling producers from consumers.

Also known as — Publish–Subscribe, Event Bus

Concurrency Intermediate ⏱ 3 min read Complete

📻 Analogy

A radio station broadcasts on a frequency. Anyone can tune in or switch off at any time, and the station has no idea who’s listening — it just transmits. Subscribers come and go freely; the publisher is blissfully unaware. That’s Pub/Sub.

The problem

Several parts of your program each need to react to the same events, and which parts are listening changes at runtime. Wiring producers directly to consumers couples them tightly. A broker sits in the middle: producers Publish, consumers Subscribe to get their own channel, and the broker fans each event out to all current subscribers.

Structure

graph LR
P["publisher"] -->|Publish| B["Broker<br/>map of subscriber chans"]
B --> S1["subscriber 1"]
B --> S2["subscriber 2"]
B --> S3["subscriber 3"]

Idiomatic Go

A broker is a mutex-guarded map of subscriber channels. Publish does a non-blocking send to each, so a slow subscriber can’t stall the rest. Edit and Run:

pubsub.go — editable & runnable
package main

import (
"fmt"
"sync"
)

type Broker struct {
mu   sync.RWMutex
subs map[chan string]struct{}
}

func NewBroker() *Broker { return &Broker{subs: make(map[chan string]struct{})} }

func (b *Broker) Subscribe() chan string {
ch := make(chan string, 4) // small buffer absorbs bursts
b.mu.Lock()
b.subs[ch] = struct{}{}
b.mu.Unlock()
return ch
}

func (b *Broker) Publish(msg string) {
b.mu.RLock()
defer b.mu.RUnlock()
for ch := range b.subs {
	select {
	case ch <- msg: // deliver
	default:        // subscriber is slow — drop rather than block everyone
	}
}
}

func (b *Broker) closeAll() {
b.mu.Lock()
defer b.mu.Unlock()
for ch := range b.subs {
	delete(b.subs, ch)
	close(ch) // ends each subscriber's range loop
}
}

func main() {
b := NewBroker()
var wg sync.WaitGroup

for i := 1; i <= 3; i++ {
	ch := b.Subscribe()
	wg.Add(1)
	go func(id int, ch chan string) {
		defer wg.Done()
		for msg := range ch {
			fmt.Printf("subscriber %d got %q\n", id, msg)
		}
	}(i, ch)
}

b.Publish("hello")
b.Publish("world")

b.closeAll() // unsubscribe everyone and stop their goroutines
wg.Wait()
}

🐹 The concurrent cousin of Observer

Observer notifies dependents in-process, often synchronously. Pub/Sub generalizes it: a broker decouples publishers from a dynamic set of subscribers, each running in its own goroutine with its own channel. The two key choices are back-pressure (drop, block, or buffer for slow subscribers) and lifecycle (closing channels on unsubscribe so goroutines exit). For cross-process or durable delivery, swap this in-process broker for NATS, Kafka, or Redis.

In-process bus vs a real broker

The channel-based broker here is perfect for in-process events — cache invalidation, UI updates, decoupling packages. Know its limits, and when to graduate:

In-process (this page)Real broker (NATS, Kafka, Redis)
Scopeone processacross processes/machines
Deliveryat-most-once (lossy on drop)at-least-once, persistence, replay
Orderingper-subscriber, best-effortper-partition/topic guarantees
Costa struct + channelsinfra to run and operate

If you need durability, cross-service fan-out, or replay, reach for a message broker (see the APIs & messaging track) rather than scaling the in-process bus. And it’s the concurrent cousin of Observer: same intent (broadcast change), with a broker decoupling the two ends.

Pitfalls

⚠️ Slow subscribers force a hard choice

With the non-blocking default above, a subscriber that can’t keep up simply loses messages. The alternatives — block the publisher, or grow an unbounded buffer — trade liveness for memory. There’s no free lunch: decide explicitly whether your system prefers to drop, slow down, or buffer, and document it. Also remember to unsubscribe, or dead subscribers leak goroutines and channels forever.

When to use it — and when not

✅ Reach for it when

  • Several independent consumers each need every event, and they come and go at runtime.
  • You want producers and consumers fully decoupled — neither knows the other.
  • An in-process event bus fits (notifications, cache invalidation, UI updates).

⛔ Think twice when

  • There's a single consumer — a plain channel is enough.
  • You need delivery guarantees, persistence, or cross-process fan-out — use a real broker (NATS, Kafka, Redis).
  • Strict global ordering across subscribers is required.

Check your understanding

Score: 0 / 5

1. How does Pub/Sub differ from Observer?

Observer is the in-object, usually synchronous form; Pub/Sub adds a broker between producers and a dynamic set of subscribers, and in Go is naturally built on channels and goroutines.

2. Why does the broker use select-with-default (or buffered channels) when publishing?

A blocking send to a full subscriber would stall the whole Publish loop. A non-blocking send (drop, or rely on a buffer) keeps a slow consumer from freezing everyone.

3. What ends a subscriber's goroutine cleanly?

Closing the subscriber's channel (on unsubscribe/shutdown) terminates its range loop, letting the goroutine exit without leaking.

4. With the non-blocking 'drop on full' publish, what delivery guarantee do you get?

Dropping to avoid blocking trades reliability for liveness: a subscriber that can't keep up loses messages. That's fine for cache invalidation or UI nudges; if you need durability or at-least-once, you need a real broker, not an in-process bus.

5. The broker's subscribers map is touched by Publish, Subscribe, and Unsubscribe from many goroutines. What's required?

Go maps are not safe for concurrent read/write. The broker must protect its subscriber registry with a mutex (RWMutex if reads dominate) or own it in a single goroutine that all operations message — otherwise the race detector lights up and the map can corrupt.

Comments

Sign in with GitHub to join the discussion.