简介
Presto 是一款由 Facebook 开发的通用计算引擎。它由 Coordinator 和 Worker 两部分组成。Worker 是执行 task 的节点,而 task 会在执行过程中被拆分为 pipeline,并在 Driver 中执行。Driver 默认的执行逻辑是采用时间片轮转的方式,即每当时间片消耗完毕,任务会重新进入 pending 队列等待下次调度。这种设计使得 Presto 能够高效地处理各种类型的计算任务,并且具有良好的可扩展性和容错性
本文着重探讨了 Presto 中 pending 队列的逻辑。Presto 的 pending 队列有两个实现:PriorityBlockingQueue 和 MultilevelSplitQueue。PriorityBlockingQueue 是 JDK 内置的队列,前期实现主要是修改了其 compare 逻辑。而 MultilevelSplitQueue 是 Presto 自行实现的队列逻辑,是目前正在使用的队列。
Presto 目前社区分裂为两个 presto 和 trino,本文使用的是 prestodb 的代码。
PriorityBlockingQueue
参考代码分支:0.165
同一个优先队列存放pending的任务,通过优先级和执行时间(时间越长优先级越低)两个来决定执行哪个任务。
整体思路是:
- 执行时间越长的查询就是慢查询,会调低其优先级,使得小查询可以更早获得调度执行
- 优先级分为 0 ~ 4,一共5个级别,最低是4,这里限制了最低是为了防止大查询一直降低优先级导致饿死
- PriorityBlockingQueue pendingSplits 优先队列,compareTo方法:比较优先级,优先级数字越小,优先级越高
- 优先级 < 4:执行使用的时间越短优先级越高
- 优先级 = 4:根据上次执行的时间,上次执行的时间越早优先级越高,FIFO
整体的思路:使用了优先级,执行的时间越长,优先级会降低,来保证小查询得到更早的执行
- PrioritizedSplitRunner 包含了一个 priorityLevel 的参数,用来标记优先级
@Override public int compareTo(PrioritizedSplitRunner o) { int level = priorityLevel.get(); // 比较优先级大小 int result = Integer.compare(level, o.priorityLevel.get()); if (result != 0) { return result; } if (level < 4) { // 执行时间短的会优先执行 result = Long.compare(taskScheduledNanos.get(), o.taskScheduledNanos.get()); } else { // 最后一次执行ticker比较,保证 优先级为4的可以相对公平调度 result = Long.compare(lastRun.get(), o.lastRun.get()); } if (result != 0) { return result; } // workerId 是递增的,就是FIFO的顺序 return Long.compare(workerId, o.workerId); }
- 每次 process 会统计执行消耗的时间,用这个时间来更新 priorityLevel
priorityLevel.set(calculatePriorityLevel(taskScheduledTimeNanos));
- alculatePriorityLevel 更新 priorityLevel,分为5个档位:0 ~ 4,根据执行的时间来进行划分
public static int calculatePriorityLevel(long threadUsageNanos) { long millis = NANOSECONDS.toMillis(threadUsageNanos); int priorityLevel; if (millis < 1000) { priorityLevel = 0; } else if (millis < 10_000) { priorityLevel = 1; } else if (millis < 60_000) { priorityLevel = 2; } else if (millis < 300_000) { priorityLevel = 3; } else { priorityLevel = 4; } return priorityLevel; }
MultilevelSplitQueue
MultilevelSplitQueue 是当前版本 presto 的执行队列实现。
整体的调度策略是基于执行时间的公平调度(fair),不可以自行设置查询的优先级,查询的优先级是执行的过程中根据执行的时间动态调度,且每个优先级都有独立的优先队列。选择任务的时候,先选择优先级,再从对应的优先队列中拿取任务来执行。每个优先级预期可以执行的时间是通过level 0优先级的时间 和 levelTimeMultiplier 系数来一起决定的(优先级越低,执行时间越短)。
MultilevelSplitQueue waitingSplits 多级优先队列,优先级有0~4共5个优先级,每个优先级对应一个优先队列。
- waitingSplits.take() 获取任务
- waitingSplits.offer(split) 将未完成的split添加回队列
任务执行逻辑
TaskRunner 是执行的逻辑,其中最关键的就是 run 方法,关键部分增加了注释。
@Override
public void run()
{
try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
while (!closed && !Thread.currentThread().isInterrupted()) {
// select next worker
final PrioritizedSplitRunner split;
try {
split = waitingSplits.take();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
try (SetThreadName splitName = new SetThreadName(threadId)) {
RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread(), split);
runningSplitInfos.add(splitInfo);
runningSplits.add(split);
ListenableFuture<?> blocked;
try {
blocked = split.process();
}
finally {
runningSplitInfos.remove(splitInfo);
runningSplits.remove(split);
}
if (split.isFinished()) {
// 任务结束
log.debug("%s is finished", split.getInfo());
splitFinished(split);
}
else {
if (blocked.isDone()) {
// 没有 blocked 的直接添加回 waiting 队列
waitingSplits.offer(split);
}
else {
// 等待 blocked 结束再添加回 waiting 队列
blockedSplits.put(split, blocked);
blocked.addListener(() -> {
blockedSplits.remove(split);
// reset the level priority to prevent previously-blocked splits from starving existing splits
split.resetLevelPriority();
waitingSplits.offer(split);
}, executor);
}
}
}
catch (Throwable t) {
// ignore random errors due to driver thread interruption
// ...
splitFinished(split);
}
}
}
finally {
// unless we have been closed, we need to replace this thread
if (!closed) {
addRunnerThread();
}
}
}
核心逻辑
MultilevelSplitQueue 重要成员变量:
- LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};
- 新进来的split,优先级为0
- computeLevel 根据这个数组计算优先级
List<PriorityQueue<PrioritizedSplitRunner>>
levelWaitingSplits:- list index 就是对应的优先级,目前优先级为1~4一共五个优先级
- PriorityQueue 就是每个优先级对应的优先队列
- levelScheduledTime:
- 每个优先级的执行时间(不会清零,下标对应的是优先级)
- levelMinPriority:
- 每个优先级当前最低的 priority(take 的时候会更新)
- levelTimeMultiplier:默认为2
- 优先级更高的level 会拥有比其低一级拥有 levelTimeMultiplier 倍的执行数据(例如:level 0 有 2倍的level 1 的执行时间)
MultilevelSplitQueue 关键的方法:
- offer(split):添加 split 到队列中
- 为了避免某个优先级队列为空的时候,导致其他优先级的队列饿死,所以会在添加split的时候进行补偿,将levelScheduledTime直接补偿到expected运行时间
/**
* During periods of time when a level has no waiting splits, it will not accumulate
* scheduled time and will fall behind relative to other levels.
* <p>
* This can cause temporary starvation for other levels when splits do reach the
* previously-empty level.
* <p>
* To prevent this we set the scheduled time for levels which were empty to the expected
* scheduled time.
*/
public void offer(PrioritizedSplitRunner split)
{
checkArgument(split != null, "split is null");
split.setReady();
int level = split.getPriority().getLevel();
lock.lock();
try {
// 空的优先级队列会补偿其执行时间
if (levelWaitingSplits.get(level).isEmpty()) {
// Accesses to levelScheduledTime are not synchronized, so we have a data race
// here - our level time math will be off. However, the staleness is bounded by
// the fact that only running splits that complete during this computation
// can update the level time. Therefore, this is benign.
long level0Time = getLevel0TargetTime();
long levelExpectedTime = (long) (level0Time / Math.pow(levelTimeMultiplier, level));
long delta = levelExpectedTime - levelScheduledTime[level].get();
levelScheduledTime[level].addAndGet(delta);
}
levelWaitingSplits.get(level).offer(split);
notEmpty.signal();
}
finally {
lock.unlock();
}
}
- take:从多级队列中拿取一个split,用于执行
- pollSplit 是选取ratio(预期执行时间 / 实际执行时间)> 1 的优先级,低优先级的会被先选到,因为从高到低遍历一次,然后从 levelWaitingSplits.get(selectedLevel).poll() 来获取
/**
* Presto attempts to give each level a target amount of scheduled time, which is configurable
* using levelTimeMultiplier.
* <p>
* This function selects the level that has the the lowest ratio of actual to the target time
* with the objective of minimizing deviation from the target scheduled time. From this level,
* we pick the split with the lowest priority.
*/
@GuardedBy("lock")
private PrioritizedSplitRunner pollSplit()
{
long targetScheduledTime = getLevel0TargetTime();
double worstRatio = 1;
int selectedLevel = -1;
for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
if (!levelWaitingSplits.get(level).isEmpty()) {
long levelTime = levelScheduledTime[level].get();
// ratio = 预期执行时间 / 实际执行时间
double ratio = levelTime == 0 ? 0 : targetScheduledTime / (1.0 * levelTime);
if (selectedLevel == -1 || ratio > worstRatio) {
worstRatio = ratio;
selectedLevel = level;
}
}
targetScheduledTime /= levelTimeMultiplier;
}
// 队列中都没有split需要执行
if (selectedLevel == -1) {
return null;
}
PrioritizedSplitRunner result = levelWaitingSplits.get(selectedLevel).poll();
checkState(result != null, "pollSplit cannot return null");
return result;
}
- updatePriority 的逻辑:更新优先级的补偿机制
- 优先级变更的时候会更新从低到高遍历所有优先级的 levelScheduledTime,将执行慢的查询的时间分给多个level
- 因为如果一次性将长时间执行(LEVEL_CONTRIBUTION_CAP限制其大小)的时间一次性加到 某一个优先级,会导致一段时间内该优先级的任务被饿死
其他说明:
- split.resetLevelPriority():在 TaskRunner.run 方法中,会在 blocked split 被唤醒后进行reset,主要是为了防止其优先级过高,导致其他队列中执行的split饿死
更新优先级
TaskPriorityTracker 包含了 updatePriority 和 resetLevelPriority 两个方法,优先级均在这个类进行更新,有两种策略:
- TASK_FAIR:优先级的粒度是 task 级别的
- QUERY_FAIR:优先级的粒度是 query 级别的
// TaskExecutor 的构造函数中
Function<QueryId, TaskPriorityTracker> taskPriorityTrackerFactory;
switch (taskPriorityTracking) {
case TASK_FAIR:
taskPriorityTrackerFactory = (queryId) -> new TaskPriorityTracker(splitQueue);
break;
case QUERY_FAIR:
LoadingCache<QueryId, TaskPriorityTracker> cache = CacheBuilder.newBuilder()
.weakValues()
.build(CacheLoader.from(queryId -> new TaskPriorityTracker(splitQueue)));
taskPriorityTrackerFactory = cache::getUnchecked;
break;
default:
throw new IllegalArgumentException("Unexpected taskPriorityTracking: " + taskPriorityTracking);
}
相关 PR
MultilevelSplitQueue 的主要实现 PR:
- Change local scheduling to guarantee a time share to levels
- Change local scheduling to be fair within a level
总结
MultilevelSplitQueue 是一种公平且高效的调度策略,与操作系统的调度策略具有一定的相似性。它通过考虑执行时间的长度来进行调度,更加偏向于小作业,从而有利于提高整体吞吐量。为了进一步优化调度效果,可以考虑添加一个 ratio 值,对时间(实际时间 * 系数)的快慢进行调整干预,使得执行时间变化较慢的任务有机会获得更多的执行时间。基于这个思路,可以设计一套优先级策略,确保在资源竞争的情况下,优先处理执行时间较长的任务,以平衡整体执行时间,提高系统的公平性和效率。