Asynchronous APIs in Qt 6

As readers may already know, Qt provides several multithreading constructs (threads, mutexes, wait conditions, etc.), and higher level APIs like QThreadPool, Qt Concurrent and other related classes. Those who are not familiar with Qt threading support yet or want to learn more, can participate in our online training sessions . In this post we will concentrate on higher level asynchronous APIs and changes introduced in Qt 6.

Higher level concurrency APIs in Qt

Qt Concurrent makes multithreaded programming easier by eliminating the need for low-level synchronization (primitives, such as mutexes and locks) and managing multiple threads manually. It provides map, filter and reduce algorithms (better known from functional programming) for parallel processing of iterable containers. In addition, there are classes like QFuture, QFutureWatcher and QFutureSynchronizer to access and monitor the results of asynchronous computations. Although all of these are very useful, there were still some drawbacks, like inability to use QFuture outside Qt Concurrent, lacking support for chaining multiple computations for simpler and cleaner code, lack of flexibility of Qt Concurrent APIs, etc. For Qt 6 we tried to address the feedback gathered over the past years and make multithreaded programming with Qt more enjoyable and fun!

Attaching continuations to QFuture

A common scenario in multithreaded programming is running an asynchronous computation, which in turn needs to invoke another asynchronous computation and pass data to it, which depends on another one, and so on. Since each stage requires the results of the previous one, you need to either wait (by blocking or polling) until the previous stage completes and use its results, or structure your code in "call-callback" fashion. None of these options is perfect: you either waste resources on waiting, or get complex unmaintainable code. Adding new stages or logic (for error handling, etc.) increases the complexity even further.

To get a better understanding of the problem, let's consider the following example. Let's say we want to download a big image from network, do some heavy processing on it and show the resulting image in our application. So we have the following steps:

  • make a network request and wait until all data is received
  • create an image from the raw data
  • process the image
  • show it

And we have the following methods for each step that need to be invoked sequentially:

QByteArray download(const QUrl &url);
QImage createImage(const QByteArray &data);
QImage processImage(const QImage &image);
void show(const QImage &image);

We can use QtConcurrent to run these tasks asynchronously and QFutureWatcher to monitor the progress:


