目录

一、生产者消费者模型的概念

二、生产者消费者模型的特点

三、生产者消费者模型优点

四、基于BlockingQueue的生产者消费者模型

4.1 基本认识

4.2 模拟实现


一、生产者消费者模型的概念

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通讯,而通过容器来通讯,所以生产者生产完数据之后不用等待消费者处理,直接将生产的数据放到这个容器当中;消费者也不用找生产者索要数据,而是直接从这个容器中取数据。容器就类似于一个缓冲区,平衡了生产者和消费者的处理能力,这个容器完成了生产者和消费者之间的解耦

二、生产者消费者模型的特点
  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)
  • 两种角色: 生产者和消费者(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区(可以自己通过某种方式组织)

生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?

介于生产者和消费者之间的容器可能会被多个执行流同时访问,因此需要将该临界资源用互斥锁保护起来。所以所有生产者和消费者都会竞争式的申请锁,因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系

生产者和消费者之间为什么会存在同步关系?

若一直让生产者生产,那么当生产者生产的数据装满容器后,生产者再生产数据就会生产失败。
反之,让消费者一直消费,那么当容器当中的数据被消费完后,消费者再进行消费就会消费失败。
虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。

注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来

三、生产者消费者模型优点
  • 解耦
  • 支持并发,提高效率
  • 支持忙闲不均

若在主函数中调用某一函数,那么必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合

四、基于BlockingQueue的生产者消费者模型

4.1 基本认识

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者消费者模型的数据结构

其与普通的队列的区别在于:

  • 当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素
  • 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出

阻塞队列最经典的应用场景:管道

4.2 模拟实现

下面以单生产者、单消费者为例进行讲解与实现

#include <iostream>
#include <queue>
#include <pthread.h>

template <class T>
class BlockQueue
{
public:
    BlockQueue(size_t capacity = 4) : _capacity(capacity)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_full,nullptr);
        pthread_cond_init(&_empty,nullptr);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_full);
        pthread_cond_destroy(&_empty);
    }

    void push(const T& data)
    {
        pthread_mutex_lock(&_mutex);
		while (IsFull()) {//不能进行生产,直到阻塞队列可以容纳新的数据
			pthread_cond_wait(&_full, &_mutex);
		}
		_queue.push(data);
        std::cout << "Producer: " << data << std::endl;
		pthread_mutex_unlock(&_mutex);
		pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程
    }
    void pop(T& data)
    {
        pthread_mutex_lock(&_mutex);
		while (IsEmpty()) {//不能进行消费,直到阻塞队列有新的数据
			pthread_cond_wait(&_empty, &_mutex);
		}
		data = _queue.front();
		_queue.pop();
        std::cout << "Consumer: " << data << std::endl;
		pthread_mutex_unlock(&_mutex);
		pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程
    }
    
private:
    bool IsFull() { return _queue.size() == _capacity; }
    bool IsEmpty() { return _queue.empty(); }

private:
    std::queue<T> _queue;
    size_t _capacity;
    pthread_mutex_t _mutex;
    pthread_cond_t _full;
    pthread_cond_t _empty;
};

判断是否满足生产消费条件时不能用if,而应该用while:

pthread_cond_wait函数有可能调用失败,调用失败后该执行流就会继续往后执行。为了避免出现上述情况,就要让线程被唤醒后再次进行判断,确认是否真的满足生产消费条件,因此这里必须要用while进行判断

生产者消费者步调一致

#include <unistd.h>
#include "BlockQueue.hpp"

void* Producer(void* arg)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	
	while (true) { //生产者不断进行生产
		sleep(1);
		int data = rand() % 100 + 1;
		bq->push(data);
	}
}
void* Consumer(void* arg)
{
    int data = 0;
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	while (true) { //消费者不断进行消费
		sleep(1);
		bq->pop(data);
	}
}

int main() 
{
    pthread_t producer,consumer;
    BlockQueue<int>* bq = new BlockQueue<int>;

    pthread_create(&producer,nullptr,Producer,(void*)bq);
    pthread_create(&consumer,nullptr,Consumer,(void*)bq);

    pthread_join(producer,nullptr);
    pthread_join(consumer,nullptr);
    delete bq;

    return 0;
}

由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调是一致的

生产者速度快,消费者速度慢

void* Producer(void* arg)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	
	while (true) { //生产者不断进行生产
		int data = rand() % 100 + 1;
		bq->push(data);
	}
}
void* Consumer(void* arg)
{
    int data = 0;
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	while (true) { //消费者不断进行消费
		sleep(1);
		bq->pop(data);
	}
}

此时由于生产者生产的很快,运行代码后一瞬间生产者就将阻塞队列装满。此时生产者想要再进行生产就只能在full条件变量下进行等待,直到消费者消费完一个数据后,生产者才会被唤醒进而继续进行生产,生产者生产完一个数据后又会进行等待,因此后续生产者和消费者的步调又变成一致的了

生产者速度慢,消费者速度快

void* Producer(void* arg)
{
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	
	while (true) { //生产者不断进行生产
        sleep(1);
		int data = rand() % 100 + 1;
		bq->push(data);
	}
}
void* Consumer(void* arg)
{
    int data = 0;
    BlockQueue<int>* bq = (BlockQueue<int>*)arg;
	while (true) { //消费者不断进行消费
		bq->pop(data);
	}
}

虽然消费者消费的快,但开始时阻塞队列中是没有数据的,因此消费者只能在empty条件变量下等待,直到生产者生产完一个数据后,消费者才会被唤醒进而进行消费,消费者消费完这一个数据后又会进行等待,因此生产者和消费者的步调就是一致的

设置唤醒策略

可以设置一些策略。譬如,下面当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产

void push(const T &data)
{
    pthread_mutex_lock(&_mutex);
    while (IsFull()) { // 不能进行生产,直到阻塞队列可以容纳新的数据
        pthread_cond_wait(&_full, &_mutex);
    }
    _queue.push(data);
    std::cout << "Producer: " << data << std::endl;
    if (_queue.size() >= _capacity / 2) {
        pthread_cond_signal(&_empty); // 唤醒在empty条件变量下等待的消费者线程
    }
    pthread_mutex_unlock(&_mutex);
}
void pop(T &data)
{
    pthread_mutex_lock(&_mutex);
    while (IsEmpty()) { // 不能进行消费,直到阻塞队列有新的数据
        pthread_cond_wait(&_empty, &_mutex);
    }
    data = _queue.front();
    _queue.pop();
    std::cout << "Consumer: " << data << std::endl;
    if (_queue.size() <= _capacity / 2) {
        pthread_cond_signal(&_full); // 唤醒在full条件变量下等待的生产者线程
    }
    pthread_mutex_unlock(&_mutex);
}

仍然让生产者生产快,消费者消费慢。运行代码后生产者还是一瞬间将阻塞队列装满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于等于队列容器的一半时,才会唤醒生产者线程进行生产

基于任务的生产者消费者模型

实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,前面的代码只是为了理解生产者消费者模型而已。
编写BlockingQueue时当中存储的数据就进行了模板化,那么就可以让BlockingQueue当中存储其他类型的数据。

譬如编写一个Task类(其中包含需要执行的任务),BlockingQueue中就存储Task对象。此时生产者放入阻塞队列的数据就是Task对象,而消费者从阻塞队列拿到Task对象后,就可以用该对象调用Run成员函数进行数据处理。

总之,根据需要进行编写即可