
Distributed Observability: Kafka + Jaeger + Go for Resilient Tracing

Hey everyone!

Today I’ll show you how to create a resilient distributed tracing system using Apache Kafka and Jaeger. The idea is simple: what if Jaeger goes down? Do you lose all traces? No! We’ll use Kafka as a buffer to ensure no trace is lost.

If you don’t know Jaeger yet, check out the YouTube video I recorded about observability!

The Problem: What happens when Jaeger is down?

In a traditional setup, your application sends traces directly to Jaeger. If Jaeger is down:

  • Traces are lost
  • No visibility into what happened
  • Debugging becomes guesswork
  • Performance issues go unnoticed

The Solution: Kafka as a Buffer

Instead of sending traces directly to Jaeger, we’ll:

  1. Send traces to Kafka (fast, reliable)
  2. Kafka stores traces (persistent, fault-tolerant)
  3. Consumer reads from Kafka and sends to Jaeger
  4. If Jaeger is down, traces stay in Kafka until it’s back

Architecture Overview

[Application] → [Kafka Topic] → [Consumer] → [Jaeger]
                    ↓
               [Persistent Storage]

Benefits:

  • No data loss: Traces are persisted in Kafka until they are consumed
  • Fault tolerance: The system keeps accepting traces even while Jaeger is down
  • Scalability: Multiple consumers in one group can process traces in parallel
  • Reliability: With acks=all and retries, Kafka provides durable, at-least-once delivery
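
We'll key each Kafka message by its trace ID, so all spans of one trace land on the same partition and stay ordered. You can pre-create the topic with a few partitions so a consumer group can share the load. A sketch, once Kafka is up via the docker-compose file shown later (partition and replication counts are illustrative):

docker-compose exec kafka kafka-topics --bootstrap-server localhost:9092 --create \
  --topic traces --partitions 3 --replication-factor 1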

Implementation

Let’s build this step by step:

Step 1: Project Setup

mkdir resilient-tracing
cd resilient-tracing
go mod init github.com/your-username/resilient-tracing

Install dependencies:

go get github.com/Shopify/sarama
go get go.opentelemetry.io/otel
go get go.opentelemetry.io/otel/trace
go get go.opentelemetry.io/otel/exporters/jaeger
go get go.opentelemetry.io/otel/sdk

Step 2: Kafka Producer (Trace Sender)

Create producer.go:

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/Shopify/sarama"
    "go.opentelemetry.io/otel/trace"
)

type TraceProducer struct {
    producer sarama.SyncProducer
    topic    string
}

type TraceData struct {
    TraceID    string            `json:"trace_id"`
    SpanID     string            `json:"span_id"`
    ParentID   string            `json:"parent_id,omitempty"`
    Name       string            `json:"name"`
    StartTime  time.Time         `json:"start_time"`
    EndTime    time.Time         `json:"end_time"`
    Attributes map[string]string `json:"attributes"`
    Events     []Event           `json:"events,omitempty"`
}

type Event struct {
    Name      string            `json:"name"`
    Timestamp time.Time         `json:"timestamp"`
    Attributes map[string]string `json:"attributes"`
}

func NewTraceProducer(brokers []string, topic string) (*TraceProducer, error) {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 5
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        return nil, err
    }

    return &TraceProducer{
        producer: producer,
        topic:    topic,
    }, nil
}

func (tp *TraceProducer) SendTrace(ctx context.Context, span trace.Span, name string) error {
    spanCtx := span.SpanContext()

    // Note: the public trace.Span interface does not expose the span's name,
    // parent, or timestamps, so the caller passes the name and we record the
    // send time. For exact timings you would hook in a custom SpanProcessor.
    traceData := TraceData{
        TraceID:    spanCtx.TraceID().String(),
        SpanID:     spanCtx.SpanID().String(),
        Name:       name,
        StartTime:  time.Now(),
        EndTime:    time.Now(),
        Attributes: make(map[string]string),
    }

    // Record where this trace is being buffered
    traceData.Attributes["kafka.topic"] = tp.topic

    // Marshal to JSON
    data, err := json.Marshal(traceData)
    if err != nil {
        return err
    }

    // Send to Kafka
    msg := &sarama.ProducerMessage{
        Topic: tp.topic,
        Key:   sarama.StringEncoder(spanCtx.TraceID().String()),
        Value: sarama.ByteEncoder(data),
    }

    partition, offset, err := tp.producer.SendMessage(msg)
    if err != nil {
        return err
    }

    log.Printf("Trace sent to Kafka - Topic: %s, Partition: %d, Offset: %d", 
               tp.topic, partition, offset)

    return nil
}

func (tp *TraceProducer) Close() error {
    return tp.producer.Close()
}

Step 3: Kafka Consumer (Trace Processor)

Create consumer.go:

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/Shopify/sarama"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
    "go.opentelemetry.io/otel/trace"
)

type TraceConsumer struct {
    consumer sarama.ConsumerGroup
    topic    string
    group    string
    tracer   trace.Tracer
}

func NewTraceConsumer(brokers []string, topic, group string) (*TraceConsumer, error) {
    config := sarama.NewConfig()
    config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
    config.Consumer.Offsets.Initial = sarama.OffsetOldest

    consumer, err := sarama.NewConsumerGroup(brokers, group, config)
    if err != nil {
        return nil, err
    }

    // Setup Jaeger exporter
    exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint("http://jaeger:14268/api/traces")))
    if err != nil {
        return nil, err
    }

    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exp),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String("trace-consumer"),
        )),
    )

    otel.SetTracerProvider(tp)
    tracer := tp.Tracer("trace-consumer")

    return &TraceConsumer{
        consumer: consumer,
        topic:    topic,
        group:    group,
        tracer:   tracer,
    }, nil
}

func (tc *TraceConsumer) Start(ctx context.Context) error {
    handler := &TraceHandler{tracer: tc.tracer}
    
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            err := tc.consumer.Consume(ctx, []string{tc.topic}, handler)
            if err != nil {
                log.Printf("Error consuming: %v", err)
                time.Sleep(time.Second)
            }
        }
    }
}

func (tc *TraceConsumer) Close() error {
    return tc.consumer.Close()
}

type TraceHandler struct {
    tracer trace.Tracer
}

func (h *TraceHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *TraceHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h *TraceHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for {
        select {
        case message, ok := <-claim.Messages():
            if !ok {
                return nil
            }

            // Process trace message
            if err := h.processTrace(message.Value); err != nil {
                log.Printf("Error processing trace: %v", err)
                continue
            }

            session.MarkMessage(message, "")
        case <-session.Context().Done():
            return nil
        }
    }
}

func (h *TraceHandler) processTrace(data []byte) error {
    var traceData TraceData
    if err := json.Unmarshal(data, &traceData); err != nil {
        return err
    }

    // Re-emit the trace as a new span, using the recorded timestamps.
    // Note: the public OTel API does not let us reuse the original trace/span
    // IDs, so they are attached as attributes; preserving IDs exactly would
    // require a custom IDGenerator or writing to Jaeger's API directly.
    ctx := context.Background()
    _, span := h.tracer.Start(ctx, traceData.Name, trace.WithTimestamp(traceData.StartTime))
    defer span.End(trace.WithTimestamp(traceData.EndTime))

    span.SetAttributes(
        attribute.String("original.trace_id", traceData.TraceID),
        attribute.String("original.span_id", traceData.SpanID),
    )

    // Add attributes
    for key, value := range traceData.Attributes {
        span.SetAttributes(attribute.String(key, value))
    }

    // Add events
    for _, event := range traceData.Events {
        span.AddEvent(event.Name, trace.WithAttributes(
            attribute.String("event.timestamp", event.Timestamp.Format(time.RFC3339)),
        ))
    }

    log.Printf("Processed trace: %s", traceData.TraceID)
    return nil
}

Step 4: Application with Tracing

Create app.go:

package main

import (
    "context"
    "log"
    "time"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/trace"
)

type App struct {
    tracer        trace.Tracer
    traceProducer *TraceProducer
}

func NewApp(traceProducer *TraceProducer) *App {
    tracer := otel.Tracer("resilient-tracing-app")
    return &App{
        tracer:        tracer,
        traceProducer: traceProducer,
    }
}

func (a *App) ProcessRequest(ctx context.Context, requestID string) error {
    // Create root span
    ctx, span := a.tracer.Start(ctx, "process-request")
    defer span.End()

    span.SetAttributes(
        attribute.String("request.id", requestID),
        attribute.String("service.name", "resilient-tracing-app"),
    )

    // Simulate some work
    if err := a.validateRequest(ctx, requestID); err != nil {
        span.RecordError(err)
        return err
    }

    if err := a.processBusinessLogic(ctx, requestID); err != nil {
        span.RecordError(err)
        return err
    }

    if err := a.sendResponse(ctx, requestID); err != nil {
        span.RecordError(err)
        return err
    }

    // Send trace to Kafka (the span name is passed explicitly, since the
    // trace.Span interface does not expose it)
    if err := a.traceProducer.SendTrace(ctx, span, "process-request"); err != nil {
        log.Printf("Failed to send trace to Kafka: %v", err)
        // Don't fail the request if tracing fails
    }

    return nil
}

func (a *App) validateRequest(ctx context.Context, requestID string) error {
    ctx, span := a.tracer.Start(ctx, "validate-request")
    defer span.End()

    span.SetAttributes(attribute.String("request.id", requestID))

    // Simulate validation
    time.Sleep(10 * time.Millisecond)
    
    span.AddEvent("validation.completed")
    return nil
}

func (a *App) processBusinessLogic(ctx context.Context, requestID string) error {
    ctx, span := a.tracer.Start(ctx, "process-business-logic")
    defer span.End()

    span.SetAttributes(attribute.String("request.id", requestID))

    // Simulate business logic
    time.Sleep(50 * time.Millisecond)
    
    span.AddEvent("business-logic.completed")
    return nil
}

func (a *App) sendResponse(ctx context.Context, requestID string) error {
    ctx, span := a.tracer.Start(ctx, "send-response")
    defer span.End()

    span.SetAttributes(attribute.String("request.id", requestID))

    // Simulate response sending
    time.Sleep(20 * time.Millisecond)
    
    span.AddEvent("response.sent")
    return nil
}

Step 5: Main Application

Create main.go:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    // Setup Kafka producer
    producer, err := NewTraceProducer([]string{"kafka:9092"}, "traces")
    if err != nil {
        log.Fatal("Failed to create producer:", err)
    }
    defer producer.Close()

    // Setup Kafka consumer
    consumer, err := NewTraceConsumer([]string{"kafka:9092"}, "traces", "trace-consumer-group")
    if err != nil {
        log.Fatal("Failed to create consumer:", err)
    }
    defer consumer.Close()

    // Create app
    app := NewApp(producer)

    // Start consumer in background
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go func() {
        if err := consumer.Start(ctx); err != nil {
            log.Printf("Consumer error: %v", err)
        }
    }()

    // Simulate requests
    go func() {
        for i := 0; i < 100; i++ {
            requestID := fmt.Sprintf("req-%d", i)
            if err := app.ProcessRequest(ctx, requestID); err != nil {
                log.Printf("Request failed: %v", err)
            }
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // Wait for interrupt
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan

    log.Println("Shutting down...")
    cancel()
}

Step 6: Docker Compose

Create docker-compose.yml:

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"
    environment:
      COLLECTOR_OTLP_ENABLED: true

  app:
    build: .
    depends_on:
      - kafka
      - jaeger
    environment:
      KAFKA_BROKERS: kafka:9092
      JAEGER_ENDPOINT: http://jaeger:14268/api/traces
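
The app service uses build: ., so the project also needs a Dockerfile. A minimal sketch (the Go and Alpine versions are illustrative; pin whatever you actually use):

FROM golang:1.21 AS build
WORKDIR /src
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN CGO_ENABLED=0 go build -o /resilient-tracing .

FROM alpine:3.19
COPY --from=build /resilient-tracing /resilient-tracing
ENTRYPOINT ["/resilient-tracing"]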

Testing the System

Test 1: Normal Operation

# Start the system
docker-compose up -d

# Check if traces are being sent
docker-compose logs app

# View traces in Jaeger UI
open http://localhost:16686

Test 2: Jaeger Down

# Stop Jaeger
docker-compose stop jaeger

# Check if traces are still being sent to Kafka
docker-compose logs app

# Check Kafka topic
docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic traces --from-beginning

Test 3: Jaeger Recovery

# Start Jaeger again
docker-compose start jaeger

# Check if traces are being processed
docker-compose logs app

# View traces in Jaeger UI
open http://localhost:16686
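
You can also confirm that the consumer drained its backlog by describing the group's lag, which should fall back toward zero once Jaeger is reachable again:

docker-compose exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group trace-consumer-group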

Monitoring and Alerting

Kafka Metrics

// Add metrics to the producer. Note: "metrics" is a placeholder package;
// a Prometheus-backed sketch follows these snippets.
func (tp *TraceProducer) SendTrace(ctx context.Context, span trace.Span) error {
    // ... existing code ...
    
    // Increment counter
    metrics.IncrementCounter("traces.sent.total")
    
    return nil
}

Consumer Metrics

// Add metrics to the consumer (same placeholder "metrics" helper; see below)
func (h *TraceHandler) processTrace(data []byte) error {
    // ... existing code ...
    
    // Increment counter
    metrics.IncrementCounter("traces.processed.total")
    
    return nil
}
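
The metrics package used above is a stand-in. A minimal sketch backed by Prometheus (assuming github.com/prometheus/client_golang; you would also expose promhttp.Handler() on an HTTP port so it can be scraped):

package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var counters = map[string]prometheus.Counter{
    "traces.sent.total": promauto.NewCounter(prometheus.CounterOpts{
        Name: "traces_sent_total",
        Help: "Traces published to Kafka.",
    }),
    "traces.processed.total": promauto.NewCounter(prometheus.CounterOpts{
        Name: "traces_processed_total",
        Help: "Traces consumed from Kafka and exported to Jaeger.",
    }),
}

// IncrementCounter bumps the named counter; unknown names are ignored.
func IncrementCounter(name string) {
    if c, ok := counters[name]; ok {
        c.Inc()
    }
}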

Best Practices

  1. Error Handling: Always handle Kafka errors gracefully
  2. Retry Logic: Implement exponential backoff for failed sends (see the sketch after this list)
  3. Monitoring: Watch consumer lag and consumer health
  4. Partitioning: Use the trace ID as the partition key so spans of one trace stay ordered
  5. Compression: Enable compression for large trace payloads (also sketched below)
  6. Retention: Set appropriate retention policies on the traces topic
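
Items 2 and 5 map directly onto sarama's producer config. A sketch (the backoff curve and codec are illustrative starting points, not tuned recommendations):

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll

// Item 2: retry failed sends with exponential backoff
config.Producer.Retry.Max = 5
config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
    return time.Duration(100<<retries) * time.Millisecond // 100ms, 200ms, 400ms...
}

// Item 5: compress trace payloads on the wire and on disk
config.Producer.Compression = sarama.CompressionSnappy

For item 6, retention is set per topic, e.g. keep buffered traces for 24 hours:

docker-compose exec kafka kafka-configs --bootstrap-server localhost:9092 --alter \
  --entity-type topics --entity-name traces --add-config retention.ms=86400000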

Conclusion

This resilient tracing system ensures that you never lose trace data, even when Jaeger is down. Kafka acts as a reliable buffer, and the consumer processes traces when Jaeger is available.

Key Benefits:

  • No data loss: Traces are persisted in Kafka until consumed
  • Fault tolerance: The system keeps accepting traces even while Jaeger is down
  • Scalability: Multiple consumers in one group can process traces in parallel
  • Reliability: With acks=all and retries, Kafka provides durable, at-least-once delivery

Next Steps:

  • Add more sophisticated error handling
  • Implement trace sampling (see the sketch below)
  • Add metrics and alerting
  • Optimize for high-throughput scenarios
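
For example, head-based sampling is a small change on whichever TracerProvider creates your spans (a sketch; the 10% ratio is arbitrary):

tp := sdktrace.NewTracerProvider(
    sdktrace.WithBatcher(exp),
    sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.TraceIDRatioBased(0.1))), // keep ~10% of traces
)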

Happy tracing! 🚀
