1. 首页
  2. 大数据
  3. Kafka教程

Kafka重设消费者组offset的7种策略

摘要:Kafka是基于日志结构,消费者在消费消息时仅仅只读而不作删除,因此消费消息是可以重演的,kafka有七种控制消费组消费offset的策略,主要分为位移维度和时间维度,包括:
一、Earliest/Latest/Current/Specified-Offset/Shift-By-N策略
二、DateTime和Duration策略

kafka为何能重设位移

并不是所有的消息队列都可以重设消费者组位移达到重新消费的目的。比如传统的RabbitMq,它们处理消息是一次性的,即一旦消息被成功消费,就会被删除。而Kafka消费消息是可以重演的,因为它是基于日志结构(log-based)的消息引擎,消费者在消费消息时,仅仅是从磁盘文件上读取数据而已,所以消费者不会删除消息数据。同时,由于位移数据是由消费者控制的,因此它能够很容易地修改位移的值,实现重复消费历史数据的功能。
在Kafka消费位移的那些事这篇文章我们详细说了下位移的概念和位移的提交,并且提到了怎么控制消费位移,今天实际是详细补充一下如何控制消费组位移。假设这么一个场景,我已经消费了1000条消息后,我发现处理逻辑错了,所以我需要重新消费一下,可是位移已经提交了,我到底该怎么重新消费这1000条呢??假设我想从某个时间点开始消费,我又该如何处理呢?这就是本文要解决的问题。

重设位移的策略

auto.offset.reset=earliest/latest这个参数大家都很熟悉,但是初学者很容易误会它。大部分朋友都觉得在任何情况下把这两个值设置为earliest或者latest ,消费者就可以从最早或者最新的offset开始消费,但实际上并不是那么回事,他们生效都有一个前提条件,那就是对于同一个groupid的消费者,如果这个topic某个分区有已经提交的offset,那么无论是把auto.offset.reset=earliest还是latest,都将失效,消费者会从已经提交的offset开始消费。因此这个参数并不能解决用户想重设消费位移的需求。

Kafka一共有七种重设位移的策略。不论是哪种设置方式,重设位移大致可以从两个维度来进行。
位移维度。这是指根据位移值来重设。也就是说,直接把消费者的位移值重设成我们给定的位移值。
时间维度。我们可以给定一个时间,让消费者把位移调整成大于该时间的最小位移;也可以给出一段时间间隔,比如 30 分钟前,然后让消费者直接将位移调回 30 分钟之前的位移值。

说完了重设策略,我们就来看一下具体应该如何实现,可以从两个角度,API方式和命令行方式。

重设位移的方法之API方式

API方式只要记住用seek方法就可以了,包括seek,seekToBeginning 和 seekToEnd。

void seek(TopicPartition partition, long offset);    
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);    
void seekToBeginning(Collection<TopicPartition> partitions);    
void seekToEnd(Collection<TopicPartition> partitions);    

从方法签名我们可以看出seekToBeginning和seekToEnd是可以一次性重设n个分区的位移,而seek 只允许重设指定分区的位移,即为每个分区都单独设置位移,因为不难得出,如果要自定义每个分区的位移值则用seek,如果希望kafka帮你批量重设所有分区位移,比如从最新数据消费或者从最早数据消费,那么用seekToEnd和seekToBeginning。

Earliest 策略:从最早的数据开始消费

从主题当前最早位移处开始消费,这个最早位移不一定就是 0 ,因为很久远的消息会被 Kafka 自动删除,主要取决于你的删除配置。

代码如下:

Properties properties = PropertiesConfig.getConsumerProperties();
properties.put("group.id", getGroupId());
Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(getTopics());
consumer.poll(0);
consumer.seekToBeginning(
consumer.partitionsFor(getTopics()).stream().map(partitionInfo ->
   new TopicPartition(getTopics(), partitionInfo.partition()))
   .collect(Collectors.toList()));

