读者优先

impl in Cpp

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <random>
#include <condition_variable>
 
using namespace std;
 
const int BUFFER_SIZE = 5; // 缓冲区大小
vector<int> buffer(BUFFER_SIZE); // 缓冲区
int readcount = 0;
mutex mtx; // 用于互斥访问缓冲区
mutex readMutex; // 用于保护读者数量
condition_variable cv; // 用于条件变量
bool writerActive = false; // 写者是否在写
 
void reader() {
    while (true) {
        unique_lock<mutex> lock(readMutex);
        while (writerActive) {
            cv.wait(lock);
        }
        readcount++;
 
        if (readcount == 1) {
            mtx.lock(); // 第一个读者需要锁定缓冲区
        }
 
        lock.unlock();
 
        // 读取数据
        int randomIndex = rand() % BUFFER_SIZE;
        int data = buffer[randomIndex];
        cout << "Reader is reading data " << data << " from buffer." << endl;
 
        lock.lock();
        readcount--;
 
        if (readcount == 0) {
            mtx.unlock(); // 最后一个读者释放缓冲区
        }
 
        cv.notify_all(); // 唤醒其他等待的线程
        lock.unlock();
 
        // 模拟读操作
        this_thread::sleep_for(chrono::milliseconds(100)); 
    }
}
 
void writer() {
    int data = 1;
    while (true) {
        unique_lock<mutex> lock(readMutex);
        writerActive = true;
        while (readcount > 0) {
            cv.wait(lock);
        }
 
        // 写入数据到缓冲区
        int emptyIndex = -1;
        for (int i = 0; i < BUFFER_SIZE; ++i) {
            if (buffer[i] == 0) {
                emptyIndex = i;
                buffer[i] = data;
                break;
            }
        }
 
        if (emptyIndex != -1) {
            cout << "Writer is writing data " << data << " to buffer." << endl;
            data++;
        }
 
        writerActive = false;
        cv.notify_all(); // 唤醒其他等待的线程
    }
}
 
int main() {
    thread readerThread(reader);
    thread writerThread(writer);
 
    readerThread.join();
    writerThread.join();
 
    return 0;
}
 
 

impl in C and semaphore

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
 
/*
This program provides a possible solution for first readers writers problem using mutex and semaphore.
I have used 10 readers and 5 producers to demonstrate the solution. You can always play with these values.
*/
 
sem_t wrt;
pthread_mutex_t mutex;
int cnt = 1;
int numreader = 0;
 
void *writer(void *wno)
{   
    sem_wait(&wrt);
    cnt = cnt*2;
    printf("Writer %d modified cnt to %d\n",(*((int *)wno)),cnt);
    sem_post(&wrt);
 
}
void *reader(void *rno)
{   
    // Reader acquire the lock before modifying numreader
    pthread_mutex_lock(&mutex);
    numreader++;
    if(numreader == 1) {
        sem_wait(&wrt); // If this id the first reader, then it will block the writer
    }
    pthread_mutex_unlock(&mutex);
    // Reading Section
    printf("Reader %d: read cnt as %d\n",*((int *)rno),cnt);
 
    // Reader acquire the lock before modifying numreader
    pthread_mutex_lock(&mutex);
    numreader--;
    if(numreader == 0) {
        sem_post(&wrt); // If this is the last reader, it will wake up the writer.
    }
    pthread_mutex_unlock(&mutex);
}
 
int main()
{   
 
    pthread_t read[10],write[5];
    pthread_mutex_init(&mutex, NULL);
    sem_init(&wrt,0,1);
 
    int a[10] = {1,2,3,4,5,6,7,8,9,10}; //Just used for numbering the producer and consumer
 
    for(int i = 0; i < 10; i++) {
        pthread_create(&read[i], NULL, (void *)reader, (void *)&a[i]);
    }
    for(int i = 0; i < 5; i++) {
        pthread_create(&write[i], NULL, (void *)writer, (void *)&a[i]);
    }
 
    for(int i = 0; i < 10; i++) {
        pthread_join(read[i], NULL);
    }
    for(int i = 0; i < 5; i++) {
        pthread_join(write[i], NULL);
    }
 
    pthread_mutex_destroy(&mutex);
    sem_destroy(&wrt);
 
    return 0;
    
}

