Queue Framework
The Humus queue framework implements a three-phase message processing pattern that separates concerns for consuming, processing, and acknowledging messages from a queue.
Core Interfaces
The queue framework defines four core interfaces that work together to process messages:
Consumer
Retrieves messages from a queue:
type Consumer[T any] interface {
	Consume(ctx context.Context) (T, error)
}
Responsibilities:
- Fetch the next message from the queue
- Handle connection management and retries
- Return ErrEndOfQueue when the queue is exhausted (for graceful shutdown)
Example:
type KafkaConsumer struct {
	client *kgo.Client
}

func (c *KafkaConsumer) Consume(ctx context.Context) (*Message, error) {
	fetches := c.client.PollFetches(ctx)
	if fetches.Empty() {
		return nil, queue.ErrEndOfQueue
	}
	// Return first message
	return &Message{...}, nil
}
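For tests, an in-memory consumer that follows the same contract can be useful. The sketch below is illustrative only: `SliceConsumer`, `drain`, and the local `ErrEndOfQueue` are hypothetical stand-ins defined here, not part of the queue package.

```go
package main

import (
	"context"
	"errors"
)

// ErrEndOfQueue is a local stand-in for queue.ErrEndOfQueue (assumption:
// the real sentinel is matched with errors.Is, as the examples here do).
var ErrEndOfQueue = errors.New("end of queue")

// SliceConsumer is a hypothetical in-memory consumer backed by a slice.
type SliceConsumer struct {
	items []string
}

// Consume returns the next item, or ErrEndOfQueue once the slice is drained.
func (c *SliceConsumer) Consume(ctx context.Context) (string, error) {
	// Respect cancellation before handing out more work.
	if err := ctx.Err(); err != nil {
		return "", err
	}
	if len(c.items) == 0 {
		return "", ErrEndOfQueue
	}
	item := c.items[0]
	c.items = c.items[1:]
	return item, nil
}

// drain reads from a SliceConsumer until it reports ErrEndOfQueue.
func drain(items []string) ([]string, error) {
	c := &SliceConsumer{items: items}
	var got []string
	for {
		item, err := c.Consume(context.Background())
		if errors.Is(err, ErrEndOfQueue) {
			return got, nil // exhaustion is not a failure
		}
		if err != nil {
			return got, err
		}
		got = append(got, item)
	}
}
```

Calling `drain([]string{"a", "b", "c"})` returns the three items in order with a nil error, because exhaustion is reported through the sentinel rather than treated as a failure.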
Processor
Executes business logic on messages:
type Processor[T any] interface {
	Process(ctx context.Context, msg T) error
}
Responsibilities:
- Implement business logic for message handling
- Be idempotent (for at-least-once processing)
- Return an error to signal failure; under at-least-once semantics the message is then left unacknowledged and retried
Example:
type OrderProcessor struct {
	db *sql.DB
}

func (p *OrderProcessor) Process(ctx context.Context, msg *OrderMessage) error {
	// Check idempotency
	exists, err := p.orderExists(ctx, msg.OrderID)
	if err != nil {
		return err
	}
	if exists {
		return nil // Already processed
	}
	// Process the order
	return p.createOrder(ctx, msg)
}
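The idempotency check can also be exercised without a database. The following is a minimal sketch in which an in-memory set stands in for the `orderExists` lookup; `Order`, `DedupProcessor`, and `totalAfter` are hypothetical names introduced only for illustration.

```go
package main

import (
	"context"
	"sync"
)

// Order is a hypothetical message type used only in this sketch.
type Order struct {
	ID     string
	Amount int
}

// DedupProcessor remembers which order IDs it has handled so that a
// redelivered message becomes a no-op.
type DedupProcessor struct {
	mu    sync.Mutex
	seen  map[string]bool
	total int
}

func NewDedupProcessor() *DedupProcessor {
	return &DedupProcessor{seen: make(map[string]bool)}
}

// Process applies an order at most once per ID.
func (p *DedupProcessor) Process(ctx context.Context, o Order) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.seen[o.ID] {
		return nil // already processed: succeed without side effects
	}
	p.seen[o.ID] = true
	p.total += o.Amount
	return nil
}

// totalAfter feeds the orders through a fresh processor and returns the
// accumulated total.
func totalAfter(orders []Order) (int, error) {
	p := NewDedupProcessor()
	for _, o := range orders {
		if err := p.Process(context.Background(), o); err != nil {
			return 0, err
		}
	}
	return p.total, nil
}
```

Feeding the same order ID twice leaves the total unchanged the second time, which is the property at-least-once delivery relies on. A production processor would need the check and the write to be atomic in its datastore (for example, a unique constraint), which this sketch does not model.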
Acknowledger
Confirms successful processing back to the queue:
type Acknowledger[T any] interface {
	Acknowledge(ctx context.Context, msg T) error
}
Responsibilities:
- Commit offsets or delete messages from the queue
- Ensure the queue knows the message was processed
- Handle acknowledgment failures
Example:
type KafkaAcknowledger struct {
	client *kgo.Client
}

func (a *KafkaAcknowledger) Acknowledge(ctx context.Context, msg *Message) error {
	// Commit the offset for this message
	return a.client.CommitRecords(ctx, msg.record)
}
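One possible way to handle acknowledgment failures is to retry a bounded number of times before giving up. The sketch below assumes that approach; `AckFunc`, `withRetry`, and `ackWithFailures` are hypothetical helpers, not part of the queue package, and backoff between attempts is omitted for brevity.

```go
package main

import (
	"context"
	"fmt"
)

// AckFunc mirrors the shape of Acknowledger.Acknowledge for a string message.
type AckFunc func(ctx context.Context, msg string) error

// withRetry is a hypothetical wrapper that retries a failing acknowledgment
// up to attempts times before returning the last error.
func withRetry(ack AckFunc, attempts int) AckFunc {
	if attempts < 1 {
		attempts = 1
	}
	return func(ctx context.Context, msg string) error {
		var err error
		for i := 0; i < attempts; i++ {
			// Stop retrying as soon as the caller cancels.
			if err = ctx.Err(); err != nil {
				return err
			}
			if err = ack(ctx, msg); err == nil {
				return nil
			}
		}
		return fmt.Errorf("acknowledge %q failed after %d attempts: %w", msg, attempts, err)
	}
}

// ackWithFailures runs withRetry against a fake acknowledger that fails its
// first `failures` calls, and reports how many calls were made.
func ackWithFailures(failures, attempts int) (calls int, err error) {
	flaky := func(context.Context, string) error {
		calls++
		if calls <= failures {
			return fmt.Errorf("broker unavailable (call %d)", calls)
		}
		return nil
	}
	err = withRetry(flaky, attempts)(context.Background(), "order-1")
	return calls, err
}
```

With two simulated failures and three attempts the acknowledgment succeeds on the third call; with more failures than attempts the wrapped error is returned, and the runtime can then treat it as fatal.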
Runtime
Orchestrates the three phases and manages the application lifecycle:
type Runtime interface {
	ProcessQueue(ctx context.Context) error
}
Responsibilities:
- Coordinate Consumer, Processor, and Acknowledger
- Implement the delivery semantics (order of phases)
- Handle graceful shutdown when context is cancelled
- Manage concurrency (e.g., goroutines per partition)
Example:
type MyRuntime struct {
	consumer     queue.Consumer[Message]
	processor    queue.Processor[Message]
	acknowledger queue.Acknowledger[Message]
}

func (r *MyRuntime) ProcessQueue(ctx context.Context) error {
	for {
		// Phase 1: Consume
		msg, err := r.consumer.Consume(ctx)
		if errors.Is(err, queue.ErrEndOfQueue) {
			return nil // Graceful shutdown
		}
		if err != nil {
			return err
		}
		// Phase 2: Process
		if err := r.processor.Process(ctx, msg); err != nil {
			return err
		}
		// Phase 3: Acknowledge
		if err := r.acknowledger.Acknowledge(ctx, msg); err != nil {
			return err
		}
	}
}
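To see the three phases run end to end without a broker, the loop above can be wired to in-memory components. This is a self-contained sketch: the interfaces are local copies of the ones shown on this page, and `memQueue`, the generic `Runtime[T]`, and `runAll` are hypothetical names used only here.

```go
package main

import (
	"context"
	"errors"
)

// Local stand-ins for the queue package's sentinel and interfaces.
var ErrEndOfQueue = errors.New("end of queue")

type Consumer[T any] interface {
	Consume(ctx context.Context) (T, error)
}
type Processor[T any] interface {
	Process(ctx context.Context, msg T) error
}
type Acknowledger[T any] interface {
	Acknowledge(ctx context.Context, msg T) error
}

// memQueue plays all three roles and records what happened in each phase.
type memQueue struct {
	pending   []string
	processed []string
	acked     []string
}

func (q *memQueue) Consume(ctx context.Context) (string, error) {
	if len(q.pending) == 0 {
		return "", ErrEndOfQueue
	}
	msg := q.pending[0]
	q.pending = q.pending[1:]
	return msg, nil
}

func (q *memQueue) Process(ctx context.Context, msg string) error {
	q.processed = append(q.processed, msg)
	return nil
}

func (q *memQueue) Acknowledge(ctx context.Context, msg string) error {
	q.acked = append(q.acked, msg)
	return nil
}

// Runtime coordinates the phases in consume, process, acknowledge order.
type Runtime[T any] struct {
	consumer     Consumer[T]
	processor    Processor[T]
	acknowledger Acknowledger[T]
}

func (r *Runtime[T]) ProcessQueue(ctx context.Context) error {
	for {
		msg, err := r.consumer.Consume(ctx)
		if errors.Is(err, ErrEndOfQueue) {
			return nil // queue drained: graceful stop
		}
		if err != nil {
			return err
		}
		if err := r.processor.Process(ctx, msg); err != nil {
			return err
		}
		if err := r.acknowledger.Acknowledge(ctx, msg); err != nil {
			return err
		}
	}
}

// runAll drives the runtime over the given items and reports what was
// processed and acknowledged.
func runAll(items []string) (processed, acked []string, err error) {
	q := &memQueue{pending: items}
	r := &Runtime[string]{consumer: q, processor: q, acknowledger: q}
	err = r.ProcessQueue(context.Background())
	return q.processed, q.acked, err
}
```

Every item passed to `runAll` ends up in both the processed and acknowledged lists, in the order it was consumed.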
Built-in Item Processors
The queue package provides two built-in processors that implement different delivery semantics:
ProcessAtMostOnce
At-most-once processing acknowledges messages before processing:
// A distinct name avoids shadowing the processor passed in.
atMostOnce := queue.ProcessAtMostOnce(consumer, processor, acknowledger)
for {
	err := atMostOnce.ProcessItem(ctx)
	if errors.Is(err, queue.ErrEndOfQueue) {
		return nil
	}
	// Continue even on errors - message already acknowledged
}
Processing Order: Consume → Acknowledge → Process
Guarantees:
- Each message processed at most once
- Messages may be lost on processing failures
- Fast throughput
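The effect of acknowledging first can be shown with a small simulation. This sketch does not use the real `queue.ProcessAtMostOnce`; `processItemAtMostOnce` and `simulateAtMostOnce` are hypothetical functions that only reproduce the consume, acknowledge, process order described above.

```go
package main

import (
	"context"
	"errors"
)

// ErrEndOfQueue is a local stand-in for queue.ErrEndOfQueue.
var ErrEndOfQueue = errors.New("end of queue")

type step func(ctx context.Context, msg string) error

// processItemAtMostOnce handles one message: consume, acknowledge, process.
func processItemAtMostOnce(
	ctx context.Context,
	consume func(context.Context) (string, error),
	ack, process step,
) error {
	msg, err := consume(ctx)
	if err != nil {
		return err
	}
	// Acknowledge before processing: a later failure cannot cause redelivery.
	if err := ack(ctx, msg); err != nil {
		return err
	}
	return process(ctx, msg)
}

// simulateAtMostOnce runs every item through the loop, with processing
// failing for the item equal to failOn.
func simulateAtMostOnce(items []string, failOn string) (acked, processed []string) {
	pending := append([]string(nil), items...)
	consume := func(context.Context) (string, error) {
		if len(pending) == 0 {
			return "", ErrEndOfQueue
		}
		msg := pending[0]
		pending = pending[1:]
		return msg, nil
	}
	ack := func(_ context.Context, msg string) error {
		acked = append(acked, msg)
		return nil
	}
	process := func(_ context.Context, msg string) error {
		if msg == failOn {
			return errors.New("processing failed")
		}
		processed = append(processed, msg)
		return nil
	}
	for {
		err := processItemAtMostOnce(context.Background(), consume, ack, process)
		if errors.Is(err, ErrEndOfQueue) {
			return acked, processed
		}
		// Other errors are dropped on purpose: the message is already acknowledged.
	}
}
```

Running the simulation over three items with the second one failing acknowledges all three but processes only two: the failed message is lost, which is the trade-off this mode accepts.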
ProcessAtLeastOnce
At-least-once processing acknowledges messages after successful processing:
// A distinct name avoids shadowing the processor passed in.
atLeastOnce := queue.ProcessAtLeastOnce(consumer, processor, acknowledger)
for {
	err := atLeastOnce.ProcessItem(ctx)
	if errors.Is(err, queue.ErrEndOfQueue) {
		return nil
	}
	if err != nil {
		return err // Message not acknowledged, will be retried
	}
}
Processing Order: Consume → Process → Acknowledge
Guarantees:
- Each message processed at least once
- Messages may be duplicated on failures
- Requires idempotent processors
See Delivery Semantics for a detailed comparison.
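The duplication that at-least-once delivery allows can be reproduced with a queue that keeps a message at its head until it is acknowledged. This is an illustrative simulation, not the real `queue.ProcessAtLeastOnce`; `redeliveryQueue`, `flakyProcessor`, and `simulateAtLeastOnce` are hypothetical. Where the example above returns the error, the simulation simply loops again to stand in for a restart.

```go
package main

import (
	"context"
	"errors"
)

// ErrEndOfQueue is a local stand-in for queue.ErrEndOfQueue.
var ErrEndOfQueue = errors.New("end of queue")

// redeliveryQueue hands out the head message until it is acknowledged.
type redeliveryQueue struct {
	pending []string
}

func (q *redeliveryQueue) Consume(ctx context.Context) (string, error) {
	if len(q.pending) == 0 {
		return "", ErrEndOfQueue
	}
	return q.pending[0], nil
}

func (q *redeliveryQueue) Acknowledge(ctx context.Context, msg string) error {
	if len(q.pending) > 0 && q.pending[0] == msg {
		q.pending = q.pending[1:]
	}
	return nil
}

// flakyProcessor fails a configured number of times per message and counts
// every attempt.
type flakyProcessor struct {
	failures map[string]int
	attempts map[string]int
}

func (p *flakyProcessor) Process(ctx context.Context, msg string) error {
	p.attempts[msg]++
	if p.failures[msg] > 0 {
		p.failures[msg]--
		return errors.New("transient failure")
	}
	return nil
}

// processItemAtLeastOnce handles one message: consume, process, acknowledge.
func processItemAtLeastOnce(ctx context.Context, q *redeliveryQueue, p *flakyProcessor) error {
	msg, err := q.Consume(ctx)
	if err != nil {
		return err
	}
	if err := p.Process(ctx, msg); err != nil {
		return err // not acknowledged, so the message stays in the queue
	}
	return q.Acknowledge(ctx, msg)
}

// simulateAtLeastOnce returns how many times each message was attempted when
// processing of failOnce fails exactly once.
func simulateAtLeastOnce(items []string, failOnce string) map[string]int {
	q := &redeliveryQueue{pending: append([]string(nil), items...)}
	p := &flakyProcessor{
		failures: map[string]int{failOnce: 1},
		attempts: map[string]int{},
	}
	for {
		err := processItemAtLeastOnce(context.Background(), q, p)
		if errors.Is(err, ErrEndOfQueue) {
			return p.attempts
		}
		// Looping again models a restart that redelivers the message.
	}
}
```

In this simulation the message whose processing failed once is attempted twice, while the others are attempted once. That repeated attempt is why the processor has to be idempotent.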
App Wrapper
The queue.App type wraps a Runtime and integrates it with the Bedrock framework:
func NewApp(runtime Runtime) *App
Features:
- Calls runtime.ProcessQueue(ctx) on startup
- Handles context cancellation
- Returns errors to the framework for logging
Example:
func Init(ctx context.Context, cfg Config) (*queue.App, error) {
	runtime := &MyRuntime{...}
	return queue.NewApp(runtime), nil
}
Builder Pattern
The queue.Builder function creates a Bedrock AppBuilder with automatic instrumentation:
func Builder[T any](init func(context.Context, T) (*App, error)) bedrock.AppBuilder[T]
Automatic Features:
- OpenTelemetry SDK initialization
- Panic recovery in handlers
- OS signal handling (SIGTERM, SIGINT; SIGKILL cannot be caught by a process)
- Graceful shutdown coordination
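As an illustration of what panic recovery in a handler generally involves, the sketch below converts a panic into an ordinary error. It is not the framework's implementation, which this page does not show; `ProcessFunc`, `recoverPanics`, and `safeProcess` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// ProcessFunc mirrors the shape of Processor.Process for a string message.
type ProcessFunc func(ctx context.Context, msg string) error

// recoverPanics wraps a handler so that a panic is reported as an error
// instead of crashing the process.
func recoverPanics(next ProcessFunc) ProcessFunc {
	return func(ctx context.Context, msg string) (err error) {
		defer func() {
			if r := recover(); r != nil {
				err = fmt.Errorf("recovered panic while processing %q: %v", msg, r)
			}
		}()
		return next(ctx, msg)
	}
}

// safeProcess runs a handler that panics on the message "boom".
func safeProcess(msg string) error {
	handler := recoverPanics(func(ctx context.Context, msg string) error {
		if msg == "boom" {
			panic("unexpected state")
		}
		return nil
	})
	return handler(context.Background(), msg)
}
```

Because the recovered panic surfaces as an error, the surrounding loop can apply the same delivery semantics to it as to any other processing failure.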
Usage:
builder := queue.Builder(Init)
app, err := builder(ctx, cfg)
Run Function
The queue.Run function provides a complete entry point for queue services:
func Run[T any](
	reader io.Reader,
	init func(context.Context, T) (*App, error),
	opts ...RunOption,
) error
Workflow:
- Read YAML configuration from reader
- Parse config into type T
- Call init function to build App
- Run app until completion or error
- Log errors and exit
Example:
func main() {
	queue.Run(queue.YamlSource("config.yaml"), Init)
}
Graceful Shutdown
The framework handles graceful shutdown automatically:
- Signal Handling - SIGTERM/SIGINT triggers context cancellation
- Consumer Stops - Consumer returns ErrEndOfQueue
- In-Flight Processing - Completes current messages
- Final Acknowledgment - Commits final offsets (at-least-once)
- Cleanup - Closes connections and resources
Implementation:
func (r *MyRuntime) ProcessQueue(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			// Context cancelled, stop consuming
			return nil
		default:
		}
		msg, err := r.consumer.Consume(ctx)
		if errors.Is(err, queue.ErrEndOfQueue) {
			return nil
		}
		// ... process message
	}
}
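The shutdown behaviour can be checked by cancelling the context while a message is in flight. In this sketch the cancellation is triggered from inside the handler to simulate a signal arriving mid-message; `processUntilCancelled` and `runWithCancelAfter` are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
)

// ErrEndOfQueue is a local stand-in for queue.ErrEndOfQueue.
var ErrEndOfQueue = errors.New("end of queue")

// processUntilCancelled checks for cancellation between messages, so the
// message currently being handled always runs to completion.
func processUntilCancelled(
	ctx context.Context,
	consume func(context.Context) (int, error),
	process func(context.Context, int) error,
) error {
	for {
		select {
		case <-ctx.Done():
			return nil // stop consuming; not treated as a failure
		default:
		}
		msg, err := consume(ctx)
		if errors.Is(err, ErrEndOfQueue) {
			return nil
		}
		if err != nil {
			return err
		}
		if err := process(ctx, msg); err != nil {
			return err
		}
	}
}

// runWithCancelAfter consumes from an endless source and cancels the context
// while handling the n-th message. It returns how many messages were handled.
func runWithCancelAfter(n int) (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if n <= 0 {
		cancel() // cancelled before any work starts
	}
	next := 0
	consume := func(context.Context) (int, error) {
		next++
		return next, nil
	}
	handled := 0
	process := func(context.Context, int) error {
		handled++
		if handled == n {
			cancel() // simulate a shutdown signal arriving mid-message
		}
		return nil
	}
	err := processUntilCancelled(ctx, consume, process)
	return handled, err
}
```

The message being handled when cancellation arrives still completes, and no further messages are consumed. Outside of a test, such a context is commonly derived with `signal.NotifyContext` from the standard library.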
Error Handling
The framework provides structured error handling:
ErrEndOfQueue:
- Special error signaling queue exhaustion
- Triggers graceful shutdown
- Not treated as a failure
Processing Errors:
- Return errors from Processor for at-least-once retry
- Log and continue for at-most-once (message lost)
Fatal Errors:
- Consumer/Acknowledger errors are typically fatal
- Return from ProcessQueue to shut down
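The rules above can be summarised as a small decision function. The sketch assumes errors are tagged with the phase that produced them; `PhaseError`, `phaseError`, `Outcome`, and `decide` are hypothetical and not part of the queue package.

```go
package main

import "errors"

// ErrEndOfQueue is a local stand-in for queue.ErrEndOfQueue.
var ErrEndOfQueue = errors.New("end of queue")

// PhaseError is a hypothetical wrapper recording which phase failed.
type PhaseError struct {
	Phase string // "consume", "process", or "acknowledge"
	Err   error
}

func (e *PhaseError) Error() string { return e.Phase + ": " + e.Err.Error() }
func (e *PhaseError) Unwrap() error { return e.Err }

// phaseError builds a PhaseError from a plain message.
func phaseError(phase, msg string) error {
	return &PhaseError{Phase: phase, Err: errors.New(msg)}
}

// Outcome is what the processing loop should do next.
type Outcome string

const (
	Continue Outcome = "continue"
	Shutdown Outcome = "shutdown"
	Fail     Outcome = "fail"
)

// decide maps the result of one iteration to an outcome.
func decide(err error, atMostOnce bool) Outcome {
	var pe *PhaseError
	switch {
	case err == nil:
		return Continue
	case errors.Is(err, ErrEndOfQueue):
		return Shutdown // exhaustion is not a failure
	case atMostOnce && errors.As(err, &pe) && pe.Phase == "process":
		return Continue // message already acknowledged: log and move on
	default:
		return Fail // consumer/acknowledger errors, or at-least-once failures
	}
}
```

Because `PhaseError` implements `Unwrap`, `errors.Is` still recognises a wrapped `ErrEndOfQueue`, so a consumer can add context to the sentinel without breaking graceful shutdown.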
OpenTelemetry Integration
All queue processing is automatically instrumented:
Automatic Tracing:
- Span per message
- Processing order visible in traces
- Context propagation through phases
Automatic Logging:
- Structured logs with message metadata
- Error recording in spans
- Performance metrics
No additional configuration needed in your Processor implementation.
Next Steps
- Learn about Delivery Semantics to choose the right processing model
- Build your first processor with the Kafka Quick Start