兔子先生

探寻计算机的历史与哲学密码

出于好奇,我想弄明白 Containerd 是如何处理容器 stdio 的,因为我在 k8s 环境中使用 Containerd 作为容器运行时,且观察到容器中的标准 io 为如下形式:

1
2
3
4
5
/proc/1/fd # ls -l
total 0
lrwx------ 1 root root 64 Oct 10 02:25 0 -> /dev/null
l-wx------ 1 root root 64 Oct 10 02:25 1 -> pipe:[45731]
l-wx------ 1 root root 64 Oct 10 02:25 2 -> pipe:[45732]

这是个非常简单的容器,只是往标准输出打印一些内容。从容器进程打开的文件描述符中,我们看到其标准输入被丢弃(通过重定向到 /dev/null ),而标准输出和标准错误都被重定向到 Linux 管道

是的,我对 Containerd 如何做到这一点有着浓厚的兴趣,这驱使我产生了 Debug 容器运行时的念头。

这篇短文仅仅介绍我如何达到 Debug 的目的,并不会介绍 Containerd 如何设置标准 io,这部分内容我会放在接下来想写的《终端闲思录》里面。

Containerd 的生态链位置

首先应该明确一下 Containerd 在整个容器调用链里的位置,我借用一幅容器技术架构图来说明:

图 1-1 容器技术架构

从图中可知,Containerd 通过 CRI 接口上承 k8s,通过 OCI 接口下接 runc,真正创建容器和启动容器的是底层的 runc,所有的 CRI 容器运行时均依赖于 runc 。

你可能已经被 OCI 和 CRI 绕晕了,简言之,这是容器圈里的两大标准:

  • Open Container Initiative (OCI): a set of standards for containers, describing the image format, runtime, and distribution.
  • Container Runtime Interface (CRI) in Kubernetes: An API that allows you to use different container runtimes in Kubernetes.

只要记住 OCI 是容器标准,包扩容器创建,镜像格式等等,而 CRI 是属于 k8s 的接口,每个对接 k8s 的容器运行时都要实现这一套接口才能被 kubelet 调用。

Debug 前的准备

我 Debug 的对象就是实现了 CRI 的 Containerd,但我却面临两个问题:

  1. Containerd 需要 root 权限运行,而我并不想在 root 用户下再配置一次 ide 。
  2. 虽然可以在 root 下使用 delve 直接调试二进制文件,但我并不清楚 kubelet 调用 Containerd 时传递的参数,而我 Debug 的初衷就是想弄清楚 kubelet 传递的参数内容。

所以,我决定 Debug 一个运行中的 Containerd!

这很容易做到,使用 dlv attach pid就可以 Debug Containerd 进程。万事俱备,只欠一套 k8s 环境。所幸,我们有 minikube !

minikube 是在一个虚拟机中运行所有相关组件的,所以有两个问题需要解决:

  1. 在 minikube 的虚拟机中准备 delve 工具。
  2. 准备一个可调试的 Containerd 服务。

第一点很好解决,minikube 虚拟机和 Host 之间是通过一个网桥通信的,使用minikube ssh命令登录到 minikube 虚拟机中,scp 宿主机的工具到 minikube 中就可以了。

通过软件仓库安装的 Containerd 都是经过编译优化的二进制文件,里面缺乏调试信息,因此我们需要编译一份带有调试信息的二进制文件,观察 Containerd 的 makefile 可以发现如下内容:

1
2
3
4
5
6
7
8
ifndef GODEBUG
EXTRA_LDFLAGS += -s -w
DEBUG_GO_GCFLAGS :=
DEBUG_TAGS :=
else
DEBUG_GO_GCFLAGS := -gcflags=all="-N -l"
DEBUG_TAGS := static_build
endif

如果环境变量没有设置 GODEBUG ,那么就启用编译优化,否则增加调试信息,所以我们在 make 之前,执行一下export GODEBUG=1即可编译出可调试的二进制文件了。编译完成后,使用 scp 拷贝到 minikube 中,重新启动 Containerd 即可。

delve 读取的本地文件系统中项目的内容用于显示单步中的代码,所以,还需要将编译时所用的源码 copy 到 minikube 中,并保持原有路径!

开始 Debug

上述准备工作完成后,就可以进入 minikube 虚拟机进行 debug 了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
richard@Richard-Manjaro:~ » minikube ssh
docker@minikube:~$ sudo -i
root@minikube:~# ps -ef|grep containerd|grep -v shim
root 504 1 3 06:47 ? 00:00:02 /usr/bin/containerd -l debug
root 873 1 3 06:48 ? 00:00:02 /var/lib/minikube/binaries/v1.27.4/kubelet --bootstrap-kubeconfig=/etc/kubernetes/bootstrap-kubelet.conf --config=/var/lib/kubelet/config.yaml --container-runtime-endpoint=unix:///run/containerd/containerd.sock --hostname-override=minikube --kubeconfig=/etc/kubernetes/kubelet.conf --node-ip=192.168.49.2
root@minikube:~# dlv attach 504
Type 'help' for list of commands.
(dlv) bp
Breakpoint runtime-fatal-throw (enabled) at 0x559126263aa4,0x559126263b84 for (multiple functions)() <multiple locations>:0 (0)
Breakpoint unrecovered-panic (enabled) at 0x559126263f04 for runtime.fatalpanic() /usr/lib/go/src/runtime/panic.go:1188 (0)
print runtime.curg._panic.arg
(dlv) b pkg/cri/server/container_create.go:222
Breakpoint 1 set at 0x559127d44f4d for github.com/containerd/containerd/pkg/cri/server.(*criService).CreateContainer() /home/richard/opensource/containerd/pkg/cri/server/container_create.go:222
(dlv) b services/tasks/local.go:167
Breakpoint 2 set at 0x55912734f072 for github.com/containerd/containerd/services/tasks.(*local).Create() /home/richard/opensource/containerd/services/tasks/local.go:167
(dlv) bp
Breakpoint runtime-fatal-throw (enabled) at 0x559126263aa4,0x559126263b84 for (multiple functions)() <multiple locations>:0 (0)
Breakpoint unrecovered-panic (enabled) at 0x559126263f04 for runtime.fatalpanic() /usr/lib/go/src/runtime/panic.go:1188 (0)
print runtime.curg._panic.arg
Breakpoint 1 (enabled) at 0x559127d44f4d for github.com/containerd/containerd/pkg/cri/server.(*criService).CreateContainer() /home/richard/opensource/containerd/pkg/cri/server/container_create.go:222 (0)
Breakpoint 2 (enabled) at 0x55912734f072 for github.com/containerd/containerd/services/tasks.(*local).Create() /home/richard/opensource/containerd/services/tasks/local.go:167 (0)
(dlv) c
> github.com/containerd/containerd/services/tasks.(*local).Create() /home/richard/opensource/containerd/services/tasks/local.go:167 (hits goroutine(2264):1 total:1) (PC: 0x55912734f072)
162: monitor runtime.TaskMonitor
163: v2Runtime runtime.PlatformRuntime
164: }
165:
166: func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
=> 167: container, err := l.getContainer(ctx, r.ContainerID)
168: if err != nil {
169: return nil, errdefs.ToGRPC(err)
170: }
171: checkpointPath, err := getRestorePath(container.Runtime.Name, r.Options)
172: if err != nil {
(dlv)

进入虚拟机后,切换到 root 执行 delve 指令 dlv attach 504,我在此设置了两个断点:

  1. pkg/cri/server/container_create.go:222 这是 Containerd 的 CRI 实现里创建容器的接口函数,此函数隶属于一个 grpc server,当我们使用 kubectl 创建工作负载时,会触发此断点。
  2. services/tasks/local.go:167 这是 Containerd 将 CRI 请求转化为任务事件的创建入口。

使用 kubectl 提交一个负载就可以单步调试啦 !O(∩_∩)O~

一些注意事项

  1. 由图 1-1 可知,k8s 使用 CRI 与容器运行时交互,所以确保 minikube 的运行时使用 Containerd,而不是 docker。docker 虽然也会使用 Containerd,但这种情况下 k8s 不会走 Containerd 的 CRI 接口,相反走的是 docker-shim 的接口,而后者已在 k8s 1.24 中被抛弃。

    1
    2
    3
    richard@Richard-Manjaro:~/playground/k8s » ikube get nodes -o wide
    NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
    minikube NotReady control-plane 5d23h v1.27.4 192.168.49.2 <none> Ubuntu 22.04.2 LTS 5.15.133-1-MANJARO containerd://1.7.6
  2. 如果想通过 Containerd 的日志进行简单的 debug,那么开启 debug 日志即可,方法为在执行文件后追加 —log-level=debug

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    docker@minikube:~$ cat /usr/lib/systemd/system/containerd.service 
    ......

    [Unit]
    Description=containerd container runtime
    Documentation=https://containerd.io
    After=network.target local-fs.target

    [Service]
    ExecStartPre=-/sbin/modprobe overlay

    ExecStart=/usr/bin/containerd -l debug
    ......
  3. 一定要在你的玩具环境中调试,任何公共环境都不应被用来 debug !

参考文献

  1. The differences between Docker, containerd, CRI-O and runc

从来没去了解过 lua,今天抽空看了一下原理,特此记录一下。

在此,我们先不去管 lua 的语法,仅就其神奇之处论之,看看它到底神在哪里。

lua 的神奇之处是它的解释器很小,目前 5.4.6 版本的二进制只有 300k,及其小巧!

唯其如此,它才可以被嵌入到服务端程序的内部!根据 lua 提供的 API,你可以在进程里启动一个 lua 虚拟机,来解释客户端提交上来的 lua 脚本或者字节码。

这还不算完,你还可以在进程里和 lua 脚本联动,在程序中提供 lua 脚本能访问的 API,之后就可以和 lua 脚本互相交换数据了,这就给了程序设计很大的自由空间,向客户端提供了操纵程序逻辑的摇杆。

lua 是用 C 语言写的,天然适配于 C 的后端,如果其它语言想要内嵌,则需要一个相关语言实现的 lua 解释器,比如 gopher-lua 就是 Go 语言实现的 lua 虚拟机和编译器,Go 程序就可以将其嵌入自身来解释 lua 脚本,并提供 API 给脚本使用,促成了程序功能的灵活多变性。

Redis 内嵌了 lua 的代码,提供了 lua 脚本可以访问的 Redis 对象。所以,你就可以在 lua 脚本里对 Redis 进行操作了。

Redis 使用 lua 脚本可以保证一组操作的原子性,我们知道 Redis 中的事务也可以做到,但是使用 lua 则更加灵活,你可以根据不同的条件执行不同的命令,这是事务所做不到的。

综上,lua 基本上就是“灵活” 的代名词!

计算机之所以需要并发,是为了提高 CPU 的利用率,因为大多数任务场景是混合了计算和 I/O 的,那么为了减少响应时间,使任务能够“同时”进行,计算机程序便演化出了并发的概念。

本文就针对 CPU-bound 和 IO-bound 两种极端场景,来聊一聊 Go 和 Tokio 的并发模型在应对不同并发场景下的异同。

《异步 IO 探秘》《对话 ChatGPT 理解 Rust 异步网络 io》 已基于 Linux 平台就 Go 和 Tokio 的网络模型做了简要剖析,大致有如下几个要点:

  1. 底层 Reactor 都是 非阻塞 I/O + epoll 模型。
  2. 事件处理方式不同。Go 紧密结合 goroutine,让网络事件转化为对网络文件描述符感兴趣的 goroutine,并将其注入运行队列,伺机调度;Tokio 基于唤醒机制催动 Executor 去轮询每个 Future,每个 Future 都被编译为一个状态机。
  3. 异步编程是对并发模型的考验。程序必须有能力挂起不能继续的任务,转而执行其它的任务,因为网络文件描述符非阻塞的特性,异步网络 I/O 才会成为可能。
  4. 普通文件 I/O 的异步解决方案需要等待 io_uring 的普及。

关于“异步编程是对并发模型的考验"这一点,可以从 Tokio 官方对于异步编程的论述中得到印证:

What is asynchronous programming?

Most computer programs are executed in the same order in which they are written. The first line executes, then the next, and so on. With synchronous programming, when a program encounters an operation that cannot be completed immediately, it will block until the operation completes. For example, establishing a TCP connection requires an exchange with a peer over the network, which can take a sizeable amount of time. During this time, the thread is blocked.

With asynchronous programming, operations that cannot complete immediately are suspended to the background. The thread is not blocked, and can continue running other things. Once the operation completes, the task is unsuspended and continues processing from where it left off. Our example from before only has one task, so nothing happens while it is suspended, but asynchronous programs typically have many such tasks.

Although asynchronous programming can result in faster applications, it often results in much more complicated programs. The programmer is required to track all the state necessary to resume work once the asynchronous operation completes. Historically, this is a tedious and error-prone task.

With asynchronous programming, operations that cannot complete immediately are suspended to the background,不能继续的任务,要被扔到后台。

The thread is not blocked, and can continue running other things,底层线程不因此而阻塞,继续运行其它任务。

Once the operation completes, the task is unsuspended and continues processing from where it left off,当被异步的操作完成后,被终止的任务恢复执行。

所以,“异步编程”和“异步”这两个概念是有所区别的,“异步”是一种特性,“异步编程”是基于此特性演化出的编程范式。

“异步”并不会使单个任务加速,Netpoller 和 Tokio 都是为了解决高并发网络 I/O 而生的,并不会加速某个单一的任务,而是让多个任务在有限的 CPU资源下,跑出接近单个任务的响应时间,本质上是对 CPU 的充分利用。

在我看来,Tokio 口中异步编程的复杂性,完全来自于性能考量下的权衡,它解决的是高并发网络 I/O 的问题,而不是并发的问题。不同的设计哲学,让它们在并发 CPU-bound 任务上走向了不同的目标。

我们看一个并发 CPU 的例子,此例中有 1 万个并发任务递归计算斐波那契数列。运行环境为 8 核,16G 内存,Manjaro Linux:

Go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"fmt"
"sync"
"time"
)

func fib(n int) int {
if n == 0 || n == 1 {
return n
}
return fib(n-1) + fib(n-2)
}

func main() {
//runtime.GOMAXPROCS(24)
ch := make(chan float64, 8)
done := make(chan struct{})

before := time.Now()
var wg sync.WaitGroup

for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = fib(40)
el := time.Since(before).Seconds()
ch <- el
}()
}

go func() {
for {
select {
case v := <-ch:
fmt.Printf("耗时:%fs", v)
case <-done:
return
}
}
}()
wg.Wait()
elapsed := time.Since(before)
close(done)
fmt.Println(elapsed, "total,", elapsed/10000, "avg per iteration")
}

运行结果:

耗时区间:676 s ~ 814 s 总耗时:814s 平均耗时:81.4 ms

再来看看 Tokio,依据 Tokio 官网建议,计算型任务使用spawn_blocking ,它会将任务派发到一个专门的线程池,根据并发任务的数量,这个线程池会增长到最多 500 个线程:

Tokio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use std::time::Instant;
use tokio::task;
use futures::future::{self, join_all};
use std::sync::mpsc;
use std::fs::File;
use std::io::{Read, Write};


#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel();
let start = Instant::now();
for i in 1..=10000 {
let tt = tx.clone();
task::spawn_blocking(move|| {
let r = fib(40);
let duration = start.elapsed();
let mut dev_null = File::create("/dev/null").unwrap();
dev_null.write(&r.to_le_bytes()).unwrap();
tt.send(duration.as_secs_f64() ).unwrap();
drop(tt);
0
});
}

drop(tx);
for received in rx {
print!("耗时: {:?}", received);
}

let duration = start.elapsed();
println!("总耗时: {:?}", duration);
println!("平均耗时: {:?}", duration / 10000);
}


fn fib(n: u64) -> u64 {
if n == 0 || n == 1 {
return n;
}
fib(n - 1) + fib(n - 2)
}

运行结果:

耗时区间:0.6 s ~ 564 s 总耗时:564s 平均耗时:56.4 ms

Tokio 建议使用 Rayon 来运行 CPU-bound 任务,我们再来看一下,Rayon 的版本:

Rayon

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use rayon::prelude::*;
use std::time::Instant;
use std::sync::mpsc;

fn fib(n: u32) -> u32 {
if n < 2 {
return n;
}
fib(n - 2) + fib(n - 1)
}

// 使用rayon的并行迭代器来重复计算一万次
fn main() {
let (tx, rx) = mpsc::channel();
let start = Instant::now();
let mut results = vec![0; 10000];
results.par_iter_mut().for_each_with(tx,|tx,r| {
*r = fib(40);
let duration = start.elapsed();
let tt = tx.clone();
tt.send(duration.as_secs_f64() ).unwrap();
drop(tt);
});

for received in rx {
print!("耗时: {:?}", received);
}

let duration = start.elapsed();
println!("总耗时: {:?}", duration);
println!("平均耗时: {:?}", duration / 10000);
}

运行结果:

耗时区间:0.00420079 s ~ 503 s 总耗时:507s 平均耗时:50 ms

Rayon 默认只使用与 CPU 数量相同的线程来执行任务,执行效率反而比 Tokio 略好,Tokio 因为启动了大量的线程,导致我的电脑已无法正常响应键鼠了。

Object 区间 总耗时 平均
Go 676 s ~ 814 s 814 s 81.4 ms
Tokio 0.6 s ~ 564 s 564 s 56.4 ms
Rayon 0.00420079 s ~ 503 s 507 s 50 ms

由于例子的特殊性,在总的执行时间上,Go 没有任何优势可言,因为 Go 和 Rust 的定位不同,性能也有差距,所以比较总耗时并没有意义。我们这里主要观察每个任务的耗时区间。

这里有意思的是 Go 的所有任务耗时趋向于“平均”,而 Rust 的两个框架是在每个线程上串行执行任务,任务耗时如同信号图标📶,由低到高渐进式增长。

所以,如果计算任务之间没有依赖,更看重总的响应时间的话,使用与 CPU 核数相当的线程池进行并行计算能得到最佳效果;如果任务是并发的,更加注重单个任务的响应时间,类似于 Go 的并发模型可能是更好的选择。本文所举的示例,每个任务的计算量相当,并不能体现 Go 的这一优势,不过可以想象一下:并发途中,一个计算量很小的任务加入队列,上述两种任务处理方式会给出不同的结果。

Go 是为并发而生的语言,所以你会发现,在编写 Go 代码的时候,你根本不用去考虑并发任务是计算型还是 I/O 型的,在其并发模型下所有的任务都会尽可能得到及时的处理;而对于缺乏完善调度运行时的线程池来说,其注意事项就很多了,你要小心翼翼,不能在异步任务中写太多计算的代码。对此,甚至有博主指出:在进入.await之前,最好不要超过10 ~ 100 微秒

道理不难理解,以 Tokio 为例,虽然可以运行 CPU 密集型任务,但是官方很明确的说你要新开实例去运行,不要饿死 I/O 任务,显然这是因为运行时缺乏调度能力的折中方案。CPU 密集型任务属于会阻塞 executor 线程的任务,容易霸占 CPU 而饿坏或者饿死其它任务,此时只能靠手动 yield 来让出 CPU,给其它任务以运行的机会;而网络 I/O 之所以适合,完全是因为有非阻塞特性和 Reactor 的存在,每个 I/O 读写点都是一次 yield 的机会!

不难想见的是,Tokio 虽然适合网络 I/O 型并发,但是也要在 I/O 任务里小心地控制计算型代码的时间,否则会导致运行时任务调度不均,从而长时间阻塞其它任务的运行。

是总的响应时间优先?还是每个任务的及时响应优先?这是一种选择和权衡!

PS: 作为静态语言,Go 的性能真是差了 Rust 一大截,希望性能能再上一个台阶!

参考文献

  1. Hello Tokio
  2. CPU-bound tasks and blocking code
  3. Async: What is blocking?

本文主要讲述了我对 I/O 的一些理解,剖析了 Go 语言netpoller是如何结合 epoll 和 goroutine 的,文章还简单畅想了 io_uring 可能会带来的改变,并阐述了我对于异步编程的一点浅陋思考,希望能跟大家一起探讨。

作为 Gopher,你想必听过“Go 为我们提供了同步的网络 I/O 编程接口”,或者“Go 为我们提供了阻塞的网络 I/O 编程接口”这样的表述。

那么,这样的表达有什么问题吗?

答案是:完全没问题!

这两句话所传达的事实是确凿无疑的,但前提是,你需要多思考一步:优异的网络框架不应该是同步的!

如果网络的读写都是同步的,就无法应对高并发、高吞吐的应用场景,二十多年前的计算机前辈们就提出了 C10K 问题,也就是单机如何支持 1 万个并发连接的问题。

在 Linux 上, C10K 问题最终的解决方案是非阻塞 IO+epollI/O 多路复用技术,且一直沿用至今。

所以,一门优秀的语言必须解决 C10K 问题。事实上,这里的“同步”仅仅是指 Go 提供的“编程接口”,而不是内在的网络模型。

但是,为什么一定要提供同步的编程接口呢?

人类生活在一个充满异步的世界里,许多事物和事件并行发生,而计算机的发展却是从同步开始的,汇编语言以及其它的低级语言为我们塑造了原始的同步编程模型,我们已经越来越习惯它了,这就导致异步编程成为一件复杂且困难的事。

因此,以高效的内在网络模型为内核,向外提供同步的用户编程接口,会成为解救程序员思维,让编程体验极度舒适的善举。

为什么需要异步

对异步的渴求,其实是源自于对高效率的渴望。

一切都因为 CPU 太快,I/O 设备太慢,需要运行的任务太多。这是目前计算机所面临的现状。

然而,CPU 和 I/O 设备是可以并行工作的。通过异步操作,CPU 可以在等待 I/O 操作完成的同时继续执行其他指令,充分利用其计算能力。当 I/O 操作完成时,CPU 会收到通知,并处理 I/O 返回的结果,唤醒等待者。

可见,这种异步模型能极大地提高系统的吞吐量和响应能力,尤其在需要同时处理大量 I/O 操作时,异步的优势相比同步会更加明显。

简单总结一下异步 I/O 的优势,有如下两点:

  1. 能够在等待 I/O 操作的同时执行其它任务,充分利用 CPU 的计算能力。换句话说,正在 CPU 上披荆斩棘的线程不会因为一次 I/O 而停下脚步,在提交了 I/O 申请之后可以去执行下一个任务,而执行下一个任务的能力是由并发模型决定的。
  2. 最大限度地发挥 I/O 设备的吞吐能力。当系统中存在大量并发的 I/O 操作,或者有多个独立的任务可以并行处理时,异步 I/O 能够提供更好的性能和资源利用率。

