1. 首页
  2. 大数据
  3. Kafka教程

深入剖析kafka生产者

我们都知道kafka是消息引擎,既然是消息引擎就得有消息,那么消息怎么来的呢?你会告诉我,so easy。生产者发来的。那么生产者怎么发送消息呢?你也会告诉我,网上随便找段demo就知道了,最多10分钟就可以看到成果。

然而事实可并不这么简单,我再问你:
– 生产者发送的整个流程是怎么样的?有哪些参数配置是很重要的呢?
– 有几种生产者?如何实现异步生产者和同步生产者
– 生产者发送消息如何保证顺序性呢?
– 会发消息还不够,如何保证消息不丢失呢?保证消息不丢失还不行,我还不想要重复数据怎么办呢?(即精确一次)

如果有任一问题回答不出来,那你就不能说自己掌握了Kafka Procuder。老老实实看完这篇文章吧骚年。

从简单Demo说起

首先我们还是官网copy一段Demo来看看Kafka如何发送消息,大概要经历以下步骤:

1、配置依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

2、编写代码

// 创建生产者
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:port1, broker2:port2");
kafkaProps.put("key.serializer", "org.apache.kafka.common.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
// 最简单的发送方式:异步发送 且无回调
Producer<String,String> producer=new KafkaProducer<>(props);
for(int i=0;i<100;i++)
{
producer.send(new ProducerRecord<String,String>("my-topic",Integer.toString(i),Integer.toString(i)));
}
producer.close();

这段最基础的代码也给了我们一些最基础的指示:

1、我们先要配置一些参数,比如必须要配置server的地址; 必须配置序列化参数,因为 Kafka 的消息需要从客户端传到服务端,涉及到网络传输,所以需要实现序列,一般用StringSerializer即可,顾名思义,就是把消息当成String处理,即使你是{"name":"lyl"}这种看起来是json的,也是String。那么除了官方这几个必备的参数,还有哪些参数我们可以配置,又可以带来怎样的效果呢?

2、一条消息会被封装成为一个 ProducerRecord 对象。构造该对象有几个常见方法:

    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }
    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }

topic肯定是要指定的,消息必然要归属于某个topic。value也是指定的,这value就是消息。那么key和partition是什么,要不要配置?怎么配置?

3、使用producer.send方法发送即可,上面demo是最简单的send,那么还有哪些呢?就这两个:

    Future<RecordMetadata> send(ProducerRecord<K, V> record);
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

callback我们太熟悉了,回调函数。提供对请求完成的异步处理。确认发送到服务器的记录后,将调用此方法。划重点,异步发送的时候我们可以设置回调函数,因为异步发送就是发完就闪人了,不需要确定一条消息发送成功再发另一条,因此异步可以提高吞吐量。但是我们总不能完全不管吧,发失败了都不知道也太坑了?因此等它处理完还是等通知一下我们的。

发现了吗?从一个简单的demo,我们去扩展,就能得出很多有用的信息也指明了我们的学习方向,实际这是一个很好的学习方法

比如配置参数,你得多了解下还有哪些参数呢?ProducerRecord,我们点进去看看这个类,你就会知道原来我们可以指定key,指定partition,那么你又可以深究为什么要指定key和partition?还有send方法,最简单的就是把ProducerRecord传进去即可,但是你点进去看,你又会发现原来可以回调,那么你又可以深究为啥要回调?

Kafka生产者如何保证顺序性

如果不了解分区机制,建议你一定要先阅读:

如果明白了这篇文章,生产者如何保证顺序性这个问题我相信你自己都有答案了:因为kafka仅保证分区内的顺序性,所以其实换句话说就是,kafka生产者如何指定分区发送

我们从源码来看一下kafka如何指定分区发送,首先在send方法中有这么一句话:

int partition = partition(record, serializedKey, serializedValue, cluster)

显然它就是用于计算消息在哪个分区。这里的serializedKey就是ProducerRecord<K, V> record中的key经过序列化。再继续跟踪到DefaultPartitioner类的partition方法,就可以知道默认按照轮询策略来分区,可是如果有key的话,会按照key来分区。

// 按key分区源码 hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

说来说去都是ProducerRecord的key,前文提到的ProducerRecord构造函数key到底是何方神圣呢?

    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }

key指的是消息的key,我们一般会设置一个全局唯一的值,方便问题追踪定位。

如果需要保证顺序性,这个 Key 的作用就非常大了,我们刚说了有key的话,会按照key来分区。因此key可以是一个有着明确业务含义的字符串,比如用户id,那么同一个id的消息肯定会发到同一个分区。

partition参数就是更简单粗暴了,你自己指定发送到哪个分区。但是一般来说,我们即使想指定分区,也是设定key,然后kafka根据key去分区。

咦,按key分区,kafka官方是用Utils.toPositive方法去计算的,如果我想自己实现计算逻辑怎么办?模仿着DefaultPartitioner来就好了,创建一个MyPartitioner类,implements Partitioner,然后重写父类的partitionclose方法。举例:

public class MyPartitioner implements Partitioner {


