IndexLib(5):版本管理与增量更新
发布于:
在上一篇文章中,我们深入了解了查询流程的实现。本文将继续深入,详细解析版本管理和增量更新的机制,这是理解 IndexLib 如何管理索引版本和实现增量更新的关键。
版本管理与增量更新概览:Version 与 Locator 的协同工作:
flowchart TB
Start([版本管理与增量更新<br/>Version Management & Incremental Update]) --> InputLayer[数据输入层<br/>Data Input Layer]
subgraph InputGroup["数据输入 Data Input"]
direction TB
I1[数据源<br/>DataSource<br/>数据来源]
I2[文档流<br/>Document Stream<br/>文档数据流]
I1 --> I2
end
InputLayer --> LocatorLayer[位置信息层<br/>Locator Layer]
subgraph LocatorGroup["位置信息 Locator"]
direction TB
L1[Locator<br/>数据处理位置<br/>记录处理进度]
L2[Timestamp<br/>时间戳<br/>数据时间信息]
L3[MultiProgress<br/>多进度信息<br/>多分片进度]
L4[HashId<br/>分片标识<br/>数据分片ID]
L1 --> L2
L1 --> L3
L3 --> L4
end
LocatorLayer --> UpdateLayer[增量更新层<br/>Incremental Update Layer]
subgraph UpdateGroup["增量更新 Incremental Update"]
direction TB
U1[IsFasterThan<br/>比较判断<br/>判断数据是否已处理]
U2[数据过滤<br/>Filter Processed<br/>过滤已处理数据]
U3[处理新数据<br/>Process New Data<br/>构建新索引]
U4[更新Locator<br/>Update Locator<br/>记录处理进度]
U1 --> U2
U2 --> U3
U3 --> U4
end
UpdateLayer --> VersionLayer[版本管理层<br/>Version Management Layer]
subgraph VersionGroup["版本管理 Version Management"]
direction TB
V1[Version<br/>版本信息<br/>版本元数据]
V2[VersionId<br/>版本号递增<br/>单调递增版本号]
V3[Segments<br/>Segment列表<br/>包含的Segment]
V4[Schema演进<br/>Schema Evolution<br/>SchemaId映射]
V1 --> V2
V1 --> V3
V1 --> V4
end
VersionLayer --> CommitLayer[版本提交层<br/>Version Commit Layer]
subgraph CommitGroup["版本提交 Version Commit"]
direction TB
C1[VersionCommitter<br/>版本提交器<br/>提交新版本]
C2[Fence机制<br/>Fence Mechanism<br/>原子性保证]
C3[持久化<br/>Persistence<br/>写入磁盘]
C1 --> C2
C2 --> C3
end
CommitLayer --> End([版本管理完成<br/>Version Management Complete])
InputLayer -.->|包含| InputGroup
LocatorLayer -.->|包含| LocatorGroup
UpdateLayer -.->|包含| UpdateGroup
VersionLayer -.->|包含| VersionGroup
CommitLayer -.->|包含| CommitGroup
I2 --> L1
L1 --> U1
U4 --> V1
V1 --> C1
C3 --> V3
style Start fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
style End fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
style InputLayer fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style LocatorLayer fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px
style UpdateLayer fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
style VersionLayer fill:#fff3e0,stroke:#f57c00,stroke-width:3px
style CommitLayer fill:#fce4ec,stroke:#ef4444,stroke-width:3px
style InputGroup fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style I1 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style I2 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style LocatorGroup fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px
style L1 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style L2 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style L3 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style L4 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style UpdateGroup fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
style U1 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style U2 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style U3 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style U4 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style VersionGroup fill:#fff3e0,stroke:#f57c00,stroke-width:3px
style V1 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style V2 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style V3 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style V4 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style CommitGroup fill:#fce4ec,stroke:#ef4444,stroke-width:3px
style C1 fill:#f8bbd0,stroke:#ef4444,stroke-width:2px
style C2 fill:#f8bbd0,stroke:#ef4444,stroke-width:2px
style C3 fill:#f8bbd0,stroke:#ef4444,stroke-width:2px
1. 版本管理概览
1.1 版本管理的核心概念
IndexLib 的版本管理包括以下核心概念:
- Version:版本信息,记录索引包含哪些 Segment
- Locator:位置信息,记录数据处理的位置,用于增量更新
- 版本演进:每次 Commit 都会创建新版本,版本号递增
- 增量更新:通过 Locator 判断哪些数据已处理,避免重复处理
让我们先通过图来理解版本管理的整体架构:
版本管理架构:Version、Locator、Segment 的关系:
flowchart TB
Start([版本管理架构<br/>Version Management Architecture]) --> VersionLayer[版本信息层<br/>Version Information Layer]
subgraph VersionGroup["Version 版本信息 Version Information"]
direction TB
V1[VersionId<br/>版本号单调递增<br/>每次Commit递增]
V2[Segments<br/>Segment列表<br/>包含的Segment集合]
V3[Locator<br/>位置信息<br/>数据处理位置]
V4[Timestamp<br/>时间戳<br/>版本创建时间]
V5[Sealed<br/>封存状态<br/>是否封存]
V6[SchemaId<br/>Schema标识<br/>当前Schema版本]
V1 --> V2
V1 --> V3
V1 --> V4
V1 --> V5
V1 --> V6
end
VersionLayer --> SegmentLayer[索引段层<br/>Segment Layer]
subgraph SegmentGroup["Segment 索引段 Segment"]
direction TB
S1[SegmentId<br/>段标识<br/>唯一标识Segment]
S2[SchemaId<br/>段Schema<br/>Segment的Schema版本]
S3[IndexFiles<br/>索引文件<br/>索引数据文件]
S4[SegmentInfo<br/>段信息<br/>Segment元数据]
S1 --> S2
S1 --> S3
S1 --> S4
end
SegmentLayer --> LocatorLayer[位置信息层<br/>Locator Layer]
subgraph LocatorGroup["Locator 位置信息 Locator"]
direction TB
L1[SourceId<br/>数据源标识<br/>数据来源ID]
L2[Timestamp<br/>时间戳<br/>数据时间信息]
L3[ConcurrentIdx<br/>并发索引<br/>并发处理索引]
L4[HashId<br/>分片标识<br/>数据分片ID]
L5[MultiProgress<br/>多进度信息<br/>多分片处理进度]
L1 --> L2
L2 --> L3
L3 --> L4
L4 --> L5
end
LocatorLayer --> CommitLayer[版本提交层<br/>Version Commit Layer]
subgraph CommitGroup["版本提交 Version Commit"]
direction TB
C1[VersionCommitter<br/>版本提交器<br/>提交新版本]
C2[Fence目录<br/>Fence Directory<br/>临时目录保证原子性]
C3[原子切换<br/>Atomic Switch<br/>重命名操作]
C4[持久化<br/>Persistence<br/>写入磁盘]
C1 --> C2
C2 --> C3
C3 --> C4
end
CommitLayer --> End([版本管理完成<br/>Version Management Complete])
VersionLayer -.->|包含| VersionGroup
SegmentLayer -.->|包含| SegmentGroup
LocatorLayer -.->|包含| LocatorGroup
CommitLayer -.->|包含| CommitGroup
V2 -.->|包含| S1
V3 -.->|包含| L1
V1 -.->|提交| C1
C4 -.->|更新| V1
style Start fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
style End fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
style VersionLayer fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style SegmentLayer fill:#fff3e0,stroke:#f57c00,stroke-width:3px
style LocatorLayer fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px
style CommitLayer fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
style VersionGroup fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style V1 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style V2 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style V3 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style V4 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style V5 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style V6 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style SegmentGroup fill:#fff3e0,stroke:#f57c00,stroke-width:3px
style S1 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style S2 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style S3 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style S4 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style LocatorGroup fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px
style L1 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style L2 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style L3 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style L4 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style L5 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style CommitGroup fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
style C1 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style C2 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style C3 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style C4 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
1.2 版本管理的作用
版本管理在 IndexLib 中起到关键作用,是系统稳定性和数据一致性的基础。让我们通过类图来理解版本管理的整体架构:
classDiagram
class Version {
- versionid_t _versionId
- vector_SegmentInVersion _segments
- Locator _locator
- int64_t _timestamp
- bool _sealed
+ GetVersionId()
+ AddSegment()
+ SetLocator()
+ IncVersionId()
}
class Locator {
- uint64_t _src
- MultiProgress _multiProgress
- string _userData
+ IsFasterThan()
+ Update()
+ Serialize()
}
class VersionCommitter {
+ Commit()
+ CreateFence()
+ WriteVersion()
}
class VersionLoader {
+ Load()
+ Validate()
}
Version --> Locator : 包含
VersionCommitter --> Version : 创建
VersionLoader --> Version : 加载
版本管理的核心作用:
- 版本控制:记录索引的演进历史,支持版本回滚
- 版本演进:每次 Commit 创建新版本,版本号单调递增
- 版本历史:保留版本历史,支持查看和回滚到历史版本
- 版本比较:支持版本比较,判断版本之间的差异
- 增量更新:通过 Locator 判断哪些数据已处理,实现增量更新
- 数据定位:通过 Locator 精确定位数据处理位置
- 避免重复:通过 Locator 比较避免重复处理数据
- 进度追踪:记录每个 HashId 的处理进度,支持分片处理
- Schema 演进:支持 Schema 变更,每个 Segment 记录自己的 SchemaId
- 向后兼容:新 Schema 向后兼容旧 Schema,旧 Segment 可以继续使用
- 渐进式迁移:新 Segment 使用新 Schema,旧 Segment 保持原样
- 版本映射:通过 SchemaVersionRoadMap 记录 Schema 版本映射
- 数据一致性:保证数据不重复、不丢失,支持多数据源场景
- 不重复保证:通过 Locator 比较保证数据不重复处理
- 不丢失保证:通过 Locator 更新保证数据不丢失
- 多数据源支持:通过
sourceIdx区分数据源,支持多数据源场景
2. Version:版本信息
2.1 Version 的结构
Version 记录索引的版本信息,定义在 framework/Version.h 中:
// framework/Version.h
class Version : public autil::legacy::Jsonizable
{
private:
struct SegmentInVersion {
segmentid_t segmentId = INVALID_SEGMENTID;
schemaid_t schemaId = DEFAULT_SCHEMAID; // 每个 Segment 可以有不同的 Schema
};
public:
// 版本信息
versionid_t GetVersionId() const { return _versionId; }
void IncVersionId() { ++_versionId; } // 每次 Commit 时递增
// Segment 管理
void AddSegment(segmentid_t segmentId, schemaid_t schemaId);
void RemoveSegment(segmentid_t segmentId);
size_t GetSegmentCount() const { return _segments.size(); }
// Locator:数据位置信息
void SetLocator(const Locator& locator);
const Locator& GetLocator() const { return _locator; }
// 时间戳
void SetTimestamp(int64_t timestamp) { _timestamp = timestamp; }
int64_t GetTimestamp() const { return _timestamp; }
// 封存状态
void SetSealed() { _sealed = true; }
bool IsSealed() const { return _sealed; }
private:
versionid_t _versionId; // 版本号,单调递增
std::vector<SegmentInVersion> _segments; // Segment 列表(有序)
Locator _locator; // 位置信息,用于增量更新
int64_t _timestamp; // 时间戳
bool _sealed = false; // 是否封存
schemaid_t _schemaId; // Schema ID
std::string _fenceName; // Fence 名称
};
Version 的关键字段:
Version 的结构:包含 VersionId、Segments、Locator 等关键信息:
flowchart TD
subgraph Version["Version 对象"]
V[Version<br/>版本信息]
end
subgraph Fields["核心字段"]
F1[VersionId<br/>版本号<br/>单调递增]
F2[Segments<br/>Segment列表<br/>vector SegmentInVersion]
F3[Locator<br/>位置信息<br/>用于增量更新]
F4[Timestamp<br/>时间戳<br/>版本创建时间]
F5[Sealed<br/>封存状态<br/>是否封存]
F6[SchemaId<br/>Schema标识<br/>当前Schema版本]
F7[FenceName<br/>Fence名称<br/>临时目录名]
end
subgraph SegmentInVersion["SegmentInVersion 结构"]
S1[SegmentId<br/>段标识<br/>INVALID_SEGMENTID]
S2[SchemaId<br/>段Schema<br/>DEFAULT_SCHEMAID]
S1 --> S2
end
V --> F1
V --> F2
V --> F3
V --> F4
V --> F5
V --> F6
V --> F7
F2 -->|包含| S1
style Version fill:#e3f2fd
style Fields fill:#fff3e0
style SegmentInVersion fill:#f3e5f5
- VersionId:版本号,单调递增,每次 Commit 时递增
- Segments:该版本包含的 Segment 列表,每个 Segment 记录自己的 SchemaId
- Locator:数据位置信息,用于增量更新
- Timestamp:时间戳,记录版本创建时间
- Sealed:是否封存,封存后不再接收新 Segment
2.2 Version 的演进
每次 Commit 都会创建新版本,版本号递增:
Version 演进:从 V1 到 V2 的版本变化:
flowchart TB
Start([Version 演进流程<br/>Version Evolution Flow]) --> V1Layer[Version 1 层<br/>Version 1 Layer]
subgraph V1Group["Version 1 版本信息"]
direction TB
V1_ID[VersionId: 1<br/>版本号1]
V1_SEG[Segments: 1, 2<br/>包含Segment 1和2]
V1_LOC[Locator: timestamp=100<br/>处理到时间戳100]
V1_SCHEMA[SchemaId: 0<br/>Schema版本0]
V1_ID --> V1_SEG
V1_ID --> V1_LOC
V1_ID --> V1_SCHEMA
end
V1Layer --> CommitLayer[提交操作层<br/>Commit Operation Layer]
subgraph CommitGroup["Commit 操作 Commit Operation"]
direction TB
C1[收集Segment<br/>Collect Segments<br/>收集所有Segment]
C2[更新Locator<br/>Update Locator<br/>更新处理位置]
C3[递增VersionId<br/>Increment VersionId<br/>版本号递增]
C4[创建新版本<br/>Create New Version<br/>创建Version 2]
C1 --> C2
C2 --> C3
C3 --> C4
end
CommitLayer --> V2Layer[Version 2 层<br/>Version 2 Layer]
subgraph V2Group["Version 2 版本信息"]
direction TB
V2_ID[VersionId: 2<br/>版本号递增为2]
V2_SEG[Segments: 1, 2, 3<br/>新增Segment 3]
V2_LOC[Locator: timestamp=200<br/>处理到时间戳200]
V2_SCHEMA[SchemaId: 0<br/>Schema版本0保持不变]
V2_ID --> V2_SEG
V2_ID --> V2_LOC
V2_ID --> V2_SCHEMA
end
V2Layer --> MergeLayer[合并操作层<br/>Merge Operation Layer]
subgraph MergeGroup["合并操作 Merge Operation"]
direction TB
M1[合并Segment 1和2<br/>Merge Segments 1 and 2<br/>合并为Segment 4]
end
MergeLayer --> V3Layer[Version 3 层<br/>Version 3 Layer]
subgraph V3Group["Version 3 版本信息"]
direction TB
V3_ID[VersionId: 3<br/>版本号递增为3]
V3_SEG[Segments: 4<br/>合并后的Segment 4]
V3_LOC[Locator: timestamp=300<br/>处理到时间戳300]
V3_SCHEMA[SchemaId: 0<br/>Schema版本0保持不变]
V3_ID --> V3_SEG
V3_ID --> V3_LOC
V3_ID --> V3_SCHEMA
end
V3Layer --> End([版本演进完成<br/>Version Evolution Complete])
V1Layer -.->|包含| V1Group
CommitLayer -.->|包含| CommitGroup
V2Layer -.->|包含| V2Group
MergeLayer -.->|包含| MergeGroup
V3Layer -.->|包含| V3Group
V1Group -.->|提交| CommitGroup
CommitGroup -.->|创建| V2Group
V2Group -.->|合并| MergeGroup
MergeGroup -.->|创建| V3Group
style Start fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
style End fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
style V1Layer fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style CommitLayer fill:#fff3e0,stroke:#f57c00,stroke-width:3px
style V2Layer fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px
style MergeLayer fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
style V3Layer fill:#fce4ec,stroke:#ef4444,stroke-width:3px
style V1Group fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style V1_ID fill:#90caf9,stroke:#1976d2,stroke-width:2px
style V1_SEG fill:#90caf9,stroke:#1976d2,stroke-width:2px
style V1_LOC fill:#90caf9,stroke:#1976d2,stroke-width:2px
style V1_SCHEMA fill:#90caf9,stroke:#1976d2,stroke-width:2px
style CommitGroup fill:#fff3e0,stroke:#f57c00,stroke-width:3px
style C1 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style C2 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style C3 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style C4 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style V2Group fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px
style V2_ID fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style V2_SEG fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style V2_LOC fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style V2_SCHEMA fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style MergeGroup fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
style M1 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style V3Group fill:#fce4ec,stroke:#ef4444,stroke-width:3px
style V3_ID fill:#f8bbd0,stroke:#ef4444,stroke-width:2px
style V3_SEG fill:#f8bbd0,stroke:#ef4444,stroke-width:2px
style V3_LOC fill:#f8bbd0,stroke:#ef4444,stroke-width:2px
style V3_SCHEMA fill:#f8bbd0,stroke:#ef4444,stroke-width:2px
版本演进示例:
- V1:包含 Segment [1, 2],Locator 记录处理到 timestamp=100
- V2:新增 Segment 3,Locator 更新到 timestamp=200
- V3:Segment 1 和 2 合并为 Segment 4,Locator 更新到 timestamp=300
版本演进的关键设计:
版本演进是 IndexLib 版本管理的核心机制。让我们通过序列图来理解版本演进的完整过程:
sequenceDiagram
participant Writer as TabletWriter
participant MemSeg as MemSegment
participant DiskSeg as DiskSegment
participant Version as Version
participant VersionCommitter as VersionCommitter
participant TabletData as TabletData
Writer->>MemSeg: Build(documents)
MemSeg-->>Writer: Success
Writer->>MemSeg: NeedDump()?
MemSeg-->>Writer: true
Writer->>MemSeg: CreateSegmentDumpItems()
MemSeg-->>Writer: DumpItems
Writer->>DiskSeg: Dump(DumpItems)
DiskSeg-->>Writer: Success
Writer->>Version: AddSegment(segmentId, schemaId)
Version->>Version: IncVersionId()
Version-->>Writer: newVersionId
Writer->>Version: SetLocator(locator)
Version-->>Writer: Success
Writer->>VersionCommitter: Commit(TabletData, Schema, Options)
VersionCommitter->>VersionCommitter: CreateFence()
VersionCommitter->>VersionCommitter: WriteVersion(Version)
VersionCommitter->>VersionCommitter: AtomicSwitch()
VersionCommitter-->>Writer: VersionMeta
Writer->>TabletData: UpdateVersion(Version)
TabletData-->>Writer: Success
版本演进的关键设计:
- 版本号递增:每次 Commit 时
VersionId自动递增,保证版本顺序- 单调性:版本号严格单调递增,保证版本顺序
- 原子性:版本号递增是原子操作,避免并发问题
- 持久化:版本号持久化到磁盘,保证重启后继续递增
- Schema 演进:每个 Segment 记录自己的
SchemaId,支持 Schema 变更- Segment SchemaId:每个 Segment 在创建时记录自己的 SchemaId
- Schema 映射:Version 维护 SchemaVersionRoadMap,记录 Schema 版本映射
- 兼容性检查:Schema 变更时检查兼容性,保证数据一致性
- Locator 更新:每次 Commit 时更新 Locator,记录最新的数据处理位置
- 位置记录:Locator 记录每个 HashId 的处理进度
- 更新条件:只有当新的 Locator 完全比当前 Locator 快时,才更新
- 一致性保证:保证 Locator 只向前推进,不会回退
2.3 Version 的持久化
Version 需要持久化到磁盘,通过 Fence 机制保证原子性:
Version 持久化:通过 Fence 机制保证原子性:
flowchart TD
Start([Version 持久化开始]) --> P1
subgraph Prepare["1. 准备阶段"]
direction LR
P1[PrepareVersion<br/>准备版本信息]
P2[CollectSegments<br/>收集Segment列表]
P3[PrepareLocator<br/>准备Locator信息]
P1 --> P2 --> P3
end
P3 --> F1
subgraph Fence["2. Fence 机制"]
direction LR
F1[CreateFenceDirectory<br/>创建临时目录]
F2[WriteVersionFile<br/>写入版本文件]
F3[IncVersionId<br/>递增版本号]
F4[AtomicRename<br/>原子重命名]
F1 --> F2 --> F3 --> F4
end
F4 --> PE1
subgraph Persist["3. 持久化"]
direction LR
PE1[序列化Version<br/>JSON格式]
PE2[写入version文件<br/>version.0]
PE3[写入元数据<br/>时间戳、Locator]
PE1 --> PE2 --> PE3
end
PE3 --> U1
subgraph Update["4. 更新内存"]
direction LR
U1[UpdateTabletData<br/>更新TabletData]
U2[更新Version引用<br/>切换到新版本]
U1 --> U2
end
U2 --> Success([持久化成功])
F2 -.->|异常| Error
F4 -.->|异常| Error
PE2 -.->|异常| Error
U1 -.->|异常| Error
subgraph Error["错误处理"]
direction TB
E1[CleanupFence<br/>清理临时目录]
E2[不影响已有版本<br/>保证一致性]
E1 --> E2
end
Error --> E1
E2 --> Fail([持久化失败])
style Start fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style Prepare fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style P1 fill:#c5e1f5,stroke:#1976d2,stroke-width:1.5px
style P2 fill:#90caf9,stroke:#1976d2,stroke-width:1.5px
style P3 fill:#64b5f6,stroke:#1976d2,stroke-width:1.5px
style Fence fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style F1 fill:#ffe0b2,stroke:#f57c00,stroke-width:1.5px
style F2 fill:#ffcc80,stroke:#f57c00,stroke-width:1.5px
style F3 fill:#ffb74d,stroke:#f57c00,stroke-width:1.5px
style F4 fill:#ffa726,stroke:#f57c00,stroke-width:1.5px
style Persist fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style PE1 fill:#e1bee7,stroke:#7b1fa2,stroke-width:1.5px
style PE2 fill:#ce93d8,stroke:#7b1fa2,stroke-width:1.5px
style PE3 fill:#ba68c8,stroke:#7b1fa2,stroke-width:1.5px
style Update fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style U1 fill:#c8e6c9,stroke:#2e7d32,stroke-width:1.5px
style U2 fill:#a5d6a7,stroke:#2e7d32,stroke-width:1.5px
style Error fill:#fce4ec,stroke:#c2185b,stroke-width:2px
style E1 fill:#f8bbd0,stroke:#c2185b,stroke-width:1.5px
style E2 fill:#f48fb1,stroke:#c2185b,stroke-width:1.5px
style Success fill:#c8e6c9,stroke:#2e7d32,stroke-width:3px
style Fail fill:#ffcdd2,stroke:#c62828,stroke-width:3px
持久化流程:
Version 的持久化是版本管理的核心,通过 Fence 机制保证原子性。让我们通过序列图来理解完整的持久化流程:
sequenceDiagram
participant Committer as VersionCommitter
participant Version as Version
participant FileSys as FileSystem
participant TabletData as TabletData
Committer->>Version: PrepareVersion()
Version->>Version: CollectSegments()
Version->>Version: PrepareLocator()
Version-->>Committer: Version对象
Committer->>FileSys: CreateFenceDirectory()
FileSys-->>Committer: FencePath
Committer->>FileSys: WriteVersionFile(Version, FencePath)
FileSys->>FileSys: 序列化Version为JSON
FileSys->>FileSys: 写入version文件
FileSys-->>Committer: Success
Committer->>Version: IncVersionId()
Version-->>Committer: newVersionId
Committer->>FileSys: AtomicRename(FencePath, VersionPath)
FileSys-->>Committer: Success
Committer->>TabletData: UpdateVersion(Version)
TabletData-->>Committer: Success
alt 提交失败
Committer->>FileSys: CleanupFence(FencePath)
FileSys-->>Committer: Success
end
持久化流程详解:
- 创建 Fence 目录:在提交前创建临时目录(Fence)
- 目录命名:Fence 目录使用临时名称(如
version.fence.1234567890) - 目录隔离:Fence 目录与正式版本目录隔离,避免冲突
- 原子性准备:Fence 目录为原子切换做准备
- 目录命名:Fence 目录使用临时名称(如
- 写入 Version:将 Version 写入 Fence 目录
- 序列化:将 Version 对象序列化为 JSON 格式
- 文件写入:将 JSON 写入版本文件(如
version.0) - 元数据写入:写入版本元数据(时间戳、Locator 等)
- 原子切换:原子性地将 Fence 目录重命名为正式版本目录
- 原子操作:使用文件系统的原子重命名操作(
rename) - 切换时机:只有在所有文件写入成功后才切换
- 失败处理:如果切换失败,清理 Fence 目录,不影响已有版本
- 原子操作:使用文件系统的原子重命名操作(
- 保证原子性:要么全部成功,要么全部失败
- 事务性:整个提交过程是事务性的,要么全部成功,要么全部失败
- 错误恢复:如果提交失败,可以清理 Fence 目录,不影响已有版本
- 一致性保证:保证版本文件的一致性,避免部分写入
Fence 机制的设计优势:
- 原子性:通过原子重命名保证版本提交的原子性
- 性能:Fence 机制不需要额外的锁,性能开销小
- 可靠性:即使提交失败,也不会影响已有版本
- 简单性:实现简单,易于理解和维护
2.4 Version 的加载
Version 的加载通过 VersionLoader 实现:
Version 加载:从磁盘加载版本信息:
flowchart TB
Start([Version 加载开始<br/>Version Load Start]) --> LoadLayer[加载阶段<br/>Load Phase]
subgraph LoadGroup["1. 加载阶段 Load Phase"]
direction TB
L1[VersionLoader.Load<br/>加载版本信息<br/>从磁盘读取]
L2[读取版本文件<br/>version.0, version.1等<br/>按版本号顺序<br/>找到最新版本]
L3[解析JSON<br/>反序列化Version对象<br/>转换为内存结构]
L1 --> L2
L2 --> L3
end
LoadLayer --> ValidateLayer[验证阶段<br/>Validate Phase]
subgraph ValidateGroup["2. 验证阶段 Validate Phase"]
direction TB
V1[ValidateVersion<br/>验证版本有效性<br/>检查基本格式]
V2[检查Segment存在性<br/>Segment文件是否存在<br/>验证文件完整性]
V3[检查Schema兼容性<br/>Schema版本映射检查<br/>确保兼容性]
V4[检查Locator有效性<br/>Locator格式正确性<br/>验证数据一致性]
V1 --> V2
V2 --> V3
V3 --> V4
end
ValidateLayer --> SegmentLayer[加载Segment阶段<br/>Load Segment Phase]
subgraph SegmentGroup["3. 加载Segment Load Segment"]
direction TB
S1[根据Segment列表<br/>加载Segment信息<br/>遍历所有Segment]
S2[OpenSegment<br/>打开Segment文件<br/>初始化文件句柄]
S3[加载索引文件<br/>按需加载索引数据<br/>延迟加载策略]
S4[创建DiskSegment<br/>DiskSegment对象<br/>封装Segment信息]
S1 --> S2
S2 --> S3
S3 --> S4
end
SegmentLayer --> InitLayer[初始化TabletData阶段<br/>Initialize TabletData Phase]
subgraph InitGroup["4. 初始化TabletData Initialize TabletData"]
direction TB
I1[UpdateVersion<br/>更新Version引用<br/>设置当前版本]
I2[设置Segment列表<br/>添加到TabletData<br/>建立索引关系]
I3[初始化查询器<br/>准备查询功能<br/>创建Reader对象]
I1 --> I2
I2 --> I3
end
InitLayer --> End([完成加载<br/>Load Complete])
LoadLayer -.->|包含| LoadGroup
ValidateLayer -.->|包含| ValidateGroup
SegmentLayer -.->|包含| SegmentGroup
InitLayer -.->|包含| InitGroup
L3 --> V1
V4 --> S1
S4 --> I1
style Start fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
style End fill:#c8e6c9,stroke:#388e3c,stroke-width:3px
style LoadLayer fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style ValidateLayer fill:#fff3e0,stroke:#f57c00,stroke-width:3px
style SegmentLayer fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px
style InitLayer fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
style LoadGroup fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style L1 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style L2 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style L3 fill:#90caf9,stroke:#1976d2,stroke-width:2px
style ValidateGroup fill:#fff3e0,stroke:#f57c00,stroke-width:3px
style V1 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style V2 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style V3 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style V4 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style SegmentGroup fill:#f3e5f5,stroke:#7b1fa2,stroke-width:3px
style S1 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style S2 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style S3 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style S4 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style InitGroup fill:#e8f5e9,stroke:#2e7d32,stroke-width:3px
style I1 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style I2 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
style I3 fill:#a5d6a7,stroke:#2e7d32,stroke-width:2px
加载流程:
- 读取版本文件:从磁盘读取版本文件(version.0、version.1 等)
- 解析 Version:解析 JSON 格式的版本信息
- 验证 Version:验证版本的有效性(Segment 是否存在等)
- 加载 Segment:根据 Version 中的 Segment 列表加载 Segment
3. Locator:位置信息
3.1 Locator 的作用
Locator 是增量更新的核心,记录数据的位置信息:
Locator 的作用:记录数据处理位置,支持增量更新:
flowchart LR
subgraph Role["Locator 核心作用"]
R1[增量更新<br/>判断数据是否已处理]
R2[数据一致性<br/>保证不重复不丢失]
R3[进度追踪<br/>记录每个HashId进度]
R4[并发控制<br/>处理时间戳相同情况]
end
subgraph Compare["比较机制"]
C1[IsFasterThan<br/>比较两个Locator]
C2[LCR_FULLY_FASTER<br/>完全更快]
C3[LCR_SLOWER<br/>更慢]
C4[LCR_PARTIAL_FASTER<br/>部分更快]
C5[LCR_INVALID<br/>无效比较]
C1 --> C2
C1 --> C3
C1 --> C4
C1 --> C5
end
subgraph Update["更新机制"]
U1[Update<br/>更新Locator]
U2[条件检查<br/>新Locator必须更快]
U3[更新MultiProgress<br/>记录最新进度]
U4[保证一致性<br/>只向前推进]
U1 --> U2
U2 --> U3
U3 --> U4
end
subgraph Application["应用场景"]
A1[实时写入<br/>实时接收数据流]
A2[批量更新<br/>批量处理数据]
A3[多数据源<br/>支持多数据源场景]
A4[故障恢复<br/>故障恢复时判断]
end
R1 --> C1
R2 --> U1
R3 --> A1
R4 --> A2
C2 --> A3
U4 --> A4
style Role fill:#e3f2fd
style Compare fill:#fff3e0
style Update fill:#f3e5f5
style Application fill:#e8f5e9
Locator 的关键作用:
- 增量更新:通过
IsFasterThan()判断哪些数据已处理,避免重复处理 - 数据一致性:保证数据不重复、不丢失,支持多数据源场景
- 进度追踪:记录每个 HashId 的处理进度,支持分片处理
- 并发控制:通过
concurrentIdx处理时间戳相同的情况
3.2 Locator 的结构
Locator 的结构定义在 framework/Locator.h 中:
// framework/Locator.h
class Locator final
{
public:
// Locator 比较结果
enum class LocatorCompareResult {
LCR_INVALID, // 无效
LCR_SLOWER, // 比这个 locator 慢
LCR_PARTIAL_FASTER, // 部分 hash id 更快
LCR_FULLY_FASTER // 完全比这个 locator 快(包括相等)
};
// 文档信息:记录文档在数据源中的位置
struct DocInfo {
int64_t timestamp; // 时间戳
uint32_t concurrentIdx; // 并发索引(时间戳相同时的序号)
uint16_t hashId; // Hash ID(用于分片)
uint8_t sourceIdx; // 数据源索引
};
// 比较两个 Locator:判断数据是否已处理
LocatorCompareResult IsFasterThan(const Locator& other,
bool ignoreLegacyDiffSrc) const;
private:
uint64_t _src; // 数据源标识
base::Progress::Offset _minOffset; // 最小偏移量
base::MultiProgress _multiProgress; // 多进度信息(每个 hashId 的进度)
std::string _userData; // 用户数据
};
Locator 的关键字段:
Locator 的结构:包含 timestamp、concurrentIdx、hashId 等信息:
flowchart TD
Locator[Locator 对象<br/>━━━━━━━━━━<br/>位置信息<br/>记录数据处理进度] --> Fields
subgraph Fields["核心字段"]
direction LR
F1["SourceId<br/>━━━━━━━━━━<br/>数据源标识<br/>uint64_t _src"]
F2["MinOffset<br/>━━━━━━━━━━<br/>最小偏移量<br/>Progress::Offset"]
F3["MultiProgress<br/>━━━━━━━━━━<br/>多进度信息<br/>每个HashId的进度"]
F4["UserData<br/>━━━━━━━━━━<br/>用户数据<br/>string _userData"]
end
Locator --> F1
Locator --> F2
Locator --> F3
Locator --> F4
F3 --> Progress
F2 --> Progress
subgraph Progress["Progress 进度信息<br/>用于MultiProgress和MinOffset"]
direction LR
P1["Offset<br/>━━━━━━━━━━<br/>偏移量<br/>包含时间信息"]
P2["Timestamp<br/>━━━━━━━━━━<br/>时间戳<br/>int64_t"]
P3["ConcurrentIdx<br/>━━━━━━━━━━<br/>并发索引<br/>uint32_t"]
P1 --> P2 --> P3
end
Progress --> P1
F3 --> DocInfo
subgraph DocInfo["DocInfo 文档信息<br/>用于构建Progress"]
direction LR
D1["Timestamp<br/>━━━━━━━━━━<br/>时间戳<br/>int64_t"]
D2["ConcurrentIdx<br/>━━━━━━━━━━<br/>并发索引<br/>uint32_t"]
D3["HashId<br/>━━━━━━━━━━<br/>分片标识<br/>uint16_t"]
D4["SourceIdx<br/>━━━━━━━━━━<br/>数据源索引<br/>uint8_t"]
D1 --> D2 --> D3 --> D4
end
DocInfo --> D1
style Locator fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style Fields fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style F1 fill:#ffe0b2,stroke:#f57c00,stroke-width:1.5px
style F2 fill:#ffcc80,stroke:#f57c00,stroke-width:1.5px
style F3 fill:#ffb74d,stroke:#f57c00,stroke-width:1.5px
style F4 fill:#ffa726,stroke:#f57c00,stroke-width:1.5px
style Progress fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style P1 fill:#c8e6c9,stroke:#2e7d32,stroke-width:1.5px
style P2 fill:#a5d6a7,stroke:#2e7d32,stroke-width:1.5px
style P3 fill:#81c784,stroke:#2e7d32,stroke-width:1.5px
style DocInfo fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style D1 fill:#e1bee7,stroke:#7b1fa2,stroke-width:1.5px
style D2 fill:#ce93d8,stroke:#7b1fa2,stroke-width:1.5px
style D3 fill:#ba68c8,stroke:#7b1fa2,stroke-width:1.5px
style D4 fill:#ab47bc,stroke:#7b1fa2,stroke-width:1.5px
- timestamp:时间戳,记录数据的时间位置
- concurrentIdx:并发索引,处理时间戳相同的情况
- hashId:Hash ID,用于分片
- sourceIdx:数据源索引,支持多数据源
- multiProgress:多进度信息,每个 hashId 记录自己的进度
3.3 Locator 的比较逻辑
Locator 的比较逻辑用于判断数据是否已处理:
Locator 比较:判断数据是否已处理的逻辑(已在上面详细展示,此处不再重复):
比较示例:
- Locator A:timestamp=100, hashId=0
- Locator B:timestamp=200, hashId=0
- 结果:B 比 A 快(
LCR_FULLY_FASTER),说明 B 包含 A 的所有数据
比较逻辑:
Locator 的比较逻辑是增量更新的核心算法。让我们通过流程图来理解详细的比较过程:
flowchart TD
Start([IsFasterThan 调用]) --> CheckSource{数据源是否相同?}
CheckSource -->|否| Invalid[返回 LCR_INVALID<br/>数据源不同,无法比较]
CheckSource -->|是| Loop[遍历 multiProgress<br/>遍历所有 hashId]
Loop --> CheckHashId{当前 hashId<br/>是否存在?}
CheckHashId -->|不存在| CheckOther{other 中<br/>是否存在?}
CheckHashId -->|存在| Compare[比较 Progress<br/>CompareProgress方法]
CheckOther -->|存在| Partial[返回 LCR_PARTIAL_FASTER<br/>部分更快]
CheckOther -->|不存在| Next[继续下一个 hashId]
Compare --> CheckResult{比较结果}
CheckResult -->|LCR_FULLY_FASTER| Next
CheckResult -->|LCR_SLOWER| Return[返回 LCR_SLOWER<br/>更慢]
CheckResult -->|LCR_PARTIAL_FASTER| Return2[返回 LCR_PARTIAL_FASTER<br/>部分更快]
Next --> CheckComplete{是否遍历完<br/>所有 hashId?}
CheckComplete -->|否| Loop
CheckComplete -->|是| Full[返回 LCR_FULLY_FASTER<br/>完全更快]
Invalid --> End([结束])
Partial --> End
Return --> End
Return2 --> End
Full --> End
style Start fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style CheckSource fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Invalid fill:#ffcdd2,stroke:#c62828,stroke-width:2px
style Loop fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style CheckHashId fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style CheckOther fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Compare fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Partial fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style Next fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style CheckResult fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Return fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style Return2 fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style CheckComplete fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style Full fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
style End fill:#c8e6c9,stroke:#2e7d32,stroke-width:3px
比较逻辑详解:
// framework/Locator.h
LocatorCompareResult Locator::IsFasterThan(const Locator& other,
bool ignoreLegacyDiffSrc) const
{
// 1. 检查数据源是否相同
if (!IsSameSrc(other, ignoreLegacyDiffSrc)) {
return LCR_INVALID; // 数据源不同,无法比较
}
// 2. 比较每个 hashId 的进度
for (size_t i = 0; i < _multiProgress.size(); ++i) {
if (i >= other._multiProgress.size()) {
// 当前 Locator 有更多的 hashId,部分更快
return LCR_PARTIAL_FASTER;
}
// 比较该 hashId 的进度
auto result = CompareProgress(_multiProgress[i], other._multiProgress[i]);
if (result != LCR_FULLY_FASTER) {
// 如果该 hashId 不是完全更快,返回结果
return result;
}
}
// 3. 所有 hashId 都完全更快,返回完全更快
return LCR_FULLY_FASTER;
}
比较算法的性能优化:
- 快速路径:
- 如果数据源不同,直接返回
LCR_INVALID,避免遍历 Progress - 如果 Progress 数量不同,快速判断部分更快
- 如果数据源不同,直接返回
- 短路优化:
- 如果某个 hashId 不是完全更快,立即返回结果
- 不需要继续比较后续 hashId
- 缓存优化:
- 比较结果可以缓存,避免重复计算
- 对于相同的 Locator 对,直接返回缓存结果
- 位运算优化:
- 使用位运算优化 Progress 的比较
- 减少比较开销,提高比较性能
3.4 Locator 的更新
Locator 的更新通过 Update() 方法实现:
Locator 更新:更新数据处理位置:
flowchart TD
Start([Locator 更新开始]) --> Input[接收输入<br/>新 Locator + 当前 Locator]
Input --> Compare[IsFasterThan 比较<br/>判断新Locator是否完全更快]
Compare --> Decision{是否完全更快?<br/>LCR_FULLY_FASTER}
Decision -->|否| Fail[更新失败<br/>保持原Locator不变]
Decision -->|是| Step1[1. 更新 MultiProgress<br/>合并每个HashId的进度信息]
Step1 --> Step2[2. 更新 MinOffset<br/>取最小偏移量]
Step2 --> Step3[3. 更新 UserData<br/>保留用户自定义数据]
Step3 --> Step4[4. 保证一致性<br/>确保只向前推进,不后退]
Step4 --> Success[更新成功<br/>Locator已更新]
Fail --> End([结束])
Success --> End
style Start fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style Input fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Compare fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Decision fill:#ffcc80,stroke:#f57c00,stroke-width:2px
style Step1 fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Step2 fill:#e1bee7,stroke:#7b1fa2,stroke-width:2px
style Step3 fill:#ce93d8,stroke:#7b1fa2,stroke-width:2px
style Step4 fill:#ba68c8,stroke:#7b1fa2,stroke-width:2px
style Success fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
style Fail fill:#ffcdd2,stroke:#c62828,stroke-width:2px
style End fill:#c8e6c9,stroke:#2e7d32,stroke-width:3px
更新逻辑:
- 条件:只有当新的 Locator 完全比当前 Locator 快时,才更新
- 更新内容:更新
multiProgress,记录最新的数据处理位置 - 保证一致性:保证 Locator 只向前推进,不会回退
4. 增量更新机制
4.1 增量更新的流程
增量更新通过 Locator 判断哪些数据已处理:
增量更新流程:通过 Locator 判断数据是否已处理(已在上面详细展示,此处不再重复):
增量更新流程图:
graph TD
A[读取数据源] --> B[获取数据 Locator]
B --> C[比较 Locator]
C --> D{IsFasterThan?}
D -->|LCR_FULLY_FASTER| E[数据已处理]
D -->|LCR_SLOWER| F[处理新数据]
D -->|LCR_PARTIAL_FASTER| G[部分处理]
E --> H[跳过数据]
F --> I[构建索引]
G --> I
I --> J[更新 Locator]
J --> K[提交版本]
K --> L[更新 Version Locator]
style C fill:#e3f2fd
style F fill:#fff3e0
style J fill:#f3e5f5
style K fill:#e8f5e9
增量更新流程:
- 读取数据源:从数据源读取数据
- 检查 Locator:通过
IsFasterThan()判断数据是否已处理 - 处理新数据:只处理未处理的数据
- 更新 Locator:处理完成后更新 Locator
- 提交版本:Commit 时更新 Version 的 Locator
4.2 增量更新的判断
增量更新的判断通过 Locator 比较实现:
增量更新判断:通过 Locator 比较判断数据是否已处理:
flowchart TD
subgraph Input["输入"]
I1[数据Locator<br/>Data Locator]
I2[版本Locator<br/>Version Locator]
end
subgraph Compare["比较判断"]
C1[IsFasterThan<br/>比较两个Locator]
C2{比较结果}
end
subgraph Result["判断结果"]
R1[LCR_FULLY_FASTER<br/>数据已处理<br/>跳过数据]
R2[LCR_SLOWER<br/>数据未处理<br/>需要处理]
R3[LCR_PARTIAL_FASTER<br/>部分数据已处理<br/>需要部分处理]
R4[LCR_INVALID<br/>数据源不同<br/>无法比较]
end
subgraph Action["处理动作"]
A1[跳过数据<br/>不处理]
A2[处理新数据<br/>构建索引]
A3[部分处理<br/>处理未处理部分]
A4[无法判断<br/>需要人工处理]
end
I1 --> C1
I2 --> C1
C1 --> C2
C2 -->|完全更快| R1
C2 -->|更慢| R2
C2 -->|部分更快| R3
C2 -->|无效| R4
R1 --> A1
R2 --> A2
R3 --> A3
R4 --> A4
style Input fill:#e3f2fd
style Compare fill:#fff3e0
style Result fill:#f3e5f5
style Action fill:#e8f5e9
判断逻辑:
- LCR_FULLY_FASTER:数据已处理,跳过
- LCR_SLOWER:数据未处理,需要处理
- LCR_PARTIAL_FASTER:部分数据已处理,需要部分处理
- LCR_INVALID:数据源不同,无法比较
4.3 增量更新的场景
增量更新适用于以下场景:
增量更新场景:实时写入、批量更新等:
flowchart TD
subgraph Realtime["1. 实时写入场景"]
direction LR
R1[实时接收数据流<br/>━━━━━━━━━━<br/>Continuous Data Stream<br/>持续接收新数据]
R2[检查Locator<br/>━━━━━━━━━━<br/>IsFasterThan判断<br/>判断是否需要处理]
R3[处理新数据<br/>━━━━━━━━━━<br/>只处理未处理数据<br/>避免重复处理]
R4[更新Locator<br/>━━━━━━━━━━<br/>记录最新进度<br/>更新处理位置]
R5[定期Commit<br/>━━━━━━━━━━<br/>提交版本<br/>持久化进度]
R1 --> R2 --> R3 --> R4 --> R5
end
subgraph Batch["2. 批量更新场景"]
direction LR
B1[批量读取数据源<br/>━━━━━━━━━━<br/>Batch Read<br/>一次性读取大量数据]
B2[检查Locator<br/>━━━━━━━━━━<br/>过滤已处理数据<br/>跳过已处理部分]
B3[处理新数据<br/>━━━━━━━━━━<br/>批量构建索引<br/>高效处理]
B4[更新Locator<br/>━━━━━━━━━━<br/>更新进度<br/>记录处理位置]
B5[批量Commit<br/>━━━━━━━━━━<br/>提交版本<br/>批量持久化]
B1 --> B2 --> B3 --> B4 --> B5
end
subgraph MultiSource["3. 多数据源场景"]
direction LR
M1[多个数据源<br/>━━━━━━━━━━<br/>Multiple Data Sources<br/>来自不同来源]
M2[区分SourceIdx<br/>━━━━━━━━━━<br/>区分数据源<br/>标识来源]
M3[分别处理<br/>━━━━━━━━━━<br/>独立处理每个数据源<br/>独立进度跟踪]
M4[保证一致性<br/>━━━━━━━━━━<br/>数据不重复不丢失<br/>确保完整性]
M1 --> M2 --> M3 --> M4
end
subgraph Recovery["4. 故障恢复场景"]
direction LR
F1[故障恢复<br/>━━━━━━━━━━<br/>Failure Recovery<br/>系统重启或恢复]
F2[检查Locator<br/>━━━━━━━━━━<br/>判断需要重新处理的数据<br/>定位断点]
F3[重新处理<br/>━━━━━━━━━━<br/>处理未处理数据<br/>从断点继续]
F4[恢复完成<br/>━━━━━━━━━━<br/>恢复正常状态<br/>继续正常运行]
F1 --> F2 --> F3 --> F4
end
style Realtime fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style R1 fill:#c5e1f5,stroke:#1976d2,stroke-width:1.5px
style R2 fill:#90caf9,stroke:#1976d2,stroke-width:1.5px
style R3 fill:#64b5f6,stroke:#1976d2,stroke-width:1.5px
style R4 fill:#42a5f5,stroke:#1976d2,stroke-width:1.5px
style R5 fill:#2196f3,stroke:#1976d2,stroke-width:1.5px
style Batch fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style B1 fill:#ffe0b2,stroke:#f57c00,stroke-width:1.5px
style B2 fill:#ffcc80,stroke:#f57c00,stroke-width:1.5px
style B3 fill:#ffb74d,stroke:#f57c00,stroke-width:1.5px
style B4 fill:#ffa726,stroke:#f57c00,stroke-width:1.5px
style B5 fill:#ff9800,stroke:#f57c00,stroke-width:1.5px
style MultiSource fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style M1 fill:#e1bee7,stroke:#7b1fa2,stroke-width:1.5px
style M2 fill:#ce93d8,stroke:#7b1fa2,stroke-width:1.5px
style M3 fill:#ba68c8,stroke:#7b1fa2,stroke-width:1.5px
style M4 fill:#ab47bc,stroke:#7b1fa2,stroke-width:1.5px
style Recovery fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style F1 fill:#c8e6c9,stroke:#2e7d32,stroke-width:1.5px
style F2 fill:#a5d6a7,stroke:#2e7d32,stroke-width:1.5px
style F3 fill:#81c784,stroke:#2e7d32,stroke-width:1.5px
style F4 fill:#66bb6a,stroke:#2e7d32,stroke-width:1.5px
使用场景:
- 实时写入:实时接收数据,通过 Locator 判断哪些数据已处理
- 批量更新:批量处理数据,通过 Locator 避免重复处理
- 多数据源:从多个数据源读取数据,通过 Locator 保证数据一致性
- 故障恢复:故障恢复时,通过 Locator 判断需要重新处理的数据
5. 版本提交与加载
5.1 版本提交流程
版本提交通过 VersionCommitter 实现:
版本提交流程:从准备到持久化的完整过程(已在上面详细展示,此处不再重复):
版本提交流程图:
flowchart TD
Start([版本提交开始]) --> Check[检查提交条件<br/>━━━━━━━━━━<br/>判断是否有新Segment<br/>是否有数据变更]
Check --> Decision{需要提交?}
Decision -->|否| Skip[跳过提交<br/>━━━━━━━━━━<br/>无变更,无需提交<br/>保持当前版本]
Decision -->|是| Prepare[准备版本信息<br/>━━━━━━━━━━<br/>收集版本元数据<br/>准备提交内容]
Prepare --> Collect[收集 Segment<br/>━━━━━━━━━━<br/>收集所有已构建Segment<br/>构建Segment列表]
Collect --> Locator[准备 Locator<br/>━━━━━━━━━━<br/>准备位置信息<br/>记录处理进度]
Locator --> Fence[创建 Fence<br/>━━━━━━━━━━<br/>创建临时目录<br/>保证原子性]
Fence --> Write[写入版本文件<br/>━━━━━━━━━━<br/>序列化Version<br/>写入JSON文件]
Write --> Update[更新版本号<br/>━━━━━━━━━━<br/>递增版本ID<br/>生成新版本号]
Update --> Persist[持久化到磁盘<br/>━━━━━━━━━━<br/>原子重命名<br/>完成持久化]
Persist --> Success[完成提交<br/>━━━━━━━━━━<br/>版本提交成功<br/>更新TabletData]
Skip --> End([结束])
Success --> End
style Start fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style Check fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Decision fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Skip fill:#ffcdd2,stroke:#c62828,stroke-width:2px
style Prepare fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Collect fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Locator fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Fence fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Write fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Update fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Persist fill:#e8f5e9,stroke:#2e7d32,stroke-width:2px
style Success fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
style End fill:#c8e6c9,stroke:#2e7d32,stroke-width:3px
提交流程:
- 检查提交条件:判断是否需要提交(有新的 Segment、有数据变更等)
- 准备版本信息:收集所有已构建的 Segment,准备 Locator
- 创建 Fence:创建 Fence 目录,保证原子性
- 持久化 Version:将 Version 写入 Fence 目录
- 原子切换:原子性地将 Fence 目录切换为正式版本目录
- 更新 TabletData:更新 TabletData 的 Version
5.2 版本加载流程
版本加载通过 VersionLoader 实现:
版本加载流程:从磁盘加载版本信息(已在上面详细展示,此处不再重复):
加载流程:
- 读取版本文件:从磁盘读取版本文件
- 解析 Version:解析 JSON 格式的版本信息
- 验证 Version:验证版本的有效性
- 加载 Segment:根据 Version 中的 Segment 列表加载 Segment
- 初始化 TabletData:初始化 TabletData,设置 Version 和 Segment 列表
5.3 版本回滚
版本回滚支持回滚到历史版本:
flowchart TD
Start([版本回滚开始]) --> Step1[1. 选择目标版本<br/>指定要回滚的版本号]
Step1 --> Step2[2. 验证版本<br/>检查版本文件和Segment存在性]
Step2 --> Step3[3. 加载Version<br/>从磁盘读取版本信息]
Step3 --> Step4[4. 加载Segment列表<br/>读取Segment元数据]
Step4 --> Step5[5. 加载Locator<br/>读取位置信息]
Step5 --> Step6[6. 加载Schema映射<br/>读取Schema版本映射]
Step6 --> Step7[7. 更新TabletData<br/>切换到目标版本]
Step7 --> Step8[8. 设置Version引用<br/>设置当前版本]
Step8 --> Step9[9. 设置Segment列表<br/>恢复Segment结构]
Step9 --> Step10[10. 初始化查询器<br/>重建查询组件]
Step10 --> Success[回滚成功<br/>系统已恢复到目标版本]
Success --> End([回滚完成])
style Start fill:#e3f2fd,stroke:#1976d2,stroke-width:3px
style Step1 fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Step2 fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
style Step3 fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Step4 fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Step5 fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Step6 fill:#fff3e0,stroke:#f57c00,stroke-width:2px
style Step7 fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Step8 fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Step9 fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Step10 fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
style Success fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
style End fill:#c8e6c9,stroke:#2e7d32,stroke-width:3px
回滚流程:
- 选择目标版本:选择要回滚到的目标版本
- 验证版本:验证目标版本的有效性
- 加载版本:加载目标版本的 Version 和 Segment
- 更新 TabletData:更新 TabletData,恢复到目标版本
6. Schema 演进
6.1 Schema 演进机制
IndexLib 支持 Schema 演进,每个 Segment 可以有不同的 Schema:
Schema 演进:支持 Schema 变更,每个 Segment 记录自己的 SchemaId:
flowchart LR
subgraph Schema["Schema 演进机制"]
S1[Segment SchemaId<br/>每个Segment记录自己的SchemaId]
S2[Schema版本映射<br/>SchemaVersionRoadMap]
S3[兼容性检查<br/>Schema兼容性验证]
S1 --> S2
S2 --> S3
end
subgraph Version["版本演进"]
V1[Version 1<br/>SchemaId: 0]
V2[Version 2<br/>SchemaId: 0]
V3[Version 3<br/>SchemaId: 1]
V1 -->|Schema变更| V2
V2 -->|新Segment使用新Schema| V3
end
subgraph Segment["Segment Schema"]
SE1[Segment 1<br/>SchemaId: 0]
SE2[Segment 2<br/>SchemaId: 0]
SE3[Segment 3<br/>SchemaId: 1]
SE4[Segment 4<br/>SchemaId: 1]
end
subgraph Compatibility["兼容性保证"]
C1[向后兼容<br/>新Schema向后兼容旧Schema]
C2[渐进式迁移<br/>新Segment使用新Schema]
C3[旧Segment保持<br/>旧Segment保持原样]
C1 --> C2
C2 --> C3
end
V1 --> SE1
V2 --> SE2
V3 --> SE3
V3 --> SE4
S3 --> C1
style Schema fill:#e3f2fd
style Version fill:#fff3e0
style Segment fill:#f3e5f5
style Compatibility fill:#e8f5e9
Schema 演进机制:
- Segment SchemaId:每个 Segment 记录自己的
SchemaId - Schema 版本映射:Version 维护
SchemaVersionRoadMap,记录 Schema 版本映射 - 兼容性检查:Schema 变更时检查兼容性,保证数据一致性
6.2 Schema 变更流程
Schema 变更的流程:
Schema 变更流程:从 Schema 变更到版本提交:
flowchart TD
subgraph Check["检查兼容性"]
C1[检查新Schema<br/>Check New Schema]
C2[检查兼容性<br/>Check Compatibility]
C3{兼容性检查<br/>通过?}
C1 --> C2
C2 --> C3
end
subgraph Seal["Seal 当前Segment"]
S1[Seal当前Segment<br/>Seal Current Segment]
S2[停止接收新文档<br/>Stop Receiving Documents]
S3[等待转储完成<br/>Wait for Dump]
S1 --> S2
S2 --> S3
end
subgraph Create["创建新Segment"]
CR1[使用新Schema<br/>Use New Schema]
CR2[创建新MemSegment<br/>Create New MemSegment]
CR3[开始接收新文档<br/>Start Receiving Documents]
CR1 --> CR2
CR2 --> CR3
end
subgraph Commit["提交版本"]
CO1[更新SchemaId<br/>Update SchemaId]
CO2[更新SchemaVersionRoadMap<br/>Update RoadMap]
CO3[提交Version<br/>Commit Version]
CO1 --> CO2
CO2 --> CO3
end
C3 -->|通过| S1
C3 -->|失败| Error[Schema变更失败]
S3 --> CR1
CR3 --> CO1
CO3 --> Success[Schema变更成功]
style Check fill:#e3f2fd
style Seal fill:#fff3e0
style Create fill:#f3e5f5
style Commit fill:#e8f5e9
变更流程:
- 检查兼容性:检查新 Schema 与旧 Schema 的兼容性
- Seal 当前 Segment:Seal 当前构建中的 Segment
- 创建新 Segment:使用新 Schema 创建新的 Segment
- 提交版本:Commit 时更新 SchemaId 和 SchemaVersionRoadMap
7. 版本清理
7.1 版本清理机制
版本清理用于清理不再需要的旧版本文件:
版本清理:清理不再需要的旧版本文件:
flowchart TD
subgraph Identify["识别清理目标"]
I1[保留版本列表<br/>Keep Recent N Versions]
I2[识别旧版本<br/>Identify Old Versions]
I3[检查版本引用<br/>Check Version References]
I1 --> I2
I2 --> I3
end
subgraph CleanSegment["清理Segment"]
CS1[检查Segment引用<br/>Check Segment References]
CS2{Segment是否<br/>被引用?}
CS3[清理Segment文件<br/>Delete Segment Files]
CS4[清理索引文件<br/>Delete Index Files]
CS1 --> CS2
CS2 -->|否| CS3
CS3 --> CS4
end
subgraph CleanVersion["清理版本文件"]
CV1[清理版本文件<br/>Delete Version Files]
CV2[清理Fence目录<br/>Delete Fence Directories]
CV3[清理元数据<br/>Delete Metadata]
CV1 --> CV2
CV2 --> CV3
end
subgraph Result["清理结果"]
R1[释放存储空间<br/>Free Storage Space]
R2[保持系统稳定<br/>Maintain System Stability]
R1 --> R2
end
I3 --> CS1
CS2 -->|是| Skip[跳过清理]
CS4 --> CV1
CV3 --> R1
style Identify fill:#e3f2fd
style CleanSegment fill:#fff3e0
style CleanVersion fill:#f3e5f5
style Result fill:#e8f5e9
清理机制:
- 保留版本列表:保留指定数量的版本,清理其他版本
- 清理 Segment:清理不再被任何版本引用的 Segment
- 清理索引文件:清理不再使用的索引文件
7.2 版本清理策略
版本清理的策略:
版本清理策略:保留版本数量、清理时机等:
flowchart LR
subgraph Strategy["清理策略"]
S1[保留版本数<br/>Keep N Versions]
S2[清理时机<br/>Cleanup Timing]
S3[清理范围<br/>Cleanup Scope]
S1 --> S2
S2 --> S3
end
subgraph Keep["保留策略"]
K1[保留最近N个版本<br/>Keep Recent N Versions]
K2[保留活跃版本<br/>Keep Active Versions]
K3[保留重要版本<br/>Keep Important Versions]
end
subgraph Timing["清理时机"]
T1[Commit时清理<br/>Cleanup on Commit]
T2[定期清理<br/>Periodic Cleanup]
T3[手动清理<br/>Manual Cleanup]
end
subgraph Scope["清理范围"]
SC1[版本文件<br/>Version Files]
SC2[Segment文件<br/>Segment Files]
SC3[索引文件<br/>Index Files]
SC4[元数据文件<br/>Metadata Files]
end
S1 --> K1
S2 --> T1
S3 --> SC1
K1 --> T1
T1 --> SC1
SC1 --> SC2
SC2 --> SC3
SC3 --> SC4
style Strategy fill:#e3f2fd
style Keep fill:#fff3e0
style Timing fill:#f3e5f5
style Scope fill:#e8f5e9
清理策略:
- 保留版本数:保留最近 N 个版本,清理其他版本
- 清理时机:在 Commit 时或定期清理
- 清理范围:清理版本文件、Segment 文件、索引文件等
8. 增量更新的实际应用
8.1 实时写入场景
在实时写入场景中,增量更新的应用:
实时写入场景中的增量更新:通过 Locator 判断数据是否已处理:
flowchart TD
subgraph Receive["接收数据"]
R1[实时接收数据流<br/>Receive Data Stream]
R2[解析文档<br/>Parse Documents]
R3[提取Locator<br/>Extract Locator]
R1 --> R2
R2 --> R3
end
subgraph Check["检查Locator"]
C1[获取Version Locator<br/>Get Version Locator]
C2[IsFasterThan比较<br/>Compare Locators]
C3{数据是否<br/>已处理?}
C1 --> C2
C2 --> C3
end
subgraph Process["处理新数据"]
P1[处理新数据<br/>Process New Data]
P2[构建索引<br/>Build Index]
P3[更新Locator<br/>Update Locator]
P1 --> P2
P2 --> P3
end
subgraph Commit["提交版本"]
CO1[定期Commit<br/>Periodic Commit]
CO2[更新Version Locator<br/>Update Version Locator]
CO3[持久化版本<br/>Persist Version]
CO1 --> CO2
CO2 --> CO3
end
R3 --> C1
C3 -->|未处理| P1
C3 -->|已处理| Skip[跳过数据]
P3 --> CO1
style Receive fill:#e3f2fd
style Check fill:#fff3e0
style Process fill:#f3e5f5
style Commit fill:#e8f5e9
实时写入流程:
- 接收数据:实时接收数据流
- 检查 Locator:通过
IsFasterThan()判断数据是否已处理 - 处理新数据:只处理未处理的数据
- 更新 Locator:处理完成后更新 Locator
- 提交版本:定期 Commit,更新 Version 的 Locator
8.2 批量更新场景
在批量更新场景中,增量更新的应用:
批量更新场景中的增量更新:批量处理数据,避免重复处理:
flowchart TD
subgraph Read["读取数据源"]
RD1[批量读取数据<br/>Batch Read Data]
RD2[解析文档<br/>Parse Documents]
RD3[提取Locator<br/>Extract Locators]
RD1 --> RD2
RD2 --> RD3
end
subgraph Filter["过滤已处理数据"]
F1[获取Version Locator<br/>Get Version Locator]
F2[批量比较Locator<br/>Batch Compare Locators]
F3[过滤已处理数据<br/>Filter Processed Data]
F1 --> F2
F2 --> F3
end
subgraph Process["处理新数据"]
P1[批量处理新数据<br/>Batch Process New Data]
P2[批量构建索引<br/>Batch Build Index]
P3[更新Locator<br/>Update Locator]
P1 --> P2
P2 --> P3
end
subgraph Commit["提交版本"]
CO1[批量Commit<br/>Batch Commit]
CO2[更新Version Locator<br/>Update Version Locator]
CO3[持久化版本<br/>Persist Version]
CO1 --> CO2
CO2 --> CO3
end
RD3 --> F1
F3 --> P1
P3 --> CO1
style Read fill:#e3f2fd
style Filter fill:#fff3e0
style Process fill:#f3e5f5
style Commit fill:#e8f5e9
批量更新流程:
- 读取数据源:从数据源批量读取数据
- 检查 Locator:通过
IsFasterThan()判断哪些数据已处理 - 过滤已处理数据:过滤掉已处理的数据
- 处理新数据:只处理未处理的数据
- 更新 Locator:处理完成后更新 Locator
- 提交版本:批量处理完成后 Commit
9. 版本管理的关键设计
9.1 原子性保证
版本管理的原子性通过 Fence 机制保证:
版本管理的原子性:通过 Fence 机制保证版本提交的原子性:
flowchart LR
subgraph Fence["Fence 机制"]
F1[创建Fence目录<br/>Create Fence Directory]
F2[写入版本文件<br/>Write Version File]
F3[原子重命名<br/>Atomic Rename]
F1 --> F2
F2 --> F3
end
subgraph Atomicity["原子性保证"]
A1[要么全部成功<br/>All or Nothing]
A2[要么全部失败<br/>Rollback on Failure]
A3[避免部分写入<br/>Avoid Partial Write]
A1 --> A2
A2 --> A3
end
subgraph Error["错误处理"]
E1[提交失败<br/>Commit Failure]
E2[清理Fence目录<br/>Cleanup Fence]
E3[不影响已有版本<br/>No Impact on Existing]
E1 --> E2
E2 --> E3
end
subgraph Advantage["设计优势"]
AD1[原子性<br/>Atomicity]
AD2[性能高<br/>High Performance]
AD3[可靠性<br/>Reliability]
AD4[简单性<br/>Simplicity]
end
F3 --> A1
A3 --> E1
E3 --> AD1
style Fence fill:#e3f2fd
style Atomicity fill:#fff3e0
style Error fill:#f3e5f5
style Advantage fill:#e8f5e9
原子性保证:
- Fence 机制:通过 Fence 目录保证版本提交的原子性
- 原子切换:原子性地将 Fence 目录切换为正式版本目录
- 错误恢复:如果提交失败,可以清理 Fence 目录,不影响已有版本
9.2 数据一致性
版本管理保证数据一致性:
版本管理的数据一致性:通过 Locator 保证数据不重复、不丢失:
flowchart TD
subgraph Consistency["数据一致性保证"]
C1[不重复保证<br/>No Duplication]
C2[不丢失保证<br/>No Loss]
C3[多数据源支持<br/>Multi-Source Support]
end
subgraph Locator["Locator 机制"]
L1[Locator比较<br/>Locator Comparison]
L2[IsFasterThan<br/>判断数据是否已处理]
L3[Update机制<br/>保证只向前推进]
L1 --> L2
L2 --> L3
end
subgraph MultiSource["多数据源支持"]
M1[SourceIdx区分<br/>Distinguish by SourceIdx]
M2[独立处理<br/>Independent Processing]
M3[保证一致性<br/>Ensure Consistency]
M1 --> M2
M2 --> M3
end
subgraph Concurrent["并发控制"]
CO1[ConcurrentIdx<br/>处理时间戳相同]
CO2[HashId分片<br/>Sharding by HashId]
CO3[保证顺序<br/>Ensure Order]
CO1 --> CO2
CO2 --> CO3
end
C1 --> L1
C2 --> L3
C3 --> M1
L3 --> CO1
style Consistency fill:#e3f2fd
style Locator fill:#fff3e0
style MultiSource fill:#f3e5f5
style Concurrent fill:#e8f5e9
数据一致性保证:
- Locator 比较:通过 Locator 比较判断数据是否已处理
- 多数据源支持:支持多数据源场景,通过
sourceIdx区分数据源 - 并发控制:通过
concurrentIdx处理时间戳相同的情况
9.3 性能优化
版本管理的性能优化:
版本管理的性能优化:版本缓存、懒加载等:
flowchart LR
subgraph Cache["版本缓存"]
CA1[缓存常用版本<br/>Cache Common Versions]
CA2[LRU淘汰策略<br/>LRU Eviction]
CA3[减少磁盘读取<br/>Reduce Disk Reads]
CA1 --> CA2
CA2 --> CA3
end
subgraph Lazy["懒加载"]
L1[按需加载版本<br/>Load on Demand]
L2[减少启动时间<br/>Reduce Startup Time]
L3[并行加载<br/>Parallel Loading]
L1 --> L2
L2 --> L3
end
subgraph Batch["批量操作"]
B1[批量处理版本<br/>Batch Process Versions]
B2[批量清理<br/>Batch Cleanup]
B3[提高效率<br/>Improve Efficiency]
B1 --> B2
B2 --> B3
end
subgraph Optimize["优化策略"]
O1[快速路径<br/>Fast Path]
O2[短路优化<br/>Short Circuit]
O3[缓存优化<br/>Cache Optimization]
O1 --> O2
O2 --> O3
end
CA3 --> L1
L3 --> B1
B3 --> O1
style Cache fill:#e3f2fd
style Lazy fill:#fff3e0
style Batch fill:#f3e5f5
style Optimize fill:#e8f5e9
性能优化策略:
- 版本缓存:缓存常用版本,减少磁盘读取
- 懒加载:按需加载版本信息,减少启动时间
- 批量操作:批量处理版本操作,提高效率
10. 性能优化与最佳实践
10.1 版本管理性能优化
优化策略:
- 版本缓存优化:
- 缓存策略:缓存常用版本,减少磁盘读取
- 缓存大小:根据内存情况调整缓存大小
- 缓存淘汰:使用 LRU 等策略淘汰不常用的版本
- 版本加载优化:
- 懒加载:按需加载版本信息,减少启动时间
- 并行加载:多个版本可以并行加载,提高加载速度
- 预加载:预加载常用版本,减少查询延迟
- 版本清理优化:
- 延迟清理:延迟清理旧版本,避免影响查询
- 批量清理:批量清理旧版本,减少 IO 开销
- 清理策略:根据版本使用情况选择清理策略
10.2 Locator 性能优化
优化策略:
- 比较优化:
- 快速路径:数据源不同时直接返回,避免遍历
- 短路优化:部分 hashId 不满足时立即返回
- 缓存优化:缓存比较结果,避免重复计算
- 序列化优化:
- 压缩序列化:使用压缩算法减少序列化大小
- 增量序列化:只序列化变更部分,减少序列化开销
- 批量序列化:批量序列化多个 Locator,提高效率
- 更新优化:
- 批量更新:批量更新多个 hashId 的进度
- 增量更新:只更新变更的进度,减少更新开销
- 异步更新:异步更新 Locator,不阻塞主流程
10.3 增量更新性能优化
优化策略:
- 数据过滤优化:
- 批量过滤:批量过滤已处理数据,减少比较次数
- 索引优化:使用索引加速数据过滤
- 并行过滤:多个数据源可以并行过滤
- 处理优化:
- 批量处理:批量处理新数据,提高处理效率
- 并行处理:多个 hashId 可以并行处理
- 流式处理:边读取边处理,减少内存占用
- Locator 更新优化:
- 延迟更新:延迟更新 Locator,减少更新频率
- 批量更新:批量更新多个 hashId 的进度
- 异步更新:异步更新 Locator,不阻塞数据处理
11. 小结
版本管理和增量更新是 IndexLib 的核心功能,通过 Version 和 Locator 两个机制实现。通过本文的深入解析,我们了解到:
核心机制:
- Version:版本信息,记录索引包含哪些 Segment,支持版本演进和 Schema 演进
- 版本控制:版本号单调递增,支持版本回滚
- Schema 演进:每个 Segment 记录自己的 SchemaId,支持 Schema 变更
- 持久化:通过 Fence 机制保证版本提交的原子性
- Locator:位置信息,记录数据处理位置,用于增量更新和数据一致性保证
- 多维度定位:通过 timestamp、concurrentIdx、hashId、sourceIdx 等多维度定位
- 比较算法:通过
IsFasterThan()判断数据是否已处理 - 更新机制:保证 Locator 只向前推进,不会回退
- 版本演进:每次 Commit 都会创建新版本,版本号递增,支持版本回滚
- 版本号递增:版本号严格单调递增,保证版本顺序
- 版本历史:保留版本历史,支持查看和回滚
- 版本清理:定期清理旧版本,释放存储空间
- 增量更新:通过 Locator 判断哪些数据已处理,避免重复处理,支持实时写入和批量更新
- 数据过滤:通过 Locator 比较过滤已处理数据
- 进度追踪:记录每个 HashId 的处理进度,支持分片处理
- 多数据源支持:支持多数据源场景,保证数据一致性
- Schema 演进:支持 Schema 变更,每个 Segment 记录自己的 SchemaId
- 向后兼容:新 Schema 向后兼容旧 Schema
- 渐进式迁移:新 Segment 使用新 Schema,旧 Segment 保持原样
- 版本映射:通过 SchemaVersionRoadMap 记录 Schema 版本映射
- 原子性保证:通过 Fence 机制保证版本提交的原子性
- Fence 目录:创建临时目录,写入版本文件
- 原子切换:原子性地将 Fence 目录切换为正式版本目录
- 错误恢复:提交失败时清理 Fence 目录,不影响已有版本
- 数据一致性:通过 Locator 保证数据不重复、不丢失,支持多数据源场景
- 不重复保证:通过 Locator 比较保证数据不重复处理
- 不丢失保证:通过 Locator 更新保证数据不丢失
- 多数据源支持:通过
sourceIdx区分数据源,支持多数据源场景
设计亮点:
- Fence 机制:通过原子重命名保证版本提交的原子性,实现简单、性能高
- Locator 比较算法:多维度比较算法,支持精确的数据定位和增量更新
- Schema 演进:支持 Schema 变更,每个 Segment 记录自己的 SchemaId,实现渐进式迁移
- 版本清理:定期清理旧版本,释放存储空间,保证系统稳定性
- 性能优化:通过缓存、懒加载、批量操作等机制优化性能
性能优化:
- 版本提交:Fence 机制保证原子性,提交延迟较低
- Locator 比较:快速路径和短路优化,显著提升比较性能
- 增量更新:通过 Locator 过滤,有效减少处理量
- 版本加载:懒加载和并行加载,有效减少启动时间
理解版本管理和增量更新,是掌握 IndexLib 数据管理机制的关键。在下一篇文章中,我们将深入介绍 Segment 合并策略的实现细节,包括合并策略的选择、合并计划的创建、合并执行的流程等各个组件的实现原理和性能优化策略。