>From fe2f257b1950a19bf5c6f67e71aa25c2f13bcdc3 Mon Sep 17 00:00:00 2001 From: Manfred Spraul Date: Sun, 24 May 2020 14:47:31 +0200 Subject: [PATCH 2/2] ipc/msg.c: Handle case of senders not enqueuing the message The patch "ipc/msg.c: wake up senders until there is a queue empty capacity" avoids the thundering herd problem by waking up only as many potential senders as there is free space in the queue. This patch is a fix: If one of the senders doesn't enqueue its message, then a search for further potential senders must be performed. Signed-off-by: Manfred Spraul --- ipc/msg.c | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/ipc/msg.c b/ipc/msg.c index 52d634b0a65a..f6d5188db38a 100644 --- a/ipc/msg.c +++ b/ipc/msg.c @@ -208,6 +208,12 @@ static inline void ss_del(struct msg_sender *mss) list_del(&mss->list); } +/* + * ss_wakeup() assumes that the stored senders will enqueue the pending message. + * Thus: If a woken up task doesn't send the enqueued message for whatever + * reason, then that task must call ss_wakeup() again, to ensure that no + * wakeup is lost. + */ static void ss_wakeup(struct msg_queue *msq, struct wake_q_head *wake_q, bool kill) { @@ -843,6 +849,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext, struct msg_queue *msq; struct msg_msg *msg; int err; + bool need_wakeup; struct ipc_namespace *ns; DEFINE_WAKE_Q(wake_q); @@ -869,6 +876,7 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext, ipc_lock_object(&msq->q_perm); + need_wakeup = false; for (;;) { struct msg_sender s; @@ -898,6 +906,13 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext, /* enqueue the sender and prepare to block */ ss_add(msq, &s, msgsz); + /* Enqueuing a sender is actually an obligation: + * The sender must either enqueue the message, or call + * ss_wakeup(). Thus track that we have added our message + * to the candidates for the message queue. 
+ */ + need_wakeup = true; + if (!ipc_rcu_getref(&msq->q_perm)) { err = -EIDRM; goto out_unlock0; @@ -935,12 +950,18 @@ static long do_msgsnd(int msqid, long mtype, void __user *mtext, msq->q_qnum++; atomic_add(msgsz, &ns->msg_bytes); atomic_inc(&ns->msg_hdrs); + + /* we have fulfilled our obligation, no need for wakeup */ + need_wakeup = false; } err = 0; msg = NULL; out_unlock0: + if (need_wakeup) + ss_wakeup(msq, &wake_q, false); + ipc_unlock_object(&msq->q_perm); wake_up_q(&wake_q); out_unlock1: -- 2.26.2