Message Structure

Working with Kafka messages and metadata

Kafka messages in Humus are represented by the kafka.Message type, which provides access to all message data and metadata.

Message Type

type Message struct {
    Key       []byte
    Value     []byte
    Headers   []Header
    Timestamp time.Time
    Topic     string
    Partition int32
    Offset    int64
    Attrs     uint8
}

Message Fields

Value

The message payload as raw bytes:

type Message struct {
    Value []byte  // The message content
}

Usage:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Deserialize JSON
    var order OrderMessage
    if err := json.Unmarshal(msg.Value, &order); err != nil {
        return err
    }

    // Process the order
    return p.processOrder(ctx, order)
}

Key

Optional message key used for partitioning and compaction:

type Message struct {
    Key []byte  // Optional partition key
}

Usage:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    if msg.Key != nil {
        // All messages with this key go to the same partition
        customerID := string(msg.Key)
        log.Info("Processing keyed message", "customer_id", customerID)
    }

    // Process message
    return nil
}

Key purposes (see the producer-side sketch after this list):

  • Partition assignment (same key → same partition)
  • Log compaction (retain latest message per key)
  • Message ordering (within partition)
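On the producing side, partition affinity comes from setting the key. A minimal sketch using the franz-go client (kgo, the same producer client the DLQ example below uses); the topic name and helper function are illustrative:

import (
    "context"

    "github.com/twmb/franz-go/pkg/kgo"
)

// publishOrderEvent keys the record by customer ID so that every event for
// a given customer lands on the same partition and stays ordered.
func publishOrderEvent(ctx context.Context, client *kgo.Client, customerID string, payload []byte) error {
    record := &kgo.Record{
        Topic: "orders",
        Key:   []byte(customerID), // same key -> same partition
        Value: payload,
    }
    return client.ProduceSync(ctx, record).FirstErr()
}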

Headers

Key-value metadata attached to the message:

type Header struct {
    Key   string
    Value []byte
}

type Message struct {
    Headers []Header
}

Usage:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Collect the headers this processor cares about
    var traceID, contentType string
    for _, header := range msg.Headers {
        switch header.Key {
        case "trace-id":
            traceID = string(header.Value) // used for distributed tracing
        case "content-type":
            contentType = string(header.Value) // selects the deserialization format
        }
    }

    log.Debug("Message metadata", "trace_id", traceID, "content_type", contentType)
    return nil
}

Common header uses (a lookup helper follows this list):

  • Distributed tracing IDs
  • Content type/encoding
  • Source application
  • Schema version
  • Correlation IDs
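When a processor needs several headers, collecting them once is simpler than repeated scans. An illustrative helper (Kafka allows duplicate header keys; this keeps the last value seen):

// headersToMap flattens message headers into a lookup map.
func headersToMap(msg kafka.Message) map[string]string {
    m := make(map[string]string, len(msg.Headers))
    for _, h := range msg.Headers {
        m[h.Key] = string(h.Value)
    }
    return m
}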

Timestamp

When the message was produced (or appended by the broker; see the timestamp types below):

type Message struct {
    Timestamp time.Time
}

Usage:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    age := time.Since(msg.Timestamp)
    if age > 5*time.Minute {
        // Handle stale message
        log.Warn("Processing stale message", "age", age)
    }

    return nil
}

Timestamp types:

  • CreateTime: When producer sent the message (default)
  • LogAppendTime: When broker received the message (if configured)
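One caveat worth handling: with CreateTime, the timestamp comes from the producer's clock, which may run ahead of the consumer's, so a naive age calculation can go negative. A defensive sketch:

// messageAge reports how long ago the message was stamped, clamped to zero
// to guard against producer clock skew.
func messageAge(msg kafka.Message) time.Duration {
    age := time.Since(msg.Timestamp)
    if age < 0 {
        return 0
    }
    return age
}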

Topic

The Kafka topic this message came from:

type Message struct {
    Topic string
}

Usage:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    switch msg.Topic {
    case "orders":
        return p.processOrder(ctx, msg)
    case "payments":
        return p.processPayment(ctx, msg)
    default:
        return fmt.Errorf("unknown topic: %s", msg.Topic)
    }
}

Useful when processing multiple topics with the same processor.

Partition

The partition this message came from:

type Message struct {
    Partition int32
}

Usage:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    log.Info("Processing message",
        "topic", msg.Topic,
        "partition", msg.Partition,
        "offset", msg.Offset,
    )

    return nil
}

Partition guarantees:

  • Messages in a partition are ordered
  • Each partition processed by one consumer at a time
  • Same key always goes to same partition

Offset

The message’s position within its partition:

type Message struct {
    Offset int64
}

Usage:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Save offset for manual checkpoint recovery
    if err := p.processMessage(ctx, msg); err != nil {
        return err
    }

    // Record last processed offset
    p.recordCheckpoint(msg.Topic, msg.Partition, msg.Offset)
    return nil
}

Offset characteristics (a duplicate-detection sketch follows this list):

  • Monotonically increasing within partition
  • Unique identifier for message position
  • Used for offset commits (acknowledgments)
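Because offsets only move forward within a partition, they can flag redeliveries after a rebalance or restart. A minimal illustrative tracker (not part of Humus; assumes the processor handles one message at a time):

// offsetTracker remembers the last processed offset per topic/partition and
// treats anything at or below it as a likely duplicate delivery.
type offsetTracker struct {
    last map[string]int64
}

func newOffsetTracker() *offsetTracker {
    return &offsetTracker{last: make(map[string]int64)}
}

func (t *offsetTracker) isDuplicate(msg kafka.Message) bool {
    key := fmt.Sprintf("%s/%d", msg.Topic, msg.Partition)
    if last, ok := t.last[key]; ok && msg.Offset <= last {
        return true
    }
    t.last[key] = msg.Offset
    return false
}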

Attrs

Message attributes (advanced):

type Message struct {
    Attrs uint8
}

A bitfield of record-batch flags (compression codec, timestamp type, transactional and control markers). Rarely needed in application code.
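If you do need to inspect it, and assuming the field mirrors the low byte of the Kafka record-batch attributes (an assumption about the mapping, not documented Humus behavior), the bits break down as follows:

// Assumed layout, per the Kafka record-batch attribute encoding:
// bits 0-2 compression codec, bit 3 timestamp type,
// bit 4 transactional, bit 5 control batch.
const (
    attrCompressionMask = 0b0000_0111
    attrLogAppendTime   = 0b0000_1000
    attrTransactional   = 0b0001_0000
    attrControlBatch    = 0b0010_0000
)

func describeAttrs(attrs uint8) (codec uint8, logAppendTime, transactional, control bool) {
    return attrs & attrCompressionMask,
        attrs&attrLogAppendTime != 0,
        attrs&attrTransactional != 0,
        attrs&attrControlBatch != 0
}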

Deserialization Patterns

JSON Deserialization

Most common pattern for JSON messages:

type OrderMessage struct {
    OrderID string  `json:"order_id"`
    Amount  float64 `json:"amount"`
}

func (p *OrderProcessor) Process(ctx context.Context, msg kafka.Message) error {
    var order OrderMessage
    if err := json.Unmarshal(msg.Value, &order); err != nil {
        return fmt.Errorf("invalid JSON: %w", err)
    }

    return p.processOrder(ctx, order)
}
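Note that json.Unmarshal silently ignores fields the target type doesn't declare. Where schema drift between producer and consumer should surface as an error, a stricter decoder is a small change:

import (
    "bytes"
    "encoding/json"
)

// strictUnmarshal rejects payloads containing unknown fields, so schema
// drift fails loudly instead of being silently dropped.
func strictUnmarshal(data []byte, v any) error {
    dec := json.NewDecoder(bytes.NewReader(data))
    dec.DisallowUnknownFields()
    return dec.Decode(v)
}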

Protobuf Deserialization

For protobuf-encoded messages:

import "google.golang.org/protobuf/proto"

func (p *OrderProcessor) Process(ctx context.Context, msg kafka.Message) error {
    var order orderpb.Order
    if err := proto.Unmarshal(msg.Value, &order); err != nil {
        return fmt.Errorf("invalid protobuf: %w", err)
    }

    return p.processOrder(ctx, &order)
}

Avro Deserialization

Decoding with goavro (see the wire-format note after this example for schema-registry framing):

import "github.com/linkedin/goavro/v2"

type AvroProcessor struct {
    codec *goavro.Codec
}

func (p *AvroProcessor) Process(ctx context.Context, msg kafka.Message) error {
    native, _, err := p.codec.NativeFromBinary(msg.Value)
    if err != nil {
        return fmt.Errorf("invalid avro: %w", err)
    }

    record := native.(map[string]interface{})
    return p.processRecord(ctx, record)
}
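Messages produced through Confluent Schema Registry are framed: a zero magic byte and a big-endian 4-byte schema ID precede the Avro body. A sketch of unframing before decoding (resolving the schema ID against the registry is omitted here):

import (
    "encoding/binary"
    "fmt"
)

// splitWireFormat separates the Confluent wire-format prefix
// ([0x00][4-byte schema ID][payload]) from the raw Avro body.
func splitWireFormat(value []byte) (schemaID uint32, body []byte, err error) {
    if len(value) < 5 || value[0] != 0 {
        return 0, nil, fmt.Errorf("not Confluent wire format")
    }
    return binary.BigEndian.Uint32(value[1:5]), value[5:], nil
}

Pass the returned body to codec.NativeFromBinary, assuming the codec was built for the schema that the ID refers to.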

Content-Type Routing

Use headers to determine deserialization format:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    contentType := p.getHeader(msg, "content-type")

    switch contentType {
    case "application/json":
        return p.processJSON(ctx, msg.Value)
    case "application/protobuf":
        return p.processProtobuf(ctx, msg.Value)
    case "application/avro":
        return p.processAvro(ctx, msg.Value)
    default:
        return fmt.Errorf("unsupported content-type: %s", contentType)
    }
}

func (p *Processor) getHeader(msg kafka.Message, key string) string {
    for _, h := range msg.Headers {
        if h.Key == key {
            return string(h.Value)
        }
    }
    return ""
}

Working with Headers

Reading Headers

Helper function for header access:

func getHeader(msg kafka.Message, key string) (string, bool) {
    for _, h := range msg.Headers {
        if h.Key == key {
            return string(h.Value), true
        }
    }
    return "", false
}

type traceIDKey struct{}

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    if traceID, ok := getHeader(msg, "trace-id"); ok {
        // Store under a typed key; bare string keys risk collisions and fail go vet
        ctx = context.WithValue(ctx, traceIDKey{}, traceID)
    }

    return nil
}

Header-Based Filtering

Skip messages based on headers:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Skip test messages
    if env, ok := getHeader(msg, "environment"); ok && env == "test" {
        return nil
    }

    // Skip old schema versions
    if version, ok := getHeader(msg, "schema-version"); ok && version != "v2" {
        log.Warn("Skipping old schema version", "version", version)
        return nil
    }

    return p.processMessage(ctx, msg)
}

Trace Context Propagation

Extract distributed tracing context from headers:

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/propagation"
)

var tracer = otel.Tracer("order-processor")

// Compile-time check that headerCarrier implements the OTel carrier interface.
var _ propagation.TextMapCarrier = (*headerCarrier)(nil)

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    // Extract trace context from headers
    carrier := &headerCarrier{headers: msg.Headers}
    ctx = otel.GetTextMapPropagator().Extract(ctx, carrier)

    // Now ctx contains the parent span context
    _, span := tracer.Start(ctx, "process-order")
    defer span.End()

    return p.processOrder(ctx, msg)
}

type headerCarrier struct {
    headers []kafka.Header
}

func (c *headerCarrier) Get(key string) string {
    for _, h := range c.headers {
        if h.Key == key {
            return string(h.Value)
        }
    }
    return ""
}

func (c *headerCarrier) Set(key, value string) {
    // Not needed for extraction
}

func (c *headerCarrier) Keys() []string {
    keys := make([]string, len(c.headers))
    for i, h := range c.headers {
        keys[i] = h.Key
    }
    return keys
}

Note: The Kafka runtime automatically handles OTel trace propagation, so this is usually not needed.

Error Handling

Validation Errors

Validate messages and decide how to handle invalid data:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    var order OrderMessage
    if err := json.Unmarshal(msg.Value, &order); err != nil {
        // Log and skip invalid JSON
        log.Error("Invalid JSON message",
            "topic", msg.Topic,
            "partition", msg.Partition,
            "offset", msg.Offset,
            "error", err,
        )
        return nil // Skip message, don't retry
    }

    // Validate business rules
    if order.Amount < 0 {
        log.Error("Invalid order amount",
            "order_id", order.OrderID,
            "amount", order.Amount,
        )
        return nil // Skip invalid message
    }

    return p.processOrder(ctx, order)
}

Dead Letter Queue

Route failed messages to a DLQ:

type Processor struct {
    producer *kgo.Client  // For DLQ publishing
}

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    var order OrderMessage
    if err := json.Unmarshal(msg.Value, &order); err != nil {
        // Send to DLQ
        return p.sendToDLQ(ctx, msg, err)
    }

    if err := p.processOrder(ctx, order); err != nil {
        // Send to DLQ on processing error
        return p.sendToDLQ(ctx, msg, err)
    }

    return nil
}

func (p *Processor) sendToDLQ(ctx context.Context, msg kafka.Message, err error) error {
    // Convert headers explicitly in case kafka.Header and kgo.RecordHeader differ
    headers := make([]kgo.RecordHeader, 0, len(msg.Headers)+1)
    for _, h := range msg.Headers {
        headers = append(headers, kgo.RecordHeader{Key: h.Key, Value: h.Value})
    }
    headers = append(headers, kgo.RecordHeader{Key: "error", Value: []byte(err.Error())})

    dlqRecord := &kgo.Record{
        Topic:   msg.Topic + ".dlq",
        Key:     msg.Key,
        Value:   msg.Value,
        Headers: headers,
    }

    // Produce synchronously so a DLQ failure is not silently dropped
    if perr := p.producer.ProduceSync(ctx, dlqRecord).FirstErr(); perr != nil {
        return perr // surface the failure so the message is retried
    }
    return nil // message handled via DLQ
}

Message Logging

Log messages for debugging:

func (p *Processor) Process(ctx context.Context, msg kafka.Message) error {
    log.Info("Processing message",
        "topic", msg.Topic,
        "partition", msg.Partition,
        "offset", msg.Offset,
        "key", string(msg.Key),
        "timestamp", msg.Timestamp,
        "headers", len(msg.Headers),
    )

    // Don't log msg.Value in production (may contain PII)
    // Instead log a hash or truncated version
    valueHash := fmt.Sprintf("%x", sha256.Sum256(msg.Value))
    log.Debug("Message content hash", "hash", valueHash)

    return p.processMessage(ctx, msg)
}
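For the truncated variant mentioned in the comment above, a tiny illustrative helper:

// truncateForLog returns at most n bytes of the payload as a string,
// marking elision so log lines stay bounded.
func truncateForLog(b []byte, n int) string {
    if len(b) <= n {
        return string(b)
    }
    return string(b[:n]) + "...(truncated)"
}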

Next Steps