From 3475e4d56cefab9f8b061dc824db4e314d076a59 Mon Sep 17 00:00:00 2001
From: Linus Torvalds
Date: Thu, 6 Jul 2023 11:23:07 -0700
Subject: [PATCH] splice: lock the pages and unlock the pipe during IO

This is already what we do for page cache pages, where we can add raw
pages that are not up-to-date yet to the pipe, and the readers call
the pipe buffer '->confirm()' function to wait for the data to be
ready.

So just do the same for splice reading, allowing us to unlock the pipe
during the read. That then avoids problems with users that get blocked
on the pipe lock. Now they get blocked on the pipe buffer ->confirm()
instead.

TODO: teach 'O_NDELAY' and select/poll about this too.

Signed-off-by: Linus Torvalds
---
 fs/pipe.c               |  18 +++---
 fs/splice.c             | 122 +++++++++++++++++++++++++++++-----------
 include/linux/pagemap.h |   1 +
 mm/filemap.c            |   6 ++
 4 files changed, 105 insertions(+), 42 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 2d88f73f585a..71942d240c98 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -284,10 +284,17 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 
 		if (!pipe_empty(head, tail)) {
 			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
-			size_t chars = buf->len;
-			size_t written;
+			size_t chars, written;
 			int error;
 
+			error = pipe_buf_confirm(pipe, buf);
+			if (error) {
+				if (!ret)
+					ret = error;
+				break;
+			}
+
+			chars = buf->len;
 			if (chars > total_len) {
 				if (buf->flags & PIPE_BUF_FLAG_WHOLE) {
 					if (ret == 0)
@@ -297,13 +304,6 @@ pipe_read(struct kiocb *iocb, struct iov_iter *to)
 				chars = total_len;
 			}
 
-			error = pipe_buf_confirm(pipe, buf);
-			if (error) {
-				if (!ret)
-					ret = error;
-				break;
-			}
-
 			written = copy_page_to_iter(buf->page, buf->offset, chars, to);
 			if (unlikely(written < chars)) {
 				if (!ret)
diff --git a/fs/splice.c b/fs/splice.c
index 004eb1c4ce31..503f7eff41b6 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -300,6 +300,42 @@ void splice_shrink_spd(struct splice_pipe_desc *spd)
 	kfree(spd->partial);
 }
 
+static void finalize_pipe_buf(struct pipe_buffer *buf, unsigned int chunk)
+{
+	buf->len = chunk;
+	buf->ops = &default_pipe_buf_ops;
+	unlock_page(buf->page);
+}
+
+static int busy_pipe_buf_confirm(struct pipe_inode_info *pipe,
+				 struct pipe_buffer *buf)
+{
+	struct page *page = buf->page;
+
+	if (folio_wait_bit_interruptible(page_folio(page), PG_locked))
+		return -EINTR;
+	return 0;
+}
+
+/*
+ * These are the same as the default pipe buf operations,
+ * but the '.confirm()' function requires that any user
+ * wait for the page to unlock before use.
+ *
+ * I guess we could use the whole PG_uptodate logic too,
+ * and treat these as some kind of special page table pages.
+ *
+ * PIPE_BUF_FLAG_CAN_MERGE must obviously not be set when
+ * using these, and it's important that any pipe reader
+ * look at buf->len only _after_ confirming the buffer!
+ */
+const struct pipe_buf_operations busy_pipe_buf_ops = {
+	.confirm = busy_pipe_buf_confirm,
+	.release = generic_pipe_buf_release,
+	.try_steal = generic_pipe_buf_try_steal,
+	.get = generic_pipe_buf_get,
+};
+
 /**
  * copy_splice_read - Copy data from a file and splice the copy into a pipe
  * @in: The file to read from
@@ -328,6 +364,7 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,
 	struct bio_vec *bv;
 	struct kiocb kiocb;
 	struct page **pages;
+	struct pipe_buffer **bufs;
 	ssize_t ret;
 	size_t used, npages, chunk, remain, keep = 0;
 	int i;
@@ -339,11 +376,13 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,
 	npages = DIV_ROUND_UP(len, PAGE_SIZE);
 
 	bv = kzalloc(array_size(npages, sizeof(bv[0])) +
-		     array_size(npages, sizeof(struct page *)), GFP_KERNEL);
+		     array_size(npages, sizeof(struct page *)) +
+		     array_size(npages, sizeof(struct pipe_buffer *)), GFP_KERNEL);
 	if (!bv)
 		return -ENOMEM;
 
 	pages = (struct page **)(bv + npages);
+	bufs = (struct pipe_buffer **)(pages + npages);
 	npages = alloc_pages_bulk_array(GFP_USER, npages, pages);
 	if (!npages) {
 		kfree(bv);
@@ -352,14 +391,34 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,
 
 	remain = len = min_t(size_t, len, npages * PAGE_SIZE);
 
+	/* Push them into the pipe and build up the bio_vec */
 	for (i = 0; i < npages; i++) {
+		struct pipe_buffer *buf = pipe_head_buf(pipe);
+		struct page *page = pages[i];
+
+		pipe->head++;
+		lock_page(page);
+		*buf = (struct pipe_buffer) {
+			.ops = &busy_pipe_buf_ops,
+			.page = page,
+			.offset = 0,
+			.len = chunk,
+		};
+		bufs[i] = buf;
+
 		chunk = min_t(size_t, PAGE_SIZE, remain);
-		bv[i].bv_page = pages[i];
+		bv[i].bv_page = page;
 		bv[i].bv_offset = 0;
 		bv[i].bv_len = chunk;
 		remain -= chunk;
 	}
 
+	/*
+	 * We have now reserved the space with locked pages,
+	 * and can unlock the pipe during the IO.
+	 */
+	pipe_unlock(pipe);
+
 	/* Do the I/O */
 	iov_iter_bvec(&to, ITER_DEST, bv, npages, len);
 	init_sync_kiocb(&kiocb, in);
@@ -378,26 +437,22 @@ ssize_t copy_splice_read(struct file *in, loff_t *ppos,
 	if (ret == -EFAULT)
 		ret = -EAGAIN;
 
-	/* Free any pages that didn't get touched at all. */
-	if (keep < npages)
-		release_pages(pages + keep, npages - keep);
-
-	/* Push the remaining pages into the pipe. */
+	/* Update the page state in the pipe */
 	remain = ret;
-	for (i = 0; i < keep; i++) {
-		struct pipe_buffer *buf = pipe_head_buf(pipe);
+	for (i = 0; i < npages; i++) {
+		struct pipe_buffer *buf = bufs[i];
 
 		chunk = min_t(size_t, remain, PAGE_SIZE);
-		*buf = (struct pipe_buffer) {
-			.ops = &default_pipe_buf_ops,
-			.page = bv[i].bv_page,
-			.offset = 0,
-			.len = chunk,
-		};
-		pipe->head++;
 		remain -= chunk;
+
+		/*
+		 * NOTE! The size might have changed, and
+		 * chunk may be smaller or zero!
+		 */
+		finalize_pipe_buf(buf, chunk);
 	}
 
+	pipe_lock(pipe);
 	kfree(bv);
 	return ret;
 }
@@ -455,10 +510,6 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des
 	while (!pipe_empty(head, tail)) {
 		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 
-		sd->len = buf->len;
-		if (sd->len > sd->total_len)
-			sd->len = sd->total_len;
-
 		ret = pipe_buf_confirm(pipe, buf);
 		if (unlikely(ret)) {
 			if (ret == -ENODATA)
@@ -466,6 +517,10 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des
 			return ret;
 		}
 
+		sd->len = buf->len;
+		if (sd->len > sd->total_len)
+			sd->len = sd->total_len;
+
 		ret = actor(pipe, buf, sd);
 		if (ret <= 0)
 			return ret;
@@ -715,12 +770,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 		left = sd.total_len;
 		for (n = 0; !pipe_empty(head, tail) && left && n < nbufs; tail++) {
 			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
-			size_t this_len = buf->len;
-
-			/* zero-length bvecs are not supported, skip them */
-			if (!this_len)
-				continue;
-			this_len = min(this_len, left);
+			size_t this_len;
 
 			ret = pipe_buf_confirm(pipe, buf);
 			if (unlikely(ret)) {
@@ -729,6 +779,12 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 				goto done;
 			}
 
+			/* zero-length bvecs are not supported, skip them */
+			this_len = buf->len;
+			if (!this_len)
+				continue;
+			this_len = min(this_len, left);
+
 			bvec_set_page(&array[n], buf->page, this_len, buf->offset);
 			left -= this_len;
 			n++;
@@ -847,13 +903,6 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out,
 		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 		size_t seg;
 
-		if (!buf->len) {
-			tail++;
-			continue;
-		}
-
-		seg = min_t(size_t, remain, buf->len);
-
 		ret = pipe_buf_confirm(pipe, buf);
 		if (unlikely(ret)) {
 			if (ret == -ENODATA)
@@ -861,6 +910,13 @@ ssize_t splice_to_socket(struct pipe_inode_info *pipe, struct file *out,
 			break;
 		}
 
+		if (!buf->len) {
+			tail++;
+			continue;
+		}
+
+		seg = min_t(size_t, remain, buf->len);
+
 		bvec_set_page(&bvec[bc++], buf->page, seg, buf->offset);
 		remain -= seg;
 		if (remain == 0 || bc >= ARRAY_SIZE(bvec))
diff --git a/include/linux/pagemap.h b/include/linux/pagemap.h
index 716953ee1ebd..cc51ea910a91 100644
--- a/include/linux/pagemap.h
+++ b/include/linux/pagemap.h
@@ -1018,6 +1018,7 @@ static inline bool folio_lock_or_retry(struct folio *folio,
  */
 void folio_wait_bit(struct folio *folio, int bit_nr);
 int folio_wait_bit_killable(struct folio *folio, int bit_nr);
+int folio_wait_bit_interruptible(struct folio *folio, int bit_nr);
 
 /*
  * Wait for a folio to be unlocked.
diff --git a/mm/filemap.c b/mm/filemap.c
index 9e44a49bbd74..e36e052bb991 100644
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1450,6 +1450,12 @@ int folio_wait_bit_killable(struct folio *folio, int bit_nr)
 }
 EXPORT_SYMBOL(folio_wait_bit_killable);
 
+int folio_wait_bit_interruptible(struct folio *folio, int bit_nr)
+{
+	return folio_wait_bit_common(folio, bit_nr, TASK_INTERRUPTIBLE, SHARED);
+}
+EXPORT_SYMBOL(folio_wait_bit_interruptible);
+
 /**
  * folio_put_wait_locked - Drop a reference and wait for it to be unlocked
  * @folio: The folio to wait for.
-- 
2.41.0.203.ga4f2cd32bb.dirty
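
Not part of the patch above, just an illustrative sketch: the common thread in
the fs/pipe.c and fs/splice.c hunks is that every pipe consumer now calls
pipe_buf_confirm() before trusting buf->len or the page contents, because
copy_splice_read() publishes buffers whose page is still locked and whose
length is only finalized by finalize_pipe_buf() after the IO completes. A
reader-side helper that follows this rule might look roughly like the sketch
below; the helper name consume_one_buf() is made up for the example, while
pipe_buf_confirm(), min_t() and copy_page_to_iter() are the existing kernel
helpers already used in the patch itself.

/* Illustrative sketch only, not part of the patch. */
static ssize_t consume_one_buf(struct pipe_inode_info *pipe,
			       struct pipe_buffer *buf,
			       struct iov_iter *to, size_t max)
{
	size_t len;
	int err;

	/* Wait for the producer to unlock the page and finalize buf->len */
	err = pipe_buf_confirm(pipe, buf);
	if (err)
		return err;

	/* Only now is buf->len stable; it may have shrunk, even to zero */
	len = min_t(size_t, buf->len, max);
	if (!len)
		return 0;

	return copy_page_to_iter(buf->page, buf->offset, len, to);
}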