TiDB 是存储和计算分离的设计,当 TiDB 的物理计划优化完成后,就需要将真正的取数请求发给 TiKV。而由于数据是分布在多个 TiKV 节点的,因此需要有一个框架来统筹计算,汇总结果,并将结果返回给 TiDB Server。这就是我们这篇文章要看的 coprocessor 模块。
一、背景与概念
1.1 什么是 Coprocessor?
在传统的数据库架构中,计算和存储通常是耦合在一起的。而在 TiDB 这样的分布式数据库中,存储层(TiKV)和计算层(TiDB Server)是分离的。 Coprocessor 就是实现计算下推的关键组件。
简单来说,Coprocessor 允许 TiDB 将部分计算逻辑(如过滤、聚合等)下推到 TiKV 节点执行,而不是将所有数据都拉取到 TiDB Server 再处理。这样做的好处是:
- 减少网络传输:只传输过滤后的结果数据
- 并行计算:多个 TiKV 节点可以并行处理各自的数据
- 提高性能:利用 TiKV 的本地计算能力
1.2 一个例子
SELECT name, age FROM users WHERE age > 18;
在这个查询中:
1. TiDB 会构建一个 Coprocessor 请求,包含过滤条件 age > 18
2. 将请求发送到存储对应数据的多个 TiKV 节点
3. 每个 TiKV 节点在本地过滤数据,只返回符合条件的结果
4. TiDB 收集并合并各节点的结果
二、架构概览
TiDB Coprocessor 的整体架构可以分为以下几层:
┌─────────────────────────────────────────────┐
│ TiDB Server (SQL Layer) │
│ ┌────────────────────────────────────┐ │
│ │ CopClient (发起请求) │ │
│ └────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────┐
│ copIterator (任务调度器) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Worker 1 │ │ Worker 2 │ │ Worker N │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────┘
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│ TiKV 1 │ │ TiKV 2 │ │ TiKV N │
│ (Region) │ │ (Region) │ │ (Region) │
└──────────┘ └──────────┘ └──────────┘
三、核心数据结构
3.1 Store 和 CopClient
Store 是 Coprocessor 模块的入口,封装了底层的 TiKV 客户端:
type Store struct {
*kvStore
coprCache *coprCache // coprocessor 缓存
replicaReadSeed uint32 // 副本读随机种子
numcpu int // CPU 核心数
}
CopClient 是真正发起 Coprocessor 请求的客户端:
type CopClient struct {
kv.RequestTypeSupportedChecker
store *Store
replicaReadSeed uint32
}
它的核心方法是 Send(),负责构建请求并返回一个可迭代的响应。
3.2 copTask - 任务单元
copTask 代表一个需要发送到单个 Region 的 Coprocessor 任务:
type copTask struct {
taskID uint64 // 任务 ID
region tikv.RegionVerID // 目标 Region 信息
bucketsVer uint64 // Bucket 版本
ranges *KeyRanges // 要扫描的 Key 范围
respChan chan *copResponse // 响应通道(用于 KeepOrder)
storeAddr string // 目标 Store 地址
cmdType tikvrpc.CmdType // 命令类型
storeType kv.StoreType // Store 类型(TiKV/TiFlash)
// 分页相关
paging bool
pagingSize uint64
// 批量任务
batchTaskList map[uint64]*batchedCopTask
// 其他元数据...
}
关键点:
- 一个 SQL 查询会被拆分成多个 copTask,每个对应一个 Region
- 每个 copTask 包含了该 Region 需要扫描的 Key 范围
3.3 copIterator - 任务调度器
copIterator 是整个 Coprocessor 执行的核心,负责:
- 任务分发
- 并发控制
- 结果收集
type copIterator struct {
store *Store
req *kv.Request // 原始请求
concurrency int // 并发度
smallTaskConcurrency int // 小任务额外并发度
// 任务相关
tasks []*copTask
curr int // 当前处理到的任务索引
// 通道相关
respChan chan *copResponse // 响应通道(无序)
finishCh chan struct{} // 结束信号
// 并发控制
sendRate *util.RateLimit // 发送速率控制
wg sync.WaitGroup // 等待所有 worker 完成
// 内存管理
memTracker *memory.Tracker
actionOnExceed *rateLimitAction // OOM 时的限流动作
// 其他...
}
3.4 copIteratorWorker - 任务执行器
每个 worker 从任务通道取任务,发送到 TiKV 并处理响应:
type copIteratorWorker struct {
taskCh <-chan *copTask // 任务通道
wg *sync.WaitGroup
store *Store
req *kv.Request
respChan chan<- *copResponse // 结果通道
finishCh <-chan struct{}
vars *tikv.Variables
kvclient *txnsnapshot.ClientHelper // 底层 KV 客户端
memTracker *memory.Tracker
// 统计信息
replicaReadSeed uint32
storeBatchedNum *atomic.Uint64
storeBatchedFallbackNum *atomic.Uint64
}
3.5 KeyRanges - Key 范围管理
KeyRanges 是一个优化的数据结构,用于高效管理 Key 范围:
type KeyRanges struct {
first *kv.KeyRange // 头部额外范围
mid []kv.KeyRange // 主体范围切片
last *kv.KeyRange // 尾部额外范围
}
设计亮点:
- 通过
first和last指针避免在头尾添加元素时重新分配大切片 - 提供
Split()方法支持按 Key 切分范围
比如:场景:将 [a→z) 切分成 [a→m) 和 [m→z)
传统方案:
原始: [a→c) [c→f) [f→m) [m→s) [s→z)
↓ Split at 'm'
左边: [a→c) [c→f) [f→m) ← 需要复制 3 个 KeyRange
右边: [m→s) [s→z) ← 需要复制 2 个 KeyRange
KeyRanges 方案:
原始: first=nil, mid=[a→c)[c→f)[f→z)], last=nil
↓ Split at 'm'
左边: first=nil, mid=[a→c)[c→f)], last=&[f→m) ← mid 共享底层数组!
右边: first=&[m→z)], mid=[], last=nil ← 几乎零拷贝!
四、请求执行流程
4.1 整体流程图
CopClient.Send()
↓
BuildCopIterator()
↓
buildCopTasks() ←─────┐
↓ │
copIterator.open() │ (Region 错误时重建)
↓ │
启动 Workers │
↓ │
copIteratorTaskSender │
↓ │
worker.handleTask() ────┘
↓
copIterator.Next()
4.2 步骤一:构建 CopIterator
入口在 CopClient.Send() 方法:
func (c *CopClient) Send(ctx context.Context, req *kv.Request,
variables any, option *kv.ClientSendOption) kv.Response {
// 1. 构建 copIterator
it, errRes := c.BuildCopIterator(ctx, req, vars, option)
if errRes != nil {
return errRes
}
// 2. 启动 workers
it.open(ctx, option.TryCopLiteWorker)
return it
}
BuildCopIterator() 做了以下几件事:
func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request,
vars *tikv.Variables, option *kv.ClientSendOption) (*copIterator, kv.Response) {
// 1. 创建 Backoffer (用于重试)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
// 2. 构建 copTask
tasks, err := buildCopTasks(bo, ranges, buildOpt)
// 3. 创建 copIterator
it := &copIterator{
store: c.store,
req: req,
concurrency: req.Concurrency,
tasks: tasks,
// ... 其他初始化
}
// 4. 动态调整并发度
if it.concurrency > len(tasks) {
it.concurrency = len(tasks)
}
// 5. 创建响应通道
if it.req.KeepOrder {
it.sendRate = util.NewRateLimit(2 * it.concurrency)
it.respChan = nil // KeepOrder 模式使用 task.respChan
} else {
it.respChan = make(chan *copResponse)
it.sendRate = util.NewRateLimit(it.concurrency)
}
return it, nil
}
4.3 步骤二:构建 copTask
buildCopTasks() 将 Key 范围拆分成多个任务:
func buildCopTasks(bo *Backoffer, ranges *KeyRanges,
opt *buildCopTaskOpt) ([]*copTask, error) {
// 1. 通过 Region Cache 将 Key 范围按 Region 和 Bucket 切分
locs, err := cache.SplitKeyRangesByBuckets(bo, ranges)
// 2. 为每个 location 创建 copTask
for _, loc := range locs {
for i := 0; i < rLen; {
// 限制单个 task 的 range 数量(默认 25000)
nextI := min(i+rangesPerTaskLimit, rLen)
task := &copTask{
region: loc.Location.Region,
ranges: loc.Ranges.Slice(i, nextI),
cmdType: tikvrpc.CmdCop,
storeType: req.StoreType,
paging: req.Paging.Enable,
pagingSize: req.Paging.MinPagingSize,
// ...
}
// KeepOrder 模式需要为每个 task 创建响应通道
if req.KeepOrder {
task.respChan = make(chan *copResponse, 2)
}
tasks = append(tasks, task)
}
}
return tasks, nil
}
关键机制:
- Region 定位:通过 SplitKeyRangesByBuckets() 获取每个 Key 范围对应的 Region
- 范围限制:每个 task 最多包含 25000 个范围,避免请求过大
- Paging 支持:如果启用分页,会设置初始的 pagingSize
4.4 步骤三:启动 Workers
copIterator.open() 启动并发的 worker 协程:
func (it *copIterator) open(ctx context.Context, tryCopLiteWorker *atomic2.Uint32) {
// 特殊优化:只有一个任务时使用轻量级 worker(避免启动 goroutine)
if len(it.tasks) == 1 && tryCopLiteWorker != nil &&
tryCopLiteWorker.CompareAndSwap(0, 1) {
it.liteWorker = &liteCopIteratorWorker{
ctx: ctx,
worker: newCopIteratorWorker(it, nil),
tryCopLiteWorker: tryCopLiteWorker,
}
return
}
// 创建任务通道
taskCh := make(chan *copTask, 1)
it.wg.Add(it.concurrency + it.smallTaskConcurrency)
// 如果有小任务,创建额外的小任务通道
var smallTaskCh chan *copTask
if it.smallTaskConcurrency > 0 {
smallTaskCh = make(chan *copTask, 1)
}
// 启动 worker goroutines
for i := range it.concurrency + it.smallTaskConcurrency {
ch := taskCh
if i >= it.concurrency && smallTaskCh != nil {
ch = smallTaskCh
}
worker := newCopIteratorWorker(it, ch)
go worker.run(ctx)
}
// 启动任务分发器
taskSender := &copIteratorTaskSender{
taskCh: taskCh,
smallTaskCh: smallTaskCh,
wg: &it.wg,
tasks: it.tasks,
finishCh: it.finishCh,
sendRate: it.sendRate,
respChan: it.respChan,
}
go taskSender.run(it.req.ConnID, it.req.RunawayChecker)
}
并发控制亮点: - 动态并发:根据任务数量和任务大小动态调整 worker 数量 - 小任务优化:为小任务(行数少)分配额外的并发度 - Lite Worker:单任务时避免 goroutine 开销
4.5 步骤四:Worker 处理任务
Worker 的核心逻辑在 handleTask() 方法:
func (worker *copIteratorWorker) handleTask(ctx context.Context,
task *copTask, respCh chan<- *copResponse) {
remainTasks := []*copTask{task}
backoffermap := make(map[uint64]*Backoffer)
// 循环处理任务(可能因为错误产生新的子任务)
for len(remainTasks) > 0 {
curTask := remainTasks[0]
// 为每个 Region 独立使用 Backoffer
bo := chooseBackoffer(ctx, backoffermap, curTask, worker)
// 处理单次任务
result, err := worker.handleTaskOnce(bo, curTask)
if err != nil {
// 发送错误响应
resp := &copResponse{err: errors.Trace(err)}
worker.sendToRespCh(resp, respCh)
return
}
// 发送成功响应
if result != nil {
if result.resp != nil {
worker.sendToRespCh(result.resp, respCh)
}
for _, resp := range result.batchRespList {
worker.sendToRespCh(resp, respCh)
}
}
// 处理剩余任务(Region 错误或锁错误会产生)
if result != nil && len(result.remains) > 0 {
remainTasks = append(result.remains, remainTasks[1:]...)
} else {
remainTasks = remainTasks[1:]
}
}
}
handleTaskOnce() 执行实际的 RPC 调用:
func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer,
task *copTask) (*copTaskResult, error) {
// 1. 构建 Coprocessor 请求
copReq := coprocessor.Request{
Tp: worker.req.Tp,
StartTs: worker.req.StartTs,
Data: worker.req.Data,
Ranges: task.ranges.ToPBRanges(),
PagingSize: task.pagingSize,
// ...
}
// 2. 构建 TiKV RPC 请求
req := tikvrpc.NewReplicaReadRequest(task.cmdType, &copReq,
replicaReadType, &worker.replicaReadSeed, context)
// 3. 发送请求到 TiKV
resp, rpcCtx, storeAddr, err := worker.kvclient.SendReqCtx(
bo.TiKVBackoffer(), req, task.region, timeout,
getEndPointType(task.storeType), task.storeAddr)
if err != nil {
return nil, errors.Trace(err)
}
copResp := resp.Resp.(*coprocessor.Response)
// 4. 处理响应
if worker.req.Paging.Enable {
return worker.handleCopPagingResult(bo, rpcCtx,
&copResponse{pbResp: copResp}, task, costTime)
} else {
return worker.handleCopResponse(bo, rpcCtx,
&copResponse{pbResp: copResp}, task, costTime)
}
}
4.6 步骤五:处理响应
handleCopResponse() 处理各种错误情况:
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer,
rpcCtx *tikv.RPCContext, resp *copResponse, task *copTask) (*copTaskResult, error) {
// 1. 处理 Region 错误(Region 分裂、合并、迁移等)
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
// Backoff 后重建任务
if err := bo.Backoff(tikv.BoRegionMiss(), errors.New(errStr)); err != nil {
return nil, errors.Trace(err)
}
// 重新构建 copTask
remains, err := buildCopTasks(bo, task.ranges, buildOpt)
if err != nil {
return nil, err
}
return &copTaskResult{remains: remains}, nil
}
// 2. 处理锁错误
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
if err := worker.handleLockErr(bo, lockErr, task); err != nil {
return nil, err
}
task.meetLockFallback = true
return &copTaskResult{remains: []*copTask{task}}, nil
}
// 3. 处理其他错误
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
return nil, errors.Trace(err)
}
// 4. 正常响应:设置 startKey,收集执行信息
resp.startKey = task.ranges.At(0).StartKey
if err := worker.handleCollectExecutionInfo(bo, rpcCtx, resp); err != nil {
return nil, err
}
// 5. 检查内存使用
worker.checkRespOOM(resp)
return &copTaskResult{resp: resp}, nil
}
4.7 步骤六:获取结果
上层调用 copIterator.Next() 逐个获取结果:
func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
var resp *copResponse
// 1. Lite Worker 路径(单任务优化)
if it.liteWorker != nil {
resp = it.liteWorker.liteSendReq(ctx, it)
// ...
}
// 2. 无序模式:从共享通道获取
else if it.respChan != nil {
resp, ok, closed = it.recvFromRespCh(ctx, it.respChan)
if !ok || closed {
return nil, errors.Trace(ctx.Err())
}
// finCopResp 是结束标记,递归获取下一个
if resp == finCopResp {
it.sendRate.PutToken() // 归还令牌
return it.Next(ctx)
}
}
// 3. 有序模式:按任务顺序从各自通道获取
else {
for {
if it.curr >= len(it.tasks) {
return nil, nil // 所有任务完成
}
task := it.tasks[it.curr]
resp, ok, closed = it.recvFromRespCh(ctx, task.respChan)
if closed {
return nil, errors.Trace(ctx.Err())
}
if ok {
break
}
// 当前任务完成,移到下一个
it.sendRate.PutToken()
it.tasks[it.curr] = nil
it.curr++
}
}
if resp.err != nil {
return nil, errors.Trace(resp.err)
}
return resp, nil
}
KeepOrder vs 无序模式:
- KeepOrder:每个 task 有独立的 respChan,按顺序读取
- 无序:所有 task 共享一个 respChan,谁先到谁先处理
五、关键机制
5.1 并发控制
TiDB Coprocessor 的并发控制非常精细:
1. 基础并发度
it.concurrency = req.Concurrency
if it.concurrency > len(tasks) {
it.concurrency = len(tasks)
}
2. 小任务额外并发
对于行数很少的”小任务”,额外分配并发度以提高吞吐:
func smallTaskConcurrency(tasks []*copTask, numcpu int) (int, int) {
res := 0
for _, task := range tasks {
if isSmallTask(task) { // RowCountHint <= 32
res++
}
}
if res == 0 {
return 0, 0
}
// 使用公式计算额外并发度
extraConc := int(float64(res) / (1 + 0.5*math.Sqrt(2*math.Log(float64(res)))))
// 限制不超过 smallConcPerCore * numcpu
smallTaskConcurrencyLimit := 20 * numcpu
if extraConc > smallTaskConcurrencyLimit {
extraConc = smallTaskConcurrencyLimit
}
return res, extraConc
}
3. 流量控制(Rate Limit)
使用令牌机制控制在途任务数量:
func (sender *copIteratorTaskSender) run(connID uint64,
checker resourcegroup.RunawayChecker) {
for _, t := range sender.tasks {
// 获取令牌(阻塞直到有可用令牌)
exit := sender.sendRate.GetToken(sender.finishCh)
if exit {
break
}
// 发送任务
exit = sender.sendToTaskCh(t, taskCh)
if exit {
break
}
}
close(sender.taskCh)
sender.wg.Wait()
if sender.respChan != nil {
close(sender.respChan)
}
}
令牌容量:
- KeepOrder:2 * concurrency(允许更多在途任务)
- 无序:concurrency
5.2 错误处理与重试
1. Region 错误
Region 错误是分布式系统中常见的情况(分裂、合并、迁移等):
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
// 1. Backoff 等待
if err := bo.Backoff(tikv.BoRegionMiss(), errors.New(errStr)); err != nil {
return nil, errors.Trace(err)
}
// 2. 重新构建任务(会查询最新的 Region 信息)
remains, err := buildCopTasks(bo, task.ranges, buildOpt)
if err != nil {
return nil, err
}
// 3. 返回新任务继续执行
return &copTaskResult{remains: remains}, nil
}
2. 锁错误
遇到未提交的事务锁时,需要解锁后重试:
func (worker *copIteratorWorker) handleLockErr(bo *Backoffer,
lockErr *kvrpcpb.LockInfo, task *copTask) error {
if lockErr == nil {
return nil
}
// 记录锁信息
resolveLockDetail := worker.getLockResolverDetails()
// 尝试解锁
resolveLocksOpts := txnlock.ResolveLocksOptions{
CallerStartTS: worker.req.StartTs,
Locks: []*txnlock.Lock{txnlock.NewLock(lockErr)},
Detail: resolveLockDetail,
}
resolveLocksRes, err := worker.kvclient.ResolveLocksWithOpts(
bo.TiKVBackoffer(), resolveLocksOpts)
if err != nil {
return errors.Trace(err)
}
// 如果锁还未过期,等待一段时间
msBeforeExpired := resolveLocksRes.TTL
if msBeforeExpired > 0 {
if err := bo.BackoffWithMaxSleepTxnLockFast(
int(msBeforeExpired), errors.New(lockErr.String())); err != nil {
return errors.Trace(err)
}
}
return nil
}
3. Backoffer 机制
每个 Region 使用独立的 Backoffer,避免一个 Region 的问题影响其他:
func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer,
task *copTask, worker *copIteratorWorker) *Backoffer {
bo, ok := backoffermap[task.region.GetID()]
if ok {
return bo
}
// 为新 Region 创建独立的 Backoffer
boMaxSleep := CopNextMaxBackoff // 20000ms
newbo := backoff.NewBackofferWithVars(ctx, boMaxSleep, worker.vars)
backoffermap[task.region.GetID()] = newbo
return newbo
}
5.3 Paging 分页请求
为了避免单次请求返回数据过多,TiDB 支持分页协议:
func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer,
rpcCtx *tikv.RPCContext, resp *copResponse, task *copTask) (*copTaskResult, error) {
// 1. 先处理响应
result, err := worker.handleCopResponse(bo, rpcCtx, resp, task, costTime)
if err != nil {
return nil, errors.Trace(err)
}
// 2. 检查是否有剩余数据
pagingRange := resp.pbResp.Range
if pagingRange == nil {
// TiKV 不支持分页或已返回全部数据
return result, nil
}
// 3. 计算剩余范围
task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc)
if task.ranges.Len() == 0 {
return result, nil
}
// 4. 增长分页大小(指数增长)
task.pagingSize = paging.GrowPagingSize(task.pagingSize,
worker.req.Paging.MaxPagingSize)
// 5. 将剩余任务加入待处理列表
result.remains = []*copTask{task}
return result, nil
}
分页大小动态增长:
- 初始:MinPagingSize(如 128 行)
- 每次翻倍增长
- 最大:MaxPagingSize(如 8192 行)
5.4 Store Batch 批量请求
对于小任务,可以将多个 Region 的请求批量发送到同一个 Store:
type batchStoreTaskBuilder struct {
bo *Backoffer
req *kv.Request
cache *RegionCache
taskID uint64
limit int // 每批最多任务数
store2Idx map[storeReplicaKey]int // Store -> Task 索引
tasks []*copTask
replicaRead kv.ReplicaReadType
}
func (b *batchStoreTaskBuilder) handle(task *copTask) error {
b.taskID++
task.taskID = b.taskID
// 只批量小任务
if b.limit <= 0 || !isSmallTask(task) {
b.tasks = append(b.tasks, task)
return nil
}
// 构建批量任务
batchedTask, err := b.cache.BuildBatchTask(b.bo, b.req, task, b.replicaRead)
if err != nil {
return err
}
key := storeReplicaKey{
storeID: batchedTask.storeID,
replicaRead: batchedTask.loadBasedReplicaRetry,
}
// 查找或创建 Store 的批量任务
if idx, ok := b.store2Idx[key]; !ok || len(b.tasks[idx].batchTaskList) >= b.limit {
// 新建批量任务
b.tasks = append(b.tasks, batchedTask.task)
b.store2Idx[key] = len(b.tasks) - 1
} else {
// 添加到现有批量任务
if b.tasks[idx].batchTaskList == nil {
b.tasks[idx].batchTaskList = make(map[uint64]*batchedCopTask, b.limit)
}
b.tasks[idx].batchTaskList[task.taskID] = batchedTask
}
return nil
}
六、举个例子
SELECT name FROM users ORDER BY id DESC LIMIT 20 的执行过程:
假设数据分布在 3 个 Region:
Region 1: id [1, 30000)
Region 2: id [30000, 60000)
Region 3: id [60000, 100001) ← 包含最大的 ID
因为是 DESC(降序),需要从大到小返回:
1️⃣ 构建 3 个 copTask
┌────────┐ ┌────────┐ ┌────────┐
│Region1 │ │Region2 │ │Region3 │
└────────┘ └────────┘ └────────┘
2️⃣ Desc=true → 反转任务顺序!
┌────────┐ ┌────────┐ ┌────────┐
│Region3 │ │Region2 │ │Region1 │
└────────┘ └────────┘ └────────┘
↑ 先处理(最大ID)
3️⃣ KeepOrder=true → 每个 task 独立通道
┌────────┐ ┌──────────┐
│Region3 │─────→│ respCh 3 │─┐
└────────┘ └──────────┘ │
┌────────┐ ┌──────────┐ │
│Region2 │─────→│ respCh 2 │─┼→ Next() 按顺序读
└────────┘ └──────────┘ │
┌────────┐ ┌──────────┐ │
│Region1 │─────→│ respCh 1 │─┘
└────────┘ └──────────┘
4️⃣ 执行流程
Worker → 处理 Region3
↓
TiKV3 → 倒序扫描 (id=100000→99999...)
↓
返回 128 行 (paging)
↓
TiDB → 取前 20 行
↓
Close() → 停止!Region2/Region1 不再处理
七、总结
这篇文章中,我们讲了 TiDB 的 Coprocessor 的实现,大概了解了一下 TiDB 与 TiKV 之间是如何交互的,最后以一个实际例子来看 了一下请求的过程。