// Peterson 算法实现 (Peterson's algorithm implementation)
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <random>
// Tunables and shared state for the producer/consumer demo below.
constexpr int BSIZE = 8; // number of slots in the shared buffer SHM3
constexpr int PWT = 2; // upper bound (seconds) of the producer's random pause between rounds
constexpr int CWT = 10; // upper bound (seconds) of the consumer's random pause between rounds
constexpr int RT = 10; // how long (seconds) main() sleeps before requesting shutdown
std::mutex mtx; // mutex held around the producer's and consumer's critical sections
std::condition_variable cv; // condition variable the threads wait on before, and notify after, a critical section
bool SHM1[2] = {false, false}; // "ready" flags: [0] is raised by the consumer, [1] by the producer
int SHM2 = 0; // turn variable: the producer stores 0, the consumer stores 1
std::vector<int> SHM3(BSIZE, 0); // shared job buffer; a slot holding 0 is treated as empty
int SHM4 = 1; // run flag: main() sets it to 0 to ask both threads to stop
/**
 * Returns a uniformly distributed random integer in the closed range [1, n].
 *
 * Precondition: n >= 1 (std::uniform_int_distribution requires a <= b).
 *
 * The generator is thread_local rather than a shared static: this function is
 * called from several threads, and a single std::mt19937 mutated concurrently
 * without synchronisation is a data race.
 */
int myrand(int n) {
    // Mix the clock with random_device so that two threads that first call
    // this function at (nearly) the same instant still get different seeds.
    thread_local std::mt19937 gen(
        static_cast<std::mt19937::result_type>(
            std::chrono::high_resolution_clock::now().time_since_epoch().count()) ^
        static_cast<std::mt19937::result_type>(std::random_device{}()));
    std::uniform_int_distribution<int> dist(1, n);
    return dist(gen);
}
void producer() {
while (true) {
SHM1[1] = true; // 表示生产者准备好了
std::cout << "Producer is ready now." << std::endl << std::endl;
SHM2 = 0; // 生产者将共享变量设置为0
std::unique_lock<std::mutex> lock(mtx); // 获取互斥锁
cv.wait(lock, [&]{ return !SHM1[0] && SHM2 == 0; }); // 等待条件变量满足,解锁互斥锁
// 临界区开始
int index = 0;
while (index < BSIZE) {
if (SHM3[index] == 0) {
int tempo = myrand(BSIZE * 3);
std::cout << "Job " << tempo << " has been produced" << std::endl;
SHM3[index] = tempo;
break;
}
index++;
}
if (index == BSIZE) {
std::cout << "Buffer is full, nothing can be produced!!!" << std::endl;
}
std::cout << "Buffer: ";
for (int value : SHM3) {
std::cout << value << " ";
}
std::cout << std::endl;
// 临界区结束
SHM1[1] = false; // 生产者完成
lock.unlock(); // 解锁互斥锁
cv.notify_one(); // 通知等待的线程
if (SHM4 == 0) { // 如果共享变量SHM4为0,退出循环
break;
}
int wait_time = myrand(PWT);
std::cout << "Producer will wait for " << wait_time << " seconds" << std::endl << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(wait_time));
}
}
void consumer() {
std::this_thread::sleep_for(std::chrono::seconds(5));
while (true) {
SHM1[0] = true; // 表示消费者准备好了
std::cout << "Consumer is ready now." << std::endl << std::endl;
SHM2 = 1; // 消费者将共享变量设置为1
std::unique_lock<std::mutex> lock(mtx); // 获取互斥锁
cv.wait(lock, [&]{ return !SHM1[1] && SHM2 == 1; }); // 等待条件变量满足,解锁互斥锁
// 临界区开始
if (SHM3[0] != 0) {
std::cout << "Job " << SHM3[0] << " has been consumed" << std::endl;
SHM3[0] = 0;
for (size_t i = 1; i < BSIZE; ++i) {
SHM3[i - 1] = SHM3[i];
}
SHM3[BSIZE - 1] = 0;
} else {
std::cout << "Buffer is empty, nothing can be consumed!!!" << std::endl;
}
std::cout << "Buffer: ";
for (int value : SHM3) {
std::cout << value << " ";
}
std::cout << std::endl;
// 临界区结束
SHM1[0] = false; // 消费者完成
lock.unlock(); // 解锁互斥锁
cv.notify_one(); // 通知等待的线程
if (SHM4 == 0) { // 如果共享变量SHM4为0,退出循环
break;
}
int wait_time = myrand(CWT);
std::cout << "Consumer will sleep for " << wait_time << " seconds" << std::endl << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(wait_time));
}
}
int main() {
std::thread producerThread(producer);
std::thread consumerThread(consumer);
std::this_thread::sleep_for(std::chrono::seconds(RT));
SHM4 = 0;
cv.notify_all(); // 通知所有等待的线程
producerThread.join();
consumerThread.join();
std::cout << "The clock ran out." << std::endl;
return 0;
}

