lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite for Android: free password hash cracker in your pocket
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <1236638439-6753-11-git-send-email-sage@newdream.net>
Date:	Mon,  9 Mar 2009 15:40:29 -0700
From:	Sage Weil <sage@...dream.net>
To:	linux-kernel@...r.kernel.org, linux-fsdevel@...r.kernel.org
Cc:	Sage Weil <sage@...dream.net>
Subject: [PATCH 10/20] ceph: OSD client

The OSD client is responsible for reading and writing data from/to the
object storage pool.  This includes determining where objects are
stored in the cluster, and ensuring that requests are retried or
redirected in the event of a node failure or data migration.

If an OSD does not respond before a timeout expires, 'ping' messages
are sent across the lossless, ordered communications channel to
ensure that any break in the TCP connection is discovered.  If the
session does reset, a reconnection is attempted and the affected
requests are resent (by the message transport layer).

Signed-off-by: Sage Weil <sage@...dream.net>
---
 fs/ceph/osd_client.c | 1173 ++++++++++++++++++++++++++++++++++++++++++++++++++
 fs/ceph/osd_client.h |  142 ++++++
 fs/ceph/osdmap.c     |  641 +++++++++++++++++++++++++++
 fs/ceph/osdmap.h     |  106 +++++
 4 files changed, 2062 insertions(+), 0 deletions(-)
 create mode 100644 fs/ceph/osd_client.c
 create mode 100644 fs/ceph/osd_client.h
 create mode 100644 fs/ceph/osdmap.c
 create mode 100644 fs/ceph/osdmap.h

diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
new file mode 100644
index 0000000..1b15e2e
--- /dev/null
+++ b/fs/ceph/osd_client.c
@@ -0,0 +1,1173 @@
+#include <linux/err.h>
+#include <linux/highmem.h>
+#include <linux/mm.h>
+#include <linux/pagemap.h>
+#include <linux/slab.h>
+#include <linux/uaccess.h>
+
+#include "ceph_debug.h"
+
+int ceph_debug_osdc __read_mostly = -1;
+#define DOUT_MASK DOUT_MASK_OSDC
+#define DOUT_VAR ceph_debug_osdc
+#include "super.h"
+
+#include "osd_client.h"
+#include "messenger.h"
+#include "crush/mapper.h"
+#include "decode.h"
+
+
+/*
+ * Map a file extent onto a single object and fill in the request head
+ * and its first op to match.  *plen is shortened if the mapping reports
+ * a smaller extent (e.g. because the range crosses an object boundary).
+ */
+static void calc_layout(struct ceph_osd_client *osdc,
+			struct ceph_vino vino, struct ceph_file_layout *layout,
+			u64 off, u64 *plen,
+			struct ceph_osd_request *req)
+{
+	struct ceph_osd_request_head *head = req->r_request->front.iov_base;
+	struct ceph_osd_op *first_op = (void *)(head + 1);
+	const u64 requested = *plen;
+	u64 obj_off, obj_len;    /* extent within the object */
+
+	/* identify the object by inode and snapshot */
+	head->oid.ino = cpu_to_le64(vino.ino);
+	head->oid.snap = cpu_to_le64(vino.snap);
+
+	calc_file_object_mapping(layout, off, plen, &head->oid,
+				 &obj_off, &obj_len);
+	if (requested > *plen)
+		dout(10, " skipping last %llu, final file extent %llu~%llu\n",
+		     requested - *plen, off, *plen);
+
+	first_op->offset = cpu_to_le64(obj_off);
+	first_op->length = cpu_to_le64(obj_len);
+	req->r_num_pages = calc_pages_for(off, *plen);
+
+	/* placement group for this object */
+	calc_object_layout(&head->layout, &head->oid, layout, osdc->osdmap);
+
+	dout(10, "calc_layout %llx.%08x %llu~%llu pgid %llx (%d pages)\n",
+	     le64_to_cpu(head->oid.ino), le32_to_cpu(head->oid.bno),
+	     obj_off, obj_len, le64_to_cpu(head->layout.ol_pgid),
+	     req->r_num_pages);
+}
+
+
+/*
+ * requests
+ */
+/* Take an additional reference on @req (paired with ceph_osdc_put_request). */
+static void get_request(struct ceph_osd_request *req)
+{
+	atomic_inc(&req->r_ref);
+}
+
+/*
+ * Drop a reference on @req.  When the last reference goes away, release
+ * the request and reply messages (if set), the snap context, and the
+ * request structure itself.
+ */
+void ceph_osdc_put_request(struct ceph_osd_request *req)
+{
+	dout(10, "put_request %p %d -> %d\n", req, atomic_read(&req->r_ref),
+	     atomic_read(&req->r_ref)-1);
+	BUG_ON(atomic_read(&req->r_ref) <= 0);
+
+	if (!atomic_dec_and_test(&req->r_ref))
+		return;		/* other references remain */
+
+	if (req->r_request)
+		ceph_msg_put(req->r_request);
+	if (req->r_reply)
+		ceph_msg_put(req->r_reply);
+	ceph_put_snap_context(req->r_snapc);
+	kfree(req);
+}
+
+/*
+ * Build a new request AND its message, calculate the layout, and adjust
+ * the file extent as needed.  Include additional truncate or sync osd
+ * ops when requested.
+ *
+ * @plen is in/out: on return it holds the (possibly shortened) length
+ * computed by calc_layout().
+ *
+ * Returns the new request holding a single reference, or an ERR_PTR if
+ * the request or its message could not be allocated.
+ */
+struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
+					       struct ceph_file_layout *layout,
+					       struct ceph_vino vino,
+					       u64 off, u64 *plen, int opcode,
+					       struct ceph_snap_context *snapc,
+					       int do_sync,
+					       u32 truncate_seq,
+					       u64 truncate_size)
+{
+	struct ceph_osd_request *req;
+	struct ceph_msg *msg;
+	int num_pages = calc_pages_for(off, *plen);
+	struct ceph_osd_request_head *head;
+	struct ceph_osd_op *op;
+	__le64 *snaps;
+	int do_trunc = truncate_seq && (off + *plen > truncate_size);
+	/* normalize do_sync so any non-zero value accounts for exactly 1 op */
+	int num_op = 1 + !!do_sync + do_trunc;
+	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
+	int i;
+	u64 prevofs;
+
+	/* we may overallocate here, if our write extent is shortened below */
+	req = kzalloc(sizeof(*req) + num_pages*sizeof(void *), GFP_NOFS);
+	if (req == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	/*
+	 * Not mapped to any osd yet.  kzalloc leaves this at 0, which
+	 * would be indistinguishable from "last sent to osd0".
+	 */
+	req->r_last_osd = -1;
+
+	/* create message */
+	if (snapc)
+		msg_size += sizeof(u64) * snapc->num_snaps;
+	msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
+	if (IS_ERR(msg)) {
+		kfree(req);
+		return ERR_PTR(PTR_ERR(msg));
+	}
+	memset(msg->front.iov_base, 0, msg->front.iov_len);
+
+	/* front layout: head, then num_op ops, then the snap ids */
+	head = msg->front.iov_base;
+	op = (void *)(head + 1);
+	snaps = (void *)(op + num_op);
+
+	head->client_inc = cpu_to_le32(1); /* always, for now. */
+	head->flags = 0;
+	head->num_ops = cpu_to_le16(num_op);
+	op->op = cpu_to_le16(opcode);
+
+	req->r_request = msg;
+	req->r_snapc = ceph_get_snap_context(snapc);
+
+	/* calculate max write size, pgid */
+	calc_layout(osdc, vino, layout, off, plen, req);
+	req->r_pgid.pg64 = le64_to_cpu(head->layout.ol_pgid);
+
+	/* additional ops */
+	if (do_trunc) {
+		op++;
+		op->op = cpu_to_le16(opcode == CEPH_OSD_OP_READ ?
+			     CEPH_OSD_OP_MASKTRUNC : CEPH_OSD_OP_SETTRUNC);
+		op->truncate_seq = cpu_to_le32(truncate_seq);
+		/* express truncate_size relative to the object, not the file */
+		prevofs =  le64_to_cpu((op-1)->offset);
+		op->truncate_size = cpu_to_le64(truncate_size - (off - prevofs));
+	}
+	if (do_sync) {
+		op++;
+		op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
+	}
+	if (snapc) {
+		head->snap_seq = cpu_to_le64(snapc->seq);
+		head->num_snaps = cpu_to_le32(snapc->num_snaps);
+		for (i = 0; i < snapc->num_snaps; i++)
+			snaps[i] = cpu_to_le64(snapc->snaps[i]);
+	}
+
+	atomic_set(&req->r_ref, 1);
+	init_completion(&req->r_completion);
+	return req;
+}
+
+
+/*
+ * Assign the next tid to @req and insert it into the request tree (the
+ * tree takes its own reference).  The first outstanding request also
+ * arms the timeout work.
+ *
+ * Returns 0 on success or the negative error from radix_tree_insert().
+ */
+static int register_request(struct ceph_osd_client *osdc,
+			    struct ceph_osd_request *req)
+{
+	struct ceph_osd_request_head *head = req->r_request->front.iov_base;
+	int err;
+
+	mutex_lock(&osdc->request_mutex);
+
+	osdc->last_tid++;
+	req->r_tid = osdc->last_tid;
+	head->tid = cpu_to_le64(req->r_tid);
+
+	dout(30, "register_request %p tid %lld\n", req, req->r_tid);
+	err = radix_tree_insert(&osdc->request_tree, req->r_tid, (void *)req);
+	if (err >= 0) {
+		get_request(req);	/* reference held by the tree */
+		osdc->num_requests++;
+
+		req->r_timeout_stamp =
+			jiffies + osdc->client->mount_args.osd_timeout*HZ;
+
+		/* first outstanding request: start the timeout clock */
+		if (osdc->num_requests == 1) {
+			osdc->timeout_tid = req->r_tid;
+			dout(30, "  timeout on tid %llu at %lu\n", req->r_tid,
+			     req->r_timeout_stamp);
+			schedule_delayed_work(&osdc->timeout_work,
+			      round_jiffies_relative(req->r_timeout_stamp - jiffies));
+		}
+	}
+
+	mutex_unlock(&osdc->request_mutex);
+	return err;
+}
+
+/*
+ * Timeout callback, called every N seconds when 1 or more osd
+ * requests has been active for more than N seconds.  When this
+ * happens, we ping all OSDs with requests who have timed out to
+ * ensure any communications channel reset is detected.  Reset the
+ * request timeouts another N seconds in the future as we go.
+ * Reschedule the timeout event another N seconds in future (unless
+ * there are no open requests).
+ */
+static void handle_timeout(struct work_struct *work)
+{
+	struct ceph_osd_client *osdc =
+		container_of(work, struct ceph_osd_client, timeout_work.work);
+	struct ceph_osd_request *req;
+	unsigned long timeout = osdc->client->mount_args.osd_timeout * HZ;
+	unsigned long next_timeout = timeout + jiffies;
+	RADIX_TREE(pings, GFP_NOFS);  /* only send 1 ping per osd */
+	u64 next_tid = 0;
+
+	dout(10, "timeout\n");
+	down_read(&osdc->map_sem);
+
+	ceph_monc_request_osdmap(&osdc->client->monc, osdc->osdmap->epoch+1);
+
+	mutex_lock(&osdc->request_mutex);
+
+	/* walk every registered request in tid order, one per lookup */
+	while (radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
+				      next_tid, 1)) {
+		next_tid = req->r_tid + 1;
+		if (time_before(jiffies, req->r_timeout_stamp))
+			continue;	/* not timed out yet */
+
+		req->r_timeout_stamp = next_timeout;
+		if (req->r_last_osd >= 0 &&
+		    radix_tree_lookup(&pings, req->r_last_osd) == NULL) {
+			struct ceph_entity_name n = {
+				.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD),
+				.num = cpu_to_le32(req->r_last_osd)
+			};
+			dout(20, " tid %llu (at least) timed out on osd%d\n",
+			     req->r_tid, req->r_last_osd);
+			/* remember we pinged this osd; keyed by osd number */
+			radix_tree_insert(&pings, req->r_last_osd, req);
+			ceph_ping(osdc->client->msgr, n, &req->r_last_osd_addr);
+		}
+	}
+
+	/* empty the on-stack ping set before it goes out of scope */
+	while (radix_tree_gang_lookup(&pings, (void **)&req, 0, 1))
+		radix_tree_delete(&pings, req->r_last_osd);
+
+	if (osdc->timeout_tid)
+		schedule_delayed_work(&osdc->timeout_work,
+				      round_jiffies_relative(timeout));
+
+	mutex_unlock(&osdc->request_mutex);
+
+	up_read(&osdc->map_sem);
+}
+
+/*
+ * Remove @req from the request tree and drop the tree's reference.
+ * If @req was the request driving the timeout, either cancel the
+ * timeout (no requests left) or hand it to the lowest remaining tid.
+ *
+ * called under osdc->request_mutex
+ */
+static void __unregister_request(struct ceph_osd_client *osdc,
+				 struct ceph_osd_request *req)
+{
+	u64 tid = req->r_tid;
+
+	dout(30, "__unregister_request %p tid %lld\n", req, tid);
+	radix_tree_delete(&osdc->request_tree, tid);
+
+	osdc->num_requests--;
+
+	if (tid == osdc->timeout_tid) {
+		if (osdc->num_requests == 0) {
+			dout(30, "no requests, canceling timeout\n");
+			osdc->timeout_tid = 0;
+			cancel_delayed_work(&osdc->timeout_work);
+		} else {
+			struct ceph_osd_request *next;
+			unsigned long now = jiffies;
+			unsigned long delay = 0;
+			int ret;
+
+			ret = radix_tree_gang_lookup(&osdc->request_tree,
+						     (void **)&next, 0, 1);
+			BUG_ON(ret != 1);
+			osdc->timeout_tid = next->r_tid;
+			dout(30, "rescheduled timeout on tid %llu at %lu\n",
+			     next->r_tid, next->r_timeout_stamp);
+			/*
+			 * Avoid unsigned wrap-around (and thus a huge
+			 * delay) if the stamp has already passed.
+			 */
+			if (time_after(next->r_timeout_stamp, now))
+				delay = next->r_timeout_stamp - now;
+			schedule_delayed_work(&osdc->timeout_work,
+					      round_jiffies_relative(delay));
+		}
+	}
+
+	/*
+	 * Drop the tree's reference last: this may free @req, so it
+	 * must not be touched afterwards.
+	 */
+	ceph_osdc_put_request(req);
+}
+
+/*
+ * pick an osd.  the first up osd in the pg.  or -1.
+ * caller should hold map_sem for read.
+ *
+ * The choice is stored in req->r_last_osd.  Returns 0 if the choice is
+ * unchanged, 1 if it changed, and -1 if no crush rule matches the pg
+ * (in which case r_last_osd is set to -1 as well).
+ */
+static int map_osds(struct ceph_osd_client *osdc,
+		    struct ceph_osd_request *req)
+{
+	int ruleno;
+	unsigned pps; /* placement ps */
+	int osds[10], osd = -1;
+	int i, num;
+
+	ruleno = crush_find_rule(osdc->osdmap->crush, req->r_pgid.pg.pool,
+				 req->r_pgid.pg.type, req->r_pgid.pg.size);
+	if (ruleno < 0) {
+		derr(0, "map_osds no crush rule for pool %d type %d size %d\n",
+		     req->r_pgid.pg.pool, req->r_pgid.pg.type,
+		     req->r_pgid.pg.size);
+		/*
+		 * Callers decide what to do from r_last_osd; do not leave
+		 * a stale (or zero-initialized) osd behind.
+		 */
+		req->r_last_osd = -1;
+		return -1;
+	}
+
+	if (req->r_pgid.pg.preferred >= 0)
+		pps = ceph_stable_mod(req->r_pgid.pg.ps,
+				     osdc->osdmap->lpgp_num,
+				     osdc->osdmap->lpgp_num_mask);
+	else
+		pps = ceph_stable_mod(req->r_pgid.pg.ps,
+				     osdc->osdmap->pgp_num,
+				     osdc->osdmap->pgp_num_mask);
+	/* never ask crush for more results than osds[] can hold */
+	num = crush_do_rule(osdc->osdmap->crush, ruleno, pps, osds,
+			    min_t(int, req->r_pgid.pg.size, ARRAY_SIZE(osds)),
+			    req->r_pgid.pg.preferred, osdc->osdmap->osd_weight);
+
+	/* primary is first up osd */
+	for (i = 0; i < num; i++)
+		if (ceph_osd_is_up(osdc->osdmap, osds[i])) {
+			osd = osds[i];
+			break;
+		}
+	if (req->r_last_osd == osd)
+		return 0;
+	req->r_last_osd = osd;
+	return 1;
+}
+
+/*
+ * Map @req to an osd and hand its message to the messenger.  If no osd
+ * in the pg is up, ask the monitor for a newer osdmap and return 0
+ * without sending.
+ *
+ * caller should hold map_sem (for read)
+ */
+static int send_request(struct ceph_osd_client *osdc,
+			struct ceph_osd_request *req)
+{
+	struct ceph_osd_request_head *head;
+	struct ceph_msg *msg;
+	int target;
+
+	map_osds(osdc, req);
+	target = req->r_last_osd;
+	if (target < 0) {
+		dout(10, "send_request %p no up osds in pg\n", req);
+		ceph_monc_request_osdmap(&osdc->client->monc,
+					 osdc->osdmap->epoch+1);
+		return 0;
+	}
+
+	dout(10, "send_request %p tid %llu to osd%d flags %d\n",
+	     req, req->r_tid, target, req->r_flags);
+
+	msg = req->r_request;
+
+	/* stamp the request head with the map epoch and any extra flags */
+	head = msg->front.iov_base;
+	head->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
+	head->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
+
+	/* address the message to the chosen osd */
+	msg->hdr.dst.name.type = cpu_to_le32(CEPH_ENTITY_TYPE_OSD);
+	msg->hdr.dst.name.num = cpu_to_le32(target);
+	msg->hdr.dst.addr = osdc->osdmap->osd_addr[target];
+
+	req->r_last_osd_addr = msg->hdr.dst.addr;
+	req->r_timeout_stamp = jiffies+osdc->client->mount_args.osd_timeout*HZ;
+
+	ceph_msg_get(msg); /* send consumes a ref */
+	return ceph_msg_send(osdc->client->msgr, msg, BASE_DELAY_INTERVAL);
+}
+
+/*
+ * handle osd op reply.  either call the callback if it is specified,
+ * or do the completion to wake up the waiting thread.
+ *
+ * Replies whose front is too short, or whose length does not match the
+ * advertised number of ops, are logged and dropped.  Replies for
+ * unknown tids are ignored.
+ */
+void ceph_osdc_handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
+	struct ceph_osd_request *req;
+	u64 tid;
+	int numops;
+
+	/* validate the front before trusting any field in it */
+	if (msg->front.iov_len < sizeof(*rhead))
+		goto bad;
+	tid = le64_to_cpu(rhead->tid);
+	numops = le32_to_cpu(rhead->num_ops);
+	if (msg->front.iov_len != sizeof(*rhead) +
+	    numops * sizeof(struct ceph_osd_op))
+		goto bad;
+	dout(10, "handle_reply %p tid %llu\n", msg, tid);
+
+	/* lookup */
+	mutex_lock(&osdc->request_mutex);
+	req = radix_tree_lookup(&osdc->request_tree, tid);
+	if (req == NULL) {
+		dout(10, "handle_reply tid %llu dne\n", tid);
+		mutex_unlock(&osdc->request_mutex);
+		return;
+	}
+	get_request(req);	/* keep req alive across unregister */
+	if (req->r_reply == NULL) {
+		/* no data payload, or r_reply would have been set by
+		   prepare_pages. */
+		ceph_msg_get(msg);
+		req->r_reply = msg;
+	} else if (req->r_reply == msg) {
+		/* r_reply was set by prepare_pages; now it's fully read. */
+	} else {
+		dout(10, "handle_reply tid %llu already had reply?\n", tid);
+		/* must not return with request_mutex still held */
+		mutex_unlock(&osdc->request_mutex);
+		goto done;
+	}
+	dout(10, "handle_reply tid %llu flags %d\n", tid,
+	     le32_to_cpu(rhead->flags));
+	__unregister_request(osdc, req);
+	mutex_unlock(&osdc->request_mutex);
+
+	if (req->r_callback)
+		req->r_callback(req);
+	else
+		complete(&req->r_completion);  /* see do_sync_request */
+done:
+	ceph_osdc_put_request(req);
+	return;
+
+bad:
+	derr(0, "got corrupt osd_op_reply got %d %d expected %d\n",
+	     (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
+	     (int)sizeof(*rhead));
+}
+
+
+/*
+ * Resubmit osd requests whose osd or osd address has changed.  Request
+ * a new osd map if osds are down, or we are otherwise unable to determine
+ * how to direct a request.
+ *
+ * If @who is specified, resubmit requests for that specific osd.
+ * NOTE(review): @who is never referenced in the body below, so only
+ * requests whose mapping changed are resent -- confirm whether the
+ * per-osd resubmit described above is still to be implemented.
+ *
+ * Caller should hold map_sem for read.
+ */
+static void kick_requests(struct ceph_osd_client *osdc,
+			  struct ceph_entity_addr *who)
+{
+	struct ceph_osd_request *req;
+	u64 next_tid = 0;
+	int got;
+	int needmap = 0;
+
+	mutex_lock(&osdc->request_mutex);
+	/* walk all registered requests in tid order, one per lookup */
+	while (1) {
+		got = radix_tree_gang_lookup(&osdc->request_tree, (void **)&req,
+					     next_tid, 1);
+		if (got == 0)
+			break;
+		next_tid = req->r_tid + 1;
+
+		if (map_osds(osdc, req) == 0)
+			continue;  /* no change */
+
+		if (req->r_last_osd < 0) {
+			dout(20, "tid %llu maps to no valid osd\n", req->r_tid);
+			needmap++;  /* request a newer map */
+			memset(&req->r_last_osd_addr, 0,
+			       sizeof(req->r_last_osd_addr));
+			continue;
+		}
+
+		dout(20, "kicking tid %llu osd%d\n", req->r_tid,
+		     req->r_last_osd);
+		/*
+		 * Pin the request, then drop request_mutex for the resend;
+		 * the mutex is retaken before the next tree lookup.
+		 */
+		get_request(req);
+		mutex_unlock(&osdc->request_mutex);
+		req->r_request = ceph_msg_maybe_dup(req->r_request);
+		/* r_aborted is checked after the dup; see do_sync_request */
+		if (!req->r_aborted) {
+			req->r_flags |= CEPH_OSD_OP_RETRY;
+			send_request(osdc, req);
+		}
+		ceph_osdc_put_request(req);
+		mutex_lock(&osdc->request_mutex);
+	}
+	mutex_unlock(&osdc->request_mutex);
+
+	if (needmap) {
+		dout(10, "%d requests for down osds, need new map\n", needmap);
+		ceph_monc_request_osdmap(&osdc->client->monc,
+					 osdc->osdmap->epoch+1);
+	}
+}
+
+/*
+ * Process updated osd map.
+ *
+ * The message contains any number of incremental and full maps.
+ * Incrementals are applied only when they are for exactly the next
+ * epoch; if any incremental was applied, the full maps are skipped.
+ * Otherwise only the last full map is considered, and only if it is
+ * newer than what we have.
+ *
+ * map_sem is held for write while the map is updated and downgraded
+ * to read while affected requests are kicked.
+ */
+void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+{
+	void *p, *end, *next;
+	u32 nr_maps, maplen;
+	u32 epoch;
+	struct ceph_osdmap *newmap = NULL, *oldmap;
+	ceph_fsid_t fsid;
+	__le64 major, minor;
+
+	dout(2, "handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
+	p = msg->front.iov_base;
+	end = p + msg->front.iov_len;
+
+	/* verify fsid (map_sem is not held yet, so fail via bad_nolock) */
+	ceph_decode_need(&p, end, sizeof(fsid), bad_nolock);
+	ceph_decode_64_le(&p, major);
+	__ceph_fsid_set_major(&fsid, major);
+	ceph_decode_64_le(&p, minor);
+	__ceph_fsid_set_minor(&fsid, minor);
+	if (ceph_fsid_compare(&fsid, &osdc->client->monc.monmap->fsid)) {
+		derr(0, "got map with wrong fsid, ignoring\n");
+		return;
+	}
+
+	down_write(&osdc->map_sem);
+
+	/* incremental maps */
+	ceph_decode_32_safe(&p, end, nr_maps, bad);
+	dout(10, " %d inc maps\n", nr_maps);
+	while (nr_maps > 0) {
+		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
+		ceph_decode_32(&p, epoch);
+		ceph_decode_32(&p, maplen);
+		ceph_decode_need(&p, end, maplen, bad);
+		next = p + maplen;
+		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
+			dout(10, "applying incremental map %u len %d\n",
+			     epoch, maplen);
+			newmap = apply_incremental(&p, next, osdc->osdmap,
+						   osdc->client->msgr);
+			if (IS_ERR(newmap))
+				goto bad;
+			if (newmap != osdc->osdmap) {
+				osdmap_destroy(osdc->osdmap);
+				osdc->osdmap = newmap;
+			}
+		} else {
+			dout(10, "ignoring incremental map %u len %d\n",
+			     epoch, maplen);
+		}
+		p = next;
+		nr_maps--;
+	}
+	if (newmap)
+		goto done;
+
+	/* full maps */
+	ceph_decode_32_safe(&p, end, nr_maps, bad);
+	dout(30, " %d full maps\n", nr_maps);
+	while (nr_maps) {
+		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
+		ceph_decode_32(&p, epoch);
+		ceph_decode_32(&p, maplen);
+		ceph_decode_need(&p, end, maplen, bad);
+		if (nr_maps > 1) {
+			dout(5, "skipping non-latest full map %u len %d\n",
+			     epoch, maplen);
+		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
+			dout(10, "skipping full map %u len %d, "
+			     "older than our %u\n", epoch, maplen,
+			     osdc->osdmap->epoch);
+		} else {
+			dout(10, "taking full map %u len %d\n", epoch, maplen);
+			newmap = osdmap_decode(&p, p+maplen);
+			if (IS_ERR(newmap))
+				goto bad;
+			oldmap = osdc->osdmap;
+			osdc->osdmap = newmap;
+			if (oldmap)
+				osdmap_destroy(oldmap);
+		}
+		p += maplen;
+		nr_maps--;
+	}
+
+done:
+	downgrade_write(&osdc->map_sem);
+	/* we may still have no map at all if nothing usable was sent */
+	if (osdc->osdmap)
+		ceph_monc_got_osdmap(&osdc->client->monc,
+				     osdc->osdmap->epoch);
+	if (newmap)
+		kick_requests(osdc, NULL);
+	up_read(&osdc->map_sem);
+	return;
+
+bad:
+	up_write(&osdc->map_sem);
+bad_nolock:
+	derr(1, "handle_map corrupt msg\n");
+	return;
+}
+
+/*
+ * If we detect that a tcp connection to an osd resets, we need to
+ * resubmit all requests for that osd.  That's because although we reliably
+ * deliver our requests, the osd does not try as hard to deliver the
+ * reply (because it does not get notification when clients or mds's
+ * leave the cluster).
+ *
+ * Takes map_sem for read around the call to kick_requests().
+ */
+void ceph_osdc_handle_reset(struct ceph_osd_client *osdc,
+			    struct ceph_entity_addr *addr)
+{
+	down_read(&osdc->map_sem);
+	kick_requests(osdc, addr);
+	up_read(&osdc->map_sem);
+}
+
+
+/*
+ * A read request prepares specific pages that data is to be read into.
+ * When a message is being read off the wire, we call prepare_pages to
+ * find those pages.
+ *
+ * @p is the ceph_client, @m the incoming message, @want the number of
+ * pages the message needs.  On success the request's page vector is
+ * attached to @m and @m becomes the request's reply.
+ *
+ *  0 = success, -1 failure.
+ */
+int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want)
+{
+	struct ceph_client *client = p;
+	struct ceph_osd_client *osdc = &client->osdc;
+	struct ceph_osd_reply_head *rhead = m->front.iov_base;
+	struct ceph_osd_request *req;
+	u64 tid;
+	int ret = -1;
+	int type = le16_to_cpu(m->hdr.type);
+
+	dout(10, "prepare_pages on msg %p want %d\n", m, want);
+	if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
+		return -1;  /* hmm! */
+	/* same sanity check as handle_reply: front must hold a reply head */
+	if (unlikely(m->front.iov_len < sizeof(*rhead)))
+		return -1;
+
+	tid = le64_to_cpu(rhead->tid);
+	mutex_lock(&osdc->request_mutex);
+	req = radix_tree_lookup(&osdc->request_tree, tid);
+	if (!req) {
+		dout(10, "prepare_pages unknown tid %llu\n", tid);
+		goto out;
+	}
+	dout(10, "prepare_pages tid %llu has %d pages, want %d\n",
+	     tid, req->r_num_pages, want);
+	/* only hand out pages once, and only if there are enough of them */
+	if (likely(req->r_num_pages >= want && req->r_reply == NULL)) {
+		m->pages = req->r_pages;
+		m->nr_pages = req->r_num_pages;
+		ceph_msg_get(m);	/* reference held via req->r_reply */
+		req->r_reply = m;
+		ret = 0; /* success */
+	}
+out:
+	mutex_unlock(&osdc->request_mutex);
+	return ret;
+}
+
+/*
+ * Register request, send initial attempt.
+ *
+ * If the send fails, the request is unregistered again so that it does
+ * not linger in the request tree (holding a reference and pointing at
+ * pages the caller is about to release).
+ *
+ * Returns the result of register_request() or send_request().
+ */
+static int start_request(struct ceph_osd_client *osdc,
+			 struct ceph_osd_request *req)
+{
+	int rc;
+
+	rc = register_request(osdc, req);
+	if (rc < 0)
+		return rc;
+
+	down_read(&osdc->map_sem);
+	rc = send_request(osdc, req);
+	up_read(&osdc->map_sem);
+
+	if (rc < 0) {
+		mutex_lock(&osdc->request_mutex);
+		/* only undo the registration if it is still ours */
+		if (radix_tree_lookup(&osdc->request_tree, req->r_tid) == req)
+			__unregister_request(osdc, req);
+		mutex_unlock(&osdc->request_mutex);
+	}
+	return rc;
+}
+
+/*
+ * synchronously do an osd request.
+ *
+ * If we are interrupted, take our pages away from any previous sent
+ * request message that may still be being written to the socket.
+ *
+ * Returns a non-zero result from start_request(), the negative result
+ * of an interrupted wait, the (negative) result carried in the reply
+ * head, or otherwise the reply's data length in bytes.
+ */
+static int do_sync_request(struct ceph_osd_client *osdc,
+			   struct ceph_osd_request *req)
+{
+	struct ceph_osd_reply_head *replyhead;
+	__s32 rc;
+	int bytes;
+
+	rc = start_request(osdc, req);	/* register+send request */
+	if (rc)
+		return rc;
+
+	/* woken by complete() in ceph_osdc_handle_reply */
+	rc = wait_for_completion_interruptible(&req->r_completion);
+	if (rc < 0) {
+		struct ceph_msg *msg;
+
+		dout(0, "tid %llu err %d, revoking %p pages\n", req->r_tid,
+		     rc, req->r_request);
+		/*
+		 * we were interrupted.
+		 *
+		 * mark req aborted _before_ revoking pages, so that
+		 * if a racing kick_request _does_ dup the page vec
+		 * pointer, it will definitely then see the aborted
+		 * flag and not send the request.
+		 */
+		req->r_aborted = 1;
+		msg = req->r_request;
+		/* detach the page vector from the outgoing message */
+		mutex_lock(&msg->page_mutex);
+		msg->pages = NULL;
+		mutex_unlock(&msg->page_mutex);
+		/* ...and from a reply that has already claimed our pages */
+		if (req->r_reply) {
+			mutex_lock(&req->r_reply->page_mutex);
+			req->r_reply->pages = NULL;
+			mutex_unlock(&req->r_reply->page_mutex);
+		}
+		/* NOTE(review): the request is left registered on this path */
+		return rc;
+	}
+
+	/* parse reply */
+	replyhead = req->r_reply->front.iov_base;
+	rc = le32_to_cpu(replyhead->result);
+	bytes = le32_to_cpu(req->r_reply->hdr.data_len);
+	dout(10, "do_sync_request tid %llu result %d, %d bytes\n",
+	     req->r_tid, rc, bytes);
+	if (rc < 0)
+		return rc;
+	return bytes;
+}
+/*
+ * init, shutdown
+ */
+/* Set up an empty osd client: no map, no requests, timeout work idle. */
+void ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
+{
+	dout(5, "init\n");
+
+	osdc->client = client;
+
+	/* osd map state */
+	osdc->osdmap = NULL;
+	osdc->last_requested_map = 0;
+	init_rwsem(&osdc->map_sem);
+	init_completion(&osdc->map_waiters);
+
+	/* request tracking */
+	osdc->last_tid = 0;
+	osdc->num_requests = 0;
+	mutex_init(&osdc->request_mutex);
+	INIT_RADIX_TREE(&osdc->request_tree, GFP_NOFS);
+
+	/* request timeout */
+	osdc->timeout_tid = 0;
+	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
+}
+
+/* Stop the timeout work (waiting for it to finish) and free the osd map. */
+void ceph_osdc_stop(struct ceph_osd_client *osdc)
+{
+	cancel_delayed_work_sync(&osdc->timeout_work);
+
+	if (!osdc->osdmap)
+		return;
+
+	osdmap_destroy(osdc->osdmap);
+	osdc->osdmap = NULL;
+}
+
+
+
+/*
+ * synchronous read direct to user buffer.
+ *
+ * if read spans object boundary, just do two separate reads.
+ *
+ * Returns the number of bytes copied to @data.  An error is returned
+ * only if nothing was copied at all; once some data has been copied, a
+ * later error or short result simply ends the read early.
+ *
+ * FIXME: for a correct atomic read, we should take read locks on all
+ * objects.
+ */
+int ceph_osdc_sync_read(struct ceph_osd_client *osdc, struct ceph_vino vino,
+			struct ceph_file_layout *layout,
+			u64 off, u64 len,
+			u32 truncate_seq, u64 truncate_size,
+			char __user *data)
+{
+	struct ceph_osd_request *req;
+	u64 remaining = len;	/* part of the caller's extent not yet read */
+	int i, po, left, l;
+	int nr_pages;		/* temp pages actually allocated */
+	int faulted;		/* user buffer could not take everything */
+	int in_flight;
+	int rc;
+	int finalrc = 0;
+
+	dout(10, "sync_read on vino %llx.%llx at %llu~%llu\n", vino.ino,
+	     vino.snap, off, len);
+
+more:
+	/* new_request may shorten len to the end of the object */
+	len = remaining;
+	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+				    CEPH_OSD_OP_READ, NULL, 0,
+				    truncate_seq, truncate_size);
+	if (IS_ERR(req)) {
+		if (finalrc == 0)
+			finalrc = PTR_ERR(req);
+		goto done;
+	}
+
+	dout(10, "sync_read %llu~%llu -> %d pages\n", off, len,
+	     req->r_num_pages);
+
+	/* allocate temp pages to hold data */
+	faulted = 0;
+	nr_pages = req->r_num_pages;
+	for (i = 0; i < nr_pages; i++) {
+		req->r_pages[i] = alloc_page(GFP_NOFS);
+		if (req->r_pages[i] == NULL) {
+			nr_pages = i;	/* only pages [0, i) exist */
+			rc = -ENOMEM;
+			goto out;
+		}
+	}
+
+	rc = do_sync_request(osdc, req);
+	if (rc > 0) {
+		int copied = 0;
+
+		/* never trust the reply to stay within what we asked for */
+		if ((u64)rc > len)
+			rc = len;
+
+		/* copy into user buffer */
+		po = off & ~PAGE_CACHE_MASK;
+		left = rc;
+		i = 0;
+		while (left > 0 && i < nr_pages) {
+			int bad;
+			l = min_t(int, left, PAGE_CACHE_SIZE-po);
+			bad = copy_to_user(data,
+					   page_address(req->r_pages[i]) + po,
+					   l);
+			copied += l - bad;
+			data += l - bad;
+			if (bad) {
+				/* stop here; do not skip to the next page */
+				faulted = 1;
+				break;
+			}
+			left -= l;
+			po = 0;	/* later pages are read from their start */
+			i++;
+		}
+		rc = copied ? copied : -EFAULT;
+	}
+out:
+	/*
+	 * Free the temp pages unless the request is still registered
+	 * (interrupted or unsent): in that case a late reply could still
+	 * be directed at them via prepare_pages, so leave them alone.
+	 */
+	mutex_lock(&osdc->request_mutex);
+	in_flight = radix_tree_lookup(&osdc->request_tree, req->r_tid) == req;
+	mutex_unlock(&osdc->request_mutex);
+	if (!in_flight)
+		for (i = 0; i < nr_pages; i++)
+			__free_pages(req->r_pages[i], 0);
+	ceph_osdc_put_request(req);
+
+	if (rc > 0) {
+		finalrc += rc;
+		off += rc;
+		remaining -= rc;
+		if (remaining > 0 && !faulted)
+			goto more;
+	} else if (finalrc == 0) {
+		/* nothing read so far: report the error (or 0) as is */
+		finalrc = rc;
+	}
+done:
+	dout(10, "sync_read result %d\n", finalrc);
+	return finalrc;
+}
+
+/*
+ * Read one page synchronously.  Whatever part of the page the read did
+ * not fill is zeroed.  Returns the number of bytes read (or zeroed);
+ * a missing object (-ENOENT) counts as a fully zeroed page.
+ */
+int ceph_osdc_readpage(struct ceph_osd_client *osdc, struct ceph_vino vino,
+		       struct ceph_file_layout *layout,
+		       u64 off, u64 len,
+		       u32 truncate_seq, u64 truncate_size,
+		       struct page *page)
+{
+	struct ceph_osd_request *req;
+	int rc;
+	int read = 0;	/* bytes actually filled in by the osd */
+
+	dout(10, "readpage on ino %llx.%llx at %lld~%lld\n", vino.ino,
+	     vino.snap, off, len);
+	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+				    CEPH_OSD_OP_READ, NULL, 0,
+				    truncate_seq, truncate_size);
+	if (IS_ERR(req))
+		return PTR_ERR(req);
+	BUG_ON(len != PAGE_CACHE_SIZE);
+
+	req->r_pages[0] = page;
+	rc = do_sync_request(osdc, req);
+
+	if (rc >= 0)
+		read = rc;
+	if (rc >= 0 || rc == -ENOENT)
+		rc = len;	/* report the whole page as handled */
+
+	/* zero the tail of the page that was not read */
+	if (read < PAGE_CACHE_SIZE) {
+		dout(10, "readpage zeroing %p from %d\n", page, read);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
+		zero_user_segment(page, read, PAGE_CACHE_SIZE);
+#else
+		zero_user_page(page, read, PAGE_CACHE_SIZE-read, KM_USER0);
+#endif
+	}
+
+	ceph_osdc_put_request(req);
+	dout(10, "readpage result %d\n", rc);
+	return rc;
+}
+
+/*
+ * Read some contiguous pages from page_list.  Return number of bytes
+ * read (or zeroed).
+ *
+ * Pages are taken from the tail of @page_list for as long as their
+ * indices are consecutive, and never more than the request's page
+ * vector was allocated to hold.
+ */
+int ceph_osdc_readpages(struct ceph_osd_client *osdc,
+			struct address_space *mapping,
+			struct ceph_vino vino, struct ceph_file_layout *layout,
+			u64 off, u64 len,
+			u32 truncate_seq, u64 truncate_size,
+			struct list_head *page_list, int num_pages)
+{
+	struct ceph_osd_request *req;
+	struct ceph_osd_request_head *reqhead;
+	struct ceph_osd_op *op;
+	struct page *page;
+	pgoff_t next_index;
+	/* size of the r_pages[] vector that new_request will allocate */
+	int max_pages = calc_pages_for(off, len);
+	int contig_pages = 0;
+	int i = 0;
+	int rc = 0, read = 0;
+
+	/*
+	 * for now, our strategy is simple: start with the
+	 * initial page, and fetch as much of that object as
+	 * we can that falls within the range specified by
+	 * num_pages.
+	 */
+	dout(10, "readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
+	     vino.snap, off, len);
+
+	/* alloc request, w/ optimistically-sized page vector */
+	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+				    CEPH_OSD_OP_READ, NULL, 0,
+				    truncate_seq, truncate_size);
+	if (IS_ERR(req))
+		return PTR_ERR(req);
+
+	/* build vector from page_list */
+	next_index = list_entry(page_list->prev, struct page, lru)->index;
+	list_for_each_entry_reverse(page, page_list, lru) {
+		/* do not write past the end of the allocated vector */
+		if (contig_pages == max_pages)
+			break;
+		if (page->index == next_index) {
+			dout(20, "readpages page %d %p\n", contig_pages, page);
+			req->r_pages[contig_pages] = page;
+			contig_pages++;
+			next_index++;
+		} else {
+			break;
+		}
+	}
+	BUG_ON(!contig_pages);
+	/* trim the extent to what the contiguous pages can hold */
+	len = min((contig_pages << PAGE_CACHE_SHIFT) - (off & ~PAGE_CACHE_MASK),
+		  len);
+	req->r_num_pages = contig_pages;
+	reqhead = req->r_request->front.iov_base;
+	op = (void *)(reqhead + 1);
+	op->length = cpu_to_le64(len);
+	dout(10, "readpages final extent is %llu~%llu -> %d pages\n",
+	     off, len, req->r_num_pages);
+	rc = do_sync_request(osdc, req);
+
+	if (rc >= 0) {
+		read = rc;
+		rc = len;
+	} else if (rc == -ENOENT) {
+		rc = len;
+	}
+
+	/*
+	 * zero whatever the read did not fill.  (read stays 0 when the
+	 * request failed, so every page is zeroed in that case too.)
+	 */
+	if (read < (contig_pages << PAGE_CACHE_SHIFT)) {
+		if (read & ~PAGE_CACHE_MASK) {
+			/* partially filled page: zero its tail */
+			i = read >> PAGE_CACHE_SHIFT;
+			page = req->r_pages[i];
+			dout(20, "readpages zeroing %d %p from %d\n", i, page,
+			     (int)(read & ~PAGE_CACHE_MASK));
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
+			zero_user_segment(page, read & ~PAGE_CACHE_MASK,
+					  PAGE_CACHE_SIZE);
+#else
+			zero_user_page(page, read & ~PAGE_CACHE_MASK,
+				       PAGE_CACHE_SIZE - (read & ~PAGE_CACHE_MASK),
+				       KM_USER0);
+#endif
+			read += PAGE_CACHE_SIZE;
+		}
+		/* remaining pages were not touched at all: zero them whole */
+		for (i = read >> PAGE_CACHE_SHIFT; i < contig_pages; i++) {
+			page = req->r_pages[i];
+			dout(20, "readpages zeroing %d %p\n", i, page);
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2, 6, 25)
+			zero_user_segment(page, 0, PAGE_CACHE_SIZE);
+#else
+			zero_user_page(page, 0, PAGE_CACHE_SIZE, KM_USER0);
+#endif
+		}
+	}
+
+	ceph_osdc_put_request(req);
+	dout(10, "readpages result %d\n", rc);
+	return rc;
+}
+
+
+/*
+ * synchronous write.  from userspace.
+ *
+ * If the write spans an object boundary it is carried out as a series
+ * of separate per-object writes.  Returns the total number of bytes
+ * written, or the error from the first step that failed.
+ *
+ * FIXME: for a correct atomic write, we should take write locks on all
+ * objects, rollback on failure, etc.
+ */
+int ceph_osdc_sync_write(struct ceph_osd_client *osdc, struct ceph_vino vino,
+			 struct ceph_file_layout *layout,
+			 struct ceph_snap_context *snapc,
+			 u64 off, u64 len,
+			 u32 truncate_seq, u64 truncate_size,
+			 const char __user *data)
+{
+	struct ceph_msg *reqm;
+	struct ceph_osd_request_head *reqhead;
+	struct ceph_osd_request *req;
+	u64 remaining = len;	/* part of the caller's extent not yet written */
+	int i, po, l, left;
+	int nr_alloc;		/* temp pages actually allocated */
+	int rc;
+	int finalrc = 0;
+
+	dout(10, "sync_write on ino %llx.%llx at %llu~%llu\n", vino.ino,
+	     vino.snap, off, len);
+
+more:
+	/* new_request may shorten len to the end of the object */
+	len = remaining;
+	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+				    CEPH_OSD_OP_WRITE, snapc, 0,
+				    truncate_seq, truncate_size);
+	if (IS_ERR(req))
+		return PTR_ERR(req);
+	reqm = req->r_request;
+	reqhead = reqm->front.iov_base;
+	reqhead->flags =
+		cpu_to_le32(CEPH_OSD_OP_ACK |           /* ack for now, FIXME */
+			    CEPH_OSD_OP_ORDERSNAP |     /* EOLDSNAPC if ooo */
+			    CEPH_OSD_OP_MODIFY);
+
+	dout(10, "sync_write %llu~%llu -> %d pages\n", off, len,
+	     req->r_num_pages);
+
+	/* copy data into a set of pages */
+	left = len;
+	po = off & ~PAGE_MASK;
+	nr_alloc = 0;
+	for (i = 0; i < req->r_num_pages; i++) {
+		int bad;
+		req->r_pages[i] = alloc_page(GFP_NOFS);
+		if (req->r_pages[i] == NULL) {
+			/* page i does not exist; free only [0, i) below */
+			rc = -ENOMEM;
+			goto out;
+		}
+		nr_alloc = i + 1;
+		l = min_t(int, PAGE_SIZE-po, left);
+		bad = copy_from_user(page_address(req->r_pages[i]) + po, data,
+				     l);
+		if (bad) {
+			/* a partial copy would leave a gap in the data */
+			rc = -EFAULT;
+			goto out;
+		}
+		data += l;
+		left -= l;
+		po = 0;	/* later pages are filled from their start */
+	}
+	reqm->pages = req->r_pages;
+	reqm->nr_pages = req->r_num_pages;
+	reqm->hdr.data_len = cpu_to_le32(len);
+	reqm->hdr.data_off = cpu_to_le16(off);
+
+	rc = do_sync_request(osdc, req);
+out:
+	for (i = 0; i < nr_alloc; i++)
+		__free_pages(req->r_pages[i], 0);
+	ceph_osdc_put_request(req);
+	if (rc == 0) {
+		finalrc += len;
+		off += len;
+		remaining -= len;
+		/* len > 0 guards against looping without progress */
+		if (remaining > 0 && len > 0)
+			goto more;
+	} else {
+		finalrc = rc;
+	}
+	dout(10, "sync_write result %d\n", finalrc);
+	return finalrc;
+}
+
+/*
+ * Synchronously write a vector of pages.  The write is limited to the
+ * extent computed for the first object; returns that length on
+ * success, or the result of do_sync_request() otherwise.
+ */
+int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
+			 struct ceph_file_layout *layout,
+			 struct ceph_snap_context *snapc,
+			 u64 off, u64 len,
+			 u32 truncate_seq, u64 truncate_size,
+			 struct page **pages, int num_pages)
+{
+	struct ceph_osd_request *req;
+	struct ceph_msg *msg;
+	struct ceph_osd_request_head *head;
+	struct ceph_osd_op *first_op;
+	int flags;
+	int rc = 0;
+
+	BUG_ON(vino.snap != CEPH_NOSNAP);
+
+	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
+				    CEPH_OSD_OP_WRITE, snapc, 0,
+				    truncate_seq, truncate_size);
+	if (IS_ERR(req))
+		return PTR_ERR(req);
+
+	msg = req->r_request;
+	head = msg->front.iov_base;
+	first_op = (void *)(head + 1);
+
+	/* ack is enough for unsafe writeback; otherwise wait for ondisk */
+	flags = CEPH_OSD_OP_MODIFY |
+		((osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK) ?
+		 CEPH_OSD_OP_ACK : CEPH_OSD_OP_ONDISK);
+	head->flags = cpu_to_le32(flags);
+
+	/* use the object-relative length computed for the op */
+	len = le64_to_cpu(first_op->length);
+	dout(10, "writepages %llu~%llu -> %d pages\n", off, len,
+	     req->r_num_pages);
+
+	/* copy page vector */
+	memcpy(req->r_pages, pages, req->r_num_pages * sizeof(struct page *));
+	msg->pages = req->r_pages;
+	msg->nr_pages = req->r_num_pages;
+	msg->hdr.data_len = cpu_to_le32(len);
+	msg->hdr.data_off = cpu_to_le16(off);
+
+	rc = do_sync_request(osdc, req);
+	ceph_osdc_put_request(req);
+	if (rc == 0)
+		rc = len;
+	dout(10, "writepages result %d\n", rc);
+	return rc;
+}
+
+/*
+ * Start an async multipage write on an already-built request: set the
+ * flags and length, attach the request's page vector to the message,
+ * and register + send it.  Returns the result of start_request().
+ */
+int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
+			       struct ceph_osd_request *req,
+			       u64 len, int num_pages)
+{
+	struct ceph_msg *msg = req->r_request;
+	struct ceph_osd_request_head *head = msg->front.iov_base;
+	struct ceph_osd_op *first_op = (void *)(head + 1);
+	u64 off = le64_to_cpu(first_op->offset);
+	int flags;
+
+	dout(10, "writepages_start %llu~%llu, %d pages\n", off, len, num_pages);
+
+	/* ack is enough for unsafe writeback; otherwise wait for ondisk */
+	flags = CEPH_OSD_OP_MODIFY |
+		((osdc->client->mount_args.flags & CEPH_MOUNT_UNSAFE_WRITEBACK) ?
+		 CEPH_OSD_OP_ACK : CEPH_OSD_OP_ONDISK);
+	head->flags = cpu_to_le32(flags);
+	first_op->length = cpu_to_le64(len);
+
+	/* reference pages in message */
+	req->r_num_pages = num_pages;
+	msg->pages = req->r_pages;
+	msg->nr_pages = req->r_num_pages;
+	msg->hdr.data_len = cpu_to_le32(len);
+	msg->hdr.data_off = cpu_to_le16(off);
+
+	return start_request(osdc, req);
+}
+
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
new file mode 100644
index 0000000..687ec78
--- /dev/null
+++ b/fs/ceph/osd_client.h
@@ -0,0 +1,142 @@
+#ifndef _FS_CEPH_OSD_CLIENT_H
+#define _FS_CEPH_OSD_CLIENT_H
+
+#include <linux/radix-tree.h>
+#include <linux/completion.h>
+
+#include "types.h"
+#include "osdmap.h"
+
+/*
+ * All data objects are stored within a cluster/cloud of OSDs, or
+ * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
+ * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
+ * remote daemons serving up and coordinating consistent and safe
+ * access to storage.
+ *
+ * Cluster membership and the mapping of data objects onto storage devices
+ * are described by the osd map.
+ *
+ * We keep track of pending OSD requests (read, write), resubmit
+ * requests to different OSDs when the cluster topology/data layout
+ * change, or retry the affected requests when the communications
+ * channel with an OSD is reset.
+ */
+
+struct ceph_msg;
+struct ceph_snap_context;
+struct ceph_osd_request;
+
+/*
+ * completion callback for async writepages
+ *
+ * The callback is handed the request itself and returns nothing.
+ */
+typedef void (*ceph_osdc_callback_t)(struct ceph_osd_request *);
+
+/*
+ * an in-flight request
+ *
+ * The page pointer array is the last member, so the number of pages a
+ * request can carry is fixed by how much space was allocated behind
+ * the struct (r_num_pages records the count in use).
+ */
+struct ceph_osd_request {
+	u64             r_tid;              /* unique for this client */
+	/* outgoing message; the writepages paths set its pages/data_len */
+	struct ceph_msg  *r_request;
+	/* presumably the osd's reply message -- confirm in osd_client.c */
+	struct ceph_msg  *r_reply;
+	int               r_result;
+	int               r_flags;     /* any additional flags for the osd */
+	int               r_aborted;   /* set if we cancel this request */
+
+	/* reference count; presumably dropped by ceph_osdc_put_request() */
+	atomic_t          r_ref;
+	struct completion r_completion;       /* on completion, or... */
+	ceph_osdc_callback_t r_callback;      /* ...async callback. */
+	struct inode *r_inode;                /* needed for async write */
+	struct writeback_control *r_wbc;
+
+	int               r_last_osd;   /* pg osds */
+	struct ceph_entity_addr r_last_osd_addr;
+	/* NOTE(review): looks like a jiffies deadline -- confirm */
+	unsigned long     r_timeout_stamp;
+
+	union ceph_pg     r_pgid;             /* placement group */
+	struct ceph_snap_context *r_snapc;    /* snap context for writes */
+	unsigned          r_num_pages;        /* size of page array (follows) */
+	struct page      *r_pages[0];         /* pages for data payload */
+};
+
+/*
+ * Per-client OSD state: the current osd map plus the set of pending
+ * requests, indexed by tid.
+ *
+ * NOTE(review): map_sem presumably guards osdmap and request_mutex the
+ * request bookkeeping below it -- confirm against osd_client.c.
+ */
+struct ceph_osd_client {
+	struct ceph_client     *client;
+
+	struct ceph_osdmap     *osdmap;       /* current map */
+	struct rw_semaphore    map_sem;
+	struct completion      map_waiters;
+	u64                    last_requested_map;
+
+	struct mutex           request_mutex;
+	u64                    timeout_tid;   /* tid of timeout triggering rq */
+	u64                    last_tid;      /* tid of last request */
+	struct radix_tree_root request_tree;  /* pending requests, by tid */
+	int                    num_requests;
+	struct delayed_work    timeout_work;
+};
+
+/* set up / shut down an osd client embedded in a ceph_client */
+extern void ceph_osdc_init(struct ceph_osd_client *osdc,
+			   struct ceph_client *client);
+extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
+
+/* presumably called when the connection to the osd at addr resets */
+extern void ceph_osdc_handle_reset(struct ceph_osd_client *osdc,
+				   struct ceph_entity_addr *addr);
+
+/* handlers for incoming messages: request replies and osd map updates */
+extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
+				   struct ceph_msg *msg);
+extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
+				 struct ceph_msg *msg);
+
+/* incoming read messages use this to discover which pages to read
+ * the data payload into. */
+extern int ceph_osdc_prepare_pages(void *p, struct ceph_msg *m, int want);
+
+/*
+ * Build a request; it is released with ceph_osdc_put_request().
+ * len is passed by pointer -- presumably it may be shortened to what
+ * fits in a single object extent (TODO confirm in osd_client.c).
+ */
+extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
+				      struct ceph_file_layout *layout,
+				      struct ceph_vino vino,
+				      u64 offset, u64 *len, int op,
+				      struct ceph_snap_context *snapc,
+				      int do_sync, u32 truncate_seq,
+				      u64 truncate_size);
+extern void ceph_osdc_put_request(struct ceph_osd_request *req);
+
+/* page-based reads */
+extern int ceph_osdc_readpage(struct ceph_osd_client *osdc,
+			      struct ceph_vino vino,
+			      struct ceph_file_layout *layout,
+			      u64 off, u64 len,
+			      u32 truncate_seq, u64 truncate_size,
+			      struct page *page);
+extern int ceph_osdc_readpages(struct ceph_osd_client *osdc,
+			       struct address_space *mapping,
+			       struct ceph_vino vino,
+			       struct ceph_file_layout *layout,
+			       u64 off, u64 len,
+			       u32 truncate_seq, u64 truncate_size,
+			       struct list_head *page_list, int nr_pages);
+
+/*
+ * page-based writes: ceph_osdc_writepages() issues the request
+ * synchronously and returns the byte count written or an error;
+ * ceph_osdc_writepages_start() starts an async write on a request the
+ * caller has already built.
+ */
+extern int ceph_osdc_writepages(struct ceph_osd_client *osdc,
+				struct ceph_vino vino,
+				struct ceph_file_layout *layout,
+				struct ceph_snap_context *sc,
+				u64 off, u64 len,
+				u32 truncate_seq, u64 truncate_size,
+				struct page **pagevec, int nr_pages);
+extern int ceph_osdc_writepages_start(struct ceph_osd_client *osdc,
+				      struct ceph_osd_request *req,
+				      u64 len,
+				      int nr_pages);
+
+/* synchronous read/write on a user-space buffer */
+extern int ceph_osdc_sync_read(struct ceph_osd_client *osdc,
+			       struct ceph_vino vino,
+			       struct ceph_file_layout *layout,
+			       u64 off, u64 len,
+			       u32 truncate_seq, u64 truncate_size,
+			       char __user *data);
+extern int ceph_osdc_sync_write(struct ceph_osd_client *osdc,
+				struct ceph_vino vino,
+				struct ceph_file_layout *layout,
+				struct ceph_snap_context *sc,
+				u64 off, u64 len,
+				u32 truncate_seq, u64 truncate_size,
+				const char __user *data);
+
+#endif
+
diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
new file mode 100644
index 0000000..6be0984
--- /dev/null
+++ b/fs/ceph/osdmap.c
@@ -0,0 +1,641 @@
+
+#include <asm/div64.h>
+
+#include "super.h"
+#include "osdmap.h"
+#include "crush/hash.h"
+#include "decode.h"
+
+#include "ceph_debug.h"
+
+/*
+ * Debug setting for this file.  DOUT_MASK/DOUT_VAR are presumably
+ * consumed by the dout()/derr() macros from ceph_debug.h -- confirm
+ * there what the initial -1 means.
+ */
+int ceph_debug_osdmap __read_mostly = -1;
+#define DOUT_MASK DOUT_MASK_OSDMAP
+#define DOUT_VAR ceph_debug_osdmap
+
+/* maps */
+
+/*
+ * Number of bits needed to represent t: 0 for t == 0, otherwise the
+ * 1-based position of the highest set bit.
+ */
+static int calc_bits_of(unsigned t)
+{
+	int bits;
+
+	for (bits = 0; t != 0; t >>= 1)
+		bits++;
+	return bits;
+}
+
+/*
+ * Mask for a pg count: the smallest value 2^n-1 that is >= num-1,
+ * i.e. that covers every index in [0, num).
+ *
+ * A count of 0 yields mask 0 instead of shifting by 32, and the shift
+ * is done in 64 bits so that 31- and 32-bit results are well defined.
+ */
+static u32 calc_pg_mask(u32 num)
+{
+	if (num == 0)
+		return 0;
+	return (u32)((1ULL << calc_bits_of(num - 1)) - 1);
+}
+
+/*
+ * Recompute every foo_mask field of the map from its foo count.
+ */
+static void calc_pg_masks(struct ceph_osdmap *map)
+{
+	map->pg_num_mask = calc_pg_mask(map->pg_num);
+	map->pgp_num_mask = calc_pg_mask(map->pgp_num);
+	map->lpg_num_mask = calc_pg_mask(map->lpg_num);
+	map->lpgp_num_mask = calc_pg_mask(map->lpgp_num);
+}
+
+/*
+ * decode crush map
+ */
+
+/*
+ * Decode the uniform-bucket specific part of a bucket: h.size primes
+ * followed by one item weight.
+ *
+ * Returns 0, -ENOMEM if the array cannot be allocated, or -EINVAL if
+ * the input is too short.  On failure b->primes may be left allocated;
+ * crush_decode() passes the whole map to crush_destroy() in that case.
+ */
+static int crush_decode_uniform_bucket(void **p, void *end,
+				       struct crush_bucket_uniform *b)
+{
+	int j;
+	dout(30, "crush_decode_uniform_bucket %p to %p\n", *p, end);
+	/* h.size comes off the wire: kcalloc rejects a count whose byte
+	 * size would wrap, where size * sizeof() would under-allocate */
+	b->primes = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->primes == NULL)
+		return -ENOMEM;
+	ceph_decode_need(p, end, (1+b->h.size) * sizeof(u32), bad);
+	for (j = 0; j < b->h.size; j++)
+		ceph_decode_32(p, b->primes[j]);
+	ceph_decode_32(p, b->item_weight);
+	return 0;
+bad:
+	return -EINVAL;
+}
+
+/*
+ * Decode the list-bucket specific part of a bucket: h.size pairs of
+ * (item weight, sum weight).
+ *
+ * Returns 0, -ENOMEM if an array cannot be allocated, or -EINVAL if
+ * the input is too short.  On failure the arrays may be left
+ * allocated; crush_decode() passes the whole map to crush_destroy()
+ * in that case.
+ */
+static int crush_decode_list_bucket(void **p, void *end,
+				    struct crush_bucket_list *b)
+{
+	int j;
+	dout(30, "crush_decode_list_bucket %p to %p\n", *p, end);
+	/* h.size comes off the wire: kcalloc rejects a count whose byte
+	 * size would wrap, where size * sizeof() would under-allocate */
+	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->item_weights == NULL)
+		return -ENOMEM;
+	b->sum_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->sum_weights == NULL)
+		return -ENOMEM;
+	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
+	for (j = 0; j < b->h.size; j++) {
+		ceph_decode_32(p, b->item_weights[j]);
+		ceph_decode_32(p, b->sum_weights[j]);
+	}
+	return 0;
+bad:
+	return -EINVAL;
+}
+
+/*
+ * Decode the tree-bucket specific part of a bucket: h.size node
+ * weights.
+ *
+ * Returns 0, -ENOMEM if the array cannot be allocated, or -EINVAL if
+ * the input is too short.  On failure b->node_weights may be left
+ * allocated; crush_decode() passes the whole map to crush_destroy()
+ * in that case.
+ */
+static int crush_decode_tree_bucket(void **p, void *end,
+				    struct crush_bucket_tree *b)
+{
+	int j;
+	dout(30, "crush_decode_tree_bucket %p to %p\n", *p, end);
+	/* h.size comes off the wire: kcalloc rejects a count whose byte
+	 * size would wrap, where size * sizeof() would under-allocate */
+	b->node_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->node_weights == NULL)
+		return -ENOMEM;
+	ceph_decode_need(p, end, b->h.size * sizeof(u32), bad);
+	for (j = 0; j < b->h.size; j++)
+		ceph_decode_32(p, b->node_weights[j]);
+	return 0;
+bad:
+	return -EINVAL;
+}
+
+/*
+ * Decode the straw-bucket specific part of a bucket: h.size pairs of
+ * (item weight, straw).
+ *
+ * Returns 0, -ENOMEM if an array cannot be allocated, or -EINVAL if
+ * the input is too short.  On failure the arrays may be left
+ * allocated; crush_decode() passes the whole map to crush_destroy()
+ * in that case.
+ */
+static int crush_decode_straw_bucket(void **p, void *end,
+				     struct crush_bucket_straw *b)
+{
+	int j;
+	dout(30, "crush_decode_straw_bucket %p to %p\n", *p, end);
+	/* h.size comes off the wire: kcalloc rejects a count whose byte
+	 * size would wrap, where size * sizeof() would under-allocate */
+	b->item_weights = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->item_weights == NULL)
+		return -ENOMEM;
+	b->straws = kcalloc(b->h.size, sizeof(u32), GFP_NOFS);
+	if (b->straws == NULL)
+		return -ENOMEM;
+	ceph_decode_need(p, end, 2 * b->h.size * sizeof(u32), bad);
+	for (j = 0; j < b->h.size; j++) {
+		ceph_decode_32(p, b->item_weights[j]);
+		ceph_decode_32(p, b->straws[j]);
+	}
+	return 0;
+bad:
+	return -EINVAL;
+}
+
+/*
+ * Decode a CRUSH map from the buffer [pbyval, end).
+ *
+ * Returns the newly allocated map, or an ERR_PTR: -ENOMEM when an
+ * allocation fails, -EINVAL when the encoding is truncated or
+ * malformed.  On failure the partially built map is handed to
+ * crush_destroy().
+ *
+ * The buffer is passed by value, so the caller's cursor is NOT
+ * advanced; callers skip the encoded length themselves.
+ *
+ * The input comes off the wire, so counts are checked against the
+ * bytes remaining before they are used to size an allocation.
+ */
+static struct crush_map *crush_decode(void *pbyval, void *end)
+{
+	struct crush_map *c;
+	int err = -EINVAL;
+	int i, j;
+	void **p = &pbyval;
+	void *start = pbyval;
+	u32 magic;
+
+	dout(30, "crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
+
+	c = kzalloc(sizeof(*c), GFP_NOFS);
+	if (c == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	ceph_decode_need(p, end, 4*sizeof(u32), bad);
+	ceph_decode_32(p, magic);
+	if (magic != CRUSH_MAGIC) {
+		derr(0, "crush_decode magic %x != current %x\n",
+		     (unsigned)magic, (unsigned)CRUSH_MAGIC);
+		goto bad;
+	}
+	ceph_decode_32(p, c->max_buckets);
+	ceph_decode_32(p, c->max_rules);
+	ceph_decode_32(p, c->max_devices);
+
+	/* kcalloc: overflow-checked sizing for wire-supplied counts */
+	c->device_parents = kcalloc(c->max_devices, sizeof(u32), GFP_NOFS);
+	if (c->device_parents == NULL)
+		goto badmem;
+	c->bucket_parents = kcalloc(c->max_buckets, sizeof(u32), GFP_NOFS);
+	if (c->bucket_parents == NULL)
+		goto badmem;
+
+	/*
+	 * The pointer arrays must start out zeroed: on a mid-decode
+	 * failure the map goes to crush_destroy(), which presumably
+	 * walks them, and slots not reached yet must read as NULL.
+	 */
+	c->buckets = kcalloc(c->max_buckets, sizeof(*c->buckets), GFP_NOFS);
+	if (c->buckets == NULL)
+		goto badmem;
+	c->rules = kcalloc(c->max_rules, sizeof(*c->rules), GFP_NOFS);
+	if (c->rules == NULL)
+		goto badmem;
+
+	/* buckets */
+	for (i = 0; i < c->max_buckets; i++) {
+		int size = 0;
+		u32 alg;
+		struct crush_bucket *b;
+
+		ceph_decode_32_safe(p, end, alg, bad);
+		if (alg == 0) {
+			c->buckets[i] = NULL;
+			continue;
+		}
+		dout(30, "crush_decode bucket %d off %x %p to %p\n",
+		     i, (int)(*p-start), *p, end);
+
+		switch (alg) {
+		case CRUSH_BUCKET_UNIFORM:
+			size = sizeof(struct crush_bucket_uniform);
+			break;
+		case CRUSH_BUCKET_LIST:
+			size = sizeof(struct crush_bucket_list);
+			break;
+		case CRUSH_BUCKET_TREE:
+			size = sizeof(struct crush_bucket_tree);
+			break;
+		case CRUSH_BUCKET_STRAW:
+			size = sizeof(struct crush_bucket_straw);
+			break;
+		default:
+			goto bad;
+		}
+		BUG_ON(size == 0);
+		b = c->buckets[i] = kzalloc(size, GFP_NOFS);
+		if (b == NULL)
+			goto badmem;
+
+		ceph_decode_need(p, end, 4*sizeof(u32), bad);
+		ceph_decode_32(p, b->id);
+		ceph_decode_16(p, b->type);
+		ceph_decode_16(p, b->alg);
+		ceph_decode_32(p, b->weight);
+		ceph_decode_32(p, b->size);
+
+		/*
+		 * The struct was sized for 'alg' but is filled in
+		 * according to b->alg below; a header that disagrees
+		 * would make the type-specific decoder write past it.
+		 */
+		if (b->alg != alg)
+			goto bad;
+
+		dout(30, "crush_decode bucket size %d off %x %p to %p\n",
+		     b->size, (int)(*p-start), *p, end);
+
+		/* the items must fit in what is left of the buffer */
+		if (b->size > (size_t)(end - *p) / sizeof(u32))
+			goto bad;
+		b->items = kcalloc(b->size, sizeof(__s32), GFP_NOFS);
+		if (b->items == NULL)
+			goto badmem;
+
+		ceph_decode_need(p, end, b->size*sizeof(u32), bad);
+		for (j = 0; j < b->size; j++)
+			ceph_decode_32(p, b->items[j]);
+
+		switch (b->alg) {
+		case CRUSH_BUCKET_UNIFORM:
+			err = crush_decode_uniform_bucket(p, end,
+				  (struct crush_bucket_uniform *)b);
+			break;
+		case CRUSH_BUCKET_LIST:
+			err = crush_decode_list_bucket(p, end,
+			       (struct crush_bucket_list *)b);
+			break;
+		case CRUSH_BUCKET_TREE:
+			err = crush_decode_tree_bucket(p, end,
+				(struct crush_bucket_tree *)b);
+			break;
+		case CRUSH_BUCKET_STRAW:
+			err = crush_decode_straw_bucket(p, end,
+				(struct crush_bucket_straw *)b);
+			break;
+		default:
+			err = -EINVAL;
+			break;
+		}
+		if (err < 0)
+			goto bad;
+		/*
+		 * Re-arm the default: err is now 0, and a later
+		 * "goto bad" must not return ERR_PTR(0) (i.e. NULL),
+		 * which callers would take for success.
+		 */
+		err = -EINVAL;
+	}
+
+	/* rules */
+	dout(30, "rule vec is %p\n", c->rules);
+	for (i = 0; i < c->max_rules; i++) {
+		u32 yes;
+		struct crush_rule *r;
+
+		ceph_decode_32_safe(p, end, yes, bad);
+		if (!yes) {
+			dout(30, "crush_decode NO rule %d off %x %p to %p\n",
+			     i, (int)(*p-start), *p, end);
+			c->rules[i] = NULL;
+			continue;
+		}
+
+		dout(30, "crush_decode rule %d off %x %p to %p\n",
+		     i, (int)(*p-start), *p, end);
+
+		/* len */
+		ceph_decode_32_safe(p, end, yes, bad);
+
+		/*
+		 * Each step is three u32s on the wire, so the remaining
+		 * input bounds the step count; checking it first keeps
+		 * the allocation size below from wrapping.
+		 */
+		if (yes > (size_t)(end - *p) / (3 * sizeof(u32)))
+			goto bad;
+
+		r = c->rules[i] = kmalloc(sizeof(*r) +
+					  yes*sizeof(struct crush_rule_step),
+					  GFP_NOFS);
+		if (r == NULL)
+			goto badmem;
+		dout(30, " rule %d is at %p\n", i, r);
+		r->len = yes;
+		ceph_decode_copy_safe(p, end, &r->mask, 4, bad); /* 4 u8's */
+		ceph_decode_need(p, end, r->len*3*sizeof(u32), bad);
+		for (j = 0; j < r->len; j++) {
+			ceph_decode_32(p, r->steps[j].op);
+			ceph_decode_32(p, r->steps[j].arg1);
+			ceph_decode_32(p, r->steps[j].arg2);
+		}
+	}
+
+	/* ignore trailing name maps. */
+
+	dout(30, "crush_decode success\n");
+	return c;
+
+badmem:
+	err = -ENOMEM;
+bad:
+	dout(30, "crush_decode fail %d\n", err);
+	crush_destroy(c);
+	return ERR_PTR(err);
+}
+
+
+/*
+ * osd map
+ */
+/*
+ * Free an osd map: its crush map (if any), the three per-osd arrays,
+ * and the map structure itself.  map must not be NULL.
+ */
+void osdmap_destroy(struct ceph_osdmap *map)
+{
+	struct crush_map *crush = map->crush;
+
+	dout(10, "osdmap_destroy %p\n", map);
+	if (crush != NULL)
+		crush_destroy(crush);
+	kfree(map->osd_addr);
+	kfree(map->osd_weight);
+	kfree(map->osd_state);
+	kfree(map);
+}
+
+/*
+ * adjust max osd value.  reallocate arrays.
+ *
+ * Allocates zeroed state/addr/weight arrays of 'max' entries, carries
+ * over the entries that exist in both the old and the new arrays, and
+ * installs the new arrays in the map.  When the map shrinks, entries
+ * at index >= max are dropped; when it grows, new entries are zero.
+ *
+ * Returns 0, or -ENOMEM (map untouched) if any allocation fails.
+ */
+static int osdmap_set_max_osd(struct ceph_osdmap *map, int max)
+{
+	u8 *state;
+	struct ceph_entity_addr *addr;
+	u32 *weight;
+	u32 ncopy;
+
+	/* kcalloc: zeroed, and fails cleanly if max * size would wrap */
+	state = kcalloc(max, sizeof(*state), GFP_NOFS);
+	addr = kcalloc(max, sizeof(*addr), GFP_NOFS);
+	weight = kcalloc(max, sizeof(*weight), GFP_NOFS);
+	if (state == NULL || addr == NULL || weight == NULL) {
+		kfree(state);
+		kfree(addr);
+		kfree(weight);
+		return -ENOMEM;
+	}
+
+	/* copy old? */
+	if (map->osd_state) {
+		/* never copy more entries than the new arrays hold */
+		ncopy = min_t(u32, map->max_osd, max);
+		memcpy(state, map->osd_state, ncopy*sizeof(*state));
+		memcpy(addr, map->osd_addr, ncopy*sizeof(*addr));
+		memcpy(weight, map->osd_weight, ncopy*sizeof(*weight));
+		kfree(map->osd_state);
+		kfree(map->osd_addr);
+		kfree(map->osd_weight);
+	}
+
+	map->osd_state = state;
+	map->osd_weight = weight;
+	map->osd_addr = addr;
+	map->max_osd = max;
+	return 0;
+}
+
+/*
+ * decode a full map.
+ *
+ * Decodes the fixed header, the per-osd state/weight/addr vectors and
+ * the embedded crush map from [*p, end), and returns a newly
+ * allocated map.  Anything after the crush map is ignored and *p is
+ * left at end.
+ *
+ * Returns an ERR_PTR on failure: -ENOMEM for allocation failures,
+ * -EINVAL for truncated input, or whatever crush_decode() reported.
+ */
+struct ceph_osdmap *osdmap_decode(void **p, void *end)
+{
+	struct ceph_osdmap *map;
+	u32 len, max, i;
+	int err = -EINVAL;
+	void *start = *p;
+	__le64 major, minor;
+
+	dout(30, "osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p));
+
+	map = kzalloc(sizeof(*map), GFP_NOFS);
+	if (map == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	/* two u64s (fsid) plus twelve u32s, the last one being max_osd */
+	ceph_decode_need(p, end, 2*sizeof(u64)+12*sizeof(u32), bad);
+	ceph_decode_64_le(p, major);
+	__ceph_fsid_set_major(&map->fsid, major);
+	ceph_decode_64_le(p, minor);
+	__ceph_fsid_set_minor(&map->fsid, minor);
+	ceph_decode_32(p, map->epoch);
+	ceph_decode_32_le(p, map->ctime.tv_sec);
+	ceph_decode_32_le(p, map->ctime.tv_nsec);
+	ceph_decode_32_le(p, map->mtime.tv_sec);
+	ceph_decode_32_le(p, map->mtime.tv_nsec);
+	ceph_decode_32(p, map->pg_num);
+	ceph_decode_32(p, map->pgp_num);
+	ceph_decode_32(p, map->lpg_num);
+	ceph_decode_32(p, map->lpgp_num);
+	ceph_decode_32(p, map->last_pg_change);
+	ceph_decode_32(p, map->flags);
+
+	calc_pg_masks(map);
+
+	ceph_decode_32(p, max);
+
+	/* (re)alloc osd arrays */
+	err = osdmap_set_max_osd(map, max);
+	if (err < 0)
+		goto bad;
+	dout(30, "osdmap_decode max_osd = %d\n", map->max_osd);
+
+	/* osds */
+	err = -EINVAL;
+	ceph_decode_need(p, end, 3*sizeof(u32) +
+			 map->max_osd*(1 + sizeof(*map->osd_weight) +
+				       sizeof(*map->osd_addr)), bad);
+	*p += 4; /* skip length field (should match max) */
+	ceph_decode_copy(p, map->osd_state, map->max_osd);
+
+	*p += 4; /* skip length field (should match max) */
+	for (i = 0; i < map->max_osd; i++)
+		ceph_decode_32(p, map->osd_weight[i]);
+
+	*p += 4; /* skip length field (should match max) */
+	ceph_decode_copy(p, map->osd_addr, map->max_osd*sizeof(*map->osd_addr));
+
+	/* crush */
+	ceph_decode_32_safe(p, end, len, bad);
+	dout(30, "osdmap_decode crush len %d from off 0x%x\n", len,
+	     (int)(*p - start));
+	ceph_decode_need(p, end, len, bad);
+	/* confine the crush decoder to its declared length */
+	map->crush = crush_decode(*p, *p + len);
+	*p += len;
+	if (IS_ERR(map->crush)) {
+		err = PTR_ERR(map->crush);
+		map->crush = NULL;
+		goto bad;
+	}
+
+	/* ignore the rest of the map */
+	*p = end;
+
+	dout(30, "osdmap_decode done %p %p\n", *p, end);
+	return map;
+
+bad:
+	dout(30, "osdmap_decode fail\n");
+	osdmap_destroy(map);
+	return ERR_PTR(err);
+}
+
+/*
+ * decode and apply an incremental map update.
+ *
+ * The incremental in [*p, end) must describe epoch map->epoch+1.  If
+ * it embeds a full map, that map is decoded and returned as a new
+ * object (the old map is left alone).  Otherwise 'map' is modified in
+ * place and returned.
+ *
+ * Returns an ERR_PTR on failure.  NOTE: the in-place path is not
+ * transactional; on a late decode error 'map' may already carry part
+ * of the update.
+ */
+struct ceph_osdmap *apply_incremental(void **p, void *end,
+				      struct ceph_osdmap *map,
+				      struct ceph_messenger *msgr)
+{
+	struct ceph_osdmap *newmap = map;
+	struct crush_map *newcrush = NULL;
+	ceph_fsid_t fsid;
+	u32 epoch = 0;
+	struct ceph_timespec ctime;
+	u32 len, x;
+	__s32 new_flags, max;
+	void *start = *p;
+	int err = -EINVAL;
+	__le64 major, minor;
+
+	ceph_decode_need(p, end, sizeof(fsid)+sizeof(ctime)+2*sizeof(u32),
+			 bad);
+	ceph_decode_64_le(p, major);
+	__ceph_fsid_set_major(&fsid, major);
+	ceph_decode_64_le(p, minor);
+	__ceph_fsid_set_minor(&fsid, minor);
+	ceph_decode_32(p, epoch);
+	BUG_ON(epoch != map->epoch+1);
+	ceph_decode_32_le(p, ctime.tv_sec);
+	ceph_decode_32_le(p, ctime.tv_nsec);
+	ceph_decode_32(p, new_flags);
+
+	/* full map? */
+	ceph_decode_32_safe(p, end, len, bad);
+	if (len > 0) {
+		dout(20, "apply_incremental full map len %d, %p to %p\n",
+		     len, *p, end);
+		newmap = osdmap_decode(p, min(*p+len, end));
+		return newmap;  /* error or not */
+	}
+
+	/* new crush? */
+	ceph_decode_32_safe(p, end, len, bad);
+	if (len > 0) {
+		dout(20, "apply_incremental new crush map len %d, %p to %p\n",
+		     len, *p, end);
+		ceph_decode_need(p, end, len, bad);
+		newcrush = crush_decode(*p, *p + len);
+		if (IS_ERR(newcrush))
+			return ERR_PTR(PTR_ERR(newcrush));
+		/* crush_decode() works on a copy of the cursor, so step
+		 * over the encoded crush map here */
+		*p += len;
+	}
+
+	/* new flags? */
+	if (new_flags >= 0)
+		map->flags = new_flags;
+
+	ceph_decode_need(p, end, 5*sizeof(u32), bad);
+
+	/* new max? */
+	ceph_decode_32(p, max);
+	if (max >= 0) {
+		err = osdmap_set_max_osd(map, max);
+		if (err < 0)
+			goto bad;
+	}
+	ceph_decode_32(p, x);
+	if (x)
+		map->pg_num = x;
+	ceph_decode_32(p, x);
+	if (x)
+		map->pgp_num = x;
+	ceph_decode_32(p, x);
+	if (x)
+		map->lpg_num = x;
+	ceph_decode_32(p, x);
+	if (x)
+		map->lpgp_num = x;
+	/* the masks are derived from the counts; keep them in step */
+	calc_pg_masks(map);
+
+	map->epoch++;
+	map->ctime = ctime;
+	if (newcrush) {
+		if (map->crush)
+			crush_destroy(map->crush);
+		map->crush = newcrush;
+		newcrush = NULL;
+	}
+
+	/* new_up */
+	err = -EINVAL;
+	ceph_decode_32_safe(p, end, len, bad);
+	while (len--) {
+		u32 osd;
+		struct ceph_entity_addr addr;
+		ceph_decode_32_safe(p, end, osd, bad);
+		ceph_decode_copy_safe(p, end, &addr, sizeof(addr), bad);
+		dout(1, "osd%d up\n", osd);
+		/* the index comes off the wire: reject, don't crash */
+		if (osd >= map->max_osd)
+			goto bad;
+		map->osd_state[osd] |= CEPH_OSD_UP;
+		map->osd_addr[osd] = addr;
+	}
+
+	/* new_down */
+	ceph_decode_32_safe(p, end, len, bad);
+	while (len--) {
+		u32 osd;
+		/* osd id plus the one-byte clean flag */
+		ceph_decode_need(p, end, sizeof(u32) + 1, bad);
+		ceph_decode_32(p, osd);
+		(*p)++;  /* clean flag */
+		dout(1, "osd%d down\n", osd);
+		if (osd < map->max_osd) {
+			map->osd_state[osd] &= ~CEPH_OSD_UP;
+			ceph_messenger_mark_down(msgr, &map->osd_addr[osd]);
+		}
+	}
+
+	/* new_weight */
+	ceph_decode_32_safe(p, end, len, bad);
+	while (len--) {
+		u32 osd, off;
+		ceph_decode_need(p, end, sizeof(u32)*2, bad);
+		ceph_decode_32(p, osd);
+		ceph_decode_32(p, off);
+		dout(1, "osd%d weight 0x%x %s\n", osd, off,
+		     off == CEPH_OSD_IN ? "(in)" :
+		     (off == CEPH_OSD_OUT ? "(out)" : ""));
+		if (osd < map->max_osd)
+			map->osd_weight[osd] = off;
+	}
+
+	/* ignore the rest */
+	*p = end;
+	return map;
+
+bad:
+	derr(10, "corrupt incremental osdmap epoch %d off %d (%p of %p-%p)\n",
+	     epoch, (int)(*p - start), *p, start, end);
+	if (newcrush)
+		crush_destroy(newcrush);
+	return ERR_PTR(err);
+}
+
+
+
+
+/*
+ * calculate file layout from given offset, length.
+ * fill in correct oid, logical length, and object extent
+ * offset, length.
+ *
+ * for now, we write only a single su, until we can
+ * pass a stride back to the caller.
+ *
+ * On return oid->bno names the object, *oxoff/*oxlen give the extent
+ * inside that object, and *plen is shortened to *oxlen (at most the
+ * remainder of the current stripe unit).
+ *
+ * The stripe unit must be a multiple of the page size (BUG_ON).
+ * NOTE(review): a zero stripe unit or stripe count, or an object size
+ * smaller than the stripe unit, would divide by zero here; the layout
+ * is assumed to have been validated by the caller.
+ */
+void calc_file_object_mapping(struct ceph_file_layout *layout,
+			      u64 off, u64 *plen,
+			      struct ceph_object *oid,
+			      u64 *oxoff, u64 *oxlen)
+{
+	u32 osize = le32_to_cpu(layout->fl_object_size);
+	u32 su = le32_to_cpu(layout->fl_stripe_unit);
+	u32 sc = le32_to_cpu(layout->fl_stripe_count);
+	u32 bl, stripeno, stripepos, objsetno, bno;
+	u32 su_per_object;
+	u64 t, su_offset;
+
+	dout(80, "mapping %llu~%llu  osize %u fl_su %u\n", off, *plen,
+	     osize, su);
+	su_per_object = osize / su;
+	dout(80, "osize %u / su %u = su_per_object %u\n", osize, su,
+	     su_per_object);
+
+	BUG_ON((su & ~PAGE_MASK) != 0);
+	/* bl = off / su; */
+	t = off;
+	do_div(t, su);
+	bl = t;
+	dout(80, "off %llu / su %u = bl %u\n", off, su, bl);
+
+	stripeno = bl / sc;
+	stripepos = bl % sc;
+	objsetno = stripeno / su_per_object;
+
+	bno = objsetno * sc + stripepos;
+	oid->bno = cpu_to_le32(bno);
+	dout(80, "objset %u * sc %u = bno %u\n", objsetno, sc, bno);
+
+	/* su_offset = off % su: the offset within the stripe unit */
+	t = off;
+	su_offset = do_div(t, su);
+	/*
+	 * An object holds su_per_object stripe units, one per stripe
+	 * of its object set, so skip the units that belong to earlier
+	 * stripes before adding the offset within this one.
+	 */
+	*oxoff = su_offset + (u64)(stripeno % su_per_object) * su;
+	*oxlen = min_t(u64, *plen, su - su_offset);
+	*plen = *oxlen;
+
+	dout(80, " obj extent %llu~%llu\n", *oxoff, *oxlen);
+}
+
+/*
+ * calculate an object layout (i.e. pgid) from an oid,
+ * file_layout, and osdmap
+ *
+ * The placement seed is the object's block number plus a hash of the
+ * two halves of its inode number; the remaining pgid fields are taken
+ * from the file layout.
+ *
+ * NOTE(review): osdmap is not used.  The seed is not reduced by the
+ * map's pg count or mask here; confirm that whoever consumes ol_pgid
+ * does that.
+ */
+void calc_object_layout(struct ceph_object_layout *ol,
+			struct ceph_object *oid,
+			struct ceph_file_layout *fl,
+			struct ceph_osdmap *osdmap)
+{
+	union ceph_pg pgid;
+	u64 ino = le64_to_cpu(oid->ino);
+	unsigned bno = le32_to_cpu(oid->bno);
+	s32 preferred = (s32)le32_to_cpu(fl->fl_pg_preferred);
+
+	pgid.pg64 = 0;   /* start with it zeroed out */
+	pgid.pg.ps = bno + crush_hash32_2(ino, ino>>32);
+	pgid.pg.preferred = preferred;
+	pgid.pg.type = fl->fl_pg_type;
+	pgid.pg.size = fl->fl_pg_size;
+	pgid.pg.pool = fl->fl_pg_pool;
+
+	ol->ol_pgid = cpu_to_le64(pgid.pg64);
+	ol->ol_stripe_unit = fl->fl_object_stripe_unit;
+}
diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h
new file mode 100644
index 0000000..f3b2964
--- /dev/null
+++ b/fs/ceph/osdmap.h
@@ -0,0 +1,106 @@
+#ifndef _FS_CEPH_OSDMAP_H
+#define _FS_CEPH_OSDMAP_H
+
+#include "types.h"
+#include "ceph_fs.h"
+#include "crush/crush.h"
+
+/*
+ * The osd map describes the current membership of the osd cluster and
+ * specifies the mapping of objects to placement groups and placement
+ * groups to (sets of) osds.  That is, it completely specifies the
+ * (desired) distribution of all data objects in the system at some
+ * point in time.
+ *
+ * Each map version is identified by an epoch, which increases monotonically.
+ *
+ * The map can be updated either via an incremental map (diff) describing
+ * the change between two successive epochs, or as a fully encoded map.
+ */
+struct ceph_osdmap {
+	ceph_fsid_t fsid;
+	u32 epoch;
+	u32 mkfs_epoch;
+	struct ceph_timespec ctime, mtime;
+
+	/* these parameters describe the number of placement groups
+	 * in the system.  foo_mask is the smallest value (2**n-1) that
+	 * is >= foo-1, i.e. it covers every index below foo. */
+	u32 pg_num, pg_num_mask;
+	u32 pgp_num, pgp_num_mask;
+	u32 lpg_num, lpg_num_mask;
+	u32 lpgp_num, lpgp_num_mask;
+	u32 last_pg_change;   /* epoch of last pg count change */
+
+	u32 flags;         /* CEPH_OSDMAP_* */
+
+	u32 max_osd;       /* size of osd_state, _weight, _addr arrays */
+	u8 *osd_state;     /* CEPH_OSD_* */
+	u32 *osd_weight;   /* 0 = failed, 0x10000 = 100% normal */
+	struct ceph_entity_addr *osd_addr;
+
+	/* the CRUSH map specifies the mapping of placement groups to
+	 * the list of osds that store+replicate them. */
+	struct crush_map *crush;
+};
+
+/* 1 if osd is within the map's range and marked CEPH_OSD_UP, else 0 */
+static inline int ceph_osd_is_up(struct ceph_osdmap *map, int osd)
+{
+	if (osd >= map->max_osd)
+		return 0;
+	return (map->osd_state[osd] & CEPH_OSD_UP) != 0;
+}
+
+/* true if there is a map and any bit of flag is set in its flags */
+static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag)
+{
+	if (!map)
+		return false;
+	return (map->flags & flag) != 0;
+}
+
+/*
+ * Render an osd state bitmask into str (at most len bytes, always
+ * NUL-terminated when len is non-zero) and return str.
+ *
+ * Produces "doesn't exist" for state 0, otherwise "exists", "up" or
+ * "exists, up" according to the CEPH_OSD_EXISTS / CEPH_OSD_UP bits
+ * (an empty string if neither bit is set).  With len == 0 the buffer
+ * is left untouched.
+ */
+static inline char *ceph_osdmap_state_str(char *str, int len, int state)
+{
+	int exists, up;
+
+	if (!len)
+		goto done;
+
+	*str = '\0';
+	if (state) {
+		exists = (state & CEPH_OSD_EXISTS) != 0;
+		up = (state & CEPH_OSD_UP) != 0;
+		/* build the string in one call: the output buffer must
+		 * never also be passed as a %s argument */
+		snprintf(str, len, "%s%s%s",
+			 exists ? "exists" : "",
+			 (exists && up) ? ", " : "",
+			 up ? "up" : "");
+	} else {
+		snprintf(str, len, "doesn't exist");
+	}
+done:
+	return str;
+}
+
+/* address slot for osd, or NULL if osd is outside the map's range */
+static inline struct ceph_entity_addr *ceph_osd_addr(struct ceph_osdmap *map,
+						     int osd)
+{
+	return (osd < map->max_osd) ? &map->osd_addr[osd] : NULL;
+}
+
+/*
+ * decode a full map / apply an incremental to an existing map.
+ * both return an ERR_PTR on failure; apply_incremental may return
+ * either the map it was given or a newly decoded one.
+ */
+extern struct ceph_osdmap *osdmap_decode(void **p, void *end);
+extern struct ceph_osdmap *apply_incremental(void **p, void *end,
+					     struct ceph_osdmap *map,
+					     struct ceph_messenger *msgr);
+/* free a map and everything hanging off it */
+extern void osdmap_destroy(struct ceph_osdmap *map);
+
+/* calculate mapping of a file extent to an object */
+extern void calc_file_object_mapping(struct ceph_file_layout *layout,
+				     u64 off, u64 *plen,
+				     struct ceph_object *oid,
+				     u64 *oxoff, u64 *oxlen);
+
+/* calculate mapping of object to a placement group */
+extern void calc_object_layout(struct ceph_object_layout *ol,
+			       struct ceph_object *oid,
+			       struct ceph_file_layout *fl,
+			       struct ceph_osdmap *osdmap);
+
+#endif
-- 
1.5.6.5

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