MapReduce in Qt Concurrent

MapReduce was originally developed by Google to simplify writing parallel algorithms for computer clusters. The basic idea is that you divide your algorithm into two parts: one part that can be run in parallel on individual pieces of the input data ('map'), and one sequential part that collects the map results and produces the final result ('reduce'). Your program then sends the map and reduce functions along with your input data to the MapReduce framework, which automatically parcels out the data to each cluster node and collects the results afterwards.

MapReduce in Qt Concurrent is implemented for shared-memory systems, so instead of managing cluster nodes it manages threads on a single computer. This also means we can drop some of the features of the Google version, such as fault tolerance (we assume that processors don't fail).

The API looks like this:

QFuture<T> mappedReduced(list, mapFunction, reduceFunction);
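The following sketch makes the contract concrete with a sequential model of what the call computes. It is written in plain standard C++ so it compiles without Qt. The name sequentialMappedReduced is hypothetical, and std::vector stands in for the input list. The function applies the map function to every element and folds each intermediate result into one accumulator. Qt Concurrent runs the map calls on several threads instead, but for an order-insensitive reduce such as word counting it should arrive at the same answer. Qt infers the result type from the reduce function; this sketch asks the caller to name it.

```cpp
#include <vector>

// Hypothetical helper, not part of Qt: a sequential model of the result
// that mappedReduced(list, mapFunction, reduceFunction) produces.
template <typename Result, typename T, typename MapFn, typename ReduceFn>
Result sequentialMappedReduced(const std::vector<T> &input,
                               MapFn mapFunction, ReduceFn reduceFunction)
{
    Result result{};                               // start from an empty result
    for (const T &item : input)
        reduceFunction(result, mapFunction(item)); // map one item, fold it in
    return result;
}
```

For example, mapping each x to x * x and reducing with += over {1, 2, 3} yields 14.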

As an example, let's say we want to count word frequencies across several documents. The map function counts the word occurrences in each document in parallel, and the reduce function combines them into a final frequency count.

The input is a list of strings, one per document:

QList<QString> list;

The map function takes one document and produces a hash that stores the frequency count for each word in that document. This function will be called in parallel by several threads, so it can't have side effects such as modifying global data (or, more accurately, any side effects must be thread-safe). The number of threads is scaled to the number of CPU cores on the system.

QHash<QString, int> mapFunction(const QString &document);
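A map function along these lines could look like the sketch below. It again uses standard C++, with std::string and std::unordered_map standing in for QString and QHash. This is not the code from the Qt Concurrent example. Words are split on whitespace only, with no case folding or punctuation stripping, which is a simplifying assumption. All state is local to the call, and that is what makes the function safe to run on several threads at once.

```cpp
#include <sstream>
#include <string>
#include <unordered_map>

// Standard-C++ stand-in for
//   QHash<QString, int> mapFunction(const QString &document);
// Splits on whitespace only (simplifying assumption).
std::unordered_map<std::string, int> mapFunction(const std::string &document)
{
    std::unordered_map<std::string, int> counts; // local: no shared state
    std::istringstream stream(document);
    std::string word;
    while (stream >> word) // operator>> skips any run of whitespace
        ++counts[word];    // a new word starts at 0, then becomes 1
    return counts;
}
```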

The reduce function takes one intermediate result hash and aggregates it into the final result. Qt Concurrent makes sure that only one thread calls this function at a time. This has two implications: there is no need for a mutex when updating the result variable, and the system can be smarter about how it manages threads. If a thread tries to become the reducer while another thread is already reducing, it doesn't have to block. It can put its result on the to-be-reduced queue and go on to call the map function on a new piece of data instead.

void reduceFunction(QHash<QString, int> &finalResult, const QHash<QString, int> &intermediateResult);
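A matching reduce function only has to merge one intermediate hash into the running total. In this standard-C++ sketch, WordCounts is a hypothetical alias for std::unordered_map<std::string, int>. The function takes no lock because it relies on the guarantee described above that only one thread reduces at a time.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical alias standing in for QHash<QString, int>.
using WordCounts = std::unordered_map<std::string, int>;

// Standard-C++ counterpart of
//   void reduceFunction(QHash<QString, int> &finalResult,
//                       const QHash<QString, int> &intermediateResult);
// No mutex: the framework is assumed to call the reducer from one thread
// at a time.
void reduceFunction(WordCounts &finalResult, const WordCounts &intermediateResult)
{
    for (const auto &entry : intermediateResult)
        finalResult[entry.first] += entry.second; // missing words start at 0
}
```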

Finally we put it all together like this:

QFuture<QHash<QString, int> > counting = mappedReduced(list, mapFunction, reduceFunction);
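The thread-based sketch below shows how the pieces fit together and how the reducer queue described earlier might work. It is written in standard C++ and is an illustration under simplifying assumptions, not Qt Concurrent's implementation. The following names and choices are hypothetical:

- parallelMappedReduced is an invented name.
- The input is a std::vector.
- Workers claim items through an atomic index.
- std::mutex::try_lock decides which thread gets to reduce.

A worker that loses the try_lock race queues its result and returns to mapping rather than blocking. Exceptions thrown by the map function are not handled.

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical sketch, not Qt Concurrent's implementation.
template <typename Result, typename T, typename MapFn, typename ReduceFn>
Result parallelMappedReduced(const std::vector<T> &input, MapFn mapFunction,
                             ReduceFn reduceFunction,
                             unsigned threadCount = std::thread::hardware_concurrency())
{
    using Intermediate =
        std::decay_t<decltype(mapFunction(std::declval<const T &>()))>;

    Result result{};
    std::mutex reduceMutex;           // held by whichever thread is reducing
    std::mutex queueMutex;            // protects `pending`
    std::deque<Intermediate> pending; // mapped results waiting to be reduced
    std::atomic<std::size_t> next{0}; // index of the next unclaimed input

    // Reduce everything currently queued. Callers hold reduceMutex (or are
    // the only thread left), so `result` is never touched concurrently.
    auto drain = [&] {
        for (;;) {
            std::deque<Intermediate> batch;
            {
                std::lock_guard<std::mutex> lock(queueMutex);
                batch.swap(pending);
            }
            if (batch.empty())
                return;
            for (auto &item : batch)
                reduceFunction(result, item);
        }
    };

    auto worker = [&] {
        for (;;) {
            const std::size_t i = next.fetch_add(1);
            if (i >= input.size())
                return;
            Intermediate mapped = mapFunction(input[i]); // runs in parallel
            {
                std::lock_guard<std::mutex> lock(queueMutex);
                pending.push_back(std::move(mapped));
            }
            // Try to become the reducer; if another thread is already
            // reducing, move on to the next item instead of blocking.
            if (reduceMutex.try_lock()) {
                drain();
                reduceMutex.unlock();
            }
        }
    };

    if (threadCount == 0)
        threadCount = 1; // hardware_concurrency() may report 0
    std::vector<std::thread> threads;
    for (unsigned t = 0; t < threadCount; ++t)
        threads.emplace_back(worker);
    for (auto &th : threads)
        th.join();

    drain(); // a result queued just as the last reducer finished ends up here
    return result;
}
```

Results may be reduced in any order, so this sketch only suits reduce functions for which order doesn't matter, as with word counting. For the word count example, the call would pass the document list together with the map and reduce functions, and the final hash type would be the Result parameter. Depending on the toolchain, linking may need -pthread.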

Since mappedReduced returns a QFuture, we have several options for synchronizing with the result. The simplest is to call QFuture::result(), which blocks until the result is ready. If blocking is inappropriate (say, in the GUI thread), we can instead use signal-slot connections, for example through a QFutureWatcher, to get progress and result notifications. It's also possible to cancel a running mappedReduced by calling QFuture::cancel().

I've skipped over the function implementations here; the complete word count example is available in the Qt Concurrent package.
