并发原理

棘手的原因

单处理器中,并发处理棘手之处在于:进程的相对执行速度不可预测。其取决于其他进程的活动、OS 处理中断的方式、OS 的调度策略。

因此并发问题面临着如下难关:

  • 全局资源的共享非常危险;
  • OS 很难对资源进行最优化分配;
  • 定位程序设计的错误非常困难;
  • 进程的相对执行速度不可预测

解决方案

  • 控制对共享资源的访问。

竞争条件

  • 多个进程或线程进行读写数据时,最终结果取决于多个进程的指令执行顺序,竞争失败者(最后操作)反而决定最终结果的走向

  • 如何用信号量解决竞争条件?查看:38-竞争条件与理发师问题

OS 如何处理并发?

  1. 必须跟踪不同的进程。即 PCB 的作用;
  2. 必须为每个活动进程分配和释放各种资源,如处理器时间、存储器、文件、IO 设备等;
  3. 必须保护每个进程的数据和物理资源
  4. 进程的功能和输出结果必须与执行速度无关;

进程间的关系

  • 竞争
  • 通过共享合作
  • 通过通信合作

进程间资源竞争

  • 没有任何信息交换,但一个进程的执行可能会影响到竞争进程的行为。

  • 竞争进程面临的控制问题:

    1. 互斥:一次只允许一个程序在临界区中;
    2. 死锁;
    3. 饥饿;

进程间通过共享合作

  • 互相在不确切知道对方的情况下进行交互

  • 知道其他进程可能访问同一数据,因此进程间必须合作以确保共享的数据能够正确管理。

  • 新的要求:数据一致性。考虑:

P1{ a=a+1; b=b+1;}
P2{ b=2*b; a=2*a;}

// 各自独立执行能够保证结果的一致,即a=b
// 若并发执行,则有可能a!=b,如:
a=a+1;
b=2*b;
b=b+1;
a=2*a;

进程间通过通信合作

  • 各进程与其他进程连接,通信提供同步和协调活动的方法。
  • 发送消息和接收消息的原语由程序设计语言提供,或由操作系统内核提供。
  • 传递消息中未共享任何对象,因此不必互斥,但仍存在死锁和饥饿问题

互斥的要求

  1. 忙则等待:必须强制实施互斥,互斥访问资源时一次只许一个进程进入临界区
  2. 进程独立性:在非临界区停止的进程不能干涉其他进程
  3. 不能出现死锁或饥饿
  4. 空则准入:临界区空时,需要进入临界区的进程能够立即进入
  5. 对相关进程的执行速度和处理器数量没有限制;
  6. 有限等待:进程驻留在临界区中的时间有限等待进入临界区中的时间也有限

满足互斥的方法

软件方法

  • 让并发执行的进程承担,如 dekkers 和 peterson 算法。

硬件方法

  • 使用专用机器指令,优点是系统开销小,缺点是难以通用;

操作系统或程序设计语言支持

  • 信号量
  • 管程
  • 消息传递

互斥:软件实现

错误一 :严格交替进入临界区否则永久阻塞

伪代码描述

Main(){
	int thread_number = 1;
	startThreads();
}

Thread1(){
	do {
		// entry section
		// wait until threadnumber is 1
		while (threadnumber == 2)	;

		// critical section

		// exit section
		// give access to the other thread
		threadnumber = 2;
	} while (completed == false)
}

Thread2(){
	do {
		// entry section
		// wait until threadnumber is 2
		while (threadnumber == 1)	;

		// critical section

		// exit section
		// give access to the other thread
		threadnumber = 1;
	} while (completed == false)
}

turn 值(此处为threadnumber)与进程号不对应,就自旋等待,直到另一进程恢复 turn 值。

缺点

  1. 进程严格交替使用,慢的进程会严重拖累整体运行效率;
  2. 一个进程终止,另一个进程被永久阻塞;

错误二:查看另一进程状态

伪代码描述

Main(){
	// flags to indicate if each thread is in
	// its critical section or not.
	boolean thread1 = false;
	boolean thread2 = false;

	startThreads();
}

Thread1(){
	do {
		// entry section
		// wait until thread2 is in its critical section
		while (thread2 == true)	;

		// indicate thread1 entering its critical section
		thread1 = true;

		// critical section

		// exit section
		// indicate thread1 exiting its critical section
		thread1 = false;
	} while (completed == false)
}

Thread2(){
	do {
		// entry section
		// wait until thread1 is in its critical section
		while (thread1 == true)	;

		// indicate thread2 entering its critical section
		thread2 = true;

		// critical section

		// exit section
		// indicate thread2 exiting its critical section
		thread2 = false;
	} while (completed == false)
}

flag 对应状态,flag[0] 关联 P0,flag[1] 关联 P1,周期性检查另一个进程的 flag,直到为 false,则修改自己 flag 为 true 并进入,离开时恢复自己 flag 为 false。

缺点

  1. 临界区外终止不会影响另一个进程,临界区内终止则另一进程永久阻塞;
  2. 当同时发现另一个进程 flag 为 false 时,会设置自己 flag 为 true,这样连互斥都实现不了。

错误三:抢先占用,但会死锁

伪代码描述

Main(){
	// flags to indicate if each thread is in
	// queue to enter its critical section
	boolean thread1wantstoenter = false;
	boolean thread2wantstoenter = false;

	startThreads();
}

Thread1(){
	do {
		thread1wantstoenter = true;

		// entry section
		// wait until thread2 wants to enter
		// its critical section
		while (thread2wantstoenter == true)	;

		// critical section

		// exit section
		// indicate thread1 has completed
		// its critical section
		thread1wantstoenter = false;
	} while (completed == false)
}

Thread2(){
	do {
		thread2wantstoenter = true;

		// entry section
		// wait until thread1 wants to enter
		// its critical section
		while (thread1wantstoenter == true)	;

		// critical section

		// exit section
		// indicate thread2 has completed
		// its critical section
		thread2wantstoenter = false;
	} while (completed == false)
}

algo 2 是由于检查 flag 但还未进入临界区时,就改变自身状态导致失败,因此改进办法是将置为 true 的语句放在忙等待之前。

