lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [thread-next>] [day] [month] [year] [list]
Date:   Sun, 10 Sep 2017 23:41:58 +0200
From:   Gerd Gerats <gerd.gerats.lkml@....de>
To:     tglx@...utronix.de, mingo@...hat.com, peterz@...radead.org,
        dvhart@...radead.org
Cc:     LKML <linux-kernel@...r.kernel.org>
Subject: [RFC] futex: hashbucket as list of futex instead of waiters

When a futex is used as a condition variable, for example to manage a
thread pool, there may be a lot of threads sleeping in futex_wait on this
futex. The futex_hash_bucket therefore contains many struct futex_q
entries for the same futex.

With bad luck, another futex, used as a mutex, hashes into the same bucket.
Every futex_wake on this mutex then has to scan the whole chain of the
waiters above to find the struct futex_q for this mutex. For not unusual
thread pool sizes of more than 20, this should be a considerable effort.

I therefore suggest including only one struct futex_q per futex in the
hash bucket chain and queueing additional waiters in an extra chain at the
'top' futex_q entry. Thus different futexes are isolated from each other and
the cost of a hash collision is reduced.

To show the idea, I have added a sample patch. Here, the plist is replaced by
a futex-specific implementation. kernel/pring.h is certainly not the
right place for it.

Signed-off-by: Gerd Gerats <gerd.gerats@....de>
---
 kernel/futex.c | 159 +++++++++++++++++++++++++++++++++++++++------------------
 kernel/pring.h | 110 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 219 insertions(+), 50 deletions(-)
 create mode 100644 kernel/pring.h

diff --git a/kernel/futex.c b/kernel/futex.c
index 357348a6cf6b..5598c4b1ad96 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -72,6 +72,8 @@
 
 #include "locking/rtmutex_common.h"
 