// 信号量方法的实现 (semaphore-based implementation)
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
/**
 * Minimal semaphore built from a mutex and a condition variable.
 *
 * `value` is the number of available permits. signal() adds one permit
 * unconditionally (the count is not capped at 1); wait() blocks until a permit
 * is available and takes it.
 */
class BiSemaphore {
public:
    /// @param initial number of permits available at construction.
    explicit BiSemaphore(int initial) : value(initial) {}

    /// Releases one permit and wakes every thread blocked in wait().
    void signal() {
        std::unique_lock<std::mutex> lock(mutex);
        ++value;
        cv.notify_all();
    }

    /// Blocks until a permit is available, then takes it.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this]() { return value > 0; });
        --value;
    }

    /**
     * Like wait(), but gives up after `timeout`.
     *
     * @return true if a permit was taken, false if the timeout expired first.
     *
     * A permit is consumed only on success. Decrementing after a timeout
     * would drive `value` negative, and a later signal() would then fail to
     * release any waiter.
     */
    bool waitWithTimeout(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex);
        if (!cv.wait_for(lock, timeout, [this]() { return value > 0; })) {
            return false; // timed out: leave the permit count untouched
        }
        --value;
        return true;
    }

    // Left public for backward compatibility: code outside the class may lock
    // it directly. It is also the mutex that protects `value`.
    std::mutex mutex;

private:
    int value;                  // available permits, guarded by `mutex`
    std::condition_variable cv; // signalled whenever `value` is incremented
};
// Shared state of the semaphore-based producer/consumer demo.
int n = 0; // item counter: incremented by produce(), decremented by consume()
BiSemaphore s(1); // starts with one permit; waited on and signalled around updates of n
BiSemaphore delay(0); // starts with no permit; signalled by produce() when n becomes 1, waited on once by consumer()
std::vector<int> buffer; // the buffer: produce() appends at the back, consume() removes from the back
/**
 * Produces one item and appends it to `buffer`.
 *
 * The item's value is the current item count `n`. The read of `n`, the append
 * and the increment all happen inside one critical section guarded by the
 * semaphore `s`, so `n` and `buffer` always change together and `n` is never
 * read while the consumer is modifying it. When the count goes from 0 to 1,
 * `delay` is signalled to let the consumer start.
 */
void produce() {
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Producer is ready now." << std::endl;
    // Simulate a production operation
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    s.wait(); // ---- enter critical section ----
    const int produced_data = n;
    std::cout << "Job " << produced_data << " has been produced" << std::endl;
    buffer.push_back(produced_data);
    std::cout << "Buffer: ";
    for (const int& data : buffer) {
        std::cout << data << " ";
    }
    std::cout << std::endl;
    ++n;
    if (n == 1) {
        delay.signal(); // first item available
    }
    s.signal(); // ---- leave critical section ----

    std::this_thread::sleep_for(std::chrono::seconds(2));
}

/**
 * Consumes one item from the back of `buffer`, if there is one.
 *
 * The removal and the decrement of `n` happen inside the same `s`-guarded
 * critical section that produce() uses. `n` is decremented only when an item
 * was actually removed, so it cannot drift away from buffer.size().
 */
