Or: How to achieve single lock-free producer, single blocking consumer communication. While writing the AAudio backend for the cubeb library, I came across an interesting multithreading problem. I’m no expert on multithreading and couldn’t find a solution on the internet (maybe because I just don’t know the problem’s name?), so solving it on my own was exciting.
The problem:
To achieve the best audio latency, we register a callback with AAudio in which we render sounds or process input from the microphone. AAudio then calls this callback from a high-priority thread that it manages. The catch: since we are working with potentially really low latencies, we don’t have a whole lot of time to process or output the data. The AAudio docs explicitly state that this callback must not block, not even to lock a mutex. Using lock-free primitives is quite common when programming the bare bones of an audio engine, and I’m already used to that, but integrating this with cubeb makes the situation somewhat more complicated.
We forward the callback that AAudio triggers in the audio thread to the user of the cubeb library. The user-supplied callback may signal via its return value that the stream should be drained (i.e. stopped once all data that was already buffered has been output). AAudio has a matching interface: AAudioStream_requestStop. This function outputs all remaining data of an output stream and then stops the stream. Nice, isn’t it? Almost. Cubeb furthermore states that the backend will call a user-supplied state callback as soon as draining is complete. AAudio, on the other hand, has no mechanism to notify us when a stream is drained, i.e. stopped. We can only block manually for a state change using AAudioStream_waitForStateChange.
Additionally, AAudio states that the audio callback itself must not call AAudioStream_requestStop but should instead leave this to another thread.
So, obviously, we need another worker thread that does all this for us. But we don’t want to start a thread per stream, and we additionally have no way to interrupt a thread blocked in AAudioStream_waitForStateChange.
My first solution was to simply check all streams over and over again for state changes or stop requests (when draining), sleeping for 5 milliseconds in between. That worked well. But it is inefficient, especially considering that most applications don’t care much about stream draining, or even about state changes at all.
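A minimal sketch of this polling approach could look like the following. The Stream struct, its two flags and both function names are hypothetical stand-ins, not cubeb’s or AAudio’s actual types. The part where the real backend would call AAudioStream_requestStop and fire the user’s state callback is reduced to setting a flag.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>
#include <vector>

// Hypothetical per-stream state (not the real cubeb/AAudio structures).
struct Stream {
    std::atomic<bool> drain_requested{false};  // set by the audio callback
    std::atomic<bool> drained{false};          // set by the worker
};

// The audio callback never stops the stream itself; it only raises a flag.
void audio_callback_requests_drain(Stream& s) {
    s.drain_requested.store(true);
}

// Polling worker: inspect every stream, then sleep 5 ms, until `running`
// is cleared. The real backend would request the stop and report the state
// change here; this sketch only flips `drained`.
void polling_worker(const std::vector<Stream*>& streams,
                    const std::atomic<bool>& running) {
    while (running.load()) {
        for (Stream* s : streams) {
            if (s->drain_requested.exchange(false)) {
                s->drained.store(true);
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
}
```

Even with nothing to do, such a worker wakes up 200 times per second (once every 5 ms), which is exactly the inefficiency described above.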
With that, we arrive at the problem: we want to communicate from the realtime audio thread, which must not block under any circumstances, with the consumer thread, which should sleep (i.e. block) as long as there isn’t anything to do or wait for. This could be generalized to a single lock-free producer, single blocking consumer queue. Or, put differently, to the problem of reliably waking up a thread without any chance of blocking in the thread that initiates the wakeup.
Shouldn’t be too hard, right?
So, spin up a mutex and take the condition variables out of your closet. We can quickly come up with a first trivial solution (leaving out details like joining logic or the actual work to be done):
```cpp
#include <condition_variable>
#include <mutex>

std::mutex mutex;
std::condition_variable cv;

// This function is called from the AAudio-managed realtime thread.
// It can be seen as producer. It must not block.
void audio_callback() {
    // output audio data...
    // when we change the stream state:
    cv.notify_one();
}

// This is the thread that should only get active when there is something
// to do. It can be seen as consumer.
void consumer() {
    std::unique_lock<std::mutex> lock(mutex);
    while (true) {
        cv.wait(lock);
        // check for state changes or things to do
        // process everything there is to do
    }
}
```
The mutex, which we only need for waiting on the condition variable, seems somewhat useless, right? After a small crisis regarding my understanding of what condition variables actually are, I came across a great Stack Overflow question and explanation.
The main magic of a condition variable is simply that it can unlock a mutex and go to sleep atomically; nothing can happen in between. So I already suspected that not using the mutex anywhere else above was a bad sign regarding my use of the condition variable.
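For comparison, here is a minimal sketch of the textbook way to use a condition variable. The shared condition (a hypothetical ready flag) is only written while holding the mutex, so the producer’s update can’t fall between the consumer’s check and its going to sleep. The sketch also shows why this doesn’t fit our case: the producer has to lock the mutex, which the AAudio callback must not do.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Textbook hand-off: "ready" is only written while holding the mutex, and the
// consumer checks it under the same mutex before it atomically unlocks and
// sleeps. A notification can't fall in between.
bool textbook_handoff() {
    std::mutex mutex;
    std::condition_variable cv;
    bool ready = false;  // guarded by mutex

    std::thread producer([&] {
        {
            // This lock may block: not allowed in the AAudio callback.
            std::lock_guard<std::mutex> lock(mutex);
            ready = true;
        }
        cv.notify_one();
    });

    {
        std::unique_lock<std::mutex> lock(mutex);
        // The predicate overload re-checks `ready`, so a notification sent
        // before we start waiting and spurious wakeups are both handled.
        cv.wait(lock, [&] { return ready; });
    }
    producer.join();
    return ready;
}
```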
Where’s the issue? What could go wrong?
Well, the consumer thread could miss signals and go to sleep even though there is still something to do. Imagine this: cv.notify_one() is called just as the consumer is about to go to sleep. The signal has no effect, since the consumer isn’t sleeping yet. But immediately afterwards, the consumer goes to sleep even though there is still work to do, and it’s not guaranteed to wake up again in finite time.
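The missed signal can be reproduced deterministically in a single thread. In this sketch, one thread plays both roles in the order described above, the function name is hypothetical, and a 100 ms timeout stands in for sleeping forever.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// The missed signal, squeezed into one thread: the consumer owns the mutex
// and is about to wait when the producer notifies. With nobody waiting yet,
// the notification is simply dropped.
bool notification_is_lost() {
    std::mutex mutex;
    std::condition_variable cv;
    std::unique_lock<std::mutex> lock(mutex);  // consumer: still active
    cv.notify_one();                           // producer: nobody is waiting
    // Consumer: now goes to sleep. Barring a spurious wakeup, nothing wakes
    // it up again, so the wait runs into the timeout.
    return cv.wait_for(lock, std::chrono::milliseconds(100)) ==
           std::cv_status::timeout;
}
```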
Later on, I found a Stack Overflow question asking for pretty much exactly what I need. It has one answer that seriously tries to solve the given problem, from Jonathan Wakely (a libstdc++ author), and it gives more or less the solution presented above. But in the following discussion, they found its problems as well, without pursuing alternative options.
I thought of various ways to solve this: with additional atomic flags, with an additional mutex on which the audio thread only ever calls try_lock, or with a second condition variable. But it all boils down to the same situation: when the consumer thread is about to go to sleep, there’s just nothing the realtime thread can do about it. It could wait until the consumer thread really is asleep and then signal the condition variable, but that involves waiting again (and the common way to do so would be to use the mutex).
The solution
All these failed lines of thought finally led me to the solution: what if we just spin up another thread that is used for exactly this situation? A thread that must be sleeping whenever the consumer thread is active, and that simply echoes the condition variable notification as soon as the consumer thread is sleeping. How can we achieve this? Let’s make use of the previously unused mutex! The consumer thread already holds the mutex while it is active, so let’s acquire the mutex in the new helper thread as well. I called this new helper thread notifier:
```cpp
// This is the helper thread that echoes the condition variable notification
// when that is needed.
void notifier() {
    std::unique_lock<std::mutex> lock(mutex);
    while (true) {
        cv.wait(lock);
        cv.notify_one();
    }
}
```
And that’s it. We just need to add the notifier thread, and without any other modifications, the consumer can’t miss signals anymore. I initially used cv.notify_all() in the realtime (producer) thread, but that’s not really needed. So, why does this work?
At any time (after the initial setup, i.e. after both threads have reached cv.wait(lock) for the first time), at least one of notifier or consumer must be sleeping (i.e. waiting on cv). A thread that is not sleeping owns the locked mutex, and only one of them can own it at a time. That’s where we finally use the real condition variable magic.
What happens if the realtime (producer) thread calls cv.notify_one()?
If only the consumer thread is sleeping, the notification will eventually wake it up.
If the consumer thread is currently active, on the other hand (even if it’s about to go to sleep with nothing else in between), the notifier thread must be sleeping, since the mutex is still locked. The notification wakes the notifier, but the notifier only gets hold of the mutex once the consumer thread has gone to sleep. It then wakes up the consumer thread via the echoed notification.
When both threads are waiting on the condition variable, one of them will wake up. Even if that is the notifier thread, it will just echo the notification and wake up the consumer thread with that echo.
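The following sketch replays the second case: the producer notifies while the consumer still owns the mutex, so the only thing that can wake the consumer is the notifier’s echo. The function name, the notifier_started handshake and the stop flag used to shut the notifier down are hypothetical additions for this demo.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Replays the dangerous interleaving with the notifier thread present.
// Returns true if the consumer is woken within one second anyway.
bool consumer_wakes_despite_late_notify() {
    std::mutex mutex;
    std::condition_variable cv;
    bool notifier_started = false;  // guarded by mutex
    bool stop = false;              // guarded by mutex (demo shutdown only)

    std::thread notifier([&] {
        std::unique_lock<std::mutex> lock(mutex);
        notifier_started = true;
        while (!stop) {
            cv.wait(lock);    // unlocks the mutex and sleeps atomically
            cv.notify_one();  // echo the notification
        }
    });

    // Act as the consumer. The notifier only releases the mutex inside
    // cv.wait, so seeing notifier_started while holding the mutex means the
    // notifier is sleeping.
    std::unique_lock<std::mutex> lock(mutex);
    while (!notifier_started) {
        lock.unlock();
        std::this_thread::yield();
        lock.lock();
    }

    cv.notify_one();  // the producer fires while the consumer is still active
    // The consumer goes to sleep; only the notifier's echo can wake it now.
    bool woken = cv.wait_for(lock, std::chrono::seconds(1)) ==
                 std::cv_status::no_timeout;

    stop = true;
    cv.notify_one();  // wake the notifier so it can observe `stop`
    lock.unlock();
    notifier.join();
    return woken;
}
```

Without the notifier thread, the same sequence runs into the timeout, as in the single-threaded reproduction of the missed signal.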
The example we are building is quite prone to spurious wakeups - waiting on a condition variable without an external condition is usually a bad idea - but we can fix that as well:
```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

std::mutex mutex;
std::condition_variable cv;
std::atomic<bool> wakeup{false};

// This function is called from the AAudio-managed realtime thread.
// It can be seen as producer. It must not block.
void audio_callback() {
    // output audio data...
    // when we change the stream state:
    wakeup.store(true);
    cv.notify_one();
}

// This is the thread that should only get active when there is something
// to do. It can be seen as consumer.
void consumer() {
    std::unique_lock<std::mutex> lock(mutex);
    while (true) {
        cv.wait(lock);
        // exchange() reads and clears the flag in one atomic step, so no
        // store from the producer can slip in between check and reset.
        while (wakeup.exchange(false)) {
            // check for state changes or things to do
            // process everything there is to do
        }
    }
}

// This is the helper thread that echoes the condition variable notification
// when that is needed.
void notifier() {
    std::unique_lock<std::mutex> lock(mutex);
    while (true) {
        cv.wait(lock);
        // Only echo if there is pending work, so a spurious wakeup of the
        // notifier doesn't turn into a pointless consumer wakeup.
        if (wakeup.load()) {
            cv.notify_one();
        }
    }
}
```
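The version above still leaves out the joining logic. A self-contained sketch that adds it could look like the one below. The Waker class, its notify() method and the running flag are hypothetical names. One deliberate deviation: both loops check the wakeup flag once before their first wait. That way, notifications that arrive before the threads have reached cv.wait for the first time (the initial-setup caveat above) aren’t lost either. The work runs while the consumer holds the mutex, which is what keeps the notifier asleep while the consumer is active.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical wrapper around the consumer + notifier pair from the text.
class Waker {
public:
    explicit Waker(std::function<void()> work)
        : work_(std::move(work)),
          consumer_([this] { consumer(); }),
          notifier_([this] { notifier(); }) {}

    ~Waker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            running_ = false;
        }
        cv_.notify_all();  // wake whichever threads are currently waiting
        consumer_.join();
        notifier_.join();
    }

    // Producer side: never takes mutex_, so it can be called from the
    // realtime thread.
    void notify() {
        wakeup_.store(true);
        cv_.notify_one();
    }

private:
    void consumer() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (running_) {
            // Runs with the mutex held, which keeps the notifier asleep.
            while (wakeup_.exchange(false)) {
                work_();
            }
            cv_.wait(lock);
        }
    }

    void notifier() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (running_) {
            if (wakeup_.load()) {
                cv_.notify_one();  // echo for a consumer that may have missed it
            }
            cv_.wait(lock);
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::atomic<bool> wakeup_{false};
    bool running_ = true;  // guarded by mutex_
    std::function<void()> work_;
    std::thread consumer_;  // threads last: they start in the constructor
    std::thread notifier_;
};

// Usage sketch: a producer thread publishes 1..1000 and pokes the Waker after
// each store. Returns true if the consumer eventually saw the final value.
bool run_demo() {
    std::atomic<int> produced{0};
    std::atomic<int> seen{0};
    Waker waker([&] { seen.store(produced.load()); });
    std::thread producer([&] {
        for (int i = 1; i <= 1000; ++i) {
            produced.store(i);
            waker.notify();
        }
    });
    producer.join();
    auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
    while (seen.load() != 1000 && std::chrono::steady_clock::now() < deadline) {
        std::this_thread::yield();
    }
    return seen.load() == 1000;
}
```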
Problems?
This solution has some costs: you need a completely separate thread, which might be a dealbreaker in some situations. And for some wakeups, you pay the overhead of the wrong thread being woken up first. But especially in cases where wakeups should be rare, as can be expected for my original use case (the AAudio cubeb backend), this is better than checking in a loop with a fixed sleep in between.