1. 首页
  2. 大数据
  3. Kafka教程

Kafka消费位移的那些事

摘要:Kafka中的位移是个极其重要的概念,因为数据一致性、准确性是一个很重要的语义,我们都不希望消息重复消费或者丢失。而位移就是控制消费进度的大佬。本文就详细聊聊kafka消费位移的那些事,包括:

  • 位移的概念剖析
  • 位移的提交
  • 如何”完美”的控制消费位移
  • 位移主题

概念剖析

kafka的两种位移

关于位移(Offset),在kafka术语那篇文章我们提到过,在kafka的世界里有两种位移,先来回顾一下。一种叫做分区位移,生产者向分区写入消息,每条消息在分区中的位置信息由一个叫offset的数据来表征。假设一个生产者向一个空分区写入了 10 条消息,那么这 10 条消息的位移依次是 0、1、…、9;

还有一种就是今天重点要讲的,也是实际生产中关注度很高的一个东西,叫消费位移,消费者需要记录消费进度,即消费到了哪个分区的哪个位置上,这是消费者位移(Consumer Offset)。
注意,这和上面所说的消息在分区上的位移完全不是一个概念。上面的“位移”表征的是分区内的消息位置,它是不变的,即一旦消息被成功写入到一个分区上,它的位移值就是固定的了。而消费者位移则不同,它可能是随时变化的,毕竟它是消费者消费进度的指示器。

消费位移

消费位移,记录的是 Consumer 要消费的下一条消息的位移,切记,是下一条消息的位移!而不是目前最新消费消息的位移。假设一个分区中有 10 条消息,位移分别是 0 到 9。某个 Consumer 应用已消费了 5 条消息,这就说明该 Consumer 消费了位移为 0 到 4 的 5 条消息,此时 Consumer 的位移是 5,指向了下一条消息的位移。
至于为什么要有消费位移,很好理解,当 Consumer 发生故障重启之后,就能够从 Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。就好像书签一样,需要书签你才可以快速找到你上次读书的位置。
那么了解了位移是什么以及它的重要性,我们自然而然会有一个疑问,kafka是怎么记录、怎么保存、怎么管理位移的呢?

位移的提交

Consumer 需要上报自己的位移数据,这个汇报过程被称为位移提交。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即Consumer 需要为分配给它的每个分区提交各自的位移数据。
鉴于位移提交甚至是位移管理对 Consumer 端的巨大影响,KafkaConsumer API提供了多种提交位移的方法,每一种都有各自的用途,这些都是本文将要谈到的方案。

void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

先粗略的总结一下,从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

自动提交

当消费配置enable.auto.commit=true的时候代表自动提交位移。
自动提交位移是发生在什么时候呢?auto.commit.interval.ms默认值是50000ms。即kafka每隔5s会帮你自动提交一次位移。假如消费数据量特别大,可以设置的短一点。
越简单的东西功能越不足,自动提交位移省事的同时肯定会带来一些问题。自动提交位移不会去关注你自己的业务逻辑是否成功,
举例:你的处理逻辑是插入数据库,由于某种原因数据库短暂的失去连接,但是try catch住了异常,程序正常运行,故kafka仍然会为你提交位移。这样可能存在消息丢失的情况。如果手动提交位移,你可以设置插入数据库返回ok了再提交,这样当数据库短暂失去连接的情况,位移不会被提交,等数据库恢复连接后会重新消费,这样就不会丢失数据了。

手动提交

