[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <CAF=yD-J99GuA4ndHU8aeQjNcLEZjhVZ7ViWcL8bW2jqVhStNXg@mail.gmail.com>
Date: Sun, 28 Jan 2018 14:56:54 +0100
From: Willem de Bruijn <willemdebruijn.kernel@...il.com>
To: Sowmini Varadhan <sowmini.varadhan@...cle.com>
Cc: Network Development <netdev@...r.kernel.org>,
David Miller <davem@...emloft.net>, rds-devel@....oracle.com,
santosh.shilimkar@...cle.com
Subject: Re: [PATCH net-next 4/7] rds: support for zcopy completion notification
On Wed, Jan 24, 2018 at 12:45 PM, Sowmini Varadhan
<sowmini.varadhan@...cle.com> wrote:
> RDS removes a datagram (rds_message) from the retransmit queue when
> an ACK is received. The ACK indicates that the receiver has queued
> the RDS datagram, so that the sender can safely forget the datagram.
> When all references to the rds_message are quiesced, rds_message_purge
> is called to release resources used by the rds_message
>
> If the datagram to be removed had pinned pages set up, add
> an entry to the rs->rs_znotify_queue so that the notification
> will be sent up via rds_rm_zerocopy_callback() when the
> rds_message is eventually freed by rds_message_purge.
>
> rds_rm_zerocopy_callback() attempts to batch the number of cookies
> sent with each notification to a max of SO_EE_ORIGIN_MAX_ZCOOKIES.
> Each time a cookie is released by rds_message_purge(), the
> rs_znotify_queue is checked to see if the MAX_ZCOOKIES batch limit
> has been exceeded (in which case we send up a notification). If the
> limit has not been exceeded, the cookie is added to the rs_znotify_queue
> and a timer is set up
An alternative that does not require a timer is to batch on the sk
error queue itself, like tcp zerocopy. That queues the first notification
skb on the error queue without any notification latency.
Then, if a subsequent notification comes in while another is pending
with < MAX zcookies, it coalesces the new notification onto the pending
skb and consumes the other. For RDS notifications, the implementation
is an extra skb_put + uint32_t assignment.
Optionally, the socket can trigger another sk_error_report on each
new notification.
> +static void rds_rm_zerocopy_callback(struct rds_sock *rs,
> + struct rds_znotifier *znotifier,
> + bool force)
> +{
> + struct sock *sk = rds_rs_to_sk(rs);
> + struct sk_buff *skb;
> + struct sock_exterr_skb *serr;
> + unsigned long flags;
> + u32 *ptr;
> + int ncookies = 0, i;
> + struct rds_znotifier *znotif, *ztmp, *first;
> + LIST_HEAD(tmp_list);
> +
> + spin_lock_irqsave(&rs->rs_lock, flags);
> + ncookies = rs->rs_ncookies;
> + if (ncookies < SO_EE_ORIGIN_MAX_ZCOOKIES && !force) {
> + if (znotifier) { /* add this cookie to the list and return */
This check can be done before taking the lock.
More importantly, when is this ever NULL? This function is a callback
for a zerocopy struct of type znotifier. Is it doing double duty to flush
any outstanding notifications if znotifier == NULL && force == true? If so, the first
condition probably never occurs unless force == true and thus the
second is redundant.
> + list_add_tail(&znotifier->z_list,
> + &rs->rs_znotify_queue);
> + rs->rs_ncookies++;
> + }
> + spin_unlock_irqrestore(&rs->rs_lock, flags);
> + return;
> + }
> + if (!ncookies) { /* timer finds a reaped list */
> + spin_unlock_irqrestore(&rs->rs_lock, flags);
> + return;
> + }
> + /* reap existing cookie list if we have hit the max, then add
> + * new cookie to the list for next round of reaping.
> + */
> + list_splice(&rs->rs_znotify_queue, &tmp_list); /* reap now */
> + INIT_LIST_HEAD(&rs->rs_znotify_queue);
> + rs->rs_ncookies = 0;
> + if (znotifier) { /* for next round */
This adds unnecessary notification latency to the delivery of the current
notification. The latest notification can be appended to tmp_list and
sent up immediately.
> + list_add_tail(&znotifier->z_list, &rs->rs_znotify_queue);
> + rs->rs_ncookies++;
> + }
> + spin_unlock_irqrestore(&rs->rs_lock, flags);
> +
> + first = list_first_entry(&tmp_list, struct rds_znotifier, z_list);
> + znotif = list_next_entry(first, z_list);
> + list_del(&first->z_list);
> +
> + skb = rds_skb_from_znotifier(first);
> + ptr = skb_put(skb, ncookies * sizeof(u32));
> + i = 0;
> + ptr[i++] = first->z_cookie;
> +
> + list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list) {
> + list_del(&znotif->z_list);
> + ptr[i++] = znotif->z_cookie;
> + mm_unaccount_pinned_pages(&znotif->z_mmp);
> + consume_skb(rds_skb_from_znotifier(znotif));
> + }
> + WARN_ON(!list_empty(&tmp_list));
> +
> + serr = SKB_EXT_ERR(skb);
> + serr->ee.ee_errno = 0;
> + serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
> + serr->ee.ee_data = ncookies;
> + serr->ee.ee_info = 0;
> + serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
> +
> + if (sock_queue_err_skb(sk, skb))
> + consume_skb(skb);
> +}
> +
> +void rs_zcopy_notify(struct timer_list *t)
> +{
> + struct rds_sock *rs = from_timer(rs, t, rs_cookie_timer);
> +
> + rds_rm_zerocopy_callback(rs, NULL, true);
> +}
> +
> /*
> * This relies on dma_map_sg() not touching sg[].page during merging.
> */
> static void rds_message_purge(struct rds_message *rm)
> {
> unsigned long i, flags;
> + bool zcopy = false;
>
> if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
> return;
>
> + spin_lock_irqsave(&rm->m_rs_lock, flags);
> + if (rm->data.op_mmp_znotifier && rm->m_rs) {
> + struct rds_sock *rs = rm->m_rs;
> +
> + zcopy = true;
> + rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier, false);
> + rm->data.op_mmp_znotifier = NULL;
> + (void)mod_timer(&rs->rs_cookie_timer, RDS_REAP_TIMEOUT);
Resetting the timeout on each queued notification causes unbounded
notification latency for previous notifications on the queue.
> +
> + sock_put(rds_rs_to_sk(rs));
> + rm->m_rs = NULL;
These two lines are now called only if znotifier is non-NULL, but
used to be called whenever rm->m_rs != NULL. Is this intentional?
> + }
> + spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> +
> for (i = 0; i < rm->data.op_nents; i++) {
> rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
> /* XXX will have to put_page for page refs */
> - __free_page(sg_page(&rm->data.op_sg[i]));
> + if (!zcopy)
> + __free_page(sg_page(&rm->data.op_sg[i]));
> + else
> + put_page(sg_page(&rm->data.op_sg[i]));
> }
> rm->data.op_nents = 0;
> - spin_lock_irqsave(&rm->m_rs_lock, flags);
> - if (rm->m_rs) {
> - sock_put(rds_rs_to_sk(rm->m_rs));
> - rm->m_rs = NULL;
> - }
> - spin_unlock_irqrestore(&rm->m_rs_lock, flags);
>
> if (rm->rdma.op_active)
> rds_rdma_free_op(&rm->rdma);
Powered by blists - more mailing lists