
Mastering Kafka Client Patterns: Part 3 – Thread Pool Pattern

From Basics to Battle-Tested Implementation

Reading time: 8 minutes | Series: Part 3 of 9


In Part 2, we saw how single-threaded consumption creates rebalancing death spirals. The solution is the Thread Pool with Blocking Queue pattern—the most important pattern in this series and the one you’ll use in 80% of real-world scenarios.

This post covers everything: the core architecture, basic implementation, and critical features for reliability (backpressure, rebalancing, graceful shutdown). By the end, you’ll have robust, deployable code.


Architecture: The Two-Layer Design

The thread pool pattern separates consumption from processing through a clean two-layer architecture:

Layer 1: Consumer Thread (The Fast Poller)

  • Continuously polls Kafka for new messages
  • Never blocks on processing logic
  • Manages backpressure by pausing/resuming partitions
  • Handles rebalancing and offset commits

Layer 2: Worker Thread Pool (The Processors)

  • Pull messages from a blocking queue
  • Execute business logic (database queries, API calls, etc.)
  • Scale horizontally by adding more workers

The Bridge: Blocking Queue

  • Decouples the two layers
  • Provides natural backpressure through bounded capacity
  • Thread-safe coordination point

This solves the fundamental problem: the consumer thread never waits for processing, so it always polls Kafka on time, eliminating rebalancing risks.
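
Before the full listing, here is the shape of the pattern as a schematic sketch (names are illustrative, error handling omitted):

java

// Layer 1: consumer thread - poll fast, hand off, never process inline
while (running) {
    for (ConsumerRecord<String, String> record : consumer.poll(pollTimeout)) {
        queue.put(record); // bounded queue: a natural backpressure point
    }
}

// Layer 2: worker threads - pull from the queue and do the slow work
while (running) {
    ConsumerRecord<String, String> record = queue.take();
    process(record); // database queries, API calls, etc.
}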


Implementation with Reliability Features

Here’s the implementation with all essential features:

java

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

public class ThreadPoolConsumer {
    
    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService workerPool;
    private final BlockingQueue<ConsumerRecord<String, String>> messageQueue;
    private final Map<TopicPartition, OffsetAndMetadata> pendingOffsets;
    private final int queueCapacity;
    private final int numWorkers;
    private volatile boolean running = true;
    
    public ThreadPoolConsumer(String bootstrapServers, String groupId, 
                              String topic, int numWorkers, int queueCapacity) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                 "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                 "org.apache.kafka.common.serialization.StringDeserializer");
        // Manual commits: offsets are committed only after successful processing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Max gap allowed between poll() calls before the consumer is
        // considered failed and removed from the group (5 minutes)
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
        
        this.consumer = new KafkaConsumer<>(props);
        this.queueCapacity = queueCapacity;
        this.numWorkers = numWorkers;
        this.workerPool = Executors.newFixedThreadPool(numWorkers);
        this.messageQueue = new LinkedBlockingQueue<>(queueCapacity);
        this.pendingOffsets = new ConcurrentHashMap<>();
        
        // Register rebalance listener
        consumer.subscribe(Collections.singletonList(topic), new RebalanceListener());
    }
    
    public void start() {
        // Start the worker threads, then run the poll loop on this thread
        for (int i = 0; i < numWorkers; i++) {
            workerPool.submit(this::workerLoop);
        }
        consumerLoop();
    }
    
    /**
     * Consumer loop with backpressure handling
     */
    private void consumerLoop() {
        try {
            while (running) {
                // Backpressure: Pause partitions if queue is filling up
                handleBackpressure();
                
                // Poll Kafka (always fast, never blocked by processing)
                ConsumerRecords<String, String> records = 
                    consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        // Retry until the record is enqueued: dropping it here
                        // would lose data. The running flag keeps the retry
                        // loop responsive to shutdown.
                        while (running && !messageQueue.offer(record, 100, TimeUnit.MILLISECONDS)) {
                            System.err.println("Warning: Queue full despite backpressure, " +
                                             "consider tuning thresholds. Offset: " + record.offset());
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                
                // Commit offsets asynchronously
                commitProcessedOffsets();
            }
        } finally {
            shutdown();
        }
    }
    
    /**
     * Backpressure management: Pause when queue fills, resume when it drains.
     * The 80%/50% hysteresis gap prevents rapid pause/resume flapping.
     */
    private void handleBackpressure() {
        int queueSize = messageQueue.size();
        
        if (queueSize > queueCapacity * 0.8) {
            // Queue is over 80% full - pause all assigned partitions
            Set<TopicPartition> assignment = consumer.assignment();
            if (consumer.paused().size() < assignment.size()) {
                consumer.pause(assignment);
                System.out.println("Backpressure: Paused partitions (queue: " + queueSize + ")");
            }
        } else if (queueSize < queueCapacity * 0.5) {
            // Queue has drained below 50% - resume anything we paused
            Set<TopicPartition> paused = consumer.paused();
            if (!paused.isEmpty()) {
                consumer.resume(paused);
                System.out.println("Backpressure: Resumed partitions (queue: " + queueSize + ")");
            }
        }
    }
    
    /**
     * Worker thread: Pull messages from queue and process
     */
    private void workerLoop() {
        while (running) {
            try {
                ConsumerRecord<String, String> record = 
                    messageQueue.poll(1, TimeUnit.SECONDS);
                
                if (record != null) {
                    try {
                        processMessage(record);
                        
                        // Track the next offset to commit, only after successful
                        // processing. With several workers per partition,
                        // completions can land out of order, so this pattern is
                        // at-least-once: duplicates are possible after a restart.
                        TopicPartition partition = 
                            new TopicPartition(record.topic(), record.partition());
                        OffsetAndMetadata offset = 
                            new OffsetAndMetadata(record.offset() + 1);
                        
                        synchronized (pendingOffsets) {
                            pendingOffsets.put(partition, offset);
                        }
                    } catch (Exception e) {
                        // Handle processing failure - log and continue
                        // In real scenarios, consider DLQ or retry logic
                        System.err.println("Failed to process record at offset " + 
                                         record.offset() + ": " + e.getMessage());
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
    
    private void processMessage(ConsumerRecord<String, String> record) {
        try {
            // Your business logic here
            Thread.sleep(100); // Simulate processing
            System.out.println("Processed: " + record.value());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * Async offset commit with callback
     */
    private void commitProcessedOffsets() {
        synchronized (pendingOffsets) {
            if (!pendingOffsets.isEmpty()) {
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
                    new HashMap<>(pendingOffsets);
                
                consumer.commitAsync(offsetsToCommit, (offsets, exception) -> {
                    if (exception != null) {
                        System.err.println("Commit failed: " + exception.getMessage());
                    }
                });
                
                pendingOffsets.clear();
            }
        }
    }
    
    /**
     * Rebalance listener to handle partition assignment changes
     */
    private class RebalanceListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println("Partitions revoked: " + partitions);
            
            // Commit pending offsets before losing partitions. Queued records
            // from revoked partitions may still be processed by workers, so
            // the new owner can see duplicates (at-least-once semantics).
            synchronized (pendingOffsets) {
                if (!pendingOffsets.isEmpty()) {
                    try {
                        consumer.commitSync(pendingOffsets);
                        pendingOffsets.clear();
                    } catch (Exception e) {
                        System.err.println("Failed to commit on rebalance: " + e);
                    }
                }
            }
        }
        
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println("Partitions assigned: " + partitions);
        }
    }
    
    /**
     * Request shutdown from another thread. KafkaConsumer is not thread-safe,
     * so the actual cleanup runs on the consumer thread once the poll loop exits.
     */
    public void stop() {
        running = false;
    }
    
    /**
     * Graceful shutdown: Stop polling, drain queue, commit final offsets.
     * Invoked on the consumer thread from consumerLoop's finally block.
     */
    private void shutdown() {
        running = false;
        
        // Wait for workers to drain queue
        workerPool.shutdown();
        try {
            System.out.println("Draining queue (size: " + messageQueue.size() + ")...");
            workerPool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            workerPool.shutdownNow();
        }
        
        // Final commit
        synchronized (pendingOffsets) {
            if (!pendingOffsets.isEmpty()) {
                try {
                    consumer.commitSync(pendingOffsets);
                } catch (Exception e) {
                    System.err.println("Final commit failed: " + e);
                }
            }
        }
        
        consumer.close();
        System.out.println("Shutdown complete");
    }
}
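
A minimal usage sketch (the broker address, group id, and topic here are placeholders, not values from the original post):

java

public class ThreadPoolConsumerApp {
    public static void main(String[] args) {
        ThreadPoolConsumer consumer = new ThreadPoolConsumer(
            "localhost:9092",   // bootstrap servers (placeholder)
            "order-processors", // consumer group id (placeholder)
            "orders",           // topic (placeholder)
            10,                 // worker threads
            1000);              // queue capacity
        
        consumer.start(); // blocks in the poll loop until stop() is called
    }
}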

Key Features Explained

1. Backpressure with Pause/Resume

The critical fix: when the queue reaches 80% of capacity, we pause the assigned partitions with consumer.pause(assignment). Subsequent poll() calls then return no records for those partitions, but the consumer keeps polling, which keeps it within max.poll.interval.ms and in the group. When the queue drains below 50%, we resume. The fragment after the list below makes this concrete.

Why this works:

  • ✅ Consumer never blocks for extended periods
  • ✅ No max.poll.interval.ms violations
  • ✅ Automatic flow control prevents OOM
  • ✅ System self-regulates under load
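
To make the heartbeat point concrete, here is a small fragment (written against the consumer field of the class above) showing that a paused consumer still polls without receiving data:

java

// pause() is client-side: subsequent poll() calls return no records for
// the paused partitions, yet each poll() still counts toward
// max.poll.interval.ms and keeps the consumer in the group
consumer.pause(consumer.assignment());

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// records.isEmpty() == true while all partitions are paused

// resume only what was actually paused
consumer.resume(consumer.paused());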

2. Rebalance Handling

The ConsumerRebalanceListener handles partition reassignment during rebalancing:

onPartitionsRevoked(): Called before partitions are taken away. We commit any pending offsets synchronously to avoid losing progress.

onPartitionsAssigned(): Called when new partitions are assigned. We can initialize any partition-specific state here.

Without this listener, offsets for messages that were already processed could go uncommitted when partitions are revoked, and each partition's new owner would reprocess them, causing duplicates.
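
A related, optional tweak rather than part of the pattern itself: on Kafka clients 2.4+, switching to incremental cooperative rebalancing means only the partitions that actually move get revoked, so the rest keep flowing during a rebalance:

java

// All consumers in the group must use the same strategy
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");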

3. Async Offset Commits

We use commitAsync() instead of commitSync() for better performance:

java

consumer.commitAsync(offsetsToCommit, (offsets, exception) -> {
    if (exception != null) {
        System.err.println("Commit failed: " + exception.getMessage());
    }
});

Async commits don’t block the consumer thread, allowing higher throughput. The callback handles failures (though we accept some risk of duplicate processing on restart).

Important: We use commitSync() in critical moments (rebalancing, shutdown) where we can’t afford to lose offsets.
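
If a failed async commit must not simply be dropped, one well-known refinement is to retry synchronously only when no newer commit has been issued in the meantime. A sketch (commitSeq is a new java.util.concurrent.atomic.AtomicLong field, not part of the listing above):

java

private final AtomicLong commitSeq = new AtomicLong();

private void commitWithRetry(Map<TopicPartition, OffsetAndMetadata> offsets) {
    long seq = commitSeq.incrementAndGet();
    consumer.commitAsync(offsets, (committed, exception) -> {
        // Callbacks run on the consumer thread, so commitSync here is safe.
        // Retry only if this is still the newest commit; otherwise a later
        // async commit already supersedes these offsets.
        if (exception != null && commitSeq.get() == seq) {
            consumer.commitSync(offsets);
        }
    });
}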

4. Graceful Shutdown

The shutdown sequence ensures no in-flight work is lost:

  1. Set running = false via stop() (the polling loop exits)
  2. Shut down the worker pool and wait for the queue to drain
  3. Commit any remaining offsets synchronously
  4. Close the consumer

Within the 30-second drain window, this ensures in-flight messages are processed and their offsets committed before exit. In a long-running service, the usual trigger is a JVM shutdown hook, sketched below.
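
A sketch, building on the stop() method added to the class above (consumer is a ThreadPoolConsumer instance):

java

// Register before start(): on SIGTERM/SIGINT the hook flips the running
// flag via stop(), the consumer loop exits on its next cycle, workers
// drain the queue, and final offsets are committed before the JVM exits.
Runtime.getRuntime().addShutdownHook(new Thread(consumer::stop));
consumer.start();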


Performance: The Proof

Test Scenario: 100ms processing time per message, Kafka delivers 2,000 messages per poll

Pattern                  | Throughput  | Processing Time per Batch | Consumer State | Risk
-------------------------|-------------|---------------------------|----------------|------------------
Single-Threaded          | 10 msg/sec  | 200 seconds               | Blocked        | High timeout risk
Thread Pool (10 workers) | 100 msg/sec | 20 seconds (parallel)     | Responsive     | None
Thread Pool (50 workers) | 500 msg/sec | 4 seconds (parallel)      | Responsive     | None

The thread pool pattern scales roughly linearly with worker count (throughput ≈ workers ÷ per-message latency; 10 workers ÷ 0.1 s ≈ 100 msg/sec) while keeping the consumer responsive. With pause/resume, it degrades gracefully instead of failing when load exceeds capacity.


Key Takeaways

  • Decouple consumption from processing – the consumer thread stays fast while workers handle slow operations
  • Pause/resume for backpressure – prevents OOM and timeout violations when the queue fills
  • Rebalance listener – commits offsets before partition reassignment to avoid reprocessing
  • Async commits – better performance for routine commits; sync for critical moments
  • Graceful shutdown – drains the queue and commits final offsets cleanly
  • Battle-tested – a production staple for high-volume Kafka consumers across the industry

This is your go-to pattern for most Kafka consumption scenarios. It’s robust, scalable, and handles the common edge cases that trip up naive implementations.



What’s Next?

In Part 4, we’ll explore the Async CompletableFuture Pattern—a modern approach for I/O-bound processing that uses non-blocking async chains instead of worker threads. It’s more efficient for scenarios dominated by network calls and database queries.


📚 Series Navigation

  • Part 1: Foundation
  • Part 2: Single-Threaded Anti-Pattern
  • Part 3: Thread Pool Pattern (Complete) ← You are here
  • Part 4: Async CompletableFuture Pattern
  • Parts 5-9: [Additional patterns and topics]

Discussion: How many worker threads do you run? Have you encountered queue capacity issues? What’s your backpressure strategy? Share below!