void consume() {
    std::this_thread::sleep_for(std::chrono::seconds(8));
    std::cout << "Consumer is ready now." << std::endl;

    s.wait(); // ---- enter critical section ----
    if (!buffer.empty()) {
        const int consumed_data = buffer.back(); // take the most recent item
        buffer.pop_back();
        std::cout << "Job " << consumed_data << " has been consumed" << std::endl;
        std::cout << "Buffer: ";
        for (const int& data : buffer) {
            std::cout << data << " ";
        }
        std::cout << std::endl;
        --n;
    }
    s.signal(); // ---- leave critical section ----

    std::this_thread::sleep_for(std::chrono::seconds(8));
}
/// Producer thread body: runs produce() 20 times in a row.
void producer() {
    int remaining = 20;
    while (remaining > 0) {
        produce();
        --remaining;
    }
}
/**
 * Consumer thread body: waits on `delay` once, then calls consume() 15 times
 * and reports completion.
 *
 * The function simply returns when done. Calling exit() from a worker thread
 * runs the destructors of the global objects while other threads may still be
 * using them; main() joins both threads, so the program ends normally once
 * they have finished.
 */
void consumer() {
    delay.wait(); // block until the producer has signalled `delay`
    for (int i = 0; i < 15; i++) {
        consume();
    }
    std::cout << "Consumer: Consumer finished." << std::endl;
}
/// Resets the item counter, runs the producer and consumer threads and waits for both.
int main() {
    n = 0;
    // Elements are constructed in order, so the producer thread starts first.
    std::thread workers[] = {std::thread(producer), std::thread(consumer)};
    for (std::thread& worker : workers) {
        worker.join();
    }
    return 0;
}
// 管程方法实现 (monitor-based implementation)
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <chrono>
/**
 * Fixed-capacity circular buffer of chars whose operations run under one
 * mutex. append() blocks while the buffer is full, take() blocks while it is
 * empty; each logs the item and the buffer contents to std::cout.
 */
class BoundedBuffer {
public:
    /// @param bufferSize number of slots in the buffer.
    BoundedBuffer(int bufferSize) : buffer(bufferSize), nextin(0), nextout(0), count(0) {}

    /// Stores x in the next free slot, waiting for space if necessary.
    void append(char x) {
        std::unique_lock<std::mutex> guard(mutex);
        while (count >= buffer.size()) {
            notFull.wait(guard);
        }
        buffer[nextin] = x;
        std::cout << "Producer produced: " << x << std::endl;
        nextin = successor(nextin);
        ++count;
        printContents();
        notEmpty.notify_one();
    }

    /// Removes the oldest item into x, waiting for one if necessary.
    void take(char &x) {
        std::unique_lock<std::mutex> guard(mutex);
        while (count == 0) {
            notEmpty.wait(guard);
        }
        x = buffer[nextout];
        std::cout << "Consumer consumed: " << x << std::endl;
        nextout = successor(nextout);
        --count;
        printContents();
        notFull.notify_one();
    }

private:
    /// Index following pos, wrapping around at the end of the storage.
    size_t successor(size_t pos) const { return (pos + 1) % buffer.size(); }

    /// Prints the stored items from oldest to newest. Caller must hold `mutex`.
    void printContents() const {
        std::cout << "Buffer content: ";
        size_t pos = nextout;
        for (size_t shown = 0; shown < count; ++shown) {
            std::cout << buffer[pos] << " ";
            pos = successor(pos);
        }
        std::cout << std::endl;
    }

