1. 首页
  2. 大数据
  3. Kafka教程

【Kafka教程】(三)Kafka存储机制(topic,partition,segment)详解

上一节中我们知道了topic\partition\segment相关概念,我们知道了topic由partition构成,而partition由segment构成,现在来详细讲解topic\partition\segment存储机制。

topic中partition存储分布

  • 为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition。
  • 消息进入哪个partition呢?在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition,如果没有指定key,则随机分配到某个partition。
  • 因为每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
  • 每个partition对应于一个文件夹(目录在哪是根据自己的设置),该文件夹下存储该partition的数据和索引文件。partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始。
    例如:weblogs-0 weblogs-1 weblogs-2
[root@master ~]# cd /usr/local/kafka_2.11-1.0.0/logs/
[root@master logs]# ls
weblogs-0  weblogs-1  weblogs-2
  • 对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据

partiton文件存储方式

partiton是由Segment file组成。partion会被平均分配到多个大小相等segment(段)数据文件中。记录只会被append到segment中,不会修改。但每个segment file消息数量不一定相等,这种特性方便old segment file快速被删除。

partiton中segment文件存储结构

segment file由2大部分组成,分别为index file(后缀”.index”)和data file后缀(“.log”),此2个文件一一对应,成对出现,分别表示为segment索引文件、数据文件.

[root@master weblogs-0]# ls
00000000000000000000.index  00000000000000000000.log  00000000000000091932.index  00000000000000091932.log 

索引文件

什么是索引文件?这得知道offset是什么。offset代表一个偏移量,你可以理解为,消费到哪里了。就比如你写暑假作业,一天写一点,你得知道你上次写到哪里,你才能接着继续写。消费者也一样,需要记录已经读取到消息的位置,通过存储最后消费的 Offset,消费者应用在重启或者停止之后,还可以继续从之前的位置读取。
索引文件的命名规则就是根据offset。partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。
索引文件即是元信息的存储。

数据文件

数据文件就是用来存储消息的。segment data file由许多message组成,下面详细说明message物理结构如下:
【Kafka教程】(三)Kafka存储机制(topic,partition,segment)详解
了解几个重要参数:
8 byte offset:在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message:size message大小
4 byte key length:表示key的长度,当key为-1时,K byte key字段不填

索引文件与数据文件的关系

既然它们是一一对应成对出现,必然有关系。索引文件中元数据指向对应数据文件中message的物理偏移地址。
【Kafka教程】(三)Kafka存储机制(topic,partition,segment)详解
比如索引文件中3,497代表:数据文件中的第三个message,它的偏移地址为497。再来看数据文件中,Message 368772表示:在全局partiton中是第368772个message。
注:segment index file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

在partition中如何通过offset查找message

既然知道了索引文件与数据文件的关系,就可以解决一个经典问题了?如何查找我的message在哪里呢?

  • 首先查找segment file。因为segment file命名规则跟offset有关,根据segment file可以知道它的起始偏移量,因为Segment file的命名规则是上一个segment文件最后一条消息的offset值。所以只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
    比如第一个segment file是00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0。第二个是00000000000000091932.index:代表消息量起始偏移量为91933 = 91932 + 1。那么offset=5000时应该定位到00000000000000000000.index
  • 第二步通过segment file查找message
    通过第一步定位到segment file,当offset=5000时,依次定位到00000000000000000000.index的元数据物理位置和00000000000000000000.log的物理偏移地址,然后再通过00000000000000000000.log顺序查找直到offset=5000为止。

过期日志删除策略

old segment file快速被删除(删除策略有两种:一种基于时间,默认168小时后就被删除,一种可以自定义设置partition size,比如超过10GB就删除。记录只会被append到segment中,不会修改。清除过期日志时直接删除一个或者等多个Segment)。

Kafka分区策略

补充下Kafka的分区策略,在发送一条消息时,这条消息会发送到哪个parition呢?
关于默认分区策略,直接看源码就清楚了:

 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

大概意思是:

  • 如果指定了这条消息的key,分区策略就是直接取模了:key的哈希值与partition数量取模
  • 如果没有指定这条消息的key,那么根据是否有可用分区(availablePartitions),有可用分区那么nextValue与availablePartitions的数量取模。如果没有可用分区,那么nextValue与Partitions的数量取模。
    nextValue逻辑如下:
private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

BDStar原创文章。发布者:Liuyanling,转载请注明出处:http://bigdata-star.com/archives/1479

发表评论

登录后才能评论

联系我们

562373081

在线咨询:点击这里给我发消息

邮件:562373081@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code