缺点

  1. 临界区内或修改 flag 时失败,另一进程永久阻塞;
  2. 虽然实现了互斥,但若同时设置 flag 为 true,在 while 时会死锁。

错误四:随时重设标志以谦让,低概率活锁

伪代码描述

Main(){
	// flags to indicate if each thread is in
	// queue to enter its critical section
	boolean thread1wantstoenter = false;
	boolean thread2wantstoenter = false;

	startThreads();
}

Thread1(){
	do {
		thread1wantstoenter = true;

		while (thread2wantstoenter == true) {
			// gives access to other thread
			// wait for random amount of time
			thread1wantstoenter = false;
			sleep(rand_time);
			thread1wantstoenter = true;
		}

		// entry section
		// wait until thread2 wants to enter
		// its critical section

		// critical section

		// exit section
		// indicate thread1 has completed
		// its critical section
		thread1wantstoenter = false;
	} while (completed == false)
}

Thread2(){
	do {
		thread2wantstoenter = true;

		while (thread1wantstoenter == true) {
			// gives access to other thread
			// wait for random amount of time
			thread2wantstoenter = false;
			sleep(rand_time);
			thread2wantstoenter = true;
		}

		// entry section
		// wait until thread1 wants to enter
		// its critical section

		// critical section

		// exit section
		// indicate thread2 has completed
		// its critical section
		thread2wantstoenter = false;
	} while (completed == false)
}

algo 3 设置状态前不知另一进程的状态,且进入临界区之前不会回退,导致死锁。若采用谦让机制,每隔一段时间修改自己 flag,让另一进程进入临界区。

缺点

  • 若两进程执行速度完全一致,等待间隔也一致,会产生活锁;

Dekker 算法

伪代码描述

Main(){
	// to denote which thread will enter next
	int favouredthread = 1;

	// flags to indicate if each thread is in
	// queue to enter its critical section
	boolean thread1wantstoenter = false;
	boolean thread2wantstoenter = false;

	startThreads();
}

Thread1(){
	do {
		thread1wantstoenter = true;

		// entry section
		// wait until thread2 wants to enter
		// its critical section
		while (thread2wantstoenter == true) {
			// if 2nd thread is more favored
			if (favouredthread == 2) {
				// gives access to other thread
				thread1wantstoenter = false;

				// wait until this thread is favored
				while (favouredthread == 2) ;

				thread1wantstoenter = true;
			}
		}

		// critical section

		// favor the 2nd thread
		favouredthread = 2;

		// exit section
		// indicate thread1 has completed
		// its critical section
		thread1wantstoenter = false;
	} while (completed == false)
}

Thread2(){
	do {
		thread2wantstoenter = true;

		// entry section
		// wait until thread1 wants to enter
		// its critical section
		while (thread1wantstoenter == true) {
			// if 1st thread is more favored
			if (favaouredthread == 1) {
				// gives access to other thread
				thread2wantstoenter = false;

				// wait until this thread is favored
				while (favouredthread == 1)	;

				thread2wantstoenter = true;
			}
		}

		// critical section

		// favour the 1st thread
		favouredthread = 1;

		// exit section
		// indicate thread2 has completed
		// its critical section
		thread2wantstoenter = false;
	} while (completed == false)
}

实际步骤

  1. flag 表示进程状态;turn 表示哪个进程有权进入临界区,实现谦让的目的。
  2. 进入临界区时,设置自己 flag 为 true,检查另一个 flag,若为 false 则直接进入;
  3. 若为 true,则检查 turn 是否为自己,若是则另一进程同时延期执行并设置 flag 为 false 让步,本进程循环检查另一个 flag 直到其为 false;
  4. 执行完临界区后,设置 flag 为 false,并置 turn 为另一者。

Perterson 算法

伪代码描述

boolean flag[2];
int turn;
void P0(){
	while(1){
		flag[0]=true;
		turn=1;
		while(flag[1]&&turn==1) /*busy waiting*/ ;
		// critical section

		flag[0]=false;
		// rest code
	}
}

void P1(){
	while(1){
		flag[1]=true;
		turn=0;
		while(flag[1]&&turn==1) /*busy waiting*/ ;
		// critical section

		flag[1]=false;
		//rest code
	}
}
void main(){
	flag[0]=false;
	flag[1]=false;
	parbegin(P0,P1);
}

实际步骤

  1. 进程先声明自己想要进入临界区 (flagtrue),但都谦让对方 (turn=P_other),因此在忙等待中必有一者等待,一者进入临界区,
  2. 进入临界区的进程执行完后将自身 flag 置为 false,另一进程解锁,进入临界区。(最多谦让一次)

互斥:硬件支持

中断禁用

单处理器中,在进入临界区前禁用中断,

  • 代价是:
    1. 处理器不再能多道执行程序,显著降低执行效率;
    2. 多处理器体系结构中,不能保证互斥。

专用机器指令

处理器提供专用的原子指令,保证指令执行的过程中,任何其他指令都不能中断。

  • 优点

    • 适用于单处理器或共享内存的多处理器上任意数量进程;
    • 简单且易于证明;
    • 可支持多个临界区;
  • 缺点

    • ==忙等待持续消耗处理器时间==;
    • 由于处于硬件层,当多个进程等待进入临界区时,调度是随机的,==无法应用 OS 提供的多种调度策略,可能造成饥饿==;
    • 临界区内的进程申请资源被中断,而更高优先级的另一进程又需要临界区进程的资源,造成死锁

指令举例:

compare&swap 指令

int compare_and_swap(int *word, int testval, int newval){
	int oldval;
	oldval = *word;
	if(oldval == testval) *word=newval;
	return oldval;
}

用测试值 testval 检查内存单元 *word 的内容,若该内存单元值与 testval 相同,则用 newval 取代之,否则保持不变,程序总是返回内存单元的旧值。 这个原子指令由两部分组成,比较和交换。所有操作都是原子的,不接受中断。

应用

const int n=5; // 进程个数
int bolt; 
void P(int i){
	while(true){
		while(compare_and_swap(bolt,0,1)==1) /*忙等待*/;
		/*临界区*/;
		bolt=0;
		/*其余部分*/;
	}
}
void main(){
	bolt=0;
	parbegin(P(1),P(2),...,P(n));
}