当消费配置enable.auto.commit=false的时候代表手动提交位移。用户必须在适当的时机(一般是处理完业务逻辑后),手动的调用相关api方法提交位移。比如在下面的案例中,我需要确认我的业务逻辑返回true之后再手动提交位移

 while (true) {
     try {
         ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
         if (!consumerRecords.isEmpty()) {
             boolean batchHandleResult = true;
             for (ConsumerRecord<String, String> record : consumerRecords) {
                 KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                 // 处理业务
                 boolean handleResult = handle(kafkaMessage);
                 if (handleResult) {
                     System.out.println(" handle success, kafkaMessage={}" + kafkaMessage);
                 } else {
                     batchHandleResult = false;
                     System.out.println(" handle failed, kafkaMessage={}" + kafkaMessage);
                 }
             }
             // 手动提交offset
             if (batchHandleResult) {
                 System.out.println("batch handle success, start submit offset, count=" + consumerRecords.count());
                 consumer.commitSync(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
             } else {
                 System.out.println("batch handle failed, not commit, count=" + consumerRecords.count());
             }
​
         } else {
             System.out.println("no message in kafk");
         }
     } catch (Exception e) {
         System.out.println("kafka consume error." + e);
     }
 }

手动提交又分为异步提交和同步提交。

同步提交

上面案例代码使用的是commitSync() ,顾名思义,是同步提交位移的方法。同步提交位移Consumer 程序会处于阻塞状态,等待 Broker 返回提交结果。同步模式下提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束。在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,你可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。因此,为了解决这些不足,kafka还提供了异步提交方法。

异步提交

异步提交会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数,供你实现提交之后的逻辑,比如记录日志或处理异常等。下面这段代码展示了调用 commitAsync() 的方法
consumer.commitAsync((offsets, exception) -> {if (exception != null) handleException(exception);});

但是异步提交会有一个问题,那就是它没有重试机制,不过一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。所以消息也是不会丢失和重复消费的。
但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。因此,组合使用commitAsync()和commitSync()是最佳的方式。

Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(getTopics());
   try {
       while (true) {
           ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
           if (!consumerRecords.isEmpty()) {
               boolean batchHandleResult = true;
               for (ConsumerRecord<String, String> record : consumerRecords) {
                   KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                   boolean handleResult = handle(kafkaMessage);
                   if (handleResult) {
                       System.out.println(" handle success, kafkaMessage={}" + kafkaMessage);
                   } else {
                       batchHandleResult = false;
                       System.out.println(" handle failed, kafkaMessage={}" + kafkaMessage);
                   }
               }
               //异步提交位移
               if (batchHandleResult) {
                   System.out.println("batch handle success, submit offset, count=" + consumerRecords.count());
                   consumer.commitAsync((offsets, exception) -> {
                       if (exception != null)
                           handleException(exception);
                   });
               } else {
                   System.out.println("batch handle failed, not commit, count=" + consumerRecords.count());
               }
           }
       }
   } catch (Exception e) {
       System.out.println("kafka consumer error:" + e.toString());
   } finally {
       try {
        //最后同步提交位移
           consumer.commitSync();
       } finally {
           consumer.close();
       }
   }

让位移提交更加灵活和可控

如果细心的阅读了上面所有demo的代码,那么你会发现这样几个问题:

1、所有的提交,都是提交 poll 方法返回的所有消息的位移,poll 方法一次返回1000 条消息,则一次性地将这 1000 条消息的位移一并提交。可这样一旦中间出现问题,位移没有提交,下次会重新消费已经处理成功的数据。所以我想做到细粒度控制,比如每次提交100条,该怎么办?
答:可以通过commitSync(Map<TopicPartition, OffsetAndMetadata>) 和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)对位移进行精确控制。

2、poll和commit方法对于普通的开发人员而言是一个黑盒,无法精确地掌控其消费的具体位置。我都不知道这次的提交,是针对哪个partition,提交上去的offset是多少。
答:可以通过record.topic()获取topic信息, record.partition()获取分区信息,record.offset() + 1获取消费位移,记住消费位移是指示下一条消费的位移,所以要加一。

3、我想自己管理offset怎么办?一方面更加保险,一方面下次重启之后可以精准的从数据库读取最后的offset就不存在丢失和重复消费了。
答:可以将消费位移保存在数据库中。消费端程序使用comsumer.seek方法指定从某个位移开始消费。

综合以上几个可优化点,并结合全文,可以给出一个比较完美且完整的demo:联合异步提交和同步提交,对处理过程中所有的异常都进行了处理。细粒度的控制了消费位移的提交,并且保守的将消费位移记录到了数据库中,重新启动消费端程序的时候会从数据库读取位移。这也是我们消费端程序位移提交的最佳实践方案。你只要继承这个抽象类,实现你具体的业务逻辑就可以了。

/**
 * Created by liuyanling on 2020/3/20
 */
