几分钟了解Pingora♿核心篇

2024-03-01, 杭州

之前Cloudflare宣称的用于替换Nginx的超高性能网关框架Pingora现在终于释出v0.1.0版了,为了蹭热度决定快速进行一个源码的读,这篇文章可能是简体中文网络上最早的源码解析。感觉源码copypasta的上下文切换很影响阅读体验,所以我尽量不出现大段源码,就算出现了,也可以直接跳过看下一段的总结。这份文章建议配合源码食用,不配合源码的话就当原理快速预览。啥都不想读就跳到太长不看

开始

需要注意,Pingora是一个框架,不是完整的二进制可执行文件分发,跟Nginx读nginx.conf然后直接起服务不同,你得自己写个rust把他包进去定义服务然后编译出二进制。看官方给的启动例子:

use async_trait::async_trait;
use pingora::prelude::*;
use std::sync::Arc;

fn main() {
    let mut my_server = Server::new(None).unwrap();
    my_server.bootstrap();

    // Note that upstreams needs to be declared as `mut` now
    let mut upstreams =
        LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap();

    let hc = TcpHealthCheck::new();
    upstreams.set_health_check(hc);
    upstreams.health_check_frequency = Some(std::time::Duration::from_secs(1));

    let background = background_service("health check", upstreams);
    let upstreams = background.task();

    // `upstreams` no longer need to be wrapped in an arc
    let mut lb = http_proxy_service(&my_server.configuration, LB(upstreams));
    lb.add_tcp("0.0.0.0:6188");

    my_server.add_service(background);

    my_server.add_service(lb);
    my_server.run_forever();
}

关键要素:

此外,还不得不提,Pingora每次调整都得编译出新的可执行螃蟹🦀,你可能会想更换二进制时不停机热更新咋办,这就涉及到Pingora用的二进制可执行的网关之间做的热升级策略了

创建

可以看到调用了pingora::prelude::Server来启动服务。戳进pingora/src/lib.rs,可以看到引用了pingora_corepingora_httppingora_timeout三个核心组件,然后根据缓存、负载均衡和反向代理三个场景,用feature区分出调用的具体的crate。

例子里调用了Server,可以认为这是核心服务了,跟进,参数看不懂啦,直接看怎么新建然后跑起来的。

直接看Server::new标准起手,做一对tx和rx用来告诉服务何时退出。然后根据传参生成配置。传参包括命令行参数(opt)和配置文件(conf,以YAML格式)两类。没conf的话就自己造一个空的YAML Self::from_yaml("---\nversion: 1").ok(),序列化到ServerConf类,然后空跑。有conf就先加载conf,然后用传入的参数覆盖配置值。

再次强调,Pingora的配置文件主要是配运行时的参数,服务定义是在Rust里包着的。

预热

结构建起来之后,各类服务都需要塞到Server里,在这之前,我们需要先bootstrap摇热做些准备工作。摇热无非就是在内存里把结构体创建好该塞的参数塞进去。不过bootstrap里面最好玩的是这句

#![allow(unused)]
fn main() {
match self.load_fds(self.options.as_ref().map_or(false, |o| o.upgrade)) {}
}

看他要的参数是ServerConf.upgrade,这个参数的说明是 Whether this server should try to upgrade from an running old server ,这个服务是不是要从正在跑着的旧服务热升级上来。函数名叫Server::load_fds,进去看,发现它试着把从Serverconf.upgrade_sock记载的sock用于通信,拿走老进程手上的文件描述符,然后加载到自己内存身上,从而实现热升级。注释也说了,这个参数标记升级用的sock地址,目的就是新进程和老进程之间协调零宕机时间热升级。是一手值得学习的设计。不过我们先考虑最一般的情况,热升级后面再说

预热完之后服务的结构在内存里就烤好了,不难发现,它是先注册好一堆服务再一口气跑起来的类型,所以接下来往里面塞各种服务。

集群负载

主要关注pingora_load_balancing::LoadBalancer结构。

值得注意,从它impl BackgroundService来看,集群负载本身是作为后台服务注册的,也就是它并非在每次有请求来的时候更新状态, 而是每时每刻都在后台异步运行。运行的入口方法则是LoadBalancer.start,这个方法会在Server启动时被调用。

