操作系统之 信号量

信号量(Semaphore)

操作系统提供的一种协调共享资源访问的方法 由 Dijkstra 提出

  • 软件同步是平等线程间的一种同步协调机制
  • OS是管理者 地位高于进程
  • 用信号量表示系统资源数量
  • 早期操作系统主要同步机制(现在很少用)
  • 信号量是一种抽象数据类型
    • 由一个整型变量(sem)和两个原子操作组成
    • P() sem - 1 若 sem < 0 进入等待 否则继续
    • V() sem + 1 若 sem <= 0 唤醒一个等待进程

Synchronization_method_semaphore

信号量的特性

信号量是 被保护 的整数变量

  • 初始化完成后 只能通过 P() 和 V() 操作修改
  • 由操作系统保证 PV操作是原子操作
  • P() 可能阻塞(没有资源 处于等待状态)
  • V() 不会阻塞(只会释放资源 唤醒等待状态的进程)

通常假定信号量是 公平的

  • 线程不会被无限期阻塞在 P() 操作
  • 假定信号量等待按先进先出排队

信号量的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Semaphore {
int sem;
WaitQueue q;
}
Semaphore::P() {
sem--;
if (sem < 0) {
Add this thread t to q;
block(p);
}
}
Semaphore::V() {
sem++;
if (sem <= 0) { // +1 后 还小于等于 0 说明前面有进程还在等待
Remove a thread t from q;
wakeup(t);
}
}

信号量分类

  • 二进制信号量 资源数目为 0 或 1
  • 资源信号量 可为任何非负值
  • 两者等价 基于一个可以实现另一个

信号量的使用

  • 互斥访问
    • 临界区的互斥访问控制
  • 条件同步
    • 线程间等事件等待

信号量实现临界区互斥访问

每类资源设置一个信号量 初值为 1

1
2
3
4
5
6
7
8
9
10
mutex = new Semaphore(1);
------------------------
mutex->P();
/* 信号量 - 1 = 0 可以进入
若第二个进程也想进入
信号量 - 1 = -1 在这里进入等待状态 */
Critical Section;
mutex->V();
/* 信号量 + 1 = 0 说明有第二个线程还在等待状态
唤醒 第二个线程 */
  • 必须成对使用 P() 和 V() 操作
    • P() 保证互斥访问临界资源
    • V() 在使用后释放临界资源
    • PV操作不能次序错误 重复 遗漏

信号量实现条件同步

条件同步设置一个信号量 初值为 0(事件出现 + 1)

1
condition = new Semaphore(0);

线程A

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...M...

condition->P();
/*
若此时线程B还未执行完X模块
那么此时信号量 就会由 0 - 1 = -1
进入等待状态 等候 线程B执行完X模块 执行V()操作
将其唤醒

若此时线程B 已经执行完X模块
那么此时信号量 就会由 0 + 1 = 1
则线程A直接继续往下执行
*/

...需要线程B执行完 X模块...

线程B

1
2
3
4
5
...X...

condition->V();

...Y...

生产者消费者问题

有界缓冲区的生产者消费者问题

  • 一个或多个生产者在生成数据后放在一个缓冲区里
  • 单个消费者从缓冲区取出数据处理
  • 任何时刻 只能有一个 生产者或消费者可访问缓冲区

Producer-consumer_problem

信号量解决生产者消费者问题
  • 任何时刻只能有一个线程操作缓冲区 (互斥访问)
  • 缓冲区空时 消费者必须等待生产者 (条件同步)
  • 缓冲区满时 生产者必须等待消费者 (条件同步)

用信号量描述每个约束

  • 二进制信号量(mutex) 描述 互斥关系
  • 资源信号量(fullBuffers) 描述 缓冲区有数据
  • 资源信号量(emptyBuffers) 描述 缓冲区有位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Class BoundedBuffer {
mutex = new Semaphore(1);
fullBuffers = new Semaphore(0);
emptyBuffers = new Semaphore(n);
}
// 生产者
BoundedBuffer::Deposit(c) {
emptyBuffers->P();
// 检查是否有空缓存区 emptyBuffers 信号量 - 1

mutex->P(); // 互斥访问临界区
Add c to the buffer;
mutex->V();

fullBuffers->V();
// 写入了一个数据到缓冲区 fullBuffers + 1
}
// 消费者
BoundedBuffer::Remove(c) {
fullBuffers->P();
// 检查缓冲区是否有数据 fullBuffers + 1

mutex->P(); // 互斥访问临界区
Remove c from buffer;
mutex->V();

emptyBuffers->V();
// 取出缓冲区的内容 emptyBuffers 信号量 + 1
}

使用信号量的困难

  • 读/开发代码比较困难
    • 需要能运用信号量
  • 容易出错
    • 使用的信号量被另一个线程占用
    • 忘记释放信号量
  • 不能处理死锁