最常见的文件 I/O 和网络 I/O 是最需要异步的,文件读写极有可能会遇到所读内容不在高速页缓存中的情况,此时需要等待慢速磁盘将内容读入内存;网络读写也无法永远保证 socket 缓冲区的就绪状态(读/写系统调用发生时,缓冲区有内容可读与有空间可写)。等待这两种 I/O 完成的代价实在太大,也就很有必要将其异步化——在 I/O 设备工作的同时,让 CPU 去完成其它任务。

然而,需要留意的是:异步 I/O 的效率提升主要取决于系统中存在的并行任务数量和性质。如果计算机只有少量任务且这些任务必须串行进行,那么异步 I/O 可能不会带来明显的性能提升,因为任务之间无法并行执行。

同步、异步、阻塞、非阻塞

同步、异步、阻塞、非阻塞这两对儿概念总是绕不过去的,同时也是无法轻易阐述明白的,本文无意做这种努力。软件领域并不存在一以贯之的概念,想要在概念上把软件行为掰扯清楚明白将注定徒劳无功。

能够确定的是,在很多场景下,这两对概念可以描述同一件事情,比如,以电商系统中用户下单为例,对支付系统需不需要等待订单处理系统的反馈问题上,可以分为同步、异步或者阻塞、非阻塞两种情况。如果支付系统需要等待订单处理的结果,那么就可以说这个过程是同步的,或者说订单处理流程阻塞了用户下单;如果支付系统在用户支付后就向用户反馈结果,并将新生成的订单以消息事件的方式通知下游订单系统处理,就可以说这个过程是异步的,订单的处理是非阻塞的。

但事情并不总是这样,比如谈到网络 I/O 时,这两对儿概念就不能等量齐观了。

每个 socket 连接都有两个缓冲区,一个用于发送,一个用于接收,当接收缓冲区为空,发送缓冲区满的时候,网络的读/写就会阻塞,此时意味着线程会失去 CPU,应用发出的读/写系统调用在内核代码路径中会触发进/线程的调度,在缓冲区就绪之前,线程会被换下 CPU 进入被阻塞的等待状态。因此,当在系统调用、内核态等上下文中提到阻塞的时候,往往意味着进/线程的切换,意味着有人失去 CPU,进而失去执行其它任务的机会。

同步就像是函数调用,一旦发出请求,就必须等到执行完成后返回,才能继续往下进行;异步则不然,发出请求后会立马返回,I/O 操作会在未来某个时刻完成并通知请求者。这样看来,在网络 I/O 这件事上,同步和阻塞的表达看上去没有什么区别,而非阻塞和异步就有些微妙了,这还要从非阻塞的功能意义上说起。

fnctl系统调用可以通过O_NONBLOCK标志将文件设置为非阻塞,吊诡的是,这个非阻塞标志只对网络文件描述符起作用,对于普通的磁盘文件描述符是没有任何效果的。

这其实完全可以理解,文件并不像网络有一个缓冲区,文件流永远都是就绪的。那么,当网络文件描述符被设置为非阻塞之后会有什么样的表现呢?我们分别描述一个网络文件描述符在阻塞和非阻塞状态下read系统调用的过程。

假设应用基于这个文件描述符发出read系统调用的时候,socket 接收缓冲区为空(网络消息尚未到达):

  1. 阻塞:当前的应用线程会因此被阻塞,进而被内核调离 CPU,当网络消息到达之后,缓冲区就绪,产生网络中断,内核获得 CPU 使用权,再次将应用线程换上 CPU ,read系统调用继续,CPU 将消息拷贝至用户空间,系统调用完成。
  2. 非阻塞:read系统调用会立即返回,并附带一个EWOULDBLOCK或者EAGAIN的错误,从错误含义就可以看出一些端倪,仿佛内核告诉你:“这次调用会造成阻塞,不过没关系,我发现你设置了非阻塞标志,我不会将你挂起,请你过一会儿再试吧!” 此时,应用就可以去做其它的任务,并在需要的时候重新发起read系统调用进行读取,缓冲区一旦就绪,CPU 就会将消息拷贝至用户空间。

很明显的一点是,非阻塞调用除了通知应用缓冲区尚未就绪之外,没做其它的事情。应用需要不断地试探,在某一次碰巧赶上缓冲区就绪时,这次read系统调用才宣告成功,因此问题的关键是:应用在得知缓冲区就绪之前,不知道要发出多少系统调用来试探!

而异步的网络 I/O,除了会在系统调用发出后立即返回之外,还会在“后台”完成真正的 I/O 操作,在这里就是将 socket 缓冲区中的内容拷贝至用户空间,并以某种方式通知应用,可见非阻塞和异步的区别还是很大的。

那么 Linux 平台提供异步 I/O 的支持了吗?

简单说,有,但很难用,很多时候没法用!

POSIX 有对应的 aio_readaio_write等异步函数实现,但无奈太拉胯,性能奇差。

Linux 内核也有原生的异步 I/O 支持,并冠以AIO之名,但限制很多,如只支持O_DIRECT访问、只支持特定设备、性能表现不佳等等,社区满意度极低,饱受诟病,我甚至找不到异步网络 I/O 的例子,因此可以说,当前 Linux 只支持有限的文件异步 I/O,不支持网络异步 I/O(至少没有应用场景)。

要知道,在异步 I/O 的概念出现之前,还有一种中间形态——IO 多路复用。

直到现在,IO 多路复用依然是 Linux 平台高并发网络的主流解决方案,以epoll为支点的事件循环结构铸就了当今互联网绝大多数网络程序的 Reactor模型。

I/O 多路复用

我们可以用非阻塞+单线程或多线程的网络模型来处理大量网络连接,但是由于需要浪费 CPU 来试探缓冲区是否就绪,所以效率难免会大打折扣。

那么索性就让操作系统来通知我们就绪的网络文件描述符吧!这就是 I/O 多路复用:应用只要阻塞在单个系统调用上,就可以监听多个网络文件描述符事件。

Linux 内核对 I/O 多路复用支持的变迁史可简单描述为select->poll->epoll的改进路线 ,这三种方式原理大致相同,但性能越来越高。

我在读 Stevens 先生的《UNIX网络编程》时,常常奇怪:为什么书中只介绍了selectpoll,却对epoll只字未提。后经求证,两部《UNIX网络编程》分别成书于 1990 年和 1999 年,epoll首次出现在 Linux 内核是在 2002 年。令人扼腕的是,Stevens 先生于 1999 年辞世,身后为我们留下了 7 部传世名著,其中就包括大家熟知的《TCP/IP详解》三部曲。

让我们沿着伟人的叙述脉络,来看一看 I/O 多路复用与其它 I/O 模型的比较:

Comparison of the five IO models

一次网络 I/O 读取操作分成两个阶段:

(1)等待数据就绪

(2)从内核空间拷贝数据到用户空间

可见,只有 blocking I/Oasynchronous I/O 能为请求者包揽两个阶段,其余的 I/O 模型在第 1 阶段的表现各不相同,却在第 2 阶段表现一致,即均需要请求者主动完成。noblocking I/O 为了探知数据是否就绪而空耗 CPU,这肯定是无法容忍的,于是内核出马,I/O 多路复用诞生。

至于 blocking I/O ,它是最简便的网络模型,用它来进行网络编程简洁却不高效,因为它会阻塞当前线程,引发线程切换,所以我们才会在寻求高效 I/O 模型的道路上孜孜不倦。

图中有三种模型的第二阶段被标注为blocked,对此我有一点不同的看法, 当我们谈及系统调用的时候,阻塞通常意味着线程因工作无法继续而被调离 CPU,我们看上图中的 noblocking I/OI/O 多路复用signal-driven I/O 三种模型,它们在第 1 阶段通过不同的方式得知数据已经就绪(noblocking I/O 不是收到通知,而是通过撞运气),第 2 阶段发出系统调用开始读取数据,窃以为第 2 阶段并不存在阻塞,也并没有绝对的首恶元凶导致线程切换,这一阶段也就不应被称为“blocked”,如果非要用什么概念来描述一下的话,我愿用“同步”来称呼它。

事实上,Stevens 先生在讨论 asynchronous I/O 的时候,确实将前面 4 种 I/O 模型称为“同步 IO”,我认为这一说法成功把水搅浑了,特别是当 noblocking I/OI/O 多路复用尚拥有一点“异步”特质的情况下,这往往让后来者很容易迷失在概念里。

正确的做法是:把概念丢掉,在内核和指令的维度去体认每一种 I/O 模型。即便真的存在两个可以称之为“同步”、“异步”的明确概念,也大可不必将 noblocking I/O、I/O 多路复用和 signal-driven I/O 套用其中。可以把它们看成一种中间形态,毕竟“同步”、“异步”的定语是修饰整个 I/O 过程的,而非其中的某一步。

非阻塞 I/O 和 epoll 的组合或者说就是 I/O 多路复用,是目前 Linux 平台主流的高并发网络解决方案,基本上所有的高性能网络框架或者服务器都是由此打造。然而每一项技术都不是独立存在的,也不应当被独立理解,Linux 为我们提供了 I/O 多路复用这样一块积木,至于用这块积木能构建成怎样的世界,可能性似乎是无限的,Reactor 网络模型就是其中一例。

Reactor 是利用非阻塞 I/O 和 epoll 构建的事件循环模型,是对 epoll 的抽象和封装,使得使用更加便捷,因为对事件分发,使用相应的 handler 来处理事件,非常像核反应堆,所以得名 Reactor。Reactor 的形式有多种多样,本文并不想去详细介绍个中细节,谈及 Reactor 仅仅是想说明:对于事物的理解,先要从一座森林理解一棵树,然后才能从一棵树理解整座森林。

异步编程”和“异步”这两个概念是有所区别的,“异步”是一种特性,“异步编程”是基于此特性演化出的编程范式。

在编程语言领域,我认为异步编程的终极形态是为上层开发者隐藏异步编程的复杂性,提供同步的编程接口。就像 Go 语言netpoller 那样,底层使用了目前最优秀的非阻塞和 epoll 模型构造的单线程 Reactor,上层为开发者呈现出简洁、直观的同步编程接口。

netpoller

在进入繁琐的netpoller源码之前,先来切身地体验一下 Go 的网络编程,相信下面的代码对每个 Gopher 来说都不陌生:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"net"
)

func main() {
listen, err := net.Listen("tcp", "localhost:8080")
...
for {
conn, err := listen.Accept()
...
go handleConnection(conn)
}
}

func handleConnection(conn net.Conn) {
defer conn.Close()

buffer := make([]byte, 1024)
for {
n, err := conn.Read(buffer)
if err != nil {
fmt.Println("Error reading:", err.Error())
return
}
data := buffer[:n]
fmt.Println("Received data:", string(data))
}
}

这段代码是使用net包编写的一个简单的网络 Server,其中handleConnection展示了如何读取 socket 中的内容,n, err := conn.Read(buffer)以同步阻塞的方式发出了Read请求,对于开发者来说一个读取操作就这样轻松完成了,简洁到无以复加!内里乾坤甚至是初学者根本不会去想的,这就是 Go 为开发者提供的网络编程接口,复古而优雅。

因为,根本不用担心 goroutine 所在的 M 线程被阻塞!

使用 epoll 进行网络程序的编写,需要三个步骤,分别是 epoll_createepoll_ctlepoll_wait。这三个 API 基本对应着 epoll 实例创建、增加监听描述符、poll 网络事件这三个动作,接下来我会就这三个动作来剖析一下 netpoller,此处参考 go 1.18 的源代码。

epoll 初始化

netpoller 创建 epoll 实例的关键代码位于 pollDesc 这个结构体的 init 方法中:

internal/poll/fd_poll_runtime.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type pollDesc struct {
runtimeCtx uintptr
}

var serverInit sync.Once

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}

poll.pollDesc 是 poll.FD 中的一个字段,poll.FD 表示文件描述符,os 包和 net 包都包含它来组成上层意义的网络连接OS 文件。poll.FD 的 Init 方法会调用 poll.pollDesc.init 来初始化 epoll 实例,初始化函数runtime_pollServerInit是个单例模式,也就是说,epoll 实例会在程序创建第一个文件描述符时被创建。我们看一下runtime_pollServerInit

runtime/netpoll.go

1
2
3
4
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}

poll_runtime_pollServerInit 是一个未导出的方法,此处将其链接成了internal/poll.runtime_pollServerInit,因此可以在 internal/poll 包里直接调用。netpollGenericInit调用了netpollinitnetpollinit使用epoll_create来创建 epoll 实例:

runtime/netpoll_epoll.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
if epfd < 0 {
epfd = epollcreate(1024)
if epfd < 0 {
println("runtime: epollcreate failed with", -epfd)
throw("runtime: netpollinit failed")
}
closeonexec(epfd)
}
r, w, errno := nonblockingPipe()
if errno != 0 {
println("runtime: pipe failed with", -errno)
throw("runtime: pipe failed")
}
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
if errno != 0 {
println("runtime: epollctl failed with", -errno)
throw("runtime: epollctl failed")
}
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}

epfd 是 epoll 实例的文件描述符,它是一个全局变量,后续会作为 epoll_ctlepoll_wait 系统调用的参数被传入。在创建完 epoll 实例之后,紧接着使用epoll_ctl加入了一个非阻塞的管道描述符,这里主要用于唤醒阻塞在 epoll_wait 上的 poll 线程。

这是可以理解的,毕竟 polling 事件的线程还有其它任务,如果待监听的描述符长时间无事件发生,不可能让线程一直阻塞下去。其实这个线程就是sysmon线程,一个不需要 P 即可运行的操作系统线程,稍后会介绍 sysmon 线程如何 polling 网络事件。

向 epoll 实例注册描述符

随着 epoll 实例的成功创建,接下来就可以使用epoll_ctl系统调用向其中添加感兴趣的文件描述符了,最终会使用epoll_wait来收取这些网络文件描述符上的网络事件,我们这里主要聚焦于描述符的注册之上。

Go 网络编程中涉及到向 netpoller 注册描述符的行为大致有ListenConnectAccept三类,为避免繁琐的代码,此处我以 TCP 为例,仅列出函数调用路径,至 epoll_ctl为止。

  • 服务端 Accept 连接
1
2
3
4
5
6
7
8
9
10
net.TCPListener
--->net.TCPListener.Accept
--->net.TCPListener.accept
--->net.netFD.accept
---->net.netFD.init
----->poll.FD.Init
------>poll.pollDesc.init
------->poll.runtime_pollOpen
-------->runtime.netpollopen
--------->epollctl

这里值得一提的是,net.netFD.accept 调用 poll.FD.Accept, 进而调用 accept4 系统调用,将文件描述符设置为非阻塞。

1
2
3
4
5
6
7
8
9
// Wrapper around the accept system call that marks the returned file
// descriptor as nonblocking and close-on-exec.
func accept(s int) (int, syscall.Sockaddr, string, error) {
ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
if err != nil {
return -1, sa, "accept4", err
}
return ns, sa, "", nil
}
  • 服务端监听
1
2
3
4
5
6
7
8
9
10
11
net.ListenTCP
--->net.sysListener.listenTCP
--->net.internetSocket
--->net.socket
---->net.netFD.dial
----->net.netFD.init
------>poll.FD.Init
------->poll.pollDesc.init
-------->poll.runtime_pollOpen
--------->runtime.netpollopen
--------->epollctl
  • 客户端拨号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
net.Dial
--->net.Dialer.Dial
--->net.Dialer.DialContext
---->net.sysDialer.dialSerial
----->net.sysDialer.dialSingle
------>net.sysDialer.dialTCP
------->net.sysDialer.doDialTCP
-------->net.internetSocket
--------->net.socket
---------->net.netFD.dial
----------->net.netFD.connect
------------>poll.FD.Init
------------->poll.pollDesc.init
-------------->poll.runtime_pollOpen
--------------->runtime.netpollopen
---------------->epollctl

三类操作最终都会调用 runtime.netpollopen,此函数会发出epoll_ctl调用,将fd与事件_EPOLLIN、_EPOLLOUT 、_EPOLLRDHUP 、_EPOLLET注册进 epoll 实例,并设置为边缘触发。

runtime/netpoll_epoll.go

1
2
3
4
5
6
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

epoll_ctl 有一个参数event,是和文件描述符关联的一个对象,在 go 源码中名为epollevent

1
2
3
4
type epollevent struct {
events uint32
data [8]byte // unaligned uintptr
}

其中data是用户设置的,kernel 必须保存它并在文件描述符就绪时(通过 epoll_wait 调用)一并返回。此处,ev.data被设置为pollDesc:

1
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd

每个网络文件描述符都会对应一个pollDescpollDesc是 Network poller descriptor,它控制着文件描述符的状态以及在该文件描述符上产生等待的 goroutine 的地址。

也就是说,当epoll_wait返回时,从就绪的ev.data中可以拿到等待读写的 goroutine 地址,我们接下来将详细剖析这个过程。

事件分发

epoll_wait的调用是由函数netpoll发出的,它会检查就绪的文件描述符,并且返回相关的 goroutine 列表,由此可见 netpoller 的事件分发特点:与 goroutine 紧密结合,将可读写的网络文件描述符转化为可运行的 goroutine,然后注入相关的运行队列,最终由调度器接管并伺机运行。

runtime/netpoll_epoll.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
}
// 准备 epollevent 数组,最多 poll 128个事件
var events [128]epollevent
retry:
// 发出 epoll_wait 调用
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
...
// toRun 是个 goroutine 链表,表示文件描述符已就绪,可以被调度运行的 goroutine
var toRun gList
// 遍历本次 epoll_wait 返回的就绪事件
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}

if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
...
}

var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 从 ev.data 中取出 pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.setEventErr(ev.events == _EPOLLERR)
// 获取相关 goroutine 并插入到 toRun 链表
netpollready(&toRun, pd, mode)
}
}
return toRun
}

netpoll 并不像我们平时看到的 epoll 使用案例那样使用一个死循环来无限地epoll_wait,事实上它只epoll_wait一次,无限循环调用的任务放在了sysmon中,我们稍后会看到,现在将注意力放在如何将事件转化为对应的 goroutine 上。

从代码中可以看出,netpollready 会获取相关 goroutine 并插入到 toRun 链表:

runtime/netpoll.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// netpollready is called by the platform-specific netpoll function.
// It declares that the fd associated with pd is ready for I/O.
// The toRun argument is used to build a list of goroutines to return
// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
// whether the fd is ready for reading or writing or both.
//
// This may run while the world is stopped, so write barriers are not allowed.
//
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
// 获取等待读操作的 g
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
// 获取等待写操作的 g
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
// 将等待读操作的 g 插入链表
toRun.push(rg)
}
if wg != nil {
// 将等待写操作的 g 插入链表
toRun.push(wg)
}
}

这里交代一下 pollDesc 中的 rg,wg 字段,这两个字段就是控制描述符状态和相关 goroutine 地址的关键,它的定义如下:

runtime/netpoll.go

1
2
3
4
5
6
7
8
type pollDesc struct {
...
// rg, wg are accessed atomically and hold g pointers.
// (Using atomic.Uintptr here is similar to using guintptr elsewhere.)
rg atomic.Uintptr // pdReady, pdWait, G waiting for read or nil
wg atomic.Uintptr // pdReady, pdWait, G waiting for write or nil
...
}

rgwg 都是原子类型,里面可能存放的内容为:pdReady, pdWait, G waiting for read or nilG 就是 goroutine 的地址,我们继续沿着调用向下看:

runtime/netpoll.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

for {
old := gpp.Load()
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set pdReady for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 将 gpp 设置为 pdReady
if gpp.CompareAndSwap(old, new) {
if old == pdWait {
// 如果设置为0,则 (*g)(unsafe.Pointer(old)) 为 nil
old = 0
}
// 将 old 的值转换为 *g 返回,old 通常就是发生等待的 goroutine 地址
return (*g)(unsafe.Pointer(old))
}
}
}

拿到 *g 之后,就可以插入toRun链表了:

runtime/proc.go

1
2
3
4
5
6
7
8
9
10
11
// A gList is a list of Gs linked through g.schedlink. A G can only be
// on one gQueue or gList at a time.
type gList struct {
head guintptr
}

// push adds gp to the head of l.
func (l *gList) push(gp *g) {
gp.schedlink = l.head
l.head.set(gp)
}

toRungList类型,gList是一个靠g.schedlink串联起来的单链表。

是时候看看netpoll的调用者如何处理这些 goroutine 链表了,前面说过这个工作是sysmon来完成的,sysmon会在单独的操作系统线程中运行,看一下其创建过程:

runtime/proc.go

1
2
3
4
5
6
7
8
9
10
// The main goroutine.
func main() {
...
if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
systemstack(func() {
newm(sysmon, nil, -1)
})
}
...
}

sysmon的功能很多,我们这里只看它如何 polling network:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
...
for {
...
// poll network if not polled for more than 10ms
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
}
}
...
}
}

如果距离上次 polling 过了 10ms,那么就再次发出 polling,如果结果不为空,则调用 injectglist,而 injectglist 会将结果链表中的 goroutine 的状态修改为runnable后放入本地或全局队列,后面就是调度器接手了,此处不再赘述。

goroutine 的停泊与 pollDesc

上面介绍了网络事件如何转换为对应的 goroutine ,这个过程依靠的关键是pollDesc中的 rg 和 wg,但是 rg 和 wg 是在何时设置为当前 goroutine 的地址的呢?当 socket 缓冲区未就绪时,一个Read调用将会发生什么呢?

其实,只要稍加思索就会明白,这两个问题极具相关性。Go 中的网络连接是线程安全的,允许多个 goroutine 同时发出读写操作,而 pollDesc 只有一个 rg 和 wg,因此多个 goroutine 读写必定是用锁来串行化的,rg 和 wg 也必然是在读写遭遇阻塞时设置的。事实上,我也正是从这个思路出发,在源码中找到关键所在的。

让我们从一个Read调用开始,看看其内在端倪。

以 TCP 的读取为例,沿着 connRead 接口调用路径net.netFD.Read--->poll.FD.Read,可以定位到poll.FD.Read:

internal/poll/fd_unix.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
...
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
// 发出 Read 系统调用
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
// 如果 收到 EAGAIN 错误,且文件描述符是可 poll 的,则进入等待
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}