bolt 初始化为 0,只有发现 bolt 为 0 的进程才可以停止忙等待,进入临界区。离开临界区时,恢复 bolt 为 0,允许下一个进程进入临界区。

exchange 指令

void exchange(int *register, int *memory){
	int temp;
	temp=*memory;
	*memory=*register;
	*register=temp;
}

交换一个寄存器和存储单元的内容。

应用

int const n = 5;
int bolt;
void P(int i){
	while(true){
		int keyi=1;
		do exchange(&keyi,&bolt)
		while(keyi != 0);
		/*临界区*/;
		bolt=0;
		/*其余部分*/;
	}
}
void main(){
	bolt=0;
	parbegin(P(1),P(2),...,P(n));
}

该方法基于表达式: 。bolt 等于 0 时,没有进程进入临界区;bolt=1 时,有且只有一个进程进入临界区,其 key=0。

信号量

原语

// 信号量原语
struct semaphore{
	int count;
	queueType queue;
};
void semWait(semaphore s){
	s.count--;
	if(s.count<0){
		/*当前进程插入阻塞队列*/;
		/*阻塞当前进程*/;
	}
}
void semSignal(semaphore s){
	s.count++;
	if(s.count<=0){
		/*将进程P移出阻塞队列*/;
		/*将P移入就绪队列*/;
	}
}

操作

  1. 初始化信号量 sem 为非负数:若为正数,表明发出资源申请的进程可以立即获得资源开始运行的进程数量;若值为零,则发出资源申请的下一个进程被阻塞,sem 变为负值,每次资源申请负值都会加大,其等于正在等待资源而被阻塞的进程数;
  2. semWait 使信号量减 1,相当于申请资源,若 sem 变为负数,则提出申请的进程被阻塞;
  3. semSignal 使信号量加 1,相当于释放资源,若 sem 为非正,则解除一个阻塞队列中的进程;

推论

  • 对信号量减 1 之前,无法知道该信号量是否会被阻塞;
  • 对信号量加 1 之后,唤醒另一个进程,并发执行时不知道哪个进程会立即继续运行;
  • 对信号量发出 semSignal 信号后,不需要知道是否有另一个进程在等待,被解除阻塞的进程要么为 0 要么为 1.

二元信号量原语

// 二元信号量原语
struct bi_semaphore{
	enum {zero,one} value;
	queueType queue;
};
void bi_semWait(bi_semaphore s){
	if(s.value==one) s.value=zero;
	else{
		/*当前进程插入阻塞队列*/;
		/*阻塞当前进程*/;
	}
}
void bi_semSignal(bi_semaphore s){
	if(s.count == zero) s.value=one;
	else{
		/*将进程P移出阻塞队列*/;
		/*将P移入就绪队列*/;
	}
}

互斥锁

二元信号量可以用以实现互斥,但与互斥锁不同。互斥锁是一个编程标志位,可以锁定或释放一个对象,设定互斥锁为 0 视为加锁,而设定为 1 视为解锁。

区别在于,互斥锁只能由一个进程持有;信号量则可以由多个进程操作。

强弱信号量

  • 等待队列的调度策略
    • FIFO:称为 强信号量
    • 不指定移出顺序: 弱信号量

  • 强信号量可以保证不会饥饿,弱信号量无法保证。

信号量互斥实现

// 信号量互斥协议
const int n=5; // 进程数
semaphore s = 1;
void P(int i){
	while(true){
		semWait(s);
		//临界区
		semSignal(s);
		//其他部分
	}
}
void main(){parbegin(P(1),P(2),...,P(n));}

生产者消费者问题

错误方法

int n; // 生产者与消费者指针位置的距离(in-out)
bi_semaphore s=1,delay=0; // s用于互斥,delay迫使消费者在缓冲区空时等待
void producer(){
	while(true){
		produce();
		bi_semWait(s);
		append();
		n++;
		if(n==1) bi_semSignal(delay);
		bi_semSignal(s);
	}
}
void consumer(){
	while(true){
		bi_semWait(s);
		take();
		n--;
		bi_semSignal(s);
		consume();
		if(n==0) bi_semWait(delay);
	}
}
void main(){
	n=0;
	parbegin(producer(),consumer())
}

错误原因

  • 在这段代码中,消费者首先尝试获取信号量 s,然后进行消费操作,并减少计数器 n。然后,它释放信号量 s,但问题出现在这里。
  • 在下一步,它检查 n 的值是否为 0,如果是,则等待延迟信号。然而,在这个步骤中,由于已经释放了信号量 s,其他线程(包括生产者)有机会修改共享资源 n 的值。如果此时一个生产者线程在消费者释放信号量之后立即增加了 n 的值,那么消费者就会在等待延迟信号时进入等待状态,而生产者也可能在等待 delay 信号的情况下等待信号量 s,导致死锁。

信号量解决无限缓冲区生产者-消费者问题

二元信号量实现的伪代码描述
//二元信号量
int n; 
bi_semaphore s=1,delay=0; 
void producer(){
	while(true){
		produce();
		bi_semWait(s);
		append();
		n++;
		if(n==1) bi_semSignal(delay);
		bi_semSignal(s);
	}
}
void consumer(){
	int m; // 局部变量
	bi_semWait(delay);
	while(true){
		bi_semWait(s);
		take();
		n--;
		m=n;
		bi_semSignal(s);
		consume();
		if(m==0) bi_semWait(delay);
	}
}
void main(){
	n=0;
	parbegin(producer(),consumer())
}
二元信号量实现的分析
  • 在消费者代码段中,它首先等待延迟信号,确保缓冲区中有数据可以被消费。
  • 然后,它通过获取信号量 s 来获取互斥访问权,以避免与其他线程同时访问共享资源。
  • 它执行消费操作,然后减少计数器 n,并将计数器的值赋给局部变量 m。最后,它释放信号量 s,并根据 m 的值来判断是否需要等待延迟信号,以确保消费者不会过早地消费。
