Message-ID: <15228.1568821380@warthog.procyon.org.uk>
Date:   Wed, 18 Sep 2019 16:43:00 +0100
From:   David Howells <dhowells@...hat.com>
To:     Will Deacon <will@...nel.org>
Cc:     dhowells@...hat.com, paulmck@...ux.ibm.com, mark.rutland@....com,
        torvalds@...uxfoundation.org, linux-kernel@...r.kernel.org,
        linux-fsdevel@...r.kernel.org, peterz@...radead.org
Subject: Do we need to correct barriering in circular-buffers.rst?

Will Deacon <will@...nel.org> wrote:

> If I'm understanding your code correctly (big 'if'), then you have things
> like this in pipe_read():
> 
> 
> 	unsigned int head = READ_ONCE(pipe->head);
> 	unsigned int tail = pipe->tail;
> 	unsigned int mask = pipe->buffers - 1;
> 
> 	if (tail != head) {
> 		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
> 
> 		[...]
> 
> 		written = copy_page_to_iter(buf->page, buf->offset, chars, to);
> 
> 
> where you want to make sure you don't read from 'buf->page' until after
> you've read the updated head index. Is that right? If so, then READ_ONCE()
> will not give you that guarantee on architectures such as Power and Arm,
> because the 'if (tail != head)' branch can be speculated and the buffer
> can be read before we've got around to looking at the head index.
> 
> So I reckon you need smp_load_acquire() in this case. pipe_write() might be
> ok with the control dependency because CPUs don't tend to make speculative
> writes visible, but I didn't check it carefully and the compiler can do
> crazy stuff in this area, so I'd be inclined to use smp_load_acquire() here
> too unless you really need the last ounce of performance.

Yeah, I probably do.

Documentation/core-api/circular-buffers.rst might be wrong, then, I think.

It mandates using smp_store_release() to update buffer->head in the producer
and buffer->tail in the consumer - but these need pairing with memory barriers
used when reading buffer->head and buffer->tail on the other side.  Currently,
for the producer we have:

	spin_lock(&producer_lock);

	unsigned long head = buffer->head;
	/* The spin_unlock() and next spin_lock() provide needed ordering. */
	unsigned long tail = READ_ONCE(buffer->tail);

	if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
		/* insert one item into the buffer */
		struct item *item = buffer[head];

		produce_item(item);

		smp_store_release(buffer->head,
				  (head + 1) & (buffer->size - 1));

		/* wake_up() will make sure that the head is committed before
		 * waking anyone up */
		wake_up(consumer);
	}

	spin_unlock(&producer_lock);

I think the ordering comment about spin_unlock() and spin_lock() is wrong.  There's
no requirement to have a spinlock on either side - and in any case, the producer
and consumer take different locks, so both sides could be inside their
respective locked sections when accessing the buffer.  The READ_ONCE() would
theoretically provide the smp_read_barrier_depends() to pair with the
smp_store_release() in the consumer.  Maybe I should change this to:

	spin_lock(&producer_lock);

	/* Barrier paired with consumer-side store-release on tail */
	unsigned long tail = smp_load_acquire(buffer->tail);
	unsigned long head = buffer->head;

	if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
		/* insert one item into the buffer */
		struct item *item = buffer[head];

		produce_item(item);

		smp_store_release(buffer->head,
				  (head + 1) & (buffer->size - 1));

		/* wake_up() will make sure that the head is committed before
		 * waking anyone up */
		wake_up(consumer);
	}

	spin_unlock(&producer_lock);

The consumer is currently:

	spin_lock(&consumer_lock);

	/* Read index before reading contents at that index. */
	unsigned long head = smp_load_acquire(buffer->head);
	unsigned long tail = buffer->tail;

	if (CIRC_CNT(head, tail, buffer->size) >= 1) {

		/* extract one item from the buffer */
		struct item *item = buffer[tail];

		consume_item(item);

		/* Finish reading descriptor before incrementing tail. */
		smp_store_release(buffer->tail,
				  (tail + 1) & (buffer->size - 1));
	}

	spin_unlock(&consumer_lock);

which I think is okay.

And yes, I note that this does use smp_load_acquire(buffer->head) in the
consumer - which I should also be doing.
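
For anyone wanting to poke at the pairing outside the kernel, here is a rough
userspace sketch of the same scheme using C11 atomics.  It is not the kernel
code: memory_order_acquire/memory_order_release stand in for
smp_load_acquire()/smp_store_release(), the spinlocks are dropped (single
producer, single consumer assumed), and struct ring, RING_SIZE, ring_produce()
and ring_consume() are names made up for illustration.  The space and count
calculations follow CIRC_SPACE() and CIRC_CNT().

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_SIZE 8u	/* hypothetical capacity; must be a power of two */

static_assert((RING_SIZE & (RING_SIZE - 1)) == 0, "RING_SIZE must be 2^n");

struct ring {
	_Atomic unsigned long head;	/* written only by the producer */
	_Atomic unsigned long tail;	/* written only by the consumer */
	int slots[RING_SIZE];
};

/* Producer: returns false if there is no free slot. */
static bool ring_produce(struct ring *r, int value)
{
	/* Acquire pairs with the consumer's release store to tail, so the
	 * slot is not overwritten before the consumer has finished with it. */
	unsigned long tail = atomic_load_explicit(&r->tail,
						  memory_order_acquire);
	/* Only the producer writes head, so a relaxed load is enough. */
	unsigned long head = atomic_load_explicit(&r->head,
						  memory_order_relaxed);

	/* Same arithmetic as CIRC_SPACE(head, tail, RING_SIZE) */
	if (((tail - (head + 1)) & (RING_SIZE - 1)) < 1)
		return false;

	r->slots[head] = value;

	/* Release: the slot contents are visible before the new head. */
	atomic_store_explicit(&r->head, (head + 1) & (RING_SIZE - 1),
			      memory_order_release);
	return true;
}

/* Consumer: returns false if the ring is empty. */
static bool ring_consume(struct ring *r, int *out)
{
	/* Read index before reading contents at that index. */
	unsigned long head = atomic_load_explicit(&r->head,
						  memory_order_acquire);
	unsigned long tail = atomic_load_explicit(&r->tail,
						  memory_order_relaxed);

	/* Same arithmetic as CIRC_CNT(head, tail, RING_SIZE) */
	if (((head - tail) & (RING_SIZE - 1)) < 1)
		return false;

	*out = r->slots[tail];

	/* Finish reading the slot before incrementing tail. */
	atomic_store_explicit(&r->tail, (tail + 1) & (RING_SIZE - 1),
			      memory_order_release);
	return true;
}
```

Note that one slot is always left unused, so a ring of RING_SIZE entries holds
at most RING_SIZE - 1 items.  A single-threaded run only checks the index
arithmetic; it says nothing about whether the ordering is sufficient on
weakly-ordered hardware.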

David