当 socket 缓冲区未就绪,读取一个非阻塞的网络文件描述符时会返回EAGAIN或者EWOULDBLOCK错误,紧接着调用链runtime.pollDesc.waitRead--->runtime.pollDesc.wait--->poll.runtime_pollWait--->runtime.netpollblock会触发调度,将当前 goroutine 换下 CPU,寻找一个新的 goroutine 来运行。

runtime/netpoll.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}

// set the gpp semaphore to pdWait
for {
// Consume notification if already ready.
if gpp.CompareAndSwap(pdReady, 0) {
return true
}
if gpp.CompareAndSwap(0, pdWait) {
break
}

// Double check that this isn't corrupt; otherwise we'd loop
// forever.
if v := gpp.Load(); v != pdReady && v != 0 {
throw("runtime: double wait")
}
}

// need to recheck error states after setting gpp to pdWait
// this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
// do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
if waitio || netpollcheckerr(pd, mode) == pollNoError {
// 开启 park 流程,休眠当前 goroutine
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// be careful to not lose concurrent pdReady notification
old := gpp.Swap(0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}

netpollblock会将pollDesc中的 rg 或 wg 设置为pdWait,然后调用gopark将当前 goroutine 休眠,进入schedule流程挑选新的 goroutine 来运行。

gopark的调用在一个if判断里,按目前的代码只要netpollcheckerr没有错误发生,就会调用gopark,因此我觉得waitio特别是像为真正的异步 I/O 预留的接口,以备向后兼容。

请记住传入gopark的前两个参数:netpollblockcommitunsafe.Pointer(gpp),然后再来看gopark中的相关内容:

runtime/proc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Puts the current goroutine into a waiting state and calls unlockf on the
// system stack.
//
// If unlockf returns false, the goroutine is resumed.
//
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
//
// Note that because unlockf is called after putting the G into a waiting
// state, the G may have already been readied by the time unlockf is called
// unless there is external synchronization preventing the G from being
// readied. If unlockf returns false, it must guarantee that the G cannot be
// externally readied.
//
// Reason explains why the goroutine has been parked. It is displayed in stack
// traces and heap dumps. Reasons should be unique and descriptive. Do not
// re-use reasons, add new ones.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
// 此时 lock 是 gpp(pollDesc 中的 rg)
mp.waitlock = lock
// 此时 unlockf 是 netpollblockcommit
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}

gopark会获取当前所在的m,并将函数netpollblockcommitunsafe.Pointer(gpp)pollDesc 中的 rg)分别赋值给mp.waitlockmp.waitunlockf,接下来进入mcall(park_m)的调用,mcall 是个汇编函数,它会调用传参中的函数且不再返回,我们看一下mcall的内容,明确一下将要传给函数park_m的参数:

runtime/asm_amd64.s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// func mcall(fn func(*g))
// Switch to m->g0's stack, call fn(g).
// Fn must never return. It should gogo(&g->sched)
// to keep running g.
TEXT runtime·mcall<ABIInternal>(SB), NOSPLIT, $0-8
MOVQ AX, DX // DX = fn

// save state in g->sched
// 下面这 5 行指令是保存当前 goroutine 的状态到 g->sched
MOVQ 0(SP), BX // caller's PC
MOVQ BX, (g_sched+gobuf_pc)(R14)
LEAQ fn+0(FP), BX // caller's SP
MOVQ BX, (g_sched+gobuf_sp)(R14)
MOVQ BP, (g_sched+gobuf_bp)(R14)

// switch to m->g0 & its stack, call fn
// 准备切换到 g0 及其堆栈
MOVQ g_m(R14), BX // 通过 R14 拿到当前的 m
MOVQ m_g0(BX), SI // SI = g.m.g0 通过 m 拿到 m 的 g0
CMPQ SI, R14 // if g == m->g0 call badmcall
JNE goodm
JMP runtime·badmcall(SB)
goodm:
MOVQ R14, AX // AX (and arg 0) = g 把当前 goroutine 地址放入 AX
MOVQ SI, R14 // g = g.m.g0 SI 是 g.m.g0,这一句将 goroutine 切换到 g0
get_tls(CX) // Set G in TLS
MOVQ R14, g(CX) // 设置 g0 到线程本地存储
MOVQ (g_sched+gobuf_sp)(R14), SP // sp = g0.sched.sp 切换到 g0 堆栈
PUSHQ AX // open up space for fn's arg spill slot
MOVQ 0(DX), R12
CALL R12 // fn(g)
POPQ AX
JMP runtime·badmcall2(SB)
RET

根据Go internal ABI specification的描述,amd64平台下整型参数传递依次使用如下寄存器:

1
RAX, RBX, RCX, RDI, RSI, R8, R9, R10, R11

这里有一个知识点:R14 寄存器存放当前 goroutine 的地址,可以从汇编代码中看到mcall是如何利用 R14 寄存器切换到 g0 的。

mcall的主要工作就是保存当前 goroutine 的状态,切换到 g0 堆栈并执行传入的函数fn,这里即将执行park_m

MOVQ R14, AX // AX (and arg 0) = g这一句将当前 g 的地址存入 AX 寄存器,并在接下来CALL R12 // fn(g)的时候,充当第一个参数。

接下来进入park_m:

runtime/proc.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// park continuation on g0.
func park_m(gp *g) {
_g_ := getg()

if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

if fn := _g_.m.waitunlockf; fn != nil {
// 调用 netpollblockcommit
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
// 进入调度循环,不再返回
schedule()
}

此时,参数gp是发出Read系统调用的 goroutine 地址,在进入schedule()调度循环之前,我们仅关注ok := fn(gp, _g_.m.waitlock)这一句,gopark函数已经将netpollblockcommitunsafe.Pointer(gpp)(pollDesc 中的 rg)分别赋给了mp.waitlockmp.waitunlockf,所以这里的调用实际上是netpollblockcommit(gp, unsafe.Pointer(gpp))

不知道你有没有好奇,挂起的明明是当前的 goroutine,为什么函数的名字是park_m呢?这里的m显然就是 GMP 中的线程 M 啊,我是这样理解的:M 就像是一列高速运行的汽车,乘客是goroutine,当某个乘客因某些原因不能继续乘坐时,M 需要停下来让乘客下车,然后再开动去寻找下一位乘客,所以这个挂起 goroutine 的过程就像是 M 停泊了一样。

但为了叙述方便,我还是称 goroutine 停泊好了,毕竟比喻并不十分恰当,从指令角度看 M 可是一路狂奔从未停过,只是从代码所有权角度来看, M 确实是换了一位乘客。

runtime/netpoll.go

1
2
3
4
5
6
7
8
9
10
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}

atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))rg的值更新为发出Read 调用的 goroutine地址

至此,两件重要的事情已经完成:

  1. 休眠发出Read 调用的 goroutine。
  2. 设置与该goroutine感兴趣的文件描述符相关的pollDesc.rg为该 goroutine 的地址,以便后续polling时促成网络事件向goroutine链表的转化。

这种模型的好处是非常节省 M,M 在 Go 的 GMP 并发模型中代表操作系统线程,因为网络 I/O 非阻塞的特性,M 会从系统调用中立即返回,不会因为数据未就绪而被内核剥离 CPU,M 当然就可以挂起当前 goroutine,转而去寻找其它的 goroutine 来运行。

反观文件 I/O 则不然,文件 I/O 没有异步和非阻塞特性(不考虑臭名昭著的AIO),当 M 因为系统调用陷入内核时,如果要读取的内容不在页高速缓存中,就会触发缺页处理,内核需要向磁盘发出 I/O 请求,因为这个过程不是异步的,内核会将 M 剥离,调度其它线程来运行。可想而知,此时此刻 go runtime 只能新建 M 来匹配 P,新建的 M 需要加入内核运行队列,等待内核调度,经过这样一番折腾,吞吐自然就下来了。

更可怕的是如果有大量 I/O 请求,势必会让更多的 M 陷入内核无法自拔,go runtime 除了创建更多的 M 之外别无良策,在这种情况下仍不断地创建 M 无异于扬汤止沸,如果 M 的数量超过 1 万,程序就 panic 了。

所以,go 并不适合文件 io 密集型任务,除非有真正的异步 I/O,使得文件也可以享用 Reactor 的好处。

本质上, netpoller 使用非阻塞I/O和epoll模型构建的是一个单线程的Reactor,即便如此,也已经可以满足绝大多数的场景需求,可以说这套模型是 Linux 平台的最佳、最高效的解决方案。然而实在是拗不过非阻塞I/O 和 epoll这对组合有一些不那么“同步”的特性,以讹传讹间,不少不明就里的人就把这种模型称为异步 I/O 了。

直到 2019 年 io_uring 横空出世,Linux 异步世界才终于迎来了一丝曙光!

下一代异步 I/O

io_uring 由 Jens Axboe 提出和实现,并于 2019 年 5 月随 Linux 5.1 发布。io_uring 是 Linux 内核引入的新型异步 I/O 框架,旨在改善 AIO 的性能和可扩展性,社区称“io_uring can change everything”。本文不准备对其做过多介绍,感兴趣的朋友可以参考 Efficient IO with io_uring 这篇文章。这里仅从系统调用的角度来谈一谈 io_uring 可能为上层带来的变化。

  • 网络 I/O

    以非阻塞 I/O 和epoll为蓝本的 Reactor 在收到网络事件之后,仍然需要主动发起一次系统调用去读取 socket 的内容,在 io_uring 下就没有这一步系统调用了,当等待者被唤醒以后,数据就已经在手边了。不过,要适配 io_uring 还需要不少工作要做,比如 go 的 netpoller 就需要对 runtime 做改造适配,当然适配完成后的网络模型就不是 Reactor 了,而是 Proactor,当然了,叫什么并不重要,重要的是它怎么干这件事。

  • 文件 I/O

    前面讨论过,Reactor 并不能拯救文件 I/O,Go 运行时甚至会出现“卡线程”的情况。但是,io_uring 完美解决了这一痛点,不像它的前任 AIO 每次追加请求以及获取结果都要发起一次系统调用,io_uring 实现了用户空间和内核空间的内存无锁共享,很多很多的系统调用被节省了,所有 I/O 全都可以交给专门的线程负责,这就是 Proactor。

    如果 Go runtime 适配了 io_uring 会怎样?届时意味着“卡线程”成为历史,Go 不仅适合网络密集型I/O,也会适合文件密集型I/O 。

事实上,Go 社区已经在讨论适配的可能性了(见#31908),进展算不上快,但未来可期!

再论异步编程模型

直接进行异步编程是复杂且困难的,你需要设计专门的模块统一处理 I/O 请求,并且具备唤醒机制,即便这些问题得到了完美解决,你也会发现你创造了另一个 Reactor 或者 Proactor。但如果不做这些,异步就无法发挥作用。试想一下,一个任务发出异步请求之后,请求立即返回了,那么任务本身该何去何从呢?没有 I/O 的结果 任务就无法推进,这个时候就要有将任务 Park 的能力了。

由此可见,异步编程其实是考验并发模型的,也就是分解任务的能力,如果一门编程语言只能以线程来分解任务,那么使用异步的代价就会增加,这个问题的解决方案就是协程,如果编程语言实现了协程,那么任务分解的粒度变为协程,则更能有效的利用异步 I/O 带来的优势。

Go 就是这种玩法,非阻塞和异步都可以拿来和协程相配合,思路非常容易理解。Rust 的异步运行时也有协程的概念,走的是状态机路线(对话 ChatGPT 理解 Rust 异步网络 io),属于另外一种玩法,很难说孰优孰劣。Go 使用有栈协程,调度原理很好理解,编程接口简单直接,缺点是goroutine数量过多会有隐患;Rust 使用无栈协程,利用状态机实现协程切换,好处是节省了不少指令,理论上有性能优势,缺点是接口太过晦涩。

可见,trade-off 无处不在!

参考文献

  1. UNIX网络编程
  2. Linux/UNIX系统编程手册
  3. 网络编程实战
  4. Redis 源码剖析与实战
  5. Efficient IO with io_uring
  6. Ringing in a new asynchronous I/O API
  7. The rapid growth of io_uring
  8. What’s new with io_uring
  9. Getting Hands on with io_uring using Go
  10. #31908

从已知理解未知是人类的自然思维模式,但这很容易让人陷入认知偏差的思维陷阱。我在理解 Rust 异步网络 io 时就陷入了思维定式,总是试图以线程切换或者 goroutine 协程切换的模式来理解 Rust 协程的调度,而这两种调度方式有着天壤之别。

以 tokio 为首的 Rust 异步运行时,为用户呈现了一种无栈协程,既然都是协程,那么拿来与 go 的协程作比较是很自然的一种想法,加之我对协程切换、CPU 执行流改变近乎迷恋。所以,从 go 的角度去推测 Rust 也就情有可原了。

很多文章或者名家专栏在介绍异步 io 时,无一例外都是从 Stevens 先生在 《UNIX环境高级编程》中 对于 io 模型的介绍开始的,然后介绍 Linux 下的 epoll 多路复用,之后引出 reactor 模式,最后进入对 Rust 异步 io 的介绍。这其中有很大的断层,并不是很容易理解其中的脉络,所以,Rust 的异步 io 模型一直没搞明白,直到 chatgpt 的出现......

我先问了它go 的netpoller 和 tokio 两种网络模型的异同。

  1. 请比较一下go的netpoller网络模型和tokio的网络模型的异同

    Go的netpoller网络模型是基于epoll/kqueue/iocp等多路复用技术的,它使用同步编程模式达到异步执行的效果。它以网络描述符netFD为中心,当在一个netFD上读写遇到EAGAIN错误时,就将当前goroutine存储到这个netFD对应的PollDesc中,并调用gopark把当前goroutine给park住,直到这个netFD上再次发生读写事件,才将此goroutine给ready激活重新运行。
    Tokio的网络模型也是基于poll模型的,它使用异步编程模式实现高效的任务调度。它提供了TCP、UDP等网络类型,在tokio::net模块中可以找到像TcpListener、TcpStream、UdpSocket这些类型。Tokio的任务由Tokio调度器管理的最小可执行单元,它需要实现Future trait,并在poll方法中返回Async::NotReady或Async::Ready状态。
    两者的异同点有:

    • Go使用同步编程模式,Tokio使用异步编程模式
    • Go以netFD为中心管理网络操作,Tokio以Future为中心管理任务
    • Go使用goroutine作为轻量级线程,Tokio使用task作为轻量级线程
    • Go通过gopark和ready来控制goroutine的状态转换,Tokio通过Executor来调度task的状态转换

    这个回答不是很抽象,如果以前没有了解相关内容的话,并不太好理解。但是解释大致是正确的,“Go使用同步编程模式,Tokio使用异步编程模式”这句话并不准确,Go 是通过netpoller 和 goroutine 的配合为用户提供了编写同步编码的接口,网络的 io 本质上还是基于 epoll 的非阻塞 io,而 Tokio 就需要用户真正的编写异步代码了。

  2. 请介绍一下tokio的future使用poll来推进执行的内部原理

    因为事先了解过 Rust 的 funture 、调度器、执行器体系,所以我直接问其内部原理

    Tokio的future使用poll来推进执行的内部原理是:

    • Future是一个trait,它有一个poll方法,用于检查future是否已经完成或者还需要等待
    • poll方法返回一个Poll枚举类型,它有三个可能的值:Ok(Async::Ready(t))表示future已经成功完成并返回t;Ok(Async::NotReady)表示future还没有准备好完成;Err(e)表示future已经失败并返回e
    • 当future被创建时,它会注册自己到Tokio调度器中,并告诉调度器它依赖哪些资源(比如TCP socket)来完成
    • 当资源变得可用时(比如TCP socket收到数据),调度器会通知future,并调用它的poll方法
    • 如果poll方法返回Ok(Async::Ready(t)),那么future就完成了,并将t传递给下一个future;如果poll方法返回Ok(Async::NotReady),那么future就继续等待资源变得可用;如果poll方法返回Err(e),那么future就失败了,并将e传递给下一个future
      这样,Tokio可以通过不断地调用poll方法来驱动异步任务的执行。

    这些回答流于表面,但是能起到帮我建立上下文的作用,然后继续追问。

  3. 我以前不理解执行流是如何改变的,现在有些模糊的认识,你帮我参谋一下:对于一个 future 执行流的改变总是发生在对于一个异步函数的 await 的调用处,其它非 await 调用的语句是不会中断的,所以一个 future 只会在 await 处被中断和恢复,我理解的对吗?

    之所以有此一问,是因为我还局限在线程切换和 goroutine 切换的思维定式当中,不论是线程还是 goroutine 都会被抢占,也就是说执行流在任务中的任何一行代码上都可能发生改变,被调度器保存上下文换上新的任务运行,既然 Rust 中也是任务,任务之间的切换没理由不遵循已经被验证过的设计方式。

    你的理解基本上是对的。await是一个语法糖,它会将异步函数分割成多个future,并在每个await处暂停和恢复执行流。但是有一点需要注意,就是如果一个future返回Ok(Async::Ready(t)),那么它不会被中断,而是直接继续执行下一个future。所以只有当一个future返回Ok(Async::NotReady)时,它才会被中断,并等待资源变得可用再恢复。

    很好,这个回答验证了一个猜想,那就是 Rust 的协程无法被 tokio 运行时自由抢占,抢占的任务依赖于底层的线程调度。

  4. 嗯,我明白,现在我们再深入一下,tokio 是如何在 await 返 Ok(Async::NotReady) 时保护现场的?又是如何恢复现场的呢?

    我依旧对于执行流的改变耿耿于怀,我的认知是:当 await 无法完成时,执行器会转而执行其他的任务,而且会在合适的时机再次回到上次的中断处继续。 后面会证明这个想法是错误的

    Tokio是如何在await返回Ok(Async::NotReady)时保护现场的,又是如何恢复现场的呢:

    • 当一个future被创建时,它会被包装在一个Task结构体中,这个结构体包含了future的状态和上下文信息
    • 当一个future被调度器调用poll方法时,它会获取Task的引用,并将其传递给future
    • 当一个future返回Ok(Async::NotReady)时,它会创建一个Waker对象,并将其存储在Task中。Waker对象是一个通知机制,它可以让future告诉调度器什么时候再次poll自己
    • 当资源变得可用时,Waker对象会被触发,并通知调度器再次poll对应的future
    • 调度器会从Task中获取future的状态和上下文信息,并恢复执行流
      这样,Tokio可以通过Task和Waker来保护和恢复现场。

    表示对这个回答无感,可能是了解的不够深,无法 get 到正确的点,那就继续发问

  5. 还有一个疑问:poll 函数到底怎么推进 future 的执行呢?如果实现 future 的这个异步函数中有很多普通的语句,这些语句怎么能通过 poll 得到执行呢?

    我其实隐约感觉到了 future 的执行比较特别,因为很多资料的介绍都是说 future 的执行是在被执行器一次次的 poll 中前进的。

    poll函数到底怎么推进future的执行呢:

    • poll函数是Future trait的核心方法,它会尝试将future推进到完成状态
    • 如果future已经完成,poll函数会返回Poll::Ready(t),其中t是future的结果
    • 如果future还没有完成,poll函数会返回Poll::Pending,并安排一个wake()函数在future准备好继续执行时被调用
    • 当一个异步函数被调用时,它会被编译器分割成多个future,并在每个await处暂停和恢复执行流
    • 每个future都有一个状态机,它记录了当前的执行位置和局部变量
    • 当poll函数被调用时,它会根据状态机来决定接下来要执行哪些语句,并更新状态机
    • 如果遇到一个await表达式,poll函数会检查被等待的future是否已经完成
    • 如果已经完成,poll函数会获取结果并继续执行下一个语句;如果还没有完成,poll函数会返回Poll::Pending,并等待wake()函数再次唤醒自己
      这样,poll函数可以通过状态机来控制异步任务的执行流程

    不知道它说的这些哪些有问题,所以我想要一个被编译过后的实现了 future trait的异步函数内容,先看看再说。

  6. 能给我一个poll函数执行future的示例吗?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct MyFuture {
    // some fields
    }

    impl Future for MyFuture {
    type Output = i32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    // do some work
    if self.is_ready() {
    // return the result
    Poll::Ready(42)
    } else {
    // register a waker
    cx.waker().wake_by_ref();
    // return pending
    Poll::Pending
    }
    }
    }

    因为它没给出原来的异步函数内容,所以我有点生气(因为自己太平庸了,后来才明白 // do some work 就是异步函数中的普通语句)。

  7. 但是这个异步函数的内容是什么?我没看到poll执行了异步函数的内容

    异步函数的内容是由编译器自动分割成多个future的,每个future都有一个poll方法,它会执行一部分异步函数的内容,直到遇到一个await表达式。例如,下面这个异步函数:

    1
    2
    3
    4
    5
    async fn foo() {
    // do something
    let x = await bar();
    // do something else
    }

    会被编译器转换成类似这样的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    enum FooFuture {
    State1,
    State2(Pin<Box<dyn Future<Output = i32>>>),
    }

    impl Future for FooFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    match self {
    FooFuture::State1 => {
    // do something
    let fut = Box::pin(bar());
    *self = FooFuture::State2(fut);
    cx.waker().wake_by_ref();
    Poll::Pending
    }
    FooFuture::State2(fut) => match fut.as_mut().poll(cx) {
    Poll::Ready(x) => {
    // do something else
    Poll::Ready(())
    }
    Poll::Pending => Poll::Pending,
    },
    }
    }
    }

    这样,你可以看到poll执行了异步函数的内容,并在每个await处暂停和恢复执行流。

    看到这里我才恍然大悟,以前没有细细追究编译器将异步函数编译为一个状态机到底意味着什么。

  8. 你的意思是编译器会把异步函数中的语句拆分到不同的状态机状态中,poll 的时候这些代码就会被执行是吗?

    看,多么痛的领悟~

    是的,你理解得很对。编译器会把异步函数中的语句拆分到不同的状态机状态中,poll的时候这些代码就会被执行。这样,异步函数就可以实现非阻塞的并发操作。

    但是,这仅仅是发现了新大陆而已,不能忘记为何出发

  9. 那 await 放弃执行权是根据一个非阻塞文件描述符返回 EAGAIN 标志吗?

    根据 go 的经验,我问了一个底层的问题,想试着在Rust上缝合这一块儿内容

    await放弃执行权不是根据一个非阻塞文件描述符返回EAGAIN标志,而是根据future的poll方法返回Poll::Pending。当future的poll方法返回Poll::Pending时,它会注册一个waker,这个waker会在future可以继续执行时唤醒它。这样,异步函数就可以在等待某些操作完成时让出CPU资源给其他任务。

    它显然没理解我的意思,是我太跳跃了,那就给它点提示。

  10. 那 poll 方法根据什么返回一个 Poll::Pending 呢?

    poll方法根据future是否完成来返回一个Poll::Pending。如果future还没有完成,poll会返回Poll::Pending,并存储一个从当前Context复制的Waker的克隆。这个Waker会在future可以继续执行时唤醒它。例如,一个等待socket变为可读的future会调用Waker的.clone()方法并存储它。当socket变为可读时,Waker会被唤醒,然后再次调用poll方法。

    显然不能让它这么轻易几句就打发了,我决定要一个具体的例子。

  11. 给我一个读取 socket 产生 pending 的例子,我要看看这个函数生成的 poll 内容,因为我对如何发出 socket 读取,并且不会陷入系统调用发生阻塞感兴趣。我是想要 tokio 异步编程中,一个异步函数读取 socket,被编译器编译出来的 poll 方法的内容,可以吗?

    你总是需要小心描述你的要求,这对中文来说是个很大的挑战,但是我又没有耐心一点点儿地抠英文,学渣的无奈啊!

    tokio异步编程中,一个异步函数读取socket,被编译器编译出来的poll方法的内容可能是这样的:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpStream;

    // 定义一个异步函数,从socket中读取数据,并写回去
    async fn echo(mut stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
    let mut buf = [0; 1024];

    loop {
    // 读取socket,如果没有数据可读,返回Poll::Pending,并注册一个waker
    let n = stream.read(&mut buf).await?;

    if n == 0 {
    // socket已经关闭,退出循环
    break;
    }

    // 将读取到的数据写回socket
    stream.write_all(&buf[0..n]).await?;
    }

    Ok(())
    }

    // 编译器会将上面的异步函数转换为一个实现了Future特质的结构体类型
    struct Echo {
    stream: TcpStream,
    buf: [u8; 1024],
    state: State,
    }

    enum State {
    Reading { n: usize },
    Writing { n: usize },
    }

    impl Future for Echo {
    type Output = Result<(), Box<dyn std::error::Error>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    loop {
    match self.state {
    State::Reading { mut n } => {
    // 尝试从socket中读取数据
    let result = Pin::new(&mut self.stream).poll_read(cx, &mut self.buf[n..]);

    match result {
    // 如果成功读取到数据,更新n和state,并继续循环
    Poll::Ready(Ok(m)) => {
    if m == 0 {
    // socket已经关闭,返回Poll::Ready(Ok(()))
    return Poll::Ready(Ok(()));
    } else {
    n += m;
    self.state = State::Writing { n };
    }
    }
    // 如果没有数据可读,返回Poll::Pending,并注册一个waker
    Poll::Pending => return Poll::Pending,
    // 如果出现错误,返回Poll::Ready(Err(e))
    Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
    }
    }
    State::Writing { mut n } => {
    // 尝试将数据写回socket
    let result = Pin::new(&mut self.stream).poll_write(cx, &self.buf[..n]);

    match result {
    // 如果成功写入数据,更新n和state,并继续循环
    Poll::Ready(Ok(m)) => {
    if m == n {
    // 数据已经全部写入,重置n和state,并继续循环
    n = 0;
    self.state = State ::Reading { n };
    } else {
    // 数据还有剩余部分没有写入,减少n并保持state不变,并继续循环
    n -= m;
    self.state = State ::Writing { n };
    }
    }
    // 如果没有空间可写,返回Poll ::Pending,并注册一个waker
    Poll ::Pending => return Poll ::Pending,
    // 如果出现错误,返回Poll ::Ready (Err (e))
    Poll ::Ready (Err (e)) => return Poll ::Ready (Err (e.into())),
    }
    }
    }
    }
    }
    }

    从这个例子就可以看出对于有 2 个 await 的异步函数,编译过后的状态机模样了

  12. tokio::net::TcpStream.poll_read 是如何做到不阻塞的?

    我选了 read 方法问它是如何做到不阻塞的

    tokio::net::TcpStream.poll_read 是一个异步读取方法,它可以从一个 TCP 流中接收数据,但不会等待新数据到达。它是非阻塞的,因为它不会挂起当前线程,而是返回一个 Poll 结果,表示操作是否完成或需要重试 。要使用这个方法,你需要确保 TCP 流是在非阻塞模式下创建的

    显然,它不知道我想要更底层的内容

  13. 可以给出源码吗?它是在收到 EAGAIN 时返回一个 pending 吗?

    你可以在这里查看 tokio::net::TcpStream.poll_read 的源码。它是一个 trait 方法,它调用了 TcpStream 的 poll_read_priv 方法,该方法使用了 PollEvented 来封装一个 mio::net::TcpStream。如果收到 EAGAIN 错误,它会返回一个 Pending 结果,并注册当前任务的唤醒器,以便在流变得可读时再次轮询。

    OK,问到我想要的答案了,接下来转移话题。

  14. 我有个疑问,Rust 不内置异步运行时,那么编译器在编译 await 时,如何在返回 pending 时放弃 future 的执行权将执行流引向 executor 代码的呢?

    之所以问出这种问题,是因为我对Rust的异步运行时并不了解,也不清楚 future 是如何被催动的

    这是一个很好的问题。Rust 的 async/await 特性是基于 trait 的,这意味着它不依赖于任何特定的运行时或执行器。编译器在编译 await 时,会将当前函数转换为一个状态机,其中每个 await 点都是一个状态转换。当一个 future 被轮询时,它会执行到下一个 await 点,然后返回 Pending 或 Ready 结果。执行器负责轮询 future,并在它们变得可用时恢复它们的执行。因此,编译器不需要知道执行器的细节,只需要生成合适的状态机和轮询逻辑。

  15. 奥,是不是执行器负责执行 future,当 future 中的 await 返回 pending 时,执行器放弃执行当前的 future,转而去寻找其它的 future 继续轮询,我理解的对吗?

    是的,你的理解是对的。执行器就是负责管理和调度 future 的组件,它可以根据不同的策略来选择哪些 future 要轮询或暂停。执行器可以是单线程或多线程的,也可以支持不同的并发模型,如协程、线程池、事件循环等。

    ChatGPT 的优点就是可以节约你很多研究时间,当看到被肯定时,还是挺激动地,但是不能止步,终极问题还没解决

  16. 详细讲解一下future是如何保存现场和恢复现场的?

    还是绕回到这个话题上来了,因为我的思维定式就是任务切换,要保存上下文的,IP,SP这些寄存器,及其他通用寄存器

    future 是一种抽象的类型,它表示一个异步计算的结果,它可以在某个时刻被轮询并返回一个值或者继续挂起。future 本身不保存任何现场,它只是一个状态机,它根据输入的上下文(Context)和缓冲区(ReadBuf)来决定下一步的动作。当 future 被轮询时,它会执行到下一个 await 点,并将自己的状态和局部变量封装在一个 Pin<Box<…>> 中返回给执行器。这个 Pin<Box<…>> 就是 future 的保存的现场,它可以在之后被再次传入 future 的 poll 方法来恢复执行。

    这个回答的好处是引出了Pin<Box<…>>,虽然细节不太清楚,但以前就混过脸熟,只要记住上下文放在这里就行。

  17. 那么为什么要用 Pin<Box<…>> 来封装 future 上下文呢?

    是为了保证 future 的内存地址不会改变,从而避免破坏 future 的安全性和正确性。Pin 是一个指针类型,它表示一个不能被移动的值。Box 是一个堆分配的智能指针,它可以存储任意类型的值 。将 future 包装在 Pin<Box<…>> 中,就相当于将 future 放在堆上,并且保证它不会被移动或者删除 。这样,future 就可以安全地保存自己的状态和局部变量,并且可以在任何时候被恢复执行。

    这个时候感觉的一点匪夷所思的是,执行器在一个线程上不断轮询若干个 future,而且 Rust 的协程是无栈协程,那么代码执行就只用到线程栈,按照前面的说法使用 Pin 保存现场,那么是否future的执行不依赖于栈中的内容,否则,在多个future之间跳来跳去岂不是乱了套?

  18. 那future是不是对堆栈的依赖不大?

    这个问题没有一个确定的答案,因为 future 的实现和使用方式可能不同。一般来说,future 会尽量减少对堆栈的依赖,因为堆栈是有限的资源,而 future 可能会有很多。future 通常会将自己的状态和局部变量存储在堆上,或者使用一些优化技术来避免不必要的内存分配。但是,future 也可能会使用一些堆栈空间来执行一些临时的计算或调用其他函数。因此,future 对堆栈的依赖程度取决于具体的情况和设计。

  19. 那 future 在保存现场的时候不考虑堆栈中的内容吗?

    future 在保存现场的时候,只需要考虑自己的状态和局部变量,而不需要考虑堆栈中的内容。这是因为 future 的 poll 方法是一个纯函数,它不会修改堆栈中的任何数据,也不会依赖于堆栈中的任何数据。堆栈中的内容只是一些临时的计算或调用结果,它们在 future 挂起或恢复时并不重要。因此,future 只需要将自己封装在一个 Pin<Box<…>> 中,并将其返回给执行器,就可以实现现场的保存和恢复。

    至此,我大概理解 Rust 的异步网络 io 模型了。

  20. 我有点理解了,每次 poll 或许会使用堆栈,但 poll 完之后,poll 函数返回堆栈内容就弹出了,但是上一次执行的状态会被保存在 pin 结构中,下一次 poll 的时候,会基于 pin 中的内容决定状态机的走向,进而一步步推动整个 future 逻辑的前行,是不是?

    是的,你的理解是正确的。future 的 poll 方法就是一个状态机,它会根据自己的状态和输入的上下文来决定下一步的动作。每次 poll 完成后,future 会将自己的状态和局部变量保存在 pin 结构中,并返回一个值表示 future 是否已经完成或者还需要继续轮询。下一次 poll 的时候,future 会从 pin 结构中恢复自己的状态和局部变量,并继续执行到下一个 await 点或者结束。这样,future 就可以实现异步计算的逻辑。