计数信号量实现的伪代码描述
//计数信号量
semaphore n=0,s=1; //n为缓冲区中项数
void producer(){
	while(true){
		produce();
		semWait(s);
		append();
		semSignal(s);
		semSignl(n);
	}
}
void consumer(){
	while(true){
		semWait(n);
		semWait(s);
		take();
		semSignal(s);
		consume();
	}
}
void main(){
	parbegin(producer(),consumer());
}

若操作 semSignal (s) 和 semSignal (n)互换,即生产者在临界区中执行 semSignal(n) 时不会被消费者或另一个生产者打断。这不会导致程序错误,因为消费者在继续进行前必须在两个信号量上等待。

但若 semWait(n) 和 semWait (s)被颠倒,则会致命错误。缓冲区为空时消费者曾进入过临界区,任何一个生产者都不能继续向缓冲区添加数据项,系统发生死锁。

信号量解决有限循环缓冲区生产者-消费者问题

指针值必须按缓冲区大小取模,因此生产者和消费者实际上运行模式如下:

producer:
while(true){
	// producing
	while((in+1)%n == out) ; //忙等待
	b[in]=v;
	in=(in+1)%n;
}

consumer:
while(true){
	while(in==out) ; //忙等待
	w=b[out];
	out=(out+1)%n;
	//consuming
}

使用信号量的解决办法是:

const int sizeofbuffer = 500; //缓冲区大小
semaphore s=1,n=0,e=sizeofbuffer;
void producer(){
	while(true){
		produce();
		semWait(e);
		semWait(s);
		append();
		semSignal(s);
		semSignal(n);
	}
}
void consumer(){
	while(true){
		semWait(n);
		semWait(s);
		take();
		semSignal(s);
		semSignal(e);
		consume();
	}
}
void main(){
	parbegin(producer(),consumer());
}

实现信号量

  • 信号量的操作 semWait 和 semSignal 必须是原子的,

    • 因此要么调用硬件指令,
    • 要么使用 dekker 或 peterson 算法。
  • 以硬件支持的互斥方案为例,可以如下实现:

    • 左边虽然使用了忙等待,但信号量操作的耗时很短,开销较小;
    • 右边使用关中断,这在单处理器中是可行的。

管程

管程是由一个或多个过程、一个初始化序列和局部数据组成的软件模块,

  • 主要特点如下:
    1. 局部变量只能被管程的过程访问
    2. 一个进程通过调用管程的一个过程进入管程
    3. 任何时候,只能有一个进程在管程中执行

使用信号的管程(Hoare 管程)

管程通过条件变量来支持同步,这些条件变量存放在管程中,只有管程的过程可以访问。

  • 管程原语:
    • cwait (c): 调用进程的执行在条件 c 上阻塞,管程可被另一进程使用;
    • csignal (c): 恢复执行 cwait 之后因某些条件而被阻塞的进程。

管程中的 wait 和 signal 若无进程等待该信号,则信号被丢弃,这与信号量不同。

使用管程解决有界缓冲区生产者 、消费者问题伪代码

monitor boundedbuffer;
char buffer [N]; /* space for N items */
int nextin, nextout; /* buffer pointers */
int count; /* number of items in buffer */
cond notfull, notempty; /* condition variables for synchronization */
void append (char x){
	if (count == N) cwait(notfull); /* buffer is full; avoid overflow */
	buffer[nextin] = x;
	nextin = (nextin + 1) % N;
	count++;
	/* one more item in buffer */
	csignal (notempty); /*resume any waiting consumer */
}
void take (char x){
	if (count == 0) cwait(notempty); /* buffer is empty; avoid underflow */
	x = buffer[nextout];
	nextout = (nextout + 1) % N);
	count--; /* one fewer item in buffer */
	csignal (notfull); /* resume any waiting producer */
}

{ /* monitor body */
	nextin = 0; 
	nextout = 0; 
	count = 0; /* buffer initially empty */
}

void producer(){
	char x;
	while (true) {
	produce(x);
	append(x);
	}
}
void consumer(){
	char x;
	while (true) {
	take(x);
	consume(x);
	}
}
void main(){
	parbegin (producer, consumer);
}

管程机制细节

  • 管程构造了自己的互斥机制,生产者和消费者不能同时访问缓冲区,cwait 和 csignal 需要做为原语放在管程中,防止缓冲区空时读取或者满时写入。
  • 若进程在管程内还未执行到 csignal 时就被阻塞,则应放入紧急队列中,优先级高于外部的就绪队列,当需求满足时尽快执行。
  • 没有进程在条件 x 上等待,则 csignal (x)没有任何效果
  • 管程易于验证同步的正确性,也易于检测出错误。只要管程编写正确,则所有进程对受保护资源的访问都是正确的,而对于信号量必须所有访问资源的进程都编写正确,资源访问才会正确。

使用通知和广播的管程 (Lampson/Redell 管程)

Hoare 管程中,进程在 csignal 调用后必须退出管程,恢复之前 cwait 的进程,

  • Hoare 管程缺陷在于:

    • 若该进程在 csignal 之后未结束,则需要额外两次进程切换;
    • 并且与信号相关的进程调度必须可靠,不仅要相应激活条件队列中的进程,还必须确保在激活前没有其他进程进入管程。
  • Mesa 管程使用 cnotify 取代 csignal:

    • 它的作用是发送特定条件后继续执行,接收到条件的条件队列会在合适时调用队列头的进程。

Mesa 管程实现有界缓冲区部分操作的伪代码

void append (char x){
	while (count == N) cwait(notfull); /* buffer is full; avoid overflow */
	buffer[nextin] = x;
	nextin = (nextin + 1) % N;
	count++; /* one more item in buffer */
	cnotify(notempty); /* notify any waiting consumer */
}

void take (char x){
	while (count == 0) cwait(notempty); /* buffer is empty; avoid underflow */
	x = buffer[nextout];
	nextout = (nextout + 1) % N);
	count--; /* one fewer item in buffer */
	cnotify(notfull); /* notify any waiting producer */
}

Mesa 管程实现有界缓冲区的优点

由于不能保证通知时和结束运行时之间,是否有其他进程插入,因此

  • 使用 while 循环取代 if 判断条件变量,但是判断总是比进程切换的开销小得多的

