[转]FUTEX_SWAP补丁分析-SwitchTo 如何大幅度提升切换性能?
2021-07-21 14:27:44 Author: blog.csdn.net(查看原文) 阅读量:83 收藏

作者简介

胡哲宁,西安邮电大学计算机科学与技术专业大二学生。

Google SwitchTo

由于协程本身对操作系统的不可见性,协程中出现的 BUG 往往不能通过一些已有的工具去排查。在谷歌内部有一套闭源的用户态任务调度框架 SwitchTo, 这个框架可以为谷歌提供延迟敏感的服务,对运行的内容进行细粒度的用户空间控制/调度,它可以让内核来实现上下文的切换,同时将任务何时切换,何时恢复的工作交给了用户态的程序来做,这样既可以实现在任务间协作式切换的功能,又可以不丧失内核对于任务的控制和观察能力。谷歌去年恢复尝试将其 SwitchTo API 上游引入 Linux。相关补丁见:[1],[2],[3],[4].

1pid_t switchto_wait(timespec *timeout)
2/*  Enter an 'unscheduled state', until our control is re-initiated by another thread or external event (signal). */
3void switchto_resume(pid_t tid)
4/* Resume regular execution of tid */
5pid_t switchto_switch(pid_t tid)
6/* Synchronously transfer control to target sibling thread, leaving the current thread unscheduled.Analogous to:Atomically { Resume(t1); Wait(NULL); }
7*/

这是使用 SwitchTo 和使用其他线程间切换的组件的上下文切换性能对比:

BenchmarkTime(ns)CPU(ns)Iterations
BM_Futex290519581000000
BM_GoogleMutex310223261000000
BM_SwitchTo1791783917412
BM_SwitchResume273415541000000

可以看到在使用 SwitchTo 后切换的性能比其他组件提高了一个数量级别。

SwitchTo 是如何做到在切换性能上大幅度领先的呢?我们暂时可能无法看到它们,但让我们来看看 Peter Oskolkov 向 LKML(Linux Kernel Mail List) 提出的补丁中有关 futex_swap() 的实现。可以确定的是,SwitchTo 构建在这个内核函数之上。

什么是 futex

futex 全称 fast user-space locking,快速用户空间互斥锁,作为内核中一种基本的同步原语,它提供了非常快速的无竞争锁获取和释放,用于构建复杂的同步结构:互斥锁、条件变量、信号量等。由于 futex 的一些机制和使用过于复杂,glibc 没有为 futex 提供包装器,但我们仍然可以使用 syscall 来调用这个 极其 hack 的系统调用。

1static int futex(uint32_t *uaddr, int futex_op, uint32_t val,
2                 const struct timespec *timeout, uint32_t *uaddr2,
3                 uint32_t val3)
4 {
5  return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
6}
  • uaddr: 一个四字节的用户空间地址。多个任务间可以通过 *uaddr 的值的变化来控制阻塞或者运行。

  • futex_op: 用于控制 futex 执行的命令 如 FUTEX_WAITFUTEX_WAKEFUTEX_LOCK_PIFUTEX_UNLOCK_PI

  • val: 在不同的 futex_op 具有不同的含义,如在 futex(uaddr, FUTEX_WAKE) 中作为唤醒等待在该 futex 上所有任务的数量。

  • timeout: 作为等待(如 FUTEX_WAIT)的超时时间。

  • uaddr2: uaddr2 参数是一个四字节的用户空间地址 在需要的场景使用(如后文的 FUTEX_SWAP )。

  • val3: 整数参数val3的解释取决于在操作上。

为什么 futex “快速”?

由于用户模式和内核模式之间的上下文切换很昂贵,futex 实现的同步结构会尽可能多地留在用户空间,这意味着它们只需要执行更少的系统调用。futex 的状态存储在用户空间变量中,futex 可以通过一些原子操作在没有竞争的情况下更改 futex 的状态,而无需系统调用的开销。

futex_wait() 和 futex_wake()

在看 futex_swap() 之前让我们先看看 内核中 与 futex 最重要的两个内核函数:

1static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset);

