Kafka源码解析之索引

Kafka源码解析之索引 索引结构

Kafka有两种类型的索引:

TimeIndex: 根据时间戳索引,可以通过时间查找偏移量所在位置,目录下以.timeindex结尾Index: 根据偏移量索引,.index结尾

构建索引时机

由log.index.interval.bytes 参数控制,默认4KB构建一条索引

为什么默认值是4kb呢?这里认为与基于磁盘的读写单位是 block(一般大小为 4KB)还有内存管理与分配的最小单位是4kb有关

def append(largestOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = { ..... // 判断是否写入索引文件 if (bytesSinceLastIndexEntry > indexIntervalBytes) { offsetIndex.append(largestOffset, physicalPosition) timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar) bytesSinceLastIndexEntry = 0 } bytesSinceLastIndexEntry += records.sizeInBytes }}

在源码的LogSegment.append方法中,会对当前segement写入大小与上次构建索引时大小差值进行判断,如果超过log.index.interval.bytes,会构建timeIndex以及offsetIndex索引

AbstractIndex

AbstractIndexl类时TimeIndex以及OffsetIndex文件的父类,其中有一个很重要的成员变量 mmap:

protected var mmap: MappedByteBuffer = { val newlyCreated = file.createNewFile() val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r") try { //提前进行文件的创建 if(newlyCreated) { if(maxIndexSize CoreUtils.swallow(raf.close(), AbstractIndex) }}

这里用到了Memory Mapped Files即内存映射

mmap是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。

内存映射mmap参考文章:https://zhuanlan.zhihu.com/p/507907660

mmap同样是一种零拷贝的技术,常规的文件操作需要从磁盘到页缓存再到用户主存的两次数据拷贝。而mmap操控文件,只需要从磁盘到用户主存的一次数据拷贝过程,其实也是一种通过磁盘空间代替内存的操作,提供进程间共享内存及相互通信的方式。

二分查找与页缓存

Kafka根据索引文件查找offset

private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { // 判断index文件是否为空 if(_entries == 0) return (-1, -1) def binarySearch(begin: Int, end: Int) : (Int, Int) = { // 二分查找开始 var lo = begin var hi = end while(lo return binarySearch(firstHotEntry, _entries - 1) } // check if the target offset is smaller than the least offset if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) return (-1, 0) binarySearch(0, firstHotEntry)}

Kafka对二分搜索的优化

对正常的二分查找来说,假设索引的大小有13个页,我们需要查找的偏移量在页12上,那么我们会依次访问0->12->6->9->11->12这六个页

当生产者继续往分区中生产消息,超过4kb后,又写了一个新的索引项,这个时候索引访问的顺序是:0->13->7->10->11>12->13

通过对上面mmap的研究可以知道,磁盘到用户主存的映射实际上依赖于页表,只是用户进程可以通过指针操作直接读写page cache,不再需要系统调用和内存拷贝。常用的页表置换算法基本是基于LRU的,当读取页7/10的时候,这两个页可能已经很长时间没有被访问到了,已经从LRU中移除了,这个时候再访问这两个页的时候就可能导致操作系统陷入缺页中断。

Here, we use a more cache-friendly lookup algorithm:if (target > indexEntry[end - N]) // if the target is in the last N entries of the index binarySearch(end - N, end)else binarySearch(begin, end - N)

这里Kafka做了一个优化,保证index文件的最后N个项分为热区,而剩余项则是冷区。这是因为在热区中的索引项可能因为更为频繁的访问,更有可能存在于页表中,可以增加搜索的效率。这里N的值是8192,官方给的解释是:

We set N (_warmEntries) to 8192, because1. This number is small enough to guarantee all the pages of the "warm" section is touched in every warm-section lookup. So that, the entire warm section is really "warm". When doing warm-section lookup, following 3 entries are always touched: indexEntry(end), indexEntry(end-N), and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section pages (3 or fewer) are touched, when we touch those 3 entries. As of 2018, 4096 is the smallest page size for all the processors (x86-32, x86-64, MIPS, SPARC, Power, ARM etc.).2. This number is large enough to guarantee most of the in-sync lookups are in the warm-section. With default Kafka settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB (time index) log messages.

warmEntries的个数:

protected def _warmEntries: Int = 8192 / entrySize class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) { import OffsetIndex._ override def entrySize = 8

这样的设计保证了几点:

1、8192的大小保证了三个索引项是在页表中的indexEntry(end), indexEntry(end-N), indexEntry((end*2 -N)/2)

//待续


比丘资源网 » Kafka源码解析之索引

发表回复

提供最优质的资源集合

立即查看 了解详情