可以看到一个循环里主要包含俩函数:self.updateself.backends.run_health_check。这俩函数是否能运行取决于有没有设频率,没设频率的话直接把下次运行时间推到timestamp的尽头。其中,一个自然是健康检查,另一个自然是服务发现。

而从结构体来看,符合直觉,pingora_load_balancing::LoadBalancer是建立在Backend之上的,这正好比Nginx里的:

upstream cluster1 {
    server 100.114.51.4:1919;
    server 100.114.51.5:1919;
}

注释提到,为了实现探活和服务发现,LoadBalancer需要以pingora_core::services::background::BackgroundService跑起来,Service这个概念现在还不清楚,先看最关键的LoadBalancerBackendsBackend三个结构。

单个Backend没啥好考虑的,后端地址、权重和哈希罢了。哈希算法用的就是rust标准库的哈希。

Backend凑成Backends就很好玩了。配合LoadBalancer一起看,

#![allow(unused)]
fn main() {
pub struct LoadBalancer<S> {
    backends: Backends,
    selector: ArcSwap<S>,

    pub health_check_frequency: Option<Duration>,
    pub update_frequency: Option<Duration>,
    pub parallel_health_check: bool,
}

pub struct Backends {
    discovery: Box<dyn ServiceDiscovery + Send + Sync + 'static>,
    health_check: Option<Arc<dyn health_check::HealthCheck + Send + Sync + 'static>>,
    backends: ArcSwap<BTreeSet<Backend>>,
    health: ArcSwap<HashMap<u64, Health>>,
}
}

可以认为Backends负责决定有哪些服务可以用,LB负责指挥具体路由到谁的策略。抽象起来意思就是存一堆后端的索引结构选择要转发的后端,多久做一次健康检查,多久做一次服务发现。这些就是考虑负载设计时的根本要素。

索引结构

backends: ArcSwap<BTreeSet<Backend>>,给不懂Rust的读者解释一下就是支持原子操作的智能指针指向一个B树索引集合,树上挂着一堆Backend。接下来的三个部分都会对这个结构这这拨弄

后端选择

LoadBalancer做了个泛型,要求选择机制必须满足BackendSelection trait。

#![allow(unused)]
fn main() {
pub trait BackendSelection {
    type Iter;
    fn build(backends: &BTreeSet<Backend>) -> Self;
    fn iter(self: &Arc<Self>, key: &[u8]) -> Self::Iter
    where
        Self::Iter: BackendIter;
}

pub trait BackendIter {
    fn next(&mut self) -> Option<&Backend>;
}
}

大致意思就是通过build从LB所拥有的后端建立出一个机制,然后这个机制调用iter得到一个迭代器,这个迭代器每次迭代都能得出下一个可能的后端,就实现了选择。

自带的LB当中,对BackendSelection有两个实现,一个是Kentama一致性哈希选择,另一个是可以往泛型算法槽里填充指定加权算法的加权选择。

Kentama说白了就是“后端地址维持”,固定的源IP过来,算出来的哈希值就是固定的后端。

而加权的算法使用一个存有后端、权重和算法的结构实现,这个用俩数组做一一映射的结构也值得思考:

#![allow(unused)]
fn main() {
pub struct Weighted<H = FnvHasher> {
    backends: Box<[Backend]>,
    weighted: Box<[u16]>,
    algorithm: H,
}

impl<H> SelectionAlgorithm for H
where
    H: Default + Hasher,
{
    fn new() -> Self {
        H::default()
    }
    fn next(&self, key: &[u8]) -> u64 {
        let mut hasher = H::default();
        hasher.write(key);
        hasher.finish()
    }
}

}

容许的选择算法是只要有Defaultstd::hash::Hasher这俩trait就行。说白了就是新建一个不传初始状态的哈希器要用到Default,然后摇哈希需要用到Hasher。

默认提供了轮询(RoundRobin)、随机(Random)和FNV Hash三种算法。

健康检查

这里我们只要关注pingora_load_balancing::health_check::HealthCheck定义的trait:异步的检查函数和判断检查失败/检查成功的阈值。至于探测方法,TcpHealthCheckHttpHealthCheck都各自实现了这个trait。而对于每个后端来说,它的健康状态是由pingora_load_balancing::health_check::Health来表示的:

#![allow(unused)]
fn main() {
#[derive(Clone)]
struct HealthInner {
    healthy: bool,
    enabled: bool,
    consecutive_counter: usize,
}

pub(crate) struct Health(ArcSwap<HealthInner>);
}

