Runtime Configuration
The Kafka runtime provides extensive configuration options for controlling consumer behavior, fetch settings, and topic processing.
Creating a Runtime
The basic runtime constructor:
func NewRuntime(
    brokers []string,
    groupID string,
    opts ...Option,
) Runtime
Parameters:
- brokers: List of Kafka broker addresses (e.g., []string{"localhost:9092"})
- groupID: Consumer group ID for offset management and rebalancing
- opts: Variadic configuration options
Example:
runtime := kafka.NewRuntime(
    []string{"kafka1:9092", "kafka2:9092", "kafka3:9092"},
    "my-consumer-group",
    kafka.AtLeastOnce("orders", ordersProcessor),
    kafka.SessionTimeout(10 * time.Second),
)
Topic Configuration
Configure which topics to consume and how to process them:
AtLeastOnce
Reliable processing with message acknowledgment after successful processing:
func AtLeastOnce(topic string, processor queue.Processor[kafka.Message]) Option
Example:
type OrderProcessor struct{}

func (p *OrderProcessor) Process(ctx context.Context, msg kafka.Message) error {
    // Process message (must be idempotent)
    return nil
}

runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", &OrderProcessor{}),
)
Guarantees:
- Messages acknowledged only after successful processing
- Failed processing results in redelivery
- Requires idempotent processors
AtMostOnce
Fast processing with message acknowledgment before processing:
func AtMostOnce(topic string, processor queue.Processor[kafka.Message]) Option
Example:
type MetricsProcessor struct{}

func (p *MetricsProcessor) Process(ctx context.Context, msg kafka.Message) error {
    // Process message (may be lost on failure)
    return nil
}

runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtMostOnce("metrics", &MetricsProcessor{}),
)
Guarantees:
- Messages acknowledged immediately after consumption
- Processing failures result in message loss
- Higher throughput
Multiple Topics
Process multiple topics with different semantics:
runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", ordersProcessor),
    kafka.AtLeastOnce("payments", paymentsProcessor),
    kafka.AtMostOnce("metrics", metricsProcessor),
    kafka.AtMostOnce("logs", logsProcessor),
)
Each topic gets its own processor and delivery semantics. See Multi-Topic Processing for details.
Consumer Group Settings
Configure consumer group behavior and rebalancing:
SessionTimeout
Maximum time between heartbeats before a consumer is considered dead:
func SessionTimeout(d time.Duration) Option
Default: 45 seconds
Example:
runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", processor),
    kafka.SessionTimeout(10 * time.Second),
)
Guidelines:
- Short timeout (5-15s): Fast failure detection, but may cause false positives during GC pauses
- Long timeout (30-60s): Tolerates GC pauses, but slower failure detection
- Production recommendation: 20-30 seconds
RebalanceTimeout
Maximum time allowed for rebalance operations:
func RebalanceTimeout(d time.Duration) Option
Default: 30 seconds
Example:
runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", processor),
    kafka.RebalanceTimeout(60 * time.Second),
)
Guidelines:
- Should be longer than session timeout
- Increase if rebalances frequently time out
- Production recommendation: 45-60 seconds
Fetch Settings
Control how messages are fetched from Kafka:
FetchMaxBytes
Maximum total bytes to buffer from fetch responses across all partitions:
func FetchMaxBytes(bytes int32) Option
Default: 50 MB (52,428,800 bytes)
Example:
runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", processor),
    kafka.FetchMaxBytes(100 * 1024 * 1024), // 100 MB
)
Guidelines:
- Small messages: Lower value (10-25 MB) reduces memory usage
- Large messages: Higher value (100+ MB) improves throughput
- Must be larger than the largest single message
- Production recommendation: 50-100 MB
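Since FetchMaxBytes takes an int32, the value has an upper bound just under 2 GiB, and it also has to stay above the largest expected message. The sketch below works through that arithmetic. The fetchMaxBytes helper and its parameters are hypothetical and only show the checks.

```go
package main

import (
	"fmt"
	"math"
)

const mib = 1024 * 1024

// fetchMaxBytes converts a size in MiB to an int32 byte count, rejecting
// values that are out of range for int32 or that are not larger than the
// largest expected message.
func fetchMaxBytes(sizeMiB, largestMessageBytes int64) (int32, error) {
	total := sizeMiB * mib
	if total <= 0 || total > math.MaxInt32 {
		return 0, fmt.Errorf("%d MiB is out of range for int32", sizeMiB)
	}
	if total <= largestMessageBytes {
		return 0, fmt.Errorf("%d bytes is not larger than the largest message (%d bytes)", total, largestMessageBytes)
	}
	return int32(total), nil
}

func main() {
	n, err := fetchMaxBytes(50, 1*mib)
	fmt.Println(n, err) // 52428800 <nil>

	_, err = fetchMaxBytes(2048, 1*mib) // 2 GiB overflows int32
	fmt.Println(err)
}
```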
MaxConcurrentFetches
Maximum number of concurrent fetch requests to Kafka:
func MaxConcurrentFetches(fetches int) Option
Default: Unlimited (0)
Example:
runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", processor),
    kafka.MaxConcurrentFetches(10),
)
Guidelines:
- Unlimited (0): Maximum throughput, higher network load
- Limited (5-10): Controlled network load, predictable resource usage
- Production recommendation: 5-10 for most workloads
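A limit on concurrent requests is commonly implemented with a counting semaphore. The following sketch shows the general technique using a buffered channel, with a limit of 0 meaning unlimited as in the option above. It is an illustration only: runBounded is a hypothetical helper, and the runtime's actual fetch scheduling may work differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded runs every task with at most limit in flight at once and
// returns the highest concurrency observed. limit <= 0 means unlimited.
func runBounded(limit int, tasks []func()) int64 {
	var sem chan struct{}
	if limit > 0 {
		sem = make(chan struct{}, limit)
	}

	var inFlight, peak int64
	var wg sync.WaitGroup
	for _, task := range tasks {
		if sem != nil {
			sem <- struct{}{} // blocks while limit tasks are already running
		}
		wg.Add(1)
		go func(t func()) {
			defer wg.Done()
			n := atomic.AddInt64(&inFlight, 1)
			// Record a new peak if this is the highest concurrency seen so far.
			for {
				old := atomic.LoadInt64(&peak)
				if n <= old || atomic.CompareAndSwapInt64(&peak, old, n) {
					break
				}
			}
			t()
			atomic.AddInt64(&inFlight, -1)
			if sem != nil {
				<-sem // release the slot
			}
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 20)
	for i := range tasks {
		tasks[i] = func() {}
	}
	fmt.Println(runBounded(5, tasks) <= 5) // true
}
```

A bounded limit trades some peak throughput for a predictable ceiling on in-flight work, which matches the guideline of 5-10 for most workloads.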
Configuration Examples
High-Throughput Configuration
Optimize for maximum message throughput:
runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtMostOnce("events", processor),
    kafka.FetchMaxBytes(200 * 1024 * 1024), // 200 MB
    kafka.MaxConcurrentFetches(0),          // Unlimited
    kafka.SessionTimeout(45 * time.Second),
    kafka.RebalanceTimeout(60 * time.Second),
)
Use cases:
- Event streaming
- Log aggregation
- Metrics collection
High-Reliability Configuration
Optimize for message reliability and ordered processing:
runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("transactions", processor),
    kafka.FetchMaxBytes(10 * 1024 * 1024), // 10 MB
    kafka.MaxConcurrentFetches(5),
    kafka.SessionTimeout(20 * time.Second),
    kafka.RebalanceTimeout(45 * time.Second),
)
Use cases:
- Financial transactions
- Database replication
- Critical event processing
Balanced Configuration
General-purpose configuration for most workloads:
runtime := kafka.NewRuntime(
    brokers,
    groupID,
    kafka.AtLeastOnce("orders", processor),
    kafka.FetchMaxBytes(50 * 1024 * 1024), // 50 MB (default)
    kafka.MaxConcurrentFetches(10),
    kafka.SessionTimeout(30 * time.Second),
    kafka.RebalanceTimeout(45 * time.Second),
)
Use cases:
- General message processing
- Microservice communication
- Event-driven workflows
Environment-Based Configuration
Use YAML templating for environment-specific settings:
config.yaml:
kafka:
  brokers:
    - "{{env "KAFKA_BROKER_1" | default "localhost:9092"}}"
    - "{{env "KAFKA_BROKER_2" | default "localhost:9093"}}"
  group_id: "{{env "KAFKA_GROUP_ID" | default "my-service"}}"
  topic: "{{env "KAFKA_TOPIC" | default "events"}}"
  session_timeout: "{{env "KAFKA_SESSION_TIMEOUT" | default "30s"}}"
  fetch_max_bytes: {{env "KAFKA_FETCH_MAX_BYTES" | default "52428800"}}
otel:
  service:
    name: "{{env "SERVICE_NAME" | default "queue-processor"}}"
Parsing in code:
type Config struct {
    queue.Config `config:",squash"`

    Kafka struct {
        Brokers        []string      `config:"brokers"`
        GroupID        string        `config:"group_id"`
        Topic          string        `config:"topic"`
        SessionTimeout time.Duration `config:"session_timeout"`
        FetchMaxBytes  int32         `config:"fetch_max_bytes"`
    } `config:"kafka"`
}

func Init(ctx context.Context, cfg Config) (*queue.App, error) {
    // processor is assumed to be constructed elsewhere in the application.
    runtime := kafka.NewRuntime(
        cfg.Kafka.Brokers,
        cfg.Kafka.GroupID,
        kafka.AtLeastOnce(cfg.Kafka.Topic, processor),
        kafka.SessionTimeout(cfg.Kafka.SessionTimeout),
        kafka.FetchMaxBytes(cfg.Kafka.FetchMaxBytes),
    )
    return queue.NewApp(runtime), nil
}
Monitoring Configuration
Check runtime behavior through logs and metrics:
Consumer Group Lag:
kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --describe
Key metrics to monitor:
- Consumer lag per partition
- Messages processed per second
- Processing errors
- Rebalance frequency
- Session timeout violations
See Observability for OpenTelemetry integration.
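Consumer lag for a partition is the log end offset minus the group's committed offset, which kafka-consumer-groups.sh reports as the LOG-END-OFFSET and CURRENT-OFFSET columns. A minimal sketch of that arithmetic, using a hypothetical partitionOffsets type and made-up sample offsets:

```go
package main

import "fmt"

// partitionOffsets holds the two per-partition numbers reported as
// LOG-END-OFFSET and CURRENT-OFFSET.
type partitionOffsets struct {
	LogEnd  int64
	Current int64
}

// totalLag sums (log end offset - committed offset) across partitions,
// ignoring partitions where the difference is not positive.
func totalLag(parts map[int32]partitionOffsets) int64 {
	var lag int64
	for _, p := range parts {
		if d := p.LogEnd - p.Current; d > 0 {
			lag += d
		}
	}
	return lag
}

func main() {
	lag := totalLag(map[int32]partitionOffsets{
		0: {LogEnd: 1500, Current: 1450}, // 50 messages behind
		1: {LogEnd: 900, Current: 900},   // caught up
	})
	fmt.Println(lag) // 50
}
```

A lag that keeps growing over time, as opposed to a brief spike, is usually the signal that processing cannot keep up with the produce rate.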
Common Configuration Issues
Frequent Rebalances
Symptoms: Consumer group frequently rebalancing
Solutions:
// Increase session and rebalance timeouts
kafka.SessionTimeout(45 * time.Second),
kafka.RebalanceTimeout(90 * time.Second),
High Memory Usage
Symptoms: Application consuming excessive memory
Solutions:
// Reduce fetch buffer size
kafka.FetchMaxBytes(25 * 1024 * 1024), // 25 MB
kafka.MaxConcurrentFetches(5),
Slow Processing
Symptoms: Consumer lag growing, messages processed slowly
Solutions:
- Check processor logic for inefficiencies
- Increase partition count for more concurrency
- Scale horizontally (more consumer instances)
- Consider at-most-once for non-critical messages
Messages Larger Than FetchMaxBytes
Symptoms: Errors fetching messages
Solutions:
// Increase fetch buffer
kafka.FetchMaxBytes(200 * 1024 * 1024), // 200 MB
// Or reduce message size at producer
Next Steps
- Learn about Message Structure for working with message metadata
- Understand Concurrency Model for partition processing
- Explore Configuration for production deployment patterns