相应的,

  • 每个条件原语关联一个监视计时器,不论条件是否被通知,等待时间超时的进程将被设置为就绪态,激活后再进行条件检查。这样最大的好处是,不会产生饥饿

  • 使用 cbroadcast 原语可以使所有等待该条件的进程都就绪,在不确定有多少进程等待激活、或者哪个进程需要激活时,很适用

Hoare 、Lampson/Redell 管程的对比

与 Hoare 管程相比,

  • Lampson/Redell 管程的一个优点是不易出错
    • 在 Lampson/Redell 方法中,由于每个存储过程都会在收到信号后检查监控变量,因此使用 while 结构时,进程可以错误地发出信号或进行广播,而不会导致被信号程序出错。
    • 被发信号的程序将检查相关变量,如果没有满足预期条件,则继续等待。
  • Lampson/Redell 管程的另一个优点是,可以采用更模块化的方法来构建程序
    • 例如,考虑缓冲区分配器的实现。合作的顺序进程需要满足两层条件:
      1. 数据结构一致。管程会强制执行互斥,并在完成输入或输出操作后才允许对缓冲区进行其他操作。
      2. 在 1 基础上,外加足够的内存供该进程完成分配请求。

模块化问题

  • 在 Hoare 管程中,每个信号都传达了 1 级条件,但同时也隐含了一条信息:“我已经释放了足够的字节,让你的特定分配调用现在可以工作”。因此,信号隐含了 2 级条件。
    • 如果程序员日后更改了 2 级条件的定义,就必须重新编写所有信号处理程序。
    • 如果程序员改变了任何特定等待进程的假设(即等待稍有不同的 2 级不变量),则可能需要对所有信号进程重新编程。
    • 这是不模块化的,在修改代码时很可能会导致同步错误(如误唤醒)。程序员必须记住,每当对 2 级条件进行微小改动时,都要修改监控器中的所有程序。
  • 在 Lampson/Redell 管程中,广播可确保 1 级条件,并提示第 2 级条件可能成立;每个进程应自行检查 2 级条件。如果等待者或信号发出者中的 2 级条件发生变化,就不会出现错误唤醒,因为每个进程都会检查自己的 2 级条件。因此, 2 级条件可以隐藏在每个过程中。
  • 而使用 Hoare 管程时, 2 级条件必须从等待程序进入每个信号处理程序的代码,这违反了数据抽象和程序间模块化原则。

Hansen 管程

monitor HansenMonitor {
    int buffer[MAX_SIZE];
    int count = 0;
    condition not_full, not_empty;

    procedure producer() {
        while (true) {
            if (count == MAX_SIZE) wait(not_full);
            // Produce item and add to buffer
            count++;
            signal(not_empty);
        }
    }

    procedure consumer() {
        while (true) {
            if (count == 0) wait(not_empty);
            // Consume item from buffer
            count--;
            signal(not_full);
        }
    }
}

Hansen、Hoare、Mesa 三种管程的区别

Options of the Signaler

  • Run the signaled thread immediately and suspend the current one (Hoare)
    • If the signaler has other work to do, life is complex
    • It is difficult to make sure there is nothing to do, because the signal implementation is not aware of how it is used
    • It is easy to prove things
  • Exit the monitor (Hansen)
    • Signal must be the last statement of a monitor procedure
  • Continues its execution (Mesa)
    • Easy to implement
    • But, the condition may not be true when the awaken process actually gets a chance to run

信号量 Vs. 管程条件变量

SemaphoresCondition Variables
Can be used anywhere in a program, but should not be used in a monitorCan only be used in monitors
Wait() does not always block the caller (i.e., when the semaphore counter is greater than zero).Wait() always blocks the caller.
Signal() either releases a blocked thread, if there is one, or increases the semaphore counter.Signal() either releases a blocked thread, if there is one, or the signal is lost as if it never happens.
If Signal() releases a blocked thread, the caller and the released thread both continue.If Signal() releases a blocked thread, the caller yields the monitor (Hoare type) or continues (Mesa Type). Only one of the caller or the released thread can continue, but not both.

消息传递

  • 进程交互的两个基本要求:
    • 同步和通信,
    • 同步实现互斥,通信实现合作。

消息传递是实现方式之一,广泛用于分布式系统、共享内存的多处理器系统和单处理器系统。

同步

send 和 receive 原语在执行后有确定的选择——阻塞还是继续运行,这形成了几种组合:

  1. 阻塞 send,阻塞 receive:send 后阻塞直到接收后再解除;receive 阻塞直到收到信息;
  2. 无阻塞 send,阻塞 receive:最常用,允许一个进程给多个进程快速发送一条或多条信息;
  3. 无阻塞 send,无阻塞 receive:随缘。
  • 各有优劣:
    • 无阻塞 send 常用,但潜在危险是——由于错误可能导致重复产生消息,这会消耗系统资源;并且无阻塞 send 对于程序员来说增加了负担,实际实现功能时必须确定消息是否收到,因而进程必须使用应答确认;
    • 阻塞 receive 也常用,但是若消息丢失或发送失败,则会无限期阻塞。解决办法是进程先检查是否已有消息在等待,再调用 receive,或者在 receive 中确定多个源进程。

寻址

  • 直接寻址:

    • send 包含发送进程的 PID,receive 可以显式指定期望的源进程消息,也可以隐式等待请求。
  • 间接寻址:

    • 利用信箱作为共享数据结构,临时保存消息

进程与信箱之间关联可以是静态的或动态的,静态指端口关联到特定进程,永久绑定;动态指多发送者时,使用 connect 和 disconnect 按需求接收消息。

消息格式

排队规则

  • FIFO
  • 设置优先级
  • 接收者遍历消息队列并选择。

消息传递实现互斥伪代码

const int n = /* number of process */
void P(int i){
	message msg;
	while (true) {
		receive (box, msg);
		/* critical section */;
		send (box, msg);
		/* remainder */;
	}
}
void main(){
	create mailbox (box);
	send (box, null);
	parbegin (P(1), P(2), . . . , P(n));
}

阻塞 receive 和非阻塞 send 合作,利用共享信箱 box:信箱空则阻塞请求;信箱非空则获得临界区权限。

消息传递实现有界缓冲区生产者消费者问题伪代码

