1. 首页
  2. 大数据
  3. Kafka教程

【Kafka教程】(五)消息传递语义及精确一致(不丢失\不重复)如何保证

kafka 提供三种语义的传递:

  • 至少一次 (at least once) 消息不会丢失 ack=all ,但是可能重复投递
  • 至多一次 (at most once) 消息可能丢失,但是不会重复投递
  • 精确一次 (Exactly Once) 消息不会丢失,也不会重复

显然、Exactly Once才是我们真正想要的,但是只在0.11.0.0之后的版本中支持,如果是0.11之前的版本,想实现精确一次,需要通过其他方式,如通过让下游系统具有幂等性来配合。接下来具体讲解:

如何保证消息不“丢”失

请注意这里的”丢”字我加了引号,为什么加引号,是因为很多时候消息丢失并不是kafka的错,却让kafka背了锅。接下来我们会通过一些丢失案例,来判断一下这些丢失到底是不是kafka的锅,很多时候我们容易混淆责任的边界,如果搞不清楚事情由谁负责,自然也就不知道由谁来出解决方案了。

丢数据的源头

Kafka可能会在哪些层面丢数据呢?了解kafka架构就不难回答这个问题,生产者,Broker,消费者都是有可能丢数据的。

📷 生产端丢失数据
生产者丢数据,即发送的数据根本没有保存到Broker端。出现这个情况的原因可能是,网络抖动,导致消息压根就没有发送到 Broker 端;也可能是消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等等。

刚刚举例的两个原因,其实根本就不关kafka的事。所以我们要明确一个责任边界问题,kafka只对已提交的消息保证不丢失。也就是说,你提交过去了丢了才算kafka的锅,都没提交过去不能算kafka的锅。

📷 Broker端丢失数据

数据已经保存在broker端,但是数据却丢失了。出现这个的原因可能是,Broker机器down了,当然broker是高可用的,假如你的消息保存在 N 个 Kafka Broker 上,那么至少有 1 个存活就不会丢。
所以我们又可以明确一个责任边界问题,kafka是有限度的保证消息不丢失,不可能你所有机器都没了我还给你保证。

📷 消费端丢数据

Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。Kafka默认是自动提交位移的,这样可能会有个问题,假如你在pull(拉取)30条数据,处理到第20条时自动提交了offset,但是在处理21条的时候出现了异常,当你再次pull数据时,由于之前是自动提交的offset,所以是从30条之后开始拉取数据,这也就意味着21-30条的数据发生了丢失。

如何保证数据不丢失

既然kafka在生产、broker、消费三个层面都有可能”丢”消息,那么我们也需要从这三个方面考虑怎样保证消息不丢失。

📷 生产端保证消息不丢失

上面所说比如网络原因导致消息没有成功发送到broker端,常见,也并不可怕。可怕的不是没发送成功,而是发送失败了你不做任何处理。
很简单的一个重试配置,基本就可以解决这种网络瞬时抖动问题。
props.put("retries", 10);
当然还有很多其他原因导致的,不能只依靠kafka的配置来做处理,我们看一下kafka发送端的源码,其实人家是提供了两个方法的,通常会出问题的方法是那个简单的send,没有callback(回调)。简单的send发送后不会去管它的结果是否成功,而callback能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

/**
* See {@link KafkaProducer#send(ProducerRecord)}
*/
Future<RecordMetadata> send(ProducerRecord<K, V> record);
/**
 * See {@link KafkaProducer#send(ProducerRecord, Callback)}
 */
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
             new Callback() {
                 public void onCompletion(RecordMetadata metadata, Exception e) {
                     if(e != null) {
                         e.printStackTrace();
                      } else {
                         System.out.println("The offset of the record we just sent is: " + metadata.offset());
                     }
                   }
               });
 }

因此,一定要使用带有回调通知的 send 方法。
我们知道,broker一般不会有一个,我们就是要通过多Broker达到高可用的效果,所以对于生产者程序来说,也不能简单的认为发送到一台就算成功,如果只满足于一台,那台机器如果损坏了,那消息必然会丢失。设置 acks = all,表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”,这样可以达到高可用的效果。

  • acks=1(默认):当且仅当leader收到消息返回commit确认信号后认为发送成功。如果 leader 宕机,则会丢失数据。
  • acks=0:producer发出消息即完成发送,无需等待来自 broker 的确认。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • acks=-1(ALL):发送端需要等待 ISR 列表中所有列表都确认接收数据后才算一次发送完成,可靠性最高,延迟也较大。