因为我用的是NewBing,所以20句就截止了,所幸问题已经聊清楚了,其实类似于 tokio 之类的异步运行时在调度协程时并没有我惯性思维中以为的那种保存寄存器恢复寄存器的过程,它的核心在于每个异步函数都会被编译为一个状态机,这个状态机是玄机所在,再加上一个 pin 的结构来保存和恢复一些中间变量或者说状态。这样一来,依靠 epoll 事件,调度器和执行器的配合,不停地去 poll 这个状态机就行了,事件的发生会导致状态的改变,从而执行不同状态下的代码。这种架构理解起来不算直观,不如go的协程好理解。

go 的 netpoller 网络模型是与 goroutine 的密切配合实现的,作为 reactor,netpoller 只是根据网络文件描述符的状态修改关注它的 goroutine 的状态,剩余的工作交给调度器;以同步模式使用的网络 socket 读取代码,会在一个非阻塞的 socket 上获得 EAGAIN 错误时执行 gopark 进入调度循环,放弃执行,等待再次被调度。

至于 tokio 的网络模型,虽然 reactor 的底层逻辑都是使用 epoll 事件循环,但是当被激活时的行为是完全不同的,tokio 的 reactor被激活时会调用 executor 事先注册好的 waker 函数来通知 executor 去轮询 future,猜测这种模型的运行时要比 go 的运行时要小,加上 future 是无栈协程,理论上在处理大量网络连接时的效率要高。但是,tokio 也仅仅适用于 io 相关的操作,无法像 go 的goroutine 一样万事皆可异步!

原文链接:Challenges of running gRPC services in production

实现服务间通信的方式有若干种,这通常涉及到 TCP/IP 协议族中的传输层。我们的应用程序经常依靠它来提供若干抽象和功能,例如负载均衡、重试和高可用性等。

然而,当我们在生产环境中运行服务时,我们会遇到更多网络相关的错误,这无疑超乎我们的想象。本文旨在阐释在使用 gRPC 进行服务间通信时,如何来缓解这些错误。

Why gRPC?

回溯至 2016 年 ,彼时 Incognia 的服务几乎都在使用 HTTP1.1 / JSON 技术栈进行通信。它在很长一段时间内运行良好,但随着公司的发展,一些高流量服务开始需要一种更高效的方式与内部客户端通信。

JSON API 的文档维护也很麻烦,因为它们没有与代码本身绑定,这意味着某人可以部署更改 API 的代码,却没有修改对应的文档。

为了寻找一个足够优秀的替代方案,我们调研了 grpc,它通过以下特性完美解决了性能问题以及上述模式定义问题:

  • API 平面直接在 protobuf 文件中定义,其中每个方法都描述了自己的请求/响应类型
  • 自动生成多语言的客户端和服务端代码
  • 与 Protobuf 结合,使用 HTTP/2 协议,HTTP/2 和 Protobuf 都是二进制协议,这意味着请求/响应的有效载荷更加紧凑
  • 而且,HTTP/2 使用持久化连接,无需像 HTTP/1.1 那样频繁地创建和销毁连接

但是,运行 grpc 服务也给我们带来很多挑战,主要归因于 HTTP/2 使用持久化连接这一事实。

生产环境使用 gRPC 的挑战

我们是 Kubernetes 的重度用户,因此我们的 grpc 服务全部运行在由 Amazon EKS 提供的 Kubernetes 集群之上。

确保在 server 上负载均衡是面临的挑战之一。因为服务的数量是动态变化的,这是由(k8s的)自动伸缩功能决定的,所以客户端必须有能力识别新的 server,并剔除不可用的连接。与此同时还要保证所有的请求以某种负载均衡策略均衡地分布到这些 server 上。

负载均衡

对此, The gRPC blog 阐述了几种解决方案,大致可分为代理负载均衡(或服务端负载均衡)和客户端负载均衡。在下面的章节中,我将按时间线来讲述我们的实现方式。

方式 1: 使用 Linkerd 1.x 作为负载均衡代理

如图1所示,我们实现的第一种方式是使用一个名为 Linkerd 1.x 的代理负载均衡器。这种方式在一段时间内工作的很好,它在服务端解决了负载均衡问题,但没解决从客户端到代理的负载均衡问题,这意味某些 Linkerd 实例要处理比其它实例更多的请求。

这种不均衡后来被证实是有问题的,过载的实例可能会增加太多延迟,甚至有时会耗尽内存,变的越来越难以维护。

除此之外,我们在 Kubernetes 集群中将 Linkerd 部署为守护进程,这意味着 Linkerd pod 在集群中的每个工作节点上运行。因此,这种解决方案被证明会增加相当大的开销(因为它需要额外的网络跳跃),并且还会消耗大量资源。

img

Figure 1: Proxy load balancer approach

方式 2: 胖 gRPC 客户端

将处理负载均衡的职责放到客户端代码,我们试图淘汰代理层,以此来尝试解决第一种方式带来的问题。

通过结合使用 grpc-gonaming.NewDNSResolverWithFreq(time.Duration) 和 Kubernetes 的 headless services(处理 server pod的发现),我们在客户端处理负载均衡。使用这种方式,客户端需要每隔几秒钟轮询目标 service 的 DNS,以此来刷新可供连接的主机池。

与使用代理层相比,此处客户端与服务的pod直连,降低了调用延迟。下图展示了该方式所涉及到的组件。

img

Figure 2: Thick client approach

然而,Go gRPC 实现不再支持使用 DNS 进行动态服务发现,而是推荐使用 xDS 等其他协议。而且,在其他语言中,DNS 动态服务发现从未被实现过。

我们知道,尽管这种方式带来了稳定和高性能的通信,但是由于 gRPC 实现的多样化,依赖客户端实现负载均衡终归是脆弱和难以维护的。这一点也同样适用于其它功能,比如限流和认证。

当尝试过这些不同的方法之后,我们清楚的认识到,我们需要一种通用、低成本、语言无关的方式来实现服务发现和负载均衡。

方式 3: 使用 Envoy 边车代理

经过一番调研,我们选择使用边车模式——在客户端 pod 中增加一个容器,由它来负责服务发现,负载均衡,以及一些针对连接的监测功能。从性能和部署简便性出发,我们选择了 Envoy

img

Figure 3: Sidecar proxy approach

在这种方式下,客户端容器与 Envoy 边车连接,由边车负责维护和目标 service 的连接。

采用这种方式,我们终于得偿所愿:

  • 与Linkerd 1.x相比,Envoy的开销很小,延迟低。
  • 对客户端代码没有侵入性
  • 可观测性,因为 Envoy 暴露了兼容 Prometheus 格式的 metrics
  • 丰富了网络层,支持认证和限流等特性

服务发现与优雅关闭

正确配置了负载均衡之后,我们仍然需要一种方法来让 Envoy 发现新的目标并更新其主机池。

Envoy 中的服务发现有几个选项,如 DNS、EDS(基于 xDS )。因为简单和熟悉的缘故,我们选择了 DNS。

使用 DNS 作为服务发现机制有个潜在的复杂性,即传播需要一定的时间,因此,在一个正在终止的后端服务真正停止接受连接之前,我们需要为 gRPC 客户端留有更新主机列表的余地。因为有一个 TLL 时间与 DNS 记录相关联,也就是说 Envoy 会在这段时间内缓存主机列表,所以使用 DNS 时,优雅关闭流程需要一点技巧

下图展示了一个基本流程,它以一个失败的请求结束:

img

Figure 4: Terminating host makes request fail due to DNS caching

在这个场景中,因为服务端 pod 不再可用,但 Envoy 缓存依然持有其 IP,所以第二次客户端请求以失败告终。

要解决这个问题,我们有必要看一下 Kubernetes 是如何销毁 pod 的,这篇文章对此有详细的论述。它包含两个同时进行的步骤:在 Kubernetes service endpoints 中移除 pod(在我们的案例中,同时也会移除 DNS 记录列表中该 pod 的 IP)向容器发送 TERM 信号,启动优雅关闭

有鉴于此,我们使用 Kubernetes 的 pre-stop 钩子来阻止 TERM 信号的立即发送:

img

Figure 5: preStop hook

配置好 preStop 钩子之后,我们的流程就变成下面这样:

img

Figure 6: successful server pod shutdown flow

通过这种解决方式,我们为 Envoy 的 DNS 缓存留出足够的时间,使其过期并且重新刷新,从而剔除已经死亡的 pod IP。

未来改进

尽管 Envoy 为我们带来了性能提升和整体的简洁,但是基于 DNS 的服务发现依旧不是很理想。因为它是基于轮询的,在 TTL 过期之后,由客户端负责刷新主机池,所以不够健壮。

一种更加稳健的方式是使用 Envoy 的 EDS,它拓展了一些功能,诸如金丝雀发布和更加精密的负载均衡策略等,因此是一种更加灵活的方案,不过我们仍然需要一些时间来评估这种方案并在生产环境上进行验证。

CPU是一个舞台,操作系统内核是位技艺精湛的指挥家,形形色色的进程就是舞者,它们在内核的指挥下轮番上台表演,演奏一场生命的赞歌!

1

调度是件很神奇的事,一直以来我都对它无比着迷,并不断地钻营个中奥妙。几年下来,略有心得,于是思索着记录下来,以备将来优哉游哉忘乎所以后回来检索之用。

然而吸引我的并不是调度算法,而是调度的时机和原理。因此这篇文章只讨论调度时机和 Linux 内核的调度原理,与调度算法无涉,本文将调度算法当成一个黑盒,优先了解调度行为本身。

计算机操作系统进入多道程序后,需要支持多个程序并发运行,这就需要操作系统必须有能力管理多个程序的运行,必要的时候进行程序切换,使得多个程序轮流获得CPU的使用权。当操作系统需要协调多个程序运行时,就有必要做点什么来保证各个程序可以无冲突并发运行(并发和并行的区别,此处不予讨论)。我们不妨做个不严谨的类比,如果需要使用文字来描述每个任务,那么把所有的任务写在一个文档里显然不是明智之举,即便是计算机新手,也懂得为每个任务单独建一个文档分别管理;当任务变得复杂,发展出很多支线时,一个文档很快又会捉襟见肘,不利于管理了。此时,聪明的做法就是为这个任务创建一个文件夹,每个文件夹中有若干文档,这些文档描述了任务的主线和若干支线,如果有必要它们可以共享文件夹中的图片、音视频等多媒体。

我想你一眼就能看出来,上面的描述是在说进程和线程。不过我类比的重点并不是进、线程本身,而意在说明:要管理多个任务,就必然会采取某种结构来分门别类,分而治之!也就是说,操作系统内核会使用一个结构体来描述进程、线程及其相关的一切。那可以称这个结构体为进程或者线程吗?答案当然是否定的!谈调度自然绕不开进程和线程,因此我们要直面一个难题:你永远无法轻易地说清楚进程是什么!

2

《操作系统导论》将进程定义为:操作系统为正在运行的程序提供的抽象!认为进程只是一个正在运行的程序,并且声明机器状态(程序在运行时可以读取或更新的内容)为进程的一部分,机器状态包括内存和寄存器。

无独有偶,《深入理解计算机系统》对进程的定义是:进程是操作系统对一个正在运行的程序的一种抽象。这和《操作系统导论》中给出的定义如出一辙,但是它紧接着还给出了线程的定义:进程往往不只有单一的执行流,进程实际上可以由多个称为线程的执行单元组成,每个线程都运行在进程的上下文中,并共享同样的代码和全局数据。

《深入理解Linux内核》补充说:可以把进程看作充分描述程序已经执行到何种程度的数据结构的汇集。这直接印证了我们之前的类比,从内核的观点看,进程的目的就是担当分配系统资源(CPU时间、内存等)的实体。这本书更偏爱于将进程描述成拥有多个相对独立的执行流,程序运行的本质就是CPU不断的执行一系列的指令序列,所以看起来就像是不断行进的流。

