Message-ID: <20250530-definieren-minze-7be7a10b4354@brauner>
Date: Fri, 30 May 2025 07:08:45 +0200
From: Christian Brauner <brauner@...nel.org>
To: Nam Cao <namcao@...utronix.de>, 
	Frederic Weisbecker <frederic@...nel.org>
Cc: Alexander Viro <viro@...iv.linux.org.uk>, Jan Kara <jack@...e.cz>, 
	Sebastian Andrzej Siewior <bigeasy@...utronix.de>, John Ogness <john.ogness@...utronix.de>, 
	Clark Williams <clrkwllms@...nel.org>, Steven Rostedt <rostedt@...dmis.org>, 
	linux-fsdevel@...r.kernel.org, linux-kernel@...r.kernel.org, linux-rt-devel@...ts.linux.dev, 
	linux-rt-users@...r.kernel.org, Joe Damato <jdamato@...tly.com>, 
	Martin Karsten <mkarsten@...terloo.ca>, Jens Axboe <axboe@...nel.dk>, 
	Valentin Schneider <vschneid@...hat.com>
Subject: Re: [PATCH v3] eventpoll: Fix priority inversion problem

On Tue, May 27, 2025 at 11:08:36AM +0200, Nam Cao wrote:
> The ready event list of an epoll object is protected by a read-write
> semaphore:
> 
>   - The consumer (waiter) acquires the write lock and takes items.
>   - The producer (waker) takes the read lock and adds items.
> 
> The point of this design is to let epoll scale well with a large number
> of producers, as multiple producers can hold the read lock at the same
> time.
> 
> Unfortunately, this implementation may cause a scheduling priority inversion
> problem. Suppose the consumer has a higher scheduling priority than the
> producer. The consumer needs to acquire the write lock, but may be blocked
> by the producer holding the read lock. Since a read-write semaphore does not
> support priority-boosting for the readers (even with CONFIG_PREEMPT_RT=y),
> we have a case of priority inversion: a higher priority consumer is blocked
> by a lower priority producer. This problem was reported in [1].
> 
> Furthermore, this could also cause a stall problem, as described in [2].
> 
> To fix this problem, make the event list half-lockless:
> 
>   - The consumer acquires a mutex (ep->mtx) and takes items.
>   - The producer locklessly adds items to the list.
> 
> Performance is not the main goal of this patch, but as the producer can now
> add items without waiting for the consumer to release the lock, a performance
> improvement is observed using the stress test from
> https://github.com/rouming/test-tools/blob/master/stress-epoll.c. This is
> the same test that justified using the read-write semaphore in the past.
> 
> Testing using 12 x86_64 CPUs:
> 
>           Before     After        Diff
> threads  events/ms  events/ms
>       8       6932      19753    +185%
>      16       7820      27923    +257%
>      32       7648      35164    +360%
>      64       9677      37780    +290%
>     128      11166      38174    +242%
> 
> Testing using 1 riscv64 CPU (averaged over 10 runs, as the numbers are
> noisy):
> 
>           Before     After        Diff
> threads  events/ms  events/ms
>       1         73        129     +77%
>       2        151        216     +43%
>       4        216        364     +69%
>       8        234        382     +63%
>      16        251        392     +56%
> 
> Reported-by: Frederic Weisbecker <frederic@...nel.org>
> Closes: https://lore.kernel.org/linux-rt-users/20210825132754.GA895675@lothringen/ [1]
> Reported-by: Valentin Schneider <vschneid@...hat.com>
> Closes: https://lore.kernel.org/linux-rt-users/xhsmhttqvnall.mognet@vschneid.remote.csb/ [2]
> Signed-off-by: Nam Cao <namcao@...utronix.de>
> ---
> v3:
>   - get rid of the "link_used" and "ready" flags. They are hard to
>     understand and unnecessary
>   - get rid of the obsolete lockdep_assert_irqs_enabled()
>   - Add lockdep_assert_held(&ep->mtx)
>   - rewrite some comments
> v2:
>   - rename link_locked -> link_used
>   - replace xchg() with smp_store_release() when applicable
>   - make sure llist_node is in clean state when not on a list
>   - remove now-unused list_add_tail_lockless()
> ---

