Concurrency Model
The Kafka runtime uses a goroutine-per-partition architecture that provides automatic parallelism and partition isolation.
Architecture Overview
Each Kafka partition is processed in its own goroutine:
Topic "orders" (3 partitions):
┌─────────────┐
│ Partition 0 │──────> Goroutine 1 ──> Processor
├─────────────┤
│ Partition 1 │──────> Goroutine 2 ──> Processor
├─────────────┤
│ Partition 2 │──────> Goroutine 3 ──> Processor
└─────────────┘
Key characteristics:
- One goroutine per assigned partition
- Partitions processed independently
- No locks needed between partitions
- Scales with partition count
Consumer Group Coordination
The runtime coordinates with other consumers in the group:
Consumer Group "order-processors" with 3 instances:
Instance 1           Instance 2           Instance 3
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Partition 0  │     │ Partition 1  │     │ Partition 2  │
│ Partition 3  │     │ Partition 4  │     │ Partition 5  │
└──────────────┘     └──────────────┘     └──────────────┘
Guarantees:
- Each partition assigned to exactly one consumer
- Automatic rebalancing when consumers join/leave
- Partition reassignment is coordinated gracefully
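For example, every instance of the service constructs its runtime with the same group ID; the coordinator then splits the topic's partitions across whichever instances are alive. The sketch below mirrors the NewRuntime call shown later on this page; the processor value is defined by your application:
// Started on every instance: because the group ID is shared, Kafka assigns
// each partition of "orders" to exactly one of the running instances.
runtime := kafka.NewRuntime(
    brokers,                                // broker addresses, as in the examples below
    "order-processors",                     // same consumer group ID on every instance
    kafka.AtLeastOnce("orders", processor), // processor defined by your app
)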
Partition Lifecycle
Assignment
When a partition is assigned to this consumer:
- Receive assignment from consumer group coordinator
- Spawn goroutine for the partition
- Start processing messages from the partition
- Continue until partition is revoked or app shuts down
Code flow:
// Internal runtime behavior (conceptual)
func (r *Runtime) onPartitionAssigned(topic string, partition int32) {
    ctx, cancel := context.WithCancel(r.ctx)
    // Track the goroutine so revocation can wait for it to finish
    r.wait.Add(1)
    // Spawn goroutine for this partition
    go func() {
        defer r.wait.Done()
        r.processPartition(ctx, topic, partition)
    }()
    // Store cancel function for revocation
    r.partitionCancels[topicPartition{topic, partition}] = cancel
}
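The spawned goroutine runs a per-partition loop roughly like the following. This is a conceptual sketch, not the actual implementation; the fetch and commit helpers are assumed names rather than framework APIs:
// Conceptual per-partition loop: fetch, process, commit, repeat.
func (r *Runtime) processPartition(ctx context.Context, topic string, partition int32) {
    for {
        // Stop when the partition is revoked or the runtime shuts down
        select {
        case <-ctx.Done():
            return
        default:
        }
        msg, err := r.fetch(ctx, topic, partition) // assumed helper
        if err != nil {
            return
        }
        if err := r.processor.Process(ctx, msg); err != nil {
            return // stop this partition; the offset is not committed
        }
        r.commit(ctx, msg) // assumed helper (commit after processing = at-least-once)
    }
}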
Revocation
When a partition is revoked (rebalance):
- Receive revocation signal from coordinator
- Cancel context for partition’s goroutine
- Wait for goroutine to finish current message
- Complete gracefully before rebalance proceeds
Code flow:
// Internal runtime behavior (conceptual)
func (r *Runtime) onPartitionRevoked(topic string, partition int32) {
    // Cancel the goroutine's context and drop its cancel func
    tp := topicPartition{topic, partition}
    cancel := r.partitionCancels[tp]
    cancel()
    delete(r.partitionCancels, tp)
    // Wait for the partition goroutine(s) to finish their current message
    r.wait.Wait()
    // Partition now free for reassignment
}
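Waiting for the in-flight message before returning is what makes the rebalance safe: the coordinator hands the partition to another consumer only after this instance has stopped touching it, so the "exactly one consumer per partition" guarantee holds even during reassignment.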
Concurrency Guarantees
Within a Partition
Messages in a partition are processed serially (one at a time):
// For Partition 0:
Message 1 ──> Process ──> Complete
Message 2 ──> Process ──> Complete
Message 3 ──> Process ──> Complete
Guarantees:
- Messages processed in order
- No concurrent processing within partition
- Simple reasoning about state
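In practice this means a processor can keep per-partition state (counters, last-seen timestamps, small caches) without any locking; see the Partition-Local State pattern below.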
Across Partitions
Messages in different partitions are processed concurrently:
// Concurrent processing:
Partition 0: Message 1 ──> Process ──┐
Partition 1: Message 1 ──> Process ──┼──> All concurrent
Partition 2: Message 1 ──> Process ──┘
Implications:
- No ordering guarantees across partitions
- Shared state needs synchronization
- Independent failure isolation
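Conversely, any state shared by processors across partitions must be synchronized; the Shared State Patterns section below covers the common options.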
Scaling Patterns
Vertical Scaling (More Partitions)
Increase partition count to enable more parallelism:
# Create topic with 12 partitions
kafka-topics.sh --create \
  --topic orders \
  --partitions 12 \
  --bootstrap-server localhost:9092
One consumer instance can process 12 partitions concurrently:
Consumer Instance 1 (12 goroutines):
Partition 0 ──> Goroutine 1
Partition 1 ──> Goroutine 2
...
Partition 11 ──> Goroutine 12
Limits:
- Maximum parallelism = number of partitions
- Consumers beyond the partition count sit idle
Horizontal Scaling (More Consumers)
Add consumer instances to distribute partitions:
3 consumers, 12 partitions (4 partitions each):
Consumer 1:      Consumer 2:      Consumer 3:
Partition 0      Partition 4      Partition 8
Partition 1      Partition 5      Partition 9
Partition 2      Partition 6      Partition 10
Partition 3      Partition 7      Partition 11
Recommendations:
- Start with partitions = 2-4× expected consumers
- Allows room for scaling
- Example: 12 partitions supports 1-12 consumers
Shared State Patterns
When processors need shared state:
Thread-Safe Data Structures
Use concurrent-safe types:
import "sync"
type Processor struct {
    mu    sync.RWMutex
    cache map[string]string
}
func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Read from cache
    p.mu.RLock()
    value := p.cache[string(msg.Key)]
    p.mu.RUnlock()
    // Process message
    result := p.compute(value)
    // Write to cache
    p.mu.Lock()
    p.cache[string(msg.Key)] = result
    p.mu.Unlock()
    return nil
}
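Note that compute runs outside the lock, so two partitions handling the same key can race between the read and the write, and the later write wins. For a cache that only costs a redundant computation; if the update must be atomic, hold the write lock across the whole read-modify-write or use the atomic pattern below.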
Partition-Local State
Maintain separate state per partition:
type Processor struct {
    // Map from partition to its state; the mutex only guards the map itself
    partitionState map[int32]*PartitionState
    mu             sync.RWMutex
}
type PartitionState struct {
    // No locks needed - only accessed by one goroutine
    counter  int
    lastSeen time.Time
}
func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Get (or lazily create) the state for this partition
    p.mu.RLock()
    state, ok := p.partitionState[msg.Partition]
    p.mu.RUnlock()
    if !ok {
        state = &PartitionState{}
        p.mu.Lock()
        p.partitionState[msg.Partition] = state
        p.mu.Unlock()
    }
    // Update state without locks - only this partition's goroutine touches it
    state.counter++
    state.lastSeen = time.Now()
    return nil
}
Atomic Operations
Use atomic types for counters:
import "sync/atomic"
type Processor struct {
    messagesProcessed atomic.Int64
}
func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Atomic increment - safe across goroutines
    p.messagesProcessed.Add(1)
    return nil
}
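The counter can then be read safely from any goroutine, for example when exporting metrics (an illustrative accessor, not part of the framework):
// Load pairs safely with the Add calls made from the partition goroutines.
func (p *Processor) MessagesProcessed() int64 {
    return p.messagesProcessed.Load()
}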
Rebalancing
Triggering Rebalances
Rebalances occur when:
- Consumer joins the group (new instance starts)
- Consumer leaves the group (instance stops/crashes)
- Topic partitions are added
- Session timeout expires (consumer considered dead)
Rebalance Process
1. Coordinator initiates rebalance
        ↓
2. All consumers stop fetching
        ↓
3. Current partitions revoked
   ├─> Cancel partition goroutines
   └─> Wait for completion
        ↓
4. New partitions assigned
   ├─> Spawn new goroutines
   └─> Resume processing
        ↓
5. Normal processing resumes
Minimizing Rebalance Impact
Fast Rebalancing:
// Use shorter timeouts for faster rebalancing
runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", processor),
    kafka.SessionTimeout(10*time.Second),
    kafka.RebalanceTimeout(20*time.Second),
)
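Shorter timeouts detect dead consumers sooner and keep rebalances brief, but they also make the group more likely to evict a healthy consumer that stalls briefly (a long GC pause, a network blip, one slow message), so tune them against your typical processing time.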
Graceful Shutdown:
// Ensure clean shutdown to avoid forced rebalancing
// The framework handles this automatically via context cancellation
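One common wiring, assuming the runtime exposes a blocking Run(ctx) entry point (the method name here is an assumption; use the framework's actual entry point), is to derive the context from OS signals:
// Cancel the context on SIGINT/SIGTERM so partitions are released cleanly
// instead of timing out of the consumer group.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
if err := runtime.Run(ctx); err != nil { // Run is an assumed method name
    log.Fatal(err)
}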
Performance Considerations
Partition Count
Too few partitions:
- Limited parallelism
- A slow message delays a larger share of traffic queued behind it in the same partition
- Difficult to scale horizontally
Too many partitions:
- Higher overhead per partition
- More goroutines
- More memory usage
Recommendation:
- Start with 2-4× expected consumer count
- Example: 3 consumers → 12 partitions
- Allows scaling to 12 consumers without repartitioning
Message Distribution
Even distribution across partitions:
// Key on a high-cardinality field; the producer's partitioner hashes the key,
// spreading customers across partitions while keeping each customer's messages in order
key := fmt.Sprintf("customer-%d", customerID)
Avoid hot partitions:
- Don’t route all traffic to one partition
- Use random or round-robin for keyless messages (see the sketch after this list)
- Monitor partition lag for imbalance
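If messages have no natural key, let the producer spread them. As one illustration, the segmentio/kafka-go client (shown here only as an example producer, not necessarily the one paired with this framework) can use a round-robin balancer for keyless messages; ctx and payload are assumed to be in scope:
import kafkago "github.com/segmentio/kafka-go"
w := &kafkago.Writer{
    Addr:     kafkago.TCP("localhost:9092"),
    Topic:    "orders",
    Balancer: &kafkago.RoundRobin{}, // spread keyless messages across partitions
}
// No Key set, so the balancer (not a key hash) picks the partition
err := w.WriteMessages(ctx, kafkago.Message{Value: payload})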
Processing Time
Fast processors:
- Can handle more partitions per consumer
- Higher throughput
- Lower latency
Slow processors:
- Limit partitions per consumer
- Scale horizontally
- Consider async I/O
Monitoring Concurrency
Consumer Lag per Partition
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-group \
  --describe
Output shows lag per partition:
TOPIC    PARTITION    CURRENT-OFFSET    LOG-END-OFFSET    LAG
orders   0            1000              1000              0
orders   1            950               1020              70    # Lagging!
orders   2            1000              1000              0
Partition 1 is lagging here, which usually points to a hot partition or a slow processor on that partition.
Goroutine Count
Monitor active goroutines:
import "runtime"
func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Periodically log goroutine count
    if msg.Offset%1000 == 0 {
        log.Info("Runtime stats",
            "goroutines", runtime.NumGoroutine(),
            "partition", msg.Partition,
        )
    }
    return nil
}
Error Handling
Partition-Level Errors
Errors in one partition don’t affect others:
Partition 0: Processing ✓
Partition 1: Error! ← Partition 1 stops
Partition 2: Processing ✓ ← Continues normally
At-least-once behavior:
- Failed partition stops processing
- Offset not committed
- Messages will be redelivered after rebalance
At-most-once behavior:
- Failed partition stops processing
- Offset already committed
- Messages lost
Context Cancellation
All partition goroutines respect context cancellation:
func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Check context before expensive operation
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }
    // Process message
    return p.doWork(ctx, msg)
}
This ensures clean shutdown and rebalancing.
Next Steps
- Learn Idempotency patterns for at-least-once processing
- Explore Multi-Topic Processing patterns
- Configure Production Settings for optimal performance