几分钟了解Pingora♿核心篇
2024-03-01, 杭州
之前Cloudflare宣称的用于替换Nginx的超高性能网关框架Pingora
现在终于释出v0.1.0版了,为了蹭热度,我决定快速进行一个源码的读,这篇文章可能是简体中文网络上最早的源码解析。感觉源码copypasta的上下文切换很影响阅读体验,所以我尽量不出现大段源码,就算出现了,也可以直接跳过看下一段的总结。这份文章建议配合源码食用,不配合源码的话就当原理快速预览。啥都不想读就跳到太长不看。
开始
需要注意,Pingora是一个框架,不是完整的二进制可执行文件分发,跟Nginx读nginx.conf
然后直接起服务不同,你得自己写个rust把他包进去定义服务然后编译出二进制。看官方给的启动例子:
use async_trait::async_trait; use pingora::prelude::*; use std::sync::Arc; fn main() { let mut my_server = Server::new(None).unwrap(); my_server.bootstrap(); // Note that upstreams needs to be declared as `mut` now let mut upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443", "127.0.0.1:343"]).unwrap(); let hc = TcpHealthCheck::new(); upstreams.set_health_check(hc); upstreams.health_check_frequency = Some(std::time::Duration::from_secs(1)); let background = background_service("health check", upstreams); let upstreams = background.task(); // `upstreams` no longer need to be wrapped in an arc let mut lb = http_proxy_service(&my_server.configuration, LB(upstreams)); lb.add_tcp("0.0.0.0:6188"); my_server.add_service(background); my_server.add_service(lb); my_server.run_forever(); }
关键要素:
- 服务器的创建(new)
- 在内存里预热(bootstrap)
- 创建后端集群负载(LoadBalancer)
- 基于这堆后端创建健康检查(TcpHealthCheck)
- 把这堆后端挂到对外的服务和监听(Service)
- 服务器,启动(run_forever)!
此外,还不得不提,Pingora每次调整都得编译出新的可执行螃蟹🦀,你可能会想更换二进制时不停机热更新咋办,这就涉及到Pingora用的二进制可执行的网关之间做的热升级策略了。
创建
可以看到调用了pingora::prelude::Server
来启动服务。戳进pingora/src/lib.rs
,可以看到引用了pingora_core
、pingora_http
和pingora_timeout
三个核心组件,然后根据缓存、负载均衡和反向代理三个场景,用feature区分出调用的具体的crate。
例子里调用了Server,可以认为这是核心服务了,跟进,参数看不懂啦,直接看怎么新建然后跑起来的。
直接看Server::new
标准起手,做一对tx和rx用来告诉服务何时退出。然后根据传参生成配置。传参包括命令行参数(opt)和配置文件(conf,以YAML格式)两类。没conf的话就自己造一个空的YAML Self::from_yaml("---\nversion: 1").ok()
,序列化到ServerConf
类,然后空跑。有conf就先加载conf,然后用传入的参数覆盖配置值。
再次强调,Pingora的配置文件主要是配运行时的参数,服务定义是在Rust里包着的。
预热
结构建起来之后,各类服务都需要塞到Server
里,在这之前,我们需要先bootstrap
摇热做些准备工作。摇热无非就是在内存里把结构体创建好该塞的参数塞进去。不过bootstrap里面最好玩的是这句
#![allow(unused)] fn main() { match self.load_fds(self.options.as_ref().map_or(false, |o| o.upgrade)) {} }
看他要的参数是ServerConf.upgrade
,这个参数的说明是 Whether this server should try to upgrade from an running old server ,这个服务是不是要从正在跑着的旧服务热升级上来。函数名叫Server::load_fds
,进去看,发现它试着把从Serverconf.upgrade_sock
记载的sock用于通信,拿走老进程手上的文件描述符,然后加载到自己内存身上,从而实现热升级。注释也说了,这个参数标记升级用的sock地址,目的就是新进程和老进程之间协调零宕机时间热升级。是一手值得学习的设计。不过我们先考虑最一般的情况,热升级后面再说。
预热完之后服务的结构在内存里就烤好了,不难发现,它是先注册好一堆服务再一口气跑起来的类型,所以接下来往里面塞各种服务。
集群负载
主要关注pingora_load_balancing::LoadBalancer
结构。
值得注意,从它impl BackgroundService
来看,集群负载本身是作为后台服务注册的,也就是它并非在每次有请求来的时候更新状态, 而是每时每刻都在后台异步运行。运行的入口方法则是LoadBalancer.start
,这个方法会在Server
启动时被调用。
可以看到一个循环里主要包含俩函数:self.update
和self.backends.run_health_check
。这俩函数是否能运行取决于有没有设频率,没设频率的话直接把下次运行时间推到timestamp的尽头。其中,一个自然是健康检查,另一个自然是服务发现。
而从结构体来看,符合直觉,pingora_load_balancing::LoadBalancer
是建立在Backend
之上的,这正好比Nginx里的:
upstream cluster1 {
server 100.114.51.4:1919;
server 100.114.51.5:1919;
}
注释提到,为了实现探活和服务发现,LoadBalancer
需要以pingora_core::services::background::BackgroundService
跑起来,Service这个概念现在还不清楚,先看最关键的LoadBalancer
、Backends
和Backend
三个结构。
单个Backend
没啥好考虑的,后端地址、权重和哈希罢了。哈希算法用的就是rust标准库的哈希。
Backend凑成Backends
就很好玩了。配合LoadBalancer
一起看,
#![allow(unused)] fn main() { pub struct LoadBalancer<S> { backends: Backends, selector: ArcSwap<S>, pub health_check_frequency: Option<Duration>, pub update_frequency: Option<Duration>, pub parallel_health_check: bool, } pub struct Backends { discovery: Box<dyn ServiceDiscovery + Send + Sync + 'static>, health_check: Option<Arc<dyn health_check::HealthCheck + Send + Sync + 'static>>, backends: ArcSwap<BTreeSet<Backend>>, health: ArcSwap<HashMap<u64, Health>>, } }
可以认为Backends负责决定有哪些服务可以用,LB负责指挥具体路由到谁的策略。抽象起来意思就是存一堆后端的索引结构,选择要转发的后端,多久做一次健康检查,多久做一次服务发现。这些就是考虑负载设计时的根本要素。
索引结构
backends: ArcSwap<BTreeSet<Backend>>
,给不懂Rust的读者解释一下就是支持原子操作的智能指针指向一个B树索引集合,树上挂着一堆Backend。接下来的三个部分都会对这个结构这这拨弄。
后端选择
LoadBalancer
做了个泛型,要求选择机制必须满足BackendSelection
trait。
#![allow(unused)] fn main() { pub trait BackendSelection { type Iter; fn build(backends: &BTreeSet<Backend>) -> Self; fn iter(self: &Arc<Self>, key: &[u8]) -> Self::Iter where Self::Iter: BackendIter; } pub trait BackendIter { fn next(&mut self) -> Option<&Backend>; } }
大致意思就是通过build
从LB所拥有的后端建立出一个机制,然后这个机制调用iter
得到一个迭代器,这个迭代器每次迭代都能得出下一个可能的后端,就实现了选择。
自带的LB当中,对BackendSelection
有两个实现,一个是Kentama一致性哈希选择,另一个是可以往泛型算法槽里填充指定加权算法的加权选择。
Kentama说白了就是“后端地址维持”,固定的源IP过来,算出来的哈希值就是固定的后端。
而加权的算法使用一个存有后端、权重和算法的结构实现,这个用俩数组做一一映射的结构也值得思考:
#![allow(unused)] fn main() { pub struct Weighted<H = FnvHasher> { backends: Box<[Backend]>, weighted: Box<[u16]>, algorithm: H, } impl<H> SelectionAlgorithm for H where H: Default + Hasher, { fn new() -> Self { H::default() } fn next(&self, key: &[u8]) -> u64 { let mut hasher = H::default(); hasher.write(key); hasher.finish() } } }
容许的选择算法是只要有Default
和std::hash::Hasher
这俩trait就行。说白了就是新建一个不传初始状态的哈希器要用到Default,然后摇哈希需要用到Hasher。
默认提供了轮询(RoundRobin)、随机(Random)和FNV Hash三种算法。
健康检查
这里我们只要关注pingora_load_balancing::health_check::HealthCheck
定义的trait:异步的检查函数和判断检查失败/检查成功的阈值。至于探测方法,TcpHealthCheck
和HttpHealthCheck
都各自实现了这个trait。而对于每个后端来说,它的健康状态是由pingora_load_balancing::health_check::Health
来表示的:
#![allow(unused)] fn main() { #[derive(Clone)] struct HealthInner { healthy: bool, enabled: bool, consecutive_counter: usize, } pub(crate) struct Health(ArcSwap<HealthInner>); }
一个原子操作结构,调整:是否健康、是否启用,以及该后端健康检查的失败次数。失败次数大于阈值就将标志位设成不健康。需要注意,真实世界的业务是复杂的,健不健康和启不启用是两回事,所以healthy和enabled要分开记录和看待。
服务发现
目前Pingora提供了服务发现的接口和一个纯静态的服务发现实现。服务发现的接口trait是pingora_load_balancing::discovery::ServiceDiscovery
,可以看到只要求实现discover方法,就可以在上述的后台服务大循环里被调用。除了返回发现的后端服务器Backend集合以外,还可以返回HashMap用于指示Backend的可用状态(默认可用)。内置的纯静态的服务发现则只是单纯从它所接收的参数构建可用的Backends,不管活不活。
服务和监听
你看到这里大概会期待类似Nginx里的server { listen 80; }
的东西,但Pingora把他归类到服务中。所以先看看Pingora的服务模型。
Pingora的事件驱动看起来是基于tokio做的(当然,运行时也有不需要tokio的NoSteal),所以它异步事件风味很浓。请看pingora_core::services::Service
。
先看注释里的两个定义。
- 服务(
Service
):在Pingora服务器上一直跑着直到服务器停机他才停的玩意。有两种服务很好用:- 监听服务:监听服务就是守在端点接受请求给响应的服务。服务可以加业务逻辑,也可以一个服务监听多个端点。
- 后台服务:不在请求-响应生命周期里,单独跑着的逻辑代码。
根据例子不难猜测,Pingora的服务模型是服务器启动之前先把一系列服务注册上去,服务器启动时会把服务异步转起来,关停时会走频道传信号让服务凋亡。
再看看源码:
#![allow(unused)] fn main() { #[async_trait] pub trait Service: Sync + Send { async fn start_service(&mut self, fds: Option<ListenFds>, mut shutdown: ShutdownWatch); fn name(&self) -> &str; fn threads(&self) -> Option<usize> { None } } #[cfg_attr(not(doc_async_trait), async_trait)] pub trait BackgroundService { async fn start(&self, mut shutdown: ShutdownWatch); } }
也就是说:能攥着一堆fd异步启动的,有名字的,有建议线程数的,就叫服务了。注意名字主要拿来打日志,只会用前16个字节。异步启动里塞了个关停watch,停服务时这个信号会传进去让服务自己凋亡。
而后台服务则是在trait埋了个异步start方法,服务器启动时就会试图触发这个异步方法做启动。至于怎么拉起来的,我们后面再说。现在再看看监听。
监听服务
在pingora_core::services::listening::Service<A>
。
按注释:监听服务就是守在端点接受请求给响应的服务。服务可以加业务逻辑,也可以一个服务监听多个端点。结构很诚实反映了这点:
#![allow(unused)] fn main() { pub struct Service<A> { name: String, listeners: Listeners, app_logic: Arc<A>, /// 建议线程数 pub threads: Option<usize>, } }
需要注意,监听器-业务逻辑两者设计上是解耦的,创建服务时可以创建只有业务逻辑而尚未添加监听端口的,然后再往上加,服务本身负责将他们绑定。但一定要在Pingora服务器启动前加好,不然就只能等后面发动热更新了。
监听器
先看看监听器套娃。
#![allow(unused)] fn main() { use tokio::net::{TcpListener, UnixListener}; pub struct Listeners { stacks: Vec<TransportStackBuilder>, } pub(crate) struct TransportStack { l4: ListenerEndpoint, tls: Option<Arc<Acceptor>>, upgrade_listeners: Option<ListenFds>, } pub struct ListenerEndpoint { listen_addr: ServerAddress, listener: Option<Listener>, } pub enum Listener { Tcp(TcpListener), Unix(UnixListener), } impl Listener { pub async fn accept(&self) -> io::Result<Stream> { match &self { Self::Tcp(l) => l.accept().await.map(|(stream, _)| stream.into()), Self::Unix(l) => l.accept().await.map(|(stream, _)| stream.into()), } } } }
它设计上就是考虑且只考虑一个服务当中,同时监听多个四层的,TCP/UDS/TLS端口,异步接受请求,的情况。注意是UDS,Unix Socks,不是UDP。
关注async fn ListenerEndpoint.listen
方法,发现他居然有个fds参数和一大坨逻辑,这个参数也是配合热升级用的,这里先不管,会发现它就是绑个端口罢了,没别的。
再关注accept
方法,对上面封了一层,加了个对Listener的防呆,然后就返回接受新请求的流。包到TransportStack
就把TCP和TLS封装到一个未初始化流里,给个握手函数。这么做是因为TCP建立连接后不用握手,TLS用,匹配到是TLS就先做个握手,是TCP就直接变成能用的流了。
拿到流,剩下的事情就不用监听器管了。这个流就进入了业务逻辑的视角。不过要想把流交给业务逻辑,还得先把业务逻辑和监听器绑在一起。
业务逻辑
listening::Service<A>
的泛型A就是用于编写业务逻辑的接口。要想能被start_service启动,业务逻辑(TCP层)必须实现trait:pingora_core::apps::ServerApp
,其中的process_new(Stream, &ShutdownWatch) -> Option<Stream>
方法就是业务逻辑的主体入口部分了,你要做业务逻辑二开就从这儿入手。这儿你直接拿到的正是上面监听器帮你处理好的TCP流,做你想做的事情就完事了。这里的返回值是Option<Stream>
,大有作用,见下一节。
业务逻辑和监听器的绑定
把业务逻辑和监听器真正绑在一起的地方是监听服务的启动入口pingora_core::services::listening::Service<A>::start_service
。
前面说过这个方法在Pingora启动时会被调用,它调用之后就是:过一遍监听器看有几个监听,把业务逻辑代码克隆N份,每份跟每个监听都扔到一个Service<A>::run_endpoint
方法实例里,然后在Pingora服务器的tokio实例里刷这N个方法实例异步跑起来,然后阻塞等待。
继续跟入这个方法,里面是一个大loop,来了,我们最期待的大loop来了!(当然后面还有别的loop要讲)。里面就是经典的tokio监听事件,是关停事件就跳出loop做打扫,是新连接就拿到流了,克隆一份关闭频道,克隆一份业务逻辑,再刷个Service<A>::handle_event
方法实例把流、逻辑和关闭频道扔进tokio里异步跑起来。
继续跟入handle_event
,这里比较妙的设计是连接复用的判断。它会调用业务逻辑的process_new
方法处理这个流,如果是短连接,你处理完返回None
,方法实例就结束了。如果是长连接复用,你返回Some
,它就会循环重入process_new
,直到业务逻辑返回None
。
监听(绑定端口),接受(获得流),处理和响应(读写流,关闭流),成了!现在我们来看看后台服务。
后台服务
像上面说的服务发现、负载均衡,以及服务自身状态上报之类,都是服务器自己在后台主动去做,反复做,不需要请求响应触发的业务。这些服务就是后台服务。
后台服务的业务逻辑需要实现pingora_core::services::BackgroundService
,这个trait里只有一个start
方法,这个方法在一个impl里包进了start_service
,这个函数跟着服务实例,会在Pingora服务器启动时被遍历调用,传入一个关闭频道,然后异步跑起来。Pingora赠送了个结构和默认方法:
#![allow(unused)] fn main() { pub struct GenBackgroundService<A> { name: String, task: Arc<A>, pub threads: Option<usize>, } pub fn background_service<SV>(name: &str, task: SV) -> GenBackgroundService<SV> { GenBackgroundService::new(format!("BG {name}"), Arc::new(task)) } }
泛型A就是后台服务的业务逻辑,在注册服务时调用background_service
方法把你写的后台服务传参进去,就会生成一个通用后台服务结构,把这个结构当作服务注册到Pingora身上就行。后台服务可以随时提前返回终止。
服务注册
Server
实例有个services: Vec<Box<dyn Service>>
结构,调用Server.add_service
或者Server.add_services
把服务挂上去就行了,服务器启动的速度是人手的数倍,服务器舒服了自己会帮你调用拉起服务的。这就是接下来的启动环节。
启动
一切准备就绪,🚀调用run_forever
就把服务跑起来了🚀,感觉高松灯会喜欢。需要注意文档中提到对Server
对象来说这个函数必须最后调用,因为它阻塞。
首先是经典加参数自行daemonize
,值得注意的是双叉之前调用了超时管理器的暂停,因为超时管理器有个RwLock
,这玩意在进程间传递是UB,所以要暂停一下等所有锁释放。你可能会觉得启动时不应该有锁,但这是个开放出去的函数,鬼知道调用它的程序员在启动前会做什么。此外文档中提到双叉时需要注意在 POSIX 标准中,当一个多线程程序调用fork()
时,只有调用fork()
的线程被复制到子进程中;其他线程并不会在新创建的子进程中存在。所以你在调这函数之前如果玩了多线程操作,可能会丢线程然后玩脱。
然后发现,一个Server实例会有多个运行时。先是给每个服务调用start_service
,他就把刚才说的services遍历一遍,每个服务都创建一个异步运行时,刷个实例跑服务,然后返回runtime。
此外服务器自己也有个运行时,用来跑整个服务器的主循环pingora_core::Server.main_loop
,在主循环挡住,不然上面一堆服务的runtime还没来得及工作呢主协程就跑退出了。这里全是些接收信号然后安详退出/快速退出的逻辑,稀松平常。不过同样值得注意的是留了个信号位给热升级。
热升级
主要逻辑全都讲完了,现在我们来看看热升级的逻辑。这里主要是用了Unix的特性SCM_RIGHTS
来实现的。
SCM_RIGHTS
Send or receive a set of open file descriptors from
another process. The data portion contains an integer
array of the file descriptors.
根据猛男页的说法,这玩意就是用来进程间传fd的。考虑到端口监听也是fd,Pingora通过使用unix sock在老进程和新进程之间传递监听的fd来实现不停机升级。
先看服务器配置格式里的:
#![allow(unused)] fn main() { #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(default)] pub struct ServerConf { .. pub upgrade_sock: String, .. } pub struct Opt { .. pub upgrade: bool, .. } }
新老进程使用的配置文件当中,upgrade_sock
字段必须保持一致来递交fd,并且新进程要求upgrade
,才会进行热升级。
重新回到Server.bootstrap
,预热到了load_fds
这一步,就会开始找老登要fd。顺着get_from_sock
看,Fds
调用get_fds_from
从老登那把fd全拿过来,然后再反序列化装入自身。然后再在load_fds
把自己交给Server
做后面的操作。从这儿开始吧。
新老进程递交fd
递交fd的通信协议基本都在pingora_core::server::transfer_fd::Fds
里定义。主要get_fds_from
搭配send_fds_to
一起看。
新进程先在get_fds_from
拿到这个sock的地址,然后在监听之前就把文件系统上的老进程的sock文件删掉,自己监听个同名的。
这个时候向老进程送一个SIGQUIT
,Server.main_loop
里面tokio抓到信号,开始锁住自己监听的所有fd,然后调用send_to_sock
将自己的fd记录表序列化调用send_fds_to送给新进程。先连接,如果中间没连接成功,还会重试几次。连接上了就开始使用SCM_RIGHTS送fd。新进程接收到之后就把fd反序列化装进Fds
里了。
新进程将fd装载到服务
新进程到run_service时就会将整个程序所有的fds传入,到装载新的监听服务时fds跟入listening::Service<A>.start_service()
,进而跟入build
,在这里和新进程从rust定义的TransportStack
会合,之后一并传入对应的listen
函数。在这里,新进程定义的服务就会在老进程递交过来的fds表里挑,如果表里刚好找到了服务自己本来就想监听的地址,它就直接把这个fd拿走接着听了,就实现了平滑的过渡。
太长不看
本文的意义就是看一个高速网关服务的架构设计,所以我不知道到底想要什么结论,这里主要给一些只想看结论的人。
- 为什么性能这么高这么快?
因为这是用Rust写的。大概因为事件驱动用的tokio框架能相当充分榨干性能,而且内存分配器用jemalloc,数据结构用B树,地址哈希算法用Kentama和FNV。但这你该问跑benchmark和optimization的人,我只看了看网关核心设计。
- 生产环境可以用吗?
CF说他们生产跑得很稳。但它用rust定义规则再编出二进制执行程序,并且用sock进行二进制文件热更新,我觉得你但凡有点迟疑都说明你的企业担待不起。
而且目前只支持Linux。
- HTTP代理之类的功能呢?
我还没看,这篇文章主要是蹭热度,叫核心篇,只看核心,后面有空再看HTTP和TinyUFO之类的神奇组件。