传统的Unix进程只拥有一个执行流,即便是现代主流语言的编程模式,如果你不采用多线程编程的话,进程也只有一个执行流,我们习惯称之为主线程;一旦使用了多线程的编程范式,进程就会拥有多个独立的执行流,Linux 内核就会为每个执行流分配单独的数据结构来管理资源的使用及其机器状态。实际上,Linux内核中的数据结构并不区分进程和线程,统一都使用task_struct这个结构体来描述一个执行流,程序的调度也是基于这个结构来进行的,而同属于一个进程的线程结构会共享某些资源,比如代码段、虚拟地址空间、打开的文件描述符、信号、堆等,只有堆栈是每个线程私有的。因此,当我们谈论进程时更多的是从资源的角度去思考,而谈到线程时,更多的是从执行流的角度去考虑。

综上,可以给出我心中对于进程的定义:进程是若干个活动的执行流以及各类相关资源的总称,这些资源包括了内核结构、地址空间(内存),寄存器等,其中进程的地址空间包括了多种类型的资源,代码段、数据段、堆、线程堆栈、文件映射等等;有些资源是线程共享的,比如堆、代码段,有些是线程私有的,比如堆栈。这些资源和若干独立执行流共同组成了进程这个抽象概念!

我们即将要探讨的就是:内核是如何将一个执行流从 CPU 上换下,代之以另一个执行流的!

3

这一章节中出现新进程和新建进程两个词语,未免混淆特此说明如下:

新进程:上下文切换时,被选中替换当前进程者

新建进程:使用fork、clone等创建的新进程或新线程

需要明确的一件事情就是内核调度的粒度,你定然听过“线程是调度的基本单位”这样的说法,这种说法固然没有错,但具体每个操作系统的实现却多有不同。以 Linux 为例,其内核角度并不区分进程和线程,用于标识调度单位的结构一律都是task_struct,因此后续的行文不会刻意区分进程和线程。

为了控制进程的执行,内核必须有能力挂起正在 CPU 上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换、任务切换、上下文切换。这是从 CPU 的角度来看,如果站在内核的角度,那么这种行为就是在进行进程调度。

进行任务切换或者调度的根本原因在于 CPU 和寄存器是共享的,大家必须轮流使用!

有意思的是,执行任务切换的并不是一个进程或者内核线程,而是一个函数schedule()。内核中有很多精心定义的点来执行schedule(),我们先来观察schedule()如何执行进程切换,至于调度的时机稍后进行讨论。

进程切换只会也只应发生在内核态,这本身就是内核的职责,需要申明的一点是:执行流进入内核态后,用户态的硬件上下文已经保存在该进程对应的内核态堆栈上了,当该进程从内核态返回用户态时即可恢复原貌,从之前的终止处继续执行。

进程切换由两步组成:

  1. 切换进程地址空间
  2. 切换内核态堆栈和硬件上下文

schedule 会调用 context_switch 执行上下文切换,我们以 Linux 内核 5.10 版本为例,假设此时已经由调度算法挑选出了下一个即将运行的进程next,执行流进入context_switch的代码执行上下文切换:

kernel/sched/core.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
* context_switch - switch to the new MM and the new thread's register state.
*/
static __always_inline struct rq *
context_switch(struct rq *rq, struct task_struct *prev,
struct task_struct *next, struct rq_flags *rf)
{
prepare_task_switch(rq, prev, next);
arch_start_context_switch(prev);

if (!next->mm) { // to kernel
enter_lazy_tlb(prev->active_mm, next);

next->active_mm = prev->active_mm;
if (prev->mm) // from user
mmgrab(prev->active_mm);
else
prev->active_mm = NULL;
} else { // to user
membarrier_switch_mm(rq, prev->active_mm, next->mm);
switch_mm_irqs_off(prev->active_mm, next->mm, next);

if (!prev->mm) { // from kernel
/* will mmdrop() in finish_task_switch(). */
rq->prev_mm = prev->active_mm;
prev->active_mm = NULL;
}
}

rq->clock_update_flags &= ~(RQCF_ACT_SKIP|RQCF_REQ_SKIP);

prepare_lock_switch(rq, next, rf);

/* Here we just switch the register state and the stack. */
switch_to(prev, next, prev);
barrier();

return finish_task_switch(prev);
}

context_switch的前半部分完成进程地址空间切换,switch_to 完成硬件上下文切换,此处主要关注switch_to如何完成上下文切换,毕竟我们只对执行流的变更感兴趣。

switch_to 是个预定义的宏,因其是和硬件体系结构密切相关的,所以主要部分需用汇编语言实现,我们看一下它的内容:

arch/x86/include/asm/switch_to.h

1
2
3
4
#define switch_to(prev, next, last)					\
do { \
((last) = __switch_to_asm((prev), (next))); \
} while (0)

__switch_to_asm 即是由汇编实现的程序主体:

arch/x86/entry/entry_64.S

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/*
* %rdi: prev task
* %rsi: next task
*/
.pushsection .text, "ax"
SYM_FUNC_START(__switch_to_asm)
/*
* Save callee-saved registers
* This must match the order in inactive_task_frame
*/
pushq %rbp
pushq %rbx
pushq %r12
pushq %r13
pushq %r14
pushq %r15

/* switch stack */
movq %rsp, TASK_threadsp(%rdi)
movq TASK_threadsp(%rsi), %rsp

#ifdef CONFIG_STACKPROTECTOR
movq TASK_stack_canary(%rsi), %rbx
movq %rbx, PER_CPU_VAR(fixed_percpu_data) + stack_canary_offset
#endif

#ifdef CONFIG_RETPOLINE
/*
* When switching from a shallower to a deeper call stack
* the RSB may either underflow or use entries populated
* with userspace addresses. On CPUs where those concerns
* exist, overwrite the RSB with entries which capture
* speculative execution to prevent attack.
*/
FILL_RETURN_BUFFER %r12, RSB_CLEAR_LOOPS, X86_FEATURE_RSB_CTXSW
#endif

/* restore callee-saved registers */
popq %r15
popq %r14
popq %r13
popq %r12
popq %rbx
popq %rbp

jmp __switch_to
SYM_FUNC_END(__switch_to_asm)

首先遵循被调用者原则保存6个寄存器的值,保存的方式就是压入即将被替换进程prev的内核栈,直到此时执行流依然使用prev的内核栈,不过接下来就开始切换内核栈了:

1
2
3
/* switch stack */
movq %rsp, TASK_threadsp(%rdi)
movq TASK_threadsp(%rsi), %rsp

依照内核函数的调用惯例,%rdi%rsi两个寄存器分别存放调用时传入的第一和第二个参数:__switch_to_asm((prev), (next)))

movq %rsp, TASK_threadsp(%rdi) 的结果是将当前内核栈的栈顶指针寄存器内容保存至prev进程(也就是当前进程)的thread->sp中,thread 是进程描述符task_struct 中一个类型为thread_struct的字段,里面会保存大部分 CPU 寄存器(但不包括 rax、rbx 等通用寄存器,它们的值保存至内核堆栈中)。而movq TASK_threadsp(%rsi), %rsp这一句将被选中进程next中之前被保存的栈顶指针恢复至rsp寄存器,执行完这一条之后执行流的内核堆栈就切换到新进程了。

严格来讲,从上面一条指令之后就是新进程的执行流了,可以说改变内核堆栈就意味着改变当前进程。这归因于和内核堆栈一起存放的一个名为thread_info的结构,内核都是通过它来寻找当前进程的描述符(参考《深入理解Linux内核》88页-标识一个进程)。

接下来,从新进程的内核堆栈中弹出之前保存的6个寄存器的值,然后 jmp__switch_to 函数,__switch_to 函数依然做一些保存老进程上下文和加载新进程上下文的工作,此处不再深入展开,我们仅将注意力集中到jmp这条指令上。

如果你熟悉 2.6 版本的内核,读过《深入理解Linux内核》,你就会奇怪:为何此处切换了新进程的rsp,却没有rip的切换呢?2.6 版本是会将新进程 thread 字段中的ip推入内核堆栈作为返回地址(该指令地址就位于switch_to中),这样在 __switch_toret时就会跳转到thread->ip指向的代码指令了,这也是为什么用jmp 不用 call的原因(call 会将ip 压栈,而jmp只是简单的跳转,不会压栈)。当我读到此处时,禁不住废书而叹,惊讶其设计的巧妙,但同时也生出疑问:既然新进程保存的地址就在附近,为何非要到thread->ip中绕一圈呢?直接把该地址推入内核堆栈效果不也一样吗?可能是内核的设计者也意识到这一点,在 2.6 版本 64 位内核直到最新的5.x版本就放弃了这种做法,不再去刻意的保存和恢复进程的rip 了,因为新进程的起点总是在switch_to函数中。

所以,此处在__switch_to 返回时会跳到开始的switch_to处:

1
2
3
4
#define switch_to(prev, next, last)					\
do { \
((last) = __switch_to_asm((prev), (next))); \
} while (0)

栈帧一层层解开,会返回到 context_switch 函数调用,继而回到schedule()。可见,任何一个进入内核态调用schedule()执行任务切换的进程,最终都会等到schedule()调用的返回。我们稍后会讲到 Go 运行时的协程调度,其schedule函数是不会返回的,所以它看起来并不像一个函数,这是区别于操作系统调度函数一个重要的点。

分析到此处,我们可能会以手舞之,以足蹈之,感觉终于在指令级别“看到”进程切换的本质了!不过还没到欢欣鼓舞的时候,我们似乎忘掉了一种情形:如果被选中的新进程是新创建的,从来没有被运行过,该又如何呢?__switch_toret时执行流又将流向何方呢?毕竟新进程是没有执行过schedule()的。

依然是内核堆栈救了我们!

要意识到的一点是:新建进程的内核堆栈并不是空的,它 copy 自父进程并经过精心的构造。

clone系统调用为例,copy_thread用发出clone()系统调用时的CPU寄存器的值(此时都在父进程的内核堆栈中)来初始化子进程或者线程的内核堆栈。不过copy_thread会把rax寄存器对应字段的值(这是fork和clone系统调用在子进程或线程中的返回值)强行置为0,这就是为何我们使用fork时要通过返回值来判断此时执行的是子进程还是父进程。既然说到这里,就聊一下clone系统调用的封装接口吧!线程创建的接口是由标准库 glibc 中的包装函数实现的,它处理了返回值的问题,如果是主线程则返回到调用处,如果是新建线程则跳转到任务函数调用处,这是 glibc 封装函数通过推入新线程的堆栈实现的。我们可以观察一下glibc 的 clone 封装代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/* The userland implementation is:
int clone (int (*fn)(void *arg), void *child_stack, int flags, void *arg),
the kernel entry is:
int clone (long flags, void *child_stack).
The parameters are passed in register and on the stack from userland:
rdi: fn
rsi: child_stack
rdx: flags
rcx: arg
r8d: TID field in parent
r9d: thread pointer
%esp+8: TID field in child
The kernel expects:
rax: system call number
rdi: flags
rsi: child_stack
rdx: TID field in parent
r10: TID field in child
r8: thread pointer */

.text
ENTRY (__clone)
/* Sanity check arguments. */
movq $-EINVAL,%rax
testq %rdi,%rdi /* no NULL function pointers */
jz SYSCALL_ERROR_LABEL
testq %rsi,%rsi /* no NULL stack pointers */
jz SYSCALL_ERROR_LABEL

/* Insert the argument onto the new stack. */
subq $16,%rsi
movq %rcx,8(%rsi)

/* Save the function pointer. It will be popped off in the
child in the ebx frobbing below.
这里把线程要运行的函数fn压入新建线程的用户态堆栈中*/
movq %rdi,0(%rsi)

/* Do the system call. */
movq %rdx, %rdi
movq %r8, %rdx
movq %r9, %r8
mov 8(%rsp), %R10_LP
movl $SYS_ify(clone),%eax

/* End FDE now, because in the child the unwind info will be
wrong. */
cfi_endproc;
syscall

/* 比较返回值 */
testq %rax,%rax
jl SYSCALL_ERROR_LABEL
/* 等于零则跳转到 L,否则直接返回 */
jz L(thread_start)

ret

L(thread_start):
cfi_startproc;
/* Clearing frame pointer is insufficient, use CFI. */
cfi_undefined (rip);
/* Clear the frame pointer. The ABI suggests this be done, to mark
the outermost frame obviously. */
xorl %ebp, %ebp

/* Set up arguments for the function call. */
popq %rax /* Function to call. 弹出函数地址 */
popq %rdi /* Argument. */
call *%rax /* 开始调用执行 */
/* Call exit with return value from function call. */
movq %rax, %rdi
movl $SYS_ify(exit), %eax
syscall
cfi_endproc;

开头的注释部分很好地揭示了用户态代码和内核态代码对于传参的不同要求,即寄存器的使用规则。关键点是movq %rdi,0(%rsi)这一句,它将线程要执行的fn地址推入了新进程的堆栈,并在L(thread_start):部分通过下面三条指令启动新线程执行流:

1
2
3
popq	%rax		/* Function to call. 弹出函数地址 */
popq %rdi /* Argument. */
call *%rax /* 开始调用执行 */

Go 并不使用标准的 C 库,其 runtime 重写了所有的系统调用封装函数,从 Go 的 clone 实现来看,也是相似的逻辑。不过,Go 使用 plan9 汇编,在阅读上会有一些不便:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void));
TEXT runtime·clone(SB),NOSPLIT,$0
MOVL flags+0(FP), DI
MOVQ stk+8(FP), SI
MOVQ $0, DX
MOVQ $0, R10
MOVQ $0, R8
// Copy mp, gp, fn off parent stack for use by child.
// Careful: Linux system call clobbers CX and R11.
MOVQ mp+16(FP), R13
MOVQ gp+24(FP), R9
MOVQ fn+32(FP), R12 // fn 被放入R12寄存器
CMPQ R13, $0 // m
JEQ nog1
CMPQ R9, $0 // g
JEQ nog1
LEAQ m_tls(R13), R8
#ifdef GOOS_android
// Android stores the TLS offset in runtime·tls_g.
SUBQ runtime·tls_g(SB), R8
#else
ADDQ $8, R8 // ELF wants to use -8(FS)
#endif
ORQ $0x00080000, DI //add flag CLONE_SETTLS(0x00080000) to call clone
nog1:
MOVL $SYS_clone, AX // 放入系统调用号,准备进入系统调用
SYSCALL
// 系统调用返回
// In parent, return. 父进程则返回
CMPQ AX, $0
JEQ 3(PC) // 若返回值为0,则跳跃3条指令
MOVL AX, ret+40(FP)
RET

// In child, on new stack.
// 如果是子进程则切换堆栈
MOVQ SI, SP

// If g or m are nil, skip Go-related setup.
CMPQ R13, $0 // m
JEQ nog2
CMPQ R9, $0 // g
JEQ nog2

// Initialize m->procid to Linux tid
MOVL $SYS_gettid, AX
SYSCALL
MOVQ AX, m_procid(R13)

// In child, set up new stack
get_tls(CX)
MOVQ R13, g_m(R9)
MOVQ R9, g(CX)
MOVQ R9, R14 // set g register
CALL runtime·stackcheck(SB)

nog2:
// Call fn. This is the PC of an ABI0 function.
// 调用 fn
CALL R12

// It shouldn't return. If it does, exit that thread.
MOVL $111, DI
MOVL $SYS_exit, AX
SYSCALL
JMP -3(PC) // keep exiting

回到操作系统内核新建进程的话题上来,前面讲过,在内核 3.0 到最新的 5.x 版本不再去刻意的保存和恢复进程的rip 了,因为新进程(被选中的进程)的起点总是在switch_to函数中。那么在新建进程初次运行的问题上,新旧版本虽然看上去不同,但殊途同归,最终都会把执行流引向ret_from_fork(),我们先看看 2.6 版本 32 位内核的操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define switch_to(prev,next,last) do {					\
unsigned long esi,edi; \
asm volatile("pushfl\n\t" /* Save flags */ \
"pushl %%ebp\n\t" \
"movl %%esp,%0\n\t" /* save ESP */ \
"movl %5,%%esp\n\t" /* restore ESP */ \
"movl $1f,%1\n\t" /* save EIP */ \
"pushl %6\n\t" /* restore EIP */ \
"jmp __switch_to\n" \
"1:\t" \
"popl %%ebp\n\t" \
"popfl" \
:"=m" (prev->thread.esp),"=m" (prev->thread.eip), \
"=a" (last),"=S" (esi),"=D" (edi) \
:"m" (next->thread.esp),"m" (next->thread.eip), \
"2" (prev), "d" (next)); \
} while (0)

从注释可以看出,在切换了内核堆栈之后,开始保存旧进程的EIP恢复新进程的EIP,恢复新进程的EIP是通过pushl %6来完成的,这句话的意思是将next->thread.eip的值压入内核堆栈,以便后面的C函数__switch_to返回时跳转。这是比较巧妙的一个地方,对于一个曾经被调度过的进程,其thread.eip中保存的就是1:标号处的指令,这是通过movl $1f,%1指令实现的(此处即上文提到的跳转地址本身就位于switch_to函数中)。问题是对于一个未曾运行过的新进程来说,是没有执行过switch_to代码的,所以此处pushl %6入栈的并不是标号1:处的代码,而是在创建进程时设置的ret_from_fork(),当__switch_to执行到ret 时,从内核堆栈弹出要跳转的地址,这个地址就是ret_from_fork(),新建进程的旅程便由此开始!

64 位的 2.6 内核版本稍微有些区别,但结果都是一样的,这里不再赘述,感兴趣的可以参考Evolution of the x86 context switch in Linux 这篇文章,其详细描述了从内核1.0版本到4.14版本上下文切换部分的设计变更。

回到我们开始讨论的 5.10 版本的__switch_to_asm,当被选中的新进程是一个新建进程时,jmp __switch_to返回后会跳转到哪里呢?答案当然是ret_from_fork(),新进程没有调用过schedule(),故不会回到其调用栈,此时只需搞清楚新进程的内核栈内容就能明白执行流的走向了,新建进程时执行完copy_thread()之后,进程的内核堆栈示意图如下:

图 3-1 新进程的内核堆栈

​ 如图 3-1 所示,弹出6个寄存器之后,就只剩ret_from_fork()的地址了。

4

虽然ret_from_fork()是新建进程的开始,但在真正返回到用户空间之前仍然横着一道关卡,那就是检查是否需要进行再调度。这是有可能的,如果有一个优先级更高的进程需要运行,那么时钟中断处理程序兴许已经在thread_info中做好了标记,等待着schedule()被调用。在真正回到用户空间之前,ret_from_fork()会检查这些标记,如果有必要,就调用schedule()。所以,我们就可以发问了,schedule()返回之后,执行流会去向哪里?

ret_from_fork()而言很简单,如果它真的调用了schedule(),当调度函数返回时,进程会以一个“老程序”的身份初次进入用户空间。因此问题的本质是何时会调用schedule(),换句话说,内核的调度时机有哪些?

Linux 调度时机主要有:

  1. 进程状态转换时刻:进程终止、进程睡眠;
  2. 当前进程的时间片用完;
  3. 进程主动调用
  4. 进程从中断、异常及系统调用返回到用户态时。

时机1,进程要调用sleep()或exit()等函数进行状态转换时,这些系统调用会主动调用调度函数进行进程调度。

时机2,由于进程的时间片是由时钟中断来更新的,因此,这种情况和时机4是密不可分的。

时机3,当进程某些资源无法获取,或者设备驱动程序执行长而重复的任务时,进程主动调用调度函数主动放弃CPU,比如锁、缺页等场景。

时机4,调度的绝大多数占比应属于此种情况,CPU 会在每次执行完一条指令之后检查是否有中断产生,如有则进入中断处理程序。当从中断、异常及系统调用返回到用户态时都会检查调度标志,如果检测为真则进入调度函数。

《操作系统导论》中说道:一旦时钟开始运行,操作系统就感到安全了,因为控制权最终会还给它,因此操作系统可以自由运行用户程序。

可见,在中断返回之前进行调度是一个绝妙的方法。因此,我们可以说:schedule()返回之后的去向并不唯一,有可能是中断、异常和系统调用返回用户空间之前,也有可能是在系统调用进行之中。这也是为何进程切换只会发生在内核态。

我用一幅图来描绘一下,进程的执行流在用户态和内核态之间变换以及进程切换的场景,让我们以指令的视角在进程的用户态和内核态之间,在进程与进程之间进行一场穿梭之旅。

图 4-1 进程执行流

图 4-1 描述了3个进程在两颗CPU上的执行情况,注意这并不是以进程的视角来展现的,各种颜色的执行流仅表示进程处于用户态,灰色的执行流代表内核态,不同的是内核态的前半部分代表当前进程,后半部分则是经过进程切换后代表下一个进程。图中进程B切换为进程A,当执行流进入内核态执行切换时,切换之前内核代表进程B执行,切换之后内核代表新进程A执行。

图 4-1 仅仅展示了进程切换的场景,并不是指进程的执行流进入内核态后一定会发生进程切换

要理解“代表”的含义,就需要解释一下thread_info这个数据结构,我们前面曾提到过它,但未加以详述,此处有必要略作说明。

thread_info被称为线程描述符(不应纠结于概念,把线程理解为一个执行流即可),这个结构包含了指向进程描述符task_struct的指针,因此,内核若想确定当前运行的进程,只需要找到thread_info即可。但问题是如何确定其位置呢?内核的处理方法依然非常巧妙——将thread_info和进程内核堆栈一起存放!放一幅图即可一目了然:

图 4-2 thread_info 结构和内核堆栈存放在两个连续的页框中

图 4-2 所示,这是一块 8k 大小的内存,内核堆栈由高地址向低地址增长,thread_info 存放在最底部,其大小通常是 52 字节,因此,内核堆栈能扩展到8140字节。这个大小看上去很小,至少远远小于进程的用户态堆栈的默认大小(Linux 上用户态堆栈的大小为8M)。这是因为内核控制路径用到的堆栈很少,只需几千字节足矣!

《深入理解Linux内核》说道,从效率的观点来看,thread_info结构与内核态堆栈之间的紧密结合提供的主要好处是:内核很容易从esp寄存器(栈顶指针寄存器)的值获得当前在CPU上正在运行进程的thread_info结构的地址。假设thread_info和内核堆栈的结合体大小就是 8K,那么内核屏蔽掉 esp 的低 13 位有效位就可以获得thread_info结构的基地址。由此可知:切换了内核堆栈就等于切换了当前进程。

还可以从资源使用统计上来理解“代表”的含义。当进程进入内核态时,其对CPU的时间的消耗则记录在进程的sys、hiq、siq 等指标上,由此可以想见一个内核的流氓特征:与我无关的中断处理对于CPU的消耗都算在了我的头上!

