1. 首页
  2. 大数据
  3. Kafka教程

Kafka多线程消费的思考和实现

随着科技的发展,普通个人电脑多核已是标配,更不必说公司服务器,如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。无论是Kafka官方提供的客户端API,还是Spring封装的Spring Kafka,在消息消费方面,均只是实现了默认情况下的1个Consumer 1个线程。所以我们研究KAKFA多线程消费是非常有必要的,因为很多时候我们需要自己实现开发多线程消费程序。

两种多线程思路

多线程+多KafkaConsumer实例

首先,我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全,即保证共享资源在同一时间只能由一个线程进行操作(原子性,有序性)。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
既然一个线程只能有一个KafkaConsumer 实例,那么我们完全可以让消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。

Kafka多线程消费的思考和实现

单/多线程+单/多KafkaConsumer实例+消息处理Worker线程池

仅仅只能通过创建多个KafkaConsumer 实例来实现吗?假如我就只想创建一个KafkaConsumer 实例,又想加快速度,怎么办呢?那么思路就是1个Consumer实例有多个线程来加快消费速率,以进一步提升对partition的并行消费能力。消费者程序可以使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。即将原来的消费线程分拆为2类线程(拉取线程、工作线程),通过这2类线程的分工合作来完成消息的消费。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。

  • 拉取线程:只负责kafka消息的拉取、分发和消息offset的提交(可选),但不负责消息的业务处理。拉取线程的数量依旧要受Kafka的“一个Partition只能被该Group里的一个Consumer线程消费”规则的限制,即若该Group所订阅的Topic有N个Partition,则该Group最多只能有N个拉取线程。
  • 工作线程:只负责处理消息的业务处理,但不负责kafka消息的拉取和offset的提交。拉取线程拉取到消息后,便可将消息分发给工作线程进行处理。比如拉取线程一次拉取到100个消息,分发给20个工作线程并行异步处理。

Kafka多线程消费的思考和实现

两种方案的优劣势对比

这两种方案孰优孰劣呢?应该说是各有千秋:

Kafka多线程消费的思考和实现

方案一优点:

1、该方案最大的优点便是实现很简单,且对于消费速率要求不是很高的情况下,是完全能满足需要的。
2、由于每个线程使用专属的 KafkaConsumer 实例来执行消息获取和消息处理逻辑,因此,Kafka 主题中的每个分区都能保证只被一个线程处理,这样就很容易实现分区内的消息消费顺序。这对在乎事件先后顺序的应用场景来说,是非常重要的优势。

方案一缺点:

1、最大的缺点是消费线程受限于Topic分区数,我们知道,在一个消费者组中,每个订阅分区都只能被组内的一个消费者实例所消费。假设一个消费者组订阅了 100 个分区,那么方案 1 最多只能扩展到 100 个线程,多余的线程无法分配到任何分区,只会白白消耗系统资源。因此只能通过不断增加Partition数量来提升消费能力的扩展性。增加Partition数量可不是那么随便的事,变更Kafka集群的Topic Partition数量会带来Consumer Reblance,重平衡的危害如果不清楚可以参考往期文章,总之,如果对于Kafka的要求是全天不断,且每日消费的消息量巨大的场景时,一旦发生Consumer Reblance,会带来较高的重新消费成本。
2、每个线程完整地执行消息获取和消息处理逻辑。一旦消息处理逻辑很重,造成消息处理速度慢,就很容易出现不必要的 Rebalance,从而引发整个消费者组的消费停滞。

方案二优点:

1、方案 2 将任务切分成了消息获取和消息处理两个部分,分别由不同的线程处理它们。消息处理线程不受Partition数量限制。因此只需要增加工作线程数量,便可进一步提升Kafka客户端的并行消费能力。
2、高伸缩性,就是说我们可以独立地调节消息获取的线程数,以及消息处理的线程数,而不必考虑两者之间是否相互影响。如果你的消费获取速度慢,那么增加消费获取的线程数即可;如果是消息的处理速度慢,那么增加 Worker 线程池线程数即可。

