CyclicBarrier example: a parallel sort algorithm (ctd)
On the previous page, we outlined a technique for performing
a multithreaded parallel sort. We end up with
a sort that takes place in three stages, where each stage has a parallel part and
a serial part. The three stages can be summarised as follows:
1. Find split points
   - Parallel step (executed by all participating threads): each thread chooses a random sample of (say) 16 values from its own portion of the list, each thread considering an equal-sized sublist (e.g. if there are 300 elements and 3 threads, thread 1 chooses samples from elements 0-99, thread 2 from elements 100-199, etc).
   - Serial step (executed by a single, arbitrary thread once all threads have completed the parallel step): amalgamate and sort the combined list of samples, then make a new list of "split points" containing the 16th, 32nd etc element from the sorted list.
2. Allocate into buckets
   - Parallel step: each thread goes through a sublist of (size / noThreads) elements and, given the split points, puts each element of that sublist into its correct bucket.
   - Serial step: essentially none, although in our example we perform a small "memory clearing" task at this point.
3. Sort buckets
   - Parallel step: each thread calls Collections.sort() on its bucket.
   - Serial step: amalgamate the sorted buckets.
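To make the bucket allocation in stage 2 a little more concrete: once the split points are sorted, deciding which bucket a value belongs in is essentially a binary search. Below is a minimal sketch of that step; bucketIndexFor() is a name invented purely for illustration (it isn't part of the skeleton that follows), and it assumes java.util.Collections and java.util.List are imported.

// Sketch only: picks the bucket for a given value using the sorted split points.
// Collections.binarySearch() returns either the index of an exact match, or
// (-(insertion point) - 1) when the value isn't present; in both cases we can
// recover the index of the first split point >= value, which is the bucket number.
static int bucketIndexFor(List<Integer> sortedSplitPoints, int value) {
    int pos = Collections.binarySearch(sortedSplitPoints, value);
    return (pos >= 0) ? pos : -(pos + 1);
}

Values greater than every split point end up with an index equal to the number of split points, i.e. they fall into the final bucket.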
Overall pattern
First, let's take a top-level view of how the above will be put together using a
CyclicBarrier. That will give you an idea of how CyclicBarrier works in case
your interest is really to adapt the pattern to another purpose. If you actually want to implement
a parallel sort, then stay tuned, because we'll look at a possible implementation in a moment.
We'll start with an outline of a worker thread. Each thread will essentially be identical,
and will have a structure roughly as follows:
class SorterThread extends Thread {
    ...
    public void run() {
        // Work out which portion of data our thread is working on
        double div = (double) data.size() / noThreads;
        int startPos = (int) (div * threadNo),
            endPos = (int) (div * (threadNo + 1));
        try {
            // Stage 1
            gatherSplitPointSample(data, startPos, endPos);
            barrier.await();
            // Stage 2
            assignItemsToBuckets(data, threadNo, startPos, endPos);
            barrier.await();
            // Stage 3
            sortMyBucket();
            barrier.await();
        } catch (InterruptedException e) {
            ...
        } catch (BrokenBarrierException e) {
            ...
        }
    }
}
Notice, then, that the essential pattern is that each thread executes its share of a given stage, then calls barrier.await() at the end of that stage (where of course barrier is an instance of CyclicBarrier). In other applications where there aren't discrete stages but rather some arbitrary number
of iterations, the thread would instead sit in a loop, performing a unit of work and then
calling await() on each iteration, as sketched below. But the principle is essentially the same.
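For completeness, here is a minimal sketch of that looping form. It is not part of the sort example: IterativeWorker and doWorkForIteration() are names invented for illustration, and the sketch assumes CyclicBarrier and BrokenBarrierException are imported from java.util.concurrent and that the barrier has been constructed elsewhere with the right number of parties.

class IterativeWorker extends Thread {
    private final CyclicBarrier barrier;
    private final int noIterations;

    IterativeWorker(CyclicBarrier barrier, int noIterations) {
        this.barrier = barrier;
        this.noIterations = noIterations;
    }

    public void run() {
        try {
            for (int i = 0; i < noIterations; i++) {
                doWorkForIteration(i);   // this thread's share of iteration i
                barrier.await();         // wait for the other participants
            }
        } catch (InterruptedException e) {
            // this thread was interrupted while working or waiting
        } catch (BrokenBarrierException e) {
            // another participant was interrupted or the barrier was reset
        }
    }

    private void doWorkForIteration(int i) {
        // ... application-specific work for iteration i ...
    }
}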
So much for the parallel parts. We then need a method that
handles the serial parts of the sort: that is, a method that will
be called by one of our threads each time all of them have got to
the point of calling await(). Logically, we want three separate methods,
one for each stage; but for reasons we'll see in a minute, we actually
need to wrap them in a single method that is called each time.
A simple variable recording the current stage number will suffice to dispatch to the right action:
private void sortStageComplete() {
    // stageNo starts at 0 and is incremented below, so case 0 runs at the end
    // of stage 1, case 1 at the end of stage 2, and case 2 at the end of stage 3
    switch (stageNo) {
        case 0:
            amalgamateSplitPointData();
            break;
        case 1:
            // not necessarily any action at this point
            break;
        case 2:
            combineBuckets();
            break;
        default:
            throw new RuntimeException("Don't expect to be called at stage " + stageNo);
    }
    stageNo++;
}
Putting things together with CyclicBarrier
Armed with our outline worker thread and our methods to amalgamate the data
from each stage (OK, bar actually implementing the above methods...), we now put
things together with a CyclicBarrier. Typically, we'll have some method
that is called to actually start the process off, and we might want that
method to return only once the sort is complete. So we'll actually make the caller
thread an extra participant in the barrier. The process will thus be as follows:
- we construct a CyclicBarrier, telling it that there'll
be (noThreads + 1) participants;
- to the constructor of the barrier, we also pass our sortStageComplete()
method (actually, a Runnable wrapper around it);
- then, we ourselves call await() (three times, in fact, for the
three stages).
The code looks something like the following:
private final int noThreads = Runtime.getRuntime().availableProcessors();
private final CyclicBarrier barrier =
        new CyclicBarrier(noThreads + 1, new Runnable() {
            public void run() {
                sortStageComplete();
            }
        });

for (int i = 0; i < noThreads; i++) {
    SorterThread thr = new SorterThread(i);
    thr.start();
}
try {
    barrier.await();
    barrier.await();
    barrier.await();
} catch (InterruptedException e) {
    // our thread was interrupted while waiting at the barrier
} catch (BrokenBarrierException bb) {
    // the barrier was broken: typically another participant
    // was interrupted or timed out
}
If the thread setting up the operation didn't specifically need to know about
completion of the task or interruptions thereof, then it of course need not
actually participate in the barrier. Any of the other regular means for communicating
between threads could also be used if they serve your purpose,
such as a CountDownLatch.
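For instance, a sketch of the CountDownLatch variant might look like the following. This is our own illustration rather than code from the example: sortCompleteLatch is an invented name, java.util.concurrent.CountDownLatch is assumed to be imported, the barrier would be created with only noThreads parties, and each SorterThread would call sortCompleteLatch.countDown() once its final await() returns.

private final CountDownLatch sortCompleteLatch = new CountDownLatch(noThreads);

// At the end of each SorterThread.run(), after the final barrier.await():
//     sortCompleteLatch.countDown();

// In the thread that kicked the sort off, instead of joining the barrier:
try {
    sortCompleteLatch.await();   // returns once every worker has counted down
} catch (InterruptedException e) {
    // we were interrupted while waiting for the sort to complete
}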
Next: error handling and implementing the parallel sort
That's essentially it for the high-level overview of CyclicBarrier.
An issue that you may wish to deal with is error
handling: notifying the controller thread of what exception caused the
barrier to be broken, rather than simply the fact that it was broken.
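One possible sketch of this, by way of illustration rather than the approach necessarily taken later: each worker records the exception it hit in a shared AtomicReference (from java.util.concurrent.atomic) before bailing out, and the controlling thread reads it back when it catches BrokenBarrierException. The field name sortError is invented for the example.

private final AtomicReference<Throwable> sortError = new AtomicReference<Throwable>();

// Inside each SorterThread, when an exception t is caught:
//     sortError.compareAndSet(null, t);        // remember the first failure only

// In the controlling thread:
//     catch (BrokenBarrierException bb) {
//         Throwable cause = sortError.get();   // null if broken for some other reason
//         ...
//     }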
Then, on the following pages, we consider how
to implement the parallel sort, which basically means filling in the methods
in the above skeleton code.