public abstract class PrefectCosumer {
    private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    int count = 0;
    public final void consume() {
        Properties properties = PropertiesConfig.getConsumerProperties();
        properties.put("group.id", getGroupId());
        Consumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(getTopics());
        consumer.poll(0);
        // 把offset记录到数据库中 从指定的offset处消费 
        consumer.partitionsFor(getTopics()).stream().map(info ->
        new TopicPartition(getTopics(), info.partition()))
        .forEach(tp -> {
               consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));   
         });
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes));
                if (!consumerRecords.isEmpty()) {
                    for (ConsumerRecord<String, String> record : consumerRecords) {
​
                        KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class);
                        boolean handleResult = handle(kafkaMessage);
                        if (handleResult) {
                            //注意:提交的是下一条消息的位移。所以OffsetAndMetadata 对象时,必须使用当前消息位移加 1。
                            offsets.put(new TopicPartition(record.topic(), record.partition()),
                                    new OffsetAndMetadata(record.offset() + 1));
​
                            // 细粒度控制提交 每10条提交一次offset
                            if (count % 10 == 0) {
                                // 异步提交offset
                                consumer.commitAsync(offsets, (offsets, exception) -> {
                                    if (exception != null) {
                                        handleException(exception);
                                    }
                                    // 将消费位移再记录一份到数据库中
                                    offsets.forEach((k, v) -> {
                                        String s = "insert into kafka_offset(`topic`,`group_id`,`partition_id`,`offset`) values" +
                                                " ('" + k.topic() + "','" + getGroupId() + "'," + k.partition() + "," + v.offset() + ")" +
                                                " on duplicate key update offset=values(offset);";
                                        JdbcUtils.insertTable(s);
                                    });
​
​
                                });
                            }
                            count++;
                        } else {
                            // 失败的时候重试10次 防止因为网络抖动导致没有成功处理逻辑
                            Boolean retryResult = false;
                            for (int i = 0; i < 10; i++) {
                                retryResult = handle(kafkaMessage);
                                if (retryResult) {
                                    break;
                                }
                            }
​
                            if (!retryResult) {
                                System.out.println("消费消息失败 kafkaMessage={}" + getTopics() + getGroupId() + kafkaMessage.toString());
                            }
                        }
                    }
​
​
                }
            }
        } catch (Exception e) {
            System.out.println("kafka consumer error:" + e.toString());
        } finally {
            try {
                // 最后一次提交 使用同步提交offset
                consumer.commitSync();
            } finally {
                consumer.close();
            }
​
​
        }
    }
​
​
    /**
     * 具体的业务逻辑
     *
     * @param kafkaMessage
     * @return
     */
    public abstract boolean handle(KafkaMessage kafkaMessage);
​
    public abstract List<String> getTopics();
​
    public abstract String getGroupId();
​
    void handleException(Exception e) {
        System.out.println("commitAsync handleException"+e);
    }
}

提交的位移都去哪了?

通过上面那几部分的内容,我们已经搞懂了位移提交的方方面面,那么提交的位移它保存在哪里呢?这就要去位移主题的的世界里一探究竟了。kafka把位移保存在一个叫做__consumer_offsets的内部主题中,叫做位移主题。

注意:老版本的kafka其实是把位移保存在zookeeper中的,但是zookeeper并不适合这种高频写的场景。所以新版本已经是改进了这个方案,直接保存到kafka。毕竟kafka本身就适合高频写的场景,并且kafka也可以保证高可用性和高持久性。

既然它也是主题,那么离不开分区和副本这两个机制。我们并没有手动创建这个主题并且指定,所以是kafka自动创建的, 分区的数量取决于Broker 端参数 offsets.topic.num.partitions,默认是50个分区,而副本参数取决于offsets.topic.replication.factor,默认是3。

既然也是主题,肯定会有消息,那么消息格式是什么呢?参考前面我们手动设计将位移写入数据库的方案,我们保存了topic,group_id,partition,offset四个字段。topic,group_id,partition无疑是数据表中的联合主键,而offset是不断更新的。无疑kafka的位移主题消息也是类似这种设计。key也是那三个字段,而消息体其实很复杂,你可以先简单理解为就是offset。

既然也是主题,肯定也会有删除策略,否则消息会无限膨胀。但是位移主题的删除策略和其他主题删除策略又不太一样。我们知道普通主题的删除是可以通过配置删除时间或者大小的。而位移主题的删除,叫做 Compaction。Kafka 使用Compact 策略来删除位移主题中的过期消息,对于同一个 Key 的两条消息 M1 和 M2,如果 M1 的发送时间早于 M2,那么 M1 就是过期消息。Compact 的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。

Kafka 提供了专门的后台线程定期地巡检待 Compact 的主题,看看是否存在满足条件的可删除数据。这个后台线程叫 Log Cleaner。很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,我建议你去检查一下 Log Cleaner 线程的状态,通常都是这个线程挂掉了导致的。

总结

kafka的位移是个极其重要的概念,控制着消费进度,也即控制着消费的准确性,完整性,为了保证消息不重复和不丢失。我们最好做到以下几点:

  • 手动提交位移。
  • 手动提交有异步提交和同步提交两种方式,既然两者有利也有弊,那么我们可以结合起来使用。
  • 细粒度的控制消费位移的提交,这样可以避免重复消费的问题。
  • 保守的将消费位移再记录到了数据库中,重新启动消费端程序的时候从数据库读取位移。

参考文档:

https://time.geekbang.org/column/article/106904
http://kafka.apache.org/22/documentation.html
http://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/2270

发表评论

登录后才能评论

联系我们

562373081

在线咨询:点击这里给我发消息

邮件:562373081@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code