📷 Broker端保证消息不丢失
前面我们说到,kafka是有限度的保证消息不丢失,这里的限度,指至少要有一台broker可以正常提供服务。至少一台,这种说法可并不准确,应该说至少一台存储了你消息的的broker。我们知道分区可以设置副本数,假如你只设置副本为1,只要挂的刚好是你副本的那台,即使你有1000台broker,也无济于事。
因此,副本的设置尤为重要,一般设置 replication.factor >= 3,毕竟目前防止消息丢失的主要机制就是冗余。
但仅仅设置副本数就有用吗?并不能保证broker端一定存储了三个副本呀。假如共有三个broker,发送一条消息的时候,某个broker刚好宕机了,即使你配置了replication.factor = 3,也最多只会有2台副本。因此,我们还要确认,至少要被写入到多少个副本才算是“已提交”。
min.insync.replicas > 1 , 控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。
说到这,可能会有疑问,上面生产端不是已经配置acks=all了,和这个参数不是冲突了吗??注意acks = all是针对所有副本 Broker 都要接收到消息,假如ISR(当前活跃的副本列表)中只有1个副本了,acks=all也就相当于acks=1了,引入min.insync.replicas的目的就是为了做一个下限的限制,不能只满足于ISR全部写入,还要保证ISR中的写入个数不少于min.insync.replicas。
对了,请确保 replication.factor > min.insync.replicas。一般设置为replication.factor = min.insync.replicas + 1。如果两者相等,有一个副本挂机,整个分区就无法正常工作了。我们不仅要考虑消息的可靠性,防止消息丢失,更应该考虑可用性问题。

刚刚我们谈论的都是如何通过消息冗余来达到可靠性。但是还需要关注一个leader选举的问题。

我们知道kafka中有领导者副本(Leader Replica)和追随者副本(Follower Replica),而follower replica存在的唯一目的就是防止消息丢失,并不参与具体的业务逻辑的交互。只有leader 才参与服务,follower的作用就是充当leader的候补,平时的操作也只有信息同步。ISR也就是这组与leader保持同步的replica集合,我们要保证不丢消息,首先要保证ISR的存活(至少有一个备份存活),那存活的概念是什么呢,不仅需要机器正常,还需要跟上leader的消息进度,当达到一定程度的时候就会认为“非存活”状态。
假设这么一种场景,有Leader,Follow1,Follow2;其中Follow2落后于Leader太多,因此不在leader副本和follower1副本所在的ISR集合之中。此时Leader,Follow1都宕机了,只剩下Follow2了,Follow2还在,就会进行新的选举,不过在选举之前首先要判断unclean.leader.election.enable参数的值。如果unclean.leader.election.enable参数的值为false,那么就意味着非ISR中的副本不能够参与选举,此时无法进行新的选举,此时整个分区处于不可用状态。如果unclean.leader.election.enable参数的值为true,那么可以从非ISR集合中选举follower副本称为新的leader。如果让非ISR中的Follow2成为Leader会有什么后果呢?

【Kafka教程】(五)消息传递语义及精确一致(不丢失\不重复)如何保证

我们说Follow2已经落后之前的Leader很多,他成为新的Leader后从客户端继续收取消息,此时,原来的leader副本恢复,成为了新的follower副本,准备向新的leader副本同步消息,但是它发现自身的LEO(LEO是Log End Offset的缩写,它表示了当前日志文件中下一条待写入消息的offset)比leader副本的LEO还要大。Kafka中有一个准则,follower副本的LEO是不能够大于leader副本的,所以新的follower副本就需要截断日志至leader副本的LEO处,截断日志,不就丢失了之前的消息吗?即图中所示,丢失了3和4两条数据,并且新的Follow和新Leader之间的消息也不一致。

因此,如果要保证消息不丢失,需设置:
unclean.leader.election.enable=false,但是Kafka的可用性就会降低,具体怎么选择需要读者根据实际的业务逻辑进行权衡,可靠性优先还是可用性优先。从Kafka 0.11.0.0版本开始将此参数从true设置为false,可以看出Kafka的设计者偏向于可靠性。

📷 消费端保证消息不丢失