void loadImage(const QUrl &url) {
    QFuture<QByteArray> data = QtConcurrent::run(download, url);
    QFutureWatcher<QByteArray> dataWatcher;
    dataWatcher.setFuture(data);
    
connect(&dataWatcher, &QFutureWatcher<QByteArray>::finished, this, [=] {         // handle possible errors         // ...         QImage image = createImage(data);         // Process the image         // ...         QFuture<QImage> processedImage = QtConcurrent::run(processImage, image);         QFutureWatcher<QImage> imageWatcher;         imageWatcher.setFuture(processedImage);         connect(&imageWatcher, &QFutureWatcher<QImage>::finished, this, [=] {             // handle possible errors             // ...             show(processedImage);         });     }); }

Doesn't look nice, does it? The application logic is mixed with the boilerplate code required to link things together. And you just know it's going to get uglier, the more steps we add to the chain. QFuture helps to solve this problem by adding support for attaching continuations via QFuture::then() method:

auto future = QtConcurrent::run(download, url)
            .then(createImage)
            .then(processImage)
            .then(show);

This undoubtedly looks much better! But one thing is missing: the error handling. You could do something like:

auto future = QtConcurrent::run(download, url)
            .then([](QByteArray data) {
                // handle possible errors from the previous step
                // ...
                return createImage(data);
            })    
            .then(...)    
            ...

This will work, but the error handling code is still mixed with program logic. Also we probably don't want to run the whole chain if one of the steps has failed. This can be solved by using QFuture::onFailed() method, which allows us to attach specific error handlers for each possible error type:

auto future = QtConcurrent::run(download, url)
            .then(createImage)
            .then(processImage)
            .then(show)
            .onFailed([](QNetworkReply::NetworkError) {
                // handle network errors
            })
            .onFailed([](ImageProcessingError) {
                // handle image processing errors
            })
            .onFailed([] {
                // handle any other error
            });

Note that using .onFailed() requires exceptions to be enabled. If any of the steps fails with an exception, the chain is interrupted, and the error handler matching with the exception type that was thrown is called.

Similar to .then() and .onFailed(), there is also .onCanceled(), for attaching handlers in case the future got canceled.

Creating QFuture from signals

Similar to futures, signals also represent something that will become available sometime in the future, so it seems natural to be able to work with them as with futures, attach continuations, failure handlers, and so on. Given a QObject-based class MyObject with a signal void mySignal(int), you can use this signal as a future in the following way:

QFuture<int> intFuture = QtFuture::connect(&object, &MyObject::mySignal);

Now you can attach continuations, failure or cancellation handlers to the resulting future.

Note that the type of the resulting future matches with the argument type of the signal. If it has no arguments, then QFuture<void> is returned. In case of multiple arguments, the result is stored in a std::tuple.

Let's go back to the first (i.e. download) step of our image processing example, to see how this can be useful in practice. There are many ways to implement it, we will use QNetworkAccessManager to send the network request and get the data:

QNetworkAccessManager manager;    
...

QByteArray download(const QUrl &url) {        
    QNetworkReply *reply = manager.get(QNetworkRequest(url));
    QObject::connect(reply, &QNetworkReply::finished, [reply] {...});
    
    // wait until we've received all data
    // ...    
    return data;        
}

But the blocking wait above is not good, it would be better if we could get rid of it, and instead say "when QNetworkAccessManager gets the data, create an image, then process it and then show". We can do it by connecting the network access manager's finished() signal to QFuture:

QNetworkReply *reply = manager.get(QNetworkRequest(url));

auto future = QtFuture::connect(reply, &QNetworkReply::finished)
        .then([reply] {
            return reply->readAll();
        })
        .then(QtFuture::Launch::Async, createImage)
        .then(processImage)
        .then(show)        
        ...

You can notice that now instead of using QtConcurrent::run() to asynchronously download and return the data in a new thread, we are simply connecting to the QNetworkAccessManager::finished() signal, which starts the chain of computations. Also note the additional parameter in the following line:

        .then(QtFuture::Launch::Async, createImage)

By default continuations attached by .then() are invoked in the same thread in which the parent has been running (the main thread in our case). Now that we don't use QtConcurrent::run() to asynchronously launch the chain, we need to pass the additional QtFuture::Launch::Async parameter, to launch the chain of continuations in a separate thread and avoid blocking the UI.

Creating a QFuture

So far the only "official" way of creating and storing a value inside QFuture was using one of the methods of QtConcurrent. So outside QtConcurrent, QFuture was not very useful. In Qt 6 we finally have the "setter" counterpart of QFuture: QPromise, introduced by Andrei Golubev. It can be used to set values, progress and exceptions for an asynchronous computation, which can be later accessed via QFuture. To demonstrate how it works, let's rewrite the image processing example again, and make use of the QPromise class:

QFuture<QByteArray> download(const QUrl &url) {
    QPromise<QByteArray> promise;
    QFuture<QByteArray> future = promise.future();
    
    promise.start(); // notify that download is started
    
    QNetworkReply *reply = manager.get(QNetworkRequest(url));
    QObject::connect(reply, &QNetworkReply::finished,
            [reply, p = std::move(promise)] {
                p.addResult(reply->readAll());
                p.finish(); // notify that download is finished
                reply->deleteLater();
            });
    
    return future;
}
auto future = download()
        .then(QtFuture::Launch::Async, createImage)
        .then(processImage)
        .then(show)
        ...

Changes in QtConcurrent

Thanks to Jarek Kobus, Mårten Nordheim, Karsten Heimrich, Timur Pocheptsov and Vitaly Fanaskov, QtConcurrent also has received nice updates. The existing APIs got some improvements, in particular:

- You can now set a custom thread pool to all methods of QtConcurrent, instead of always running them on the global thread pool and potentially blocking the execution of other tasks.
- Map and filter reduce algorithms can now take an initial value, so you don't have to do workarounds for types that don't have a default constructor.
- QtConcurrent::run has been improved to work with a variable number of arguments and move-only types.

Additionally, we've added two new APIs to QtConcurrent to give more flexibility to the users. Let's look at those in more detail.

QtConcurrent::run with promise

Thanks to Jarek Kobus, the QtConcurrent::run()method has been improved to provide access to the promise object associated with the given task inside the run() method. This allows the users to do progress reporting, report multiple results, suspend or cancel the execution if it was requested. This is achieved by making the runnable passed to QtConcurrent::run() accept a reference to the QPromise object:

auto future = QtConcurrent::run(
            [] (QPromise<T> &promise, /* other arguments may follow */ ...) {
                // ...
                for (auto value : listOfValues) {
                    if (promise.isCanceled())
                        // handle the cancellation
        
                // do some processing...
        
                promise.addResult(...);
                promise.setProgressValue(...);
                }
            },
            /* pass other arguments */ ...);

As you can see, in this mode the users have more control over the task, and can react to cancellation or suspension requests, do progress reporting, etc., which was not possible before.

QtConcurrent::task

QtConcurrent::task() provides a fluent interface for running a task in a separate thread. It is a more modern alternative for QtConcurrent::run(), and allows configuring the tasks in a more convenient way. Instead of using one of the few overloads of QtConcurrent::run() to pass the parameters for running a task, you can specify them in any order, skip the ones that are not needed, and so on. For example:

QFuture<int> future = QtConcurrent::task(doSomething)
        .withArguments(1, 2, 3)
        .onThreadPool(pool)
        .withPriority(10)
        .spawn();

Note that, unlike run(), you can also pass a priority for the task.

If you find the new functionality interesting, you can also check the Qt 6 documentation snapshots for more details and examples. And please don't hesitate to leave feedback!

 


Blog Topics:

Comments