管程(Monitor)

用于多线程互斥访问 共享资源 的程序结构

  • 采用面向对象方法 简化了线程间的同步控制
  • 任一时刻最多只有一个线程执行管程代码
  • 正在管程中的线程 可临时放弃 管程的互斥访问 等待事件出现时恢复

Synchronization_method_monitor

管程的使用

  • 在对象/模块中 收集相关共享数据
  • 定义访问共享数据的方法

管程的组成

  • 一个锁
    • 控制管程代码的互斥访问
  • 0或者多个条件变量(0个条件变量 则和 信号量差不多)
    • 管理共享数据的并发访问

Monitor

条件变量(Condition Variable)

管程内的等待机制

  • 进入管程的线程因资源被占用而进入等待状态
  • 每个条件变量表示一种等待原因 对应一个等待队列
  • Wait()操作
    • 将自己阻塞在等待队列中
    • 唤醒一个等待者或释放管程的互斥访问
  • Signal()操作
    • 将等待队列中的一个线程唤醒
    • 如果等待队列为空 则等同于空操作
条件变量实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Class Condition {
int numWaiting = 0; // 等待线程数
WaitQueue q;
}
Condition::Wait(lock){
numWaiting++; // 有线程处于等待状态
Add this thread t to q;
release(lock); // 释放管程的访问权
schedule(); //need mutex
require(lock); // 请求管程访问权
}
Condition::Signal(){
if (numWaiting > 0) {
// 如果等待队列为空 则为空操作
Remove a thread t from q;
wakeup(t); //need mutex
numWaiting--;
}
}

管程解决生产者消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class BoundedBuffer {
Lock lock;
// 管程入口的锁
int count = 0;
// 缓冲区里的个数
Condition notFull, notEmpty;
// 条件变量
}
BoundedBuffer::Deposit(c) {
lock->Acquire();
// 管程进入申请

/*
用信号量解决的时候 不能把检查条件放在申请互斥操作的后面 是因为会产生死锁 因为都申请了互斥操作了 别的线程不可读不可写 当前线程只能一直在那等

而用管程不同 它可以放弃管程的互斥访问权限 调度到其他线程去
*/
while (count == n)
notFull.Wait(&lock);
// 判断缓冲区是否已经满了 满了就等待条件满足 再继续执行

Add c to the buffer;
count++;

notEmpty.Signal();

lock->Release();
// 管程释放
}

BoundedBuffer::Remove(c) {
lock->Acquire();

while (count == 0)
notEmpty.Wait(&lock);

Remove c from buffer;
count--;

notFull.Signal();
// 将等待线程中的其中一个唤醒

lock->Release();
}

管程条件变量的释放处理方式

  • Hansen管程(当前正在执行的线程更优先)
    • 用于真实OS或Java中
  • Hoare管程
    • 多见于教材

Hansen管程

当前正在执行的线程优先级更高 因此线程连续执行 所以其效率更高

  • 条件变量释放仅是一个提示 需要重新检查条件 用 while

因为在另一个线程中 它释放了条件变量 但是它还是在继续往下执行 有可能在这段期间 条件变量发生变化

1
2
3
4
5
6
7
8
9
10
Hansen_style::Deposit(){
lock->acquire();
while (count == n) { // 这里是 while
notFull.wait(&lock);
}
Add thing;
count++;
notEmpty.signal();
lock->release();
}

Hansen_monitor

Hoare管程

等待条件变量的优先级更高

  • 条件变量释放同时表示放弃管程访问
  • 释放后条件变量的状态可用
1
2
3
4
5
6
7
8
9
10
Hoare_style::Deposit(){
lock->acquire();
if (count == n) { // 这里是 if
notFull.wait(&lock);
}
Add thing;
count++;
notEmpty.signal();
lock->release();
}

Hoare_monitor

哲学家就餐问题

5个哲学家围绕一张圆桌坐

  • 桌子放5支叉子
  • 每两个哲学家之间放一支
  • 哲学家动作包含思考和就餐
    • 进餐时需同时拿到左右两边两把叉子
    • 思考时将两支叉子放回原处
  • 要求不出现有人永远拿不到叉子

Dining_philosophers_problem

信号量解决哲学家就餐问题

方案1

  • 会发生死锁!
  • 当五个哲学家同时拿起左边的叉子 所有人都在等待别人放下右边的叉子
1
2
3
4
5
6
7
8
9
10
11
12
13
#define N 5 // 哲学家个数
semaphore fork[5]; // 5支叉子的信号量初值为1
void philosopher(int i) {
// 哲学家编号:0 - 4
while (ture) {
think();
P(fork[i]); // 拿左边的叉子
P(fork[(i + 1) % N]); // 拿右边的叉子
eat();
V(fork[i]); // 放下左边的叉子
V(fork[(i + 1) % N]); // 放下右边的叉子
}
}