+#include "pring.h"
+
 /*
  * READ this before attempting to hack on futexes!
  *
@@ -229,7 +231,7 @@ struct futex_pi_state {
  * we can wake only the relevant ones (hashed queues may be shared).
  *
  * A futex_q has a woken state, just like tasks have TASK_RUNNING.
- * It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.
+ * It is considered woken when futexq_lists_empty(q) || q->lock_ptr == 0.
  * The order of wakeup is always to make the first condition true, then
  * the second.
  *
@@ -237,7 +239,8 @@ struct futex_pi_state {
  * the rt_mutex code. See unqueue_me_pi().
  */
 struct futex_q {
-	struct plist_node list;
+	struct list_head  list;
+	struct pring_node ring;
 
 	struct task_struct *task;
 	spinlock_t *lock_ptr;
@@ -262,7 +265,7 @@ static const struct futex_q futex_q_init = {
 struct futex_hash_bucket {
 	atomic_t waiters;
 	spinlock_t lock;
-	struct plist_head chain;
+	struct list_head chain;
 } ____cacheline_aligned_in_smp;
 
 /*
@@ -745,13 +748,78 @@ static struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb,
 {
 	struct futex_q *this;
 
-	plist_for_each_entry(this, &hb->chain, list) {
+	list_for_each_entry(this, &hb->chain, list) {
 		if (match_futex(&this->key, key))
 			return this;
 	}
 	return NULL;
 }
 
+/**
+ * futexq_init() - initialize futex_q chain fields
+ * @q:	The futex_q to initialize
+ * @prio:	priority recorded in @q's ring node
+ */
+static void futexq_init(struct futex_q *q, int prio)
+{
+	INIT_LIST_HEAD(&q->list);
+	pring_init(&q->ring, prio);
+}
+
+/**
+ * futexq_add() - Enqueue the futex_q on the futex_hash_bucket
+ * @q:	The futex_q to enqueue
+ * @hb:	The destination hash bucket
+ */
+static void futexq_add(struct futex_q *q, struct futex_hash_bucket *hb)
+{
+	struct futex_q *top;
+
+	top = futex_top_waiter(hb, &q->key);
+	if (top) {	/* key already chained: join that entry's ring */
+		if (pring_add(&q->ring, &top->ring) == &q->ring) /* new top */
+			list_replace_init(&top->list, &q->list);
+	} else {	/* no entry with this key in hb->chain yet */
+		list_add_tail(&q->list, &hb->chain);
+	}
+}
+
+/**
+ * futexq_del() - Remove the futex_q from its futex_hash_bucket
+ * @q:	The futex_q to unqueue
+ * @hb:	hash bucket to remove from (not referenced by the body)
+ */
+static void futexq_del(struct futex_q *q, struct futex_hash_bucket *hb)
+{
+	struct futex_q *next;
+
+	if (pring_is_singular(&q->ring)) {	/* q is alone in its ring */
+		list_del_init(&q->list);
+	} else {
+		next = pring_next_entry(q, ring);
+		if (!list_empty(&q->list))	/* q chained: promote next */
+			list_replace_init(&q->list, &next->list);
+		pring_del(&q->ring);
+	}
+}
+
+static inline bool futexq_lists_empty(struct futex_q *q)
+{ /* true when q is linked into neither ->list nor a ring */
+	return list_empty(&q->list) && pring_is_singular(&q->ring);
+}
+
+static inline struct futex_q *futexq_next_or_null(struct futex_q *q)
+{
+	struct futex_q *next = pring_next_entry(q, ring);
+	/* A linked ->list marks the ring's top entry, i.e. we wrapped around */
+	return list_empty(&next->list) ? next : NULL;
+}
+
+#define futex_for_each_match_safe(pos, n, hb, key) \
+	for (pos = futex_top_waiter(hb, key); /* start at the chained entry */ \
+		pos && ({ n = futexq_next_or_null(pos); 1; }); \
+		pos = n) /* n is read before the body runs on pos */
+
 static int cmpxchg_futex_value_locked(u32 *curval, u32 __user *uaddr,
 				      u32 uval, u32 newval)
 {
@@ -1352,11 +1420,11 @@ static void __unqueue_futex(struct futex_q *q)
 	struct futex_hash_bucket *hb;
 
 	if (WARN_ON_SMP(!q->lock_ptr || !spin_is_locked(q->lock_ptr))
-	    || WARN_ON(plist_node_empty(&q->list)))
+	    || WARN_ON(futexq_lists_empty(q)))
 		return;
 
 	hb = container_of(q->lock_ptr, struct futex_hash_bucket, lock);
-	plist_del(&q->list, &hb->chain);
+	futexq_del(q, hb);
 	hb_waiters_dec(hb);
 }
 
@@ -1384,7 +1452,7 @@ static void mark_wake_futex(struct wake_q_head *wake_q, struct futex_q *q)
 	 * is written, without taking any locks. This is possible in the event
 	 * of a spurious wakeup, for example. A memory barrier is required here
 	 * to prevent the following store to lock_ptr from getting ahead of the
-	 * plist_del in __unqueue_futex().
+	 * futexq_del/pring_del in __unqueue_futex().
 	 */
 	smp_store_release(&q->lock_ptr, NULL);
 }
@@ -1521,21 +1589,19 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
 
 	spin_lock(&hb->lock);
 
