CyclicBarrier example: a parallel sort algorithm (ctd)
(Continued from our explanation of implementing
a parallel sort with a CyclicBarrier.)
The second stage of sorting is for each thread to go through a subset of the
data and put each item from that subset into its relevant "bucket". Things are pretty
much what they say on the tin. Two points in the implementation below are worth
noting. First, each thread builds up its own set of thread-local buckets, and only
at the end adds them to the "master" buckets. That's just designed to reduce
contention on the master buckets during the main part of the operation: each thread
synchronizes on each master bucket once, rather than once per item.
The second point worthy of note is the use of Collections.binarySearch().
It is commonly used to find where a new item belongs in an already-ordered list,
so that the list stays in order once the item is inserted. In this case, we use it
on the ordered list of split values (without ever inserting anything), and it
in effect tells us the index of the first split value that a given item comes
before (or is equal to). The strange-looking comparison with zero is needed
because Collections.binarySearch() returns the item's index (zero or positive)
if the item is already in the list, and (-(insertion point) - 1), which is always
negative, if it is not; negating that value and subtracting 1 gives back the
insertion point. The final check simply stops an item that is larger than the
split values from being given a bucket number beyond the last bucket.
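To make the binarySearch() arithmetic concrete, here is a small standalone sketch. The helper name bucketFor() and the split values are made up for illustration (in the real sorter, the split points come from the sampling stage). With three split values and three buckets, the clamping step comes into play for any item above the last split value.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BucketIndexDemo {
    // Hypothetical helper using the same arithmetic as assignItemsToBuckets().
    // binarySearch() returns the index if the item is found, otherwise
    // (-(insertion point) - 1), which is always negative.
    static <E extends Comparable<? super E>> int bucketFor(List<E> splits, E item, int noBuckets) {
        int bucket = Collections.binarySearch(splits, item);
        if (bucket < 0) {
            bucket = (-bucket) - 1;      // recover the insertion point
        }
        if (bucket >= noBuckets) {
            bucket = noBuckets - 1;      // clamp items above the splits to the last bucket
        }
        return bucket;
    }

    public static void main(String[] args) {
        List<Integer> splits = Arrays.asList(10, 20, 30); // hypothetical, must be sorted
        int noBuckets = 3;
        for (int item : new int[] {5, 10, 15, 30, 99}) {
            System.out.println(item + " -> bucket " + bucketFor(splits, item, noBuckets));
        }
        // prints: 5 -> 0, 10 -> 0, 15 -> 1, 30 -> 2, 99 -> 2 (clamped from 3)
    }
}
```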
private void assignItemsToBuckets(List<E> data,
        int threadNo, int startPos, int endPos) {
    // Take a private snapshot of the split points
    List<E> spl;
    synchronized (splitPoints) {
        spl = new ArrayList<E>(splitPoints);
    }
    // Thread-local buckets, filled without any locking
    List<List<E>> bucketData = new ArrayList<List<E>>(noThreads);
    for (int i = 0; i < noThreads; i++) {
        bucketData.add(new ArrayList<E>(dataSize /
            (noThreads * noThreads)));
    }
    Lock lck = dataLock.readLock();
    lck.lock();
    try {
        for (int i = startPos; i < endPos; i++) {
            E item = data.get(i);
            int bucket = Collections.binarySearch(spl, item);
            if (bucket < 0)
                bucket = (-bucket) - 1;   // convert to insertion point
            if (bucket >= noThreads)
                bucket = noThreads - 1;   // clamp to the last bucket
            bucketData.get(bucket).add(item);
        }
    } finally {
        lck.unlock();
    }
    // Merge into the shared "master" buckets, locking each one once
    for (int i = 0; i < noThreads; i++) {
        List<E> l = bucketsToSort.get(i);
        synchronized (l) {
            l.addAll(bucketData.get(i));
        }
    }
}
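The thread-local bucket idea can be tried out in isolation. The following sketch is hypothetical (LocalBucketsDemo and assignInParallel() are not part of the sorter) and uses plain Threads and Integers rather than SorterThread and the generic type E. Each thread fills private buckets with no locking at all, then synchronizes on each shared bucket just once to merge in its results with a single addAll().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LocalBucketsDemo {
    // Splits data into noThreads contiguous chunks; each thread buckets its chunk
    // privately, then merges its private buckets into the shared ones.
    public static List<List<Integer>> assignInParallel(List<Integer> data,
            List<Integer> splits, int noThreads) throws InterruptedException {
        List<List<Integer>> master = new ArrayList<>(noThreads);
        for (int i = 0; i < noThreads; i++) {
            master.add(new ArrayList<Integer>());
        }
        Thread[] threads = new Thread[noThreads];
        int chunk = (data.size() + noThreads - 1) / noThreads;
        for (int t = 0; t < noThreads; t++) {
            final int start = Math.min(t * chunk, data.size());
            final int end = Math.min(start + chunk, data.size());
            threads[t] = new Thread(() -> {
                // 1. Fill thread-local buckets: no locking needed.
                List<List<Integer>> local = new ArrayList<>(noThreads);
                for (int i = 0; i < noThreads; i++) {
                    local.add(new ArrayList<Integer>());
                }
                for (int i = start; i < end; i++) {
                    Integer item = data.get(i);
                    int b = Collections.binarySearch(splits, item);
                    if (b < 0) b = -b - 1;
                    if (b >= noThreads) b = noThreads - 1;
                    local.get(b).add(item);
                }
                // 2. Lock each shared bucket once, for one bulk addAll().
                for (int i = 0; i < noThreads; i++) {
                    List<Integer> shared = master.get(i);
                    synchronized (shared) {
                        shared.addAll(local.get(i));
                    }
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) {
            th.join();
        }
        return master;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> data = Arrays.asList(25, 3, 17, 40, 12, 8, 31, 20);
        List<Integer> splits = Arrays.asList(10, 20); // hypothetical split values
        for (List<Integer> bucket : assignInParallel(data, splits, 3)) {
            Collections.sort(bucket); // order within a bucket depends on thread timing
            System.out.println(bucket);
        }
        // prints [3, 8], then [12, 17, 20], then [25, 31, 40]
    }
}
```

Because threads merge in whatever order they finish, the order of items within each shared bucket isn't fixed, which is why the sketch sorts each bucket before printing. In the real algorithm that doesn't matter, since each bucket is sorted in the next stage anyway.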
At the end of this second stage, the CyclicBarrier calls our
sortStageComplete() method, which in turn calls clearData(). This
simply calls clear() on the data list, taking care to hold the
write lock while doing so:
private void clearData() {
    Lock lck = dataLock.writeLock();
    lck.lock();
    try {
        data.clear();
    } finally {
        lck.unlock();
    }
}
The only reason for clearing the data list here is that "we may as well":
the references to the data objects are now safely in the various buckets, so we may as
well free up the memory used by the list while the sorting is taking place. Aside from
the memory consideration, we could just leave data as it is, and clear it
at the beginning of the combineBuckets() method (see next paragraph).
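How the barrier action gets run "at the end of" each stage is worth seeing on its own. In this sketch, a CyclicBarrier is given a barrier action that plays the role of sortStageComplete(). We assume a simple stage counter to decide what to do (the original dispatch logic is on the previous page and may differ), and the log messages merely stand in for clearData() and combineBuckets(). CyclicBarrier runs the action once per trip, after the last party arrives and before any party is released. That is why clearing the list at that point can't race with a thread that is still assigning items to buckets.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierActionDemo {
    static final int NO_STAGES = 3;

    // Runs noWorkers threads through three stages and returns the combined log.
    public static List<String> runStages(int noWorkers) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        int[] stage = {0}; // only read and written inside the barrier action

        // Barrier action standing in for sortStageComplete().
        CyclicBarrier barrier = new CyclicBarrier(noWorkers, () -> {
            stage[0]++;
            switch (stage[0]) {
                case 1:
                    log.add("end of stage 1");
                    break;
                case 2:
                    log.add("end of stage 2: clearData() would run here");
                    break;
                default:
                    log.add("end of stage 3: combineBuckets() would run here");
                    break;
            }
        });

        Thread[] workers = new Thread[noWorkers];
        for (int w = 0; w < noWorkers; w++) {
            final int id = w;
            workers[w] = new Thread(() -> {
                try {
                    for (int s = 1; s <= NO_STAGES; s++) {
                        log.add("worker " + id + " finished stage " + s);
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party failed; a real worker would record this.
                }
            });
            workers[w].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String line : runStages(3)) {
            System.out.println(line);
        }
    }
}
```

The worker lines within a stage can appear in any order, but each "end of stage" line is always logged after every worker has finished that stage and before any worker logs the next one.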
The third stage, actually sorting each bucket, is disappointingly boring:
we just call Collections.sort() on the given bucket, having remembered
to synchronize on that particular bucket first (it's just the sortMyBucket()
method given on the previous page). At the end of the sorting phase,
the CyclicBarrier will call our sortStageComplete() method again,
which this time calls combineBuckets(). This simply appends each thread's
sorted bucket back onto data, in bucket order. Since every item in one bucket
is no greater than any item in the next, the result is the fully sorted list.
Again, the only moderately tricky thing is remembering to acquire the
appropriate locks:
private void combineBuckets() {
    Lock lck = dataLock.writeLock();
    lck.lock();
    try {
        // Append each sorted bucket to data, in bucket order
        for (int i = 0; i < noThreads; i++) {
            List<E> l = bucketsToSort.get(i);
            synchronized (l) {
                data.addAll(l);
            }
        }
    } finally {
        lck.unlock();
    }
}
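To check that concatenating sorted buckets really does give a fully sorted list, here is a single-threaded sketch. The class BucketSortCheck, its bucketSort() method, and the split values are hypothetical. It runs the bucketing, per-bucket sort, and combine steps one after another, with no locks or threads, and compares the result with a plain Collections.sort().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class BucketSortCheck {
    // Sequential version of stages 2 and 3 plus combineBuckets():
    // bucket by split point, sort each bucket, then concatenate in order.
    static <E extends Comparable<? super E>> List<E> bucketSort(List<E> data,
            List<E> splits, int noBuckets) {
        List<List<E>> buckets = new ArrayList<>(noBuckets);
        for (int i = 0; i < noBuckets; i++) {
            buckets.add(new ArrayList<E>());
        }
        for (E item : data) {
            int b = Collections.binarySearch(splits, item);
            if (b < 0) b = -b - 1;
            if (b >= noBuckets) b = noBuckets - 1;
            buckets.get(b).add(item);
        }
        List<E> result = new ArrayList<>(data.size());
        for (List<E> bucket : buckets) {
            Collections.sort(bucket);   // stage 3: sort each bucket
            result.addAll(bucket);      // combine: append in bucket order
        }
        return result;
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            data.add(rnd.nextInt(10_000));
        }
        List<Integer> splits = Arrays.asList(2500, 5000, 7500); // hypothetical
        List<Integer> expected = new ArrayList<>(data);
        Collections.sort(expected);
        System.out.println(bucketSort(data, splits, 4).equals(expected)); // true
    }
}
```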
And finally...
That's more or less it. We're just missing the controller method that the caller
will invoke to actually perform the sort. Essentially, we just start the sorter
threads going, wait at the barrier three times (once per stage), and handle any
resulting exceptions. We also add a slight "sanity check" at the beginning of the
sort method: if the list to be sorted is too small to warrant a parallel sort,
we just call boring old Collections.sort() on it. The resulting code is as follows:
public void sort() throws InterruptedException {
    // See if it's not worth doing a parallel sort
    Lock l = dataLock.writeLock();
    l.lockInterruptibly();
    try {
        if (data.size() < noSamplesPerThread * 4 * noThreads) {
            Collections.sort(data);
            return;
        }
    } finally {
        l.unlock();
    }
    // Start sorter threads going
    List<SorterThread> threads = new ArrayList<SorterThread>(noThreads);
    for (int i = 0; i < noThreads; i++) {
        SorterThread thr = new SorterThread(i);
        threads.add(thr);
        thr.start();
    }
    // Wait for sorter threads: once for each of the three stages
    try {
        barrier.await();
        barrier.await();
        barrier.await();
    } catch (BrokenBarrierException bb) {
        // Find the error that caused the broken barrier
        for (int i = 0; i < noThreads; i++) {
            SorterThread thr = threads.get(i);
            Throwable t = thr.error;
            if (t != null)
                throw new RuntimeException("Error during sort", t);
        }
        if (completionStageError != null)
            throw completionStageError;
        else
            throw new RuntimeException("Misc error during sort", bb);
    }
}
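The controller relies on two details that this section doesn't spell out. The barrier must count the calling thread as one of its parties (presumably noThreads + 1 in total), and completionStageError must be an unchecked exception, since sort() only declares InterruptedException. The sketch below shows that shape in isolation; ControllerSketch and its failAtStage fault-injection parameter are hypothetical. The controlling thread waits at the barrier three times. A barrier action that records its error and then rethrows it breaks the barrier, so every waiting thread is released. Depending on which thread happened to trip the barrier, the controller sees either a BrokenBarrierException, which it turns into the recorded error, or the action's exception directly. Either way, the caller ends up with the same exception.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ControllerSketch {
    private static final int NO_STAGES = 3;

    private final int noWorkers;
    private final int failAtStage;   // hypothetical fault injection; 0 = never fail
    private final CyclicBarrier barrier;
    private volatile RuntimeException completionStageError;
    private int stage = 0;           // only touched by the barrier action

    public ControllerSketch(int noWorkers, int failAtStage) {
        this.noWorkers = noWorkers;
        this.failAtStage = failAtStage;
        // Assumption: the workers plus the controlling thread are all parties.
        this.barrier = new CyclicBarrier(noWorkers + 1, this::stageComplete);
    }

    // Barrier action, standing in for sortStageComplete().
    private void stageComplete() {
        stage++;
        try {
            if (stage == failAtStage) {
                throw new IllegalStateException("stage " + stage + " completion failed");
            }
            // Real per-stage work (e.g. clearData(), combineBuckets()) would go here.
        } catch (RuntimeException e) {
            completionStageError = e;  // remember the cause for the controller
            throw e;                   // a failing barrier action breaks the barrier
        }
    }

    // Controller: start the workers, then wait at the barrier once per stage.
    public int run() throws InterruptedException {
        for (int i = 0; i < noWorkers; i++) {
            new Thread(() -> {
                try {
                    for (int s = 0; s < NO_STAGES; s++) {
                        // A real worker would do its share of the stage here.
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException | RuntimeException e) {
                    // Barrier broken, or this thread ran the failing action: stop.
                }
            }).start();
        }
        try {
            for (int s = 0; s < NO_STAGES; s++) {
                barrier.await();
            }
        } catch (BrokenBarrierException bb) {
            if (completionStageError != null) {
                throw completionStageError;
            }
            throw new RuntimeException("Misc error during sort", bb);
        }
        return stage;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stages completed: " + new ControllerSketch(4, 0).run());
        try {
            new ControllerSketch(4, 2).run();
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
        // prints "stages completed: 3", then "caught: stage 2 completion failed"
    }
}
```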
And there you have it, finally!
Editorial page content written by Neil Coffey. Copyright © Javamex UK 2021. All rights reserved.