利用共享内存实现进程间通信

共享内存通信的好处

进程间通信有很多的选择,socket,共享内存,管道等等。

在设计网络游戏服务器的时候,一个比较常见的做法是在同一台机器上将网络接入和游戏逻辑分成不同进程,然后通过共享内存去通信.

为什么是共享内存呢?

共享内存可以提供适度的恢复机制(相当于一个缓冲池), 在服务进程失败重新拉起时, 可以继续从共享内存中读取通信消息, 对终端玩家来说是无感的. 网络通信、或者管道则不行.

这也可以用来做重启进程的热更新保障.

LSI 实现

这是一个早期实现, 代码 在这里.

LSI 是指 lockfree shm ipc 的缩写, 就是指无锁共享内存通信.

设计目标:

  • 提供工具,可以根据配置文件管理共享内存通道,包括创建、删除、查看状态等;这个通道被设计成 1 对 1 的,如果要支持 1 对多,必须创建多个通道;
  • 提供一组 api,进程可以直接调用,写或者读数据到共享内存通道;api 不是线程安全的,如果在多线程中要调用 api,需要自己保证锁;

在每一台物理机器上创建 LSI 时,需要读取 LSI 配置文件. 下面的配置文件是一个简单的例子,它配置了两条通道,分别是 191.1.0.2 <--> 191.2.0.3 和 191.1.0.5 <--> 191.5.0.1。所有的通道需要指定一个专门的 shmkey,在程序内调用 api 时会用到这个 key。

<?xml version="1.0" encoding="gbk" standalone="yes" ?>
<LsiCfg>
    <ShmKey>2012</ShmKey>
    <ChannelCount>2</ChannelCount>
    <Version>0</Version>
    <Channel>
            <Address>191.1.0.2</Address>
            <Address>191.2.0.1</Address>
            <!--size will be round up with 2^n by LSI tool-->
            <Size>1024000</Size>
    </Channel>
    <Channel>
            <Address>191.1.0.5</Address>
            <Address>191.5.0.1</Address>
            <!--size will be round up with 2^n by LSI tool-->
            <Size>1024000</Size>
    </Channel>
</LsiCfg>

如何创建和使用 LSI

源码编译后在 bin 目录下生成工具lsi,./lsi --conf=xxx init|status|clean 即可. init 创建,status 查看窗台,clean 清除.

LSI 的接口 api 如下,具体说明可以看注释。

// 创建LSI句柄
// lsi_id是配置文件中的shmkey
// addr是进程的地址,即191.1.0.2形式的地址,可以通过lsi_addr_aton转化得到
// lsi_version是配置文件中指定的版本
LSI* lsi_init(lsi_id_t lsi_id, lsi_ip_t addr, int lsi_version);

// 释放LSI句柄资源
int lsi_release(LSI* lsi);

// 发送数据到对方
// to:对端的地址
// 返回 0: 发送成功
// 返回 LSI_NoChannel: 没有该通道
// 返回 LSI_ChannFull: 通道已满
// 返回 LSI_Fail: 其他失败
int lsi_send(LSI* lsi, lsi_ip_t to, const char* send_buf, size_t buf_len);

// 从对端接收数据
// 返回 > 0: 接收的字节数
// 返回 LSI_NoChannel: 没有该通道
// 返回 LSI_ChannEmpty: 通道是空的,没有字节
// @buf_len: 输入表示缓冲区长度,输出为接收的字节
int lsi_recv(LSI* lsi, lsi_ip_t from, char* recv_buf, size_t* buf_len);

// 转化地址,从lsi_ip_t到字符串形式
const char* lsi_addr_ntoa(lsi_ip_t lsi_addr);

// 转化地址,从字符串到lsi_ip_t形式
lsi_ip_t lsi_addr_aton(const char* lsi_addr_str);

LSI 的缺点

LSI 的组织方式,存在一个天然的缺点:所有的 lsi 通道都是预分配的,这就意味着如果需要新加进程节点(这在运营环境下很正常), 要修改配置, 并且如果不做额外处理的话, 老的节点还无法感知新的 lsi 通道.

BUS 实现

代码开源在这里,并提供了一个 echo sample,测试运行正常.

设计目标

  • bus 由一块路由表和若干通信通道组成.
  • 每一个使用 bus 的实例,我们称之为一个 bus terminal.
  • bus 通道是一个 terminal 到 terminal 的共享内存通信通道,单向的.
  • bus 路由表中维护了两个数组:
    • 所有注册的 bus terminals 数组(地址), 预留大小 64.
    • 所有已分配的 bus 通道数组(通道的共享内存 key,from、to 的地址、通道 size 等信息), 预留大小 1024.
    • bus 路由表保存在共享内存中,每一个 terminal 都会 attach 到它,并有可能修改它,bus 路由表的构成如下图所示.
  • bus 通道会随着进程的通信需求按需分配,从而实现动态增长,不再需要预分配.