一个原子操作结构,调整:是否健康、是否启用,以及该后端健康检查的失败次数。失败次数大于阈值就将标志位设成不健康。需要注意,真实世界的业务是复杂的,健不健康和启不启用是两回事,所以healthy和enabled要分开记录和看待。

服务发现

目前Pingora提供了服务发现的接口和一个纯静态的服务发现实现。服务发现的接口trait是pingora_load_balancing::discovery::ServiceDiscovery,可以看到只要求实现discover方法,就可以在上述的后台服务大循环里被调用。除了返回发现的后端服务器Backend集合以外,还可以返回HashMap用于指示Backend的可用状态(默认可用)。内置的纯静态的服务发现则只是单纯从它所接收的参数构建可用的Backends,不管活不活。

服务和监听

你看到这里大概会期待类似Nginx里的server { listen 80; }的东西,但Pingora把他归类到服务中。所以先看看Pingora的服务模型。

Pingora的事件驱动看起来是基于tokio做的(当然,运行时也有不需要tokio的NoSteal),所以它异步事件风味很浓。请看pingora_core::services::Service

先看注释里的两个定义。

  • 服务(Service):在Pingora服务器上一直跑着直到服务器停机他才停的玩意。有两种服务很好用:
    • 监听服务:监听服务就是守在端点接受请求给响应的服务。服务可以加业务逻辑,也可以一个服务监听多个端点。
    • 后台服务:不在请求-响应生命周期里,单独跑着的逻辑代码。

根据例子不难猜测,Pingora的服务模型是服务器启动之前先把一系列服务注册上去,服务器启动时会把服务异步转起来,关停时会走频道传信号让服务凋亡。

再看看源码:

#![allow(unused)]
fn main() {
#[async_trait]
pub trait Service: Sync + Send {
    async fn start_service(&mut self, fds: Option<ListenFds>, mut shutdown: ShutdownWatch);

    fn name(&self) -> &str;

    fn threads(&self) -> Option<usize> {
        None
    }
}

#[cfg_attr(not(doc_async_trait), async_trait)]
pub trait BackgroundService {
    async fn start(&self, mut shutdown: ShutdownWatch);
}
}

也就是说:能攥着一堆fd异步启动的,有名字的,有建议线程数的,就叫服务了。注意名字主要拿来打日志,只会用前16个字节。异步启动里塞了个关停watch,停服务时这个信号会传进去让服务自己凋亡。

而后台服务则是在trait埋了个异步start方法,服务器启动时就会试图触发这个异步方法做启动。至于怎么拉起来的,我们后面再说。现在再看看监听。

监听服务

pingora_core::services::listening::Service<A>

按注释:监听服务就是守在端点接受请求给响应的服务。服务可以加业务逻辑,也可以一个服务监听多个端点。结构很诚实反映了这点:

#![allow(unused)]
fn main() {
pub struct Service<A> {
    name: String,
    listeners: Listeners,
    app_logic: Arc<A>,
    /// 建议线程数
    pub threads: Option<usize>,
}
}

需要注意,监听器-业务逻辑两者设计上是解耦的,创建服务时可以创建只有业务逻辑而尚未添加监听端口的,然后再往上加,服务本身负责将他们绑定。但一定要在Pingora服务器启动前加好,不然就只能等后面发动热更新了。

监听器

先看看监听器套娃。

#![allow(unused)]
fn main() {
use tokio::net::{TcpListener, UnixListener};

pub struct Listeners {
    stacks: Vec<TransportStackBuilder>,
}

pub(crate) struct TransportStack {
    l4: ListenerEndpoint,
    tls: Option<Arc<Acceptor>>,
    upgrade_listeners: Option<ListenFds>,
}
pub struct ListenerEndpoint {
    listen_addr: ServerAddress,
    listener: Option<Listener>,
}
pub enum Listener {
    Tcp(TcpListener),
    Unix(UnixListener),
}
impl Listener {
    pub async fn accept(&self) -> io::Result<Stream> {
        match &self {
            Self::Tcp(l) => l.accept().await.map(|(stream, _)| stream.into()),
            Self::Unix(l) => l.accept().await.map(|(stream, _)| stream.into()),
        }
    }
}
}

