lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  PHC 
Open Source and information security mailing list archives
 
Hash Suite for Android: free password hash cracker in your pocket
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Date:   Thu, 27 Aug 2020 10:17:00 +0200
From:   Julian Wiedmann <jwi@...ux.ibm.com>
To:     David Miller <davem@...emloft.net>,
        Jakub Kicinski <kuba@...nel.org>
Cc:     netdev <netdev@...r.kernel.org>,
        linux-s390 <linux-s390@...r.kernel.org>,
        Heiko Carstens <hca@...ux.ibm.com>,
        Ursula Braun <ubraun@...ux.ibm.com>,
        Karsten Graul <kgraul@...ux.ibm.com>,
        Julian Wiedmann <jwi@...ux.ibm.com>
Subject: [PATCH net-next 3/8] s390/qeth: make queue lock a proper spinlock

queue->state is a ternary spinlock in disguise, used by
OSA's TX completion path to lock the Output Queue and flush any pending
packets on it to the device. If the Queue is already locked by our TX
code, setting the lock word to QETH_OUT_Q_LOCKED_FLUSH lets the TX
completion code move on - the TX path will later take care of things
when it unlocks the Queue.

This sort of DIY locking is a non-starter of course, just let the
TX completion path block on the spinlock when necessary. If that ends up
causing additional latency due to lock contention, then converting
the OSA path to use xmit_more is the right way to go forward.

Also slightly expand the locked section and capture all of
qeth_do_send_packet(), so that the update for the 'bufs_pack' statistics
is done race-free.

While reworking the TX completion path's code, remove a barrier() that
doesn't make any sense.

Signed-off-by: Julian Wiedmann <jwi@...ux.ibm.com>
---
 drivers/s390/net/qeth_core.h      |  8 +---
 drivers/s390/net/qeth_core_main.c | 80 +++++++++----------------------
 2 files changed, 23 insertions(+), 65 deletions(-)

