Lock-Free 学习总结

Lock-Free

Lock-Free:

如果一个方法是 Lock-Free 的, 它保证线程无限次调用这个方法都能够在有限步内完成.

相比于传统的基于 Mutex 锁机制, Lock-Free 有下面的优势:

  • Lock-Free 的速度更快
  • 线程之间不会相互影响, 没有死锁
  • 不受异步信号影响, 可重入
  • 不会发生线程优先级反转

在通常使用 Mutex 互斥锁的场景, 有的线程抢占了锁, 其他线程则会被阻塞, 当获得锁的进程挂掉之后, 整个程序就 block 住了. 但在 Lock-Free 的程序中, 单个线程挂掉, 也不会影响其他线程, 因为线程之间不会相互影响.

但是, Lock-Free 也有不少缺陷:

  • 只能利用有限的原子操作, 例如 CAS (操作的位数有限), 编码实现复杂
  • 竞争会加剧, 优先级不好控制
  • 测试时需要考虑各种软硬件环境, 很难做到尽善尽美

再引入一个 Wait-Free 概念:

假如一个方法是 Wait-Free 的, 那么它保证了每一次调用都可以在有限的步骤内结束.

一般来说: 阻塞 > Lock-Free > Wait-Free

本文的讨论范畴在 Lock-Free, 如果涉及 Wait-Free 会单独说明.

CAS 原语

CAS (compare and swap) 是 CPU 硬件同步原语(primitive), CAS(V, A, B) 操作可以用下面的代码来示意:

template <class T>
bool CAS(T* addr, T expect_val, T val) {
    if (*addr == expect_val) {
        *addr = val;
        return true;
    }
    return false;
}

从 80486 开始, 所有的 Intel 处理器上, 通过一条汇编指令 CMPXCHG 即可实现 CAS 操作. CAS 的价值在于它是一个原子操作, 不会被 CPU中断或者其他方式打断, 因为在硬件层实现, 所以开销极小.

CAS 并不是一项新技术, 它的使用可以追溯到 70 年代, 早在 80 年代就有很多经典书籍中提到使用 CAS 来实现并行编程, 如 USC 大牛 Kai HWang 的 "Computer Architecture and Parallel Processing".

GCC 4.1+ 开始支持 CAS 的原子操作:

bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval)
type __sync_val_compare_and_swap (type *ptr, type oldval, type newval)

通常将 CAS 用于同步的方式是从地址 V 读取值 A, 执行多步计算来获得新值 B, 然后使用 CAS 将 V 的值从 A 改为 B, 如果 V 处的值尚未同时更改, 则 CAS 操作成功.

与 CAS 类似的还有:

在一些 CPU 上还实现了 DCAS (double CAS) 原语, 比如 Motorola 68040, DCAS 的语义可以用下面的代码来表示:

template <class T1, class T2>
bool DCAS(T1* p1, T2* p2, T1 e1, T2 e2, T1 v1, T2 v2) {
    if (*p1 == e1 && *p2 == e2) {
        *p1 = v1;
        *p2 = v2;
        return true;
    }
    return false;
}

还有一些 32 位 CPU 上实现了 64 位的 CAS, 也被成为 CAS2: 相比于 DCAS, CAS2 原语比较内存中两个相邻字长, 功能弱, 但是更通用.

Lock-Free FIFO

引用参考文献[3]中的一个基于链表的 FIFO 示例:

// 伪码说明: q^ 表示内存 q 处的值

// 链表示意
// dummy -> a -> b -> c
//   |                |
//   V                V
//  head             tail

// 从尾部出列
Enquque(x)
    q <- new record
    q^.value <- x
    q^.next <- NULL
    // 通过 CAS 操作保证将新节点加入到链表尾部
    repeat
        p <- tail
        succ <- CAS(p^.next, NULL, q)
    util succ = TRUE
    // 更新 tail, 不可能失败(当前线程对 p^.next 赋值, 相当于一把锁)
    CAS(tail, p, q)