Compilinggcc reader-writer.c -pthread

Running: ./a.out

写者优先

信号量实现

#include <iostream>
#include <thread>
#include <mutex>
#include <semaphore>
 
using namespace std;
 
int readcount = 0, writecount = 0;
sem_t x, y, z, wsem, rsem;
 
void reader() {
    while (true) {
        sem_wait(&z);
        sem_wait(&rsem);
        sem_wait(&x);
        readcount++;
        if (readcount == 1)
            sem_wait(&wsem);
        sem_post(&x);
        sem_post(&rsem);
        sem_post(&z);
 
        // Reading is performed
        cout << "Reader is reading..." << endl;
        this_thread::sleep_for(chrono::milliseconds(100));  // Simulate reading
 
        sem_wait(&x);
        readcount--;
        if (readcount == 0)
            sem_post(&wsem);
        sem_post(&x);
    }
}
 
void writer() {
    while (true) {
        sem_wait(&y);
        writecount++;
        if (writecount == 1)
            sem_wait(&rsem);
        sem_post(&y);
 
        sem_wait(&wsem);
        
        // Writing is performed
        cout << "Writer is writing..." << endl;
        this_thread::sleep_for(chrono::milliseconds(200));  // Simulate writing
 
        sem_post(&wsem);
 
        sem_wait(&y);
        writecount--;
        if (writecount == 0)
            sem_post(&rsem);
        sem_post(&y);
    }
}
 
int main() {
    sem_init(&x, 0, 1);
    sem_init(&y, 0, 1);
    sem_init(&z, 0, 1);
    sem_init(&wsem, 0, 1);
    sem_init(&rsem, 0, 1);
 
    thread readerThread(reader);
    thread writerThread(writer);
 
    readerThread.join();
    writerThread.join();
 
    sem_destroy(&x);
    sem_destroy(&y);
    sem_destroy(&z);
    sem_destroy(&wsem);
    sem_destroy(&rsem);
 
    return 0;
}
 
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <random>
#include <condition_variable>
 
using namespace std;
 
const int BUFFER_SIZE = 10; // 缓冲区大小
vector<char> buffer(BUFFER_SIZE); // 缓冲区
int readcount = 0;
mutex mtx; // 用于互斥访问缓冲区
mutex readMutex; // 用于保护读者数量
condition_variable cv; // 用于条件变量
bool writerActive = false; // 写者是否在写
 
char nextChar = '1'; // 下一个写入的字符
 
void reader() {
    while (true) {
        unique_lock<mutex> lock(readMutex);
        while (writerActive) {
            cv.wait(lock);
        }
        readcount++;
 
        if (readcount == 1) {
            mtx.lock(); // 第一个读者需要锁定缓冲区
        }
 
        lock.unlock();
 
        // 读取数据
        int randomIndex = rand() % BUFFER_SIZE;
        char data = buffer[randomIndex];
        cout << "Reader is reading data " << data << " from buffer." << endl;
 
        lock.lock();
        readcount--;
 
        if (readcount == 0) {
            mtx.unlock(); // 最后一个读者释放缓冲区
        }
 
        cv.notify_all(); // 唤醒其他等待的线程
        lock.unlock();
 
        // 模拟读操作
        this_thread::sleep_for(chrono::milliseconds(100)); 
    }
}
 
void writer() {
    while (true) {
        unique_lock<mutex> lock(readMutex);
        writerActive = true;
        while (readcount > 0) {
            cv.wait(lock);
        }
 
        // 写入数据到缓冲区
        for (int i = 0; i < BUFFER_SIZE; ++i) {
            buffer[i] = nextChar;
            if (nextChar == '9') {
                nextChar = 'A';
            } else if (nextChar == 'Z') {
                nextChar = '1';
            } else {
                nextChar++;
            }
        }
 
        cout << "Writer is writing data to buffer." << endl;
 
        writerActive = false;
        cv.notify_all(); // 唤醒其他等待的线程
    }
}
 
int main() {
    thread readerThread(reader);
    thread writerThread(writer);
 
    readerThread.join();
    writerThread.join();
 
    return 0;
}
 

