Message-ID: <000201da29c8$2eeccd80$8cc66880$@wangsu.com>
Date: Fri, 8 Dec 2023 19:17:50 +0800
From: "Pengcheng Yang" <yangpc@...gsu.com>
To: "'John Fastabend'" <john.fastabend@...il.com>,
"'Jakub Sitnicki'" <jakub@...udflare.com>,
"'Eric Dumazet'" <edumazet@...gle.com>,
"'Jakub Kicinski'" <kuba@...nel.org>,
<bpf@...r.kernel.org>,
<netdev@...r.kernel.org>
Subject: Re: [PATCH bpf-next v2 1/3] skmsg: Support to get the data length in ingress_msg
John Fastabend <john.fastabend@...il.com> wrote:
>
> Pengcheng Yang wrote:
> > Currently msg is queued in ingress_msg of the target psock
> > on ingress redirect, without incrementing rcv_nxt. The size
> > that the user can read includes the data in receive_queue and
> > ingress_msg. So we introduce the sk_msg_queue_len() helper to
> > get the data length in ingress_msg.
> >
> > Note that msg_len does not include the data length of msgs
> > received from receive_queue via SK_PASS, as rcv_nxt is already
> > incremented when they are received.
> >
> > Signed-off-by: Pengcheng Yang <yangpc@...gsu.com>
> > ---
> >  include/linux/skmsg.h | 26 ++++++++++++++++++++++++--
> >  net/core/skmsg.c      | 10 +++++++++-
> >  2 files changed, 33 insertions(+), 3 deletions(-)
> >
>
> This has two writers under different locks, which looks insufficient
> to ensure correctness of the counter. Likely the consume can be
> moved into where the dequeue_msg happens? But then it's not always
> accurate, which might break some applications doing buffer sizing.
> An example of this would be nginx.
>
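[ For context, the buffer-sizing pattern at stake here is the usual
FIONREAD loop; a minimal userspace sketch (illustrative only, not
nginx's actual code) of why the counter's accuracy matters:

#include <sys/ioctl.h>
#include <unistd.h>

/* Size the read from the kernel's unread-byte count. If that count
 * over-reports (bytes already copied out but not yet debited), read()
 * may block or come back short; if it under-reports, the application
 * reads less than it could.
 */
static ssize_t read_sized(int fd, char *buf, size_t cap)
{
	int unread = 0;

	if (ioctl(fd, FIONREAD, &unread) < 0)
		return -1;
	if (unread <= 0)
		return 0;
	if ((size_t)unread > cap)
		unread = (int)cap;
	return read(fd, buf, (size_t)unread);
}
]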
> > diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h
> > index c1637515a8a4..423a5c28c606 100644
> > --- a/include/linux/skmsg.h
> > +++ b/include/linux/skmsg.h
> > @@ -47,6 +47,7 @@ struct sk_msg {
> >  	u32				apply_bytes;
> >  	u32				cork_bytes;
> >  	u32				flags;
> > +	bool				ingress_self;
> >  	struct sk_buff			*skb;
> >  	struct sock			*sk_redir;
> >  	struct sock			*sk;
> > @@ -82,6 +83,7 @@ struct sk_psock {
> >  	u32				apply_bytes;
> >  	u32				cork_bytes;
> >  	u32				eval;
> > +	u32				msg_len;
> >  	bool				redir_ingress; /* undefined if sk_redir is null */
> >  	struct sk_msg			*cork;
> >  	struct sk_psock_progs		progs;
> > @@ -311,9 +313,11 @@ static inline void sk_psock_queue_msg(struct sk_psock *psock,
> >  				      struct sk_msg *msg)
> >  {
> >  	spin_lock_bh(&psock->ingress_lock);
> > -	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED))
> > +	if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) {
> >  		list_add_tail(&msg->list, &psock->ingress_msg);
> > -	else {
> > +		if (!msg->ingress_self)
> > +			WRITE_ONCE(psock->msg_len, psock->msg_len + msg->sg.size);
>
> First writer here can be from
>
>   sk_psock_backlog()
>     mutex_lock(psock->work_mutex)
>     ...
>     sk_psock_handle_skb()
>       sk_psock_skb_ingress()
>         sk_psock_skb_ingress_enqueue()
>           sk_psock_queue_msg()
>             spin_lock_bh(psock->ingress_lock)
>             WRITE_ONCE(...)
>             spin_unlock_bh()
>
> > +	} else {
> >  		sk_msg_free(psock->sk, msg);
> >  		kfree(msg);
> >  	}
> > @@ -368,6 +372,24 @@ static inline void kfree_sk_msg(struct sk_msg *msg)
> >  	kfree(msg);
> >  }
> >
> > +static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
> > +{
> > +	WRITE_ONCE(psock->msg_len, psock->msg_len - len);
> > +}
> > +
> > +static inline u32 sk_msg_queue_len(const struct sock *sk)
> > +{
> > +	struct sk_psock *psock;
> > +	u32 len = 0;
> > +
> > +	rcu_read_lock();
> > +	psock = sk_psock(sk);
> > +	if (psock)
> > +		len = READ_ONCE(psock->msg_len);
> > +	rcu_read_unlock();
> > +	return len;
> > +}
> > +
> >  static inline void sk_psock_report_error(struct sk_psock *psock, int err)
> >  {
> >  	struct sock *sk = psock->sk;
> > diff --git a/net/core/skmsg.c b/net/core/skmsg.c
> > index 6c31eefbd777..f46732a8ddc2 100644
> > --- a/net/core/skmsg.c
> > +++ b/net/core/skmsg.c
> > @@ -415,7 +415,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> > struct iov_iter *iter = &msg->msg_iter;
> > int peek = flags & MSG_PEEK;
> > struct sk_msg *msg_rx;
> > - int i, copied = 0;
> > + int i, copied = 0, msg_copied = 0;
> >
> > msg_rx = sk_psock_peek_msg(psock);
> > while (copied != len) {
> > @@ -441,6 +441,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> > }
> >
> > copied += copy;
> > + if (!msg_rx->ingress_self)
> > + msg_copied += copy;
> > if (likely(!peek)) {
> > sge->offset += copy;
> > sge->length -= copy;
> > @@ -481,6 +483,8 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
> > msg_rx = sk_psock_peek_msg(psock);
> > }
> > out:
> > + if (likely(!peek) && msg_copied)
> > + sk_msg_queue_consumed(psock, msg_copied);
>
> Second writer,
>
> tcp_bpf_recvmsg_parser()
>   lock_sock(sk)
>   sk_msg_recvmsg()
>     sk_psock_peek_msg()
>       spin_lock_bh(ingress_lock)  <- lock held from first writer
>       msg = list...
>       spin_unlock_bh()
>     sk_psock_dequeue_msg(psock)
>       spin_lock_bh(ingress_lock)
>       msg = ....                  <- should call queue_consumed here
>       spin_unlock_bh()
>
>   out:
>     if (likely(!peek) && msg_copied)
>       sk_msg_queue_consumed(psock, msg_copied);  <- here no lock?
>
>
> It looks like you could move the queue_consumed up into the dequeue_msg,
> but then you have some issue on partial reads, I think? Basically the
> ioctl might return more bytes than are actually in the ingress queue.
> Also it will look strange if the ioctl is called twice, once before a
> read and again after a read, and the byte count doesn't change.
>
Thanks John for pointing this out.
Yes, I tried to move queue_consumed into dequeue_msg without making
major changes to sk_msg_recvmsg, but failed; see the sketch below.
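Roughly, the attempt looked like the following (an untested sketch on
top of the current sk_psock_dequeue_msg(); the whole msg is debited
only when it is finally dequeued, so after a partial read the counter
over-reports, which is exactly the problem you describe):

static inline struct sk_msg *sk_psock_dequeue_msg(struct sk_psock *psock)
{
	struct sk_msg *msg;

	spin_lock_bh(&psock->ingress_lock);
	msg = list_first_entry_or_null(&psock->ingress_msg,
				       struct sk_msg, list);
	if (msg) {
		list_del(&msg->list);
		/* Debit only here, under ingress_lock: a partially read
		 * msg stays queued with its full sg.size still counted.
		 */
		if (!msg->ingress_self)
			WRITE_ONCE(psock->msg_len, psock->msg_len - msg->sg.size);
	}
	spin_unlock_bh(&psock->ingress_lock);
	return msg;
}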
> Maybe it needs the ingress queue lock wrapped around this
> queue_consumed, leaving it where it is? A couple of ideas anyway,
> but I don't think it's correct as is.
And is it acceptable to just put the ingress_lock around
sk_msg_queue_consumed in sk_msg_recvmsg? Like the following:
static inline void sk_msg_queue_consumed(struct sk_psock *psock, u32 len)
{
+	spin_lock_bh(&psock->ingress_lock);
	WRITE_ONCE(psock->msg_len, psock->msg_len - len);
+	spin_unlock_bh(&psock->ingress_lock);
}

static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
{
	struct sk_msg *msg, *tmp;

	list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
		list_del(&msg->list);
		if (!msg->ingress_self)
~			WRITE_ONCE(psock->msg_len, psock->msg_len - msg->sg.size);
		sk_msg_free(psock->sk, msg);
		kfree(msg);
	}
	WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
}
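The intended reader side would then just add sk_msg_queue_len() on top
of the normal in-queue byte count, roughly like this (a hypothetical
sketch, not the exact code in patch 2/3; tcp_bpf_inq_hint is a made-up
name):

/* Unread bytes visible to the application: bytes accounted via rcv_nxt
 * in sk_receive_queue (tcp_inq()) plus redirected msgs in ingress_msg.
 */
static int tcp_bpf_inq_hint(struct sock *sk)
{
	return tcp_inq(sk) + sk_msg_queue_len(sk);
}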
>
> >  	return copied;
> >  }
> >  EXPORT_SYMBOL_GPL(sk_msg_recvmsg);
> > @@ -602,6 +606,7 @@ static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb
> >
> >  	if (unlikely(!msg))
> >  		return -EAGAIN;
> > +	msg->ingress_self = true;
> >  	skb_set_owner_r(skb, sk);
> >  	err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg);
> >  	if (err < 0)
> > @@ -771,9 +776,12 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock)
> >
>
> purge doesn't use the ingress_lock because the backlog has been
> cancelled and synced, and the proto handlers have been swapped back to
> the original handlers, so there is no longer any way to get at the
> ingress queue from the socket side either.
>
> >  	list_for_each_entry_safe(msg, tmp, &psock->ingress_msg, list) {
> >  		list_del(&msg->list);
> > +		if (!msg->ingress_self)
> > +			sk_msg_queue_consumed(psock, msg->sg.size);
> >  		sk_msg_free(psock->sk, msg);
> >  		kfree(msg);
> >  	}
> > +	WARN_ON_ONCE(READ_ONCE(psock->msg_len) != 0);
> >  }
> >
> >  static void __sk_psock_zap_ingress(struct sk_psock *psock)
> > --
> > 2.38.1
> >