Production Configuration

Deploying Kafka processors in production

This guide covers best practices for deploying and configuring Kafka processors in production environments.

Complete Configuration Example

A production-ready configuration:

kafka:
  brokers:
    - "{{env "KAFKA_BROKER_1" | default "kafka-1:9092"}}"
    - "{{env "KAFKA_BROKER_2" | default "kafka-2:9092"}}"
    - "{{env "KAFKA_BROKER_3" | default "kafka-3:9092"}}"
  group_id: "{{env "KAFKA_GROUP_ID" | default "order-processors"}}"
  topics:
    - name: "orders"
      semantic: "at-least-once"
    - name: "payments"
      semantic: "at-least-once"
  session_timeout: "{{env "KAFKA_SESSION_TIMEOUT" | default "30s"}}"
  rebalance_timeout: "{{env "KAFKA_REBALANCE_TIMEOUT" | default "45s"}}"
  fetch_max_bytes: {{env "KAFKA_FETCH_MAX_BYTES" | default "52428800"}}
  max_concurrent_fetches: {{env "KAFKA_MAX_CONCURRENT_FETCHES" | default "10"}}

otel:
  service:
    name: "{{env "SERVICE_NAME" | default "order-processor"}}"
    version: "{{env "SERVICE_VERSION" | default "1.0.0"}}"
    environment: "{{env "ENVIRONMENT" | default "production"}}"
  sdk:
    disabled: false
  exporter:
    otlp:
      endpoint: "{{env "OTEL_ENDPOINT" | default "otel-collector:4317"}}"
      protocol: grpc
  traces:
    sampler:
      type: "{{env "OTEL_TRACE_SAMPLER" | default "traceidratio"}}"
      arg: {{env "OTEL_TRACE_SAMPLE_RATE" | default "0.1"}}
  metrics:
    interval: 60s

database:
  host: "{{env "DB_HOST" | default "postgres"}}"
  port: {{env "DB_PORT" | default "5432"}}
  name: "{{env "DB_NAME" | default "orders"}}"
  user: "{{env "DB_USER" | default "orders_user"}}"
  password: "{{env "DB_PASSWORD"}}"
  max_connections: {{env "DB_MAX_CONNECTIONS" | default "25"}}

Kafka Configuration

Broker Configuration

Multiple Brokers:

kafka:
  brokers:
    - "kafka-1.prod:9092"
    - "kafka-2.prod:9092"
    - "kafka-3.prod:9092"

Best practices:

  • Use at least 3 brokers for redundancy
  • Use DNS names, not IP addresses
  • Configure all brokers, not just one

Consumer Group ID

Unique per deployment:

kafka:
  group_id: "order-processors-prod"  # Different from staging

Naming conventions:

{service}-{environment}
Examples:
  - order-processors-prod
  - payment-processors-staging
  - analytics-processors-dev

Topic Configuration

Production topics:

kafka:
  topics:
    - name: "orders"
      semantic: "at-least-once"
    - name: "payments"
      semantic: "at-least-once"
    - name: "analytics"
      semantic: "at-most-once"

Topic naming:

{domain}.{entity}.{event}
Examples:
  - ecommerce.orders.created
  - ecommerce.payments.completed
  - analytics.events.tracked

Timeouts

Production timeouts:

kafka:
  session_timeout: "30s"      # Balance between failure detection and GC tolerance
  rebalance_timeout: "45s"    # Must be > session_timeout

Guidelines:

  • Session timeout: 20-45s (default: 30s)
  • Rebalance timeout: 45-90s (default: 60s)
  • Increase if frequent rebalances occur
  • Decrease for faster failure detection

Fetch Settings

Production fetch settings:

kafka:
  fetch_max_bytes: 52428800        # 50 MB
  max_concurrent_fetches: 10       # Limit concurrent requests

Tuning guidelines:

  • Small messages: 10-25 MB fetch size
  • Large messages: 100+ MB fetch size
  • High throughput: Increase concurrent fetches
  • Memory constrained: Decrease fetch size (see the estimate below)
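
A rough ceiling on fetch-buffer memory is fetch_max_bytes × max_concurrent_fetches:

50 MB × 10 fetches = 500 MB worst case (settings above)
25 MB × 5 fetches  = 125 MB worst case (memory-constrained)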

Application Configuration

Idempotency

Database-backed idempotency:

type Config struct {
    queue.Config `config:",squash"`
    Kafka struct {
        Brokers          []string      `config:"brokers"`
        GroupID          string        `config:"group_id"`
        Topic            string        `config:"topic"`
        SessionTimeout   time.Duration `config:"session_timeout"`
        RebalanceTimeout time.Duration `config:"rebalance_timeout"`
    } `config:"kafka"`
    Database struct {
        Host           string `config:"host"`
        Port           int    `config:"port"`
        Name           string `config:"name"`
        User           string `config:"user"`
        Password       string `config:"password"`
        MaxConnections int    `config:"max_connections"`
    } `config:"database"`
    IdempotencyWindow time.Duration `config:"idempotency_window"`
}

func Init(ctx context.Context, cfg Config) (*queue.App, error) {
    // Set up the database connection pool.
    // Requires a registered "postgres" driver import, e.g. _ "github.com/lib/pq".
    db, err := sql.Open("postgres", fmt.Sprintf(
        "host=%s port=%d user=%s password=%s dbname=%s sslmode=require",
        cfg.Database.Host,
        cfg.Database.Port,
        cfg.Database.User,
        cfg.Database.Password,
        cfg.Database.Name,
    ))
    if err != nil {
        return nil, err
    }

    db.SetMaxOpenConns(cfg.Database.MaxConnections)
    db.SetMaxIdleConns(cfg.Database.MaxConnections / 2)
    db.SetConnMaxLifetime(time.Hour)

    processor := &OrderProcessor{
        db:                db,
        idempotencyWindow: cfg.IdempotencyWindow,
    }

    runtime := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        cfg.Kafka.GroupID,
        kafka.AtLeastOnce(cfg.Kafka.Topic, processor),
        kafka.SessionTimeout(cfg.Kafka.SessionTimeout),
        kafka.RebalanceTimeout(cfg.Kafka.RebalanceTimeout),
    )

    return queue.NewApp(runtime), nil
}
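
The Init above only wires the pieces together. A minimal sketch of the check itself, assuming a processed_messages table with a unique message_id column (both the table and the helper are illustrative, not part of the framework):

func (p *OrderProcessor) alreadyProcessed(ctx context.Context, messageID string) (bool, error) {
    // Atomically claim the message ID; a conflict means it was already processed.
    res, err := p.db.ExecContext(ctx,
        `INSERT INTO processed_messages (message_id, processed_at)
         VALUES ($1, now())
         ON CONFLICT (message_id) DO NOTHING`,
        messageID,
    )
    if err != nil {
        return false, err
    }
    rows, err := res.RowsAffected()
    if err != nil {
        return false, err
    }
    return rows == 0, nil // zero rows inserted => duplicate delivery
}

Rows older than IdempotencyWindow can be purged with a periodic DELETE to keep the table bounded.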

Resource Limits

Database connection pooling:

db.SetMaxOpenConns(25)       // Limit total connections
db.SetMaxIdleConns(10)       // Idle connections
db.SetConnMaxLifetime(1 * time.Hour)  // Connection lifetime

Calculation:

Max connections = Partitions × Concurrent operations per partition
Example: 12 partitions × 2 ops = 24 connections (use 25)

Deployment

Container Configuration

Dockerfile:

FROM golang:1.21-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .
RUN CGO_ENABLED=0 GOOS=linux go build -o processor ./cmd/processor

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/

COPY --from=builder /app/processor .
COPY config.yaml .

CMD ["./processor"]

Docker Compose:

version: '3.8'
services:
  order-processor:
    image: order-processor:latest
    environment:
      - KAFKA_BROKER_1=kafka-1:9092
      - KAFKA_BROKER_2=kafka-2:9092
      - KAFKA_GROUP_ID=order-processors-prod
      - KAFKA_SESSION_TIMEOUT=30s
      - DB_HOST=postgres
      - DB_PASSWORD=${DB_PASSWORD}
      - OTEL_ENDPOINT=otel-collector:4317
    depends_on:
      - kafka
      - postgres
    restart: unless-stopped
    deploy:
      replicas: 3  # Scale horizontally
      resources:
        limits:
          cpus: '1'
          memory: 512M
        reservations:
          cpus: '0.5'
          memory: 256M

Kubernetes Deployment

Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-processor
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-processor
  template:
    metadata:
      labels:
        app: order-processor
    spec:
      containers:
      - name: processor
        image: order-processor:1.0.0
        env:
        - name: KAFKA_BROKER_1
          value: "kafka-1.kafka.svc:9092"
        - name: KAFKA_BROKER_2
          value: "kafka-2.kafka.svc:9092"
        - name: KAFKA_GROUP_ID
          value: "order-processors-prod"
        - name: DB_PASSWORD
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: password
        - name: OTEL_ENDPOINT
          value: "otel-collector.monitoring.svc:4317"
        resources:
          requests:
            memory: "256Mi"
            cpu: "500m"
          limits:
            memory: "512Mi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health/liveness
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health/readiness
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5

Scaling

Horizontal scaling:

# Scale to 6 replicas
kubectl scale deployment order-processor --replicas=6

Scaling guidelines:

  • Max replicas ≤ number of partitions
  • Start with 2-3 replicas for HA
  • Scale up if consumer lag increases
  • Monitor CPU/memory usage (see the autoscaler sketch below)

Example:

12 partitions:
  - 3 replicas: 4 partitions each
  - 6 replicas: 2 partitions each
  - 12 replicas: 1 partition each (max)
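
For automated scaling, one option is a CPU-based HorizontalPodAutoscaler with maxReplicas capped at the partition count; lag-based autoscaling (e.g. with KEDA) is an alternative not shown here. A sketch:

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: order-processor
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: order-processor
  minReplicas: 3
  maxReplicas: 12  # never exceed the partition count
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 75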

Monitoring

Metrics to Monitor

Consumer lag:

kafka_consumer_lag{group="order-processors",topic="orders"}

Alert when lag > 1000:

- alert: HighConsumerLag
  expr: kafka_consumer_lag > 1000
  for: 5m
  annotations:
    summary: "Consumer group {{ $labels.group }} lagging"

Processing rate:

rate(kafka_messages_processed_total[1m])

Error rate:

rate(kafka_processing_errors_total[1m])
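
An error ratio is often more actionable than the raw rate (same metric names as above):

rate(kafka_processing_errors_total[1m]) / rate(kafka_messages_processed_total[1m])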

Health Checks

Kubernetes probes:

livenessProbe:
  httpGet:
    path: /health/liveness
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  failureThreshold: 3

readinessProbe:
  httpGet:
    path: /health/readiness
    port: 8080
  initialDelaySeconds: 5
  periodSeconds: 5
  failureThreshold: 3

Custom health check:

type HealthCheck struct {
    processor *OrderProcessor
}

func (h *HealthCheck) Healthy(ctx context.Context) (bool, error) {
    // Check database connection
    if err := h.processor.db.PingContext(ctx); err != nil {
        return false, err
    }

    // Check consumer lag (if available)
    // ...

    return true, nil
}

Security

TLS Configuration

Kafka TLS:

import "github.com/twmb/franz-go/pkg/kgo"

tlsConfig := &tls.Config{
    MinVersion: tls.VersionTLS12,
}

runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", processor),
    kafka.WithKafkaOptions(
        kgo.DialTLSConfig(tlsConfig),
    ),
)
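
If the cluster requires mutual TLS, client certificates and a private CA can be added to the same tls.Config. A sketch with illustrative file paths:

import (
    "crypto/tls"
    "crypto/x509"
    "os"
)

cert, err := tls.LoadX509KeyPair("/etc/kafka/tls/client.crt", "/etc/kafka/tls/client.key")
if err != nil {
    return nil, err
}

caPEM, err := os.ReadFile("/etc/kafka/tls/ca.crt")
if err != nil {
    return nil, err
}
caPool := x509.NewCertPool()
caPool.AppendCertsFromPEM(caPEM)

tlsConfig := &tls.Config{
    MinVersion:   tls.VersionTLS12,
    Certificates: []tls.Certificate{cert}, // client certificate for mTLS
    RootCAs:      caPool,                  // trust the private CA
}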

SASL Authentication

SASL/SCRAM:

import (
    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/sasl/scram"
)

scramAuth := scram.Auth{
    User: cfg.Kafka.User,
    Pass: cfg.Kafka.Password,
}

runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", processor),
    kafka.WithKafkaOptions(
        kgo.SASL(scramAuth.AsSha256Mechanism()),
    ),
)
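
For clusters configured with SCRAM-SHA-512, use scramAuth.AsSha512Mechanism() instead.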

Secrets Management

Kubernetes secrets:

apiVersion: v1
kind: Secret
metadata:
  name: kafka-credentials
type: Opaque
stringData:
  username: "order-processor-user"
  password: "secure-password-here"

Reference in deployment:

env:
- name: KAFKA_USER
  valueFrom:
    secretKeyRef:
      name: kafka-credentials
      key: username
- name: KAFKA_PASSWORD
  valueFrom:
    secretKeyRef:
      name: kafka-credentials
      key: password
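
Using the same env templating as the main config, the credentials can then feed the SASL setup above (the user and password keys are illustrative; match them to your config struct):

kafka:
  user: "{{env "KAFKA_USER"}}"
  password: "{{env "KAFKA_PASSWORD"}}"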

Performance Tuning

Partition Count

Optimal partition count:

Partitions = Target throughput / Partition throughput

Example:
  Target: 100k msgs/sec
  Per partition: 10k msgs/sec
  Partitions: 100k / 10k = 10

Recommendations:

  • Start with 3× expected consumer count
  • Monitor lag and throughput
  • Increase if lag grows under load (see the command below)
  • Cannot decrease (Kafka limitation)
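
If lag keeps growing with all replicas busy, add partitions in place (note that this changes key-to-partition mapping for keyed topics):

kafka-topics.sh \
  --bootstrap-server kafka:9092 \
  --alter \
  --topic orders \
  --partitions 12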

Batch Processing

Process messages in batches:

type BatchProcessor struct {
    db        *sql.DB
    batchSize int
}

func (p *BatchProcessor) ProcessBatch(ctx context.Context, messages []kafka.Message) error {
    tx, err := p.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    for _, msg := range messages {
        var order Order
        if err := json.Unmarshal(msg.Value, &order); err != nil {
            return fmt.Errorf("decode order: %w", err)
        }

        _, err := tx.ExecContext(ctx,
            "INSERT INTO orders (order_id, total) VALUES ($1, $2)",
            order.OrderID, order.Total,
        )
        if err != nil {
            return err
        }
    }

    return tx.Commit()
}
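
Keep batchSize small enough that a batch completes well within rebalance_timeout: a rebalance mid-batch rolls back the transaction and the whole batch is redelivered. Under at-least-once semantics, pair batching with the idempotency techniques above (e.g. ON CONFLICT DO NOTHING on order_id).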

Connection Pooling

Optimize database connections:

// Formula: Max connections = Partitions × 2
db.SetMaxOpenConns(cfg.Partitions * 2)
db.SetMaxIdleConns(cfg.Partitions)
db.SetConnMaxLifetime(1 * time.Hour)
db.SetConnMaxIdleTime(10 * time.Minute)

Disaster Recovery

Consumer Group Reset

Reset to earliest (the group must have no active members; omit --execute to preview the change first):

kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --group order-processors \
  --reset-offsets \
  --to-earliest \
  --all-topics \
  --execute

Reset to specific offset:

kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --group order-processors \
  --reset-offsets \
  --topic orders:0 \
  --to-offset 12345 \
  --execute
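
Reset to a point in time (useful for replaying from just before an incident; the timestamp is illustrative):

kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --group order-processors \
  --reset-offsets \
  --topic orders \
  --to-datetime 2024-01-15T00:00:00.000 \
  --execute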

Backup and Restore

Export offsets:

kafka-consumer-groups.sh \
  --bootstrap-server kafka:9092 \
  --group order-processors \
  --describe > offsets-backup.txt

Dead Letter Queue:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    if err := p.processMessage(ctx, msg); err != nil {
        // Send to DLQ on failure
        return p.sendToDLQ(ctx, msg, err)
    }
    return nil
}

func (p *Processor) sendToDLQ(ctx context.Context, msg kafka.Message, err error) error {
    dlqRecord := &kgo.Record{
        Topic: msg.Topic + ".dlq",
        Key:   msg.Key,
        Value: msg.Value,
        Headers: append(msg.Headers, kafka.Header{
            Key:   "error",
            Value: []byte(err.Error()),
        }),
    }
    // Produce is asynchronous; pass a promise instead of nil to observe delivery failures.
    p.producer.Produce(ctx, dlqRecord, nil)
    return nil // Don't return an error: the message is considered handled.
}

Troubleshooting

High Consumer Lag

Causes:

  • Slow processing
  • Insufficient consumers
  • Hot partitions

Solutions:

# Scale up consumers
kubectl scale deployment order-processor --replicas=6

# Check for hot partitions
kafka-consumer-groups.sh --describe --group order-processors

# Optimize processing code
# Add database indexes
# Batch operations

Frequent Rebalances

Causes:

  • Short session timeout
  • Long processing time
  • Network issues

Solutions:

kafka:
  session_timeout: "45s"      # Increase
  rebalance_timeout: "90s"    # Increase

Memory Issues

Causes:

  • Large fetch buffers
  • Too many partitions
  • Memory leaks

Solutions:

kafka:
  fetch_max_bytes: 25000000   # Reduce to 25 MB
  max_concurrent_fetches: 5   # Limit concurrent fetches

Next Steps