信号量(Semaphore) 操作系统提供的一种协调共享资源访问的方法 由 Dijkstra 提出
软件同步是平等线程间的一种同步协调机制 OS是管理者 地位高于进程 用信号量表示系统资源数量 早期操作系统主要同步机制(现在很少用) 信号量是一种抽象数据类型由一个整型变量(sem)和两个原子操作组成 P() sem - 1 若 sem < 0 进入等待 否则继续 V() sem + 1 若 sem <= 0 唤醒一个等待进程
信号量的特性 信号量是 被保护 的整数变量
初始化完成后 只能通过 P() 和 V() 操作修改 由操作系统保证 PV操作是原子操作 P() 可能阻塞(没有资源 处于等待状态) V() 不会阻塞(只会释放资源 唤醒等待状态的进程) 通常假定信号量是 公平的
线程不会被无限期阻塞在 P() 操作 假定信号量等待按先进先出排队 信号量的实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Semaphore { int sem; WaitQueue q; } Semaphore::P () { sem--; if (sem < 0 ) { Add this thread t to q; block (p); } } Semaphore::V () { sem++; if (sem <= 0 ) { Remove a thread t from q; wakeup (t); } }
信号量分类 二进制信号量 资源数目为 0 或 1 资源信号量 可为任何非负值 两者等价 基于一个可以实现另一个 信号量的使用 信号量实现临界区互斥访问 每类资源设置一个信号量 初值为 1
1 2 3 4 5 6 7 8 9 10 mutex = new Semaphore (1 ); ------------------------ mutex->P (); Critical Section; mutex->V ();
必须成对使用 P() 和 V() 操作P() 保证互斥访问临界资源 V() 在使用后释放临界资源 PV操作不能次序错误 重复 遗漏 信号量实现条件同步 条件同步设置一个信号量 初值为 0(事件出现 + 1)
1 condition = new Semaphore (0 );
线程A
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ...M... condition->P(); /* 若此时线程B还未执行完X模块 那么此时信号量 就会由 0 - 1 = -1 进入等待状态 等候 线程B执行完X模块 执行V()操作 将其唤醒 若此时线程B 已经执行完X模块 那么此时信号量 就会由 0 + 1 = 1 则线程A直接继续往下执行 */ ...需要线程B执行完 X模块...
线程B
1 2 3 4 5 ...X... condition->V(); ...Y...
生产者消费者问题 有界缓冲区的生产者消费者问题
一个或多个生产者在生成数据后放在一个缓冲区里 单个消费者从缓冲区取出数据处理 任何时刻 只能有一个 生产者或消费者可访问缓冲区
信号量解决生产者消费者问题 任何时刻只能有一个线程操作缓冲区 (互斥访问) 缓冲区空时 消费者必须等待生产者 (条件同步) 缓冲区满时 生产者必须等待消费者 (条件同步) 用信号量描述每个约束
二进制信号量(mutex) 描述 互斥关系 资源信号量(fullBuffers) 描述 缓冲区有数据 资源信号量(emptyBuffers) 描述 缓冲区有位置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 Class BoundedBuffer { mutex = new Semaphore (1 ); fullBuffers = new Semaphore (0 ); emptyBuffers = new Semaphore (n); } BoundedBuffer::Deposit (c) { emptyBuffers->P (); mutex->P (); Add c to the buffer; mutex->V (); fullBuffers->V (); } BoundedBuffer::Remove (c) { fullBuffers->P (); mutex->P (); Remove c from buffer; mutex->V (); emptyBuffers->V (); }
使用信号量的困难 管程(Monitor) 用于多线程互斥访问 共享资源 的程序结构
采用面向对象方法 简化了线程间的同步控制 任一时刻最多只有一个线程执行管程代码 正在管程中的线程 可临时放弃 管程的互斥访问 等待事件出现时恢复
管程的使用 在对象/模块中 收集相关共享数据 定义访问共享数据的方法 管程的组成 一个锁 0或者多个条件变量(0个条件变量 则和 信号量差不多)
条件变量(Condition Variable) 管程内的等待机制
进入管程的线程因资源被占用而进入等待状态 每个条件变量表示一种等待原因 对应一个等待队列 Wait()操作将自己阻塞在等待队列中 唤醒一个等待者或释放管程的互斥访问 Signal()操作将等待队列中的一个线程唤醒 如果等待队列为空 则等同于空操作 条件变量实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Class Condition { int numWaiting = 0 ; WaitQueue q; } Condition::Wait (lock){ numWaiting++; Add this thread t to q; release (lock); schedule (); require (lock); } Condition::Signal (){ if (numWaiting > 0 ) { Remove a thread t from q; wakeup (t); numWaiting--; } }
管程解决生产者消费者问题 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class BoundedBuffer { Lock lock; int count = 0 ; Condition notFull, notEmpty; } BoundedBuffer::Deposit (c) { lock->Acquire (); while (count == n) notFull.Wait (&lock); Add c to the buffer; count++; notEmpty.Signal (); lock->Release (); } BoundedBuffer::Remove (c) { lock->Acquire (); while (count == 0 ) notEmpty.Wait (&lock); Remove c from buffer; count--; notFull.Signal (); lock->Release (); }
管程条件变量的释放处理方式 Hansen管程(当前正在执行的线程更优先) Hoare管程 Hansen管程 当前正在执行的线程优先级更高 因此线程连续执行 所以其效率更高
条件变量释放仅是一个提示 需要重新检查条件 用 while 因为在另一个线程中 它释放了条件变量 但是它还是在继续往下执行 有可能在这段期间 条件变量发生变化
1 2 3 4 5 6 7 8 9 10 Hansen_style::Deposit (){ lock->acquire (); while (count == n) { notFull.wait (&lock); } Add thing; count++; notEmpty.signal (); lock->release (); }
Hoare管程 等待条件变量的优先级更高
条件变量释放同时表示放弃管程访问 释放后条件变量的状态可用 1 2 3 4 5 6 7 8 9 10 Hoare_style::Deposit (){ lock->acquire (); if (count == n) { notFull.wait (&lock); } Add thing; count++; notEmpty.signal (); lock->release (); }
哲学家就餐问题 5个哲学家围绕一张圆桌坐
桌子放5支叉子 每两个哲学家之间放一支 哲学家动作包含思考和就餐进餐时需同时拿到左右两边两把叉子 思考时将两支叉子放回原处 要求不出现有人永远拿不到叉子
信号量解决哲学家就餐问题 方案1
会发生死锁! 当五个哲学家同时拿起左边的叉子 所有人都在等待别人放下右边的叉子 1 2 3 4 5 6 7 8 9 10 11 12 13 #define N 5 semaphore fork[5 ]; void philosopher (int i) { while (ture) { think(); P(fork[i]); P(fork[(i + 1 ) % N]); eat(); V(fork[i]); V(fork[(i + 1 ) % N]); } }
方案2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #define N 5 semaphore fork[5 ]; semaphore mutex; void philosopher (int i) while (TRUE) { think(); P(mutex); P(fork[i]); P(fork[(i + 1 ) % N]); eat(); V(fork[i]); V(fork[(i + 1 ) % N]); V(mutex); }
方案3
根据编号不同 采取不同动作 避免都拿到一部分资源构造环路的情况 没有死锁 并允许多人同时就餐 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #define N 5 semaphore fork[5 ]; void philosopher (int i) while (TRUE) { think(); if (i % 2 == 0 ) { P(fork[i]); P(fork[(i + 1 ) % N]); } else { P(fork[(i + 1 ) % N]); P(fork[i]); } eat(); V(fork[i]); V(fork[(i + 1 ) % N]); }
读者写者问题 共享资源的两类使用者
读者 只读取数据 不修改 写者 读取和修改数据 读读允许 读写互斥 写写互斥 信号量解决读者写者问题 此方案 读者优先
信号量WriteMutex 读者计数Rcount 信号量CountMutex(对 Rcount 进行保护) Write线程
1 2 3 P(WriteMutex); write(); V(WriteMutex);
Reader线程
1 2 3 4 5 6 7 8 9 10 11 12 13 P(CountMutex); if (Rcount == 0 ) P(WriteMutex); ++Rcount; V(CountMutex); read(); P(CountMutex); --Rcount; if (Rcount == 0 ) V(WriteMutex); V(CountMutex);
读者写者问题的优先策略 读者优先策略只要有读者正在读状态 后来的读者都能直接进入 若读者持续不断进入 则写者就处于饥饿 写者优先策略只要有写者就绪 写者应尽快执行写操作 若写者持续不断就绪 则读者就处于饥饿 管程解决读者写者问题 管程的状态变量
1 2 3 4 5 6 7 AR = 0 ; AW = 0 ; WR = 0 ; WW = 0 ; Lock lock; Condition okToRead; Condition okToWrite;
读者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 Database::Read () { StartRead (); read database; DoneRead (); } Database::StartRead () { lock.Acquire (); while ((AW+WW) > 0 ) { WR++; okToRead.wait (&lock); WR--; } AR++; lock.Release (); } Database::DoneRead () { lock.Acquire (); AR--; if (AR == 0 && WW > 0 ) { okToWrite.signal (); } lock.Release (); }
写者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 Database::Write () { StartWrite (); write database; DoneWrite (); } Database::StartWrite () { lock.Acquire (); while ((AW+AR) > 0 ) { WW++; okToWrite.wait (&lock); WW--; } AW++; lock.Release (); } Database::DoneWrite () { lock.Acquire (); AW--; if (WW > 0 ) { okToWrite.signal (); } else if (WR > 0 ) { okToRead.broadcast (); } lock.Release (); }