之所以强调“代表”,是为了说明进程是相对执行流而言的,而不是相对程序而言。这里的程序指的是躺在硬盘上的二进制数据,或者说用户态的指令集,其实用户态的指令集囊括了磁盘上的二进制数据。换句话说,如果程序是完全静态链接的,程序的二进制就完全包括了用户态指令集,反之,用户态指令集是大于二进制数据中的指令集的,这是由动态链接决定的。

我曾在论坛上与人争论过一个问题:向 redis 发出一个删除大量key的命令,redis 的主线程是否可以说被阻塞了?

很多人的观点是主线程要释放内存会进入系统调用,因释放内存耗时较长故主线程被阻塞。而我的观点是:在删除大量key时,redis 主线程并没有阻塞,阻塞的是发出指令的客户端以及排队发送指令的其它客户端!经过上面的论证,我们知道内核态的执行流一部分代表当前进程,一部分代表切换后的新进程,所以,在被调度之前,不管是用户态还是内核态,都表示这个进程仍在CPU上运行。不能说 redis 线程发起系统调用后陷入了内核,这个线程就被阻塞了,这是不正确的。此时CPU上的执行流依然属于 redis 主线程,其对资源的消耗仍会被记录在该线程名下。redis 主线程一直在努力干活,没有被阻塞,只是这个线程进入到内核态后干的时间比较长,阻塞了发出指令的客户端,同样也阻塞了后续发指令的客户端,而这是由 redis 处理网络请求是单线程模型决定的!

线程是操作系统为用户提供的最轻便的并发模型,它的轻便来自于和进程的对比,二者在资源的使用上不可同日而语。即便如此,线程的资源占用仍然相当可观。从资源的有限性出发,不可能为了并发而创建任意数量的线程。除去堆栈资源累加产生的内存占用之外,过多的线程数量也会加重内核调度的负担,这种负担体现在过多指令浪费在进程切换上,而没有为真正的程序逻辑所用。

对高并发的执着追求,诞生了编程语言世界里五花八门的协程,或者说用户态线程,概念并不重要,重要的是,它们只能在用户态做文章。像Python、Java、C++、Go、Rust 等语言都提供了基于协程的并发模型,这里面由于 Go 是相对比较新生的语言,没有任何历史包袱,从而提供了完美的用户态线程,且将此并发模型内置于语言自身。这样做的好处是,开发并发程序变得异常简单,一个简简单单的go关键字就可以创建一个独立的任务,即一个独立的执行流,如果换做其它语言,想要创建一个独立的执行流只有线程或者进程这种由操作系统提供的原生方式(当然,很多语言也有对应的协程库,因本人未做过详细研究,故只考虑原生的并发模型);然而,为了并发体系的自洽,Go 为其用户屏蔽了操作系统提供的线程模型,虽然用起来简单了,但理解上却多了些许障碍。

5

《操作系统概念》4.3节简单讨论了线程模型,书中说有两种不同的方法来提供线程支持,用户层的用户线程内核层的内核线程,大致有如下三种模型

  1. 多对一模型

    图 5-1 多对一模型

    这种模型将用户态的多个线程映射到一个内核线程上,这种方式最大缺点就是任何一个用户线程发生阻塞调用时,整个内核线程就会被调离 CPU ,致使其它的用户线程失去执行机会。

  2. 一对一模型

    图 5-2 一对一模型

    一对一模型将每个用户线程映射到一个内核线程,如果一个线程阻塞在系统调用上时,剩余的仍然可以运行。这种模型唯一的缺点是创建一个用户线程就需要创建对应的内核线程,可以想见的是,创建内核线程的时间开销和资源开销是相当可观的,换句话说,成本决定了能开启线程的数量。Liunx和windows都实现了一对一的模型。

  3. 多对多模型

    图 5-3 多对多模型

    对多对模型多路复用多个用户级线程到多个内核级线程,通常来说用户线程要比内核线程多的多,这种模型没有上述两种模型的缺点,当一个用户线程发出导致内核线程阻塞的系统调用时,其余的用户线程依然可以被其它内核线程调度。同时,因为用户线程无比轻量,时间和资源成本较少,因此可以开启任意数量的用户线程用于并发,当然量变会引起质变,数量过于庞大的用户线程也会加重资源的消耗,因此会出现了各种各样的协程池用于刹车。Go 语言的并发模型就实现了这种 M:N 的并发模型,稍后我们会讲到,现在先让我来批判一下《操作系统概念》中的概念模糊问题。

不知道你是否被上面的用户线程内核线程映射等概念搅的一头雾水呢?按照其语义,用户线程和内核线程就像是对立的两方,会有某个东西将它们联系起来构成“映射”,或许是内核,又或许是C库,但书中没讲,我能够理解《操作系统概念》是讲操作系统的设计与实现原理,会兼顾大部分操作系统,讲解的也是较为抽象的部分,不过现代绝大部分人只接触过Linux,而Linux内核提供的并发模型就只有一对一这一种,所里书里的内容如今看上去多少有些不合时宜。在 Linux 内核当中甚至并不区分进程和线程,它们统一都由task_struct这一种数据结构表示,并且基于其进行调度,也就是说,在内核看来每个task_struct只有一个执行流,至于在用户态这个大的执行流干些什么,内核并不关心。

要正确理解书中所表达的意图,就需要先把概念捋清楚,我以 Linux 为例来进行说明。

首先,需要把这里的内核线程拿掉,换成操作系统线程,即 Linux 内核提供的线程。我之所以不用内核线程是因为内核线程这个概念在 Linux 中也有对应的存在(参见《深入linux内核架构》2.4.2 内核线程),内核线程是一种只运行在内核地址空间的线程。所有的内核线程共享内核地址空间(对于 32 位系统来说,就是 3-4GB 的虚拟地址空间),所以也共享同一份内核页表,并且没有用户地址空间,这也是为什么叫内核线程,而不叫内核进程的原因。

其次,将用户线程换成协程来表达,这里之所以用协程,完全是因为操作系统线程实在是太耀眼,太深入人心了,以至于人们都忘记了它原本抽象的含义,当需要表达在用户态实现的这个实体时,就有了“用户线程”、“协程”、“go程”等五花八门的名字,特别是“用户线程”,听起来让人如堕五里雾中。

前面讲过,进程是一堆资源和若干执行流的总称,Linux 中的task_struct记录了分配的资源和执行流的状态,是一个勉强能够被称之为进程或者线程的实体,协程也有记录资源和执行流状态的对应实体,从这些“实体”的意义上讲,也可以称协程和操作系统线程之间存在对应关系。现在让我们重新理解一下“对多对模型多路复用多个用户级线程到多个内核级线程”这句话,可以换一种表达:对多对模型指的是在多个Linux操作系统线程之内运行多个协程。我这里使用了“之内”而不是“之上”,是想着重表达操作系统进/线程是一个罐罐儿,用户态的所有花样都是在罐罐儿里玩,并没有超出操作系统进/线程的活动范围。一言以蔽之,用户态程序永远无法逃脱进程地址空间和执行流的手掌心!

或许,我们可以换个角度,抛开这些定义,站在 CPU 的角度去理解问题。所谓线程就是内核维护的一个数据结构,内核依靠这个结构来控制 CPU 上运行的代码,指令无论在用户态还是内核态,都是这个线程,所有的资源消耗都计入此线程,即便是与此线程无关的中断所消耗的资源也被记在该线程名下。协程就是用户空间代码维护的一个数据结构,用户空间代码可以通过控制 ip、sp 等寄存器来控制用户空间执行流的走向,就可以实现在不同的协程间切换,并把执行流的状态记录在对应的数据结构上,重要的是协程代码本身就属于操作系统用户态执行流中的指令。

协程和操作系统线程 M:N 的这种模型,现实中实现并不多,Go 绝对是最耀眼的那一颗!

6

go runtime 中也有和操作系统内核类似的schedule(),它会在最终调用用汇编代码写成的runtime·gogo(buf *gobuf),正如《溯源 goroutine 堆栈》 中提到的,它会将新的 goroutine 恢复执行,最后的指令JMP BX意味着schedule()函数不会返回,schedule()总是会在合适的点被调用,直到选择出可以运行的 goroutine。如果一个 goroutine 自然终止,执行流也会回到事先埋好的点runtime·goexit,而 goexit 最终会调用schedule()。在通往schedule()的调用路径中总会有 runtime·mcall的身影,ip、sp 等寄存器的内容就是在此处被保存,并最终在schedule()中被恢复。

经过上面这些铺垫,我们可以从指令的视角在宏观上来理解一下 go 的协程运作过程:

图 6-1 go协作式调度

仍然以前面展示进程切换时的图作为基础,为了配合 Go 的实际情况,将图中的进程换成线程来讲。现在想象一下,线程 A、B、C 是 go 程序底层的操作系统线程(GMP中的M),g1、g2、g3 为 go 程序在用户空间实现的协程,或者说叫 goroutine。值得注意的是,内核并不知道 goroutine 的存在,它仍然按照自己一贯的行为方式对 A、B、C 三个线程进行调度。

当A线程的时间片用完,或者发出阻塞的系统调用时,就会被内核调度出 CPU,继而把 C 线程调度到 CPU 上来执行。图中显示 A线程被换下 CPU0 的时候,B 线程仍然在 CPU1 上,这意味着其余的 goroutine 依然会得到执行的机会。再看被调度到 CPU0 上的 C 线程,它也是 go 程序底层的线程,这意味着在一个拥有双核的机器上,goroutine 总是有机会运行的,即便有些 goroutine 因为系统调用等某些原因导致其所在的操作系统线程被换下。go 总会保证有两个“活的”的线程一直待在 CPU 上轮番寻找 goroutine 来执行,除非操作系统内核看不下去,换其它的程序线程来执行,但 go 总会保证有两个准备好的线程可以随时被内核调度。

再来切一下近景,把 B 线程放大。在内核看来,黄色部分只代表 B 线程的用户态执行流,但就在这个黄色用户态执行流的内部正在轮番上演形形色色的任务,g1、g2、g3 三个 goroutine 正轮流在CPU上执行,绿色执行流代表 go 的 runtime,正是 runtime 居中调度,指挥得当,才让以 goroutine 为单位的任务都获得执行的机会。内核对这些一无所知,CPU也只会觉得奇怪:这个线程的用户态代码怎么老是频繁的切换堆栈?(见《溯源 goroutine 堆栈》 中对 go 协程堆栈的描述)

这种并发模型的优势显而易见,让我们来直观地感受一下。操作系统线程每次上下文切换需要大约 1000ns 的时间,而硬件有望在每纳秒的时间里执行 12 条指令,也就是说,当任务必须等待时,操作系统就会花费 12k 条指令去做线程切换,却不能将这些指令用在有意义的业务上。而 go 在用户态进行协程切换,极大地缓和了这种浪费,go 进行一次协程切换大概需要 200ns 或者 2.4k 条指令。简言之,go 用尽可能少的 os 级的线程调度来做更多的事情,方式就是在用户空间调度,这是语言级别的一种能力,可不严谨的说,go 的调度比 os 级调度便宜 5 倍,甚至更多!

操作系统内核之所以可以大胆地把 CPU 的使用权交给用户态程序,是因为时钟中断总会让控制权重归内核。那么,完全运行在用户态的 runtime 是如何获得控制权来调度 goroutine 的呢?go1.14 之前确实没办法做到,控制权的转移完全依靠 goroutine 的主动放弃,通过 runtime.Gosched 调用主动让出执行机会,或者当发生执行栈分段时,检查自身的抢占标记,决定是否继续执行;其中第二种情况是通过编译器编译时在函数调用之前插入指令来实现的,也就是说只有在 goroutine 发生函数调用时,runtime 才会获得短暂的执行权来实施调度。不难想见的是,下面这种无函数调用的代码会导致 goroutine 永远霸占 CPU,runtime 根本得不到执行的机会:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 此程序在 Go 1.14 之前的版本不会输出 OK
package main
import (
"runtime"
"time"
)
func main() {
runtime.GOMAXPROCS(1)
go func() {
for {
}
}()
time.Sleep(time.Millisecond)
println("OK")
}

换句话说,这种调度方式属于协作式调度,完全依赖执行方的主动弃权;为此,go1.14 基于操作系统信号实现了异步抢占,之所以叫异步,是因为从发送信号到信号被处理这个过程是异步的,并不同步。我这里不准备讲信号的安装、信号的发送逻辑,仅就信号的处理捡扼要处略作说明,感兴趣的朋友可以参考从源码剖析Go语言基于信号抢占式调度这篇文章。

既然 go 的异步抢占处理是基于操作系统信号的,那么在进入go 的处理之前,先来看看操作系统是如何处理信号的吧!

7

请允许我略过对信号繁琐的介绍,直接从进/线程注意到一个信号到来开始。

每个进程从内核态返回用户态之前都会检查TIF_SIGPENDING标志的值,也就是说,每当内核处理完一个中断或异常后,在返回用户态之前都会检查是否存在挂起信号;为了处理信号,内核会调用do_signal()函数,我们假设这个信号安装了专门的处理程序,do_signal()函数必须强迫该处理程序的执行,这是通过handle_signal()进行的。

难点在于,信号处理程序属于用户空间代码,内核不能直接执行用户代码,要执行用户态的信号处理程序,内核必须返回用户态,而一旦返回用户态,内核栈上的内容就会被清空(内核态堆栈包含了被中断进程的硬件上下文),当信号处理程序执行完毕时,又该如何回到正常的执行流程上呢?另外的复杂性是信号处理函数可以执行系统调用,依然要到内核态逛一圈再回到用户态,并且是回到信号处理程序,而不是回到进程原本的正常执行流。

Linux 采用的解决方案是把保存在内核态堆栈中的硬件上下文拷贝到当前进程的用户态堆栈中(稍后会看到go的信号处理抢占程序是如何利用这一点的)。用户态的堆栈也会被修改,使得信号处理函数执行完毕之后,自动调用sigreturn()系统调用把硬件上下文拷贝回内核态堆栈中,并恢复用户态堆栈中原来的内容。

图 7-1 捕获信号

图 7-1 说明了进程处理信号时的执行流,当中断或者异常发生的时候,进程陷入内核。在要返回用户态之前,执行了do_signal()函数,并开始处理信号(调用handle_signal()),在用户态堆栈上建立栈帧(调用setup_frame()setup_rt_frame)。

因为进入内核态时,用户态的寄存器值已被保存到内核态的堆栈上了,所以内核很容易拿到用户态堆栈的地址,并加以修改。建立的栈帧如下图所示:

图 7-2 用户态栈帧

其中sc中保存了进程的硬件上下文,这是从当前进程的内核堆栈上 copy 来的。precode位于栈顶,是信号处理函数的返回地址,将会发出sigreturn()系统调用。

当栈帧构造完成之后,内核需要修改保存在内核态堆栈上用户态寄存器程序计数器的值,让它指向信号处理程序,这样返回用户态之后就会跳转到信号处理程序处执行了:

1
2
3
4
5
6
regs->esp = (unsigned long) frame;
regs->eip = (unsigned long) ka->sa.sa_handler;
regs->eax = (unsigned long) sig;
regs->edx = regs->ecx = 0;
regs->xds = regs->xes = regs->xss = __USER_DS;
regs->xcs = __USER_CS;

这段代码来自《深入理解Linux内核》,大概是 2.6 版本的代码,我找了一下 5.10 版本的内核,其中设置用户栈和寄存器的函数是setup_signal_stack_si,我摘录其中修改内核栈中用户态硬件上下文的内容如下:

1
2
3
4
5
6
7
8
9
10
11
PT_REGS_SP(regs) = (unsigned long) frame;
PT_REGS_DI(regs) = sig;
/* In case the signal handler was declared without prototypes */
PT_REGS_AX(regs) = 0;
/*
* This also works for non SA_SIGINFO handlers because they expect the
* next argument after the signal number on the stack.
*/
PT_REGS_SI(regs) = (unsigned long) &frame->info;
PT_REGS_DX(regs) = (unsigned long) &frame->uc;
PT_REGS_IP(regs) = (unsigned long) ksig->ka.sa.sa_handler;

除了设置堆栈、程序计数器之外也设置了 rdi、rsi、rds 这几个寄存器,这是 x86_64 架构下 C 语言的函数调惯例,三个寄存器分别用于存放函数调用时前三个参数。记住这一点,后面介绍 Go 时会用到。

之后,再经过一系列检查,handle_signal()返回到do_signal()do_signal()返回到用户态,因为程序计数器指向信号处理程序的第一条指令,而栈顶指向已推进用户态堆栈的第一个内存单元。因此,信号处理程序被执行。

信号处理程序结束时,返回栈顶地址,该地址指向栈帧的precode字段所引用的vsyscall页中的代码:

1
2
3
_ _kernel_sigreturn:
popl %eax
movl $__NR_sigreturn, %eax int $0x80

它发出一个系统调用,再次陷入内核,调用restore_ sigcontext( ) 函数,将 sc 中记录的硬件上下文恢复到内核态堆栈,并从用户态堆栈删除之前建立的栈帧,之后返回用户态,程序的执行流开始回到中断之处继续执行。

稍后我们会看到,内核从 sc 恢复的内容是被 go 的 runtime 修改过的!

有这些内容做铺垫,就可以聊一聊 go 的信号抢占了!

8

go 在 M 上通过initsig()来初始化信号,对于需要安装处理程序的信号,会通过setsig来设置对应的动作,真正执行抢占动作的是doSigPreempt,此时的调用栈为:

1
sigtramp-->sigtrampgo-->sighandler-->doSigPreempt

doSigPreempt有个*sigctxt类型的参数,它表示的是我们上一节介绍过的内核保存在用户态堆栈的内核态堆栈内容(比较拗口,需要多读几遍),其中存放的是当前进程用户态的硬件上下文。这个参数是从sigtramp一路传下来的,我们看一下sigtramp的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Called using C ABI.
TEXT runtime·sigtramp(SB),NOSPLIT,$0
// Transition from C ABI to Go ABI.
PUSH_REGS_HOST_TO_ABI0()

// Call into the Go signal handler
NOP SP // disable vet stack checking
ADJSP $24
MOVQ DI, 0(SP) // sig
MOVQ SI, 8(SP) // info
MOVQ DX, 16(SP) // ctx
CALL ·sigtrampgo(SB)
ADJSP $-24

POP_REGS_HOST_TO_ABI0()
RET

sigtramp实际上是真正的信号处理函数,进程从内核态收到信号回到用户态调用的处理函数就是它,注释中表明这个函数以 C 语言的调用惯例被调用,Go 在这里通过PUSH_REGS_HOST_TO_ABI0保存 go 自己调用惯例用的寄存器后,转换成自己的调用规范,等函数调用完毕之后,再通过POP_REGS_HOST_TO_ABI0恢复这些寄存器的值。

还记得上一节介绍 5.10版本的内核修改用户态寄存器时设置的 rdi、rsi、rdx 的值吗?这三个寄存器的值就是内核模仿调用sigtramp时传入的参数,现在 go 需要以自己的调用规约将其放置到堆栈上,来表示 sig、info、ctx 这三个参数(go1.17 改变了调用规约,已经由堆栈传递参数改为寄存器传递了,不知道为何此处仍然使用堆栈传递,我此处引用的代码是版本 1.18.1)。

当调用到doSigPreempt时,会将ctx这个参数传入,其中包含了进程用户态硬件上下文,希望你还记得这一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// doSigPreempt handles a preemption signal on gp.
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}

// Acknowledge the preemption.
atomic.Xadd(&gp.m.preemptGen, 1)
atomic.Store(&gp.m.signalPending, 0)

if GOOS == "darwin" || GOOS == "ios" {
atomic.Xadd(&pendingPreemptSignals, -1)
}
}

信号处理程序一旦被执行,舞台就交到了 go runtime 手里,ctxt的类型为*sigctxt,指向的是用户态堆栈中存放内核态堆栈内容的地址。然后信号处理程序通过isAsyncSafePoint来判断抢占位置是否安全,并返回安全的抢占地址。如果确认抢占没有问题,接着会调用pushCall方法来修改ctxt中的用户态硬件上下文,用于稍后再一次从内核态返回用户态时模拟出一个用户态程序调用asyncPreempt的假象:

1
2
3
4
5
6
7
8
func (c *sigctxt) pushCall(targetPC, resumePC uintptr) {
// Make it look like we called target at resumePC.
sp := uintptr(c.rsp())
sp -= goarch.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = resumePC
c.set_rsp(uint64(sp))
c.set_rip(uint64(targetPC))
}

pushCall干了两件事:

  1. 修改程序计数器的指向为asyncPreempt函数的地址。
  2. 修改栈顶指针,将当前 goroutine 的原本中断地址放入堆栈。

细心的你可能会问:内核在跳转信号处理程序之前不是已经拓展了堆栈,往里面塞了一个frame么?go runtime 在这里基于原始的栈顶再往里塞一个返回地址,不会引起冲突么?

确实不会引起冲突,因为在 X86-64 调用规范中有一个重要标准——红色区域(Red zone)。它指出:在 rsp 指向的栈顶之后的128 字节被保留,不能被信号和中断处理程序使用,因此我们可以在 Linux 的源码中看到如下处理:

1
2
3
4
5
6
struct rt_sigframe __user *frame;
......
frame = (struct rt_sigframe __user *)
round_down(stack_top - sizeof(struct rt_sigframe), 16);
/* Subtract 128 for a red zone and 8 for proper alignment */
frame = (struct rt_sigframe __user *) ((unsigned long) frame - 128 - 8);

内核在构造这个frame的时候留出了 128 字节的空隙,go runtime 见缝插针,将当前 goroutine 被中断时的下一条指令地址放入堆栈。这一套移花接木的功夫打完,信号处理函数执行完毕返回内核态,内核重新恢复原内核态堆栈上的内容,此时的内容是被 go runtime 修改后的。之后,执行流从内核态返回用户态,内核态堆栈被弹出,相关寄存器被恢复,程序计数器指向asyncPreempt,开始运行用户态代码。下面是asyncPreempt的汇编代码,我省略了大部分寄存器的保存和恢复指令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0-0
PUSHQ BP
MOVQ SP, BP
// Save flags before clobbering them
PUSHFQ
// obj doesn't understand ADD/SUB on SP, but does understand ADJSP
ADJSP $368
// But vet doesn't know ADJSP, so suppress vet stack checking
NOP SP
MOVQ AX, 0(SP)
......
MOVUPS X15, 352(SP)
CALL ·asyncPreempt2(SB)
MOVUPS 352(SP), X15
MOVUPS 336(SP), X14
......
ADJSP $-368
POPFQ
POPQ BP
RET

