Message-Id: <1226624178-3761-14-git-send-email-sage@newdream.net>
Date: Thu, 13 Nov 2008 16:56:12 -0800
From: Sage Weil <sage@...dream.net>
To: linux-fsdevel@...r.kernel.org
Cc: linux-kernel@...r.kernel.org, Sage Weil <sage@...dream.net>
Subject: [PATCH 13/19] ceph: capability management

The Ceph metadata servers control client access to data by issuing
capabilities that grant clients permission to read and/or write to OSDs
(storage nodes). Each capability consists of a set of bits indicating
which operations are allowed.

Signed-off-by: Sage Weil <sage@...dream.net>
---
fs/ceph/caps.c | 1464 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 1464 insertions(+), 0 deletions(-)
create mode 100644 fs/ceph/caps.c
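
An illustrative sketch (not part of the patch): how a caller such as the
file write path, which is not included here, might use the cap reference
helpers added below. The wait loop, the pos/count variables, and the write
placeholder are assumptions for illustration; only ceph_get_cap_refs(),
ceph_put_cap_refs(), ci->i_cap_wq, and the CEPH_CAP_* bits come from this
code.

	struct ceph_inode_info *ci = ceph_inode(inode);
	int got = 0;
	int err;

	/* block until at least WR is issued to us; also ask for WRBUFFER
	 * so the write can be buffered when that is allowed */
	err = wait_event_interruptible(ci->i_cap_wq,
			ceph_get_cap_refs(ci, CEPH_CAP_WR,
					  CEPH_CAP_WR | CEPH_CAP_WRBUFFER,
					  &got, pos + count));
	if (err)
		return err;

	/* ... buffered write if WRBUFFER is in got, sync write otherwise ... */

	ceph_put_cap_refs(ci, got);

A real caller would also need to ask the MDS for a max_size increase before
waiting when extending the file, since ceph_get_cap_refs() only checks
against the current max_size.
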
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
new file mode 100644
index 0000000..bdc761e
--- /dev/null
+++ b/fs/ceph/caps.c
@@ -0,0 +1,1464 @@
+#include <linux/fs.h>
+#include <linux/kernel.h>
+#include <linux/sched.h>
+#include <linux/wait.h>
+
+#include "ceph_debug.h"
+
+int ceph_debug_caps = -1;
+#define DOUT_MASK DOUT_MASK_CAPS
+#define DOUT_VAR ceph_debug_caps
+#define DOUT_PREFIX "caps: "
+#include "super.h"
+
+#include "decode.h"
+#include "messenger.h"
+
+
+/*
+ * Find ceph_cap for given mds, if any.
+ *
+ * Called with i_lock held.
+ */
+static struct ceph_cap *__get_cap_for_mds(struct inode *inode, int mds)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_cap *cap;
+ struct rb_node *n = ci->i_caps.rb_node;
+
+ while (n) {
+ cap = rb_entry(n, struct ceph_cap, ci_node);
+ if (mds < cap->mds)
+ n = n->rb_left;
+ else if (mds > cap->mds)
+ n = n->rb_right;
+ else
+ return cap;
+ }
+ return NULL;
+}
+
+/*
+ * Return id of any MDS with a cap, preferably WR|WRBUFFER|EXCL, else
+ * -1.
+ */
+static int __ceph_get_cap_mds(struct ceph_inode_info *ci, u32 *mseq)
+{
+ struct ceph_cap *cap;
+ int mds = -1;
+ struct rb_node *p;
+
+ /* prefer mds with WR|WRBUFFER|EXCL caps */
+ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+ cap = rb_entry(p, struct ceph_cap, ci_node);
+ mds = cap->mds;
+ if (mseq)
+ *mseq = cap->mseq;
+ if (cap->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL))
+ break;
+ }
+ return mds;
+}
+
+int ceph_get_cap_mds(struct inode *inode)
+{
+ int mds;
+ spin_lock(&inode->i_lock);
+ mds = __ceph_get_cap_mds(ceph_inode(inode), NULL);
+ spin_unlock(&inode->i_lock);
+ return mds;
+}
+
+/*
+ * Called under i_lock.
+ */
+static void __insert_cap_node(struct ceph_inode_info *ci,
+ struct ceph_cap *new)
+{
+ struct rb_node **p = &ci->i_caps.rb_node;
+ struct rb_node *parent = NULL;
+ struct ceph_cap *cap = NULL;
+
+ while (*p) {
+ parent = *p;
+ cap = rb_entry(parent, struct ceph_cap, ci_node);
+ if (new->mds < cap->mds)
+ p = &(*p)->rb_left;
+ else if (new->mds > cap->mds)
+ p = &(*p)->rb_right;
+ else
+ BUG();
+ }
+
+ rb_link_node(&new->ci_node, parent, p);
+ rb_insert_color(&new->ci_node, &ci->i_caps);
+}
+
+/*
+ * Add a capability under the given MDS session, after processing
+ * the snapblob (to update the snap realm hierarchy).
+ *
+ * Bump i_count when adding the inode's first cap.
+ *
+ * Caller should hold session snap_rwsem, s_mutex.
+ *
+ * @fmode can be negative, in which case it is ignored.
+ */
+int ceph_add_cap(struct inode *inode,
+ struct ceph_mds_session *session,
+ int fmode, unsigned issued,
+ unsigned seq, unsigned mseq,
+ void *snapblob, int snapblob_len,
+ struct ceph_cap *new_cap)
+{
+ struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_cap *cap;
+ struct ceph_snap_realm *realm;
+ int mds = session->s_mds;
+ int is_first = 0;
+
+ realm = ceph_update_snap_trace(mdsc, snapblob, snapblob+snapblob_len,
+ false /* not a deletion */);
+
+ dout(10, "add_cap on %p mds%d cap %d seq %d\n", inode,
+ session->s_mds, issued, seq);
+retry:
+ spin_lock(&inode->i_lock);
+ cap = __get_cap_for_mds(inode, mds);
+ if (!cap) {
+ if (new_cap) {
+ cap = new_cap;
+ new_cap = NULL;
+ } else {
+ spin_unlock(&inode->i_lock);
+ new_cap = kmalloc(sizeof(*cap), GFP_NOFS);
+ if (new_cap == NULL) {
+ ceph_put_snap_realm(mdsc, realm);
+ return -ENOMEM;
+ }
+ goto retry;
+ }
+
+ cap->issued = cap->implemented = 0;
+ cap->mds = mds;
+
+ is_first = RB_EMPTY_ROOT(&ci->i_caps); /* grab inode later */
+ cap->ci = ci;
+ __insert_cap_node(ci, cap);
+
+ /* add to session cap list */
+ cap->session = session;
+ list_add(&cap->session_caps, &session->s_caps);
+ session->s_nr_caps++;
+
+ /* clear out old exporting info? (i.e. on cap import) */
+ if (ci->i_cap_exporting_mds == mds) {
+ ci->i_cap_exporting_issued = 0;
+ ci->i_cap_exporting_mseq = 0;
+ ci->i_cap_exporting_mds = -1;
+ }
+ }
+ if (!ci->i_snap_realm) {
+ ci->i_snap_realm = realm;
+ list_add(&ci->i_snap_realm_item, &realm->inodes_with_caps);
+ } else {
+ ceph_put_snap_realm(mdsc, realm);
+ }
+
+ dout(10, "add_cap inode %p (%llx.%llx) cap %xh now %xh seq %d mds%d\n",
+ inode, ceph_vinop(inode), issued, issued|cap->issued, seq, mds);
+ cap->issued |= issued;
+ cap->implemented |= issued;
+ cap->seq = seq;
+ cap->mseq = mseq;
+ cap->gen = session->s_cap_gen;
+ if (fmode >= 0)
+ __ceph_get_fmode(ci, fmode);
+ spin_unlock(&inode->i_lock);
+ if (is_first)
+ igrab(inode);
+ kfree(new_cap);
+ return 0;
+}
+
+/*
+ * Return set of valid cap bits issued to us. Note that caps time
+ * out, and may be invalidated in bulk if the client session times out
+ * and session->s_cap_gen is bumped.
+ */
+int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
+{
+ int have = ci->i_snap_caps;
+ struct ceph_cap *cap;
+ u32 gen;
+ unsigned long ttl;
+ struct rb_node *p;
+
+ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+ cap = rb_entry(p, struct ceph_cap, ci_node);
+
+ spin_lock(&cap->session->s_cap_lock);
+ gen = cap->session->s_cap_gen;
+ ttl = cap->session->s_cap_ttl;
+ spin_unlock(&cap->session->s_cap_lock);
+
+ if (cap->gen < gen || time_after_eq(jiffies, ttl)) {
+ dout(30, "__ceph_caps_issued %p cap %p issued %d "
+ "but STALE (gen %u vs %u)\n", &ci->vfs_inode,
+ cap, cap->issued, cap->gen, gen);
+ continue;
+ }
+ dout(30, "__ceph_caps_issued %p cap %p issued %d\n",
+ &ci->vfs_inode, cap, cap->issued);
+ have |= cap->issued;
+ if (implemented)
+ *implemented |= cap->implemented;
+ }
+ return have;
+}
+
+/*
+ * caller should hold i_lock, snap_rwsem, and session s_mutex.
+ * returns true if this is the last cap. if so, caller should iput.
+ */
+static int __ceph_remove_cap(struct ceph_cap *cap)
+{
+ struct ceph_mds_session *session = cap->session;
+ struct ceph_inode_info *ci = cap->ci;
+ struct ceph_mds_client *mdsc = &ceph_client(ci->vfs_inode.i_sb)->mdsc;
+
+ dout(20, "__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
+
+ /* remove from session list */
+ list_del_init(&cap->session_caps);
+ session->s_nr_caps--;
+
+ /* remove from inode list */
+ rb_erase(&cap->ci_node, &ci->i_caps);
+ cap->session = NULL;
+
+ kfree(cap);
+
+ if (RB_EMPTY_ROOT(&ci->i_caps)) {
+ list_del_init(&ci->i_snap_realm_item);
+ ceph_put_snap_realm(mdsc, ci->i_snap_realm);
+ ci->i_snap_realm = NULL;
+ return 1;
+ }
+ return 0;
+}
+
+/*
+ * caller should hold snap_rwsem and session s_mutex.
+ */
+void ceph_remove_cap(struct ceph_cap *cap)
+{
+ struct inode *inode = &cap->ci->vfs_inode;
+ int was_last;
+
+ spin_lock(&inode->i_lock);
+ was_last = __ceph_remove_cap(cap);
+ spin_unlock(&inode->i_lock);
+ if (was_last)
+ iput(inode);
+}
+
+/*
+ * (Re)queue cap at the end of the delayed cap release list.
+ *
+ * Caller holds i_lock
+ * -> we take mdsc->cap_delay_lock
+ */
+static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
+ struct ceph_inode_info *ci)
+{
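+ /* hold caps on this inode for roughly five more seconds before
+ * unneeded bits may be released */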
+ ci->i_hold_caps_until = round_jiffies(jiffies + HZ * 5);
+ dout(10, "__cap_delay_requeue %p at %lu\n", &ci->vfs_inode,
+ ci->i_hold_caps_until);
+ spin_lock(&mdsc->cap_delay_lock);
+ if (!mdsc->stopping) {
+ if (list_empty(&ci->i_cap_delay_list))
+ igrab(&ci->vfs_inode);
+ else
+ list_del_init(&ci->i_cap_delay_list);
+ list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
+ }
+ spin_unlock(&mdsc->cap_delay_lock);
+}
+
+/*
+ * Cancel delayed work on cap.
+ * caller holds s_mutex, snap_rwsem.
+ */
+static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
+ struct ceph_inode_info *ci)
+{
+ dout(10, "__cap_delay_cancel %p\n", &ci->vfs_inode);
+ if (list_empty(&ci->i_cap_delay_list))
+ return;
+ spin_lock(&mdsc->cap_delay_lock);
+ list_del_init(&ci->i_cap_delay_list);
+ spin_unlock(&mdsc->cap_delay_lock);
+ iput(&ci->vfs_inode);
+}
+
+/*
+ * Build and send a cap message to the given MDS.
+ *
+ * Caller should be holding s_mutex.
+ */
+static void send_cap_msg(struct ceph_mds_client *mdsc, u64 ino, int op,
+ int caps, int wanted, u64 seq, u64 mseq,
+ u64 size, u64 max_size,
+ struct timespec *mtime, struct timespec *atime,
+ u64 time_warp_seq, u64 follows, int mds)
+{
+ struct ceph_mds_caps *fc;
+ struct ceph_msg *msg;
+
+ dout(10, "send_cap_msg %s %llx caps %d wanted %d seq %llu/%llu"
+ " follows %lld size %llu\n", ceph_cap_op_name(op), ino,
+ caps, wanted, seq, mseq, follows, size);
+
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), 0, 0, NULL);
+ if (IS_ERR(msg))
+ return;
+
+ fc = msg->front.iov_base;
+
+ memset(fc, 0, sizeof(*fc));
+
+ fc->op = cpu_to_le32(op);
+ fc->seq = cpu_to_le32(seq);
+ fc->migrate_seq = cpu_to_le32(mseq);
+ fc->caps = cpu_to_le32(caps);
+ fc->wanted = cpu_to_le32(wanted);
+ fc->ino = cpu_to_le64(ino);
+ fc->size = cpu_to_le64(size);
+ fc->max_size = cpu_to_le64(max_size);
+ fc->snap_follows = cpu_to_le64(follows);
+ if (mtime)
+ ceph_encode_timespec(&fc->mtime, mtime);
+ if (atime)
+ ceph_encode_timespec(&fc->atime, atime);
+ fc->time_warp_seq = cpu_to_le64(time_warp_seq);
+
+ ceph_send_msg_mds(mdsc, msg, mds);
+}
+
+/*
+ * Send a cap msg for the given inode. Make note of the max_size
+ * reported/requested from the mds, and of revoked caps that have now
+ * been implemented.
+ *
+ * Also, try to invalidate page cache if we are dropping RDCACHE.
+ * Note that this will leave behind any locked pages... FIXME!
+ *
+ * called with i_lock, then drops it.
+ * caller should hold snap_rwsem, s_mutex.
+ */
+static void __send_cap(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ struct ceph_cap *cap,
+ int used, int wanted) __releases(cap->ci->vfs_inode->i_lock)
+{
+ struct ceph_inode_info *ci = cap->ci;
+ struct inode *inode = &ci->vfs_inode;
+ int revoking = cap->implemented & ~cap->issued;
+ int dropping = cap->issued & ~wanted;
+ int keep;
+ u64 seq, mseq, time_warp_seq, follows;
+ u64 size, max_size;
+ struct timespec mtime, atime;
+ int wake = 0;
+ int op = CEPH_CAP_OP_ACK;
+
+ if (wanted == 0)
+ op = CEPH_CAP_OP_RELEASE;
+
+ dout(10, "__send_cap cap %p session %p %d -> %d\n", cap, cap->session,
+ cap->issued, cap->issued & wanted);
+ cap->issued &= wanted; /* drop bits we don't want */
+
+ if (revoking && (revoking & used) == 0) {
+ cap->implemented = cap->issued;
+ /*
+ * Wake up any waiters on wanted -> needed transition.
+ * This is due to the weird transition from buffered
+ * to sync IO... we need to flush dirty pages _before_
+ * allowing sync writes to avoid reordering.
+ */
+ wake = 1;
+ }
+
+ keep = cap->issued;
+ seq = cap->seq;
+ mseq = cap->mseq;
+ size = inode->i_size;
+ ci->i_reported_size = size;
+ max_size = ci->i_wanted_max_size;
+ ci->i_requested_max_size = max_size;
+ mtime = inode->i_mtime;
+ atime = inode->i_atime;
+ time_warp_seq = ci->i_time_warp_seq;
+ follows = ci->i_snap_realm->cached_context->seq;
+ spin_unlock(&inode->i_lock);
+
+ if (dropping & CEPH_CAP_RDCACHE) {
+ /* invalidate what we can */
+ dout(20, "invalidating pages on %p\n", inode);
+ invalidate_mapping_pages(&inode->i_data, 0, -1);
+ }
+
+ send_cap_msg(mdsc, ceph_vino(inode).ino,
+ op, keep, wanted, seq, mseq,
+ size, max_size, &mtime, &atime, time_warp_seq,
+ follows, session->s_mds);
+
+ if (wake)
+ wake_up(&ci->i_cap_wq);
+}
+
+
+/*
+ * When a snapshot is taken, clients accumulate "dirty" data on inodes
+ * with capabilities; this is recorded in ceph_cap_snaps, which describe
+ * the file state at the time the snapshot was taken. That state must be
+ * flushed asynchronously back to the MDS once sync writes complete and
+ * the dirty data is written out.
+ *
+ * Called under i_lock. Takes s_mutex as needed.
+ */
+void __ceph_flush_snaps(struct ceph_inode_info *ci,
+ struct ceph_mds_session **psession)
+{
+ struct inode *inode = &ci->vfs_inode;
+ int mds;
+ struct list_head *p;
+ struct ceph_cap_snap *capsnap;
+ u64 follows;
+ int issued;
+ u64 size;
+ struct timespec mtime, atime, ctime;
+ u64 time_warp_seq;
+ u32 mseq;
+ struct ceph_mds_client *mdsc = &ceph_inode_to_client(inode)->mdsc;
+ struct ceph_mds_session *session = NULL; /* if session != NULL, we hold
+ session->s_mutex */
+ u64 next_follows = 0; /* keep track of how far we've gotten through the
+ i_cap_snaps list, and skip these entries next time
+ around to avoid an infinite loop */
+
+ if (psession)
+ session = *psession;
+
+ dout(10, "__flush_snaps %p\n", inode);
+retry:
+ list_for_each(p, &ci->i_cap_snaps) {
+ capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
+
+ /* avoid an infinite loop after retry */
+ if (capsnap->follows < next_follows)
+ continue;
+ /*
+ * we need to wait for sync writes to complete and for dirty
+ * pages to be written out.
+ */
+ if (capsnap->dirty || capsnap->writing)
+ continue;
+
+ /* pick mds, take s_mutex */
+ mds = __ceph_get_cap_mds(ci, &mseq);
+ if (session && session->s_mds != mds) {
+ dout(30, "oops, wrong session %p mutex\n", session);
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ session = NULL;
+ }
+ if (!session) {
+ spin_unlock(&inode->i_lock);
+ mutex_lock(&mdsc->mutex);
+ session = __ceph_get_mds_session(mdsc, mds);
+ mutex_unlock(&mdsc->mutex);
+ if (session) {
+ dout(10, "inverting session/ino locks on %p\n",
+ session);
+ mutex_lock(&session->s_mutex);
+ }
+ /*
+ * if session == NULL, we raced against a cap
+ * deletion. retry, and we'll get a better
+ * @mds value next time.
+ */
+ spin_lock(&inode->i_lock);
+ goto retry;
+ }
+
+ follows = capsnap->follows;
+ size = capsnap->size;
+ atime = capsnap->atime;
+ mtime = capsnap->mtime;
+ ctime = capsnap->ctime;
+ time_warp_seq = capsnap->time_warp_seq;
+ issued = capsnap->issued;
+ spin_unlock(&inode->i_lock);
+
+ dout(10, "flush_snaps %p cap_snap %p follows %lld size %llu\n",
+ inode, capsnap, follows, size);
+ send_cap_msg(mdsc, ceph_vino(inode).ino,
+ CEPH_CAP_OP_FLUSHSNAP, issued, 0, 0, mseq,
+ size, 0,
+ &mtime, &atime, time_warp_seq,
+ follows, mds);
+
+ spin_lock(&inode->i_lock);
+ next_follows = follows + 1;
+ goto retry;
+ }
+
+ /* we flushed them all; remove this inode from the queue */
+ spin_lock(&mdsc->snap_flush_lock);
+ list_del_init(&ci->i_snap_flush_item);
+ spin_unlock(&mdsc->snap_flush_lock);
+
+ if (psession)
+ *psession = session;
+ else if (session) {
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ }
+}
+
+void ceph_flush_snaps(struct ceph_inode_info *ci)
+{
+ struct inode *inode = &ci->vfs_inode;
+
+ spin_lock(&inode->i_lock);
+ __ceph_flush_snaps(ci, NULL);
+ spin_unlock(&inode->i_lock);
+}
+
+
+/*
+ * Swiss army knife function to compare currently used and wanted caps
+ * against those held, and release, flush, or ack revoked caps to the mds
+ * as appropriate.
+ *
+ * @is_delayed indicates caller is delayed work and we should not
+ * delay further.
+ */
+void ceph_check_caps(struct ceph_inode_info *ci, int is_delayed)
+{
+ struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
+ struct ceph_mds_client *mdsc = &client->mdsc;
+ struct inode *inode = &ci->vfs_inode;
+ struct ceph_cap *cap;
+ int file_wanted, used;
+ struct ceph_mds_session *session = NULL; /* if set, i hold s_mutex */
+ int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
+ int revoking;
+ int mds = -1; /* keep track of how far we've gone through i_caps list
+ to avoid an infinite loop on retry */
+ struct rb_node *p;
+ int tried_invalidate = 0;
+
+ /* if we are unmounting, flush any unused caps immediately. */
+ if (mdsc->stopping)
+ is_delayed = 1;
+
+ spin_lock(&inode->i_lock);
+
+ /* flush snaps first time around only */
+ if (!list_empty(&ci->i_cap_snaps))
+ __ceph_flush_snaps(ci, &session);
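+ /* we already hold i_lock here, so skip the relock at the retry label */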
+ goto retry_locked;
+retry:
+ spin_lock(&inode->i_lock);
+retry_locked:
+ file_wanted = __ceph_caps_file_wanted(ci);
+ used = __ceph_caps_used(ci);
+ dout(10, "check_caps %p file_wanted %d used %d issued %d\n",
+ inode, file_wanted, used, __ceph_caps_issued(ci, NULL));
+
+ /*
+ * Reschedule delayed caps release, unless we are called from
+ * the delayed work handler (i.e. this _is_ the delayed release)
+ */
+ if (!is_delayed)
+ __cap_delay_requeue(mdsc, ci);
+
+ /*
+ * If we no longer need to hold onto our old caps, and we may
+ * have cached pages, but don't want them, then try to invalidate.
+ * If we fail, it's because pages are locked.... try again later.
+ */
+ if ((!time_before(jiffies, ci->i_hold_caps_until) || mdsc->stopping) &&
+ ci->i_wrbuffer_ref == 0 && /* no dirty pages... */
+ ci->i_rdcache_gen && /* may have cached pages */
+ file_wanted == 0 && /* no open files */
+ !tried_invalidate) {
+ u32 invalidating_gen = ci->i_rdcache_gen;
+ int ret;
+
+ dout(10, "check_caps trying to invalidate on %p\n", inode);
+ spin_unlock(&inode->i_lock);
+ ret = invalidate_inode_pages2(&inode->i_data);
+ spin_lock(&inode->i_lock);
+ if (ret == 0 && invalidating_gen == ci->i_rdcache_gen) {
+ /* success. */
+ ci->i_rdcache_gen = 0;
+ ci->i_rdcache_revoking = 0;
+ } else {
+ dout(10, "check_caps failed to invalidate pages\n");
+ /* we failed to invalidate pages. check these
+ caps again later. */
+ if (is_delayed)
+ __cap_delay_requeue(mdsc, ci);
+ }
+ tried_invalidate = 1;
+ goto retry_locked;
+ }
+
+ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+ cap = rb_entry(p, struct ceph_cap, ci_node);
+
+ /* avoid looping forever */
+ if (mds >= cap->mds)
+ continue;
+
+ /* NOTE: no side-effects allowed, until we take s_mutex */
+
+ revoking = cap->implemented & ~cap->issued;
+ if (revoking)
+ dout(10, "mds%d revoking %d\n", cap->mds, revoking);
+
+ /* request larger max_size from MDS? */
+ if (ci->i_wanted_max_size > ci->i_max_size &&
+ ci->i_wanted_max_size > ci->i_requested_max_size)
+ goto ack;
+
+ /* approaching file_max? */
+ if ((cap->issued & CEPH_CAP_WR) &&
+ (inode->i_size << 1) >= ci->i_max_size &&
+ (ci->i_reported_size << 1) < ci->i_max_size) {
+ dout(10, "i_size approaching max_size\n");
+ goto ack;
+ }
+
+ /* completed revocation?  or unmounting, with no caps in use? */
+ if ((revoking && (revoking & used) == 0) ||
+ (mdsc->stopping && (used == 0))) {
+ dout(10, "completed revocation of %d\n",
+ cap->implemented & ~cap->issued);
+ goto ack;
+ }
+
+ if ((cap->issued & ~(file_wanted | used)) == 0)
+ continue; /* nothing extra, all good */
+
+ /* delay cap release for a bit? */
+ if (!is_delayed &&
+ time_before(jiffies, ci->i_hold_caps_until)) {
+ dout(30, "delaying cap release\n");
+ continue;
+ }
+
+ack:
+ /* take snap_rwsem before session mutex */
+ if (!took_snap_rwsem) {
+ if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
+ dout(10, "inverting snap/ino locks on %p\n",
+ inode);
+ spin_unlock(&inode->i_lock);
+ down_read(&mdsc->snap_rwsem);
+ took_snap_rwsem = 1;
+ goto retry;
+ }
+ took_snap_rwsem = 1;
+ }
+ if (session && session != cap->session) {
+ dout(30, "oops, wrong session %p mutex\n", session);
+ mutex_unlock(&session->s_mutex);
+ session = NULL;
+ }
+ if (!session) {
+ session = cap->session;
+ if (mutex_trylock(&session->s_mutex) == 0) {
+ dout(10, "inverting session/ino locks on %p\n",
+ session);
+ spin_unlock(&inode->i_lock);
+ mutex_lock(&session->s_mutex);
+ goto retry;
+ }
+ }
+
+ mds = cap->mds; /* remember mds, so we don't repeat */
+
+ /* __send_cap drops i_lock */
+ __send_cap(mdsc, session, cap, used, used | file_wanted);
+
+ goto retry; /* retake i_lock and restart our cap scan. */
+ }
+
+ /* okay */
+ spin_unlock(&inode->i_lock);
+
+ if (session)
+ mutex_unlock(&session->s_mutex);
+ if (took_snap_rwsem)
+ up_read(&mdsc->snap_rwsem);
+}
+
+
+/*
+ * Track references to capabilities we hold, so that we don't release
+ * them to the MDS prematurely.
+ *
+ * Protected by i_lock.
+ */
+static void __take_cap_refs(struct ceph_inode_info *ci, int got)
+{
+ if (got & CEPH_CAP_RD)
+ ci->i_rd_ref++;
+ if (got & CEPH_CAP_RDCACHE)
+ ci->i_rdcache_ref++;
+ if (got & CEPH_CAP_WR)
+ ci->i_wr_ref++;
+ if (got & CEPH_CAP_WRBUFFER) {
+ ci->i_wrbuffer_ref++;
+ dout(30, "__take_cap_refs %p wrbuffer %d -> %d (?)\n",
+ &ci->vfs_inode, ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref);
+ }
+}
+
+/*
+ * Try to grab cap references. Specify those refs we @want, and the
+ * minimal set we @need. Also include the larger offset we are writing
+ * to (when applicable), and check against max_size here as well.
+ * Note that caller is responsible for ensuring max_size increases are
+ * requested from the MDS.
+ */
+int ceph_get_cap_refs(struct ceph_inode_info *ci, int need, int want, int *got,
+ loff_t endoff)
+{
+ struct inode *inode = &ci->vfs_inode;
+ int ret = 0;
+ int have, implemented;
+
+ dout(30, "get_cap_refs %p need %d want %d\n", inode, need, want);
+ spin_lock(&inode->i_lock);
+ if (need & CEPH_CAP_WR) {
+ if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
+ dout(20, "get_cap_refs %p endoff %llu > maxsize %llu\n",
+ inode, endoff, ci->i_max_size);
+ goto sorry;
+ }
+ /*
+ * If a sync write is in progress, we must wait, so that we
+ * can get a final snapshot value for size+mtime.
+ */
+ if (__ceph_have_pending_cap_snap(ci)) {
+ dout(20, "get_cap_refs %p cap_snap_pending\n", inode);
+ goto sorry;
+ }
+ }
+ have = __ceph_caps_issued(ci, &implemented);
+ /* HACK: force sync writes...
+ have &= ~CEPH_CAP_WRBUFFER;
+ implemented &= ~CEPH_CAP_WRBUFFER;
+ */
+ if ((have & need) == need) {
+ /*
+ * Look at (implemented & ~have & not) so that we keep waiting
+ * on transition from wanted -> needed caps. This is needed
+ * for WRBUFFER|WR -> WR to avoid a new WR sync write from
+ * going before a prior buffered writeback happens.
+ */
+ int not = want & ~(have & need);
+ int revoking = implemented & ~have;
+ dout(30, "get_cap_refs %p have %d but not %d (revoking %d)\n",
+ inode, have, not, revoking);
+ if ((revoking & not) == 0) {
+ *got = need | (have & want);
+ __take_cap_refs(ci, *got);
+ ret = 1;
+ }
+ } else {
+ dout(30, "get_cap_refs %p have %d needed %d\n", inode,
+ have, need);
+ }
+sorry:
+ spin_unlock(&inode->i_lock);
+ dout(30, "get_cap_refs %p ret %d got %d\n", inode,
+ ret, *got);
+ return ret;
+}
+
+/*
+ * Release cap refs.
+ *
+ * If we released the last ref on any given cap, call ceph_check_caps
+ * to release (or schedule a release).
+ *
+ * If we are releasing a WR cap (from a sync write), finalize any affected
+ * cap_snap, and wake up any waiters.
+ */
+void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
+{
+ struct inode *inode = &ci->vfs_inode;
+ int last = 0, flushsnaps = 0, wake = 0;
+ struct ceph_cap_snap *capsnap;
+
+ spin_lock(&inode->i_lock);
+ if (had & CEPH_CAP_RD)
+ if (--ci->i_rd_ref == 0)
+ last++;
+ if (had & CEPH_CAP_RDCACHE)
+ if (--ci->i_rdcache_ref == 0)
+ last++;
+ if (had & CEPH_CAP_WRBUFFER) {
+ if (--ci->i_wrbuffer_ref == 0)
+ last++;
+ dout(30, "put_cap_refs %p wrbuffer %d -> %d (?)\n",
+ inode, ci->i_wrbuffer_ref+1, ci->i_wrbuffer_ref);
+ }
+ if (had & CEPH_CAP_WR)
+ if (--ci->i_wr_ref == 0) {
+ last++;
+ if (!list_empty(&ci->i_cap_snaps)) {
+ capsnap = list_entry(ci->i_cap_snaps.next,
+ struct ceph_cap_snap,
+ ci_item);
+ if (capsnap->writing) {
+ capsnap->writing = 0;
+ flushsnaps =
+ __ceph_finish_cap_snap(ci,
+ capsnap);
+ wake = 1;
+ }
+ }
+ }
+ spin_unlock(&inode->i_lock);
+
+ dout(30, "put_cap_refs %p had %d %s\n", inode, had, last ? "last" : "");
+
+ if (last && !flushsnaps)
+ ceph_check_caps(ci, 0);
+ else if (flushsnaps)
+ ceph_flush_snaps(ci);
+ if (wake)
+ wake_up(&ci->i_cap_wq);
+}
+
+/*
+ * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
+ * context. Adjust per-snap dirty page accounting as appropriate.
+ * Once all dirty data for a cap_snap is flushed, flush snapped file
+ * metadata back to the MDS. If we dropped the last ref, call
+ * ceph_check_caps.
+ */
+void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
+ struct ceph_snap_context *snapc)
+{
+ struct inode *inode = &ci->vfs_inode;
+ int last = 0;
+ int last_snap = 0;
+ int found = 0;
+ struct list_head *p;
+ struct ceph_cap_snap *capsnap = NULL;
+
+ spin_lock(&inode->i_lock);
+ ci->i_wrbuffer_ref -= nr;
+ last = !ci->i_wrbuffer_ref;
+
+ if (ci->i_head_snapc == snapc) {
+ ci->i_wrbuffer_ref_head -= nr;
+ if (!ci->i_wrbuffer_ref_head) {
+ ceph_put_snap_context(ci->i_head_snapc);
+ ci->i_head_snapc = NULL;
+ }
+ dout(30, "put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n",
+ inode,
+ ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr,
+ ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
+ last ? " LAST" : "");
+ } else {
+ list_for_each(p, &ci->i_cap_snaps) {
+ capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
+ if (capsnap->context == snapc) {
+ found = 1;
+ capsnap->dirty -= nr;
+ last_snap = !capsnap->dirty;
+ break;
+ }
+ }
+ BUG_ON(!found);
+ dout(30, "put_wrbuffer_cap_refs on %p cap_snap %p "
+ " snap %lld %d/%d -> %d/%d %s%s\n",
+ inode, capsnap, capsnap->context->seq,
+ ci->i_wrbuffer_ref+nr, capsnap->dirty + nr,
+ ci->i_wrbuffer_ref, capsnap->dirty,
+ last ? " (wrbuffer last)" : "",
+ last_snap ? " (capsnap last)" : "");
+ }
+
+ spin_unlock(&inode->i_lock);
+
+ if (last) {
+ ceph_check_caps(ci, 0);
+ } else if (last_snap) {
+ ceph_flush_snaps(ci);
+ wake_up(&ci->i_cap_wq);
+ }
+}
+
+
+
+/*
+ * Handle a cap GRANT message from the MDS. (Note that a GRANT may
+ * actually be a revocation if it specifies a smaller cap set.)
+ *
+ * caller holds s_mutex. NOT snap_rwsem.
+ * return value:
+ * 0 - ok
+ * 1 - send the msg back to mds
+ */
+static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+ struct ceph_mds_session *session)
+{
+ struct ceph_cap *cap;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int mds = session->s_mds;
+ int seq = le32_to_cpu(grant->seq);
+ int newcaps = le32_to_cpu(grant->caps);
+ int used;
+ int issued; /* to me, before */
+ int wanted;
+ int reply = 0;
+ u64 size = le64_to_cpu(grant->size);
+ u64 max_size = le64_to_cpu(grant->max_size);
+ struct timespec mtime, atime, ctime;
+ int wake = 0;
+ int writeback = 0;
+ int revoked_rdcache = 0;
+ int invalidate_async = 0;
+ int tried_invalidate = 0;
+ int ret;
+
+ dout(10, "handle_cap_grant inode %p ci %p mds%d seq %d\n",
+ inode, ci, mds, seq);
+ dout(10, " size %llu max_size %llu, i_size %llu\n", size, max_size,
+ inode->i_size);
+ spin_lock(&inode->i_lock);
+start:
+
+ /* do we have this cap? */
+ cap = __get_cap_for_mds(inode, mds);
+ if (!cap) {
+ /*
+ * then ignore. never reply to cap messages out of turn,
+ * or we'll be mixing up different instances of caps on the
+ * same inode, and confuse the mds.
+ */
+ dout(10, "no cap on %p ino %llx.%llx from mds%d, ignoring\n",
+ inode, ci->i_vino.ino, ci->i_vino.snap, mds);
+ goto out;
+ }
+ dout(10, " cap %p\n", cap);
+ cap->gen = session->s_cap_gen;
+
+ /*
+ * Each time we receive RDCACHE anew, we increment i_rdcache_gen.
+ */
+ if ((newcaps & CEPH_CAP_RDCACHE) && /* we just got RDCACHE */
+ (cap->issued & CEPH_CAP_RDCACHE) == 0 && /* and didn't have it */
+ (__ceph_caps_issued(ci, NULL) & CEPH_CAP_RDCACHE) == 0)
+ ci->i_rdcache_gen++;
+
+ /*
+ * If RDCACHE is being revoked, and we have no dirty buffers,
+ * try to invalidate (once). (If there are dirty buffers, we
+ * will invalidate _after_ writeback.)
+ */
+ if (((cap->issued & ~newcaps) & CEPH_CAP_RDCACHE) &&
+ !ci->i_wrbuffer_ref && !tried_invalidate) {
+ dout(10, "RDCACHE invalidation\n");
+ spin_unlock(&inode->i_lock);
+ tried_invalidate = 1;
+
+ ret = invalidate_inode_pages2(&inode->i_data);
+ spin_lock(&inode->i_lock);
+ if (ret < 0) {
+ /* there were locked pages.. invalidate later
+ in a separate thread. */
+ if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
+ invalidate_async = 1;
+ ci->i_rdcache_revoking = ci->i_rdcache_gen;
+ }
+ } else {
+ /* we successfully invalidated those pages */
+ revoked_rdcache = 1;
+ ci->i_rdcache_gen = 0;
+ ci->i_rdcache_revoking = 0;
+ }
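+ /* i_lock was dropped and re-taken above; recheck cap state from the top */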
+ goto start;
+ }
+
+
+ /* size/ctime/mtime/atime? */
+ issued = __ceph_caps_issued(ci, NULL);
+ ceph_decode_timespec(&mtime, &grant->mtime);
+ ceph_decode_timespec(&atime, &grant->atime);
+ ceph_decode_timespec(&ctime, &grant->ctime);
+ ceph_fill_file_bits(inode, issued,
+ le64_to_cpu(grant->truncate_seq), size,
+ le64_to_cpu(grant->time_warp_seq), &ctime, &mtime,
+ &atime);
+
+ /* max size increase? */
+ if (max_size != ci->i_max_size) {
+ dout(10, "max_size %lld -> %llu\n", ci->i_max_size, max_size);
+ ci->i_max_size = max_size;
+ if (max_size >= ci->i_wanted_max_size) {
+ ci->i_wanted_max_size = 0; /* reset */
+ ci->i_requested_max_size = 0;
+ }
+ wake = 1;
+ }
+
+ /* check cap bits */
+ wanted = __ceph_caps_wanted(ci);
+ used = __ceph_caps_used(ci);
+ dout(10, " my wanted = %d, used = %d\n", wanted, used);
+ if (wanted != le32_to_cpu(grant->wanted)) {
+ dout(10, "mds wanted %d -> %d\n", le32_to_cpu(grant->wanted),
+ wanted);
+ grant->wanted = cpu_to_le32(wanted);
+ }
+
+ cap->seq = seq;
+
+ /* file layout may have changed */
+ ci->i_layout = grant->layout;
+
+ /* revocation? */
+ if (cap->issued & ~newcaps) {
+ dout(10, "revocation: %d -> %d\n", cap->issued, newcaps);
+ if ((used & ~newcaps) & CEPH_CAP_WRBUFFER) {
+ writeback = 1; /* will delay ack */
+ } else if (((used & ~newcaps) & CEPH_CAP_RDCACHE) == 0 ||
+ revoked_rdcache) {
+ /*
+ * we're not using revoked caps.. ack now.
+ * re-use incoming message.
+ */
+ cap->implemented = newcaps;
+
+ grant->size = cpu_to_le64(inode->i_size);
+ grant->max_size = 0; /* don't re-request */
+ ceph_encode_timespec(&grant->mtime, &inode->i_mtime);
+ ceph_encode_timespec(&grant->atime, &inode->i_atime);
+ grant->time_warp_seq = cpu_to_le64(ci->i_time_warp_seq);
+ grant->snap_follows =
+ cpu_to_le64(ci->i_snap_realm->cached_context->seq);
+ reply = 1;
+ wake = 1;
+ }
+ cap->issued = newcaps;
+ goto out;
+ }
+
+ /* grant or no-op */
+ if (cap->issued == newcaps) {
+ dout(10, "caps unchanged: %d -> %d\n", cap->issued, newcaps);
+ } else {
+ dout(10, "grant: %d -> %d\n", cap->issued, newcaps);
+ cap->issued = newcaps;
+ cap->implemented |= newcaps; /* add bits only, to
+ * avoid stepping on a
+ * pending revocation */
+ wake = 1;
+ }
+
+out:
+ spin_unlock(&inode->i_lock);
+ if (writeback) {
+ /*
+ * queue inode for writeback: we can't actually call
+ * filemap_write_and_wait, etc. from message handler
+ * context.
+ */
+ dout(10, "queueing %p for writeback\n", inode);
+ if (ceph_queue_writeback(inode))
+ igrab(inode);
+ }
+ if (invalidate_async) {
+ dout(10, "queueing %p for page invalidation\n", inode);
+ if (ceph_queue_page_invalidation(inode))
+ igrab(inode);
+ }
+ if (wake)
+ wake_up(&ci->i_cap_wq);
+ return reply;
+}
+
+
+/*
+ * Handle RELEASE from MDS. That means we can throw away our cap
+ * state as the MDS has fully flushed that metadata to disk.
+ */
+static void handle_cap_released(struct inode *inode,
+ struct ceph_mds_caps *m,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int seq = le32_to_cpu(m->seq);
+ int removed_last;
+ struct ceph_cap *cap;
+
+ dout(10, "handle_cap_released inode %p ci %p mds%d seq %d\n", inode, ci,
+ session->s_mds, seq);
+
+ spin_lock(&inode->i_lock);
+ cap = __get_cap_for_mds(inode, session->s_mds);
+ BUG_ON(!cap);
+ removed_last = __ceph_remove_cap(cap);
+ if (removed_last)
+ __cap_delay_cancel(&ceph_inode_to_client(inode)->mdsc, ci);
+ spin_unlock(&inode->i_lock);
+ if (removed_last)
+ iput(inode);
+}
+
+
+/*
+ * Handle FLUSHEDSNAP. MDS has flushed snap data to disk and we can
+ * throw away our cap_snap.
+ *
+ * Caller holds s_mutex, snap_rwsem.
+ */
+static void handle_cap_flushedsnap(struct inode *inode,
+ struct ceph_mds_caps *m,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ u64 follows = le64_to_cpu(m->snap_follows);
+ struct list_head *p;
+ struct ceph_cap_snap *capsnap;
+ int drop = 0;
+
+ dout(10, "handle_cap_flushedsnap inode %p ci %p mds%d follows %lld\n",
+ inode, ci, session->s_mds, follows);
+
+ spin_lock(&inode->i_lock);
+ list_for_each(p, &ci->i_cap_snaps) {
+ capsnap = list_entry(p, struct ceph_cap_snap, ci_item);
+ if (capsnap->follows == follows) {
+ WARN_ON(capsnap->dirty || capsnap->writing);
+ dout(10, " removing cap_snap %p follows %lld\n",
+ capsnap, follows);
+ ceph_put_snap_context(capsnap->context);
+ list_del(&capsnap->ci_item);
+ kfree(capsnap);
+ drop = 1;
+ break;
+ } else {
+ dout(10, " skipping cap_snap %p follows %lld\n",
+ capsnap, capsnap->follows);
+ }
+ }
+ spin_unlock(&inode->i_lock);
+ if (drop)
+ iput(inode);
+}
+
+
+/*
+ * Handle TRUNC from MDS, indicating file truncation.
+ *
+ * caller holds s_mutex, NOT snap_rwsem.
+ */
+static void handle_cap_trunc(struct inode *inode,
+ struct ceph_mds_caps *trunc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int mds = session->s_mds;
+ int seq = le32_to_cpu(trunc->seq);
+ u64 size = le64_to_cpu(trunc->size);
+ int queue_trunc = 0;
+
+ dout(10, "handle_cap_trunc inode %p ci %p mds%d seq %d\n", inode, ci,
+ mds, seq);
+
+ /*
+ * vmtruncate lazily; we can't block on i_mutex in the message
+ * handler path, or we deadlock against osd op replies needed
+ * to complete the writes holding i_lock. vmtruncate will
+ * also block on page locks held by writes...
+ *
+ * if it's an expansion, and there is no truncate pending, we
+ * don't need to truncate.
+ */
+ spin_lock(&inode->i_lock);
+ if (ci->i_vmtruncate_to < 0 && size > inode->i_size) {
+ dout(10, "clean fwd truncate, no vmtruncate needed\n");
+ } else if (ci->i_vmtruncate_to >= 0 && size >= ci->i_vmtruncate_to) {
+ dout(10, "trunc to %lld < %lld already queued\n",
+ ci->i_vmtruncate_to, size);
+ } else {
+ /* we need to trunc even smaller */
+ dout(10, "queueing trunc %lld -> %lld\n", inode->i_size, size);
+ ci->i_vmtruncate_to = size;
+ queue_trunc = 1;
+ }
+ i_size_write(inode, size);
+ ci->i_reported_size = size;
+ spin_unlock(&inode->i_lock);
+
+ if (queue_trunc) {
+ if (queue_work(ceph_client(inode->i_sb)->trunc_wq,
+ &ci->i_vmtruncate_work))
+ igrab(inode);
+ }
+}
+
+/*
+ * Handle EXPORT from MDS. Cap is being migrated _from_ this mds to a
+ * different one. If we are the most recent migration we've seen (as
+ * indicated by mseq), make note of the migrating cap bits for the
+ * duration (until we see the corresponding IMPORT).
+ *
+ * caller holds s_mutex, snap_rwsem
+ */
+static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
+ struct ceph_mds_session *session)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int mds = session->s_mds;
+ unsigned mseq = le32_to_cpu(ex->migrate_seq);
+ struct ceph_cap *cap = NULL, *t;
+ struct rb_node *p;
+ int was_last = 0;
+ int remember = 1;
+
+ dout(10, "handle_cap_export inode %p ci %p mds%d mseq %d\n",
+ inode, ci, mds, mseq);
+
+ spin_lock(&inode->i_lock);
+
+ /* make sure we haven't seen a higher mseq */
+ for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
+ t = rb_entry(p, struct ceph_cap, ci_node);
+ if (t->mseq > mseq) {
+ dout(10, " higher mseq on cap from mds%d\n",
+ t->session->s_mds);
+ remember = 0;
+ }
+ if (t->session->s_mds == mds)
+ cap = t;
+ }
+
+ if (cap) {
+ if (remember) {
+ /* make note */
+ ci->i_cap_exporting_mds = mds;
+ ci->i_cap_exporting_mseq = mseq;
+ ci->i_cap_exporting_issued = cap->issued;
+ }
+ was_last = __ceph_remove_cap(cap);
+ } else {
+ WARN_ON(!cap);
+ }
+
+ spin_unlock(&inode->i_lock);
+ if (was_last)
+ iput(inode);
+}
+
+/*
+ * Handle cap IMPORT. If there are temp bits from an older EXPORT,
+ * clean them up.
+ *
+ * caller holds s_mutex, snap_rwsem
+ */
+static void handle_cap_import(struct inode *inode, struct ceph_mds_caps *im,
+ struct ceph_mds_session *session,
+ void *snaptrace, int snaptrace_len)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int mds = session->s_mds;
+ unsigned issued = le32_to_cpu(im->caps);
+ unsigned seq = le32_to_cpu(im->seq);
+ unsigned mseq = le32_to_cpu(im->migrate_seq);
+
+ if (ci->i_cap_exporting_mds >= 0 &&
+ ci->i_cap_exporting_mseq < mseq) {
+ dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d"
+ " - cleared exporting from mds%d\n",
+ inode, ci, mds, mseq,
+ ci->i_cap_exporting_mds);
+ ci->i_cap_exporting_issued = 0;
+ ci->i_cap_exporting_mseq = 0;
+ ci->i_cap_exporting_mds = -1;
+ } else {
+ dout(10, "handle_cap_import inode %p ci %p mds%d mseq %d\n",
+ inode, ci, mds, mseq);
+ }
+
+ ceph_add_cap(inode, session, -1, issued, seq, mseq,
+ snaptrace, snaptrace_len, NULL);
+}
+
+
+/*
+ * Handle a CEPH_MSG_CLIENT_CAPS message from the MDS.
+ *
+ * Identify the appropriate session, inode, and call the right handler
+ * based on the cap op. Take read or write lock on snap_rwsem as
+ * appropriate.
+ */
+void ceph_handle_caps(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg)
+{
+ struct super_block *sb = mdsc->client->sb;
+ struct ceph_mds_session *session;
+ struct inode *inode;
+ struct ceph_mds_caps *h;
+ int mds = le32_to_cpu(msg->hdr.src.name.num);
+ int op;
+ u32 seq;
+ struct ceph_vino vino;
+ u64 size, max_size;
+ int check_caps = 0;
+
+ dout(10, "handle_caps from mds%d\n", mds);
+
+ /* decode */
+ if (msg->front.iov_len < sizeof(*h))
+ goto bad;
+ h = msg->front.iov_base;
+ op = le32_to_cpu(h->op);
+ vino.ino = le64_to_cpu(h->ino);
+ vino.snap = CEPH_NOSNAP;
+ seq = le32_to_cpu(h->seq);
+ size = le64_to_cpu(h->size);
+ max_size = le64_to_cpu(h->max_size);
+
+ /* find session */
+ mutex_lock(&mdsc->mutex);
+ session = __ceph_get_mds_session(mdsc, mds);
+ if (session)
+ down_write(&mdsc->snap_rwsem);
+ mutex_unlock(&mdsc->mutex);
+ if (!session) {
+ dout(10, "WTF, got cap but no session for mds%d\n", mds);
+ return;
+ }
+
+ mutex_lock(&session->s_mutex);
+ session->s_seq++;
+ dout(20, " mds%d seq %lld\n", session->s_mds, session->s_seq);
+
+ /* lookup ino */
+ inode = ceph_find_inode(sb, vino);
+ dout(20, " op %s ino %llx inode %p\n", ceph_cap_op_name(op), vino.ino,
+ inode);
+ if (!inode) {
+ dout(10, " i don't have ino %llx, sending release\n", vino.ino);
+ send_cap_msg(mdsc, vino.ino, CEPH_CAP_OP_RELEASE, 0, 0, seq,
+ size, 0, 0, NULL, NULL, 0, 0, mds);
+ goto no_inode;
+ }
+
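+ /* we hold snap_rwsem for write; each case below drops it either
+ * before or after calling its handler, depending on whether the
+ * handler expects to run with it held */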
+ switch (op) {
+ case CEPH_CAP_OP_GRANT:
+ up_write(&mdsc->snap_rwsem);
+ if (handle_cap_grant(inode, h, session) == 1) {
+ dout(10, " sending reply back to mds%d\n", mds);
+ ceph_msg_get(msg);
+ ceph_send_msg_mds(mdsc, msg, mds);
+ }
+ break;
+
+ case CEPH_CAP_OP_TRUNC:
+ up_write(&mdsc->snap_rwsem);
+ handle_cap_trunc(inode, h, session);
+ break;
+
+ case CEPH_CAP_OP_RELEASED:
+ handle_cap_released(inode, h, session);
+ up_write(&mdsc->snap_rwsem);
+ if (list_empty(&session->s_caps))
+ ceph_mdsc_flushed_all_caps(mdsc, session);
+ break;
+
+ case CEPH_CAP_OP_FLUSHEDSNAP:
+ handle_cap_flushedsnap(inode, h, session);
+ up_write(&mdsc->snap_rwsem);
+ break;
+
+ case CEPH_CAP_OP_EXPORT:
+ handle_cap_export(inode, h, session);
+ up_write(&mdsc->snap_rwsem);
+ if (list_empty(&session->s_caps))
+ ceph_mdsc_flushed_all_caps(mdsc, session);
+ break;
+
+ case CEPH_CAP_OP_IMPORT:
+ handle_cap_import(inode, h, session,
+ msg->front.iov_base + sizeof(*h),
+ le32_to_cpu(h->snap_trace_len));
+ up_write(&mdsc->snap_rwsem);
+ check_caps = 1; /* we may have sent a RELEASE to the old auth */
+ break;
+
+ default:
+ up_write(&mdsc->snap_rwsem);
+ derr(10, " unknown cap op %d %s\n", op, ceph_cap_op_name(op));
+ }
+
+no_inode:
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+
+ if (check_caps)
+ ceph_check_caps(ceph_inode(inode), 1);
+ if (inode)
+ iput(inode);
+ return;
+
+bad:
+ derr(10, "corrupt caps message\n");
+ return;
+}
+
+
+/*
+ * Delayed work handler to process the end of the delayed cap release (LRU) list.
+ */
+void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
+{
+ struct ceph_inode_info *ci;
+
+ dout(10, "check_delayed_caps\n");
+ while (1) {
+ spin_lock(&mdsc->cap_delay_lock);
+ if (list_empty(&mdsc->cap_delay_list))
+ break;
+ ci = list_first_entry(&mdsc->cap_delay_list,
+ struct ceph_inode_info,
+ i_cap_delay_list);
+ if (time_before(jiffies, ci->i_hold_caps_until))
+ break;
+ list_del_init(&ci->i_cap_delay_list);
+ spin_unlock(&mdsc->cap_delay_lock);
+ dout(10, "check_delayed_caps on %p\n", &ci->vfs_inode);
+ ceph_check_caps(ci, 1);
+ iput(&ci->vfs_inode);
+ }
+ spin_unlock(&mdsc->cap_delay_lock);
+}
+
--
1.5.6.5