VictorialMetrics存储原理之索引存储格式
前文我们介绍了当插入数据的时候会先去添加索引数据,索引构建完成后又是如何去持久化数据的呢?保存的数据又是怎样的格式呢?本节我们将对此进行详细讲解。
添加索引数据
索引构建完成后会调用 AddItems 函数将索引添加到 Table 中去:
{
if err := tb.rawItems.addItems(tb, items); err != nil{
return fmt.Errorf("cannot insert data into %q: %w", tb.path, err)
}
return nil}1.2.3.4.5.6.7.8.Table 的结构如下所示:
{
activeMerges uint64 mergesCount uint64 itemsMerged uint64 assistedMerges uint64 // merge 索引 mergeIdx uint64 // 路径 path string // flush回调 flushCallback func()
flushCallbackWorkerWG sync.WaitGroup needFlushCallbackCall uint32 // 在将指定项的整个块刷新到持久存储之前,在合并期间调用的回调 prepareBlock PrepareBlockCallback // parts 列表 partsLock sync.Mutex parts []*partWrapper // rawItems 包含最近添加的尚未转换为 parts 的数据 // 出于性能原因,未在搜索中使用 rawItems rawItems rawItemsShards snapshotLock sync.RWMutex flockF *os.File stopCh chan struct{}
partMergersWG syncwg.WaitGroup rawItemsFlusherWG sync.WaitGroup convertersWG sync.WaitGroup rawItemsPendingFlushesWG syncwg.WaitGroup}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.一个索引 Table 就对应着一个 indexDB,也就是数据目录 indexdb 下面的文件夹:
其中核心的是 parts 和 rawItems 两个属性。
parts 主要是存储 merge 后的 blocks,一个part 与文件系统上的一个目录对应,比如上图中的24_1_16F4A862471C1DC9 目录就是一个part。
rawItems 是用于预处理Items 的,是一个rawItemsShards 对象。
rawItemsShards 结构体定义如下所示:
{
shardIdx uint32 // 在多 cpu 系统上添加 rows 数据时,shards 分片可以减少锁竞争 shards []rawItemsShard}
// 每个 table 的 rawItems 分片数 var rawItemsShardsPerTable = cgroup.AvailableCPUs()
// 每个分片最大的Block数const maxBlocksPerShard = 512// 当在打开Table的时候就会调用该函数进行初始化func (riss *rawItemsShards) init() {
riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)
}
// 添加 items 元素func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error{
n := atomic.AddUint32(&riss.shardIdx, 1)
shards := riss.shards idx := n % uint32(len(shards))
shard := &shards[idx]
return shard.addItems(tb, items)
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.rawItemsShards 其实就是加了一个分片功能用于保存索引数据,addItems 函数就是将要添加的数据添加到对应的分片上去,最终执行的逻辑是 shard.addItems。
{
mu sync.Mutex ibs []*inmemoryBlock lastFlushTime uint64}
// 添加items元素func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error{
var err error var blocksToFlush []*inmemoryBlock ris.mu.Lock()
ibs := ris.ibs if len(ibs) == 0{
ib := getInmemoryBlock()
ibs = append(ibs, ib)
ris.ibs = ibs}
// 取最后一个内存块 ib := ibs[len(ibs)-1]
for _, item := range items{
// 添加索引item到内存块 if !ib.Add(item) { // 超过了内存块大小 // 重新获取一个内存块,此时肯定为空 ib = getInmemoryBlock()
// 重新添加 if !ib.Add(item) {
putInmemoryBlock(ib)
err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item))
break}
ibs = append(ibs, ib)
ris.ibs = ibs}
}
// 超过了每个分片的最大内存块的数量 if len(ibs) >= maxBlocksPerShard{
// 将内存块放到待刷新的内存块列表中去 blocksToFlush = append(blocksToFlush, ibs...)
// 释放前面的内存块资源 for i := range ibs{
ibs[i] = nil}
ris.ibs = ibs[:0]
ris.lastFlushTime = fasttime.UnixTimestamp()
}
ris.mu.Unlock()
// 执行merge合并操作 tb.mergeRawItemsBlocks(blocksToFlush, false)
return err}
// lib/mergeset/encoding.go// 内存中的一个Block块结构type inmemoryBlock struct{
commonPrefix []byte data []byte // 用来存储数据 items []Item // 用来存储每个item数据的起始偏移量}
// Item 表示用于存储在 mergeset 中的单个 item 数据type Item struct{
// 数据的开始偏移量 Start uint32 // 数据的结束偏移量 End uint32}
// maxInmemoryBlockSize 是 memoryblock.data 的最大值。// // 它必须适合 CPU 缓存大小,即当前 CPU 的缓存大小为64kb。const maxInmemoryBlockSize = 64 * 1024// Add 将 x 添加到内存卡 ib 的末尾// // 如果由于块大小限制,x 未添加到 ib,则返回 falsefunc (ib *inmemoryBlock) Add(x []byte) bool{
data := ib.data // 操过块大小限制了 if len(x)+len(data) > maxInmemoryBlockSize{
return false}
if cap(data) == 0{
// 预分配 data 和 items 以减少内存分配 data = make([]byte, 0, maxInmemoryBlockSize)
ib.items = make([]Item, 0, 512)
}
dataLen := len(data)
data = append(data, x...) // 将 x 添加到 data ib.items = append(ib.items, Item{ // 更新 items Start: uint32(dataLen),
End: uint32(len(data)),
})
ib.data = data return true}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.rawItemsShard 表示保存索引数据的一个分片,里面其实就是一个 inmemoryBlock 的内存块切片,每个分片最多有 512 个内存块,每个内存块占用 64KB 的容量,当每个分片中的内存块数量超过最大数量(512)会去将内存块数据刷新为 Part。
如果分片中的内存块数量没超过上限,则会通过一个任务去定时(1s)将 rawItem 数据刷新(转换)为 Part,以便它们对搜索可见。
() {
tb.rawItemsFlusherWG.Add(1)
go func() {
tb.rawItemsFlusher()
tb.rawItemsFlusherWG.Done()
}()
}
func (tb *Table) rawItemsFlusher() {
ticker := time.NewTicker(rawItemsFlushInterval)
defer ticker.Stop()
for{
select{
case <-tb.stopCh:
return case <-ticker.C:
tb.flushRawItems(false)
}
}
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.合并内存数据
将内存块数据转换为 Part 都是通过 mergeRawItemsBlocks 函数去实现的。
) {
if len(ibs) == 0{
return}
tb.partMergersWG.Add(1)
defer tb.partMergersWG.Done()
pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)
var pwsLock sync.Mutex var wg sync.WaitGroup for len(ibs) > 0{
// 一次最大合并的内存块数量 n := defaultPartsToMerge if n > len(ibs) {
n = len(ibs)
}
wg.Add(1)
go func(ibsPart []*inmemoryBlock) {
defer wg.Done()
// merge inmemoryBlock pw := tb.mergeInmemoryBlocks(ibsPart)
if pw == nil{
return}
pw.isInMerge = true pwsLock.Lock()
pws = append(pws, pw)
pwsLock.Unlock()
}(ibs[:n])
ibs = ibs[n:]
}
wg.Wait()
if len(pws) > 0{
if err := tb.mergeParts(pws, nil, true); err != nil{
logger.Panicf("FATAL: cannot merge raw parts: %s", err)
}
if tb.flushCallback != nil{
if isFinal{
tb.flushCallback()
} else{
atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)
}
}
}
for{
tb.partsLock.Lock()
ok := len(tb.parts) <= maxParts tb.partsLock.Unlock()
if ok{
return}
// The added part exceeds maxParts count. Assist with merging other parts. // // Prioritize assisted merges over searches. storagepacelimiter.Search.Inc()
err := tb.mergeExistingParts(false)
storagepacelimiter.Search.Dec()
if err == nil{
atomic.AddUint64(&tb.assistedMerges, 1)
continue}
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
return}
logger.Panicf("FATAL: cannot merge small parts: %s", err)
}
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.mergeRawItemsBlocks 函数将指定的内存块进行 merge 合并操作,一次合并最大的内存块数量为 15,然后在独立的 goroutine 中去进行合并操作,使用 mergeInmemoryBlocks 函数。
{
// 将 InmemoryBlock 列表转换成 inmemoryPart 列表 // inmemoryPart 表示内存中的Part mps := make([]*inmemoryPart, 0, len(ibs))
for _, ib := range ibs{
if len(ib.items) == 0{
continue}
mp := getInmemoryPart()
mp.Init(ib) // 将inmemoryBlock转换为inmemoryPart putInmemoryBlock(ib)
mps = append(mps, mp)
}
if len(mps) == 0{
return nil}
if len(mps) == 1{
// 没有要合并的内容。只需返回单个 inmemory part。 mp := mps[0]
p := mp.NewPart()
return &partWrapper{
p: p,
mp: mp,
refCount: 1,
}
}
defer func() {
for _, mp := range mps{
putInmemoryPart(mp)
}
}()
atomic.AddUint64(&tb.mergesCount, 1)
atomic.AddUint64(&tb.activeMerges, 1)
defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))
// 为每个 `inmemoryPart` 构造 `blockStreamReader`, 用于迭代读取 items bsrs := make([]*blockStreamReader, 0, len(mps))
for _, mp := range mps{
bsr := getBlockStreamReader()
bsr.InitFromInmemoryPart(mp)
bsrs = append(bsrs, bsr)
}
// 准备一个 blockStreamWriter 用于合并写入的 part bsw := getBlockStreamWriter()
// 不要通过 getInmemoryPart() 获取 mpDst,因为与池中的其他条目相比,它的大小可能太大。 // 这可能会导致内存使用量增加,因为存在大量的碎片。 // 创建一个新的 inmemoryPart,接收合并的数据 mpDst := &inmemoryPart{}
bsw.InitFromInmemoryPart(mpDst)
// 开始 merge 数据 // 该 merge 不应该被 stopCh 中断,因为它可能是 stopCh 关闭后的最终结果 err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged)
if err != nil{
logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)
}
putBlockStreamWriter(bsw)
for _, bsr := range bsrs{
putBlockStreamReader(bsr)
}
p := mpDst.NewPart()
return &partWrapper{
p: p,
mp: mpDst,
refCount: 1,
}
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.上面的函数会将指定的内存块转换成 partWrapper,该结构就是一个包含 part 和 inmemoryPart 的包装器。
{
p *part mp *inmemoryPart refCount uint64 isInMerge bool}1.2.3.4.5.6.7.part 的结构如下所示:
{
ph partHeader path string size uint64 mrs []metaindexRow indexFile fs.MustReadAtCloser itemsFile fs.MustReadAtCloser lensFile fs.MustReadAtCloser}1.2.3.4.5.6.7.8.9.10.一个 part 就是 Table 下面的一个数据目录。
part 中包含一个 partHeader,该属性中包含当前 part 的一些 Meta 信息,一共有多少个 items、有多少 blocks、第一个和最后一个 item,对应着 part 目录下面的 metadata.json 文件。
{
// part 包含的 items 数 itemsCount uint64 // part 包含的 blocks 数 blocksCount uint64 // part 中的第一个 item firstItem []byte // part 中的最后一个 item lastItem []byte}1.2.3.4.5.6.7.8.9.10.11.part 中另外的属性 path 表示当前 part 的路径,size 表示大小,另外三个属性 indexFile、itemsFile、lensFile 对应中 part 目录下面的三个文件:index.bin、items.bin、lens.bin。此外 part 结构中还有最后一个 mrs 属性,是一个 []metaindexRow。
{
// 第一个 block 中的第一个 item 元素 // 它用于快速查找所需的索引块 firstItem []byte // 块包含的 blockHeaders 的数量 blockHeadersCount uint32 // 索引文件中块的偏移量 indexBlockOffset uint64 // 索引文件中块的大小 indexBlockSize uint32}1.2.3.4.5.6.7.8.9.10.11.12.13.除了 part 之外还有一个内存中的 inmemoryPart 结构,其基本结构和 part 类似,不同的是几个相关的属性不是文件对象,而是 ByteBuffer,因为是内存中的结构。
{
// partHeader 记录 itemsCount, blocksCount, firstItem, lastItem 信息, 最后会序列化到 metadata.json ph partHeader // 当前 block 的 header 信息,有 commonPrefix, firstItem, marshalType, itemsCount, itemsBlockOffset, lenBlockOffset, itemsBlockSize, lenBlockSize bh blockHeader // 当前 block 的 metaindex 信息,存储了当前 blockHeader 的 firstItem, blockHeaderCount, indexBlockOffset, indexBlockSize mr metaindexRow // 用于序列化后写入内存/磁盘文件使用 metaindexData bytesutil.ByteBuffer // -> metaindex.bin indexData bytesutil.ByteBuffer // -> index.bin itemsData bytesutil.ByteBuffer // -> items.bin lensData bytesutil.ByteBuffer // -> lens.bin}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.其他几个属性上面介绍过,blockHeader 结构如下所示,用于记录 block 头信息:
{
// 块中所有 items 的公用前缀 commonPrefix []byte // 第一个 item firstItem []byte // 用于块压缩的 Marshal 类型 marshalType marshalType // 块中的 items 数,不包括第一个 item itemsCount uint32 // items block 的偏移量 itemsBlockOffset uint64 // lens block 的偏移量 lensBlockOffset uint64 // items block 的大小 itemsBlockSize uint32 // lens block 的大小 lensBlockSize uint32}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.整个 part 的结构看上去确实比较复杂,为什么需要设计这些属性?核心肯定就是为了快速索引,我们先往下分析,待会再回过头来看。
inmemoryPart 是 part 读入内存中的结构, 在 inmemoryBlock merge 之前,每个 inmemoryBlock 都会先通过 mp.Init 转换成一个 inmemoryPart 的结构,inmemoryPart 中 metaindexData、indexData、itemsData、lensData 数据结构与磁盘对应的文件内容一致。
序列化数据
现在我们再回到上面的 mergeInmemoryBlocks 函数,流程如下所示:
1.将所有的inmemoryBlock 转换为inmemoryPart 结构。2.为每个inmemoryPart 构造blockStreamReader,用于迭代读取 items。3.创建一个新的inmemoryPart,并构造一个blockSteamWriter 用于合并写入的数据。4.然后调用mergeBlockStreams 函数执行真正的merge操作。首先通过 Init 函数将 inmemoryBlock 转换为 inmemoryPart 结构。
) {
mp.Reset()
sb := &storageBlock{}
sb.itemsData = mp.itemsData.B[:0]
sb.lensData = mp.lensData.B[:0]
// 使用尽可能小的压缩等级来压缩 inmemoryPart,因为它很快就会被合并到文件 part 去。 compressLevel := -5 // 序列化乱序的数据 mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)
// 获取 partHeader 值 mp.ph.itemsCount = uint64(len(ib.items))
mp.ph.blocksCount = 1 mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)
mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)
// 获取itemsData,更新blockHeader的items偏移和数量 mp.itemsData.B = sb.itemsData mp.bh.itemsBlockOffset = 0 mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))
// 获取lensData,更新blockHeader的lens偏移和数量 mp.lensData.B = sb.lensData mp.bh.lensBlockOffset = 0 mp.bh.lensBlockSize = uint32(len(mp.lensData.B))
// 获取 indexData,blockHeader序列化的值 bb := inmemoryPartBytePool.Get()
bb.B = mp.bh.Marshal(bb.B[:0])
mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, 0)
// 获取 metaindexData,metaindexRow序列化的值 mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)
mp.mr.blockHeadersCount = 1 mp.mr.indexBlockOffset = 0 mp.mr.indexBlockSize = uint32(len(mp.indexData.B))
bb.B = mp.mr.Marshal(bb.B[:0])
mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, 0)
inmemoryPartBytePool.Put(bb)
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.上面的函数将 inmemoryBlock 转换成 inmemoryPart,首先会通过一个 MarshalUnsortedData 函数来序列化未排序的数据。
) {
if !ib.isSorted() {
sort.Sort(ib) // 排序}
// 更新内存块的公共前缀 ib.updateCommonPrefix()
// 序列化数据 return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.上面的序列化函数中首先会对未排序的数据进行排序,然后更新内存块的公共前缀:
() {
ib.commonPrefix = ib.commonPrefix[:0] // 公共前缀 if len(ib.items) == 0{
return}
items := ib.items // 数据前后位置 data := ib.data // 数据 cp := items[0].Bytes(data) // 第一段数据 if len(cp) == 0{
return}
for _, it := range items[1:] { // 后面的数据 // 计算公共前缀的长度 cpLen := commonPrefixLen(cp, it.Bytes(data))
if cpLen == 0{
return}
// 截取公共前缀数据 cp = cp[:cpLen]
}
// 设置内存块的公共前缀 ib.commonPrefix = append(ib.commonPrefix[:0], cp...)
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.公共前缀就是把每段数据包含的共同前缀提取出来,这样存储的时候后面就可以不需要存储共同的部分了,减少存储空间。
公共前缀提取出来后,接下来调用 marshalData 函数去序列化数据。
) {
......
// 拷贝 inmemoryBlock 数据块的 firstItem(排序后的第一条数据) data := ib.data // 内存块数据 firstItem := ib.items[0].Bytes(data) // 第一条数据 firstItemDst = append(firstItemDst, firstItem...)
// 最大公共前缀 commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...)
// 内存块数据小于2段或(数据大小-公共前缀长度*数据段大小 < 64) 则定义为小块 if len(data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2{
// 对small block使用普通序列化,因为它更便宜 ib.marshalDataPlain(sb)
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain}
bbItems := bbPool.Get()
bItems := bbItems.B[:0] // 保存目的 items 数据的内存 buffer bbLens := bbPool.Get()
bLens := bbLens.B[:0] // 保存目的 lens 数据的内存buffer // 序列化 items 数据 // 第一项数据不需要存储,所以获取的 Uint64s 大小要减1 xs := encoding.GetUint64s(len(ib.items) - 1)
defer encoding.PutUint64s(xs)
cpLen := len(ib.commonPrefix) // 公共前缀的长度 prevItem := firstItem[cpLen:] // 第一项数据(排除公共前缀) prevPrefixLen := uint64(0)
// 从第二个元素开始遍历(第一个 firstItem 单独存储) for i, it := range ib.items[1:] {
// 偏移到公共前缀之后的位置 it.Start += uint32(cpLen)
// Bytes(data) 得到的数据不包含公共前缀的部分 item := it.Bytes(data)
// 计算第 N 项和 N-1 项的公共前缀长度 prefixLen := uint64(commonPrefixLen(prevItem, item))
// 仅仅只把差异的部分拷贝到目的buffer bItems = append(bItems, item[prefixLen:]...)
// 第一次,与0异或,还是等于原值。异或后,两个整数值前面相同的部分都为0了,数值变得更短,能够便于压缩。 xLen := prefixLen ^ prevPrefixLen // 上次的除去公共前缀的item prevItem = item // 上次计算得到的公共前缀长度 prevPrefixLen = prefixLen xs.A[i] = xLen // 异或后的公共前缀值}
// 对N-1个长度进行序列化(将uint64数组序列化成byte数组) bLens = encoding.MarshalVarUint64s(bLens, xs.A)
// 将items数据(只有差异的部分)ZSTD压缩后,写入storageBlock sb.itemsData = encoding.CompressZSTDLevel(sb.itemsData[:0], bItems, compressLevel)
bbItems.B = bItems bbPool.Put(bbItems)
// 序列化 lens 数据 // 第一项数据大小(排除公共前缀) prevItemLen := uint64(len(firstItem) - cpLen)
for i, it := range ib.items[1:] { // 从第二个元素开始遍历 // item长度 = End-Start-公共前缀大小 itemLen := uint64(int(it.End-it.Start) - cpLen)
// 与前面一个元素长度异或 xLen := itemLen ^ prevItemLen // 上次去除公共前缀的长度 prevItemLen = itemLen xs.A[i] = xLen // 异或后的元素长度}
// 前面记录的是两两相对的长度,这里记录的是数据的真实长度 // 长度信息包含两种,相对长度和总长度 bLens = encoding.MarshalVarUint64s(bLens, xs.A)
// 将lens数据进行ZSTD压缩后,写入storageBlock sb.lensData = encoding.CompressZSTDLevel(sb.lensData[:0], bLens, compressLevel)
bbLens.B = bLens bbPool.Put(bbLens)
// 如果压缩不到90%则选择不压缩 if float64(len(sb.itemsData)) > 0.9*float64(len(data)-len(ib.commonPrefix)*len(ib.items)) {
// 压缩率不高的时候,选择不压缩 ib.marshalDataPlain(sb)
return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain}
// 很好的压缩率 return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.上面的序列化函数看上去比较复杂,实际上核心的一点就是想办法尽可能减少存储空间。首先将数据块的第一个数据拷贝出来放入 firstItemDst,然后后面就从第二个元素开始去循环处理,首先计算第 N 项和 N-1 项的公共前缀长度,然后将差异的数据部分保存起来,为了能够反序列化回数据,还需要将两两之间公共前缀的长度保存下来,为了能够便于压缩,使用异或的方式来计算两两之间的公共前缀长度值。
循环计算后,将保存的两两之间的公共前缀长度进行序列化,下面的函数将一个 uint64 类型的切片转换成字节切片,如果数据小于 128 直接转换即可,如果大于 127 则用一个 7bit 来表示数值的内容,最高位后面的一个字节用来表示长度,这样就可以用变长长度来序列化数值,而不是每个数值都占用固定的长度。
{
for _, u := range us{
if u < 0x80 { // 小于128,直接加入到 dst,能直接存到 byte 中去 // Fast path dst = append(dst, byte(u))
continue}
for u > 0x7f { // 大于127,则超过的部分保留为 0x80,低位右移7位继续计算 dst = append(dst, 0x80|byte(u))
u >>= 7}
dst = append(dst, byte(u))
}
return dst}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.长度数据序列化后,将 items 数据(只有差异的部分)进行 ZSTD 压缩后,写入 storageBlock。
只记录两两之间的公共前缀长度还不够,还需要记录数据的真实长度,最后同样再将 lens 数据进行 ZSTD 压缩后,写入 storageBlock。
如果最后的结果压缩不到 90% 则选择不压缩,不压缩则使用 marshalDataPlain 函数进行序列化:
) {
data := ib.data // 序列化 items 数据 // 不需要序列化第一项数据,因为它会在 marshalData 中返回给调用者。 cpLen := len(ib.commonPrefix) // 公共前缀长度 b := sb.itemsData[:0]
for _, it := range ib.items[1:] { // 第一项之后的数据 it.Start += uint32(cpLen) // 跳过公共前缀 b = append(b, it.String(data)...) // 添加移出公共前缀的数据}
sb.itemsData = b // itemsData数据 // 序列化 lens 数据 b = sb.lensData[:0]
for _, it := range ib.items[1:] { // 第一项之后的数据 // 原始的End-Start-公共前缀长度 b = encoding.MarshalUint64(b, uint64(int(it.End-it.Start)-cpLen))
}
sb.lensData = b}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.经过上面的序列化过后就可以得到第一个数据、公共前缀、items 个数以及序列化类型,然后将这些数据存入 blockHeader 中去,后面就是一些比较简单的常规操作。
转换成 inmemoryPart 后,再包装成 blockStreamReader,创建一个新的 inmemoryPart,并构造一个 blockSteamWriter 用于合并写入的数据,然后调用 mergeBlockStreams 函数执行真正的 merge 操作。
{},
itemsMerged *uint64) error{
// 将多个 blockStreamReader 构造成一个 blockStreamMerger 结构 bsm := bsmPool.Get().(*blockStreamMerger)
if err := bsm.Init(bsrs, prepareBlock); err != nil{
return fmt.Errorf("cannot initialize blockStreamMerger: %w", err)
}
err := bsm.Merge(bsw, ph, stopCh, itemsMerged)
bsm.reset()
bsmPool.Put(bsm)
bsw.MustClose()
if err == nil{
return nil}
return fmt.Errorf("cannot merge %d block streams: %s: %w", len(bsrs), bsrs, err)
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.首先把多个 blockStreamReader 构造成一个 blockStreamMerger 结构, merger 里面主要是一个 bsrHeap 堆用于维护 bsrs,用于 merge 数据时的排序。首先通过 merger 的 Init 函数构造堆排序的结构,然后核心是调用 merger 的 Merge 函数进行处理。
{
again:
if len(bsm.bsrHeap) == 0{
// 将最后的 inmemoryBlock(可能不完整)写入 bsw bsm.flushIB(bsw, ph, itemsMerged)
return nil}
select{
case <-stopCh:
return errForciblyStopped default:
}
// 取出 blockStreamReader bsr := heap.Pop(&bsm.bsrHeap).(*blockStreamReader)
var nextItem []byte // 下一个 blockStreamReader hasNextItem := false if len(bsm.bsrHeap) > 0{
nextItem = bsm.bsrHeap[0].bh.firstItem hasNextItem = true}
items := bsr.Block.items data := bsr.Block.data // 循环所有的 items for bsr.blockItemIdx < len(bsr.Block.items) {
item := items[bsr.blockItemIdx].Bytes(data)
if hasNextItem && string(item) > string(nextItem) {
break}
// 添加元素 if !bsm.ib.Add(item) {
// bsm.ib 已满,将其刷新到 bsw 并继续 bsm.flushIB(bsw, ph, itemsMerged)
continue}
bsr.blockItemIdx++}
if bsr.blockItemIdx == len(bsr.Block.items) {
// bsr.Block 已完全读取,处理下一个 block if bsr.Next() {
heap.Push(&bsm.bsrHeap, bsr)
goto again}
if err := bsr.Error(); err != nil{
return fmt.Errorf("cannot read storageBlock: %w", err)
}
goto again}
// bsr.Block 中的下一个 item 超过了 nextItem // 调整 bsr.bh.firstItem 并将 bsr 返回到堆 bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)
heap.Push(&bsm.bsrHeap, bsr)
goto again}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.这里主要解决的问题是多个有序的字节数组(inmemoryPart),按照字节序排序,合成一个 inmemoryPart 的过程,在 merge 的过程中,每 64KB 会单独创建一个 blockHeader,用于快速索引该 block 里面的 Items。
持久化数据
最后重复上面的过程,将 n 个 inmemoryBlock 合并成 (n-1)/defaultPartsToMerge+1 个 inmemoryPart,最后再调用 mergeParts 函数完成索引持久化操作,持久化后生成的索引 part,主要包含 metaindex.bin、index.bin、lens.bin、items.bin、metadata.json 等 5 个文件。
这几个文件的关系如下图所示, metaindex.bin 文件通过 metaindexRow 索引 index.bin 文件,index.bin 文件通过 indexBlock 中的 blockHeader 同时索引 items.bin 文件和 items.bin 文件。
metaindex.bin:文件包含一系列的 metaindexRow 数据,每个 metaindexRow 中包含第一条数据 firstItem、索引块包含的块头部数 blockHeadersCount、索引块偏移 indexBlockOffset 以及索引块大小 indexBlockSize。
metaindexRow 在文件中按照firstItem 的大小的字典序排序存储,以支持二分查找。metaindex.bin 文件使用 ZSTD 进行压缩。metaindex.bin 文件中的内容在 part 打开时,会全部读出加载至内存中,以加速查询过滤。metaindexRow 包含的firstItem 为其索引的indexBlock 中所有blockHeader 中字典序最小的firstItem。查找时根据firstItem 进行二分检索。index.bin:文件中包含一系列的 indexBlock, 每个 indexBlock 又包含一系列 blockHeader,每个 blockHeader 包含 item 的公共前缀 commonPrefix、第一项数据 firstItem、itemsData 的序列化类型 marshalType、itemsData 包含的 item 数、item 块的偏移 itemsBlockOffset 等内容,就是前面使用将 inmemoryBlock 转换为 inmemoryPart 结构的 Init 函数得到的。
每个indexBlock 使用ZSTD 压缩算法进行压缩。在indexBlock 中查找时,根据firstItem 进行二分检索blockHeader。items.bin 文件中,包含一系列的 itemsData, 每个 itemsData 又包含一系列的 Item。
itemsData 会视情况而定来是否使用 ZTSD 压缩,当 item 个数小于 2 时,或者itemsData 的长度小于 64 字节时,不压缩;当itemsData 使用 ZSTD 压缩后的压缩率大于90%的时候也不压缩。每个 item 在存储时,去掉了blockHeader 中的公共前缀commonPrefix 以提高压缩率。lens.bin 文件中,包含一系列的 lensData, 每个 lensData 又包含一系列 8 字节的长度 len, 长度 len 标识 items.bin 文件中对应 item 的长度。在读取或者需要解析 itemsData 中的 item 时,先要读取对应的 lensData 中对应的长度 len。 当 itemsData 进行压缩时,lensData 会先使用异或算法进行压缩,然后再使用 ZSTD 算法进一步压缩。
到这里我们就了解了索引数据是实现和存储原理了,那么真正的指标数据又是如何去存储的呢?