简单来说 对于 futex_wait() 有用的参数就只有 uaddrvalabs_time,就像 futex_wait(uaddr,val,abs_time)。其含义是当这个用户空间地址 uaddr的值等于传入的参数 val 的时候睡眠,即 if (*uaddr == val) wait(). futex_wake() 可以将它唤醒,另外还可以通过指定超时时间来超时唤醒。

 1static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
 2              ktime_t *abs_time, u32 bitset)
 3{
 4    struct hrtimer_sleeper timeout, *to;
 5    struct restart_block *restart;
 6    struct futex_hash_bucket *hb;
 7    struct futex_q q = futex_q_init;
 8    int ret;
 9
10    if (!bitset)
11        return -EINVAL;
12    q.bitset = bitset;
13  /* 设置定时器 */
14    to = futex_setup_timer(abs_time, &timeout, flags,
15                   current->timer_slack_ns);
16retry:
17    /*
18     * Prepare to wait on uaddr. On success, holds hb lock and increments
19     * q.key refs.
20     */
21    /* 获取哈希桶自旋锁 如果 *uaddr == val return -EWOULDBLOCK 否则返回 0 */
22    ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
23    if (ret)
24        goto out;
25
26    /* queue_me and wait for wakeup, timeout, or a signal. */
27    /*将当前任务状态改为TASK_INTERRUPTIBLE,并将当前任务插入到futex等待队列,释放哈希桶自旋锁,然后重新调度*/
28    futex_wait_queue_me(hb, &q, to);
29
30    /* If we were woken (and unqueued), we succeeded, whatever. */
31    ret = 0;
32    /* unqueue_me() drops q.key ref */
33  /* 如果 unqueue_me 返回 0 表示已经被删除则是正常唤醒跳到 out 否则则是超时触发 */
34    if (!unqueue_me(&q))
35        goto out;
36    ret = -ETIMEDOUT;
37    if (to && !to->task)
38        goto out;
39
40    /*
41     * We expect signal_pending(current), but we might be the
42     * victim of a spurious wakeup as well.
43     */
44    if (!signal_pending(current))
45        goto retry;
46
47    ret = -ERESTARTSYS;
48    if (!abs_time)
49        goto out;
50
51    restart = &current->restart_block;
52    restart->futex.uaddr = uaddr;
53    restart->futex.val = val;
54    restart->futex.time = *abs_time;
55    restart->futex.bitset = bitset;
56    restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;
57
58    ret = set_restart_fn(restart, futex_wait_restart);
59
60out:
61    if (to) {
62    /* 即将结束,取消定时任务 */
63        hrtimer_cancel(&to->timer);
64        destroy_hrtimer_on_stack(&to->timer);
65    }
66    return ret;
67}

futex 内部采用了哈希表的数据结构来保存那些需要睡眠的任务。通过用户空间地址 uaddr,flag,以及 futex 的读写状态可以计算出相同的 key 值,将需要睡眠的任务的 task_struct放到对应的哈希桶上的优先链表的节点中。

futex_wait() 流程:

  1. 寻找 futex 对应的 key,获取 key 对应的哈希桶。

  2. 获取哈希桶自旋锁,如果 *uaddr == val 返回错误给用户态。

  3. 否则将当前任务状态改为 TASK_INTERRUPTIBLE,并将当前任务插入到 futex 等待队列,释放哈希桶自旋锁,然后调度器重新调度。

  4. 从睡眠中苏醒,进行超时和唤醒两种情况的相应处理,返回用户态。

1static int futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset);

