Multi-Topic Processing

Processing multiple Kafka topics in one runtime

The Kafka runtime supports consuming and processing multiple topics simultaneously, each with its own processor and delivery semantics.

Basic Multi-Topic Configuration

Configure multiple topics in a single runtime:

runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", ordersProcessor),
    kafka.AtLeastOnce("payments", paymentsProcessor),
    kafka.AtMostOnce("metrics", metricsProcessor),
)

Key features:

  • Each topic has its own processor
  • Different delivery semantics per topic
  • All topics share the same consumer group
  • Partitions from all topics processed concurrently

Processor per Topic

Define separate processors for each topic:

type OrdersProcessor struct {
    db *sql.DB
}

func (p *OrdersProcessor) Process(ctx context.Context, msg kafka.Message) error {
    var order Order
    if err := json.Unmarshal(msg.Value, &order); err != nil {
        return err
    }
    return p.processOrder(ctx, order)
}

type PaymentsProcessor struct {
    db *sql.DB
}

func (p *PaymentsProcessor) Process(ctx context.Context, msg kafka.Message) error {
    var payment Payment
    if err := json.Unmarshal(msg.Value, &payment); err != nil {
        return err
    }
    return p.processPayment(ctx, payment)
}

func Init(ctx context.Context, cfg Config) (*queue.App, error) {
    ordersProc := &OrdersProcessor{db: cfg.DB}
    paymentsProc := &PaymentsProcessor{db: cfg.DB}

    runtime := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        cfg.Kafka.GroupID,
        kafka.AtLeastOnce("orders", ordersProc),
        kafka.AtLeastOnce("payments", paymentsProc),
    )

    return queue.NewApp(runtime), nil
}

Shared State Between Topics

Processors can share state:

type SharedProcessor struct {
    mu    sync.RWMutex
    cache map[string]string
    db    *sql.DB
}

type OrdersProcessor struct {
    *SharedProcessor
}

func (p *OrdersProcessor) Process(ctx context.Context, msg kafka.Message) error {
    var order Order
    json.Unmarshal(msg.Value, &order)

    // Access shared cache
    p.mu.RLock()
    customerData := p.cache[order.CustomerID]
    p.mu.RUnlock()

    return p.processOrder(ctx, order, customerData)
}

type PaymentsProcessor struct {
    *SharedProcessor
}

func (p *PaymentsProcessor) Process(ctx context.Context, msg kafka.Message) error {
    var payment Payment
    json.Unmarshal(msg.Value, &payment)

    // Update shared cache
    p.mu.Lock()
    p.cache[payment.CustomerID] = payment.Status
    p.mu.Unlock()

    return p.processPayment(ctx, payment)
}

func Init(ctx context.Context, cfg Config) (*queue.App, error) {
    shared := &SharedProcessor{
        cache: make(map[string]string),
        db:    cfg.DB,
    }

    runtime := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        cfg.Kafka.GroupID,
        kafka.AtLeastOnce("orders", &OrdersProcessor{shared}),
        kafka.AtLeastOnce("payments", &PaymentsProcessor{shared}),
    )

    return queue.NewApp(runtime), nil
}

Topic-Based Routing

Route messages by topic to a unified handler:

type UnifiedProcessor struct {
    db *sql.DB
}

func (p *UnifiedProcessor) Process(ctx context.Context, msg kafka.Message) error {
    switch msg.Topic {
    case "orders":
        return p.processOrder(ctx, msg)
    case "payments":
        return p.processPayment(ctx, msg)
    case "shipments":
        return p.processShipment(ctx, msg)
    default:
        return fmt.Errorf("unknown topic: %s", msg.Topic)
    }
}

func (p *UnifiedProcessor) processOrder(ctx context.Context, msg kafka.Message) error {
    var order Order
    if err := json.Unmarshal(msg.Value, &order); err != nil {
        return err
    }
    // Process order
    return nil
}

func (p *UnifiedProcessor) processPayment(ctx context.Context, msg kafka.Message) error {
    var payment Payment
    if err := json.Unmarshal(msg.Value, &payment); err != nil {
        return err
    }
    // Process payment
    return nil
}

func Init(ctx context.Context, cfg Config) (*queue.App, error) {
    processor := &UnifiedProcessor{db: cfg.DB}

    runtime := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        cfg.Kafka.GroupID,
        kafka.AtLeastOnce("orders", processor),
        kafka.AtLeastOnce("payments", processor),
        kafka.AtLeastOnce("shipments", processor),
    )

    return queue.NewApp(runtime), nil
}

