Kafka Runtime
Apache Kafka integration for message processing
Humus queue services provide a complete framework for processing messages from message queues with configurable delivery semantics, automatic concurrency management, and built-in observability.
Queue services in Humus are built on:
package main
import (
"context"
"encoding/json"
"github.com/z5labs/humus/queue"
"github.com/z5labs/humus/queue/kafka"
)
type Config struct {
queue.Config `config:",squash"`
Kafka struct {
Brokers []string `config:"brokers"`
GroupID string `config:"group_id"`
Topic string `config:"topic"`
} `config:"kafka"`
}
type OrderMessage struct {
OrderID string `json:"order_id"`
Amount float64 `json:"amount"`
}
type OrderProcessor struct{}
func (p *OrderProcessor) Process(ctx context.Context, msg *OrderMessage) error {
// Process the order
// This should be idempotent for at-least-once processing
return nil
}
func decodeOrder(data []byte) (*OrderMessage, error) {
var msg OrderMessage
err := json.Unmarshal(data, &msg)
return &msg, err
}
func main() {
queue.Run(queue.YamlSource("config.yaml"), Init)
}
func Init(ctx context.Context, cfg Config) (*queue.App, error) {
processor := &OrderProcessor{}
runtime, err := kafka.NewAtLeastOnceRuntime(
cfg.Kafka.Brokers,
cfg.Kafka.Topic,
cfg.Kafka.GroupID,
processor,
decodeOrder,
)
if err != nil {
return nil, err
}
return queue.NewApp(runtime), nil
}
Queue processing follows a three-phase pattern:
The order of these phases determines the delivery semantics.
At-Most-Once (Consume → Acknowledge → Process):
At-Least-Once (Consume → Process → Acknowledge):
All queue implementations provide a Runtime that orchestrates the processing phases:
type Runtime interface {
ProcessQueue(ctx context.Context) error
}
Currently supported:
Every queue service automatically includes:
This section covers:
Start with the Kafka Quick Start Guide to build your first queue processor.
Apache Kafka integration for message processing
Core abstractions and patterns for queue processing
Understanding at-most-once and at-least-once processing