At GoTo, I've spent significant time developing Kafka consumers and producers in Golang, work that has shaped how we approach real-time data streaming at scale. If you've used confluent-kafka-go, you know the drill. Your consumer probably looks something like this:
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{/*...*/})
err = consumer.SubscribeTopics([]string{/*...*/}, nil)
// some way to cancel and stop the consumer
run := true
for run {
    msg, err := consumer.ReadMessage(time.Second)
    if err != nil {
        // err is nil on success, so check it before the type assertion
        if !err.(kafka.Error).IsTimeout() {
            // handle error from consumer/broker
        }
        continue // timeout or error: there is no message to process
    }
    // process msg
    // manually commit the offset, if needed
}
consumer.Close()
There's a lot that goes into the processing loop:
- read messages
- handle Kafka and application errors
- retry transient errors
- metrics, logging, tracing, etc.
- secondary Dead Letter Queues
- wiring everything together
A surprising amount of code often isn't dedicated to application logic. When building an application that processes more than one type of message, the code quickly gets verbose. Most of the code is just scaffolding.
What if we could make using Kafka in Go feel more like writing a simple HTTP service?
HTTP-like Kafka
xkafka (GitHub) is a Go library that provides HTTP-like abstractions for Kafka. It tries to make the experience of working with Kafka more like writing a simple HTTP service, with significantly less boilerplate and plumbing.
Here are the core abstractions:
- Message: Similar to an HTTP request. It has the topic, partition, offset, key, value, headers, and so on. It also allows callbacks to track message processing.
- Handler: Functions like an HTTP handler. It's where business logic lives.
- Middleware: Just like HTTP middleware, but for Kafka. Logging, metrics, retries, etc., can be added without cluttering core logic.
Publishing Messages
First, let's get simple things out of the way. Here's what publishing a message looks like with xkafka:
producer, err := xkafka.NewProducer(
    "producer-id",
    xkafka.Brokers{"localhost:9092"},
    xkafka.ConfigMap{
        "socket.keepalive.enable": true,
    },
)
producer.Use(/* add middlewares */)
msg := &xkafka.Message{
    Topic: "test",
    Key:   []byte("key"),
    Value: []byte("value"),
}
err = producer.Publish(ctx, msg)
That's it. You can also publish asynchronously for higher throughput or when there is a need to handle delivery events asynchronously:
producer, err := xkafka.NewProducer(
    // ...
    // configure a callback to handle delivery events
    xkafka.DeliveryCallback(func(msg *xkafka.Message) {
        // ...
    }),
)
// ...create a message
// or, configure a callback on the message itself
msg.AddCallback(func(msg *xkafka.Message) {
    // ...
})
// start the producer. this will start a background goroutine
// that will handle message delivery events.
go producer.Run(ctx)
// publish a message. this will return immediately.
err = producer.AsyncPublish(ctx, msg)
Consuming Messages
Now let's talk about the other side of Kafka: consuming messages. In my experience, this is where most of the complexity (and headaches) with Kafka show up. There are many ways to configure and process messages in a consumer. The tradeoffs between throughput, durability, and delivery guarantees can get confusing and complicated.
xkafka distills the most common patterns into simple abstractions and sensible defaults, while still allowing the flexibility to fine-tune Kafka configurations for various needs.
handler := xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
    // ...
    return nil
})
consumer, err := xkafka.NewConsumer(
    "consumer-id", // consumer group id
    handler,
    xkafka.Brokers{"localhost:9092"},
    xkafka.Topics{"test"},
    xkafka.ConfigMap{/*...*/},
)
consumer.Use(/* add middlewares */)
err = consumer.Run(ctx)
Streaming vs. Batch
There are two main ways to consume messages:
- Streaming (with xkafka.Consumer): Messages are processed one at a time, as soon as they arrive. This is great for low-throughput systems, or when there is a need to keep memory usage low and have strong processing guarantees.
- Batch (with xkafka.BatchConsumer): Messages are processed in batches, either by size or by time window. This is useful for high-throughput systems to buffer spikes and to avoid hammering downstream systems or databases with every single message.
Both approaches keep messages in order.
Sequential or Async
After reading a message or batch, xkafka.Concurrency(N) determines how messages or batches are processed:
- Sequential (Default): Processes one message or batch at a time. The next message isn't read until the handler completes processing the current one.
- Asynchronous (N > 1): Multiple messages or batches are processed in parallel using a pool of goroutines.
Offsets
One thing that always tripped me up with Kafka consumers is offset management. By default, a Kafka consumer moves the offset forward as soon as it delivers a message, not when processing is completed. That means if the downstream is temporarily down, or if the app crashes mid-processing, messages might get lost.
To solve this, I have seen developers add a separate database or queue to guarantee message processing. This adds another system to maintain and an additional point of failure. This is unnecessary.
xkafka simply sets enable.auto.offset.store=false and only stores the offset after the handler has finished processing the message or batch. As such, if something goes wrong, the consumer will just re-process the last message, not lose it. For batches, it tracks the highest offset, per topic and partition, in the batch.
This means a separate database or queue is not needed just to keep track of what has been processed as it is already managed by Kafka.
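For the batch case, "the highest offset, per topic and partition" boils down to a small reduction over the batch. The sketch below uses hypothetical `TopicPartition` and `Message` types defined locally, not xkafka's, and only illustrates the bookkeeping. By Kafka convention the position that gets stored is the offset of the next message to read, which is the highest processed offset plus one.

```go
package main

import "fmt"

// TopicPartition identifies one partition of one topic (hypothetical type).
type TopicPartition struct {
	Topic     string
	Partition int32
}

// Message carries only the fields needed for offset tracking.
type Message struct {
	TopicPartition
	Offset int64
}

// HighestOffsets returns, for each topic/partition present in the batch,
// the largest offset seen.
func HighestOffsets(batch []Message) map[TopicPartition]int64 {
	highest := make(map[TopicPartition]int64)
	for _, m := range batch {
		if cur, ok := highest[m.TopicPartition]; !ok || m.Offset > cur {
			highest[m.TopicPartition] = m.Offset
		}
	}
	return highest
}

func main() {
	batch := []Message{
		{TopicPartition{"orders", 0}, 41},
		{TopicPartition{"orders", 1}, 7},
		{TopicPartition{"orders", 0}, 42},
	}
	for tp, off := range HighestOffsets(batch) {
		// off+1 is the position a consumer would resume from after a restart.
		fmt.Printf("%s[%d]: processed up to %d, next to read %d\n",
			tp.Topic, tp.Partition, off, off+1)
	}
}
```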
At-Most-Once Guarantee
By default, xkafka relies on Kafka's enable.auto.commit=true and auto.commit.interval.ms to commit offsets, periodically in the background.
By enabling xkafka.ManualCommit(true) in sequential mode, at-most-once processing guarantees can be achieved for each message or batch. xkafka ensures that the offset is committed before reading the next message.
At-Least-Once Guarantee
When xkafka.ManualCommit(true) is combined with xkafka.Concurrency(N > 1), messages or batches can be processed in parallel, while xkafka will ensure offsets are committed synchronously in order. This way, at-least-once processing guarantees can be achieved.
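Committing in order while handlers finish out of order means an offset can only be committed once every earlier offset has also finished. The following is a sketch of that idea for a single partition, under the assumption that offsets are registered in read order. `OrderedCommitter` and its methods are hypothetical names for this illustration and do not describe xkafka's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// OrderedCommitter tracks in-flight offsets of a single partition.
type OrderedCommitter struct {
	mu      sync.Mutex
	pending []int64        // offsets in the order they were read
	done    map[int64]bool // offsets whose handler has finished
}

func NewOrderedCommitter() *OrderedCommitter {
	return &OrderedCommitter{done: make(map[int64]bool)}
}

// Track registers an offset as read, before it is handed to a worker.
func (c *OrderedCommitter) Track(offset int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pending = append(c.pending, offset)
}

// Done marks an offset as processed. It returns the highest offset that is
// now safe to commit, or ok=false if the oldest in-flight offset is still
// being processed.
func (c *OrderedCommitter) Done(offset int64) (commit int64, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.done[offset] = true
	// Advance past every finished offset at the front of the queue.
	for len(c.pending) > 0 && c.done[c.pending[0]] {
		commit, ok = c.pending[0], true
		delete(c.done, c.pending[0])
		c.pending = c.pending[1:]
	}
	return commit, ok
}

func main() {
	c := NewOrderedCommitter()
	for _, off := range []int64{10, 11, 12} {
		c.Track(off)
	}
	fmt.Println(c.Done(11)) // 0 false  (10 is still running)
	fmt.Println(c.Done(10)) // 11 true  (10 and 11 are both finished)
	fmt.Println(c.Done(12)) // 12 true
}
```

If the process crashes while offset 10 is still in flight, nothing past 9 has been committed, so 10 and 11 are read again after a restart. That replay is what makes the guarantee at-least-once rather than exactly-once.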
Error Handling
One of the tricky parts of Kafka is handling broker errors, application errors, transient errors, and retries. xkafka lets you handle errors in layers:
Handler Level
The simplest way is to handle application errors in the handler implementation itself.
handler := xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
    err := processMessage(ctx, msg)
    if err != nil {
        // log and/or trigger alert
        // optionally, move message to a dead letter topic or queue
        msg.AckSkip()
        return nil
    }
    msg.AckSuccess()
    return nil
})
Middleware Level
Middleware is a great way to reuse application-specific error handling logic across handlers and consumers.
handler := xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
    // ...
    err := processMessage(ctx, msg)
    if err != nil {
        // propagate error to middlewares
        msg.AckFail(err)
        return err
    }
    // ack the message
    msg.AckSuccess()
    return nil
})
A combination of retry and custom error handling middlewares can be used to implement different retry strategies.
consumer.Use(
    RetryMiddleware(/*...*/),
    xkafka.MiddlewareFunc(func(next xkafka.Handler) xkafka.Handler {
        return xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error {
            err := next.Handle(ctx, m)
            if errors.Is(err, app.SomeError) {
                // handle application error
            }
            // differentiate between transient, retryable errors
            // and permanent failures
            return err
        })
    }),
)
Global Level
xkafka.ErrorHandler is a mandatory option when creating a producer or consumer. Kafka broker and library errors are only visible to the xkafka.ErrorHandler.
consumer, err := xkafka.NewConsumer(
    // ...
    xkafka.ErrorHandler(func(err error) error {
        // returning a non-nil error will stop the consumer
        return err
    }),
)
This layered approach enforces clearer consideration of error boundaries and encourages explicit handling of errors within the application.
Conclusion
xkafka brings HTTP-like abstractions to Apache Kafka. It simplifies producing and consuming messages with familiar concepts like handlers and middleware, which reduces boilerplate and keeps the focus on application logic.
Here's a comparison of confluent-kafka-go and xkafka:
// confluent-kafka-go
consumer, err := kafka.NewConsumer(&kafka.ConfigMap{/*...*/})
err = consumer.SubscribeTopics([]string{/*...*/}, nil)
ctx, cancel := context.WithCancelCause(context.Background())
// deferred so the consumer is closed when the loop returns
defer consumer.Close()
for {
    select {
    case <-ctx.Done():
        return
    default:
        msg, err := consumer.ReadMessage(time.Second)
        if err != nil {
            // err is nil on success, so check it before the type assertion
            if !err.(kafka.Error).IsTimeout() {
                cancel(err)
            }
            continue
        }
        decodedMessage, err := decodeMessage(msg)
        if err != nil {
            cancel(err)
            continue
        }
        err = processMessage(ctx, decodedMessage)
        if err != nil {
            cancel(err)
            continue
        }
        consumer.CommitMessage(msg)
    }
}
// xkafka
handler := xkafka.HandlerFunc(func(ctx context.Context, msg *xkafka.Message) error {
    // ...
    return nil
})
consumer, err := xkafka.NewConsumer(
    "consumer-id",
    handler,
    xkafka.Brokers{"localhost:9092"},
    xkafka.Topics{"test"},
    xkafka.ConfigMap{/*...*/},
    xkafka.ErrorHandler(func(err error) error {
        // returning a non-nil error will stop the consumer
        return err
    }),
)
consumer.Use(/* add middlewares */)
err = consumer.Run(ctx)
Check out the documentation and examples to get started.