Multithreading is a tough nut to crack in software development, not just because there are dozens of ways to approach a single problem, but also because it is easy to get so many things wrong.
In this article I want to present how to realize the concept of a Looper with Dispatchers in C++17.
Many widely used systems are based on this paradigm, even though each of them expresses it differently for its individual use case.
Just to name a few:
AndroidOS – Loopers as a message queue and one or more Handler types, depending on the specific message.
(See: https://developer.android.com/reference/android/os/Looper )
Qt Framework – Also a message queue, upon which the signal and slot mechanism is built to signal across thread boundaries.
(See: https://doc.qt.io/qt-5/signalsandslots.html and https://woboq.com/blog/how-qt-signals-slots-work.html )
Windowing systems with a UI-Thread and event-callbacks.
Most Game-Loops in game engines (even though they might not be reusable components), which attach to the main thread and hook into operating system specific event systems – the classic WINAPI-hooks (admit it, you know them 😉 )
Let’s examine the concept.
The problem: Executing long(er) running tasks on worker threads
Usually, it is no problem to start a thread and execute a function on it, e.g. using C++11’s <thread> header and std::thread:
#include <thread>
#include <iostream>
#include <stdint.h>

void work(uint32_t const &aNumberOfIterations)
{
    for(uint32_t k = 0; k < aNumberOfIterations; ++k)
    {
        std::cout << "I am a manifestation of an iteration\n";
    }
    // Thread will terminate here.
}

// ...
std::thread worker(work, 10); // ATTENTION: Will start immediately (here with 10 iterations)!
worker.join();                // Block and wait for completion
// ...
So, why not use it everywhere and be happy?
Well, threads are not free.
At the very least, a stack has to be allocated for each thread, the kernel and the operating system implementation have to manage all threads of the governing process, and with a large number of threads, scalability will almost certainly become a critical factor, given the huge number of permutations of target systems.
Even worse, the specific behavior of a thread depends on the operating system and the threading library used.
See:
https://eli.thegreenplace.net/2016/c11-threads-affinity-and-hyperthreading/
Finally, we hardly have any control over the threads and their execution:
- Are things executed in proper order?
- Who maintains the threads?
- How to receive results from asynchronous execution?
- What about task priorities or delayed insertions?
- Maybe even event driven dispatching?
As long as we don’t have coroutines and executors, let’s look at another way to approach thread reusability and controlled threading.
May I introduce: Loopers
Loopers, at their core, are objects which contain or are attached to a thread with a conditionally infinite loop that runs as long as the abort criterion is unmet. Within this loop, arbitrary actions can be performed.
Usually, methods like start, run and stop are provided.
Let’s derive an example class in three steps.
Wrapping a thread
First things first, we define the CLooper-class, which contains an std::thread-member and a run-method, which will create the thread, invoking runFunc – our second method – implementing the effective thread operation.
#include <thread>
#include <atomic>
#include <memory>
#include <functional>
#include <stdexcept>

class CLooper
{
public:
    CLooper()
    { }
    // Copy denied, Move to be implemented

    ~CLooper()
    { }

    // To be called, once the looper should start looping.
    bool run()
    {
        try
        {
            mThread = std::thread(&CLooper::runFunc, this);
        }
        catch(...)
        {
            return false;
        }

        return true;
    }

private:
    void runFunc()
    {
        // Thread function
    }

private:
    std::thread mThread;
};
Running the infinite loop
Then, we add the infinite loop to the looper implementation, as well as an atomic flag mRunning and a corresponding getter running(), indicating whether the looper is running or not.
public: // Methods
    bool running() const
    {
        return mRunning.load();
    }

private: // Methods
    // Conditionally-infinite loop doing sth. iteratively
    void runFunc()
    {
        mRunning.store(true);

        while(true)
        {
            try
            {
                // Do something...
            }
            catch(std::runtime_error& e)
            {
                // Some more specific
            }
            catch(...)
            {
                // Make sure that nothing leaves the thread for now...
            }
        }

        mRunning.store(false);
    }

private: // Members
    std::atomic_bool mRunning;
Stopping the looper cleanly
In order to stop the looper, however, we need some more machinery.
We add an abort criterion to the infinite loop – mAbortRequested – of type std::atomic<bool>, which is checked against in each iteration.
We also add a private method abortAndJoin(), which will set the mAbortRequested-flag to true, invoke join() on the thread and wait until the looper-function has exited and the worker thread has been joined. The destructor will also invoke abortAndJoin() in case the looper goes out of scope.
The public method stop() serves as a public API handle to control the looper.
public: // Ctor/Dtor
    ~CLooper()
    {
        abortAndJoin();
    }

public: // Methods
    void stop()
    {
        abortAndJoin();
    }

private: // Methods
    // Conditionally-infinite loop doing sth. iteratively
    void runFunc()
    {
        mRunning.store(true);

        // We now check against abort criteria
        while(false == mAbortRequested.load())
        {
            try
            {
                // Do something...
            }
            catch(std::runtime_error& e)
            {
                // Some more specific
            }
            catch(...)
            {
                // Make sure that nothing leaves the thread for now...
            }
        }

        mRunning.store(false);
    }

    // Shared implementation of exiting the loop-function and joining
    // to the main thread.
    void abortAndJoin()
    {
        mAbortRequested.store(true);
        if(mThread.joinable())
        {
            mThread.join();
        }
    }

private: // Members
    std::atomic_bool mAbortRequested;
This basic construct can now be used as follows:
std::unique_ptr<CLooper> looper = std::make_unique<CLooper>();

std::cout << "Starting looper" << std::endl;
// To start and run
looper->run();

using namespace std::chrono_literals;
std::this_thread::sleep_for(5ms);

std::cout << "Stopping looper" << std::endl;
// To stop it and clean it up
looper->stop();
looper = nullptr;
Filling it with life: Tasks
The above example implementation, however, is an iterative no-op; it doesn’t do anything.
Let’s build upon it and fill it with life by permitting the looper to execute something.
In the context of loopers, these are small executable portions of code sharing a common signature, i.e. one or more tasks, which can be fetched from an internal collection, e.g. a FIFO-queue, and be executed on the worker-thread.
Let’s start with the definition of a task type by adding this to the CLooper-class:
using Runnable = std::function<void()>;
Next, permit the looper to hold runnables by adding
std::recursive_mutex mRunnablesMutex;
std::queue<Runnable> mRunnables;
to the list of members (this also requires including <queue> and <mutex>).
The mutex is required to guard against simultaneous access to the task collection by the worker and the dispatching thread.
In order to fetch the next Runnable, in case the queue is not empty, add the below function.
Runnable next()
{
    std::lock_guard<std::recursive_mutex> guard(mRunnablesMutex);

    if(mRunnables.empty())
    {
        return nullptr;
    }

    Runnable runnable = mRunnables.front();
    mRunnables.pop();

    return runnable;
}
And finally, in order to have the runnables executed, add the below snippet into runFunc’s try-block.
Runnable r = next();
if(nullptr != r)
{
    r();
}
else
{
    using namespace std::chrono_literals;
    std::this_thread::sleep_for(1ms);
}
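Putting the previous snippets together, runFunc then looks roughly like this (a sketch combining the pieces shown so far):

// Sketch: runFunc after inserting the task-processing snippet into the try-block.
void runFunc()
{
    mRunning.store(true);

    while(false == mAbortRequested.load())
    {
        try
        {
            // Fetch the next task and execute it, or back off briefly if the queue is empty.
            Runnable r = next();
            if(nullptr != r)
            {
                r();
            }
            else
            {
                using namespace std::chrono_literals;
                std::this_thread::sleep_for(1ms);
            }
        }
        catch(std::runtime_error& e)
        {
            // Some more specific
        }
        catch(...)
        {
            // Make sure that nothing leaves the thread for now...
        }
    }

    mRunning.store(false);
}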
If there were any means of adding tasks yet, the looper would now happily process the tasks pushed into the queue.
Accepting work: Dispatchers
The looper is still useless, though, since no tasks can be pushed to the queue yet.
The final concept to solve this is the Dispatcher.
Imagine the dispatcher to be a bouncer in front of the looper.
It will accept a task but will manage insertion into the working-queue.
This way, some fancy usage scenarios can be enabled, e.g. delayed execution or immediate posting.
In this blog-post, however, I will cover regular FIFO-insertion only.
Let’s describe the dispatcher-class briefly, as a nested class in CLooper, BELOW the alias Runnable.
public:
    using Runnable = std::function<void()>;

    class CDispatcher
    {
        friend class CLooper; // Allow the looper to access the private constructor.

    public:
        // Yet to be defined method, which will post the runnable
        // into the looper-queue.
        bool post(CLooper::Runnable &&aRunnable);

    private: // construction, since we want the looper to expose its dispatcher exclusively!
        CDispatcher(CLooper &aLooper)
            : mAssignedLooper(aLooper)
        {}

    private:
        // Store a reference to the attached looper in order to
        // emplace tasks into the queue.
        CLooper &mAssignedLooper;
    };
With this definition given, we add a std::shared_ptr<CDispatcher> mDispatcher; member to CLooper and add mDispatcher(std::shared_ptr<CDispatcher>(new CDispatcher(*this))) to the constructor’s initialization-list.
Remark: The std::shared_ptr<T>-constructor is required over std::make_shared, since the constructor of CDispatcher is private and inaccessible from std::make_shared.
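For reference, here is a minimal sketch of what the constructor could look like after this change – initializing the two atomics here is an addition for completeness, since the snippets above don’t show the full constructor:

// Sketch of the CLooper constructor after adding the dispatcher member.
// Explicitly initializing mRunning and mAbortRequested is an assumption
// for completeness; the snippets above do not show the full constructor.
CLooper()
    : mRunning(false)
    , mAbortRequested(false)
    , mDispatcher(std::shared_ptr<CDispatcher>(new CDispatcher(*this)))
{}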
Next, add the below method into the CLooper-class, in order to retrieve the dispatcher:
std::shared_ptr<CDispatcher> getDispatcher() { return mDispatcher; }
Next, let’s implement the CDispatcher’s post-method as below:
bool post(CLooper::Runnable &&aRunnable)
{
    return mAssignedLooper.post(std::move(aRunnable));
}
And finally, add this private method to CLooper:
private:
    bool post(Runnable &&aRunnable)
    {
        if(not running())
        {
            // Deny insertion
            return false;
        }

        try
        {
            std::lock_guard<std::recursive_mutex> guard(mRunnablesMutex);
            mRunnables.push(std::move(aRunnable));
        }
        catch(...)
        {
            return false;
        }

        return true;
    }
The whole construct can be used as follows now:
std::unique_ptr<CLooper> looper = std::make_unique<CLooper>();

std::cout << "Starting looper" << std::endl;
// To start and run
looper->run();

// Give the atomic variable stores some time to become visible.
// Depending on the system, memory visibility can take a while and could
// cause some or even all of the runnables posted below to be discarded
// before insertion.
using namespace std::chrono_literals;
std::this_thread::sleep_for(5ms);

std::shared_ptr<CLooper::CDispatcher> dispatcher = looper->getDispatcher();

std::cout << "Adding tasks" << std::endl;
for(uint32_t k = 0; k < 500; ++k)
{
    auto const task = [k]()
    {
        std::cout << "Invocation " << k
                  << ": Hello, I have been executed asynchronously on the looper for "
                  << (k + 1) << " times." << std::endl;
    };

    dispatcher->post(std::move(task));
}

std::cout << "Waiting 5 seconds for completion" << std::endl;
std::this_thread::sleep_for(5s);

std::cout << "Stopping looper" << std::endl;
// To stop it and clean it up
dispatcher = nullptr;
looper->stop();
looper = nullptr;
Working example: https://wandbox.org/permlink/JXUVwLahx3O63qAJ
Where to continue from here?
This example code can be improved in a lot of places; it is far from perfect and, I would say, not necessarily even safe.
- It can be extended using <future> and its std::future and std::promise features to execute asynchronously and receive a result (see the sketch below this list).
- The dispatcher can be extended to permit priority execution (immediate execution) and delayed execution.
- The entire looper can be made lock-free.
- We could attach a messaging system upon the looper.
- We could support handlers and different handler-types for dispatched messages, i.e. functors, which are automatically invoked based on some identifying criteria in the message or provided by the dispatcher.
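As an illustration of the first point, here is a minimal sketch of how a result-returning post could be built on top of the dispatcher using std::packaged_task. The helper name postWithResult and its free-function form are assumptions, not part of the implementation above:

#include <future>
#include <memory>
#include <type_traits>
#include <utility>

// Sketch (assumed helper, not part of the CLooper/CDispatcher above):
// wraps a callable in a std::packaged_task, posts a plain Runnable that
// executes it on the looper thread, and hands the matching std::future
// back to the caller.
template <typename TFunction>
auto postWithResult(CLooper::CDispatcher &aDispatcher, TFunction &&aFunction)
    -> std::future<std::invoke_result_t<TFunction>>
{
    using Result = std::invoke_result_t<TFunction>;

    // The packaged_task is kept alive via shared_ptr, since Runnable
    // (std::function) requires a copyable callable.
    auto task = std::make_shared<std::packaged_task<Result()>>(
                    std::forward<TFunction>(aFunction));
    std::future<Result> future = task->get_future();

    // Post a plain Runnable; real code should check the return value of
    // post(), since insertion may be denied while the looper is not running.
    aDispatcher.post([task]() { (*task)(); });

    return future;
}

// Usage sketch:
// auto result = postWithResult(*looper->getDispatcher(), []() { return 42; });
// int const value = result.get(); // Blocks until the looper executed the task.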
There are many things we can do, which will maybe be described in follow-up posts.
Conclusion
This construct is a good starting point to regain control of your threading and reuse threads while simultaneously reducing threading-overhead.
The design is simple and comprehensible and permits thread-safe dispatching of work-items to a single worker-thread, while reducing the spread of thread-dispatches throughout the codebase.
It has its limitations, though!
The looper is just a control-construct attached to a single worker-thread and cannot handle parallelized execution or workload balancing, which Thread-Pools with work-stealing are perfect for.
But, if there’s a single worker thread required for a specific type of tasks, the Looper can be a simpler and more comprehensible approach to solve the multi-threading issue!