    std::vector<char> buffer;                  // circular storage
    size_t nextin, nextout, count;             // write index, read index, items stored
    std::mutex mutex;                          // guards all members above
    std::condition_variable notFull, notEmpty; // signalled by take() / append()
};
const int N = 10; // buffer size (number of slots)
// The single buffer instance shared by the producer and consumer threads.
BoundedBuffer boundedBuffer(N);
/// Simulates producing an item: stores a pseudo-random capital letter in x,
/// then sleeps for 100 ms.
void produce(char &x) {
    const int letterOffset = rand() % 26; // 0..25
    x = 'A' + letterOffset;
    const std::chrono::milliseconds productionTime(100);
    std::this_thread::sleep_for(productionTime);
}
/// Simulates consuming an item by sleeping for 150 ms; the item itself is not inspected.
void consume(char x) {
    static_cast<void>(x); // the value is intentionally unused
    const std::chrono::milliseconds consumptionTime(150);
    std::this_thread::sleep_for(consumptionTime);
}
/// Producer thread body: produces 20 items and appends each to boundedBuffer.
void producer() {
    char item;
    int remaining = 20;
    while (remaining-- > 0) {
        produce(item);
        boundedBuffer.append(item);
    }
}
/// Consumer thread body: takes 15 items from boundedBuffer and consumes each.
void consumer() {
    char item;
    for (int taken = 0; taken != 15; ++taken) {
        boundedBuffer.take(item);
        consume(item);
    }
}
/// Runs the producer and consumer threads and waits for both to finish.
int main() {
    // Elements are constructed in order, so the producer thread starts first.
    std::thread workers[] = {std::thread(producer), std::thread(consumer)};
    for (std::thread& worker : workers) {
        worker.join();
    }
    return 0;
}
// 消息传递方法实现 (message-passing implementation)
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <mutex>
#include <chrono>
// Maximum number of messages the producer lets `buffer` hold before it waits.
const int capacity = 25/* buffering capacity */;
// Sequence number of the most recently produced message; incremented by produce().
int i;
// A message carries a single integer payload.
struct Message {
int value;
};
// Message queue shared by the producer and consumer threads.
std::queue<Message> buffer;
// Mutex the producer and consumer hold while touching `buffer`.
std::mutex bufferMutex;
// mayProduce: the producer waits on it for free space.
// mayConsume: the consumer waits on it for a message.
std::condition_variable mayProduce, mayConsume;
// Has static storage duration, so its value is zero-initialized. The consumer
// stops when it receives a message whose value equals nullMessage.value.
Message nullMessage;
/// Creates the next message: bumps the global counter i and uses the new value as payload.
Message produce() {
    Message fresh{++i};
    return fresh;
}
void producer() {
while (i < 20) { // 生产20个数据
Message pmsg;
{
std::unique_lock<std::mutex> lock(bufferMutex);
mayProduce.wait(lock, [] { return buffer.size() < capacity; });
pmsg = produce();
buffer.push(pmsg);
// 显示生产的数据
std::cout << "Produced: " << pmsg.value << std::endl;
// 显示缓冲区中的数据
std::cout << "Buffer content: ";
std::queue<Message> temp = buffer;
while (!temp.empty()) {
std::cout << temp.front().value << " ";
temp.pop();
}
std::cout << std::endl;
}
mayConsume.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 控制生产速度
}
}
/// Consumes a message by printing its payload.
void consume(const Message& msg) {
    const int payload = msg.value;
    std::cout << "Consumed: " << payload << std::endl;
}
void consumer() {
std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // 等待2秒,确保有足够的数据供消费
while (i < 15) { // 消费15个数据
Message cmsg;
{
std::unique_lock<std::mutex> lock(bufferMutex);
mayConsume.wait(lock, [] { return !buffer.empty(); });
cmsg = buffer.front();
buffer.pop();
}
if (cmsg.value == nullMessage.value) {
// Break the loop if null message is received
break;
}
consume(cmsg);
mayProduce.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(150)); // 控制消费速度
}
}
int main() {
i = 0;
std::thread producerThread(producer);
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // 等待1秒,确保生产者有足够的时间生产数据
std::thread consumerThread(consumer);
// Create initial null messages in the buffer
for (int i = 1; i <= capacity; i++) {
buffer.push(nullMessage);
}
// Start producer and consumer threads
producerThread.join();
consumerThread.join();
return 0;
}