terminal 的工作流程描述

  • bus terminal 尝试 attach bus 路由表, 如果没有路由表(第一次),就创建一块:
    • 如果路由表中还没有自己的地址,注册自己的地址到路由表中;同时保存所有的 terminals 列表到本地.
    • 读取 bus 通道列表,如果与自己相关,就 attach 这一块 bus 通道,并记录到链表(recv bus 链表和 send bus 链表).
  • bus terminal a 需要向 bus terminal b 发消息时,先检查 bus 通道列表是否存在 a->b 的 bus 通道,如果没有则创建一块,同时写到路由表中,再向通道发消息.
  • bus terminal 需要定时去检查路由表,如果发现版本有更新时,则需要重新读取路由表,更新本地的 bus 通道列表和 terminal 列表.

bus 的适用场景

在同一台物理机上的多进程系统,偶尔会需要新加点进程(比如负载不够的情况)。但是绝不适合频繁变更的情况,因为 bus 中,所有已注册的信息,包括 terminal 和通道,都不会删除,除非清掉共享内存。这也符合设计的初衷:利用共享内存的特性,在进程 coredump 或者其他异常情况下,保证 bus 通道中消息不会丢,在一定程度上提供容灾能力。

每一个 bus terminal 在启动时,需要指定自己的 bus 地址和一个提前约定的全局 bus key。

注意,bus terminal 的 bus 地址不能重复,代码中没有对 bus 重的情况做检查,因为比较难界定是不是真正的重复了,还是进程 coredump 后重新拉起。这里个需要使用者自己保证。

关于 bus 中的锁

在 bus 中,每一个 bus terminal 都有可能去读写 bus 路由表,为了保证同步,每次读写 bus 路由表时,需要加进程锁(信号量实现)。

这个加锁的情况很少:只有在当前网络中增加 bus 通道,或者增加 bus 节点时,才会有路由信息的变更,而且大部分会集中在系统启动之初。

每一次 tick 检查更新则是通过比较版本号来实现的,无需加锁(版本号的修改都是原子操作)。

所以认为这个锁的开销是可以接受的(代码比较简单,也是一个原因)。

bus api 介绍

#ifndef BUS_TERMINAL_H_
#define BUS_TERMINAL_H_

#include "core/os_def.h"

#define BUS_MAX_CHANNLE_COUNT 1024
#define BUS_MAX_TERMINAL_COUNT 64

typedef int bus_addr_t;

#define bus_addr_type(addr) (addr >> 16)
#define bus_addr_id(addr) ((addr << 16) >> 16)

struct bus_terminal_t;

enum {
    bus_err_peer_not_found = -100,
    bus_err_send_fail,
    bus_err_recv_fail,
    bus_err_peek_fail,
    bus_err_channel_full,
    bus_err_channel_fail,
    bus_err_empty,
    bus_err_fail,
    bus_ok = 0,
};

// 初始化bus路由表,key: 16 bits, 预留16 bits给bus通道
struct bus_terminal_t* bus_terminal_init(int16_t key, bus_addr_t ba);

void bus_terminal_release(struct bus_terminal_t* bt);

// 检查bus路由表版本,如果版本变更,则做更新
void bus_terminal_tick(struct bus_terminal_t* bt);

// bus消息发送:指定地址发送、指定类型广播、全广播;
int32_t bus_terminal_send(struct bus_terminal_t* bt, const char* buf,
                          size_t buf_size, bus_addr_t to);
int32_t bus_terminal_send_by_type(struct bus_terminal_t* bt, const char* buf,
                                  size_t buf_size, int bus_type);
int32_t bus_terminal_send_all(struct bus_terminal_t* bt, const char* buf,
                              size_t buf_size);

// bus消息接收:指定地址接收、全接收;
int32_t bus_terminal_recv(struct bus_terminal_t* bt, char* buf,
                          size_t* buf_size, bus_addr_t from);
int32_t bus_terminal_recv_all(struct bus_terminal_t* bt, char* buf,
                              size_t* buf_size, bus_addr_t* from);

// bus通道的统计信息
uint32_t bus_terminal_send_bytes(struct bus_terminal_t* bt, bus_addr_t to);
uint32_t bus_terminal_recv_bytes(struct bus_terminal_t* bt, bus_addr_t from);

// bus的统计信息debug
void bus_terminal_dump(struct bus_terminal_t* bt, char* debug,
                       size_t debug_size);
#endif

共享内存的一个 tip

之前在 Mac OS 上测试时,碰到了共享内存分配失败,errno=22,invalid argument,后来发现是共享内存的默认配置 shmmax 的原因,默认是 4m,sysctl -w kern.sysv.shmmax=268435456,修改为 256M 就 ok 了。

这个可以参考这篇文章