它设计上就是考虑且只考虑一个服务当中,同时监听多个四层的,TCP/UDS/TLS端口异步接受请求,的情况。注意是UDS,Unix Socks,不是UDP。

关注async fn ListenerEndpoint.listen方法,发现他居然有个fds参数和一大坨逻辑,这个参数也是配合热升级用的,这里先不管,会发现它就是绑个端口罢了,没别的。

再关注accept方法,对上面封了一层,加了个对Listener的防呆,然后就返回接受新请求的流。包到TransportStack就把TCP和TLS封装到一个未初始化流里,给个握手函数。这么做是因为TCP建立连接后不用握手,TLS用,匹配到是TLS就先做个握手,是TCP就直接变成能用的流了。

拿到流,剩下的事情就不用监听器管了。这个流就进入了业务逻辑的视角。不过要想把流交给业务逻辑,还得先把业务逻辑和监听器绑在一起。

业务逻辑

listening::Service<A>的泛型A就是用于编写业务逻辑的接口。要想能被start_service启动,业务逻辑(TCP层)必须实现trait:pingora_core::apps::ServerApp,其中的process_new(Stream, &ShutdownWatch) -> Option<Stream>方法就是业务逻辑的主体入口部分了,你要做业务逻辑二开就从这儿入手。这儿你直接拿到的正是上面监听器帮你处理好的TCP流,做你想做的事情就完事了。这里的返回值是Option<Stream>,大有作用,见下一节。

业务逻辑和监听器的绑定

把业务逻辑和监听器真正绑在一起的地方是监听服务的启动入口pingora_core::services::listening::Service<A>::start_service

前面说过这个方法在Pingora启动时会被调用,它调用之后就是:过一遍监听器看有几个监听,把业务逻辑代码克隆N份,每份跟每个监听都扔到一个Service<A>::run_endpoint方法实例里,然后在Pingora服务器的tokio实例里刷这N个方法实例异步跑起来,然后阻塞等待。

继续跟入这个方法,里面是一个大loop,来了,我们最期待的大loop来了!(当然后面还有别的loop要讲)。里面就是经典的tokio监听事件,是关停事件就跳出loop做打扫,是新连接就拿到流了,克隆一份关闭频道,克隆一份业务逻辑,再刷个Service<A>::handle_event方法实例把流、逻辑和关闭频道扔进tokio里异步跑起来。

继续跟入handle_event,这里比较妙的设计是连接复用的判断。它会调用业务逻辑的process_new方法处理这个流,如果是短连接,你处理完返回None,方法实例就结束了。如果是长连接复用,你返回Some,它就会循环重入process_new,直到业务逻辑返回None

监听(绑定端口),接受(获得流),处理和响应(读写流,关闭流),成了!现在我们来看看后台服务。

后台服务

像上面说的服务发现、负载均衡,以及服务自身状态上报之类,都是服务器自己在后台主动去做,反复做,不需要请求响应触发的业务。这些服务就是后台服务。

后台服务的业务逻辑需要实现pingora_core::services::BackgroundService,这个trait里只有一个start方法,这个方法在一个impl里包进了start_service,这个函数跟着服务实例,会在Pingora服务器启动时被遍历调用,传入一个关闭频道,然后异步跑起来。Pingora赠送了个结构和默认方法:

#![allow(unused)]
fn main() {
pub struct GenBackgroundService<A> {
    name: String,
    task: Arc<A>,
    pub threads: Option<usize>,
}
pub fn background_service<SV>(name: &str, task: SV) -> GenBackgroundService<SV> {
    GenBackgroundService::new(format!("BG {name}"), Arc::new(task))
}
}

泛型A就是后台服务的业务逻辑,在注册服务时调用background_service方法把你写的后台服务传参进去,就会生成一个通用后台服务结构,把这个结构当作服务注册到Pingora身上就行。后台服务可以随时提前返回终止。

服务注册

Server实例有个services: Vec<Box<dyn Service>>结构,调用Server.add_service或者Server.add_services把服务挂上去就行了,服务器启动的速度是人手的数倍,服务器舒服了自己会帮你调用拉起服务的。这就是接下来的启动环节。

启动

一切准备就绪,🚀调用run_forever就把服务跑起来了🚀,感觉高松灯会喜欢。需要注意文档中提到对Server对象来说这个函数必须最后调用,因为它阻塞

