Building Kafka data pipelines is a journey rather than a one-time decision. At the simplest stage, a lightweight consumer may suffice. As requirements expand, pipelines naturally grow more complex. Various frameworks have emerged to meet these evolving needs, each with its own balance of reliability, scalability, and operational simplicity. Choosing the right one depends on your current challenges and future goals.

In this article, we'll explore a maturing integration strategy. Starting with a basic use case, we'll advance through real-time stream processing, high-throughput replication, and finally, full end-to-end orchestration, demonstrating why Spring Kafka, Kafka Streams, Kafka Connect, and Apache Camel each excel at different stages.

This article is part of an ongoing series on the Apache Kafka Streams API, examining various aspects of developing applications using this API and Spring Boot.

Schema-Driven Serialization

Throughout the examples below, we assume messages are encoded in Avro and managed via Confluent Schema Registry. This lets each framework handle (de)serialization and schema compatibility without custom glue: Spring Kafka uses the Avro deserializer, Kafka Streams the SpecificAvroSerde, Kafka Connect the AvroConverter, and Camel's Avro dataformat (or Confluent components).
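For example, on the Spring Kafka side the wiring can be as small as a consumer factory that points the Confluent deserializer at the registry. The following is a minimal sketch rather than configuration from this article: the broker and registry addresses are placeholders, and the bean simply backs the listener shown later in Stage 1.

@Configuration
public class AvroConsumerConfig {

    @Bean
    public ConsumerFactory<String, Order> orderConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        // Every framework in this article talks to the same Schema Registry
        props.put("schema.registry.url", "http://schema-registry:8081");               // placeholder URL
        // Deserialize into the generated Order class rather than GenericRecord
        props.put("specific.avro.reader", "true");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}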

Evolutionary Scenarios: From Simple Ingestion to Complex Integration

Imagine an online retail platform that publishes an Order event to the Kafka topic orders-topic whenever a customer checks out; each event includes an order ID, customer ID, amount, and timestamp. As the platform evolves, simple persistence gives way to demands for real-time analytics, high-volume data replication, and end-to-end orchestration across multiple systems. To address these needs, we'll follow a four-stage journey, starting with basic "consume & insert", progressing into stateful stream processing, then high-throughput pipelines, and finally full multi-system workflows, illustrating why Spring Kafka, Kafka Streams, Kafka Connect, and Apache Camel each become the natural choice.

Stage 1: Basic Order Persistence

Initially, the only requirement is:

Consume each message from orders-topic and insert a corresponding record into the orders table of our relational database, ensuring at-least-once delivery without data loss.

We model incoming events as:

public class Order {
    private String orderId;
    private String customerId;
    private Long orderAmount;
    private Instant orderDateTime; // Avro timestamp-millis maps to java.time.Instant
}

Why Spring Kafka fits here

This scenario involves a simple consumption and database insert with no complex processing. Spring Kafka makes this "consume-and-insert" pattern trivial by embedding Kafka consumers into the Spring ecosystem. Once you've configured the Avro deserializer (per our Schema-Driven Serialization), you can declare a simple listener method to receive each Order event and persist it via JPA:

@Autowired
private OrderRepository orderRepository;

@KafkaListener(topics = "orders-topic", groupId = "order-consumer-group")
public void listen(Order avroOrder) {

   // Map the incoming Order event to a JPA entity mapped to orders table
   var orderEntity = new OrderEntity(
       avroOrder.getOrderId(),
       avroOrder.getCustomerId(),
       avroOrder.getOrderAmount(),
       LocalDateTime.ofInstant(avroOrder.getOrderDateTime(), ZoneId.systemDefault())
   );

   orderRepository.save(orderEntity);
}

Spring's listener container handles polling, deserialization, offset commits, and retries with sensible defaults. Out of the box (using the default AckMode.BATCH), it will commit the offsets for an entire batch only after all records in that batch have been processed successfully. If your orderRepository.save() throws an exception anywhere in the batch, none of that batch's offsets are committed, and every message in the batch will be redelivered, giving you at-least-once delivery without any manual offset-management code.
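Those defaults are also pluggable. As a sketch (this bean is an illustration, not part of the original pipeline), you can replace the default retry behavior with a DefaultErrorHandler that retries a failed record a few times and then publishes it to a dead-letter topic:

@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
    // Retry a failed record three times, one second apart, then publish it to <topic>.DLT
    var recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
}

With recent Spring Boot versions, a CommonErrorHandler bean like this is picked up by the auto-configured listener container factory; otherwise, set it on the factory explicitly.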

If you need finer control over when each record's offset is committed (for example, to commit every record immediately after it's saved, or to group acknowledgments in custom ways), you can switch to manual acknowledgment (AckMode.MANUAL or MANUAL_IMMEDIATE) and call ack.acknowledge() in your listener at exactly the point you choose. Manual acknowledgments let you optimize batching, implement custom error handling (dead-lettering, alerting), or align offset commits with downstream side effects.
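As a minimal sketch (assuming AckMode.MANUAL_IMMEDIATE is set on the container factory, and with mapToEntity standing in for the mapping shown above), the listener receives an Acknowledgment and commits only after the save succeeds:

@KafkaListener(topics = "orders-topic", groupId = "order-consumer-group")
public void listen(Order avroOrder, Acknowledgment ack) {
    orderRepository.save(mapToEntity(avroOrder));
    // Commit this record's offset only after the database write has succeeded
    ack.acknowledge();
}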

For scenarios demanding the strongest consistency — where the database insert and the Kafka offset commit must occur in one atomic operation — you can enable Kafka transactions. By defining a KafkaTransactionManager (and, if desired, chaining it with your JDBC PlatformTransactionManager), Spring will open a Kafka transaction around your listener. Upon successful completion, both the database save and the offset commit are committed together; on failure, they are rolled back together, ensuring exactly-once processing semantics within the listener.
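The sketch below shows the core of that setup; it is illustrative rather than the article's exact configuration and assumes a transactional producer (for example, spring.kafka.producer.transaction-id-prefix set in the application properties):

@Bean
public KafkaTransactionManager<String, Order> kafkaTransactionManager(
        ProducerFactory<String, Order> producerFactory) {
    // The listener container uses this to open a Kafka transaction around each listener call;
    // consumed offsets are sent to that transaction and committed or aborted with it
    return new KafkaTransactionManager<>(producerFactory);
}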

Stage 2: Real-Time Session Metrics

With basic persistence in place, the product team now needs live insights into customer behavior:

They want to know how much each customer spends during an active shopping session — say, any sequence of orders within a five-minute window — so they can trigger in-flight upsells or detect fraud when someone's order volume spikes in a short time.

This requirement extends beyond the stateless consumption we had so far. If we tried to meet it with just Spring Kafka and the database, the approach might be to keep writing each order to the database (which we already do) and add a scheduled job or query that periodically computes each customer's order total over the last five minutes. However, doing that in real time for every event is non-trivial and inefficient: you'd be constantly polling the database or maintaining a caching layer, managing window boundaries (when does a session start and end?), and handling late arrivals. It would also add latency (queries might run every minute rather than reacting instantly) and put load on the database with frequent reads.

Why Kafka Streams becomes a stronger candidate

Kafka Streams excels when you need stateful, event-time processing directly in your application. Instead of handling each message in isolation, it lets you maintain per-key state and build windowed computations.

Kafka Streams provides a session window abstraction that groups records by key (e.g., customerId) into sessions that close after a defined inactivity gap (for example, five minutes). If another event for the same key arrives before the gap elapses, the session extends; if it comes afterward, a new session begins. Overlapping sessions are merged automatically, and you can configure a grace period to accommodate late-arriving events.

Under the covers, Kafka Streams keeps your aggregates in state stores and, for fault tolerance, records every update to Kafka changelog topics. If an instance fails, another can replay those changelogs to rebuild its state and resume processing without data loss.

Below is a simplified Java DSL example that consumes Order events, groups them into five-minute sessions with a one-minute grace period, aggregates the order amounts per session, and emits SessionSummary records to a new topic:

@Bean
public KStream<String, Order> kStream(StreamsBuilder builder) {

   // Configure Serdes (assuming Order and SessionSummary are Avro types with Schema Registry)
   Map<String,String> serdeConfig = Collections.singletonMap(
       "schema.registry.url", schemaRegistryUrl
   );
   SpecificAvroSerde<Order> orderSerde = new SpecificAvroSerde<>();
   orderSerde.configure(serdeConfig, false);
   SpecificAvroSerde<SessionSummary> summarySerde = new SpecificAvroSerde<>();
   summarySerde.configure(serdeConfig, false);

   // 1. Stream from the orders topic
   KStream<String, Order> orders = builder.stream(
       "orders-topic",
       Consumed.with(Serdes.String(), orderSerde)
          // ensure we use order timestamp for event-time
          .withTimestampExtractor(new OrderTimestampExtractor())
   );

   // 2. Group orders by customerId for aggregation
   KGroupedStream<String, Order> byCustomer = orders.groupBy(
       (String key, Order value) -> value.getCustomerId(),
       Grouped.<String, Order>with(Serdes.String(), orderSerde)
   );

   // 3. Define a session window of 5 minutes with a 1 minute grace period for late events
   SessionWindowedKStream<String, Order> sessions =
       byCustomer.windowedBy(
          SessionWindows.ofInactivityGapAndGrace(
              Duration.ofMinutes(5),
              Duration.ofMinutes(1))
   );

   // 4. Aggregate orders in each session to compute total amount per session
   KTable<Windowed<String>, Long> totals = sessions.aggregate(
       () -> 0L,
       (String custId, Order o, Long runningTotal) ->
          runningTotal + o.getOrderAmount(),
       (String custId, Long aggOne, Long aggTwo) ->
          aggOne + aggTwo,
       Materialized.with(Serdes.String(), Serdes.Long())
   );

   // 5. Convert the session aggregate KTable to a stream of SessionSummary records
   KStream<String, SessionSummary> summaryStream = totals.toStream()
       .map((Windowed<String> windowedKey, Long total) -> {
          String customerId = windowedKey.key();
          Instant start = Instant.ofEpochMilli(windowedKey.window().start());
          Instant end   = Instant.ofEpochMilli(windowedKey.window().end());

          SessionSummary summary = SessionSummary.newBuilder()
              .setCustomerId(customerId)
              .setSessionStart(start)
              .setSessionEnd(end)
              .setTotalAmount(total)
              .build();

          return KeyValue.pair(customerId, summary);
   });

   // 6. Output the session summaries to another Kafka topic (could be consumed by an analytics DB sink)
   summaryStream.to(
       "order-session-aggregates-topic", 
       Produced.with(Serdes.String(), summarySerde)
   );
   
   return orders;
}
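The OrderTimestampExtractor referenced in step 1 isn't shown above; a minimal sketch, assuming getOrderDateTime() returns an Instant, could look like this:

public class OrderTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // Use the order's own timestamp as event time; fall back to the record timestamp otherwise
        Object value = record.value();
        if (value instanceof Order && ((Order) value).getOrderDateTime() != null) {
            return ((Order) value).getOrderDateTime().toEpochMilli();
        }
        return record.timestamp();
    }
}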

By offloading session aggregation to Kafka Streams, you relieve your database of continuous query load and gain low-latency, fault-tolerant processing. Furthermore, Kafka Streams supports exactly-once semantics (when enabled), so even in the face of crashes or retries, each order contributes precisely once to the final aggregate.
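Enabling exactly-once is a single configuration property on the Streams application; a sketch (the application id and broker address are assumptions):

Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-session-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Turn on exactly-once processing (EOS v2, available with brokers/clients 2.5+)
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);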

Stage 3: High-Throughput Bulk Replication

Now that real-time session summaries are being produced (on the Kafka topic order-session-aggregates-topic from the previous step), the analytics team wants to use this data for long-term analysis. They ask:

Can we get a nightly or continuous feed of these session summary records into our analytics store (for example, a PostgreSQL data warehouse, Snowflake, Redshift, or even Elasticsearch)? We want to run heavy queries and build dashboards, but we prefer not to query Kafka directly or disrupt the real-time pipeline.

One way to do this would be to write another custom consumer application: basically, read from order-session-aggregates-topic and bulk insert into the analytics DB. However, this is exactly the kind of "data pipeline plumbing" that Kafka Connect is designed to handle without custom code.

Why Kafka Connect is ideal here

Kafka Connect offers a ready-to-use solution for transferring data from Kafka topics to external systems, including popular databases and warehouses, via connectors. Instead of writing code, you configure a sink connector for your target system. For instance, Confluent offers a JDBC Sink Connector that can write to any database with a JDBC driver, and there are connector plugins for specific data warehouses as well (e.g., a Snowflake Sink Connector and an Elasticsearch Sink). Connect handles all the heavy lifting: it consumes messages from the topic, distributes the work across its worker tasks, batches writes where appropriate, applies any schema or format conversions, and writes the records to the target. It also tracks offsets for you and can provide effectively exactly-once delivery where the connector supports it (for JDBC sinks, this is usually achieved with idempotent writes via upsert mode).

Setting up Kafka Connect can be as simple as writing a JSON config like the following (for a JDBC sink writing to PostgreSQL):

{
  "name": "order-session-aggregates-jdbc-sink",
  "config": {
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max":"1",
    "topics":"order-session-aggregates-topic",
    "connection.url":"jdbc:postgresql://postgres:5432/ordersdb",
    "connection.user":"postgres",
    "connection.password":"postgres",
    "insert.mode":"insert",
    "table.name.format":"order_session_aggregates",
    "auto.create":"true",
    "auto.evolve":"true",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://schema-registry:8081"
  }
}

Once this connector is deployed, Kafka Connect will spawn a task that starts reading from order-session-aggregates-topic. It will use the Avro schema of SessionSummary to create or evolve the target table (with columns for each field of the summary), and then every new message on the topic will result in a row inserted into the table. The connector will commit offsets back to Kafka Connect's internal offsets topic to record how far it has read.

By adding Kafka Connect, we free our application from "last-mile" integration code. We don't need to deploy a new service for this ETL — Connect runs as a service and can handle the throughput (it's designed to handle high-volume data pipelines). We also gain operational benefits: the connector can be monitored via Kafka Connect REST API or monitoring tools, and we get built-in retry and dead-letter queue capabilities. Connect is also schema-aware, meaning it will catch schema mismatches or evolution issues. Scaling out is as simple as increasing "tasks.max" or adding more Connect worker instances—the framework automatically balances tasks across the cluster.

Stage 4: Multi-System Routing and Orchestration

A few weeks later, the team's needs expand even further:

Whenever a new order event comes in, several different actions need to happen in addition to inserting into the database:

1. Persist the incoming Order to the orders database.
2. Enrich it by calling a Customer Profile REST API.
3. Send the enriched order to the Shipping Service REST endpoint.
4. Email order confirmation to the customer via SMTP.
5. If any step fails, apply retries or route the message to a dead-letter queue for later inspection.

You could attempt to handle this with Spring Kafka by writing a listener that calls all of these services sequentially, wrapping each call in try-catch blocks and using Spring's RestTemplate/WebClient and the JavaMail API. But you'd soon be building a lot of plumbing: handling different failure cases for each call, ensuring one failure doesn't incorrectly block the others, possibly parallelizing calls, and so on. That kind of imperative orchestration code quickly becomes brittle and hard to maintain, and it's exactly where an integration framework shines.

Why Apache Camel is the best fit here

Apache Camel enables us to declare the entire flow in one coherent route, leveraging built-in components and patterns for connectivity, transformation, and fault tolerance. Below is a concise Java DSL route that accomplishes these steps, assuming Avro-encoded orders via Schema Registry and standard Spring Boot properties for Kafka and the database:

public class OrderIntegrationRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

      // Global error handler: retry up to 3 times with 5s delay, then send to dead-letter topic
       onException(Exception.class)
          .maximumRedeliveries(3).redeliveryDelay(5000)
          .handled(true)
          .to("kafka:order-errors-topic");

       // 1) Consume Avro‐encoded Order from Kafka
       from("kafka:{{app.topic.name}}"
          + "?brokers={{spring.kafka.bootstrap-servers}}"
          + "&groupId={{spring.kafka.consumer.group-id}}"
          + "&valueDeserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer"
          + "&schemaRegistryUrl={{spring.kafka.properties.schema.registry.url}}"
          + "&autoOffsetReset=earliest")
          .routeId("order-end-to-end")

          // 2) Map Avro Order → JPA entity and persist
           .process(new Processor() {
               public void process(Exchange ex) {
                   Order avro = ex.getIn().getBody(Order.class);
                   OrderEntity entity = new OrderEntity(
                       avro.getOrderId().toString(),
                       avro.getCustomerId().toString(),
                       avro.getOrderAmount(),
                       LocalDateTime.ofInstant(avro.getOrderDateTime(), ZoneId.systemDefault())
                   );
                   ex.getIn().setBody(entity);
               }
           })
          .to("jpa://space.zeinab.demo.model.OrderEntity?usePersist=true&flushOnSend=true")

           // 3) Enrich: call CRM and merge via AggregationStrategy
           .enrich()
               .simple("http://crm-service/api/customers/${body.customerId}")
               .aggregationStrategy(new AggregationStrategy() {
                   public Exchange aggregate(Exchange original, Exchange resource) {
                       OrderEntity order = original.getIn().getBody(OrderEntity.class);
                       // Assumes a JSON type converter (e.g. camel-jackson) is available so the
                       // CRM response body can be read as a CustomerProfile
                       CustomerProfile profile = resource.getIn().getBody(CustomerProfile.class);
                       order.setCustomerName(profile.getName());
                       order.setCustomerTier(profile.getTier());
                       order.setCustomerEmail(profile.getEmail());
                       original.getIn().setBody(order);
                       return original;
                   }
               })

           // 4) Ship: keep the enriched entity aside (the HTTP reply will replace the body),
           // then POST the order as JSON to the shipping service
           .setProperty("enrichedOrder", body())
           .marshal().json()
           .to("http://shipping-service/api/shipments?httpMethod=POST")
           // Restore the enriched entity so the email step can reference its fields
           .setBody(exchangeProperty("enrichedOrder"))

          // 5) Email confirmation
          .setHeader("subject", constant("Order Confirmation"))
          .setHeader("To", simple("${body.customerEmail}"))
          .setHeader("From", constant("noreply@yourcompany.com"))
          .setBody(simple(
              "Hi ${body.customerName},\n\n" +
                  "Your order ${body.orderId} for amount ${body.orderAmount} has been processed.\n" +
                  "Thank you for shopping with us!"
          ))
          .to("smtp://{{mail.smtp.server}}?username={{mail.username}}&password={{mail.password}}")

          .log("Order ${body.orderId} processed end-to-end");
    }
}
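The route assumes a CustomerProfile DTO for the CRM response, which isn't shown in the listing above; a minimal sketch might be:

public class CustomerProfile {

    private String name;
    private String tier;
    private String email;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getTier() { return tier; }
    public void setTier(String tier) { this.tier = tier; }
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
}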

How it works:

  • Kafka consumption: Uses Camel's Kafka component with Avro deserialization from Schema Registry.
  • Persistence: A lightweight processor maps the Avro Order to a JPA OrderEntity, then the JPA component persists it.
  • Enrichment: The .enrich() EIP issues an HTTP GET to the CRM service; the custom AggregationStrategy merges the returned CustomerProfile into the existing OrderEntity.
  • Shipping: The enriched entity is marshaled to JSON and POSTed to the shipping API; the entity itself is kept in an exchange property and restored afterward so later steps can still reference its fields.
  • Email: Camel's Mail component sends a confirmation, using headers set from the enriched entity.
  • Error handling: A global onException retries any exception three times with a 5-second delay, then routes failed exchanges to the Kafka topic order-errors-topic for manual review.

This single Camel route declaratively orchestrates the entire multi-system workflow — freeing you from boilerplate and complex error-handling code, while ensuring each step is executed reliably.

Final Thoughts

There's no one-size-fits-all when it comes to Kafka integration. For simple message persistence, Spring Kafka offers tight Spring integration and familiar transaction management. When you need real-time, stateful computations — like windowed aggregations — Kafka Streams provides a lightweight, fault-tolerant library with exactly-once guarantees. To move large volumes of data between Kafka and external stores without hand-coding, Kafka Connect excels with its configuration-driven, pluggable connectors. And when an event must trigger multi-step workflows — such as calling REST APIs, databases, email, or other systems — Apache Camel's rich DSL and component ecosystem enable you to orchestrate complex routes declaratively.

In practice, you might combine these: use Spring Kafka for basic ingestion, Kafka Streams for live analytics, Kafka Connect for data replication, and Camel for orchestration. By matching each tool to its sweet spot, you build pipelines that are reliable, scalable, and maintainable — without having to reinvent the wheel.

All the source code is available in the GitHub repository.