// 从头部入列
Dequeue()
    repeat
        p <- head
        if p^.next = NULL
            error "queue empty"
    until CAS(head, p, p^.next)
    return p^.next

上面的实现会存在一个问题, 线程之前会相互影响, 即并非 Lock-Free:

  1. 当多个线程并发时, 只有一个线程能完成 CAS(p^.next, NULL, q), 其他线程只能等待这个线程完成 CAS(tail, p, q) 操作之后才能继续.
  2. 更坏的情况, 如果这个线程在 CAS(tail, p, q) 之前出现异常, 那么所有的线程都被挂住了.

文献[3]中提出了两种改进方案:

// 改进方案一: 严格保证 tail 总是指向链表尾部, 增加一步 CAS 操作
Enqueue(x)
    q <- new record
    q^.value <- x
    q^.next <- NULL
    // 通过 CAS 操作保证将新节点加入到链表尾部
    repeat
        p <- tail
        succ <- CAS(p^.next, NULL, q)
        // 其他线程并发操作, 尝试更新 tail (失败忽略)
        if succ != TRUE
            CAS(tail, p, p^.next)
    util succ = TRUE
    // 尝试更新 tail (失败忽略)
    CAS(tail, p, q)

// 改进方案二: tail 并不总是指向链表尾, 以一定的 CPU 开销来换取 Lock-Free
// 如果有 p 个并发线程, 则 tail 最多与实际链表尾部相差 2p - 1 个位置, 时间开销是可控的
Enqueue(x)
    q <- new record
    q^.value <- x
    q^.next <- NULL
    p <- tail
    old_p <- p
    // p 不一定是链表尾, 通过遍历保证操作合法
    repeat
        while p^.next != NULL
            p = p^.next
    until CAS(p^.next, NULL, q)
    // 尝试更新 tail (失败忽略)
    CAS(tail, old_p, q)

Lock-Free MAP

上一节中给出的都是伪代码的示例, 落地到实际的编程语言中, 需要更具象. 引用文献[1]的一个 WRRMMap (Write Rarely Read Many) 示例:

template <class K, class V>
class WRRM {
    Map<K, V>* map_;

public:
    // 查询无需加锁
    V LookUp(const K& k) {
        return (*map_)[k];
    }
    // 更新做一次全量拷贝, 并插入新数据
    void Update(const K& k, const V& v) {
        Map<K, V>* new_ = 0;
        Map<K, V>* old_;
        do {
            old_ = map_;
            if (new_) delete new_;
            new_ = new Map<K, V>(*old_);
            new_->insert(make_pair(k, v));
        } while (!CAS(&map_, old_, new_));
    }
    // 什么时候释放 old_ 资源?
};

上面例子中都存在一个问题, 即 MAP 的 Update() 操作中, 何时释放老的资源?

  • 如果是有自动 GC 机制的语言, 那就不需要纠结, 直接交给语言去处理就好了.
  • 如果需要手动回收资源, 例如 C++, 则需要周详考虑, 因为当准备 delete old 时, 可能还有其他线程在访问.

文献[1] 中对第 2 种情给出了两种解决方法:

  • 等待一段时间, 在 Update() 完成拷贝之后, 其他线程再访问 LookUp() 会访问新数据, 老数据的访问会越来越少, 直到没有. 但这并不是一个完备的方案, 而且有可能带来优先级反转.
  • 另外一个解决方法是对 map 的访问做引用计数, 并利用 DCAS (或者 CAS2) 保证同步安全, 并给出了一个 sample:
