Message-ID: <20130929200114.GC19582@linux.vnet.ibm.com>
Date: Sun, 29 Sep 2013 13:01:14 -0700
From: "Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
To: Oleg Nesterov <oleg@...hat.com>
Cc: Peter Zijlstra <peterz@...radead.org>,
Mel Gorman <mgorman@...e.de>, Rik van Riel <riel@...hat.com>,
Srikar Dronamraju <srikar@...ux.vnet.ibm.com>,
Ingo Molnar <mingo@...nel.org>,
Andrea Arcangeli <aarcange@...hat.com>,
Johannes Weiner <hannes@...xchg.org>,
Linux-MM <linux-mm@...ck.org>,
LKML <linux-kernel@...r.kernel.org>,
Thomas Gleixner <tglx@...utronix.de>,
Steven Rostedt <rostedt@...dmis.org>,
Linus Torvalds <torvalds@...ux-foundation.org>
Subject: Re: [RFC] introduce synchronize_sched_{enter,exit}()
On Sun, Sep 29, 2013 at 08:36:34PM +0200, Oleg Nesterov wrote:
> Hello.
>
> Paul, Peter, et al, could you review the code below?
>
> I am not sending the patch, I think it is simpler to read the code
> inline (just in case, I didn't try to compile it yet).
>
> It is functionally equivalent to
>
> struct xxx_struct {
> 	atomic_t counter;
> };
>
> static inline bool xxx_is_idle(struct xxx_struct *xxx)
> {
> 	return atomic_read(&xxx->counter) == 0;
> }
>
> static inline void xxx_enter(struct xxx_struct *xxx)
> {
> 	atomic_inc(&xxx->counter);
> 	synchronize_sched();
> }
>
> static inline void xxx_exit(struct xxx_struct *xxx)
> {
> 	synchronize_sched();
> 	atomic_dec(&xxx->counter);
> }
But there is nothing for synchronize_sched() to wait for in the above.
Presumably the caller of xxx_is_idle() is required to disable preemption
or be under rcu_read_lock_sched()?
> except: it records the state and synchronize_sched() is only called by
> xxx_enter() and only if necessary.
>
> Why? Say, percpu_rw_semaphore, or upcoming changes in get_online_cpus(),
> (Peter, I think they should be unified anyway, but lets ignore this for
> now). Or freeze_super() (which currently looks buggy), perhaps something
> else. This pattern
>
> 	writer:
> 		state = SLOW_MODE;
> 		synchronize_rcu/sched();
>
> 	reader:
> 		preempt_disable();	// or rcu_read_lock();
> 		if (state != SLOW_MODE)
> 			...
>
> is quite common.
And this does guarantee that by the time the writer's synchronize_whatever()
exits, all readers will know that state==SLOW_MODE.
> Note:
> - This implementation allows multiple writers, and sometimes
> this makes sense.
If each writer atomically incremented SLOW_MODE, did its update, then
atomically decremented it, sure. You could be more clever and avoid
unneeded synchronize_whatever() calls, but I would have to see a good
reason for doing so before recommending this.
OK, but you appear to be doing this below anyway. ;-)
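
The counting scheme described above can be modeled in userspace. What follows is a minimal sketch, not kernel code: toy_synchronize() and the shared active_readers counter are hypothetical stand-ins for synchronize_sched() and preempt_disable(), chosen only so that the example runs outside the kernel. A real RCU read side would not touch a shared counter.

```c
#include <stdatomic.h>
#include <stdbool.h>

static atomic_int slow_mode;		/* number of writers currently inside */
static atomic_int active_readers;	/* hypothetical stand-in for preempt-disabled regions */

/* Hypothetical grace period: wait for an instant with no reader in flight. */
static void toy_synchronize(void)
{
	while (atomic_load(&active_readers) != 0)
		;
}

static void writer_enter(void)
{
	atomic_fetch_add(&slow_mode, 1);
	toy_synchronize();	/* readers starting after this see slow_mode != 0 */
}

static void writer_exit(void)
{
	toy_synchronize();	/* wait out readers still in flight */
	atomic_fetch_sub(&slow_mode, 1);
}

/* Reader: returns true if the fastpath may be used. */
static bool reader_fastpath_ok(void)
{
	bool ok;

	atomic_fetch_add(&active_readers, 1);	/* models preempt_disable() */
	ok = atomic_load(&slow_mode) == 0;
	atomic_fetch_sub(&active_readers, 1);	/* models preempt_enable() */
	return ok;
}
```

With two overlapping writers the fastpath stays disabled until the second writer_exit() returns, which is the behavior the counter is meant to provide.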
> - But it's trivial to add "bool xxx->exclusive" set by xxx_init().
> If it is true only one xxx_enter() is possible, other callers
> should block until xxx_exit(). This is what percpu_down_write()
> actually needs.
Agreed.
> - Probably it makes sense to add xxx->rcu_domain = RCU/SCHED/ETC.
Or just have pointers to the RCU functions in the xxx structure...
So you are trying to make something that abstracts the RCU-protected
state-change pattern? Or perhaps more accurately, the RCU-protected
state-change-and-back pattern?
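
One way to picture the "pointers to the RCU functions" idea is an operations table that the generic code calls through. This is only a sketch under stated assumptions: struct xxx_gp_ops, struct cb_head, xxx_init(), xxx_wait_for_readers(), xxx_queue_callback() and the stub_* helpers are all hypothetical names, and the stubs merely record what was requested so that the example runs in userspace.

```c
#include <stddef.h>

struct cb_head;				/* hypothetical stand-in for struct rcu_head */
typedef void (*cb_func_t)(struct cb_head *);

struct cb_head {
	cb_func_t func;
};

/* Hypothetical per-flavor operations table. */
struct xxx_gp_ops {
	void (*sync)(void);				/* e.g. synchronize_sched() */
	void (*call)(struct cb_head *, cb_func_t);	/* e.g. call_rcu_sched() */
};

struct xxx_struct {
	int gp_state;
	struct cb_head cb_head;
	const struct xxx_gp_ops *ops;
};

/* Userspace stubs: they only record what the generic code asked for. */
static int stub_sync_calls;
static struct cb_head *stub_pending;

static void stub_sync(void)
{
	stub_sync_calls++;
}

static void stub_call(struct cb_head *head, cb_func_t func)
{
	head->func = func;
	stub_pending = head;
}

static const struct xxx_gp_ops stub_sched_ops = {
	.sync = stub_sync,
	.call = stub_call,
};

static void xxx_init(struct xxx_struct *xxx, const struct xxx_gp_ops *ops)
{
	xxx->gp_state = 0;
	xxx->cb_head.func = NULL;
	xxx->ops = ops;
}

/* Flavor-independent callers: they never name a specific RCU variant. */
static void xxx_wait_for_readers(struct xxx_struct *xxx)
{
	xxx->ops->sync();
}

static void xxx_queue_callback(struct xxx_struct *xxx, cb_func_t func)
{
	xxx->ops->call(&xxx->cb_head, func);
}
```

The flavor is then chosen once, at initialization time, by passing a different table to xxx_init().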
> Do you think it is correct? Makes sense? (BUG_ON's are just comments).
... Maybe ... Please see below for commentary and a question.
Thanx, Paul
> Oleg.
>
> // .h -----------------------------------------------------------------------
>
> struct xxx_struct {
> 	int gp_state;
>
> 	int gp_count;
> 	wait_queue_head_t gp_waitq;
>
> 	int cb_state;
> 	struct rcu_head cb_head;
	spinlock_t xxx_lock; /* ? */
This spinlock might not make the big-system guys happy, but it appears to
be needed below.
> };
>
> static inline bool xxx_is_idle(struct xxx_struct *xxx)
> {
> 	return !xxx->gp_state; /* GP_IDLE */
> }
>
> extern void xxx_enter(struct xxx_struct *xxx);
> extern void xxx_exit(struct xxx_struct *xxx);
>
> // .c -----------------------------------------------------------------------
>
> enum { GP_IDLE = 0, GP_PENDING, GP_PASSED };
>
> enum { CB_IDLE = 0, CB_PENDING, CB_REPLAY };
>
> #define xxx_lock gp_waitq.lock
>
> void xxx_enter(struct xxx_struct *xxx)
> {
> 	bool need_wait, need_sync;
>
> 	spin_lock_irq(&xxx->xxx_lock);
> 	need_wait = xxx->gp_count++;
> 	need_sync = xxx->gp_state == GP_IDLE;
Suppose ->gp_state is GP_PASSED. It could transition to GP_IDLE at any
time, right?
> 	if (need_sync)
> 		xxx->gp_state = GP_PENDING;
> 	spin_unlock_irq(&xxx->xxx_lock);
>
> 	BUG_ON(need_wait && need_sync);
>
> 	if (need_sync) {
> 		synchronize_sched();
> 		xxx->gp_state = GP_PASSED;
> 		wake_up_all(&xxx->gp_waitq);
> 	} else if (need_wait) {
> 		wait_event(xxx->gp_waitq, xxx->gp_state == GP_PASSED);
Suppose the wakeup is delayed until after the state has been updated
back to GP_IDLE? Ah, presumably the non-zero ->gp_count prevents this.
Never mind!
> 	} else {
> 		BUG_ON(xxx->gp_state != GP_PASSED);
> 	}
> }
>
> static void cb_rcu_func(struct rcu_head *rcu)
> {
> 	struct xxx_struct *xxx = container_of(rcu, struct xxx_struct, cb_head);
> 	unsigned long flags;
>
> 	BUG_ON(xxx->gp_state != GP_PASSED);
> 	BUG_ON(xxx->cb_state == CB_IDLE);
>
> 	spin_lock_irqsave(&xxx->xxx_lock, flags);
> 	if (xxx->gp_count) {
> 		xxx->cb_state = CB_IDLE;
> 	} else if (xxx->cb_state == CB_REPLAY) {
> 		xxx->cb_state = CB_PENDING;
> 		call_rcu_sched(&xxx->cb_head, cb_rcu_func);
> 	} else {
> 		xxx->cb_state = CB_IDLE;
> 		xxx->gp_state = GP_IDLE;
> 	}
It took me a bit to work out the above. It looks like the intent is
to have the last xxx_exit() put the state back to GP_IDLE, which appears
to be the state in which readers can use a fastpath.
This works because if ->gp_count is non-zero and ->cb_state is CB_IDLE,
there must be an xxx_exit() in our future.
> 	spin_unlock_irqrestore(&xxx->xxx_lock, flags);
> }
>
> void xxx_exit(struct xxx_struct *xxx)
> {
> 	spin_lock_irq(&xxx->xxx_lock);
> 	if (!--xxx->gp_count) {
> 		if (xxx->cb_state == CB_IDLE) {
> 			xxx->cb_state = CB_PENDING;
> 			call_rcu_sched(&xxx->cb_head, cb_rcu_func);
> 		} else if (xxx->cb_state == CB_PENDING) {
> 			xxx->cb_state = CB_REPLAY;
> 		}
> 	}
> 	spin_unlock_irq(&xxx->xxx_lock);
> }
Then we also have something like this?
bool xxx_readers_fastpath_ok(struct xxx_struct *xxx)
{
	BUG_ON(!rcu_read_lock_sched_held());
	return xxx->gp_state == GP_IDLE;
}
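
The enter/exit/callback state machine quoted above can be traced with a small single-threaded model. This is a sketch under explicit assumptions, not the proposed kernel code: locking and blocking are omitted, synchronize_sched() is assumed to complete immediately, and an outstanding call_rcu_sched() is modeled by the hypothetical cb_queued flag, with model_grace_period() standing in for the callback being invoked after a grace period. All model_* names are invented for illustration.

```c
#include <stdbool.h>

enum { GP_IDLE = 0, GP_PENDING, GP_PASSED };
enum { CB_IDLE = 0, CB_PENDING, CB_REPLAY };

struct xxx_model {
	int gp_state;
	int gp_count;
	int cb_state;
	bool cb_queued;		/* models an outstanding call_rcu_sched() */
};

/* Models xxx_enter(); the grace-period wait is assumed to finish at once. */
static void model_enter(struct xxx_model *m)
{
	m->gp_count++;
	if (m->gp_state == GP_IDLE)
		m->gp_state = GP_PASSED;	/* GP_PENDING, then synchronize_sched() */
}

/* Models xxx_exit(): the last writer queues the callback or asks for a replay. */
static void model_exit(struct xxx_model *m)
{
	if (--m->gp_count)
		return;
	if (m->cb_state == CB_IDLE) {
		m->cb_state = CB_PENDING;
		m->cb_queued = true;
	} else if (m->cb_state == CB_PENDING) {
		m->cb_state = CB_REPLAY;
	}
}

/* Models cb_rcu_func() running once a grace period has elapsed. */
static void model_grace_period(struct xxx_model *m)
{
	if (!m->cb_queued)
		return;
	m->cb_queued = false;
	if (m->gp_count) {
		m->cb_state = CB_IDLE;		/* a writer is back: stay in GP_PASSED */
	} else if (m->cb_state == CB_REPLAY) {
		m->cb_state = CB_PENDING;	/* wait for one more grace period */
		m->cb_queued = true;
	} else {
		m->cb_state = CB_IDLE;
		m->gp_state = GP_IDLE;		/* readers may use the fastpath again */
	}
}

static bool model_fastpath_ok(const struct xxx_model *m)
{
	return m->gp_state == GP_IDLE;
}
```

In this model an enter/exit pair that arrives while a callback is pending moves cb_state to CB_REPLAY, so the return to GP_IDLE is delayed by one additional modeled grace period.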