首先是经典加参数自行daemonize,值得注意的是双叉之前调用了超时管理器的暂停,因为超时管理器有个RwLock,这玩意在进程间传递是UB,所以要暂停一下等所有锁释放。你可能会觉得启动时不应该有锁,但这是个开放出去的函数,鬼知道调用它的程序员在启动前会做什么。此外文档中提到双叉时需要注意在 POSIX 标准中,当一个多线程程序调用fork()时,只有调用fork()的线程被复制到子进程中;其他线程并不会在新创建的子进程中存在。所以你在调这函数之前如果玩了多线程操作,可能会丢线程然后玩脱。

然后发现,一个Server实例会有多个运行时。先是给每个服务调用start_service,他就把刚才说的services遍历一遍,每个服务都创建一个异步运行时,刷个实例跑服务,然后返回runtime。

此外服务器自己也有个运行时,用来跑整个服务器的主循环pingora_core::Server.main_loop,在主循环挡住,不然上面一堆服务的runtime还没来得及工作呢主协程就跑退出了。这里全是些接收信号然后安详退出/快速退出的逻辑,稀松平常。不过同样值得注意的是留了个信号位给热升级。

热升级

主要逻辑全都讲完了,现在我们来看看热升级的逻辑。这里主要是用了Unix的特性SCM_RIGHTS来实现的。

       SCM_RIGHTS
              Send or receive a set of open file descriptors from
              another process.  The data portion contains an integer
              array of the file descriptors.

根据猛男页的说法,这玩意就是用来进程间传fd的。考虑到端口监听也是fd,Pingora通过使用unix sock在老进程和新进程之间传递监听的fd来实现不停机升级。

先看服务器配置格式里的:

#![allow(unused)]
fn main() {
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerConf {
    ..
    pub upgrade_sock: String,
    ..
}
pub struct Opt {
    ..
    pub upgrade: bool,
    ..
}
}

新老进程使用的配置文件当中,upgrade_sock字段必须保持一致来递交fd,并且新进程要求upgrade,才会进行热升级。

重新回到Server.bootstrap,预热到了load_fds这一步,就会开始找老登要fd。顺着get_from_sock看,Fds调用get_fds_from从老登那把fd全拿过来,然后再反序列化装入自身。然后再在load_fds把自己交给Server做后面的操作。从这儿开始吧。

新老进程递交fd

递交fd的通信协议基本都在pingora_core::server::transfer_fd::Fds里定义。主要get_fds_from搭配send_fds_to一起看。

新进程先在get_fds_from拿到这个sock的地址,然后在监听之前就把文件系统上的老进程的sock文件删掉,自己监听个同名的。

这个时候向老进程送一个SIGQUITServer.main_loop里面tokio抓到信号,开始锁住自己监听的所有fd,然后调用send_to_sock将自己的fd记录表序列化调用send_fds_to送给新进程。先连接,如果中间没连接成功,还会重试几次。连接上了就开始使用SCM_RIGHTS送fd。新进程接收到之后就把fd反序列化装进Fds里了。

新进程将fd装载到服务

新进程到run_service时就会将整个程序所有的fds传入,到装载新的监听服务时fds跟入listening::Service<A>.start_service(),进而跟入build,在这里和新进程从rust定义的TransportStack会合,之后一并传入对应的listen函数。在这里,新进程定义的服务就会在老进程递交过来的fds表里挑,如果表里刚好找到了服务自己本来就想监听的地址,它就直接把这个fd拿走接着听了,就实现了平滑的过渡。

太长不看

本文的意义就是看一个高速网关服务的架构设计,所以我不知道到底想要什么结论,这里主要给一些只想看结论的人。

  • 为什么性能这么高这么快?

    因为这是用Rust写的。

    大概因为事件驱动用的tokio框架能相当充分榨干性能,而且内存分配器用jemalloc,数据结构用B树,地址哈希算法用Kentama和FNV。但这你该问跑benchmark和optimization的人,我只看了看网关核心设计。

  • 生产环境可以用吗?

    CF说他们生产跑得很稳。但它用rust定义规则再编出二进制执行程序,并且用sock进行二进制文件热更新,我觉得你但凡有点迟疑都说明你的企业担待不起。

    而且目前只支持Linux。

  • HTTP代理之类的功能呢?

    我还没看,这篇文章主要是蹭热度,叫核心篇,只看核心,后面有空再看HTTP和TinyUFO之类的神奇组件。