首先是构造consumer对象,这样我们可以通过partitionsFor获取到分区的信息,然后我们就可以构造出TopicPartition 的集合传给seekToBegining方法。需要注意的一个地方是:需要用consumer.poll(0),而不能用consumer.poll(Duration.ofMillis(0))
在poll(0)中consumer会一直阻塞直到它成功获取了所需的元数据信息,之后它才会发起fetch请求去获取数据。而poll(Duration)会把元数据获取也计入整个超时时间。由于本例中使用的是0,即瞬时超时,因此consumer根本无法在这么短的时间内连接上coordinator,所以只能赶在超时前返回一个空集合。

Latest策略:从最新的数据开始消费

    consumer.seekToEnd(
        consumer.partitionsFor(getTopics().get(0)).stream().map(partitionInfo ->
            new TopicPartition(getTopics().get(0), partitionInfo.partition()))
              .collect(Collectors.toList()));

Current策略:从当前已经提交的offset处消费

consumer.partitionsFor(getTopics().get(0)).stream().map(info ->
        new TopicPartition(getTopics().get(0), info.partition()))
        .forEach(tp -> {
            long committedOffset = consumer.committed(tp).offset();
            consumer.seek(tp, committedOffset);

        });

Special-offset策略:从指定的offset处消费

该策略使用的方法和current策略一样,区别在于,current策略是直接从kafka元信息中读取中已经提交的offset值,而special策略需要用户自己为每一个分区指定offset值,我们一般是把offset记录到数据库中然后可以从数据库去读取这个值

    consumer.partitionsFor(getTopics().get(0)).stream().map(info ->

                new TopicPartition(getTopics().get(0), info.partition()))
                .forEach(tp -> {
                    try {
                        consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition()));
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }

                });

以上演示了用API方式重设位移,演示了四种常见策略的代码,另外三种没有演示,一方面是大同小异,另一方面在实际生产中,用API的方式不太可能去做时间维度的重设,而基本都是用命令行方式。

重设位移的方法之命令行方式

命令行方式重设位移是通过 kafka-consumer-groups 脚本。比起 API 的方式,用命令行重设位移要简单得多。

Earliest 策略指定–to-earliest。

bin/kafka-consumer-groups.sh –bootstrap-server kafka-host:port –group test-group –reset-offsets –all-topics –to-earliest –execute

Latest 策略指定–to-latest。

bin/kafka-consumer-groups.sh –bootstrap-server kafka-host:port –group test-group –reset-offsets –all-topics –to-latest –execute

Current 策略指定–to-current。

bin/kafka-consumer-groups.sh –bootstrap-server kafka-host:port –group test-group –reset-offsets –all-topics –to-current –execute

Specified-Offset 策略指定–to-offset。

bin/kafka-consumer-groups.sh –bootstrap-server kafka-host:port –group test-group –reset-offsets –all-topics –to-offset –execute

Shift-By-N 策略指定–shift-by N。

bin/kafka-consumer-groups.sh –bootstrap-server kafka-host:port –group test-group –reset-offsets –shift-by –execute

DateTime 策略指定–to-datetime。

DateTime 允许你指定一个时间,然后将位移重置到该时间之后的最早位移处。常见的使用场景是,你想重新消费昨天的数据,那么你可以使用该策略重设位移到昨天 0 点。

bin/kafka-consumer-groups.sh –bootstrap-server kafka-host:port –group test-group –reset-offsets –to-datetime 2019-06-20T20:00:00.000 –execute

Duration 策略指定–by-duration。

Duration 策略则是指给定相对的时间间隔,然后将位移调整到距离当前给定时间间隔的位移处,具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 类的话,你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式,以字母 P 开头,后面由 4 部分组成,即 D、H、M 和 S,分别表示天、小时、分钟和秒。举个例子,如果你想将位移调回到 15 分钟前,那么你就可以指定 PT0H15M0S。

bin/kafka-consumer-groups.sh –bootstrap-server kafka-host:port –group test-group –reset-offsets –by-duration PT0H30M0S –execute

BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/2244

发表评论

登录后才能评论

联系我们

562373081

在线咨询:点击这里给我发消息

邮件:562373081@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code