const int
	capacity = /* buffering capacity */ ;
	null = /* empty message */ ;
int i;
void producer(){ message pmsg;
	while (true) {
		receive (mayproduce,pmsg);
		pmsg = produce();
		send (mayconsume,pmsg);
	}
}
void consumer(){
	message cmsg;
	while (true) {
		receive (mayconsume,cmsg);
		consume (cmsg);
		send (mayproduce,null);
	}
}
void main(){
	create_mailbox (mayproduce);
	create_mailbox (mayconsume);
	for (int i = 1;i<= capacity;i++) send (mayproduce,null);
	parbegin (producer,consumer);
}

读写者问题

读者优先策略伪代码

int readcount;//记录读者数量
semaphore x = 1,wsem = 1; //wsem是写者信号量,用于临界区互斥,x信号量用于确保readcount正确更新
void reader(){
	while (true){
		semWait (x); //申请读信号量,若为正,则读者数量+1
		readcount++;
		if(readcount == 1)
			semWait (wsem); //若有第一个读者试图读,则申请wsem禁止写者权限,保证“读者优先”,弊端是写者可能饥饿
		semSignal (x);//若读者不止1个,则随便数量的读者都可以进入临界区
		READUNIT();
		semWait (x);
		readcount--;
		if(readcount == 0)
			semSignal (wsem);
		semSignal (x);
	}
}
void writer(){
	while (true){
		semWait (wsem);//申请wsem,若为正,则进入临界区开始写内容
		WRITEUNIT();
		semSignal (wsem);
	}
}
void main(){
	readcount = 0;
	parbegin(reader(),writer());
}

写者优先策略伪代码

int readcount,writecount;
semaphore x = 1, y = 1, z = 1, wsem = 1, rsem = 1;//x保证读者数量更新,y保证写者数量更新,z使多余读进程在此排队,wsem是写者信号量,rsem是读者信号量
void reader(){
	while (true){
		semWait (z);
			semWait (rsem);
				semWait (x);
					readcount++;
					if (readcount == 1)
						semWait (wsem);
					semSignal (x);
			semSignal (rsem);
		semSignal (z);
		READUNIT();
		semWait (x);
			readcount--;
			if (readcount == 0) semSignal (wsem);
		semSignal (x);
	}
}
void writer (){
	while (true){
		semWait (y); //申请写信号量y,成功后写者数量+1
			writecount++;
			if (writecount == 1) //第一个写者开始时,禁止所有读者
				semWait (rsem);
		semSignal (y);
		semWait (wsem);
		WRITEUNIT();
		semSignal (wsem);
		semWait (y);
			writecount--;
			if (writecount == 0) semSignal (rsem);
		semSignal (y);
	}
}
void main(){
	readcount = writecount = 0;
	parbegin (reader, writer);
}

rsem 上不允许建造长队列,否则写进程将无法跳过,因此只允许一个读进程在 rsem 上排队,其余读者在信号量 z 上排队。上述程序中进程队列状态表如下:

利用消息传递实现读者优先

  • 使用 count 实现互斥,初始化为一个大于可能的读进程数的值:
    • count>0,无读进程等待,不确定是否有活动的读进程。要清除活动读进程,首先服务于所有 finished 消息,然后服务于写请求,最后是读请求;
    • count=0,唯一未解决的请求是写请求,允许写进程继续执行并等待 finished 消息;
    • count<0:一个写进程已发出一条请求,正在等待消除所有活动的读进程,因此只有 finished 消息得到服务。

伪代码实现

void reader(int i){
	message rmsg;
	while (true){
		rmsg=i;
		send(readrequest,rmsg);
		receive(mbox[i],rmsg);
		READUNIT();
		rmsg=i;
		send(finished,rmsg);
	}
}
void writer(int j){
	message rmsg;
	while(true){
		rmsg=j;
		send(writerequest,rmsg);
		receive(mbox[j],rmsg);
		WRITEUNIT();
		rmsg=j;
		send(finished,rmsg);
	}
}
void controller(){
	while(true){
		if(count>0){
			if(!empty(finished)){
				receive(finished,msg);
				count++;
			}else if(!empty(writerequest)){
				receive(writerequest,msg);
				writer_id=msg.id;
				count=count-100;
			}else if(!empty(readrequest)){
				receive(readrequest,msg);
				count--;
				send(msg.id,"OK");
			}
		}
		if(count ==0){
			send(writer_id,"OK");
			receive(finished,msg);
			count=100;
		}
		while(count<0){
			receive(finished,msg);
			count++;
		}
	}
}

死锁

原理

联合进程图

  • 灰色区域中,死锁不可避免。

可重用资源

  • 指一次仅供一个进程安全使用且不因使用而耗尽的资源,得到该类资源单元并使用后,会释放这些资源供其他进程使用。

    • 如处理器、IO 通道、内存外存、设备、数据结构(文件、数据库、信号量)等;
  • 两个例子:

    1. 两进程都要访问磁盘文件 D 和磁带设备 T,都占有一个资源并请求另一个资源。解决办法是——==系统设计中添加资源请求顺序的约束==;
    2. 内存分配请求,事先不知道存储空间总量,各自又都已经占据了一部分空间,下一次申请超出了空间上限。解决办法是——==使用虚存==;

可消耗资源

  • 指可被创建、销毁的资源。如中断、信号、消息和 IO 缓冲区中的信息。

  • 例子:

    • 两个进程都需要对方的消息才能开始,可是都是在 receive 之后才能发送消息;

资源分配图

  • 存在申请、分配环,且资源不足时,就会死锁。

死锁四条件

  1. 互斥:一次只有一个进程可以使用资源;
  2. 占有且申请、等待;
  3. 不可抢占;
  4. 循环等待:存在闭合的进程链,每个进程至少占有链中下一个进程所需的一个资源

1,2,3 为必要条件,4 为充分条件;1,2,3 的存在导致了 4 的发生,四者构成死锁的充要条件。

  • 敏感区域与死锁必要条件的关系:
    • 前文敏感区域正是前三个条件都满足时发生,若至少一个条件不满足,都不存在敏感区域。敏感区域中不仅进程死锁,其资源请求顺序也会死锁。