diff --git a/drivers/s390/net/qeth_core.h b/drivers/s390/net/qeth_core.h
index ecfd6d152e86..00ed58428163 100644
--- a/drivers/s390/net/qeth_core.h
+++ b/drivers/s390/net/qeth_core.h
@@ -420,12 +420,6 @@ struct qeth_qdio_out_buffer {
 
 struct qeth_card;
 
-enum qeth_out_q_states {
-       QETH_OUT_Q_UNLOCKED,
-       QETH_OUT_Q_LOCKED,
-       QETH_OUT_Q_LOCKED_FLUSH,
-};
-
 #define QETH_CARD_STAT_ADD(_c, _stat, _val)	((_c)->stats._stat += (_val))
 #define QETH_CARD_STAT_INC(_c, _stat)		QETH_CARD_STAT_ADD(_c, _stat, 1)
 
@@ -486,12 +480,12 @@ struct qeth_qdio_out_q {
 	struct qeth_qdio_out_buffer *bufs[QDIO_MAX_BUFFERS_PER_Q];
 	struct qdio_outbuf_state *bufstates; /* convenience pointer */
 	struct qeth_out_q_stats stats;
+	spinlock_t lock;
 	u8 next_buf_to_fill;
 	u8 max_elements;
 	u8 queue_no;
 	u8 do_pack;
 	struct qeth_card *card;
-	atomic_t state;
 	/*
 	 * number of buffers that are currently filled (PRIMED)
 	 * -> these buffers are hardware-owned
diff --git a/drivers/s390/net/qeth_core_main.c b/drivers/s390/net/qeth_core_main.c
index 5742a682a193..26bc8c15ffb8 100644
--- a/drivers/s390/net/qeth_core_main.c
+++ b/drivers/s390/net/qeth_core_main.c
@@ -2702,6 +2702,7 @@ static int qeth_alloc_qdio_queues(struct qeth_card *card)
 		card->qdio.out_qs[i] = queue;
 		queue->card = card;
 		queue->queue_no = i;
+		spin_lock_init(&queue->lock);
 		timer_setup(&queue->timer, qeth_tx_completion_timer, 0);
 		queue->coalesce_usecs = QETH_TX_COALESCE_USECS;
 		queue->max_coalesced_frames = QETH_TX_MAX_COALESCED_FRAMES;
@@ -3068,7 +3069,6 @@ static int qeth_init_qdio_queues(struct qeth_card *card)
 		queue->bulk_max = qeth_tx_select_bulk_max(card, queue);
 		atomic_set(&queue->used_buffers, 0);
 		atomic_set(&queue->set_pci_flags_count, 0);
-		atomic_set(&queue->state, QETH_OUT_Q_UNLOCKED);
 		netdev_tx_reset_queue(netdev_get_tx_queue(card->dev, i));
 	}
 	return 0;
@@ -3741,37 +3741,31 @@ static void qeth_flush_queue(struct qeth_qdio_out_q *queue)
 
 static void qeth_check_outbound_queue(struct qeth_qdio_out_q *queue)
 {
-	int index;
-	int flush_cnt = 0;
-	int q_was_packing = 0;
-
 	/*
 	 * check if weed have to switch to non-packing mode or if
 	 * we have to get a pci flag out on the queue
 	 */
 	if ((atomic_read(&queue->used_buffers) <= QETH_LOW_WATERMARK_PACK) ||
 	    !atomic_read(&queue->set_pci_flags_count)) {
-		if (atomic_xchg(&queue->state, QETH_OUT_Q_LOCKED_FLUSH) ==
-				QETH_OUT_Q_UNLOCKED) {
-			/*
-			 * If we get in here, there was no action in
-			 * do_send_packet. So, we check if there is a
-			 * packing buffer to be flushed here.
-			 */
-			index = queue->next_buf_to_fill;
-			q_was_packing = queue->do_pack;
-			/* queue->do_pack may change */
-			barrier();
-			flush_cnt += qeth_switch_to_nonpacking_if_needed(queue);
-			if (!flush_cnt &&
-			    !atomic_read(&queue->set_pci_flags_count))
-				flush_cnt += qeth_prep_flush_pack_buffer(queue);
+		unsigned int index, flush_cnt;
+		bool q_was_packing;
+
+		spin_lock(&queue->lock);
+
+		index = queue->next_buf_to_fill;
+		q_was_packing = queue->do_pack;
+
+		flush_cnt = qeth_switch_to_nonpacking_if_needed(queue);
+		if (!flush_cnt && !atomic_read(&queue->set_pci_flags_count))
+			flush_cnt = qeth_prep_flush_pack_buffer(queue);
+
+		if (flush_cnt) {
+			qeth_flush_buffers(queue, index, flush_cnt);
 			if (q_was_packing)
 				QETH_TXQ_STAT_ADD(queue, bufs_pack, flush_cnt);
-			if (flush_cnt)
-				qeth_flush_buffers(queue, index, flush_cnt);
-			atomic_set(&queue->state, QETH_OUT_Q_UNLOCKED);
 		}
+
+		spin_unlock(&queue->lock);
 	}
 }
 
@@ -4283,29 +4277,22 @@ int qeth_do_send_packet(struct qeth_card *card, struct qeth_qdio_out_q *queue,
 			unsigned int offset, unsigned int hd_len,
 			int elements_needed)
 {
+	unsigned int start_index = queue->next_buf_to_fill;
 	struct qeth_qdio_out_buffer *buffer;
 	unsigned int next_element;
 	struct netdev_queue *txq;
 	bool stopped = false;
-	int start_index;
 	int flush_count = 0;
 	int do_pack = 0;
-	int tmp;
 	int rc = 0;
 
-	/* spin until we get the queue ... */
-	while (atomic_cmpxchg(&queue->state, QETH_OUT_Q_UNLOCKED,
-			      QETH_OUT_Q_LOCKED) != QETH_OUT_Q_UNLOCKED);
-	start_index = queue->next_buf_to_fill;
 	buffer = queue->bufs[queue->next_buf_to_fill];
 
 	/* Just a sanity check, the wake/stop logic should ensure that we always
 	 * get a free buffer.
 	 */
-	if (atomic_read(&buffer->state) != QETH_QDIO_BUF_EMPTY) {
-		atomic_set(&queue->state, QETH_OUT_Q_UNLOCKED);
+	if (atomic_read(&buffer->state) != QETH_QDIO_BUF_EMPTY)
 		return -EBUSY;
-	}
 
 	txq = netdev_get_tx_queue(card->dev, skb_get_queue_mapping(skb));
 
@@ -4328,8 +4315,6 @@ int qeth_do_send_packet(struct qeth_card *card, struct qeth_qdio_out_q *queue,
 			    QETH_QDIO_BUF_EMPTY) {
 				qeth_flush_buffers(queue, start_index,
 							   flush_count);
-				atomic_set(&queue->state,
-						QETH_OUT_Q_UNLOCKED);
 				rc = -EBUSY;
 				goto out;
 			}
@@ -4361,31 +4346,8 @@ int qeth_do_send_packet(struct qeth_card *card, struct qeth_qdio_out_q *queue,
 
 	if (flush_count)
 		qeth_flush_buffers(queue, start_index, flush_count);
-	else if (!atomic_read(&queue->set_pci_flags_count))
-		atomic_xchg(&queue->state, QETH_OUT_Q_LOCKED_FLUSH);
-	/*
-	 * queue->state will go from LOCKED -> UNLOCKED or from
-	 * LOCKED_FLUSH -> LOCKED if output_handler wanted to 'notify' us
-	 * (switch packing state or flush buffer to get another pci flag out).
-	 * In that case we will enter this loop
-	 */
-	while (atomic_dec_return(&queue->state)) {
-		start_index = queue->next_buf_to_fill;
-		/* check if we can go back to non-packing state */
-		tmp = qeth_switch_to_nonpacking_if_needed(queue);
-		/*
-		 * check if we need to flush a packing buffer to get a pci
-		 * flag out on the queue
-		 */
-		if (!tmp && !atomic_read(&queue->set_pci_flags_count))
-			tmp = qeth_prep_flush_pack_buffer(queue);
-		if (tmp) {
-			qeth_flush_buffers(queue, start_index, tmp);
-			flush_count += tmp;
-		}
-	}
+
 out:
-	/* at this point the queue is UNLOCKED again */
 	if (do_pack)
 		QETH_TXQ_STAT_ADD(queue, bufs_pack, flush_count);
 
@@ -4459,8 +4421,10 @@ int qeth_xmit(struct qeth_card *card, struct sk_buff *skb,
 	} else {
 		/* TODO: drop skb_orphan() once TX completion is fast enough */
 		skb_orphan(skb);
+		spin_lock(&queue->lock);
 		rc = qeth_do_send_packet(card, queue, skb, hdr, data_offset,
 					 hd_len, elements);
+		spin_unlock(&queue->lock);
 	}
 
 	if (rc && !push_len)
-- 
2.17.1

Powered by blists - more mailing lists