    public void configure(Map<String, ?> configs) {}


    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
         if (Integer.parseInt((String)key)%3==1)
                return 0;
         else if (Integer.parseInt((String)key)%3==2)
                return 1;
         else return 2;
    }


    @Override
    public void close() {


    }


}

那么怎么告诉kafka要以你的分区类来分区呢?配置一个参数就好了props.put("partitioner.class", "com.example.demo.MyPartitioner");

Kafka生产者如何保证消息不丢失

消息不丢失我觉得是最重要的前提吧。当然某些场景下也不需要,比如不重要的日志,偶尔丢一两条无所谓。

丢数据源头

要解决这个问题,要知道源头先。那么Kafka为什么会丢失数据呢?其实不是Kafka会丢数据,请你不要让它背锅。

生产者丢数据,即发送的数据根本没有保存到Broker端。出现这个情况的原因可能是,网络抖动,导致消息压根就没有发送到 Broker 端;也可能是消息本身不合格导致 Broker 拒绝接收(比如消息太大了,超过了 Broker 的承受能力)等等。

刚刚举例的两个原因,其实根本就不关kafka的事。kafka只对已提交的消息保证不丢失。也就是说,你提交过去了丢了才算kafka的锅,都没提交过去不能算kafka的锅。

如何避免

上面所说比如网络原因导致消息没有成功发送到broker端,常见,也并不可怕。可怕的不是没发送成功,而是发送失败了你不做任何处理。

很简单的一个重试配置,基本就可以解决这种网络瞬时抖动问题:props.put("retries", 10);

当然还有很多其他原因导致的,不能只依靠kafka的配置来做处理,我们刚刚看到kafka发送端的send方法有两个,通常会出问题的方法是那个简单的send,没有callback(回调)。简单的send发送后不会去管它的结果是否成功,而callback能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

现在让我们改造一下刚刚的简单demo,用回调的方式,保证消息的可靠投递(这里只是简单的记录异常日志,可根据自己业务需求来处理):

producer.send(record, (metadata, e) -> {
    if (e != null) {
logger.error("kafka produce failed. topic={}, key={}", topic, key,e);
    } else {
logger.info("kafka produce success. topic={}, key={}, offset={}, partition={}", topic, key, metadata.offset(), metadata.partition());
    }
});

因此,一定要使用带有回调通知的 send 方法

我们知道,broker一般不会有一个,我们就是要通过多Broker达到高可用的效果,所以对于生产者程序来说,也不能简单的认为发送到一台就算成功,如果只满足于一台,那台机器如果损坏了,那消息必然会丢失。设置 props.put("acks","all"),表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”,这样可以达到高可用的效果。

那么这个ack是啥?为保证 Producer 发送的数据,能可靠地发送到指定的 Topic,Topic 的每个 Partition 收到 Producer 发送的数据后,都需要向 Producer 发送 ACK(ACKnowledge 确认收到)。如果 Producer 收到 ACK,就会进行下一轮的发送,否则重新发送数据。

acks有三个配置参数,0、1、-1(all)。acks参数指定了在集群中有多少个分区副本收到消息,kafka producer才会认为消息是被写入成功。这个参数对消息丢失的可能性有很大的影响。接下来对其选项做相关的说明:

  • acks=0,不会等待任何的来自服务器的响应。也就是说如果当中出现了错误,导致broker没有收到消息,那么生产者是无从得知的,消息也就丢失了。最不安全但最高吞吐量。
  • acks=1,只要集群的Leader节点收到消息,生产者就会收到来自服务器成功的响应。这样可能有问题,如果一个没有收到消息的节点后续成为新Leader,消息就会丢失的。
  • acks=all / -1,只有在集群所有的跟随副本都接收到消息后,生产者才会受到一个来自服务器的成功响应。最安全但延迟性也最高。

因此,如果单从要保证消息不丢失的角度,acks是需要配置成all的。

Kafka生产者如何保证消息不重复

刚刚和大家聊kafka是如何保证消息不丢失的,那么不丢消息的同时,如何实现精确一次处理的语义实现?

先科普一下消息组件对消息的可靠性保障,常见的模式有 3 种:

  • 最多一次(at most once):消息可能会丢失,但不会重复
  • 至少一次(at least once):消息不会丢失,但有可能重复
  • 精确一次(exactly once):消息不会丢失,且不会重复,精准一次发送

kafka默认情况下,提供的是至少一次的可靠性保障(acks=all)。即broker保障已提交的消息的发送,但是遇上某些意外情况,如:网络抖动,超时等问题,导致Producer没有收到broker返回的数据ack,则Producer会继续重试发送消息,从而导致消息重复发送。

相应的,如果我们禁止Producer的失败重试发送功能,或者说不用等待服务器响应(acks=0),消息要么写入成功,要么写入失败,但绝不会重复发送。这样就是最多一次的消息保障模式。

但对于消息组件,排除特殊业务场景,我们追求的一定是精确一次的消息保障模式。kafka通过幂等性(Idempotence)和事务(Transaction)的机制,提供了这种精确的消息保障。

幂等生产者

“幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的。幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态。

幂等生产者,说白了,就是你消息发送多次,对于系统也没影响。

Producer 默认不是幂等性的,但我们可以创建幂等性 Producer。指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture) ,被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变。