死锁预防(未雨绸缪)

  • 间接死锁预防——消除互斥、占有并申请、不可抢占三个必要条件;

    1. 互斥无法消除:因为这就是目的;
    2. 占有并申请:==要求进程一次性申请所有需要的资源,并阻塞到所有资源都满足==(可行但低效,进程运行不一定占用所有资源,有一部分资源甚至占用了也不使用,并且进程事先也未必知道需要多少资源);
    3. 抢占策略:==申请资源被拒绝就释放所有占用资源,等待下一次申请==;优先级较高时,由操作系统强制抢占低优先级进程的资源;(只有资源状态可以方便保存和恢复时才实用)
  • 直接死锁预防——直接防止循环等待的发生;

    • 通过==定义资源类型的线性顺序获取==来预防。(低效,可能导致没有必要的情况下拒绝进程的资源访问)

死锁避免(以退为进)

  • 思路

    • 允许三个必要条件,但通过对进程未来资源请求状况的分析、决策,使其不会进入死锁敏感区。
  • 优势

    • 较死锁预防,并发性能更好
    • 无需死锁预防中的抢占和回滚进程,限制较少;
  • 缺点

    • 必须事先声明每个进程请求的最大资源
    • 所讨论进程的执行顺序必须没有任何同步要求的限制
    • 分配的资源数量必须是固定的;
    • 占有资源时,进程不能退出

进程启动拒绝

  • 系统中资源量和进程数的基本关系:
    1. :资源总量=可分配+已分配;
    2. :申请量不大于资源总量;
    3. :已分配量不大于申请量;

由此,

  • 通过拒绝启动一个需求资源可能导致死锁的进程,可以实现死锁避免,仅当:

时才启动新进程 ,即满足当前进程最大请求量及新进程请求量时,才会启动该进程。

  • 性能评估:
    • 该策略假设了最坏情况——所有进程都同时发出最大资源请求,但实际上进程并非如此,因此性能会更进一步。

资源分配拒绝(银行家算法)

银行家算法:仍然基于之前的资源总量向量 和可用资源向量 ,以及申请资源矩阵 和已占用资源矩阵 表示。

  • ==安全状态==:至少有一个资源分配序列不会导致死锁;
  • ==不安全状态==:没有安全的资源分配序列;

  • 要查看进程所需的资源是否支持运行到结束,则判断:

  • 策略表述:

    1. 进程请求一组资源时,假设同意该请求,由其改变的系统状态确定结果是否处于安全状态,若是则同意请求;
    2. 若不是,则阻塞进程直到请求后系统保持安全状态。
    3. 安全状态的前置状态也必然是安全的
  • 原理分析:

    • 死锁避免策略不能确切地预测死锁,而是通过保持系统处于安全状态,而确保不会出现死锁的可能

死锁避免伪代码逻辑

struct state { // global data structure
	int resource[m];
	int available[m];
	int claim[n][m];
	int alloc[n][m];
}

// resource allocation algorithm
if (alloc [i,*] + request [*] > claim [i,*])
	<error>; // total request > claim
else if (request [*] > available [*])
	<suspend process>;
else { // simulate alloc
	<define newstate by: 
	alloc [i,*] = alloc [i,*] + request [*];
	available [*] = available [*] - request [*]>;
}
if (safe (newstate)) // determine the new state is safe
	<carry out allocation>;
else { //if not safe, restore original state and block requesting process
	<restore original state>;
	<suspend process>;
}

// determin if the state is safe
boolean safe (state S) {
	int currentavail[m];
	process rest[<number of processes>];
	currentavail = available;
	rest = {all processes};
	possible = true;
	while (possible) {
		<find a process Pk in rest such that
		claim [k,*] – alloc [k,*]<= currentavail;>
		if (found) { /* simulate execution of Pk */
			currentavail = currentavail + alloc [k,*];
			rest = rest - {Pk};
		}
		else possible = false;
	}
	return (rest == null);
}

死锁检测(亡羊补牢)

死锁预防相当保守,强加约束的行为极大地降低了系统效率。 死锁检测尽可能地分配资源,定期检查系统是否存在死锁,再统一处理。

死锁检测算法(coffman 算法)

Coffman 算法:利用 Allocation 矩阵、Available 向量、请求矩阵 Q( 表示进程 i 请求 j 类资源的数量)。

  • 思路是标记未死锁的进程,直到查找结束后检测标志

  • 流程:

    1. 最初,所有进程都没有标记。然后执行以下步骤:
    2. 标记 矩阵中一行为零的每个进程。没有分配资源的进程不能参与死锁。==——什么资源都未得到==
    3. 初始化一个临时向量 ,使其等于 向量。
    4. 查找下标 ,使进程 当前未被标记,且 的第 行小于或等于 ,即 。若找不到这样的行(进程),则终止算法。==——结束检测的出口,查找是否进程的请求资源可以满足==
    5. 若找到这样的行,则标记进程 ,并把 矩阵中的相应行加到 中,即 。之后返回步骤三。==——更新,叠加已分配资源到可分配资源矩阵上==

coffman 算法的伪代码逻辑

int n; // Number of processes
int m; // Number of resource types

bool* marked = new bool[n]; // To keep track of marked processes
bool* finished = new bool[n]; // To keep track of finished processes

int** allocation; // Allocation matrix
int* available; // Available vector
int* request; // Request matrix

void deadlockDetection() {
    // Initialize marked and finished arrays
    for (int i = 0; i < n; ++i) {
        marked[i] = false;
        finished[i] = false;
    }

    // Step 1: Mark processes with no allocation
    for (int i = 0; i < n; ++i) {
        bool hasAllocation = false;
        for (int j = 0; j < m; ++j) {
            if (allocation[i][j] > 0) {
                hasAllocation = true;
                break;
            }
        }
        if (!hasAllocation) {
            marked[i] = true;
        }
    }

    while (true) {
        // Step 2: Initialize temporary vector W
        int* W = new int[m];
        for (int i = 0; i < m; ++i) {
            W[i] = available[i];
        }

        bool found = false;
        int processIndex = -1;

        // Step 3: Find an unmarked process that can be satisfied
        for (int i = 0; i < n; ++i) {
            if (!marked[i] && !finished[i]) {
                bool canBeSatisfied = true;
                for (int j = 0; j < m; ++j) {
                    if (request[i][j] > W[j]) {
                        canBeSatisfied = false;
                        break;
                    }
                }
                if (canBeSatisfied) {
                    found = true;
                    processIndex = i;
                    break;
                }
            }
        }

        if (!found) {
            // No unmarked process can be satisfied, deadlock check ends
            break;
        }

        // Step 4: Mark the process and update W
        marked[processIndex] = true;
        for (int j = 0; j < m; ++j) {
            W[j] += allocation[processIndex][j];
        }

        delete[] W;
    }

    // Print processes in deadlock
    for (int i = 0; i < n; ++i) {
        if (marked[i] && !finished[i]) {
            printf("Process %d is in deadlock.\n", i);
        }
    }

    delete[] marked;
    delete[] finished;
}