futex_wake() 的参数稍微简单一些,最重要的只有一个用户地址 uaddr,以及触发唤醒的任务数最大值 nr_wake

 1/*
 2 * Wake up waiters matching bitset queued on this futex (uaddr).
 3 */
 4static int
 5futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
 6{
 7    struct futex_hash_bucket *hb;
 8    struct futex_q *this, *next;
 9    union futex_key key = FUTEX_KEY_INIT;
10    int ret;
11    DEFINE_WAKE_Q(wake_q);
12
13    if (!bitset)
14        return -EINVAL;
15
16  /* 寻找 futex 对应的 key */
17    ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ);
18    if (unlikely(ret != 0))
19        return ret;
20
21  /* 获取 key 对应的哈希桶 */
22    hb = hash_futex(&key);
23
24    /* Make sure we really have tasks to wakeup */
25  /* 如果哈希桶桑没有等待者...那么谁也不需要被唤醒 */
26    if (!hb_waiters_pending(hb))
27        return ret;
28
29  /* 获取当前哈希桶的自旋锁 */
30    spin_lock(&hb->lock);
31
32  /* 遍历这个哈系桶上的优先链表 */
33    plist_for_each_entry_safe(this, next, &hb->chain, list) {
34    /* 如果 this 项的 key 与 futex 对应的 key 相同 说明该项在等待 futex */
35        if (match_futex (&this->key, &key)) {
36            if (this->pi_state || this->rt_waiter) {
37                ret = -EINVAL;
38                break;
39            }
40
41            /* Check if one of the bits is set in both bitsets */
42            if (!(this->bitset & bitset))
43                continue;
44
45      /* 将 this 添加到唤醒队列 wake_q 中 */
46            mark_wake_futex(&wake_q, this);
47      /* ret此时为0 递增至 nr_wake 最大唤醒任务数量则退出循环 */
48            if (++ret >= nr_wake)
49                break;
50        }
51    }
52
53    spin_unlock(&hb->lock);
54    wake_up_q(&wake_q);
55    return ret;
56}

futex_wake() 流程:

  1. 寻找 futex 对应的 key,获取 key 对应的哈希桶。

  2. 获取哈希桶的自旋锁,遍历这个哈系桶上的优先链表,如果当前任务的 keyfutex 对应的 key 相同,说明该任务在等待 futex,将当前任务添加到唤醒队列 wake_q 中,如果达到了 nr_wake 个,则退出循环。

  3. 释放哈希桶自旋锁,唤醒队列 wake_q 中每一个任务。

因此,通过 futex_wait()futex_wake(),我们可以实现任务的等待和唤醒,见 [5] man 手册中的小 demo。

FUTEX_SWAP 相关补丁

基于以上的了解,现在我们来看 Peter Oskolkov 向内核提交的 FUTEX_SWAP 补丁系列。首先看 [4] 中有关 FUTEX_SWAP 的相关测试补丁,其中关键的功能函数 futex_swap_op() 引起了我们的注意:

 1void futex_swap_op(int mode, futex_t *futex_this, futex_t *futex_that)
 2{
 3    int ret;
 4
 5    switch (mode) {
 6    case SWAP_WAKE_WAIT:
 7        futex_set(futex_this, FUTEX_WAITING);
 8        futex_set(futex_that, FUTEX_WAKEUP);
 9        futex_wake(futex_that, 1, FUTEX_PRIVATE_FLAG);
10        futex_wait(futex_this, FUTEX_WAITING, NULL, FUTEX_PRIVATE_FLAG);
11        if (*futex_this != FUTEX_WAKEUP) {
12            fprintf(stderr, "unexpected futex_this value on wakeup\n");
13            exit(1);
14        }
15        break;
16
17    case SWAP_SWAP:
18        futex_set(futex_this, FUTEX_WAITING);
19        futex_set(futex_that, FUTEX_WAKEUP);
20        ret = futex_swap(futex_this, FUTEX_WAITING, NULL,
21                 futex_that, FUTEX_PRIVATE_FLAG);
22        if (ret < 0 && errno == ENOSYS) {
23            /* futex_swap not implemented */
24            perror("futex_swap");
25            exit(1);
26        }
27        if (*futex_this != FUTEX_WAKEUP) {
28            fprintf(stderr, "unexpected futex_this value on wakeup\n");
29            exit(1);
30        }
31        break;
32
33    default:
34        fprintf(stderr, "unknown mode in %s\n", __func__);
35        exit(1);
36    }
37}

其中比较了使用 FUTEX_WAIT, FUTEX_WAKE 实现线程切换以及使用 FUTEX_SWAP 实现线程切换的两种方式。

其中 通过调用参数含有 futex_thisfutex_thatfutex_swap()

1ret = futex_swap(futex_this, FUTEX_WAITING, NULL,
2                 futex_that, FUTEX_PRIVATE_FLAG);