template <class K, class V>
class WRRM {
    // map <--> 访问的引用计数
    // map 和 ref-count 的内存是连续的, 所以使用 CAS2 原语
    typedef std::pair<Map<K, V>*, unsigned> Data;
    Data data_;

public:
    // CAS 操作保证引用计数正确增加
    V LookUp(const K& k) {
        Data old, fresh;
        do {
            old = data_;
            fresh = old;
            ++ fresh.second;    // 开始访问 引用计数自增
        } while (!CAS2(&data_, old, fresh));
        V result = (fresh.first)[k];
        do {
            old = data_;
            fresh = old;
            -- fresh.second;    // 访问完毕 引用计数自减
        } while (!CAS2(&data_, old, fresh));
        return result;
    }

    // 原文献中 引用计数的 CAS 阈值设置为 1 (old.second = 1)
    // 但是从代码理解与实现角度来看 0 会更合适一些
    // 只有在 data 的引用计数 == 0 时, 才完成 update, 并释放原来的 map 资源
    void Update(const K& k, const V& v) {
        Data old, fresh;
        old.second = 0;
        fresh.first = NULL;
        fresh.second = 0;
        // 原文献这里还有一处临时变量的优化, 这里忽略掉了, 不影响示范
        do {
            old.first = data_.first;
            if (fresh.first) delete fresh.first;
            fresh.first = new Map<K, V>(old.first);
            fresh.first->insert(std::make_pair(k, v));
        } while (!CAS2(&data_, old, fresh));
        delete old.first;
    }
};

上面的例子虽然解决了垃圾回收的问题, 但还存在一定的缺陷:

在 Update() 时, 如果一直有线程在 LookUp(), 引用计数一直大于 0, 那么 Update() 线程会一直处于等待的状态, 还有可能导致线程饿死. 所以, WRRMMap 成了一个 Write-Rarely-Read-Many-But-Not-Too-Many 的 map.

针对上述问题, 文献[1]的作者在文献[2]又中提出了一种更完善的解决方案: Hazard 链表.

  • 全局维护一份 hazard 链表, LookUp() 过程中将 map 指针挂载上去, 访问完毕再移除.(一定程度上相当于做了引用计数).
  • 每个线程维护一份待释放的 garbage 链表, 每次 Update() 之后将要释放的 old map 挂载上去.
  • per 线程的 garbage 链表在一定时机做回收检查, 如果数据在 hazard 链表中不存在, 即不再被访问, 即可以安全释放.

Hazard 链表能解决的问题在于:

  • LookUp() 和 Update() 操作都是真正的 Lock-Free, 读/写线程都不会受其他线程的影响, 保持各自独立的行为.
  • 额外引入的数据量(全局的 Hazard 链表, 每个线程各自的 garbage 链表)不算大, 链表也可以被 O(1) 的哈希表代替, 效率比较高.
  • 这篇文献中使用了 Map 来做示范, 实际上可以很容易的扩展到其他类型的数据结构.
  • 如果要扩展到 Wait-Free, 可以更进一步考虑陷阱技术(trap), 这里不深入讨论.

代码示意: (为了方便理解, 稍微做了一些调整)


// 全局的 hazard 链表
class Hazard;
static Hazard* hazard_head;
static int hazard_len;

// 每个线程的 garbage 链表
__per_thread__ vector< Map<K, V>* > garbage_list;
static const size_t GARBAGE_THRESHOLD = 4;

class Hazard {
    Hazard* next_;
    int active_;
public:
    void* hazard_;

    static Hazard* Acquire() {
        Hazard* p = hazard_head;

        // 从 hazard 链表出找到一个空闲的, 并将 active 置为 1
        for (; p; p = p->next_) {
            if (p->active_ || !CAS(&p->active_, 0, 1)) {
                continue;
            }
            return p;
        }

        // 现有的找不到, 创建一个
        // 先修改链表长度
        int old_len;
        do {
            old_len = hazard_len;
        } while (!CAS(&hazard_len, old_len, old_len + 1));

        // 再创建后插入链表
        p = new Hazard;
        p->active_ = 1;
        p->hazard_ = NULL;
        Hazard* old;
        do {
            old = hazard_head;
            p->next = old;
        } while (!CAS(&hazard_head, old, p));

        return p;
    }

