Message-ID: <20250313233341.1675324-36-dhowells@redhat.com>
Date: Thu, 13 Mar 2025 23:33:27 +0000
From: David Howells <dhowells@...hat.com>
To: Viacheslav Dubeyko <slava@...eyko.com>,
	Alex Markuze <amarkuze@...hat.com>
Cc: David Howells <dhowells@...hat.com>,
	Ilya Dryomov <idryomov@...il.com>,
	Jeff Layton <jlayton@...nel.org>,
	Dongsheng Yang <dongsheng.yang@...ystack.cn>,
	ceph-devel@...r.kernel.org,
	linux-fsdevel@...r.kernel.org,
	linux-block@...r.kernel.org,
	linux-kernel@...r.kernel.org
Subject: [RFC PATCH 35/35] ceph: Remove old I/O API bits

Remove the #if'd-out bits of the old I/O API.  This is split out from the
implementation patch to reduce the size of the patch that needs to be
reviewed.
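
For context, the bulk of what this patch deletes was already compiled out,
bracketed by markers of the following form (a minimal sketch of the pattern
only; ceph_old_read_helper() is an invented placeholder, not a function from
this series):

	#if 0 // TODO: Remove after netfs conversion
	static int ceph_old_read_helper(struct page *page)
	{
		/* Old, pre-netfslib I/O path, superseded by the netfs_*()
		 * helpers wired up earlier in the series.
		 */
		return 0;
	}
	#endif // TODO: Remove after netfs conversion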

Signed-off-by: David Howells <dhowells@...hat.com>
cc: Viacheslav Dubeyko <slava@...eyko.com>
cc: Alex Markuze <amarkuze@...hat.com>
cc: Ilya Dryomov <idryomov@...il.com>
cc: ceph-devel@...r.kernel.org
cc: linux-fsdevel@...r.kernel.org
---
 fs/ceph/addr.c  | 2018 ++---------------------------------------------
 fs/ceph/file.c  | 1504 -----------------------------------
 fs/ceph/super.h |   21 -
 3 files changed, 46 insertions(+), 3497 deletions(-)

diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 325fbbce1eaa..b3ba102af60b 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -59,1890 +59,70 @@
  * accounting is preserved.
  */
 
-#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
-#define CONGESTION_OFF_THRESH(congestion_kb)				\
-	(CONGESTION_ON_THRESH(congestion_kb) -				\
-	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))
-
-#if 0 // TODO: Remove after netfs conversion
-static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
-					struct folio **foliop, void **_fsdata);
-
-static struct ceph_snap_context *page_snap_context(struct page *page)
-{
-	if (PagePrivate(page))
-		return (void *)page->private;
-	return NULL;
-}
-#endif // TODO: Remove after netfs conversion
-
-/*
- * Dirty a page.  Optimistically adjust accounting, on the assumption
- * that we won't race with invalidate.  If we do, readjust.
- */
-bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
-{
-	struct inode *inode = mapping->host;
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
-	struct ceph_inode_info *ci;
-	struct ceph_snap_context *snapc;
-	struct netfs_group *group;
-
-	if (folio_test_dirty(folio)) {
-		doutc(cl, "%llx.%llx %p idx %lu -- already dirty\n",
-		      ceph_vinop(inode), folio, folio->index);
-		VM_BUG_ON_FOLIO(!folio_test_private(folio), folio);
-		return false;
-	}
-
-	atomic64_inc(&mdsc->dirty_folios);
-
-	ci = ceph_inode(inode);
-
-	/* dirty the head */
-	spin_lock(&ci->i_ceph_lock);
-	if (__ceph_have_pending_cap_snap(ci)) {
-		struct ceph_cap_snap *capsnap =
-			list_last_entry(&ci->i_cap_snaps,
-					struct ceph_cap_snap,
-					ci_item);
-		snapc = capsnap->context;
-		capsnap->dirty_pages++;
-	} else {
-		snapc = ci->i_head_snapc;
-		BUG_ON(!snapc);
-		++ci->i_wrbuffer_ref_head;
-	}
-
-	/* Attach a reference to the snap/group to the folio. */
-	group = netfs_folio_group(folio);
-	if (group != &snapc->group) {
-		netfs_set_group(folio, &snapc->group);
-		if (group) {
-			doutc(cl, "Different group %px != %px\n",
-			      group, &snapc->group);
-			netfs_put_group(group);
-		}
-	}
-
-	if (ci->i_wrbuffer_ref == 0)
-		ihold(inode);
-	++ci->i_wrbuffer_ref;
-	doutc(cl, "%llx.%llx %p idx %lu head %d/%d -> %d/%d "
-	      "snapc %p seq %lld (%d snaps)\n",
-	      ceph_vinop(inode), folio, folio->index,
-	      ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
-	      ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
-	      snapc, snapc->seq, snapc->num_snaps);
-	spin_unlock(&ci->i_ceph_lock);
-
-	return netfs_dirty_folio(mapping, folio);
-}
-
-#if 0 // TODO: Remove after netfs conversion
-/*
- * If we are truncating the full folio (i.e. offset == 0), adjust the
- * dirty folio counters appropriately.  Only called if there is private
- * data on the folio.
- */
-static void ceph_invalidate_folio(struct folio *folio, size_t offset,
-				size_t length)
-{
-	struct inode *inode = folio->mapping->host;
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_snap_context *snapc;
-
-
-	if (offset != 0 || length != folio_size(folio)) {
-		doutc(cl, "%llx.%llx idx %lu partial dirty page %zu~%zu\n",
-		      ceph_vinop(inode), folio->index, offset, length);
-		return;
-	}
-
-	WARN_ON(!folio_test_locked(folio));
-	if (folio_test_private(folio)) {
-		doutc(cl, "%llx.%llx idx %lu full dirty page\n",
-		      ceph_vinop(inode), folio->index);
-
-		snapc = folio_detach_private(folio);
-		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
-		ceph_put_snap_context(snapc);
-	}
-
-	netfs_invalidate_folio(folio, offset, length);
-}
-
-static void ceph_netfs_expand_readahead(struct netfs_io_request *rreq)
-{
-	struct inode *inode = rreq->inode;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_file_layout *lo = &ci->i_layout;
-	unsigned long max_pages = inode->i_sb->s_bdi->ra_pages;
-	loff_t end = rreq->start + rreq->len, new_end;
-	struct ceph_netfs_request_data *priv = rreq->netfs_priv;
-	unsigned long max_len;
-	u32 blockoff;
-
-	if (priv) {
-		/* Readahead is disabled by posix_fadvise POSIX_FADV_RANDOM */
-		if (priv->file_ra_disabled)
-			max_pages = 0;
-		else
-			max_pages = priv->file_ra_pages;
-
-	}
-
-	/* Readahead is disabled */
-	if (!max_pages)
-		return;
-
-	max_len = max_pages << PAGE_SHIFT;
-
-	/*
-	 * Try to expand the length forward by rounding up it to the next
-	 * block, but do not exceed the file size, unless the original
-	 * request already exceeds it.
-	 */
-	new_end = umin(round_up(end, lo->stripe_unit), rreq->i_size);
-	if (new_end > end && new_end <= rreq->start + max_len)
-		rreq->len = new_end - rreq->start;
-
-	/* Try to expand the start downward */
-	div_u64_rem(rreq->start, lo->stripe_unit, &blockoff);
-	if (rreq->len + blockoff <= max_len) {
-		rreq->start -= blockoff;
-		rreq->len += blockoff;
-	}
-}
-
-static void finish_netfs_read(struct ceph_osd_request *req)
-{
-	struct inode *inode = req->r_inode;
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
-	struct netfs_io_subrequest *subreq = req->r_priv;
-	struct ceph_osd_req_op *op = &req->r_ops[0];
-	int err = req->r_result;
-	bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);
-
-	ceph_update_read_metrics(&fsc->mdsc->metric, req->r_start_latency,
-				 req->r_end_latency, osd_data->length, err);
-
-	doutc(cl, "result %d subreq->len=%zu i_size=%lld\n", req->r_result,
-	      subreq->len, i_size_read(req->r_inode));
-
-	/* no object means success but no data */
-	if (err == -ENOENT) {
-		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
-		__set_bit(NETFS_SREQ_MADE_PROGRESS, &subreq->flags);
-		err = 0;
-	} else if (err == -EBLOCKLISTED) {
-		fsc->blocklisted = true;
-	}
-
-	if (err >= 0) {
-		if (sparse && err > 0)
-			err = ceph_sparse_ext_map_end(op);
-		if (err < subreq->len &&
-		    subreq->rreq->origin != NETFS_DIO_READ)
-			__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
-		if (IS_ENCRYPTED(inode) && err > 0) {
-			err = ceph_fscrypt_decrypt_extents(inode,
-					osd_data->pages, subreq->start,
-					op->extent.sparse_ext,
-					op->extent.sparse_ext_cnt);
-			if (err > subreq->len)
-				err = subreq->len;
-		}
-		if (err > 0)
-			__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
-	}
-
-	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
-		ceph_put_page_vector(osd_data->pages,
-				     calc_pages_for(osd_data->offset,
-					osd_data->length), false);
-	}
-	if (err > 0) {
-		subreq->transferred = err;
-		err = 0;
-	}
-	subreq->error = err;
-	trace_netfs_sreq(subreq, netfs_sreq_trace_io_progress);
-	netfs_read_subreq_terminated(subreq);
-	iput(req->r_inode);
-	ceph_dec_osd_stopping_blocker(fsc->mdsc);
-}
-
-static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq)
-{
-	struct netfs_io_request *rreq = subreq->rreq;
-	struct inode *inode = rreq->inode;
-	struct ceph_mds_reply_info_parsed *rinfo;
-	struct ceph_mds_reply_info_in *iinfo;
-	struct ceph_mds_request *req;
-	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	ssize_t err = 0;
-	size_t len;
-	int mode;
-
-	if (rreq->origin != NETFS_DIO_READ)
-		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
-	__clear_bit(NETFS_SREQ_COPY_TO_CACHE, &subreq->flags);
-
-	if (subreq->start >= inode->i_size)
-		goto out;
-
-	/* We need to fetch the inline data. */
-	mode = ceph_try_to_choose_auth_mds(inode, CEPH_STAT_CAP_INLINE_DATA);
-	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
-	if (IS_ERR(req)) {
-		err = PTR_ERR(req);
-		goto out;
-	}
-	req->r_ino1 = ci->i_vino;
-	req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INLINE_DATA);
-	req->r_num_caps = 2;
-
-	trace_netfs_sreq(subreq, netfs_sreq_trace_submit);
-	err = ceph_mdsc_do_request(mdsc, NULL, req);
-	if (err < 0)
-		goto out;
-
-	rinfo = &req->r_reply_info;
-	iinfo = &rinfo->targeti;
-	if (iinfo->inline_version == CEPH_INLINE_NONE) {
-		/* The data got uninlined */
-		ceph_mdsc_put_request(req);
-		return false;
-	}
-
-	len = min_t(size_t, iinfo->inline_len - subreq->start, subreq->len);
-	err = copy_to_iter(iinfo->inline_data + subreq->start, len, &subreq->io_iter);
-	if (err == 0) {
-		err = -EFAULT;
-	} else {
-		subreq->transferred += err;
-		err = 0;
-	}
-
-	ceph_mdsc_put_request(req);
-out:
-	subreq->error = err;
-	trace_netfs_sreq(subreq, netfs_sreq_trace_io_progress);
-	netfs_read_subreq_terminated(subreq);
-	return true;
-}
-
-static int ceph_netfs_prepare_read(struct netfs_io_subrequest *subreq)
-{
-	struct netfs_io_request *rreq = subreq->rreq;
-	struct inode *inode = rreq->inode;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	size_t xlen;
-	u64 objno, objoff;
-
-	/* Truncate the extent at the end of the current block */
-	ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
-				      &objno, &objoff, &xlen);
-	rreq->io_streams[0].sreq_max_len = umin(xlen, fsc->mount_options->rsize);
-	return 0;
-}
-
-static void ceph_netfs_issue_read(struct netfs_io_subrequest *subreq)
-{
-	struct netfs_io_request *rreq = subreq->rreq;
-	struct inode *inode = rreq->inode;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_osd_request *req = NULL;
-	struct ceph_vino vino = ceph_vino(inode);
-	int err;
-	u64 len;
-	bool sparse = IS_ENCRYPTED(inode) || ceph_test_mount_opt(fsc, SPARSEREAD);
-	u64 off = subreq->start;
-	int extent_cnt;
-
-	if (ceph_inode_is_shutdown(inode)) {
-		err = -EIO;
-		goto out;
-	}
-
-	if (ceph_has_inline_data(ci) && ceph_netfs_issue_op_inline(subreq))
-		return;
-
-	// TODO: This rounding here is slightly dodgy.  It *should* work, for
-	// now, as the cache only deals in blocks that are a multiple of
-	// PAGE_SIZE and fscrypt blocks are at most PAGE_SIZE.  What needs to
-	// happen is for the fscrypt driving to be moved into netfslib and the
-	// data in the cache also to be stored encrypted.
-	len = subreq->len;
-	ceph_fscrypt_adjust_off_and_len(inode, &off, &len);
-
-	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino,
-			off, &len, 0, 1, sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ,
-			CEPH_OSD_FLAG_READ, NULL, ci->i_truncate_seq,
-			ci->i_truncate_size, false);
-	if (IS_ERR(req)) {
-		err = PTR_ERR(req);
-		req = NULL;
-		goto out;
-	}
-
-	if (sparse) {
-		extent_cnt = __ceph_sparse_read_ext_count(inode, len);
-		err = ceph_alloc_sparse_ext_map(&req->r_ops[0], extent_cnt);
-		if (err)
-			goto out;
-	}
-
-	doutc(cl, "%llx.%llx pos=%llu orig_len=%zu len=%llu\n",
-	      ceph_vinop(inode), subreq->start, subreq->len, len);
-
-	/*
-	 * FIXME: For now, use CEPH_OSD_DATA_TYPE_PAGES instead of _ITER for
-	 * encrypted inodes. We'd need infrastructure that handles an iov_iter
-	 * instead of page arrays, and we don't have that as of yet. Once the
-	 * dust settles on the write helpers and encrypt/decrypt routines for
-	 * netfs, we should be able to rework this.
-	 */
-	if (IS_ENCRYPTED(inode)) {
-		struct page **pages;
-		size_t page_off;
-
-		/*
-		 * The io_iter.count needs to be corrected to aligned length.
-		 * Otherwise, iov_iter_get_pages_alloc2() operates with
-		 * the initial unaligned length value. As a result,
-		 * ceph_msg_data_cursor_init() triggers BUG_ON() in the case
-		 * if msg->sparse_read_total > msg->data_length.
-		 */
-		subreq->io_iter.count = len;
-
-		err = iov_iter_get_pages_alloc2(&subreq->io_iter, &pages, len, &page_off);
-		if (err < 0) {
-			doutc(cl, "%llx.%llx failed to allocate pages, %d\n",
-			      ceph_vinop(inode), err);
-			goto out;
-		}
-
-		/* should always give us a page-aligned read */
-		WARN_ON_ONCE(page_off);
-
-		len = err;
-		err = 0;
-
-		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false,
-						 false);
-	} else {
-		osd_req_op_extent_osd_iter(req, 0, &subreq->io_iter);
-	}
-	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
-		err = -EIO;
-		goto out;
-	}
-	req->r_callback = finish_netfs_read;
-	req->r_priv = subreq;
-	req->r_inode = inode;
-	ihold(inode);
-
-	trace_netfs_sreq(subreq, netfs_sreq_trace_submit);
-	ceph_osdc_start_request(req->r_osdc, req);
-out:
-	ceph_osdc_put_request(req);
-	if (err) {
-		subreq->error = err;
-		netfs_read_subreq_terminated(subreq);
-	}
-	doutc(cl, "%llx.%llx result %d\n", ceph_vinop(inode), err);
-}
-
-static int ceph_init_request(struct netfs_io_request *rreq, struct file *file)
-{
-	struct inode *inode = rreq->inode;
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-	int got = 0, want = CEPH_CAP_FILE_CACHE;
-	struct ceph_netfs_request_data *priv;
-	int ret = 0;
-
-	/* [DEPRECATED] Use PG_private_2 to mark folio being written to the cache. */
-	__set_bit(NETFS_RREQ_USE_PGPRIV2, &rreq->flags);
-
-	if (rreq->origin != NETFS_READAHEAD)
-		return 0;
-
-	priv = kzalloc(sizeof(*priv), GFP_NOFS);
-	if (!priv)
-		return -ENOMEM;
-
-	/*
-	 * If we are doing readahead triggered by a read, fault-in or
-	 * MADV/FADV_WILLNEED, someone higher up the stack must be holding the
-	 * FILE_CACHE and/or LAZYIO caps.
-	 */
-	if (file) {
-		priv->file_ra_pages = file->f_ra.ra_pages;
-		priv->file_ra_disabled = file->f_mode & FMODE_RANDOM;
-		rreq->netfs_priv = priv;
-		return 0;
-	}
-
-	/*
-	 * readahead callers do not necessarily hold Fcb caps
-	 * (e.g. fadvise, madvise).
-	 */
-	ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
-	if (ret < 0) {
-		doutc(cl, "%llx.%llx, error getting cap\n", ceph_vinop(inode));
-		goto out;
-	}
-
-	if (!(got & want)) {
-		doutc(cl, "%llx.%llx, no cache cap\n", ceph_vinop(inode));
-		ret = -EACCES;
-		goto out;
-	}
-	if (ret == 0) {
-		ret = -EACCES;
-		goto out;
-	}
-
-	priv->caps = got;
-	rreq->netfs_priv = priv;
-	rreq->io_streams[0].sreq_max_len = fsc->mount_options->rsize;
-
-out:
-	if (ret < 0) {
-		if (got)
-			ceph_put_cap_refs(ceph_inode(inode), got);
-		kfree(priv);
-	}
-
-	return ret;
-}
-
-static void ceph_netfs_free_request(struct netfs_io_request *rreq)
-{
-	struct ceph_netfs_request_data *priv = rreq->netfs_priv;
-
-	if (!priv)
-		return;
-
-	if (priv->caps)
-		ceph_put_cap_refs(ceph_inode(rreq->inode), priv->caps);
-	kfree(priv);
-	rreq->netfs_priv = NULL;
-}
-
-const struct netfs_request_ops ceph_netfs_ops = {
-	.init_request		= ceph_init_request,
-	.free_request		= ceph_netfs_free_request,
-	.prepare_read		= ceph_netfs_prepare_read,
-	.issue_read		= ceph_netfs_issue_read,
-	.expand_readahead	= ceph_netfs_expand_readahead,
-	.check_write_begin	= ceph_netfs_check_write_begin,
-};
-
-#ifdef CONFIG_CEPH_FSCACHE
-static void ceph_set_page_fscache(struct page *page)
-{
-	folio_start_private_2(page_folio(page)); /* [DEPRECATED] */
-}
-
-static void ceph_fscache_write_terminated(void *priv, ssize_t error, bool was_async)
-{
-	struct inode *inode = priv;
-
-	if (IS_ERR_VALUE(error) && error != -ENOBUFS)
-		ceph_fscache_invalidate(inode, false);
-}
-
-static void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct fscache_cookie *cookie = ceph_fscache_cookie(ci);
-
-	fscache_write_to_cache(cookie, inode->i_mapping, off, len, i_size_read(inode),
-			       ceph_fscache_write_terminated, inode, true, caching);
-}
-#else
-static inline void ceph_set_page_fscache(struct page *page)
-{
-}
-
-static inline void ceph_fscache_write_to_cache(struct inode *inode, u64 off, u64 len, bool caching)
-{
-}
-#endif /* CONFIG_CEPH_FSCACHE */
-
-struct ceph_writeback_ctl
-{
-	loff_t i_size;
-	u64 truncate_size;
-	u32 truncate_seq;
-	bool size_stable;
-
-	bool head_snapc;
-	struct ceph_snap_context *snapc;
-	struct ceph_snap_context *last_snapc;
-
-	bool done;
-	bool should_loop;
-	bool range_whole;
-	pgoff_t start_index;
-	pgoff_t index;
-	pgoff_t end;
-	xa_mark_t tag;
-
-	pgoff_t strip_unit_end;
-	unsigned int wsize;
-	unsigned int nr_folios;
-	unsigned int max_pages;
-	unsigned int locked_pages;
-
-	int op_idx;
-	int num_ops;
-	u64 offset;
-	u64 len;
-
-	struct folio_batch fbatch;
-	unsigned int processed_in_fbatch;
-
-	bool from_pool;
-	struct page **pages;
-	struct page **data_pages;
-};
-
-/*
- * Get ref for the oldest snapc for an inode with dirty data... that is, the
- * only snap context we are allowed to write back.
- */
-static struct ceph_snap_context *
-get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
-		   struct ceph_snap_context *page_snapc)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-	struct ceph_snap_context *snapc = NULL;
-	struct ceph_cap_snap *capsnap = NULL;
-
-	spin_lock(&ci->i_ceph_lock);
-	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
-		doutc(cl, " capsnap %p snapc %p has %d dirty pages\n",
-		      capsnap, capsnap->context, capsnap->dirty_pages);
-		if (!capsnap->dirty_pages)
-			continue;
-
-		/* get i_size, truncate_{seq,size} for page_snapc? */
-		if (snapc && capsnap->context != page_snapc)
-			continue;
-
-		if (ctl) {
-			if (capsnap->writing) {
-				ctl->i_size = i_size_read(inode);
-				ctl->size_stable = false;
-			} else {
-				ctl->i_size = capsnap->size;
-				ctl->size_stable = true;
-			}
-			ctl->truncate_size = capsnap->truncate_size;
-			ctl->truncate_seq = capsnap->truncate_seq;
-			ctl->head_snapc = false;
-		}
-
-		if (snapc)
-			break;
-
-		snapc = ceph_get_snap_context(capsnap->context);
-		if (!page_snapc ||
-		    page_snapc == snapc ||
-		    page_snapc->seq > snapc->seq)
-			break;
-	}
-	if (!snapc && ci->i_wrbuffer_ref_head) {
-		snapc = ceph_get_snap_context(ci->i_head_snapc);
-		doutc(cl, " head snapc %p has %d dirty pages\n", snapc,
-		      ci->i_wrbuffer_ref_head);
-		if (ctl) {
-			ctl->i_size = i_size_read(inode);
-			ctl->truncate_size = ci->i_truncate_size;
-			ctl->truncate_seq = ci->i_truncate_seq;
-			ctl->size_stable = false;
-			ctl->head_snapc = true;
-		}
-	}
-	spin_unlock(&ci->i_ceph_lock);
-	return snapc;
-}
-
-static u64 get_writepages_data_length(struct inode *inode,
-				      struct page *page, u64 start)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_snap_context *snapc;
-	struct ceph_cap_snap *capsnap = NULL;
-	u64 end = i_size_read(inode);
-	u64 ret;
-
-	snapc = page_snap_context(ceph_fscrypt_pagecache_page(page));
-	if (snapc != ci->i_head_snapc) {
-		bool found = false;
-		spin_lock(&ci->i_ceph_lock);
-		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
-			if (capsnap->context == snapc) {
-				if (!capsnap->writing)
-					end = capsnap->size;
-				found = true;
-				break;
-			}
-		}
-		spin_unlock(&ci->i_ceph_lock);
-		WARN_ON(!found);
-	}
-	if (end > ceph_fscrypt_page_offset(page) + thp_size(page))
-		end = ceph_fscrypt_page_offset(page) + thp_size(page);
-	ret = end > start ? end - start : 0;
-	if (ret && fscrypt_is_bounce_page(page))
-		ret = round_up(ret, CEPH_FSCRYPT_BLOCK_SIZE);
-	return ret;
-}
-
-/*
- * Write a folio, but leave it locked.
- *
- * If we get a write error, mark the mapping for error, but still adjust the
- * dirty page accounting (i.e., folio is no longer dirty).
- */
-static int write_folio_nounlock(struct folio *folio,
-		struct writeback_control *wbc)
-{
-	struct page *page = &folio->page;
-	struct inode *inode = folio->mapping->host;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_snap_context *snapc, *oldest;
-	loff_t page_off = folio_pos(folio);
-	int err;
-	loff_t len = folio_size(folio);
-	loff_t wlen;
-	struct ceph_writeback_ctl ceph_wbc;
-	struct ceph_osd_client *osdc = &fsc->client->osdc;
-	struct ceph_osd_request *req;
-	bool caching = ceph_is_cache_enabled(inode);
-	struct page *bounce_page = NULL;
-
-	doutc(cl, "%llx.%llx folio %p idx %lu\n", ceph_vinop(inode), folio,
-	      folio->index);
-
-	if (ceph_inode_is_shutdown(inode))
-		return -EIO;
-
-	/* verify this is a writeable snap context */
-	snapc = page_snap_context(&folio->page);
-	if (!snapc) {
-		doutc(cl, "%llx.%llx folio %p not dirty?\n", ceph_vinop(inode),
-		      folio);
-		return 0;
-	}
-	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
-	if (snapc->seq > oldest->seq) {
-		doutc(cl, "%llx.%llx folio %p snapc %p not writeable - noop\n",
-		      ceph_vinop(inode), folio, snapc);
-		/* we should only noop if called by kswapd */
-		WARN_ON(!(current->flags & PF_MEMALLOC));
-		ceph_put_snap_context(oldest);
-		folio_redirty_for_writepage(wbc, folio);
-		return 0;
-	}
-	ceph_put_snap_context(oldest);
-
-	/* is this a partial page at end of file? */
-	if (page_off >= ceph_wbc.i_size) {
-		doutc(cl, "%llx.%llx folio at %lu beyond eof %llu\n",
-		      ceph_vinop(inode), folio->index, ceph_wbc.i_size);
-		folio_invalidate(folio, 0, folio_size(folio));
-		return 0;
-	}
-
-	if (ceph_wbc.i_size < page_off + len)
-		len = ceph_wbc.i_size - page_off;
-
-	wlen = IS_ENCRYPTED(inode) ? round_up(len, CEPH_FSCRYPT_BLOCK_SIZE) : len;
-	doutc(cl, "%llx.%llx folio %p index %lu on %llu~%llu snapc %p seq %lld\n",
-	      ceph_vinop(inode), folio, folio->index, page_off, wlen, snapc,
-	      snapc->seq);
-
-	if (atomic_long_inc_return(&fsc->writeback_count) >
-	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
-		fsc->write_congested = true;
-
-	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
-				    page_off, &wlen, 0, 1, CEPH_OSD_OP_WRITE,
-				    CEPH_OSD_FLAG_WRITE, snapc,
-				    ceph_wbc.truncate_seq,
-				    ceph_wbc.truncate_size, true);
-	if (IS_ERR(req)) {
-		folio_redirty_for_writepage(wbc, folio);
-		return PTR_ERR(req);
-	}
-
-	if (wlen < len)
-		len = wlen;
-
-	folio_start_writeback(folio);
-	if (caching)
-		ceph_set_page_fscache(&folio->page);
-	ceph_fscache_write_to_cache(inode, page_off, len, caching);
-
-	if (IS_ENCRYPTED(inode)) {
-		bounce_page = fscrypt_encrypt_pagecache_blocks(&folio->page,
-						    CEPH_FSCRYPT_BLOCK_SIZE, 0,
-						    GFP_NOFS);
-		if (IS_ERR(bounce_page)) {
-			folio_redirty_for_writepage(wbc, folio);
-			folio_end_writeback(folio);
-			ceph_osdc_put_request(req);
-			return PTR_ERR(bounce_page);
-		}
-	}
-
-	/* it may be a short write due to an object boundary */
-	WARN_ON_ONCE(len > folio_size(folio));
-	osd_req_op_extent_osd_data_pages(req, 0,
-			bounce_page ? &bounce_page : &page, wlen, 0,
-			false, false);
-	doutc(cl, "%llx.%llx %llu~%llu (%llu bytes, %sencrypted)\n",
-	      ceph_vinop(inode), page_off, len, wlen,
-	      IS_ENCRYPTED(inode) ? "" : "not ");
-
-	req->r_mtime = inode_get_mtime(inode);
-	ceph_osdc_start_request(osdc, req);
-	err = ceph_osdc_wait_request(osdc, req);
-
-	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-				  req->r_end_latency, len, err);
-	fscrypt_free_bounce_page(bounce_page);
-	ceph_osdc_put_request(req);
-	if (err == 0)
-		err = len;
-
-	if (err < 0) {
-		struct writeback_control tmp_wbc;
-		if (!wbc)
-			wbc = &tmp_wbc;
-		if (err == -ERESTARTSYS) {
-			/* killed by SIGKILL */
-			doutc(cl, "%llx.%llx interrupted page %p\n",
-			      ceph_vinop(inode), folio);
-			folio_redirty_for_writepage(wbc, folio);
-			folio_end_writeback(folio);
-			return err;
-		}
-		if (err == -EBLOCKLISTED)
-			fsc->blocklisted = true;
-		doutc(cl, "%llx.%llx setting mapping error %d %p\n",
-		      ceph_vinop(inode), err, folio);
-		mapping_set_error(&inode->i_data, err);
-		wbc->pages_skipped++;
-	} else {
-		doutc(cl, "%llx.%llx cleaned page %p\n",
-		      ceph_vinop(inode), folio);
-		err = 0;  /* vfs expects us to return 0 */
-	}
-	oldest = folio_detach_private(folio);
-	WARN_ON_ONCE(oldest != snapc);
-	folio_end_writeback(folio);
-	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
-	ceph_put_snap_context(snapc);  /* page's reference */
-
-	if (atomic_long_dec_return(&fsc->writeback_count) <
-	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
-		fsc->write_congested = false;
-
-	return err;
-}
-
-/*
- * async writeback completion handler.
- *
- * If we get an error, set the mapping error bit, but not the individual
- * page error bits.
- */
-static void writepages_finish(struct ceph_osd_request *req)
-{
-	struct inode *inode = req->r_inode;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-	struct ceph_osd_data *osd_data;
-	struct page *page;
-	int num_pages, total_pages = 0;
-	int i, j;
-	int rc = req->r_result;
-	struct ceph_snap_context *snapc = req->r_snapc;
-	struct address_space *mapping = inode->i_mapping;
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
-	unsigned int len = 0;
-	bool remove_page;
-
-	doutc(cl, "%llx.%llx rc %d\n", ceph_vinop(inode), rc);
-	if (rc < 0) {
-		mapping_set_error(mapping, rc);
-		ceph_set_error_write(ci);
-		if (rc == -EBLOCKLISTED)
-			fsc->blocklisted = true;
-	} else {
-		ceph_clear_error_write(ci);
-	}
-
-	/*
-	 * We lost the cache cap, need to truncate the page before
-	 * it is unlocked, otherwise we'd truncate it later in the
-	 * page truncation thread, possibly losing some data that
-	 * raced its way in
-	 */
-	remove_page = !(ceph_caps_issued(ci) &
-			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
-
-	/* clean all pages */
-	for (i = 0; i < req->r_num_ops; i++) {
-		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE) {
-			pr_warn_client(cl,
-				"%llx.%llx incorrect op %d req %p index %d tid %llu\n",
-				ceph_vinop(inode), req->r_ops[i].op, req, i,
-				req->r_tid);
-			break;
-		}
-
-		osd_data = osd_req_op_extent_osd_data(req, i);
-		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
-		len += osd_data->length;
-		num_pages = calc_pages_for((u64)osd_data->offset,
-					   (u64)osd_data->length);
-		total_pages += num_pages;
-		for (j = 0; j < num_pages; j++) {
-			page = osd_data->pages[j];
-			if (fscrypt_is_bounce_page(page)) {
-				page = fscrypt_pagecache_page(page);
-				fscrypt_free_bounce_page(osd_data->pages[j]);
-				osd_data->pages[j] = page;
-			}
-			BUG_ON(!page);
-			WARN_ON(!PageUptodate(page));
-
-			if (atomic_long_dec_return(&fsc->writeback_count) <
-			     CONGESTION_OFF_THRESH(
-					fsc->mount_options->congestion_kb))
-				fsc->write_congested = false;
-
-			ceph_put_snap_context(detach_page_private(page));
-			end_page_writeback(page);
-
-			if (atomic64_dec_return(&mdsc->dirty_folios) <= 0) {
-				wake_up_all(&mdsc->flush_end_wq);
-				WARN_ON(atomic64_read(&mdsc->dirty_folios) < 0);
-			}
-
-			doutc(cl, "unlocking %p\n", page);
-
-			if (remove_page)
-				generic_error_remove_folio(inode->i_mapping,
-							  page_folio(page));
-
-			unlock_page(page);
-		}
-		doutc(cl, "%llx.%llx wrote %llu bytes cleaned %d pages\n",
-		      ceph_vinop(inode), osd_data->length,
-		      rc >= 0 ? num_pages : 0);
-
-		release_pages(osd_data->pages, num_pages);
-	}
-
-	ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-				  req->r_end_latency, len, rc);
-
-	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);
-
-	osd_data = osd_req_op_extent_osd_data(req, 0);
-	if (osd_data->pages_from_pool)
-		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
-	else
-		kfree(osd_data->pages);
-	ceph_osdc_put_request(req);
-	ceph_dec_osd_stopping_blocker(fsc->mdsc);
-}
-
-static inline
-bool is_forced_umount(struct address_space *mapping)
-{
-	struct inode *inode = mapping->host;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-
-	if (ceph_inode_is_shutdown(inode)) {
-		if (ci->i_wrbuffer_ref > 0) {
-			pr_warn_ratelimited_client(cl,
-				"%llx.%llx %lld forced umount\n",
-				ceph_vinop(inode), ceph_ino(inode));
-		}
-		mapping_set_error(mapping, -EIO);
-		return true;
-	}
-
-	return false;
-}
-
-static inline
-unsigned int ceph_define_write_size(struct address_space *mapping)
-{
-	struct inode *inode = mapping->host;
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	unsigned int wsize = i_blocksize(inode);
-
-	if (fsc->mount_options->wsize < wsize)
-		wsize = fsc->mount_options->wsize;
-
-	return wsize;
-}
-
-static inline
-void ceph_folio_batch_init(struct ceph_writeback_ctl *ceph_wbc)
-{
-	folio_batch_init(&ceph_wbc->fbatch);
-	ceph_wbc->processed_in_fbatch = 0;
-}
-
-static inline
-void ceph_folio_batch_reinit(struct ceph_writeback_ctl *ceph_wbc)
-{
-	folio_batch_release(&ceph_wbc->fbatch);
-	ceph_folio_batch_init(ceph_wbc);
-}
-
-static inline
-void ceph_init_writeback_ctl(struct address_space *mapping,
-			     struct writeback_control *wbc,
-			     struct ceph_writeback_ctl *ceph_wbc)
-{
-	ceph_wbc->snapc = NULL;
-	ceph_wbc->last_snapc = NULL;
-
-	ceph_wbc->strip_unit_end = 0;
-	ceph_wbc->wsize = ceph_define_write_size(mapping);
-
-	ceph_wbc->nr_folios = 0;
-	ceph_wbc->max_pages = 0;
-	ceph_wbc->locked_pages = 0;
-
-	ceph_wbc->done = false;
-	ceph_wbc->should_loop = false;
-	ceph_wbc->range_whole = false;
-
-	ceph_wbc->start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
-	ceph_wbc->index = ceph_wbc->start_index;
-	ceph_wbc->end = -1;
-
-	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) {
-		ceph_wbc->tag = PAGECACHE_TAG_TOWRITE;
-	} else {
-		ceph_wbc->tag = PAGECACHE_TAG_DIRTY;
-	}
-
-	ceph_wbc->op_idx = -1;
-	ceph_wbc->num_ops = 0;
-	ceph_wbc->offset = 0;
-	ceph_wbc->len = 0;
-	ceph_wbc->from_pool = false;
-
-	ceph_folio_batch_init(ceph_wbc);
-
-	ceph_wbc->pages = NULL;
-	ceph_wbc->data_pages = NULL;
-}
-
-static inline
-int ceph_define_writeback_range(struct address_space *mapping,
-				struct writeback_control *wbc,
-				struct ceph_writeback_ctl *ceph_wbc)
-{
-	struct inode *inode = mapping->host;
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-
-	/* find oldest snap context with dirty data */
-	ceph_wbc->snapc = get_oldest_context(inode, ceph_wbc, NULL);
-	if (!ceph_wbc->snapc) {
-		/* hmm, why does writepages get called when there
-		   is no dirty data? */
-		doutc(cl, " no snap context with dirty data?\n");
-		return -ENODATA;
-	}
-
-	doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n",
-	      ceph_wbc->snapc, ceph_wbc->snapc->seq,
-	      ceph_wbc->snapc->num_snaps);
-
-	ceph_wbc->should_loop = false;
-
-	if (ceph_wbc->head_snapc && ceph_wbc->snapc != ceph_wbc->last_snapc) {
-		/* where to start/end? */
-		if (wbc->range_cyclic) {
-			ceph_wbc->index = ceph_wbc->start_index;
-			ceph_wbc->end = -1;
-			if (ceph_wbc->index > 0)
-				ceph_wbc->should_loop = true;
-			doutc(cl, " cyclic, start at %lu\n", ceph_wbc->index);
-		} else {
-			ceph_wbc->index = wbc->range_start >> PAGE_SHIFT;
-			ceph_wbc->end = wbc->range_end >> PAGE_SHIFT;
-			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
-				ceph_wbc->range_whole = true;
-			doutc(cl, " not cyclic, %lu to %lu\n",
-				ceph_wbc->index, ceph_wbc->end);
-		}
-	} else if (!ceph_wbc->head_snapc) {
-		/* Do not respect wbc->range_{start,end}. Dirty pages
-		 * in that range can be associated with newer snapc.
-		 * They are not writeable until we write all dirty pages
-		 * associated with 'snapc' get written */
-		if (ceph_wbc->index > 0)
-			ceph_wbc->should_loop = true;
-		doutc(cl, " non-head snapc, range whole\n");
-	}
-
-	ceph_put_snap_context(ceph_wbc->last_snapc);
-	ceph_wbc->last_snapc = ceph_wbc->snapc;
-
-	return 0;
-}
-
-static inline
-bool has_writeback_done(struct ceph_writeback_ctl *ceph_wbc)
-{
-	return ceph_wbc->done && ceph_wbc->index > ceph_wbc->end;
-}
-
-static inline
-bool can_next_page_be_processed(struct ceph_writeback_ctl *ceph_wbc,
-				unsigned index)
-{
-	return index < ceph_wbc->nr_folios &&
-		ceph_wbc->locked_pages < ceph_wbc->max_pages;
-}
-
-static
-int ceph_check_page_before_write(struct address_space *mapping,
-				 struct writeback_control *wbc,
-				 struct ceph_writeback_ctl *ceph_wbc,
-				 struct folio *folio)
-{
-	struct inode *inode = mapping->host;
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_snap_context *pgsnapc;
-
-	/* only dirty folios, or our accounting breaks */
-	if (unlikely(!folio_test_dirty(folio) || folio->mapping != mapping)) {
-		doutc(cl, "!dirty or !mapping %p\n", folio);
-		return -ENODATA;
-	}
-
-	/* only if matching snap context */
-	pgsnapc = page_snap_context(&folio->page);
-	if (pgsnapc != ceph_wbc->snapc) {
-		doutc(cl, "folio snapc %p %lld != oldest %p %lld\n",
-		      pgsnapc, pgsnapc->seq,
-		      ceph_wbc->snapc, ceph_wbc->snapc->seq);
-
-		if (!ceph_wbc->should_loop && !ceph_wbc->head_snapc &&
-		    wbc->sync_mode != WB_SYNC_NONE)
-			ceph_wbc->should_loop = true;
-
-		return -ENODATA;
-	}
-
-	if (folio_pos(folio) >= ceph_wbc->i_size) {
-		doutc(cl, "folio at %lu beyond eof %llu\n",
-		      folio->index, ceph_wbc->i_size);
-
-		if ((ceph_wbc->size_stable ||
-		    folio_pos(folio) >= i_size_read(inode)) &&
-		    folio_clear_dirty_for_io(folio))
-			folio_invalidate(folio, 0, folio_size(folio));
-
-		return -ENODATA;
-	}
-
-	if (ceph_wbc->strip_unit_end &&
-	    (folio->index > ceph_wbc->strip_unit_end)) {
-		doutc(cl, "end of strip unit %p\n", folio);
-		return -E2BIG;
-	}
-
-	return 0;
-}
-
-static inline
-void __ceph_allocate_page_array(struct ceph_writeback_ctl *ceph_wbc,
-				unsigned int max_pages)
-{
-	ceph_wbc->pages = kmalloc_array(max_pages,
-					sizeof(*ceph_wbc->pages),
-					GFP_NOFS);
-	if (!ceph_wbc->pages) {
-		ceph_wbc->from_pool = true;
-		ceph_wbc->pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
-		BUG_ON(!ceph_wbc->pages);
-	}
-}
-
-static inline
-void ceph_allocate_page_array(struct address_space *mapping,
-			      struct ceph_writeback_ctl *ceph_wbc,
-			      struct folio *folio)
-{
-	struct inode *inode = mapping->host;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	size_t xlen;
-	u64 objnum;
-	u64 objoff;
-
-	/* prepare async write request */
-	ceph_wbc->offset = (u64)folio_pos(folio);
-	ceph_calc_file_object_mapping(&ci->i_layout,
-					ceph_wbc->offset, ceph_wbc->wsize,
-					&objnum, &objoff, &xlen);
-
-	ceph_wbc->num_ops = 1;
-	ceph_wbc->strip_unit_end = folio->index + ((xlen - 1) >> PAGE_SHIFT);
-
-	BUG_ON(ceph_wbc->pages);
-	ceph_wbc->max_pages = calc_pages_for(0, (u64)xlen);
-	__ceph_allocate_page_array(ceph_wbc, ceph_wbc->max_pages);
-
-	ceph_wbc->len = 0;
-}
-
-static inline
-bool is_folio_index_contiguous(const struct ceph_writeback_ctl *ceph_wbc,
-			      const struct folio *folio)
-{
-	return folio->index == (ceph_wbc->offset + ceph_wbc->len) >> PAGE_SHIFT;
-}
-
-static inline
-bool is_num_ops_too_big(struct ceph_writeback_ctl *ceph_wbc)
-{
-	return ceph_wbc->num_ops >=
-		(ceph_wbc->from_pool ?  CEPH_OSD_SLAB_OPS : CEPH_OSD_MAX_OPS);
-}
-#endif // TODO: Remove after netfs conversion
-
-static inline
-bool is_write_congestion_happened(struct ceph_fs_client *fsc)
-{
-	return atomic_long_inc_return(&fsc->writeback_count) >
-		CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb);
-}
-
-#if 0 // TODO: Remove after netfs conversion
-static inline int move_dirty_folio_in_page_array(struct address_space *mapping,
-		struct writeback_control *wbc,
-		struct ceph_writeback_ctl *ceph_wbc, struct folio *folio)
-{
-	struct inode *inode = mapping->host;
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct page **pages = ceph_wbc->pages;
-	unsigned int index = ceph_wbc->locked_pages;
-	gfp_t gfp_flags = ceph_wbc->locked_pages ? GFP_NOWAIT : GFP_NOFS;
-
-	if (IS_ENCRYPTED(inode)) {
-		pages[index] = fscrypt_encrypt_pagecache_blocks(&folio->page,
-								PAGE_SIZE,
-								0,
-								gfp_flags);
-		if (IS_ERR(pages[index])) {
-			if (PTR_ERR(pages[index]) == -EINVAL) {
-				pr_err_client(cl, "inode->i_blkbits=%hhu\n",
-						inode->i_blkbits);
-			}
-
-			/* better not fail on first page! */
-			BUG_ON(ceph_wbc->locked_pages == 0);
-
-			pages[index] = NULL;
-			return PTR_ERR(pages[index]);
-		}
-	} else {
-		pages[index] = &folio->page;
-	}
-
-	ceph_wbc->locked_pages++;
-
-	return 0;
-}
-
-static
-int ceph_process_folio_batch(struct address_space *mapping,
-			     struct writeback_control *wbc,
-			     struct ceph_writeback_ctl *ceph_wbc)
-{
-	struct inode *inode = mapping->host;
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct folio *folio = NULL;
-	unsigned i;
-	int rc = 0;
-
-	for (i = 0; can_next_page_be_processed(ceph_wbc, i); i++) {
-		folio = ceph_wbc->fbatch.folios[i];
-
-		if (!folio)
-			continue;
-
-		doutc(cl, "? %p idx %lu, folio_test_writeback %#x, "
-			"folio_test_dirty %#x, folio_test_locked %#x\n",
-			folio, folio->index, folio_test_writeback(folio),
-			folio_test_dirty(folio),
-			folio_test_locked(folio));
-
-		if (folio_test_writeback(folio) ||
-		    folio_test_private_2(folio) /* [DEPRECATED] */) {
-			doutc(cl, "waiting on writeback %p\n", folio);
-			folio_wait_writeback(folio);
-			folio_wait_private_2(folio); /* [DEPRECATED] */
-			continue;
-		}
-
-		if (ceph_wbc->locked_pages == 0)
-			folio_lock(folio);
-		else if (!folio_trylock(folio))
-			break;
-
-		rc = ceph_check_page_before_write(mapping, wbc,
-						  ceph_wbc, folio);
-		if (rc == -ENODATA) {
-			rc = 0;
-			folio_unlock(folio);
-			ceph_wbc->fbatch.folios[i] = NULL;
-			continue;
-		} else if (rc == -E2BIG) {
-			rc = 0;
-			folio_unlock(folio);
-			ceph_wbc->fbatch.folios[i] = NULL;
-			break;
-		}
-
-		if (!folio_clear_dirty_for_io(folio)) {
-			doutc(cl, "%p !folio_clear_dirty_for_io\n", folio);
-			folio_unlock(folio);
-			ceph_wbc->fbatch.folios[i] = NULL;
-			continue;
-		}
-
-		/*
-		 * We have something to write.  If this is
-		 * the first locked page this time through,
-		 * calculate max possible write size and
-		 * allocate a page array
-		 */
-		if (ceph_wbc->locked_pages == 0) {
-			ceph_allocate_page_array(mapping, ceph_wbc, folio);
-		} else if (!is_folio_index_contiguous(ceph_wbc, folio)) {
-			if (is_num_ops_too_big(ceph_wbc)) {
-				folio_redirty_for_writepage(wbc, folio);
-				folio_unlock(folio);
-				break;
-			}
-
-			ceph_wbc->num_ops++;
-			ceph_wbc->offset = (u64)folio_pos(folio);
-			ceph_wbc->len = 0;
-		}
-
-		/* note position of first page in fbatch */
-		doutc(cl, "%llx.%llx will write folio %p idx %lu\n",
-		      ceph_vinop(inode), folio, folio->index);
-
-		fsc->write_congested = is_write_congestion_happened(fsc);
-
-		rc = move_dirty_folio_in_page_array(mapping, wbc, ceph_wbc,
-				folio);
-		if (rc) {
-			folio_redirty_for_writepage(wbc, folio);
-			folio_unlock(folio);
-			break;
-		}
-
-		ceph_wbc->fbatch.folios[i] = NULL;
-		ceph_wbc->len += folio_size(folio);
-	}
-
-	ceph_wbc->processed_in_fbatch = i;
-
-	return rc;
-}
-
-static inline
-void ceph_shift_unused_folios_left(struct folio_batch *fbatch)
-{
-	unsigned j, n = 0;
-
-	/* shift unused page to beginning of fbatch */
-	for (j = 0; j < folio_batch_count(fbatch); j++) {
-		if (!fbatch->folios[j])
-			continue;
-
-		if (n < j) {
-			fbatch->folios[n] = fbatch->folios[j];
-		}
-
-		n++;
-	}
-
-	fbatch->nr = n;
-}
-
-static
-int ceph_submit_write(struct address_space *mapping,
-			struct writeback_control *wbc,
-			struct ceph_writeback_ctl *ceph_wbc)
-{
-	struct inode *inode = mapping->host;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_vino vino = ceph_vino(inode);
-	struct ceph_osd_request *req = NULL;
-	struct page *page = NULL;
-	bool caching = ceph_is_cache_enabled(inode);
-	u64 offset;
-	u64 len;
-	unsigned i;
-
-new_request:
-	offset = ceph_fscrypt_page_offset(ceph_wbc->pages[0]);
-	len = ceph_wbc->wsize;
-
-	req = ceph_osdc_new_request(&fsc->client->osdc,
-				    &ci->i_layout, vino,
-				    offset, &len, 0, ceph_wbc->num_ops,
-				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
-				    ceph_wbc->snapc, ceph_wbc->truncate_seq,
-				    ceph_wbc->truncate_size, false);
-	if (IS_ERR(req)) {
-		req = ceph_osdc_new_request(&fsc->client->osdc,
-					    &ci->i_layout, vino,
-					    offset, &len, 0,
-					    min(ceph_wbc->num_ops,
-						CEPH_OSD_SLAB_OPS),
-					    CEPH_OSD_OP_WRITE,
-					    CEPH_OSD_FLAG_WRITE,
-					    ceph_wbc->snapc,
-					    ceph_wbc->truncate_seq,
-					    ceph_wbc->truncate_size,
-					    true);
-		BUG_ON(IS_ERR(req));
-	}
-
-	page = ceph_wbc->pages[ceph_wbc->locked_pages - 1];
-	BUG_ON(len < ceph_fscrypt_page_offset(page) + thp_size(page) - offset);
-
-	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
-		for (i = 0; i < folio_batch_count(&ceph_wbc->fbatch); i++) {
-			struct folio *folio = ceph_wbc->fbatch.folios[i];
-
-			if (!folio)
-				continue;
-
-			page = &folio->page;
-			redirty_page_for_writepage(wbc, page);
-			unlock_page(page);
-		}
-
-		for (i = 0; i < ceph_wbc->locked_pages; i++) {
-			page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);
-
-			if (!page)
-				continue;
-
-			redirty_page_for_writepage(wbc, page);
-			unlock_page(page);
-		}
-
-		ceph_osdc_put_request(req);
-		return -EIO;
-	}
-
-	req->r_callback = writepages_finish;
-	req->r_inode = inode;
-
-	/* Format the osd request message and submit the write */
-	len = 0;
-	ceph_wbc->data_pages = ceph_wbc->pages;
-	ceph_wbc->op_idx = 0;
-	for (i = 0; i < ceph_wbc->locked_pages; i++) {
-		u64 cur_offset;
-
-		page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);
-		cur_offset = page_offset(page);
-
-		/*
-		 * Discontinuity in page range? Ceph can handle that by just passing
-		 * multiple extents in the write op.
-		 */
-		if (offset + len != cur_offset) {
-			/* If it's full, stop here */
-			if (ceph_wbc->op_idx + 1 == req->r_num_ops)
-				break;
-
-			/* Kick off an fscache write with what we have so far. */
-			ceph_fscache_write_to_cache(inode, offset, len, caching);
-
-			/* Start a new extent */
-			osd_req_op_extent_dup_last(req, ceph_wbc->op_idx,
-						   cur_offset - offset);
-
-			doutc(cl, "got pages at %llu~%llu\n", offset, len);
-
-			osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
-							 ceph_wbc->data_pages,
-							 len, 0,
-							 ceph_wbc->from_pool,
-							 false);
-			osd_req_op_extent_update(req, ceph_wbc->op_idx, len);
-
-			len = 0;
-			offset = cur_offset;
-			ceph_wbc->data_pages = ceph_wbc->pages + i;
-			ceph_wbc->op_idx++;
-		}
-
-		set_page_writeback(page);
-
-		if (caching)
-			ceph_set_page_fscache(page);
-
-		len += thp_size(page);
-	}
-
-	ceph_fscache_write_to_cache(inode, offset, len, caching);
-
-	if (ceph_wbc->size_stable) {
-		len = min(len, ceph_wbc->i_size - offset);
-	} else if (i == ceph_wbc->locked_pages) {
-		/* writepages_finish() clears writeback pages
-		 * according to the data length, so make sure
-		 * data length covers all locked pages */
-		u64 min_len = len + 1 - thp_size(page);
-		len = get_writepages_data_length(inode,
-						 ceph_wbc->pages[i - 1],
-						 offset);
-		len = max(len, min_len);
-	}
-
-	if (IS_ENCRYPTED(inode))
-		len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE);
-
-	doutc(cl, "got pages at %llu~%llu\n", offset, len);
-
-	if (IS_ENCRYPTED(inode) &&
-	    ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) {
-		pr_warn_client(cl,
-			"bad encrypted write offset=%lld len=%llu\n",
-			offset, len);
-	}
-
-	osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
-					 ceph_wbc->data_pages, len,
-					 0, ceph_wbc->from_pool, false);
-	osd_req_op_extent_update(req, ceph_wbc->op_idx, len);
-
-	BUG_ON(ceph_wbc->op_idx + 1 != req->r_num_ops);
-
-	ceph_wbc->from_pool = false;
-	if (i < ceph_wbc->locked_pages) {
-		BUG_ON(ceph_wbc->num_ops <= req->r_num_ops);
-		ceph_wbc->num_ops -= req->r_num_ops;
-		ceph_wbc->locked_pages -= i;
-
-		/* allocate new pages array for next request */
-		ceph_wbc->data_pages = ceph_wbc->pages;
-		__ceph_allocate_page_array(ceph_wbc, ceph_wbc->locked_pages);
-		memcpy(ceph_wbc->pages, ceph_wbc->data_pages + i,
-			ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
-		memset(ceph_wbc->data_pages + i, 0,
-			ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
-	} else {
-		BUG_ON(ceph_wbc->num_ops != req->r_num_ops);
-		/* request message now owns the pages array */
-		ceph_wbc->pages = NULL;
-	}
-
-	req->r_mtime = inode_get_mtime(inode);
-	ceph_osdc_start_request(&fsc->client->osdc, req);
-	req = NULL;
-
-	wbc->nr_to_write -= i;
-	if (ceph_wbc->pages)
-		goto new_request;
-
-	return 0;
-}
-
-static
-void ceph_wait_until_current_writes_complete(struct address_space *mapping,
-					     struct writeback_control *wbc,
-					     struct ceph_writeback_ctl *ceph_wbc)
-{
-	struct page *page;
-	unsigned i, nr;
-
-	if (wbc->sync_mode != WB_SYNC_NONE &&
-	    ceph_wbc->start_index == 0 && /* all dirty pages were checked */
-	    !ceph_wbc->head_snapc) {
-		ceph_wbc->index = 0;
-
-		while ((ceph_wbc->index <= ceph_wbc->end) &&
-			(nr = filemap_get_folios_tag(mapping,
-						     &ceph_wbc->index,
-						     (pgoff_t)-1,
-						     PAGECACHE_TAG_WRITEBACK,
-						     &ceph_wbc->fbatch))) {
-			for (i = 0; i < nr; i++) {
-				page = &ceph_wbc->fbatch.folios[i]->page;
-				if (page_snap_context(page) != ceph_wbc->snapc)
-					continue;
-				wait_on_page_writeback(page);
-			}
-
-			folio_batch_release(&ceph_wbc->fbatch);
-			cond_resched();
-		}
-	}
-}
-
 /*
- * initiate async writeback
+ * Dirty a page.  Optimistically adjust accounting, on the assumption
+ * that we won't race with invalidate.  If we do, readjust.
  */
-static int ceph_writepages_start(struct address_space *mapping,
-				 struct writeback_control *wbc)
+bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
 {
 	struct inode *inode = mapping->host;
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_writeback_ctl ceph_wbc;
-	int rc = 0;
-
-	if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested)
-		return 0;
-
-	doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode),
-	      wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
-	      (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
-
-	if (is_forced_umount(mapping)) {
-		/* we're in a forced umount, don't write! */
-		return -EIO;
-	}
-
-	ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc);
-
-	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
-		rc = -EIO;
-		goto out;
-	}
-
-retry:
-	rc = ceph_define_writeback_range(mapping, wbc, &ceph_wbc);
-	if (rc == -ENODATA) {
-		/* hmm, why does writepages get called when there
-		   is no dirty data? */
-		rc = 0;
-		goto dec_osd_stopping_blocker;
-	}
-
-	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
-		tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end);
-
-	while (!has_writeback_done(&ceph_wbc)) {
-		ceph_wbc.locked_pages = 0;
-		ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT;
-
-get_more_pages:
-		ceph_folio_batch_reinit(&ceph_wbc);
-
-		ceph_wbc.nr_folios = filemap_get_folios_tag(mapping,
-							    &ceph_wbc.index,
-							    ceph_wbc.end,
-							    ceph_wbc.tag,
-							    &ceph_wbc.fbatch);
-		doutc(cl, "pagevec_lookup_range_tag for tag %#x got %d\n",
-			ceph_wbc.tag, ceph_wbc.nr_folios);
-
-		if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages)
-			break;
-
-process_folio_batch:
-		rc = ceph_process_folio_batch(mapping, wbc, &ceph_wbc);
-		if (rc)
-			goto release_folios;
-
-		/* did we get anything? */
-		if (!ceph_wbc.locked_pages)
-			goto release_folios;
-
-		if (ceph_wbc.processed_in_fbatch) {
-			ceph_shift_unused_folios_left(&ceph_wbc.fbatch);
-
-			if (folio_batch_count(&ceph_wbc.fbatch) == 0 &&
-			    ceph_wbc.locked_pages < ceph_wbc.max_pages) {
-				doutc(cl, "reached end fbatch, trying for more\n");
-				goto get_more_pages;
-			}
-		}
-
-		rc = ceph_submit_write(mapping, wbc, &ceph_wbc);
-		if (rc)
-			goto release_folios;
-
-		ceph_wbc.locked_pages = 0;
-		ceph_wbc.strip_unit_end = 0;
-
-		if (folio_batch_count(&ceph_wbc.fbatch) > 0) {
-			ceph_wbc.nr_folios =
-				folio_batch_count(&ceph_wbc.fbatch);
-			goto process_folio_batch;
-		}
-
-		/*
-		 * We stop writing back only if we are not doing
-		 * integrity sync. In case of integrity sync we have to
-		 * keep going until we have written all the pages
-		 * we tagged for writeback prior to entering this loop.
-		 */
-		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
-			ceph_wbc.done = true;
-
-release_folios:
-		doutc(cl, "folio_batch release on %d folios (%p)\n",
-		      (int)ceph_wbc.fbatch.nr,
-		      ceph_wbc.fbatch.nr ? ceph_wbc.fbatch.folios[0] : NULL);
-		folio_batch_release(&ceph_wbc.fbatch);
-	}
-
-	if (ceph_wbc.should_loop && !ceph_wbc.done) {
-		/* more to do; loop back to beginning of file */
-		doutc(cl, "looping back to beginning of file\n");
-		/* OK even when start_index == 0 */
-		ceph_wbc.end = ceph_wbc.start_index - 1;
-
-		/* to write dirty pages associated with next snapc,
-		 * we need to wait until current writes complete */
-		ceph_wait_until_current_writes_complete(mapping, wbc, &ceph_wbc);
-
-		ceph_wbc.start_index = 0;
-		ceph_wbc.index = 0;
-		goto retry;
-	}
-
-	if (wbc->range_cyclic || (ceph_wbc.range_whole && wbc->nr_to_write > 0))
-		mapping->writeback_index = ceph_wbc.index;
-
-dec_osd_stopping_blocker:
-	ceph_dec_osd_stopping_blocker(fsc->mdsc);
-
-out:
-	ceph_put_snap_context(ceph_wbc.last_snapc);
-	doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode),
-	      rc);
-
-	return rc;
-}
-
-/*
- * See if a given @snapc is either writeable, or already written.
- */
-static int context_is_writeable_or_written(struct inode *inode,
-					   struct ceph_snap_context *snapc)
-{
-	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
-	int ret = !oldest || snapc->seq <= oldest->seq;
-
-	ceph_put_snap_context(oldest);
-	return ret;
-}
-
-/**
- * ceph_find_incompatible - find an incompatible context and return it
- * @folio: folio being dirtied
- *
- * We are only allowed to write into/dirty a folio if the folio is
- * clean, or already dirty within the same snap context. Returns a
- * conflicting context if there is one, NULL if there isn't, or a
- * negative error code on other errors.
- *
- * Must be called with folio lock held.
- */
-static struct ceph_snap_context *
-ceph_find_incompatible(struct folio *folio)
-{
-	struct inode *inode = folio->mapping->host;
 	struct ceph_client *cl = ceph_inode_to_client(inode);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-
-	if (ceph_inode_is_shutdown(inode)) {
-		doutc(cl, " %llx.%llx folio %p is shutdown\n",
-		      ceph_vinop(inode), folio);
-		return ERR_PTR(-ESTALE);
-	}
-
-	for (;;) {
-		struct ceph_snap_context *snapc, *oldest;
-
-		folio_wait_writeback(folio);
-
-		snapc = page_snap_context(&folio->page);
-		if (!snapc || snapc == ci->i_head_snapc)
-			break;
-
-		/*
-		 * this folio is already dirty in another (older) snap
-		 * context!  is it writeable now?
-		 */
-		oldest = get_oldest_context(inode, NULL, NULL);
-		if (snapc->seq > oldest->seq) {
-			/* not writeable -- return it for the caller to deal with */
-			ceph_put_snap_context(oldest);
-			doutc(cl, " %llx.%llx folio %p snapc %p not current or oldest\n",
-			      ceph_vinop(inode), folio, snapc);
-			return ceph_get_snap_context(snapc);
-		}
-		ceph_put_snap_context(oldest);
-
-		/* yay, writeable, do it now (without dropping folio lock) */
-		doutc(cl, " %llx.%llx folio %p snapc %p not current, but oldest\n",
-		      ceph_vinop(inode), folio, snapc);
-		if (folio_clear_dirty_for_io(folio)) {
-			int r = write_folio_nounlock(folio, NULL);
-			if (r < 0)
-				return ERR_PTR(r);
-		}
-	}
-	return NULL;
-}
-
-static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
-					struct folio **foliop, void **_fsdata)
-{
-	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
+	struct ceph_inode_info *ci;
 	struct ceph_snap_context *snapc;
+	struct netfs_group *group;
 
-	snapc = ceph_find_incompatible(*foliop);
-	if (snapc) {
-		int r;
-
-		folio_unlock(*foliop);
-		folio_put(*foliop);
-		*foliop = NULL;
-		if (IS_ERR(snapc))
-			return PTR_ERR(snapc);
-
-		ceph_queue_writeback(inode);
-		r = wait_event_killable(ci->i_cap_wq,
-					context_is_writeable_or_written(inode, snapc));
-		ceph_put_snap_context(snapc);
-		return r == 0 ? -EAGAIN : r;
+	if (folio_test_dirty(folio)) {
+		doutc(cl, "%llx.%llx %p idx %lu -- already dirty\n",
+		      ceph_vinop(inode), folio, folio->index);
+		VM_BUG_ON_FOLIO(!folio_test_private(folio), folio);
+		return false;
 	}
-	return 0;
-}
-
-/*
- * We are only allowed to write into/dirty the page if the page is
- * clean, or already dirty within the same snap context.
- */
-static int ceph_write_begin(struct file *file, struct address_space *mapping,
-			    loff_t pos, unsigned len,
-			    struct folio **foliop, void **fsdata)
-{
-	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int r;
-
-	r = netfs_write_begin(&ci->netfs, file, inode->i_mapping, pos, len, foliop, NULL);
-	if (r < 0)
-		return r;
 
-	folio_wait_private_2(*foliop); /* [DEPRECATED] */
-	WARN_ON_ONCE(!folio_test_locked(*foliop));
-	return 0;
-}
+	atomic64_inc(&mdsc->dirty_folios);
 
-/*
- * we don't do anything in here that simple_write_end doesn't do
- * except adjust dirty page accounting
- */
-static int ceph_write_end(struct file *file, struct address_space *mapping,
-			  loff_t pos, unsigned len, unsigned copied,
-			  struct folio *folio, void *fsdata)
-{
-	struct inode *inode = file_inode(file);
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-	bool check_cap = false;
+	ci = ceph_inode(inode);
 
-	doutc(cl, "%llx.%llx file %p folio %p %d~%d (%d)\n", ceph_vinop(inode),
-	      file, folio, (int)pos, (int)copied, (int)len);
+	/* dirty the head */
+	spin_lock(&ci->i_ceph_lock);
+	if (__ceph_have_pending_cap_snap(ci)) {
+		struct ceph_cap_snap *capsnap =
+			list_last_entry(&ci->i_cap_snaps,
+					struct ceph_cap_snap,
+					ci_item);
+		snapc = capsnap->context;
+		capsnap->dirty_pages++;
+	} else {
+		snapc = ci->i_head_snapc;
+		BUG_ON(!snapc);
+		++ci->i_wrbuffer_ref_head;
+	}
 
-	if (!folio_test_uptodate(folio)) {
-		/* just return that nothing was copied on a short copy */
-		if (copied < len) {
-			copied = 0;
-			goto out;
+	/* Attach a reference to the snap/group to the folio. */
+	group = netfs_folio_group(folio);
+	if (group != &snapc->group) {
+		netfs_set_group(folio, &snapc->group);
+		if (group) {
+			doutc(cl, "Different group %px != %px\n",
+			      group, &snapc->group);
+			netfs_put_group(group);
 		}
-		folio_mark_uptodate(folio);
 	}
 
-	/* did file size increase? */
-	if (pos+copied > i_size_read(inode))
-		check_cap = ceph_inode_set_size(inode, pos+copied);
-
-	folio_mark_dirty(folio);
-
-out:
-	folio_unlock(folio);
-	folio_put(folio);
-
-	if (check_cap)
-		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY);
+	if (ci->i_wrbuffer_ref == 0)
+		ihold(inode);
+	++ci->i_wrbuffer_ref;
+	doutc(cl, "%llx.%llx %p idx %lu head %d/%d -> %d/%d "
+	      "snapc %p seq %lld (%d snaps)\n",
+	      ceph_vinop(inode), folio, folio->index,
+	      ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
+	      ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
+	      snapc, snapc->seq, snapc->num_snaps);
+	spin_unlock(&ci->i_ceph_lock);
 
-	return copied;
+	return netfs_dirty_folio(mapping, folio);
 }
 
-const struct address_space_operations ceph_aops = {
-	.read_folio = netfs_read_folio,
-	.readahead = netfs_readahead,
-	.writepages = ceph_writepages_start,
-	.write_begin = ceph_write_begin,
-	.write_end = ceph_write_end,
-	.dirty_folio = ceph_dirty_folio,
-	.invalidate_folio = ceph_invalidate_folio,
-	.release_folio = netfs_release_folio,
-	.direct_IO = noop_direct_IO,
-	.migrate_folio = filemap_migrate_folio,
-};
-#endif // TODO: Remove after netfs conversion
-
 static void ceph_block_sigs(sigset_t *oldset)
 {
 	sigset_t mask;
@@ -2046,112 +226,6 @@ static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
 	return ret;
 }
 
-#if 0 // TODO: Remove after netfs conversion
-static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
-{
-	struct vm_area_struct *vma = vmf->vma;
-	struct inode *inode = file_inode(vma->vm_file);
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_file_info *fi = vma->vm_file->private_data;
-	struct ceph_cap_flush *prealloc_cf;
-	struct folio *folio = page_folio(vmf->page);
-	loff_t off = folio_pos(folio);
-	loff_t size = i_size_read(inode);
-	size_t len;
-	int want, got, err;
-	sigset_t oldset;
-	vm_fault_t ret = VM_FAULT_SIGBUS;
-
-	if (ceph_inode_is_shutdown(inode))
-		return ret;
-
-	prealloc_cf = ceph_alloc_cap_flush();
-	if (!prealloc_cf)
-		return VM_FAULT_OOM;
-
-	sb_start_pagefault(inode->i_sb);
-	ceph_block_sigs(&oldset);
-
-	if (off + folio_size(folio) <= size)
-		len = folio_size(folio);
-	else
-		len = offset_in_folio(folio, size);
-
-	doutc(cl, "%llx.%llx %llu~%zd getting caps i_size %llu\n",
-	      ceph_vinop(inode), off, len, size);
-	if (fi->fmode & CEPH_FILE_MODE_LAZY)
-		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
-	else
-		want = CEPH_CAP_FILE_BUFFER;
-
-	got = 0;
-	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len, &got);
-	if (err < 0)
-		goto out_free;
-
-	doutc(cl, "%llx.%llx %llu~%zd got cap refs on %s\n", ceph_vinop(inode),
-	      off, len, ceph_cap_string(got));
-
-	/* Update time before taking folio lock */
-	file_update_time(vma->vm_file);
-	inode_inc_iversion_raw(inode);
-
-	do {
-		struct ceph_snap_context *snapc;
-
-		folio_lock(folio);
-
-		if (folio_mkwrite_check_truncate(folio, inode) < 0) {
-			folio_unlock(folio);
-			ret = VM_FAULT_NOPAGE;
-			break;
-		}
-
-		snapc = ceph_find_incompatible(folio);
-		if (!snapc) {
-			/* success.  we'll keep the folio locked. */
-			folio_mark_dirty(folio);
-			ret = VM_FAULT_LOCKED;
-			break;
-		}
-
-		folio_unlock(folio);
-
-		if (IS_ERR(snapc)) {
-			ret = VM_FAULT_SIGBUS;
-			break;
-		}
-
-		ceph_queue_writeback(inode);
-		err = wait_event_killable(ci->i_cap_wq,
-				context_is_writeable_or_written(inode, snapc));
-		ceph_put_snap_context(snapc);
-	} while (err == 0);
-
-	if (ret == VM_FAULT_LOCKED) {
-		int dirty;
-		spin_lock(&ci->i_ceph_lock);
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
-					       &prealloc_cf);
-		spin_unlock(&ci->i_ceph_lock);
-		if (dirty)
-			__mark_inode_dirty(inode, dirty);
-	}
-
-	doutc(cl, "%llx.%llx %llu~%zd dropping cap refs on %s ret %x\n",
-	      ceph_vinop(inode), off, len, ceph_cap_string(got), ret);
-	ceph_put_cap_refs_async(ci, got);
-out_free:
-	ceph_restore_sigs(&oldset);
-	sb_end_pagefault(inode->i_sb);
-	ceph_free_cap_flush(prealloc_cf);
-	if (err < 0)
-		ret = vmf_error(err);
-	return ret;
-}
-#endif // TODO: Remove after netfs conversion
-
 void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
 			   char	*data, size_t len)
 {
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 94b91b5bc843..d7684f4b2e10 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -77,97 +77,6 @@ static __le32 ceph_flags_sys2wire(struct ceph_mds_client *mdsc, u32 flags)
  * need to wait for MDS acknowledgement.
  */
 
-#if 0 // TODO: Remove after netfs conversion
-/*
- * How many pages to get in one call to iov_iter_get_pages().  This
- * determines the size of the on-stack array used as a buffer.
- */
-#define ITER_GET_BVECS_PAGES	64
-
-static int __iter_get_bvecs(struct iov_iter *iter, size_t maxsize,
-			    struct ceph_databuf *dbuf)
-{
-	size_t size = 0;
-
-	if (maxsize > iov_iter_count(iter))
-		maxsize = iov_iter_count(iter);
-
-	while (size < maxsize) {
-		struct page *pages[ITER_GET_BVECS_PAGES];
-		ssize_t bytes;
-		size_t start;
-		int idx = 0;
-
-		bytes = iov_iter_get_pages2(iter, pages, maxsize - size,
-					    ITER_GET_BVECS_PAGES, &start);
-		if (bytes < 0) {
-			if (size == 0)
-				return bytes;
-			break;
-		}
-
-		while (bytes) {
-			int len = min_t(int, bytes, PAGE_SIZE - start);
-
-			ceph_databuf_append_page(dbuf, pages[idx++], start, len);
-			bytes -= len;
-			size += len;
-			start = 0;
-		}
-	}
-
-	return 0;
-}
-
-/*
- * iov_iter_get_pages() only considers one iov_iter segment, no matter
- * what maxsize or maxpages are given.  For ITER_BVEC that is a single
- * page.
- *
- * Attempt to get up to @maxsize bytes worth of pages from @iter.
- * Return the number of bytes in the created bio_vec array, or an error.
- */
-static struct ceph_databuf *iter_get_bvecs_alloc(struct iov_iter *iter,
-						 size_t maxsize, bool write)
-{
-	struct ceph_databuf *dbuf;
-	size_t orig_count = iov_iter_count(iter);
-	int npages, ret;
-
-	iov_iter_truncate(iter, maxsize);
-	npages = iov_iter_npages(iter, INT_MAX);
-	iov_iter_reexpand(iter, orig_count);
-
-	if (write)
-		dbuf = ceph_databuf_req_alloc(npages, 0, GFP_KERNEL);
-	else
-		dbuf = ceph_databuf_reply_alloc(npages, 0, GFP_KERNEL);
-	if (!dbuf)
-		return ERR_PTR(-ENOMEM);
-
-	ret = __iter_get_bvecs(iter, maxsize, dbuf);
-	if (ret < 0) {
-		/*
-		 * No pages were pinned -- just free the array.
-		 */
-		ceph_databuf_release(dbuf);
-		return ERR_PTR(ret);
-	}
-
-	return dbuf;
-}
-
-static void ceph_dirty_pages(struct ceph_databuf *dbuf)
-{
-	struct bio_vec *bvec = dbuf->bvec;
-	int i;
-
-	for (i = 0; i < dbuf->nr_bvec; i++)
-		if (bvec[i].bv_page)
-			set_page_dirty_lock(bvec[i].bv_page);
-}
-#endif // TODO: Remove after netfs conversion
-
 /*
  * Prepare an open request.  Preallocate ceph_cap to avoid an
  * inopportune ENOMEM later.
@@ -1023,1222 +932,6 @@ int ceph_release(struct inode *inode, struct file *file)
 	return 0;
 }
 
-#if 0 // TODO: Remove after netfs conversion
-enum {
-	HAVE_RETRIED = 1,
-	CHECK_EOF =    2,
-	READ_INLINE =  3,
-};
-
-/*
- * Completely synchronous read and write methods.  Direct from __user
- * buffer to osd, or directly to user pages (if O_DIRECT).
- *
- * If the read spans object boundary, just do multiple reads.  (That's not
- * atomic, but good enough for now.)
- *
- * If we get a short result from the OSD, check against i_size; we need to
- * only return a short read to the caller if we hit EOF.
- */
-ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
-			 struct iov_iter *to, int *retry_op,
-			 u64 *last_objver)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_osd_client *osdc = &fsc->client->osdc;
-	ssize_t ret;
-	u64 off = *ki_pos;
-	u64 len = iov_iter_count(to);
-	u64 i_size = i_size_read(inode);
-	bool sparse = IS_ENCRYPTED(inode) || ceph_test_mount_opt(fsc, SPARSEREAD);
-	u64 objver = 0;
-
-	doutc(cl, "on inode %p %llx.%llx %llx~%llx\n", inode,
-	      ceph_vinop(inode), *ki_pos, len);
-
-	if (ceph_inode_is_shutdown(inode))
-		return -EIO;
-
-	if (!len || !i_size)
-		return 0;
-	/*
-	 * flush any page cache pages in this range.  this
-	 * will make concurrent normal and sync io slow,
-	 * but it will at least behave sensibly when they are
-	 * in sequence.
-	 */
-	ret = filemap_write_and_wait_range(inode->i_mapping,
-					   off, off + len - 1);
-	if (ret < 0)
-		return ret;
-
-	ret = 0;
-	while ((len = iov_iter_count(to)) > 0) {
-		struct ceph_osd_request *req;
-		struct page **pages;
-		int num_pages;
-		size_t page_off;
-		bool more;
-		int idx = 0;
-		size_t left;
-		struct ceph_osd_req_op *op;
-		u64 read_off = off;
-		u64 read_len = len;
-		int extent_cnt;
-
-		/* determine new offset/length if encrypted */
-		ceph_fscrypt_adjust_off_and_len(inode, &read_off, &read_len);
-
-		doutc(cl, "orig %llu~%llu reading %llu~%llu", off, len,
-		      read_off, read_len);
-
-		req = ceph_osdc_new_request(osdc, &ci->i_layout,
-					ci->i_vino, read_off, &read_len, 0, 1,
-					sparse ? CEPH_OSD_OP_SPARSE_READ :
-						 CEPH_OSD_OP_READ,
-					CEPH_OSD_FLAG_READ,
-					NULL, ci->i_truncate_seq,
-					ci->i_truncate_size, false);
-		if (IS_ERR(req)) {
-			ret = PTR_ERR(req);
-			break;
-		}
-
-		/* adjust len downward if the request truncated the len */
-		if (off + len > read_off + read_len)
-			len = read_off + read_len - off;
-		more = len < iov_iter_count(to);
-
-		op = &req->r_ops[0];
-		if (sparse) {
-			extent_cnt = __ceph_sparse_read_ext_count(inode, read_len);
-			ret = ceph_alloc_sparse_ext_map(op, extent_cnt);
-			if (ret) {
-				ceph_osdc_put_request(req);
-				break;
-			}
-		}
-
-		num_pages = calc_pages_for(read_off, read_len);
-		page_off = offset_in_page(off);
-		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-		if (IS_ERR(pages)) {
-			ceph_osdc_put_request(req);
-			ret = PTR_ERR(pages);
-			break;
-		}
-
-		osd_req_op_extent_osd_data_pages(req, 0, pages, read_len,
-						 offset_in_page(read_off),
-						 false, true);
-
-		ceph_osdc_start_request(osdc, req);
-		ret = ceph_osdc_wait_request(osdc, req);
-
-		ceph_update_read_metrics(&fsc->mdsc->metric,
-					 req->r_start_latency,
-					 req->r_end_latency,
-					 read_len, ret);
-
-		if (ret > 0)
-			objver = req->r_version;
-
-		i_size = i_size_read(inode);
-		doutc(cl, "%llu~%llu got %zd i_size %llu%s\n", off, len,
-		      ret, i_size, (more ? " MORE" : ""));
-
-		/* Fix it to go to end of extent map */
-		if (sparse && ret >= 0)
-			ret = ceph_sparse_ext_map_end(op);
-		else if (ret == -ENOENT)
-			ret = 0;
-
-		if (ret < 0) {
-			ceph_osdc_put_request(req);
-			if (ret == -EBLOCKLISTED)
-				fsc->blocklisted = true;
-			break;
-		}
-
-		if (IS_ENCRYPTED(inode)) {
-			int fret;
-
-			fret = ceph_fscrypt_decrypt_extents(inode, pages,
-					read_off, op->extent.sparse_ext,
-					op->extent.sparse_ext_cnt);
-			if (fret < 0) {
-				ret = fret;
-				ceph_osdc_put_request(req);
-				break;
-			}
-
-			/* account for any partial block at the beginning */
-			fret -= (off - read_off);
-
-			/*
-			 * Short read after big offset adjustment?
-			 * Nothing is usable, just call it a zero
-			 * len read.
-			 */
-			fret = max(fret, 0);
-
-			/* account for partial block at the end */
-			ret = min_t(ssize_t, fret, len);
-		}
-
-		/* Short read but not EOF? Zero out the remainder. */
-		if (ret < len && (off + ret < i_size)) {
-			int zlen = min(len - ret, i_size - off - ret);
-			int zoff = page_off + ret;
-
-			doutc(cl, "zero gap %llu~%llu\n", off + ret,
-			      off + ret + zlen);
-			ceph_zero_page_vector_range(zoff, zlen, pages);
-			ret += zlen;
-		}
-
-		if (off + ret > i_size)
-			left = (i_size > off) ? i_size - off : 0;
-		else
-			left = ret;
-
-		while (left > 0) {
-			size_t plen, copied;
-
-			plen = min_t(size_t, left, PAGE_SIZE - page_off);
-			SetPageUptodate(pages[idx]);
-			copied = copy_page_to_iter(pages[idx++],
-						   page_off, plen, to);
-			off += copied;
-			left -= copied;
-			page_off = 0;
-			if (copied < plen) {
-				ret = -EFAULT;
-				break;
-			}
-		}
-
-		ceph_osdc_put_request(req);
-
-		if (off >= i_size || !more)
-			break;
-	}
-
-	if (ret > 0) {
-		if (off >= i_size) {
-			*retry_op = CHECK_EOF;
-			ret = i_size - *ki_pos;
-			*ki_pos = i_size;
-		} else {
-			ret = off - *ki_pos;
-			*ki_pos = off;
-		}
-
-		if (last_objver)
-			*last_objver = objver;
-	}
-	doutc(cl, "result %zd retry_op %d\n", ret, *retry_op);
-	return ret;
-}
-
-static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
-			      int *retry_op)
-{
-	struct file *file = iocb->ki_filp;
-	struct inode *inode = file_inode(file);
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-
-	doutc(cl, "on file %p %llx~%zx %s\n", file, iocb->ki_pos,
-	      iov_iter_count(to),
-	      (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
-
-	return __ceph_sync_read(inode, &iocb->ki_pos, to, retry_op, NULL);
-}
-
-struct ceph_aio_request {
-	struct kiocb *iocb;
-	size_t total_len;
-	bool write;
-	bool should_dirty;
-	int error;
-	struct list_head osd_reqs;
-	unsigned num_reqs;
-	atomic_t pending_reqs;
-	struct timespec64 mtime;
-	struct ceph_cap_flush *prealloc_cf;
-};
-
-struct ceph_aio_work {
-	struct work_struct work;
-	struct ceph_osd_request *req;
-};
-
-static void ceph_aio_retry_work(struct work_struct *work);
-
-static void ceph_aio_complete(struct inode *inode,
-			      struct ceph_aio_request *aio_req)
-{
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	int ret;
-
-	if (!atomic_dec_and_test(&aio_req->pending_reqs))
-		return;
-
-	if (aio_req->iocb->ki_flags & IOCB_DIRECT)
-		inode_dio_end(inode);
-
-	ret = aio_req->error;
-	if (!ret)
-		ret = aio_req->total_len;
-
-	doutc(cl, "%p %llx.%llx rc %d\n", inode, ceph_vinop(inode), ret);
-
-	if (ret >= 0 && aio_req->write) {
-		int dirty;
-
-		loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
-		if (endoff > i_size_read(inode)) {
-			if (ceph_inode_set_size(inode, endoff))
-				ceph_check_caps(ci, CHECK_CAPS_AUTHONLY);
-		}
-
-		spin_lock(&ci->i_ceph_lock);
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
-					       &aio_req->prealloc_cf);
-		spin_unlock(&ci->i_ceph_lock);
-		if (dirty)
-			__mark_inode_dirty(inode, dirty);
-
-	}
-
-	ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
-						CEPH_CAP_FILE_RD));
-
-	aio_req->iocb->ki_complete(aio_req->iocb, ret);
-
-	ceph_free_cap_flush(aio_req->prealloc_cf);
-	kfree(aio_req);
-}
-
-static void ceph_aio_complete_req(struct ceph_osd_request *req)
-{
-	int rc = req->r_result;
-	struct inode *inode = req->r_inode;
-	struct ceph_aio_request *aio_req = req->r_priv;
-	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
-	struct ceph_osd_req_op *op = &req->r_ops[0];
-	struct ceph_client_metric *metric = &ceph_sb_to_mdsc(inode->i_sb)->metric;
-	size_t len = osd_data->iter.count;
-	bool sparse = (op->op == CEPH_OSD_OP_SPARSE_READ);
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-
-	doutc(cl, "req %p inode %p %llx.%llx, rc %d bytes %zu\n", req,
-	      inode, ceph_vinop(inode), rc, len);
-
-	if (rc == -EOLDSNAPC) {
-		struct ceph_aio_work *aio_work;
-		BUG_ON(!aio_req->write);
-
-		aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
-		if (aio_work) {
-			INIT_WORK(&aio_work->work, ceph_aio_retry_work);
-			aio_work->req = req;
-			queue_work(ceph_inode_to_fs_client(inode)->inode_wq,
-				   &aio_work->work);
-			return;
-		}
-		rc = -ENOMEM;
-	} else if (!aio_req->write) {
-		if (sparse && rc >= 0)
-			rc = ceph_sparse_ext_map_end(op);
-		if (rc == -ENOENT)
-			rc = 0;
-		if (rc >= 0 && len > rc) {
-			int zlen = len - rc;
-
-			/*
-			 * If read is satisfied by single OSD request,
-			 * it can pass EOF. Otherwise read is within
-			 * i_size.
-			 */
-			if (aio_req->num_reqs == 1) {
-				loff_t i_size = i_size_read(inode);
-				loff_t endoff = aio_req->iocb->ki_pos + rc;
-				if (endoff < i_size)
-					zlen = min_t(size_t, zlen,
-						     i_size - endoff);
-				aio_req->total_len = rc + zlen;
-			}
-
-			iov_iter_advance(&osd_data->iter, rc);
-			iov_iter_zero(zlen, &osd_data->iter);
-		}
-	}
-
-	/* r_start_latency == 0 means the request was not submitted */
-	if (req->r_start_latency) {
-		if (aio_req->write)
-			ceph_update_write_metrics(metric, req->r_start_latency,
-						  req->r_end_latency, len, rc);
-		else
-			ceph_update_read_metrics(metric, req->r_start_latency,
-						 req->r_end_latency, len, rc);
-	}
-
-	if (aio_req->should_dirty)
-		ceph_dirty_pages(osd_data->dbuf);
-	ceph_osdc_put_request(req);
-
-	if (rc < 0)
-		cmpxchg(&aio_req->error, 0, rc);
-
-	ceph_aio_complete(inode, aio_req);
-	return;
-}
-
-static void ceph_aio_retry_work(struct work_struct *work)
-{
-	struct ceph_aio_work *aio_work =
-		container_of(work, struct ceph_aio_work, work);
-	struct ceph_osd_request *orig_req = aio_work->req;
-	struct ceph_aio_request *aio_req = orig_req->r_priv;
-	struct inode *inode = orig_req->r_inode;
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_snap_context *snapc;
-	struct ceph_osd_request *req;
-	int ret;
-
-	spin_lock(&ci->i_ceph_lock);
-	if (__ceph_have_pending_cap_snap(ci)) {
-		struct ceph_cap_snap *capsnap =
-			list_last_entry(&ci->i_cap_snaps,
-					struct ceph_cap_snap,
-					ci_item);
-		snapc = ceph_get_snap_context(capsnap->context);
-	} else {
-		BUG_ON(!ci->i_head_snapc);
-		snapc = ceph_get_snap_context(ci->i_head_snapc);
-	}
-	spin_unlock(&ci->i_ceph_lock);
-
-	req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 1,
-			false, GFP_NOFS);
-	if (!req) {
-		ret = -ENOMEM;
-		req = orig_req;
-		goto out;
-	}
-
-	req->r_flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
-	ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
-	ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
-
-	req->r_ops[0] = orig_req->r_ops[0];
-
-	req->r_mtime = aio_req->mtime;
-	req->r_data_offset = req->r_ops[0].extent.offset;
-
-	ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
-	if (ret) {
-		ceph_osdc_put_request(req);
-		req = orig_req;
-		goto out;
-	}
-
-	ceph_osdc_put_request(orig_req);
-
-	req->r_callback = ceph_aio_complete_req;
-	req->r_inode = inode;
-	req->r_priv = aio_req;
-
-	ceph_osdc_start_request(req->r_osdc, req);
-out:
-	if (ret < 0) {
-		req->r_result = ret;
-		ceph_aio_complete_req(req);
-	}
-
-	ceph_put_snap_context(snapc);
-	kfree(aio_work);
-}
-
-static ssize_t
-ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
-		       struct ceph_snap_context *snapc,
-		       struct ceph_cap_flush **pcf)
-{
-	struct file *file = iocb->ki_filp;
-	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_client_metric *metric = &fsc->mdsc->metric;
-	struct ceph_vino vino;
-	struct ceph_osd_request *req;
-	struct ceph_aio_request *aio_req = NULL;
-	struct ceph_databuf *dbuf = NULL;
-	int flags;
-	int ret = 0;
-	struct timespec64 mtime = current_time(inode);
-	size_t count = iov_iter_count(iter);
-	loff_t pos = iocb->ki_pos;
-	bool write = iov_iter_rw(iter) == WRITE;
-	bool should_dirty = !write && user_backed_iter(iter);
-	bool sparse = ceph_test_mount_opt(fsc, SPARSEREAD);
-
-	if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
-		return -EROFS;
-
-	doutc(cl, "sync_direct_%s on file %p %lld~%u snapc %p seq %lld\n",
-	      (write ? "write" : "read"), file, pos, (unsigned)count,
-	      snapc, snapc ? snapc->seq : 0);
-
-	if (write) {
-		int ret2;
-
-		ceph_fscache_invalidate(inode, true);
-
-		ret2 = invalidate_inode_pages2_range(inode->i_mapping,
-					pos >> PAGE_SHIFT,
-					(pos + count - 1) >> PAGE_SHIFT);
-		if (ret2 < 0)
-			doutc(cl, "invalidate_inode_pages2_range returned %d\n",
-			      ret2);
-
-		flags = /* CEPH_OSD_FLAG_ORDERSNAP | */ CEPH_OSD_FLAG_WRITE;
-	} else {
-		flags = CEPH_OSD_FLAG_READ;
-	}
-
-	while (iov_iter_count(iter) > 0) {
-		u64 size = iov_iter_count(iter);
-		struct ceph_osd_req_op *op;
-		size_t len;
-		int readop = sparse ? CEPH_OSD_OP_SPARSE_READ : CEPH_OSD_OP_READ;
-		int extent_cnt;
-
-		if (write)
-			size = min_t(u64, size, fsc->mount_options->wsize);
-		else
-			size = min_t(u64, size, fsc->mount_options->rsize);
-
-		vino = ceph_vino(inode);
-		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-					    vino, pos, &size, 0,
-					    1,
-					    write ? CEPH_OSD_OP_WRITE : readop,
-					    flags, snapc,
-					    ci->i_truncate_seq,
-					    ci->i_truncate_size,
-					    false);
-		if (IS_ERR(req)) {
-			ret = PTR_ERR(req);
-			break;
-		}
-
-		op = &req->r_ops[0];
-		if (!write && sparse) {
-			extent_cnt = __ceph_sparse_read_ext_count(inode, size);
-			ret = ceph_alloc_sparse_ext_map(op, extent_cnt);
-			if (ret) {
-				ceph_osdc_put_request(req);
-				break;
-			}
-		}
-
-		dbuf = iter_get_bvecs_alloc(iter, size, write);
-		if (IS_ERR(dbuf)) {
-			ceph_osdc_put_request(req);
-			ret = PTR_ERR(dbuf);
-			break;
-		}
-		len = ceph_databuf_len(dbuf);
-		if (len != size)
-			osd_req_op_extent_update(req, 0, len);
-
-		osd_req_op_extent_osd_databuf(req, 0, dbuf);
-
-		/*
-		 * To simplify error handling, allow AIO when IO within i_size
-		 * or IO can be satisfied by single OSD request.
-		 */
-		if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
-		    (len == count || pos + count <= i_size_read(inode))) {
-			aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
-			if (aio_req) {
-				aio_req->iocb = iocb;
-				aio_req->write = write;
-				aio_req->should_dirty = should_dirty;
-				INIT_LIST_HEAD(&aio_req->osd_reqs);
-				if (write) {
-					aio_req->mtime = mtime;
-					swap(aio_req->prealloc_cf, *pcf);
-				}
-			}
-			/* ignore error */
-		}
-
-		if (write) {
-			/*
-			 * throw out any page cache pages in this range. this
-			 * may block.
-			 */
-			truncate_inode_pages_range(inode->i_mapping, pos,
-						   PAGE_ALIGN(pos + len) - 1);
-
-			req->r_mtime = mtime;
-		}
-
-		if (aio_req) {
-			aio_req->total_len += len;
-			aio_req->num_reqs++;
-			atomic_inc(&aio_req->pending_reqs);
-
-			req->r_callback = ceph_aio_complete_req;
-			req->r_inode = inode;
-			req->r_priv = aio_req;
-			list_add_tail(&req->r_private_item, &aio_req->osd_reqs);
-
-			pos += len;
-			continue;
-		}
-
-		ceph_osdc_start_request(req->r_osdc, req);
-		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
-
-		if (write)
-			ceph_update_write_metrics(metric, req->r_start_latency,
-						  req->r_end_latency, len, ret);
-		else
-			ceph_update_read_metrics(metric, req->r_start_latency,
-						 req->r_end_latency, len, ret);
-
-		size = i_size_read(inode);
-		if (!write) {
-			if (sparse && ret >= 0)
-				ret = ceph_sparse_ext_map_end(op);
-			else if (ret == -ENOENT)
-				ret = 0;
-
-			if (ret >= 0 && ret < len && pos + ret < size) {
-				int zlen = min_t(size_t, len - ret,
-						 size - pos - ret);
-
-				iov_iter_advance(&dbuf->iter, ret);
-				iov_iter_zero(zlen, &dbuf->iter);
-				ret += zlen;
-			}
-			if (ret >= 0)
-				len = ret;
-		}
-
-		ceph_osdc_put_request(req);
-		if (ret < 0)
-			break;
-
-		pos += len;
-		if (!write && pos >= size)
-			break;
-
-		if (write && pos > size) {
-			if (ceph_inode_set_size(inode, pos))
-				ceph_check_caps(ceph_inode(inode),
-						CHECK_CAPS_AUTHONLY);
-		}
-	}
-
-	if (aio_req) {
-		LIST_HEAD(osd_reqs);
-
-		if (aio_req->num_reqs == 0) {
-			kfree(aio_req);
-			return ret;
-		}
-
-		ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
-					      CEPH_CAP_FILE_RD);
-
-		list_splice(&aio_req->osd_reqs, &osd_reqs);
-		inode_dio_begin(inode);
-		while (!list_empty(&osd_reqs)) {
-			req = list_first_entry(&osd_reqs,
-					       struct ceph_osd_request,
-					       r_private_item);
-			list_del_init(&req->r_private_item);
-			if (ret >= 0)
-				ceph_osdc_start_request(req->r_osdc, req);
-			if (ret < 0) {
-				req->r_result = ret;
-				ceph_aio_complete_req(req);
-			}
-		}
-		return -EIOCBQUEUED;
-	}
-
-	if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
-		ret = pos - iocb->ki_pos;
-		iocb->ki_pos = pos;
-	}
-	return ret;
-}
-
-/*
- * Synchronous write, straight from __user pointer or user pages.
- *
- * If write spans object boundary, just do multiple writes.  (For a
- * correct atomic write, we should e.g. take write locks on all
- * objects, rollback on failure, etc.)
- */
-static ssize_t
-ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
-		struct ceph_snap_context *snapc)
-{
-	struct file *file = iocb->ki_filp;
-	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_osd_client *osdc = &fsc->client->osdc;
-	struct ceph_osd_request *req;
-	struct page **pages;
-	u64 len;
-	int num_pages;
-	int written = 0;
-	int ret;
-	bool check_caps = false;
-	struct timespec64 mtime = current_time(inode);
-	size_t count = iov_iter_count(from);
-
-	if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
-		return -EROFS;
-
-	doutc(cl, "on file %p %lld~%u snapc %p seq %lld\n", file, pos,
-	      (unsigned)count, snapc, snapc->seq);
-
-	ret = filemap_write_and_wait_range(inode->i_mapping,
-					   pos, pos + count - 1);
-	if (ret < 0)
-		return ret;
-
-	ceph_fscache_invalidate(inode, false);
-
-	while ((len = iov_iter_count(from)) > 0) {
-		size_t left;
-		int n;
-		u64 write_pos = pos;
-		u64 write_len = len;
-		u64 objnum, objoff;
-		u64 assert_ver = 0;
-		bool rmw;
-		bool first, last;
-		struct iov_iter saved_iter = *from;
-		size_t off, xlen;
-
-		ceph_fscrypt_adjust_off_and_len(inode, &write_pos, &write_len);
-
-		/* clamp the length to the end of first object */
-		ceph_calc_file_object_mapping(&ci->i_layout, write_pos,
-					      write_len, &objnum, &objoff,
-					      &xlen);
-		write_len = xlen;
-
-		/* adjust len downward if it goes beyond current object */
-		if (pos + len > write_pos + write_len)
-			len = write_pos + write_len - pos;
-
-		/*
-		 * If we had to adjust the length or position to align with a
-		 * crypto block, then we must do a read/modify/write cycle. We
-		 * use a version assertion to redrive the thing if something
-		 * changes in between.
-		 */
-		first = pos != write_pos;
-		last = (pos + len) != (write_pos + write_len);
-		rmw = first || last;
-
-		doutc(cl, "ino %llx %lld~%llu adjusted %lld~%llu -- %srmw\n",
-		      ci->i_vino.ino, pos, len, write_pos, write_len,
-		      rmw ? "" : "no ");
-
-		/*
-		 * The data is emplaced into the page as it would be if it were
-		 * in an array of pagecache pages.
-		 */
-		num_pages = calc_pages_for(write_pos, write_len);
-		pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-		if (IS_ERR(pages)) {
-			ret = PTR_ERR(pages);
-			break;
-		}
-
-		/* Do we need to preload the pages? */
-		if (rmw) {
-			u64 first_pos = write_pos;
-			u64 last_pos = (write_pos + write_len) - CEPH_FSCRYPT_BLOCK_SIZE;
-			u64 read_len = CEPH_FSCRYPT_BLOCK_SIZE;
-			struct ceph_osd_req_op *op;
-
-			/* We should only need to do this for encrypted inodes */
-			WARN_ON_ONCE(!IS_ENCRYPTED(inode));
-
-			/* No need to do two reads if first and last blocks are same */
-			if (first && last_pos == first_pos)
-				last = false;
-
-			/*
-			 * Allocate a read request for one or two extents,
-			 * depending on how the request was aligned.
-			 */
-			req = ceph_osdc_new_request(osdc, &ci->i_layout,
-					ci->i_vino, first ? first_pos : last_pos,
-					&read_len, 0, (first && last) ? 2 : 1,
-					CEPH_OSD_OP_SPARSE_READ, CEPH_OSD_FLAG_READ,
-					NULL, ci->i_truncate_seq,
-					ci->i_truncate_size, false);
-			if (IS_ERR(req)) {
-				ceph_release_page_vector(pages, num_pages);
-				ret = PTR_ERR(req);
-				break;
-			}
-
-			/* Something is misaligned! */
-			if (read_len != CEPH_FSCRYPT_BLOCK_SIZE) {
-				ceph_osdc_put_request(req);
-				ceph_release_page_vector(pages, num_pages);
-				ret = -EIO;
-				break;
-			}
-
-			/* Add extent for first block? */
-			op = &req->r_ops[0];
-
-			if (first) {
-				osd_req_op_extent_osd_data_pages(req, 0, pages,
-							 CEPH_FSCRYPT_BLOCK_SIZE,
-							 offset_in_page(first_pos),
-							 false, false);
-				/* We only expect a single extent here */
-				ret = __ceph_alloc_sparse_ext_map(op, 1);
-				if (ret) {
-					ceph_osdc_put_request(req);
-					ceph_release_page_vector(pages, num_pages);
-					break;
-				}
-			}
-
-			/* Add extent for last block */
-			if (last) {
-				/* Init the other extent if first extent has been used */
-				if (first) {
-					op = &req->r_ops[1];
-					osd_req_op_extent_init(req, 1,
-							CEPH_OSD_OP_SPARSE_READ,
-							last_pos, CEPH_FSCRYPT_BLOCK_SIZE,
-							ci->i_truncate_size,
-							ci->i_truncate_seq);
-				}
-
-				ret = __ceph_alloc_sparse_ext_map(op, 1);
-				if (ret) {
-					ceph_osdc_put_request(req);
-					ceph_release_page_vector(pages, num_pages);
-					break;
-				}
-
-				osd_req_op_extent_osd_data_pages(req, first ? 1 : 0,
-							&pages[num_pages - 1],
-							CEPH_FSCRYPT_BLOCK_SIZE,
-							offset_in_page(last_pos),
-							false, false);
-			}
-
-			ceph_osdc_start_request(osdc, req);
-			ret = ceph_osdc_wait_request(osdc, req);
-
-			/* FIXME: length field is wrong if there are 2 extents */
-			ceph_update_read_metrics(&fsc->mdsc->metric,
-						 req->r_start_latency,
-						 req->r_end_latency,
-						 read_len, ret);
-
-			/* Ok if object is not already present */
-			if (ret == -ENOENT) {
-				/*
-				 * If there is no object, then we can't assert
-				 * on its version. Set it to 0, and we'll use an
-				 * exclusive create instead.
-				 */
-				ceph_osdc_put_request(req);
-				ret = 0;
-
-				/*
-				 * zero out the soon-to-be uncopied parts of the
-				 * first and last pages.
-				 */
-				if (first)
-					zero_user_segment(pages[0], 0,
-							  offset_in_page(first_pos));
-				if (last)
-					zero_user_segment(pages[num_pages - 1],
-							  offset_in_page(last_pos),
-							  PAGE_SIZE);
-			} else {
-				if (ret < 0) {
-					ceph_osdc_put_request(req);
-					ceph_release_page_vector(pages, num_pages);
-					break;
-				}
-
-				op = &req->r_ops[0];
-				if (op->extent.sparse_ext_cnt == 0) {
-					if (first)
-						zero_user_segment(pages[0], 0,
-								  offset_in_page(first_pos));
-					else
-						zero_user_segment(pages[num_pages - 1],
-								  offset_in_page(last_pos),
-								  PAGE_SIZE);
-				} else if (op->extent.sparse_ext_cnt != 1 ||
-					   ceph_sparse_ext_map_end(op) !=
-						CEPH_FSCRYPT_BLOCK_SIZE) {
-					ret = -EIO;
-					ceph_osdc_put_request(req);
-					ceph_release_page_vector(pages, num_pages);
-					break;
-				}
-
-				if (first && last) {
-					op = &req->r_ops[1];
-					if (op->extent.sparse_ext_cnt == 0) {
-						zero_user_segment(pages[num_pages - 1],
-								  offset_in_page(last_pos),
-								  PAGE_SIZE);
-					} else if (op->extent.sparse_ext_cnt != 1 ||
-						   ceph_sparse_ext_map_end(op) !=
-							CEPH_FSCRYPT_BLOCK_SIZE) {
-						ret = -EIO;
-						ceph_osdc_put_request(req);
-						ceph_release_page_vector(pages, num_pages);
-						break;
-					}
-				}
-
-				/* Grab assert version. It must be non-zero. */
-				assert_ver = req->r_version;
-				WARN_ON_ONCE(ret > 0 && assert_ver == 0);
-
-				ceph_osdc_put_request(req);
-				if (first) {
-					ret = ceph_fscrypt_decrypt_block_inplace(inode,
-							pages[0], CEPH_FSCRYPT_BLOCK_SIZE,
-							offset_in_page(first_pos),
-							first_pos >> CEPH_FSCRYPT_BLOCK_SHIFT);
-					if (ret < 0) {
-						ceph_release_page_vector(pages, num_pages);
-						break;
-					}
-				}
-				if (last) {
-					ret = ceph_fscrypt_decrypt_block_inplace(inode,
-							pages[num_pages - 1],
-							CEPH_FSCRYPT_BLOCK_SIZE,
-							offset_in_page(last_pos),
-							last_pos >> CEPH_FSCRYPT_BLOCK_SHIFT);
-					if (ret < 0) {
-						ceph_release_page_vector(pages, num_pages);
-						break;
-					}
-				}
-			}
-		}
-
-		left = len;
-		off = offset_in_page(pos);
-		for (n = 0; n < num_pages; n++) {
-			size_t plen = min_t(size_t, left, PAGE_SIZE - off);
-
-			/* copy the data */
-			ret = copy_page_from_iter(pages[n], off, plen, from);
-			if (ret != plen) {
-				ret = -EFAULT;
-				break;
-			}
-			off = 0;
-			left -= ret;
-		}
-		if (ret < 0) {
-			doutc(cl, "write failed with %d\n", ret);
-			ceph_release_page_vector(pages, num_pages);
-			break;
-		}
-
-		if (IS_ENCRYPTED(inode)) {
-			ret = ceph_fscrypt_encrypt_pages(inode, pages,
-							 write_pos, write_len,
-							 GFP_KERNEL);
-			if (ret < 0) {
-				doutc(cl, "encryption failed with %d\n", ret);
-				ceph_release_page_vector(pages, num_pages);
-				break;
-			}
-		}
-
-		req = ceph_osdc_new_request(osdc, &ci->i_layout,
-					    ci->i_vino, write_pos, &write_len,
-					    rmw ? 1 : 0, rmw ? 2 : 1,
-					    CEPH_OSD_OP_WRITE,
-					    CEPH_OSD_FLAG_WRITE,
-					    snapc, ci->i_truncate_seq,
-					    ci->i_truncate_size, false);
-		if (IS_ERR(req)) {
-			ret = PTR_ERR(req);
-			ceph_release_page_vector(pages, num_pages);
-			break;
-		}
-
-		doutc(cl, "write op %lld~%llu\n", write_pos, write_len);
-		osd_req_op_extent_osd_data_pages(req, rmw ? 1 : 0, pages, write_len,
-						 offset_in_page(write_pos), false,
-						 true);
-		req->r_inode = inode;
-		req->r_mtime = mtime;
-
-		/* Set up the assertion */
-		if (rmw) {
-			/*
-			 * Set up the assertion. If we don't have a version
-			 * number, then the object doesn't exist yet. Use an
-			 * exclusive create instead of a version assertion in
-			 * that case.
-			 */
-			if (assert_ver) {
-				osd_req_op_init(req, 0, CEPH_OSD_OP_ASSERT_VER, 0);
-				req->r_ops[0].assert_ver.ver = assert_ver;
-			} else {
-				osd_req_op_init(req, 0, CEPH_OSD_OP_CREATE,
-						CEPH_OSD_OP_FLAG_EXCL);
-			}
-		}
-
-		ceph_osdc_start_request(osdc, req);
-		ret = ceph_osdc_wait_request(osdc, req);
-
-		ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
-					  req->r_end_latency, len, ret);
-		ceph_osdc_put_request(req);
-		if (ret != 0) {
-			doutc(cl, "osd write returned %d\n", ret);
-			/* Version changed! Must re-do the rmw cycle */
-			if ((assert_ver && (ret == -ERANGE || ret == -EOVERFLOW)) ||
-			    (!assert_ver && ret == -EEXIST)) {
-				/* We should only ever see this on a rmw */
-				WARN_ON_ONCE(!rmw);
-
-				/* The version should never go backward */
-				WARN_ON_ONCE(ret == -EOVERFLOW);
-
-				*from = saved_iter;
-
-				/* FIXME: limit number of times we loop? */
-				continue;
-			}
-			ceph_set_error_write(ci);
-			break;
-		}
-
-		ceph_clear_error_write(ci);
-
-		/*
-		 * We successfully wrote to a range of the file. Declare
-		 * that region of the pagecache invalid.
-		 */
-		ret = invalidate_inode_pages2_range(
-				inode->i_mapping,
-				pos >> PAGE_SHIFT,
-				(pos + len - 1) >> PAGE_SHIFT);
-		if (ret < 0) {
-			doutc(cl, "invalidate_inode_pages2_range returned %d\n",
-			      ret);
-			ret = 0;
-		}
-		pos += len;
-		written += len;
-		doutc(cl, "written %d\n", written);
-		if (pos > i_size_read(inode)) {
-			check_caps = ceph_inode_set_size(inode, pos);
-			if (check_caps)
-				ceph_check_caps(ceph_inode(inode),
-						CHECK_CAPS_AUTHONLY);
-		}
-
-	}
-
-	if (ret != -EOLDSNAPC && written > 0) {
-		ret = written;
-		iocb->ki_pos = pos;
-	}
-	doutc(cl, "returning %d\n", ret);
-	return ret;
-}
-
-/*
- * Wrap generic_file_aio_read with checks for cap bits on the inode.
- * Atomically grab references, so that those bits are not released
- * back to the MDS mid-read.
- *
- * Hmm, the sync read case isn't actually async... should it be?
- */
-static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
-{
-	struct file *filp = iocb->ki_filp;
-	struct ceph_file_info *fi = filp->private_data;
-	size_t len = iov_iter_count(to);
-	struct inode *inode = file_inode(filp);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	bool direct_lock = iocb->ki_flags & IOCB_DIRECT;
-	struct ceph_client *cl = ceph_inode_to_client(inode);
-	ssize_t ret;
-	int want = 0, got = 0;
-	int retry_op = 0, read = 0;
-
-again:
-	doutc(cl, "%llu~%u trying to get caps on %p %llx.%llx\n",
-	      iocb->ki_pos, (unsigned)len, inode, ceph_vinop(inode));
-
-	if (ceph_inode_is_shutdown(inode))
-		return -ESTALE;
-
-	if (direct_lock)
-		ceph_start_io_direct(inode);
-	else
-		ceph_start_io_read(inode);
-
-	if (!(fi->flags & CEPH_F_SYNC) && !direct_lock)
-		want |= CEPH_CAP_FILE_CACHE;
-	if (fi->fmode & CEPH_FILE_MODE_LAZY)
-		want |= CEPH_CAP_FILE_LAZYIO;
-
-	ret = ceph_get_caps(filp, CEPH_CAP_FILE_RD, want, -1, &got);
-	if (ret < 0) {
-		if (direct_lock)
-			ceph_end_io_direct(inode);
-		else
-			ceph_end_io_read(inode);
-		return ret;
-	}
-
-	if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-	    (iocb->ki_flags & IOCB_DIRECT) ||
-	    (fi->flags & CEPH_F_SYNC)) {
-
-		doutc(cl, "sync %p %llx.%llx %llu~%u got cap refs on %s\n",
-		      inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
-		      ceph_cap_string(got));
-
-		if (!ceph_has_inline_data(ci)) {
-			if (!retry_op &&
-			    (iocb->ki_flags & IOCB_DIRECT) &&
-			    !IS_ENCRYPTED(inode)) {
-				ret = ceph_direct_read_write(iocb, to,
-							     NULL, NULL);
-				if (ret >= 0 && ret < len)
-					retry_op = CHECK_EOF;
-			} else {
-				ret = ceph_sync_read(iocb, to, &retry_op);
-			}
-		} else {
-			retry_op = READ_INLINE;
-		}
-	} else {
-		doutc(cl, "async %p %llx.%llx %llu~%u got cap refs on %s\n",
-		      inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
-		      ceph_cap_string(got));
-		ret = generic_file_read_iter(iocb, to);
-	}
-
-	doutc(cl, "%p %llx.%llx dropping cap refs on %s = %d\n",
-	      inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
-	ceph_put_cap_refs(ci, got);
-
-	if (direct_lock)
-		ceph_end_io_direct(inode);
-	else
-		ceph_end_io_read(inode);
-
-	if (retry_op > HAVE_RETRIED && ret >= 0) {
-		int statret;
-		struct page *page = NULL;
-		loff_t i_size;
-		int mask = CEPH_STAT_CAP_SIZE;
-		if (retry_op == READ_INLINE) {
-			page = __page_cache_alloc(GFP_KERNEL);
-			if (!page)
-				return -ENOMEM;
-
-			mask = CEPH_STAT_CAP_INLINE_DATA;
-		}
-
-		statret = __ceph_do_getattr(inode, page, mask, !!page);
-		if (statret < 0) {
-			if (page)
-				__free_page(page);
-			if (statret == -ENODATA) {
-				BUG_ON(retry_op != READ_INLINE);
-				goto again;
-			}
-			return statret;
-		}
-
-		i_size = i_size_read(inode);
-		if (retry_op == READ_INLINE) {
-			BUG_ON(ret > 0 || read > 0);
-			if (iocb->ki_pos < i_size &&
-			    iocb->ki_pos < PAGE_SIZE) {
-				loff_t end = min_t(loff_t, i_size,
-						   iocb->ki_pos + len);
-				end = min_t(loff_t, end, PAGE_SIZE);
-				if (statret < end)
-					zero_user_segment(page, statret, end);
-				ret = copy_page_to_iter(page,
-						iocb->ki_pos & ~PAGE_MASK,
-						end - iocb->ki_pos, to);
-				iocb->ki_pos += ret;
-				read += ret;
-			}
-			if (iocb->ki_pos < i_size && read < len) {
-				size_t zlen = min_t(size_t, len - read,
-						    i_size - iocb->ki_pos);
-				ret = iov_iter_zero(zlen, to);
-				iocb->ki_pos += ret;
-				read += ret;
-			}
-			__free_pages(page, 0);
-			return read;
-		}
-
-		/* hit EOF or hole? */
-		if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
-		    ret < len) {
-			doutc(cl, "may hit hole, ppos %lld < size %lld, reading more\n",
-			      iocb->ki_pos, i_size);
-
-			read += ret;
-			len -= ret;
-			retry_op = HAVE_RETRIED;
-			goto again;
-		}
-	}
-
-	if (ret >= 0)
-		ret += read;
-
-	return ret;
-}
-#endif // TODO: Remove after netfs conversion
-
 /*
  * Wrap filemap_splice_read with checks for cap bits on the inode.
  * Atomically grab references, so that those bits are not released
@@ -2298,203 +991,6 @@ static ssize_t ceph_splice_read(struct file *in, loff_t *ppos,
 	return ret;
 }
 
-#if 0 // TODO: Remove after netfs conversion
-/*
- * Take cap references to avoid releasing caps to MDS mid-write.
- *
- * If we are synchronous, and write with an old snap context, the OSD
- * may return EOLDSNAPC.  In that case, retry the write.. _after_
- * dropping our cap refs and allowing the pending snap to logically
- * complete _before_ this write occurs.
- *
- * If we are near ENOSPC, write synchronously.
- */
-static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
-{
-	struct file *file = iocb->ki_filp;
-	struct ceph_file_info *fi = file->private_data;
-	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
-	struct ceph_client *cl = fsc->client;
-	struct ceph_osd_client *osdc = &fsc->client->osdc;
-	struct ceph_cap_flush *prealloc_cf;
-	ssize_t count, written = 0;
-	int err, want = 0, got;
-	bool direct_lock = false;
-	u32 map_flags;
-	u64 pool_flags;
-	loff_t pos;
-	loff_t limit = max(i_size_read(inode), fsc->max_file_size);
-
-	if (ceph_inode_is_shutdown(inode))
-		return -ESTALE;
-
-	if (ceph_snap(inode) != CEPH_NOSNAP)
-		return -EROFS;
-
-	prealloc_cf = ceph_alloc_cap_flush();
-	if (!prealloc_cf)
-		return -ENOMEM;
-
-	if ((iocb->ki_flags & (IOCB_DIRECT | IOCB_APPEND)) == IOCB_DIRECT)
-		direct_lock = true;
-
-retry_snap:
-	if (direct_lock)
-		ceph_start_io_direct(inode);
-	else
-		ceph_start_io_write(inode);
-
-	if (iocb->ki_flags & IOCB_APPEND) {
-		err = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
-		if (err < 0)
-			goto out;
-	}
-
-	err = generic_write_checks(iocb, from);
-	if (err <= 0)
-		goto out;
-
-	pos = iocb->ki_pos;
-	if (unlikely(pos >= limit)) {
-		err = -EFBIG;
-		goto out;
-	} else {
-		iov_iter_truncate(from, limit - pos);
-	}
-
-	count = iov_iter_count(from);
-	if (ceph_quota_is_max_bytes_exceeded(inode, pos + count)) {
-		err = -EDQUOT;
-		goto out;
-	}
-
-	down_read(&osdc->lock);
-	map_flags = osdc->osdmap->flags;
-	pool_flags = ceph_pg_pool_flags(osdc->osdmap, ci->i_layout.pool_id);
-	up_read(&osdc->lock);
-	if ((map_flags & CEPH_OSDMAP_FULL) ||
-	    (pool_flags & CEPH_POOL_FLAG_FULL)) {
-		err = -ENOSPC;
-		goto out;
-	}
-
-	err = file_remove_privs(file);
-	if (err)
-		goto out;
-
-	doutc(cl, "%p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
-	      inode, ceph_vinop(inode), pos, count,
-	      i_size_read(inode));
-	if (!(fi->flags & CEPH_F_SYNC) && !direct_lock)
-		want |= CEPH_CAP_FILE_BUFFER;
-	if (fi->fmode & CEPH_FILE_MODE_LAZY)
-		want |= CEPH_CAP_FILE_LAZYIO;
-	got = 0;
-	err = ceph_get_caps(file, CEPH_CAP_FILE_WR, want, pos + count, &got);
-	if (err < 0)
-		goto out;
-
-	err = file_update_time(file);
-	if (err)
-		goto out_caps;
-
-	inode_inc_iversion_raw(inode);
-
-	doutc(cl, "%p %llx.%llx %llu~%zd got cap refs on %s\n",
-	      inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
-
-	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-	    (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC) ||
-	    (ci->i_ceph_flags & CEPH_I_ERROR_WRITE)) {
-		struct ceph_snap_context *snapc;
-		struct iov_iter data;
-
-		spin_lock(&ci->i_ceph_lock);
-		if (__ceph_have_pending_cap_snap(ci)) {
-			struct ceph_cap_snap *capsnap =
-					list_last_entry(&ci->i_cap_snaps,
-							struct ceph_cap_snap,
-							ci_item);
-			snapc = ceph_get_snap_context(capsnap->context);
-		} else {
-			BUG_ON(!ci->i_head_snapc);
-			snapc = ceph_get_snap_context(ci->i_head_snapc);
-		}
-		spin_unlock(&ci->i_ceph_lock);
-
-		/* we might need to revert back to that point */
-		data = *from;
-		if ((iocb->ki_flags & IOCB_DIRECT) && !IS_ENCRYPTED(inode))
-			written = ceph_direct_read_write(iocb, &data, snapc,
-							 &prealloc_cf);
-		else
-			written = ceph_sync_write(iocb, &data, pos, snapc);
-		if (direct_lock)
-			ceph_end_io_direct(inode);
-		else
-			ceph_end_io_write(inode);
-		if (written > 0)
-			iov_iter_advance(from, written);
-		ceph_put_snap_context(snapc);
-	} else {
-		/*
-		 * No need to acquire the i_truncate_mutex. Because
-		 * the MDS revokes Fwb caps before sending truncate
-		 * message to us. We can't get Fwb cap while there
-		 * are pending vmtruncate. So write and vmtruncate
-		 * can not run at the same time
-		 */
-		written = generic_perform_write(iocb, from);
-		ceph_end_io_write(inode);
-	}
-
-	if (written >= 0) {
-		int dirty;
-
-		spin_lock(&ci->i_ceph_lock);
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
-					       &prealloc_cf);
-		spin_unlock(&ci->i_ceph_lock);
-		if (dirty)
-			__mark_inode_dirty(inode, dirty);
-		if (ceph_quota_is_max_bytes_approaching(inode, iocb->ki_pos))
-			ceph_check_caps(ci, CHECK_CAPS_FLUSH);
-	}
-
-	doutc(cl, "%p %llx.%llx %llu~%u  dropping cap refs on %s\n",
-	      inode, ceph_vinop(inode), pos, (unsigned)count,
-	      ceph_cap_string(got));
-	ceph_put_cap_refs(ci, got);
-
-	if (written == -EOLDSNAPC) {
-		doutc(cl, "%p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n",
-		      inode, ceph_vinop(inode), pos, (unsigned)count);
-		goto retry_snap;
-	}
-
-	if (written >= 0) {
-		if ((map_flags & CEPH_OSDMAP_NEARFULL) ||
-		    (pool_flags & CEPH_POOL_FLAG_NEARFULL))
-			iocb->ki_flags |= IOCB_DSYNC;
-		written = generic_write_sync(iocb, written);
-	}
-
-	goto out_unlocked;
-out_caps:
-	ceph_put_cap_refs(ci, got);
-out:
-	if (direct_lock)
-		ceph_end_io_direct(inode);
-	else
-		ceph_end_io_write(inode);
-out_unlocked:
-	ceph_free_cap_flush(prealloc_cf);
-	return written ? written : err;
-}
-#endif // TODO: Remove after netfs conversion
-
 /*
  * llseek.  be sure to verify file size on SEEK_END.
  */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index acd5c4821ded..97eddbf9dae9 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -470,19 +470,6 @@ struct ceph_inode_info {
 #endif
 };
 
-struct ceph_netfs_request_data { // TODO: Remove
-	int caps;
-
-	/*
-	 * Maximum size of a file readahead request.
-	 * The fadvise could update the bdi's default ra_pages.
-	 */
-	unsigned int file_ra_pages;
-
-	/* Set it if fadvise disables file readahead entirely */
-	bool file_ra_disabled;
-};
-
 struct ceph_io_request {
 	struct netfs_io_request rreq;
 	u64 rmw_assert_version;
@@ -1260,9 +1247,6 @@ extern void __ceph_touch_fmode(struct ceph_inode_info *ci,
 			       struct ceph_mds_client *mdsc, int fmode);
 
 /* addr.c */
-#if 0 // TODO: Remove after netfs conversion
-extern const struct netfs_request_ops ceph_netfs_ops;
-#endif // TODO: Remove after netfs conversion
 bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio);
 extern int ceph_mmap(struct file *file, struct vm_area_struct *vma);
 extern int ceph_uninline_data(struct file *file);
@@ -1293,11 +1277,6 @@ extern int ceph_renew_caps(struct inode *inode, int fmode);
 extern int ceph_open(struct inode *inode, struct file *file);
 extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
 			    struct file *file, unsigned flags, umode_t mode);
-#if 0 // TODO: Remove after netfs conversion
-extern ssize_t __ceph_sync_read(struct inode *inode, loff_t *ki_pos,
-				struct iov_iter *to, int *retry_op,
-				u64 *last_objver);
-#endif
 extern int ceph_release(struct inode *inode, struct file *filp);
 extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
 				  char *data, size_t len);

