lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [thread-next>] [day] [month] [year] [list]
Date:   Tue, 3 Apr 2018 23:59:13 +0200
From:   Sebastian Andrzej Siewior <sebastian@...akpoint.cc>
To:     linux-mm@...ck.org, linux-kernel@...r.kernel.org
Cc:     Kan Liang <kan.liang@...el.com>,
        Tim Chen <tim.c.chen@...ux.intel.com>,
        Linus Torvalds <torvalds@...ux-foundation.org>,
        Peter Zijlstra <peterz@...radead.org>,
        Thomas Gleixner <tglx@...utronix.de>
Subject: [RFC PATCH] mm: use rbtree for page-wait

I noticed commit 2554db916586 ("sched/wait: Break up long wake list
walk") in which it is claimed that
|We saw page wait list that are up to 3700+ entries long in tests of
|large 4 and 8 socket systems.

Here is another approach: instead of a wait list, an rbtree is used. The
tree is ordered by the page and bit_nr that a waiter waits for. A wake-up
request for a specific page + bit_nr combination does not need to walk
through the whole set of waiters; it only looks up the matching waiter(s).
For the actual wake-up, the wake_q mechanism is used. That means we enqueue
all to-be-woken tasks on a queue and perform the actual wakeup without
holding the queue lock.

add_page_wait_queue() is currently not wired up, which means this patch
breaks the one user we have right now. Instead of fixing that, I would be
interested in a specific benchmark to see whether this is any help or
just makes things worse.

Cc: Kan Liang <kan.liang@...el.com>
Cc: Tim Chen <tim.c.chen@...ux.intel.com>
Cc: Linus Torvalds <torvalds@...ux-foundation.org>
Cc: Peter Zijlstra <peterz@...radead.org>
Cc: Thomas Gleixner <tglx@...utronix.de>
Signed-off-by: Sebastian Andrzej Siewior <sebastian@...akpoint.cc>
---
 mm/filemap.c | 286 +++++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 208 insertions(+), 78 deletions(-)

diff --git a/mm/filemap.c b/mm/filemap.c
index 693f62212a59..6f44eaac1a53 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -36,6 +36,7 @@
 #include <linux/cleancache.h>
 #include <linux/shmem_fs.h>
 #include <linux/rmap.h>
+#include <linux/sched/wake_q.h>
 #include "internal.h"
 
 #define CREATE_TRACE_POINTS
@@ -957,11 +958,15 @@ EXPORT_SYMBOL(__page_cache_alloc);
  * at a cost of "thundering herd" phenomena during rare hash
  * collisions.
  */
+struct page_wait_rb {
+	struct rb_root	tree;
+	spinlock_t	lock;
+};
+
 #define PAGE_WAIT_TABLE_BITS 8
 #define PAGE_WAIT_TABLE_SIZE (1 << PAGE_WAIT_TABLE_BITS)
-static wait_queue_head_t page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned;
-
-static wait_queue_head_t *page_waitqueue(struct page *page)
+static struct page_wait_rb page_wait_table[PAGE_WAIT_TABLE_SIZE] __cacheline_aligned;
+static struct page_wait_rb *page_waitqueue(struct page *page)
 {
 	return &page_wait_table[hash_ptr(page, PAGE_WAIT_TABLE_BITS)];
 }
@@ -970,77 +975,134 @@ void __init pagecache_init(void)
 {
 	int i;
 
-	for (i = 0; i < PAGE_WAIT_TABLE_SIZE; i++)
-		init_waitqueue_head(&page_wait_table[i]);
+	for (i = 0; i < PAGE_WAIT_TABLE_SIZE; i++) {
+		spin_lock_init(&page_wait_table[i].lock);
+		page_wait_table[i].tree = RB_ROOT;
+	}
 
 	page_writeback_init();
 }
 
 /* This has the same layout as wait_bit_key - see fs/cachefiles/rdwr.c */
-struct wait_page_key {
-	struct page *page;
-	int bit_nr;
-	int page_match;
-};
-
 struct wait_page_queue {
+	struct rb_node node;
 	struct page *page;
 	int bit_nr;
-	wait_queue_entry_t wait;
+	bool one;
+	bool dequeued;
+	struct task_struct *task;
+	struct list_head queue;
+	struct list_head head;
 };
 
-static int wake_page_function(wait_queue_entry_t *wait, unsigned mode, int sync, void *arg)
+static int wake_page_match(struct page *page, int bit_nr,
+			   struct wait_page_queue *page_q, bool *page_match)
 {
-	struct wait_page_key *key = arg;
-	struct wait_page_queue *wait_page
-		= container_of(wait, struct wait_page_queue, wait);
-
-	if (wait_page->page != key->page)
-	       return 0;
-	key->page_match = 1;
-
-	if (wait_page->bit_nr != key->bit_nr)
-		return 0;
-
-	/* Stop walking if it's locked */
-	if (test_bit(key->bit_nr, &key->page->flags))
+	if ((unsigned long)page < (unsigned long)page_q->page)
 		return -1;
 
-	return autoremove_wake_function(wait, mode, sync, key);
+	if ((unsigned long)page > (unsigned long)page_q->page)
+		return 1;
+
+	/* page hit */
+	*page_match = true;
+
+	if (bit_nr < page_q->bit_nr)
+		return -1;
+
+	if (bit_nr > page_q->bit_nr)
+		return 1;
+
+	return 0;
+}
+
+static struct rb_node *find_wake_page(struct page_wait_rb *rb,
+				      struct page *page, int bit_nr,
+				      bool *page_match)
+{
+	struct rb_node *node;
+
+	node = rb->tree.rb_node;
+	while (node) {
+		struct wait_page_queue *page_q;
+		int match;
+
+		page_q = rb_entry(node, struct wait_page_queue, node);
+		match = wake_page_match(page, bit_nr, page_q, page_match);
+
+		if (match < 0)
+			node = node->rb_left;
+		else if (match > 0)
+			node = node->rb_right;
+		else
+			break;
+	}
+	return node;
+}
+
+static void wake_up_rb(struct page_wait_rb *rb, struct wake_q_head *wake_q,
+		       struct wait_page_queue *page_q)
+{
+	struct wait_page_queue *next;
+	struct rb_node *node = &page_q->node;
+
+	while (1) {
+		struct task_struct *t;
+		bool one;
+
+		if (list_empty(&page_q->head)) {
+
+			rb_erase(node, &rb->tree);
+			RB_CLEAR_NODE(node);
+
+			t = READ_ONCE(page_q->task);
+			/* full barrier in wake_q_add() */
+			page_q->dequeued = true;
+			wake_q_add(wake_q, t);
+			break;
+		}
+
+		next = list_first_entry(&page_q->head, struct wait_page_queue,
+					queue);
+
+		list_del_init(&next->queue);
+		list_splice_init(&page_q->head, &next->head);
+
+		rb_replace_node(node, &next->node, &rb->tree);
+		RB_CLEAR_NODE(node);
+		t = READ_ONCE(page_q->task);
+		one = READ_ONCE(page_q->one);
+
+		/* full barrier in wake_q_add() */
+		page_q->dequeued = true;
+		wake_q_add(wake_q, t);
+		if (one == true)
+			break;
+		page_q = next;
+		node = &page_q->node;
+	}
 }
 
 static void wake_up_page_bit(struct page *page, int bit_nr)
 {
-	wait_queue_head_t *q = page_waitqueue(page);
-	struct wait_page_key key;
+	struct page_wait_rb *rb = page_waitqueue(page);
+	struct wait_page_queue *page_q;
+	struct rb_node *node;
 	unsigned long flags;
-	wait_queue_entry_t bookmark;
+	bool page_match = false;
+	DEFINE_WAKE_Q(wake_q);
 
-	key.page = page;
-	key.bit_nr = bit_nr;
-	key.page_match = 0;
+	spin_lock_irqsave(&rb->lock, flags);
 
-	bookmark.flags = 0;
-	bookmark.private = NULL;
-	bookmark.func = NULL;
-	INIT_LIST_HEAD(&bookmark.entry);
+	node = find_wake_page(rb, page, bit_nr, &page_match);
+	if (node) {
 
-	spin_lock_irqsave(&q->lock, flags);
-	__wake_up_locked_key_bookmark(q, TASK_NORMAL, &key, &bookmark);
-
-	while (bookmark.flags & WQ_FLAG_BOOKMARK) {
-		/*
-		 * Take a breather from holding the lock,
-		 * allow pages that finish wake up asynchronously
-		 * to acquire the lock and remove themselves
-		 * from wait queue
-		 */
-		spin_unlock_irqrestore(&q->lock, flags);
-		cpu_relax();
-		spin_lock_irqsave(&q->lock, flags);
-		__wake_up_locked_key_bookmark(q, TASK_NORMAL, &key, &bookmark);
+		page_q = rb_entry(node, struct wait_page_queue, node);
+		/* Stop walking if it's locked */
+		if (test_bit(bit_nr, &page->flags))
+			goto no_wakeup;
+		wake_up_rb(rb, &wake_q, page_q);
 	}
-
 	/*
 	 * It is possible for other pages to have collided on the waitqueue
 	 * hash, so in that case check for a page match. That prevents a long-
@@ -1050,7 +1112,7 @@ static void wake_up_page_bit(struct page *page, int bit_nr)
 	 * and removed them from the waitqueue, but there are still other
 	 * page waiters.
 	 */
-	if (!waitqueue_active(q) || !key.page_match) {
+	if (!page_match || RB_EMPTY_ROOT(&rb->tree)) {
 		ClearPageWaiters(page);
 		/*
 		 * It's possible to miss clearing Waiters here, when we woke
@@ -1060,7 +1122,10 @@ static void wake_up_page_bit(struct page *page, int bit_nr)
 		 * That's okay, it's a rare case. The next waker will clear it.
 		 */
 	}
-	spin_unlock_irqrestore(&q->lock, flags);
+no_wakeup:
+
+	spin_unlock_irqrestore(&rb->lock, flags);
+	wake_up_q(&wake_q);
 }
 
 static void wake_up_page(struct page *page, int bit)
@@ -1070,30 +1135,63 @@ static void wake_up_page(struct page *page, int bit)
 	wake_up_page_bit(page, bit);
 }
 
-static inline int wait_on_page_bit_common(wait_queue_head_t *q,
-		struct page *page, int bit_nr, int state, bool lock)
+static int wait_on_page_bit_common(struct page_wait_rb *rb, struct page *page,
+				   int bit_nr, int state, bool lock)
 {
-	struct wait_page_queue wait_page;
-	wait_queue_entry_t *wait = &wait_page.wait;
+	struct wait_page_queue page_q;
+	struct rb_node *node = &page_q.node;
+	struct rb_node **p;
+	struct rb_node *parent;
 	int ret = 0;
 
-	init_wait(wait);
-	wait->flags = lock ? WQ_FLAG_EXCLUSIVE : 0;
-	wait->func = wake_page_function;
-	wait_page.page = page;
-	wait_page.bit_nr = bit_nr;
+	page_q.page = page;
+	page_q.bit_nr = bit_nr;
+	page_q.task = current;
+	page_q.one = lock;
+	INIT_LIST_HEAD(&page_q.queue);
+	INIT_LIST_HEAD(&page_q.head);
+	RB_CLEAR_NODE(&page_q.node);
 
 	for (;;) {
-		spin_lock_irq(&q->lock);
+		spin_lock_irq(&rb->lock);
 
-		if (likely(list_empty(&wait->entry))) {
-			__add_wait_queue_entry_tail(q, wait);
-			SetPageWaiters(page);
+		if (likely(RB_EMPTY_NODE(node)) &&
+			list_empty(&page_q.queue)) {
+
+			page_q.dequeued = false;
+
+			p = &rb->tree.rb_node;
+			parent = NULL;
+			while (*p) {
+				struct wait_page_queue *tmp;
+				int match;
+				bool page_match;
+
+				parent = *p;
+				tmp = rb_entry(parent, struct wait_page_queue, node);
+
+				match = wake_page_match(page, bit_nr, tmp, &page_match);
+
+				if (match < 0) {
+					p = &parent->rb_left;
+
+				} else if (match > 0) {
+					p = &parent->rb_right;
+				} else {
+					list_add_tail(&page_q.queue,
+						      &tmp->head);
+					break;
+				}
+			}
+			if (list_empty(&page_q.queue)) {
+				rb_link_node(node, parent, p);
+				rb_insert_color(node, &rb->tree);
+			}
 		}
-
+		SetPageWaiters(page);
 		set_current_state(state);
 
-		spin_unlock_irq(&q->lock);
+		spin_unlock_irq(&rb->lock);
 
 		if (likely(test_bit(bit_nr, &page->flags))) {
 			io_schedule();
@@ -1112,8 +1210,34 @@ static inline int wait_on_page_bit_common(wait_queue_head_t *q,
 			break;
 		}
 	}
+	__set_current_state(TASK_RUNNING);
 
-	finish_wait(q, wait);
+	/* paired with the full barrier in wake_q_add() */
+	smp_rmb();
+	if (!page_q.dequeued) {
+		spin_lock_irq(&rb->lock);
+
+		if (!list_empty(&page_q.queue))
+			list_del_init(&page_q.queue);
+
+		if (!list_empty(&page_q.head)) {
+			struct wait_page_queue *tmp;
+
+			BUG_ON(RB_EMPTY_NODE(node));
+
+			tmp = list_first_entry(&page_q.head,
+					       struct wait_page_queue,
+					       queue);
+			list_del_init(&tmp->queue);
+			list_splice(&page_q.head, &tmp->head);
+
+			rb_replace_node(node, &tmp->node, &rb->tree);
+
+		} else if (!RB_EMPTY_NODE(node)) {
+			rb_erase(node, &rb->tree);
+		}
+		spin_unlock_irq(&rb->lock);
+	}
 
 	/*
 	 * A signal could leave PageWaiters set. Clearing it here if
@@ -1128,18 +1252,21 @@ static inline int wait_on_page_bit_common(wait_queue_head_t *q,
 
 void wait_on_page_bit(struct page *page, int bit_nr)
 {
-	wait_queue_head_t *q = page_waitqueue(page);
-	wait_on_page_bit_common(q, page, bit_nr, TASK_UNINTERRUPTIBLE, false);
+	struct page_wait_rb *rb = page_waitqueue(page);
+
+	wait_on_page_bit_common(rb, page, bit_nr, TASK_UNINTERRUPTIBLE, false);
 }
 EXPORT_SYMBOL(wait_on_page_bit);
 
 int wait_on_page_bit_killable(struct page *page, int bit_nr)
 {
-	wait_queue_head_t *q = page_waitqueue(page);
-	return wait_on_page_bit_common(q, page, bit_nr, TASK_KILLABLE, false);
+	struct page_wait_rb *rb = page_waitqueue(page);
+
+	return wait_on_page_bit_common(rb, page, bit_nr, TASK_KILLABLE, false);
 }
 EXPORT_SYMBOL(wait_on_page_bit_killable);
 
+#if 0
 /**
  * add_page_wait_queue - Add an arbitrary waiter to a page's wait queue
  * @page: Page defining the wait queue of interest
@@ -1158,6 +1285,7 @@ void add_page_wait_queue(struct page *page, wait_queue_entry_t *waiter)
 	spin_unlock_irqrestore(&q->lock, flags);
 }
 EXPORT_SYMBOL_GPL(add_page_wait_queue);
+#endif
 
 #ifndef clear_bit_unlock_is_negative_byte
 
@@ -1268,16 +1396,18 @@ EXPORT_SYMBOL_GPL(page_endio);
 void __lock_page(struct page *__page)
 {
 	struct page *page = compound_head(__page);
-	wait_queue_head_t *q = page_waitqueue(page);
-	wait_on_page_bit_common(q, page, PG_locked, TASK_UNINTERRUPTIBLE, true);
+	struct page_wait_rb *rb = page_waitqueue(page);
+
+	wait_on_page_bit_common(rb, page, PG_locked, TASK_UNINTERRUPTIBLE, true);
 }
 EXPORT_SYMBOL(__lock_page);
 
 int __lock_page_killable(struct page *__page)
 {
 	struct page *page = compound_head(__page);
-	wait_queue_head_t *q = page_waitqueue(page);
-	return wait_on_page_bit_common(q, page, PG_locked, TASK_KILLABLE, true);
+	struct page_wait_rb *rb = page_waitqueue(page);
+
+	return wait_on_page_bit_common(rb, page, PG_locked, TASK_KILLABLE, true);
 }
 EXPORT_SYMBOL_GPL(__lock_page_killable);
 
-- 
2.16.3

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