Date:	Wed, 04 Jan 2012 14:52:38 -0800
From:	Mike Waychison <mikew@...gle.com>
To:	Rusty Russell <rusty@...tcorp.com.au>,
	"Michael S. Tsirkin" <mst@...hat.com>
Cc:	netdev@...r.kernel.org, linux-kernel@...r.kernel.org,
	virtualization@...ts.linux-foundation.org
Subject: [RFC PATCH v1 2/2] virtio_net: Don't disable napi on low memory.

Currently, the virtio_net driver deals with low memory by kicking off
delayed work in process context to try to refill the rx queues.  This
process-context refill is synchronized against the receive bottom-half
by serially (see the sketch after the list):

  - disabling NAPI polling on the device,
  - allocating buffers,
  - enqueueing the buffers,
  - re-enabling NAPI.
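
For reference, the pre-patch refill_work() path looks roughly like the
sketch below, reconstructed from the lines this patch removes (the
re-arming of the delayed work on failure is elided):

    /* Old refill_work(), simplified: the receive path is fully
     * quiesced while process context refills the rx queue. */
    static void refill_work(struct work_struct *work)
    {
            struct virtnet_info *vi;
            bool still_empty;

            vi = container_of(work, struct virtnet_info, refill.work);
            napi_disable(&vi->napi);        /* no rx processing ... */
            still_empty = !try_fill_recv(vi, GFP_KERNEL);
            virtnet_napi_enable(vi);        /* ... until refill is done */
            /* if still_empty, the work is re-armed to retry later */
    }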

Disabling NAPI just to synchronize virtqueue_add_buf calls is overkill,
and there is no reason we cannot continue processing received packets as
they come in.  In the simple case, this does not involve allocating any
memory, and in fact allows memory to be released as the guest system
receives and processes packets.

Instead of disabling NAPI, this change rearranges the allocation and
enqueueing of buffers so that all buffers are allocated separately from
being enqueued.  Rather than using NAPI to serialize the out-of-band
refill path, we now serialize using a newly introduced bottom-half
spinlock, vi->rx_fill_lock.
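
Concretely, both contexts now bracket their virtqueue accesses with the
same lock; a minimal sketch of the two sides, matching the
virtnet_poll() and try_fill_recv_*() hunks below:

    /* Receive bottom-half, virtnet_poll(): already runs in softirq
     * context, so a plain spin_lock() is sufficient. */
    spin_lock(&vi->rx_fill_lock);
    while (received < budget &&
           (buf = virtqueue_get_buf(vi->rvq, &len)) != NULL) {
            receive_buf(vi->dev, buf, len);
            --vi->num;
            received++;
    }
    spin_unlock(&vi->rx_fill_lock);

    /* Refill paths, try_fill_recv_*(): the _bh variant keeps this safe
     * whether called from process context (refill_work, ndo_open) or
     * from the poll loop's GFP_ATOMIC refill. */
    spin_lock_bh(&vi->rx_fill_lock);
    /* ... virtqueue_add_buf() calls and vi->num accounting ... */
    virtqueue_kick(vi->rvq);
    spin_unlock_bh(&vi->rx_fill_lock);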

With allocation split off from enqueueing, we now have to be more
careful about how many allocations to make.  Previously, we simply kept
allocating until the virtqueue_add_buf call returned -ENOSPC, but with
allocations done up front we don't know in advance how many to perform.
Rather than grabbing the lock for every buffer, we allocate buffers in
batches whose size is set by the new rx_allocate_batch module
parameter.
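
In outline, each pass of the new refill loop looks like the fragment
below (simplified from try_fill_recv_pages() in the diff; the OOM
bookkeeping and the vi->max correction are omitted):

    while (1) {
            int count = rx_allocate_batch;

            /* Phase 1: allocate a batch with no lock held, so the rx
             * bottom-half keeps running and reclaim can make progress. */
            do {
                    err = alloc(vi, &local_list, gfp);
            } while (err >= 0 && --count);

            /* Phase 2: enqueue the whole batch under the lock. */
            spin_lock_bh(&vi->rx_fill_lock);
            list_for_each_entry_safe(page, nextpage, &local_list, lru) {
                    list_del(&page->lru);
                    err = add(vi, page);
                    if (err >= 0)
                            ++vi->num;
            }
            if (err < 0 || vi->num == vi->max)
                    break;  /* exits with the lock still held */
            spin_unlock_bh(&vi->rx_fill_lock);
    }
    virtqueue_kick(vi->rvq);
    spin_unlock_bh(&vi->rx_fill_lock);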

As well, because we only infer vi->max (the number of buffers the queue
can take) rather than knowing it up front, we now simply take vi->max to
be the number of successful allocations made at driver initialization,
and update it later if that turns out to be too low.
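
At probe time this works out to the following (matching the
virtnet_probe() hunk at the end of the diff):

    /* Start with an effectively unbounded max so the initial fill only
     * stops when the virtqueue itself refuses more buffers ... */
    vi->max = ~0U;
    try_fill_recv(vi, GFP_KERNEL);
    /* ... then clamp to what actually fit.  If an allocation failed
     * before the ring was full this underestimates, and the fill paths
     * raise vi->max again once more buffers are successfully added. */
    vi->max = vi->num;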

In empirical experiments where memory is tight, this patch allows the
out-of-band refill path to contribute to reclaim under memory pressure,
while reducing the perceived outage of the interface's receive path.
This in turn reduces the network stalls that lead to congestion window
collapse.

Signed-off-by: Mike Waychison <mikew@...gle.com>
---
 drivers/net/virtio_net.c |  210 +++++++++++++++++++++++++++++++++++-----------
 1 files changed, 160 insertions(+), 50 deletions(-)

diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
index 76fe14e..90b1e6d 100644
--- a/drivers/net/virtio_net.c
+++ b/drivers/net/virtio_net.c
@@ -34,6 +34,9 @@ static bool csum = true, gso = true;
 module_param(csum, bool, 0444);
 module_param(gso, bool, 0444);
 
+static int rx_allocate_batch = 32;
+module_param(rx_allocate_batch, int, 0644);
+
 /* FIXME: MTU in config. */
 #define MAX_PACKET_LEN (ETH_HLEN + VLAN_HLEN + ETH_DATA_LEN)
 #define GOOD_COPY_LEN	128
@@ -72,6 +75,9 @@ struct virtnet_info {
 	/* Work struct for refilling if we run low on memory. */
 	struct delayed_work refill;
 
+	/* Lock used to synchronize adding buffers to the rx queue */
+	spinlock_t rx_fill_lock;
+
 	/* Chain pages by the private ptr. */
 	struct page *pages;
 
@@ -353,58 +359,82 @@ frame_err:
 	dev_kfree_skb(skb);
 }
 
-static int add_recvbuf_small(struct virtnet_info *vi, gfp_t gfp)
+static int alloc_recvbuf_small(struct virtnet_info *vi, struct list_head *list,
+			       gfp_t gfp)
 {
 	struct sk_buff *skb;
-	struct skb_vnet_hdr *hdr;
-	int err;
 
 	skb = __netdev_alloc_skb_ip_align(vi->dev, MAX_PACKET_LEN, gfp);
 	if (unlikely(!skb))
 		return -ENOMEM;
 
 	skb_put(skb, MAX_PACKET_LEN);
+	list_add_tail((struct list_head *)skb, list);
+	return 0;
+}
+
+static int add_recvbuf_small(struct virtnet_info *vi, struct sk_buff *skb)
+{
+	struct skb_vnet_hdr *hdr;
+	int err;
 
 	hdr = skb_vnet_hdr(skb);
 	sg_set_buf(vi->rx_sg, &hdr->hdr, sizeof hdr->hdr);
 
 	skb_to_sgvec(skb, vi->rx_sg + 1, 0, skb->len);
 
-	err = virtqueue_add_buf_gfp(vi->rvq, vi->rx_sg, 0, 2, skb, gfp);
+	err = virtqueue_add_buf(vi->rvq, vi->rx_sg, 0, 2, skb);
 	if (err < 0)
 		dev_kfree_skb(skb);
-
 	return err;
 }
 
-static int add_recvbuf_big(struct virtnet_info *vi, gfp_t gfp)
+static int alloc_recvbuf_big(struct virtnet_info *vi, struct list_head *list,
+			     gfp_t gfp)
 {
-	struct page *first, *list = NULL;
-	char *p;
-	int i, err, offset;
+	struct page *first, *tail = NULL;
+	int i;
 
 	/* page in vi->rx_sg[MAX_SKB_FRAGS + 1] is list tail */
 	for (i = MAX_SKB_FRAGS + 1; i > 1; --i) {
 		first = get_a_page(vi, gfp);
 		if (!first) {
-			if (list)
-				give_pages(vi, list);
+			if (tail)
+				give_pages(vi, tail);
 			return -ENOMEM;
 		}
 		sg_set_buf(&vi->rx_sg[i], page_address(first), PAGE_SIZE);
 
 		/* chain new page in list head to match sg */
-		first->private = (unsigned long)list;
-		list = first;
+		first->private = (unsigned long)tail;
+		tail = first;
 	}
 
 	first = get_a_page(vi, gfp);
 	if (!first) {
-		give_pages(vi, list);
+		give_pages(vi, tail);
 		return -ENOMEM;
 	}
-	p = page_address(first);
 
+	/* chain first in list head */
+	first->private = (unsigned long)tail;
+
+	list_add_tail(&first->lru, list);
+
+	return 0;
+}
+
+/*
+ *  Add a big page threaded through first->private to the rx queue.
+ *
+ *  Consumes the page list, even in case of failure.
+ */
+static int add_recvbuf_big(struct virtnet_info *vi, struct page *first)
+{
+	char *p;
+	int err, offset;
+
+	p = page_address(first);
 	/* vi->rx_sg[0], vi->rx_sg[1] share the same page */
 	/* a separated vi->rx_sg[0] for virtio_net_hdr only due to QEMU bug */
 	sg_set_buf(&vi->rx_sg[0], p, sizeof(struct virtio_net_hdr));
@@ -413,65 +443,140 @@ static int add_recvbuf_big(struct virtnet_info *vi, gfp_t gfp)
 	offset = sizeof(struct padded_vnet_hdr);
 	sg_set_buf(&vi->rx_sg[1], p + offset, PAGE_SIZE - offset);
 
-	/* chain first in list head */
-	first->private = (unsigned long)list;
-	err = virtqueue_add_buf_gfp(vi->rvq, vi->rx_sg, 0, MAX_SKB_FRAGS + 2,
-				    first, gfp);
+	err = virtqueue_add_buf(vi->rvq, vi->rx_sg, 0, MAX_SKB_FRAGS + 2,
+				first);
 	if (err < 0)
 		give_pages(vi, first);
-
 	return err;
 }
 
-static int add_recvbuf_mergeable(struct virtnet_info *vi, gfp_t gfp)
+static int alloc_recvbuf_mergeable(struct virtnet_info *vi,
+				   struct list_head *list, gfp_t gfp)
 {
 	struct page *page;
-	int err;
 
 	page = get_a_page(vi, gfp);
 	if (!page)
 		return -ENOMEM;
 
-	sg_init_one(vi->rx_sg, page_address(page), PAGE_SIZE);
+	list_add_tail(&page->lru, list);
+	return 0;
+}
 
-	err = virtqueue_add_buf_gfp(vi->rvq, vi->rx_sg, 0, 1, page, gfp);
+/* Add a page to the rx queue.  Consumes the page even in failure. */
+static int add_recvbuf_mergeable(struct virtnet_info *vi, struct page *page)
+{
+	int err;
+	sg_init_one(vi->rx_sg, page_address(page), PAGE_SIZE);
+	err = virtqueue_add_buf(vi->rvq, vi->rx_sg, 0, 1, page);
 	if (err < 0)
 		give_pages(vi, page);
-
 	return err;
 }
 
-/*
- * Returns false if we couldn't fill entirely (OOM).
- *
- * Normally run in the receive path, but can also be run from ndo_open
- * before we're receiving packets, or from refill_work which is
- * careful to disable receiving (using napi_disable).
- */
-static bool try_fill_recv(struct virtnet_info *vi, gfp_t gfp)
+/* Returns false if we couldn't fill entirely (OOM). */
+static inline bool try_fill_recv_pages(struct virtnet_info *vi, gfp_t gfp,
+		int (*alloc)(struct virtnet_info *, struct list_head *, gfp_t),
+		int (*add)(struct virtnet_info *, struct page *))
 {
+	LIST_HEAD(local_list);
+	struct page *page, *nextpage;
 	int err;
-	bool oom;
+	bool oom = false;
 
-	do {
-		if (vi->mergeable_rx_bufs)
-			err = add_recvbuf_mergeable(vi, gfp);
-		else if (vi->big_packets)
-			err = add_recvbuf_big(vi, gfp);
-		else
-			err = add_recvbuf_small(vi, gfp);
+	while (1) {
+		int count = rx_allocate_batch;
+		do {
+			err = alloc(vi, &local_list, gfp);
+			oom |= err == -ENOMEM;
+			if (err < 0)
+				break;
+		} while (err >= 0 && --count);
+
+		spin_lock_bh(&vi->rx_fill_lock);
+		list_for_each_entry_safe(page, nextpage, &local_list, lru) {
+			list_del(&page->lru);
+			err = add(vi, page);
+			oom |= err == -ENOMEM;
+			if (err >= 0)
+				++vi->num;
+		}
+		if (unlikely(vi->num > vi->max))
+			vi->max = vi->num;
+		if (err < 0 || vi->num == vi->max)
+			break;
+		spin_unlock_bh(&vi->rx_fill_lock);
+	}
+	virtqueue_kick(vi->rvq);
+	spin_unlock_bh(&vi->rx_fill_lock);
+	return !oom;
+}
+
+static bool try_fill_recv_mergeable(struct virtnet_info *vi, gfp_t gfp)
+{
+	return try_fill_recv_pages(vi, gfp, alloc_recvbuf_mergeable,
+				   add_recvbuf_mergeable);
+}
+
+static bool try_fill_recv_big(struct virtnet_info *vi, gfp_t gfp)
+{
+	return try_fill_recv_pages(vi, gfp, alloc_recvbuf_big, add_recvbuf_big);
+}
+
+static bool try_fill_recv_small(struct virtnet_info *vi, gfp_t gfp)
+{
+	LIST_HEAD(local_list);
+	struct list_head *pos, *n;
+	int err;
+	bool oom = false;
 
-		oom = err == -ENOMEM;
-		if (err < 0)
+	while (1) {
+		int count = rx_allocate_batch;
+		do {
+			err = alloc_recvbuf_small(vi, &local_list, gfp);
+			oom |= err == -ENOMEM;
+			/* In case of error, still process the local_list */
+			if (err < 0)
+				break;
+		} while (err >= 0 && --count);
+
+		spin_lock_bh(&vi->rx_fill_lock);
+		list_for_each_safe(pos, n, &local_list) {
+			struct sk_buff *skb = (struct sk_buff *)pos;
+			list_del(pos);
+			err = add_recvbuf_small(vi, skb);
+			oom |= err == -ENOMEM;
+			if (err >= 0)
+				++vi->num;
+		}
+		if (unlikely(vi->num > vi->max))
+			vi->max = vi->num;
+		if (err < 0 || vi->num == vi->max)
 			break;
-		++vi->num;
-	} while (err > 0);
-	if (unlikely(vi->num > vi->max))
-		vi->max = vi->num;
+		spin_unlock_bh(&vi->rx_fill_lock);
+	}
 	virtqueue_kick(vi->rvq);
+	spin_unlock_bh(&vi->rx_fill_lock);
 	return !oom;
 }
 
+/*
+ * Returns false if we couldn't fill entirely (OOM).
+ *
+ * Normally run in the receive path, but can also be run from ndo_open
+ * before we're receiving packets, or from refill_work.  Serializes
+ * access to the virtioqueue using vi->rx_fill_lock.
+ */
+static bool try_fill_recv(struct virtnet_info *vi, gfp_t gfp)
+{
+	if (vi->mergeable_rx_bufs)
+		return try_fill_recv_mergeable(vi, gfp);
+	else if (vi->big_packets)
+		return try_fill_recv_big(vi, gfp);
+	else
+		return try_fill_recv_small(vi, gfp);
+}
+
 static void skb_recv_done(struct virtqueue *rvq)
 {
 	struct virtnet_info *vi = rvq->vdev->priv;
@@ -502,9 +607,7 @@ static void refill_work(struct work_struct *work)
 	bool still_empty;
 
 	vi = container_of(work, struct virtnet_info, refill.work);
-	napi_disable(&vi->napi);
 	still_empty = !try_fill_recv(vi, GFP_KERNEL);
-	virtnet_napi_enable(vi);
 
 	/* In theory, this can happen: if we don't get any buffers in
 	 * we will *never* try to fill again. */
@@ -519,12 +622,15 @@ static int virtnet_poll(struct napi_struct *napi, int budget)
 	unsigned int len, received = 0;
 
 again:
+	/* We must serialize access to the queue from any concurrent refills. */
+	spin_lock(&vi->rx_fill_lock);
 	while (received < budget &&
 	       (buf = virtqueue_get_buf(vi->rvq, &len)) != NULL) {
 		receive_buf(vi->dev, buf, len);
 		--vi->num;
 		received++;
 	}
+	spin_unlock(&vi->rx_fill_lock);
 
 	if (vi->num < vi->max / 2) {
 		if (!try_fill_recv(vi, GFP_ATOMIC))
@@ -675,7 +781,7 @@ static int virtnet_set_mac_address(struct net_device *dev, void *p)
 
 	if (virtio_has_feature(vdev, VIRTIO_NET_F_MAC))
 		vdev->config->set(vdev, offsetof(struct virtio_net_config, mac),
-		                  dev->dev_addr, dev->addr_len);
+				  dev->dev_addr, dev->addr_len);
 
 	return 0;
 }
@@ -1053,6 +1159,7 @@ static int virtnet_probe(struct virtio_device *vdev)
 		goto free;
 
 	INIT_DELAYED_WORK(&vi->refill, refill_work);
+	spin_lock_init(&vi->rx_fill_lock);
 	sg_init_table(vi->rx_sg, ARRAY_SIZE(vi->rx_sg));
 	sg_init_table(vi->tx_sg, ARRAY_SIZE(vi->tx_sg));
 
@@ -1089,8 +1196,11 @@ static int virtnet_probe(struct virtio_device *vdev)
 		goto free_vqs;
 	}
 
-	/* Last of all, set up some receive buffers. */
+	/* Last of all, set up some receive buffers.
+	 * Start off with a huge max, and then clamp it down to the real max */
+	vi->max = ~0U;
 	try_fill_recv(vi, GFP_KERNEL);
+	vi->max = vi->num;
 
 	/* If we didn't even get one input buffer, we're useless. */
 	if (vi->num == 0) {