消费端保证不丢数据,最重要就是保证offset的准确性。我们能做的,就是确保消息消费完成再提交。Consumer 端有个参数 ,设置enable.auto.commit= false,并且采用手动提交位移的方式。如果在处理数据时发生了异常,那就把当前处理失败的offset进行提交(放在finally代码块中)注意一定要确保offset的正确性,当下次再次消费的时候就可以从提交的offset处进行再次消费。consumer在处理数据的时候失败了,其实可以把这条数据给缓存起来,可以是redis、DB、file等,也可以把这条消息存入专门用于存储失败消息的topic中,让其它的consumer专门处理失败的消息。

【Kafka教程】(五)消息传递语义及精确一致(不丢失\不重复)如何保证

消息去重

首先是生产端如何保证消息不重复呢?

kafka默认情况下,提供的是至少一次的可靠性保障(acks=all)。即broker保障已提交的消息的发送,但是遇上某些意外情况,如:网络抖动,超时等问题,导致Producer没有收到broker返回的数据ack,则Producer会继续重试发送消息,从而导致消息重复发送。

相应的,如果我们禁止Producer的失败重试发送功能,或者说不用等待服务器响应(acks=0),消息要么写入成功,要么写入失败,但绝不会重复发送。这样就是最多一次的消息保障模式。

但对于消息组件,排除特殊业务场景,我们追求的一定是精确一次的消息保障模式。kafka通过幂等性(Idempotence)和事务(Transaction)的机制,提供了这种精确的消息保障。

📷 幂等生产者
“幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。

幂等生产者,说白了,就是你消息发送多次,对于系统也没影响。

Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture) ,被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。

Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。

但你需要注意以下问题:

1、它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,无法实现多个分区的幂等性。

2、它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

那么如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

📷 事务生产者
kafka的事务跟我们常见数据库事务概念差不多,也是提供经典的ACID,即原子性(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。

事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。事务特性的配置也很简单:

和幂等Producer一样,开启enable.idempotence = true
设置Producer端参数transctional.id
事务Producer的代码稍微也有点不一样,需要调一些事务处理的API。数据的发送需要放在beginTransaction和commitTransaction之间。示例代码:

producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。

因此Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。值为read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。毕竟天下没有免费的午餐。

📷 redis 实现幂等
在 0.8 系列版本的kafka还没有这些自带的幂等事务特性,只能依靠开发者自己来实现。

常见的方式就是通过数据的业务属性来生成个uniqueId来维护到redis中,利用redis的高并发,高吞吐,分布式锁特性,让写入kafka多分区的数据前,先去redis中校验一下uniqueId等方式,来实现幂等。

得益于redis的高性能,在保证幂等同时,还能不让消息数据吞吐性能下降太多。当然,因为redis的依赖引入,也增加了架构的复杂度,从运维上来说也增加了整体的故障点。如果是用的新版本,其实直接用kafka官方的方法就可以了。

消费端保证不重复
考虑到 producer,broker,consumer 之间都有可能造成消息重复,所以我们要求接收端需要支持消息去重的功能,最好借助业务消息本身的幂等性来做。其中有些大数据组件,如 hbase,elasticsearch 天然就支持幂等操作。
举例:
在华泰证券中Kafka的幂等性是如何保证的?在接收端,启动专门的消费者拉取 kafka 数据存入 hbase。hbase 的 rowkey 的设计主要包括 SecurityId(股票id)和 timestamp(行情数据时间)。消费线程从 kafka 拉取数据后反序列化,然后批量插入 hbase,只有插入成功后才往 kafka 中持久化 offset。这样的好处是,如果在中间任意一个阶段发生报错,程序恢复后都会从上一次持久化 offset 的位置开始消费数据,而不会造成数据丢失。如果中途有重复消费的数据,则插入 hbase 的 rowkey 是相同的,数据只会覆盖不会重复,最终达到数据一致。

消息乱序

这里说的乱序不是全局顺序,目前Kafka并不保证全局的消息顺序,只是提供分区级别的顺序性。如果同一分区消息乱序,很可能是重试导致的:
假设a,b两条消息,a先发送后由于发送失败重试,这时顺序就会在b的消息后面,可以设置max.in.flight.requests.per.connection=1来避免。
producer向broker发送数据的时候,其实是有多个网络连接的(因为有多个broker)。每个网络连接可以忍受producer端发送给broker消息然后消息没有响应的个数。
max.in.flight.requests.per.connection:限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求,但吞吐量会下降

BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/1507

发表评论

登录后才能评论

联系我们

562373081

在线咨询:点击这里给我发消息

邮件:562373081@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code