    static void Release(Hazard* p) {
        // 重置 active 标记
        p->hazard_ = NULL;
        p->active = 0;
    }
};

// 垃圾回收
void GarbageCollect(Hazard* head) {
    // 遍历 hazard 链表, 排序
    std::vector<void*> hazards;
    while (head) {
        void* p = head->hazard_;
        if (p) hazards.push_back(p);
        head = head->next_;
    }
    std::sort(hazards.begin(), hazards.end(), std::less<void*>());

    // 遍历当前线程的 garbage 链表 检索
    std::vector<Map<K, V>*>::iterator it = garbage_list.begin();
    while (it != garbage_list.end()) {
        // 如果不在 hazard 链表中: 即不在被访问, 可以安全删除
        if (!std::binary_search(hazards.begin(), hazards.end(), *it)) {
            delete *it;
            // 一个小trick, 与链表尾部做一下交换
            if (&*it != &garbage_list.back()) {
                *it = garbage_list.back();
            }
            garbage_list.pop_back();
        } else {
            ++ it;
        }
    }
}

// Lock-Free 的数据结构 Write-Rarely-Read-Many-Map
template<class K, class V>
class WRRM {
    Map<K, V>* map_;

public:
    V LookUp(const K& k) {
        Hazard* record = Hazard::Aquire();

        // 保证挂载到 hazard 链表中的 与 正在访问的 map_ 指针一致
        // 避免同步问题
        Map<K, V>* ptr;
        do {
            ptr = map_;
            record->hazard_ = ptr;
        } while (map_ != ptr);

        V result = (\*ptr)[k];
        Hazard::Release(record);
        return result;
    }

    void Update(const K& k, const V& v) {
        Map<K, V>* old_map, new_map = 0;
        do {
            old_map = map_;
            if (new_map) delete new_map;
            new_map = new Map<K, V>(*old_map);
            new_map[k] = v;
        } while (!CAS(&map_, old_map, old_map));

        // 这里回收 old_map 资源
        garbage_list.push_back(old_map);
        if (garbage_list.size() >= GARBAGE_THRESHOLD) {
            GarbageCollect(hazard_head);
        }
    }
};

Lock-Free Queue

CodeProject 上有一篇关于基于数组的 Lock-Free 循环队列的参考文章[4], 直接看示例代码(为了篇幅简洁和方便理解做了一些修改):


// 仅示意 不要在意语法细节
templaye <typename T, typename SIZE>
struct Queue {
    // 循环数组
    T* data_[SIZE];

    // 三组游标控制
    volatile uint32_t read_index_;
    volatile uint32_t write_index_;
    volatile uint32_t read_max_index_;
};

// 下标映射
template <typename T, uint32_t SIZE>
inline uint32_t Queue<T, SIZE>::index(uint32_t count) {
    return (count % SIZE);
}

// 插入数据
template <typename T>
bool Queue<T>::push(const T &data) {
    uint32_t read_index, write_index;
    do {
        read_index = read_index_;
        write_index = write_index_;
        // 队列已满
        if (index(write_index + 1) == index(read_index)) {
            return false;
        }
    // 先占位, 修改 write_index_ 游标
    } while (!CAS(&write_index_, write_index, (write_index + 1)));

    data_[index(write_index)] = data;

    // 修改 read_max_index_ 游标
    // 这里有可能需要等待其他 producer 操作完成, 所以如果操作失败, 可以 yield 一段时间
    while (!CAS(&read_max_index_, write_index, (write_index + 1))) {
        sched_yield();
    }
    return true;
}

