Message-ID: <20151103173021.GE1707@linux-uzut.site>
Date: Tue, 3 Nov 2015 09:30:21 -0800
From: Davidlohr Bueso <dave@...olabs.net>
To: Sebastian Andrzej Siewior <bigeasy@...utronix.de>
Cc: linux-kernel@...r.kernel.org, George Spelvin <linux@...izon.com>,
Thomas Gleixner <tglx@...utronix.de>,
Peter Zijlstra <peterz@...radead.org>,
Manfred Spraul <manfred@...orfullife.com>,
Andrew Morton <akpm@...ux-foundation.org>
Subject: Re: [PATCH v2] ipc/msg: Implement lockless pipelined wakeups
On Tue, 03 Nov 2015, Sebastian Andrzej Siewior wrote:
>@@ -577,26 +570,23 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
>
> list_del(&msr->r_list);
> if (msr->r_maxsize < msg->m_ts) {
>- /* initialize pipelined send ordering */
>- msr->r_msg = NULL;
>- wake_up_process(msr->r_tsk);
>- /* barrier (B) see barrier comment below */
>- smp_wmb();
>+ wake_q_add(wake_q, msr->r_tsk);
> msr->r_msg = ERR_PTR(-E2BIG);
> } else {
>- msr->r_msg = NULL;
> msq->q_lrpid = task_pid_vnr(msr->r_tsk);
> msq->q_rtime = get_seconds();
>- wake_up_process(msr->r_tsk);
>- /*
>- * Ensure that the wakeup is visible before
>- * setting r_msg, as the receiving can otherwise
>- * exit - once r_msg is set, the receiver can
>- * continue. See lockless receive part 1 and 2
>- * in do_msgrcv(). Barrier (B).
>- */
>- smp_wmb();
>+ wake_q_add(wake_q, msr->r_tsk);
> msr->r_msg = msg;
>+ /*
>+ * Rely on the implicit cmpxchg barrier from
>+ * wake_q_add such that we can ensure that
>+ * updating msr->r_msg is the last write
>+ * operation: As once set, the receiver can
>+ * continue, and if we don't have the reference
>+ * count from the wake_q, yet, at that point we
>+ * can later have a use-after-free condition and
>+ * bogus wakeup.
>+ */
Not sure why you placed the comment here, after the write. Why not keep it
between smp_wmb() and the r_msg write, as we have it now?
You might also want to add a reference to this comment in expunge_all(), which
does the same thing.
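To make the ordering in that comment concrete, here is a rough user-space model using C11 atomics. It is only a sketch: `struct model_task`, `struct model_receiver`, `model_wake_q_add()` and `model_pipelined_send()` are made-up names, and the fully ordered read-modify-write merely stands in for the cmpxchg the comment relies on. The point is that the task gets pinned first and the r_msg store comes last.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for the receiver's task; not the kernel struct. */
struct model_task {
	atomic_int refs;		/* models the task reference count */
};

/* Hypothetical stand-in for struct msg_receiver. */
struct model_receiver {
	struct model_task *tsk;
	_Atomic(const char *) r_msg;	/* NULL until the sender publishes */
};

/*
 * Models the effect the comment relies on: queueing the task takes a
 * reference through a fully ordered atomic read-modify-write.
 */
static void model_wake_q_add(struct model_task *t)
{
	atomic_fetch_add_explicit(&t->refs, 1, memory_order_seq_cst);
}

/*
 * Pin the task first, publish r_msg last. Once r_msg is visible the
 * receiver may return and exit, so the reference must already be held.
 */
static void model_pipelined_send(struct model_receiver *r, const char *msg)
{
	assert(msg != NULL);
	model_wake_q_add(r->tsk);
	atomic_store_explicit(&r->r_msg, msg, memory_order_release);
}
```

Swapping the two statements in model_pipelined_send() would reopen the window the comment warns about: the receiver could see r_msg and go away before the reference is taken.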
> [...]
>
> /* Lockless receive, part 2:
>- * Wait until pipelined_send or expunge_all are outside of
>- * wake_up_process(). There is a race with exit(), see
>- * ipc/mqueue.c for the details. The correct serialization
>- * ensures that a receiver cannot continue without the wakeup
>- * being visibible _before_ setting r_msg:
>+ * The work in pipelined_send() and expunge_all():
>+ * - Set pointer to message
>+ * - Queue the receiver task for later wakeup
>+ * - Wake up the process after the lock is dropped.
> *
>- * CPU 0 CPU 1
>- * <loop receiver>
>- * smp_rmb(); (A) <-- pair -. <waker thread>
>- * <load ->r_msg> | msr->r_msg = NULL;
>- * | wake_up_process();
>- * <continue> `------> smp_wmb(); (B)
>- * msr->r_msg = msg;
>- *
>- * Where (A) orders the message value read and where (B) orders
>- * the write to the r_msg -- done in both pipelined_send and
>- * expunge_all.
>+ * Should the process wake up before this wakeup (due to a
>+ * signal) it will either see the message and continue ...
> */
>- for (;;) {
>- /*
>- * Pairs with writer barrier in pipelined_send
>- * or expunge_all.
>- */
>- smp_rmb(); /* barrier (A) */
>- msg = (struct msg_msg *)msr_d.r_msg;
>- if (msg)
>- break;
>
>- /*
>- * The cpu_relax() call is a compiler barrier
>- * which forces everything in this loop to be
>- * re-loaded.
>- */
>- cpu_relax();
>- }
>-
>- /* Lockless receive, part 3:
>- * If there is a message or an error then accept it without
>- * locking.
>- */
>+ msg = msr_d.r_msg;
But you're getting rid of the barrier pairing (smp_rmb) we have with pipelined_send()
and expunge_all(), which is necessary even if we don't busy-wait on NULL. Likewise,
there's no need to remove the comment above that illustrates this.
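For illustration, the pairing I mean looks roughly like this in user-space C11 terms. This is an analogy and not kernel code: the release store plays the role of smp_wmb() followed by the r_msg write, and the acquire load plays the role of the r_msg read paired with smp_rmb(). The names `struct demo_msg`, `struct demo_slot`, `demo_publish()` and `demo_try_receive()` are invented for the sketch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical message payload; not the kernel's struct msg_msg. */
struct demo_msg {
	long type;
	int len;
};

/* Hypothetical receiver slot holding the published pointer. */
struct demo_slot {
	_Atomic(struct demo_msg *) r_msg;	/* NULL means "nothing yet" */
};

/*
 * Writer side: fill in the message, then publish the pointer with
 * release semantics so the payload writes are ordered before it.
 */
static void demo_publish(struct demo_slot *slot, struct demo_msg *m,
			 long type, int len)
{
	m->type = type;
	m->len = len;
	atomic_store_explicit(&slot->r_msg, m, memory_order_release);
}

/*
 * Reader side: a single acquire load, no busy loop. The ordering is
 * still needed so that a non-NULL pointer implies a visible payload.
 */
static struct demo_msg *demo_try_receive(struct demo_slot *slot)
{
	return atomic_load_explicit(&slot->r_msg, memory_order_acquire);
}
```

Even with the spin loop gone, the reader-side ordering is what lets it trust whatever it finds behind a non-NULL pointer.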
Thanks,
Davidlohr