asyncPreempt保存执行现场后,调用了asyncPreempt2,这里要提一下,CALL ·asyncPreempt2(SB) 指令先将下一条指令的地址入栈再进行跳转,这样栈顶的地址就是asyncPreempt2返回时的地址。

asyncPreempt2会调用mcall函数,最终会执行调度函数schedule(),还记得吗?schedule()不会返回,执行完runtime·gogo(buf *gobuf)后,新的 goroutine 就在 CPU 上运行了,所以我们要看一下mcall函数保存的现场:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// func mcall(fn func(*g))
// Switch to m->g0's stack, call fn(g).
// Fn must never return. It should gogo(&g->sched)
// to keep running g.
TEXT runtime·mcall<ABIInternal>(SB), NOSPLIT, $0-8
MOVQ AX, DX // DX = fn

// save state in g->sched
MOVQ 0(SP), BX // caller's PC
MOVQ BX, (g_sched+gobuf_pc)(R14)
LEAQ fn+0(FP), BX // caller's SP
MOVQ BX, (g_sched+gobuf_sp)(R14)
MOVQ BP, (g_sched+gobuf_bp)(R14)

// switch to m->g0 & its stack, call fn
MOVQ g_m(R14), BX
MOVQ m_g0(BX), SI // SI = g.m.g0
CMPQ SI, R14 // if g == m->g0 call badmcall
JNE goodm
JMP runtime·badmcall(SB)
goodm:
MOVQ R14, AX // AX (and arg 0) = g
MOVQ SI, R14 // g = g.m.g0
get_tls(CX) // Set G in TLS
MOVQ R14, g(CX)
MOVQ (g_sched+gobuf_sp)(R14), SP // sp = g0.sched.sp
PUSHQ AX // open up space for fn's arg spill slot
MOVQ 0(DX), R12
CALL R12 // fn(g)
POPQ AX
JMP runtime·badmcall2(SB)
RET

这两句是保存程序计数器的值到g->sched:

1
2
MOVQ	0(SP), BX	// caller's PC
MOVQ BX, (g_sched+gobuf_pc)(R14)

可见,保存的地址是栈顶的内容,而此时栈顶内容是函数asyncPreemptCALL ·asyncPreempt2(SB)的下一条指令,即MOVUPS 352(SP), X15;也就是说,当该 goroutine 再次被调度时,会从asyncPreempt2中继续执行,然后返回到asyncPreemptasyncPreempt返回时从堆栈弹出将要跳转的地址,而这个地址就是 go 见缝插针塞进来的地址,这看起来就像是 goroutine 调用asyncPreempt一般。执行流走到这里,才真正意义上完成了 goroutine 的恢复执行。

由此观之,asyncPreempt更像是 Linux 内核中的schedule()调度发生在函数执行过程中,而函数执行完毕要等到下一次被调度的时候才会发生。 而 go 借助信号机制所实现的抢占,无非就是依靠信号处理程序这一次控制权埋点,以便在执行流最终从内核态返回时执行asyncPreempt代码,从而再一次收获 CPU 的控制权。

是时候再从指令的视角在宏观上来理解 go 的信号抢占流程了:

图 8-1 go 异步抢占

图 8-1 显示了通过异步抢占执行流从 g1 切换到 g2 的过程:

  1. g1被时钟中断,从内核返回时发现有抢占信号。
  2. 执行流从内核态返回到用户态,执行信号处理程序(第一个绿色的 go runtime 执行流)。
  3. 信号处理程序执行完毕返回内核,内核做一些恢复后,再次返回到用户态。
  4. 从内核态返回后的执行流被 go runtime 窃取,转而执行调度(第二个绿色的 go runtime 执行流)。
  5. g2 被选择,换上 CPU 执行。

9

文章写到这里,基本上把我想讲的已经讲完了,也算基本上完成了自己“观其大略,本其脉络”的目标,我曾在《从CPU的视角说起》 一文中说道:我目前所寻求的信息,意在建立计算机系统的世界观与 Go 语言的世界观,是在陷入具体细节之前为自己提供一个大致的轮廓,让自己对计算机运行的脉络有一个关键性的认识。 即便如此,里面也不可避免的出现很多具体而微的内容。我略过了很多环节,并不是因为它们不重要,是因为它们是细节,是更丰富的东西,也是我尚未探索的东西。

苏轼有自己的一套读书方法叫做“八面受敌”,他在写给侄女婿王庠的《又答王庠书》中作了详细介绍:“但卑意欲少年为学者,每一书皆作数过尽之。书富如入海,百货皆有,人之精力,不能兼收尽取,但得其所欲求者尔。故愿学者每次作一意求之。如欲求古今兴亡治乱、圣贤作用、但作此意求之,勿生余念。又别作一次,求事迹故实典章文物之类,亦如之。他皆仿此。此虽迂钝,而他日学成,八面受敌,与涉猎者不可同日而语也。”

这段话可谓深得我心,“人之精力,不能兼收尽取,但得其所欲求者尔”、“故愿学者每次作一意求之”,我欲寻求堆栈的本源,翻尽家中藏书,完成了《学渣三部曲》,又执着于调度,翻阅同样的书,才有了这一篇万字长文,此虽迂钝,而他日学成,八面受敌,与涉猎者不可同日而语也。

最后再聊一聊“程序”,第4节曾讨论过“程序”一词的含义,平时我们习惯将开发最终交付的制品称为“程序”,或有“我写了某某程序”之类云云,“程序”若是躺在计算机硬盘中的二进制文件,那当它被调入内存醒来时,一定会哀叹自己生命的不完整,因为每当它想“耳听之而为声,目遇之而成色”的时候,面前总会横着一道“系统调用”的鸿沟,以至于它永远无法亲自触摸“大自然”的无尽藏。换句话说,我们到底创造了什么?一条流动的进程中,有多少是属于我们这样平凡之人的?刨去内核指令,刨去运行时、库以及不可胜数的框架代码,我相信已所剩无几!

我之所以无法骄傲,是因为站在巨人的肩上!

参考文献

  1. 操作系统导论
  2. 深入理解计算机系统
  3. 深入理解LINUX内核
  4. 深入 Linux 内核架构
  5. 操作系统概念
  6. The Definitive Guide to Linux System Calls
  7. 深入golang runtime的调度
  8. 从源码剖析Go语言基于信号抢占式调度
  9. Misc on Linux fork, switch_to, and scheduling
  10. Evolution of the x86 context switch in Linux
  11. Go 语言原本
  12. Scheduling In Go : Part II - Go Scheduler

通过 从CPU的视角说起穿越虚拟内存的迷雾 两篇文章我们知道,所谓进程堆栈不过是应用程序向内核申请了一块连续内存后,设定相应的寄存器,从而将这块内存当做堆栈来使用,典型的用法就是用于函数调用。

我们在上一篇讨论了进线程的堆栈,现在继续探索 go 中的协程栈。如果吊一下书袋的话,口称 go 协程是不严谨的,go 的协程不同于其他语言的协程,go 的协程是一种有栈协程,每一个协程都有自己的协程堆栈,因此 go 官网发明了一个新词 goroutine,以区别于普通的 coroutine。我们接下来就聊聊 goroutine 的堆栈。在此之前,先来回顾一下上一篇中对进线程堆栈位置的总结。

本文基于 Linux 平台 x64 架构,使用 go 1.18 源码,禁用 cgo

1· 进线程堆栈

图 3-1 位于不同区域的线程 stack

图 3.1 为 64 位虚拟地址空间布局图,粉色标识说明了线程堆栈可能存在的位置,总结下来,不外乎以下三种情况:

  1. 主线程堆栈位于用户空间顶部,但 clone 时,子进程的主线程实际使用的堆栈未必如此。
  2. 有可能分配在 mmap 区域。
  3. 有可能通过 C 库 malloc 分配在 heap 区域。

2. goroutine 的堆栈

或许你已经知道 goroutine 的堆栈是从 heap 上分配的,但如果你足够好奇,你就会为 heap 在虚拟地址空间中的位置而发狂。

go 重写了运行时,如果不使用 cgo 的话,编译完成的 go 程序是静态链接的,不依赖任何C库,这使它拥有不错的可移植性,在较新内核上编译好的程序,拉到旧版本内核的操作系统上依然能够运行。在这一点上,rust 并没有多少优势,反而新生语言 hare 表现足够强劲。

不依赖 C 库,意味着 go 对 heap 的管理有自己的方式。 那么, go 管理的 heap 是否与之前内存空间布局图中的 heap 位置相同就要打一个大大的问号了。要搞清楚这个问题,我们需要到 runtime 的源码中一探究竟,且要挖到 go 与内核的接口处,找出其申请内存的方式方可。

本文并不打算分析 go 的内存分配器,也不打算介绍堆栈的分配算法,仅仅为了解决 goroutine 堆栈在虚拟地址空间中位置的疑惑。想了解内存管理和堆栈分配算法的读者可以参考详解Go中内存分配源码实现一文教你搞懂 Go 中栈操作

先从普通 goroutine 的创建开始吧!

在 go 中,每通过go func(){}的方式开启一个 goroutine 时,编译器都会将其转换成对 runtime.newproc的调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
// 切换到线程堆栈创建 g
systemstack(func() {
newg := newproc1(fn, gp, pc)

_p_ := getg().m.p.ptr()
runqput(_p_, newg, true)

if mainStarted {
wakep()
}
})
}

newproc 仅仅是对 newproc1 的包装,创建新 g 的动作不能在用户堆栈上进行,所以这里切换到底层线程的堆栈来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Create a new g in state _Grunnable, starting at fn. callerpc is the
// address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler.
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
_g_ := getg()

if fn == nil {
_g_.m.throwing = -1 // do not dump full stacks
throw("go of nil func value")
}
acquirem() // disable preemption because it can be holding p in a local var

_p_ := _g_.m.p.ptr()
// 从 P 的空闲链表中获取一个新的 G
newg := gfget(_p_)
// 获取不到则调用 malg 进行创建
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
......
}

newproc1 方法很长,里面主要是获取 G ,然后对获取到的 G 做一些初始化的工作。当创建 G 时,会先从缓存的空闲链表中获取,如果没有空闲的 G ,再进行创建。所以,我们这里只看 malg 函数的调用。

在调用 malg 函数的时候会传入一个最小堆栈大小值:**_StackMin**(linux 平台下为 2048)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Allocate a new g, with a stack big enough for stacksize bytes.
func malg(stacksize int32) *g {
newg := new(g)
if stacksize >= 0 {
stacksize = round2(_StackSystem + stacksize)
systemstack(func() {
newg.stack = stackalloc(uint32(stacksize))
})
newg.stackguard0 = newg.stack.lo + _StackGuard
newg.stackguard1 = ^uintptr(0)
// Clear the bottom word of the stack. We record g
// there on gsignal stack during VDSO on ARM and ARM64.
*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
}
return newg
}

malg 会创建新的 G 并为其设置好堆栈,以及堆栈的边界,以供日后扩容使用。这里重点看 stackalloc 函数,堆栈的内存的分配就是由它来完成的,函数的返回值赋给新 Gstack 字段。

Gstack 字段是一个 stack 结构体类型,里面标记了堆栈的高地址和低地址:

1
2
3
4
5
6
7
// Stack describes a Go execution stack.
// The bounds of the stack are exactly [lo, hi),
// with no implicit data structures on either side.
type stack struct {
lo uintptr
hi uintptr
}

我们接着看这个 stack 是怎么创建出来的。

stackalloc 的函数比较长,里面涉及到大堆栈和小堆栈的分配逻辑,这里就不贴大段的代码了。这个函数不管是从 cache 还是 pool 中获取内存,最终都会在内存不够时调用 mheapallocManual函数去分配新的内存:

1
mheap_.allocManual(_StackCacheSize>>_PageShift, spanAllocStack)

到这里就遇见 go 管理的 heap 了,关于 heap 的位置我们稍后再讨论,现在继续挖 allocManual 直到我们找到系统调用为止。

1
2
3
4
5
6
func (h *mheap) allocManual(npages uintptr, typ spanAllocType) *mspan {
if !typ.manual() {
throw("manual span allocation called with non-manually-managed type")
}
return h.allocSpan(npages, typ, 0)
}

allocManual 只是对 allocSpan 的简单封装,这里简单提一下 go 对内存管理的最小单位是 mspan,它包含若干连续的页。

allocSpan 的逻辑较多,主要是从 heap 中分配 npages 个页来填充 span。一般随着程序的运行,内存的不断申请,heap 中会有很多空闲的页用来供给后续的内存申请。现在我们需要查看 cache 不足的情况,当 heap 中的 page 不够的时候,就需要推动 heap 增长了,allocSpan 通过调用 mheap.grow 来达成这一点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Try to add at least npage pages of memory to the heap,
// returning how much the heap grew by and whether it worked.
func (h *mheap) grow(npage uintptr) (uintptr, bool) {
assertLockHeld(&h.lock)
ask := alignUp(npage, pallocChunkPages) * pageSize
totalGrowth := uintptr(0)
// This may overflow because ask could be very large
// and is otherwise unrelated to h.curArena.base.
// curArena 无需初始化,但问题是怎么判断 Arena 边界呢
end := h.curArena.base + ask
nBase := alignUp(end, physPageSize)
if nBase > h.curArena.end || /* overflow */ end < h.curArena.base {
// 尝试分配新的 Arena,但有可能跨越 hint 区域,所以全额申请
// Not enough room in the current arena. Allocate more
// arena space. This may not be contiguous with the
// current arena, so we have to request the full ask.
av, asize := h.sysAlloc(ask)
// 此时已经将需要的内存 reserve 了
if av == nil {
print("runtime: out of memory: cannot allocate ", ask, "-byte block (", memstats.heap_sys, " in use)\n")
return 0, false
}

if uintptr(av) == h.curArena.end {
// 说明是连续的,拓展此 curArena 的边界
// The new space is contiguous with the old
// space, so just extend the current space.
h.curArena.end = uintptr(av) + asize
} else {
// 感觉像是这一次不够分配的,但也别浪费,把剩余的内存标记为已使用,加入到一个地方以供分配
// The new space is discontiguous. Track what
// remains of the current space and switch to
// the new space. This should be rare.
if size := h.curArena.end - h.curArena.base; size != 0 {
// Transition this space from Reserved to Prepared and mark it
// as released since we'll be able to start using it after updating
// the page allocator and releasing the lock at any time.
sysMap(unsafe.Pointer(h.curArena.base), size, &memstats.heap_sys)
// Update stats.
atomic.Xadd64(&memstats.heap_released, int64(size))
stats := memstats.heapStats.acquire()
atomic.Xaddint64(&stats.releagrowsed, int64(size))
memstats.heapStats.release()
// Update the page allocator's structures to make this
// space ready for allocation.
h.pages.grow(h.curArena.base, size)
totalGrowth += size
}
// Switch to the new space.
// 把 curArena 切换到新的地址
h.curArena.base = uintptr(av)
h.curArena.end = uintptr(av) + asize
}

// Recalculate nBase.
// We know this won't overflow, because sysAlloc returned
// a valid region starting at h.curArena.base which is at
// least ask bytes in size.
nBase = alignUp(h.curArena.base+ask, physPageSize)
}

// 更新 base
// Grow into the current arena.
v := h.curArena.base
h.curArena.base = nBase

// 把分配的那块内存标记为 Prepared
// Transition the space we're going to use from Reserved to Prepared.
sysMap(unsafe.Pointer(v), nBase-v, &memstats.heap_sys)

// ...... 省略部分代码

// Update the page allocator's structures to make this
// space ready for allocation.
h.pages.grow(v, nBase-v)
totalGrowth += nBase - v
return totalGrowth, true
}

curArena的空闲内存(内核返回的内存空间往往会比请求的多一些)不足以满足分配时,调用mheap.sysAlloc来申请更多的空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (h *mheap) sysAlloc(n uintptr) (v unsafe.Pointer, size uintptr) {
assertLockHeld(&h.lock)

n = alignUp(n, heapArenaBytes)

// First, try the arena pre-reservation.
v = h.arena.alloc(n, heapArenaBytes, &memstats.heap_sys)
if v != nil {
size = n
goto mapped
}

// Try to grow the heap at a hint address.
for h.arenaHints != nil {
hint := h.arenaHints
p := hint.addr
if hint.down {
p -= n
}
if p+n < p {
// We can't use this, so don't ask.
v = nil
} else if arenaIndex(p+n-1) >= 1<<arenaBits {
// Outside addressable heap. Can't use.
v = nil
} else {
v = sysReserve(unsafe.Pointer(p), n)
}
// 如果不相等,则说明 mmap 在建议的地址上没能分配成功
if p == uintptr(v) {
// Success. Update the hint.
if !hint.down {
p += n
}
// 成功后,hint 的地址也跟着更新
hint.addr = p
size = n
break
}
// 此时,丢弃这次分配的内存,尝试下一个 arenaHints, 也就是下一个 1T 区间
// Failed. Discard this hint and try the next.
//
// TODO: This would be cleaner if sysReserve could be
// told to only return the requested address. In
// particular, this is already how Windows behaves, so
// it would simplify things there.
if v != nil {
sysFree(v, n, nil)
}
h.arenaHints = hint.next
h.arenaHintAlloc.free(unsafe.Pointer(hint))
}

if size == 0 {
if raceenabled {
// The race detector assumes the heap lives in
// [0x00c000000000, 0x00e000000000), but we
// just ran out of hints in this region. Give
// a nice failure.
throw("too many address space collisions for -race mode")
}

// All of the hints failed, so we'll take any
// (sufficiently aligned) address the kernel will give
// us.
// 所有的 hint 都失败了,然后让内核自动分配一个定量内存
v, size = sysReserveAligned(nil, n, heapArenaBytes)
if v == nil {
return nil, 0
}

// Create new hints for extending this region.
hint := (*arenaHint)(h.arenaHintAlloc.alloc())
hint.addr, hint.down = uintptr(v), true
hint.next, mheap_.arenaHints = mheap_.arenaHints, hint
hint = (*arenaHint)(h.arenaHintAlloc.alloc())
hint.addr = uintptr(v) + size
hint.next, mheap_.arenaHints = mheap_.arenaHints, hint
}
// ......省略大段代码
return
}

这里真正申请内存的操作是 sysReserve,让我们来一睹究竟:

1
2
3
4
5
6
7
func sysReserve(v unsafe.Pointer, n uintptr) unsafe.Pointer {
p, err := mmap(v, n, _PROT_NONE, _MAP_ANON|_MAP_PRIVATE, -1, 0)
if err != 0 {
return nil
}
return p
}

熟悉的 mmap 映入眼帘!我们已经抵达了内核的大门,查看其定义发现,它包裹了一个sysMmap函数,该函数就是发起mmap系统调用的所在,它是由汇编语言写成,Linux 下函数体位于 sys_linux_amd64.s 中:

1
2
// sysMmap calls the mmap system call. It is implemented in assembly.
func sysMmap(addr unsafe.Pointer, n uintptr, prot, flags, fd int32, off uint32) (p unsafe.Pointer, err int)

mmap调用中的 flag _PROT_NONE, _MAP_ANON|_MAP_PRIVATE表示申请的内存块是无文件背景的匿名映射,这里在调用时传入了一个提示地址,用于告知内核尽量从要求的地址开始分配。

内核当然不能保证这一点,但 go 也足够倔强,如果不能保证连续增长,就另找一段空间开始:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 如果不相等,则说明 mmap 在建议的地址上没能分配成功
if p == uintptr(v) {
// Success. Update the hint.
if !hint.down {
p += n
}
// 成功后,hint 的地址也跟着更新
hint.addr = p
size = n
break
}
// 此时,丢弃这次分配的内存,尝试下一个 arenaHints, 也就是下一个 1T 区间
// Failed. Discard this hint and try the next.
//
// TODO: This would be cleaner if sysReserve could be
// told to only return the requested address. In
// particular, this is already how Windows behaves, so
// it would simplify things there.
if v != nil {
sysFree(v, n, nil)
}
h.arenaHints = hint.next
h.arenaHintAlloc.free(unsafe.Pointer(hint))

sysAlloc 返回之后,就意味着已经从内核申请到了一块空间。回到 mheap.grow的代码,会看到调用了 sysMap 再次向内核申请内存,sysMap 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
func sysMap(v unsafe.Pointer, n uintptr, sysStat *sysMemStat) {
sysStat.add(int64(n))

p, err := mmap(v, n, _PROT_READ|_PROT_WRITE, _MAP_ANON|_MAP_FIXED|_MAP_PRIVATE, -1, 0)
if err == _ENOMEM {
throw("runtime: out of memory")
}
if p != v || err != 0 {
print("runtime: mmap(", v, ", ", n, ") returned ", p, ", ", err, "\n")
throw("runtime: cannot map pages in arena address space")
}
}

可见,也是一个mmap系统调用,但传入的 flag 不同,多了一个 _MAP_FIXED

查看 mmap 的手册便会明白,在不提供_MAP_FIXED 的情况下,内核会尽量从给出的地址分配空间,但避免冲突是第一位的,所以结果并不总能如意。而_MAP_FIXED保证了这一点,即使在请求的地址处已有其它映射的情况下也会覆盖之前的映射。

mmap 文档中也对 _MAP_FIXED 使用提出了警示,而 go 在这里使用是完全没有问题的,因为事先已经向内核申请了该块内存了,在里面隔上一刀根本不需要睁眼。

我们拿到了一块连续的内存,是时候从 allocSpan 返回了,如此 stackalloc 就为新 G 申请到了一块连续内存用作堆栈。

从 goroutine 的新建一直到内核的大门,我们发现了用于申请内存的方式是 mmap,但mmap从进程虚拟地址空间的哪个位置分配内存呢?runtime 源码中给与的提示地址又是从何而来呢?

3. mmap 申请内存的位置

mmap 既是一个系统调用,也是进程虚拟地址空间中的一个区域,让我再次援引《深入 Linux 内核架构》中的一幅图:

图3-2 mmap 区域自顶向下扩展

书中介绍了 2.6 版本的内核内存布局,其中 mmap 区域是和 heap 相对增长的,内核会留出足够的空间给主线程 stack,这样便可最大化的利用内存空间,好在 stack 通常不会很大。