消息传递实现

#include <iostream>
#include <thread>
#include <queue>
#include <condition_variable>
 
using namespace std;
 
// Message types
enum MessageType {
    ReadRequest,
    WriteRequest,
    Finished
};
 
struct Message {
    MessageType type;
    int id;
};
 
int count = 0;
int writer_id = -1;
 
queue<Message> readRequests;
queue<Message> writeRequests;
queue<Message> finishedMessages;
mutex mtx;
condition_variable cv;
 
void reader(int i) {
    while (true) {
        Message rmsg;
        rmsg.type = ReadRequest;
        rmsg.id = i;
        {
            unique_lock<mutex> lock(mtx);
            readRequests.push(rmsg);
            cv.notify_all();  // Notify the controller
            cv.wait(lock, [&]() { return rmsg.id == -1; });  // Wait for finished message
        }
        
        // Reading is performed
        cout << "Reader " << i << " is reading..." << endl;
        this_thread::sleep_for(chrono::milliseconds(100));  // Simulate reading
        
        rmsg.type = Finished;
        rmsg.id = i;
        {
            unique_lock<mutex> lock(mtx);
            finishedMessages.push(rmsg);
            cv.notify_all();  // Notify the controller
        }
    }
}
 
void writer(int j) {
    while (true) {
        Message rmsg;
        rmsg.type = WriteRequest;
        rmsg.id = j;
        {
            unique_lock<mutex> lock(mtx);
            writeRequests.push(rmsg);
            cv.notify_all();  // Notify the controller
            cv.wait(lock, [&]() { return rmsg.id == -1; });  // Wait for finished message
        }
        
        // Writing is performed
        cout << "Writer " << j << " is writing..." << endl;
        this_thread::sleep_for(chrono::milliseconds(200));  // Simulate writing
        
        rmsg.type = Finished;
        rmsg.id = j;
        {
            unique_lock<mutex> lock(mtx);
            finishedMessages.push(rmsg);
            cv.notify_all();  // Notify the controller
        }
    }
}
 
void controller() {
    while (true) {
        unique_lock<mutex> lock(mtx);
        
        if (count > 0) {
            if (!finishedMessages.empty()) {
                Message msg = finishedMessages.front();
                finishedMessages.pop();
                count++;
                lock.unlock();
                cv.notify_all();  // Notify waiting threads
            } else if (!writeRequests.empty()) {
                Message msg = writeRequests.front();
                writeRequests.pop();
                writer_id = msg.id;
                count = count - 100;
            } else if (!readRequests.empty()) {
                Message msg = readRequests.front();
                readRequests.pop();
                count--;
                lock.unlock();
                cv.notify_all();  // Notify waiting threads
                // Simulate sending "OK" back to the reader
                cout << "Sending OK to reader " << msg.id << endl;
                this_thread::sleep_for(chrono::milliseconds(10));
                lock.lock();
            }
        }
        
        if (count == 0) {
            if (writer_id != -1) {
                cout << "Sending OK to writer " << writer_id << endl;
                writer_id = -1;
            }
            if (!finishedMessages.empty()) {
                Message msg = finishedMessages.front();
                finishedMessages.pop();
                count++;
                lock.unlock();
                cv.notify_all();  // Notify waiting threads
            } else {
                lock.unlock();
            }
        }
        
        while (count < 0) {
            Message msg = finishedMessages.front();
            finishedMessages.pop();
            count++;
            lock.unlock();
            cv.notify_all();  // Notify waiting threads
            lock.lock();
        }
 
        cv.wait(lock);  // Wait for signals
    }
}
 
int main() {
    thread controllerThread(controller);
    thread readerThreads[5];
    thread writerThreads[3];
 
    for (int i = 0; i < 5; ++i) {
        readerThreads[i] = thread(reader, i);
    }
 
    for (int j = 0; j < 3; ++j) {
        writerThreads[j] = thread(writer, j);
    }
 
    for (int i = 0; i < 5; ++i) {
        readerThreads[i].join();
    }
 
    for (int j = 0; j < 3; ++j) {
        writerThreads[j].join();
    }
 
    controllerThread.join();
 
    return 0;
}