Mixed Delivery Semantics

Use different semantics for different topics:

runtime := kafka.NewRuntime(
    brokers,
    groupID,
    // Critical topics: at-least-once
    kafka.AtLeastOnce("orders", ordersProcessor),
    kafka.AtLeastOnce("payments", paymentsProcessor),
    kafka.AtLeastOnce("inventory", inventoryProcessor),

    // Non-critical topics: at-most-once
    kafka.AtMostOnce("metrics", metricsProcessor),
    kafka.AtMostOnce("logs", logsProcessor),
    kafka.AtMostOnce("analytics", analyticsProcessor),
)

Rationale:

  • Orders, payments, inventory: Cannot lose data → at-least-once
  • Metrics, logs, analytics: Can tolerate loss → at-most-once

Configuration

YAML Configuration

kafka:
  brokers:
    - "localhost:9092"
  group_id: "multi-topic-processor"
  topics:
    orders:
      semantic: "at-least-once"
    payments:
      semantic: "at-least-once"
    metrics:
      semantic: "at-most-once"

Dynamic Topic Configuration

type TopicConfig struct {
    Name     string
    Semantic string
}

type Config struct {
    queue.Config `config:",squash"`
    Kafka struct {
        Brokers []string      `config:"brokers"`
        GroupID string        `config:"group_id"`
        Topics  []TopicConfig `config:"topics"`
    } `config:"kafka"`
}

func Init(ctx context.Context, cfg Config) (*queue.App, error) {
    opts := make([]kafka.Option, 0, len(cfg.Kafka.Topics))

    for _, topic := range cfg.Kafka.Topics {
        switch topic.Semantic {
        case "at-least-once":
            processor := newProcessor(topic.Name)
            opts = append(opts, kafka.AtLeastOnce(topic.Name, processor))
        case "at-most-once":
            processor := newProcessor(topic.Name)
            opts = append(opts, kafka.AtMostOnce(topic.Name, processor))
        }
    }

    runtime := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        cfg.Kafka.GroupID,
        opts...,
    )

    return queue.NewApp(runtime), nil
}

Workflow Patterns

Sequential Processing

Process related messages across topics:

type WorkflowProcessor struct {
    db *sql.DB
}

// Orders topic
func (p *WorkflowProcessor) ProcessOrder(ctx context.Context, msg kafka.Message) error {
    var order Order
    json.Unmarshal(msg.Value, &order)

    // 1. Save order
    if err := p.saveOrder(ctx, order); err != nil {
        return err
    }

    // 2. Check payment (from payments topic)
    // This will be processed by ProcessPayment when payment arrives
    return nil
}

// Payments topic
func (p *WorkflowProcessor) ProcessPayment(ctx context.Context, msg kafka.Message) error {
    var payment Payment
    json.Unmarshal(msg.Value, &payment)

    // 1. Save payment
    if err := p.savePayment(ctx, payment); err != nil {
        return err
    }

    // 2. Update order status
    return p.updateOrderStatus(ctx, payment.OrderID, "paid")
}

Event Aggregation

Aggregate events from multiple topics:

type AggregationProcessor struct {
    mu         sync.RWMutex
    aggregates map[string]*Aggregate
}

type Aggregate struct {
    OrderReceived  bool
    PaymentReceived bool
    ShipmentReceived bool
}

func (p *AggregationProcessor) ProcessOrder(ctx context.Context, msg kafka.Message) error {
    var order Order
    json.Unmarshal(msg.Value, &order)

    p.mu.Lock()
    defer p.mu.Unlock()

    agg := p.getAggregate(order.OrderID)
    agg.OrderReceived = true

    if p.isComplete(agg) {
        return p.finalizeOrder(ctx, order.OrderID)
    }

    return nil
}

func (p *AggregationProcessor) ProcessPayment(ctx context.Context, msg kafka.Message) error {
    var payment Payment
    json.Unmarshal(msg.Value, &payment)

    p.mu.Lock()
    defer p.mu.Unlock()

    agg := p.getAggregate(payment.OrderID)
    agg.PaymentReceived = true

    if p.isComplete(agg) {
        return p.finalizeOrder(ctx, payment.OrderID)
    }

    return nil
}

func (p *AggregationProcessor) isComplete(agg *Aggregate) bool {
    return agg.OrderReceived && agg.PaymentReceived && agg.ShipmentReceived
}

Topic Fan-In

Multiple topics feed into one aggregator:

type EventAggregator struct {
    db *sql.DB
}

