Message-ID: <1363034207.27803.8.camel@thor.lan>
Date: Mon, 11 Mar 2013 16:36:47 -0400
From: Peter Hurley <peter@...leysoftware.com>
To: Michel Lespinasse <walken@...gle.com>
Cc: Alex Shi <alex.shi@...el.com>, Ingo Molnar <mingo@...nel.org>,
David Howells <dhowells@...hat.com>,
Peter Zijlstra <a.p.zijlstra@...llo.nl>,
Thomas Gleixner <tglx@...utronix.de>,
Yuanhan Liu <yuanhan.liu@...ux.intel.com>,
Rik van Riel <riel@...hat.com>,
Andrew Morton <akpm@...ux-foundation.org>,
linux-kernel@...r.kernel.org
Subject: Re: [PATCH 11/12] rwsem: wake all readers when first waiter is a
reader
On Wed, 2013-03-06 at 15:21 -0800, Michel Lespinasse wrote:
> + retry_reader_grants:
> + oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
> + if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
> + /* A writer stole the lock. Undo our reader grants. */
> + if (rwsem_atomic_update(-adjustment, sem) < RWSEM_WAITING_BIAS)
> + goto out;
> + /* The writer left. Retry waking readers. */
> + goto retry_reader_grants;
> + }
This can be reduced to a single looping cmpxchg in the grant reversal
path; then, if reversing the grant fails, the count can simply be
re-tested for grant success rather than trying to atomically re-grant.
For example, with a helper function, rwsem_cmpxchg():
static inline int rwsem_cmpxchg(long *old, long new, struct rw_semaphore *sem)
{
	long tmp = *old;
	*old = cmpxchg(&sem->count, *old, new);
	return tmp == *old;
}
... then the above becomes ...
	count = rwsem_atomic_update(adjustment, sem);
	do {
		if (count - adjustment >= RWSEM_WAITING_BIAS)
			break;
		if (rwsem_cmpxchg(&count, count - adjustment, sem))
			goto out; /* or simply return sem */
	} while (1);
	< wake up readers >
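To make the control flow of that loop easier to check outside the kernel, here is a minimal user-space sketch using C11 atomics. It is only a model under stated assumptions: struct rw_semaphore is a hypothetical one-field stand-in, the bias values assume the 32-bit count layout, try_grant_readers() is a name invented for this illustration, and rwsem_atomic_update() and rwsem_cmpxchg() are re-implemented on top of <stdatomic.h> rather than taken from the kernel.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Assumed 32-bit style bias values, for illustration only. */
#define RWSEM_ACTIVE_BIAS       1L
#define RWSEM_WAITING_BIAS      (-0x10000L)
#define RWSEM_ACTIVE_READ_BIAS  RWSEM_ACTIVE_BIAS

/* Hypothetical user-space stand-in for the kernel structure. */
struct rw_semaphore {
	atomic_long count;
};

/* Add delta to the count and return the new value. */
static long rwsem_atomic_update(long delta, struct rw_semaphore *sem)
{
	return atomic_fetch_add(&sem->count, delta) + delta;
}

/* Returns true on success; on failure *old is refreshed with the current count. */
static bool rwsem_cmpxchg(long *old, long new, struct rw_semaphore *sem)
{
	return atomic_compare_exchange_strong(&sem->count, old, new);
}

/*
 * Grant `adjustment` worth of read locks. Returns true if the grant
 * stands, false if a writer held the lock and the grant was reversed.
 */
static bool try_grant_readers(struct rw_semaphore *sem, long adjustment)
{
	long count;

	assert(adjustment > 0);
	count = rwsem_atomic_update(adjustment, sem);
	for (;;) {
		/* Count before our grant shows no active writer: grant stands. */
		if (count - adjustment >= RWSEM_WAITING_BIAS)
			return true;
		/* Writer present: try to undo the grant in one step. */
		if (rwsem_cmpxchg(&count, count - adjustment, sem))
			return false;
		/* cmpxchg failed, count now holds the fresh value: re-test. */
	}
}
```

When the cmpxchg fails, the refreshed count still includes the grant, so the first test doubles as the "did the writer leave?" check and no second atomic add is needed.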
Also, this series and the original rwsem can mistakenly put reader(s)
to sleep when the lock passes from a writer to the waiting readers and
no writers are waiting. For example,
  CPU 0                                 |  CPU 1
                                        |
                                        |  down_write()

... CPU 1 has the write lock for the semaphore.
    Meanwhile, 1 or more down_read(s) are attempted and fail;
    these are put on the wait list. Then ...

  down_read()                           |  up_write()
    local = atomic_update(+read_bias)   |
    local <= 0?                         |    local = atomic_update(-write_bias)
    if (true)                           |    local < 0?
      down_read_failed()                |    if (true)
                                        |      wake()
                                        |        grab wait_lock
        wait for wait_lock              |        wake all readers
                                        |        release wait_lock

... At this point, sem->count > 0 and the wait list is empty,
    but down_read_failed() will sleep the reader.
In this case, CPU 0 has observed the sem count with the write lock (and
the other waiters) and so is detoured to down_read_failed(). But if
CPU 0 can't grab the wait_lock before the up_write() does (via
rwsem_wake()), then down_read_failed() will wake no one and sleep the
reader.
Unfortunately, this means readers and writers which observe the sem
count after the adjustment is committed by CPU 0 in down_read_failed()
will sleep as well, until the sem count returns to 0.
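The count arithmetic behind this scenario can be replayed step by step in a single thread. The sketch below is only an illustration: it assumes the 32-bit bias values, replay_tardy_reader() and struct race_trace are hypothetical names, and the wake step is a simplified stand-in for what __rwsem_do_wake() does when it grants one queued reader and empties the wait list.

```c
#include <assert.h>

/* Assumed 32-bit style count layout, for illustration only. */
#define RWSEM_ACTIVE_BIAS        1L
#define RWSEM_WAITING_BIAS       (-0x10000L)
#define RWSEM_ACTIVE_READ_BIAS   RWSEM_ACTIVE_BIAS
#define RWSEM_ACTIVE_WRITE_BIAS  (RWSEM_WAITING_BIAS + RWSEM_ACTIVE_BIAS)

/* Hypothetical record of the two interesting count values. */
struct race_trace {
	long after_wake;   /* count once up_write()'s wake has finished */
	long after_queue;  /* count once the tardy reader queues itself */
};

static struct race_trace replay_tardy_reader(void)
{
	struct race_trace t;
	long count = 0;
	int waiters = 0;

	/* CPU 1: down_write() succeeds on an idle semaphore. */
	count += RWSEM_ACTIVE_WRITE_BIAS;

	/* An earlier reader fails down_read() and queues itself. */
	count += RWSEM_ACTIVE_READ_BIAS;
	count += -RWSEM_ACTIVE_READ_BIAS + RWSEM_WAITING_BIAS;
	waiters = 1;

	/* CPU 0: down_read() sees a negative count, heads for the slow path. */
	count += RWSEM_ACTIVE_READ_BIAS;
	assert(count < 0);

	/* CPU 1: up_write() wins the race for wait_lock and wakes the queue. */
	count -= RWSEM_ACTIVE_WRITE_BIAS;
	count += waiters * RWSEM_ACTIVE_READ_BIAS - RWSEM_WAITING_BIAS;
	waiters = 0;
	t.after_wake = count;

	/* CPU 0: finally gets wait_lock, finds an empty list, queues anyway. */
	count += -RWSEM_ACTIVE_READ_BIAS + RWSEM_WAITING_BIAS;
	t.after_queue = count;

	return t;
}
```

Under these assumptions the count is 2 after the wake (the woken reader plus CPU 0's still-pending increment), then drops to RWSEM_WAITING_BIAS + 1 once CPU 0 queues itself, so later arrivals see a negative count even though only a reader holds the lock.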
I think the best solution would be to preserve the observed count when
down_read() fails and pass it to rwsem_down_read_failed() -- of course,
this is also the most disruptive approach as it changes the per-arch
interface (the attached patch does not include the arch changes). The
other alternative is to go through the __rwsem_do_wake() path.
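As a rough user-space model of the first approach (not the kernel code itself), the early-out decision can be sketched with C11 atomics. The struct, the bias value, and the function name reverse_or_early_out() are assumptions made for this illustration, and wait_lock handling is left out entirely.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Assumed bias value, for illustration only. */
#define RWSEM_ACTIVE_READ_BIAS  1L

/* Hypothetical user-space stand-in for the kernel structure. */
struct rw_semaphore {
	atomic_long count;
};

/* Returns true on success; on failure *old is refreshed with the current count. */
static bool rwsem_cmpxchg(long *old, long new, struct rw_semaphore *sem)
{
	return atomic_compare_exchange_strong(&sem->count, old, new);
}

/*
 * `count` is the value the failed down_read() observed after adding its
 * read bias. Returns true if the reader already owns the lock and can
 * return immediately, false if the attempt was reversed and the caller
 * must queue itself and sleep.
 */
static bool reverse_or_early_out(struct rw_semaphore *sem, long count)
{
	assert(sem != NULL);
	for (;;) {
		/* Count unchanged since the failed attempt: take our bias back. */
		if (rwsem_cmpxchg(&count, count - RWSEM_ACTIVE_READ_BIAS, sem))
			return false;
		/* Count moved and is now positive: readers own it, no waiters. */
		if (count > 0)
			return true;
	}
}
```

The point of passing the observed count is that a failed cmpxchg tells the slow path the semaphore changed underneath it, which is exactly the case where sleeping may be wrong.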
Regards,
Peter Hurley
--- >% ---
Subject: [PATCH] rwsem: Early-out tardy readers
Signed-off-by: Peter Hurley <peter@...leysoftware.com>
---
lib/rwsem.c | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
diff --git a/lib/rwsem.c b/lib/rwsem.c
index f9a5705..8eb2cdf 100644
--- a/lib/rwsem.c
+++ b/lib/rwsem.c
@@ -118,12 +118,11 @@ __rwsem_do_wake(struct rw_semaphore *sem, bool wakewrite)
/*
* wait for the read lock to be granted
*/
-struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
+struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem, long count)
{
signed long adjustment = -RWSEM_ACTIVE_READ_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
- signed long count;
/* set up my own style of waitqueue */
waiter.task = tsk;
@@ -131,6 +130,20 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
get_task_struct(tsk);
raw_spin_lock_irq(&sem->wait_lock);
+
+ /* Try to reverse the lock attempt, but if the count has changed
+ * so that reversing fails, check if there are no waiters,
+ * and early-out if so */
+ do {
+ if (rwsem_cmpxchg(&count, count + adjustment, sem))
+ break;
+ if (count > 0) {
+ raw_spin_unlock_irq(&sem->wait_lock);
+ put_task_struct(tsk);
+ return sem;
+ }
+ } while (1);
+
sem->wait_readers++;
if (list_empty(&sem->wait_list))
adjustment += RWSEM_WAITING_BIAS;
--
1.8.1.2