Here it is: the first blog post requested by one of you, my fellow readers! Today, we dive into a multi-threaded queue and discover some nasty concurrency problems.
The problem we want to look at is a thread-safe queue. It shall support the following functions:
- Insert: A non-blocking insert which returns immediately.
- Get: A get which blocks until there is something to return.
- Wait: A blocking wait which - who would believe it - waits until the queue is empty again.
So here we go. We'll take a single mutex to guard access to the queue's member variables, plus two conditions. One condition signals that the queue has become empty; we will use it to block the wait call. The other condition signals that an item has been inserted; we will use it to block the get call.
We'll implement it in Python, so we can use the threading primitives from the standard library instead of writing our own.
```python
from threading import Condition, Lock


class MT_Queue:
    def __init__(self):
        self.queue = list()
        self.mainLock = Lock()
        # Notified when the queue becomes empty; used by wait().
        self.queueEmptyCondition = Condition(self.mainLock)
        # Notified when an item is inserted; used by get().
        self.queueFullCondition = Condition(self.mainLock)

    def get(self):
        self.mainLock.acquire()
        # straightforward?
        if len(self.queue) == 0:
            self.queueFullCondition.wait()
        item = self.queue.pop(0)
        if len(self.queue) == 0:
            self.queueEmptyCondition.notify_all()
        self.mainLock.release()
        return item

    def insert(self, item):
        self.mainLock.acquire()
        self.queue.append(item)
        # Wake up one thread blocked in get().
        self.queueFullCondition.notify()
        self.mainLock.release()

    def wait(self):
        self.mainLock.acquire()
        if len(self.queue) != 0:
            self.queueEmptyCondition.wait()
        self.mainLock.release()
```
Seems pretty simple, yet we have a real problem as soon as one thread is blocked in get, a second thread is busy somewhere else, and a third thread inserts a new item. (You might say this is rather unlikely, but try it yourself: most probably you'll run into this problem almost immediately.)
The error we will get is that a thread tries to pop from an empty list, which raises an IndexError. How can that happen, you might ask? The starting situation is like this:
- Thread #1 is blocked inside get, waiting on queueFullCondition.
- Thread #2 is busy somewhere else and is about to call get.
- Thread #3 is about to insert an item.
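To make the failure reproducible instead of waiting for an unlucky run, here is a minimal sketch that forces exactly this schedule. It is a simplification under stated assumptions: the hypothetical helpers `naive_get` and `reproduce_race` pass the list, lock and condition around instead of using the class, and the main thread plays both thread #3 (insert and notify) and thread #2 (take the item) inside one critical section. Holding the lock the whole time guarantees that thread #1 cannot return from `wait()` until the item is already gone.

```python
import threading


def naive_get(queue, lock, not_empty, ready, outcome):
    """Hypothetical stand-in for the naive get: checks with `if`, not `while`."""
    with lock:
        ready.set()  # set while holding the lock, before wait() releases it
        if len(queue) == 0:
            not_empty.wait()
        try:
            outcome.append(queue.pop(0))
        except IndexError as error:
            # Record the failure instead of letting the thread die with a traceback.
            outcome.append(error)


def reproduce_race():
    queue = []
    lock = threading.Lock()
    not_empty = threading.Condition(lock)
    ready = threading.Event()
    outcome = []

    # Thread #1: blocks inside naive_get.
    t1 = threading.Thread(target=naive_get,
                          args=(queue, lock, not_empty, ready, outcome))
    t1.start()
    ready.wait()

    # This only succeeds once thread #1 has released the lock inside wait().
    with lock:
        # Role of thread #3: insert an item and wake up thread #1.
        queue.append("item")
        not_empty.notify()
        # Role of thread #2: we still hold the lock, so thread #1 cannot
        # return from wait() yet, and we take the item first.
        stolen = queue.pop(0)

    t1.join()
    return stolen, outcome[0]


stolen, result = reproduce_race()
print(stolen, type(result).__name__)  # item IndexError
```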
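Here is what goes wrong. Thread #3 inserts its item and notifies queueFullCondition, which wakes up thread #1. Before wait() can return, though, thread #1 has to reacquire mainLock. If thread #2 calls get in the meantime and grabs the lock first, it finds one item, pops it and releases the lock. When thread #1 finally gets the lock, it carries on right after the if without looking at the queue again, and pop(0) on the now empty list raises an IndexError. Shame on us, we've just written a race condition: if thread #2 is faster than thread #1, everything fails! Let's see how to fix this problem: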
```python
    def get(self):
        self.mainLock.acquire()
        # fixed: re-check the queue every time we wake up
        while len(self.queue) == 0:
            self.queueFullCondition.wait()
        item = self.queue.pop(0)
        if len(self.queue) == 0:
            self.queueEmptyCondition.notify_all()
        self.mainLock.release()
        return item
```
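In case we've been woken up but someone else was faster, we simply run into the wait condition once more. Because the emptiness check is now repeated, while holding the lock, every time wait() returns, pop(0) is only ever called on a non-empty list.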
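For reference, here is a sketch of the whole queue with the fix applied, plus a small producer/consumer run. It makes a few assumptions beyond the post: it uses `with self.mainLock:` so the lock is also released if an exception escapes, it applies the same `while` re-check to `wait()` (an insert can sneak in between `notify_all()` and the waiter reacquiring the lock), and the demo names (`consumer`, `results`, the thread and item counts) are purely illustrative.

```python
from threading import Condition, Lock, Thread


class MT_Queue:
    def __init__(self):
        self.queue = []
        self.mainLock = Lock()
        self.queueEmptyCondition = Condition(self.mainLock)  # notified when the queue drains
        self.queueFullCondition = Condition(self.mainLock)   # notified on every insert

    def get(self):
        with self.mainLock:
            # Re-check after every wake-up: a faster thread may have taken the item.
            while len(self.queue) == 0:
                self.queueFullCondition.wait()
            item = self.queue.pop(0)
            if len(self.queue) == 0:
                self.queueEmptyCondition.notify_all()
            return item

    def insert(self, item):
        with self.mainLock:
            self.queue.append(item)
            self.queueFullCondition.notify()  # wake one thread blocked in get()

    def wait(self):
        with self.mainLock:
            # Same pattern: an insert may sneak in before we reacquire the lock.
            while len(self.queue) != 0:
                self.queueEmptyCondition.wait()


# Four consumers each take 250 items while the main thread produces 1000.
q = MT_Queue()
results = []
results_lock = Lock()


def consumer(count):
    for _ in range(count):
        item = q.get()
        with results_lock:
            results.append(item)


consumers = [Thread(target=consumer, args=(250,)) for _ in range(4)]
for t in consumers:
    t.start()
for i in range(1000):
    q.insert(i)
q.wait()  # all inserts happened before this call, so it returns once every item is taken
for t in consumers:
    t.join()
print(sorted(results) == list(range(1000)))  # True
```

Calling `notify()` once per insert wakes at most one blocked consumer per item; thanks to the `while` loop, a consumer that loses the race for the lock just goes back to sleep instead of crashing.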
Multithreading has many very subtle points one has to be aware of. When using a condition, always consider the order in which threads may wake up, and keep in mind that a thread woken from a condition still has to reacquire the mutex, competing for it with every other thread that wants it. Being notified gives it no preference, and there is no guarantee which thread gets the mutex first. If you have questions, feel free to ask in the comments!
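Python's `Condition.wait_for(predicate)` packages the re-check loop for you: it evaluates the predicate, waits while it is false, and re-evaluates it after every wake-up. The sketch below, with hypothetical helper names, replays the unlucky schedule from this post against a get built on `wait_for`. As a simplification, the main thread again plays threads #2 and #3; the woken thread #1 finds the queue empty, goes back to waiting, and is served by the next insert.

```python
import threading


def get_with_wait_for(queue, not_empty, ready, outcome):
    """Hypothetical get() that lets Condition.wait_for do the re-check loop."""
    with not_empty:
        ready.set()  # still holding the lock, so the main thread has to wait
        # wait_for() waits while the predicate is false and re-evaluates it
        # after every wake-up before returning (no timeout given here).
        not_empty.wait_for(lambda: len(queue) > 0)
        outcome.append(queue.pop(0))


def replay_unlucky_schedule():
    queue = []
    not_empty = threading.Condition()  # brings its own lock
    ready = threading.Event()
    outcome = []

    t1 = threading.Thread(target=get_with_wait_for,
                          args=(queue, not_empty, ready, outcome))
    t1.start()
    ready.wait()

    with not_empty:              # succeeds only once thread #1 is waiting
        queue.append("first")    # thread #3 inserts ...
        not_empty.notify()       # ... and wakes thread #1
        stolen = queue.pop(0)    # thread #2 takes the item first

    with not_empty:              # a later insert finally serves thread #1
        queue.append("second")
        not_empty.notify()

    t1.join()
    return stolen, outcome


print(replay_unlucky_schedule())  # ('first', ['second'])
```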