lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Date:	Thu, 13 Nov 2008 16:56:12 -0800
From:	Sage Weil <sage@...dream.net>
To:	linux-fsdevel@...r.kernel.org
Cc:	linux-kernel@...r.kernel.org, Sage Weil <sage@...dream.net>
Subject: [PATCH 13/19] ceph: capability management

The Ceph metadata servers control client access to data by issuing
capabilities granting clients permission to read and/or write to OSDs
(storage nodes).  Each capability consists of a set of bits indicating
which operations are allowed.

Signed-off-by: Sage Weil <sage@...dream.net>
---
 fs/ceph/caps.c | 1464 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 1464 insertions(+), 0 deletions(-)
 create mode 100644 fs/ceph/caps.c

diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
new file mode 100644
index 0000000..bdc761e
--- /dev/null
+++ b/fs/ceph/caps.c
@@ -0,0 +1,1464 @@
+#include <linux/fs.h>
+#include <linux/kernel.h>
+#include <linux/sched.h>
+#include <linux/wait.h>
+
+#include "ceph_debug.h"
+
+int ceph_debug_caps = -1;
+#define DOUT_MASK DOUT_MASK_CAPS
+#define DOUT_VAR ceph_debug_caps
+#define DOUT_PREFIX "caps: "
+#include "super.h"
+
+#include "decode.h"
+#include "messenger.h"
+
+
+/*
+ * Find ceph_cap for given mds, if any.
+ *
+ * Called with i_lock held.
+ */
+static struct ceph_cap *__get_cap_for_mds(struct inode *inode, int mds)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_cap *cap;
+	struct rb_node *n = ci->i_caps.rb_node;
+
+	while (n) {
+		cap = rb_entry(n, struct ceph_cap, ci_node);
+		if (mds < cap->mds)
+			n = n->rb_left;
+		else if (mds > cap->mds)
+			n = n->rb_right;
+		else
+			return cap;
+	}
+	return NULL;
+}
+
+/*
+ * Return id of any MDS with a cap, preferably WR|WRBUFFER|EXCL, else
+ * -1.
+ */
+static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq)
+{
+	struct ceph_cap *cap;
+	int mds = -1;
+	struct rb_node *p;
+
+	/* prefer mds with WR|WRBUFFER|EXCL caps */
+	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+		cap = rb_entry(p, struct ceph_cap, ci_node);
+		mds = cap->mds;
+		if (mseq)
+			*mseq = cap->mseq;
+		if (cap->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL))
+			break;
+	}
+	return mds;
+}
+
+int ceph_get_cap_mds(struct inode *inode)
+{
+	int mds;
+	spin_lock(&inode->i_lock);
+	mds = __ceph_get_cap_mds(ceph_inode(inode), NULL);
+	spin_unlock(&inode->i_lock);
+	return mds;
+}
+
+/*
+ * Called under i_lock.
+ */
+static void __insert_cap_node(struct ceph_inode_info *ci,
+			      struct ceph_cap *new)
+{
+	struct rb_node **p = &ci->i_caps.rb_node;
+	struct rb_node *parent = NULL;
+	struct ceph_cap *cap = NULL;
+
+	while (*p) {
+		parent = *p;
+		cap = rb_entry(parent, struct ceph_cap, ci_node);
+		if (new->mds < cap->mds)
+			p = &(*p)->rb_left;
+		else if (new->mds > cap->mds)
+			p = &(*p)->rb_right;
+		else
+			BUG();
+	}
+
+	rb_link_node(&new->ci_node, parent, p);
+	rb_insert_color(&new->ci_node, &ci->i_caps);
+}
+
+/*
+ * Add a capability under the given MDS session, after processing
+ * the snapblob (to update the snap realm hierarchy).
+ *
+ * Bump i_count when adding it's first cap.
+ *
+ * Caller should hold session snap_rwsem, s_mutex.
+ *
+ * @fmode can be negative, in which case it is ignored.
+ */
+int ceph_add_cap(struct inode *inode,
+		 struct ceph_mds_session *session,
+		 int fmode, unsigned issued,
+		 unsigned seq, unsigned mseq,
+		 void *snapblob, int snapblob_len,
+		 struct ceph_cap *new_cap)
+{
+	struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_cap *cap;
+	struct ceph_snap_realm *realm;
+	int mds = session->s_mds;
+	int is_first = 0;
+
+	realm = ceph_update_snap_trace(mdsc, snapblob, snapblob+snapblob_len,
+				       false /* not a deletion */);
+
+	dout(10, "add_cap on %p mds%d cap %d seq %d\n", inode,
+	     session->s_mds, issued, seq);
+retry:
+	spin_lock(&inode->i_lock);
+	cap = __get_cap_for_mds(inode, mds);
+	if (!cap) {
+		if (new_cap) {
+			cap = new_cap;
+			new_cap = NULL;
+		} else {
+			spin_unlock(&inode->i_lock);
+			new_cap = kmalloc(sizeof(*cap), GFP_NOFS);
+			if (new_cap == NULL) {
+				ceph_put_snap_realm(mdsc, realm);
+				return -ENOMEM;
+			}
+			goto retry;
+		}
+
+		cap->issued = cap->implemented = 0;
+		cap->mds = mds;
+
+		is_first = RB_EMPTY_ROOT(&ci->i_caps);  /* grab inode later */
+		cap->ci = ci;
+		__insert_cap_node(ci, cap);
+
+		/* add to session cap list */
+		cap->session = session;
+		list_add(&cap->session_caps, &session->s_caps);
+		session->s_nr_caps++;
+
+		/* clear out old exporting info?  (i.e. on cap import) */
+		if (ci->i_cap_exporting_mds == mds) {
+			ci->i_cap_exporting_issued = 0;
+			ci->i_cap_exporting_mseq = 0;
+			ci->i_cap_exporting_mds = -1;
+		}
+	}
+	if (!ci->i_snap_realm) {
+		ci->i_snap_realm = realm;
+		list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps);
+	} else {
+		ceph_put_snap_realm(mdsc, realm);
+	}
+
+	dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
+	     inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
+	cap->issued |= issued;
+	cap->implemented |= issued;
+	cap->seq = seq;
+	cap->mseq = mseq;
+	cap->gen = session->s_cap_gen;
+	if (fmode >= 0)
+		__ceph_get_fmode(ci, fmode);
+	spin_unlock(&inode->i_lock);
+	if (is_first)
+		igrab(inode);
+	kfree(new_cap);
+	return 0;
+}
+
+/*
+ * Return set of valid cap bits issued to us.  Note that caps time
+ * out, and may be invalidated in bulk if the client session times out
+ * and session->s_cap_gen is bumped.
+ */
+int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
+{
+	int have = ci->i_snap_caps;
+	struct ceph_cap *cap;
+	u32 gen;
+	unsigned long ttl;
+	struct rb_node *p;
+
+	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+		cap = rb_entry(p, struct ceph_cap, ci_node);
+
+		spin_lock(&cap->session->s_cap_lock);
+		gen = cap->session->s_cap_gen;
+		ttl = cap->session->s_cap_ttl;
+		spin_unlock(&cap->session->s_cap_lock);
+
+		if (cap->gen < gen || time_after_eq(jiffies, ttl)) {
+			dout(30, "__ceph_caps_issued %p cap %p issued %d "
+			     "but STALE (gen %u vs %u)\n", &ci->vfs_inode,
+			     cap, cap->issued, cap->gen, gen);
+			continue;
+		}
+		dout(30, "__ceph_caps_issued %p cap %p issued %d\n",
+		     &ci->vfs_inode, cap, cap->issued);
+		have |= cap->issued;
+		if (implemented)
+			*implemented |= cap->implemented;
+	}
+	return have;
+}
+
+/*
+ * caller should hold i_lock, snap_rwsem, and session s_mutex.
+ * returns true if this is the last cap.  if so, caller should iput.
+ */
+static int __ceph_remove_cap(struct ceph_cap *cap)
+{
+	struct ceph_mds_session *session = cap->session;
+	struct ceph_inode_info *ci = cap->ci;
+	struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+
+	dout(20, "__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
+
+	/* remove from session list */
+	list_del_init(&cap->session_caps);
+	session->s_nr_caps--;
+
+	/* remove from inode list */
+	rb_erase(&cap->ci_node, &ci->i_caps);
+	cap->session = NULL;
+
+	kfree(cap);
+
+	if (RB_EMPTY_ROOT(&ci->i_caps)) {
+		list_del_init(&ci->i_snap_realm_item);
+		ceph_put_snap_realm(mdsc, ci->i_snap_realm);
+		ci->i_snap_realm = NULL;
+		return 1;
+	}
+	return 0;
+}
+
+/*
+ * caller should hold snap_rwsem and session s_mutex.
+ */
+void ceph_remove_cap(struct ceph_cap *cap)
+{
+	struct inode *inode = &cap->ci->vfs_inode;
+	int was_last;
+
+	spin_lock(&inode->i_lock);
+	was_last = __ceph_remove_cap(cap);
+	spin_unlock(&inode->i_lock);
+	if (was_last)
+		iput(inode);
+}
+
+/*
+ *
+ * (Re)queue cap at the end of the delayed cap release list.
+ *
+ * Caller holds i_lock
+ *    -> we take mdsc->cap_delay_lock
+ */
+static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
+				struct ceph_inode_info *ci)
+{
+	ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5);
+	dout(10, "__cap_delay_requeue %p at %lu\n", &ci->vfs_inode,
+	     ci->i_hold_caps_until);
+	spin_lock(&mdsc->cap_delay_lock);
+	if (!mdsc->stopping) {
+		if (list_empty(&ci->i_cap_delay_list))
+			igrab(&ci->vfs_inode);
+		else
+			list_del_init(&ci->i_cap_delay_list);
+		list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
+	}
+	spin_unlock(&mdsc->cap_delay_lock);
+}
+
+/*
+ * Cancel delayed work on cap.
+ * caller hold s_mutex, snap_rwsem.
+ */
+static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
+			       struct ceph_inode_info *ci)
+{
+	dout(10, "__cap_delay_cancel %p\n", &ci->vfs_inode);
+	if (list_empty(&ci->i_cap_delay_list))
+		return;
+	spin_lock(&mdsc->cap_delay_lock);
+	list_del_init(&ci->i_cap_delay_list);
+	spin_unlock(&mdsc->cap_delay_lock);
+	iput(&ci->vfs_inode);
+}
+
+/*
+ * Build and send a cap message to the given MDS.
+ *
+ * Caller should be holding s_mutex.
+ */
+static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, int op,
+			 int caps, int wanted, u64 seq, u64 mseq,
+			 u64 size, u64 max_size,
+			 struct timespec *mtime, struct timespec *atime,
+			 u64 time_warp_seq, u64 follows, int mds)
+{
+	struct ceph_mds_caps *fc;
+	struct ceph_msg *msg;
+
+	dout(10, "send_cap_msg %s %llx caps %d wanted %d seq %llu/%llu"
+	     " follows %lld size %llu\n", ceph_cap_op_name(op), ino,
+	     caps, wanted, seq, mseq, follows, size);
+
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, NULL);
+	if (IS_ERR(msg))
+		return;
+
+	fc = msg->front.iov_base;
+
+	memset(fc, 0, sizeof(*fc));
+
+	fc->op = cpu_to_le32(op);
+	fc->seq = cpu_to_le32(seq);
+	fc->migrate_seq = cpu_to_le32(mseq);
+	fc->caps = cpu_to_le32(caps);
+	fc->wanted = cpu_to_le32(wanted);
+	fc->ino = cpu_to_le64(ino);
+	fc->size = cpu_to_le64(size);
+	fc->max_size = cpu_to_le64(max_size);
+	fc->snap_follows = cpu_to_le64(follows);
+	if (mtime)
+		ceph_encode_timespec(&fc->mtime, mtime);
+	if (atime)
+		ceph_encode_timespec(&fc->atime, atime);
+	fc->time_warp_seq = cpu_to_le64(time_warp_seq);
+
+	ceph_send_msg_mds(mdsc, msg, mds);
+}
+
+/*
+ *
+ * Send a cap msg on the given inode.  Make note of max_size
+ * reported/requested from mds, revoked caps that have now been
+ * implemented.
+ *
+ * Also, try to invalidate page cache if we are dropping RDCACHE.
+ * Note that this will leave behind any locked pages... FIXME!
+ *
+ * called with i_lock, then drops it.
+ * caller should hold snap_rwsem, s_mutex.
+ */
+static void __send_cap(struct ceph_mds_client *mdsc,
+		struct ceph_mds_session *session,
+		struct ceph_cap *cap,
+		int used, int wanted) __releases(cap->ci->vfs_inode->i_lock)
+{
+	struct ceph_inode_info *ci = cap->ci;
+	struct inode *inode = &ci->vfs_inode;
+	int revoking = cap->implemented & ~cap->issued;
+	int dropping = cap->issued & ~wanted;
+	int keep;
+	u64 seq, mseq, time_warp_seq, follows;
+	u64 size, max_size;
+	struct timespec mtime, atime;
+	int wake = 0;
+	int op = CEPH_CAP_OP_ACK;
+
+	if (wanted == 0)
+		op = CEPH_CAP_OP_RELEASE;
+
+	dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
+	     cap->issued, cap->issued & wanted);
+	cap->issued &= wanted;  /* drop bits we don't want */
+
+	if (revoking && (revoking && used) == 0) {
+		cap->implemented = cap->issued;
+		/*
+		 * Wake up any waiters on wanted -> needed transition.
+		 * This is due to the weird transition from buffered
+		 * to sync IO... we need to flush dirty pages _before_
+		 * allowing sync writes to avoid reordering.
+		 */
+		wake = 1;
+	}
+
+	keep = cap->issued;
+	seq = cap->seq;
+	mseq = cap->mseq;
+	size = inode->i_size;
+	ci->i_reported_size = size;
+	max_size = ci->i_wanted_max_size;
+	ci->i_requested_max_size = max_size;
+	mtime = inode->i_mtime;
+	atime = inode->i_atime;
+	time_warp_seq = ci->i_time_warp_seq;
+	follows = ci->i_snap_realm->cached_context->seq;
+	spin_unlock(&inode->i_lock);
+
+	if (dropping & CEPH_CAP_RDCACHE) {
+		/* invalidate what we can */
+		dout(20, "invalidating pages on %p\n", inode);
+		invalidate_mapping_pages(&inode->i_data, 0, -1);
+	}
+
+	send_cap_msg(mdsc, ceph_vino(inode).ino,
+		     op, keep, wanted, seq, mseq,
+		     size, max_size, &mtime, &atime, time_warp_seq,
+		     follows, session->s_mds);
+
+	if (wake)
+		wake_up(&ci->i_cap_wq);
+}
+
+
+/*
+ * When a snapshot is taken, clients accumulate "dirty" data on inodes
+ * with capabilities in ceph_cap_snaps to describe the file state at
+ * the time the snapshot was taken.  This must be flushed
+ * asynchronously back to the MDS once sync writes complete and dirty
+ * data is written out.
+ *
+ * Called under i_lock.  Takes s_mutex as needed.
+ */
+void __ceph_flush_snaps(struct ceph_inode_info *ci,
+			struct ceph_mds_session **psession)
+{
+	struct inode *inode = &ci->vfs_inode;
+	int mds;
+	struct list_head *p;
+	struct ceph_cap_snap *capsnap;
+	u64 follows;
+	int issued;
+	u64 size;
+	struct timespec mtime, atime, ctime;
+	u64 time_warp_seq;
+	u32 mseq;
+	struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+	struct ceph_mds_session *session = NULL; /* if session != NULL, we hold
+						    session->s_mutex */
+	u64 next_follows = 0;  /* keep track of how far we've gotten through the
+			     i_cap_snaps list, and skip these entries next time
+			     around to avoid an infinite loop */
+
+	if (psession)
+		session = *psession;
+
+	dout(10, "__flush_snaps %p\n", inode);
+retry:
+	list_for_each(p, &ci->i_cap_snaps) {
+		capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
+
+		/* avoid an infiniute loop after retry */
+		if (capsnap->follows < next_follows)
+			continue;
+		/*
+		 * we need to wait for sync writes to complete and for dirty
+		 * pages to be written out.
+		 */
+		if (capsnap->dirty || capsnap->writing)
+			continue;
+
+		/* pick mds, take s_mutex */
+		mds = __ceph_get_cap_mds(ci, &mseq);
+		if (session && session->s_mds != mds) {
+			dout(30, "oops, wrong session %p mutex\n", session);
+			mutex_unlock(&session->s_mutex);
+			ceph_put_mds_session(session);
+			session = NULL;
+		}
+		if (!session) {
+			spin_unlock(&inode->i_lock);
+			mutex_lock(&mdsc->mutex);
+			session = __ceph_get_mds_session(mdsc, mds);
+			mutex_unlock(&mdsc->mutex);
+			if (session) {
+				dout(10, "inverting session/ino locks on %p\n",
+				     session);
+				mutex_lock(&session->s_mutex);
+			}
+			/*
+			 * if session == NULL, we raced against a cap
+			 * deletion.  retry, and we'll get a better
+			 * @mds value next time.
+			 */
+			spin_lock(&inode->i_lock);
+			goto retry;
+		}
+
+		follows = capsnap->follows;
+		size = capsnap->size;
+		atime = capsnap->atime;
+		mtime = capsnap->mtime;
+		ctime = capsnap->ctime;
+		time_warp_seq = capsnap->time_warp_seq;
+		issued = capsnap->issued;
+		spin_unlock(&inode->i_lock);
+
+		dout(10, "flush_snaps %p cap_snap %p follows %lld size %llu\n",
+		     inode, capsnap, next_follows, size);
+		send_cap_msg(mdsc, ceph_vino(inode).ino,
+			     CEPH_CAP_OP_FLUSHSNAP, issued, 0, 0, mseq,
+			     size, 0,
+			     &mtime, &atime, time_warp_seq,
+			     follows, mds);
+
+		spin_lock(&inode->i_lock);
+		next_follows = follows + 1;
+		goto retry;
+	}
+
+	/* we flushed them all; remove this inode from the queue */
+	spin_lock(&mdsc->snap_flush_lock);
+	list_del_init(&ci->i_snap_flush_item);
+	spin_unlock(&mdsc->snap_flush_lock);
+
+	if (psession)
+		*psession = session;
+	else if (session) {
+		mutex_unlock(&session->s_mutex);
+		ceph_put_mds_session(session);
+	}
+}
+
+void ceph_flush_snaps(struct ceph_inode_info *ci)
+{
+	struct inode *inode = &ci->vfs_inode;
+
+	spin_lock(&inode->i_lock);
+	__ceph_flush_snaps(ci, NULL);
+	spin_unlock(&inode->i_lock);
+}
+
+
+/*
+ * Swiss army knife function to examine currently used, wanted versus
+ * held caps.  Release, flush, ack revoked caps to mds as appropriate.
+ *
+ * @is_delayed indicates caller is delayed work and we should not
+ * delay further.
+ */
+void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed)
+{
+	struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
+	struct ceph_mds_client *mdsc = &client->mdsc;
+	struct inode *inode = &ci->vfs_inode;
+	struct ceph_cap *cap;
+	int file_wanted, used;
+	struct ceph_mds_session *session = NULL;    /* if set, i hold s_mutex */
+	int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
+	int revoking;
+	int mds = -1;   /* keep track of how far we've gone through i_caps list
+			   to avoid an infinite loop on retry */
+	struct rb_node *p;
+	int tried_invalidate = 0;
+
+	/* if we are unmounting, flush any unused caps immediately. */
+	if (mdsc->stopping)
+		is_delayed = 1;
+
+	spin_lock(&inode->i_lock);
+
+	/* flush snaps first time around only */
+	if (!list_empty(&ci->i_cap_snaps))
+		__ceph_flush_snaps(ci, &session);
+	goto retry_locked;
+retry:
+	spin_lock(&inode->i_lock);
+retry_locked:
+	file_wanted = __ceph_caps_file_wanted(ci);
+	used = __ceph_caps_used(ci);
+	dout(10, "check_caps %p file_wanted %d used %d issued %d\n",
+	     inode, file_wanted, used, __ceph_caps_issued(ci, NULL));
+
+	/*
+	 * Reschedule delayed caps release, unless we are called from
+	 * the delayed work handler (i.e. this _is_ the delayed release)
+	 */
+	if (!is_delayed)
+		__cap_delay_requeue(mdsc, ci);
+
+	/*
+	 * If we no longer need to hold onto old our caps, and we may
+	 * have cached pages, but don't want them, then try to invalidate.
+	 * If we fail, it's because pages are locked.... try again later.
+	 */
+	if ((!time_before(jiffies, ci->i_hold_caps_until) || mdsc->stopping) &&
+	    ci->i_wrbuffer_ref == 0 &&               /* no dirty pages... */
+	    ci->i_rdcache_gen &&                     /* may have cached pages */
+	    file_wanted == 0 &&                      /* no open files */
+	    !tried_invalidate) {
+		u32 invalidating_gen = ci->i_rdcache_gen;
+		int ret;
+
+		dout(10, "check_caps trying to invalidate on %p\n", inode);
+		spin_unlock(&inode->i_lock);
+		ret = invalidate_inode_pages2(&inode->i_data);
+		spin_lock(&inode->i_lock);
+		if (ret == 0 && invalidating_gen == ci->i_rdcache_gen) {
+			/* success. */
+			ci->i_rdcache_gen = 0;
+			ci->i_rdcache_revoking = 0;
+		} else {
+			dout(10, "check_caps failed to invalidate pages\n");
+			/* we failed to invalidate pages.  check these
+			   caps again later. */
+			if (is_delayed)
+				__cap_delay_requeue(mdsc, ci);
+		}
+		tried_invalidate = 1;
+		goto retry_locked;
+	}
+
+	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+		cap = rb_entry(p, struct ceph_cap, ci_node);
+
+		/* avoid looping forever */
+		if (mds >= cap->mds)
+			continue;
+
+		/* NOTE: no side-effects allowed, until we take s_mutex */
+
+		revoking = cap->implemented & ~cap->issued;
+		if (revoking)
+			dout(10, "mds%d revoking %d\n", cap->mds, revoking);
+
+		/* request larger max_size from MDS? */
+		if (ci->i_wanted_max_size > ci->i_max_size &&
+		    ci->i_wanted_max_size > ci->i_requested_max_size)
+			goto ack;
+
+		/* approaching file_max? */
+		if ((cap->issued & CEPH_CAP_WR) &&
+		    (inode->i_size << 1) >= ci->i_max_size &&
+		    (ci->i_reported_size << 1) < ci->i_max_size) {
+			dout(10, "i_size approaching max_size\n");
+			goto ack;
+		}
+
+		/* completed revocation? going down and there are no caps? */
+		if ((revoking && (revoking & used) == 0) ||
+		    (mdsc->stopping && (used == 0))) {
+			dout(10, "completed revocation of %d\n",
+			     cap->implemented & ~cap->issued);
+			goto ack;
+		}
+
+		if ((cap->issued & ~(file_wanted | used)) == 0)
+			continue;     /* nothing extra, all good */
+
+		/* delay cap release for a bit? */
+		if (!is_delayed &&
+		    time_before(jiffies, ci->i_hold_caps_until)) {
+			dout(30, "delaying cap release\n");
+			continue;
+		}
+
+ack:
+		/* take snap_rwsem before session mutex */
+		if (!took_snap_rwsem) {
+			if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
+				dout(10, "inverting snap/in locks on %p\n",
+				     inode);
+				spin_unlock(&inode->i_lock);
+				down_read(&mdsc->snap_rwsem);
+				took_snap_rwsem = 1;
+				goto retry;
+			}
+			took_snap_rwsem = 1;
+		}
+		if (session && session != cap->session) {
+			dout(30, "oops, wrong session %p mutex\n", session);
+			mutex_unlock(&session->s_mutex);
+			session = NULL;
+		}
+		if (!session) {
+			session = cap->session;
+			if (mutex_trylock(&session->s_mutex) == 0) {
+				dout(10, "inverting session/ino locks on %p\n",
+				     session);
+				spin_unlock(&inode->i_lock);
+				mutex_lock(&session->s_mutex);
+				goto retry;
+			}
+		}
+
+		mds = cap->mds;  /* remember mds, so we don't repeat */
+
+		/* __send_cap drops i_lock */
+		__send_cap(mdsc, session, cap, used, used | file_wanted);
+
+		goto retry; /* retake i_lock and restart our cap scan. */
+	}
+
+	/* okay */
+	spin_unlock(&inode->i_lock);
+
+	if (session)
+		mutex_unlock(&session->s_mutex);
+	if (took_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
+}
+
+
+/*
+ * Track references to capabilities we hold, so that we don't release
+ * them to the MDS prematurely.
+ *
+ * Protected by i_lock.
+ */
+static void __take_cap_refs(struct ceph_inode_info *ci, int got)
+{
+	if (got & CEPH_CAP_RD)
+		ci->i_rd_ref++;
+	if (got & CEPH_CAP_RDCACHE)
+		ci->i_rdcache_ref++;
+	if (got & CEPH_CAP_WR)
+		ci->i_wr_ref++;
+	if (got & CEPH_CAP_WRBUFFER) {
+		ci->i_wrbuffer_ref++;
+		dout(30, "__take_cap_refs %p wrbuffer %d -> %d (?)\n",
+		     &ci->vfs_inode, ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref);
+	}
+}
+
+/*
+ * Try to grab cap references.  Specify those refs we @want, and the
+ * minimal set we @need.  Also include the larger offset we are writing
+ * to (when applicable), and check against max_size here as well.
+ * Note that caller is responsible for ensuring max_size increases are
+ * requested from the MDS.
+ */
+int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
+		      loff_t endoff)
+{
+	struct inode *inode = &ci->vfs_inode;
+	int ret = 0;
+	int have, implemented;
+
+	dout(30, "get_cap_refs %p need %d want %d\n", inode, need, want);
+	spin_lock(&inode->i_lock);
+	if (need & CEPH_CAP_WR) {
+		if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
+			dout(20, "get_cap_refs %p endoff %llu > maxsize %llu\n",
+			     inode, endoff, ci->i_max_size);
+			goto sorry;
+		}
+		/*
+		 * If a sync write is in progress, we must wait, so that we
+		 * can get a final snapshot value for size+mtime.
+		 */
+		if (__ceph_have_pending_cap_snap(ci)) {
+			dout(20, "get_cap_refs %p cap_snap_pending\n", inode);
+			goto sorry;
+		}
+	}
+	have = __ceph_caps_issued(ci, &implemented);
+	/* HACK: force sync writes...
+	have &= ~CEPH_CAP_WRBUFFER;
+	implemented &= ~CEPH_CAP_WRBUFFER;
+	*/
+	if ((have & need) == need) {
+		/*
+		 * Look at (implemented & ~have & not) so that we keep waiting
+		 * on transition from wanted -> needed caps.  This is needed
+		 * for WRBUFFER|WR -> WR to avoid a new WR sync write from
+		 * going before a prior buffered writeback happens.
+		 */
+		int not = want & ~(have & need);
+		int revoking = implemented & ~have;
+		dout(30, "get_cap_refs %p have %d but not %d (revoking %d)\n",
+		     inode, have, not, revoking);
+		if ((revoking & not) == 0) {
+			*got = need | (have & want);
+			__take_cap_refs(ci, *got);
+			ret = 1;
+		}
+	} else {
+		dout(30, "get_cap_refs %p have %d needed %d\n", inode,
+		     have, need);
+	}
+sorry:
+	spin_unlock(&inode->i_lock);
+	dout(30, "get_cap_refs %p ret %d got %d\n", inode,
+	     ret, *got);
+	return ret;
+}
+
+/*
+ * Release cap refs.
+ *
+ * If we released the last ref on any given cap, call ceph_check_caps
+ * to release (or schedule a release).
+ *
+ * If we are releasing a WR cap (from a sync write), finalize any affected
+ * cap_snap, and wake up any waiters.
+ */
+void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+	struct inode *inode = &ci->vfs_inode;
+	int last = 0, flushsnaps = 0, wake = 0;
+	struct ceph_cap_snap *capsnap;
+
+	spin_lock(&inode->i_lock);
+	if (had & CEPH_CAP_RD)
+		if (--ci->i_rd_ref == 0)
+			last++;
+	if (had & CEPH_CAP_RDCACHE)
+		if (--ci->i_rdcache_ref == 0)
+			last++;
+	if (had & CEPH_CAP_WRBUFFER) {
+		if (--ci->i_wrbuffer_ref == 0)
+			last++;
+		dout(30, "put_cap_refs %p wrbuffer %d -> %d (?)\n",
+		     inode, ci->i_wrbuffer_ref+1, ci->i_wrbuffer_ref);
+	}
+	if (had & CEPH_CAP_WR)
+		if (--ci->i_wr_ref == 0) {
+			last++;
+			if (!list_empty(&ci->i_cap_snaps)) {
+				capsnap = list_entry(ci->i_cap_snaps.next,
+						     struct ceph_cap_snap,
+						     ci_item);
+				if (capsnap->writing) {
+					capsnap->writing = 0;
+					flushsnaps =
+						__ceph_finish_cap_snap(ci,
+								       capsnap);
+					wake = 1;
+				}
+			}
+		}
+	spin_unlock(&inode->i_lock);
+
+	dout(30, "put_cap_refs %p had %d %s\n", inode, had, last ? "last" : "");
+
+	if (last && !flushsnaps)
+		ceph_check_caps(ci, 0);
+	else if (flushsnaps)
+		ceph_flush_snaps(ci);
+	if (wake)
+		wake_up(&ci->i_cap_wq);
+}
+
+/*
+ * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
+ * context.  Adjust per-snap dirty page accounting as appropriate.
+ * Once all dirty data for a cap_snap is flushed, flush snapped file
+ * metadata back to the MDS.  If we dropped the last ref, call
+ * ceph_check_caps.
+ */
+void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
+				struct ceph_snap_context *snapc)
+{
+	struct inode *inode = &ci->vfs_inode;
+	int last = 0;
+	int last_snap = 0;
+	int found = 0;
+	struct list_head *p;
+	struct ceph_cap_snap *capsnap = NULL;
+
+	spin_lock(&inode->i_lock);
+	ci->i_wrbuffer_ref -= nr;
+	last = !ci->i_wrbuffer_ref;
+
+	if (ci->i_head_snapc == snapc) {
+		ci->i_wrbuffer_ref_head -= nr;
+		if (!ci->i_wrbuffer_ref_head) {
+			ceph_put_snap_context(ci->i_head_snapc);
+			ci->i_head_snapc = NULL;
+		}
+		dout(30, "put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n",
+		     inode,
+		     ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr,
+		     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
+		     last ? " LAST" : "");
+	} else {
+		list_for_each(p, &ci->i_cap_snaps) {
+			capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
+			if (capsnap->context == snapc) {
+				found = 1;
+				capsnap->dirty -= nr;
+				last_snap = !capsnap->dirty;
+				break;
+			}
+		}
+		BUG_ON(!found);
+		dout(30, "put_wrbuffer_cap_refs on %p cap_snap %p "
+		     " snap %lld %d/%d -> %d/%d %s%s\n",
+		     inode, capsnap, capsnap->context->seq,
+		     ci->i_wrbuffer_ref+nr, capsnap->dirty + nr,
+		     ci->i_wrbuffer_ref, capsnap->dirty,
+		     last ? " (wrbuffer last)" : "",
+		     last_snap ? " (capsnap last)" : "");
+	}
+
+	spin_unlock(&inode->i_lock);
+
+	if (last) {
+		ceph_check_caps(ci, 0);
+	} else if (last_snap) {
+		ceph_flush_snaps(ci);
+		wake_up(&ci->i_cap_wq);
+	}
+}
+
+
+
+/*
+ * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
+ * actually be a revocation if it specifies a smaller cap set.)
+ *
+ * caller holds s_mutex.  NOT snap_rwsem.
+ * return value:
+ *  0 - ok
+ *  1 - send the msg back to mds
+ */
+static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+			    struct ceph_mds_session *session)
+{
+	struct ceph_cap *cap;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int mds = session->s_mds;
+	int seq = le32_to_cpu(grant->seq);
+	int newcaps = le32_to_cpu(grant->caps);
+	int used;
+	int issued; /* to me, before */
+	int wanted;
+	int reply = 0;
+	u64 size = le64_to_cpu(grant->size);
+	u64 max_size = le64_to_cpu(grant->max_size);
+	struct timespec mtime, atime, ctime;
+	int wake = 0;
+	int writeback = 0;
+	int revoked_rdcache = 0;
+	int invalidate_async = 0;
+	int tried_invalidate = 0;
+	int ret;
+
+	dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n",
+	     inode, ci, mds, seq);
+	dout(10, " size %llu max_size %llu, i_size %llu\n", size, max_size,
+		inode->i_size);
+	spin_lock(&inode->i_lock);
+start:
+
+	/* do we have this cap? */
+	cap = __get_cap_for_mds(inode, mds);
+	if (!cap) {
+		/*
+		 * then ignore.  never reply to cap messages out of turn,
+		 * or we'll be mixing up different instances of caps on the
+		 * same inode, and confuse the mds.
+		 */
+		dout(10, "no cap on %p ino %llx.%llx from mds%d, ignoring\n",
+		     inode, ci->i_vino.ino, ci->i_vino.snap, mds);
+		goto out;
+	}
+	dout(10, " cap %p\n", cap);
+	cap->gen = session->s_cap_gen;
+
+	/*
+	 * Each time we receive RDCACHE anew, we increment i_rdcache_gen.
+	 */
+	if ((newcaps & CEPH_CAP_RDCACHE) &&            /* we just got RDCACHE */
+	    (cap->issued & CEPH_CAP_RDCACHE) == 0 &&    /* and didn't have it */
+	    (__ceph_caps_issued(ci, NULL) & CEPH_CAP_RDCACHE) == 0)
+		ci->i_rdcache_gen++;
+
+	/*
+	 * If RDCACHE is being revoked, and we have no dirty buffers,
+	 * try to invalidate (once).  (If there are dirty buffers, we
+	 * will invalidate _after_ writeback.)
+	 */
+	if (((cap->issued & ~newcaps) & CEPH_CAP_RDCACHE) &&
+	    !ci->i_wrbuffer_ref && !tried_invalidate) {
+		dout(10, "RDCACHE invalidation\n");
+		spin_unlock(&inode->i_lock);
+		tried_invalidate = 1;
+
+		ret = invalidate_inode_pages2(&inode->i_data);
+		spin_lock(&inode->i_lock);
+		if (ret < 0) {
+			/* there were locked pages.. invalidate later
+			   in a separate thread. */
+			if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
+				invalidate_async = 1;
+				ci->i_rdcache_revoking = ci->i_rdcache_gen;
+			}
+		} else {
+			/* we successfully invalidated those pages */
+			revoked_rdcache = 1;
+			ci->i_rdcache_gen = 0;
+			ci->i_rdcache_revoking = 0;
+		}
+		goto start;
+	}
+
+
+	/* size/ctime/mtime/atime? */
+	issued = __ceph_caps_issued(ci, NULL);
+	ceph_decode_timespec(&mtime, &grant->mtime);
+	ceph_decode_timespec(&atime, &grant->atime);
+	ceph_decode_timespec(&ctime, &grant->ctime);
+	ceph_fill_file_bits(inode, issued,
+			    le64_to_cpu(grant->truncate_seq), size,
+			    le64_to_cpu(grant->time_warp_seq), &ctime, &mtime,
+			    &atime);
+
+	/* max size increase? */
+	if (max_size != ci->i_max_size) {
+		dout(10, "max_size %lld -> %llu\n", ci->i_max_size, max_size);
+		ci->i_max_size = max_size;
+		if (max_size >= ci->i_wanted_max_size) {
+			ci->i_wanted_max_size = 0;  /* reset */
+			ci->i_requested_max_size = 0;
+		}
+		wake = 1;
+	}
+
+	/* check cap bits */
+	wanted = __ceph_caps_wanted(ci);
+	used = __ceph_caps_used(ci);
+	dout(10, " my wanted = %d, used = %d\n", wanted, used);
+	if (wanted != le32_to_cpu(grant->wanted)) {
+		dout(10, "mds wanted %d -> %d\n", le32_to_cpu(grant->wanted),
+		     wanted);
+		grant->wanted = cpu_to_le32(wanted);
+	}
+
+	cap->seq = seq;
+
+	/* file layout may have changed */
+	ci->i_layout = grant->layout;
+
+	/* revocation? */
+	if (cap->issued & ~newcaps) {
+		dout(10, "revocation: %d -> %d\n", cap->issued, newcaps);
+		if ((used & ~newcaps) & CEPH_CAP_WRBUFFER) {
+			writeback = 1; /* will delay ack */
+		} else if (((used & ~newcaps) & CEPH_CAP_RDCACHE) == 0 ||
+			   revoked_rdcache) {
+			/*
+			 * we're not using revoked caps.. ack now.
+			 * re-use incoming message.
+			 */
+			cap->implemented = newcaps;
+
+			grant->size = cpu_to_le64(inode->i_size);
+			grant->max_size = 0;  /* don't re-request */
+			ceph_encode_timespec(&grant->mtime, &inode->i_mtime);
+			ceph_encode_timespec(&grant->atime, &inode->i_atime);
+			grant->time_warp_seq = cpu_to_le64(ci->i_time_warp_seq);
+			grant->snap_follows =
+			     cpu_to_le64(ci->i_snap_realm->cached_context->seq);
+			reply = 1;
+			wake = 1;
+		}
+		cap->issued = newcaps;
+		goto out;
+	}
+
+	/* grant or no-op */
+	if (cap->issued == newcaps) {
+		dout(10, "caps unchanged: %d -> %d\n", cap->issued, newcaps);
+	} else {
+		dout(10, "grant: %d -> %d\n", cap->issued, newcaps);
+		cap->issued = newcaps;
+		cap->implemented |= newcaps;    /* add bits only, to
+						 * avoid stepping on a
+						 * pending revocation */
+		wake = 1;
+	}
+
+out:
+	spin_unlock(&inode->i_lock);
+	if (writeback) {
+		/*
+		 * queue inode for writeback: we can't actually call
+		 * filemap_write_and_wait, etc. from message handler
+		 * context.
+		 */
+		dout(10, "queueing %p for writeback\n", inode);
+		if (ceph_queue_writeback(inode))
+			igrab(inode);
+	}
+	if (invalidate_async) {
+		dout(10, "queueing %p for page invalidation\n", inode);
+		if (ceph_queue_page_invalidation(inode))
+			igrab(inode);
+	}
+	if (wake)
+		wake_up(&ci->i_cap_wq);
+	return reply;
+}
+
+
+/*
+ * Handle RELEASE from MDS.  That means we can throw away our cap
+ * state as the MDS has fully flushed that metadata to disk.
+ */
+static void handle_cap_released(struct inode *inode,
+				struct ceph_mds_caps *m,
+				struct ceph_mds_session *session)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int seq = le32_to_cpu(m->seq);
+	int removed_last;
+	struct ceph_cap *cap;
+
+	dout(10, "handle_cap_released inode %p ci %p mds%d seq %d\n", inode, ci,
+	     session->s_mds, seq);
+
+	spin_lock(&inode->i_lock);
+	cap = __get_cap_for_mds(inode, session->s_mds);
+	BUG_ON(!cap);
+	removed_last = __ceph_remove_cap(cap);
+	if (removed_last)
+		__cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc, ci);
+	spin_unlock(&inode->i_lock);
+	if (removed_last)
+		iput(inode);
+}
+
+
+/*
+ * Handle FLUSHEDSNAP.  MDS has flushed snap data to disk and we can
+ * throw away our cap_snap.
+ *
+ * Caller hold s_mutex, snap_rwsem.
+ */
+static void handle_cap_flushedsnap(struct inode *inode,
+				   struct ceph_mds_caps *m,
+				   struct ceph_mds_session *session)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	u64 follows = le64_to_cpu(m->snap_follows);
+	struct list_head *p;
+	struct ceph_cap_snap *capsnap;
+	int drop = 0;
+
+	dout(10, "handle_cap_flushedsnap inode %p ci %p mds%d follows %lld\n",
+	     inode, ci, session->s_mds, follows);
+
+	spin_lock(&inode->i_lock);
+	list_for_each(p, &ci->i_cap_snaps) {
+		capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
+		if (capsnap->follows == follows) {
+			WARN_ON(capsnap->dirty || capsnap->writing);
+			dout(10, " removing cap_snap %p follows %lld\n",
+			     capsnap, follows);
+			ceph_put_snap_context(capsnap->context);
+			list_del(&capsnap->ci_item);
+			kfree(capsnap);
+			drop = 1;
+			break;
+		} else {
+			dout(10, " skipping cap_snap %p follows %lld\n",
+			     capsnap, capsnap->follows);
+		}
+	}
+	spin_unlock(&inode->i_lock);
+	if (drop)
+		iput(inode);
+}
+
+
+/*
+ * Handle TRUNC from MDS, indicating file truncation.
+ *
+ * caller hold s_mutex, NOT snap_rwsem.
+ */
+static void handle_cap_trunc(struct inode *inode,
+			     struct ceph_mds_caps *trunc,
+			     struct ceph_mds_session *session)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int mds = session->s_mds;
+	int seq = le32_to_cpu(trunc->seq);
+	u64 size = le64_to_cpu(trunc->size);
+	int queue_trunc = 0;
+
+	dout(10, "handle_cap_trunc inode %p ci %p mds%d seq %d\n", inode, ci,
+	     mds, seq);
+
+	/*
+	 * vmtruncate lazily; we can't block on i_mutex in the message
+	 * handler path, or we deadlock against osd op replies needed
+	 * to complete the writes holding i_lock.  vmtruncate will
+	 * also block on page locks held by writes...
+	 *
+	 * if its an expansion, and there is no truncate pending, we
+	 * don't need to truncate.
+	 */
+	spin_lock(&inode->i_lock);
+	if (ci->i_vmtruncate_to < 0 && size > inode->i_size) {
+		dout(10, "clean fwd truncate, no vmtruncate needed\n");
+	} else if (ci->i_vmtruncate_to >= 0 && size >= ci->i_vmtruncate_to) {
+		dout(10, "trunc to %lld < %lld already queued\n",
+		     ci->i_vmtruncate_to, size);
+	} else {
+		/* we need to trunc even smaller */
+		dout(10, "queueing trunc %lld -> %lld\n", inode->i_size, size);
+		ci->i_vmtruncate_to = size;
+		queue_trunc = 1;
+	}
+	i_size_write(inode, size);
+	ci->i_reported_size = size;
+	spin_unlock(&inode->i_lock);
+
+	if (queue_trunc) {
+		if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
+			   &ci->i_vmtruncate_work))
+			   igrab(inode);
+	}
+}
+
+/*
+ * Handle EXPORT from MDS.  Cap is being migrated _from_ this mds to a
+ * different one.  If we are the most recent migration we've seen (as
+ * indicated by mseq), make note of the migrating cap bits for the
+ * duration (until we see the corresponding IMPORT).
+ *
+ * caller holds s_mutex, snap_rwsem
+ */
+static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
+			      struct ceph_mds_session *session)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int mds = session->s_mds;
+	unsigned mseq = le32_to_cpu(ex->migrate_seq);
+	struct ceph_cap *cap = NULL, *t;
+	struct rb_node *p;
+	int was_last = 0;
+	int remember = 1;
+
+	dout(10, "handle_cap_export inode %p ci %p mds%d mseq %d\n",
+	     inode, ci, mds, mseq);
+
+	spin_lock(&inode->i_lock);
+
+	/* make sure we haven't seen a higher mseq */
+	for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+		t = rb_entry(p, struct ceph_cap, ci_node);
+		if (t->mseq > mseq) {
+			dout(10, " higher mseq on cap from mds%d\n",
+			     t->session->s_mds);
+			remember = 0;
+		}
+		if (t->session->s_mds == mds)
+			cap = t;
+	}
+
+	if (cap) {
+		if (remember) {
+			/* make note */
+			ci->i_cap_exporting_mds = mds;
+			ci->i_cap_exporting_mseq = mseq;
+			ci->i_cap_exporting_issued = cap->issued;
+		}
+		was_last = __ceph_remove_cap(cap);
+	} else {
+		WARN_ON(!cap);
+	}
+
+	spin_unlock(&inode->i_lock);
+	if (was_last)
+		iput(inode);
+}
+
+/*
+ * Handle cap IMPORT.  If there are temp bits from an older EXPORT,
+ * clean them up.
+ *
+ * caller holds s_mutex, snap_rwsem
+ */
+static void handle_cap_import(struct inode *inode, struct ceph_mds_caps *im,
+			      struct ceph_mds_session *session,
+			      void *snaptrace, int snaptrace_len)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int mds = session->s_mds;
+	unsigned issued = le32_to_cpu(im->caps);
+	unsigned seq = le32_to_cpu(im->seq);
+	unsigned mseq = le32_to_cpu(im->migrate_seq);
+
+	if (ci->i_cap_exporting_mds >= 0 &&
+	    ci->i_cap_exporting_mseq < mseq) {
+		dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d"
+		     " - cleared exporting from mds%d\n",
+		     inode, ci, mds, mseq,
+		     ci->i_cap_exporting_mds);
+		ci->i_cap_exporting_issued = 0;
+		ci->i_cap_exporting_mseq = 0;
+		ci->i_cap_exporting_mds = -1;
+	} else {
+		dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d\n",
+		     inode, ci, mds, mseq);
+	}
+
+	ceph_add_cap(inode, session, -1, issued, seq, mseq,
+		     snaptrace, snaptrace_len, NULL);
+}
+
+
+/*
+ * Handle a CEPH_CAPS message from the MDS.
+ *
+ * Identify the appropriate session, inode, and call the right handler
+ * based on the cap op.  Take read or write lock on snap_rwsem as
+ * appropriate.
+ */
+void ceph_handle_caps(struct ceph_mds_client *mdsc,
+		      struct ceph_msg *msg)
+{
+	struct super_block *sb = mdsc->client->sb;
+	struct ceph_mds_session *session;
+	struct inode *inode;
+	struct ceph_mds_caps *h;
+	int mds = le32_to_cpu(msg->hdr.src.name.num);
+	int op;
+	u32 seq;
+	struct ceph_vino vino;
+	u64 size, max_size;
+	int check_caps = 0;
+
+	dout(10, "handle_caps from mds%d\n", mds);
+
+	/* decode */
+	if (msg->front.iov_len < sizeof(*h))
+		goto bad;
+	h = msg->front.iov_base;
+	op = le32_to_cpu(h->op);
+	vino.ino = le64_to_cpu(h->ino);
+	vino.snap = CEPH_NOSNAP;
+	seq = le32_to_cpu(h->seq);
+	size = le64_to_cpu(h->size);
+	max_size = le64_to_cpu(h->max_size);
+
+	/* find session */
+	mutex_lock(&mdsc->mutex);
+	session = __ceph_get_mds_session(mdsc, mds);
+	if (session)
+		down_write(&mdsc->snap_rwsem);
+	mutex_unlock(&mdsc->mutex);
+	if (!session) {
+		dout(10, "WTF, got cap but no session for mds%d\n", mds);
+		return;
+	}
+
+	mutex_lock(&session->s_mutex);
+	session->s_seq++;
+	dout(20, " mds%d seq %lld\n", session->s_mds, session->s_seq);
+
+	/* lookup ino */
+	inode = ceph_find_inode(sb, vino);
+	dout(20, " op %s ino %llx inode %p\n", ceph_cap_op_name(op), vino.ino,
+	     inode);
+	if (!inode) {
+		dout(10, " i don't have ino %llx, sending release\n", vino.ino);
+		send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
+			     size, 0, 0, NULL, NULL, 0, 0, mds);
+		goto no_inode;
+	}
+
+	switch (op) {
+	case CEPH_CAP_OP_GRANT:
+		up_write(&mdsc->snap_rwsem);
+		if (handle_cap_grant(inode, h, session) == 1) {
+			dout(10, " sending reply back to mds%d\n", mds);
+			ceph_msg_get(msg);
+			ceph_send_msg_mds(mdsc, msg, mds);
+		}
+		break;
+
+	case CEPH_CAP_OP_TRUNC:
+		up_write(&mdsc->snap_rwsem);
+		handle_cap_trunc(inode, h, session);
+		break;
+
+	case CEPH_CAP_OP_RELEASED:
+		handle_cap_released(inode, h, session);
+		up_write(&mdsc->snap_rwsem);
+		if (list_empty(&session->s_caps))
+			ceph_mdsc_flushed_all_caps(mdsc, session);
+		break;
+
+	case CEPH_CAP_OP_FLUSHEDSNAP:
+		handle_cap_flushedsnap(inode, h, session);
+		up_write(&mdsc->snap_rwsem);
+		break;
+
+	case CEPH_CAP_OP_EXPORT:
+		handle_cap_export(inode, h, session);
+		up_write(&mdsc->snap_rwsem);
+		if (list_empty(&session->s_caps))
+			ceph_mdsc_flushed_all_caps(mdsc, session);
+		break;
+
+	case CEPH_CAP_OP_IMPORT:
+		handle_cap_import(inode, h, session,
+				  msg->front.iov_base + sizeof(*h),
+				  le32_to_cpu(h->snap_trace_len));
+		up_write(&mdsc->snap_rwsem);
+		check_caps = 1; /* we may have sent a RELEASE to the old auth */
+		break;
+
+	default:
+		up_write(&mdsc->snap_rwsem);
+		derr(10, " unknown cap op %d %s\n", op, ceph_cap_op_name(op));
+	}
+
+no_inode:
+	mutex_unlock(&session->s_mutex);
+	ceph_put_mds_session(session);
+
+	if (check_caps)
+		ceph_check_caps(ceph_inode(inode), 1);
+	if (inode)
+		iput(inode);
+	return;
+
+bad:
+	derr(10, "corrupt caps message\n");
+	return;
+}
+
+
+/*
+ * Delayed work handler to process end of delayed cap release LRU list.
+ */
+void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
+{
+	struct ceph_inode_info *ci;
+
+	dout(10, "check_delayed_caps\n");
+	while (1) {
+		spin_lock(&mdsc->cap_delay_lock);
+		if (list_empty(&mdsc->cap_delay_list))
+			break;
+		ci = list_first_entry(&mdsc->cap_delay_list,
+				      struct ceph_inode_info,
+				      i_cap_delay_list);
+		if (time_before(jiffies, ci->i_hold_caps_until))
+			break;
+		list_del_init(&ci->i_cap_delay_list);
+		spin_unlock(&mdsc->cap_delay_lock);
+		dout(10, "check_delayed_caps on %p\n", &ci->vfs_inode);
+		ceph_check_caps(ci, 1);
+		iput(&ci->vfs_inode);
+	}
+	spin_unlock(&mdsc->cap_delay_lock);
+}
+
-- 
1.5.6.5

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