This patch allows the number of readers a lock may have to be limited.
The limit defaults to "no limit".

The read/write locks now keep track of not only the number of times a
lock is held for read, but also the number of tasks that hold it as a
reader. That is, if two tasks hold the same read/write lock, and one of
them holds the lock twice, the count for the read/write lock is 3 and
the owner count is 2.

The limit of readers is controlled by /proc/sys/kernel/rwlock_reader_limit.
If this is set to zero or negative, then there is no limit.

Signed-off-by: Steven Rostedt

---
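[Note for reviewers, not part of the patch: a minimal userspace sketch
of how the two counters diverge under recursive read acquisition,
matching the example above. The names toy_rwm and toy_read_lock are
made up for illustration; they only model the bookkeeping, not the
locking itself.]

#include <assert.h>

struct toy_rwm {
	int count;	/* times held for read, recursion included */
	int owners;	/* distinct tasks holding the lock for read */
};

/* first_acquire is 1 when the calling task does not already hold rwm */
static void toy_read_lock(struct toy_rwm *rwm, int first_acquire)
{
	rwm->count++;		/* every acquisition bumps count */
	if (first_acquire)
		rwm->owners++;	/* only a task's first acquisition bumps owners */
}

int main(void)
{
	struct toy_rwm rwm = { 0, 0 };

	toy_read_lock(&rwm, 1);		/* task A takes the lock */
	toy_read_lock(&rwm, 1);		/* task B takes the lock */
	toy_read_lock(&rwm, 0);		/* task A takes it again, recursively */

	assert(rwm.count == 3);		/* lock held for read three times */
	assert(rwm.owners == 2);	/* but only two owning tasks */
	return 0;
}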
 include/linux/rt_lock.h |    1 
 kernel/rtmutex.c        |   89 +++++++++++++++++++++++++++++++++++-------------
 kernel/sysctl.c         |   14 +++++++
 3 files changed, 80 insertions(+), 24 deletions(-)

Index: linux-2.6.24.4-rt4/include/linux/rt_lock.h
===================================================================
--- linux-2.6.24.4-rt4.orig/include/linux/rt_lock.h	2008-03-25 22:54:24.000000000 -0400
+++ linux-2.6.24.4-rt4/include/linux/rt_lock.h	2008-03-25 23:00:46.000000000 -0400
@@ -64,6 +64,7 @@ struct rw_mutex {
 	struct task_struct	*owner;
 	struct rt_mutex		mutex;
 	atomic_t		count;	/* number of times held for read */
+	atomic_t		owners;	/* number of owners as readers */
 };
 
 /*

Index: linux-2.6.24.4-rt4/kernel/rtmutex.c
===================================================================
--- linux-2.6.24.4-rt4.orig/kernel/rtmutex.c	2008-03-25 22:55:46.000000000 -0400
+++ linux-2.6.24.4-rt4/kernel/rtmutex.c	2008-03-25 23:06:32.000000000 -0400
@@ -927,6 +927,8 @@ __rt_spin_lock_init(spinlock_t *lock, ch
 }
 EXPORT_SYMBOL(__rt_spin_lock_init);
 
+int rt_rwlock_limit;
+
 static inline int rt_release_bkl(struct rt_mutex *lock, unsigned long flags);
 static inline void rt_reacquire_bkl(int saved_lock_depth);
 
@@ -1000,6 +1002,10 @@ static int try_to_take_rw_read(struct rw
 		goto taken;
 	}
 
+	/* Check for rwlock limits */
+	if (rt_rwlock_limit && atomic_read(&rwm->owners) >= rt_rwlock_limit)
+		return 0;
+
 	if (mtxowner && mtxowner != RT_RW_READER) {
 		if (!try_to_steal_lock(mutex)) {
 			/*
@@ -1044,6 +1050,7 @@ static int try_to_take_rw_read(struct rw
 		rt_rwlock_set_owner(rwm, RT_RW_READER, 0);
  taken:
 	if (incr) {
+		atomic_inc(&rwm->owners);
 		reader_count = current->reader_lock_count++;
 		if (likely(reader_count < MAX_RWLOCK_DEPTH)) {
 			current->owned_read_locks[reader_count].lock = rwm;
@@ -1221,6 +1228,7 @@ rt_read_fastlock(struct rw_mutex *rwm,
 			goto retry;
 		}
 
+		atomic_inc(&rwm->owners);
 		reader_count = current->reader_lock_count++;
 		if (likely(reader_count < MAX_RWLOCK_DEPTH)) {
 			current->owned_read_locks[reader_count].lock = rwm;
@@ -1280,6 +1288,7 @@ retry:
 			goto retry;
 		}
 
+		atomic_inc(&rwm->owners);
 		reader_count = current->reader_lock_count++;
 		if (likely(reader_count < MAX_RWLOCK_DEPTH)) {
 			current->owned_read_locks[reader_count].lock = rwm;
@@ -1471,6 +1480,7 @@ rt_read_slowunlock(struct rw_mutex *rwm,
 	struct rt_mutex *mutex = &rwm->mutex;
 	struct rt_mutex_waiter *waiter;
 	unsigned long flags;
+	unsigned int reader_count;
 	int savestate = !mtx;
 	int i;
 
@@ -1493,6 +1503,7 @@ rt_read_slowunlock(struct rw_mutex *rwm,
 			if (!current->owned_read_locks[i].count) {
 				current->reader_lock_count--;
 				WARN_ON_ONCE(i != current->reader_lock_count);
+				atomic_dec(&rwm->owners);
 			}
 			break;
 		}
@@ -1500,20 +1511,34 @@ rt_read_slowunlock(struct rw_mutex *rwm,
 	WARN_ON_ONCE(i < 0);
 
 	/*
-	 * If there are more readers, let the last one do any wakeups.
-	 * Also check to make sure the owner wasn't cleared when two
-	 * readers released the lock at the same time, and the count
-	 * went to zero before grabbing the wait_lock.
+	 * If the last two (or more) readers unlocked at the same
+	 * time, the owner could be cleared since the count went to
+	 * zero. If this has happened, the rwm owner will be set to
+	 * neither current nor RT_RW_READER. This means that another
+	 * reader already reset the lock, so there is nothing left to do.
 	 */
-	if (atomic_read(&rwm->count) ||
-	    (rt_rwlock_owner(rwm) != current &&
-	     rt_rwlock_owner(rwm) != RT_RW_READER)) {
-		spin_unlock_irqrestore(&mutex->wait_lock, flags);
-		return;
-	}
+	if ((rt_rwlock_owner(rwm) != current &&
+	     rt_rwlock_owner(rwm) != RT_RW_READER))
+		goto out;
+
+	/*
+	 * If there are more readers and we are under the limit,
+	 * let the last reader do the wakeups.
+	 */
+	reader_count = atomic_read(&rwm->count);
+	if (reader_count &&
+	    (!rt_rwlock_limit || atomic_read(&rwm->owners) >= rt_rwlock_limit))
+		goto out;
 
 	/* If no one is blocked, then clear all ownership */
 	if (!rt_mutex_has_waiters(mutex)) {
+		/*
+		 * If count is not zero, we are under the limit with
+		 * no other readers.
+		 */
+		if (reader_count)
+			goto out;
+
 		/* We could still have a pending reader waiting */
 		if (rt_mutex_owner_pending(mutex)) {
 			/* set the rwm back to pending */
@@ -1525,24 +1550,32 @@ rt_read_slowunlock(struct rw_mutex *rwm,
 			goto out;
 	}
 
-	/* We are the last reader with pending waiters. */
+	/*
+	 * If the next waiter is a reader, this can be for one of
+	 * two reasons: either we hit the reader limit, or there
+	 * is a pending writer.
+	 * We still only wake up one reader at a time (even if
+	 * we could wake up more). This is because we don't
+	 * have any idea if a writer is pending.
+	 */
 	waiter = rt_mutex_top_waiter(mutex);
-	if (waiter->write_lock)
+	if (waiter->write_lock) {
+		/* only wake up if there are no readers */
+		if (reader_count)
+			goto out;
 		rwm->owner = RT_RW_PENDING_WRITE;
-	else
+	} else {
+		/*
+		 * It is also possible that the reader limit decreased.
+		 * If the limit did decrease, we may not be able to
+		 * wake up the reader if we are currently above the limit.
+		 */
+		if (rt_rwlock_limit &&
+		    unlikely(atomic_read(&rwm->owners) >= rt_rwlock_limit))
+			goto out;
 		rwm->owner = RT_RW_PENDING_READ;
+	}
 
-	/*
-	 * It is possible to have a reader waiting. We still only
-	 * wake one up in that case. A way we can have a reader waiting
-	 * is because a writer woke up, a higher prio reader came
-	 * and stole the lock from the writer. But the writer now
-	 * is no longer waiting on the lock and needs to retake
-	 * the lock. We simply wake up the reader and let the
-	 * reader have the lock. If the writer comes by, it
-	 * will steal the lock from the reader. This is the
-	 * only time we can have a reader pending on a lock.
-	 */
 	wakeup_next_waiter(mutex, savestate);
 
  out:
@@ -1558,15 +1591,22 @@ rt_read_fastunlock(struct rw_mutex *rwm,
 		   int mtx)
 {
 	WARN_ON(!atomic_read(&rwm->count));
+	WARN_ON(!atomic_read(&rwm->owners));
 	WARN_ON(!rwm->owner);
 	atomic_dec(&rwm->count);
 	if (likely(rt_rwlock_cmpxchg(rwm, current, NULL))) {
 		int reader_count = --current->reader_lock_count;
+		int owners;
 		rt_mutex_deadlock_account_unlock(current);
 		if (unlikely(reader_count < 0)) {
 			reader_count = 0;
 			WARN_ON_ONCE(1);
 		}
+		owners = atomic_dec_return(&rwm->owners);
+		if (unlikely(owners < 0)) {
+			atomic_set(&rwm->owners, 0);
+			WARN_ON_ONCE(1);
+		}
 		WARN_ON_ONCE(current->owned_read_locks[reader_count].lock != rwm);
 	} else
 		slowfn(rwm, mtx);
@@ -1717,6 +1757,7 @@ void rt_mutex_rwsem_init(struct rw_mutex
 
 	rwm->owner = NULL;
 	atomic_set(&rwm->count, 0);
+	atomic_set(&rwm->owners, 0);
 
 	__rt_mutex_init(mutex, name);
 }

Index: linux-2.6.24.4-rt4/kernel/sysctl.c
===================================================================
--- linux-2.6.24.4-rt4.orig/kernel/sysctl.c	2008-03-25 16:41:48.000000000 -0400
+++ linux-2.6.24.4-rt4/kernel/sysctl.c	2008-03-25 23:00:12.000000000 -0400
@@ -150,6 +150,10 @@ static int parse_table(int __user *, int
 		       void __user *, size_t, struct ctl_table *);
 #endif
 
+#ifdef CONFIG_PREEMPT_RT
+extern int rt_rwlock_limit;
+#endif
+
 #ifdef CONFIG_PROC_SYSCTL
 static int proc_do_cad_pid(struct ctl_table *table, int write,
 			   struct file *filp,
@@ -399,6 +403,16 @@ static struct ctl_table kern_table[] = {
 		.proc_handler	= &proc_dointvec,
 	},
 #endif
+#ifdef CONFIG_PREEMPT_RT
+	{
+		.ctl_name	= CTL_UNNUMBERED,
+		.procname	= "rwlock_reader_limit",
+		.data		= &rt_rwlock_limit,
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= &proc_dointvec,
+	},
+#endif
 	{
 		.ctl_name	= KERN_PANIC,
 		.procname	= "panic",
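[Usage note, illustrative and not part of the patch: on a kernel with
this applied and CONFIG_PREEMPT_RT enabled, the limit can be set from a
shell with e.g. "echo 4 > /proc/sys/kernel/rwlock_reader_limit", or
programmatically as below. The value 4 is arbitrary.]

#include <stdio.h>

int main(void)
{
	/* A value <= 0 written here disables the limit entirely. */
	FILE *f = fopen("/proc/sys/kernel/rwlock_reader_limit", "w");

	if (!f) {
		perror("rwlock_reader_limit");
		return 1;
	}
	fprintf(f, "%d\n", 4);	/* allow at most 4 reader tasks per rwlock */
	fclose(f);
	return 0;
}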