Quick Start
Build your first Kafka message processor
The Humus Kafka runtime provides a production-ready integration with Apache Kafka, offering concurrent per-partition processing, automatic OpenTelemetry instrumentation, and flexible delivery semantics.
The Kafka runtime is built on:
package main
import (
"context"
"encoding/json"
"github.com/z5labs/humus/queue"
"github.com/z5labs/humus/queue/kafka"
)
type OrderMessage struct {
OrderID string `json:"order_id"`
Amount float64 `json:"amount"`
}
type OrderProcessor struct{}
func (p *OrderProcessor) Process(ctx context.Context, msg kafka.Message) error {
// Deserialize
var order OrderMessage
if err := json.Unmarshal(msg.Value, &order); err != nil {
return err
}
// Process order (should be idempotent)
return nil
}
func Init(ctx context.Context, cfg Config) (*queue.App, error) {
processor := &OrderProcessor{}
runtime := kafka.NewRuntime(
cfg.Kafka.Brokers,
cfg.Kafka.GroupID,
kafka.AtLeastOnce(cfg.Kafka.Topic, processor),
)
return queue.NewApp(runtime), nil
}
func main() {
queue.Run(queue.YamlSource("config.yaml"), Init)
}
The main Kafka runtime that manages the Kafka client, consumer group, and partition processing:
runtime := kafka.NewRuntime(
brokers []string,
groupID string,
opts ...Option,
)
Features:
Represents a Kafka message with all metadata:
type Message struct {
Key []byte
Value []byte
Headers []Header
Timestamp time.Time
Topic string
Partition int32
Offset int64
Attrs uint8
}
Your processor receives this type and must deserialize Value into your application’s message format.
Configure processing semantics per topic:
At-Least-Once:
kafka.AtLeastOnce(topic, processor)
Messages acknowledged after successful processing. Requires idempotent processors.
At-Most-Once:
kafka.AtMostOnce(topic, processor)
Messages acknowledged before processing. Fast but may lose messages on failures.
SessionTimeout:
kafka.SessionTimeout(10 * time.Second)
Default: 45 seconds. Maximum time between heartbeats before considered dead.
RebalanceTimeout:
kafka.RebalanceTimeout(30 * time.Second)
Default: 30 seconds. Maximum time for rebalance operations.
FetchMaxBytes:
kafka.FetchMaxBytes(100 * 1024 * 1024) // 100 MB
Default: 50 MB. Maximum bytes to fetch across all partitions per request.
MaxConcurrentFetches:
kafka.MaxConcurrentFetches(5)
Default: unlimited. Limit concurrent fetch requests to Kafka.
Process multiple topics in a single runtime:
runtime := kafka.NewRuntime(
brokers,
groupID,
kafka.AtLeastOnce("orders", ordersProcessor),
kafka.AtLeastOnce("events", eventsProcessor),
kafka.AtMostOnce("metrics", metricsProcessor),
)
Each topic can have different processors and delivery semantics.
The Kafka runtime uses a goroutine-per-partition architecture:
Topic "orders" with 3 partitions:
├─ Partition 0 → Goroutine 1
├─ Partition 1 → Goroutine 2
└─ Partition 2 → Goroutine 3
Benefits:
Rebalancing:
See Concurrency Model for details.
Every Kafka runtime automatically includes:
This section covers:
Start with the Quick Start Guide to build your first Kafka message processor.
Build your first Kafka message processor
Advanced Kafka runtime configuration
Working with Kafka messages and metadata
Understanding goroutine-per-partition processing
Handling duplicate messages in at-least-once processing
Processing multiple Kafka topics in one runtime
OpenTelemetry integration for Kafka processing
Deploying Kafka processors in production