Kafka 自动帮你做消息的重复去重。底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。当然,实际的实现原理并没有这么简单,但你大致可以这么理解。

但你需要注意以下问题:

1、它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,无法实现多个分区的幂等性。

2、它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了。

那么如果我想实现多分区以及多会话上的消息无重复,应该怎么做呢?答案就是事务(transaction)或者依赖事务型 Producer。这也是幂等性 Producer 和事务型 Producer 的最大区别!

事务生产者

kafka的事务跟我们常见数据库事务概念差不多,也是提供经典的ACID,即原子性(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)。

事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。事务特性的配置也很简单:

和幂等Producer一样,开启enable.idempotence = true
设置Producer端参数transctional.id

事务Producer的代码稍微也有点不一样,需要调一些事务处理的API。数据的发送需要放在beginTransaction和commitTransaction之间。示例代码:

producer.initTransactions();
try {
     producer.beginTransaction();
     producer.send(record1);
     producer.send(record2);
     producer.commitTransaction();
} catch (KafkaException e) {
     producer.abortTransaction();
}

这段代码能够保证 Record1 和 Record2 被当作一个事务统一提交到 Kafka,要么它们全部提交成功,要么全部写入失败。实际上即使写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。

因此Consumer端的代码也需要加上isolation.level参数,用以处理事务提交的数据。值为read_committed:表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

事务Producer虽然在多分区的数据处理上保证了幂等,但是处理性能上相应的是会有一些下降的。毕竟天下没有免费的午餐。

redis 实现幂等

在 0.8 系列版本的kafka还没有这些自带的幂等事务特性,只能依靠开发者自己来实现。

常见的方式就是通过数据的业务属性来生成个uniqueId来维护到redis中,利用redis的高并发,高吞吐,分布式锁特性,让写入kafka多分区的数据前,先去redis中校验一下uniqueId等方式,来实现幂等。

得益于redis的高性能,在保证幂等同时,还能不让消息数据吞吐性能下降太多。当然,因为redis的依赖引入,也增加了架构的复杂度,从运维上来说也增加了整体的故障点。如果是用的新版本,其实直接用kafka官方的方法就可以了。

同步生产者和异步生产者

再瞄一眼kafka官方的send方法:

    Future<RecordMetadata> send(ProducerRecord<K, V> record);
    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

咦,你说过生产者有同步生产者、异步生产者,可是为啥两个方法都是send,看不出来同步还是异步呀。

哥们,同步还是异步,其实是你自己来实现的,别指望官方给你提供一个syncSend,一个asyncSend,实际也没有必要。

异步的发送代码参考简单Demo里(异步无回调),以及保证消息不丢失中说的异步有回调。

那同步生产者怎么实现呢?我们看看send方法的返回是什么?Future!这东西在JAVA并发编程里也太有名了吧?我觉得熟悉JAVA并发编程的人应该马上可以得出结论了。

Future的核心思想是:一个方法M(有返回值),在计算的过程中可能非常耗时,其他线程一直阻塞等待M的返回,这显然不可取。可以在调用M的时候,立马返回一个Future,后续再通过Future这个数据结构去控制方法M的结果。这也是为什么叫做Future的意思吧,代表未来。

Future中的get方法:该方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么 get 会立即返回或者抛出一个 Exception(如果任务执行过程中发生异常),如果任务没有完成,那么 get 将阻塞并直到任务完成,这是必须的。如果任务抛出了异常,那么 get 将该异常封装为 ExecutionException 并重新抛出。如果任务被中断,那么 get 将抛出 InterruptedException。

说白了,只要你调用Futrue对象的get()方法,假如没有结果就必须阻塞。直到客户端(生产者)收到服务器的响应。这不就是同步吗?当然了,你也不能死等吧,不要一颗老鼠屎坏了一锅粥,所以要设置一个时间参数,最大的等待时间。

现在明白了吧,代码写起来:

    public String syncSend(KafkaMessage kafkaMessage, String topic, long timeoutMilliseconds) throws InterruptedException, ExecutionException, TimeoutException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, kafkaMessage.getMsgId(), kafkaMessage.toString());
        Future<RecordMetadata> recordMetadataFuture = producer.send(record);
        RecordMetadata metadata = recordMetadataFuture.get(timeoutMilliseconds, TimeUnit.MILLISECONDS);
        logger.info("kafka produce success. topic={}, msgId={}, offset={}, partition={}", topic, kafkaMessage.getMsgId(), metadata.offset(), metadata.partition());
        return kafkaMessage.getMsgId();
    }

写到这本文要匆匆的结束了,因为不知不觉这篇文章超过一万字了。。佛系少女不喜欢一篇文章超过一万字。

深入剖析kafka生产者

还没有解决这个问题:生产者发送的整个流程是怎么样的?有哪些参数配置是很重要的呢?欲知后事如何,请听下回分解。下一篇文章,我们将从源码角度分析kafka生产者的发送流程,以及剖析一些重要的参数配置。

BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/2423

发表评论

登录后才能评论

联系我们

562373081

在线咨询:点击这里给我发消息

邮件:562373081@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code