2021-03-14
RCU (read-copy-update) is a synchronization mechanism that was first used in the Linux kernel. There is now also a userspace implementation, the liburcu library. In general, RCU is used to protect read-mostly data structures. This post is about how QEMU implements RCU.
QEMU's RCU is ported from liburcu. liburcu comes in several flavors; QEMU chose the urcu-mb implementation because it is the least invasive.
The core of QEMU RCU is a global counter named ‘rcu_gp_ctr’, which is used by both readers and updaters. Every thread also has a thread-local ‘ctr’ counter in its ‘rcu_reader_data’ struct.
The updater updates the global counter in ‘synchronize_rcu’ to indicate a new version of the resource (a new grace period). A reader copies ‘rcu_gp_ctr’ to its own ‘ctr’ variable when it calls ‘rcu_read_lock’.
When ‘synchronize_rcu’ finds that a reader's ‘ctr’ is nonzero and differs from ‘rcu_gp_ctr’, that reader is still inside a critical section that began before the update, so ‘synchronize_rcu’ leaves the reader's ‘rcu_reader_data->waiting’ flag set and waits. When ‘rcu_read_unlock’ finds this flag set, it triggers an event to notify ‘synchronize_rcu’ that the reader has left the critical section. The rest of this post walks through the code behind this idea.
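As a rough illustration of the handshake described above, here is a toy, single-threaded model. It is not QEMU code: every `toy_` name and both constants are assumptions made for this sketch, and it leaves out the atomics and memory barriers that the real implementation depends on.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy, single-threaded model of the counter handshake; not QEMU code.
 * Both constants are assumptions chosen for illustration. */
#define TOY_GP_LOCKED 1UL   /* low bit keeps a reader's snapshot nonzero */
#define TOY_GP_STEP   2UL   /* amount added for each new grace period */

static unsigned long toy_gp_ctr = TOY_GP_LOCKED;  /* global counter */

struct toy_reader {
    unsigned long ctr;   /* 0 means "outside any critical section" */
    bool waiting;        /* set by the updater when it must wait for us */
};

static int toy_wakeups;  /* stands in for setting the grace-period event */

static void toy_read_lock(struct toy_reader *r)
{
    r->ctr = toy_gp_ctr;          /* snapshot the current grace period */
}

static void toy_read_unlock(struct toy_reader *r)
{
    r->ctr = 0;                   /* leave the critical section */
    if (r->waiting) {             /* an updater is waiting on this reader */
        r->waiting = false;
        toy_wakeups++;
    }
}

/* Start a new grace period; return true if reader r still blocks it. */
static bool toy_start_grace_period(struct toy_reader *r)
{
    toy_gp_ctr += TOY_GP_STEP;
    r->waiting = true;
    if (r->ctr == 0 || r->ctr == toy_gp_ctr) {
        r->waiting = false;       /* quiescent, or already on the new period */
        return false;
    }
    return true;
}
```

A reader that took its snapshot before the counter moved keeps blocking the updater until it unlocks; a reader that locks afterwards picks up the new value and is never waited for.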
‘rcu_reader_data’ is defined as follows:
struct rcu_reader_data {
    /* Data used by both reader and synchronize_rcu() */
    unsigned long ctr;
    bool waiting;
    /* Data used by reader only */
    unsigned depth;
    /* Data used for registry, protected by rcu_registry_lock */
    QLIST_ENTRY(rcu_reader_data) node;
};
Every thread that uses RCU needs to call ‘rcu_register_thread’ to insert its thread-local variable ‘rcu_reader’ into the global registry list.
void rcu_register_thread(void)
{
    assert(rcu_reader.ctr == 0);
    qemu_mutex_lock(&rcu_registry_lock);
    QLIST_INSERT_HEAD(&registry, &rcu_reader, node);
    qemu_mutex_unlock(&rcu_registry_lock);
}
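To make the registration pattern concrete without QEMU's QLIST macros, here is a minimal sketch using plain pthreads and a hand-rolled singly linked list. All `toy_` names are hypothetical stand-ins, and the list is deliberately simpler than the doubly linked list QEMU uses.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical stand-ins for rcu_reader_data and the registry. */
struct toy_reader {
    unsigned long ctr;
    struct toy_reader *next;      /* registry linkage */
};

static pthread_mutex_t toy_registry_lock = PTHREAD_MUTEX_INITIALIZER;
static struct toy_reader *toy_registry;           /* list head */
static _Thread_local struct toy_reader toy_self;  /* one instance per thread */

static void toy_register_thread(void)
{
    pthread_mutex_lock(&toy_registry_lock);
    toy_self.next = toy_registry;   /* insert at the head of the list */
    toy_registry = &toy_self;
    pthread_mutex_unlock(&toy_registry_lock);
}

static void toy_unregister_thread(void)
{
    pthread_mutex_lock(&toy_registry_lock);
    for (struct toy_reader **pp = &toy_registry; *pp; pp = &(*pp)->next) {
        if (*pp == &toy_self) {
            *pp = toy_self.next;    /* unlink this thread's entry */
            break;
        }
    }
    pthread_mutex_unlock(&toy_registry_lock);
}

static size_t toy_registry_len(void)
{
    size_t n = 0;
    pthread_mutex_lock(&toy_registry_lock);
    for (struct toy_reader *r = toy_registry; r; r = r->next) {
        n++;
    }
    pthread_mutex_unlock(&toy_registry_lock);
    return n;
}
```

The point of the registry is that the updater can walk it and inspect every thread's counter, which is why each thread has to add its own thread-local entry before it starts reading.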
‘rcu_read_lock’ is used by the reader. ‘rcu_reader->depth’ handles nested locking: only the outermost call does any work. Here we can see that it copies ‘rcu_gp_ctr’ to ‘rcu_reader->ctr’.
static inline void rcu_read_lock(void)
{
    struct rcu_reader_data *p_rcu_reader = &rcu_reader;
    unsigned ctr;
    if (p_rcu_reader->depth++ > 0) {
        return;
    }
    ctr = qatomic_read(&rcu_gp_ctr);
    qatomic_set(&p_rcu_reader->ctr, ctr);
    /* Write p_rcu_reader->ctr before reading RCU-protected pointers. */
    smp_mb_placeholder();
}
‘rcu_read_unlock’ is used by the reader when it leaves the critical section. It resets ‘rcu_reader->ctr’ to 0 and, if it finds that ‘rcu_reader->waiting’ is set, clears the flag and sets ‘rcu_gp_event’.
static inline void rcu_read_unlock(void)
{
    struct rcu_reader_data *p_rcu_reader = &rcu_reader;
    assert(p_rcu_reader->depth != 0);
    if (--p_rcu_reader->depth > 0) {
        return;
    }
    /* Ensure that the critical section is seen to precede the
     * store to p_rcu_reader->ctr. Together with the following
     * smp_mb_placeholder(), this ensures writes to p_rcu_reader->ctr
     * are sequentially consistent.
     */
    qatomic_store_release(&p_rcu_reader->ctr, 0);
    /* Write p_rcu_reader->ctr before reading p_rcu_reader->waiting. */
    smp_mb_placeholder();
    if (unlikely(qatomic_read(&p_rcu_reader->waiting))) {
        qatomic_set(&p_rcu_reader->waiting, false);
        qemu_event_set(&rcu_gp_event);
    }
}
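The effect of the depth counter on nested read-side sections can be seen in a small single-threaded sketch. The `toy_` names and the starting counter value are assumptions for illustration, and the barriers and the ‘waiting’ check are left out on purpose.

```c
#include <assert.h>

static unsigned long toy_gp_ctr = 1;   /* assumed nonzero global counter */

struct toy_reader {
    unsigned long ctr;   /* snapshot; 0 when outside a critical section */
    unsigned depth;      /* nesting level */
};

static void toy_read_lock(struct toy_reader *r)
{
    if (r->depth++ > 0) {
        return;                  /* nested: keep the outer snapshot */
    }
    r->ctr = toy_gp_ctr;         /* outermost lock takes the snapshot */
}

static void toy_read_unlock(struct toy_reader *r)
{
    assert(r->depth != 0);
    if (--r->depth > 0) {
        return;                  /* still inside an outer section */
    }
    r->ctr = 0;                  /* outermost unlock reports quiescence */
}
```

Because an inner lock never refreshes the snapshot, the reader keeps reporting the grace period in which its outermost section began, and it only looks quiescent once the outermost unlock has run.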
The updater calls ‘call_rcu’, which inserts a node into the RCU thread's queue. The thread function ‘call_rcu_thread’ processes this queue and calls ‘synchronize_rcu’. In the common case, where ‘rcu_gp_ctr’ is at least 8 bytes wide, ‘synchronize_rcu’ adds ‘RCU_GP_CTR’ to ‘rcu_gp_ctr’ and calls ‘wait_for_readers’.
void synchronize_rcu(void)
{
    QEMU_LOCK_GUARD(&rcu_sync_lock);
    /* Write RCU-protected pointers before reading p_rcu_reader->ctr.
     * Pairs with smp_mb_placeholder() in rcu_read_lock().
     */
    smp_mb_global();
    QEMU_LOCK_GUARD(&rcu_registry_lock);
    if (!QLIST_EMPTY(&registry)) {
        /* In either case, the qatomic_mb_set below blocks stores that free
         * old RCU-protected pointers.
         */
        if (sizeof(rcu_gp_ctr) < 8) {
            ...
        } else {
            /* Increment current grace period. */
            qatomic_mb_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
        }
        wait_for_readers();
    }
}
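The queue-then-reclaim flow on the updater side can be sketched as follows. This is a single-threaded toy, not QEMU's ‘call_rcu’ implementation: all `toy_` names are hypothetical, the grace-period wait is a stub that only counts calls, and the real queue is shared between threads and needs atomic operations.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

struct toy_rcu_head;
typedef void toy_rcu_cb(struct toy_rcu_head *head);

struct toy_rcu_head {
    struct toy_rcu_head *next;
    toy_rcu_cb *func;
};

static struct toy_rcu_head *toy_queue_head;
static struct toy_rcu_head **toy_queue_tail = &toy_queue_head;
static int toy_grace_periods;

/* Updater side: queue a callback to run after a later grace period. */
static void toy_call_rcu(struct toy_rcu_head *head, toy_rcu_cb *func)
{
    head->func = func;
    head->next = NULL;
    *toy_queue_tail = head;
    toy_queue_tail = &head->next;
}

/* Stub for the real grace-period wait; it only counts invocations. */
static void toy_synchronize(void)
{
    toy_grace_periods++;
}

/* One reclaimer pass: wait once, then run everything queued so far. */
static int toy_process_queue(void)
{
    struct toy_rcu_head *batch = toy_queue_head;
    int n = 0;

    toy_queue_head = NULL;
    toy_queue_tail = &toy_queue_head;
    if (!batch) {
        return 0;
    }
    toy_synchronize();
    while (batch) {
        struct toy_rcu_head *next = batch->next;  /* func may free batch */
        batch->func(batch);
        batch = next;
        n++;
    }
    return n;
}

struct toy_obj {
    struct toy_rcu_head rcu;   /* first member, so the cast below is valid */
    int value;
};

static int toy_freed;

static void toy_obj_free(struct toy_rcu_head *head)
{
    free((struct toy_obj *)head);
    toy_freed++;
}
```

Waiting once for a whole batch amortizes the cost of a grace period over many callbacks, which is the usual reason to hand reclamation to a dedicated thread rather than blocking in every updater.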
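‘wait_for_readers’ first sets ‘rcu_reader_data->waiting’ for every registered reader, then uses ‘rcu_gp_ongoing’ to check whether each reader is still in a critical section that began before the current grace period. Such a reader has a nonzero ‘ctr’ that differs from the new ‘rcu_gp_ctr’, so it stays in ‘registry’ with ‘waiting’ still set; every other reader is moved to the local ‘qsreaders’ list and has ‘waiting’ cleared. Once ‘registry’ is empty, all readers have left their old critical sections, so no reader still holds a pointer to the old version, and the RCU thread can run the callbacks that were inserted into the RCU queue.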
static void wait_for_readers(void)
{
    ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
    struct rcu_reader_data *index, *tmp;
    for (;;) {
        /* We want to be notified of changes made to rcu_gp_ongoing
         * while we walk the list.
         */
        qemu_event_reset(&rcu_gp_event);
        /* Instead of using qatomic_mb_set for index->waiting, and
         * qatomic_mb_read for index->ctr, memory barriers are placed
         * manually since writes to different threads are independent.
         * qemu_event_reset has acquire semantics, so no memory barrier
         * is needed here.
         */
        QLIST_FOREACH(index, &registry, node) {
            qatomic_set(&index->waiting, true);
        }
        /* Here, order the stores to index->waiting before the loads of
         * index->ctr. Pairs with smp_mb_placeholder() in rcu_read_unlock(),
         * ensuring that the loads of index->ctr are sequentially consistent.
         */
        smp_mb_global();
        QLIST_FOREACH_SAFE(index, &registry, node, tmp) {
            if (!rcu_gp_ongoing(&index->ctr)) {
                QLIST_REMOVE(index, node);
                QLIST_INSERT_HEAD(&qsreaders, index, node);
                /* No need for mb_set here, worst of all we
                 * get some extra futex wakeups.
                 */
                qatomic_set(&index->waiting, false);
            }
        }
        if (QLIST_EMPTY(&registry)) {
            break;
        }
        /* Wait for one thread to report a quiescent state and try again.
         * Release rcu_registry_lock, so rcu_(un)register_thread() doesn't
         * wait too much time.
         *
         * rcu_register_thread() may add nodes to &registry; it will not
         * wake up synchronize_rcu, but that is okay because at least another
         * thread must exit its RCU read-side critical section before
         * synchronize_rcu is done. The next iteration of the loop will
         * move the new thread's rcu_reader from &registry to &qsreaders,
         * because rcu_gp_ongoing() will return false.
         *
         * rcu_unregister_thread() may remove nodes from &qsreaders instead
         * of &registry if it runs during qemu_event_wait. That's okay;
         * the node then will not be added back to &registry by QLIST_SWAP
         * below. The invariant is that the node is part of one list when
         * rcu_registry_lock is released.
         */
        qemu_mutex_unlock(&rcu_registry_lock);
        qemu_event_wait(&rcu_gp_event);
        qemu_mutex_lock(&rcu_registry_lock);
    }
    /* put back the reader list in the registry */
    QLIST_SWAP(&registry, &qsreaders, node);
}
static inline int rcu_gp_ongoing(unsigned long *ctr)
{
    unsigned long v;
    v = qatomic_read(ctr);
    return v && (v != rcu_gp_ctr);
}
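The three cases that the predicate distinguishes, and the way one scan separates blocking readers from quiescent ones, can be tried out on plain values. This sketch uses an array instead of QEMU's lists, and the `toy_` names and the counter values in the usage note are made up for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Same predicate shape as rcu_gp_ongoing(), on plain values. */
static bool toy_gp_ongoing(unsigned long reader_ctr, unsigned long gp_ctr)
{
    return reader_ctr && reader_ctr != gp_ctr;
}

/* One scan: compact the readers that still block the grace period to the
 * front of ctrs[] and return how many of them remain. */
static size_t toy_scan_readers(unsigned long *ctrs, size_t n,
                               unsigned long gp_ctr)
{
    size_t blocking = 0;
    for (size_t i = 0; i < n; i++) {
        if (toy_gp_ongoing(ctrs[i], gp_ctr)) {
            ctrs[blocking++] = ctrs[i];
        }
    }
    return blocking;
}
```

With a current counter of 5, a reader at 0 is outside any critical section, a reader at 5 entered after the counter was bumped, and only a reader still at an older value such as 3 keeps the updater waiting. The scan would be repeated after each wakeup until it returns 0.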