方案二缺点:

1、它的实现难度要比方案 1 大得多,毕竟它有两组线程,你需要分别管理它们。
2、因为该方案将消息获取和消息处理分开了,也就是说获取某条消息的线程不是处理该消息的线程,因此无法保证分区内的消费顺序。举个例子,比如在某个分区中,消息 1 在消息 2 之前被保存,那么 Consumer 获取消息的顺序必然是消息 1 在前,消息 2 在后,但是,后面的 Worker 线程却有可能先处理消息 2,再处理消息 1,这就破坏了消息在分区中的顺序。如果你在意 Kafka 中消息的先后顺序,方案 2 的这个劣势是致命的。
3、方案 2 引入了多组线程,使得整个消息消费链路被拉长,最终导致正确位移提交会变得异常困难,结果就是可能会出现消息的重复消费。如果你在意Kafka重复消费,那也不推荐使用方案 2。

两种方案的实例代码

方案一具体实现

方法一的实现比较简单,创建一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 Runner 类都会创建一个专属的 KafkaConsumer 实例。我们可以创建多个Runner实例。

public class MuiltThreadConsumer implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);

    private final KafkaConsumer consumer = new KafkaConsumer(PropertiesConfig.getConsumerProperties());


    @Override
    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                //  执行消息处理逻辑
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(Thread.currentThread().getName() + "进行了消费" + record.value() + "分区是" + record.partition() + "偏移量是" + record.offset());
                }
            }

        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;

        } finally {
            consumer.close();

        }

    }

    public void shutdown() {
        closed.set(true);
        consumer.wakeup();

    }
}
        for (int i=0;i<threadNum;i++){
            Thread thread = new Thread(new MuiltThreadConsumer(),"thread--"+i);
            thread.start();
        }

方案二具体实现

关于方法二的代码实现,只需要理解,方法二两类线程的分工:拉取线程负责拉取对应Topic的消息,并将消息分发给工作线程,而工作线程完成消息的业务处理。这段代码最重要的地方是executors.submit(new Worker(record));当 Consumer 的 poll 方法返回消息后,是由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑,这样就实现了方案 2 的多线程架构。

public class ConsumerHandler<K, V> {

    private final KafkaConsumer<K, V> consumer;
    private ExecutorService executors;

    public ConsumerHandler(String groupId, String topic) {
        Properties properties = PropertiesConfig.getConsumerProperties();
        properties.put("group.id", groupId);
        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));
    }

    /**
     * 消费主方法
     *
     * @param threadNumber 线程池中线程数
     */
    public void consume(int threadNumber) {
        executors = new ThreadPoolExecutor(
                threadNumber,
                threadNumber,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());

        while (true) {
            //当 Consumer 的 poll 方法返回消息后,由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑
            ConsumerRecords<K, V> records = consumer.poll(1000L);
            if (!records.isEmpty()) {
                executors.submit(new Worker(record));
            }
        }

    }

}

关于方案二的思考

我们刚刚说到方案二的缺点是实现较为复杂,尤其位移管理较为复杂。上述代码案例中,是设置的自动提交位移方式。如果你的实际业务场景对消息消费可靠性要求不高,只要求“至多消费一次”,则kafka的enable.auto.commit参数使用默认的true即可,即kafka客户端一拉取到消息后,就会自动提交消费位移(offset),不管消息后续是否有处理完成。这意味着,拉取线程在拉取到消息,只需要分发给工作线程,之后的处理流程,就可以不管了,继续下次消息的拉取。至于工作线程是否有完成消息的业务处理,拉取线程不用关心,它只负责拉取和分发。
可是如果我业务场景是“至少消费一次”,这种场景对消息的消费可靠性要求较高,因此需要手动提交位移,消息被拉取到后,并不会自动提交offset,而是要等该消息的业务处理也全部完成之后,才手动提交offset,否则会认为没有消费成功,会再次拉取重新消费,这样就可以保证消息至少被消费一次了。
也就是说,我们需要设计手动提交offset,且保证这一批消息的offset提交的一致性是关键所在。