代替了下面 futex_wake()futex_wait() 的两步操作,

1futex_wake(futex_that, 1, FUTEX_PRIVATE_FLAG);
2futex_wait(futex_this, FUTEX_WAITING, NULL, FUTEX_PRIVATE_FLAG);

实现了让当前线程睡眠,并切换到指定线程的作用。

 1$ ./futex_swap -i 100000
 2
 3
 4------- running SWAP_WAKE_WAIT -----------
 5
 6
 7completed 100000 swap and back iterations in 820683263 ns: 4103 ns per swap
 8PASS
 9
10
11------- running SWAP_SWAP -----------
12
13
14completed 100000 swap and back iterations in 124034476 ns: 620 ns per swap
15PASS

可见在 100k 级别的任务切换批处理上,使用新接口 futex_swap() 的上下文切换性能要比之前好很多。

 1/*
 2- * Wake up waiters matching bitset queued on this futex (uaddr).
 3+ * Prepare wake queue matching bitset queued on this futex (uaddr).
 4  */
 5 static int
 6-futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
 7+prepare_wake_q(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset,
 8+           struct wake_q_head *wake_q)
 9 {
10     struct futex_hash_bucket *hb;
11     struct futex_q *this, *next;
12     union futex_key key = FUTEX_KEY_INIT;
13     int ret;
14-    DEFINE_WAKE_Q(wake_q);
15
16     if (!bitset)
17         return -EINVAL;
18@@ -1629,20 +1629,34 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
19             if (!(this->bitset & bitset))
20                 continue;
21
22-            mark_wake_futex(&wake_q, this);
23+            mark_wake_futex(wake_q, this);
24             if (++ret >= nr_wake)
25                 break;
26         }
27     }
28
29     spin_unlock(&hb->lock);
30-    wake_up_q(&wake_q);
31 out_put_key:
32     put_futex_key(&key);
33 out:
34     return ret;
35 }
36
37+/*
38+ * Wake up waiters matching bitset queued on this futex (uaddr).
39+ */
40+static int
41+futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
42+{
43+    int ret;
44+    DEFINE_WAKE_Q(wake_q);
45+
46+    ret = prepare_wake_q(uaddr, flags, nr_wake, bitset, &wake_q);
47+    wake_up_q(&wake_q);
48+
49+    return ret;
50+}
51+
52 s

首先是通过从 futex_wake() 中抽出一个 prepare_wake_q() 获得 nr_wake 个等待在 futex 的任务并填入到传入的唤醒队列 wake_q 中。

 1+static int futex_swap(u32 __user *uaddr, unsigned int flags, u32 val,
 2+              ktime_t *abs_time, u32 __user *uaddr2)
 3+{
 4+    u32 bitset = FUTEX_BITSET_MATCH_ANY;
 5+    struct task_struct *next = NULL;
 6+    DEFINE_WAKE_Q(wake_q);
 7+    int ret;
 8+
 9+    ret = prepare_wake_q(uaddr2, flags, 1, bitset, &wake_q);
10+    if (!wake_q_empty(&wake_q)) {
11+        /* Pull the first wakee out of the queue to swap into. */
12+        next = container_of(wake_q.first, struct task_struct, wake_q);
13+        wake_q.first = wake_q.first->next;
14+        next->wake_q.next = NULL;
15+        /*
16+         * Note that wake_up_q does not touch wake_q.last, so we
17+         * do not bother with it here.
18+         */
19+        wake_up_q(&wake_q);
20+    }
21+    if (ret < 0)
22+        return ret;
23+
24+    return futex_wait(uaddr, flags, val, abs_time, bitset, next);
25+}

futex_swap() 流程:

  1. 获得等待在 uaddr2 上的预备唤醒队列,记录队列第一个任务为 next,对其他任务则执行唤醒。

  2. uaddr1 执行 futex_wait(),传入 next

