Producer-Consumer Problem: How to use BlockingQueue to achieve Concurrency? [Part-2]

Shubhmeet Kaur
4 min read · Jul 5, 2020

This post continues the previous post on using Java’s java.util.concurrent.* library to achieve concurrency and solve real-world multi-threading problems. One such problem is the producer-consumer problem.

Producer-Consumer Problem

The producer-consumer problem, also known as the bounded-buffer problem, is a classic example of multi-process synchronization. It involves two parties, the producer and the consumer, which run as independent threads and share a common, fixed-size buffer (queue). The producer’s role is to produce data and add it to the queue, whereas the consumer removes and processes the data that the producer added. Two constraints apply: the producer must not add data once the queue is full, and the consumer must not try to remove data from an empty queue.

One solution is the classic wait/notify approach commonly used in multi-threaded code. The same constraints can also be enforced with semaphores or with a BlockingQueue. This post illustrates the latter. Two variations of the problem are discussed below:

  1. Single Producer and Single Consumer
  2. Multiple Producers and Single Consumer
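
For comparison, the classic wait/notify approach mentioned above might look roughly like the following. This is a minimal sketch and not code from this series: the class name WaitNotifyBuffer, its size() helper, and the capacity of 2 are assumptions chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical hand-written bounded buffer using wait/notifyAll.
public class WaitNotifyBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    public WaitNotifyBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Waits while the buffer is full, then adds the item.
    public synchronized void put(int item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();
        }
        items.add(item);
        notifyAll(); // wake any consumer waiting for data
    }

    // Waits while the buffer is empty, then removes the oldest item.
    public synchronized int take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        int item = items.remove();
        notifyAll(); // wake any producer waiting for space
        return item;
    }

    public synchronized int size() {
        return items.size();
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotifyBuffer buffer = new WaitNotifyBuffer(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += buffer.take();
        }
        producer.join();
        System.out.println("sum=" + sum + " size=" + buffer.size()); // sum=10 size=0
    }
}
```

The locking, the wait loops, and the notifications are the kind of bookkeeping that BlockingQueue handles for you, which is the motivation for the solution that follows.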

Solution Using BlockingQueue:

The BlockingQueue interface in Java provides two key methods, put() and take() (discussed in detail in Part 1). put() adds an element to the queue, and take() retrieves and removes the element at the head of the queue. Both calls are blocking: put() waits while the queue is at capacity, and take() waits while the queue is empty. The constraints of the producer-consumer problem are therefore met without implementing any external thread synchronization.
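
The blocking behaviour can be observed in a single thread with a small sketch. Because a real put() on a full queue or take() on an empty queue would wait indefinitely here, the sketch uses the timed variants offer() and poll() from the same interface, which give up after a timeout instead. The class name BlockingSemanticsDemo, the capacity of 1, and the 100 ms timeout are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingSemanticsDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity 1 is an arbitrary choice that makes the "full" case easy to reach.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);

        queue.put(1); // returns immediately: the queue had room

        // The queue is now full. put(2) would wait here until a consumer calls
        // take(); the timed offer() returns false after 100 ms instead.
        boolean added = queue.offer(2, 100, TimeUnit.MILLISECONDS);
        System.out.println("second element added: " + added); // false

        System.out.println("taken: " + queue.take()); // 1

        // The queue is now empty. take() would wait for a producer;
        // the timed poll() returns null after 100 ms instead.
        Integer next = queue.poll(100, TimeUnit.MILLISECONDS);
        System.out.println("next: " + next); // null
    }
}
```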

Benefits

  1. The code is simpler, more readable, and easier to maintain
  2. It is less error-prone, since there is no need to implement your own synchronization mechanism

Basic Java Code Snippets

  1. Single Producer and Single Consumer: Code snippet for the Producer class, which produces data and adds it to the queue.

package com.java;

import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {

    private final BlockingQueue<Integer> blockingQueue;

    public Producer(BlockingQueue<Integer> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i <= 50; i += 10) {
            try {
                System.out.println(String.format("Producer produced data %s and added to queue.", i));
                // put() waits here while the queue is full
                blockingQueue.put(i);
            } catch (InterruptedException ex) {
                System.out.println("Producer thread is interrupted.");
                // Restore the interrupt flag and stop producing
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

Code snippet for the Consumer class, which removes data from the queue and processes it. It waits when the queue is empty.

package com.java;

import java.util.concurrent.BlockingQueue;

class Consumer implements Runnable {

    private final BlockingQueue<Integer> blockingQueue;

    public Consumer(BlockingQueue<Integer> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // take() waits here while the queue is empty
                System.out.println(String.format("Consumer consumed data %s from queue.", blockingQueue.take()));
            } catch (InterruptedException ex) {
                System.out.println("Consumer thread is interrupted.");
                // Restore the interrupt flag and leave the loop instead of spinning
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

Code snippet to test the workflow:

package com.java;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ProducerConsumerExample {

    public static void main(String[] args) {
        // Shared queue created using LinkedBlockingQueue. The capacity of 10 is an
        // illustrative choice; the no-argument constructor would create a queue
        // with capacity Integer.MAX_VALUE, so put() would practically never block.
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);
        // Producer thread creation
        Thread producer = new Thread(new Producer(blockingQueue));
        // Consumer thread creation
        Thread consumer = new Thread(new Consumer(blockingQueue));

        // Start Producer and Consumer threads
        producer.start();
        consumer.start();
    }
}

Sample Result Snippet (the exact interleaving of lines can vary between runs)

Producer produced data 0 and added to queue.
Consumer consumed data 0 from queue.
Producer produced data 10 and added to queue.
Consumer consumed data 10 from queue.
Producer produced data 20 and added to queue.
Consumer consumed data 20 from queue.
....
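
The Consumer above loops forever, so the program keeps running after the producer has finished. One common way to end the consumer cleanly is a "poison pill": a special value the producer puts on the queue last to signal that no more data is coming. This is a different technique from the code above, sketched here under the assumption that real data is never negative; the class name PoisonPillExample, the POISON_PILL value of -1, and the queue capacity of 2 are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillExample {
    // Hypothetical sentinel: assumes real data is never negative.
    static final int POISON_PILL = -1;

    static List<Integer> run() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i <= 50; i += 10) {
                    queue.put(i);
                }
                queue.put(POISON_PILL); // signal "no more data"
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int value = queue.take();
                    if (value == POISON_PILL) {
                        return; // stop consuming
                    }
                    consumed.add(value);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), the consumer's writes are visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Consumed: " + run()); // Consumed: [0, 10, 20, 30, 40, 50]
    }
}
```

With several consumers, each one would need its own pill, so this sketch fits the single-consumer case best.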

2. Multiple Producers and Single Consumer using java.util.concurrent.*

In this case, multiple producer threads add data to the queue and a single consumer thread consumes it. The code uses Java’s built-in ExecutorService implementation (ThreadPoolExecutor) with a LinkedBlockingDeque as its work queue. ExecutorService is a construct that lets you hand off a task to be executed asynchronously by a thread. The producer threads create tasks and submit them to the shared queue of the ExecutorService, and the consumer thread (the executor’s single worker) takes tasks from that queue and runs them.
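
The hand-off can be seen in isolation with a single-threaded executor from the standard library. This is a minimal sketch rather than the program from this post: the class name SingleConsumerExecutorSketch, the three tasks, and the 5-second wait are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleConsumerExecutorSketch {

    static List<String> run() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // One worker thread plays the role of the single consumer.
        ExecutorService executor = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 3; i++) {
            final int value = i;
            // submit() queues the task and returns without waiting for it to run.
            executor.submit(() -> {
                log.add("consumed " + value);
            });
        }

        executor.shutdown(); // accept no new tasks; queued tasks still run
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // [consumed 0, consumed 1, consumed 2]
    }
}
```

The full program below applies the same idea with multiple producer threads and a bounded task queue.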

package com.java;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class ProducerConsumerProblem {
    private static volatile ExecutorService executorService;

    public static void main(String[] args) throws Exception {
        // One worker thread acts as the single consumer; the task queue holds up to 1000 tasks
        executorService = getExecutorService(1, 1000);
        List<Thread> producers = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            Thread producer = new Thread(new Producer(i));
            producers.add(producer);
            producer.start();
            Thread.sleep(1000);
        }
        // Wait until every producer has submitted its task, then let the
        // executor finish the queued tasks and stop so the JVM can exit
        for (Thread producer : producers) {
            producer.join();
        }
        executorService.shutdown();
    }

    public static ExecutorService getExecutorService(int poolsize, int queueSize) {
        // ThreadFactoryBuilder comes from Guava and is used only to name the worker thread
        ThreadFactoryBuilder builder = new ThreadFactoryBuilder().setNameFormat("ConsumerThread");
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(poolsize, poolsize, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(queueSize));
        threadPoolExecutor.setThreadFactory(builder.build());
        return threadPoolExecutor;
    }

    // The unit of work that the consumer thread runs
    static class Task implements Runnable {
        private final int value;

        Task(int value) {
            this.value = value;
        }

        @Override
        public void run() {
            System.out.println(String.format("Consumer consuming value %s", value));
        }
    }

    // Each producer submits one task to the shared executor queue
    static class Producer implements Runnable {
        private final int value;

        public Producer(int value) {
            this.value = value;
        }

        @Override
        public void run() {
            System.out.println(String.format("Producer produced value %s", value));
            executorService.submit(new Task(value));
        }
    }
}

Sample Result Snippet

Producer produced value 0
Consumer consuming value 0
Producer produced value 1
Consumer consuming value 1
Producer produced value 2
Consumer consuming value 2
....
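
The same multiple-producer, single-consumer shape can also be sketched with a BlockingQueue directly, without an executor or Guava. This is an alternative to the program above, not a rewrite of it: the class name MultiProducerQueueSketch, the capacity of 4, and the choice of 3 producers with 5 items each are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MultiProducerQueueSketch {

    // Starts the given number of producers and consumes everything on the calling thread.
    static int run(int producers, int itemsPerProducer) throws InterruptedException {
        // Small capacity so producers regularly wait in put() for free space.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);

        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                try {
                    for (int i = 1; i <= itemsPerProducer; i++) {
                        queue.put(i);
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[p].start();
        }

        // Single consumer: take exactly as many items as will be produced.
        int total = 0;
        for (int n = 0; n < producers * itemsPerProducer; n++) {
            total += queue.take();
        }
        for (Thread t : threads) {
            t.join();
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        // Each producer contributes 1 + 2 + 3 + 4 + 5 = 15.
        System.out.println("total=" + run(3, 5)); // total=45
    }
}
```

Because the consumer knows how many items to expect, it can stop on its own; when that count is not known in advance, some explicit end-of-data signal is needed.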

Feel free to reach out to me with any questions/feedback.

Thank you!

Shubhmeet
