Message-Id: <20171222194755.8544-5-tom@quantonium.net>
Date: Fri, 22 Dec 2017 11:47:55 -0800
From: Tom Herbert <tom@...ntonium.net>
To: davem@...emloft.net
Cc: netdev@...r.kernel.org, dvyukov@...gle.com,
john.fastabend@...il.com, rohit@...ntonium.net,
Tom Herbert <tom@...ntonium.net>
Subject: [PATCH net-next 4/4] kcm: Address deadlock between TX and RX paths
Both the transmit and receive paths of KCM need to take both the
KCM socket lock and the psock socket lock; however, they take the
locks in opposite order, which can lead to deadlock.
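Schematically (illustrative only, not literal kernel code), the two
orders are:

    /* RX path: lower (psock) socket lock first, then KCM socket lock */
    lock_sock(psock->sk);
    lock_sock(&kcm->sk);

    /* TX path before this patch: KCM socket lock first, then lower */
    lock_sock(&kcm->sk);
    lock_sock(psock->sk);

If the two paths run concurrently, each can end up holding one lock
while waiting forever for the other (a classic ABBA deadlock).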
This patch changes the transmit path (kcm_write_msgs to be specific)
so that the locks are taken in the proper order. try_lock_sock is first
used to get the lower socket lock; if that succeeds, sending data can
proceed without dropping the KCM lock. If try_lock_sock fails, the KCM
lock is released, lock_sock is done on the lower socket, and then
lock_sock is done on the KCM sock.
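The same trylock-and-back-off pattern can be sketched in userspace
with pthreads (an analogy only; kcm_lock and psock_lock are stand-ins
for the two socket locks, and none of this code is in the patch):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t kcm_lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_mutex_t psock_lock = PTHREAD_MUTEX_INITIALIZER;

    /* TX path: take kcm_lock first, but fall back to the RX path's
     * lock order if the lower lock is contended.
     */
    static void tx_lock_both(void)
    {
            pthread_mutex_lock(&kcm_lock);
            if (pthread_mutex_trylock(&psock_lock) != 0) {
                    /* Contended: drop kcm_lock and retake both in
                     * the RX path's order (lower lock first).
                     */
                    pthread_mutex_unlock(&kcm_lock);
                    pthread_mutex_lock(&psock_lock);
                    pthread_mutex_lock(&kcm_lock);
            }
    }

    int main(void)
    {
            tx_lock_both();
            printf("holding both locks, no ABBA deadlock possible\n");
            pthread_mutex_unlock(&kcm_lock);
            pthread_mutex_unlock(&psock_lock);
            return 0;
    }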
A doing_write_msgs flag has been added to the kcm_sock structure to
prevent multiple threads doing write_msgs when the KCM lock is dropped.
kernel_sendpage_locked is now called to send the data with the lower
socket lock already held.
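For context, the relevant prototypes (the _locked variant takes the
struct sock directly and expects the caller to already hold its lock,
while kernel_sendpage takes the struct socket and locks internally):

    int kernel_sendpage(struct socket *sock, struct page *page,
                        int offset, size_t size, int flags);
    int kernel_sendpage_locked(struct sock *sk, struct page *page,
                               int offset, size_t size, int flags);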
Signed-off-by: Tom Herbert <tom@...ntonium.net>
---
include/net/kcm.h | 1 +
net/kcm/kcmsock.c | 64 ++++++++++++++++++++++++++++++++++++++++---------------
2 files changed, 48 insertions(+), 17 deletions(-)
diff --git a/include/net/kcm.h b/include/net/kcm.h
index 2a8965819db0..22bd7dd3eedb 100644
--- a/include/net/kcm.h
+++ b/include/net/kcm.h
@@ -78,6 +78,7 @@ struct kcm_sock {
         /* Don't use bit fields here, these are set under different locks */
         bool tx_wait;
         bool tx_wait_more;
+        bool doing_write_msgs;
 
         /* Receive */
         struct kcm_psock *rx_psock;
diff --git a/net/kcm/kcmsock.c b/net/kcm/kcmsock.c
index d4e98f20fc2a..3eb3179b96b3 100644
--- a/net/kcm/kcmsock.c
+++ b/net/kcm/kcmsock.c
@@ -574,13 +574,19 @@ static void kcm_report_tx_retry(struct kcm_sock *kcm)
 static int kcm_write_msgs(struct kcm_sock *kcm)
 {
         struct sock *sk = &kcm->sk;
-        struct kcm_psock *psock;
-        struct sk_buff *skb, *head;
-        struct kcm_tx_msg *txm;
+        struct sk_buff *head = skb_peek(&sk->sk_write_queue);
         unsigned short fragidx, frag_offset;
         unsigned int sent, total_sent = 0;
+        struct kcm_psock *psock;
+        struct kcm_tx_msg *txm;
+        struct sk_buff *skb;
         int ret = 0;
 
+        if (unlikely(kcm->doing_write_msgs))
+                return 0;
+
+        kcm->doing_write_msgs = true;
+
         kcm->tx_wait_more = false;
         psock = kcm->tx_psock;
         if (unlikely(psock && psock->tx_stopped)) {
@@ -598,15 +604,36 @@ static int kcm_write_msgs(struct kcm_sock *kcm)
                 return 0;
         }
 
+try_again:
+        psock = reserve_psock(kcm);
+        if (!psock)
+                goto out_no_release_psock;
+
+        /* Get lock for lower sock */
+        if (!try_lock_sock(psock->sk)) {
+                /* Someone else is holding the lower sock lock. We need to
+                 * release the KCM lock and get the psock lock first. This is
+                 * needed since the receive path obtains the locks in reverse
+                 * order and we want to avoid deadlock. Note that
+                 * write_msgs can't be reentered when we drop the KCM lock
+                 * since doing_write_msgs is set.
+                 */
+                release_sock(&kcm->sk);
+
+                /* Take locks in order that receive path does */
+                lock_sock(psock->sk);
+                lock_sock(&kcm->sk);
+        }
+
+        /* At this point we have a reserved psock and its lower socket is
+         * locked.
+         */
+
         head = skb_peek(&sk->sk_write_queue);
         txm = kcm_tx_msg(head);
 
         if (txm->sent) {
                 /* Send of first skbuff in queue already in progress */
-                if (WARN_ON(!psock)) {
-                        ret = -EINVAL;
-                        goto out;
-                }
                 sent = txm->sent;
                 frag_offset = txm->frag_offset;
                 fragidx = txm->fragidx;
@@ -615,11 +642,6 @@ static int kcm_write_msgs(struct kcm_sock *kcm)
                 goto do_frag;
         }
 
-try_again:
-        psock = reserve_psock(kcm);
-        if (!psock)
-                goto out;
-
         do {
                 skb = head;
                 txm = kcm_tx_msg(head);
@@ -643,11 +665,12 @@ static int kcm_write_msgs(struct kcm_sock *kcm)
                                 goto out;
                         }
 
-                        ret = kernel_sendpage(psock->sk->sk_socket,
-                                              frag->page.p,
-                                              frag->page_offset + frag_offset,
-                                              frag->size - frag_offset,
-                                              MSG_DONTWAIT);
+                        ret = kernel_sendpage_locked(psock->sk, frag->page.p,
+                                                     frag->page_offset +
+                                                        frag_offset,
+                                                     frag->size -
+                                                        frag_offset,
+                                                     MSG_DONTWAIT);
                         if (ret <= 0) {
                                 if (ret == -EAGAIN) {
                                         /* Save state to try again when there's
@@ -669,6 +692,7 @@ static int kcm_write_msgs(struct kcm_sock *kcm)
                                  */
                                 kcm_abort_tx_psock(psock, ret ? -ret : EPIPE,
                                                    true);
+                                release_sock(psock->sk);
                                 unreserve_psock(kcm);
 
                                 txm->sent = 0;
@@ -704,7 +728,13 @@ static int kcm_write_msgs(struct kcm_sock *kcm)
                 total_sent += sent;
                 KCM_STATS_INCR(psock->stats.tx_msgs);
         } while ((head = skb_peek(&sk->sk_write_queue)));
+
 out:
+        release_sock(psock->sk);
+
+out_no_release_psock:
+        kcm->doing_write_msgs = false;
+
         if (!head) {
                 /* Done with all queued messages. */
                 WARN_ON(!skb_queue_empty(&sk->sk_write_queue));
--
2.11.0