-	plist_for_each_entry_safe(this, next, &hb->chain, list) {
-		if (match_futex (&this->key, &key)) {
-			if (this->pi_state || this->rt_waiter) {
-				ret = -EINVAL;
-				break;
-			}
+	futex_for_each_match_safe(this, next, hb, &key) {
+		if (this->pi_state || this->rt_waiter) {
+			ret = -EINVAL;
+			break;
+		}
 
-			/* Check if one of the bits is set in both bitsets */
-			if (!(this->bitset & bitset))
-				continue;
+		/* Check if one of the bits is set in both bitsets */
+		if (!(this->bitset & bitset))
+			continue;
 
-			mark_wake_futex(&wake_q, this);
-			if (++ret >= nr_wake)
-				break;
-		}
+		mark_wake_futex(&wake_q, this);
+		if (++ret >= nr_wake)
+			break;
 	}
 
 	spin_unlock(&hb->lock);
@@ -1604,30 +1670,26 @@ futex_wake_op(u32 __user *uaddr1, unsigned int flags, u32 __user *uaddr2,
 		goto retry;
 	}
 
-	plist_for_each_entry_safe(this, next, &hb1->chain, list) {
-		if (match_futex (&this->key, &key1)) {
-			if (this->pi_state || this->rt_waiter) {
-				ret = -EINVAL;
-				goto out_unlock;
-			}
-			mark_wake_futex(&wake_q, this);
-			if (++ret >= nr_wake)
-				break;
+	futex_for_each_match_safe(this, next, hb1, &key1) {
+		if (this->pi_state || this->rt_waiter) {
+			ret = -EINVAL;
+			goto out_unlock;
 		}
+		mark_wake_futex(&wake_q, this);
+		if (++ret >= nr_wake)
+			break;
 	}
 
 	if (op_ret > 0) {
 		op_ret = 0;
-		plist_for_each_entry_safe(this, next, &hb2->chain, list) {
-			if (match_futex (&this->key, &key2)) {
-				if (this->pi_state || this->rt_waiter) {
-					ret = -EINVAL;
-					goto out_unlock;
-				}
-				mark_wake_futex(&wake_q, this);
-				if (++op_ret >= nr_wake2)
-					break;
+		futex_for_each_match_safe(this, next, hb2, &key2) {
+			if (this->pi_state || this->rt_waiter) {
+				ret = -EINVAL;
+				goto out_unlock;
 			}
+			mark_wake_futex(&wake_q, this);
+			if (++op_ret >= nr_wake2)
+				break;
 		}
 		ret += op_ret;
 	}
@@ -1656,18 +1718,18 @@ void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1,
 {
 
 	/*
-	 * If key1 and key2 hash to the same bucket, no need to
+	 * If key1 and key2 hash to the same bucket, we still need to
 	 * requeue.
 	 */
+	futexq_del(q, hb1);
 	if (likely(&hb1->chain != &hb2->chain)) {
-		plist_del(&q->list, &hb1->chain);
 		hb_waiters_dec(hb1);
 		hb_waiters_inc(hb2);
-		plist_add(&q->list, &hb2->chain);
 		q->lock_ptr = &hb2->lock;
 	}
 	get_futex_key_refs(key2);
 	q->key = *key2;
+	futexq_add(q, hb2);
 }
 
 /**
@@ -1949,13 +2011,10 @@ static int futex_requeue(u32 __user *uaddr1, unsigned int flags,
 		}
 	}
 
-	plist_for_each_entry_safe(this, next, &hb1->chain, list) {
+	futex_for_each_match_safe(this, next, hb1, &key1) {
 		if (task_count - nr_wake >= nr_requeue)
 			break;
 
-		if (!match_futex(&this->key, &key1))
-			continue;
-
 		/*
 		 * FUTEX_WAIT_REQEUE_PI and FUTEX_CMP_REQUEUE_PI should always
 		 * be paired with each other and no other futex ops.
@@ -2110,8 +2169,8 @@ static inline void __queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
 	 */
 	prio = min(current->normal_prio, MAX_RT_PRIO);
 
-	plist_node_init(&q->list, prio);
-	plist_add(&q->list, &hb->chain);
+	futexq_init(q, prio);
+	futexq_add(q, hb);
 	q->task = current;
 }
 
@@ -2396,7 +2455,7 @@ static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
 	 * If we have been removed from the hash list, then another task
 	 * has tried to wake us, and we can skip the call to schedule().
 	 */
-	if (likely(!plist_node_empty(&q->list))) {
+	if (likely(!futexq_lists_empty(q))) {
 		/*
 		 * If the timer has already expired, current will already be
 		 * flagged for rescheduling. Only call schedule if there
@@ -2918,7 +2977,7 @@ int handle_early_requeue_pi_wakeup(struct futex_hash_bucket *hb,
 		 * We were woken prior to requeue by a timeout or a signal.
 		 * Unqueue the futex_q and determine which it was.
 		 */
-		plist_del(&q->list, &hb->chain);
+		futexq_del(q, hb);
 		hb_waiters_dec(hb);
 
 		/* Handle spurious wakeups gracefully */
@@ -3495,7 +3554,7 @@ static int __init futex_init(void)
 
 	for (i = 0; i < futex_hashsize; i++) {
 		atomic_set(&futex_queues[i].waiters, 0);
-		plist_head_init(&futex_queues[i].chain);
+		INIT_LIST_HEAD(&futex_queues[i].chain);
 		spin_lock_init(&futex_queues[i].lock);
 	}
 
diff --git a/kernel/pring.h b/kernel/pring.h
new file mode 100644
index 000000000000..5821ee4797e6
--- /dev/null
+++ b/kernel/pring.h
@@ -0,0 +1,110 @@
+#ifndef _LINUX_PRING_H_
+#define _LINUX_PRING_H_
+
+/*
+ * futex specific priority-sorted ring
+ *
+ * based on include/linux/plist.h
+ *
+ * Simple ASCII art explanation:
+ *
+ * fl:futx_list
+ * pl:prio_list
+ * nl:node_list
+ *
+ * +------+
+ * |      v
+ * |     |fl|        HEAD
+ * |      ^
+ * |      |
+ * |      v
+ * |  +--------+
+ * |  +->|pl|<-+
+ * |     |10|   (prio)
+ * |     |  |
+ * |  +->|nl|<-+
+ * |  +--------+
+ * |      ^
+ * |      |
+ * |      v
+ * |  +------------------------------------+
+ * |  +->|pl|<->|pl|<--------------->|pl|<-+
+ * |     |10|   |21|   |21|   |21|   |40|   (prio)
+ * |     |  |   |  |   |  |   |  |   |  |
+ * |  +->|nl|<->|nl|<->|nl|<->|nl|<->|nl|<-+
+ * |  +------------------------------------+
+ * |      ^
+ * +------+
+ */
+
+#include <linux/kernel.h>
+#include <linux/list.h>
+
+struct pring_node {
+	int			prio;	/* lower value sorts first */
+	struct list_head	prio_list; /* first node of each prio */
+	struct list_head	node_list; /* every node of the ring */
+};
+
+static inline void pring_init(struct pring_node *node, int prio)
+{
+	node->prio = prio;			/* compared by pring_add() */
+	INIT_LIST_HEAD(&node->prio_list);	/* self-linked: singular */
+	INIT_LIST_HEAD(&node->node_list);
+}
+
+static inline bool pring_is_singular(struct pring_node *node)
+{
+	return list_empty(&node->node_list);	/* linked to no other node */
+}
+
+static inline void pring_del(struct pring_node *node)
+{
+	if (WARN_ON(list_empty(&node->node_list)))	/* not in a ring */
+		return;
+	if (!list_empty(&node->prio_list)) {
+		struct pring_node *next;
+
+		next = list_next_entry(node, node_list);
+		/* next is not on prio_list yet: it takes over node's place */
+		if (list_empty(&next->prio_list))
+			list_add(&next->prio_list, &node->prio_list);
+		list_del_init(&node->prio_list);
+	}
+	list_del_init(&node->node_list);
+}
+
+static inline struct pring_node *
+pring_add(struct pring_node *node, struct pring_node *top)
+{
+	struct pring_node *iter, *prev = NULL;
+	struct list_head *node_next = &top->node_list;
+
+	WARN_ON(!list_empty(&node->node_list));
+	WARN_ON(!list_empty(&node->prio_list));
+
+	iter = top;
+	/* Walk prio_list from top until an entry with a larger prio is found */
+	do {
+		if (node->prio < iter->prio) {
+			node_next = &iter->node_list;
+			break;
+		}
+
+		prev = iter;
+		iter = list_entry(iter->prio_list.next,
+				struct pring_node, prio_list);
+	} while (iter != top);
+	/* Only the first node of a given prio is linked into prio_list */
+	if (!prev || prev->prio != node->prio)
+		list_add_tail(&node->prio_list, &iter->prio_list);
+	list_add_tail(&node->node_list, node_next);
+	/* Returns the ring's top: node itself if it was put in front of @top */
+	return prev ? top : node;
+}
+
+#define pring_next_entry(pos, member) /* successor of pos on node_list */ \
+	container_of(list_next_entry(&(pos)->member, node_list), \
+			typeof(*(pos)), member)
+
+#endif
-- 
2.14.1

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