Message-ID: <1321017627.955.254.camel@zakaz.uk.xensource.com>
Date: Fri, 11 Nov 2011 13:20:27 +0000
From: Ian Campbell <Ian.Campbell@...rix.com>
To: "Michael S. Tsirkin" <mst@...hat.com>
CC: "netdev@...r.kernel.org" <netdev@...r.kernel.org>,
"David S. Miller" <davem@...emloft.net>,
Neil Brown <neilb@...e.de>,
"J. Bruce Fields" <bfields@...ldses.org>,
"linux-nfs@...r.kernel.org" <linux-nfs@...r.kernel.org>
Subject: Re: [PATCH 4/4] sunrpc: use SKB fragment destructors to delay
completion until page is released by network stack.
On Fri, 2011-11-11 at 12:38 +0000, Michael S. Tsirkin wrote:
> On Wed, Nov 09, 2011 at 03:02:07PM +0000, Ian Campbell wrote:
> > This prevents an issue where an ACK is delayed, a retransmit is queued (either
> > at the RPC or TCP level) and the ACK arrives before the retransmission hits the
> > wire. If this happens to an NFS WRITE RPC then the write() system call
> > completes and the userspace process can continue, potentially modifying data
> > referenced by the retransmission before the retransmission occurs.
> >
> > Signed-off-by: Ian Campbell <ian.campbell@...rix.com>
> > Acked-by: Trond Myklebust <Trond.Myklebust@...app.com>
> > Cc: "David S. Miller" <davem@...emloft.net>
> > Cc: Neil Brown <neilb@...e.de>
> > Cc: "J. Bruce Fields" <bfields@...ldses.org>
> > Cc: linux-nfs@...r.kernel.org
> > Cc: netdev@...r.kernel.org
>
> So this blocks the system call until all page references
> are gone, right?
Right. The alternative is to return to userspace while the network stack
still has a reference to the buffer which was passed in -- that's the
exact class of problem this patch is supposed to fix.
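(For background on how the blocking works: rpc_xdr_encode() points
rq_snd_buf.destructor at the per-request destructor, the pages the
transport sends hold references on it, call_decode() puts the task to
sleep on xprt->pending, and xprt_complete_skb_pages() wakes it from the
destroy callback once the stack drops the last reference. The unref
helper is introduced in patch 1/4 of this series; from memory it looks
roughly like:

int skb_frag_destructor_unref(struct skb_frag_destructor *destroy)
{
	if (destroy == NULL)
		return 0;

	/* Drop one reference; only the final one, i.e. only once the
	 * stack has released every page, fires the callback. */
	if (!atomic_dec_and_test(&destroy->ref))
		return 0;

	return destroy->destroy(destroy->data);
}

so the task stays asleep on xprt->pending until that callback has run.)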
> But, there's no upper limit on how long the
> page is referenced, correct?
Correct.
> consider a bridged setup
> with an skb queued at a tap device - can't this cause one process
> to block another one by virtue of not consuming a cloned skb?
Hmm, yes.
One approach might be to introduce the concept of an skb timeout to the
stack as a whole and cancel (or deep copy) the skb after that timeout
occurs. That's going to be tricky, though, I suspect...
A simpler option would be to have endpoints such as a tap device, which
can swallow skbs for arbitrarily long periods, implement a policy in
this regard themselves: either deep copy or drop after a timeout (as in
the sketch below)?
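Something along these lines for a tun-style device, purely as an
illustration (the expiry stamp in skb->cb and the helper name are
invented for the sketch; only the skb APIs are real, and the caller is
assumed to hold the queue lock while scanning the queue periodically):

struct tap_cb {
	unsigned long expires;	/* jiffies deadline, stamped at enqueue */
};

static void tap_expire_one(struct sk_buff_head *q, struct sk_buff *skb)
{
	struct tap_cb *cb = (struct tap_cb *)skb->cb;
	struct sk_buff *copy;

	if (!time_after(jiffies, cb->expires))
		return;

	/* Prefer a deep copy: skb_copy() duplicates all the data into
	 * private storage, so freeing the original releases its pages
	 * and fires the sender's frag destructor. On allocation
	 * failure just drop; the sender will retransmit anyway. */
	copy = skb_copy(skb, GFP_ATOMIC);
	if (copy)
		__skb_queue_after(q, skb, copy);

	__skb_unlink(skb, q);
	kfree_skb(skb);
}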
Ian.
>
> > ---
> > include/linux/sunrpc/xdr.h | 2 ++
> > include/linux/sunrpc/xprt.h | 5 ++++-
> > net/sunrpc/clnt.c | 27 ++++++++++++++++++++++-----
> > net/sunrpc/svcsock.c | 3 ++-
> > net/sunrpc/xprt.c | 13 +++++++++++++
> > net/sunrpc/xprtsock.c | 3 ++-
> > 6 files changed, 45 insertions(+), 8 deletions(-)
> >
> > diff --git a/include/linux/sunrpc/xdr.h b/include/linux/sunrpc/xdr.h
> > index a20970e..172f81e 100644
> > --- a/include/linux/sunrpc/xdr.h
> > +++ b/include/linux/sunrpc/xdr.h
> > @@ -16,6 +16,7 @@
> > #include <asm/byteorder.h>
> > #include <asm/unaligned.h>
> > #include <linux/scatterlist.h>
> > +#include <linux/skbuff.h>
> >
> > /*
> > * Buffer adjustment
> > @@ -57,6 +58,7 @@ struct xdr_buf {
> > tail[1]; /* Appended after page data */
> >
> > struct page ** pages; /* Array of contiguous pages */
> > + struct skb_frag_destructor *destructor;
> > unsigned int page_base, /* Start of page data */
> > page_len, /* Length of page data */
> > flags; /* Flags for data disposition */
> > diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> > index 15518a1..75131eb 100644
> > --- a/include/linux/sunrpc/xprt.h
> > +++ b/include/linux/sunrpc/xprt.h
> > @@ -92,7 +92,10 @@ struct rpc_rqst {
> > /* A cookie used to track the
> > state of the transport
> > connection */
> > -
> > + struct skb_frag_destructor destructor; /* SKB paged fragment
> > + * destructor for
> > + * transmitted pages*/
> > +
> > /*
> > * Partial send handling
> > */
> > diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> > index c5347d2..919538d 100644
> > --- a/net/sunrpc/clnt.c
> > +++ b/net/sunrpc/clnt.c
> > @@ -61,6 +61,7 @@ static void call_reserve(struct rpc_task *task);
> > static void call_reserveresult(struct rpc_task *task);
> > static void call_allocate(struct rpc_task *task);
> > static void call_decode(struct rpc_task *task);
> > +static void call_complete(struct rpc_task *task);
> > static void call_bind(struct rpc_task *task);
> > static void call_bind_status(struct rpc_task *task);
> > static void call_transmit(struct rpc_task *task);
> > @@ -1113,6 +1114,8 @@ rpc_xdr_encode(struct rpc_task *task)
> > (char *)req->rq_buffer + req->rq_callsize,
> > req->rq_rcvsize);
> >
> > + req->rq_snd_buf.destructor = &req->destructor;
> > +
> > p = rpc_encode_header(task);
> > if (p == NULL) {
> > printk(KERN_INFO "RPC: couldn't encode RPC header, exit EIO\n");
> > @@ -1276,6 +1279,7 @@ call_connect_status(struct rpc_task *task)
> > static void
> > call_transmit(struct rpc_task *task)
> > {
> > + struct rpc_rqst *req = task->tk_rqstp;
> > dprint_status(task);
> >
> > task->tk_action = call_status;
> > @@ -1309,8 +1313,8 @@ call_transmit(struct rpc_task *task)
> > call_transmit_status(task);
> > if (rpc_reply_expected(task))
> > return;
> > - task->tk_action = rpc_exit_task;
> > - rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
> > + task->tk_action = call_complete;
> > + skb_frag_destructor_unref(&req->destructor);
> > }
> >
> > /*
> > @@ -1383,7 +1387,8 @@ call_bc_transmit(struct rpc_task *task)
> > return;
> > }
> >
> > - task->tk_action = rpc_exit_task;
> > + task->tk_action = call_complete;
> > + skb_frag_destructor_unref(&req->destructor);
> > if (task->tk_status < 0) {
> > printk(KERN_NOTICE "RPC: Could not send backchannel reply "
> > "error: %d\n", task->tk_status);
> > @@ -1423,7 +1428,6 @@ call_bc_transmit(struct rpc_task *task)
> > "error: %d\n", task->tk_status);
> > break;
> > }
> > - rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
> > }
> > #endif /* CONFIG_SUNRPC_BACKCHANNEL */
> >
> > @@ -1589,12 +1593,14 @@ call_decode(struct rpc_task *task)
> > return;
> > }
> >
> > - task->tk_action = rpc_exit_task;
> > + task->tk_action = call_complete;
> >
> > if (decode) {
> > task->tk_status = rpcauth_unwrap_resp(task, decode, req, p,
> > task->tk_msg.rpc_resp);
> > }
> > + rpc_sleep_on(&req->rq_xprt->pending, task, NULL);
> > + skb_frag_destructor_unref(&req->destructor);
> > dprintk("RPC: %5u call_decode result %d\n", task->tk_pid,
> > task->tk_status);
> > return;
> > @@ -1609,6 +1615,17 @@ out_retry:
> > }
> > }
> >
> > +/*
> > + * 8. Wait for pages to be released by the network stack.
> > + */
> > +static void
> > +call_complete(struct rpc_task *task)
> > +{
> > + dprintk("RPC: %5u call_complete result %d\n",
> > + task->tk_pid, task->tk_status);
> > + task->tk_action = rpc_exit_task;
> > +}
> > +
> > static __be32 *
> > rpc_encode_header(struct rpc_task *task)
> > {
> > diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
> > index 852a258..3685cad 100644
> > --- a/net/sunrpc/svcsock.c
> > +++ b/net/sunrpc/svcsock.c
> > @@ -196,7 +196,8 @@ int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
> > while (pglen > 0) {
> > if (slen == size)
> > flags = 0;
> > - result = kernel_sendpage(sock, *ppage, NULL, base, size, flags);
> > + result = kernel_sendpage(sock, *ppage, xdr->destructor,
> > + base, size, flags);
> > if (result > 0)
> > len += result;
> > if (result != size)
> > diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
> > index f4385e4..925aa0c 100644
> > --- a/net/sunrpc/xprt.c
> > +++ b/net/sunrpc/xprt.c
> > @@ -1103,6 +1103,16 @@ static inline void xprt_init_xid(struct rpc_xprt *xprt)
> > xprt->xid = net_random();
> > }
> >
> > +static int xprt_complete_skb_pages(void *calldata)
> > +{
> > + struct rpc_task *task = calldata;
> > + struct rpc_rqst *req = task->tk_rqstp;
> > +
> > + dprintk("RPC: %5u completing skb pages\n", task->tk_pid);
> > + rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
> > + return 0;
> > +}
> > +
> > static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
> > {
> > struct rpc_rqst *req = task->tk_rqstp;
> > @@ -1115,6 +1125,9 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
> > req->rq_xid = xprt_alloc_xid(xprt);
> > req->rq_release_snd_buf = NULL;
> > xprt_reset_majortimeo(req);
> > + atomic_set(&req->destructor.ref, 1);
> > + req->destructor.destroy = &xprt_complete_skb_pages;
> > + req->destructor.data = task;
> > dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
> > req, ntohl(req->rq_xid));
> > }
> > diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> > index f79e40e9..af3a106 100644
> > --- a/net/sunrpc/xprtsock.c
> > +++ b/net/sunrpc/xprtsock.c
> > @@ -408,7 +408,8 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
> > remainder -= len;
> > if (remainder != 0 || more)
> > flags |= MSG_MORE;
> > - err = sock->ops->sendpage(sock, *ppage, NULL, base, len, flags);
> > + err = sock->ops->sendpage(sock, *ppage, xdr->destructor,
> > + base, len, flags);
> > if (remainder == 0 || err != len)
> > break;
> > sent += err;
> > --
> > 1.7.2.5
> >