改进 async-await 的 “Future is not Send” 诊断

2019 年 10 月 11 日 · David Wood 代表 异步基础工作组

Async-await 即将在 1.39 版本中稳定发布(仅一个月后!),正如上个月在 “异步基础更新:是时候打磨了!” 帖子中宣布的那样,异步基础工作组已将其重点转移到打磨上。这篇文章将重点介绍该重点的一个方面,即诊断改进,特别是工作组一直在对曾经无用的 “future is not send” 诊断进行的改进。

为什么我的 future 没有实现 Send

Async-await 的一个主要应用场景是多线程环境,在这种环境中,拥有一个可以发送到其他线程的 future 是非常理想的。这可能如下所示(为了简洁,这里没有任何线程,只是要求 future 实现 std::marker::Send

use std::sync::{Mutex, MutexGuard};

fn is_send<T: Send>(t: T) { }

async fn foo() {
    bar(&Mutex::new(22)).await
}

async fn bar(x: &Mutex<u32>) {
    let g = x.lock().unwrap();
    baz().await
}

async fn baz() { }

fn main() {
    is_send(foo());
}

当我们尝试编译它时,我们会得到一个笨拙且难以理解的诊断

error[E0277]: `std::sync::MutexGuard<'_, u32>` cannot be sent between threads safely
  --> src/main.rs:23:5
   |
23 |     is_send(foo());
   |     ^^^^^^^ `std::sync::MutexGuard<'_, u32>` cannot be sent between threads safely
   |
   = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, u32>`
   = note: required because it appears within the type `for<'r, 's> {&'r std::sync::Mutex<u32>, std::sync::MutexGuard<'s, u32>, impl std::future::Future, ()}`
   = note: required because it appears within the type `[static generator@src/main.rs:13:30: 16:2 x:&std::sync::Mutex<u32> for<'r, 's> {&'r std::sync::Mutex<u32>, std::sync::MutexGuard<'s, u32>, impl std::future::Future, ()}]`
   = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:13:30: 16:2 x:&std::sync::Mutex<u32> for<'r, 's> {&'r std::sync::Mutex<u32>, std::sync::MutexGuard<'s, u32>, impl std::future::Future, ()}]>`
   = note: required because it appears within the type `impl std::future::Future`
   = note: required because it appears within the type `impl std::future::Future`
   = note: required because it appears within the type `for<'r> {impl std::future::Future, ()}`
   = note: required because it appears within the type `[static generator@src/main.rs:9:16: 11:2 for<'r> {impl std::future::Future, ()}]`
   = note: required because it appears within the type `std::future::GenFuture<[static generator@src/main.rs:9:16: 11:2 for<'r> {impl std::future::Future, ()}]>`
   = note: required because it appears within the type `impl std::future::Future`
   = note: required because it appears within the type `impl std::future::Future`
note: required by `is_send`
  --> src/main.rs:5:1
   |
5  | fn is_send<T: Send>(t: T) {
   | ^^^^^^^^^^^^^^^^^^^^^^^^^

这…不太好。让我们分解一下正在发生的事情,并理解这个错误想要告诉我们什么。

fn main() {
    is_send(foo());
}

main 中,我们调用 foo 并将返回值传递给 is_sendfoo 是一个 async fn,所以它不返回 ()(你可能期望一个没有指定返回类型的函数)。相反,它返回 impl std::future::Future<Output = ()>,某种实现 std::future::Future 的未命名类型

async fn foo() {
    bar(&Mutex::new(22)).await
}

// becomes...

fn foo() -> impl std::future::Future<Output = ()> {
    async move {
        bar(&Mutex::new(22)).await
    }
}

如果你想了解更多关于 async-await 转换的信息,可以阅读 异步书籍的 async/.await 入门章节

fn is_send<T: Send>(t: T) { }

看起来我们得到的错误是因为 foo 返回的 future 不满足 is_sendT: Send 约束。

异步函数是如何实现的?

为了解释为什么我们的 future 没有实现 Send,我们首先需要更多地了解 async-await 在底层做了什么。rustc 使用生成器来实现 async fn,生成器是一种不稳定的语言特性,用于可恢复函数,类似于你可能从其他语言中熟悉的协程。生成器像枚举一样布局,其变体包含所有在 await 点(它会转化为生成器的 yield)之间使用的变量。

async fn bar(x: &Mutex<u32>) {
    let g = x.lock().unwrap();
    baz().await // <- await point (suspend #0), `g` and `x` are in use before await point
} // <- `g` and `x` dropped here, after await point
enum BarGenerator {
    // `bar`'s parameters.
    Unresumed { x: &Mutex<u32> },

    Suspend0 {
        // Future returned by `baz`, which is being polled.
        baz_future: BazGenerator,

        // Locals that are used across the await point.
        x: &Mutex<u32>,
        g: MutexGuard<'_, u32>,
    },

    Returned { value: () }
}

如果你想了解更多关于 async fn 实现细节的信息,那么 Tyler Mandry 写了一篇 优秀博文,深入探讨了他们在这方面的工作,绝对值得一读。

那么,为什么我的 future 没有实现 Send

我们现在知道 async fn 在幕后被表示为一个枚举。在同步 Rust 中,当 编译器确定合适时,你会习惯于你的类型自动实现 Send - 通常当你的类型的所有字段也实现 Send 时。由此可见,如果它里面的所有类型都实现 Send,那么代表我们 async fn 的类似枚举的类型也会实现 Send

换句话说,如果所有在 .await 点之间持有的类型都实现了 Send,那么 future 就可以安全地在线程之间发送。这种行为很有用,因为它允许我们编写与 async-await 无缝互操作的通用代码,但如果没有诊断支持,我们会得到令人困惑的错误消息。

那么,示例中哪个类型有问题?

回到我们的例子,future 必须在不实现 Send.await 点持有一个类型,但是在哪里呢?这是诊断改进旨在帮助回答的主要问题。让我们先看看 foo

async fn foo() {
    bar(&Mutex::new(22)).await
}

foo 调用 bar,传递一个 std::sync::Mutex 的引用并取回一个 future,然后在 await 它。

async fn bar(x: &Mutex<u32>) {
    let g: MutexGuard<u32> = x.lock().unwrap();
    baz().await
} // <- `g` is dropped here

barawaiting baz 之前锁定互斥锁。std::sync::MutexGuard<u32> 不实现 Send 并且在 baz().await 点存在(因为 g 在作用域结束时被丢弃),这导致整个 future 不实现 Send

这从错误中并不明显:我们必须知道 future 可能会根据它们捕获的类型来实现 Send并且自己找到在 await 点存在的类型。

幸运的是,异步基础工作组一直在努力改进这个错误,并且 在 nightly 版本中,我们看到以下诊断

error[E0277]: `std::sync::MutexGuard<'_, u32>` cannot be sent between threads safely
  --> src/main.rs:23:5
   |
5  | fn is_send<T: Send>(t: T) {
   |    -------    ---- required by this bound in `is_send`
...
23 |     is_send(foo());
   |     ^^^^^^^ `std::sync::MutexGuard<'_, u32>` cannot be sent between threads safely
   |
   = help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, u32>`
note: future does not implement `std::marker::Send` as this value is used across an await
  --> src/main.rs:15:3
   |
14 |   let g = x.lock().unwrap();
   |       - has type `std::sync::MutexGuard<'_, u32>`
15 |   baz().await;
   |   ^^^^^^^^^^^ await occurs here, with `g` maybe used later
16 | }
   | - `g` is later dropped here

好多了!

它是如何工作的?

当 rustc 的 trait 系统确定一个 trait 没有实现时,在这种情况下是 std::marker::Send,它会发出此错误。trait 系统产生一个“义务”链。“义务”是表示绑定(例如 is_send 中的 T: Send)的来源或绑定传播位置的类型。

为了改进此诊断,义务链现在被视为堆栈帧,其中每个义务“帧”表示每个函数对错误的贡献。让我们用一个非常粗略的近似来使它更具体

Obligation {
    kind: DerivedObligation(/* generator that captured `g` */),
    source: /* `Span` type pointing at `bar`'s location in user code */,
    parent: Some(Obligation {
        kind: DerivedObligation(/* generator calling `bar` */),
        source: /* `Span` type pointing at `foo`'s location in user code */,
        parent: Some(Obligation {
            kind: ItemObligation(/* type representing `std::marker::Send` */),
            source: /* `Span` type pointing at `is_send`'s location in user code */,
            cause: None,
        }),
    }),
}

编译器匹配链,期望一个 ItemObligation 和一些包含生成器的 DerivedObligation,这标识了我们要改进的错误。使用这些义务中的信息,rustc 可以构建上面显示的专用错误 - 如果你想查看实际的实现是什么样子的,请查看 PR #64895

如果你有兴趣改进这样的诊断,或者只是修复 bug,请考虑为编译器做出贡献!有很多工作组可以加入,还有资源可以帮助你入门(例如 rustc 开发指南编译器团队文档)。

下一步是什么?

计划并正在努力对这个诊断进行更多改进,使其适用于更多情况,并为 SendSync 提供专门的消息,如下所示

error[E0277]: future cannot be sent between threads safely
  --> src/main.rs:23:5
   |
5  | fn is_send<T: Send>(t: T) {
   |    -------    ---- required by this bound in `is_send`
...
23 |     is_send(foo());
   |     ^^^^^^^ future returned by `foo` is not `Send`
   |
   = help:  future is not `Send` as this value is used across an await
note: future does not implement `std::marker::Send` as this value is used across an await
  --> src/main.rs:15:3
   |
14 |   let g = x.lock().unwrap();
   |       - has type `std::sync::MutexGuard<'_, u32>`
15 |   baz().await;
   |   ^^^^^^^^^^^ await occurs here, with `g` maybe used later
16 | }
   | - `g` is later dropped here