Kafka多线程消费的思考和实现
随着科技的发展,普通个人电脑多核已是标配,更不必说公司服务器,如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。无论是Kafka官方提供的客户端API,还是Spring封装的Spring Kafka,在消息消费方面,均只是实现了默认情况下的1个Consumer 1个线程。所以我们研究KAKFA多线程消费是非常有必要的,因为很多时候我们需要自己实现开发多线程消费程序。
两种多线程思路
多线程+多KafkaConsumer实例
首先,我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全,即保证共享资源在同一时间只能由一个线程进行操作(原子性,有序性)。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
既然一个线程只能有一个KafkaConsumer 实例,那么我们完全可以让消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。
单/多线程+单/多KafkaConsumer实例+消息处理Worker线程池
仅仅只能通过创建多个KafkaConsumer 实例来实现吗?假如我就只想创建一个KafkaConsumer 实例,又想加快速度,怎么办呢?那么思路就是1个Consumer实例有多个线程来加快消费速率,以进一步提升对partition的并行消费能力。消费者程序可以使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。即将原来的消费线程分拆为2类线程(拉取线程、工作线程),通过这2类线程的分工合作来完成消息的消费。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。
- 拉取线程:只负责kafka消息的拉取、分发和消息offset的提交(可选),但不负责消息的业务处理。拉取线程的数量依旧要受Kafka的“一个Partition只能被该Group里的一个Consumer线程消费”规则的限制,即若该Group所订阅的Topic有N个Partition,则该Group最多只能有N个拉取线程。
- 工作线程:只负责处理消息的业务处理,但不负责kafka消息的拉取和offset的提交。拉取线程拉取到消息后,便可将消息分发给工作线程进行处理。比如拉取线程一次拉取到100个消息,分发给20个工作线程并行异步处理。
两种方案的优劣势对比
这两种方案孰优孰劣呢?应该说是各有千秋:
方案一优点:
1、该方案最大的优点便是实现很简单,且对于消费速率要求不是很高的情况下,是完全能满足需要的。
2、由于每个线程使用专属的 KafkaConsumer 实例来执行消息获取和消息处理逻辑,因此,Kafka 主题中的每个分区都能保证只被一个线程处理,这样就很容易实现分区内的消息消费顺序。这对在乎事件先后顺序的应用场景来说,是非常重要的优势。
方案一缺点:
1、最大的缺点是消费线程受限于Topic分区数,我们知道,在一个消费者组中,每个订阅分区都只能被组内的一个消费者实例所消费。假设一个消费者组订阅了 100 个分区,那么方案 1 最多只能扩展到 100 个线程,多余的线程无法分配到任何分区,只会白白消耗系统资源。因此只能通过不断增加Partition数量来提升消费能力的扩展性。增加Partition数量可不是那么随便的事,变更Kafka集群的Topic Partition数量会带来Consumer Reblance,重平衡的危害如果不清楚可以参考往期文章,总之,如果对于Kafka的要求是全天不断,且每日消费的消息量巨大的场景时,一旦发生Consumer Reblance,会带来较高的重新消费成本。
2、每个线程完整地执行消息获取和消息处理逻辑。一旦消息处理逻辑很重,造成消息处理速度慢,就很容易出现不必要的 Rebalance,从而引发整个消费者组的消费停滞。
方案二优点:
1、方案 2 将任务切分成了消息获取和消息处理两个部分,分别由不同的线程处理它们。消息处理线程不受Partition数量限制。因此只需要增加工作线程数量,便可进一步提升Kafka客户端的并行消费能力。
2、高伸缩性,就是说我们可以独立地调节消息获取的线程数,以及消息处理的线程数,而不必考虑两者之间是否相互影响。如果你的消费获取速度慢,那么增加消费获取的线程数即可;如果是消息的处理速度慢,那么增加 Worker 线程池线程数即可。
方案二缺点:
1、它的实现难度要比方案 1 大得多,毕竟它有两组线程,你需要分别管理它们。
2、因为该方案将消息获取和消息处理分开了,也就是说获取某条消息的线程不是处理该消息的线程,因此无法保证分区内的消费顺序。举个例子,比如在某个分区中,消息 1 在消息 2 之前被保存,那么 Consumer 获取消息的顺序必然是消息 1 在前,消息 2 在后,但是,后面的 Worker 线程却有可能先处理消息 2,再处理消息 1,这就破坏了消息在分区中的顺序。如果你在意 Kafka 中消息的先后顺序,方案 2 的这个劣势是致命的。
3、方案 2 引入了多组线程,使得整个消息消费链路被拉长,最终导致正确位移提交会变得异常困难,结果就是可能会出现消息的重复消费。如果你在意Kafka重复消费,那也不推荐使用方案 2。
两种方案的实例代码
方案一具体实现
方法一的实现比较简单,创建一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 Runner 类都会创建一个专属的 KafkaConsumer 实例。我们可以创建多个Runner实例。
public class MuiltThreadConsumer implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer = new KafkaConsumer(PropertiesConfig.getConsumerProperties());
@Override
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
// 执行消息处理逻辑
for (ConsumerRecord<String, String> record : records) {
System.out.println(Thread.currentThread().getName() + "进行了消费" + record.value() + "分区是" + record.partition() + "偏移量是" + record.offset());
}
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
for (int i=0;i<threadNum;i++){
Thread thread = new Thread(new MuiltThreadConsumer(),"thread--"+i);
thread.start();
}
方案二具体实现
关于方法二的代码实现,只需要理解,方法二两类线程的分工:拉取线程负责拉取对应Topic的消息,并将消息分发给工作线程,而工作线程完成消息的业务处理。这段代码最重要的地方是executors.submit(new Worker(record));当 Consumer 的 poll 方法返回消息后,是由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑,这样就实现了方案 2 的多线程架构。
public class ConsumerHandler<K, V> {
private final KafkaConsumer<K, V> consumer;
private ExecutorService executors;
public ConsumerHandler(String groupId, String topic) {
Properties properties = PropertiesConfig.getConsumerProperties();
properties.put("group.id", groupId);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic));
}
/**
* 消费主方法
*
* @param threadNumber 线程池中线程数
*/
public void consume(int threadNumber) {
executors = new ThreadPoolExecutor(
threadNumber,
threadNumber,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
while (true) {
//当 Consumer 的 poll 方法返回消息后,由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑
ConsumerRecords<K, V> records = consumer.poll(1000L);
if (!records.isEmpty()) {
executors.submit(new Worker(record));
}
}
}
}
关于方案二的思考
我们刚刚说到方案二的缺点是实现较为复杂,尤其位移管理较为复杂。上述代码案例中,是设置的自动提交位移方式。如果你的实际业务场景对消息消费可靠性要求不高,只要求“至多消费一次”,则kafka的enable.auto.commit参数使用默认的true即可,即kafka客户端一拉取到消息后,就会自动提交消费位移(offset),不管消息后续是否有处理完成。这意味着,拉取线程在拉取到消息,只需要分发给工作线程,之后的处理流程,就可以不管了,继续下次消息的拉取。至于工作线程是否有完成消息的业务处理,拉取线程不用关心,它只负责拉取和分发。
可是如果我业务场景是“至少消费一次”,这种场景对消息的消费可靠性要求较高,因此需要手动提交位移,消息被拉取到后,并不会自动提交offset,而是要等该消息的业务处理也全部完成之后,才手动提交offset,否则会认为没有消费成功,会再次拉取重新消费,这样就可以保证消息至少被消费一次了。
也就是说,我们需要设计手动提交offset,且保证这一批消息的offset提交的一致性是关键所在。
其实整体交互以及线程的分工与前面的“至多消费一次”是类似的,最主要的差异在于,工作线程和拉取线程分别多了一项工作:更新offset、提交offset。拉取线程:拉取对应Topic的消息,并将消息分发给工作线程;同时异步提交已完成业务处理的消息的offset;工作线程:完成消息的业务处理后,把该消息的offset同步更新到待提交的offset池子中(但不提交)。
代码设计思路:通过一个公共的Map来暂存待提交的offset,该Map的key是消息所属Partition,value则是要提交的offset。offset的更新(工作线程)与offset的提交(拉取线程)则是通过该Map来异步解耦。工作线程完成该消息的业务处理后,就会把该消息对应的offset放入Map,但不会提交。拉取线程则会一直检查该Map的每个Partition的offset是否有更新,若有更新,则就会提交该offset到Kafka。
本例中包含3个类:
- ConsumerHandler类:consumer多线程的管理类,用于创建线程池以及为每个线程分配任务。另外consumer位移的提交也在这个类中进行
- Worker类:本质上是一个Runnable,执行真正的消费逻辑并上报位移信息给ConsumerThreadHandler
- Main类:测试主方法类
public class ConsumerHandler<K, V> {
private final KafkaConsumer<K, V> consumer;
private ExecutorService executors;
private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
public ConsumerHandler(String groupId, String topic) {
Properties properties = PropertiesConfig.getConsumerProperties();
properties.put("group.id", groupId);
consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(offsets);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
offsets.clear();
}
});
}
/**
* 消费主方法
* @param threadNumber 线程池中线程数
*/
public void consume(int threadNumber) {
executors = new ThreadPoolExecutor(
threadNumber,
threadNumber,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
try {
while (true) {
//当 Consumer 的 poll 方法返回消息后,由专门的线程池来负责处理具体的消息。调用 poll 方法的主线程不负责消息处理逻辑
ConsumerRecords<K, V> records = consumer.poll(1000L);
if (!records.isEmpty()) {
executors.submit(new Worker<>(records, offsets));
}
commitOffsets();
}
} catch (WakeupException e) {
// swallow this exception
} finally {
commitOffsets();
consumer.close();
}
}
private void commitOffsets() {
// 尽量降低synchronized块对offsets锁定的时间
Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
synchronized (offsets) {
if (offsets.isEmpty()) {
return;
}
unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
offsets.clear();
}
// 提交已完成业务处理的消息的offset
consumer.commitSync(unmodfiedMap);
}
public void close() {
consumer.wakeup();
executors.shutdown();
}
}
public class Worker<K,V> implements Runnable {
private final ConsumerRecords<K, V> records;
private final Map<TopicPartition, OffsetAndMetadata> offsets;
public Worker(ConsumerRecords<K, V> record, Map<TopicPartition, OffsetAndMetadata> offsets) {
this.records = record;
this.offsets = offsets;
}
@Override
public void run() {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);
for (ConsumerRecord<K, V> record : partitionRecords) {
// 消息处理逻辑
System.out.println(String.format("value=%s,topic=%s, partition=%d, offset=%d",
record.value(),record.topic(), record.partition(), record.offset()));
}
// 上报位移信息 把该消息的offset同步更新到待提交的offset池子中(但不提交)
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
synchronized (offsets) {
if (!offsets.containsKey(partition)) {
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
} else {
long curr = offsets.get(partition).offset();
if (curr <= lastOffset + 1) {
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
}
}
}
}
}
String topic = "kafka_test";
String groupID = "test";
final ConsumerHandler<byte[], byte[]> handler = new ConsumerHandler<>(groupID, topic);
final int cpuCount = Runtime.getRuntime().availableProcessors();
System.out.println("cpuCount..."+cpuCount);
Runnable runnable = new Runnable() {
@Override
public void run() {
handler.consume(cpuCount);
}
};
new Thread(runnable).start();
BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/2235