lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Date:   Sat, 24 Sep 2022 18:56:26 -0400
From:   Joel Fernandes <joel@...lfernandes.org>
To:     paulmck@...nel.org
Cc:     rcu@...r.kernel.org, linux-kernel@...r.kernel.org,
        rushikesh.s.kadam@...el.com, urezki@...il.com,
        neeraj.iitr10@...il.com, frederic@...nel.org, rostedt@...dmis.org
Subject: Re: [PATCH v6 1/4] rcu: Make call_rcu() lazy to save power



> On Sep 24, 2022, at 5:11 PM, Paul E. McKenney <paulmck@...nel.org> wrote:
> 
> On Sat, Sep 24, 2022 at 12:20:00PM -0400, Joel Fernandes wrote:
>> Hi Paul,
>> Let’s see whether my iPhone can handle replies ;-)
> 
> Now let's see if it can handle replies to replies!  Are you using a
> keyboard, or small-screen finger gestures?

Haha here it comes. It’s the usual onscreen
keyboard which has finger gesture support
and is quite fast to type with ;-). It’s also
connected to my apple watch so I can reply
from there but I’ll take it one step a time ;-)

>> More below:
>> 
>>>> On Sep 23, 2022, at 5:44 PM, Paul E. McKenney <paulmck@...nel.org> wrote:
>>> 
>>> On Thu, Sep 22, 2022 at 10:01:01PM +0000, Joel Fernandes (Google) wrote:
>>>> Implement timer-based RCU lazy callback batching. The batch is flushed
>>>> whenever a certain amount of time has passed, or the batch on a
>>>> particular CPU grows too big. Also memory pressure will flush it in a
>>>> future patch.
>>>> 
>>>> To handle several corner cases automagically (such as rcu_barrier() and
>>>> hotplug), we re-use bypass lists to handle lazy CBs. The bypass list
>>>> length has the lazy CB length included in it. A separate lazy CB length
>>>> counter is also introduced to keep track of the number of lazy CBs.
>>>> 
>>>> v5->v6:
>>>> 
>>>> [ Frederic Weisbec: Program the lazy timer only if WAKE_NOT, since other
>>>> deferral levels wake much earlier so for those it is not needed. ]
>>>> 
>>>> [ Frederic Weisbec: Use flush flags to keep bypass API code clean. ]
>>>> 
>>>> [ Frederic Weisbec: Make rcu_barrier() wake up only if main list empty. ]
>>>> 
>>>> [ Frederic Weisbec: Remove extra 'else if' branch in rcu_nocb_try_bypass(). ]
>>>> 
>>>> [ Joel: Fix issue where I was not resetting lazy_len after moving it to rdp ]
>>>> 
>>>> [ Paul/Thomas/Joel: Make call_rcu() default lazy so users don't mess up. ]
>>>> 
>>>> Suggested-by: Paul McKenney <paulmck@...nel.org>
>>>> Signed-off-by: Joel Fernandes (Google) <joel@...lfernandes.org>
>>> 
>>> I am going to put these on a local branch for testing, but a few comments
>>> and questions interspersed below.
>>> 
>>>                           Thanx, Paul
>> 
>> Ok I replied below, thank you!
>> 
>>>> include/linux/rcupdate.h |   7 ++
>>>> kernel/rcu/Kconfig       |   8 ++
>>>> kernel/rcu/rcu.h         |   8 ++
>>>> kernel/rcu/tiny.c        |   2 +-
>>>> kernel/rcu/tree.c        | 133 ++++++++++++++++++----------
>>>> kernel/rcu/tree.h        |  17 +++-
>>>> kernel/rcu/tree_exp.h    |   2 +-
>>>> kernel/rcu/tree_nocb.h   | 184 ++++++++++++++++++++++++++++++++-------
>>>> 8 files changed, 277 insertions(+), 84 deletions(-)
>>>> 
>>>> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
>>>> index 08605ce7379d..40ae36904825 100644
>>>> --- a/include/linux/rcupdate.h
>>>> +++ b/include/linux/rcupdate.h
>>>> @@ -108,6 +108,13 @@ static inline int rcu_preempt_depth(void)
>>>> 
>>>> #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
>>>> 
>>>> +#ifdef CONFIG_RCU_LAZY
>>>> +void call_rcu_flush(struct rcu_head *head, rcu_callback_t func);
>>>> +#else
>>>> +static inline void call_rcu_flush(struct rcu_head *head,
>>>> +        rcu_callback_t func) {  call_rcu(head, func); }
>>>> +#endif
>>>> +
>>>> /* Internal to kernel */
>>>> void rcu_init(void);
>>>> extern int rcu_scheduler_active;
>>>> diff --git a/kernel/rcu/Kconfig b/kernel/rcu/Kconfig
>>>> index f53ad63b2bc6..edd632e68497 100644
>>>> --- a/kernel/rcu/Kconfig
>>>> +++ b/kernel/rcu/Kconfig
>>>> @@ -314,4 +314,12 @@ config TASKS_TRACE_RCU_READ_MB
>>>>     Say N here if you hate read-side memory barriers.
>>>>     Take the default if you are unsure.
>>>> 
>>>> +config RCU_LAZY
>>>> +    bool "RCU callback lazy invocation functionality"
>>>> +    depends on RCU_NOCB_CPU
>>>> +    default n
>>>> +    help
>>>> +      To save power, batch RCU callbacks and flush after delay, memory
>>>> +      pressure or callback list growing too big.
>>>> +
>>>> endmenu # "RCU Subsystem"
>>>> diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
>>>> index be5979da07f5..65704cbc9df7 100644
>>>> --- a/kernel/rcu/rcu.h
>>>> +++ b/kernel/rcu/rcu.h
>>>> @@ -474,6 +474,14 @@ enum rcutorture_type {
>>>>   INVALID_RCU_FLAVOR
>>>> };
>>>> 
>>>> +#if defined(CONFIG_RCU_LAZY)
>>>> +unsigned long rcu_lazy_get_jiffies_till_flush(void);
>>>> +void rcu_lazy_set_jiffies_till_flush(unsigned long j);
>>>> +#else
>>>> +static inline unsigned long rcu_lazy_get_jiffies_till_flush(void) { return 0; }
>>>> +static inline void rcu_lazy_set_jiffies_till_flush(unsigned long j) { }
>>>> +#endif
>>>> +
>>>> #if defined(CONFIG_TREE_RCU)
>>>> void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
>>>>               unsigned long *gp_seq);
>>>> diff --git a/kernel/rcu/tiny.c b/kernel/rcu/tiny.c
>>>> index a33a8d4942c3..810479cf17ba 100644
>>>> --- a/kernel/rcu/tiny.c
>>>> +++ b/kernel/rcu/tiny.c
>>>> @@ -44,7 +44,7 @@ static struct rcu_ctrlblk rcu_ctrlblk = {
>>>> 
>>>> void rcu_barrier(void)
>>>> {
>>>> -    wait_rcu_gp(call_rcu);
>>>> +    wait_rcu_gp(call_rcu_flush);
>>>> }
>>>> EXPORT_SYMBOL(rcu_barrier);
>>>> 
>>>> diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
>>>> index 5ec97e3f7468..736d0d724207 100644
>>>> --- a/kernel/rcu/tree.c
>>>> +++ b/kernel/rcu/tree.c
>>>> @@ -2728,47 +2728,8 @@ static void check_cb_ovld(struct rcu_data *rdp)
>>>>   raw_spin_unlock_rcu_node(rnp);
>>>> }
>>>> 
>>>> -/**
>>>> - * call_rcu() - Queue an RCU callback for invocation after a grace period.
>>>> - * @head: structure to be used for queueing the RCU updates.
>>>> - * @func: actual callback function to be invoked after the grace period
>>>> - *
>>>> - * The callback function will be invoked some time after a full grace
>>>> - * period elapses, in other words after all pre-existing RCU read-side
>>>> - * critical sections have completed.  However, the callback function
>>>> - * might well execute concurrently with RCU read-side critical sections
>>>> - * that started after call_rcu() was invoked.
>>>> - *
>>>> - * RCU read-side critical sections are delimited by rcu_read_lock()
>>>> - * and rcu_read_unlock(), and may be nested.  In addition, but only in
>>>> - * v5.0 and later, regions of code across which interrupts, preemption,
>>>> - * or softirqs have been disabled also serve as RCU read-side critical
>>>> - * sections.  This includes hardware interrupt handlers, softirq handlers,
>>>> - * and NMI handlers.
>>>> - *
>>>> - * Note that all CPUs must agree that the grace period extended beyond
>>>> - * all pre-existing RCU read-side critical section.  On systems with more
>>>> - * than one CPU, this means that when "func()" is invoked, each CPU is
>>>> - * guaranteed to have executed a full memory barrier since the end of its
>>>> - * last RCU read-side critical section whose beginning preceded the call
>>>> - * to call_rcu().  It also means that each CPU executing an RCU read-side
>>>> - * critical section that continues beyond the start of "func()" must have
>>>> - * executed a memory barrier after the call_rcu() but before the beginning
>>>> - * of that RCU read-side critical section.  Note that these guarantees
>>>> - * include CPUs that are offline, idle, or executing in user mode, as
>>>> - * well as CPUs that are executing in the kernel.
>>>> - *
>>>> - * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
>>>> - * resulting RCU callback function "func()", then both CPU A and CPU B are
>>>> - * guaranteed to execute a full memory barrier during the time interval
>>>> - * between the call to call_rcu() and the invocation of "func()" -- even
>>>> - * if CPU A and CPU B are the same CPU (but again only if the system has
>>>> - * more than one CPU).
>>>> - *
>>>> - * Implementation of these memory-ordering guarantees is described here:
>>>> - * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
>>>> - */
>>>> -void call_rcu(struct rcu_head *head, rcu_callback_t func)
>>>> +static void
>>>> +__call_rcu_common(struct rcu_head *head, rcu_callback_t func, bool lazy)
>>>> {
>>>>   static atomic_t doublefrees;
>>>>   unsigned long flags;
>>>> @@ -2809,7 +2770,7 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
>>>>   }
>>>> 
>>>>   check_cb_ovld(rdp);
>>>> -    if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags))
>>>> +    if (rcu_nocb_try_bypass(rdp, head, &was_alldone, flags, lazy))
>>>>       return; // Enqueued onto ->nocb_bypass, so just leave.
>>>>   // If no-CBs CPU gets here, rcu_nocb_try_bypass() acquired ->nocb_lock.
>>>>   rcu_segcblist_enqueue(&rdp->cblist, head);
>>>> @@ -2831,8 +2792,84 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
>>>>       local_irq_restore(flags);
>>>>   }
>>>> }
>>>> -EXPORT_SYMBOL_GPL(call_rcu);
>>>> 
>>>> +#ifdef CONFIG_RCU_LAZY
>>>> +/**
>>>> + * call_rcu_flush() - Queue RCU callback for invocation after grace period, and
>>>> + * flush all lazy callbacks (including the new one) to the main ->cblist while
>>>> + * doing so.
>>>> + *
>>>> + * @head: structure to be used for queueing the RCU updates.
>>>> + * @func: actual callback function to be invoked after the grace period
>>>> + *
>>>> + * The callback function will be invoked some time after a full grace
>>>> + * period elapses, in other words after all pre-existing RCU read-side
>>>> + * critical sections have completed.
>>>> + *
>>>> + * Use this API instead of call_rcu() if you don't mind the callback being
>>>> + * invoked after very long periods of time on systems without memory pressure
>>>> + * and on systems which are lightly loaded or mostly idle.
>>> 
>>> This comment is backwards, right?  Shouldn't it say something like "Use
>>> this API instead of call_rcu() if you don't mind burning extra power..."?
>> 
>> Yes sorry my mistake :-(. It’s a stale comment from the rework and I’ll update it.
> 
> Very good, thank you!

Anything for you!

> 
>>>> + *
>>>> + * Other than the extra delay in callbacks being invoked, this function is
>>>> + * identical to, and reuses call_rcu()'s logic. Refer to call_rcu() for more
>>>> + * details about memory ordering and other functionality.
>>>> + */
>>>> +void call_rcu_flush(struct rcu_head *head, rcu_callback_t func)
>>>> +{
>>>> +    return __call_rcu_common(head, func, false);
>>>> +}
>>>> +EXPORT_SYMBOL_GPL(call_rcu_flush);
>>>> +#endif
>>>> +
>>>> +/**
>>>> + * call_rcu() - Queue an RCU callback for invocation after a grace period.
>>>> + * By default the callbacks are 'lazy' and are kept hidden from the main
>>>> + * ->cblist to prevent starting of grace periods too soon.
>>>> + * If you desire grace periods to start very soon, use call_rcu_flush().
>>>> + *
>>>> + * @head: structure to be used for queueing the RCU updates.
>>>> + * @func: actual callback function to be invoked after the grace period
>>>> + *
>>>> + * The callback function will be invoked some time after a full grace
>>>> + * period elapses, in other words after all pre-existing RCU read-side
>>>> + * critical sections have completed.  However, the callback function
>>>> + * might well execute concurrently with RCU read-side critical sections
>>>> + * that started after call_rcu() was invoked.
>>>> + *
>>>> + * RCU read-side critical sections are delimited by rcu_read_lock()
>>>> + * and rcu_read_unlock(), and may be nested.  In addition, but only in
>>>> + * v5.0 and later, regions of code across which interrupts, preemption,
>>>> + * or softirqs have been disabled also serve as RCU read-side critical
>>>> + * sections.  This includes hardware interrupt handlers, softirq handlers,
>>>> + * and NMI handlers.
>>>> + *
>>>> + * Note that all CPUs must agree that the grace period extended beyond
>>>> + * all pre-existing RCU read-side critical section.  On systems with more
>>>> + * than one CPU, this means that when "func()" is invoked, each CPU is
>>>> + * guaranteed to have executed a full memory barrier since the end of its
>>>> + * last RCU read-side critical section whose beginning preceded the call
>>>> + * to call_rcu().  It also means that each CPU executing an RCU read-side
>>>> + * critical section that continues beyond the start of "func()" must have
>>>> + * executed a memory barrier after the call_rcu() but before the beginning
>>>> + * of that RCU read-side critical section.  Note that these guarantees
>>>> + * include CPUs that are offline, idle, or executing in user mode, as
>>>> + * well as CPUs that are executing in the kernel.
>>>> + *
>>>> + * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
>>>> + * resulting RCU callback function "func()", then both CPU A and CPU B are
>>>> + * guaranteed to execute a full memory barrier during the time interval
>>>> + * between the call to call_rcu() and the invocation of "func()" -- even
>>>> + * if CPU A and CPU B are the same CPU (but again only if the system has
>>>> + * more than one CPU).
>>>> + *
>>>> + * Implementation of these memory-ordering guarantees is described here:
>>>> + * Documentation/RCU/Design/Memory-Ordering/Tree-RCU-Memory-Ordering.rst.
>>>> + */
>>>> +void call_rcu(struct rcu_head *head, rcu_callback_t func)
>>>> +{
>>>> +    return __call_rcu_common(head, func, true);
>>>> +}
>>>> +EXPORT_SYMBOL_GPL(call_rcu);
>>>> 
>>>> /* Maximum number of jiffies to wait before draining a batch. */
>>>> #define KFREE_DRAIN_JIFFIES (5 * HZ)
>>>> @@ -3507,7 +3544,7 @@ void synchronize_rcu(void)
>>>>       if (rcu_gp_is_expedited())
>>>>           synchronize_rcu_expedited();
>>>>       else
>>>> -            wait_rcu_gp(call_rcu);
>>>> +            wait_rcu_gp(call_rcu_flush);
>>>>       return;
>>>>   }
>>>> 
>>>> @@ -3902,7 +3939,11 @@ static void rcu_barrier_entrain(struct rcu_data *rdp)
>>>>   rdp->barrier_head.func = rcu_barrier_callback;
>>>>   debug_rcu_head_queue(&rdp->barrier_head);
>>>>   rcu_nocb_lock(rdp);
>>>> -    WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
>>>> +    /*
>>>> +     * Flush the bypass list, but also wake up the GP thread as otherwise
>>>> +     * bypass/lazy CBs maynot be noticed, and can cause real long delays!
>>>> +     */
>>>> +    WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, FLUSH_BP_WAKE));
>>>>   if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head)) {
>>>>       atomic_inc(&rcu_state.barrier_cpu_count);
>>>>   } else {
>>>> @@ -4323,7 +4364,7 @@ void rcutree_migrate_callbacks(int cpu)
>>>>   my_rdp = this_cpu_ptr(&rcu_data);
>>>>   my_rnp = my_rdp->mynode;
>>>>   rcu_nocb_lock(my_rdp); /* irqs already disabled. */
>>>> -    WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies));
>>>> +    WARN_ON_ONCE(!rcu_nocb_flush_bypass(my_rdp, NULL, jiffies, FLUSH_BP_NONE));
>>>>   raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */
>>>>   /* Leverage recent GPs and set GP for new callbacks. */
>>>>   needwake = rcu_advance_cbs(my_rnp, rdp) ||
>>>> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
>>>> index d4a97e40ea9c..361c41d642c7 100644
>>>> --- a/kernel/rcu/tree.h
>>>> +++ b/kernel/rcu/tree.h
>>>> @@ -263,14 +263,16 @@ struct rcu_data {
>>>>   unsigned long last_fqs_resched;    /* Time of last rcu_resched(). */
>>>>   unsigned long last_sched_clock;    /* Jiffies of last rcu_sched_clock_irq(). */
>>>> 
>>>> +    long lazy_len;            /* Length of buffered lazy callbacks. */
>>> 
>>> Do we ever actually care about the length as opposed to whether or not all
>>> the bypass callbacks are lazy?  If not, a "some_nonlazy" boolean would be
>>> initialed to zero and ORed with the non-laziness of the added callback.
>>> Or, if there was a test anyway, simply set to 1 in the presence of a
>>> non-lazy callback.  And as now, gets zeroed when the bypass is flushed.
>> 
>> We had discussed this before, and my point was
>> we could use it for tracing and future extension
>> as well. If it’s ok with you, I prefer to keep it this
>> way than having to come back add the length later.
>> 
>> On a minor point as well, if you really want it this
>> way,  I am afraid of changing this now since I tested
>> this way for last several iterations and it’s easy to
>> add a regression. So I prefer to keep it this way and
>> then I can add more patches later on top if that’s
>> Ok with you.
> 
> No, you are right, the debug information might be helpful.
> 
> But hey, at least I am consistent!

Haha ;-)

