lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite for Android: free password hash cracker in your pocket
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Date:   Sun, 28 Jan 2018 14:56:54 +0100
From:   Willem de Bruijn <willemdebruijn.kernel@...il.com>
To:     Sowmini Varadhan <sowmini.varadhan@...cle.com>
Cc:     Network Development <netdev@...r.kernel.org>,
        David Miller <davem@...emloft.net>, rds-devel@....oracle.com,
        santosh.shilimkar@...cle.com
Subject: Re: [PATCH net-next 4/7] rds: support for zcopy completion notification

On Wed, Jan 24, 2018 at 12:45 PM, Sowmini Varadhan
<sowmini.varadhan@...cle.com> wrote:
> RDS removes a datagram (rds_message) from the retransmit queue when
> an ACK is received. The ACK indicates that the receiver has queued
> the RDS datagram, so that the sender can safely forget the datagram.
> When all references to the rds_message are quiesced, rds_message_purge
> is called to release resources used by the rds_message
>
> If the datagram to be removed had pinned pages set up, add
> an entry to the rs->rs_znotify_queue so that the notifcation
> will be sent up via rds_rm_zerocopy_callback() when the
> rds_message is eventually freed by rds_message_purge.
>
> rds_rm_zerocopy_callback() attempts to batch the number of cookies
> sent with each notification  to a max of SO_EE_ORIGIN_MAX_ZCOOKIES.
> Each time a cookie is released by rds_message_purge(), the
> rs_znotify_queue is checked to see if the MAX_ZCOOKIES batch limit
> has been exceeded (in which case we send up a notification). If the
> limit has not been exceeded, the cookie is added to the rs_znotify_queue
> and a timer is set up

An alternative that does not require a timer is to batch on the sk
error queue itself, like tcp zerocopy. That queues the first notification
skb on the error queue without any notification latency.

Then, if a subsequent notification comes in while another is pending
with < MAX zcookies, it coalesces the new notification onto the pending
skb and consumes the other. For RDS notifications, the implementation
is an extra skb_put + uint32_t assignment.

Optionally, the socket can trigger another sk_error_report on each
new notification.

> +static void rds_rm_zerocopy_callback(struct rds_sock *rs,
> +                                    struct rds_znotifier *znotifier,
> +                                    bool force)
> +{
> +       struct sock *sk = rds_rs_to_sk(rs);
> +       struct sk_buff *skb;
> +       struct sock_exterr_skb *serr;
> +       unsigned long flags;
> +       u32 *ptr;
> +       int ncookies = 0, i;
> +       struct rds_znotifier *znotif, *ztmp, *first;
> +       LIST_HEAD(tmp_list);
> +
> +       spin_lock_irqsave(&rs->rs_lock, flags);
> +       ncookies = rs->rs_ncookies;
> +       if (ncookies < SO_EE_ORIGIN_MAX_ZCOOKIES && !force) {
> +               if (znotifier) { /* add this cookie to the list and return */

can be checked before taking lock.

More importantly, when is this ever NULL? This function is a callback
for a zerocopy struct of type znotifier. Is it doing double duty to flush
any outstanding if znotifier == NULL && force == true? If so, the first
condition probably never occurs unless force == true and thus the
second is redundant.

> +                       list_add_tail(&znotifier->z_list,
> +                                     &rs->rs_znotify_queue);
> +                       rs->rs_ncookies++;
> +               }
> +               spin_unlock_irqrestore(&rs->rs_lock, flags);
> +               return;
> +       }
> +       if (!ncookies) { /* timer finds a reaped list */
> +               spin_unlock_irqrestore(&rs->rs_lock, flags);
> +               return;
> +       }
> +       /* reap existing cookie list if we have hit the max, then add
> +        * new cookie to the list for next round of reaping.
> +        */
> +       list_splice(&rs->rs_znotify_queue, &tmp_list); /* reap now */
> +       INIT_LIST_HEAD(&rs->rs_znotify_queue);
> +       rs->rs_ncookies = 0;
> +       if (znotifier) { /* for next round */

This adds unnecessary notification latency to delivery of current
notification. The latest notification can be appended to tmp_list and
sent up immediately.

> +               list_add_tail(&znotifier->z_list, &rs->rs_znotify_queue);
> +               rs->rs_ncookies++;
> +       }
> +       spin_unlock_irqrestore(&rs->rs_lock, flags);
> +
> +       first = list_first_entry(&tmp_list, struct rds_znotifier, z_list);
> +       znotif = list_next_entry(first, z_list);
> +       list_del(&first->z_list);
> +
> +       skb = rds_skb_from_znotifier(first);
> +       ptr = skb_put(skb, ncookies * sizeof(u32));
> +       i = 0;
> +       ptr[i++] = first->z_cookie;
> +
> +       list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list) {
> +               list_del(&znotif->z_list);
> +               ptr[i++] = znotif->z_cookie;
> +               mm_unaccount_pinned_pages(&znotif->z_mmp);
> +               consume_skb(rds_skb_from_znotifier(znotif));
> +       }
> +       WARN_ON(!list_empty(&tmp_list));
> +
> +       serr = SKB_EXT_ERR(skb);
> +       serr->ee.ee_errno = 0;
> +       serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
> +       serr->ee.ee_data = ncookies;
> +       serr->ee.ee_info = 0;
> +       serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
> +
> +       if (sock_queue_err_skb(sk, skb))
> +               consume_skb(skb);
> +}
> +
> +void rs_zcopy_notify(struct timer_list *t)
> +{
> +       struct rds_sock *rs = from_timer(rs, t, rs_cookie_timer);
> +
> +       rds_rm_zerocopy_callback(rs, NULL, true);
> +}
> +
>  /*
>   * This relies on dma_map_sg() not touching sg[].page during merging.
>   */
>  static void rds_message_purge(struct rds_message *rm)
>  {
>         unsigned long i, flags;
> +       bool zcopy = false;
>
>         if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
>                 return;
>
> +       spin_lock_irqsave(&rm->m_rs_lock, flags);
> +       if (rm->data.op_mmp_znotifier && rm->m_rs) {
> +               struct rds_sock *rs = rm->m_rs;
> +
> +               zcopy = true;
> +               rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier, false);
> +               rm->data.op_mmp_znotifier = NULL;
> +               (void)mod_timer(&rs->rs_cookie_timer, RDS_REAP_TIMEOUT);

Resetting timeout on each queued notification causes unbound
notification latency for previous notifications on the queue.

> +
> +               sock_put(rds_rs_to_sk(rs));
> +               rm->m_rs = NULL;

These two lines are now called only if znotifier is true, but
used to be called whenever rm->m_rs != NULL. Intentional?

> +       }
> +       spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> +
>         for (i = 0; i < rm->data.op_nents; i++) {
>                 rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
>                 /* XXX will have to put_page for page refs */
> -               __free_page(sg_page(&rm->data.op_sg[i]));
> +               if (!zcopy)
> +                       __free_page(sg_page(&rm->data.op_sg[i]));
> +               else
> +                       put_page(sg_page(&rm->data.op_sg[i]));
>         }
>         rm->data.op_nents = 0;
> -       spin_lock_irqsave(&rm->m_rs_lock, flags);
> -       if (rm->m_rs) {
> -               sock_put(rds_rs_to_sk(rm->m_rs));
> -               rm->m_rs = NULL;
> -       }
> -       spin_unlock_irqrestore(&rm->m_rs_lock, flags);
>
>         if (rm->rdma.op_active)
>                 rds_rdma_free_op(&rm->rdma);

Powered by blists - more mailing lists