// pop 数据
template <typename T>
bool Queue<T>::pop(T &data) {
    uint32_t read_max_index, read_index;
    do {
        read_index = read_index_;
        read_max_index = read_max_index_;
        // 队列为空
        if (index(read_index) == index(read_max_index)) {
            return false;
        }

        data = data_[index(read_index)];

        // 修改 read_index_ 游标
        if (CAS(&read_index_, read_index, (read_index + 1))) {
            return true;
        }
    } while(1);
    return false;
}

上面的循环队列是一个很实用的例子, 但很遗憾, 确并非一个符合"Lock-Free"标准的设计, 不同线程之间会相互影响:

线程 T1 执行 push(), 在 CAS(&writeindex, XX, XX) 之后给 data 赋值时挂了, 对其他生产者线程来说, CAS(&read_maxindex, XX, XX) 游标再也不可能成功, 进程会被卡住, 这已经违背了 Lock-Free 的定义.

幸运的是, 在只有单个生产者时, read_maxindex 退化为 writeindex, 会减少一次 CAS 操作, 这是 Queue 会进化为 Lock-Free 的完全体, 并提高效率. 退化后的代码参考如下:

// 插入数据
template <typename T>
bool Queue<T>::push(const T &data) {
    uint32_t read_index, write_index;
    do {
        read_index = read_index_;
        write_index = write_index_;
        // 队列已满
        if (index(write_index + 1) == index(read_index)) {
            return false;
        }
    // 先占位, 修改 write_index_ 游标
    } while (!CAS(&write_index_, write_index, (write_index + 1)));
    data_[index(write_index)] = data;
    return true;
}

// pop 数据
template <typename T>
bool Queue<T>::pop(T &data) {
    uint32_t read_max_index, read_index;
    do {
        read_index = read_index_;
        read_max_index = write_index_;
        // 队列为空
        if (index(read_index) == index(read_max_index)) {
            return false;
        }
        data = data_[index(read_index)];
        // 修改 read_index_ 游标
        if (CAS(&read_index_, read_index, (read_index + 1))) {
            return true;
        }
    } while(1);
    return false;
}

此时, 即便生产者线程(仅有一个) push() 操作意外失败, 带来的影响也仅仅是那个 index 上的数据错误, 不影响整个进程的演进, 可以根据应用场景来决定处理策略: 恢复生产者, 或者告警退出.

CAS 的 ABA 问题

ABA 问题描述:

  • 切换到线程 T1, 获取内存 V 的值 A
  • 切换到线程 T2, 获取内存 V 的值 A, 修改成 B, 然后再修改成 A
  • 切换到线程 T1, 获取内存 V 的值还是 A, 继续执行

coolshell 上有篇文章给出了一个生动的例子(From 维基百科):

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了.

更具有参考意义的是Hazard Pointer Wiki上提到的一个 Lock-Free 堆栈的例子:

  • 当前栈元素 [A, B, C], 栈顶 head 指向 A
  • 线程 T1 执行 pop() 准备 CAS(&head, B, A)
  • 线程 T2 抢占, pop A, pop B, 然后 push A
  • 线程 T1 恢复, CAS(&head, B, A) 成功, 则此时 head 指向一个被 pop 的元素 B

借鉴无锁 MAP 的设计, ABA 问题也可以通过引用计数来解决: 对内存 V 做一个引用计数 Ref, 当修改 V 的值时, Ref 自增, compare 会同时比较 V 和 Ref, 当 值都一致时才能做 swap.

引用计数的解决方法需要 DCAS 原语支持, 或者保证 V 和 Ref 在内存中连续的前提下 CAS2 原语支持, 不过这样也会带来编码的设计的复杂度.

DCAS不是无锁算法设计的银弹.

参考文章

  1. Lock-Free Data Structures, Andrei Alexandrescu, 2004
  2. Lock-Free Data Structures with Hazard Pointers, Andrei Alexandrescu, Maged Michael, 2004
  3. Implementing Lock-Free Queues, John D, Valois, 1994
  4. Yet another implementation of a lock-free circular array queue, Faustino Frechilla, 2011