2001-04-11 The Java Specialists' Newsletter [Issue 016] - Blocking Queue
2001-04-11 The Java Specialists' Newsletter [Issue 016] - Blocking Queue
Author: Dr. Heinz M. KabutzJDK version: Category:
Performance
You can subscribe from our home page:
http://www.javaspecialists.co.za (which also hosts all previous issues,
available free of charge
Welcome to the 16th issue of "The Java(tm) Specialists' Newsletter", written in
a dreary-weathered-Germany. Since I'm a summer person, I really like living in
South Africa where we have 9 months of summer and 3 months of sort-of-winter.
It's quite difficult to explain to my 2-year old son the concepts of snow,
snow-man, snow-ball, etc. Probably as difficult as explaining to a German child
the concepts of cloudless-sky, beach, BSE-free meat, etc.
Next week I will again not be able to post the newsletter due to international
travelling (Mauritius), but the week after that I will demonstrate how it is
possible to write type-safe enum types in Java using inner classes and how it is
possible to "switch" on their object references. Switch statements should never
be used, but it is nevertheless fascinating to watch how the Java language
constructs can be abused...
Blocking Queues for inter-thread communication
This week I want to speak about a very useful construct that we use for
inter-thread communication, called a blocking queue. Quite often in threaded
applications we have a producer-consumer situation where some threads want to
pop jobs onto a queue, and some other worker threads want to remove jobs from
the queue and then execute them. It is quite useful in such circumstances to
write a queue which blocks on pop when there is nothing on the queue. Otherwise
the consumers would have to poll, and polling is not very good because it wastes
CPU cycles.
I have written a very simple version of the BlockingQueue, a more advanced
version would include alarms that are generated when the queue reaches a certain
length.
---
Warning Advanced:
When I write pieces of code which are synchronized, I usually avoid
synchronizing on "this" or marking the whole method as synchronized. When you
synchronize on "this" inside the class, it might happen that other code outside
of your control also synchronize on the handle to your object, or worse, call
notify on your handle. This would severely mess up your well-written
BlockingQueue code. I therefore as a habit always use private data members as
locks inside a class, in this case I use the private queue data member.
Another disadvantage of indiscriminately synchronizing on "this" is that it is
very easy to then lock out parts of your class which do not necessarily have to
be locked out from each other. For example, I might have a list of listeners in
my BlockingQueue which are notified when the list gets too long. Adding and
removing such listeners from the BlockingQueue should be synchronized, but you
do not have to synchronize in respect of the push and pop operations, otherwise
you limit concurrency.
--- Code BlockingQueue.java below:
//: BlockingQueue.java
import java.util.*;
public class BlockingQueue {
/**
It makes logical sense to use a linked list for a FIFO queue,
although an ArrayList is usually more efficient for a short
queue (on most VMs).
*/
private final LinkedList queue = new LinkedList();
/**
This method pushes an object onto the end of the queue, and
then notifies one of the waiting threads.
*/
public void push(Object o) {
synchronized(queue) {
queue.add(o);
queue.notify();
}
}
/**
The pop operation blocks until either an object is returned
or the thread is interrupted, in which case it throws an
InterruptedException.
*/
public Object pop() throws InterruptedException {
synchronized(queue) {
while (queue.isEmpty()) {
queue.wait();
}
return queue.removeFirst();
}
}
/** Return the number of elements currently in the queue. */
public int size() {
return queue.size();
}
}
Now we've got a nice little test case that uses the blocking queue for 10 worker
threads which will each pull as many tasks as possible from the queue. To end
the test, we put one poison pill onto the queue for each of the worker threads,
which, when executed, interrupts the current thread (evil laughter).
//: BlockingQueueTest.java
public class BlockingQueueTest {
private final BlockingQueue bq = new BlockingQueue();
/**
The Worker thread is not very robust. If a RuntimeException
occurse in the run method, the thread will stop.
*/
private class Worker extends Thread {
public Worker(String name) { super(name); start(); }
public void run() {
try {
while(!isInterrupted()) {
((Runnable)bq.pop()).run();
}
} catch(InterruptedException ex) {}
System.out.println(getName() + " finished");
}
}
public BlockingQueueTest() {
// We create 10 threads as workers
Thread[] workers = new Thread[10];
for (int i=0; inew Worker("Worker Thread " + i);
// We then push 100 commands onto the queue
for (int i=0; i<100; i++) {
final String msg = "Task " + i + " completed";
bq.push(new Runnable() {
public void run() {
System.out.println(msg);
// Sleep a random amount of time, up to 1 second
try { Thread.sleep((long)(Math.random()*1000)); }
catch(InterruptedException ex) { }
}
});
}
// We then push one "poison pill" onto the queue for each
// worker thread, which will only be processed once the other
// tasks are completed.
for (int i=0; ipublic void run() {
Thread.currentThread().interrupt();
}
});
}
// Lastly we join ourself to each of the Worker threads, so
// that we only continue once all the worker threads are
// finished.
for (int i=0; itry {
workers[i].join();
} catch(InterruptedException ex) {}
}
System.out.println("BlockingQueueTest finished");
}
public static void main(String[] args) throws Exception{
new BlockingQueueTest();
}
}
The concepts in the newsletter can be expanded quite a bit. They could, for
example, be used as a basis for implementing a ThreadPool, or otherwise you can
implement an "ActiveQueue" which performs callbacks to listeners each time an
event is pushed onto the queue via a Thread running inside the ActiveQueue.
It is also possible to use PipedInputStream and PipedOutputStream to send
messages between threads, but then you have to set up a whole protocol, and if
you want to exchange objects you have to use ObjectOutputStream which will be
alot slower than just passing handles.
Until next week, and please remember to forward this newsletter in its entirety
to as many Java users as you know.
Heinz
Copyright 2000-2004 Maximum Solutions, South Africa
Reprint Rights. Copyright subsists in all the material included in this email,
but you may freely share the entire email with anyone you feel may be
interested, and you may reprint excerpts both online and offline provided that
you acknowledge the source as follows: This material from The Java(tm)
Specialists' Newsletter by Maximum Solutions (South Africa). Please contact
Maximum Solutions
for more information.
Java and Sun are trademarks or registered trademarks of Sun Microsystems, Inc.
in the United States and other countries. Maximum Solutions is independent of
Sun Microsystems, Inc.
7470 bytes more | 14 comments | | Score: 0
|