Date: Thu, 30 Nov 2006 22:55:14 -0800
From: Andrew Morton <akpm@...l.org>
To: Steven Whitehouse <swhiteho@...hat.com>
Cc: cluster-devel@...hat.com, linux-kernel@...r.kernel.org, Patrick Caulfield <pcaulfie@...hat.com>
Subject: Re: [DLM] Add support for tcp communications [38/70]

On Thu, 30 Nov 2006 12:19:08 +0000 Steven Whitehouse <swhiteho@...hat.com> wrote:

> From fdda387f73947e6ae511ec601f5b3c6fbb582aac Mon Sep 17 00:00:00 2001
> From: Patrick Caulfield <pcaulfie@...hat.com>
> Date: Thu, 2 Nov 2006 11:19:21 -0500
> Subject: [PATCH] [DLM] Add support for tcp communications
>
> The following patch adds a TCP based communications layer
> to the DLM which is compile time selectable. The existing SCTP
> layer gives the advantage of allowing multihoming, whereas
> the TCP layer has been heavily tested in previous versions of
> the DLM and is known to be robust and therefore can be used as
> a baseline for performance testing.
>

I don't think this code is ready.

> ...
>
> +static struct rw_semaphore nodeinfo_lock;

This can be initialised at compile-time.

> ...
>
> +#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
> +#define CBUF_EMPTY(cb) ((cb)->len == 0)
> +#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
> +#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
> +
> +#define CBUF_INIT(cb, size) \
> +do { \
> +	(cb)->base = (cb)->len = 0; \
> +	(cb)->mask = ((size)-1); \
> +} while(0)
> +
> +#define CBUF_EAT(cb, n) \
> +do { \
> +	(cb)->len -= (n); \
> +	(cb)->base += (n); \
> +	(cb)->base &= (cb)->mask; \
> +} while(0)

Suggest that all of the above be converted to C: lower-case static-inline
functions.

> +
> +/* List of nodes which have writes pending */
> +static struct list_head write_nodes;

LIST_HEAD (maybe)

> +static spinlock_t write_nodes_lock;

DEFINE_SPINLOCK

> +
> +/* Maximum number of incoming messages to process before
> + * doing a schedule()
> + */
> +#define MAX_RX_MSG_COUNT 25
> +
> +/* Manage daemons */
> +static struct task_struct *recv_task;
> +static struct task_struct *send_task;
> +static wait_queue_head_t lowcomms_recv_wait;

DECLARE_WAIT_QUEUE_HEAD

> +static atomic_t accepting;

= ATOMIC_INIT(0)

Generally, initialising things at compile-time is better: smaller vmlinux,
certainty that the code got it right.

> +static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
> +{
> +	struct nodeinfo *ni;
> +	int r;
> +	int n;
> +
> +	down_read(&nodeinfo_lock);

Given that this function can sleep, I wonder if `alloc' is useful.

I see lots of callers passing in a literal "0" for `alloc'.  That's in
fact a secret (GFP_ATOMIC & ~__GFP_HIGH).  I doubt if that's what you
really meant.  Particularly as the code could at least have used
__GFP_WAIT (aka GFP_NOIO) which is much, much more reliable than "0".
In fact "0" is the least reliable mode possible.

IOW, this is all bollixed up.
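For reference, the compile-time-initialised forms of the declarations
quoted above would look something like this (just a sketch, reusing the
patch's own names):

	static DECLARE_RWSEM(nodeinfo_lock);

	/* List of nodes which have writes pending */
	static LIST_HEAD(write_nodes);
	static DEFINE_SPINLOCK(write_nodes_lock);

	static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
	static atomic_t accepting = ATOMIC_INIT(0);

All of those are set up by the compiler/linker, so there is no window in
which the object exists but hasn't been initialised yet, and no runtime
init code to forget to call.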
> +	ni = idr_find(&nodeinfo_idr, nodeid);
> +	up_read(&nodeinfo_lock);
> +
> +	if (!ni && alloc) {
> +		down_write(&nodeinfo_lock);
> +
> +		ni = idr_find(&nodeinfo_idr, nodeid);
> +		if (ni)
> +			goto out_up;
> +
> +		r = idr_pre_get(&nodeinfo_idr, alloc);
> +		if (!r)
> +			goto out_up;
> +
> +		ni = kmalloc(sizeof(struct nodeinfo), alloc);

kzalloc

> +		if (!ni)
> +			goto out_up;
> +
> +		r = idr_get_new_above(&nodeinfo_idr, ni, nodeid, &n);
> +		if (r) {
> +			kfree(ni);
> +			ni = NULL;
> +			goto out_up;
> +		}
> +		if (n != nodeid) {
> +			idr_remove(&nodeinfo_idr, n);
> +			kfree(ni);
> +			ni = NULL;
> +			goto out_up;
> +		}
> +		memset(ni, 0, sizeof(struct nodeinfo));

hence nuke

> +		spin_lock_init(&ni->lock);
> +		INIT_LIST_HEAD(&ni->writequeue);
> +		spin_lock_init(&ni->writequeue_lock);
> +		ni->nodeid = nodeid;
> +
> +		if (nodeid > max_nodeid)
> +			max_nodeid = nodeid;
> +	out_up:
> +		up_write(&nodeinfo_lock);
> +	}
> +
> +	return ni;
> +}
> +
> +/* Don't call this too often... */
> +static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
> +{
> +	int i;
> +	struct nodeinfo *ni;
> +
> +	for (i=1; i<=max_nodeid; i++) {
> +		ni = nodeid2nodeinfo(i, 0);

GFP_NOIO (at least)  (lots of places)

> +		if (ni && ni->assoc_id == assoc)
> +			return ni;
> +	}
> +	return NULL;
> +}
> +
>
> ...
>
> +static void send_shutdown(sctp_assoc_t associd)
> +{
> +	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
> +	struct msghdr outmessage;
> +	struct cmsghdr *cmsg;
> +	struct sctp_sndrcvinfo *sinfo;
> +	int ret;
> +
> +	outmessage.msg_name = NULL;
> +	outmessage.msg_namelen = 0;
> +	outmessage.msg_control = outcmsg;
> +	outmessage.msg_controllen = sizeof(outcmsg);
> +	outmessage.msg_flags = MSG_EOR;
> +
> +	cmsg = CMSG_FIRSTHDR(&outmessage);
> +	cmsg->cmsg_level = IPPROTO_SCTP;
> +	cmsg->cmsg_type = SCTP_SNDRCV;
> +	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
> +	outmessage.msg_controllen = cmsg->cmsg_len;
> +	sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);

CMSG_DATA() returns void* - unneeded cast

> +	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
> +
> +	sinfo->sinfo_flags |= MSG_EOF;
> +	sinfo->sinfo_assoc_id = associd;
> +
> +	ret = kernel_sendmsg(sctp_con.sock, &outmessage, NULL, 0, 0);
> +
> +	if (ret != 0)
> +		log_print("send EOF to node failed: %d", ret);
> +}
>
> ...
>
> +	ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
> +	if (!ni)
> +		return;
> +
> +	/* Save the assoc ID */
> +	spin_lock(&ni->lock);
> +	ni->assoc_id = sn->sn_assoc_change.sac_assoc_id;
> +	spin_unlock(&ni->lock);

A bare assignment inside a lock like this is quite unusual.  It is often
an indication that something is wrong.

> +/* Data received from remote end */
> +static int receive_from_sock(void)
> +{
> +	int ret = 0;
> +	struct msghdr msg;
> +	struct kvec iov[2];
> +	unsigned len;
> +	int r;
> +	struct sctp_sndrcvinfo *sinfo;
> +	struct cmsghdr *cmsg;
> +	struct nodeinfo *ni;
> +
> +	/* These two are marginally too big for stack allocation, but this
> +	 * function is (currently) only called by dlm_recvd so static should be
> +	 * OK.
> +	 */
> +	static struct sockaddr_storage msgname;
> +	static char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

whoa.  This is globally singly-threaded code??

> +	if (sctp_con.sock == NULL)
> +		goto out;
> +
> +	if (sctp_con.rx_page == NULL) {
> +		/*
> +		 * This doesn't need to be atomic, but I think it should
> +		 * improve performance if it is.
> +		 */
> +		sctp_con.rx_page = alloc_page(GFP_ATOMIC);
> +		if (sctp_con.rx_page == NULL)
> +			goto out_resched;
> +		CBUF_INIT(&sctp_con.cb, PAGE_CACHE_SIZE);
> +	}
> +
> +	memset(&incmsg, 0, sizeof(incmsg));
> +	memset(&msgname, 0, sizeof(msgname));
> +
> +	memset(incmsg, 0, sizeof(incmsg));

You just zeroed it twice.

> +	msg.msg_name = &msgname;
> +	msg.msg_namelen = sizeof(msgname);
> +	msg.msg_flags = 0;
> +	msg.msg_control = incmsg;
> +	msg.msg_controllen = sizeof(incmsg);
> +	msg.msg_iovlen = 1;
> +
> +	/* I don't see why this circular buffer stuff is necessary for SCTP
> +	 * which is a packet-based protocol, but the whole thing breaks under
> +	 * load without it! The overhead is minimal (and is in the TCP lowcomms
> +	 * anyway, of course) so I'll leave it in until I can figure out what's
> +	 * really happening.
> +	 */
> +
> +	/*
> +	 * iov[0] is the bit of the circular buffer between the current end
> +	 * point (cb.base + cb.len) and the end of the buffer.
> +	 */
> +	iov[0].iov_len = sctp_con.cb.base - CBUF_DATA(&sctp_con.cb);
> +	iov[0].iov_base = page_address(sctp_con.rx_page) +
> +			  CBUF_DATA(&sctp_con.cb);
> +	iov[1].iov_len = 0;
> +
> +	/*
> +	 * iov[1] is the bit of the circular buffer between the start of the
> +	 * buffer and the start of the currently used section (cb.base)
> +	 */
> +	if (CBUF_DATA(&sctp_con.cb) >= sctp_con.cb.base) {
> +		iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&sctp_con.cb);
> +		iov[1].iov_len = sctp_con.cb.base;
> +		iov[1].iov_base = page_address(sctp_con.rx_page);
> +		msg.msg_iovlen = 2;
> +	}
> +	len = iov[0].iov_len + iov[1].iov_len;
> +
> +	r = ret = kernel_recvmsg(sctp_con.sock, &msg, iov, msg.msg_iovlen, len,
> +				 MSG_NOSIGNAL | MSG_DONTWAIT);
> +	if (ret <= 0)
> +		goto out_close;
> +
> +	msg.msg_control = incmsg;
> +	msg.msg_controllen = sizeof(incmsg);
> +	cmsg = CMSG_FIRSTHDR(&msg);
> +	sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
> +
> +	if (msg.msg_flags & MSG_NOTIFICATION) {
> +		process_sctp_notification(&msg, page_address(sctp_con.rx_page));
> +		return 0;
> +	}
> +
> +	/* Is this a new association ? */
> +	ni = nodeid2nodeinfo(le32_to_cpu(sinfo->sinfo_ppid), GFP_KERNEL);
> +	if (ni) {
> +		ni->assoc_id = sinfo->sinfo_assoc_id;
> +		if (test_and_clear_bit(NI_INIT_PENDING, &ni->flags)) {
> +
> +			if (!test_and_set_bit(NI_WRITE_PENDING, &ni->flags)) {
> +				spin_lock_bh(&write_nodes_lock);
> +				list_add_tail(&ni->write_list, &write_nodes);
> +				spin_unlock_bh(&write_nodes_lock);
> +			}
> +			wake_up_process(send_task);
> +		}
> +	}
> +
> +	/* INIT sends a message with length of 1 - ignore it */
> +	if (r == 1)
> +		return 0;
> +
> +	CBUF_ADD(&sctp_con.cb, ret);
> +	ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
> +					  page_address(sctp_con.rx_page),
> +					  sctp_con.cb.base, sctp_con.cb.len,
> +					  PAGE_CACHE_SIZE);
> +	if (ret < 0)
> +		goto out_close;
> +	CBUF_EAT(&sctp_con.cb, ret);
> +
> +	out:

Labels go in column 0 or 1

> +	ret = 0;
> +	goto out_ret;
> +
> +	out_resched:
> +	lowcomms_data_ready(sctp_con.sock->sk, 0);
> +	ret = 0;
> +	schedule();

What's the random schedule() for?  If !need_resched() it's just a waste
of cycles.

> +	goto out_ret;
> +
> +	out_close:
> +	if (ret != -EAGAIN)
> +		log_print("error reading from sctp socket: %d", ret);
> +	out_ret:
> +	return ret;
> +}
> +
>
> ...
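To make the earlier macro->inline suggestion concrete, here is a sketch
of what the conversion could look like, assuming only the base/len/mask
fields the macros already use:

	struct cbuf {
		unsigned int base;
		unsigned int len;
		unsigned int mask;
	};

	static inline void cbuf_init(struct cbuf *cb, int size)
	{
		cb->base = cb->len = 0;
		cb->mask = size - 1;
	}

	static inline void cbuf_add(struct cbuf *cb, int n)
	{
		cb->len += n;
	}

	static inline int cbuf_data(struct cbuf *cb)
	{
		return (cb->base + cb->len) & cb->mask;
	}

	static inline void cbuf_eat(struct cbuf *cb, int n)
	{
		cb->len -= n;
		cb->base += n;
		cb->base &= cb->mask;
	}

	static inline bool cbuf_empty(struct cbuf *cb)
	{
		return cb->len == 0;
	}

Unlike the macros these are typechecked and evaluate their arguments
exactly once, and the single definition can be shared by both lowcomms
files instead of being part-implemented in two places.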
>
> +void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
> +{
> +	struct writequeue_entry *e;
> +	int offset = 0;
> +	int users = 0;
> +	struct nodeinfo *ni;
> +
> +	if (!atomic_read(&accepting))
> +		return NULL;

hm, this looks racy.  What happens if `accepting' goes to zero 50
nanoseconds later?

> +	ni = nodeid2nodeinfo(nodeid, allocation);
> +	if (!ni)
> +		return NULL;
> +
> +	spin_lock(&ni->writequeue_lock);
> +	e = list_entry(ni->writequeue.prev, struct writequeue_entry, list);
> +	if (((struct list_head *) e == &ni->writequeue) ||

That typecast looks fishy.  Perhaps a container_of() or something would
be clearer.

> +	    (PAGE_CACHE_SIZE - e->end < len)) {
> +		e = NULL;
> +	} else {
> +		offset = e->end;
> +		e->end += len;
> +		users = e->users++;
> +	}
> +	spin_unlock(&ni->writequeue_lock);
> +
> +	if (e) {
> +		got_one:

whoa, how'd that label get all the way over there?

> +		if (users == 0)
> +			kmap(e->page);
> +		*ppc = page_address(e->page) + offset;
> +		return e;
> +	}
> +
> +	e = new_writequeue_entry(allocation);
> +	if (e) {
> +		spin_lock(&ni->writequeue_lock);
> +		offset = e->end;
> +		e->end += len;
> +		e->ni = ni;
> +		users = e->users++;
> +		list_add_tail(&e->list, &ni->writequeue);
> +		spin_unlock(&ni->writequeue_lock);
> +		goto got_one;
> +	}
> +	return NULL;
> +}
> +
>
> ...
>
> +static void initiate_association(int nodeid)
> +{
> +	struct sockaddr_storage rem_addr;
> +	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

Another static buffer to worry about.  Globally singly-threaded code?

> +	struct msghdr outmessage;
> +	struct cmsghdr *cmsg;
> +	struct sctp_sndrcvinfo *sinfo;
> +	int ret;
> +	int addrlen;
> +	char buf[1];
> +	struct kvec iov[1];
> +	struct nodeinfo *ni;
> +
> +	log_print("Initiating association with node %d", nodeid);
> +
> +	ni = nodeid2nodeinfo(nodeid, GFP_KERNEL);
> +	if (!ni)
> +		return;
> +
> +	if (nodeid_to_addr(nodeid, (struct sockaddr *)&rem_addr)) {
> +		log_print("no address for nodeid %d", nodeid);
> +		return;

Am surprised that all these errors appear to be ignored.

> +	}
> +
> +	make_sockaddr(&rem_addr, dlm_config.tcp_port, &addrlen);
> +
> +	outmessage.msg_name = &rem_addr;
> +	outmessage.msg_namelen = addrlen;
> +	outmessage.msg_control = outcmsg;
> +	outmessage.msg_controllen = sizeof(outcmsg);
> +	outmessage.msg_flags = MSG_EOR;
> +
> +	iov[0].iov_base = buf;
> +	iov[0].iov_len = 1;
> +
> +	/* Real INIT messages seem to cause trouble. Just send a 1 byte message
> +	   we can afford to lose */
> +	cmsg = CMSG_FIRSTHDR(&outmessage);
> +	cmsg->cmsg_level = IPPROTO_SCTP;
> +	cmsg->cmsg_type = SCTP_SNDRCV;
> +	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
> +	sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);
> +	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
> +	sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
> +
> +	outmessage.msg_controllen = cmsg->cmsg_len;
> +	ret = kernel_sendmsg(sctp_con.sock, &outmessage, iov, 1, 1);
> +	if (ret < 0) {
> +		log_print("send INIT to node failed: %d", ret);
> +		/* Try again later */
> +		clear_bit(NI_INIT_PENDING, &ni->flags);
> +	}
> +}
> +
> +/* Send a message */
> +static int send_to_sock(struct nodeinfo *ni)
> +{
> +	int ret = 0;
> +	struct writequeue_entry *e;
> +	int len, offset;
> +	struct msghdr outmsg;
> +	static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];

Singly-threaded?
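Back on the `(struct list_head *) e' cast flagged above: the usual idiom
is to test for emptiness before converting a list head into an entry,
rather than converting first and comparing the result back against the
head.  A sketch of the same logic without the cast:

	e = NULL;
	if (!list_empty(&ni->writequeue)) {
		e = list_entry(ni->writequeue.prev,
			       struct writequeue_entry, list);
		if (PAGE_CACHE_SIZE - e->end < len)
			e = NULL;
	}
	if (e) {
		offset = e->end;
		e->end += len;
		users = e->users++;
	}

(list_entry() is container_of() underneath; the point is that its result
is only meaningful when the list is known to be non-empty.)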
> +	struct cmsghdr *cmsg;
> +	struct sctp_sndrcvinfo *sinfo;
> +	struct kvec iov;
> +
> +	/* See if we need to init an association before we start
> +           sending precious messages */

Please use tabs.

> +	spin_lock(&ni->lock);
> +	if (!ni->assoc_id && !test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
> +		spin_unlock(&ni->lock);
> +		initiate_association(ni->nodeid);
> +		return 0;
> +	}
> +	spin_unlock(&ni->lock);
> +
> +	outmsg.msg_name = NULL; /* We use assoc_id */
> +	outmsg.msg_namelen = 0;
> +	outmsg.msg_control = outcmsg;
> +	outmsg.msg_controllen = sizeof(outcmsg);
> +	outmsg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | MSG_EOR;
> +
> +	cmsg = CMSG_FIRSTHDR(&outmsg);
> +	cmsg->cmsg_level = IPPROTO_SCTP;
> +	cmsg->cmsg_type = SCTP_SNDRCV;
> +	cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
> +	sinfo = (struct sctp_sndrcvinfo *)CMSG_DATA(cmsg);

Unneeded cast

> +	memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
> +	sinfo->sinfo_ppid = cpu_to_le32(dlm_local_nodeid);
> +	sinfo->sinfo_assoc_id = ni->assoc_id;
> +	outmsg.msg_controllen = cmsg->cmsg_len;
> +
> +	spin_lock(&ni->writequeue_lock);
> +	for (;;) {
> +		if (list_empty(&ni->writequeue))
> +			break;
> +		e = list_entry(ni->writequeue.next, struct writequeue_entry,
> +			       list);
> +		len = e->len;
> +		offset = e->offset;
> +		BUG_ON(len == 0 && e->users == 0);
> +		spin_unlock(&ni->writequeue_lock);
> +		kmap(e->page);

I think this kmap() gets leaked

> +		ret = 0;
> +		if (len) {
> +			iov.iov_base = page_address(e->page)+offset;
> +			iov.iov_len = len;
> +
> +			ret = kernel_sendmsg(sctp_con.sock, &outmsg, &iov, 1,
> +					     len);
> +			if (ret == -EAGAIN) {
> +				sctp_con.eagain_flag = 1;
> +				goto out;
> +			} else if (ret < 0)
> +				goto send_error;
> +		} else {
> +			/* Don't starve people filling buffers */
> +			schedule();
> +		}
> +
> +		spin_lock(&ni->writequeue_lock);
> +		e->offset += ret;
> +		e->len -= ret;
> +
> +		if (e->len == 0 && e->users == 0) {
> +			list_del(&e->list);
> +			free_entry(e);
> +			continue;
> +		}
> +	}
> +	spin_unlock(&ni->writequeue_lock);
> +	out:
> +	return ret;
> +
> +	send_error:
> +	log_print("Error sending to node %d %d", ni->nodeid, ret);
> +	spin_lock(&ni->lock);
> +	if (!test_and_set_bit(NI_INIT_PENDING, &ni->flags)) {
> +		ni->assoc_id = 0;
> +		spin_unlock(&ni->lock);
> +		initiate_association(ni->nodeid);
> +	} else
> +		spin_unlock(&ni->lock);
> +
> +	return ret;
> +}
>
> ...
>
> +static void dealloc_nodeinfo(void)
> +{
> +	int i;
> +
> +	for (i=1; i<=max_nodeid; i++) {
> +		struct nodeinfo *ni = nodeid2nodeinfo(i, 0);
> +		if (ni) {
> +			idr_remove(&nodeinfo_idr, i);

Didn't that need locking?

> +			kfree(ni);
> +		}
> +	}
> +}
> +
>
> ..
>
> +static int write_list_empty(void)
> +{
> +	int status;
> +
> +	spin_lock_bh(&write_nodes_lock);
> +	status = list_empty(&write_nodes);
> +	spin_unlock_bh(&write_nodes_lock);
> +
> +	return status;
> +}

This function's return value is meaningless.  As soon as the lock gets
dropped, the return value can get out of sync with reality.  Looking at
the caller, this _might_ happen to be OK, but it's a nasty and dangerous
thing.  Really the locking should be moved into the caller.
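i.e. something along these lines in the caller (a sketch only; the exact
shape depends on what dlm_sendd actually does with the answer):

	spin_lock_bh(&write_nodes_lock);
	while (!list_empty(&write_nodes)) {
		struct nodeinfo *ni = list_entry(write_nodes.next,
						 struct nodeinfo,
						 write_list);
		list_del_init(&ni->write_list);
		spin_unlock_bh(&write_nodes_lock);

		/* may sleep, so the lock is dropped around the work */
		send_to_sock(ni);

		spin_lock_bh(&write_nodes_lock);
	}
	spin_unlock_bh(&write_nodes_lock);

That way the emptiness test and the decision based on it happen under
the same lock hold, so there is no window for them to disagree.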
> +static int dlm_recvd(void *data)
> +{
> +	DECLARE_WAITQUEUE(wait, current);
> +
> +	while (!kthread_should_stop()) {
> +		int count = 0;
> +
> +		set_current_state(TASK_INTERRUPTIBLE);
> +		add_wait_queue(&lowcomms_recv_wait, &wait);
> +		if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
> +			schedule();
> +		remove_wait_queue(&lowcomms_recv_wait, &wait);
> +		set_current_state(TASK_RUNNING);
> +
> +		if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
> +			int ret;
> +
> +			do {
> +				ret = receive_from_sock();
> +
> +				/* Don't starve out everyone else */
> +				if (++count >= MAX_RX_MSG_COUNT) {
> +					schedule();

cond_resched() would be more efficient.  Potentially a lot more
efficient.  (several places).

> +					count = 0;
> +				}
> +			} while (!kthread_should_stop() && ret >=0);
> +		}
> +		schedule();
> +	}
> +
> +	return 0;
> +}
>
> ...
>
> +static int daemons_start(void)
> +{
> +	struct task_struct *p;
> +	int error;
> +
> +	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
> +	error = IS_ERR(p);
> +	if (error) {
> +		log_print("can't start dlm_recvd %d", error);
> +		return error;
> +	}

whitespace broke

> +	recv_task = p;
> +
> +	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
> +	error = IS_ERR(p);
> +	if (error) {
> +		log_print("can't start dlm_sendd %d", error);
> +		kthread_stop(recv_task);
> +		return error;
> +	}

ditto

> +	send_task = p;
> +
> +	return 0;
> +}
> +
>
> ...
>
> +#ifndef FALSE
> +#define FALSE 0
> +#define TRUE 1
> +#endif

No, we have a kernel-wide bool/true/false implementation.  Please use
that.

> +#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0)
> +#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
> +#define CBUF_EMPTY(cb) ((cb)->len == 0)
> +#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
> +#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \
> +			     (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0)
> +#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)

This cbuf API seems to be part-implemented in two places.

Ditto previous comments about macro->inline conversion

> ...
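On the cond_resched() point: the receive loop in dlm_recvd() above would
become something like

	do {
		ret = receive_from_sock();

		/* Don't starve out everyone else */
		if (++count >= MAX_RX_MSG_COUNT) {
			cond_resched();
			count = 0;
		}
	} while (!kthread_should_stop() && ret >= 0);

cond_resched() only enters the scheduler when this task has actually
been asked to yield; a bare schedule() from a TASK_RUNNING task takes
the full trip through the scheduler every time, for nothing.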
> +static struct sockaddr_storage dlm_local_addr;
> +
> +/* Manage daemons */
> +static struct task_struct *recv_task;
> +static struct task_struct *send_task;
> +
> +static wait_queue_t lowcomms_send_waitq_head;
> +static wait_queue_head_t lowcomms_send_waitq;
> +static wait_queue_t lowcomms_recv_waitq_head;
> +static wait_queue_head_t lowcomms_recv_waitq;
> +
> +/* An array of pointers to connections, indexed by NODEID */
> +static struct connection **connections;
> +static struct semaphore connections_lock;
> +static kmem_cache_t *con_cache;
> +static int conn_array_size;
> +static atomic_t accepting;
> +
> +/* List of sockets that have reads pending */
> +static struct list_head read_sockets;
> +static spinlock_t read_sockets_lock;
> +
> +/* List of sockets which have writes pending */
> +static struct list_head write_sockets;
> +static spinlock_t write_sockets_lock;
> +
> +/* List of sockets which have connects pending */
> +static struct list_head state_sockets;
> +static spinlock_t state_sockets_lock;

See previous comments regarding static initialisation

> +static struct connection *nodeid2con(int nodeid, gfp_t allocation)
> +{
> +	struct connection *con = NULL;
> +
> +	down(&connections_lock);
> +	if (nodeid >= conn_array_size) {
> +		int new_size = nodeid + NODE_INCREMENT;
> +		struct connection **new_conns;
> +
> +		new_conns = kmalloc(sizeof(struct connection *) *
> +				    new_size, allocation);
> +		if (!new_conns)
> +			goto finish;
> +
> +		memset(new_conns, 0, sizeof(struct connection *) * new_size);

kzalloc()

> +		memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
> +		conn_array_size = new_size;
> +		kfree(connections);
> +		connections = new_conns;
> +
> +	}
> +
> +	con = connections[nodeid];
> +	if (con == NULL && allocation) {
> +		con = kmem_cache_alloc(con_cache, allocation);
> +		if (!con)
> +			goto finish;
> +
> +		memset(con, 0, sizeof(*con));

kmem_cache_zalloc()

> +		con->nodeid = nodeid;
> +		init_rwsem(&con->sock_sem);
> +		INIT_LIST_HEAD(&con->writequeue);
> +		spin_lock_init(&con->writequeue_lock);
> +
> +		connections[nodeid] = con;
> +	}
> +
> +	finish:
> +	up(&connections_lock);
> +	return con;
> +}
>
> ...
>
> +/* Add the port number to an IP6 or 4 sockaddr and return the address
> +   length */
> +static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
> +			  int *addr_len)
> +{
> +	saddr->ss_family = dlm_local_addr.ss_family;
> +	if (saddr->ss_family == AF_INET) {
> +		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
> +		in4_addr->sin_port = cpu_to_be16(port);
> +		*addr_len = sizeof(struct sockaddr_in);
> +	}

whitespace broke

> +	else {

} else {

> +		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
> +		in6_addr->sin6_port = cpu_to_be16(port);
> +		*addr_len = sizeof(struct sockaddr_in6);
> +	}
> +}
> +
>
> ...
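For the two zalloc suggestions, the allocation sites in nodeid2con()
above would shrink to something like (sketch):

	new_conns = kzalloc(sizeof(struct connection *) * new_size,
			    allocation);
	if (!new_conns)
		goto finish;

and

	con = kmem_cache_zalloc(con_cache, allocation);
	if (!con)
		goto finish;

with the two memset()s deleted.  The helpers zero the memory themselves,
which is one less thing for each call site to get wrong.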
>
> +static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr)

Exceeds 80 cols (lots of places)

> +{
> +	struct socket *sock = NULL;
> +	mm_segment_t fs;

whitespace broke

> +	int result = 0;
> +	int one = 1;
> +	int addr_len;
> +
> +	if (dlm_local_addr.ss_family == AF_INET)
> +		addr_len = sizeof(struct sockaddr_in);
> +	else
> +		addr_len = sizeof(struct sockaddr_in6);
> +
> +	/* Create a socket to communicate with */
> +	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
> +	if (result < 0) {
> +		printk("dlm: Can't create listening comms socket\n");
> +		goto create_out;
> +	}
> +
> +	fs = get_fs();
> +	set_fs(get_ds());
> +	result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
> +	set_fs(fs);
> +	if (result < 0) {
> +		printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result);
> +	}
> +	sock->sk->sk_user_data = con;
> +	con->rx_action = accept_from_sock;
> +	con->sock = sock;
> +
> +	/* Bind to our port */
> +	make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
> +	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
> +	if (result < 0) {
> +		printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
> +		sock_release(sock);
> +		sock = NULL;
> +		con->sock = NULL;
> +		goto create_out;
> +	}
> +
> +	fs = get_fs();
> +	set_fs(get_ds());
> +
> +	result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one));
> +	set_fs(fs);
> +	if (result < 0) {
> +		printk("dlm: Set keepalive failed: %d\n", result);
> +	}
> +
> +	result = sock->ops->listen(sock, 5);
> +	if (result < 0) {
> +		printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
> +		sock_release(sock);
> +		sock = NULL;
> +		goto create_out;
> +	}
> +
> +	create_out:
> +	return sock;
> +}
> +
> +
> +/* Listen on all interfaces */
> +static int listen_for_all(void)
> +{
> +	struct socket *sock = NULL;
> +	struct connection *con = nodeid2con(0, GFP_KERNEL);
> +	int result = -EINVAL;
> +
> +	/* We don't support multi-homed hosts */
> +	memset(con, 0, sizeof(*con));

It would be more efficient to make nodeid2con() return a zeroed object.

> +	init_rwsem(&con->sock_sem);
> +	spin_lock_init(&con->writequeue_lock);
> +	INIT_LIST_HEAD(&con->writequeue);
> +	set_bit(CF_IS_OTHERCON, &con->flags);
> +
> +	sock = create_listen_sock(con, &dlm_local_addr);
> +	if (sock) {
> +		add_sock(sock, con);
> +		result = 0;
> +	}
> +	else {
> +		result = -EADDRINUSE;
> +	}
> +
> +	return result;
> +}
> +
>
> ...
>
> +void *dlm_lowcomms_get_buffer(int nodeid, int len,
> +			      gfp_t allocation, char **ppc)
> +{
> +	struct connection *con;
> +	struct writequeue_entry *e;
> +	int offset = 0;
> +	int users = 0;
> +
> +	if (!atomic_read(&accepting))
> +		return NULL;
> +
> +	con = nodeid2con(nodeid, allocation);
> +	if (!con)
> +		return NULL;
> +
> +	spin_lock(&con->writequeue_lock);
> +	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
> +	if (((struct list_head *) e == &con->writequeue) ||

There's that cast again

> +	    (PAGE_CACHE_SIZE - e->end < len)) {
> +		e = NULL;
> +	} else {
> +		offset = e->end;
> +		e->end += len;
> +		users = e->users++;
> +	}
> +	spin_unlock(&con->writequeue_lock);
> +
> +	if (e) {
> +		got_one:
> +		if (users == 0)
> +			kmap(e->page);

Please check that this kmap (and the others) don't get leaked.
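The invariant to audit is one kunmap() per successful kmap() on the same
page.  If free_entry() is where writequeue entries die, the balancing
call would belong there, roughly like this (hypothetical - free_entry()'s
body isn't shown in this patch):

	static void free_entry(struct writequeue_entry *e)
	{
		kunmap(e->page);	/* undo the kmap() from first use */
		__free_page(e->page);
		kfree(e);
	}

Any path which kmap()s a page and then bails out without a matching
kunmap() leaks a kmap slot, and on highmem boxes the kmap pool is small
enough that leaks eventually wedge the machine.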
> +		*ppc = page_address(e->page) + offset;
> +		return e;
> +	}
> +
> +	e = new_writequeue_entry(con, allocation);
> +	if (e) {
> +		spin_lock(&con->writequeue_lock);
> +		offset = e->end;
> +		e->end += len;
> +		users = e->users++;
> +		list_add_tail(&e->list, &con->writequeue);
> +		spin_unlock(&con->writequeue_lock);
> +		goto got_one;
> +	}
> +	return NULL;
> +}
> +
> +/* Send a message */
> +static int send_to_sock(struct connection *con)
> +{
> +	int ret = 0;
> +	ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
> +	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
> +	struct writequeue_entry *e;
> +	int len, offset;
> +
> +	down_read(&con->sock_sem);
> +	if (con->sock == NULL)
> +		goto out_connect;
> +
> +	sendpage = con->sock->ops->sendpage;
> +
> +	spin_lock(&con->writequeue_lock);
> +	for (;;) {
> +		e = list_entry(con->writequeue.next, struct writequeue_entry,
> +			       list);
> +		if ((struct list_head *) e == &con->writequeue)
> +			break;
> +
> +		len = e->len;
> +		offset = e->offset;
> +		BUG_ON(len == 0 && e->users == 0);
> +		spin_unlock(&con->writequeue_lock);
> +
> +		ret = 0;
> +		if (len) {
> +			ret = sendpage(con->sock, e->page, offset, len,
> +				       msg_flags);
> +			if (ret == -EAGAIN || ret == 0)
> +				goto out;
> +			if (ret <= 0)
> +				goto send_error;
> +		}
> +		else {
> +			/* Don't starve people filling buffers */
> +			schedule();

cond_resched()

> +		}
> +
> +		spin_lock(&con->writequeue_lock);
> +		e->offset += ret;
> +		e->len -= ret;
> +
> +		if (e->len == 0 && e->users == 0) {
> +			list_del(&e->list);
> +			free_entry(e);
> +			continue;
> +		}
> +	}
> +	spin_unlock(&con->writequeue_lock);
> +	out:
> +	up_read(&con->sock_sem);
> +	return ret;
> +
> +	send_error:
> +	up_read(&con->sock_sem);
> +	close_connection(con, FALSE);
> +	lowcomms_connect_sock(con);
> +	return ret;
> +
> +	out_connect:
> +	up_read(&con->sock_sem);

More wayward labels there.

> +	lowcomms_connect_sock(con);
> +	return 0;
> +}
> +
>
> ...
>
> +/* Try to send any messages that are pending
> + */
> +static void process_output_queue(void)
> +{
> +	struct list_head *list;
> +	struct list_head *temp;
> +	int ret;
> +
> +	spin_lock_bh(&write_sockets_lock);
> +	list_for_each_safe(list, temp, &write_sockets) {
> +		struct connection *con =
> +			list_entry(list, struct connection, write_list);
> +		clear_bit(CF_WRITE_PENDING, &con->flags);
> +		list_del(&con->write_list);
> +
> +		spin_unlock_bh(&write_sockets_lock);
> +
> +		ret = send_to_sock(con);
> +		if (ret < 0) {
> +		}

hmm.

> +		spin_lock_bh(&write_sockets_lock);
> +	}
> +	spin_unlock_bh(&write_sockets_lock);
> +}
> +
> +static void process_state_queue(void)
> +{
> +	struct list_head *list;
> +	struct list_head *temp;
> +	int ret;
> +
> +	spin_lock_bh(&state_sockets_lock);
> +	list_for_each_safe(list, temp, &state_sockets) {
> +		struct connection *con =
> +			list_entry(list, struct connection, state_list);
> +		list_del(&con->state_list);
> +		clear_bit(CF_CONNECT_PENDING, &con->flags);
> +		spin_unlock_bh(&state_sockets_lock);
> +
> +		ret = connect_to_sock(con);
> +		if (ret < 0) {
> +		}

?
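If those errors really are ignorable, that at least deserves saying
explicitly; a sketch of the minimum:

	ret = connect_to_sock(con);
	if (ret < 0)
		log_print("connect to node %d failed: %d",
			  con->nodeid, ret);

An empty `if (ret < 0) { }' block reads like a TODO that got lost.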
> +		spin_lock_bh(&state_sockets_lock);
> +	}
> +	spin_unlock_bh(&state_sockets_lock);
> +}
> +
> +
> +/* Discard all entries on the write queues */
> +static void clean_writequeues(void)
> +{
> +	int nodeid;
> +
> +	for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
> +		struct connection *con = nodeid2con(nodeid, 0);
> +
> +		if (con)
> +			clean_one_writequeue(con);
> +	}
> +}
> +
> +static int read_list_empty(void)
> +{
> +	int status;
> +
> +	spin_lock_bh(&read_sockets_lock);
> +	status = list_empty(&read_sockets);
> +	spin_unlock_bh(&read_sockets_lock);
> +
> +	return status;
> +}

Again, unsafe.

> +static int daemons_start(void)
> +{
> +	struct task_struct *p;
> +	int error;
> +
> +	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
> +	error = IS_ERR(p);
> +	if (error) {
> +		log_print("can't start dlm_recvd %d", error);
> +		return error;
> +	}
> +	recv_task = p;
> +
> +	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
> +	error = IS_ERR(p);
> +	if (error) {
> +		log_print("can't start dlm_sendd %d", error);
> +		kthread_stop(recv_task);
> +		return error;

whitespace broke.

> +	}
> +	send_task = p;
> +
> +	return 0;
> +}
> +
>
> ...
>
> +/* This is quite likely to sleep... */
> +int dlm_lowcomms_start(void)
> +{
> +	int error = 0;
> +
> +	error = -ENOTCONN;
> +
> +	/*
> +	 * Temporarily initialise the waitq head so that lowcomms_send_message
> +	 * doesn't crash if it gets called before the thread is fully
> +	 * initialised
> +	 */
> +	init_waitqueue_head(&lowcomms_send_waitq);
> +
> +	error = -ENOMEM;
> +	connections = kmalloc(sizeof(struct connection *) *
> +			      NODE_INCREMENT, GFP_KERNEL);
> +	if (!connections)
> +		goto out;
> +
> +	memset(connections, 0,
> +	       sizeof(struct connection *) * NODE_INCREMENT);

more kzalloc()

> +	conn_array_size = NODE_INCREMENT;
> +
> +	if (dlm_our_addr(&dlm_local_addr, 0)) {
> +		log_print("no local IP address has been set");
> +		goto fail_free_conn;
> +	}
> +	if (!dlm_our_addr(&dlm_local_addr, 1)) {
> +		log_print("This dlm comms module does not support multi-homed clustering");
> +		goto fail_free_conn;
> +	}
> +
> +	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
> +				      __alignof__(struct connection), 0, NULL, NULL);
> +	if (!con_cache)
> +		goto fail_free_conn;
> +
> +
> +	/* Start listening */
> +	error = listen_for_all();
> +	if (error)
> +		goto fail_unlisten;
> +
> +	error = daemons_start();
> +	if (error)
> +		goto fail_unlisten;
> +
> +	atomic_set(&accepting, 1);
> +
> +	return 0;
> +
> +	fail_unlisten:
> +	close_connection(connections[0], 0);
> +	kmem_cache_free(con_cache, connections[0]);
> +	kmem_cache_destroy(con_cache);
> +
> +	fail_free_conn:
> +	kfree(connections);
> +
> +	out:
> +	return error;
> +}
> +

-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/