Message-ID: <20250530-definieren-minze-7be7a10b4354@brauner>
Date: Fri, 30 May 2025 07:08:45 +0200
From: Christian Brauner <brauner@...nel.org>
To: Nam Cao <namcao@...utronix.de>,
Frederic Weisbecker <frederic@...nel.org>
Cc: Alexander Viro <viro@...iv.linux.org.uk>, Jan Kara <jack@...e.cz>,
Sebastian Andrzej Siewior <bigeasy@...utronix.de>, John Ogness <john.ogness@...utronix.de>,
Clark Williams <clrkwllms@...nel.org>, Steven Rostedt <rostedt@...dmis.org>,
linux-fsdevel@...r.kernel.org, linux-kernel@...r.kernel.org, linux-rt-devel@...ts.linux.dev,
linux-rt-users@...r.kernel.org, Joe Damato <jdamato@...tly.com>,
Martin Karsten <mkarsten@...terloo.ca>, Jens Axboe <axboe@...nel.dk>,
Valentin Schneider <vschneid@...hat.com>
Subject: Re: [PATCH v3] eventpoll: Fix priority inversion problem
On Tue, May 27, 2025 at 11:08:36AM +0200, Nam Cao wrote:
> The ready event list of an epoll object is protected by a read-write
> semaphore:
>
> - The consumer (waiter) acquires the write lock and takes items.
> - The producer (waker) takes the read lock and adds items.
>
> The point of this design is to let epoll scale well with a large number
> of producers, as multiple producers can hold the read lock at the same
> time.
>
> Unfortunately, this implementation may cause a scheduling priority
> inversion problem. Suppose the consumer has a higher scheduling priority
> than the producer. The consumer needs to acquire the write lock, but may
> be blocked by the producer holding the read lock. Since the read-write
> semaphore does not support priority boosting for readers (even with
> CONFIG_PREEMPT_RT=y), we have a case of priority inversion: a
> higher-priority consumer is blocked by a lower-priority producer. This
> problem was reported in [1].
>
> Furthermore, this could also cause a stall problem, as described in [2].
>
> To fix this problem, make the event list half-lockless:
>
> - The consumer acquires a mutex (ep->mtx) and takes items.
> - The producer locklessly adds items to the list.
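
[ Aside for anyone skimming the thread: stripped of the epoll details,
  the scheme described above is the classic llist producer/consumer
  split. A minimal sketch, with invented names and not taken from this
  patch, looks like this: ]

    #include <linux/llist.h>
    #include <linux/mutex.h>

    struct demo_queue {
            struct llist_head ready;   /* producers add here locklessly */
            struct mutex mtx;          /* serializes the consumer side  */
    };

    struct demo_item {
            struct llist_node link;
    };

    /* Producer (e.g. a wakeup callback): no lock taken, any context. */
    static void demo_produce(struct demo_queue *q, struct demo_item *it)
    {
            llist_add(&it->link, &q->ready);
    }

    /* Consumer: items are only ever removed while holding q->mtx. */
    static void demo_consume(struct demo_queue *q)
    {
            struct llist_node *n;

            mutex_lock(&q->mtx);
            while ((n = llist_del_first(&q->ready))) {
                    struct demo_item *it;

                    it = llist_entry(n, struct demo_item, link);
                    /* ... deliver the event carried by 'it' here ... */
                    (void)it;       /* silence -Wunused in this sketch */
            }
            mutex_unlock(&q->mtx);
    }

[ The real patch additionally has to guarantee that each epitem is
  queued at most once, which is what the new epitem_ready() below
  takes care of. ]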
>
> Performance is not the main goal of this patch, but as the producer can
> now add items without waiting for the consumer to release the lock, a
> performance improvement is observed with the stress test from
> https://github.com/rouming/test-tools/blob/master/stress-epoll.c. This is
> the same test that justified using a read-write semaphore in the past.
>
> Testing using 12 x86_64 CPUs:
>
>    threads     Before      After     Diff
>             events/ms  events/ms
>          8       6932      19753    +185%
>         16       7820      27923    +257%
>         32       7648      35164    +360%
>         64       9677      37780    +290%
>        128      11166      38174    +242%
>
> Testing using 1 riscv64 CPU (averaged over 10 runs, as the numbers are
> noisy):
>
>    threads     Before      After     Diff
>             events/ms  events/ms
>          1         73        129     +77%
>          2        151        216     +43%
>          4        216        364     +69%
>          8        234        382     +63%
>         16        251        392     +56%
>
> Reported-by: Frederic Weisbecker <frederic@...nel.org>
> Closes: https://lore.kernel.org/linux-rt-users/20210825132754.GA895675@lothringen/ [1]
> Reported-by: Valentin Schneider <vschneid@...hat.com>
> Closes: https://lore.kernel.org/linux-rt-users/xhsmhttqvnall.mognet@vschneid.remote.csb/ [2]
> Signed-off-by: Nam Cao <namcao@...utronix.de>
> ---
> v3:
> - get rid of the "link_used" and "ready" flags. They are hard to
> understand and unnecessary
> - get rid of the obsolete lockdep_assert_irqs_enabled()
> - Add lockdep_assert_held(&ep->mtx)
> - rewrite some comments
> v2:
> - rename link_locked -> link_used
> - replace xchg() with smp_store_release() when applicable
> - make sure llist_node is in clean state when not on a list
> - remove now-unused list_add_tail_lockless()
> ---
Care to review this, Frederic?
> fs/eventpoll.c | 458 +++++++++++++++----------------------------------
> 1 file changed, 134 insertions(+), 324 deletions(-)
>
> diff --git a/fs/eventpoll.c b/fs/eventpoll.c
> index d4dbffdedd08e..a97a771a459c9 100644
> --- a/fs/eventpoll.c
> +++ b/fs/eventpoll.c
> @@ -137,13 +137,7 @@ struct epitem {
> };
>
> /* List header used to link this structure to the eventpoll ready list */
> - struct list_head rdllink;
> -
> - /*
> - * Works together "struct eventpoll"->ovflist in keeping the
> - * single linked chain of items.
> - */
> - struct epitem *next;
> + struct llist_node rdllink;
>
> /* The file descriptor information this item refers to */
> struct epoll_filefd ffd;
> @@ -191,22 +185,15 @@ struct eventpoll {
> /* Wait queue used by file->poll() */
> wait_queue_head_t poll_wait;
>
> - /* List of ready file descriptors */
> - struct list_head rdllist;
> -
> - /* Lock which protects rdllist and ovflist */
> - rwlock_t lock;
> + /*
> + * List of ready file descriptors. Adding to this list is lockless. Items can be removed
> + * only with eventpoll::mtx
> + */
> + struct llist_head rdllist;
>
> /* RB tree root used to store monitored fd structs */
> struct rb_root_cached rbr;
>
> - /*
> - * This is a single linked list that chains all the "struct epitem" that
> - * happened while transferring ready events to userspace w/out
> - * holding ->lock.
> - */
> - struct epitem *ovflist;
> -
> /* wakeup_source used when ep_send_events or __ep_eventpoll_poll is running */
> struct wakeup_source *ws;
>
> @@ -361,10 +348,14 @@ static inline int ep_cmp_ffd(struct epoll_filefd *p1,
> (p1->file < p2->file ? -1 : p1->fd - p2->fd));
> }
>
> -/* Tells us if the item is currently linked */
> -static inline int ep_is_linked(struct epitem *epi)
> +/*
> + * Add the item to its container eventpoll's rdllist; do nothing if the item is already on rdllist.
> + */
> +static void epitem_ready(struct epitem *epi)
> {
> - return !list_empty(&epi->rdllink);
> + if (&epi->rdllink == cmpxchg(&epi->rdllink.next, &epi->rdllink, NULL))
> + llist_add(&epi->rdllink, &epi->ep->rdllist);
> +
> }
>
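
[ For reference, the trick here: init_llist_node() marks an off-list
  node by pointing node->next at the node itself (that is what
  llist_on_list() tests). Whoever still observes that marker wins the
  cmpxchg() and performs the one llist_add(); concurrent callers lose
  the cmpxchg() and return, so the item can never be queued twice. A
  generic restatement of the idiom, not code from the patch: ]

    #include <linux/atomic.h>
    #include <linux/llist.h>

    /* Enqueue 'node' unless it is (or is being) enqueued already.     */
    /* Precondition: nodes are marked off-list with init_llist_node(). */
    static void enqueue_once(struct llist_node *node, struct llist_head *head)
    {
            if (cmpxchg(&node->next, node, NULL) == node)
                    llist_add(node, head);
    }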
> static inline struct eppoll_entry *ep_pwq_from_wait(wait_queue_entry_t *p)
> @@ -383,13 +374,26 @@ static inline struct epitem *ep_item_from_wait(wait_queue_entry_t *p)
> *
> * @ep: Pointer to the eventpoll context.
> *
> - * Return: a value different than %zero if ready events are available,
> - * or %zero otherwise.
> + * Return: true if ready events might be available, false otherwise.
> */
> -static inline int ep_events_available(struct eventpoll *ep)
> +static inline bool ep_events_available(struct eventpoll *ep)
> {
> - return !list_empty_careful(&ep->rdllist) ||
> - READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
> + bool available;
> + int locked;
> +
> + locked = mutex_trylock(&ep->mtx);
> + if (!locked) {
> + /*
> +		 * The lock is held by someone else, who might have removed all items
> +		 * while inspecting the list. The llist_empty() check in this case is
> +		 * futile. Assume that something is enqueued and let ep_try_send_events() figure it out.
> + */
> + return true;
> + }
> +
> + available = !llist_empty(&ep->rdllist);
> + mutex_unlock(&ep->mtx);
> + return available;
> }
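
[ The idiom in isolation, for readers who stumble over the trylock: a
  spurious "true" only costs ep_try_send_events() one extra look at an
  empty list, whereas a spurious "false" could mean a missed wakeup, so
  the untaken-lock case must err on the side of "pending". Sketch with
  invented names: ]

    #include <linux/llist.h>
    #include <linux/mutex.h>

    static bool maybe_pending(struct mutex *mtx, struct llist_head *list)
    {
            bool pending = true;    /* be conservative if we cannot look */

            if (mutex_trylock(mtx)) {
                    pending = !llist_empty(list);
                    mutex_unlock(mtx);
            }
            return pending;
    }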
>
> #ifdef CONFIG_NET_RX_BUSY_POLL
> @@ -724,77 +728,6 @@ static inline void ep_pm_stay_awake_rcu(struct epitem *epi)
> rcu_read_unlock();
> }
>
> -
> -/*
> - * ep->mutex needs to be held because we could be hit by
> - * eventpoll_release_file() and epoll_ctl().
> - */
> -static void ep_start_scan(struct eventpoll *ep, struct list_head *txlist)
> -{
> - /*
> - * Steal the ready list, and re-init the original one to the
> - * empty list. Also, set ep->ovflist to NULL so that events
> - * happening while looping w/out locks, are not lost. We cannot
> - * have the poll callback to queue directly on ep->rdllist,
> - * because we want the "sproc" callback to be able to do it
> - * in a lockless way.
> - */
> - lockdep_assert_irqs_enabled();
> - write_lock_irq(&ep->lock);
> - list_splice_init(&ep->rdllist, txlist);
> - WRITE_ONCE(ep->ovflist, NULL);
> - write_unlock_irq(&ep->lock);
> -}
> -
> -static void ep_done_scan(struct eventpoll *ep,
> - struct list_head *txlist)
> -{
> - struct epitem *epi, *nepi;
> -
> - write_lock_irq(&ep->lock);
> - /*
> - * During the time we spent inside the "sproc" callback, some
> - * other events might have been queued by the poll callback.
> - * We re-insert them inside the main ready-list here.
> - */
> - for (nepi = READ_ONCE(ep->ovflist); (epi = nepi) != NULL;
> - nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
> - /*
> - * We need to check if the item is already in the list.
> - * During the "sproc" callback execution time, items are
> - * queued into ->ovflist but the "txlist" might already
> - * contain them, and the list_splice() below takes care of them.
> - */
> - if (!ep_is_linked(epi)) {
> - /*
> - * ->ovflist is LIFO, so we have to reverse it in order
> - * to keep in FIFO.
> - */
> - list_add(&epi->rdllink, &ep->rdllist);
> - ep_pm_stay_awake(epi);
> - }
> - }
> - /*
> - * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
> - * releasing the lock, events will be queued in the normal way inside
> - * ep->rdllist.
> - */
> - WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
> -
> - /*
> - * Quickly re-inject items left on "txlist".
> - */
> - list_splice(txlist, &ep->rdllist);
> - __pm_relax(ep->ws);
> -
> - if (!list_empty(&ep->rdllist)) {
> - if (waitqueue_active(&ep->wq))
> - wake_up(&ep->wq);
> - }
> -
> - write_unlock_irq(&ep->lock);
> -}
> -
> static void ep_get(struct eventpoll *ep)
> {
> refcount_inc(&ep->refcount);
> @@ -832,10 +765,12 @@ static void ep_free(struct eventpoll *ep)
> static bool __ep_remove(struct eventpoll *ep, struct epitem *epi, bool force)
> {
> struct file *file = epi->ffd.file;
> + struct llist_node *put_back_last;
> struct epitems_head *to_free;
> struct hlist_head *head;
> + LLIST_HEAD(put_back);
>
> - lockdep_assert_irqs_enabled();
> + lockdep_assert_held(&ep->mtx);
>
> /*
> * Removes poll wait queue hooks.
> @@ -867,10 +802,20 @@ static bool __ep_remove(struct eventpoll *ep, struct epitem *epi, bool force)
>
> rb_erase_cached(&epi->rbn, &ep->rbr);
>
> - write_lock_irq(&ep->lock);
> - if (ep_is_linked(epi))
> - list_del_init(&epi->rdllink);
> - write_unlock_irq(&ep->lock);
> + if (llist_on_list(&epi->rdllink)) {
> + put_back_last = NULL;
> + while (true) {
> + struct llist_node *n = llist_del_first(&ep->rdllist);
> +
> + if (&epi->rdllink == n || WARN_ON(!n))
> + break;
> + if (!put_back_last)
> + put_back_last = n;
> + __llist_add(n, &put_back);
> + }
> + if (put_back_last)
> + llist_add_batch(put_back.first, put_back_last, &ep->rdllist);
> + }
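
[ Worth spelling out why the loop looks like this: llist has no
  primitive for unlinking an arbitrary node, so the single consumer
  (here: the ep->mtx holder) pops nodes off the head until the victim
  surfaces and then pushes the skipped ones back in one batch. A
  generic sketch of the same shape, name invented: ]

    #include <linux/llist.h>

    /* Single-consumer only: the caller must exclude other removers. */
    static void llist_remove_one(struct llist_head *head, struct llist_node *victim)
    {
            struct llist_node *last = NULL, *n;
            LLIST_HEAD(skipped);

            while ((n = llist_del_first(head)) && n != victim) {
                    if (!last)
                            last = n;             /* tail of the put-back chain */
                    __llist_add(n, &skipped);     /* private list, no atomics   */
            }
            if (last)
                    llist_add_batch(skipped.first, last, head);
    }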
>
> wakeup_source_unregister(ep_wakeup_source(epi));
> /*
> @@ -974,8 +919,9 @@ static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt, int depth
> static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int depth)
> {
> struct eventpoll *ep = file->private_data;
> - LIST_HEAD(txlist);
> - struct epitem *epi, *tmp;
> + struct wakeup_source *ws;
> + struct llist_node *n;
> + struct epitem *epi;
> poll_table pt;
> __poll_t res = 0;
>
> @@ -989,22 +935,39 @@ static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int dep
> * the ready list.
> */
> mutex_lock_nested(&ep->mtx, depth);
> - ep_start_scan(ep, &txlist);
> - list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
> + while (true) {
> + n = llist_del_first_init(&ep->rdllist);
> + if (!n)
> + break;
> +
> + epi = llist_entry(n, struct epitem, rdllink);
> +
> if (ep_item_poll(epi, &pt, depth + 1)) {
> res = EPOLLIN | EPOLLRDNORM;
> + epitem_ready(epi);
> break;
> } else {
> /*
> - * Item has been dropped into the ready list by the poll
> - * callback, but it's not actually ready, as far as
> - * caller requested events goes. We can remove it here.
> + * We need to activate ep before deactivating epi, to prevent autosuspend
> + * just in case epi becomes active after ep_item_poll() above.
> + *
> + * This is similar to ep_send_events().
> */
> + ws = ep_wakeup_source(epi);
> + if (ws) {
> + if (ws->active)
> + __pm_stay_awake(ep->ws);
> + __pm_relax(ws);
> + }
> __pm_relax(ep_wakeup_source(epi));
> - list_del_init(&epi->rdllink);
> +
> + /* Just in case epi becomes active right before __pm_relax() */
> + if (unlikely(ep_item_poll(epi, &pt, depth + 1)))
> + ep_pm_stay_awake(epi);
> +
> + __pm_relax(ep->ws);
> }
> }
> - ep_done_scan(ep, &txlist);
> mutex_unlock(&ep->mtx);
> return res;
> }
> @@ -1153,12 +1116,10 @@ static int ep_alloc(struct eventpoll **pep)
> return -ENOMEM;
>
> mutex_init(&ep->mtx);
> - rwlock_init(&ep->lock);
> init_waitqueue_head(&ep->wq);
> init_waitqueue_head(&ep->poll_wait);
> - INIT_LIST_HEAD(&ep->rdllist);
> + init_llist_head(&ep->rdllist);
> ep->rbr = RB_ROOT_CACHED;
> - ep->ovflist = EP_UNACTIVE_PTR;
> ep->user = get_current_user();
> refcount_set(&ep->refcount, 1);
>
> @@ -1240,94 +1201,11 @@ struct file *get_epoll_tfile_raw_ptr(struct file *file, int tfd,
> }
> #endif /* CONFIG_KCMP */
>
> -/*
> - * Adds a new entry to the tail of the list in a lockless way, i.e.
> - * multiple CPUs are allowed to call this function concurrently.
> - *
> - * Beware: it is necessary to prevent any other modifications of the
> - * existing list until all changes are completed, in other words
> - * concurrent list_add_tail_lockless() calls should be protected
> - * with a read lock, where write lock acts as a barrier which
> - * makes sure all list_add_tail_lockless() calls are fully
> - * completed.
> - *
> - * Also an element can be locklessly added to the list only in one
> - * direction i.e. either to the tail or to the head, otherwise
> - * concurrent access will corrupt the list.
> - *
> - * Return: %false if element has been already added to the list, %true
> - * otherwise.
> - */
> -static inline bool list_add_tail_lockless(struct list_head *new,
> - struct list_head *head)
> -{
> - struct list_head *prev;
> -
> - /*
> - * This is simple 'new->next = head' operation, but cmpxchg()
> - * is used in order to detect that same element has been just
> - * added to the list from another CPU: the winner observes
> - * new->next == new.
> - */
> - if (!try_cmpxchg(&new->next, &new, head))
> - return false;
> -
> - /*
> - * Initially ->next of a new element must be updated with the head
> - * (we are inserting to the tail) and only then pointers are atomically
> - * exchanged. XCHG guarantees memory ordering, thus ->next should be
> - * updated before pointers are actually swapped and pointers are
> - * swapped before prev->next is updated.
> - */
> -
> - prev = xchg(&head->prev, new);
> -
> - /*
> - * It is safe to modify prev->next and new->prev, because a new element
> - * is added only to the tail and new->next is updated before XCHG.
> - */
> -
> - prev->next = new;
> - new->prev = prev;
> -
> - return true;
> -}
> -
> -/*
> - * Chains a new epi entry to the tail of the ep->ovflist in a lockless way,
> - * i.e. multiple CPUs are allowed to call this function concurrently.
> - *
> - * Return: %false if epi element has been already chained, %true otherwise.
> - */
> -static inline bool chain_epi_lockless(struct epitem *epi)
> -{
> - struct eventpoll *ep = epi->ep;
> -
> - /* Fast preliminary check */
> - if (epi->next != EP_UNACTIVE_PTR)
> - return false;
> -
> - /* Check that the same epi has not been just chained from another CPU */
> - if (cmpxchg(&epi->next, EP_UNACTIVE_PTR, NULL) != EP_UNACTIVE_PTR)
> - return false;
> -
> - /* Atomically exchange tail */
> - epi->next = xchg(&ep->ovflist, epi);
> -
> - return true;
> -}
> -
> /*
> * This is the callback that is passed to the wait queue wakeup
> * mechanism. It is called by the stored file descriptors when they
> * have events to report.
> *
> - * This callback takes a read lock in order not to contend with concurrent
> - * events from another file descriptor, thus all modifications to ->rdllist
> - * or ->ovflist are lockless. Read lock is paired with the write lock from
> - * ep_start/done_scan(), which stops all list modifications and guarantees
> - * that lists state is seen correctly.
> - *
> * Another thing worth to mention is that ep_poll_callback() can be called
> * concurrently for the same @epi from different CPUs if poll table was inited
> * with several wait queues entries. Plural wakeup from different CPUs of a
> @@ -1337,15 +1215,11 @@ static inline bool chain_epi_lockless(struct epitem *epi)
> */
> static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
> {
> - int pwake = 0;
> struct epitem *epi = ep_item_from_wait(wait);
> struct eventpoll *ep = epi->ep;
> __poll_t pollflags = key_to_poll(key);
> - unsigned long flags;
> int ewake = 0;
>
> - read_lock_irqsave(&ep->lock, flags);
> -
> ep_set_busy_poll_napi_id(epi);
>
> /*
> @@ -1355,7 +1229,7 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
> * until the next EPOLL_CTL_MOD will be issued.
> */
> if (!(epi->event.events & ~EP_PRIVATE_BITS))
> - goto out_unlock;
> + goto out;
>
> /*
> * Check the events coming with the callback. At this stage, not
> @@ -1364,22 +1238,10 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
> * test for "key" != NULL before the event match test.
> */
> if (pollflags && !(pollflags & epi->event.events))
> - goto out_unlock;
> + goto out;
>
> - /*
> - * If we are transferring events to userspace, we can hold no locks
> - * (because we're accessing user memory, and because of linux f_op->poll()
> - * semantics). All the events that happen during that period of time are
> - * chained in ep->ovflist and requeued later on.
> - */
> - if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
> - if (chain_epi_lockless(epi))
> - ep_pm_stay_awake_rcu(epi);
> - } else if (!ep_is_linked(epi)) {
> - /* In the usual case, add event to ready list. */
> - if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
> - ep_pm_stay_awake_rcu(epi);
> - }
> + ep_pm_stay_awake_rcu(epi);
> + epitem_ready(epi);
>
> /*
> * Wake up ( if active ) both the eventpoll wait list and the ->poll()
> @@ -1408,15 +1270,9 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
> wake_up(&ep->wq);
> }
> if (waitqueue_active(&ep->poll_wait))
> - pwake++;
> -
> -out_unlock:
> - read_unlock_irqrestore(&ep->lock, flags);
> -
> - /* We have to call this outside the lock */
> - if (pwake)
> ep_poll_safewake(ep, epi, pollflags & EPOLL_URING_WAKE);
>
> +out:
> if (!(epi->event.events & EPOLLEXCLUSIVE))
> ewake = 1;
>
> @@ -1661,8 +1517,6 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
> if (is_file_epoll(tfile))
> tep = tfile->private_data;
>
> - lockdep_assert_irqs_enabled();
> -
> if (unlikely(percpu_counter_compare(&ep->user->epoll_watches,
> max_user_watches) >= 0))
> return -ENOSPC;
> @@ -1674,11 +1528,10 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
> }
>
> /* Item initialization follow here ... */
> - INIT_LIST_HEAD(&epi->rdllink);
> + init_llist_node(&epi->rdllink);
> epi->ep = ep;
> ep_set_ffd(&epi->ffd, tfile, fd);
> epi->event = *event;
> - epi->next = EP_UNACTIVE_PTR;
>
> if (tep)
> mutex_lock_nested(&tep->mtx, 1);
> @@ -1745,16 +1598,13 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
> return -ENOMEM;
> }
>
> - /* We have to drop the new item inside our item list to keep track of it */
> - write_lock_irq(&ep->lock);
> -
> /* record NAPI ID of new item if present */
> ep_set_busy_poll_napi_id(epi);
>
> /* If the file is already "ready" we drop it inside the ready list */
> - if (revents && !ep_is_linked(epi)) {
> - list_add_tail(&epi->rdllink, &ep->rdllist);
> + if (revents) {
> ep_pm_stay_awake(epi);
> + epitem_ready(epi);
>
> /* Notify waiting tasks that events are available */
> if (waitqueue_active(&ep->wq))
> @@ -1763,8 +1613,6 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
> pwake++;
> }
>
> - write_unlock_irq(&ep->lock);
> -
> /* We have to call this outside the lock */
> if (pwake)
> ep_poll_safewake(ep, NULL, 0);
> @@ -1779,11 +1627,8 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
> static int ep_modify(struct eventpoll *ep, struct epitem *epi,
> const struct epoll_event *event)
> {
> - int pwake = 0;
> poll_table pt;
>
> - lockdep_assert_irqs_enabled();
> -
> init_poll_funcptr(&pt, NULL);
>
> /*
> @@ -1827,24 +1672,16 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi,
> * list, push it inside.
> */
> if (ep_item_poll(epi, &pt, 1)) {
> - write_lock_irq(&ep->lock);
> - if (!ep_is_linked(epi)) {
> - list_add_tail(&epi->rdllink, &ep->rdllist);
> - ep_pm_stay_awake(epi);
> + ep_pm_stay_awake(epi);
> + epitem_ready(epi);
>
> - /* Notify waiting tasks that events are available */
> - if (waitqueue_active(&ep->wq))
> - wake_up(&ep->wq);
> - if (waitqueue_active(&ep->poll_wait))
> - pwake++;
> - }
> - write_unlock_irq(&ep->lock);
> + /* Notify waiting tasks that events are available */
> + if (waitqueue_active(&ep->wq))
> + wake_up(&ep->wq);
> + if (waitqueue_active(&ep->poll_wait))
> + ep_poll_safewake(ep, NULL, 0);
> }
>
> - /* We have to call this outside the lock */
> - if (pwake)
> - ep_poll_safewake(ep, NULL, 0);
> -
> return 0;
> }
>
> @@ -1852,7 +1689,7 @@ static int ep_send_events(struct eventpoll *ep,
> struct epoll_event __user *events, int maxevents)
> {
> struct epitem *epi, *tmp;
> - LIST_HEAD(txlist);
> + LLIST_HEAD(txlist);
> poll_table pt;
> int res = 0;
>
> @@ -1867,19 +1704,18 @@ static int ep_send_events(struct eventpoll *ep,
> init_poll_funcptr(&pt, NULL);
>
> mutex_lock(&ep->mtx);
> - ep_start_scan(ep, &txlist);
>
> - /*
> - * We can loop without lock because we are passed a task private list.
> - * Items cannot vanish during the loop we are holding ep->mtx.
> - */
> - list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
> + while (res < maxevents) {
> struct wakeup_source *ws;
> + struct llist_node *n;
> __poll_t revents;
>
> - if (res >= maxevents)
> + n = llist_del_first(&ep->rdllist);
> + if (!n)
> break;
>
> + epi = llist_entry(n, struct epitem, rdllink);
> +
> /*
> * Activate ep->ws before deactivating epi->ws to prevent
> * triggering auto-suspend here (in case we reactive epi->ws
> @@ -1896,21 +1732,30 @@ static int ep_send_events(struct eventpoll *ep,
> __pm_relax(ws);
> }
>
> - list_del_init(&epi->rdllink);
> -
> /*
> * If the event mask intersect the caller-requested one,
> * deliver the event to userspace. Again, we are holding ep->mtx,
> * so no operations coming from userspace can change the item.
> */
> revents = ep_item_poll(epi, &pt, 1);
> - if (!revents)
> + if (!revents) {
> + init_llist_node(n);
> +
> + /*
> + * Just in case epi becomes ready after ep_item_poll() above, but before
> + * init_llist_node(). Make sure to add it to the ready list, otherwise an
> + * event may be lost.
> + */
> + if (unlikely(ep_item_poll(epi, &pt, 1))) {
> + ep_pm_stay_awake(epi);
> + epitem_ready(epi);
> + }
> continue;
> + }
>
> events = epoll_put_uevent(revents, epi->event.data, events);
> if (!events) {
> - list_add(&epi->rdllink, &txlist);
> - ep_pm_stay_awake(epi);
> + llist_add(&epi->rdllink, &ep->rdllist);
> if (!res)
> res = -EFAULT;
> break;
> @@ -1918,25 +1763,31 @@ static int ep_send_events(struct eventpoll *ep,
> res++;
> if (epi->event.events & EPOLLONESHOT)
> epi->event.events &= EP_PRIVATE_BITS;
> - else if (!(epi->event.events & EPOLLET)) {
> + __llist_add(n, &txlist);
> + }
> +
> + llist_for_each_entry_safe(epi, tmp, txlist.first, rdllink) {
> + init_llist_node(&epi->rdllink);
> +
> + if (!(epi->event.events & EPOLLET)) {
> /*
> - * If this file has been added with Level
> - * Trigger mode, we need to insert back inside
> - * the ready list, so that the next call to
> - * epoll_wait() will check again the events
> - * availability. At this point, no one can insert
> - * into ep->rdllist besides us. The epoll_ctl()
> - * callers are locked out by
> - * ep_send_events() holding "mtx" and the
> - * poll callback will queue them in ep->ovflist.
> + * If this file has been added with Level Trigger mode, we need to insert
> + * back inside the ready list, so that the next call to epoll_wait() will
> + * check again the events availability.
> */
> - list_add_tail(&epi->rdllink, &ep->rdllist);
> ep_pm_stay_awake(epi);
> + epitem_ready(epi);
> }
> }
> - ep_done_scan(ep, &txlist);
> +
> + __pm_relax(ep->ws);
> mutex_unlock(&ep->mtx);
>
> + if (!llist_empty(&ep->rdllist)) {
> + if (waitqueue_active(&ep->wq))
> + wake_up(&ep->wq);
> + }
> +
> return res;
> }
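
[ The double ep_item_poll() above is the subtle bit, so a generic
  restatement may help. Once the consumer has popped a node, producers
  see it as "on a list" and will not enqueue it again; the consumer must
  therefore re-check readiness after re-arming the node, or an event
  firing in that window would be lost. Sketch only, is_ready() is an
  invented stand-in for ep_item_poll(): ]

    #include <linux/atomic.h>
    #include <linux/llist.h>

    static bool is_ready(struct llist_node *n)
    {
            return false;   /* placeholder for the real readiness check */
    }

    static void requeue_if_needed(struct llist_head *head, struct llist_node *n)
    {
            if (is_ready(n))
                    return;                 /* caller delivers the event */

            init_llist_node(n);             /* producers may enqueue it again */

            /*
             * An event that fired between the first is_ready() and
             * init_llist_node() was dropped by the producer side (the
             * node still looked queued), so re-check and enqueue it
             * ourselves, racing fairly with producers via the same
             * "add once" cmpxchg() that epitem_ready() uses.
             */
            if (is_ready(n) && cmpxchg(&n->next, n, NULL) == n)
                    llist_add(n, head);
    }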
>
> @@ -2029,8 +1880,6 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
> wait_queue_entry_t wait;
> ktime_t expires, *to = NULL;
>
> - lockdep_assert_irqs_enabled();
> -
> if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
> slack = select_estimate_accuracy(timeout);
> to = &expires;
> @@ -2090,54 +1939,15 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
> init_wait(&wait);
> wait.func = ep_autoremove_wake_function;
>
> - write_lock_irq(&ep->lock);
> - /*
> - * Barrierless variant, waitqueue_active() is called under
> - * the same lock on wakeup ep_poll_callback() side, so it
> - * is safe to avoid an explicit barrier.
> - */
> - __set_current_state(TASK_INTERRUPTIBLE);
> + prepare_to_wait_exclusive(&ep->wq, &wait, TASK_INTERRUPTIBLE);
>
> - /*
> - * Do the final check under the lock. ep_start/done_scan()
> - * plays with two lists (->rdllist and ->ovflist) and there
> - * is always a race when both lists are empty for short
> - * period of time although events are pending, so lock is
> - * important.
> - */
> - eavail = ep_events_available(ep);
> - if (!eavail)
> - __add_wait_queue_exclusive(&ep->wq, &wait);
> -
> - write_unlock_irq(&ep->lock);
> -
> - if (!eavail)
> + if (!ep_events_available(ep))
> timed_out = !ep_schedule_timeout(to) ||
> !schedule_hrtimeout_range(to, slack,
> HRTIMER_MODE_ABS);
> - __set_current_state(TASK_RUNNING);
> -
> - /*
> - * We were woken up, thus go and try to harvest some events.
> - * If timed out and still on the wait queue, recheck eavail
> - * carefully under lock, below.
> - */
> - eavail = 1;
>
> - if (!list_empty_careful(&wait.entry)) {
> - write_lock_irq(&ep->lock);
> - /*
> - * If the thread timed out and is not on the wait queue,
> - * it means that the thread was woken up after its
> - * timeout expired before it could reacquire the lock.
> - * Thus, when wait.entry is empty, it needs to harvest
> - * events.
> - */
> - if (timed_out)
> - eavail = list_empty(&wait.entry);
> - __remove_wait_queue(&ep->wq, &wait);
> - write_unlock_irq(&ep->lock);
> - }
> + finish_wait(&ep->wq, &wait);
> + eavail = ep_events_available(ep);
> }
> }
>
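
[ And on the ep_poll() side: with the ready list no longer guarded by
  ep->lock, the open-coded "queue under the lock, then check" dance can
  be replaced by the stock exclusive-wait helpers. Stripped of the
  hrtimer/timeout handling (and of ep_autoremove_wake_function, which
  the patch installs but which does not change the ordering argument),
  the shape is the usual one; sketch, not the patch: ]

    #include <linux/sched.h>
    #include <linux/wait.h>

    static void wait_until(wait_queue_head_t *wq, bool (*cond)(void *arg), void *arg)
    {
            DEFINE_WAIT(wait);

            prepare_to_wait_exclusive(wq, &wait, TASK_INTERRUPTIBLE);
            /*
             * The condition is re-checked only after this task is visibly
             * on the waitqueue, so a wakeup racing with the check cannot
             * be lost: either cond() already observes it, or the waker
             * finds us queued and wakes us.
             */
            if (!cond(arg))
                    schedule();
            finish_wait(wq, &wait);
    }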
> --
> 2.39.5
>