lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <1430494072-30283-4-git-send-email-dave@stgolabs.net>
Date:	Fri,  1 May 2015 08:27:52 -0700
From:	Davidlohr Bueso <dave@...olabs.net>
To:	Peter Zijlstra <peterz@...radead.org>,
	Thomas Gleixner <tglx@...utronix.de>,
	Ingo Molnar <mingo@...hat.com>
Cc:	Sebastian Andrzej Siewior <bigeasy@...utronix.de>,
	Linus Torvalds <torvalds@...ux-foundation.org>,
	Chris Mason <clm@...com>, Steven Rostedt <rostedt@...dmis.org>,
	Manfred Spraul <manfred@...orfullife.com>,
	George Spelvin <linux@...izon.com>,
	linux-kernel@...r.kernel.org, Davidlohr Bueso <dave@...olabs.net>,
	Davidlohr Bueso <dbueso@...e.de>
Subject: [PATCH 3/3] ipc/mqueue: lockless pipelined wakeups

This patch moves the wake_up_process() invocation so it is not done under
the info->lock by making use of a lockless wake_q. With this change, the
waiter is woken up once it is STATE_READY and it does not need to loop
on SMP if it is still in STATE_PENDING. In the timeout case we still need
to grab the info->lock to verify the state.

This change should also avoid the introduction of preempt_disable() in
-RT, which is there to avoid a busy-loop that polls for the STATE_PENDING ->
STATE_READY change if the waiter has a higher priority than the waker.

Additionally, this patch micro-optimizes wq_sleep by using the cheaper
cousin of set_current_state(TASK_INTERRUPTIBLE) as we will block no
matter what, thus getting rid of the implied barrier.

Signed-off-by: Davidlohr Bueso <dbueso@...e.de>
---
 ipc/mqueue.c | 50 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/ipc/mqueue.c b/ipc/mqueue.c
index 3aaea7f..5f422f2 100644
--- a/ipc/mqueue.c
+++ b/ipc/mqueue.c
@@ -47,8 +47,7 @@
 #define RECV		1
 
 #define STATE_NONE	0
-#define STATE_PENDING	1
-#define STATE_READY	2
+#define STATE_READY	1
 
 struct posix_msg_tree_node {
 	struct rb_node		rb_node;
@@ -571,15 +570,12 @@ static int wq_sleep(struct mqueue_inode_info *info, int sr,
 	wq_add(info, sr, ewp);
 
 	for (;;) {
-		set_current_state(TASK_INTERRUPTIBLE);
+		__set_current_state(TASK_INTERRUPTIBLE);
 
 		spin_unlock(&info->lock);
 		time = schedule_hrtimeout_range_clock(timeout, 0,
 			HRTIMER_MODE_ABS, CLOCK_REALTIME);
 
-		while (ewp->state == STATE_PENDING)
-			cpu_relax();
-
 		if (ewp->state == STATE_READY) {
 			retval = 0;
 			goto out;
@@ -909,9 +905,14 @@ out_name:
  * bypasses the message array and directly hands the message over to the
  * receiver.
  * The receiver accepts the message and returns without grabbing the queue
- * spinlock. Therefore an intermediate STATE_PENDING state and memory barriers
- * are necessary. The same algorithm is used for sysv semaphores, see
- * ipc/sem.c for more details.
+ * spinlock. The used algorithm is different from sysv semaphores (ipc/sem.c):
+ *
+ * - Set pointer to message.
+ * - Queue the receiver task for later wakeup (without the info->lock).
+ * - Update its state to STATE_READY. Now the receiver can continue.
+ * - Wake up the process after the lock is dropped. Should the process wake up
+ *   before this wakeup (due to a timeout or a signal) it will either see
+ *   STATE_READY and continue or acquire the lock to check the state again.
  *
  * The same algorithm is used for senders.
  */
@@ -919,21 +920,29 @@ out_name:
 /* pipelined_send() - send a message directly to the task waiting in
  * sys_mq_timedreceive() (without inserting message into a queue).
  */
-static inline void pipelined_send(struct mqueue_inode_info *info,
+static inline void pipelined_send(struct wake_q_head *wake_q,
+				  struct mqueue_inode_info *info,
 				  struct msg_msg *message,
 				  struct ext_wait_queue *receiver)
 {
 	receiver->msg = message;
 	list_del(&receiver->list);
-	receiver->state = STATE_PENDING;
-	wake_up_process(receiver->task);
-	smp_wmb();
+	wake_q_add(wake_q, receiver->task);
+	/*
+	 * Rely on the implicit cmpxchg barrier from wake_q_add such
+	 * that we can ensure that updating receiver->state is the last
+	 * write operation: As once set, the receiver can continue,
+	 * and if we don't have the reference count from the wake_q,
+	 * yet, at that point we can later have a use-after-free
+	 * condition and bogus wakeup.
+	 */
 	receiver->state = STATE_READY;
 }
 
 /* pipelined_receive() - if there is task waiting in sys_mq_timedsend()
  * gets its message and put to the queue (we have one free place for sure). */
-static inline void pipelined_receive(struct mqueue_inode_info *info)
+static inline void pipelined_receive(struct wake_q_head *wake_q,
+				     struct mqueue_inode_info *info)
 {
 	struct ext_wait_queue *sender = wq_get_first_waiter(info, SEND);
 
@@ -944,10 +953,9 @@ static inline void pipelined_receive(struct mqueue_inode_info *info)
 	}
 	if (msg_insert(sender->msg, info))
 		return;
+
 	list_del(&sender->list);
-	sender->state = STATE_PENDING;
-	wake_up_process(sender->task);
-	smp_wmb();
+	wake_q_add(wake_q, sender->task);
 	sender->state = STATE_READY;
 }
 
@@ -965,6 +973,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	struct timespec ts;
 	struct posix_msg_tree_node *new_leaf = NULL;
 	int ret = 0;
+	WAKE_Q(wake_q);
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1049,7 +1058,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	} else {
 		receiver = wq_get_first_waiter(info, RECV);
 		if (receiver) {
-			pipelined_send(info, msg_ptr, receiver);
+			pipelined_send(&wake_q, info, msg_ptr, receiver);
 		} else {
 			/* adds message to the queue */
 			ret = msg_insert(msg_ptr, info);
@@ -1062,6 +1071,7 @@ SYSCALL_DEFINE5(mq_timedsend, mqd_t, mqdes, const char __user *, u_msg_ptr,
 	}
 out_unlock:
 	spin_unlock(&info->lock);
+	wake_up_q(&wake_q);
 out_free:
 	if (ret)
 		free_msg(msg_ptr);
@@ -1084,6 +1094,7 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 	ktime_t expires, *timeout = NULL;
 	struct timespec ts;
 	struct posix_msg_tree_node *new_leaf = NULL;
+	WAKE_Q(wake_q);
 
 	if (u_abs_timeout) {
 		int res = prepare_timeout(u_abs_timeout, &expires, &ts);
@@ -1155,8 +1166,9 @@ SYSCALL_DEFINE5(mq_timedreceive, mqd_t, mqdes, char __user *, u_msg_ptr,
 				CURRENT_TIME;
 
 		/* There is now free space in queue. */
-		pipelined_receive(info);
+		pipelined_receive(&wake_q, info);
 		spin_unlock(&info->lock);
+		wake_up_q(&wake_q);
 		ret = 0;
 	}
 	if (ret == 0) {
-- 
2.1.4

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