其实整体交互以及线程的分工与前面的“至多消费一次”是类似的,最主要的差异在于,工作线程和拉取线程分别多了一项工作:更新offset、提交offset。拉取线程:拉取对应Topic的消息,并将消息分发给工作线程;同时异步提交已完成业务处理的消息的offset;工作线程:完成消息的业务处理后,把该消息的offset同步更新到待提交的offset池子中(但不提交)。

代码设计思路:通过一个公共的Map来暂存待提交的offset,该Map的key是消息所属Partition,value则是要提交的offset。offset的更新(工作线程)与offset的提交(拉取线程)则是通过该Map来异步解耦。工作线程完成该消息的业务处理后,就会把该消息对应的offset放入Map,但不会提交。拉取线程则会一直检查该Map的每个Partition的offset是否有更新,若有更新,则就会提交该offset到Kafka。

本例中包含3个类:

  • ConsumerHandler类:consumer多线程的管理类,用于创建线程池以及为每个线程分配任务。另外consumer位移的提交也在这个类中进行
  • Worker类:本质上是一个Runnable,执行真正的消费逻辑并上报位移信息给ConsumerThreadHandler
  • Main类:测试主方法类
public class ConsumerHandler<K, V> {

    private final KafkaConsumer<K, V> consumer;
    private ExecutorService executors;
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    public ConsumerHandler(String groupId, String topic) {
        Properties properties = PropertiesConfig.getConsumerProperties();
        properties.put("group.id", groupId);

        consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync(offsets);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                offsets.clear();
            }
        });
    }

    /**
     * 消费主方法
     * @param threadNumber  线程池中线程数
     */
    public void consume(int threadNumber) {
        executors = new ThreadPoolExecutor(
                threadNumber,
                threadNumber,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            while (true) {
                //当 Consumer 的 poll 方法返回消息后,由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑
                ConsumerRecords<K, V> records = consumer.poll(1000L);
                if (!records.isEmpty()) {
                    executors.submit(new Worker<>(records, offsets));
                }
                commitOffsets();
            }
        } catch (WakeupException e) {
            // swallow this exception
        } finally {
            commitOffsets();
            consumer.close();
        }
    }

    private void commitOffsets() {
        // 尽量降低synchronized块对offsets锁定的时间
        Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
        synchronized (offsets) {
            if (offsets.isEmpty()) {
                return;
            }
            unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
            offsets.clear();
        }
        // 提交已完成业务处理的消息的offset
        consumer.commitSync(unmodfiedMap);
    }

    public void close() {
        consumer.wakeup();
        executors.shutdown();
    }
}
public class Worker<K,V> implements Runnable {

    private final ConsumerRecords<K, V> records;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    public Worker(ConsumerRecords<K, V> record, Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.records = record;
        this.offsets = offsets;
    }

    @Override
    public void run() {
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
            for (ConsumerRecord<K, V> record : partitionRecords) {
                // 消息处理逻辑
                System.out.println(String.format("value=%s,topic=%s, partition=%d, offset=%d",
                        record.value(),record.topic(), record.partition(), record.offset()));
            }
            // 上报位移信息 把该消息的offset同步更新到待提交的offset池子中(但不提交)
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            synchronized (offsets) {
                if (!offsets.containsKey(partition)) {
                    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                } else {
                    long curr = offsets.get(partition).offset();
                    if (curr <= lastOffset + 1) {
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                }
            }
        }
    }
}
        String topic = "kafka_test";
        String groupID = "test";
        final ConsumerHandler<byte[], byte[]> handler = new ConsumerHandler<>(groupID, topic);
        final int cpuCount = Runtime.getRuntime().availableProcessors();
        System.out.println("cpuCount..."+cpuCount);

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                handler.consume(cpuCount);
            }
        };
        new Thread(runnable).start();

BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/2235

发表评论

登录后才能评论

联系我们

562373081

在线咨询:点击这里给我发消息

邮件:562373081@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code