func (p *EventAggregator) Process(ctx context.Context, msg kafka.Message) error {
    // Common event envelope
    type Event struct {
        Type      string          `json:"type"`
        Timestamp time.Time       `json:"timestamp"`
        Data      json.RawMessage `json:"data"`
    }

    var event Event
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return err
    }

    // Store in unified events table
    _, err := p.db.ExecContext(ctx,
        `INSERT INTO events (topic, type, timestamp, data)
         VALUES ($1, $2, $3, $4)`,
        msg.Topic, event.Type, event.Timestamp, event.Data,
    )

    return err
}

func Init(ctx context.Context, cfg Config) (*queue.App, error) {
    aggregator := &EventAggregator{db: cfg.DB}

    runtime := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        cfg.Kafka.GroupID,
        // All topics use the same aggregator
        kafka.AtLeastOnce("user-events", aggregator),
        kafka.AtLeastOnce("order-events", aggregator),
        kafka.AtLeastOnce("payment-events", aggregator),
    )

    return queue.NewApp(runtime), nil
}

Partition Distribution

How Partitions Are Assigned

With multiple topics, partitions from all topics are distributed:

Consumer Group "processors" with 2 consumers:

Consumer 1:                   Consumer 2:
  orders (partition 0)          orders (partition 1)
  orders (partition 2)          payments (partition 0)
  payments (partition 1)        payments (partition 2)

Key points:

  • Partitions assigned across all topics
  • Load balanced across consumers
  • No guarantee which consumer gets which topic

Topic Affinity

To ensure a consumer processes specific topics, use separate consumer groups:

// Separate runtimes for different topics
func Init(ctx context.Context, cfg Config) ([]*queue.App, error) {
    // Runtime 1: Critical topics
    runtime1 := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        "critical-processor",  // Different group ID
        kafka.AtLeastOnce("orders", ordersProcessor),
        kafka.AtLeastOnce("payments", paymentsProcessor),
    )

    // Runtime 2: Analytics topics
    runtime2 := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        "analytics-processor",  // Different group ID
        kafka.AtMostOnce("metrics", metricsProcessor),
        kafka.AtMostOnce("logs", logsProcessor),
    )

    return []*queue.App{
        queue.NewApp(runtime1),
        queue.NewApp(runtime2),
    }, nil
}

Note: This requires running multiple apps, which is outside the standard pattern. Consider separate deployments instead.

Monitoring Multi-Topic Processing

Topic-Level Metrics

Monitor lag per topic:

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group multi-topic-processor \
  --describe

Output:

TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
orders     0          1000            1000            0
orders     1          1050            1050            0
payments   0          500             520             20   # Lagging
payments   1          510             510             0
metrics    0          5000            5000            0

Processor-Level Logging

Log which processor handles each message:

func (p *OrdersProcessor) Process(ctx context.Context, msg kafka.Message) error {
    log.Info("Processing order",
        "topic", msg.Topic,
        "partition", msg.Partition,
        "offset", msg.Offset,
        "processor", "orders",
    )

    return nil
}

Best Practices

Separate Concerns

Use different processors for different business domains:

// Good: Separate processors
kafka.AtLeastOnce("orders", ordersProcessor)
kafka.AtLeastOnce("payments", paymentsProcessor)

// Avoid: One processor for everything
kafka.AtLeastOnce("orders", genericProcessor)
kafka.AtLeastOnce("payments", genericProcessor)

Match Semantics to Criticality

// Critical: At-least-once
kafka.AtLeastOnce("financial-transactions", processor)

// Non-critical: At-most-once
kafka.AtMostOnce("usage-metrics", processor)

Shared State Needs Locks

type Processor struct {
    mu    sync.RWMutex
    state map[string]int
}

// Safe: Uses locks
func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    p.mu.Lock()
    p.state[string(msg.Key)]++
    p.mu.Unlock()
    return nil
}

Independent Topic Processing

Avoid dependencies between topics when possible:

// Good: Independent processing
func (p *OrdersProcessor) Process(ctx context.Context, msg kafka.Message) error {
    // Only processes orders, no dependencies
    return p.saveOrder(ctx, msg)
}

// Avoid: Cross-topic dependencies
func (p *OrdersProcessor) Process(ctx context.Context, msg kafka.Message) error {
    // Waiting for payment from another topic creates coupling
    payment := p.waitForPayment(msg.OrderID)  // Anti-pattern
    return p.saveOrder(ctx, msg, payment)
}

Next Steps