int main() {
    // Initialize allocation, available, request, n, and m
    // ...

    deadlockDetection();

    return 0;
}

coffman 算法实例

  • 当且仅当 Coffman 算法最终结果有未标记进程时,才存在死锁,每个未标记进程都是死锁状态。

  1. Mark P4, because P4 has no allocated resources.
  2. Set ;
  3. The request of process P3 is less than or equal to , so mark P3 and set ;
  4. No other unmarked process has a row in that is less than or equal to . Therefore, terminate the algorithm.

The algorithm concludes with P1 and P2 unmarked, indicating these processes are deadlocked.

死锁解除

复杂度递增的解除方法:

  1. 取消所有的死锁进程;
  2. 为每个死锁进程回滚到之前的检查点,再重新启动;(系统需要回滚和重启机制,并且死锁可能再次发生)
  3. 连续取消死锁进程直至不再存在死锁;(每取消一定数量进程后,需要再次检测)
  4. 连续抢占资源直到不再存在死锁;(有代价,且需要每次抢占后调度检测算法,被抢占资源的进程必须回滚到之前的检查点)
    • 3、4 的选择原则有:目前消耗 CPU 时间最少、产生输出最少、预计剩余时间最长、目前分配资源最少、优先级最低;

综合的死锁策略

  • 将资源分成不同的类(如可交换空间、进程资源、内存、内部资源),
    • 资源类之间的申请采用线性排序策略,
    • 资源类内部的申请设计该类合适的方法:
      • 可交换空间:一次性分配所有请求的资源来预防死锁
      • 进程资源:死锁避免策略
      • 内存:抢占
      • 内部资源:基于资源排序的预防策略

哲学家就餐问题

解决办法

  • 多买五把叉子(增加资源)、
  • 教会哲学家只用一把叉子吃饭(优化进程)、
  • 增加服务员只允许 4 位哲学家上桌(OS 的调控,至少有一个进程可以运行)

基于信号量的方案

semaphore fork[5] = {1};
semaphore room = {4}; //限制就餐人数
int i;
void philosopher (int i){
	while (true) {
		think();
		wait (room);
		wait (fork[i]);
		wait (fork [(i+1) mod 5]);
		eat();
		signal (fork [(i+1) mod 5]);
		signal (fork[i]);
		signal (room);
	}
}
void main(){
	parbegin (philosopher (0), philosopher (1),philosopher (2), philosopher (3),philosopher (4));
}

基于管程的方案

伪代码描述

monitor dining_controller;
cond ForkReady[5]; /* condition variable for synchronization,Each fork corresponds to a condition variable*/
boolean fork[5] = {true};/* availability status of each fork */
void get_forks(int pid)/* pid is the philosopher id number */
{
	int left = pid;
	int right = (++pid) % 5;
	/*grant the left fork*/
	if (!fork[left])
		cwait(ForkReady[left]); /* queue on condition variable */
	fork(left) = false;
	/*grant the right fork*/
	if (!fork[right])
		cwait(ForkReady[right]);/* queue on condition variable */
	fork[right] = false:
}

void release_forks(int pid){
	int left = pid;
	int right = (++pid) % 5;
	/*release the left fork*/
	if (empty(ForkReady[left]) /*no one is waiting for this fork*/
		fork[left] = true;
	else /* awaken a process waiting on this fork */
		csignal(ForkReady[left]);
	/*release the right fork*/
	if (empty(ForkReady[right])/*no one is waiting for this fork*/
		fork[right] = true;
	else /* awaken a process waiting on this fork */
		csignal(ForkReady[right]);
}

void philosopher[k=0 to 4] /* the five philosopher clients */
{
	while (true) {
		<think>;
		get_forks(k); /* client requests two forks via monitor*/
		<eat spaghetti>;
		release_forks(k); /* client releases forks via the monitor*/
	}
}

The get_forks procedure is used by a philosopher to seize his or her left and right forks. If either fork is unavailable, the philosopher process is queued on the appropriate condition variable. This enables another philosopher process to enter the monitor.

The release_forks procedure is used to make two forks available.

Note the structure of this solution is similar to that of the semaphore solution proposed in Figure 6.12(a wrong allocation method) . In both cases, a philosopher seizes first the left fork then the right fork. Unlike the semaphore solution, this monitor solution does not suffer from deadlock, because only one process at a time may be in the monitor.

For example, the first philosopher process to enter the monitor is guaranteed that it can pick up the right fork after it picks up the left fork before the next philosopher to the right has a chance to seize his or her left fork, which is this philosopher’s right fork.

管程方案分析

  • 哲学家使用 get_forks 过程来获取其左右两个叉子。如果任一叉子不可用,哲学家进程就会在相应的条件变量上排队。这样,另一个哲学家进程就可以进入管程。
  • release_forks 过程用于使两个叉子可用。
  • 请注意,此解决方案的结构与图 6.12(错误的分配方法)中提出的信号解决方案相似。在这两种情况下,哲学家都是先抓住左叉子,然后再抓住右叉子。与信号灯方案不同的是,这种管程方案不会出现死锁,因为每次只有一个进程处于管程中
  • 例如,第一个进入监控器的哲学家进程在右边的下一个哲学家有机会夺取他或她的左叉子(也就是这个哲学家的右叉子)之前,可以保证它在夺取左叉子之后还能夺取右叉子。