[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <dd2933125697a8e363fd76f33ce18ec3afebed8b.1271198630.git.yehuda@hq.newdream.net>
Date: Tue, 13 Apr 2010 16:29:12 -0700
From: Yehuda Sadeh <yehuda@...newdream.net>
To: linux-kernel@...r.kernel.org
Cc: linux-fsdevel@...r.kernel.org, yehudasa@...il.com,
sage@...dream.net, axboe@...nel.dk,
Yehuda Sadeh <yehuda@...newdream.net>
Subject: [PATCH 3/6] ceph: message layer can send/receive data in bio
This capability is being added so that we wouldn't
need to copy the data from the rados block device.
Signed-off-by: Yehuda Sadeh <yehuda@...newdream.net>
---
fs/ceph/messenger.c | 175 +++++++++++++++++++++++++++++++++++++++++--------
fs/ceph/messenger.h | 3 +
fs/ceph/osd_client.c | 14 ++++-
fs/ceph/osd_client.h | 4 +-
4 files changed, 164 insertions(+), 32 deletions(-)
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index b6cff8e..3f77210 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -8,6 +8,8 @@
#include <linux/net.h>
#include <linux/socket.h>
#include <linux/string.h>
+#include <linux/bio.h>
+#include <linux/blkdev.h>
#include <net/tcp.h>
#include "super.h"
@@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
if (le32_to_cpu(m->hdr.data_len) > 0) {
/* initialize page iterator */
con->out_msg_pos.page = 0;
- con->out_msg_pos.page_pos =
- le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ if (m->pages)
+ con->out_msg_pos.page_pos =
+ le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
+ else
+ con->out_msg_pos.page_pos = 0;
con->out_msg_pos.data_pos = 0;
con->out_msg_pos.did_page_crc = 0;
con->out_more = 1; /* data + footer will follow */
@@ -712,6 +717,29 @@ out:
return ret; /* done! */
}
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+ if (!bio) {
+ *iter = NULL;
+ *seg = 0;
+ return;
+ }
+ *iter = bio;
+ *seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+ if (*bio_iter == NULL)
+ return;
+
+ BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+ (*seg)++;
+ if (*seg == (*bio_iter)->bi_vcnt)
+ init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+
/*
* Write as much message data payload as we can. If we finish, queue
* up the footer.
@@ -726,14 +754,20 @@ static int write_partial_msg_pages(struct ceph_connection *con)
size_t len;
int crc = con->msgr->nocrc;
int ret;
+ int page_shift;
dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
con->out_msg_pos.page_pos);
- while (con->out_msg_pos.page < con->out_msg->nr_pages) {
+ if (msg->bio && !msg->bio_iter)
+ init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+
+
+ while (data_len - con->out_msg_pos.data_pos > 0) {
struct page *page = NULL;
void *kaddr = NULL;
+ int max_write;
/*
* if we are calculating the data crc (the default), we need
@@ -744,18 +778,29 @@ static int write_partial_msg_pages(struct ceph_connection *con)
page = msg->pages[con->out_msg_pos.page];
if (crc)
kaddr = kmap(page);
+ max_write = PAGE_SIZE;
} else if (msg->pagelist) {
page = list_first_entry(&msg->pagelist->head,
struct page, lru);
if (crc)
kaddr = kmap(page);
+ max_write = PAGE_SIZE;
+ } else if (msg->bio) {
+ struct bio_vec *bv;
+
+ bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
+ page = bv->bv_page;
+ page_shift = bv->bv_offset;
+ if (crc)
+ kaddr = kmap(page) + page_shift;
+ max_write = bv->bv_len;
} else {
page = con->msgr->zero_page;
if (crc)
kaddr = page_address(con->msgr->zero_page);
}
- len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos),
- (int)(data_len - con->out_msg_pos.data_pos));
+ len = min_t(int, max_write - con->out_msg_pos.page_pos,
+ data_len - con->out_msg_pos.data_pos);
if (crc && !con->out_msg_pos.did_page_crc) {
void *base = kaddr + con->out_msg_pos.page_pos;
u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -767,11 +812,12 @@ static int write_partial_msg_pages(struct ceph_connection *con)
}
ret = kernel_sendpage(con->sock, page,
- con->out_msg_pos.page_pos, len,
+ con->out_msg_pos.page_pos + page_shift,
+ len,
MSG_DONTWAIT | MSG_NOSIGNAL |
MSG_MORE);
- if (crc && (msg->pages || msg->pagelist))
+ if (crc && (msg->pages || msg->pagelist || msg->bio))
kunmap(page);
if (ret <= 0)
@@ -786,6 +832,8 @@ static int write_partial_msg_pages(struct ceph_connection *con)
if (msg->pagelist)
list_move_tail(&page->lru,
&msg->pagelist->head);
+ if (msg->bio)
+ iter_bio_next(&msg->bio_iter, &msg->bio_seg);
}
}
@@ -1291,8 +1339,7 @@ static int read_partial_message_section(struct ceph_connection *con,
struct kvec *section, unsigned int sec_len,
u32 *crc)
{
- int left;
- int ret;
+ int ret, left;
BUG_ON(!section);
@@ -1315,13 +1362,79 @@ static int read_partial_message_section(struct ceph_connection *con,
static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
struct ceph_msg_header *hdr,
int *skip);
+
+
+static int read_partial_message_pages(struct ceph_connection *con, struct page **pages, unsigned data_len, int datacrc)
+{
+ void *p;
+ int ret;
+ int left;
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
+ /* (page) data */
+ BUG_ON(pages == NULL);
+ p = kmap(pages[con->in_msg_pos.page]);
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(pages[con->in_msg_pos.page]);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == PAGE_SIZE) {
+ con->in_msg_pos.page_pos = 0;
+ con->in_msg_pos.page++;
+ }
+
+ return ret;
+}
+
+static int read_partial_message_bio(struct ceph_connection *con,
+ struct bio **bio_iter, int *bio_seg,
+ unsigned data_len, int datacrc)
+{
+ struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
+ void *p;
+ int ret, left;
+
+ if (IS_ERR(bv))
+ return PTR_ERR(bv);
+
+ left = min((int)(data_len - con->in_msg_pos.data_pos),
+ (int)(bv->bv_len - con->in_msg_pos.page_pos));
+
+ p = kmap(bv->bv_page) + bv->bv_offset;
+
+ ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
+ left);
+ if (ret > 0 && datacrc)
+ con->in_data_crc =
+ crc32c(con->in_data_crc,
+ p + con->in_msg_pos.page_pos, ret);
+ kunmap(bv->bv_page);
+ if (ret <= 0)
+ return ret;
+ con->in_msg_pos.data_pos += ret;
+ con->in_msg_pos.page_pos += ret;
+ if (con->in_msg_pos.page_pos == bv->bv_len) {
+ con->in_msg_pos.page_pos = 0;
+ iter_bio_next(bio_iter, bio_seg);
+ }
+
+ return ret;
+}
+
/*
* read (part of) a message.
*/
static int read_partial_message(struct ceph_connection *con)
{
struct ceph_msg *m = con->in_msg;
- void *p;
int ret;
int to, left;
unsigned front_len, middle_len, data_len, data_off;
@@ -1385,7 +1498,10 @@ static int read_partial_message(struct ceph_connection *con)
m->middle->vec.iov_len = 0;
con->in_msg_pos.page = 0;
- con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ if (m->pages)
+ con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+ else
+ con->in_msg_pos.page_pos = 0;
con->in_msg_pos.data_pos = 0;
}
@@ -1402,27 +1518,25 @@ static int read_partial_message(struct ceph_connection *con)
if (ret <= 0)
return ret;
}
+ if (m->bio && !m->bio_iter)
+ init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
/* (page) data */
while (con->in_msg_pos.data_pos < data_len) {
- left = min((int)(data_len - con->in_msg_pos.data_pos),
- (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
- BUG_ON(m->pages == NULL);
- p = kmap(m->pages[con->in_msg_pos.page]);
- ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
- left);
- if (ret > 0 && datacrc)
- con->in_data_crc =
- crc32c(con->in_data_crc,
- p + con->in_msg_pos.page_pos, ret);
- kunmap(m->pages[con->in_msg_pos.page]);
- if (ret <= 0)
- return ret;
- con->in_msg_pos.data_pos += ret;
- con->in_msg_pos.page_pos += ret;
- if (con->in_msg_pos.page_pos == PAGE_SIZE) {
- con->in_msg_pos.page_pos = 0;
- con->in_msg_pos.page++;
+ if (m->pages) {
+ ret = read_partial_message_pages(con, m->pages,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+ } else if (m->bio) {
+
+ ret = read_partial_message_bio(con,
+ &m->bio_iter, &m->bio_seg,
+ data_len, datacrc);
+ if (ret <= 0)
+ return ret;
+ } else {
+ BUG_ON(1);
}
}
@@ -2093,6 +2207,9 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
m->nr_pages = 0;
m->pages = NULL;
m->pagelist = NULL;
+ m->bio = NULL;
+ m->bio_iter = NULL;
+ m->bio_seg = 0;
dout("ceph_msg_new %p front %d\n", m, front_len);
return m;
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index fe24e6f..1f5bb9d 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -84,6 +84,9 @@ struct ceph_msg {
struct ceph_pagelist *pagelist; /* instead of pages */
struct list_head list_head;
struct kref kref;
+ struct bio *bio; /* instead of pages/pagelist */
+ struct bio *bio_iter; /* bio iterator */
+ int bio_seg; /* current bio segment */
bool front_is_vmalloc;
bool more_to_follow;
int front_max;
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 5927bc3..7469683 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,6 +6,7 @@
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
+#include <linux/bio.h>
#include "super.h"
#include "osd_client.h"
@@ -111,6 +112,8 @@ void ceph_osdc_release_request(struct kref *kref)
if (req->r_own_pages)
ceph_release_page_vector(req->r_pages,
req->r_num_pages);
+ if (req->r_bio)
+ bio_put(req->r_bio);
ceph_put_snap_context(req->r_snapc);
if (req->r_mempool)
mempool_free(req, req->r_osdc->req_mempool);
@@ -124,7 +127,8 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
int do_sync,
bool use_mempool,
gfp_t gfp_flags,
- struct page **pages)
+ struct page **pages,
+ struct bio *bio)
{
struct ceph_osd_request *req;
struct ceph_msg *msg;
@@ -189,6 +193,10 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
req->r_request = msg;
req->r_pages = pages;
+ if (bio) {
+ req->r_bio = bio;
+ bio_get(req->r_bio);
+ }
return req;
}
@@ -290,7 +298,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
ceph_osdc_alloc_request(osdc, flags,
snapc, do_sync,
use_mempool,
- GFP_NOFS, NULL);
+ GFP_NOFS, NULL, NULL);
if (IS_ERR(req))
return req;
@@ -1156,6 +1164,7 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
req->r_request->pages = req->r_pages;
req->r_request->nr_pages = req->r_num_pages;
+ req->r_request->bio = req->r_bio;
register_request(osdc, req);
@@ -1472,6 +1481,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
}
m->pages = req->r_pages;
m->nr_pages = req->r_num_pages;
+ m->bio = req->r_bio;
}
*skip = 0;
req->r_con_filling_msg = ceph_con_get(con);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 0b6b697..76aa63e 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -79,6 +79,7 @@ struct ceph_osd_request {
struct page **r_pages; /* pages for data payload */
int r_pages_from_pool;
int r_own_pages; /* if true, i own page list */
+ struct bio *r_bio; /* instead of pages */
};
struct ceph_osd_client {
@@ -129,7 +130,8 @@ extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *
int do_sync,
bool use_mempool,
gfp_t gfp_flags,
- struct page **pages);
+ struct page **pages,
+ struct bio *bio);
extern void ceph_osdc_build_request(struct ceph_osd_request *req,
u64 off, u64 *plen,
--
1.5.6.5
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/
Powered by blists - more mailing lists