方案2

  • 互斥访问正确
  • 每次只能有一个哲学家进餐 性能差
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#define N 5 // 哲学家个数
semaphore fork[5]; // 信号量初值为1
semaphore mutex; // 互斥信号量,初值1
void philosopher(int i)
// 哲学家编号:0 - 4
while(TRUE) {
think();
P(mutex); // 进入临界区 只有一个哲学家能就餐

P(fork[i]); // 去拿左边的叉子
P(fork[(i + 1) % N]);// 去拿右边的叉子
eat();
V(fork[i]); // 放下左边的叉子
V(fork[(i + 1) % N]); // 放下右边的叉子

V(mutex); // 退出临界区
}

方案3

  • 根据编号不同 采取不同动作 避免都拿到一部分资源构造环路的情况
  • 没有死锁 并允许多人同时就餐
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#define N 5 // 哲学家个数
semaphore fork[5]; // 信号量初值为1
void philosopher(int i) // 哲学家编号:0 - 4
while(TRUE) {
think();

if (i % 2 == 0) {
// 偶数 先拿左 后拿右 奇数 先拿右 后拿左
P(fork[i]); // 去拿左边的叉子
P(fork[(i + 1) % N]); // 去拿右边的叉子
} else {
P(fork[(i + 1) % N]); // 去拿右边的叉子
P(fork[i]); // 去拿左边的叉子
}
eat();

V(fork[i]); // 放下左边的叉子
V(fork[(i + 1) % N]); // 放下右边的叉子
}

读者写者问题

共享资源的两类使用者

  • 读者 只读取数据 不修改
  • 写者 读取和修改数据
  • 读读允许
    • 同一时刻允许有多个读者同时读
  • 读写互斥
    • 没有写者时 读者才能读
    • 没有读者时 写者才能写
  • 写写互斥
    • 没有其他写者时 写者才能写
信号量解决读者写者问题

此方案 读者优先

  • 信号量WriteMutex
    • 控制读写操作互斥
    • 初始化为 1
  • 读者计数Rcount
    • 正在进行读操作的读者数目
    • 初始化为 0
  • 信号量CountMutex(对 Rcount 进行保护)
    • 控制对读者计数的互斥修改
    • 初始化为 1

Write线程

1
2
3
P(WriteMutex); // 读写互斥
write();
V(WriteMutex);

Reader线程

1
2
3
4
5
6
7
8
9
10
11
12
13
P(CountMutex); // 保护 Rcount
if (Rcount == 0)
P(WriteMutex);
// 只在第一个读者 开启读写互斥
++Rcount; // 加一读者
V(CountMutex);
read();
P(CountMutex);
--Rcount;
if (Rcount == 0)
V(WriteMutex);
// 没有读者了 让写者有机会访问临界区去写
V(CountMutex);
读者写者问题的优先策略
  • 读者优先策略
    • 只要有读者正在读状态 后来的读者都能直接进入
    • 若读者持续不断进入 则写者就处于饥饿
  • 写者优先策略
    • 只要有写者就绪 写者应尽快执行写操作
    • 若写者持续不断就绪 则读者就处于饥饿
管程解决读者写者问题

管程的状态变量

1
2
3
4
5
6
7
AR = 0; // 当前读的读者
AW = 0; // 当前写的写者
WR = 0; // 等待读的读者
WW = 0; // 等待写的写者
Lock lock;
Condition okToRead;
Condition okToWrite;

读者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Database::Read() {
// Wait until no writers;

StartRead();
read database;
// check out – wake up waiting writers;
DoneRead();
}
Database::StartRead() {
lock.Acquire();
while ((AW+WW) > 0) {
// 只要有写者就等 说明写者优先
WR++;
okToRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
}
Database::DoneRead() {
lock.Acquire();
AR--;
if (AR == 0 && WW > 0) {
// 当前没有读者且有等待写的写者 则唤醒 写者
okToWrite.signal();
}
lock.Release();
}

写者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Database::Write() {
// Wait until no readers/writers;

StartWrite();
write database;
// check out-wake up waiting readers/writers;
DoneWrite();
}
Database::StartWrite() {
lock.Acquire();
while ((AW+AR) > 0) {
// 只要有正在写的写者或者正在读的读者 就等(不等等待读的读者 说明写者优先)
WW++;
okToWrite.wait(&lock);
WW--;
}
AW++;
lock.Release();
}
Database::DoneWrite() {
lock.Acquire();
AW--;
if (WW > 0) {
// 优先唤醒等待写的写者
okToWrite.signal();
}
else if (WR > 0) {
// 如果没有等待写的写者 才唤醒等待读的读者
okToRead.broadcast();
}
lock.Release();
}