我们看看 futex_wait() 上发生了哪些更改:

 1@@ -2600,9 +2614,12 @@ static int fixup_owner(u32 __user *uaddr, struct futex_q *q, int locked)
 2  * @hb:        the futex hash bucket, must be locked by the caller
 3  * @q:        the futex_q to queue up on
 4  * @timeout:    the prepared hrtimer_sleeper, or null for no timeout
 5+ * @next:    if present, wake next and hint to the scheduler that we'd
 6+ *        prefer to execute it locally.
 7  */
 8 static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
 9-                struct hrtimer_sleeper *timeout)
10+                struct hrtimer_sleeper *timeout,
11+                struct task_struct *next)
12 {
13     /*
14      * The task state is guaranteed to be set before another task can
15@@ -2627,10 +2644,27 @@ static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
16          * flagged for rescheduling. Only call schedule if there
17          * is no timeout, or if it has yet to expire.
18          */
19-        if (!timeout || timeout->task)
20+        if (!timeout || timeout->task) {
21+            if (next) {
22+                /*
23+                 * wake_up_process() below will be replaced
24+                 * in the next patch with
25+                 * wake_up_process_prefer_current_cpu().
26+                 */
27+                wake_up_process(next);
28+                put_task_struct(next);
29+                next = NULL;
30+            }
31             freezable_schedule();
32+        }
33     }
34     __set_current_state(TASK_RUNNING);
35+
36+    if (next) {
37+        /* Maybe call wake_up_process_prefer_current_cpu()? */
38+        wake_up_process(next);
39+        put_task_struct(next);
40+    }
41 }
42
43@@ -2710,7 +2744,7 @@ static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
44 }
45
46 static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
47-              ktime_t *abs_time, u32 bitset)
48+              ktime_t *abs_time, u32 bitset, struct task_struct *next)
49 {
50     struct hrtimer_sleeper timeout, *to;
51     struct restart_block *restart;
52@@ -2734,7 +2768,8 @@ static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
53         goto out;
54
55     /* queue_me and wait for wakeup, timeout, or a signal. */
56-    futex_wait_queue_me(hb, &q, to);
57+    futex_wait_queue_me(hb, &q, to, next);
58+    next = NULL;
59
60     /* If we were woken (and unqueued), we succeeded, whatever. */
61     ret = 0;
62@@ -2767,6 +2802,10 @@ static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
63     ret = -ERESTART_RESTARTBLOCK;
64
65 out:
66+    if (next) {
67+        wake_up_process(next);
68+        put_task_struct(next);
69+    }
70     if (to) {
71         hrtimer_cancel(&to->timer);
72         destroy_hrtimer_on_stack(&to->timer);
73@@ -2774,7 +2813,6 @@ static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
74     return ret;
75 }

我们可以看到 futex_wait() 传入的 next 任务在两种情况下会被唤醒:

  1. 当前任务从对 uaddr 的等待中苏醒,接着在futex_wait结束的时候执行 wake_up_process() 切换到 next 任务。

  2. futex_wait_queue_me() 中等待超时(也代表着当前任务从对锁的等待中结束),执行 wake_up_process()切换到 next 任务。

通过对 futex 的魔改, 我们仿佛有了在用户态使用 switch_to() 指定任务切换的能力,这真是让人感到兴奋!这就是用户模式线程的用途:极低的切换开销,意味着我们操作系统可以支持的数以千计的线程可以提高到 10 倍以上甚至百万级别!

不过很可惜,该补丁似乎被 Linux 内核社区遗弃。如今补丁作者 Peter Oskolkov 正在试图向 Linux 内核引入另外一套 Google 的用户态任务调度框架 Fiber [7],来支持 Linux 世界中 c 系程序员对协作式任务切换的需求。

参考资料

[1] https://lore.kernel.org/lkml/[email protected]/

[2] https://lore.kernel.org/lkml/[email protected]/

[3] https://lore.kernel.org/lkml/[email protected]/

[4] https://lore.kernel.org/lkml/[email protected]/

[5] https://man7.org/linux/man-pages/man2/futex.2.html

[6] https://lwn.net/Articles/360699/

[7] https://lore.kernel.org/lkml/[email protected]/

本文来源于 陈莉君老师 “Linux内核之旅”公众号。


文章来源: https://blog.csdn.net/21cnbao/article/details/118980971
如有侵权请联系:admin#unsafe.sh