但是 mmap 并非只能在概念上划出的区域进行分配,它甚至可以在用户空间内任意地方分配内存,这当然也包括传统的 heap 区域!还记得 _MAP_FIXED 吧?我打赌它绝对能让你的程序 crash 掉!

heap 是用来为进程动态分配内存的,传统的定义是:堆是一段长度可变的连续虚拟内存,始于进程的未初始化数据段的末尾,随着内存的分配和释放而增减

图 3-3 Linux 进程的虚拟内存布局

改变 heap 大小的系统调用是 brksbrk ,而 go 主要使用 mmap 来维护堆,这就说明 go 堆和传统的堆位置是不同的。位置虽然不同,但使命毫无二致,让我们来看一个 go 程序的内存布局:

00400000-004bd000 r-xp 00000000 103:02 8916313      playground/helloworld/hello/hello
004bd000-00574000 r--p 000bd000 103:02 8916313      playground/helloworld/hello/hello
00574000-0058f000 rw-p 00174000 103:02 8916313      playground/helloworld/hello/hello
0058f000-005c4000 rw-p 00000000 00:00 0 
c000000000-c000200000 rw-p 00000000 00:00 0 
c000200000-c017e00000 rw-p 00000000 00:00 0 
c017e00000-c018000000 rw-p 00000000 00:00 0 
c018000000-c018400000 rw-p 00000000 00:00 0 
c018400000-c01c000000 ---p 00000000 00:00 0 
7fef44906000-7fef449ba000 rw-p 00000000 00:00 0 
7fef449d2000-7fef47c19000 rw-p 00000000 00:00 0 
7fef47c19000-7fef57d99000 ---p 00000000 00:00 0 
7fef57d99000-7fef57d9a000 rw-p 00000000 00:00 0 
7fef57d9a000-7fef69c49000 ---p 00000000 00:00 0 
7fef69c49000-7fef69c4a000 rw-p 00000000 00:00 0 
7fef69c4a000-7fef6c01f000 ---p 00000000 00:00 0 
7fef6c01f000-7fef6c020000 rw-p 00000000 00:00 0 
7fef6c020000-7fef6c499000 ---p 00000000 00:00 0 
7fef6c499000-7fef6c49a000 rw-p 00000000 00:00 0 
7fef6c49a000-7fef6c519000 ---p 00000000 00:00 0 
7fef6c519000-7fef6c579000 rw-p 00000000 00:00 0 
7ffc335d5000-7ffc335f7000 rw-p 00000000 00:00 0                          [stack]
7ffc335f8000-7ffc335fc000 r--p 00000000 00:00 0                          [vvar]
7ffc335fc000-7ffc335fe000 r-xp 00000000 00:00 0                          [vdso]
ffffffffff600000-ffffffffff601000 --xp 00000000 00:00 0                  [vsyscall]

表3-1 Go 进程的内存布局映射

除了代码段不足 2M 的区域之外,似乎 c000000000 最值得怀疑,而且这份映射当中没有看到 heap 身影,这直接印证了上述猜想。关于 c000000000 我们要去源码中寻找答案,且看内存分配器的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
func mallocinit() {
// ...... 省略部分代码

// 只看 64 位系统的初始化部分
// Create initial arena growth hints.
if goarch.PtrSize == 8 {
// On a 64-bit machine, we pick the following hints
// because:
//
// 1. Starting from the middle of the address space
// makes it easier to grow out a contiguous range
// without running in to some other mapping.
//
// 2. This makes Go heap addresses more easily
// recognizable when debugging.
//
// 3. Stack scanning in gccgo is still conservative,
// so it's important that addresses be distinguishable
// from other data.
//
// Starting at 0x00c0 means that the valid memory addresses
// will begin 0x00c0, 0x00c1, ...
// In little-endian, that's c0 00, c1 00, ... None of those are valid
// UTF-8 sequences, and they are otherwise as far away from
// ff (likely a common byte) as possible. If that fails, we try other 0xXXc0
// addresses. An earlier attempt to use 0x11f8 caused out of memory errors
// on OS X during thread allocations. 0x00c0 causes conflicts with
// AddressSanitizer which reserves all memory up to 0x0100.
// These choices reduce the odds of a conservative garbage collector
// not collecting memory because some non-pointer block of memory
// had a bit pattern that matched a memory address.
//
// However, on arm64, we ignore all this advice above and slam the
// allocation at 0x40 << 32 because when using 4k pages with 3-level
// translation buffers, the user address space is limited to 39 bits
// On ios/arm64, the address space is even smaller.
//
// On AIX, mmaps starts at 0x0A00000000000000 for 64-bit.
// processes.
for i := 0x7f; i >= 0; i-- {
var p uintptr
switch {
case raceenabled:
// The TSAN runtime requires the heap
// to be in the range [0x00c000000000,
// 0x00e000000000).
p = uintptr(i)<<32 | uintptrMask&(0x00c0<<32)
if p >= uintptrMask&0x00e000000000 {
continue
}
case GOARCH == "arm64" && GOOS == "ios":
p = uintptr(i)<<40 | uintptrMask&(0x0013<<28)
case GOARCH == "arm64":
p = uintptr(i)<<40 | uintptrMask&(0x0040<<32)
case GOOS == "aix":
if i == 0 {
// We don't use addresses directly after 0x0A00000000000000
// to avoid collisions with others mmaps done by non-go programs.
continue
}
p = uintptr(i)<<40 | uintptrMask&(0xa0<<52)
default:
p = uintptr(i)<<40 | uintptrMask&(0x00c0<<32)
}
hint := (*arenaHint)(mheap_.arenaHintAlloc.alloc())
hint.addr = p
hint.next, mheap_.arenaHints = mheap_.arenaHints, hint
}
}
}

注释部分第一条便说:从地址空间的中间开始向上增长,很容易获得连续的区域,且不会和其它映射部位发生碰撞。

因此 go 选择了从 0x00c0开始,并且用一个 for 循环生成了 128 个提示地址,组成链表初始化到 mheap_.arenaHints

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0x7fc000000000
......
0x10c000000000
0x0fc000000000
0x0ec000000000
0x0dc000000000
0x0cc000000000
0x0bc000000000
0x0ac000000000
0x09c000000000
0x08c000000000
0x07c000000000
0x06c000000000
0x05c000000000
0x04c000000000
0x03c000000000
0x02c000000000
0x01c000000000
0x00c000000000

这 128 个起始地址除了最后一个之外,其余皆可向上增长 1TiB 的空间,最后一个距离用户空间顶部仅剩 256 GiB。

0x00c000000000 距离用户空间的开始有 765 GiB,这也是为什么不会和其它映射部位发生碰撞的原因!

mallocinit 初始化了mheap_.arenaHints,还记得 mheap 为增加 heap 而申请内存时的方法吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// Try to grow the heap at a hint address.
for h.arenaHints != nil {
hint := h.arenaHints
p := hint.addr
if hint.down {
p -= n
}
if p+n < p {
// We can't use this, so don't ask.
v = nil
} else if arenaIndex(p+n-1) >= 1<<arenaBits {
// Outside addressable heap. Can't use.
v = nil
} else {
v = sysReserve(unsafe.Pointer(p), n)
}
// 如果不相等,则说明 mmap 在建议的地址上没能分配成功
if p == uintptr(v) {
// Success. Update the hint.
if !hint.down {
p += n
}
// 成功后,hint 的地址也跟着更新
hint.addr = p
size = n
break
}
// 此时,丢弃这次分配的内存,尝试下一个 arenaHints, 也就是下一个 1T 区间
// Failed. Discard this hint and try the next.
//
// TODO: This would be cleaner if sysReserve could be
// told to only return the requested address. In
// particular, this is already how Windows behaves, so
// it would simplify things there.
if v != nil {
sysFree(v, n, nil)
}
h.arenaHints = hint.next
h.arenaHintAlloc.free(unsafe.Pointer(hint))
}

mmap 的调用都是围绕着 arenaHints 来进行的,并且每次申请成功后都会更新 hint 的 addr,这样就实现了连续增长,直到失败。如果失败了,就从下一个 1TiB 的区间再次开始!

4. g0 堆栈

看过了普通 goroutine 堆栈的分配之后,再来简要说一下 g0 的堆栈。g0 是个比较特殊的 goroutine 它只是协助 runtime 来执行,但不承载任何执行函数,与普通的用户 goroutine 有所区别。在一定程度上,可以把它类比成操作系统上每个线程的内核栈,每当 runtime 获得控制权的时候就会将堆栈切换到 g0 代表的堆栈上。

go 的 GPM 模型此处不作介绍,建议阅读Scheduling In Go : Part II - Go Scheduler 来了解并发模型。我们只说其中的 M,每个M 都有一个 g0 堆栈,用于执行 runtime 代码,其中较为特殊的 M0 (即 go 进程的主线程,每个 go 程序仅有一个 M0)的 g0 堆栈是通过汇编语言进行初始化的。

我们先来看看 go 程序的入口地址:

richard@Richard-Manjaro:~ » readelf -h carefree 
ELF Header:
  Magic:   7f 45 4c 46 02 01 01 00 00 00 00 00 00 00 00 00 
  Class:                             ELF64
  Data:                              2's complement, little endian
  Version:                           1 (current)
  OS/ABI:                            UNIX - System V
  ABI Version:                       0
  Type:                              EXEC (Executable file)
  Machine:                           Advanced Micro Devices X86-64
  Version:                           0x1
  Entry point address:               0x463f20
  Start of program headers:          64 (bytes into file)
  Start of section headers:          456 (bytes into file)
  Flags:                             0x0
  Size of this header:               64 (bytes)
  Size of program headers:           56 (bytes)
  Number of program headers:         7
  Size of section headers:           64 (bytes)
  Number of section headers:         23
  Section header string table index: 3

读取 ELF文件头可知,入口地址为0x463f20,因为禁用了 cgo,没有动态链接库,所以 Entry point 指示的地址既是程序的入口地址。继续看一下该地址指示的代码:

richard@Richard-Manjaro:~ » lldb ./carefree 
(lldb) target create "./carefree"
Current executable set to '/home/richard/carefree' (x86_64).
(lldb) image lookup --address 0x463f20
      Address: carefree[0x0000000000463f20] (carefree.PT_LOAD[0]..text + 405280)
      Summary: carefree`_rt0_amd64_linux
(lldb) 

_rt0_amd64_linux 即为程序的入口,当运行程序时,shell 会 fork 一个子进程出来,之后执行 execve() 系统调用来装载 go 的可执行文件,当内核装载完毕之后,会将 CPU 的程序计数器设置为此入口点,之后 go 程序开始执行。

_rt0_amd64_linux 是对 asm_amd64.sruntime·rt0_go 的调用,看一下runtime·rt0_go 的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
TEXT runtime·rt0_go(SB),NOSPLIT|TOPFRAME,$0
// copy arguments forward on an even stack
MOVQ DI, AX // argc
MOVQ SI, BX // argv
SUBQ $(5*8), SP // 3args 2auto
ANDQ $~15, SP
MOVQ AX, 24(SP)
MOVQ BX, 32(SP)

// create istack out of the given (operating system) stack.
// _cgo_init may update stackguard.
// 初始化 g0
MOVQ $runtime·g0(SB), DI
LEAQ (-64*1024+104)(SP), BX
MOVQ BX, g_stackguard0(DI)
MOVQ BX, g_stackguard1(DI)
MOVQ BX, (g_stack+stack_lo)(DI)
MOVQ SP, (g_stack+stack_hi)(DI)

这段代码设置 g0 堆栈的方式是使用线程堆栈的栈顶指针减少 64KB + 104B 作为 g0 堆栈的低端,当前线程堆栈的栈顶为 g0 堆栈的高端。执行完成后,g0 的堆栈便被初始化为 64KB 了。令人惊讶的是,这居然是在系统线程的 8M 堆栈(Linux 的默认线程堆栈为 8 M)中分配的。

再来看一下其它新建 Mg0,go 通过 runtime.newm 来新建操作系统线程,顺藤摸瓜会发现其最终执行的系统调用为 clone:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi)
/*
* note: strace gets confused if we use CLONE_PTRACE here.
*/
if false {
print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", abi.FuncPCABI0(clone), " id=", mp.id, " ostk=", &mp, "\n")mp.g0.stack.hi
}

// Disable signals during clone, so that the new thread starts
// with signals disabled. It will enable them in minit.
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(abi.FuncPCABI0(mstart)))
sigprocmask(_SIG_SETMASK, &oset, nil)

if ret < 0 {
print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", -ret, ")\n")
if ret == -_EAGAIN {
println("runtime: may need to increase max user processes (ulimit -u)")
}
throw("newosproc")
}
}

clone 中堆栈起始地址传入的是 mp.g0.stack.hi,即该 Mg0 的堆栈高端地址,看一下 g0 的初始化,相应的代码在 runtime.allocm 中:

1
2
3
4
5
if iscgo || mStackIsSystemAllocated() {
mp.g0 = malg(-1)
} else {
mp.g0 = malg(8192 * sys.StackGuardMultiplier)
}

可见后续 g0 分配就是通过 malg 来进行的,该函数我们之前已经介绍过了,此处只要明白分配的堆栈大小为 8K 即可。由此可知,除了 m0g0 在传统的主线程堆栈区域外,后续 M 的堆栈都是分配自 go 堆中,其可能的区域自不待言,我们已在上一节论述过了。

5. goroutine 的堆栈切换

当 goroutine 被 runtime 调度到 CPU 上时,不仅要将程序计数器设置为该 goroutine 的执行函数地址,而且要切换到该 goroutine 的堆栈上执行后续操作,我们这一节就来看看 goroutine 的堆栈是如何切换的。堆栈的切换和调度密切相关,但此处只讨论和堆栈有关的内容,不再深入调度相关的细节。

m0 在初始化好一系列条件之后,会调用 runtime·mstart 从而真正的让 M0 跑起来,后续新建 M 时向 clone 传入的运行函数也是 runtime·mstart,而 runtime·mstart 最终会进入调度函数 runtime.schedule, 而 schedule 的工作就是千方百计的寻找空闲的 G 将它送到 CPU 上运行。当最终找到这个 G 的时候,会调用一段用汇编代码写成的函数 runtime·gogo(buf *gobuf)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $0-8
MOVQ buf+0(FP), BX // gobuf
MOVQ gobuf_g(BX), DX
MOVQ 0(DX), CX // make sure g != nil
JMP gogo<>(SB)

TEXT gogo<>(SB), NOSPLIT, $0
get_tls(CX)
MOVQ DX, g(CX)
MOVQ DX, R14 // set the g register
MOVQ gobuf_sp(BX), SP // restore SP
MOVQ gobuf_ret(BX), AX
MOVQ gobuf_ctxt(BX), DX
MOVQ gobuf_bp(BX), BP
MOVQ $0, gobuf_sp(BX) // clear to help garbage collector
MOVQ $0, gobuf_ret(BX)
MOVQ $0, gobuf_ctxt(BX)
MOVQ $0, gobuf_bp(BX)
MOVQ gobuf_pc(BX), BX
JMP BX

runtime·gogo 会调用 gogo,传入的参数是 g 结构体中和调度相关的一个字段 gobuf:

1
2
3
4
5
6
7
8
9
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret uintptr
lr uintptr
bp uintptr // for framepointer-enabled architectures
}

其中有程序计数器和堆栈栈顶指针等重要的值,这些值都是该 goroutine 被调度出 CPU 的时候保存进来的,是 goroutine 的执行现场。gogo 会将现场恢复,这包括程序计数器和栈顶,之后这个 goroutine 就又从上次中断的地方跑起来了。

6. 总结

本文以探求 goroutine 堆栈在进程虚拟地址空间中的位置为诉求,对源代码进行有目的的展开,并最终找到内存分配的内核接口 mmap

mmap 的使用太过灵活,以至于非要刻板的对应到虚拟内存布局中的位置显得有些棘手,因为 go 堆 接管的是整个虚拟内存的用户空间,但我们仍然可以从其内存分配的设计思想中窥得一二。

go 堆的起始位置在用户空间的中段,确切的说是距离起始端 768 GiB 的地方开始,而从用户空间 128 TiB 的角度来看,这远远算不上中间,仅仅是相对于传统 heap 来说的。我想这也是 go 对于历史的一种尊重,好在 64 位模式下虚拟地址空间的跨度足够大,可以做出很灵活的设计。

go 堆把后续的空间划分成了 128 份,几乎每份都有 1TiB 的大小,然后默默地从地址 0x00c000000000 处向上增长,因为00 c0 既不是有效的 UTF8 编码,又有足够的辨识度。

参考文献

  1. 深入理解计算机系统
  2. 程序员的自我修养
  3. Linux/UNIX系统编程手册
  4. 深入 Linux 内核架构
  5. Linux 系统编程
  6. mmap
  7. 深入golang runtime的调度
  8. go-scheduler
  9. Go 语言原本
  10. 一文教你搞懂 Go 中栈操作
  11. 详解Go中内存分配源码实现
  12. Scheduling In Go : Part II - Go Scheduler

希望能给 Go 后学带来更多有意义的内容

effective go 中文版 是我个人的文档轮子。为了帮助需要的人准确理解原文,特采纳中英双语格式,目前翻译初稿已经完成。

互联网上已有 effective go 的中文版,甚至还不止一版,那为什么还要再造这个轮子呢?首先,我个人的初衷是为了锻炼自己的英文理解与翻译水平,同时也能磨砺中文的语言组织能力,更是为了把 go 语言的基础夯实。

其次呢,我在读第一遍英文时,碰到有疑问的地方在中文版里往往得不到想要的答案,后来自己思考也解决了一些疑惑,有些问题呢是出自翻译的问题,比如最后一章有一句话:The program here provides a nicer interface to one form of data: given a short piece of text, it calls on the chart server to produce a QR code, a matrix of boxes that encode the text.

冒号之后的内容姑且不论,它是对 a nicer interface 的详细阐述。我们看前半句中的 one form of data 到底指指什么?从字面意思看,或者由翻译软件翻译的话意思应该是:一种数据类别。我所见的两个翻译版本分别翻译成了 一种数据格式某种形式的数据,可见就是照字面翻译,只是为了语句通顺问题,各自进行了调整。但即便是进行了调整,整个句子依然说不通,此程序为一种数据格式提供了更好的的接口 其意几何呢?

其实,下文中文档便给出了详细代码,html 的代码中有一个输入表单。我们知道 form 不仅有形式、类别、种类的意思,还具有表格的意思,并且在前端领域我们习惯于术语表单。所以此处分明是一个数据的表单之意!再者说,form 之前使用了数量词 one 而非定冠词 a ,我认为这是另一条佐证,这句话的本意是:本程序为表单数据提供了更加友好的接口

除翻译问题外,春秋笔法,微言大义似乎也能造成困扰。effective go 虽为入门级必读资料,其内容算不上艰深,然而某些细微之处,于新手而言并不见得就能轻易领会,所以我在某些地方也注释了自己的心得。这看起来颇有几分古人注书的感觉,只不过我注的内容未必都对,所注条目也不甚多,但总归是自己花了精力的所思所想,相必对有些人有用也未可知,因此闲暇之余,又翻译了一版,很期待各位能够给予斧正。

因初稿未经校对,其中定有不少讹误疏漏之处,希望所有有兴趣的伙伴都能参与进来,不论是书写还是理解上的问题,都欢迎提交 issue 或者 pr,我定我会及时处理。

  1. 介绍
  2. 格式化
  3. 注释
  4. 命名
  5. 分号
  6. 控制结构
  7. 函数
  8. 数据
  9. 初始化
  10. 方法
  11. 接口和其它类型
  12. 空白标识符
  13. 内嵌
  14. 并发
  15. 错误
  16. 一个 web 服务

go 1.18 于近日发布,带来了 go 历史上最大的一次语言级改变——泛型!但本文只聚焦于本次发布中标准库 bufio 包中的一个小小的改变——Writer.AvailableBuffer。go 每次版本发布都会伴随着标准库的些许变动,本次发布即在 bufio 包中增加了一个 Writer.AvailableBuffer 方法。

该方法的添加源于一条名为 bufio: add Writer.AvailableBuffer 的 issue。作者认为 go 中很多 appendX 类型的 API 在与 bufio.Writer 一起工作的时候比较低效,原因在于 Write(p []byte) (nn int, err error) 方法只接受 []byte ,却不向外提供 []byte。这在一定程度上需要调用者自行分配内存,然后传给 write 使其再进行 copy,所以这里固定有一次内存的 allocationcopy

作者的提议是:由 bufio 的 Writer 向外暴露自己的 buffer 以为 appendx 函数使用:

1
2
3
4
5
6
// AvailableBuffer returns an empty buffer with b.Available capacity.
// This buffer is intended to be appended to and
// passed to an immediately succeeding Write call.
func (b *Writer) AvailableBuffer() []byte {
return b.buf[b.n:][:0]
}

AvailableBuffer 向外暴露了一个空的 []byte, 但是其 capacity 是和 Writer 的 buffer 余量相同的,这意味着返回的 []byteWriter 共享同一个底层数组。那这样做的好处是什么呢? 其好处就是在理想情况下,能够避免那固有的一次 allocationcopy,且看如下示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main

import (
"bufio"
"os"
"strconv"
)

func main() {
w := bufio.NewWriter(os.Stdout)
for _, i := range []int64{1, 2, 3, 4} {
b := w.AvailableBuffer()
b = strconv.AppendInt(b, i, 10)
b = append(b, ' ')
w.Write(b)
}
w.Flush()
}

该例循环体内使用的 buffer b 都返回自 AvailableBuffer,此时没有额外的内存分配。理想情况下 append 的 byte 数量不超过 Writer 的 buffer 余量时, copy 会立即返回,因为 bWriter 使用的同一个底层数组,不需要 copy,因此连 copy 的操作都省掉了。

极端情况下,由于 append 的扩张导致 b 的底层数组重新分配,那么 Write 也只是回到了其最初的工作方式。在很大程度上,该调整还是让 Writer 变得高效了不少!

那有什么不好的地方吗? 唯一缺点应该就是暴露了 bufio.Writer 内部的 buffer。然而该包中的其它类型,诸如 ReaderScanner 已经向外提供了 Reader.PeekReader.ReadSlice、以及 Scanner.Bytes 等对底层数组不安全的访问方法,故也不能独怪其罪。至少并没有破坏包的整体风格,而其利弊皆在人之为用!

0%