[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <1245450702-31343-11-git-send-email-sage@newdream.net>
Date: Fri, 19 Jun 2009 15:31:31 -0700
From: Sage Weil <sage@...dream.net>
To: linux-kernel@...r.kernel.org, linux-fsdevel@...r.kernel.org,
greg@...ah.com
Cc: Sage Weil <sage@...dream.net>
Subject: [PATCH 10/21] ceph: MDS client
The MDS client is responsible for submitting requests to the MDS
cluster and parsing the response. We decide which MDS to submit each
request to based on cached information about the current partition of
the directory hierarchy across the cluster. A stateful session is
opened with each MDS before we submit requests to it, and a mutex is
used to control the ordering of messages within each session.
An MDS request may generate two responses. The first indicates the
operation was a success and returns any result. A second reply is
sent when the operation commits to the journal. Note that locking
on the MDS ensures that the results of updates are visible only to
the updating client until the operation commits.
Requests are linked to the containing directory so that an fsync will
wait for them to commit.
If an MDS fails and/or recovers, we resubmit requests as needed.
Signed-off-by: Sage Weil <sage@...dream.net>
---
fs/staging/ceph/mds_client.c | 2694 ++++++++++++++++++++++++++++++++++++++++++
fs/staging/ceph/mds_client.h | 347 ++++++
fs/staging/ceph/mdsmap.c | 132 ++
fs/staging/ceph/mdsmap.h | 45 +
4 files changed, 3218 insertions(+), 0 deletions(-)
create mode 100644 fs/staging/ceph/mds_client.c
create mode 100644 fs/staging/ceph/mds_client.h
create mode 100644 fs/staging/ceph/mdsmap.c
create mode 100644 fs/staging/ceph/mdsmap.h
diff --git a/fs/staging/ceph/mds_client.c b/fs/staging/ceph/mds_client.c
new file mode 100644
index 0000000..6d0d3d6
--- /dev/null
+++ b/fs/staging/ceph/mds_client.c
@@ -0,0 +1,2694 @@
+
+#include <linux/wait.h>
+#include <linux/sched.h>
+#include "mds_client.h"
+#include "mon_client.h"
+
+#include "ceph_debug.h"
+
+int ceph_debug_mdsc __read_mostly = -1;
+#define DOUT_VAR ceph_debug_mdsc
+#define DOUT_MASK DOUT_MASK_MDSC
+#include "super.h"
+#include "messenger.h"
+#include "decode.h"
+
+static void __wake_requests(struct ceph_mds_client *mdsc,
+ struct list_head *head);
+
+/*
+ * address and send message to a given mds
+ */
+void ceph_send_msg_mds(struct ceph_mds_client *mdsc, struct ceph_msg *msg,
+ int mds)
+{
+ msg->hdr.dst.addr = *ceph_mdsmap_get_addr(mdsc->mdsmap, mds);
+ msg->hdr.dst.name.type = cpu_to_le32(CEPH_ENTITY_TYPE_MDS);
+ msg->hdr.dst.name.num = cpu_to_le32(mds);
+ ceph_msg_send(mdsc->client->msgr, msg, BASE_DELAY_INTERVAL);
+}
+
+
+/*
+ * mds reply parsing
+ */
+
+/*
+ * parse individual inode info
+ */
+static int parse_reply_info_in(void **p, void *end,
+ struct ceph_mds_reply_info_in *info)
+{
+ int err = -EIO;
+
+ info->in = *p;
+ *p += sizeof(struct ceph_mds_reply_inode) +
+ sizeof(*info->in->fragtree.splits) *
+ le32_to_cpu(info->in->fragtree.nsplits);
+
+ ceph_decode_32_safe(p, end, info->symlink_len, bad);
+ ceph_decode_need(p, end, info->symlink_len, bad);
+ info->symlink = *p;
+ *p += info->symlink_len;
+
+ ceph_decode_32_safe(p, end, info->xattr_len, bad);
+ ceph_decode_need(p, end, info->xattr_len, bad);
+ info->xattr_data = *p;
+ *p += info->xattr_len;
+ return 0;
+bad:
+ return err;
+}
+
+/*
+ * parse a full metadata trace from the mds: inode, dirinfo, dentry, inode...
+ * sequence.
+ */
+static int parse_reply_info_trace(void **p, void *end,
+ struct ceph_mds_reply_info_parsed *info)
+{
+ int err;
+
+ if (info->head->is_dentry) {
+ err = parse_reply_info_in(p, end, &info->diri);
+ if (err < 0)
+ goto out_bad;
+
+ if (unlikely(*p + sizeof(*info->dirfrag) > end))
+ goto bad;
+ info->dirfrag = *p;
+ *p += sizeof(*info->dirfrag) +
+ sizeof(u32)*le32_to_cpu(info->dirfrag->ndist);
+ if (unlikely(*p > end))
+ goto bad;
+
+ ceph_decode_32_safe(p, end, info->dname_len, bad);
+ ceph_decode_need(p, end, info->dname_len, bad);
+ info->dname = *p;
+ *p += info->dname_len;
+ info->dlease = *p;
+ *p += sizeof(*info->dlease);
+ }
+
+ if (info->head->is_target) {
+ err = parse_reply_info_in(p, end, &info->targeti);
+ if (err < 0)
+ goto out_bad;
+ }
+
+ if (unlikely(*p != end))
+ goto bad;
+ return 0;
+
+bad:
+ err = -EIO;
+out_bad:
+ derr(1, "problem parsing trace %d\n", err);
+ return err;
+}
+
+/*
+ * parse readdir results
+ */
+static int parse_reply_info_dir(void **p, void *end,
+ struct ceph_mds_reply_info_parsed *info)
+{
+ u32 num, i = 0;
+ int err;
+
+ info->dir_dir = *p;
+ if (*p + sizeof(*info->dir_dir) > end)
+ goto bad;
+ *p += sizeof(*info->dir_dir) +
+ sizeof(u32)*le32_to_cpu(info->dir_dir->ndist);
+ if (*p > end)
+ goto bad;
+
+ ceph_decode_need(p, end, sizeof(num) + 2, bad);
+ ceph_decode_32(p, num);
+ ceph_decode_8(p, info->dir_end);
+ ceph_decode_8(p, info->dir_complete);
+ if (num == 0)
+ goto done;
+
+ /* alloc large array */
+ info->dir_nr = num;
+ info->dir_in = kmalloc(num * (sizeof(*info->dir_in) +
+ sizeof(*info->dir_dname) +
+ sizeof(*info->dir_dname_len) +
+ sizeof(*info->dir_dlease)),
+ GFP_NOFS);
+ if (info->dir_in == NULL) {
+ err = -ENOMEM;
+ goto out_bad;
+ }
+ info->dir_dname = (void *)(info->dir_in + num);
+ info->dir_dname_len = (void *)(info->dir_dname + num);
+ info->dir_dlease = (void *)(info->dir_dname_len + num);
+
+ while (num) {
+ /* dentry */
+ ceph_decode_need(p, end, sizeof(u32)*2, bad);
+ ceph_decode_32(p, info->dir_dname_len[i]);
+ ceph_decode_need(p, end, info->dir_dname_len[i], bad);
+ info->dir_dname[i] = *p;
+ *p += info->dir_dname_len[i];
+ dout(20, "parsed dir dname '%.*s'\n", info->dir_dname_len[i],
+ info->dir_dname[i]);
+ info->dir_dlease[i] = *p;
+ *p += sizeof(struct ceph_mds_reply_lease);
+
+ /* inode */
+ err = parse_reply_info_in(p, end, &info->dir_in[i]);
+ if (err < 0)
+ goto out_bad;
+ i++;
+ num--;
+ }
+
+done:
+ if (*p != end)
+ goto bad;
+ return 0;
+
+bad:
+ err = -EIO;
+out_bad:
+ derr(1, "problem parsing dir contents %d\n", err);
+ return err;
+}
+
+/*
+ * parse entire mds reply
+ */
+static int parse_reply_info(struct ceph_msg *msg,
+ struct ceph_mds_reply_info_parsed *info)
+{
+ void *p, *end;
+ u32 len;
+ int err;
+
+ info->head = msg->front.iov_base;
+ p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
+ end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);
+
+ /* trace */
+ ceph_decode_32_safe(&p, end, len, bad);
+ if (len > 0) {
+ err = parse_reply_info_trace(&p, p+len, info);
+ if (err < 0)
+ goto out_bad;
+ }
+
+ /* dir content */
+ ceph_decode_32_safe(&p, end, len, bad);
+ if (len > 0) {
+ err = parse_reply_info_dir(&p, p+len, info);
+ if (err < 0)
+ goto out_bad;
+ }
+
+ /* snap blob */
+ ceph_decode_32_safe(&p, end, len, bad);
+ info->snapblob_len = len;
+ info->snapblob = p;
+ p += len;
+
+ if (p != end)
+ goto bad;
+ return 0;
+
+bad:
+ err = -EIO;
+out_bad:
+ derr(1, "parse_reply err %d\n", err);
+ return err;
+}
+
+static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
+{
+ kfree(info->dir_in);
+}
+
+
+/*
+ * sessions
+ */
+static const char *session_state_name(int s)
+{
+ switch (s) {
+ case CEPH_MDS_SESSION_NEW: return "new";
+ case CEPH_MDS_SESSION_OPENING: return "opening";
+ case CEPH_MDS_SESSION_OPEN: return "open";
+ case CEPH_MDS_SESSION_CLOSING: return "closing";
+ case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
+ default: return "???";
+ }
+}
+
+static struct ceph_mds_session *get_session(struct ceph_mds_session *s)
+{
+ dout(30, "get_session %p %d -> %d\n", s,
+ atomic_read(&s->s_ref), atomic_read(&s->s_ref)+1);
+ atomic_inc(&s->s_ref);
+ return s;
+}
+
+void ceph_put_mds_session(struct ceph_mds_session *s)
+{
+ dout(30, "put_session %p %d -> %d\n", s,
+ atomic_read(&s->s_ref), atomic_read(&s->s_ref)-1);
+ if (atomic_dec_and_test(&s->s_ref))
+ kfree(s);
+}
+
+/*
+ * called under mdsc->mutex
+ */
+struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
+ int mds)
+{
+ struct ceph_mds_session *session;
+
+ if (mds >= mdsc->max_sessions || mdsc->sessions[mds] == NULL)
+ return NULL;
+ session = mdsc->sessions[mds];
+ dout(30, "lookup_mds_session %p %d -> %d\n", session,
+ atomic_read(&session->s_ref), atomic_read(&session->s_ref)+1);
+ get_session(session);
+ return session;
+}
+
+
+/*
+ * create+register a new session for given mds.
+ * called under mdsc->mutex.
+ */
+static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
+ int mds)
+{
+ struct ceph_mds_session *s;
+
+ s = kmalloc(sizeof(*s), GFP_NOFS);
+ s->s_mds = mds;
+ s->s_state = CEPH_MDS_SESSION_NEW;
+ s->s_ttl = 0;
+ s->s_seq = 0;
+ mutex_init(&s->s_mutex);
+ spin_lock_init(&s->s_cap_lock);
+ s->s_cap_gen = 0;
+ s->s_cap_ttl = 0;
+ s->s_renew_requested = 0;
+ INIT_LIST_HEAD(&s->s_caps);
+ s->s_nr_caps = 0;
+ atomic_set(&s->s_ref, 1);
+ INIT_LIST_HEAD(&s->s_waiting);
+ INIT_LIST_HEAD(&s->s_unsafe);
+ s->s_num_cap_releases = 0;
+ INIT_LIST_HEAD(&s->s_cap_releases);
+ INIT_LIST_HEAD(&s->s_cap_releases_done);
+
+ dout(10, "register_session mds%d\n", mds);
+ if (mds >= mdsc->max_sessions) {
+ int newmax = 1 << get_count_order(mds+1);
+ struct ceph_mds_session **sa;
+
+ dout(50, "register_session realloc to %d\n", newmax);
+ sa = kzalloc(newmax * sizeof(void *), GFP_NOFS);
+ if (sa == NULL)
+ return ERR_PTR(-ENOMEM);
+ if (mdsc->sessions) {
+ memcpy(sa, mdsc->sessions,
+ mdsc->max_sessions * sizeof(void *));
+ kfree(mdsc->sessions);
+ }
+ mdsc->sessions = sa;
+ mdsc->max_sessions = newmax;
+ }
+ mdsc->sessions[mds] = s;
+ atomic_inc(&s->s_ref); /* one ref to sessions[], one to caller */
+ return s;
+}
+
+/*
+ * called under mdsc->mutex
+ */
+static void unregister_session(struct ceph_mds_client *mdsc, int mds)
+{
+ dout(10, "unregister_session mds%d %p\n", mds, mdsc->sessions[mds]);
+ ceph_put_mds_session(mdsc->sessions[mds]);
+ mdsc->sessions[mds] = NULL;
+}
+
+/* drop session refs in request */
+static void put_request_sessions(struct ceph_mds_request *req)
+{
+ if (req->r_session) {
+ ceph_put_mds_session(req->r_session);
+ req->r_session = NULL;
+ }
+ if (req->r_fwd_session) {
+ ceph_put_mds_session(req->r_fwd_session);
+ req->r_fwd_session = NULL;
+ }
+}
+
+void ceph_mdsc_put_request(struct ceph_mds_request *req)
+{
+ dout(10, "put_request %p %d -> %d\n", req,
+ atomic_read(&req->r_ref), atomic_read(&req->r_ref)-1);
+ if (atomic_dec_and_test(&req->r_ref)) {
+ if (req->r_request)
+ ceph_msg_put(req->r_request);
+ if (req->r_reply) {
+ ceph_msg_put(req->r_reply);
+ destroy_reply_info(&req->r_reply_info);
+ }
+ if (req->r_inode) {
+ ceph_put_cap_refs(ceph_inode(req->r_inode),
+ CEPH_CAP_PIN);
+ iput(req->r_inode);
+ }
+ if (req->r_locked_dir)
+ ceph_put_cap_refs(ceph_inode(req->r_locked_dir),
+ CEPH_CAP_PIN);
+ if (req->r_target_inode)
+ iput(req->r_target_inode);
+ if (req->r_dentry)
+ dput(req->r_dentry);
+ if (req->r_old_dentry) {
+ ceph_put_cap_refs(ceph_inode(req->r_old_dentry->d_parent->d_inode),
+ CEPH_CAP_PIN);
+ dput(req->r_old_dentry);
+ }
+ put_request_sessions(req);
+ ceph_unreserve_caps(&req->r_caps_reservation);
+ kfree(req);
+ }
+}
+
+/*
+ * lookup session, bump ref if found.
+ *
+ * called under mdsc->mutex.
+ */
+static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc,
+ u64 tid)
+{
+ struct ceph_mds_request *req;
+ req = radix_tree_lookup(&mdsc->request_tree, tid);
+ if (req)
+ ceph_mdsc_get_request(req);
+ return req;
+}
+
+/*
+ * Register an in-flight request, and assign a tid in msg request header.
+ *
+ * Called under mdsc->mutex.
+ */
+static void __register_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req,
+ struct inode *listener)
+{
+ req->r_tid = ++mdsc->last_tid;
+ if (req->r_num_caps)
+ ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
+ dout(30, "__register_request %p tid %lld\n", req, req->r_tid);
+ ceph_mdsc_get_request(req);
+ radix_tree_insert(&mdsc->request_tree, req->r_tid, (void *)req);
+
+ if (listener) {
+ struct ceph_inode_info *ci = ceph_inode(listener);
+
+ spin_lock(&ci->i_unsafe_lock);
+ req->r_unsafe_dir = listener;
+ list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
+ spin_unlock(&ci->i_unsafe_lock);
+ }
+}
+
+static void __unregister_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ dout(30, "__unregister_request %p tid %lld\n", req, req->r_tid);
+ radix_tree_delete(&mdsc->request_tree, req->r_tid);
+ ceph_mdsc_put_request(req);
+
+ if (req->r_unsafe_dir) {
+ struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
+
+ spin_lock(&ci->i_unsafe_lock);
+ list_del_init(&req->r_unsafe_dir_item);
+ spin_unlock(&ci->i_unsafe_lock);
+ }
+}
+
+static bool __have_session(struct ceph_mds_client *mdsc, int mds)
+{
+ if (mds >= mdsc->max_sessions)
+ return false;
+ return mdsc->sessions[mds];
+}
+
+/*
+ * Choose mds to send request to next. If there is a hint set in
+ * the request (e.g., due to a prior forward hint from the mds), use
+ * that.
+ *
+ * Called under mdsc->mutex.
+ */
+static int __choose_mds(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ int mds = -1;
+ u32 hash = req->r_direct_hash;
+ bool is_hash = req->r_direct_is_hash;
+ struct dentry *dentry = req->r_dentry;
+ struct ceph_inode_info *ci;
+ int mode = req->r_direct_mode;
+
+ /*
+ * is there a specific mds we should try? ignore hint if we have
+ * no session and the mds is not up (active or recovering).
+ */
+ if (req->r_resend_mds >= 0 &&
+ (__have_session(mdsc, req->r_resend_mds) ||
+ ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
+ dout(20, "choose_mds using resend_mds mds%d\n",
+ req->r_resend_mds);
+ return req->r_resend_mds;
+ }
+
+ if (mode == USE_RANDOM_MDS)
+ goto random;
+
+ /*
+ * try to find an appropriate mds to contact based on the
+ * given dentry. walk up the tree until we find delegation info
+ * in the i_fragtree.
+ *
+ * if is_hash is true, direct request at the appropriate directory
+ * fragment (as with a readdir on a fragmented directory).
+ */
+ while (dentry) {
+ if (is_hash && dentry->d_inode &&
+ S_ISDIR(dentry->d_inode->i_mode)) {
+ struct ceph_inode_frag frag;
+ int found;
+
+ ci = ceph_inode(dentry->d_inode);
+ ceph_choose_frag(ci, hash, &frag, &found);
+ if (found) {
+ if (mode == USE_ANY_MDS && frag.ndist > 0) {
+ u8 r;
+
+ /* choose a random replica */
+ get_random_bytes(&r, 1);
+ r %= frag.ndist;
+ mds = frag.dist[r];
+ dout(20, "choose_mds %p %llx.%llx "
+ "frag %u mds%d (%d/%d)\n",
+ dentry->d_inode,
+ ceph_vinop(&ci->vfs_inode),
+ frag.frag, frag.mds,
+ (int)r, frag.ndist);
+ return mds;
+ }
+ /* since the more deeply nested item wasn't
+ * known to be replicated, then we want to
+ * look for the authoritative mds. */
+ mode = USE_AUTH_MDS;
+ if (frag.mds >= 0) {
+ /* choose auth mds */
+ mds = frag.mds;
+ dout(20, "choose_mds %p %llx.%llx "
+ "frag %u mds%d (auth)\n",
+ dentry->d_inode,
+ ceph_vinop(&ci->vfs_inode),
+ frag.frag, mds);
+ return mds;
+ }
+ }
+ }
+ if (IS_ROOT(dentry))
+ break;
+
+ /* move up the hierarchy, but direct request based on the hash
+ * for the child's dentry name */
+ hash = dentry->d_name.hash;
+ is_hash = true;
+ dentry = dentry->d_parent;
+ }
+
+ /* ok, just pick one at random */
+random:
+ mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
+ dout(20, "choose_mds chose random mds%d\n", mds);
+ return mds;
+}
+
+
+/*
+ * session messages
+ */
+static struct ceph_msg *create_session_msg(u32 op, u64 seq)
+{
+ struct ceph_msg *msg;
+ struct ceph_mds_session_head *h;
+
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), 0, 0, NULL);
+ if (IS_ERR(msg)) {
+ derr("ENOMEM creating session msg\n");
+ return ERR_PTR(PTR_ERR(msg));
+ }
+ h = msg->front.iov_base;
+ h->op = cpu_to_le32(op);
+ h->seq = cpu_to_le64(seq);
+ return msg;
+}
+
+/*
+ * send session open request.
+ *
+ * called under mdsc->mutex
+ */
+static int __open_session(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_msg *msg;
+ int mstate;
+ int mds = session->s_mds;
+ int err = 0;
+
+ /* wait for mds to go active? */
+ mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
+ dout(10, "open_session to mds%d (%s)\n", mds,
+ ceph_mds_state_name(mstate));
+ session->s_state = CEPH_MDS_SESSION_OPENING;
+ session->s_renew_requested = jiffies;
+
+ /* send connect message */
+ msg = create_session_msg(CEPH_SESSION_REQUEST_OPEN, session->s_seq);
+ if (IS_ERR(msg)) {
+ err = PTR_ERR(msg);
+ goto out;
+ }
+ ceph_send_msg_mds(mdsc, msg, mds);
+
+out:
+ return 0;
+}
+
+/*
+ * Free preallocated cap messages assigned to this session
+ */
+static void cleanup_cap_releases(struct ceph_mds_session *session)
+{
+ struct ceph_msg *msg;
+
+ spin_lock(&session->s_cap_lock);
+ while (!list_empty(&session->s_cap_releases)) {
+ msg = list_first_entry(&session->s_cap_releases,
+ struct ceph_msg, list_head);
+ ceph_msg_remove(msg);
+ }
+ while (!list_empty(&session->s_cap_releases_done)) {
+ msg = list_first_entry(&session->s_cap_releases_done,
+ struct ceph_msg, list_head);
+ ceph_msg_remove(msg);
+ }
+ spin_unlock(&session->s_cap_lock);
+}
+
+/*
+ * caller must hold session s_mutex
+ */
+static int iterate_session_caps(struct ceph_mds_session *session,
+ int (*cb)(struct inode *, struct ceph_cap *,
+ void *), void *arg)
+{
+ struct list_head *p;
+ struct ceph_cap *cap;
+ struct inode *inode;
+ struct list_head *n;
+ int ret;
+
+ dout(10, "iterate_session_caps %p mds%d\n", session, session->s_mds);
+ spin_lock(&session->s_cap_lock);
+ list_for_each_safe(p, n, &session->s_caps) {
+ cap = list_entry(p, struct ceph_cap, session_caps);
+ inode = igrab(&cap->ci->vfs_inode);
+ if (!inode)
+ continue;
+ spin_unlock(&session->s_cap_lock);
+ ret = cb(inode, cap, arg);
+ iput(inode);
+ if (ret < 0)
+ return ret;
+ spin_lock(&session->s_cap_lock);
+ }
+ spin_unlock(&session->s_cap_lock);
+
+ return 0;
+}
+
+static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
+ void *arg)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ dout(10, "removing cap %p, ci is %p, inode is %p\n",
+ cap, ci, &ci->vfs_inode);
+ ceph_remove_cap(cap);
+ return 0;
+}
+
+/*
+ * caller must hold session s_mutex
+ */
+static void remove_session_caps(struct ceph_mds_session *session)
+{
+ dout(10, "remove_session_caps on %p\n", session);
+ iterate_session_caps(session, remove_session_caps_cb, NULL);
+
+ BUG_ON(session->s_nr_caps > 0);
+
+ cleanup_cap_releases(session);
+}
+
+static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
+ void *arg)
+{
+ spin_lock(&inode->i_lock);
+ wake_up(&cap->ci->i_cap_wq);
+ spin_unlock(&inode->i_lock);
+ return 0;
+}
+/*
+ * wake up any threads waiting on this session's caps
+ *
+ * caller must hold s_mutex.
+ */
+static void wake_up_session_caps(struct ceph_mds_session *session)
+{
+ dout(10, "wake_up_session_caps %p mds%d\n", session, session->s_mds);
+ iterate_session_caps(session, wake_up_session_cb, NULL);
+}
+
+/*
+ * Send periodic message to MDS renewing all currently held caps. The
+ * ack will reset the expiration for all caps from this session.
+ *
+ * caller holds s_mutex
+ */
+static int send_renew_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_msg *msg;
+ int state;
+
+ if (time_after_eq(jiffies, session->s_cap_ttl) &&
+ time_after_eq(session->s_cap_ttl, session->s_renew_requested))
+ dout(1, "mds%d session caps stale\n", session->s_mds);
+
+ /* do not try to renew caps until a recovering mds has reconnected
+ * with its clients. */
+ state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
+ if (state < CEPH_MDS_STATE_RECONNECT) {
+ dout(10, "send_renew_caps ignoring mds%d (%s)\n",
+ session->s_mds, ceph_mds_state_name(state));
+ return 0;
+ }
+
+ dout(10, "send_renew_caps to mds%d (%s)\n", session->s_mds,
+ ceph_mds_state_name(state));
+ session->s_renew_requested = jiffies;
+ msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS, 0);
+ if (IS_ERR(msg))
+ return PTR_ERR(msg);
+ ceph_send_msg_mds(mdsc, msg, session->s_mds);
+ return 0;
+}
+
+/*
+ * Note new cap ttl, and any transition from stale -> not stale (fresh?).
+ */
+static void renewed_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session, int is_renew)
+{
+ int was_stale;
+ int wake = 0;
+
+ spin_lock(&session->s_cap_lock);
+ was_stale = is_renew && (session->s_cap_ttl == 0 ||
+ time_after_eq(jiffies, session->s_cap_ttl));
+
+ session->s_cap_ttl = session->s_renew_requested +
+ mdsc->mdsmap->m_session_timeout*HZ;
+
+ if (was_stale) {
+ if (time_before(jiffies, session->s_cap_ttl)) {
+ dout(1, "mds%d caps renewed\n", session->s_mds);
+ wake = 1;
+ } else {
+ dout(1, "mds%d caps still stale\n", session->s_mds);
+ }
+ }
+ dout(10, "renewed_caps mds%d ttl now %lu, was %s, now %s\n",
+ session->s_mds, session->s_cap_ttl, was_stale ? "stale" : "fresh",
+ time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
+ spin_unlock(&session->s_cap_lock);
+
+ if (wake)
+ wake_up_session_caps(session);
+}
+
+
+
+static int request_close_session(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_msg *msg;
+ int err = 0;
+
+ msg = create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
+ session->s_seq);
+ if (IS_ERR(msg))
+ err = PTR_ERR(msg);
+ else
+ ceph_send_msg_mds(mdsc, msg, session->s_mds);
+ return err;
+}
+
+/*
+ * Called with s_mutex held.
+ */
+static int __close_session(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ dout(10, "close_session mds%d state=%s\n", session->s_mds,
+ session_state_name(session->s_state));
+ if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
+ return 0;
+ session->s_state = CEPH_MDS_SESSION_CLOSING;
+ return request_close_session(mdsc, session);
+}
+
+/*
+ * Trim old(er) caps.
+ */
+static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
+{
+ struct ceph_mds_session *session = arg;
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ int used, oissued, mine;
+
+ if (session->s_trim_caps <= 0)
+ return -1;
+
+ spin_lock(&inode->i_lock);
+ mine = cap->issued | cap->implemented;
+ used = __ceph_caps_used(ci);
+ oissued = __ceph_caps_issued_other(ci, cap);
+
+ dout(20, "trim_caps_cb %p cap %p mine %s oissued %s used %s\n",
+ inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued),
+ ceph_cap_string(used));
+ if (ci->i_dirty_caps)
+ goto out; /* dirty caps */
+ if ((used & ~oissued) & mine)
+ goto out; /* we need these caps */
+
+ session->s_trim_caps--;
+ if (oissued) {
+ /* we aren't the only cap.. just remove us */
+ __ceph_remove_cap(cap, NULL);
+ } else {
+ /* try to drop referring dentries */
+ spin_unlock(&inode->i_lock);
+ d_prune_aliases(inode);
+ dout(20, "trim_caps_cb %p cap %p pruned, count now %d\n",
+ inode, cap, atomic_read(&inode->i_count));
+ return 0;
+ }
+
+out:
+ spin_unlock(&inode->i_lock);
+ return 0;
+}
+
+static int trim_caps(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ int max_caps)
+{
+ int trim_caps = session->s_nr_caps - max_caps;
+
+ dout(10, "trim_caps mds%d start: %d / %d, trim %d\n",
+ session->s_mds, session->s_nr_caps, max_caps, trim_caps);
+ if (trim_caps > 0) {
+ session->s_trim_caps = trim_caps;
+ iterate_session_caps(session, trim_caps_cb, session);
+ dout(10, "trim_caps mds%d done: %d / %d, trimmed %d\n",
+ session->s_mds, session->s_nr_caps, max_caps,
+ trim_caps - session->s_trim_caps);
+ }
+ return 0;
+}
+
+/*
+ * Allocate cap_release messages. If there is a partially full message
+ * in the queue, try to allocate enough to cover it's remainder, so that
+ * we can send it immediately.
+ *
+ * Called under s_mutex.
+ */
+static int add_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session,
+ int extra)
+{
+ struct ceph_msg *msg;
+ struct ceph_mds_cap_release *head;
+ int err = -ENOMEM;
+
+ if (extra < 0)
+ extra = mdsc->client->mount_args.cap_release_safety;
+
+ spin_lock(&session->s_cap_lock);
+
+ if (!list_empty(&session->s_cap_releases)) {
+ msg = list_first_entry(&session->s_cap_releases,
+ struct ceph_msg,
+ list_head);
+ head = msg->front.iov_base;
+ extra += CAPS_PER_RELEASE - le32_to_cpu(head->num);
+ }
+
+ while (session->s_num_cap_releases < session->s_nr_caps + extra) {
+ spin_unlock(&session->s_cap_lock);
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
+ 0, 0, NULL);
+ if (!msg)
+ goto out_unlocked;
+ dout(10, "add_cap_releases %p msg %p now %d\n", session, msg,
+ (int)msg->front.iov_len);
+ head = msg->front.iov_base;
+ head->num = cpu_to_le32(0);
+ msg->front.iov_len = sizeof(*head);
+ spin_lock(&session->s_cap_lock);
+ list_add(&msg->list_head, &session->s_cap_releases);
+ session->s_num_cap_releases += CAPS_PER_RELEASE;
+ }
+
+ if (!list_empty(&session->s_cap_releases)) {
+ msg = list_first_entry(&session->s_cap_releases,
+ struct ceph_msg,
+ list_head);
+ head = msg->front.iov_base;
+ if (head->num) {
+ dout(10, " queueing non-full %p (%d)\n", msg,
+ le32_to_cpu(head->num));
+ list_move_tail(&msg->list_head,
+ &session->s_cap_releases_done);
+ session->s_num_cap_releases -=
+ CAPS_PER_RELEASE - le32_to_cpu(head->num);
+ }
+ }
+ err = 0;
+ spin_unlock(&session->s_cap_lock);
+out_unlocked:
+ return err;
+}
+
+/*
+ * called under s_mutex
+ */
+static void send_cap_releases(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct ceph_msg *msg;
+
+ dout(10, "send_cap_releases mds%d\n", session->s_mds);
+ while (1) {
+ spin_lock(&session->s_cap_lock);
+ if (list_empty(&session->s_cap_releases_done))
+ break;
+ msg = list_first_entry(&session->s_cap_releases_done,
+ struct ceph_msg, list_head);
+ list_del_init(&msg->list_head);
+ spin_unlock(&session->s_cap_lock);
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+ dout(10, "send_cap_releases mds%d %p\n", session->s_mds, msg);
+ ceph_send_msg_mds(mdsc, msg, session->s_mds);
+ }
+ spin_unlock(&session->s_cap_lock);
+}
+
+/*
+ * Create an mds request.
+ */
+struct ceph_mds_request *
+ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
+{
+ struct ceph_mds_request *req = kzalloc(sizeof(*req), GFP_NOFS);
+
+ if (!req)
+ return ERR_PTR(-ENOMEM);
+
+ req->r_started = jiffies;
+ req->r_resend_mds = -1;
+ INIT_LIST_HEAD(&req->r_unsafe_dir_item);
+ req->r_fmode = -1;
+ atomic_set(&req->r_ref, 1); /* one for request_tree, one for caller */
+ INIT_LIST_HEAD(&req->r_wait);
+ init_completion(&req->r_completion);
+ init_completion(&req->r_safe_completion);
+ INIT_LIST_HEAD(&req->r_unsafe_item);
+
+ req->r_op = op;
+ req->r_direct_mode = mode;
+ return req;
+}
+
+/*
+ * return oldest (lowest) tid in request tree, 0 if none.
+ *
+ * called under mdsc->mutex.
+ */
+static u64 __get_oldest_tid(struct ceph_mds_client *mdsc)
+{
+ struct ceph_mds_request *first;
+ if (radix_tree_gang_lookup(&mdsc->request_tree,
+ (void **)&first, 0, 1) <= 0)
+ return 0;
+ return first->r_tid;
+}
+
+/*
+ * Build a dentry's path. Allocate on heap; caller must kfree. Based
+ * on build_path_from_dentry in fs/cifs/dir.c.
+ *
+ * If @stop_on_nosnap, generate path relative to the first non-snapped
+ * inode.
+ *
+ * Encode hidden .snap dirs as a double /, i.e.
+ * foo/.snap/bar -> foo//bar
+ */
+char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
+ int stop_on_nosnap)
+{
+ struct dentry *temp;
+ char *path;
+ int len, pos;
+
+ if (dentry == NULL)
+ return ERR_PTR(-EINVAL);
+
+retry:
+ len = 0;
+ for (temp = dentry; !IS_ROOT(temp);) {
+ struct inode *inode = temp->d_inode;
+ if (inode && ceph_snap(inode) == CEPH_SNAPDIR)
+ len++; /* slash only */
+ else if (stop_on_nosnap && inode &&
+ ceph_snap(inode) == CEPH_NOSNAP)
+ break;
+ else
+ len += 1 + temp->d_name.len;
+ temp = temp->d_parent;
+ if (temp == NULL) {
+ derr(1, "corrupt dentry %p\n", dentry);
+ return ERR_PTR(-EINVAL);
+ }
+ }
+ if (len)
+ len--; /* no leading '/' */
+
+ path = kmalloc(len+1, GFP_NOFS);
+ if (path == NULL)
+ return ERR_PTR(-ENOMEM);
+ pos = len;
+ path[pos] = 0; /* trailing null */
+ for (temp = dentry; !IS_ROOT(temp) && pos != 0; ) {
+ struct inode *inode = temp->d_inode;
+
+ if (inode && ceph_snap(inode) == CEPH_SNAPDIR) {
+ dout(50, "build_path_dentry path+%d: %p SNAPDIR\n",
+ pos, temp);
+ } else if (stop_on_nosnap && inode &&
+ ceph_snap(inode) == CEPH_NOSNAP) {
+ break;
+ } else {
+ pos -= temp->d_name.len;
+ if (pos < 0)
+ break;
+ strncpy(path + pos, temp->d_name.name,
+ temp->d_name.len);
+ dout(50, "build_path_dentry path+%d: %p '%.*s'\n",
+ pos, temp, temp->d_name.len, path + pos);
+ }
+ if (pos)
+ path[--pos] = '/';
+ temp = temp->d_parent;
+ if (temp == NULL) {
+ derr(1, "corrupt dentry\n");
+ kfree(path);
+ return ERR_PTR(-EINVAL);
+ }
+ }
+ if (pos != 0) {
+ derr(1, "did not end path lookup where expected, "
+ "namelen is %d, pos is %d\n", len, pos);
+ /* presumably this is only possible if racing with a
+ rename of one of the parent directories (we can not
+ lock the dentries above us to prevent this, but
+ retrying should be harmless) */
+ kfree(path);
+ goto retry;
+ }
+
+ *base = ceph_ino(temp->d_inode);
+ *plen = len;
+ dout(10, "build_path_dentry on %p %d built %llx '%.*s'\n",
+ dentry, atomic_read(&dentry->d_count), *base, len, path);
+ return path;
+}
+
+static int build_dentry_path(struct dentry *dentry,
+ const char **ppath, int *ppathlen, u64 *pino)
+{
+ char *path;
+
+ if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP) {
+ *pino = ceph_ino(dentry->d_parent->d_inode);
+ *ppath = dentry->d_name.name;
+ *ppathlen = dentry->d_name.len;
+ return 0;
+ }
+ path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
+ if (IS_ERR(path))
+ return PTR_ERR(path);
+ *ppath = path;
+ return 1;
+}
+
+static int build_inode_path(struct inode *inode,
+ const char **ppath, int *ppathlen, u64 *pino)
+{
+ struct dentry *dentry;
+ char *path;
+
+ if (ceph_snap(inode) == CEPH_NOSNAP) {
+ *pino = ceph_ino(inode);
+ *ppathlen = 0;
+ return 0;
+ }
+ dentry = d_find_alias(inode);
+ path = ceph_mdsc_build_path(dentry, ppathlen, pino, 1);
+ dput(dentry);
+ if (IS_ERR(path))
+ return PTR_ERR(path);
+ *ppath = path;
+ return 1;
+}
+
+static int set_request_path_attr(struct inode *rinode, struct dentry *rdentry,
+ const char *rpath, u64 rino,
+ const char **ppath, int *pathlen,
+ u64 *ino, int *freepath)
+{
+ *freepath = 0;
+ *pathlen = 0;
+ *ino = 0;
+
+ if (rinode) {
+ *freepath = build_inode_path(rinode, ppath, pathlen, ino);
+ dout(10, " inode %p %llx.%llx\n", rinode, ceph_ino(rinode),
+ ceph_snap(rinode));
+ } else if (rdentry) {
+ *freepath = build_dentry_path(rdentry, ppath, pathlen, ino);
+ dout(10, " dentry %p %llx/%.*s\n", rdentry, *ino, *pathlen,
+ *ppath);
+ } else if (rpath) {
+ *ino = rino;
+ *ppath = rpath;
+ *pathlen = strlen(rpath);
+ dout(10, " path %.*s\n", *pathlen, rpath);
+ }
+
+ if (*freepath < 0)
+ return *freepath;
+ return 0;
+}
+
+/*
+ * called under mdsc->mutex
+ */
+static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req,
+ int mds)
+{
+ struct ceph_msg *msg;
+ struct ceph_mds_request_head *head;
+ const char *path1 = req->r_path1;
+ const char *path2 = req->r_path2;
+ u64 ino1, ino2;
+ int pathlen1, pathlen2;
+ int len;
+ int freepath1, freepath2;
+ u16 releases;
+ void *p, *end;
+ int ret;
+
+ ret = set_request_path_attr(req->r_inode, req->r_dentry,
+ req->r_path1, req->r_ino1.ino,
+ &path1, &pathlen1, &ino1, &freepath1);
+ if (ret < 0) {
+ msg = ERR_PTR(ret);
+ goto out;
+ }
+
+ ret = set_request_path_attr(NULL, req->r_old_dentry,
+ req->r_path2, req->r_ino2.ino,
+ &path2, &pathlen2, &ino2, &freepath2);
+ if (ret < 0) {
+ msg = ERR_PTR(ret);
+ goto out_free1;
+ }
+
+ len = sizeof(*head) +
+ pathlen1 + pathlen2 + 2*(sizeof(u32) + sizeof(u64));
+
+ /* calculate (max) length for cap releases */
+ len += sizeof(struct ceph_mds_request_release) *
+ (!!req->r_inode_drop + !!req->r_dentry_drop +
+ !!req->r_old_inode_drop + !!req->r_old_dentry_drop);
+ if (req->r_dentry_drop)
+ len += req->r_dentry->d_name.len;
+ if (req->r_old_dentry_drop)
+ len += req->r_old_dentry->d_name.len;
+
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_REQUEST, len, 0, 0, NULL);
+ if (IS_ERR(msg))
+ goto out_free2;
+
+ head = msg->front.iov_base;
+ p = msg->front.iov_base + sizeof(*head);
+ end = msg->front.iov_base + msg->front.iov_len;
+
+ head->mdsmap_epoch = cpu_to_le32(mdsc->mdsmap->m_epoch);
+ head->op = cpu_to_le32(req->r_op);
+ head->caller_uid = cpu_to_le32(current_fsuid());
+ head->caller_gid = cpu_to_le32(current_fsgid());
+ head->args = req->r_args;
+
+ ceph_encode_filepath(&p, end, ino1, path1);
+ ceph_encode_filepath(&p, end, ino2, path2);
+
+ /* cap releases */
+ releases = 0;
+ if (req->r_inode_drop)
+ releases += ceph_encode_inode_release(&p,
+ req->r_inode ? req->r_inode : req->r_dentry->d_inode,
+ mds, req->r_inode_drop, req->r_inode_unless, 0);
+ if (req->r_dentry_drop)
+ releases += ceph_encode_dentry_release(&p, req->r_dentry,
+ mds, req->r_dentry_drop, req->r_dentry_unless);
+ if (req->r_old_dentry_drop)
+ releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
+ mds, req->r_old_dentry_drop, req->r_old_dentry_unless);
+ if (req->r_old_inode_drop)
+ releases += ceph_encode_inode_release(&p,
+ req->r_old_dentry->d_inode,
+ mds, req->r_old_inode_drop, req->r_old_inode_unless, 0);
+ head->num_releases = cpu_to_le16(releases);
+
+ BUG_ON(p > end);
+ msg->front.iov_len = p - msg->front.iov_base;
+ msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+
+ msg->pages = req->r_pages;
+ msg->nr_pages = req->r_num_pages;
+ msg->hdr.data_len = cpu_to_le32(req->r_data_len);
+ msg->hdr.data_off = cpu_to_le16(0);
+
+out_free2:
+ if (freepath2)
+ kfree((char *)path2);
+out_free1:
+ if (freepath1)
+ kfree((char *)path1);
+out:
+ return msg;
+}
+
+/*
+ * called under mdsc->mutex if error, under no mutex if
+ * success.
+ */
+static void complete_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ if (req->r_callback)
+ req->r_callback(mdsc, req);
+ else
+ complete(&req->r_completion);
+}
+
+/*
+ * called under mdsc->mutex
+ */
+static int __prepare_send_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req,
+ int mds)
+{
+ struct ceph_mds_request_head *rhead;
+ struct ceph_msg *msg;
+ int flags = 0;
+
+ req->r_attempts++;
+ dout(10, "prepare_send_request %p tid %lld %s (attempt %d)\n", req,
+ req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
+
+ if (req->r_request) {
+ ceph_msg_put(req->r_request);
+ req->r_request = NULL;
+ }
+ msg = create_request_message(mdsc, req, mds);
+ if (IS_ERR(msg)) {
+ req->r_reply = ERR_PTR(PTR_ERR(msg));
+ complete_request(mdsc, req);
+ return -PTR_ERR(msg);
+ }
+ req->r_request = msg;
+
+ rhead = msg->front.iov_base;
+ rhead->tid = cpu_to_le64(req->r_tid);
+ rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
+ if (req->r_got_unsafe)
+ flags |= CEPH_MDS_FLAG_REPLAY;
+ if (req->r_locked_dir)
+ flags |= CEPH_MDS_FLAG_WANT_DENTRY;
+ rhead->flags = cpu_to_le32(flags);
+ rhead->num_fwd = req->r_num_fwd;
+ rhead->num_retry = req->r_attempts - 1;
+
+ dout(20, " r_locked_dir = %p\n", req->r_locked_dir);
+
+ if (req->r_target_inode && req->r_got_unsafe)
+ rhead->ino = cpu_to_le64(ceph_ino(req->r_target_inode));
+ else
+ rhead->ino = 0;
+ return 0;
+}
+
+/*
+ * send request, or put it on the appropriate wait list.
+ */
+static int __do_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ struct ceph_mds_session *session = NULL;
+ int mds = -1;
+ int err = -EAGAIN;
+
+ if (req->r_reply)
+ goto out;
+
+ if (req->r_timeout &&
+ time_after_eq(jiffies, req->r_started + req->r_timeout)) {
+ dout(10, "do_request timed out\n");
+ err = -EIO;
+ goto finish;
+ }
+
+ mds = __choose_mds(mdsc, req);
+ if (mds < 0 ||
+ ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+ dout(30, "do_request no mds or not active, waiting for map\n");
+ list_add(&req->r_wait, &mdsc->waiting_for_map);
+ ceph_monc_request_mdsmap(&mdsc->client->monc,
+ mdsc->mdsmap->m_epoch+1);
+ goto out;
+ }
+
+ /* get, open session */
+ session = __ceph_lookup_mds_session(mdsc, mds);
+ if (!session)
+ session = register_session(mdsc, mds);
+ dout(30, "do_request mds%d session %p state %s\n", mds, session,
+ session_state_name(session->s_state));
+ if (session->s_state != CEPH_MDS_SESSION_OPEN) {
+ if (session->s_state == CEPH_MDS_SESSION_NEW ||
+ session->s_state == CEPH_MDS_SESSION_CLOSING)
+ __open_session(mdsc, session);
+ list_add(&req->r_wait, &session->s_waiting);
+ ceph_monc_request_mdsmap(&mdsc->client->monc,
+ mdsc->mdsmap->m_epoch+1);
+ goto out_session;
+ }
+
+ /* send request */
+ req->r_session = get_session(session);
+ req->r_resend_mds = -1; /* forget any previous mds hint */
+
+ if (req->r_request_started == 0) /* note request start time */
+ req->r_request_started = jiffies;
+
+ err = __prepare_send_request(mdsc, req, mds);
+ if (!err) {
+ ceph_msg_get(req->r_request);
+ ceph_send_msg_mds(mdsc, req->r_request, mds);
+ }
+
+out_session:
+ ceph_put_mds_session(session);
+out:
+ return err;
+
+finish:
+ req->r_reply = ERR_PTR(err);
+ complete_request(mdsc, req);
+ goto out;
+}
+
+static void __wake_requests(struct ceph_mds_client *mdsc,
+ struct list_head *head)
+{
+ struct list_head *p, *n;
+
+ list_for_each_safe(p, n, head) {
+ struct ceph_mds_request *req =
+ list_entry(p, struct ceph_mds_request, r_wait);
+ list_del_init(&req->r_wait);
+ __do_request(mdsc, req);
+ }
+}
+
+/*
+ * Wake up threads with requests pending for @mds, so that they can
+ * resubmit their requests to a possibly different mds. If @all is set,
+ * wake up if their requests has been forwarded to @mds, too.
+ */
+static void kick_requests(struct ceph_mds_client *mdsc, int mds, int all)
+{
+ struct ceph_mds_request *reqs[10];
+ u64 nexttid = 0;
+ int i, got;
+
+ dout(20, "kick_requests mds%d\n", mds);
+ while (nexttid < mdsc->last_tid) {
+ got = radix_tree_gang_lookup(&mdsc->request_tree,
+ (void **)&reqs, nexttid, 10);
+ if (got == 0)
+ break;
+ nexttid = reqs[got-1]->r_tid + 1;
+ for (i = 0; i < got; i++) {
+ if (reqs[i]->r_got_unsafe)
+ continue;
+ if (((reqs[i]->r_session &&
+ reqs[i]->r_session->s_mds == mds) ||
+ (all && reqs[i]->r_fwd_session &&
+ reqs[i]->r_fwd_session->s_mds == mds))) {
+ dout(10, " kicking tid %llu\n", reqs[i]->r_tid);
+ put_request_sessions(reqs[i]);
+ __do_request(mdsc, reqs[i]);
+ }
+ }
+ }
+}
+
+void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req)
+{
+ dout(30, "submit_request on %p\n", req);
+ mutex_lock(&mdsc->mutex);
+ __register_request(mdsc, req, NULL);
+ __do_request(mdsc, req);
+ mutex_unlock(&mdsc->mutex);
+}
+
+/*
+ * Synchrously perform an mds request. Take care of all of the
+ * session setup, forwarding, retry details.
+ */
+int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+ struct inode *listener,
+ struct ceph_mds_request *req)
+{
+ int err;
+
+ dout(30, "do_request on %p\n", req);
+
+ /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
+ if (req->r_inode)
+ ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
+ if (req->r_locked_dir)
+ ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
+ if (req->r_old_dentry)
+ ceph_get_cap_refs(ceph_inode(req->r_old_dentry->d_parent->d_inode),
+ CEPH_CAP_PIN);
+
+ mutex_lock(&mdsc->mutex);
+ __register_request(mdsc, req, listener);
+ __do_request(mdsc, req);
+
+ if (!req->r_reply) {
+ mutex_unlock(&mdsc->mutex);
+ if (req->r_timeout) {
+ err = wait_for_completion_timeout(&req->r_completion,
+ req->r_timeout);
+ if (err > 0)
+ err = 0;
+ else if (err == 0)
+ req->r_reply = ERR_PTR(-EIO);
+ } else {
+ wait_for_completion(&req->r_completion);
+ }
+ mutex_lock(&mdsc->mutex);
+ }
+
+ if (IS_ERR(req->r_reply)) {
+ err = PTR_ERR(req->r_reply);
+ req->r_reply = NULL;
+
+ /* clean up */
+ __unregister_request(mdsc, req);
+ if (!list_empty(&req->r_unsafe_item))
+ list_del_init(&req->r_unsafe_item);
+ complete(&req->r_safe_completion);
+ } else if (req->r_err) {
+ err = req->r_err;
+ } else {
+ err = le32_to_cpu(req->r_reply_info.head->result);
+ }
+ mutex_unlock(&mdsc->mutex);
+
+ dout(30, "do_request %p done, result %d\n", req, err);
+ return err;
+}
+
+/*
+ * Handle mds reply.
+ *
+ * We take the session mutex and parse and process the reply immediately.
+ * This preserves the logical ordering of replies, capabilities, etc., sent
+ * by the MDS as they are applied to our local cache.
+ */
+void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+ struct ceph_mds_request *req;
+ struct ceph_mds_reply_head *head = msg->front.iov_base;
+ struct ceph_mds_reply_info_parsed *rinfo; /* parsed reply info */
+ u64 tid;
+ int err, result;
+ int mds;
+
+ if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+ return;
+ if (msg->front.iov_len < sizeof(*head)) {
+ derr(1, "handle_reply got corrupt (short) reply\n");
+ return;
+ }
+
+ /* get request, session */
+ tid = le64_to_cpu(head->tid);
+ mutex_lock(&mdsc->mutex);
+ req = __lookup_request(mdsc, tid);
+ if (!req) {
+ dout(1, "handle_reply on unknown tid %llu\n", tid);
+ mutex_unlock(&mdsc->mutex);
+ return;
+ }
+ dout(10, "handle_reply %p\n", req);
+ mds = le32_to_cpu(msg->hdr.src.name.num);
+
+ /* dup? */
+ if ((req->r_got_unsafe && !head->safe) ||
+ (req->r_got_safe && head->safe)) {
+ dout(0, "got a dup %s reply on %llu from mds%d\n",
+ head->safe ? "safe" : "unsafe", tid, mds);
+ mutex_unlock(&mdsc->mutex);
+ goto out;
+ }
+
+ if (head->safe) {
+ req->r_got_safe = true;
+ __unregister_request(mdsc, req);
+ complete(&req->r_safe_completion);
+
+ if (req->r_got_unsafe) {
+ /*
+ * We already handled the unsafe response, now do the
+ * cleanup. No need to examine the response; the MDS
+ * doesn't include any result info in the safe
+ * response. And even if it did, there is nothing
+ * useful we could do with a revised return value.
+ */
+ dout(10, "got safe reply %llu, mds%d\n", tid, mds);
+ BUG_ON(req->r_session == NULL);
+ list_del_init(&req->r_unsafe_item);
+
+ /* last unsafe request during umount? */
+ if (mdsc->stopping && !__get_oldest_tid(mdsc))
+ complete(&mdsc->safe_umount_waiters);
+ mutex_unlock(&mdsc->mutex);
+ goto out;
+ }
+ }
+
+ if (req->r_session && req->r_session->s_mds != mds) {
+ ceph_put_mds_session(req->r_session);
+ req->r_session = __ceph_lookup_mds_session(mdsc, mds);
+ }
+ if (req->r_session == NULL) {
+ derr(1, "got reply on %llu, but no session for mds%d\n",
+ tid, mds);
+ mutex_unlock(&mdsc->mutex);
+ goto out;
+ }
+ BUG_ON(req->r_reply);
+
+ if (!head->safe) {
+ req->r_got_unsafe = true;
+ list_add_tail(&req->r_unsafe_item, &req->r_session->s_unsafe);
+ }
+
+ mutex_unlock(&mdsc->mutex);
+
+ mutex_lock(&req->r_session->s_mutex);
+
+ /* parse */
+ rinfo = &req->r_reply_info;
+ err = parse_reply_info(msg, rinfo);
+ if (err < 0) {
+ derr(0, "handle_reply got corrupt reply\n");
+ goto out_err;
+ }
+ result = le32_to_cpu(rinfo->head->result);
+ dout(10, "handle_reply tid %lld result %d\n", tid, result);
+
+ /*
+ * Tolerate 2 consecutive ESTALEs from the same mds.
+ * FIXME: we should be looking at the cap migrate_seq.
+ */
+ if (result == -ESTALE) {
+ req->r_direct_mode = USE_AUTH_MDS;
+ req->r_num_stale++;
+ if (req->r_num_stale <= 2) {
+ put_request_sessions(req);
+ __do_request(mdsc, req);
+ goto out_session_unlock;
+ }
+ } else {
+ req->r_num_stale = 0;
+ }
+
+ /* snap trace */
+ if (rinfo->snapblob_len) {
+ down_write(&mdsc->snap_rwsem);
+ ceph_update_snap_trace(mdsc, rinfo->snapblob,
+ rinfo->snapblob + rinfo->snapblob_len,
+ le32_to_cpu(head->op) == CEPH_MDS_OP_RMSNAP);
+ downgrade_write(&mdsc->snap_rwsem);
+ } else {
+ down_read(&mdsc->snap_rwsem);
+ }
+
+ /* insert trace into our cache */
+ err = ceph_fill_trace(mdsc->client->sb, req, req->r_session);
+ if (err == 0) {
+ if (result == 0 && rinfo->dir_nr)
+ ceph_readdir_prepopulate(req, req->r_session);
+ ceph_unreserve_caps(&req->r_caps_reservation);
+ }
+
+ up_read(&mdsc->snap_rwsem);
+out_err:
+ if (err) {
+ req->r_err = err;
+ } else {
+ req->r_reply = msg;
+ ceph_msg_get(msg);
+ }
+
+ add_cap_releases(mdsc, req->r_session, -1);
+out_session_unlock:
+ mutex_unlock(&req->r_session->s_mutex);
+
+ /* kick calling process */
+ complete_request(mdsc, req);
+out:
+ ceph_mdsc_put_request(req);
+ return;
+}
+
+
+
+/*
+ * handle mds notification that our request has been forwarded.
+ */
+void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg)
+{
+ struct ceph_mds_request *req;
+ u64 tid;
+ u32 next_mds;
+ u32 fwd_seq;
+ u8 must_resend;
+ int err = -EINVAL;
+ void *p = msg->front.iov_base;
+ void *end = p + msg->front.iov_len;
+ int from_mds;
+
+ if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+ goto bad;
+ from_mds = le32_to_cpu(msg->hdr.src.name.num);
+
+ ceph_decode_need(&p, end, sizeof(u64)+2*sizeof(u32), bad);
+ ceph_decode_64(&p, tid);
+ ceph_decode_32(&p, next_mds);
+ ceph_decode_32(&p, fwd_seq);
+ ceph_decode_8(&p, must_resend);
+
+ mutex_lock(&mdsc->mutex);
+ req = __lookup_request(mdsc, tid);
+ if (!req) {
+ dout(10, "forward %llu dne\n", tid);
+ goto out; /* dup reply? */
+ }
+
+ if (fwd_seq <= req->r_num_fwd) {
+ dout(10, "forward %llu to mds%d - old seq %d <= %d\n",
+ tid, next_mds, req->r_num_fwd, fwd_seq);
+ } else if (!must_resend &&
+ __have_session(mdsc, next_mds) &&
+ mdsc->sessions[next_mds]->s_state == CEPH_MDS_SESSION_OPEN) {
+ /* yes. adjust our sessions, but that's all; the old mds
+ * forwarded our message for us. */
+ dout(10, "forward %llu to mds%d (mds%d fwded)\n", tid, next_mds,
+ from_mds);
+ req->r_num_fwd = fwd_seq;
+ put_request_sessions(req);
+ req->r_session = __ceph_lookup_mds_session(mdsc, next_mds);
+ req->r_fwd_session = __ceph_lookup_mds_session(mdsc, from_mds);
+ } else {
+ /* no, resend. */
+ /* forward race not possible; mds would drop */
+ dout(10, "forward %llu to mds%d (we resend)\n", tid, next_mds);
+ req->r_num_fwd = fwd_seq;
+ req->r_resend_mds = next_mds;
+ put_request_sessions(req);
+ __do_request(mdsc, req);
+ }
+ ceph_mdsc_put_request(req);
+out:
+ mutex_unlock(&mdsc->mutex);
+ return;
+
+bad:
+ derr(0, "problem decoding message, err=%d\n", err);
+}
+
+/*
+ * handle a mds session control message
+ */
+void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg)
+{
+ u32 op;
+ u64 seq;
+ struct ceph_mds_session *session = NULL;
+ int mds;
+ struct ceph_mds_session_head *h = msg->front.iov_base;
+ int wake = 0;
+
+ if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+ return;
+ mds = le32_to_cpu(msg->hdr.src.name.num);
+
+ /* decode */
+ if (msg->front.iov_len != sizeof(*h))
+ goto bad;
+ op = le32_to_cpu(h->op);
+ seq = le64_to_cpu(h->seq);
+
+ mutex_lock(&mdsc->mutex);
+ session = __ceph_lookup_mds_session(mdsc, mds);
+ if (session && mdsc->mdsmap)
+ /* FIXME: this ttl calculation is generous */
+ session->s_ttl = jiffies + HZ*mdsc->mdsmap->m_session_autoclose;
+ mutex_unlock(&mdsc->mutex);
+
+ if (!session) {
+ if (op != CEPH_SESSION_OPEN) {
+ dout(10, "handle_session no session for mds%d\n", mds);
+ return;
+ }
+ dout(10, "handle_session creating session for mds%d\n", mds);
+ session = register_session(mdsc, mds);
+ }
+
+ mutex_lock(&session->s_mutex);
+
+ dout(2, "handle_session mds%d %s %p state %s seq %llu\n",
+ mds, ceph_session_op_name(op), session,
+ session_state_name(session->s_state), seq);
+ switch (op) {
+ case CEPH_SESSION_OPEN:
+ session->s_state = CEPH_MDS_SESSION_OPEN;
+ renewed_caps(mdsc, session, 0);
+ wake = 1;
+ if (mdsc->stopping)
+ __close_session(mdsc, session);
+ break;
+
+ case CEPH_SESSION_RENEWCAPS:
+ renewed_caps(mdsc, session, 1);
+ break;
+
+ case CEPH_SESSION_CLOSE:
+ unregister_session(mdsc, mds);
+ remove_session_caps(session);
+ wake = 1; /* for good measure */
+ complete(&mdsc->session_close_waiters);
+ kick_requests(mdsc, mds, 0); /* cur only */
+ break;
+
+ case CEPH_SESSION_STALE:
+ dout(1, "mds%d caps went stale, renewing\n", session->s_mds);
+ spin_lock(&session->s_cap_lock);
+ session->s_cap_gen++;
+ session->s_cap_ttl = 0;
+ spin_unlock(&session->s_cap_lock);
+ send_renew_caps(mdsc, session);
+ break;
+
+ case CEPH_SESSION_RECALL_STATE:
+ trim_caps(mdsc, session, le32_to_cpu(h->max_caps));
+ break;
+
+ default:
+ derr(0, "bad session op %d from mds%d\n", op, mds);
+ WARN_ON(1);
+ }
+
+ mutex_unlock(&session->s_mutex);
+ if (wake) {
+ mutex_lock(&mdsc->mutex);
+ __wake_requests(mdsc, &session->s_waiting);
+ mutex_unlock(&mdsc->mutex);
+ }
+ ceph_put_mds_session(session);
+ return;
+
+bad:
+ derr(1, "corrupt mds%d session message, len %d, expected %d\n", mds,
+ (int)msg->front.iov_len, (int)sizeof(*h));
+ return;
+}
+
+
+/*
+ * called under session->mutex.
+ */
+static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
+ struct ceph_mds_session *session)
+{
+ struct list_head *p, *n;
+ struct ceph_mds_request *req;
+ int err;
+
+ dout(10, "replay_unsafe_requests mds%d\n", session->s_mds);
+
+ mutex_lock(&mdsc->mutex);
+ list_for_each_safe(p, n, &session->s_unsafe) {
+ req = list_entry(p, struct ceph_mds_request, r_unsafe_item);
+ err = __prepare_send_request(mdsc, req, session->s_mds);
+ if (!err) {
+ ceph_msg_get(req->r_request);
+ ceph_send_msg_mds(mdsc, req->r_request, session->s_mds);
+ }
+ }
+ mutex_unlock(&mdsc->mutex);
+}
+
+struct encode_caps_data {
+ void **pp;
+ void *end;
+ int *num_caps;
+};
+
+static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
+ void *arg)
+{
+ struct ceph_mds_cap_reconnect *rec;
+ struct ceph_inode_info *ci;
+ struct encode_caps_data *data = (struct encode_caps_data *)arg;
+ void *p = *(data->pp);
+ void *end = data->end;
+ char *path;
+ int pathlen, err;
+ u64 pathbase;
+ struct dentry *dentry;
+
+ ci = cap->ci;
+
+ dout(10, " adding %p ino %llx.%llx cap %p %s\n",
+ inode, ceph_vinop(inode), cap,
+ ceph_cap_string(cap->issued));
+ ceph_decode_need(&p, end, sizeof(u64), needmore);
+ ceph_encode_64(&p, ceph_ino(inode));
+
+ dentry = d_find_alias(inode);
+ if (dentry) {
+ path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
+ if (IS_ERR(path)) {
+ err = PTR_ERR(path);
+ BUG_ON(err);
+ }
+ } else {
+ path = NULL;
+ pathlen = 0;
+ }
+ ceph_decode_need(&p, end, pathlen+4, needmore);
+ ceph_encode_string(&p, end, path, pathlen);
+
+ ceph_decode_need(&p, end, sizeof(*rec), needmore);
+ rec = p;
+ p += sizeof(*rec);
+ BUG_ON(p > end);
+ spin_lock(&inode->i_lock);
+ cap->seq = 0; /* reset cap seq */
+ rec->cap_id = cpu_to_le64(cap->cap_id);
+ rec->pathbase = cpu_to_le64(pathbase);
+ rec->wanted = cpu_to_le32(__ceph_caps_wanted(ci));
+ rec->issued = cpu_to_le32(cap->issued);
+ rec->size = cpu_to_le64(inode->i_size);
+ ceph_encode_timespec(&rec->mtime, &inode->i_mtime);
+ ceph_encode_timespec(&rec->atime, &inode->i_atime);
+ rec->snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
+ spin_unlock(&inode->i_lock);
+
+ kfree(path);
+ dput(dentry);
+ (*data->num_caps)++;
+ *(data->pp) = p;
+ return 0;
+needmore:
+ return -ENOSPC;
+}
+
+
+/*
+ * If an MDS fails and recovers, it needs to reconnect with clients in order
+ * to reestablish shared state. This includes all caps issued through this
+ * session _and_ the snap_realm hierarchy. Because it's not clear which
+ * snap realms the mds cares about, we send everything we know about.. that
+ * ensures we'll then get any new info the recovering MDS might have.
+ *
+ * This is a relatively heavyweight operation, but it's rare.
+ *
+ * called with mdsc->mutex held.
+ */
+static void send_mds_reconnect(struct ceph_mds_client *mdsc, int mds)
+{
+ struct ceph_mds_session *session;
+ struct ceph_msg *reply;
+ int newlen, len = 4 + 1;
+ void *p, *end;
+ int err;
+ int num_caps, num_realms = 0;
+ int got;
+ u64 next_snap_ino = 0;
+ __le32 *pnum_caps, *pnum_realms;
+ struct encode_caps_data iter_args;
+
+ dout(1, "reconnect to recovering mds%d\n", mds);
+
+ /* find session */
+ session = __ceph_lookup_mds_session(mdsc, mds);
+ mutex_unlock(&mdsc->mutex); /* drop lock for duration */
+
+ if (session) {
+ mutex_lock(&session->s_mutex);
+
+ session->s_state = CEPH_MDS_SESSION_RECONNECTING;
+ session->s_seq = 0;
+
+ /* replay unsafe requests */
+ replay_unsafe_requests(mdsc, session);
+
+ /* estimate needed space */
+ len += session->s_nr_caps *
+ sizeof(struct ceph_mds_cap_reconnect);
+ len += session->s_nr_caps * (100); /* guess! */
+ dout(40, "estimating i need %d bytes for %d caps\n",
+ len, session->s_nr_caps);
+ } else {
+ dout(20, "no session for mds%d, will send short reconnect\n",
+ mds);
+ }
+
+ down_read(&mdsc->snap_rwsem);
+
+retry:
+ /* build reply */
+ reply = ceph_msg_new(CEPH_MSG_CLIENT_RECONNECT, len, 0, 0, NULL);
+ if (IS_ERR(reply)) {
+ err = PTR_ERR(reply);
+ derr(0, "ENOMEM trying to send mds reconnect to mds%d\n", mds);
+ goto out;
+ }
+ p = reply->front.iov_base;
+ end = p + len;
+
+ if (!session) {
+ ceph_encode_8(&p, 1); /* session was closed */
+ ceph_encode_32(&p, 0);
+ goto send;
+ }
+ dout(10, "session %p state %s\n", session,
+ session_state_name(session->s_state));
+
+ /* traverse this session's caps */
+ ceph_encode_8(&p, 0);
+ pnum_caps = p;
+ ceph_encode_32(&p, session->s_nr_caps);
+ num_caps = 0;
+
+ iter_args.pp = &p;
+ iter_args.end = end;
+ iter_args.num_caps = &num_caps;
+ err = iterate_session_caps(session, encode_caps_cb, &iter_args);
+ if (err == -ENOSPC)
+ goto needmore;
+ if (err < 0)
+ goto out;
+ *pnum_caps = cpu_to_le32(num_caps);
+
+ /*
+ * snaprealms. we provide mds with the ino, seq (version), and
+ * parent for all of our realms. If the mds has any newer info,
+ * it will tell us.
+ */
+ next_snap_ino = 0;
+ /* save some space for the snaprealm count */
+ pnum_realms = p;
+ ceph_decode_need(&p, end, sizeof(*pnum_realms), needmore);
+ p += sizeof(*pnum_realms);
+ num_realms = 0;
+ while (1) {
+ struct ceph_snap_realm *realm;
+ struct ceph_mds_snaprealm_reconnect *sr_rec;
+ got = radix_tree_gang_lookup(&mdsc->snap_realms,
+ (void **)&realm, next_snap_ino, 1);
+ if (!got)
+ break;
+
+ dout(10, " adding snap realm %llx seq %lld parent %llx\n",
+ realm->ino, realm->seq, realm->parent_ino);
+ ceph_decode_need(&p, end, sizeof(*sr_rec), needmore);
+ sr_rec = p;
+ sr_rec->ino = cpu_to_le64(realm->ino);
+ sr_rec->seq = cpu_to_le64(realm->seq);
+ sr_rec->parent = cpu_to_le64(realm->parent_ino);
+ p += sizeof(*sr_rec);
+ num_realms++;
+ next_snap_ino = realm->ino + 1;
+ }
+ *pnum_realms = cpu_to_le32(num_realms);
+
+send:
+ reply->front.iov_len = p - reply->front.iov_base;
+ reply->hdr.front_len = cpu_to_le32(reply->front.iov_len);
+ dout(10, "final len was %u (guessed %d)\n",
+ (unsigned)reply->front.iov_len, len);
+ ceph_send_msg_mds(mdsc, reply, mds);
+
+ if (session) {
+ session->s_state = CEPH_MDS_SESSION_OPEN;
+ __wake_requests(mdsc, &session->s_waiting);
+ }
+
+out:
+ up_read(&mdsc->snap_rwsem);
+ if (session) {
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ }
+ mutex_lock(&mdsc->mutex);
+ return;
+
+needmore:
+ /*
+ * we need a larger buffer. this doesn't very accurately
+ * factor in snap realms, but it's safe.
+ */
+ num_caps += num_realms;
+ newlen = (len * (session->s_nr_caps+3)) / (num_caps + 1);
+ dout(30, "i guessed %d, and did %d of %d caps, retrying with %d\n",
+ len, num_caps, session->s_nr_caps, newlen);
+ len = newlen;
+ ceph_msg_put(reply);
+ goto retry;
+}
+
+
+/*
+ * if the client is unresponsive for long enough, the mds will kill
+ * the session entirely.
+ */
+void ceph_mdsc_handle_reset(struct ceph_mds_client *mdsc, int mds)
+{
+ derr(1, "mds%d gave us the boot. IMPLEMENT RECONNECT.\n", mds);
+}
+
+
+
+/*
+ * compare old and new mdsmaps, kicking requests
+ * and closing out old connections as necessary
+ *
+ * called under mdsc->mutex.
+ */
+static void check_new_map(struct ceph_mds_client *mdsc,
+ struct ceph_mdsmap *newmap,
+ struct ceph_mdsmap *oldmap)
+{
+ int i;
+ int oldstate, newstate;
+ struct ceph_mds_session *s;
+
+ dout(20, "check_new_map new %u old %u\n",
+ newmap->m_epoch, oldmap->m_epoch);
+
+ for (i = 0; i < oldmap->m_max_mds && i < mdsc->max_sessions; i++) {
+ if (mdsc->sessions[i] == NULL)
+ continue;
+ s = mdsc->sessions[i];
+ oldstate = ceph_mdsmap_get_state(oldmap, i);
+ newstate = ceph_mdsmap_get_state(newmap, i);
+
+ dout(20, "check_new_map mds%d state %s -> %s (session %s)\n",
+ i, ceph_mds_state_name(oldstate),
+ ceph_mds_state_name(newstate),
+ session_state_name(s->s_state));
+ if (newstate < oldstate) {
+ /* if the state moved backwards, that means
+ * the old mds failed and/or a new mds is
+ * recovering in its place. */
+ /* notify messenger to close out old messages,
+ * socket. */
+ ceph_messenger_mark_down(mdsc->client->msgr,
+ &oldmap->m_addr[i]);
+
+ if (s->s_state == CEPH_MDS_SESSION_OPENING) {
+ /* the session never opened, just close it
+ * out now */
+ __wake_requests(mdsc, &s->s_waiting);
+ unregister_session(mdsc, i);
+ }
+
+ /* kick any requests waiting on the recovering mds */
+ kick_requests(mdsc, i, 1);
+ continue;
+ }
+
+ /*
+ * kick requests on any mds that has gone active.
+ *
+ * kick requests on cur or forwarder: we may have sent
+ * the request to mds1, mds1 told us it forwarded it
+ * to mds2, but then we learn mds1 failed and can't be
+ * sure it successfully forwarded our request before
+ * it died.
+ */
+ if (oldstate < CEPH_MDS_STATE_ACTIVE &&
+ newstate >= CEPH_MDS_STATE_ACTIVE)
+ kick_requests(mdsc, i, 1);
+ }
+}
+
+
+
+/*
+ * leases
+ */
+
+/*
+ * caller must hold session s_mutex, dentry->d_lock
+ */
+void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry)
+{
+ struct ceph_dentry_info *di = ceph_dentry(dentry);
+
+ ceph_put_mds_session(di->lease_session);
+ di->lease_session = NULL;
+}
+
+void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+ struct super_block *sb = mdsc->client->sb;
+ struct inode *inode;
+ struct ceph_mds_session *session;
+ struct ceph_inode_info *ci;
+ struct dentry *parent, *dentry;
+ struct ceph_dentry_info *di;
+ int mds;
+ struct ceph_mds_lease *h = msg->front.iov_base;
+ struct ceph_vino vino;
+ int mask;
+ struct qstr dname;
+ int release = 0;
+
+ if (le32_to_cpu(msg->hdr.src.name.type) != CEPH_ENTITY_TYPE_MDS)
+ return;
+ mds = le32_to_cpu(msg->hdr.src.name.num);
+ dout(10, "handle_lease from mds%d\n", mds);
+
+ /* decode */
+ if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
+ goto bad;
+ vino.ino = le64_to_cpu(h->ino);
+ vino.snap = CEPH_NOSNAP;
+ mask = le16_to_cpu(h->mask);
+ dname.name = (void *)h + sizeof(*h) + sizeof(u32);
+ dname.len = msg->front.iov_len - sizeof(*h) - sizeof(u32);
+ if (dname.len != le32_to_cpu(*(__le32 *)(h+1)))
+ goto bad;
+
+ /* find session */
+ mutex_lock(&mdsc->mutex);
+ session = __ceph_lookup_mds_session(mdsc, mds);
+ mutex_unlock(&mdsc->mutex);
+ if (!session) {
+ derr(0, "WTF, got lease but no session for mds%d\n", mds);
+ return;
+ }
+
+ mutex_lock(&session->s_mutex);
+ session->s_seq++;
+
+ /* lookup inode */
+ inode = ceph_find_inode(sb, vino);
+ dout(20, "handle_lease '%s', mask %d, ino %llx %p\n",
+ ceph_lease_op_name(h->action), mask, vino.ino, inode);
+ if (inode == NULL) {
+ dout(10, "handle_lease no inode %llx\n", vino.ino);
+ goto release;
+ }
+ ci = ceph_inode(inode);
+
+ /* dentry */
+ parent = d_find_alias(inode);
+ if (!parent) {
+ dout(10, "no parent dentry on inode %p\n", inode);
+ WARN_ON(1);
+ goto release; /* hrm... */
+ }
+ dname.hash = full_name_hash(dname.name, dname.len);
+ dentry = d_lookup(parent, &dname);
+ dput(parent);
+ if (!dentry)
+ goto release;
+
+ spin_lock(&dentry->d_lock);
+ di = ceph_dentry(dentry);
+ switch (h->action) {
+ case CEPH_MDS_LEASE_REVOKE:
+ if (di && di->lease_session == session) {
+ h->seq = cpu_to_le32(di->lease_seq);
+ __ceph_mdsc_drop_dentry_lease(dentry);
+ }
+ release = 1;
+ break;
+
+ case CEPH_MDS_LEASE_RENEW:
+ if (di && di->lease_session == session &&
+ di->lease_gen == session->s_cap_gen &&
+ di->lease_renew_from &&
+ di->lease_renew_after == 0) {
+ unsigned long duration =
+ le32_to_cpu(h->duration_ms) * HZ / 1000;
+
+ di->lease_seq = le32_to_cpu(h->seq);
+ dentry->d_time = di->lease_renew_from + duration;
+ di->lease_renew_after = di->lease_renew_from +
+ (duration >> 1);
+ di->lease_renew_from = 0;
+ }
+ break;
+ }
+ spin_unlock(&dentry->d_lock);
+ dput(dentry);
+
+ if (!release)
+ goto out;
+
+release:
+ /* let's just reuse the same message */
+ h->action = CEPH_MDS_LEASE_REVOKE_ACK;
+ ceph_msg_get(msg);
+ ceph_send_msg_mds(mdsc, msg, mds);
+
+out:
+ iput(inode);
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ return;
+
+bad:
+ dout(0, "corrupt lease message\n");
+}
+
+void ceph_mdsc_lease_send_msg(struct ceph_mds_client *mdsc, int mds,
+ struct inode *inode,
+ struct dentry *dentry, char action,
+ u32 seq)
+{
+ struct ceph_msg *msg;
+ struct ceph_mds_lease *lease;
+ int len = sizeof(*lease) + sizeof(u32);
+ int dnamelen = 0;
+
+ dout(30, "lease_send_msg inode %p dentry %p %s to mds%d\n",
+ inode, dentry, ceph_lease_op_name(action), mds);
+ dnamelen = dentry->d_name.len;
+ len += dnamelen;
+
+ msg = ceph_msg_new(CEPH_MSG_CLIENT_LEASE, len, 0, 0, NULL);
+ if (IS_ERR(msg))
+ return;
+ lease = msg->front.iov_base;
+ lease->action = action;
+ lease->mask = cpu_to_le16(CEPH_LOCK_DN);
+ lease->ino = cpu_to_le64(ceph_vino(inode).ino);
+ lease->first = lease->last = cpu_to_le64(ceph_vino(inode).snap);
+ lease->seq = cpu_to_le32(seq);
+ *(__le32 *)((void *)lease + sizeof(*lease)) = cpu_to_le32(dnamelen);
+ memcpy((void *)lease + sizeof(*lease) + 4, dentry->d_name.name,
+ dnamelen);
+
+ /*
+ * if this is a preemptive lease RELEASE, no need to
+ * flush request stream, since the actual request will
+ * soon follow.
+ */
+ msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
+
+ ceph_send_msg_mds(mdsc, msg, mds);
+}
+
+/*
+ * Preemptively release a lease we expect to invalidate anyway.
+ * Pass @inode always, @dentry is optional.
+ */
+void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
+ struct dentry *dentry, int mask)
+{
+ struct ceph_dentry_info *di;
+ int mds = -1;
+ u32 seq;
+
+ BUG_ON(inode == NULL);
+ BUG_ON(dentry == NULL);
+ BUG_ON(mask != CEPH_LOCK_DN);
+
+ /* is dentry lease valid? */
+ spin_lock(&dentry->d_lock);
+ di = ceph_dentry(dentry);
+ if (!di || !di->lease_session ||
+ di->lease_session->s_mds < 0 ||
+ di->lease_gen != di->lease_session->s_cap_gen ||
+ !time_before(jiffies, dentry->d_time)) {
+ dout(10, "lease_release inode %p dentry %p -- "
+ "no lease on %d\n",
+ inode, dentry, mask);
+ spin_unlock(&dentry->d_lock);
+ return;
+ }
+
+ /* we do have a lease on this dentry; note mds and seq */
+ mds = di->lease_session->s_mds;
+ seq = di->lease_seq;
+ __ceph_mdsc_drop_dentry_lease(dentry);
+ spin_unlock(&dentry->d_lock);
+
+ dout(10, "lease_release inode %p dentry %p mask %d to mds%d\n",
+ inode, dentry, mask, mds);
+ ceph_mdsc_lease_send_msg(mdsc, mds, inode, dentry,
+ CEPH_MDS_LEASE_RELEASE, seq);
+}
+
+
+/*
+ * delayed work -- periodically trim expired leases, renew caps with mds
+ */
+static void schedule_delayed(struct ceph_mds_client *mdsc)
+{
+ int delay = 5;
+ unsigned hz = round_jiffies_relative(HZ * delay);
+ schedule_delayed_work(&mdsc->delayed_work, hz);
+}
+
+static void delayed_work(struct work_struct *work)
+{
+ int i;
+ struct ceph_mds_client *mdsc =
+ container_of(work, struct ceph_mds_client, delayed_work.work);
+ int renew_interval;
+ int renew_caps;
+ u32 want_map = 0;
+
+ dout(30, "delayed_work\n");
+ ceph_check_delayed_caps(mdsc);
+
+ mutex_lock(&mdsc->mutex);
+ renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
+ renew_caps = time_after_eq(jiffies, HZ*renew_interval +
+ mdsc->last_renew_caps);
+ if (renew_caps)
+ mdsc->last_renew_caps = jiffies;
+
+ for (i = 0; i < mdsc->max_sessions; i++) {
+ struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
+ if (s == NULL)
+ continue;
+ if (s->s_state == CEPH_MDS_SESSION_CLOSING) {
+ dout(10, "resending session close request for mds%d\n",
+ s->s_mds);
+ request_close_session(mdsc, s);
+ ceph_put_mds_session(s);
+ continue;
+ }
+ if (s->s_ttl && time_after(jiffies, s->s_ttl)) {
+ derr(1, "mds%d session probably timed out, "
+ "requesting mds map\n", s->s_mds);
+ want_map = mdsc->mdsmap->m_epoch;
+ }
+ if (s->s_state < CEPH_MDS_SESSION_OPEN) {
+ /* this mds is failed or recovering, just wait */
+ ceph_put_mds_session(s);
+ continue;
+ }
+ mutex_unlock(&mdsc->mutex);
+
+ mutex_lock(&s->s_mutex);
+ if (renew_caps)
+ send_renew_caps(mdsc, s);
+ add_cap_releases(mdsc, s, -1);
+ send_cap_releases(mdsc, s);
+ mutex_unlock(&s->s_mutex);
+ ceph_put_mds_session(s);
+
+ mutex_lock(&mdsc->mutex);
+ }
+ mutex_unlock(&mdsc->mutex);
+
+ if (want_map)
+ ceph_monc_request_mdsmap(&mdsc->client->monc, want_map);
+
+ schedule_delayed(mdsc);
+}
+
+
+void ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
+{
+ mdsc->client = client;
+ mutex_init(&mdsc->mutex);
+ mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
+ init_completion(&mdsc->safe_umount_waiters);
+ init_completion(&mdsc->session_close_waiters);
+ INIT_LIST_HEAD(&mdsc->waiting_for_map);
+ mdsc->sessions = NULL;
+ mdsc->max_sessions = 0;
+ mdsc->stopping = 0;
+ init_rwsem(&mdsc->snap_rwsem);
+ INIT_RADIX_TREE(&mdsc->snap_realms, GFP_NOFS);
+ INIT_LIST_HEAD(&mdsc->snap_empty);
+ spin_lock_init(&mdsc->snap_empty_lock);
+ mdsc->last_tid = 0;
+ INIT_RADIX_TREE(&mdsc->request_tree, GFP_NOFS);
+ INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work);
+ mdsc->last_renew_caps = jiffies;
+ INIT_LIST_HEAD(&mdsc->cap_delay_list);
+ spin_lock_init(&mdsc->cap_delay_lock);
+ INIT_LIST_HEAD(&mdsc->snap_flush_list);
+ spin_lock_init(&mdsc->snap_flush_lock);
+ INIT_LIST_HEAD(&mdsc->cap_dirty);
+ INIT_LIST_HEAD(&mdsc->cap_sync);
+ spin_lock_init(&mdsc->cap_dirty_lock);
+ init_waitqueue_head(&mdsc->cap_sync_wq);
+ spin_lock_init(&mdsc->dentry_lru_lock);
+ INIT_LIST_HEAD(&mdsc->dentry_lru);
+}
+
+/*
+ * drop all leases (and dentry refs) in preparation for umount
+ */
+static void drop_leases(struct ceph_mds_client *mdsc)
+{
+ int i;
+
+ dout(10, "drop_leases\n");
+ mutex_lock(&mdsc->mutex);
+ for (i = 0; i < mdsc->max_sessions; i++) {
+ struct ceph_mds_session *s = __ceph_lookup_mds_session(mdsc, i);
+ if (!s)
+ continue;
+ mutex_unlock(&mdsc->mutex);
+ mutex_lock(&s->s_mutex);
+ mutex_unlock(&s->s_mutex);
+ ceph_put_mds_session(s);
+ mutex_lock(&mdsc->mutex);
+ }
+ mutex_unlock(&mdsc->mutex);
+}
+
+/*
+ * Wait for safe replies on open mds requests. If we time out, drop
+ * all requests from the tree to avoid dangling dentry refs.
+ */
+static void wait_requests(struct ceph_mds_client *mdsc)
+{
+ struct ceph_mds_request *req;
+ struct ceph_client *client = mdsc->client;
+
+ mutex_lock(&mdsc->mutex);
+ if (__get_oldest_tid(mdsc)) {
+ mutex_unlock(&mdsc->mutex);
+ dout(10, "wait_requests waiting for requests\n");
+ wait_for_completion_timeout(&mdsc->safe_umount_waiters,
+ client->mount_args.mount_timeout * HZ);
+ mutex_lock(&mdsc->mutex);
+
+ /* tear down remaining requests */
+ while (radix_tree_gang_lookup(&mdsc->request_tree,
+ (void **)&req, 0, 1)) {
+ dout(10, "wait_requests timed out on tid %llu\n",
+ req->r_tid);
+ radix_tree_delete(&mdsc->request_tree, req->r_tid);
+ ceph_mdsc_put_request(req);
+ }
+ }
+ mutex_unlock(&mdsc->mutex);
+ dout(10, "wait_requests done\n");
+}
+
+/*
+ * called before mount is ro, and before dentries are torn down.
+ * (hmm, does this still race with new lookups?)
+ */
+void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
+{
+ dout(10, "pre_umount\n");
+ mdsc->stopping = 1;
+
+ drop_leases(mdsc);
+ ceph_check_delayed_caps(mdsc);
+ wait_requests(mdsc);
+}
+
+/*
+ * sync - flush all dirty inode data to disk
+ */
+static int are_no_sync_caps(struct ceph_mds_client *mdsc)
+{
+ int empty;
+ spin_lock(&mdsc->cap_dirty_lock);
+ empty = list_empty(&mdsc->cap_sync);
+ spin_unlock(&mdsc->cap_dirty_lock);
+ dout(20, "are_no_sync_caps = %d\n", empty);
+ return empty;
+}
+
+void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
+{
+ dout(10, "sync\n");
+ ceph_check_delayed_caps(mdsc);
+ wait_event(mdsc->cap_sync_wq, are_no_sync_caps(mdsc));
+}
+
+
+/*
+ * called after sb is ro.
+ */
+void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
+{
+ struct ceph_mds_session *session;
+ int i;
+ int n;
+ struct ceph_client *client = mdsc->client;
+ unsigned long started, timeout = client->mount_args.mount_timeout * HZ;
+
+ dout(10, "close_sessions\n");
+
+ mutex_lock(&mdsc->mutex);
+
+ /* close sessions */
+ started = jiffies;
+ while (time_before(jiffies, started + timeout)) {
+ dout(10, "closing sessions\n");
+ n = 0;
+ for (i = 0; i < mdsc->max_sessions; i++) {
+ session = __ceph_lookup_mds_session(mdsc, i);
+ if (!session)
+ continue;
+ mutex_unlock(&mdsc->mutex);
+ mutex_lock(&session->s_mutex);
+ __close_session(mdsc, session);
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ mutex_lock(&mdsc->mutex);
+ n++;
+ }
+ if (n == 0)
+ break;
+
+ if (client->mount_state == CEPH_MOUNT_SHUTDOWN)
+ break;
+
+ dout(10, "waiting for sessions to close\n");
+ mutex_unlock(&mdsc->mutex);
+ wait_for_completion_timeout(&mdsc->session_close_waiters,
+ timeout);
+ mutex_lock(&mdsc->mutex);
+ }
+
+ /* tear down remaining sessions */
+ for (i = 0; i < mdsc->max_sessions; i++) {
+ if (mdsc->sessions[i]) {
+ session = get_session(mdsc->sessions[i]);
+ unregister_session(mdsc, i);
+ mutex_unlock(&mdsc->mutex);
+ mutex_lock(&session->s_mutex);
+ remove_session_caps(session);
+ mutex_unlock(&session->s_mutex);
+ ceph_put_mds_session(session);
+ mutex_lock(&mdsc->mutex);
+ }
+ }
+
+ WARN_ON(!list_empty(&mdsc->cap_delay_list));
+
+ mutex_unlock(&mdsc->mutex);
+
+ ceph_cleanup_empty_realms(mdsc);
+
+ cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
+
+ dout(10, "stopped\n");
+}
+
+void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
+{
+ dout(10, "stop\n");
+ cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
+ if (mdsc->mdsmap)
+ ceph_mdsmap_destroy(mdsc->mdsmap);
+ kfree(mdsc->sessions);
+}
+
+
+/*
+ * handle mds map update.
+ */
+void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+ u32 epoch;
+ u32 maplen;
+ void *p = msg->front.iov_base;
+ void *end = p + msg->front.iov_len;
+ struct ceph_mdsmap *newmap, *oldmap;
+ ceph_fsid_t fsid;
+ int err = -EINVAL;
+ int from;
+ __le64 major, minor;
+
+ if (le32_to_cpu(msg->hdr.src.name.type) == CEPH_ENTITY_TYPE_MDS)
+ from = le32_to_cpu(msg->hdr.src.name.num);
+ else
+ from = -1;
+
+ ceph_decode_need(&p, end, sizeof(fsid)+2*sizeof(u32), bad);
+ ceph_decode_64_le(&p, major);
+ __ceph_fsid_set_major(&fsid, major);
+ ceph_decode_64_le(&p, minor);
+ __ceph_fsid_set_minor(&fsid, minor);
+ if (ceph_fsid_compare(&fsid, &mdsc->client->monc.monmap->fsid)) {
+ derr(0, "got mdsmap with wrong fsid\n");
+ return;
+ }
+ ceph_decode_32(&p, epoch);
+ ceph_decode_32(&p, maplen);
+ dout(2, "handle_map epoch %u len %d\n", epoch, (int)maplen);
+
+ /* do we need it? */
+ ceph_monc_got_mdsmap(&mdsc->client->monc, epoch);
+ mutex_lock(&mdsc->mutex);
+ if (mdsc->mdsmap && epoch <= mdsc->mdsmap->m_epoch) {
+ dout(2, "handle_map epoch %u <= our %u\n",
+ epoch, mdsc->mdsmap->m_epoch);
+ mutex_unlock(&mdsc->mutex);
+ return;
+ }
+
+ newmap = ceph_mdsmap_decode(&p, end);
+ if (IS_ERR(newmap)) {
+ err = PTR_ERR(newmap);
+ goto bad_unlock;
+ }
+
+ /* swap into place */
+ if (mdsc->mdsmap) {
+ oldmap = mdsc->mdsmap;
+ mdsc->mdsmap = newmap;
+ check_new_map(mdsc, newmap, oldmap);
+ ceph_mdsmap_destroy(oldmap);
+
+ /* reconnect? a recovering mds will send us an mdsmap,
+ * indicating their state is RECONNECTING, if it wants us
+ * to reconnect. */
+ if (from >= 0 && from < newmap->m_max_mds &&
+ ceph_mdsmap_get_state(newmap, from) ==
+ CEPH_MDS_STATE_RECONNECT)
+ send_mds_reconnect(mdsc, from);
+ } else {
+ mdsc->mdsmap = newmap; /* first mds map */
+ }
+
+ __wake_requests(mdsc, &mdsc->waiting_for_map);
+
+ mutex_unlock(&mdsc->mutex);
+ schedule_delayed(mdsc);
+ return;
+
+bad_unlock:
+ mutex_unlock(&mdsc->mutex);
+bad:
+ derr(1, "problem with mdsmap %d\n", err);
+ return;
+}
+
+
+/* eof */
diff --git a/fs/staging/ceph/mds_client.h b/fs/staging/ceph/mds_client.h
new file mode 100644
index 0000000..039b9e5
--- /dev/null
+++ b/fs/staging/ceph/mds_client.h
@@ -0,0 +1,347 @@
+#ifndef _FS_CEPH_MDS_CLIENT_H
+#define _FS_CEPH_MDS_CLIENT_H
+
+#include <linux/completion.h>
+#include <linux/list.h>
+#include <linux/mutex.h>
+#include <linux/radix-tree.h>
+#include <linux/spinlock.h>
+
+#include "types.h"
+#include "messenger.h"
+#include "mdsmap.h"
+
+/*
+ * A cluster of MDS (metadata server) daemons is responsible for
+ * managing the file system namespace (the directory hierarchy and
+ * inodes) and for coordinating shared access to storage. Metadata is
+ * partitioning hierarchically across a number of servers, and that
+ * partition varies over time as the cluster adjusts the distribution
+ * in order to balance load.
+ *
+ * The MDS client is primarily responsible to managing synchronous
+ * metadata requests for operations like open, unlink, and so forth.
+ * If there is a MDS failure, we find out about it when we (possibly
+ * request and) receive a new MDS map, and can resubmit affected
+ * requests.
+ *
+ * For the most part, though, we take advantage of a lossless
+ * communications channel to the MDS, and do not need to worry about
+ * timing out or resubmitting requests.
+ *
+ * We maintain a stateful "session" with each MDS we interact with.
+ * Within each session, we sent periodic heartbeat messages to ensure
+ * any capabilities or leases we have been issues remain valid. If
+ * the session times out and goes stale, our leases and capabilities
+ * are no longer valid.
+ */
+
+/*
+ * Some lock dependencies:
+ *
+ * session->s_mutex
+ * mdsc->mutex
+ *
+ * mdsc->snap_rwsem
+ *
+ * inode->i_lock
+ * mdsc->snap_flush_lock
+ * mdsc->cap_delay_lock
+ *
+ *
+ */
+
+struct ceph_client;
+struct ceph_cap;
+
+/*
+ * parsed info about a single inode. pointers are into the encoded
+ * on-wire structures within the mds reply message payload.
+ */
+struct ceph_mds_reply_info_in {
+ struct ceph_mds_reply_inode *in;
+ u32 symlink_len;
+ char *symlink;
+ u32 xattr_len;
+ char *xattr_data;
+};
+
+/*
+ * parsed info about an mds reply, including a "trace" from
+ * the referenced inode, through its parents up to the root
+ * directory, and directory contents (for readdir results).
+ */
+struct ceph_mds_reply_info_parsed {
+ struct ceph_mds_reply_head *head;
+
+ struct ceph_mds_reply_info_in diri, targeti;
+ struct ceph_mds_reply_dirfrag *dirfrag;
+ char *dname;
+ u32 dname_len;
+ struct ceph_mds_reply_lease *dlease;
+
+ struct ceph_mds_reply_dirfrag *dir_dir;
+ int dir_nr;
+ char **dir_dname;
+ u32 *dir_dname_len;
+ struct ceph_mds_reply_lease **dir_dlease;
+ struct ceph_mds_reply_info_in *dir_in;
+ u8 dir_complete, dir_end;
+
+ /* encoded blob describing snapshot contexts for certain
+ operations (e.g., open) */
+ void *snapblob;
+ int snapblob_len;
+};
+
+/*
+ * state associated with each MDS<->client session
+ */
+enum {
+ CEPH_MDS_SESSION_NEW = 1,
+ CEPH_MDS_SESSION_OPENING = 2,
+ CEPH_MDS_SESSION_OPEN = 3,
+ CEPH_MDS_SESSION_CLOSING = 5,
+ CEPH_MDS_SESSION_RECONNECTING = 6
+};
+
+#define CAPS_PER_RELEASE ((PAGE_CACHE_SIZE - \
+ sizeof(struct ceph_mds_cap_release)) / \
+ sizeof(struct ceph_mds_cap_item))
+
+struct ceph_mds_session {
+ int s_mds;
+ int s_state;
+ unsigned long s_ttl; /* time until mds kills us */
+ u64 s_seq; /* incoming msg seq # */
+ struct mutex s_mutex; /* serialize session messages */
+ spinlock_t s_cap_lock; /* protects s_caps, s_cap_{gen,ttl} */
+ u32 s_cap_gen; /* inc each time we get mds stale msg */
+ unsigned long s_cap_ttl; /* when session caps expire */
+ unsigned long s_renew_requested; /* last time we sent a renew req */
+ struct list_head s_caps; /* all caps issued by this session */
+ int s_nr_caps, s_trim_caps;
+ atomic_t s_ref;
+ struct list_head s_waiting; /* waiting requests */
+ struct list_head s_unsafe; /* unsafe requests */
+
+ int s_num_cap_releases;
+ struct list_head s_cap_releases; /* waiting cap_release messages */
+ struct list_head s_cap_releases_done; /* ready to send */
+};
+
+/*
+ * modes of choosing which MDS to send a request to
+ */
+enum {
+ USE_ANY_MDS,
+ USE_RANDOM_MDS,
+ USE_AUTH_MDS, /* prefer authoritative mds for this metadata item */
+};
+
+struct ceph_mds_request;
+struct ceph_mds_client;
+
+typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req);
+
+struct ceph_mds_request_attr {
+ struct attribute attr;
+ ssize_t (*show)(struct ceph_mds_request *,
+ struct ceph_mds_request_attr *,
+ char *);
+ ssize_t (*store)(struct ceph_mds_request *,
+ struct ceph_mds_request_attr *,
+ const char *, size_t);
+};
+
+/*
+ * an in-flight mds request
+ */
+struct ceph_mds_request {
+ u64 r_tid; /* transaction id */
+
+ int r_op;
+ struct inode *r_inode;
+ struct dentry *r_dentry;
+ struct dentry *r_old_dentry; /* rename from or link from */
+ const char *r_path1, *r_path2;
+ struct ceph_vino r_ino1, r_ino2;
+
+ union ceph_mds_request_args r_args;
+ struct page **r_pages;
+ int r_num_pages;
+ int r_data_len;
+
+ int r_inode_drop, r_inode_unless;
+ int r_dentry_drop, r_dentry_unless;
+ int r_old_dentry_drop, r_old_dentry_unless;
+ struct inode *r_old_inode;
+ int r_old_inode_drop, r_old_inode_unless;
+
+ struct inode *r_target_inode;
+
+ struct ceph_msg *r_request; /* original request */
+ struct ceph_msg *r_reply;
+ struct ceph_mds_reply_info_parsed r_reply_info;
+ int r_err;
+ unsigned long r_timeout; /* optional. jiffies */
+
+ unsigned long r_started; /* start time to measure timeout against */
+ unsigned long r_request_started; /* start time for mds request only,
+ used to measure lease durations */
+
+ /* for choosing which mds to send this request to */
+ int r_direct_mode;
+ u32 r_direct_hash; /* choose dir frag based on this dentry hash */
+ bool r_direct_is_hash; /* true if r_direct_hash is valid */
+
+ struct inode *r_unsafe_dir;
+ struct list_head r_unsafe_dir_item;
+
+ /* references to the trailing dentry and inode from parsing the
+ * mds response. also used to feed a VFS-provided dentry into
+ * the reply handler */
+ int r_fmode; /* file mode, if expecting cap */
+ struct ceph_mds_session *r_session;
+ struct ceph_mds_session *r_fwd_session; /* forwarded from */
+ struct inode *r_locked_dir; /* dir (if any) i_mutex locked by vfs */
+
+ int r_attempts; /* resend attempts */
+ int r_num_fwd; /* number of forward attempts */
+ int r_num_stale;
+ int r_resend_mds; /* mds to resend to next, if any*/
+
+ atomic_t r_ref;
+ struct list_head r_wait;
+ struct completion r_completion;
+ struct completion r_safe_completion;
+ ceph_mds_request_callback_t r_callback;
+ struct list_head r_unsafe_item; /* per-session unsafe list item */
+ bool r_got_unsafe, r_got_safe;
+
+ bool r_did_prepopulate;
+ u32 r_readdir_offset;
+
+ struct ceph_cap_reservation r_caps_reservation;
+ int r_num_caps;
+};
+
+/*
+ * mds client state
+ */
+struct ceph_mds_client {
+ struct ceph_client *client;
+ struct mutex mutex; /* all nested structures */
+
+ struct ceph_mdsmap *mdsmap;
+ struct completion safe_umount_waiters, session_close_waiters;
+ struct list_head waiting_for_map;
+
+ struct ceph_mds_session **sessions; /* NULL for mds if no session */
+ int max_sessions; /* len of s_mds_sessions */
+ int stopping; /* true if shutting down */
+
+ /*
+ * snap_rwsem will cover cap linkage into snaprealms, and
+ * realm snap contexts. (later, we can do per-realm snap
+ * contexts locks..) the empty list contains realms with no
+ * references (implying they contain no inodes with caps) that
+ * should be destroyed.
+ */
+ struct rw_semaphore snap_rwsem;
+ struct radix_tree_root snap_realms;
+ struct list_head snap_empty;
+ spinlock_t snap_empty_lock; /* protect snap_empty */
+
+ u64 last_tid; /* most recent mds request */
+ struct radix_tree_root request_tree; /* pending mds requests */
+ struct delayed_work delayed_work; /* delayed work */
+ unsigned long last_renew_caps; /* last time we renewed our caps */
+ struct list_head cap_delay_list; /* caps with delayed release */
+ spinlock_t cap_delay_lock; /* protects cap_delay_list */
+ struct list_head snap_flush_list; /* cap_snaps ready to flush */
+ spinlock_t snap_flush_lock;
+ struct list_head cap_dirty, cap_sync; /* inodes with dirty cap data */
+ spinlock_t cap_dirty_lock;
+ wait_queue_head_t cap_sync_wq;
+
+ struct dentry *debugfs_file;
+
+ spinlock_t dentry_lru_lock;
+ struct list_head dentry_lru;
+ int num_dentry;
+};
+
+extern const char *ceph_mds_op_name(int op);
+
+extern struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *, int mds);
+
+inline static struct ceph_mds_session *
+ceph_get_mds_session(struct ceph_mds_session *s)
+{
+ atomic_inc(&s->s_ref);
+ return s;
+}
+
+/*
+ * requests
+ */
+static inline void ceph_mdsc_get_request(struct ceph_mds_request *req)
+{
+ atomic_inc(&req->r_ref);
+}
+
+extern void ceph_put_mds_session(struct ceph_mds_session *s);
+
+extern void ceph_send_msg_mds(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg, int mds);
+
+extern void ceph_mdsc_init(struct ceph_mds_client *mdsc,
+ struct ceph_client *client);
+extern void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc);
+extern void ceph_mdsc_stop(struct ceph_mds_client *mdsc);
+
+extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc);
+
+extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
+extern void ceph_mdsc_handle_session(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
+extern void ceph_mdsc_handle_reply(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
+extern void ceph_mdsc_handle_forward(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
+
+extern void ceph_mdsc_handle_lease(struct ceph_mds_client *mdsc,
+ struct ceph_msg *msg);
+
+extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
+ struct inode *inode,
+ struct dentry *dn, int mask);
+
+extern struct ceph_mds_request *
+ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode);
+extern void ceph_mdsc_submit_request(struct ceph_mds_client *mdsc,
+ struct ceph_mds_request *req);
+extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
+ struct inode *listener,
+ struct ceph_mds_request *req);
+extern void ceph_mdsc_put_request(struct ceph_mds_request *req);
+
+extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
+
+extern void ceph_mdsc_handle_reset(struct ceph_mds_client *mdsc, int mds);
+
+extern struct ceph_mds_request *ceph_mdsc_get_listener_req(struct inode *inode,
+ u64 tid);
+extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
+ int stop_on_nosnap);
+
+extern void __ceph_mdsc_drop_dentry_lease(struct dentry *dentry);
+extern void ceph_mdsc_lease_send_msg(struct ceph_mds_client *mdsc, int mds,
+ struct inode *inode,
+ struct dentry *dentry, char action,
+ u32 seq);
+
+#endif
diff --git a/fs/staging/ceph/mdsmap.c b/fs/staging/ceph/mdsmap.c
new file mode 100644
index 0000000..b8fb067
--- /dev/null
+++ b/fs/staging/ceph/mdsmap.c
@@ -0,0 +1,132 @@
+#include <linux/bug.h>
+#include <linux/err.h>
+#include <linux/random.h>
+#include <linux/slab.h>
+#include <linux/types.h>
+
+#include "mdsmap.h"
+#include "messenger.h"
+#include "decode.h"
+
+#include "ceph_debug.h"
+
+int ceph_debug_mdsmap __read_mostly = -1;
+#define DOUT_MASK DOUT_MASK_MDSMAP
+#define DOUT_VAR ceph_debug_mdsmap
+#include "super.h"
+
+
+/*
+ * choose a random mds that is "up" (i.e. has a state > 0), or -1.
+ */
+int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
+{
+ int n = 0;
+ int i;
+ char r;
+
+ /* count */
+ for (i = 0; i < m->m_max_mds; i++)
+ if (m->m_state[i] > 0)
+ n++;
+ if (n == 0)
+ return -1;
+
+ /* pick */
+ get_random_bytes(&r, 1);
+ n = r % n;
+ i = 0;
+ for (i = 0; n > 0; i++, n--)
+ while (m->m_state[i] <= 0)
+ i++;
+
+ return i;
+}
+
+/*
+ * Ignore any fields we don't care about in the MDS map (there are quite
+ * a few of them).
+ */
+struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
+{
+ struct ceph_mdsmap *m;
+ int i, n;
+ int err = -EINVAL;
+
+ m = kzalloc(sizeof(*m), GFP_NOFS);
+ if (m == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ ceph_decode_need(p, end, 8*sizeof(u32), bad);
+ ceph_decode_32(p, m->m_epoch);
+ ceph_decode_32(p, m->m_client_epoch);
+ ceph_decode_32(p, m->m_last_failure);
+ ceph_decode_32(p, m->m_root);
+ ceph_decode_32(p, m->m_session_timeout);
+ ceph_decode_32(p, m->m_session_autoclose);
+ ceph_decode_32(p, m->m_max_mds);
+
+ m->m_addr = kzalloc(m->m_max_mds*sizeof(*m->m_addr), GFP_NOFS);
+ m->m_state = kzalloc(m->m_max_mds*sizeof(*m->m_state), GFP_NOFS);
+ if (m->m_addr == NULL || m->m_state == NULL)
+ goto badmem;
+
+ /* pick out active nodes from mds_info (state > 0) */
+ ceph_decode_32(p, n);
+ for (i = 0; i < n; i++) {
+ u32 namelen;
+ s32 mds, inc, state;
+ u64 state_seq;
+ struct ceph_entity_addr addr;
+
+ ceph_decode_need(p, end, sizeof(addr) + sizeof(u32), bad);
+ *p += sizeof(addr); /* skip addr key */
+ ceph_decode_32(p, namelen);
+ *p += namelen;
+ ceph_decode_need(p, end, 6*sizeof(u32) + sizeof(addr) +
+ sizeof(struct ceph_timespec), bad);
+ ceph_decode_32(p, mds);
+ ceph_decode_32(p, inc);
+ ceph_decode_32(p, state);
+ ceph_decode_64(p, state_seq);
+ ceph_decode_copy(p, &addr, sizeof(addr));
+ *p += sizeof(struct ceph_timespec) + 2*sizeof(u32);
+ dout(10, "mdsmap_decode %d/%d mds%d.%d %u.%u.%u.%u:%u %s\n",
+ i+1, n, mds, inc, IPQUADPORT(addr.ipaddr),
+ ceph_mds_state_name(state));
+ if (mds >= 0 && mds < m->m_max_mds && state > 0) {
+ m->m_state[mds] = state;
+ m->m_addr[mds] = addr;
+ }
+ }
+
+ /* pg_pools */
+ ceph_decode_32_safe(p, end, n, bad);
+ m->m_num_data_pg_pools = n;
+ m->m_data_pg_pools = kmalloc(sizeof(u32)*n, GFP_NOFS);
+ if (!m->m_data_pg_pools)
+ goto badmem;
+ ceph_decode_need(p, end, sizeof(u32)*(n+1), bad);
+ for (i = 0; i < n; i++)
+ ceph_decode_32(p, m->m_data_pg_pools[i]);
+ ceph_decode_32(p, m->m_cas_pg_pool);
+
+ /* ok, we don't care about the rest. */
+ dout(30, "mdsmap_decode success epoch %u\n", m->m_epoch);
+ return m;
+
+badmem:
+ err = -ENOMEM;
+bad:
+ derr(0, "corrupt mdsmap\n");
+ ceph_mdsmap_destroy(m);
+ return ERR_PTR(-EINVAL);
+}
+
+void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
+{
+ kfree(m->m_addr);
+ kfree(m->m_state);
+ kfree(m->m_data_pg_pools);
+ kfree(m);
+}
diff --git a/fs/staging/ceph/mdsmap.h b/fs/staging/ceph/mdsmap.h
new file mode 100644
index 0000000..5238923
--- /dev/null
+++ b/fs/staging/ceph/mdsmap.h
@@ -0,0 +1,45 @@
+#ifndef _FS_CEPH_MDSMAP_H
+#define _FS_CEPH_MDSMAP_H
+
+#include "types.h"
+
+/*
+ * mds map
+ *
+ * fields limited to those the client cares about
+ */
+struct ceph_mdsmap {
+ u32 m_epoch, m_client_epoch, m_last_failure;
+ u32 m_root;
+ u32 m_session_timeout; /* seconds */
+ u32 m_session_autoclose; /* seconds */
+ u32 m_max_mds; /* size of m_addr, m_state arrays */
+ struct ceph_entity_addr *m_addr; /* mds addrs */
+ s32 *m_state; /* states */
+
+ int m_num_data_pg_pools;
+ u32 *m_data_pg_pools;
+ u32 m_cas_pg_pool;
+};
+
+static inline struct ceph_entity_addr *
+ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w)
+{
+ if (w >= m->m_max_mds)
+ return NULL;
+ return &m->m_addr[w];
+}
+
+static inline int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w)
+{
+ BUG_ON(w < 0);
+ if (w >= m->m_max_mds)
+ return CEPH_MDS_STATE_DNE;
+ return m->m_state[w];
+}
+
+extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
+extern struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end);
+extern void ceph_mdsmap_destroy(struct ceph_mdsmap *m);
+
+#endif
--
1.5.6.5
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
Powered by blists - more mailing lists