Care to review this, Frederic?

>  fs/eventpoll.c | 458 +++++++++++++++----------------------------------
>  1 file changed, 134 insertions(+), 324 deletions(-)
> 
> diff --git a/fs/eventpoll.c b/fs/eventpoll.c
> index d4dbffdedd08e..a97a771a459c9 100644
> --- a/fs/eventpoll.c
> +++ b/fs/eventpoll.c
> @@ -137,13 +137,7 @@ struct epitem {
>  	};
>  
>  	/* List header used to link this structure to the eventpoll ready list */
> -	struct list_head rdllink;
> -
> -	/*
> -	 * Works together "struct eventpoll"->ovflist in keeping the
> -	 * single linked chain of items.
> -	 */
> -	struct epitem *next;
> +	struct llist_node rdllink;
>  
>  	/* The file descriptor information this item refers to */
>  	struct epoll_filefd ffd;
> @@ -191,22 +185,15 @@ struct eventpoll {
>  	/* Wait queue used by file->poll() */
>  	wait_queue_head_t poll_wait;
>  
> -	/* List of ready file descriptors */
> -	struct list_head rdllist;
> -
> -	/* Lock which protects rdllist and ovflist */
> -	rwlock_t lock;
> +	/*
> +	 * List of ready file descriptors. Adding to this list is lockless. Items can be removed
> +	 * only with eventpoll::mtx
> +	 */
> +	struct llist_head rdllist;
>  
>  	/* RB tree root used to store monitored fd structs */
>  	struct rb_root_cached rbr;
>  
> -	/*
> -	 * This is a single linked list that chains all the "struct epitem" that
> -	 * happened while transferring ready events to userspace w/out
> -	 * holding ->lock.
> -	 */
> -	struct epitem *ovflist;
> -
>  	/* wakeup_source used when ep_send_events or __ep_eventpoll_poll is running */
>  	struct wakeup_source *ws;
>  
> @@ -361,10 +348,14 @@ static inline int ep_cmp_ffd(struct epoll_filefd *p1,
>  	        (p1->file < p2->file ? -1 : p1->fd - p2->fd));
>  }
>  
> -/* Tells us if the item is currently linked */
> -static inline int ep_is_linked(struct epitem *epi)
> +/*
> + * Add the item to its container eventpoll's rdllist; do nothing if the item is already on rdllist.
> + */
> +static void epitem_ready(struct epitem *epi)
>  {
> -	return !list_empty(&epi->rdllink);
> +	if (&epi->rdllink == cmpxchg(&epi->rdllink.next, &epi->rdllink, NULL))
> +		llist_add(&epi->rdllink, &epi->ep->rdllist);
> +
>  }
>  
>  static inline struct eppoll_entry *ep_pwq_from_wait(wait_queue_entry_t *p)
> @@ -383,13 +374,26 @@ static inline struct epitem *ep_item_from_wait(wait_queue_entry_t *p)
>   *
>   * @ep: Pointer to the eventpoll context.
>   *
> - * Return: a value different than %zero if ready events are available,
> - *          or %zero otherwise.
> + * Return: true if ready events might be available, false otherwise.
>   */
> -static inline int ep_events_available(struct eventpoll *ep)
> +static inline bool ep_events_available(struct eventpoll *ep)
>  {
> -	return !list_empty_careful(&ep->rdllist) ||
> -		READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR;
> +	bool available;
> +	int locked;
> +
> +	locked = mutex_trylock(&ep->mtx);
> +	if (!locked) {
> +		/*
> +		 * The lock is held and someone might have removed all items while inspecting it. The
> +		 * llist_empty() check in this case is futile. Assume that something is enqueued and
> +		 * let ep_try_send_events() figure it out.
> +		 */
> +		return true;
> +	}
> +
> +	available = !llist_empty(&ep->rdllist);
> +	mutex_unlock(&ep->mtx);
> +	return available;
>  }
>  
>  #ifdef CONFIG_NET_RX_BUSY_POLL
> @@ -724,77 +728,6 @@ static inline void ep_pm_stay_awake_rcu(struct epitem *epi)
>  	rcu_read_unlock();
>  }
>  
> -
> -/*
> - * ep->mutex needs to be held because we could be hit by
> - * eventpoll_release_file() and epoll_ctl().
> - */
> -static void ep_start_scan(struct eventpoll *ep, struct list_head *txlist)
> -{
> -	/*
> -	 * Steal the ready list, and re-init the original one to the
> -	 * empty list. Also, set ep->ovflist to NULL so that events
> -	 * happening while looping w/out locks, are not lost. We cannot
> -	 * have the poll callback to queue directly on ep->rdllist,
> -	 * because we want the "sproc" callback to be able to do it
> -	 * in a lockless way.
> -	 */
> -	lockdep_assert_irqs_enabled();
> -	write_lock_irq(&ep->lock);
> -	list_splice_init(&ep->rdllist, txlist);
> -	WRITE_ONCE(ep->ovflist, NULL);
> -	write_unlock_irq(&ep->lock);
> -}
> -
> -static void ep_done_scan(struct eventpoll *ep,
> -			 struct list_head *txlist)
> -{
> -	struct epitem *epi, *nepi;
> -
> -	write_lock_irq(&ep->lock);
> -	/*
> -	 * During the time we spent inside the "sproc" callback, some
> -	 * other events might have been queued by the poll callback.
> -	 * We re-insert them inside the main ready-list here.
> -	 */
> -	for (nepi = READ_ONCE(ep->ovflist); (epi = nepi) != NULL;
> -	     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
> -		/*
> -		 * We need to check if the item is already in the list.
> -		 * During the "sproc" callback execution time, items are
> -		 * queued into ->ovflist but the "txlist" might already
> -		 * contain them, and the list_splice() below takes care of them.
> -		 */
> -		if (!ep_is_linked(epi)) {
> -			/*
> -			 * ->ovflist is LIFO, so we have to reverse it in order
> -			 * to keep in FIFO.
> -			 */
> -			list_add(&epi->rdllink, &ep->rdllist);
> -			ep_pm_stay_awake(epi);
> -		}
> -	}
> -	/*
> -	 * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
> -	 * releasing the lock, events will be queued in the normal way inside
> -	 * ep->rdllist.
> -	 */
> -	WRITE_ONCE(ep->ovflist, EP_UNACTIVE_PTR);
> -
> -	/*
> -	 * Quickly re-inject items left on "txlist".
> -	 */
> -	list_splice(txlist, &ep->rdllist);
> -	__pm_relax(ep->ws);
> -
> -	if (!list_empty(&ep->rdllist)) {
> -		if (waitqueue_active(&ep->wq))
> -			wake_up(&ep->wq);
> -	}
> -
> -	write_unlock_irq(&ep->lock);
> -}
> -
>  static void ep_get(struct eventpoll *ep)
>  {
>  	refcount_inc(&ep->refcount);
> @@ -832,10 +765,12 @@ static void ep_free(struct eventpoll *ep)
>  static bool __ep_remove(struct eventpoll *ep, struct epitem *epi, bool force)
>  {
>  	struct file *file = epi->ffd.file;
> +	struct llist_node *put_back_last;
>  	struct epitems_head *to_free;
>  	struct hlist_head *head;
> +	LLIST_HEAD(put_back);
>  
> -	lockdep_assert_irqs_enabled();
> +	lockdep_assert_held(&ep->mtx);
>  
>  	/*
>  	 * Removes poll wait queue hooks.
> @@ -867,10 +802,20 @@ static bool __ep_remove(struct eventpoll *ep, struct epitem *epi, bool force)
>  
>  	rb_erase_cached(&epi->rbn, &ep->rbr);
>  
> -	write_lock_irq(&ep->lock);
> -	if (ep_is_linked(epi))
> -		list_del_init(&epi->rdllink);
> -	write_unlock_irq(&ep->lock);
> +	if (llist_on_list(&epi->rdllink)) {
> +		put_back_last = NULL;
> +		while (true) {
> +			struct llist_node *n = llist_del_first(&ep->rdllist);
> +
> +			if (&epi->rdllink == n || WARN_ON(!n))
> +				break;
> +			if (!put_back_last)
> +				put_back_last = n;
> +			__llist_add(n, &put_back);
> +		}
> +		if (put_back_last)
> +			llist_add_batch(put_back.first, put_back_last, &ep->rdllist);
> +	}
>  
>  	wakeup_source_unregister(ep_wakeup_source(epi));
>  	/*
> @@ -974,8 +919,9 @@ static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt, int depth
>  static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int depth)
>  {
>  	struct eventpoll *ep = file->private_data;
> -	LIST_HEAD(txlist);
> -	struct epitem *epi, *tmp;
> +	struct wakeup_source *ws;
> +	struct llist_node *n;
> +	struct epitem *epi;
>  	poll_table pt;
>  	__poll_t res = 0;
>  
> @@ -989,22 +935,39 @@ static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int dep
>  	 * the ready list.
>  	 */
>  	mutex_lock_nested(&ep->mtx, depth);
> -	ep_start_scan(ep, &txlist);
> -	list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
> +	while (true) {
> +		n = llist_del_first_init(&ep->rdllist);
> +		if (!n)
> +			break;
> +
> +		epi = llist_entry(n, struct epitem, rdllink);
> +
>  		if (ep_item_poll(epi, &pt, depth + 1)) {
>  			res = EPOLLIN | EPOLLRDNORM;
> +			epitem_ready(epi);
>  			break;
>  		} else {
>  			/*
> -			 * Item has been dropped into the ready list by the poll
> -			 * callback, but it's not actually ready, as far as
> -			 * caller requested events goes. We can remove it here.
> +			 * We need to activate ep before deactivating epi, to prevent autosuspend
> +			 * just in case epi becomes active after ep_item_poll() above.
> +			 *
> +			 * This is similar to ep_send_events().
>  			 */
> +			ws = ep_wakeup_source(epi);
> +			if (ws) {
> +				if (ws->active)
> +					__pm_stay_awake(ep->ws);
> +				__pm_relax(ws);
> +			}
>  			__pm_relax(ep_wakeup_source(epi));
> -			list_del_init(&epi->rdllink);
> +
> +			/* Just in case epi becomes active right before __pm_relax() */
> +			if (unlikely(ep_item_poll(epi, &pt, depth + 1)))
> +				ep_pm_stay_awake(epi);
> +
> +			__pm_relax(ep->ws);
>  		}
>  	}
> -	ep_done_scan(ep, &txlist);
>  	mutex_unlock(&ep->mtx);
>  	return res;
>  }
> @@ -1153,12 +1116,10 @@ static int ep_alloc(struct eventpoll **pep)
>  		return -ENOMEM;
>  
>  	mutex_init(&ep->mtx);
> -	rwlock_init(&ep->lock);
>  	init_waitqueue_head(&ep->wq);
>  	init_waitqueue_head(&ep->poll_wait);
> -	INIT_LIST_HEAD(&ep->rdllist);
> +	init_llist_head(&ep->rdllist);
>  	ep->rbr = RB_ROOT_CACHED;
> -	ep->ovflist = EP_UNACTIVE_PTR;
>  	ep->user = get_current_user();
>  	refcount_set(&ep->refcount, 1);
>  
> @@ -1240,94 +1201,11 @@ struct file *get_epoll_tfile_raw_ptr(struct file *file, int tfd,
>  }
>  #endif /* CONFIG_KCMP */
>  
> -/*
> - * Adds a new entry to the tail of the list in a lockless way, i.e.
> - * multiple CPUs are allowed to call this function concurrently.
> - *
> - * Beware: it is necessary to prevent any other modifications of the
> - *         existing list until all changes are completed, in other words
> - *         concurrent list_add_tail_lockless() calls should be protected
> - *         with a read lock, where write lock acts as a barrier which
> - *         makes sure all list_add_tail_lockless() calls are fully
> - *         completed.
> - *
> - *        Also an element can be locklessly added to the list only in one
> - *        direction i.e. either to the tail or to the head, otherwise
> - *        concurrent access will corrupt the list.
> - *
> - * Return: %false if element has been already added to the list, %true
> - * otherwise.
> - */
> -static inline bool list_add_tail_lockless(struct list_head *new,
> -					  struct list_head *head)
> -{
> -	struct list_head *prev;
> -
> -	/*
> -	 * This is simple 'new->next = head' operation, but cmpxchg()
> -	 * is used in order to detect that same element has been just
> -	 * added to the list from another CPU: the winner observes
> -	 * new->next == new.
> -	 */
> -	if (!try_cmpxchg(&new->next, &new, head))
> -		return false;
> -
> -	/*
> -	 * Initially ->next of a new element must be updated with the head
> -	 * (we are inserting to the tail) and only then pointers are atomically
> -	 * exchanged.  XCHG guarantees memory ordering, thus ->next should be
> -	 * updated before pointers are actually swapped and pointers are
> -	 * swapped before prev->next is updated.
> -	 */
> -
> -	prev = xchg(&head->prev, new);
> -
> -	/*
> -	 * It is safe to modify prev->next and new->prev, because a new element
> -	 * is added only to the tail and new->next is updated before XCHG.
> -	 */
> -
> -	prev->next = new;
> -	new->prev = prev;
> -
> -	return true;
> -}
> -
> -/*
> - * Chains a new epi entry to the tail of the ep->ovflist in a lockless way,
> - * i.e. multiple CPUs are allowed to call this function concurrently.
> - *
> - * Return: %false if epi element has been already chained, %true otherwise.
> - */
> -static inline bool chain_epi_lockless(struct epitem *epi)
> -{
> -	struct eventpoll *ep = epi->ep;
> -
> -	/* Fast preliminary check */
> -	if (epi->next != EP_UNACTIVE_PTR)
> -		return false;
> -
> -	/* Check that the same epi has not been just chained from another CPU */
> -	if (cmpxchg(&epi->next, EP_UNACTIVE_PTR, NULL) != EP_UNACTIVE_PTR)
> -		return false;
> -
> -	/* Atomically exchange tail */
> -	epi->next = xchg(&ep->ovflist, epi);
> -
> -	return true;
> -}
> -
>  /*
>   * This is the callback that is passed to the wait queue wakeup
>   * mechanism. It is called by the stored file descriptors when they
>   * have events to report.
>   *
> - * This callback takes a read lock in order not to contend with concurrent
> - * events from another file descriptor, thus all modifications to ->rdllist
> - * or ->ovflist are lockless.  Read lock is paired with the write lock from
> - * ep_start/done_scan(), which stops all list modifications and guarantees
> - * that lists state is seen correctly.
> - *
>   * Another thing worth to mention is that ep_poll_callback() can be called
>   * concurrently for the same @epi from different CPUs if poll table was inited
>   * with several wait queues entries.  Plural wakeup from different CPUs of a
> @@ -1337,15 +1215,11 @@ static inline bool chain_epi_lockless(struct epitem *epi)
>   */
>  static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
>  {
> -	int pwake = 0;
>  	struct epitem *epi = ep_item_from_wait(wait);
>  	struct eventpoll *ep = epi->ep;
>  	__poll_t pollflags = key_to_poll(key);
> -	unsigned long flags;
>  	int ewake = 0;
>  
> -	read_lock_irqsave(&ep->lock, flags);
> -
>  	ep_set_busy_poll_napi_id(epi);
>  
>  	/*
> @@ -1355,7 +1229,7 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
>  	 * until the next EPOLL_CTL_MOD will be issued.
>  	 */
>  	if (!(epi->event.events & ~EP_PRIVATE_BITS))
> -		goto out_unlock;
> +		goto out;
>  
>  	/*
>  	 * Check the events coming with the callback. At this stage, not
> @@ -1364,22 +1238,10 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
>  	 * test for "key" != NULL before the event match test.
>  	 */
>  	if (pollflags && !(pollflags & epi->event.events))
> -		goto out_unlock;
> +		goto out;
>  
> -	/*
> -	 * If we are transferring events to userspace, we can hold no locks
> -	 * (because we're accessing user memory, and because of linux f_op->poll()
> -	 * semantics). All the events that happen during that period of time are
> -	 * chained in ep->ovflist and requeued later on.
> -	 */
> -	if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
> -		if (chain_epi_lockless(epi))
> -			ep_pm_stay_awake_rcu(epi);
> -	} else if (!ep_is_linked(epi)) {
> -		/* In the usual case, add event to ready list. */
> -		if (list_add_tail_lockless(&epi->rdllink, &ep->rdllist))
> -			ep_pm_stay_awake_rcu(epi);
> -	}
> +	ep_pm_stay_awake_rcu(epi);
> +	epitem_ready(epi);
>  
>  	/*
>  	 * Wake up ( if active ) both the eventpoll wait list and the ->poll()
> @@ -1408,15 +1270,9 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
>  			wake_up(&ep->wq);
>  	}
>  	if (waitqueue_active(&ep->poll_wait))
> -		pwake++;
> -
> -out_unlock:
> -	read_unlock_irqrestore(&ep->lock, flags);
> -
> -	/* We have to call this outside the lock */
> -	if (pwake)
>  		ep_poll_safewake(ep, epi, pollflags & EPOLL_URING_WAKE);
>  
> +out:
>  	if (!(epi->event.events & EPOLLEXCLUSIVE))
>  		ewake = 1;
>  
> @@ -1661,8 +1517,6 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
>  	if (is_file_epoll(tfile))
>  		tep = tfile->private_data;
>  
> -	lockdep_assert_irqs_enabled();
> -
>  	if (unlikely(percpu_counter_compare(&ep->user->epoll_watches,
>  					    max_user_watches) >= 0))
>  		return -ENOSPC;
> @@ -1674,11 +1528,10 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
>  	}
>  
>  	/* Item initialization follow here ... */
> -	INIT_LIST_HEAD(&epi->rdllink);
> +	init_llist_node(&epi->rdllink);
>  	epi->ep = ep;
>  	ep_set_ffd(&epi->ffd, tfile, fd);
>  	epi->event = *event;
> -	epi->next = EP_UNACTIVE_PTR;
>  
>  	if (tep)
>  		mutex_lock_nested(&tep->mtx, 1);
> @@ -1745,16 +1598,13 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
>  		return -ENOMEM;
>  	}
>  
> -	/* We have to drop the new item inside our item list to keep track of it */
> -	write_lock_irq(&ep->lock);
> -
>  	/* record NAPI ID of new item if present */
>  	ep_set_busy_poll_napi_id(epi);
>  
>  	/* If the file is already "ready" we drop it inside the ready list */
> -	if (revents && !ep_is_linked(epi)) {
> -		list_add_tail(&epi->rdllink, &ep->rdllist);
> +	if (revents) {
>  		ep_pm_stay_awake(epi);
> +		epitem_ready(epi);
>  
>  		/* Notify waiting tasks that events are available */
>  		if (waitqueue_active(&ep->wq))
> @@ -1763,8 +1613,6 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
>  			pwake++;
>  	}
>  
> -	write_unlock_irq(&ep->lock);
> -
>  	/* We have to call this outside the lock */
>  	if (pwake)
>  		ep_poll_safewake(ep, NULL, 0);
> @@ -1779,11 +1627,8 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
>  static int ep_modify(struct eventpoll *ep, struct epitem *epi,
>  		     const struct epoll_event *event)
>  {
> -	int pwake = 0;
>  	poll_table pt;
>  
> -	lockdep_assert_irqs_enabled();
> -
>  	init_poll_funcptr(&pt, NULL);
>  
>  	/*
> @@ -1827,24 +1672,16 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi,
>  	 * list, push it inside.
>  	 */
>  	if (ep_item_poll(epi, &pt, 1)) {
> -		write_lock_irq(&ep->lock);
> -		if (!ep_is_linked(epi)) {
> -			list_add_tail(&epi->rdllink, &ep->rdllist);
> -			ep_pm_stay_awake(epi);
> +		ep_pm_stay_awake(epi);
> +		epitem_ready(epi);
>  
> -			/* Notify waiting tasks that events are available */
> -			if (waitqueue_active(&ep->wq))
> -				wake_up(&ep->wq);
> -			if (waitqueue_active(&ep->poll_wait))
> -				pwake++;
> -		}
> -		write_unlock_irq(&ep->lock);
> +		/* Notify waiting tasks that events are available */
> +		if (waitqueue_active(&ep->wq))
> +			wake_up(&ep->wq);
> +		if (waitqueue_active(&ep->poll_wait))
> +			ep_poll_safewake(ep, NULL, 0);
>  	}
>  
> -	/* We have to call this outside the lock */
> -	if (pwake)
> -		ep_poll_safewake(ep, NULL, 0);
> -
>  	return 0;
>  }
>  
> @@ -1852,7 +1689,7 @@ static int ep_send_events(struct eventpoll *ep,
>  			  struct epoll_event __user *events, int maxevents)
>  {
>  	struct epitem *epi, *tmp;
> -	LIST_HEAD(txlist);
> +	LLIST_HEAD(txlist);
>  	poll_table pt;
>  	int res = 0;
>  
> @@ -1867,19 +1704,18 @@ static int ep_send_events(struct eventpoll *ep,
>  	init_poll_funcptr(&pt, NULL);
>  
>  	mutex_lock(&ep->mtx);
> -	ep_start_scan(ep, &txlist);
>  
> -	/*
> -	 * We can loop without lock because we are passed a task private list.
> -	 * Items cannot vanish during the loop we are holding ep->mtx.
> -	 */
> -	list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
> +	while (res < maxevents) {
>  		struct wakeup_source *ws;
> +		struct llist_node *n;
>  		__poll_t revents;
>  
> -		if (res >= maxevents)
> +		n = llist_del_first(&ep->rdllist);
> +		if (!n)
>  			break;
>  
> +		epi = llist_entry(n, struct epitem, rdllink);
> +
>  		/*
>  		 * Activate ep->ws before deactivating epi->ws to prevent
>  		 * triggering auto-suspend here (in case we reactive epi->ws
> @@ -1896,21 +1732,30 @@ static int ep_send_events(struct eventpoll *ep,
>  			__pm_relax(ws);
>  		}
>  
> -		list_del_init(&epi->rdllink);
> -
>  		/*
>  		 * If the event mask intersect the caller-requested one,
>  		 * deliver the event to userspace. Again, we are holding ep->mtx,
>  		 * so no operations coming from userspace can change the item.
>  		 */
>  		revents = ep_item_poll(epi, &pt, 1);
> -		if (!revents)
> +		if (!revents) {
> +			init_llist_node(n);
> +
> +			/*
> +			 * Just in case epi becomes ready after ep_item_poll() above, but before
> +			 * init_llist_node(). Make sure to add it to the ready list, otherwise an
> +			 * event may be lost.
> +			 */
> +			if (unlikely(ep_item_poll(epi, &pt, 1))) {
> +				ep_pm_stay_awake(epi);
> +				epitem_ready(epi);
> +			}
>  			continue;
> +		}
>  
>  		events = epoll_put_uevent(revents, epi->event.data, events);
>  		if (!events) {
> -			list_add(&epi->rdllink, &txlist);
> -			ep_pm_stay_awake(epi);
> +			llist_add(&epi->rdllink, &ep->rdllist);
>  			if (!res)
>  				res = -EFAULT;
>  			break;
> @@ -1918,25 +1763,31 @@ static int ep_send_events(struct eventpoll *ep,
>  		res++;
>  		if (epi->event.events & EPOLLONESHOT)
>  			epi->event.events &= EP_PRIVATE_BITS;
> -		else if (!(epi->event.events & EPOLLET)) {
> +		__llist_add(n, &txlist);
> +	}
> +
> +	llist_for_each_entry_safe(epi, tmp, txlist.first, rdllink) {
> +		init_llist_node(&epi->rdllink);
> +
> +		if (!(epi->event.events & EPOLLET)) {
>  			/*
> -			 * If this file has been added with Level
> -			 * Trigger mode, we need to insert back inside
> -			 * the ready list, so that the next call to
> -			 * epoll_wait() will check again the events
> -			 * availability. At this point, no one can insert
> -			 * into ep->rdllist besides us. The epoll_ctl()
> -			 * callers are locked out by
> -			 * ep_send_events() holding "mtx" and the
> -			 * poll callback will queue them in ep->ovflist.
> +			 * If this file has been added with Level Trigger mode, we need to insert
> +			 * back inside the ready list, so that the next call to epoll_wait() will
> +			 * check again the events availability.
>  			 */
> -			list_add_tail(&epi->rdllink, &ep->rdllist);
>  			ep_pm_stay_awake(epi);
> +			epitem_ready(epi);
>  		}
>  	}
> -	ep_done_scan(ep, &txlist);
> +
> +	__pm_relax(ep->ws);
>  	mutex_unlock(&ep->mtx);
>  
> +	if (!llist_empty(&ep->rdllist)) {
> +		if (waitqueue_active(&ep->wq))
> +			wake_up(&ep->wq);
> +	}
> +
>  	return res;
>  }
>  
> @@ -2029,8 +1880,6 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
>  	wait_queue_entry_t wait;
>  	ktime_t expires, *to = NULL;
>  
> -	lockdep_assert_irqs_enabled();
> -
>  	if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
>  		slack = select_estimate_accuracy(timeout);
>  		to = &expires;
> @@ -2090,54 +1939,15 @@ static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
>  		init_wait(&wait);
>  		wait.func = ep_autoremove_wake_function;
>  
> -		write_lock_irq(&ep->lock);
> -		/*
> -		 * Barrierless variant, waitqueue_active() is called under
> -		 * the same lock on wakeup ep_poll_callback() side, so it
> -		 * is safe to avoid an explicit barrier.
> -		 */
> -		__set_current_state(TASK_INTERRUPTIBLE);
> +		prepare_to_wait_exclusive(&ep->wq, &wait, TASK_INTERRUPTIBLE);
>  
> -		/*
> -		 * Do the final check under the lock. ep_start/done_scan()
> -		 * plays with two lists (->rdllist and ->ovflist) and there
> -		 * is always a race when both lists are empty for short
> -		 * period of time although events are pending, so lock is
> -		 * important.
> -		 */
> -		eavail = ep_events_available(ep);
> -		if (!eavail)
> -			__add_wait_queue_exclusive(&ep->wq, &wait);
> -
> -		write_unlock_irq(&ep->lock);
> -
> -		if (!eavail)
> +		if (!ep_events_available(ep))
>  			timed_out = !ep_schedule_timeout(to) ||
>  				!schedule_hrtimeout_range(to, slack,
>  							  HRTIMER_MODE_ABS);
> -		__set_current_state(TASK_RUNNING);
> -
> -		/*
> -		 * We were woken up, thus go and try to harvest some events.
> -		 * If timed out and still on the wait queue, recheck eavail
> -		 * carefully under lock, below.
> -		 */
> -		eavail = 1;
>  
> -		if (!list_empty_careful(&wait.entry)) {
> -			write_lock_irq(&ep->lock);
> -			/*
> -			 * If the thread timed out and is not on the wait queue,
> -			 * it means that the thread was woken up after its
> -			 * timeout expired before it could reacquire the lock.
> -			 * Thus, when wait.entry is empty, it needs to harvest
> -			 * events.
> -			 */
> -			if (timed_out)
> -				eavail = list_empty(&wait.entry);
> -			__remove_wait_queue(&ep->wq, &wait);
> -			write_unlock_irq(&ep->lock);
> -		}
> +		finish_wait(&ep->wq, &wait);
> +		eavail = ep_events_available(ep);
>  	}
>  }
>  
> -- 
> 2.39.5
> 
