kids 多线程实现

April 4, 2014

Overview

Kids (已开源)是一个消息订阅与发布系统。本文会先简单介绍下 kids 的架构,然后主要介绍 kids 的 多线程实现。

架构

整体架构上(架构图如下),采用 agent/server 模型,agent 收到消息后,根据配置的 store ,可以存本地,可以转发给上游,可以转发给多个上游等,比较灵活。配置文件的词法解析用的 ragel,语法解析用的 lemon。整个代码仓库,从 redis 中『借鉴』较多,包括字符串处理sds.c,网络库 deps/ae

image

多线程实现

本节会先讲主线程和工作线程的分工,然后讲工作线程间的负载均衡,最后讲线程间的通知机制。

主线程和工作线程的分工

分工图如下:

image

主线程,负责 accept 连接,一个信号线程,负责处理信号,多个工作线程,负责处理客户连接。

主线程在创建工作线程前屏蔽了常见信号SIGINTSIGTERMSIGPIPESIGUSR1(用于旋转日志),SIGHUP(reload 配置文件),并创建信号线程,从而所有信号都由信号线程处理,信号线程调用sigwait等待信号。

主线程收到连接后,用 round robin 策略 分配给工作线程,分配方式上每个工作线程都有一个连接队列,把 fd 放到连接队列,通过线程的通知机制来告诉工作线程有新连接来了。

工作线程采用 epoll + nonblock 实现。每个工作线程都有自己的 epoll 和自己的通知 fd,所有套接字都设为非阻塞,注册到 epoll 里,当套接字可读或可写时执行回调函数,在回调函数里根据程序逻辑,可能会取消或注册新的事件到 epoll。工作线程的具体实现中,必须保证每个套接字得到公平待遇,这是通过限制一次套接字读写事件处理的字节数来实现的。

工作线程间的负载均衡

Kids 工作线程间负载均衡的方法是采用主线程 accept 连接,然后把套接字传给工作线程。其他的均衡方法有:

线程间的通知机制

在有新连接来了,以及工作线程收到一个 publish 的消息后,都需要通知其他线程来处理,这是通过一个通知机制来进行的。

每个工作线程,为新连接通知、publish 消息通知分别建立了一个管道(采用 socketpair 实现),管道的读端注册到 epoll 里,通知一个线程时,只需要往管道的写端写一个字节,读端就会处理。同时,对其他每个工作线程,都建立了互相独立的管道,以防止多线程同时写一个套接字的情况。通知的其他实现方式包括:

必须考虑的问题是管道缓冲区满了怎么办。目前的做法是用msg_wait_to_notify_保存暂时没通知到位的消息数量,在Worker::Cron里会尝试去完成这些通知。也可以只通知一次,工作线程接收到通知后,把全局队列里所有新消息都消费完毕,而不是一次通知,只消费一个。这样可能会使订阅客户的响应缓冲区满,因为在一次事件里publish了好多消息。另外,也必须处理接到通知,却没新消息的情况,因为新消息可能被上一次通知给提前消费掉了。这样就可以不管缓冲区满的情况了。除了这种做法,也可以模仿信号里的做法。每个工作线程都有一个变量has_message表示有消息,publish消息时,如果has_message为0则通知工作线程,为1表示已通知,就不用write了。在工作线程处理新消息的事件里,清has_message。同时一直读全局队列读到尾部。