> 
>>> This might shorten a few lines of code.
>>> 
>>>>   int cpu;
>>>> };
>>>> 
>>>> /* Values for nocb_defer_wakeup field in struct rcu_data. */
>>>> #define RCU_NOCB_WAKE_NOT    0
>>>> #define RCU_NOCB_WAKE_BYPASS    1
>>>> -#define RCU_NOCB_WAKE        2
>>>> -#define RCU_NOCB_WAKE_FORCE    3
>>>> +#define RCU_NOCB_WAKE_LAZY    2
>>>> +#define RCU_NOCB_WAKE        3
>>>> +#define RCU_NOCB_WAKE_FORCE    4
>>>> 
>>>> #define RCU_JIFFIES_TILL_FORCE_QS (1 + (HZ > 250) + (HZ > 500))
>>>>                   /* For jiffies_till_first_fqs and */
>>>> @@ -439,10 +441,17 @@ static void zero_cpu_stall_ticks(struct rcu_data *rdp);
>>>> static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp);
>>>> static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq);
>>>> static void rcu_init_one_nocb(struct rcu_node *rnp);
>>>> +
>>>> +#define FLUSH_BP_NONE 0
>>>> +/* Is the CB being enqueued after the flush, a lazy CB? */
>>>> +#define FLUSH_BP_LAZY BIT(0)
>>>> +/* Wake up nocb-GP thread after flush? */
>>>> +#define FLUSH_BP_WAKE BIT(1)
>>>> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>> -                  unsigned long j);
>>>> +                  unsigned long j, unsigned long flush_flags);
>>>> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>> -                bool *was_alldone, unsigned long flags);
>>>> +                bool *was_alldone, unsigned long flags,
>>>> +                bool lazy);
>>>> static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty,
>>>>                unsigned long flags);
>>>> static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp, int level);
>>>> diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
>>>> index 18e9b4cd78ef..5cac05600798 100644
>>>> --- a/kernel/rcu/tree_exp.h
>>>> +++ b/kernel/rcu/tree_exp.h
>>>> @@ -937,7 +937,7 @@ void synchronize_rcu_expedited(void)
>>>> 
>>>>   /* If expedited grace periods are prohibited, fall back to normal. */
>>>>   if (rcu_gp_is_normal()) {
>>>> -        wait_rcu_gp(call_rcu);
>>>> +        wait_rcu_gp(call_rcu_flush);
>>>>       return;
>>>>   }
>>>> 
>>>> diff --git a/kernel/rcu/tree_nocb.h b/kernel/rcu/tree_nocb.h
>>>> index f77a6d7e1356..661c685aba3f 100644
>>>> --- a/kernel/rcu/tree_nocb.h
>>>> +++ b/kernel/rcu/tree_nocb.h
>>>> @@ -256,6 +256,31 @@ static bool wake_nocb_gp(struct rcu_data *rdp, bool force)
>>>>   return __wake_nocb_gp(rdp_gp, rdp, force, flags);
>>>> }
>>>> 
>>>> +/*
>>>> + * LAZY_FLUSH_JIFFIES decides the maximum amount of time that
>>>> + * can elapse before lazy callbacks are flushed. Lazy callbacks
>>>> + * could be flushed much earlier for a number of other reasons
>>>> + * however, LAZY_FLUSH_JIFFIES will ensure no lazy callbacks are
>>>> + * left unsubmitted to RCU after those many jiffies.
>>>> + */
>>>> +#define LAZY_FLUSH_JIFFIES (10 * HZ)
>>>> +static unsigned long jiffies_till_flush = LAZY_FLUSH_JIFFIES;
>>>> +
>>>> +#ifdef CONFIG_RCU_LAZY
>>>> +// To be called only from test code.
>>>> +void rcu_lazy_set_jiffies_till_flush(unsigned long jif)
>>>> +{
>>>> +    jiffies_till_flush = jif;
>>>> +}
>>>> +EXPORT_SYMBOL(rcu_lazy_set_jiffies_till_flush);
>>>> +
>>>> +unsigned long rcu_lazy_get_jiffies_till_flush(void)
>>>> +{
>>>> +    return jiffies_till_flush;
>>>> +}
>>>> +EXPORT_SYMBOL(rcu_lazy_get_jiffies_till_flush);
>>>> +#endif
>>>> +
>>>> /*
>>>> * Arrange to wake the GP kthread for this NOCB group at some future
>>>> * time when it is safe to do so.
>>>> @@ -269,10 +294,14 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
>>>>   raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
>>>> 
>>>>   /*
>>>> -     * Bypass wakeup overrides previous deferments. In case
>>>> -     * of callback storm, no need to wake up too early.
>>>> +     * Bypass wakeup overrides previous deferments. In case of
>>>> +     * callback storm, no need to wake up too early.
>>>>    */
>>>> -    if (waketype == RCU_NOCB_WAKE_BYPASS) {
>>>> +    if (waketype == RCU_NOCB_WAKE_LAZY
>>>> +        && READ_ONCE(rdp->nocb_defer_wakeup) == RCU_NOCB_WAKE_NOT) {
>>> 
>>> Please leave the "&&" on the previous line and line up the "READ_ONCE("
>>> with the "waketype".  That makes it easier to tell the condition from
>>> the following code.
>> 
>> Will do!
>> 
>>> 
>>>> +        mod_timer(&rdp_gp->nocb_timer, jiffies + jiffies_till_flush);
>>>> +        WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
>>>> +    } else if (waketype == RCU_NOCB_WAKE_BYPASS) {
>>>>       mod_timer(&rdp_gp->nocb_timer, jiffies + 2);
>>>>       WRITE_ONCE(rdp_gp->nocb_defer_wakeup, waketype);
>>>>   } else {
>>>> @@ -293,12 +322,16 @@ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
>>>> * proves to be initially empty, just return false because the no-CB GP
>>>> * kthread may need to be awakened in this case.
>>>> *
>>>> + * Return true if there was something to be flushed and it succeeded, otherwise
>>>> + * false.
>>>> + *
>>>> * Note that this function always returns true if rhp is NULL.
>>>> */
>>>> static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>> -                     unsigned long j)
>>>> +                     unsigned long j, unsigned long flush_flags)
>>>> {
>>>>   struct rcu_cblist rcl;
>>>> +    bool lazy = flush_flags & FLUSH_BP_LAZY;
>>>> 
>>>>   WARN_ON_ONCE(!rcu_rdp_is_offloaded(rdp));
>>>>   rcu_lockdep_assert_cblist_protected(rdp);
>>>> @@ -310,7 +343,20 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>>   /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
>>>>   if (rhp)
>>>>       rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
>>>> -    rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
>>>> +
>>>> +    /*
>>>> +     * If the new CB requested was a lazy one, queue it onto the main
>>>> +     * ->cblist so we can take advantage of a sooner grade period.
>>> 
>>> "take advantage of a grace period that will happen regardless."?
>> 
>> Sure will update.
>> 
>>> 
>>>> +     */
>>>> +    if (lazy && rhp) {
>>>> +        rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, NULL);
>>>> +        rcu_cblist_enqueue(&rcl, rhp);
>>> 
>>> Would it makes sense to enqueue rhp onto ->nocb_bypass first, NULL out
>>> rhp, then let the rcu_cblist_flush_enqueue() be common code?  Or did this
>>> function grow a later use of rhp that I missed?
>> 
>> No that could be done, but it prefer to keep it this
>> way because rhp is a function parameter and I
>> prefer not to modify those since it could add a
>> bug in future where rhp passed by user is now
>> NULL for some reason, half way through the
>> function.
> 
> I agree that changing a function parameter is bad practice.
> 
> So the question becomes whether introducing a local would outweigh
> consolidating this code.  Could you please at least give it a shot?

Yes for sure I will try this. Was thinking the same.

> 
>>>> +        WRITE_ONCE(rdp->lazy_len, 0);
>>>> +    } else {
>>>> +        rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
>>>> +        WRITE_ONCE(rdp->lazy_len, 0);
>>> 
>>> This WRITE_ONCE() can be dropped out of the "if" statement, correct?
>> 
>> Yes will update.
> 
> Thank you!
> 
>>> If so, this could be an "if" statement with two statements in its "then"
>>> clause, no "else" clause, and two statements following the "if" statement.
>> 
>> I don’t think we can get rid of the else part but I’ll see what it looks like.
> 
> In the function header, s/rhp/rhp_in/, then:
> 
>    struct rcu_head *rhp = rhp_in;
> 
> And then:
> 
>    if (lazy && rhp) {
>        rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
>        rhp = NULL;
>    }
>    rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
>    WRITE_ONCE(rdp->lazy_len, 0);
> 
> Or did I mess something up?

Yes you are spot on. It seems more shorter, I’ll do it this way!

> 
>>>> +    }
>>>> +
>>>>   rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
>>>>   WRITE_ONCE(rdp->nocb_bypass_first, j);
>>>>   rcu_nocb_bypass_unlock(rdp);
>>>> @@ -326,13 +372,33 @@ static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>> * Note that this function always returns true if rhp is NULL.
>>>> */
>>>> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>> -                  unsigned long j)
>>>> +                  unsigned long j, unsigned long flush_flags)
>>>> {
>>>> +    bool ret;
>>>> +    bool was_alldone = false;
>>>> +    bool bypass_all_lazy = false;
>>>> +    long bypass_ncbs;
>>> 
>>> Alphabetical order by variable name, please.  (Yes, I know that this is
>>> strange, but what can I say?)
>> 
>> Sure.
> 
> Thank you!

My pleasure!

> 
>>>> +
>>>>   if (!rcu_rdp_is_offloaded(rdp))
>>>>       return true;
>>>>   rcu_lockdep_assert_cblist_protected(rdp);
>>>>   rcu_nocb_bypass_lock(rdp);
>>>> -    return rcu_nocb_do_flush_bypass(rdp, rhp, j);
>>>> +
>>>> +    if (flush_flags & FLUSH_BP_WAKE) {
>>>> +        was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>>>> +        bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>>>> +        bypass_all_lazy = bypass_ncbs && (bypass_ncbs == rdp->lazy_len);
>>>> +    }
>>>> +
>>>> +    ret = rcu_nocb_do_flush_bypass(rdp, rhp, j, flush_flags);
>>>> +
>>>> +    // Wake up the nocb GP thread if needed. GP thread could be sleeping
>>>> +    // while waiting for lazy timer to expire (otherwise rcu_barrier may
>>>> +    // end up waiting for the duration of the lazy timer).
>>>> +    if (flush_flags & FLUSH_BP_WAKE && was_alldone && bypass_all_lazy)
>>>> +        wake_nocb_gp(rdp, false);
>>>> +
>>>> +    return ret;
>>>> }
>>>> 
>>>> /*
>>>> @@ -345,7 +411,7 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
>>>>   if (!rcu_rdp_is_offloaded(rdp) ||
>>>>       !rcu_nocb_bypass_trylock(rdp))
>>>>       return;
>>>> -    WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j));
>>>> +    WARN_ON_ONCE(!rcu_nocb_do_flush_bypass(rdp, NULL, j, FLUSH_BP_NONE));
>>>> }
>>>> 
>>>> /*
>>>> @@ -367,12 +433,14 @@ static void rcu_nocb_try_flush_bypass(struct rcu_data *rdp, unsigned long j)
>>>> * there is only one CPU in operation.
>>>> */
>>>> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>> -                bool *was_alldone, unsigned long flags)
>>>> +                bool *was_alldone, unsigned long flags,
>>>> +                bool lazy)
>>>> {
>>>>   unsigned long c;
>>>>   unsigned long cur_gp_seq;
>>>>   unsigned long j = jiffies;
>>>>   long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>>>> +    bool bypass_is_lazy = (ncbs == READ_ONCE(rdp->lazy_len));
>>>> 
>>>>   lockdep_assert_irqs_disabled();
>>>> 
>>>> @@ -417,25 +485,30 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>>   // If there hasn't yet been all that many ->cblist enqueues
>>>>   // this jiffy, tell the caller to enqueue onto ->cblist.  But flush
>>>>   // ->nocb_bypass first.
>>>> -    if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
>>>> +    // Lazy CBs throttle this back and do immediate bypass queuing.
>>>> +    if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy && !lazy) {
>>>>       rcu_nocb_lock(rdp);
>>>>       *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>>>>       if (*was_alldone)
>>>>           trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>>>>                       TPS("FirstQ"));
>>>> -        WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
>>>> +
>>>> +        WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j, FLUSH_BP_NONE));
>>>>       WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
>>>>       return false; // Caller must enqueue the callback.
>>>>   }
>>>> 
>>>>   // If ->nocb_bypass has been used too long or is too full,
>>>>   // flush ->nocb_bypass to ->cblist.
>>>> -    if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
>>>> +    if ((ncbs && !bypass_is_lazy && j != READ_ONCE(rdp->nocb_bypass_first)) ||
>>>> +        (ncbs &&  bypass_is_lazy &&
>>>> +        (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush))) ||
>>>>       ncbs >= qhimark) {
>>>>       rcu_nocb_lock(rdp);
>>>>       *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
>>>> 
>>>> -        if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
>>>> +        if (!rcu_nocb_flush_bypass(rdp, rhp, j,
>>>> +                       lazy ? FLUSH_BP_LAZY : FLUSH_BP_NONE)) {
>>>>           if (*was_alldone)
>>>>               trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>>>>                           TPS("FirstQ"));
>>>> @@ -460,16 +533,29 @@ static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>>   // We need to use the bypass.
>>>>   rcu_nocb_wait_contended(rdp);
>>>>   rcu_nocb_bypass_lock(rdp);
>>>> +
>>>>   ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>>>>   rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
>>>>   rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
>>>> +
>>>> +    if (IS_ENABLED(CONFIG_RCU_LAZY) && lazy)
>>> 
>>> Won't !IS_ENABLED(CONFIG_RCU_LAZY) mean that lazy cannot be set?
>>> Why do we need to check both?  Or are you going for dead code?  If so,
>>> shouldn't there be IS_ENABLED(CONFIG_RCU_LAZY) checks above as well?
>>> 
>>> Me, I am not convinced that the dead code would buy you much.  In fact,
>>> the compiler might well be propagating the constants on its own.
>>> 
>>> Ah!  The reason the compiler cannot figure this out is because you put
>>> the switch into rcu.h.  If you instead always export the call_rcu_flush()
>>> definition, and check IS_ENABLED(CONFIG_RCU_LAZY) at the beginning of
>>> call_rcu(), the compiler should have the information that it needs to
>>> do this for you.
>> 
>> Ah ok, I will try to do it this way.
> 
> Very good, thank you, for this and for the rest below.

Glad we could do this review, thanks.

 - Joel 


> 
>                            Thanx, Paul
> 
>>>> +        WRITE_ONCE(rdp->lazy_len, rdp->lazy_len + 1);
>>>> +
>>>>   if (!ncbs) {
>>>>       WRITE_ONCE(rdp->nocb_bypass_first, j);
>>>>       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
>>>>   }
>>>> +
>>>>   rcu_nocb_bypass_unlock(rdp);
>>>>   smp_mb(); /* Order enqueue before wake. */
>>>> -    if (ncbs) {
>>>> +
>>>> +    // A wake up of the grace period kthread or timer adjustment needs to
>>>> +    // be done only if:
>>>> +    // 1. Bypass list was fully empty before (this is the first bypass list entry).
>>>> +    //    Or, both the below conditions are met:
>>>> +    // 1. Bypass list had only lazy CBs before.
>>>> +    // 2. The new CB is non-lazy.
>>>> +    if (ncbs && (!bypass_is_lazy || lazy)) {
>>>>       local_irq_restore(flags);
>>>>   } else {
>>>>       // No-CBs GP kthread might be indefinitely asleep, if so, wake.
>>>> @@ -499,7 +585,7 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
>>>> {
>>>>   unsigned long cur_gp_seq;
>>>>   unsigned long j;
>>>> -    long len;
>>>> +    long len, lazy_len, bypass_len;
>>>>   struct task_struct *t;
>>> 
>>> Again, alphabetical please, strange though that might seem.
>> 
>> Yes sure
>> 
>>> 
>>>>   // If we are being polled or there is no kthread, just leave.
>>>> @@ -512,9 +598,16 @@ static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
>>>>   }
>>>>   // Need to actually to a wakeup.
>>>>   len = rcu_segcblist_n_cbs(&rdp->cblist);
>>>> +    bypass_len = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>>>> +    lazy_len = READ_ONCE(rdp->lazy_len);
>>>>   if (was_alldone) {
>>>>       rdp->qlen_last_fqs_check = len;
>>>> -        if (!irqs_disabled_flags(flags)) {
>>>> +        // Only lazy CBs in bypass list
>>>> +        if (lazy_len && bypass_len == lazy_len) {
>>>> +            rcu_nocb_unlock_irqrestore(rdp, flags);
>>>> +            wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_LAZY,
>>>> +                       TPS("WakeLazy"));
>>>> +        } else if (!irqs_disabled_flags(flags)) {
>>>>           /* ... if queue was empty ... */
>>>>           rcu_nocb_unlock_irqrestore(rdp, flags);
>>>>           wake_nocb_gp(rdp, false);
>>>> @@ -604,8 +697,8 @@ static void nocb_gp_sleep(struct rcu_data *my_rdp, int cpu)
>>>> */
>>>> static void nocb_gp_wait(struct rcu_data *my_rdp)
>>>> {
>>>> -    bool bypass = false;
>>>> -    long bypass_ncbs;
>>>> +    bool bypass = false, lazy = false;
>>>> +    long bypass_ncbs, lazy_ncbs;
>>> 
>>> And ditto.
>> 
>> Ok
>> 
>>> 
>>>>   int __maybe_unused cpu = my_rdp->cpu;
>>>>   unsigned long cur_gp_seq;
>>>>   unsigned long flags;
>>>> @@ -640,24 +733,41 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
>>>>    * won't be ignored for long.
>>>>    */
>>>>   list_for_each_entry(rdp, &my_rdp->nocb_head_rdp, nocb_entry_rdp) {
>>>> +        bool flush_bypass = false;
>>>> +
>>>>       trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
>>>>       rcu_nocb_lock_irqsave(rdp, flags);
>>>>       lockdep_assert_held(&rdp->nocb_lock);
>>>>       bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>>>> -        if (bypass_ncbs &&
>>>> +        lazy_ncbs = READ_ONCE(rdp->lazy_len);
>>>> +
>>>> +        if (bypass_ncbs && (lazy_ncbs == bypass_ncbs) &&
>>>> +            (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + jiffies_till_flush) ||
>>>> +             bypass_ncbs > 2 * qhimark)) {
>>>> +            flush_bypass = true;
>>>> +        } else if (bypass_ncbs && (lazy_ncbs != bypass_ncbs) &&
>>>>           (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
>>>>            bypass_ncbs > 2 * qhimark)) {
>>>> -            // Bypass full or old, so flush it.
>>>> -            (void)rcu_nocb_try_flush_bypass(rdp, j);
>>>> -            bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>>>> +            flush_bypass = true;
>>>>       } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
>>>>           rcu_nocb_unlock_irqrestore(rdp, flags);
>>>>           continue; /* No callbacks here, try next. */
>>>>       }
>>>> +
>>>> +        if (flush_bypass) {
>>>> +            // Bypass full or old, so flush it.
>>>> +            (void)rcu_nocb_try_flush_bypass(rdp, j);
>>>> +            bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
>>>> +            lazy_ncbs = READ_ONCE(rdp->lazy_len);
>>>> +        }
>>>> +
>>>>       if (bypass_ncbs) {
>>>>           trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
>>>> -                        TPS("Bypass"));
>>>> -            bypass = true;
>>>> +                    bypass_ncbs == lazy_ncbs ? TPS("Lazy") : TPS("Bypass"));
>>>> +            if (bypass_ncbs == lazy_ncbs)
>>>> +                lazy = true;
>>>> +            else
>>>> +                bypass = true;
>>>>       }
>>>>       rnp = rdp->mynode;
>>>> 
>>>> @@ -705,12 +815,21 @@ static void nocb_gp_wait(struct rcu_data *my_rdp)
>>>>   my_rdp->nocb_gp_gp = needwait_gp;
>>>>   my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
>>>> 
>>>> -    if (bypass && !rcu_nocb_poll) {
>>>> -        // At least one child with non-empty ->nocb_bypass, so set
>>>> -        // timer in order to avoid stranding its callbacks.
>>>> -        wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
>>>> -                   TPS("WakeBypassIsDeferred"));
>>>> +    // At least one child with non-empty ->nocb_bypass, so set
>>>> +    // timer in order to avoid stranding its callbacks.
>>>> +    if (!rcu_nocb_poll) {
>>>> +        // If bypass list only has lazy CBs. Add a deferred
>>>> +        // lazy wake up.
>>> 
>>> One sentence rather than two.
>> 
>> Ok
>> 
>>> 
>>>> +        if (lazy && !bypass) {
>>>> +            wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_LAZY,
>>>> +                    TPS("WakeLazyIsDeferred"));
>>>> +        // Otherwise add a deferred bypass wake up.
>>>> +        } else if (bypass) {
>>>> +            wake_nocb_gp_defer(my_rdp, RCU_NOCB_WAKE_BYPASS,
>>>> +                    TPS("WakeBypassIsDeferred"));
>>>> +        }
>>>>   }
>>>> +
>>>>   if (rcu_nocb_poll) {
>>>>       /* Polling, so trace if first poll in the series. */
>>>>       if (gotcbs)
>>>> @@ -1036,7 +1155,7 @@ static long rcu_nocb_rdp_deoffload(void *arg)
>>>>    * return false, which means that future calls to rcu_nocb_try_bypass()
>>>>    * will refuse to put anything into the bypass.
>>>>    */
>>>> -    WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies));
>>>> +    WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, jiffies, FLUSH_BP_NONE));
>>>>   /*
>>>>    * Start with invoking rcu_core() early. This way if the current thread
>>>>    * happens to preempt an ongoing call to rcu_core() in the middle,
>>>> @@ -1278,6 +1397,7 @@ static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
>>>>   raw_spin_lock_init(&rdp->nocb_gp_lock);
>>>>   timer_setup(&rdp->nocb_timer, do_nocb_deferred_wakeup_timer, 0);
>>>>   rcu_cblist_init(&rdp->nocb_bypass);
>>>> +    WRITE_ONCE(rdp->lazy_len, 0);
>>>>   mutex_init(&rdp->nocb_gp_kthread_mutex);
>>>> }
>>>> 
>>>> @@ -1559,13 +1679,13 @@ static void rcu_init_one_nocb(struct rcu_node *rnp)
>>>> }
>>>> 
>>>> static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>> -                  unsigned long j)
>>>> +                  unsigned long j, unsigned long flush_flags)
>>>> {
>>>>   return true;
>>>> }
>>>> 
>>>> static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
>>>> -                bool *was_alldone, unsigned long flags)
>>>> +                bool *was_alldone, unsigned long flags, bool lazy)
>>>> {
>>>>   return false;
>>>> }
>>>> -- 
>>>> 2.37.3.998.g577e59143f-goog
>>>> 

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