lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [day] [month] [year] [list]
Date:   Thu, 23 Jun 2022 14:29:05 +0100
From:   David Howells <dhowells@...hat.com>
To:     linux-afs@...ts.infradead.org
Cc:     dhowells@...hat.com, Marc Dionne <marc.dionne@...istor.com>,
        "David S. Miller" <davem@...emloft.net>, netdev@...r.kernel.org,
        linux-kernel@...r.kernel.org
Subject: [RFC PATCH 2/8] rxrpc: Allow UDP socket sharing for AF_RXRPC service
 sockets

Allow multiple AF_RXRPC service sockets to share a single UDP socket,
provided the set of service IDs supported by each socket is disjoint with
those of the other sockets.

bind() can be used repeatedly on a socket to bind multiple service IDs to
it.  The RXRPC_UPGRADEABLE_SERVICE sockopt can then be used to indicate
service upgrade paths between pairs of service IDs on the same socket.

Binding a service address now causes the preallocation buffers to be
created rather than this being done by listen().

The backlog struct is now replaced by a more comprehensive service struct
that can in future be shared between multiple sockets that are acting as
channels to the same set of services.

The local endpoint then maintains a list of services rather than pointing
at a specific AF_RXRPC socket.  New calls go onto a queue on the
rxrpc_service struct rather than been punted directly to a socket, waiting
for a recvmsg() to pick them up (in the kernel, a notification hook is
called instead).

On a userspace socket, the preallocated call user IDs are now held in a
separate ring and applied when recvmsg() selects the call.

Signed-off-by: David Howells <dhowells@...hat.com>
---

 fs/afs/internal.h            |    3 
 fs/afs/main.c                |    1 
 fs/afs/rxrpc.c               |   83 +++-----
 include/net/af_rxrpc.h       |    9 -
 include/trace/events/afs.h   |    4 
 include/trace/events/rxrpc.h |    6 -
 net/rxrpc/af_rxrpc.c         |  255 +++++++++++++++++++-----
 net/rxrpc/ar-internal.h      |   95 ++++++---
 net/rxrpc/call_accept.c      |  444 +++++++++++++++++++++++++++---------------
 net/rxrpc/call_object.c      |  133 ++++++++-----
 net/rxrpc/conn_service.c     |   17 +-
 net/rxrpc/input.c            |   25 +-
 net/rxrpc/key.c              |    4 
 net/rxrpc/local_object.c     |   24 +-
 net/rxrpc/net_ns.c           |    1 
 net/rxrpc/peer_object.c      |   17 +-
 net/rxrpc/proc.c             |   46 ++--
 net/rxrpc/recvmsg.c          |   67 +++++-
 net/rxrpc/security.c         |   22 +-
 net/rxrpc/server_key.c       |   12 +
 20 files changed, 823 insertions(+), 445 deletions(-)

diff --git a/fs/afs/internal.h b/fs/afs/internal.h
index a6f25d9e75b5..799f0773e7de 100644
--- a/fs/afs/internal.h
+++ b/fs/afs/internal.h
@@ -259,8 +259,6 @@ struct afs_net {
 
 	/* AF_RXRPC I/O stuff */
 	struct socket		*socket;
-	struct afs_call		*spare_incoming_call;
-	struct work_struct	charge_preallocation_work;
 	struct mutex		socket_mutex;
 	atomic_t		nr_outstanding_calls;
 	atomic_t		nr_superblocks;
@@ -1273,7 +1271,6 @@ extern struct workqueue_struct *afs_async_calls;
 
 extern int __net_init afs_open_socket(struct afs_net *);
 extern void __net_exit afs_close_socket(struct afs_net *);
-extern void afs_charge_preallocation(struct work_struct *);
 extern void afs_put_call(struct afs_call *);
 extern void afs_make_call(struct afs_addr_cursor *, struct afs_call *, gfp_t);
 extern long afs_wait_for_call_to_complete(struct afs_call *, struct afs_addr_cursor *);
diff --git a/fs/afs/main.c b/fs/afs/main.c
index eae288c8d40a..93388d821a8c 100644
--- a/fs/afs/main.c
+++ b/fs/afs/main.c
@@ -74,7 +74,6 @@ static int __net_init afs_net_init(struct net *net_ns)
 	net->live = true;
 	generate_random_uuid((unsigned char *)&net->uuid);
 
-	INIT_WORK(&net->charge_preallocation_work, afs_charge_preallocation);
 	mutex_init(&net->socket_mutex);
 
 	net->cells = RB_ROOT;
diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c
index a5434f3e57c6..af11ca170d96 100644
--- a/fs/afs/rxrpc.c
+++ b/fs/afs/rxrpc.c
@@ -19,6 +19,8 @@ struct workqueue_struct *afs_async_calls;
 static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
 static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
 static void afs_process_async_call(struct work_struct *);
+static unsigned long afs_rx_preallocate_call(struct sock *, struct rxrpc_call *,
+					     unsigned int *);
 static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
 static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
 static int afs_deliver_cm_op_id(struct afs_call *);
@@ -81,7 +83,10 @@ int afs_open_socket(struct afs_net *net)
 	 * it sends back to us.
 	 */
 
-	rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
+	rxrpc_kernel_new_call_notification(socket,
+					   afs_wake_up_async_call,
+					   afs_rx_preallocate_call,
+					   afs_rx_new_call,
 					   afs_rx_discard_new_call);
 
 	ret = kernel_listen(socket, INT_MAX);
@@ -89,7 +94,6 @@ int afs_open_socket(struct afs_net *net)
 		goto error_2;
 
 	net->socket = socket;
-	afs_charge_preallocation(&net->charge_preallocation_work);
 	_leave(" = 0");
 	return 0;
 
@@ -110,11 +114,6 @@ void afs_close_socket(struct afs_net *net)
 	kernel_listen(net->socket, 0);
 	flush_workqueue(afs_async_calls);
 
-	if (net->spare_incoming_call) {
-		afs_put_call(net->spare_incoming_call);
-		net->spare_incoming_call = NULL;
-	}
-
 	_debug("outstanding %u", atomic_read(&net->nr_outstanding_calls));
 	wait_var_event(&net->nr_outstanding_calls,
 		       !atomic_read(&net->nr_outstanding_calls));
@@ -133,7 +132,7 @@ void afs_close_socket(struct afs_net *net)
  */
 static struct afs_call *afs_alloc_call(struct afs_net *net,
 				       const struct afs_call_type *type,
-				       gfp_t gfp)
+				       bool prealloc, gfp_t gfp)
 {
 	struct afs_call *call;
 	int o;
@@ -151,7 +150,8 @@ static struct afs_call *afs_alloc_call(struct afs_net *net,
 	spin_lock_init(&call->state_lock);
 	call->iter = &call->def_iter;
 
-	o = atomic_inc_return(&net->nr_outstanding_calls);
+	if (!prealloc)
+		o = atomic_inc_return(&net->nr_outstanding_calls);
 	trace_afs_call(call, afs_call_trace_alloc, 1, o,
 		       __builtin_return_address(0));
 	return call;
@@ -229,7 +229,7 @@ struct afs_call *afs_alloc_flat_call(struct afs_net *net,
 {
 	struct afs_call *call;
 
-	call = afs_alloc_call(net, type, GFP_NOFS);
+	call = afs_alloc_call(net, type, false, GFP_NOFS);
 	if (!call)
 		goto nomem_call;
 
@@ -703,45 +703,25 @@ static void afs_process_async_call(struct work_struct *work)
 	_leave("");
 }
 
-static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
-{
-	struct afs_call *call = (struct afs_call *)user_call_ID;
-
-	call->rxcall = rxcall;
-}
-
-/*
- * Charge the incoming call preallocation.
- */
-void afs_charge_preallocation(struct work_struct *work)
+static unsigned long afs_rx_preallocate_call(struct sock *sock,
+					     struct rxrpc_call *rxcall,
+					     unsigned int *_debug_id)
 {
-	struct afs_net *net =
-		container_of(work, struct afs_net, charge_preallocation_work);
-	struct afs_call *call = net->spare_incoming_call;
+	struct afs_call *call;
+	struct afs_net *net = afs_sock2net(sock);
 
-	for (;;) {
-		if (!call) {
-			call = afs_alloc_call(net, &afs_RXCMxxxx, GFP_KERNEL);
-			if (!call)
-				break;
-
-			call->drop_ref = true;
-			call->async = true;
-			call->state = AFS_CALL_SV_AWAIT_OP_ID;
-			init_waitqueue_head(&call->waitq);
-			afs_extract_to_tmp(call);
-		}
+	call = afs_alloc_call(net, &afs_RXCMxxxx, true, GFP_KERNEL);
+	if (!call)
+		return 0;
 
-		if (rxrpc_kernel_charge_accept(net->socket,
-					       afs_wake_up_async_call,
-					       afs_rx_attach,
-					       (unsigned long)call,
-					       GFP_KERNEL,
-					       call->debug_id) < 0)
-			break;
-		call = NULL;
-	}
-	net->spare_incoming_call = call;
+	*_debug_id = call->debug_id;
+	call->rxcall = rxcall;
+	call->drop_ref = true;
+	call->async = true;
+	call->state = AFS_CALL_SV_AWAIT_OP_ID;
+	init_waitqueue_head(&call->waitq);
+	afs_extract_to_tmp(call);
+	return (unsigned long)call;
 }
 
 /*
@@ -751,6 +731,11 @@ static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
 				    unsigned long user_call_ID)
 {
 	struct afs_call *call = (struct afs_call *)user_call_ID;
+	unsigned int o;
+
+	o = atomic_inc_return(&call->net->nr_outstanding_calls);
+	trace_afs_call(call, afs_call_trace_discard, atomic_read(&call->usage),
+		        o, __builtin_return_address(0));
 
 	call->rxcall = NULL;
 	afs_put_call(call);
@@ -762,9 +747,13 @@ static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
 static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
 			    unsigned long user_call_ID)
 {
+	struct afs_call *call = (struct afs_call *)user_call_ID;
 	struct afs_net *net = afs_sock2net(sk);
+	unsigned int o;
 
-	queue_work(afs_wq, &net->charge_preallocation_work);
+	o = atomic_inc_return(&net->nr_outstanding_calls);
+	trace_afs_call(call, afs_call_trace_accept, atomic_read(&call->usage),
+		        o, __builtin_return_address(0));
 }
 
 /*
diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h
index cee5f83c0f11..71e943c32027 100644
--- a/include/net/af_rxrpc.h
+++ b/include/net/af_rxrpc.h
@@ -34,9 +34,12 @@ typedef void (*rxrpc_notify_end_tx_t)(struct sock *, struct rxrpc_call *,
 typedef void (*rxrpc_notify_new_call_t)(struct sock *, struct rxrpc_call *,
 					unsigned long);
 typedef void (*rxrpc_discard_new_call_t)(struct rxrpc_call *, unsigned long);
-typedef void (*rxrpc_user_attach_call_t)(struct rxrpc_call *, unsigned long);
+typedef unsigned long (*rxrpc_preallocate_call_t)(struct sock *, struct rxrpc_call *,
+						  unsigned int *);
 
 void rxrpc_kernel_new_call_notification(struct socket *,
+					rxrpc_notify_rx_t,
+					rxrpc_preallocate_call_t,
 					rxrpc_notify_new_call_t,
 					rxrpc_discard_new_call_t);
 struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
@@ -60,9 +63,6 @@ void rxrpc_kernel_end_call(struct socket *, struct rxrpc_call *);
 void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
 			   struct sockaddr_rxrpc *);
 bool rxrpc_kernel_get_srtt(struct socket *, struct rxrpc_call *, u32 *);
-int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
-			       rxrpc_user_attach_call_t, unsigned long, gfp_t,
-			       unsigned int);
 void rxrpc_kernel_set_tx_length(struct socket *, struct rxrpc_call *, s64);
 bool rxrpc_kernel_check_life(const struct socket *, const struct rxrpc_call *);
 u32 rxrpc_kernel_get_epoch(struct socket *, struct rxrpc_call *);
@@ -73,5 +73,6 @@ void rxrpc_kernel_set_max_life(struct socket *, struct rxrpc_call *,
 			       unsigned long);
 
 int rxrpc_sock_set_min_security_level(struct sock *sk, unsigned int val);
+int rxrpc_sock_set_upgradeable_service(struct sock *sk, unsigned int val[2]);
 
 #endif /* _NET_RXRPC_H */
diff --git a/include/trace/events/afs.h b/include/trace/events/afs.h
index 499f5fabd20f..bbc5f471973a 100644
--- a/include/trace/events/afs.h
+++ b/include/trace/events/afs.h
@@ -19,7 +19,9 @@
 #define __AFS_DECLARE_TRACE_ENUMS_ONCE_ONLY
 
 enum afs_call_trace {
+	afs_call_trace_accept,
 	afs_call_trace_alloc,
+	afs_call_trace_discard,
 	afs_call_trace_free,
 	afs_call_trace_get,
 	afs_call_trace_put,
@@ -323,7 +325,9 @@ enum afs_cb_break_reason {
  * Declare tracing information enums and their string mappings for display.
  */
 #define afs_call_traces \
+	EM(afs_call_trace_accept,		"ACCPT") \
 	EM(afs_call_trace_alloc,		"ALLOC") \
+	EM(afs_call_trace_discard,		"DSCRD") \
 	EM(afs_call_trace_free,			"FREE ") \
 	EM(afs_call_trace_get,			"GET  ") \
 	EM(afs_call_trace_put,			"PUT  ") \
diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index d20bf4aa0204..b9b0b694b223 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -68,8 +68,11 @@
 	E_(rxrpc_client_to_idle,		"->Idle")
 
 #define rxrpc_call_traces \
+	EM(rxrpc_call_accepted,			"ACC") \
 	EM(rxrpc_call_connected,		"CON") \
+	EM(rxrpc_call_discard,			"DSC") \
 	EM(rxrpc_call_error,			"*E*") \
+	EM(rxrpc_call_get_socket_list,		"Gsl") \
 	EM(rxrpc_call_got,			"GOT") \
 	EM(rxrpc_call_got_kernel,		"Gke") \
 	EM(rxrpc_call_got_timer,		"GTM") \
@@ -80,6 +83,7 @@
 	EM(rxrpc_call_put_kernel,		"Pke") \
 	EM(rxrpc_call_put_noqueue,		"PnQ") \
 	EM(rxrpc_call_put_notimer,		"PnT") \
+	EM(rxrpc_call_put_socket_list,		"Psl") \
 	EM(rxrpc_call_put_timer,		"PTM") \
 	EM(rxrpc_call_put_userid,		"Pus") \
 	EM(rxrpc_call_queued,			"QUE") \
@@ -439,7 +443,7 @@ TRACE_EVENT(rxrpc_call,
 		    __entry->aux = aux;
 			   ),
 
-	    TP_printk("c=%08x %s u=%d sp=%pSR a=%p",
+	    TP_printk("c=%08x %s u=%d sp=%pSR a=%px",
 		      __entry->call,
 		      __print_symbolic(__entry->op, rxrpc_call_traces),
 		      __entry->usage,
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index 5519c2bef7f5..703e10969d2f 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -15,6 +15,7 @@
 #include <linux/random.h>
 #include <linux/poll.h>
 #include <linux/proc_fs.h>
+#include <linux/circ_buf.h>
 #include <linux/key-type.h>
 #include <net/net_namespace.h>
 #include <net/sock.h>
@@ -123,6 +124,117 @@ static int rxrpc_validate_address(struct rxrpc_sock *rx,
 	return 0;
 }
 
+/*
+ * Bind a service to an rxrpc socket.
+ */
+static int rxrpc_bind_service(struct rxrpc_sock *rx, struct rxrpc_local *local,
+			      u16 service_id)
+{
+	struct rxrpc_service_ids *ids;
+	struct rxrpc_service *b;
+	int i;
+
+	list_for_each_entry(b, &local->services, local_link) {
+		ids = rcu_dereference_protected(
+			b->ids, lockdep_is_held(&rx->local->services_lock));
+		for (i = 0; i < ids->nr_ids; i++)
+			if (service_id == ids->ids[i].service_id)
+				return -EADDRINUSE;
+	}
+
+	b = kzalloc(sizeof(struct rxrpc_service), GFP_KERNEL);
+	if (!b)
+		return -ENOMEM;
+	INIT_LIST_HEAD(&b->local_link);
+	INIT_LIST_HEAD(&b->to_be_accepted);
+	INIT_LIST_HEAD(&b->waiting_sockets);
+	INIT_WORK(&b->preallocator, rxrpc_service_preallocate);
+	spin_lock_init(&b->incoming_lock);
+	refcount_set(&b->ref, 1);
+	refcount_set(&b->active, 1);
+	b->local = local;
+
+	ids = kzalloc(struct_size(ids, ids, 1), GFP_KERNEL);
+	if (!ids) {
+		kfree(b);
+		return -ENOMEM;
+	}
+
+	ids->nr_ids = 1;
+	ids->ids[0].service_id = service_id;
+	rcu_assign_pointer(b->ids, ids);
+
+	rx->service = b;
+	rx->local = local;
+	list_add_tail_rcu(&b->local_link, &local->services);
+	return 0;
+}
+
+/*
+ * Bind an additional service to an rxrpc socket.
+ */
+static int rxrpc_bind_service2(struct rxrpc_sock *rx, struct rxrpc_local *local,
+			       u16 service_id)
+{
+	struct rxrpc_service_ids *ids, *old;
+	struct rxrpc_service *b;
+	int i;
+
+	list_for_each_entry(b, &local->services, local_link) {
+		ids = rcu_dereference_protected(
+			b->ids, lockdep_is_held(&rx->local->services_lock));
+		for (i = 0; i < ids->nr_ids; i++)
+			if (service_id == ids->ids[i].service_id)
+				return -EADDRINUSE;
+	}
+
+	b = rx->service;
+	old = rcu_dereference_protected(
+		b->ids, lockdep_is_held(&local->services_lock));
+
+	ids = kzalloc(struct_size(ids, ids, old->nr_ids + 1), GFP_KERNEL);
+	if (!ids)
+		return -ENOMEM;
+
+	memcpy(ids, old, struct_size(ids, ids, old->nr_ids));
+	ids->ids[ids->nr_ids++].service_id = service_id;
+	rcu_assign_pointer(b->ids, ids);
+	kfree_rcu(old, rcu);
+	return 0;
+}
+
+/*
+ * Mark a service on an rxrpc socket as upgradable.  Both service IDs must have
+ * been bound to this socket and upgrade-to-same is not allowed.
+ */
+static int rxrpc_bind_service_upgrade(struct rxrpc_sock *rx,
+				      u16 service_id, u16 upgrade_to)
+{
+	struct rxrpc_service_ids *ids;
+	struct rxrpc_service *b = rx->service;
+	int i;
+
+	if (upgrade_to == service_id)
+		return -EINVAL;
+
+	ids = rcu_dereference_protected(
+		b->ids, lockdep_is_held(&rx->local->services_lock));
+	for (i = 0; i < ids->nr_ids; i++)
+		if (upgrade_to == ids->ids[i].service_id)
+			goto found_upgrade;
+	return -EINVAL;
+
+found_upgrade:
+	for (i = 0; i < ids->nr_ids; i++) {
+		if (service_id == ids->ids[i].service_id) {
+			if (ids->ids[i].upgrade_to)
+				return -EINVAL;
+			ids->ids[i].upgrade_to = upgrade_to;
+		}
+	}
+	return 0;
+}
+
 /*
  * bind a local address to an RxRPC socket
  */
@@ -145,41 +257,45 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
 
 	switch (rx->sk.sk_state) {
 	case RXRPC_UNBOUND:
-		rx->srx = *srx;
-		local = rxrpc_lookup_local(sock_net(&rx->sk), &rx->srx);
+		local = rxrpc_lookup_local(sock_net(&rx->sk), srx,
+					   rx->sk.sk_kern_sock);
 		if (IS_ERR(local)) {
 			ret = PTR_ERR(local);
 			goto error_unlock;
 		}
 
 		if (service_id) {
-			write_lock(&local->services_lock);
-			if (rcu_access_pointer(local->service))
+			mutex_lock(&local->services_lock);
+			ret = rxrpc_bind_service(rx, local, service_id);
+			if (ret)
 				goto service_in_use;
-			rx->local = local;
-			rcu_assign_pointer(local->service, rx);
-			write_unlock(&local->services_lock);
+			mutex_unlock(&local->services_lock);
 
+			srx->srx_service = 0;
+			rx->srx = *srx;
 			rx->sk.sk_state = RXRPC_SERVER_BOUND;
 		} else {
+			srx->srx_service = 0;
+			rx->srx = *srx;
 			rx->local = local;
 			rx->sk.sk_state = RXRPC_CLIENT_BOUND;
 		}
 		break;
 
 	case RXRPC_SERVER_BOUND:
+		local = rx->local;
 		ret = -EINVAL;
 		if (service_id == 0)
 			goto error_unlock;
-		ret = -EADDRINUSE;
-		if (service_id == rx->srx.srx_service)
-			goto error_unlock;
-		ret = -EINVAL;
-		srx->srx_service = rx->srx.srx_service;
+		srx->srx_service = 0;
 		if (memcmp(srx, &rx->srx, sizeof(*srx)) != 0)
 			goto error_unlock;
-		rx->second_service = service_id;
-		rx->sk.sk_state = RXRPC_SERVER_BOUND2;
+
+		mutex_lock(&local->services_lock);
+		ret = rxrpc_bind_service2(rx, local, service_id);
+		if (ret)
+			goto service_in_use;
+		mutex_unlock(&local->services_lock);
 		break;
 
 	default:
@@ -192,7 +308,7 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
 	return 0;
 
 service_in_use:
-	write_unlock(&local->services_lock);
+	mutex_unlock(&local->services_lock);
 	rxrpc_unuse_local(local);
 	rxrpc_put_local(local);
 	ret = -EADDRINUSE;
@@ -209,20 +325,21 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
 static int rxrpc_listen(struct socket *sock, int backlog)
 {
 	struct sock *sk = sock->sk;
+	struct rxrpc_service *b;
 	struct rxrpc_sock *rx = rxrpc_sk(sk);
-	unsigned int max, old;
+	unsigned int max;
 	int ret;
 
 	_enter("%p,%d", rx, backlog);
 
 	lock_sock(&rx->sk);
+	b = rx->service;
 
 	switch (rx->sk.sk_state) {
 	case RXRPC_UNBOUND:
 		ret = -EADDRNOTAVAIL;
 		break;
 	case RXRPC_SERVER_BOUND:
-	case RXRPC_SERVER_BOUND2:
 		ASSERT(rx->local != NULL);
 		max = READ_ONCE(rxrpc_max_backlog);
 		ret = -EINVAL;
@@ -230,19 +347,24 @@ static int rxrpc_listen(struct socket *sock, int backlog)
 			backlog = max;
 		else if (backlog < 0 || backlog > max)
 			break;
-		old = sk->sk_max_ack_backlog;
-		sk->sk_max_ack_backlog = backlog;
-		ret = rxrpc_service_prealloc(rx, GFP_KERNEL);
-		if (ret == 0)
-			rx->sk.sk_state = RXRPC_SERVER_LISTENING;
-		else
-			sk->sk_max_ack_backlog = old;
+		ret = -ENOMEM;
+		if (!rx->sk.sk_kern_sock) {
+			rx->call_id_backlog =
+				kcalloc(RXRPC_BACKLOG_MAX,
+					sizeof(rx->call_id_backlog[0]),
+					GFP_KERNEL);
+			if (!rx->call_id_backlog)
+				break;
+		}
+		b->max_tba = backlog;
+		rx->sk.sk_state = RXRPC_SERVER_LISTENING;
+		schedule_work(&b->preallocator);
+		ret = 0;
 		break;
 	case RXRPC_SERVER_LISTENING:
 		if (backlog == 0) {
 			rx->sk.sk_state = RXRPC_SERVER_LISTEN_DISABLED;
-			sk->sk_max_ack_backlog = 0;
-			rxrpc_discard_prealloc(rx);
+			b->max_tba = 0;
 			ret = 0;
 			break;
 		}
@@ -354,7 +476,7 @@ void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
 	_enter("%d{%d}", call->debug_id, refcount_read(&call->ref));
 
 	mutex_lock(&call->user_mutex);
-	rxrpc_release_call(rxrpc_sk(sock->sk), call);
+	rxrpc_release_call(call);
 
 	/* Make sure we're not going to call back into a kernel service */
 	if (call->notify_rx) {
@@ -400,6 +522,8 @@ EXPORT_SYMBOL(rxrpc_kernel_get_epoch);
 /**
  * rxrpc_kernel_new_call_notification - Get notifications of new calls
  * @sock: The socket to intercept received messages on
+ * @notify_rx: Event notification function for the call
+ * @preallocate_call: Func to obtain a user_call_ID
  * @notify_new_call: Function to be called when new calls appear
  * @discard_new_call: Function to discard preallocated calls
  *
@@ -407,13 +531,19 @@ EXPORT_SYMBOL(rxrpc_kernel_get_epoch);
  */
 void rxrpc_kernel_new_call_notification(
 	struct socket *sock,
+	rxrpc_notify_rx_t notify_rx,
+	rxrpc_preallocate_call_t preallocate_call,
 	rxrpc_notify_new_call_t notify_new_call,
 	rxrpc_discard_new_call_t discard_new_call)
 {
 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
+	struct rxrpc_service *b = rx->service;
 
-	rx->notify_new_call = notify_new_call;
-	rx->discard_new_call = discard_new_call;
+	b->kernel_sock = sock->sk;
+	b->notify_rx = notify_rx;
+	b->preallocate_call = preallocate_call;
+	b->notify_new_call = notify_new_call;
+	b->discard_new_call = discard_new_call;
 }
 EXPORT_SYMBOL(rxrpc_kernel_new_call_notification);
 
@@ -539,7 +669,8 @@ static int rxrpc_sendmsg(struct socket *sock, struct msghdr *m, size_t len)
 			ret = -EAFNOSUPPORT;
 			goto error_unlock;
 		}
-		local = rxrpc_lookup_local(sock_net(sock->sk), &rx->srx);
+		local = rxrpc_lookup_local(sock_net(sock->sk), &rx->srx,
+					   rx->sk.sk_kern_sock);
 		if (IS_ERR(local)) {
 			ret = PTR_ERR(local);
 			goto error_unlock;
@@ -586,6 +717,22 @@ int rxrpc_sock_set_min_security_level(struct sock *sk, unsigned int val)
 }
 EXPORT_SYMBOL(rxrpc_sock_set_min_security_level);
 
+int rxrpc_sock_set_upgradeable_service(struct sock *sk, unsigned int val[2])
+{
+	struct rxrpc_sock *rx = rxrpc_sk(sk);
+	int ret = -EISCONN;
+
+	lock_sock(sk);
+	if (rx->sk.sk_state == RXRPC_SERVER_BOUND) {
+		mutex_lock(&rx->local->services_lock);
+		ret = rxrpc_bind_service_upgrade(rx, val[0], val[1]);
+		mutex_unlock(&rx->local->services_lock);
+	}
+	release_sock(sk);
+	return ret;
+}
+EXPORT_SYMBOL(rxrpc_sock_set_upgradeable_service);
+
 /*
  * set RxRPC socket options
  */
@@ -653,24 +800,21 @@ static int rxrpc_setsockopt(struct socket *sock, int level, int optname,
 
 		case RXRPC_UPGRADEABLE_SERVICE:
 			ret = -EINVAL;
-			if (optlen != sizeof(service_upgrade) ||
-			    rx->service_upgrade.from != 0)
+			if (optlen != sizeof(service_upgrade))
 				goto error;
 			ret = -EISCONN;
-			if (rx->sk.sk_state != RXRPC_SERVER_BOUND2)
+			if (rx->sk.sk_state != RXRPC_SERVER_BOUND)
 				goto error;
 			ret = -EFAULT;
 			if (copy_from_sockptr(service_upgrade, optval,
 					   sizeof(service_upgrade)) != 0)
 				goto error;
-			ret = -EINVAL;
-			if ((service_upgrade[0] != rx->srx.srx_service ||
-			     service_upgrade[1] != rx->second_service) &&
-			    (service_upgrade[0] != rx->second_service ||
-			     service_upgrade[1] != rx->srx.srx_service))
+			mutex_lock(&rx->local->services_lock);
+			ret = rxrpc_bind_service_upgrade(rx, service_upgrade[0],
+							 service_upgrade[1]);
+			mutex_unlock(&rx->local->services_lock);
+			if (ret < 0)
 				goto error;
-			rx->service_upgrade.from = service_upgrade[0];
-			rx->service_upgrade.to = service_upgrade[1];
 			goto success;
 
 		default:
@@ -721,6 +865,7 @@ static __poll_t rxrpc_poll(struct file *file, struct socket *sock,
 {
 	struct sock *sk = sock->sk;
 	struct rxrpc_sock *rx = rxrpc_sk(sk);
+	struct rxrpc_service *b = rx->service;
 	__poll_t mask;
 
 	sock_poll_wait(file, sock, wait);
@@ -737,6 +882,12 @@ static __poll_t rxrpc_poll(struct file *file, struct socket *sock,
 	if (rxrpc_writable(sk))
 		mask |= EPOLLOUT | EPOLLWRNORM;
 
+	if (b &&
+	    !list_empty(&b->to_be_accepted) &&
+	    CIRC_CNT(rx->call_id_backlog_head, rx->call_id_backlog_tail,
+		     RXRPC_BACKLOG_MAX) == 0)
+		mask |= EPOLLIN | EPOLLRDNORM;
+
 	return mask;
 }
 
@@ -771,17 +922,15 @@ static int rxrpc_create(struct net *net, struct socket *sock, int protocol,
 	sock_set_flag(sk, SOCK_RCU_FREE);
 	sk->sk_state		= RXRPC_UNBOUND;
 	sk->sk_write_space	= rxrpc_write_space;
-	sk->sk_max_ack_backlog	= 0;
 	sk->sk_destruct		= rxrpc_sock_destructor;
 
 	rx = rxrpc_sk(sk);
 	rx->family = protocol;
 	rx->calls = RB_ROOT;
 
-	spin_lock_init(&rx->incoming_lock);
 	INIT_LIST_HEAD(&rx->sock_calls);
-	INIT_LIST_HEAD(&rx->to_be_accepted);
 	INIT_LIST_HEAD(&rx->recvmsg_q);
+	INIT_LIST_HEAD(&rx->accepting_link);
 	rwlock_init(&rx->recvmsg_lock);
 	rwlock_init(&rx->call_lock);
 	memset(&rx->srx, 0, sizeof(rx->srx));
@@ -803,7 +952,6 @@ static int rxrpc_create(struct net *net, struct socket *sock, int protocol,
 static int rxrpc_shutdown(struct socket *sock, int flags)
 {
 	struct sock *sk = sock->sk;
-	struct rxrpc_sock *rx = rxrpc_sk(sk);
 	int ret = 0;
 
 	_enter("%p,%d", sk, flags);
@@ -823,9 +971,6 @@ static int rxrpc_shutdown(struct socket *sock, int flags)
 		ret = -ESHUTDOWN;
 	}
 	spin_unlock_bh(&sk->sk_receive_queue.lock);
-
-	rxrpc_discard_prealloc(rx);
-
 	release_sock(sk);
 	return ret;
 }
@@ -862,6 +1007,8 @@ static int rxrpc_release_sock(struct sock *sk)
 	/* declare the socket closed for business */
 	sock_orphan(sk);
 	sk->sk_shutdown = SHUTDOWN_MASK;
+	if (rx->service)
+		rxrpc_deactivate_service(rx);
 
 	/* We want to kill off all connections from a service socket
 	 * as fast as possible because we can't share these; client
@@ -869,7 +1016,6 @@ static int rxrpc_release_sock(struct sock *sk)
 	 */
 	switch (sk->sk_state) {
 	case RXRPC_SERVER_BOUND:
-	case RXRPC_SERVER_BOUND2:
 	case RXRPC_SERVER_LISTENING:
 	case RXRPC_SERVER_LISTEN_DISABLED:
 		rx->local->service_closed = true;
@@ -880,14 +1026,11 @@ static int rxrpc_release_sock(struct sock *sk)
 	sk->sk_state = RXRPC_CLOSE;
 	spin_unlock_bh(&sk->sk_receive_queue.lock);
 
-	if (rx->local && rcu_access_pointer(rx->local->service) == rx) {
-		write_lock(&rx->local->services_lock);
-		rcu_assign_pointer(rx->local->service, NULL);
-		write_unlock(&rx->local->services_lock);
-	}
-
 	/* try to flush out this socket */
-	rxrpc_discard_prealloc(rx);
+	if (rx->service) {
+		kfree(rx->call_id_backlog);
+		rxrpc_put_service(rxnet, rx->service);
+	}
 	rxrpc_release_calls_on_socket(rx);
 	flush_workqueue(rxrpc_workqueue);
 	rxrpc_purge_queue(&sk->sk_receive_queue);
@@ -901,8 +1044,6 @@ static int rxrpc_release_sock(struct sock *sk)
 	rx->local = NULL;
 	key_put(rx->key);
 	rx->key = NULL;
-	key_put(rx->securities);
-	rx->securities = NULL;
 	sock_put(sk);
 
 	_leave(" = 0");
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 68d269598305..89f86c31a50b 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -47,7 +47,6 @@ enum {
 	RXRPC_CLIENT_UNBOUND,		/* Unbound socket used as client */
 	RXRPC_CLIENT_BOUND,		/* client local address bound */
 	RXRPC_SERVER_BOUND,		/* server local address bound */
-	RXRPC_SERVER_BOUND2,		/* second server local address bound */
 	RXRPC_SERVER_LISTENING,		/* server listening for connections */
 	RXRPC_SERVER_LISTEN_DISABLED,	/* server listening disabled */
 	RXRPC_CLOSE,			/* socket is being closed */
@@ -57,6 +56,7 @@ enum {
  * Per-network namespace data.
  */
 struct rxrpc_net {
+	struct net		*net;
 	struct proc_dir_entry	*proc_net;	/* Subdir in /proc/net */
 	u32			epoch;		/* Local epoch for detecting local-end reset */
 	struct list_head	calls;		/* List of calls active in this namespace */
@@ -97,14 +97,49 @@ struct rxrpc_net {
 };
 
 /*
- * Service backlog preallocation.
+ * List of services listened on and their upgraded versions.
+ */
+struct rxrpc_service_ids {
+	struct rcu_head		rcu;
+	unsigned int		nr_ids;
+	struct {
+		u16		service_id;	/* Service ID to listen on */
+		u16		upgrade_to;	/* Service ID to upgrade to (or 0) */
+	} ids[];
+};
+
+/*
+ * Active service.  This may be shared between a number of sockets.
  *
  * This contains circular buffers of preallocated peers, connections and calls
  * for incoming service calls and their head and tail pointers.  This allows
  * calls to be set up in the data_ready handler, thereby avoiding the need to
  * shuffle packets around so much.
  */
-struct rxrpc_backlog {
+struct rxrpc_service {
+	struct rcu_head		rcu;
+	struct rxrpc_local	*local;		/* Local endpoint */
+	struct list_head	local_link;	/* Link in local->service_list */
+	struct list_head	to_be_accepted;	/* Calls awaiting acceptance */
+	struct list_head	waiting_sockets; /* List of sockets waiting to accept */
+	struct work_struct	preallocator;	/* Preallocator work item */
+	struct rxrpc_service_ids __rcu *ids;	/* List of service IDs */
+	struct key		*securities;	/* list of server security descriptors */
+	spinlock_t		incoming_lock;	/* Incoming call vs service shutdown lock */
+	refcount_t		ref;
+	refcount_t		active;		/* Active socket count */
+	unsigned int		nr_tba;		/* Number of calls on to_be_accepted */
+	unsigned int		max_tba;	/* Maximum number of calls on to_be_accepted */
+	int			error;		/* Allocation error */
+
+	/* Notifications for kernel-based services. */
+	struct sock		*kernel_sock;	/* Kernel's socket */
+	rxrpc_notify_rx_t	notify_rx;	/* Setting to call->notify_rx */
+	rxrpc_preallocate_call_t preallocate_call; /* Func to preallocate a call */
+	rxrpc_notify_new_call_t	notify_new_call; /* Func to notify of new call */
+	rxrpc_discard_new_call_t discard_new_call; /* Func to discard a new call */
+
+	/* Structure preallocation. */
 	unsigned short		peer_backlog_head;
 	unsigned short		peer_backlog_tail;
 	unsigned short		conn_backlog_head;
@@ -123,31 +158,24 @@ struct rxrpc_backlog {
 struct rxrpc_sock {
 	/* WARNING: sk has to be the first member */
 	struct sock		sk;
-	rxrpc_notify_new_call_t	notify_new_call; /* Func to notify of new call */
-	rxrpc_discard_new_call_t discard_new_call; /* Func to discard a new call */
 	struct rxrpc_local	*local;		/* local endpoint */
-	struct rxrpc_backlog	*backlog;	/* Preallocation for services */
-	spinlock_t		incoming_lock;	/* Incoming call vs service shutdown lock */
+	struct rxrpc_service	*service;
+	unsigned long		*call_id_backlog; /* Precharged call IDs */
 	struct list_head	sock_calls;	/* List of calls owned by this socket */
-	struct list_head	to_be_accepted;	/* calls awaiting acceptance */
 	struct list_head	recvmsg_q;	/* Calls awaiting recvmsg's attention  */
+	struct list_head	accepting_link;	/* Link in service->waiting_sockets */
 	struct hlist_node	ns_link;	/* Link in net->sockets */
 	rwlock_t		recvmsg_lock;	/* Lock for recvmsg_q */
 	struct key		*key;		/* security for this socket */
-	struct key		*securities;	/* list of server security descriptors */
 	struct rb_root		calls;		/* User ID -> call mapping */
 	unsigned long		flags;
-#define RXRPC_SOCK_CONNECTED		0	/* connect_srx is set */
+#define RXRPC_SOCK_CONNECTED	0		/* connect_srx is set */
 	rwlock_t		call_lock;	/* lock for calls */
 	u32			min_sec_level;	/* minimum security level */
 #define RXRPC_SECURITY_MAX	RXRPC_SECURITY_ENCRYPT
 	bool			exclusive;	/* Exclusive connection for a client socket */
-	u16			second_service;	/* Additional service bound to the endpoint */
-	struct {
-		/* Service upgrade information */
-		u16		from;		/* Service ID to upgrade (if not 0) */
-		u16		to;		/* service ID to upgrade to */
-	} service_upgrade;
+	unsigned short		call_id_backlog_head;
+	unsigned short		call_id_backlog_tail;
 	sa_family_t		family;		/* Protocol family created with */
 	struct sockaddr_rxrpc	srx;		/* Primary Service/local addresses */
 	struct sockaddr_rxrpc	connect_srx;	/* Default client address from connect() */
@@ -278,17 +306,18 @@ struct rxrpc_local {
 	struct hlist_node	link;
 	struct socket		*socket;	/* my UDP socket */
 	struct work_struct	processor;
-	struct rxrpc_sock __rcu	*service;	/* Service(s) listening on this endpoint */
+	struct list_head	services;	/* Service(s) listening on this endpoint */
+	struct mutex		services_lock;	/* Lock for services list */
 	struct rw_semaphore	defrag_sem;	/* control re-enablement of IP DF bit */
 	struct sk_buff_head	reject_queue;	/* packets awaiting rejection */
 	struct sk_buff_head	event_queue;	/* endpoint event packets awaiting processing */
 	struct rb_root		client_bundles;	/* Client connection bundles by socket params */
 	spinlock_t		client_bundles_lock; /* Lock for client_bundles */
 	spinlock_t		lock;		/* access lock */
-	rwlock_t		services_lock;	/* lock for services list */
 	int			debug_id;	/* debug ID for printks */
 	bool			dead;
 	bool			service_closed;	/* Service socket closed */
+	bool			kernel;		/* T if kernel endpoint */
 	struct sockaddr_rxrpc	srx;		/* local address */
 };
 
@@ -500,6 +529,7 @@ enum rxrpc_call_flag {
 	RXRPC_CALL_DISCONNECTED,	/* The call has been disconnected */
 	RXRPC_CALL_KERNEL,		/* The call was made by the kernel */
 	RXRPC_CALL_UPGRADE,		/* Service upgrade was requested for the call */
+	RXRPC_CALL_NEWLY_ACCEPTED,	/* Incoming call hasn't yet been reported by recvmsg */
 };
 
 /*
@@ -564,8 +594,10 @@ struct rxrpc_call {
 	struct rcu_head		rcu;
 	struct rxrpc_connection	*conn;		/* connection carrying call */
 	struct rxrpc_peer	*peer;		/* Peer record for remote address */
+	struct rxrpc_local	*local;		/* Local endpoint */
 	struct rxrpc_sock __rcu	*socket;	/* socket responsible */
 	struct rxrpc_net	*rxnet;		/* Network namespace to which call belongs */
+	struct rxrpc_service	*service;	/* The service this call came in through */
 	const struct rxrpc_security *security;	/* applied security module */
 	struct mutex		user_mutex;	/* User access mutex */
 	unsigned long		ack_at;		/* When deferred ACK needs to happen */
@@ -585,7 +617,6 @@ struct rxrpc_call {
 	struct list_head	link;		/* link in master call list */
 	struct list_head	chan_wait_link;	/* Link in conn->bundle->waiting_calls */
 	struct hlist_node	error_link;	/* link in error distribution list */
-	struct list_head	accept_link;	/* Link in rx->acceptq */
 	struct list_head	recvmsg_link;	/* Link in rx->recvmsg_q */
 	struct list_head	sock_link;	/* Link in rx->sock_calls */
 	struct rb_node		sock_node;	/* Node in rx->calls */
@@ -757,12 +788,12 @@ extern struct workqueue_struct *rxrpc_workqueue;
 /*
  * call_accept.c
  */
-int rxrpc_service_prealloc(struct rxrpc_sock *, gfp_t);
-void rxrpc_discard_prealloc(struct rxrpc_sock *);
+void rxrpc_service_preallocate(struct work_struct *work);
+void rxrpc_put_service(struct rxrpc_net *, struct rxrpc_service *);
+void rxrpc_deactivate_service(struct rxrpc_sock *);
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *,
-					   struct rxrpc_sock *,
 					   struct sk_buff *);
-void rxrpc_accept_incoming_calls(struct rxrpc_local *);
+void rxrpc_accept_incoming_call(struct rxrpc_sock *);
 int rxrpc_user_charge_accept(struct rxrpc_sock *, unsigned long);
 
 /*
@@ -788,15 +819,15 @@ extern unsigned int rxrpc_max_call_lifetime;
 extern struct kmem_cache *rxrpc_call_jar;
 
 struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
-struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *, gfp_t, unsigned int);
+struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_local *, gfp_t, unsigned int,
+				    struct rxrpc_service *);
 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
 					 struct rxrpc_conn_parameters *,
 					 struct sockaddr_rxrpc *,
 					 struct rxrpc_call_params *, gfp_t,
 					 unsigned int);
-void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *,
-			 struct sk_buff *);
-void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
+void rxrpc_incoming_call(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_release_call(struct rxrpc_call *);
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
 bool __rxrpc_queue_call(struct rxrpc_call *);
 bool rxrpc_queue_call(struct rxrpc_call *);
@@ -898,7 +929,7 @@ static inline void rxrpc_reduce_conn_timer(struct rxrpc_connection *conn,
 struct rxrpc_connection *rxrpc_find_service_conn_rcu(struct rxrpc_peer *,
 						     struct sk_buff *);
 struct rxrpc_connection *rxrpc_prealloc_service_connection(struct rxrpc_net *, gfp_t);
-void rxrpc_new_incoming_connection(struct rxrpc_sock *, struct rxrpc_connection *,
+void rxrpc_new_incoming_connection(const struct rxrpc_service *, struct rxrpc_connection *,
 				   const struct rxrpc_security *, struct sk_buff *);
 void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 
@@ -929,7 +960,8 @@ extern void rxrpc_process_local_events(struct rxrpc_local *);
 /*
  * local_object.c
  */
-struct rxrpc_local *rxrpc_lookup_local(struct net *, const struct sockaddr_rxrpc *);
+struct rxrpc_local *rxrpc_lookup_local(struct net *, const struct sockaddr_rxrpc *,
+				       bool);
 struct rxrpc_local *rxrpc_get_local(struct rxrpc_local *);
 struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *);
 void rxrpc_put_local(struct rxrpc_local *);
@@ -995,8 +1027,7 @@ struct rxrpc_peer *rxrpc_lookup_peer_rcu(struct rxrpc_local *,
 struct rxrpc_peer *rxrpc_lookup_peer(struct rxrpc_sock *, struct rxrpc_local *,
 				     struct sockaddr_rxrpc *, gfp_t);
 struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *, gfp_t);
-void rxrpc_new_incoming_peer(struct rxrpc_sock *, struct rxrpc_local *,
-			     struct rxrpc_peer *);
+void rxrpc_new_incoming_peer(struct rxrpc_local *, struct rxrpc_peer *);
 void rxrpc_destroy_all_peers(struct rxrpc_net *);
 struct rxrpc_peer *rxrpc_get_peer(struct rxrpc_peer *);
 struct rxrpc_peer *rxrpc_get_peer_maybe(struct rxrpc_peer *);
@@ -1065,7 +1096,7 @@ int __init rxrpc_init_security(void);
 const struct rxrpc_security *rxrpc_security_lookup(u8);
 void rxrpc_exit_security(void);
 int rxrpc_init_client_conn_security(struct rxrpc_connection *);
-const struct rxrpc_security *rxrpc_get_incoming_security(struct rxrpc_sock *,
+const struct rxrpc_security *rxrpc_get_incoming_security(struct rxrpc_service *,
 							 struct sk_buff *);
 struct key *rxrpc_look_up_server_security(struct rxrpc_connection *,
 					  struct sk_buff *, u32, u32);
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 99e10eea3732..3cba4dacb8d4 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -31,167 +31,284 @@ static void rxrpc_dummy_notify(struct sock *sk, struct rxrpc_call *call,
  * Preallocate a single service call, connection and peer and, if possible,
  * give them a user ID and attach the user's side of the ID to them.
  */
-static int rxrpc_service_prealloc_one(struct rxrpc_sock *rx,
-				      struct rxrpc_backlog *b,
-				      rxrpc_notify_rx_t notify_rx,
-				      rxrpc_user_attach_call_t user_attach_call,
-				      unsigned long user_call_ID, gfp_t gfp,
-				      unsigned int debug_id)
+void rxrpc_service_preallocate(struct work_struct *work)
 {
-	const void *here = __builtin_return_address(0);
-	struct rxrpc_call *call, *xcall;
-	struct rxrpc_net *rxnet = rxrpc_net(sock_net(&rx->sk));
-	struct rb_node *parent, **pp;
-	int max, tmp;
-	unsigned int size = RXRPC_BACKLOG_MAX;
-	unsigned int head, tail, call_head, call_tail;
-
-	max = rx->sk.sk_max_ack_backlog;
-	tmp = rx->sk.sk_ack_backlog;
-	if (tmp >= max) {
-		_leave(" = -ENOBUFS [full %u]", max);
-		return -ENOBUFS;
-	}
-	max -= tmp;
+	struct rxrpc_service *b = container_of(work, struct rxrpc_service, preallocator);
+	struct rxrpc_local *local = b->local;
+	struct rxrpc_net *rxnet = local->rxnet;
+	unsigned int head, tail, size = RXRPC_BACKLOG_MAX;
+	int max = b->max_tba;
 
-	/* We don't need more conns and peers than we have calls, but on the
-	 * other hand, we shouldn't ever use more peers than conns or conns
-	 * than calls.
-	 */
-	call_head = b->call_backlog_head;
-	call_tail = READ_ONCE(b->call_backlog_tail);
-	tmp = CIRC_CNT(call_head, call_tail, size);
-	if (tmp >= max) {
-		_leave(" = -ENOBUFS [enough %u]", tmp);
-		return -ENOBUFS;
-	}
-	max = tmp + 1;
+	if (!refcount_read(&b->active))
+		return;
 
 	head = b->peer_backlog_head;
 	tail = READ_ONCE(b->peer_backlog_tail);
-	if (CIRC_CNT(head, tail, size) < max) {
-		struct rxrpc_peer *peer = rxrpc_alloc_peer(rx->local, gfp);
+	while (CIRC_CNT(head, tail, size) < max) {
+		struct rxrpc_peer *peer = rxrpc_alloc_peer(local, GFP_KERNEL);
 		if (!peer)
-			return -ENOMEM;
-		b->peer_backlog[head] = peer;
-		smp_store_release(&b->peer_backlog_head,
-				  (head + 1) & (size - 1));
+			goto nomem;
+		b->peer_backlog[head++] = peer;
+		head &= size - 1;
+		smp_store_release(&b->peer_backlog_head, head);
 	}
 
 	head = b->conn_backlog_head;
 	tail = READ_ONCE(b->conn_backlog_tail);
-	if (CIRC_CNT(head, tail, size) < max) {
+	while (CIRC_CNT(head, tail, size) < max) {
 		struct rxrpc_connection *conn;
 
-		conn = rxrpc_prealloc_service_connection(rxnet, gfp);
+		conn = rxrpc_prealloc_service_connection(rxnet, GFP_KERNEL);
 		if (!conn)
-			return -ENOMEM;
-		b->conn_backlog[head] = conn;
-		smp_store_release(&b->conn_backlog_head,
-				  (head + 1) & (size - 1));
+			goto nomem;
 
 		trace_rxrpc_conn(conn->debug_id, rxrpc_conn_new_service,
-				 refcount_read(&conn->ref), here);
+				 refcount_read(&conn->ref), NULL);
+
+		b->conn_backlog[head++] = conn;
+		head &= size - 1;
+		smp_store_release(&b->conn_backlog_head, head);
 	}
 
-	/* Now it gets complicated, because calls get registered with the
-	 * socket here, with a user ID preassigned by the user.
-	 */
-	call = rxrpc_alloc_call(rx, gfp, debug_id);
-	if (!call)
-		return -ENOMEM;
-	call->flags |= (1 << RXRPC_CALL_IS_SERVICE);
-	call->state = RXRPC_CALL_SERVER_PREALLOC;
+	head = b->call_backlog_head;
+	tail = READ_ONCE(b->call_backlog_tail);
+	while (CIRC_CNT(head, tail, size) < max) {
+		struct rxrpc_call *call;
+		unsigned int debug_id = 0;
+
+		if (!b->preallocate_call)
+			debug_id = atomic_inc_return(&rxrpc_debug_id);
+
+		call = rxrpc_alloc_call(b->local, GFP_KERNEL, debug_id, b);
+		if (!call)
+			goto nomem;
+		call->flags |= (1 << RXRPC_CALL_IS_SERVICE);
+		call->state = RXRPC_CALL_SERVER_PREALLOC;
+		call->notify_rx = b->notify_rx;
+		__set_bit(RXRPC_CALL_NEWLY_ACCEPTED, &call->flags);
+
+		if (b->kernel_sock) {
+			struct rxrpc_sock *rx = rxrpc_sk(b->kernel_sock);
+			rxrpc_get_call(call, rxrpc_call_get_socket_list);
+			write_lock(&rx->call_lock);
+			list_add(&call->sock_link, &rx->sock_calls);
+			write_unlock(&rx->call_lock);
+		} else {
+			refcount_inc(&b->ref);
+			call->service = b;
+		}
 
-	trace_rxrpc_call(call->debug_id, rxrpc_call_new_service,
-			 refcount_read(&call->ref),
-			 here, (const void *)user_call_ID);
+		spin_lock_bh(&rxnet->call_lock);
+		list_add_tail_rcu(&call->link, &rxnet->calls);
+		spin_unlock_bh(&rxnet->call_lock);
+
+		trace_rxrpc_call(call->debug_id, rxrpc_call_new_service,
+				 refcount_read(&call->ref), NULL,
+				 (void *)(unsigned long)head);
+
+		b->call_backlog[head++] = call;
+		head &= size - 1;
+		smp_store_release(&b->call_backlog_head, head);
+	}
+
+	return;
+
+nomem:
+	WRITE_ONCE(b->error, -ENOMEM);
+}
+
+/*
+ * Attempt to add a user call ID to the preallocation ring.
+ */
+static int rxrpc_service_charge_user_call_id(struct rxrpc_sock *rx,
+					     unsigned long user_call_ID)
+{
+	struct rxrpc_service *b = rx->service;
+	struct rxrpc_call *xcall;
+	struct rb_node *p;
+	unsigned int head, tail, size = RXRPC_BACKLOG_MAX;
+	unsigned int i;
+	int ret = -EBADSLT, max = b->max_tba;
+
+	_enter("%lx", user_call_ID);
+
+	if (!b)
+		return -EINVAL;
+
+	write_lock(&rx->call_lock);
+
+	/* Check the user ID isn't already in use in the active tree. */
+	p = rx->calls.rb_node;
+	while (p) {
+		xcall = rb_entry(p, struct rxrpc_call, sock_node);
+		if (user_call_ID < xcall->user_call_ID)
+			p = p->rb_left;
+		else if (user_call_ID > xcall->user_call_ID)
+			p = p->rb_right;
+		else
+			goto err;
+	}
+
+	/* We also need to check the preallocation ring. */
+	for (i = 0; i < size; i++)
+		if (user_call_ID == rx->call_id_backlog[i])
+			goto err;
+
+	ret = -ENOBUFS;
+	head = rx->call_id_backlog_head;
+	tail = READ_ONCE(rx->call_id_backlog_tail);
+
+	if (CIRC_CNT(head, tail, size) >= max)
+		goto err;
+
+	rx->call_id_backlog[head & (size - 1)] = user_call_ID;
+	smp_store_release(&rx->call_id_backlog_head, (head + 1) & (size - 1));
+
+	if (list_empty(&rx->accepting_link)) {
+		spin_lock_bh(&b->incoming_lock);
+		if (list_empty(&rx->accepting_link))
+			list_add_tail(&rx->accepting_link, &b->waiting_sockets);
+		spin_unlock_bh(&b->incoming_lock);
+	}			
+	ret = 0;
+
+err:
+	write_unlock(&rx->call_lock);
+	return ret;
+}
+
+/*
+ * Pick an ID for an incoming call and attach it to the socket.  This is only
+ * used for sockets opened by userspace.  Kernel sockets get the user ID set
+ * during preallocation.
+ */
+void rxrpc_accept_incoming_call(struct rxrpc_sock *rx)
+{
+	struct rxrpc_service *b = rx->service;
+	struct rxrpc_call *xcall, *call;
+	struct rb_node *parent, **pp;
+	const void *here = __builtin_return_address(0);
+	unsigned int head, tail, size = RXRPC_BACKLOG_MAX;
+
+	if (CIRC_CNT(rx->call_id_backlog_head, rx->call_id_backlog_tail, size) == 0)
+		return;
 
 	write_lock(&rx->call_lock);
 
-	/* Check the user ID isn't already in use */
+	/* Obtain an ID from the preallocation ring. */
+	head = smp_load_acquire(&rx->call_id_backlog_head);
+	tail = rx->call_id_backlog_tail;
+
+	if (CIRC_CNT(head, tail, size) == 0) {
+		write_unlock(&rx->call_lock);
+		return;
+	}
+
+	spin_lock_bh(&b->incoming_lock);
+
+	call = list_first_entry_or_null(&b->to_be_accepted,
+					struct rxrpc_call, recvmsg_link);
+	if (!call) {
+		spin_unlock_bh(&b->incoming_lock);
+		write_unlock(&rx->call_lock);
+		return;
+	}
+
+	write_lock(&rx->recvmsg_lock);
+	rxrpc_get_call(call, rxrpc_call_got_userid);
+	call->user_call_ID = rx->call_id_backlog[tail];
+	rcu_assign_pointer(call->socket, rx);
+	/* recvmsg_link mustn't be seen to be empty. */
+	list_move_tail(&call->recvmsg_link, &rx->recvmsg_q);
+	b->nr_tba--;
+	write_unlock(&rx->recvmsg_lock);
+
+	rx->call_id_backlog[tail] = 0;
+	tail = (tail + 1) & (size - 1);
+	smp_store_release(&rx->call_id_backlog_tail, tail);
+
+	if (CIRC_CNT(head, tail, size) == 0)
+		list_del_init(&rx->accepting_link);
+
+	spin_unlock_bh(&b->incoming_lock);
+
+	trace_rxrpc_call(call->debug_id, rxrpc_call_accepted,
+			 refcount_read(&call->ref),
+			 here, (const void *)call->user_call_ID);
+
+	/* Insert the ID */
+	set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
+
 	pp = &rx->calls.rb_node;
 	parent = NULL;
 	while (*pp) {
 		parent = *pp;
 		xcall = rb_entry(parent, struct rxrpc_call, sock_node);
-		if (user_call_ID < xcall->user_call_ID)
+		if (call->user_call_ID < xcall->user_call_ID)
 			pp = &(*pp)->rb_left;
-		else if (user_call_ID > xcall->user_call_ID)
+		else if (call->user_call_ID > xcall->user_call_ID)
 			pp = &(*pp)->rb_right;
 		else
 			goto id_in_use;
 	}
 
-	call->user_call_ID = user_call_ID;
-	call->notify_rx = notify_rx;
-	if (user_attach_call) {
-		rxrpc_get_call(call, rxrpc_call_got_kernel);
-		user_attach_call(call, user_call_ID);
-	}
-
-	rxrpc_get_call(call, rxrpc_call_got_userid);
 	rb_link_node(&call->sock_node, parent, pp);
 	rb_insert_color(&call->sock_node, &rx->calls);
-	set_bit(RXRPC_CALL_HAS_USERID, &call->flags);
 
+	rxrpc_get_call(call, rxrpc_call_get_socket_list);
 	list_add(&call->sock_link, &rx->sock_calls);
-
 	write_unlock(&rx->call_lock);
 
-	rxnet = call->rxnet;
-	spin_lock_bh(&rxnet->call_lock);
-	list_add_tail_rcu(&call->link, &rxnet->calls);
-	spin_unlock_bh(&rxnet->call_lock);
-
-	b->call_backlog[call_head] = call;
-	smp_store_release(&b->call_backlog_head, (call_head + 1) & (size - 1));
-	_leave(" = 0 [%d -> %lx]", call->debug_id, user_call_ID);
-	return 0;
+	_leave(" [%d -> %lx]", call->debug_id, call->user_call_ID);
+	return;
 
 id_in_use:
+	WARN_ON(1);
 	write_unlock(&rx->call_lock);
 	rxrpc_cleanup_call(call);
-	_leave(" = -EBADSLT");
-	return -EBADSLT;
+	return;
 }
 
 /*
- * Allocate the preallocation buffers for incoming service calls.  These must
- * be charged manually.
+ * Dispose of a service record.
  */
-int rxrpc_service_prealloc(struct rxrpc_sock *rx, gfp_t gfp)
+void rxrpc_put_service(struct rxrpc_net *rxnet, struct rxrpc_service *b)
 {
-	struct rxrpc_backlog *b = rx->backlog;
-
-	if (!b) {
-		b = kzalloc(sizeof(struct rxrpc_backlog), gfp);
-		if (!b)
-			return -ENOMEM;
-		rx->backlog = b;
+	if (b && refcount_dec_and_test(&b->ref)) {
+		key_put(b->securities);
+		kfree_rcu(b, rcu);
 	}
-
-	return 0;
 }
 
 /*
  * Discard the preallocation on a service.
  */
-void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
+void rxrpc_deactivate_service(struct rxrpc_sock *rx)
 {
-	struct rxrpc_backlog *b = rx->backlog;
+	struct rxrpc_service *b = rx->service;
 	struct rxrpc_net *rxnet = rxrpc_net(sock_net(&rx->sk));
 	unsigned int size = RXRPC_BACKLOG_MAX, head, tail;
 
-	if (!b)
+	spin_lock_bh(&b->incoming_lock);
+	list_del_init(&rx->accepting_link);
+	spin_unlock_bh(&b->incoming_lock);
+
+	if (!refcount_dec_and_test(&rx->service->active))
 		return;
-	rx->backlog = NULL;
 
-	/* Make sure that there aren't any incoming calls in progress before we
-	 * clear the preallocation buffers.
+	kdebug("-- deactivate --");
+
+	/* Now that active is 0, make sure that there aren't any incoming calls
+	 * being set up before we clear the preallocation buffers.
 	 */
-	spin_lock_bh(&rx->incoming_lock);
-	spin_unlock_bh(&rx->incoming_lock);
+	spin_lock_bh(&b->incoming_lock);
+	spin_unlock_bh(&b->incoming_lock);
+
+	mutex_lock(&rx->local->services_lock);
+	list_del_rcu(&b->local_link);
+	mutex_unlock(&rx->local->services_lock);
+
+	cancel_work_sync(&b->preallocator);
 
 	head = b->peer_backlog_head;
 	tail = b->peer_backlog_tail;
@@ -218,23 +335,44 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
 
 	head = b->call_backlog_head;
 	tail = b->call_backlog_tail;
+	kdebug("backlog %x %x", head, tail);
 	while (CIRC_CNT(head, tail, size) > 0) {
 		struct rxrpc_call *call = b->call_backlog[tail];
-		rcu_assign_pointer(call->socket, rx);
-		if (rx->discard_new_call) {
+
+		kdebug("discard c=%08x", call->debug_id);
+
+		trace_rxrpc_call(call->debug_id, rxrpc_call_discard,
+				 refcount_read(&call->ref),
+				 NULL, (const void *)call->user_call_ID);
+		if (b->discard_new_call) {
 			_debug("discard %lx", call->user_call_ID);
-			rx->discard_new_call(call, call->user_call_ID);
+			b->discard_new_call(call, call->user_call_ID);
 			if (call->notify_rx)
 				call->notify_rx = rxrpc_dummy_notify;
 			rxrpc_put_call(call, rxrpc_call_put_kernel);
 		}
+
 		rxrpc_call_completed(call);
-		rxrpc_release_call(rx, call);
+		rxrpc_release_call(call);
 		rxrpc_put_call(call, rxrpc_call_put);
 		tail = (tail + 1) & (size - 1);
 	}
 
-	kfree(b);
+	spin_lock_bh(&b->incoming_lock);
+	while (!list_empty(&b->to_be_accepted)) {
+		struct rxrpc_call *call =
+			list_entry(b->to_be_accepted.next,
+				   struct rxrpc_call, recvmsg_link);
+		list_del(&call->recvmsg_link);
+		spin_unlock_bh(&b->incoming_lock);
+		if (rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, -ECONNRESET))
+			rxrpc_send_abort_packet(call);
+		rxrpc_release_call(call);
+		rxrpc_put_call(call, rxrpc_call_put);
+		spin_lock_bh(&b->incoming_lock);
+	}
+	spin_unlock_bh(&b->incoming_lock);
+
 }
 
 /*
@@ -257,14 +395,13 @@ static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb)
  * Allocate a new incoming call from the prealloc pool, along with a connection
  * and a peer as necessary.
  */
-static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
+static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_service *b,
 						    struct rxrpc_local *local,
 						    struct rxrpc_peer *peer,
 						    struct rxrpc_connection *conn,
 						    const struct rxrpc_security *sec,
 						    struct sk_buff *skb)
 {
-	struct rxrpc_backlog *b = rx->backlog;
 	struct rxrpc_call *call;
 	unsigned short call_head, conn_head, peer_head;
 	unsigned short call_tail, conn_tail, peer_tail;
@@ -298,7 +435,7 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
 					  (peer_tail + 1) &
 					  (RXRPC_BACKLOG_MAX - 1));
 
-			rxrpc_new_incoming_peer(rx, local, peer);
+			rxrpc_new_incoming_peer(local, peer);
 		}
 
 		/* Now allocate and set up the connection */
@@ -309,7 +446,7 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
 		conn->params.local = rxrpc_get_local(local);
 		conn->params.peer = peer;
 		rxrpc_see_connection(conn);
-		rxrpc_new_incoming_connection(rx, conn, sec, skb);
+		rxrpc_new_incoming_connection(b, conn, sec, skb);
 	} else {
 		rxrpc_get_connection(conn);
 	}
@@ -326,6 +463,7 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
 	call->security_ix = conn->security_ix;
 	call->peer = rxrpc_get_peer(conn->params.peer);
 	call->cong_cwnd = call->peer->cong_cwnd;
+	__set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
 	return call;
 }
 
@@ -345,20 +483,32 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
  * The call is returned with the user access mutex held.
  */
 struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
-					   struct rxrpc_sock *rx,
 					   struct sk_buff *skb)
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	const struct rxrpc_service_ids *ids;
 	const struct rxrpc_security *sec = NULL;
 	struct rxrpc_connection *conn;
+	struct rxrpc_service *b;
 	struct rxrpc_peer *peer = NULL;
 	struct rxrpc_call *call = NULL;
+	unsigned int i;
 
 	_enter("");
 
-	spin_lock(&rx->incoming_lock);
-	if (rx->sk.sk_state == RXRPC_SERVER_LISTEN_DISABLED ||
-	    rx->sk.sk_state == RXRPC_CLOSE) {
+	list_for_each_entry_rcu(b, &local->services, local_link) {
+		ids = rcu_dereference(b->ids);
+		for (i = 0; i < ids->nr_ids; i++)
+			if (ids->ids[i].service_id == sp->hdr.serviceId)
+				goto found_service;
+	}
+	_leave(" = NULL [no srv]");
+	return NULL;
+
+found_service:
+	spin_lock(&b->incoming_lock);
+
+	if (refcount_read(&b->active) == 0) {
 		trace_rxrpc_abort(0, "CLS", sp->hdr.cid, sp->hdr.callNumber,
 				  sp->hdr.seq, RX_INVALID_OPERATION, ESHUTDOWN);
 		skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
@@ -366,20 +516,25 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 		goto no_call;
 	}
 
+	if (b->nr_tba >= b->max_tba) {
+		skb->mark = RXRPC_SKB_MARK_REJECT_BUSY;
+		goto no_call;
+	}
+
 	/* The peer, connection and call may all have sprung into existence due
 	 * to a duplicate packet being handled on another CPU in parallel, so
 	 * we have to recheck the routing.  However, we're now holding
-	 * rx->incoming_lock, so the values should remain stable.
+	 * incoming_lock, so the values should remain stable.
 	 */
 	conn = rxrpc_find_connection_rcu(local, skb, &peer);
 
 	if (!conn) {
-		sec = rxrpc_get_incoming_security(rx, skb);
+		sec = rxrpc_get_incoming_security(b, skb);
 		if (!sec)
 			goto no_call;
 	}
 
-	call = rxrpc_alloc_incoming_call(rx, local, peer, conn, sec, skb);
+	call = rxrpc_alloc_incoming_call(b, local, peer, conn, sec, skb);
 	if (!call) {
 		skb->mark = RXRPC_SKB_MARK_REJECT_BUSY;
 		goto no_call;
@@ -389,11 +544,14 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 			    sp->hdr.serial, sp->hdr.seq);
 
 	/* Make the call live. */
-	rxrpc_incoming_call(rx, call, skb);
+	rcu_assign_pointer(call->socket, b->kernel_sock);
+	rxrpc_incoming_call(call, skb);
 	conn = call->conn;
 
-	if (rx->notify_new_call)
-		rx->notify_new_call(&rx->sk, call, call->user_call_ID);
+	if (b->notify_new_call)
+		b->notify_new_call(b->kernel_sock, call, call->user_call_ID);
+	spin_unlock(&b->incoming_lock);
+	schedule_work(&b->preallocator);
 
 	spin_lock(&conn->state_lock);
 	switch (conn->state) {
@@ -422,7 +580,6 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 		BUG();
 	}
 	spin_unlock(&conn->state_lock);
-	spin_unlock(&rx->incoming_lock);
 
 	rxrpc_send_ping(call, skb);
 
@@ -431,13 +588,13 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
 	 * (recvmsg queue, to-be-accepted queue or user ID tree) or the kernel
 	 * service to prevent the call from being deallocated too early.
 	 */
-	rxrpc_put_call(call, rxrpc_call_put);
+	//rxrpc_put_call(call, rxrpc_call_put);
 
 	_leave(" = %p{%d}", call, call->debug_id);
 	return call;
 
 no_call:
-	spin_unlock(&rx->incoming_lock);
+	spin_unlock(&b->incoming_lock);
 	_leave(" = NULL [%u]", skb->mark);
 	return NULL;
 }
@@ -447,45 +604,14 @@ struct rxrpc_call *rxrpc_new_incoming_call(struct rxrpc_local *local,
  */
 int rxrpc_user_charge_accept(struct rxrpc_sock *rx, unsigned long user_call_ID)
 {
-	struct rxrpc_backlog *b = rx->backlog;
+	int ret;
 
 	if (rx->sk.sk_state == RXRPC_CLOSE)
 		return -ESHUTDOWN;
 
-	return rxrpc_service_prealloc_one(rx, b, NULL, NULL, user_call_ID,
-					  GFP_KERNEL,
-					  atomic_inc_return(&rxrpc_debug_id));
-}
-
-/*
- * rxrpc_kernel_charge_accept - Charge up socket with preallocated calls
- * @sock: The socket on which to preallocate
- * @notify_rx: Event notification function for the call
- * @user_attach_call: Func to attach call to user_call_ID
- * @user_call_ID: The tag to attach to the preallocated call
- * @gfp: The allocation conditions.
- * @debug_id: The tracing debug ID.
- *
- * Charge up the socket with preallocated calls, each with a user ID.  A
- * function should be provided to effect the attachment from the user's side.
- * The user is given a ref to hold on the call.
- *
- * Note that the call may be come connected before this function returns.
- */
-int rxrpc_kernel_charge_accept(struct socket *sock,
-			       rxrpc_notify_rx_t notify_rx,
-			       rxrpc_user_attach_call_t user_attach_call,
-			       unsigned long user_call_ID, gfp_t gfp,
-			       unsigned int debug_id)
-{
-	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
-	struct rxrpc_backlog *b = rx->backlog;
-
-	if (sock->sk->sk_state == RXRPC_CLOSE)
-		return -ESHUTDOWN;
+	ret = xchg(&rx->service->error, 0);
+	if (ret < 0)
+		return ret;
 
-	return rxrpc_service_prealloc_one(rx, b, notify_rx,
-					  user_attach_call, user_call_ID,
-					  gfp, debug_id);
+	return rxrpc_service_charge_user_call_id(rx, user_call_ID);
 }
-EXPORT_SYMBOL(rxrpc_kernel_charge_accept);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 84d0a4109645..e90b205a6c0f 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -119,16 +119,20 @@ struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *rx,
 /*
  * allocate a new call
  */
-struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
-				    unsigned int debug_id)
+struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_local *local, gfp_t gfp,
+				    unsigned int debug_id,
+				    struct rxrpc_service *b)
 {
 	struct rxrpc_call *call;
-	struct rxrpc_net *rxnet = rxrpc_net(sock_net(&rx->sk));
+	struct rxrpc_net *rxnet = local->rxnet;
 
 	call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
 	if (!call)
 		return NULL;
 
+	call->debug_id = debug_id;
+	refcount_set(&call->ref, 1);
+
 	call->rxtx_buffer = kcalloc(RXRPC_RXTX_BUFF_SIZE,
 				    sizeof(struct sk_buff *),
 				    gfp);
@@ -139,12 +143,20 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	if (!call->rxtx_annotations)
 		goto nomem_2;
 
+	if (b && b->preallocate_call) {
+		call->user_call_ID = b->preallocate_call(b->kernel_sock,
+							 call, &call->debug_id);
+		if (!call->user_call_ID)
+			goto nomem_3;
+		rxrpc_get_call(call, rxrpc_call_got_kernel);
+	}
+
 	mutex_init(&call->user_mutex);
 
 	/* Prevent lockdep reporting a deadlock false positive between the afs
 	 * filesystem and sys_sendmsg() via the mmap sem.
 	 */
-	if (rx->sk.sk_kern_sock)
+	if (local->kernel)
 		lockdep_set_class(&call->user_mutex,
 				  &rxrpc_call_user_mutex_lock_class_key);
 
@@ -152,7 +164,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	INIT_WORK(&call->processor, &rxrpc_process_call);
 	INIT_LIST_HEAD(&call->link);
 	INIT_LIST_HEAD(&call->chan_wait_link);
-	INIT_LIST_HEAD(&call->accept_link);
 	INIT_LIST_HEAD(&call->recvmsg_link);
 	INIT_LIST_HEAD(&call->sock_link);
 	init_waitqueue_head(&call->waitq);
@@ -160,8 +171,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	spin_lock_init(&call->notify_lock);
 	spin_lock_init(&call->input_lock);
 	rwlock_init(&call->state_lock);
-	refcount_set(&call->ref, 1);
-	call->debug_id = debug_id;
 	call->tx_total_len = -1;
 	call->next_rx_timo = 20 * HZ;
 	call->next_req_timo = 1 * HZ;
@@ -176,11 +185,14 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	call->cong_cwnd = 2;
 	call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;
 
+	call->local = local;
 	call->rxnet = rxnet;
 	call->rtt_avail = RXRPC_CALL_RTT_AVAIL_MASK;
 	atomic_inc(&rxnet->nr_calls);
 	return call;
 
+nomem_3:
+	kfree(call->rxtx_annotations);
 nomem_2:
 	kfree(call->rxtx_buffer);
 nomem:
@@ -201,7 +213,7 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx,
 
 	_enter("");
 
-	call = rxrpc_alloc_call(rx, gfp, debug_id);
+	call = rxrpc_alloc_call(rx->local, gfp, debug_id, NULL);
 	if (!call)
 		return ERR_PTR(-ENOMEM);
 	call->state = RXRPC_CALL_CLIENT_AWAIT_CONN;
@@ -332,6 +344,8 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 	rxrpc_get_call(call, rxrpc_call_got_userid);
 	rb_link_node(&call->sock_node, parent, pp);
 	rb_insert_color(&call->sock_node, &rx->calls);
+
+	rxrpc_get_call(call, rxrpc_call_get_socket_list);
 	list_add(&call->sock_link, &rx->sock_calls);
 
 	write_unlock(&rx->call_lock);
@@ -373,7 +387,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 				    RX_CALL_DEAD, -EEXIST);
 	trace_rxrpc_call(call->debug_id, rxrpc_call_error,
 			 refcount_read(&call->ref), here, ERR_PTR(-EEXIST));
-	rxrpc_release_call(rx, call);
+	rxrpc_release_call(call);
 	mutex_unlock(&call->user_mutex);
 	rxrpc_put_call(call, rxrpc_call_put);
 	_leave(" = -EEXIST");
@@ -398,9 +412,7 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
  * Set up an incoming call.  call->conn points to the connection.
  * This is called in BH context and isn't allowed to fail.
  */
-void rxrpc_incoming_call(struct rxrpc_sock *rx,
-			 struct rxrpc_call *call,
-			 struct sk_buff *skb)
+void rxrpc_incoming_call(struct rxrpc_call *call, struct sk_buff *skb)
 {
 	struct rxrpc_connection *conn = call->conn;
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
@@ -408,7 +420,6 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
 
 	_enter(",%d", call->conn->debug_id);
 
-	rcu_assign_pointer(call->socket, rx);
 	call->call_id		= sp->hdr.callNumber;
 	call->service_id	= sp->hdr.serviceId;
 	call->cid		= sp->hdr.cid;
@@ -523,11 +534,12 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call)
 /*
  * Detach a call from its owning socket.
  */
-void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
+void rxrpc_release_call(struct rxrpc_call *call)
 {
 	const void *here = __builtin_return_address(0);
 	struct rxrpc_connection *conn = call->conn;
-	bool put = false;
+	struct rxrpc_sock *rx = rcu_access_pointer(call->socket);
+	bool put = false, put_sl = false;
 
 	_enter("{%d,%d}", call->debug_id, refcount_read(&call->ref));
 
@@ -545,34 +557,45 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 	rxrpc_put_call_slot(call);
 	rxrpc_delete_call_timer(call);
 
-	/* Make sure we don't get any more notifications */
-	write_lock_bh(&rx->recvmsg_lock);
+	if (rx) {
+		/* Make sure we don't get any more notifications */
+		write_lock_bh(&rx->recvmsg_lock);
 
-	if (!list_empty(&call->recvmsg_link)) {
-		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
-		       call, call->events, call->flags);
-		list_del(&call->recvmsg_link);
-		put = true;
-	}
+		if (!list_empty(&call->recvmsg_link)) {
+			_debug("unlinking once-pending call %p { e=%lx f=%lx }",
+			       call, call->events, call->flags);
+			list_del(&call->recvmsg_link);
+			put = true;
+		}
 
-	/* list_empty() must return false in rxrpc_notify_socket() */
-	call->recvmsg_link.next = NULL;
-	call->recvmsg_link.prev = NULL;
+		/* list_empty() must return false in rxrpc_notify_socket() */
+		call->recvmsg_link.next = LIST_POISON1;
+		call->recvmsg_link.prev = LIST_POISON2;
 
-	write_unlock_bh(&rx->recvmsg_lock);
-	if (put)
-		rxrpc_put_call(call, rxrpc_call_put);
+		write_unlock_bh(&rx->recvmsg_lock);
+		if (put)
+			rxrpc_put_call(call, rxrpc_call_put);
 
-	write_lock(&rx->call_lock);
+		write_lock(&rx->call_lock);
 
-	if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
-		rb_erase(&call->sock_node, &rx->calls);
-		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
-		rxrpc_put_call(call, rxrpc_call_put_userid);
-	}
+		put = test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
+		if (put) {
+			rb_erase(&call->sock_node, &rx->calls);
+			memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
+		}
 
-	list_del(&call->sock_link);
-	write_unlock(&rx->call_lock);
+		put_sl = !list_empty(&call->sock_link);
+		if (put_sl)
+			list_del_init(&call->sock_link);
+		write_unlock(&rx->call_lock);
+		if (put)
+			rxrpc_put_call(call, rxrpc_call_put_userid);
+		if (put_sl)
+			rxrpc_put_call(call, rxrpc_call_put_socket_list);
+	} else {
+		if (!list_empty(&call->recvmsg_link))
+			list_del(&call->recvmsg_link);
+	}
 
 	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
 
@@ -592,24 +615,25 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
 
 	_enter("%p", rx);
 
-	while (!list_empty(&rx->to_be_accepted)) {
-		call = list_entry(rx->to_be_accepted.next,
-				  struct rxrpc_call, accept_link);
-		list_del(&call->accept_link);
-		rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, -ECONNRESET);
-		rxrpc_put_call(call, rxrpc_call_put);
-	}
-
+	write_lock(&rx->call_lock);
 	while (!list_empty(&rx->sock_calls)) {
 		call = list_entry(rx->sock_calls.next,
 				  struct rxrpc_call, sock_link);
-		rxrpc_get_call(call, rxrpc_call_got);
-		rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, -ECONNRESET);
-		rxrpc_send_abort_packet(call);
-		rxrpc_release_call(rx, call);
-		rxrpc_put_call(call, rxrpc_call_put);
+		rxrpc_see_call(call);
+		list_del_init(&call->sock_link);
+		write_unlock(&rx->call_lock);
+
+		if (call->state != RXRPC_CALL_SERVER_PREALLOC &&
+		    call->state != RXRPC_CALL_COMPLETE &&
+		    rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, -ECONNRESET))
+			rxrpc_send_abort_packet(call);
+		rxrpc_release_call(call);
+		rxrpc_put_call(call, rxrpc_call_put_socket_list);
+
+		write_lock(&rx->call_lock);
 	}
 
+	write_unlock(&rx->call_lock);
 	_leave("");
 }
 
@@ -627,9 +651,8 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
 	ASSERT(call != NULL);
 
 	dead = __refcount_dec_and_test(&call->ref, &n);
-	trace_rxrpc_call(debug_id, op, n, here, NULL);
+	trace_rxrpc_call(debug_id, op, n - 1, here, NULL);
 	if (dead) {
-		_debug("call %d dead", call->debug_id);
 		ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 
 		if (!list_empty(&call->link)) {
@@ -652,6 +675,7 @@ static void rxrpc_destroy_call(struct work_struct *work)
 
 	rxrpc_delete_call_timer(call);
 
+	rxrpc_put_service(rxnet, call->service);
 	rxrpc_put_connection(call->conn);
 	rxrpc_put_peer(call->peer);
 	kfree(call->rxtx_buffer);
@@ -687,7 +711,10 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
 	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
 
 	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
-	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
+	if (WARN_ON(!test_bit(RXRPC_CALL_RELEASED, &call->flags))) {
+		kdebug("### UNRELEASED c=%08x", call->debug_id);
+		return;
+	}
 
 	rxrpc_cleanup_ring(call);
 	rxrpc_free_skb(call->tx_pending, rxrpc_skb_cleaned);
diff --git a/net/rxrpc/conn_service.c b/net/rxrpc/conn_service.c
index 6e6aa02c6f9e..594173b8a02e 100644
--- a/net/rxrpc/conn_service.c
+++ b/net/rxrpc/conn_service.c
@@ -153,12 +153,14 @@ struct rxrpc_connection *rxrpc_prealloc_service_connection(struct rxrpc_net *rxn
  * Set up an incoming connection.  This is called in BH context with the RCU
  * read lock held.
  */
-void rxrpc_new_incoming_connection(struct rxrpc_sock *rx,
+void rxrpc_new_incoming_connection(const struct rxrpc_service *b,
 				   struct rxrpc_connection *conn,
 				   const struct rxrpc_security *sec,
 				   struct sk_buff *skb)
 {
+	const struct rxrpc_service_ids *ids;
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	unsigned int i;
 
 	_enter("");
 
@@ -178,9 +180,16 @@ void rxrpc_new_incoming_connection(struct rxrpc_sock *rx,
 	 * first packet on a new connection.  Once done, it applies to all
 	 * subsequent calls on that connection.
 	 */
-	if (sp->hdr.userStatus == RXRPC_USERSTATUS_SERVICE_UPGRADE &&
-	    conn->service_id == rx->service_upgrade.from)
-		conn->service_id = rx->service_upgrade.to;
+	if (sp->hdr.userStatus == RXRPC_USERSTATUS_SERVICE_UPGRADE) {
+		ids = rcu_dereference(b->ids);
+		for (i = 0; i < ids->nr_ids; i++) {
+			if (conn->service_id == ids->ids[i].service_id &&
+			    ids->ids[i].upgrade_to) {
+				conn->service_id = ids->ids[i].upgrade_to;
+				break;
+			}
+		}
+	}
 
 	/* Make the connection a target for incoming packets. */
 	rxrpc_publish_service_conn(conn->params.peer, conn);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 721d847ba92b..5f3871357362 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1130,8 +1130,7 @@ static void rxrpc_input_call_packet(struct rxrpc_call *call,
  *
  * TODO: If callNumber > call_id + 1, renegotiate security.
  */
-static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
-					  struct rxrpc_connection *conn,
+static void rxrpc_input_implicit_end_call(struct rxrpc_connection *conn,
 					  struct rxrpc_call *call)
 {
 	switch (READ_ONCE(call->state)) {
@@ -1149,9 +1148,9 @@ static void rxrpc_input_implicit_end_call(struct rxrpc_sock *rx,
 		break;
 	}
 
-	spin_lock(&rx->incoming_lock);
+	spin_lock(&call->input_lock);
 	__rxrpc_disconnect_call(conn, call);
-	spin_unlock(&rx->incoming_lock);
+	spin_unlock(&call->input_lock);
 }
 
 /*
@@ -1246,7 +1245,6 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 	struct rxrpc_call *call = NULL;
 	struct rxrpc_skb_priv *sp;
 	struct rxrpc_peer *peer = NULL;
-	struct rxrpc_sock *rx = NULL;
 	unsigned int channel;
 
 	_enter("%p", udp_sk);
@@ -1357,14 +1355,8 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 		 * that would begin a call are explicitly rejected and the rest
 		 * are just discarded.
 		 */
-		rx = rcu_dereference(local->service);
-		if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
-			    sp->hdr.serviceId != rx->second_service)) {
-			if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
-			    sp->hdr.seq == 1)
-				goto unsupported_service;
-			goto discard;
-		}
+		if (list_empty(&local->services))
+			goto unsupported_service;
 	}
 
 	conn = rxrpc_find_connection_rcu(local, skb, &peer);
@@ -1433,7 +1425,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 			if (rxrpc_to_client(sp))
 				goto reject_packet;
 			if (call)
-				rxrpc_input_implicit_end_call(rx, conn, call);
+				rxrpc_input_implicit_end_call(conn, call);
 			call = NULL;
 		}
 
@@ -1453,7 +1445,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 			goto bad_message;
 		if (sp->hdr.seq != 1)
 			goto discard;
-		call = rxrpc_new_incoming_call(local, rx, skb);
+		call = rxrpc_new_incoming_call(local, skb);
 		if (!call)
 			goto reject_packet;
 	}
@@ -1477,6 +1469,9 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
 	goto post_abort;
 
 unsupported_service:
+	if (sp->hdr.type != RXRPC_PACKET_TYPE_DATA ||
+	    sp->hdr.seq != 1)
+		goto discard;
 	trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
 			  RX_INVALID_OPERATION, EOPNOTSUPP);
 	skb->priority = RX_INVALID_OPERATION;
diff --git a/net/rxrpc/key.c b/net/rxrpc/key.c
index 8d2073e0e3da..bbb270a01810 100644
--- a/net/rxrpc/key.c
+++ b/net/rxrpc/key.c
@@ -452,7 +452,7 @@ int rxrpc_request_key(struct rxrpc_sock *rx, sockptr_t optval, int optlen)
 
 	_enter("");
 
-	if (optlen <= 0 || optlen > PAGE_SIZE - 1 || rx->securities)
+	if (optlen <= 0 || optlen > PAGE_SIZE - 1)
 		return -EINVAL;
 
 	description = memdup_sockptr_nul(optval, optlen);
@@ -689,7 +689,7 @@ static long rxrpc_read(const struct key *key,
 #undef ENCODE
 
 	ASSERTCMP(tok, ==, ntoks);
-	ASSERTCMP((char __user *) xdr - buffer, ==, size);
+	ASSERTCMP((unsigned long) xdr - (unsigned long)buffer, ==, size);
 	_leave(" = %zu", size);
 	return size;
 }
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index 96ecb7356c0f..2a01c79a51e0 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -73,15 +73,18 @@ static long rxrpc_local_cmp_key(const struct rxrpc_local *local,
  * Allocate a new local endpoint.
  */
 static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
-					     const struct sockaddr_rxrpc *srx)
+					     const struct sockaddr_rxrpc *srx,
+					     bool kernel)
 {
 	struct rxrpc_local *local;
 
 	local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
 	if (local) {
+		local->kernel = kernel;
 		refcount_set(&local->ref, 1);
 		atomic_set(&local->active_users, 1);
 		local->rxnet = rxnet;
+		INIT_LIST_HEAD(&local->services);
 		INIT_HLIST_NODE(&local->link);
 		INIT_WORK(&local->processor, rxrpc_local_processor);
 		init_rwsem(&local->defrag_sem);
@@ -90,7 +93,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
 		local->client_bundles = RB_ROOT;
 		spin_lock_init(&local->client_bundles_lock);
 		spin_lock_init(&local->lock);
-		rwlock_init(&local->services_lock);
+		mutex_init(&local->services_lock);
 		local->debug_id = atomic_inc_return(&rxrpc_debug_id);
 		memcpy(&local->srx, srx, sizeof(*srx));
 		local->srx.srx_service = 0;
@@ -176,7 +179,8 @@ static int rxrpc_open_socket(struct rxrpc_local *local, struct net *net)
  * Look up or create a new local endpoint using the specified local address.
  */
 struct rxrpc_local *rxrpc_lookup_local(struct net *net,
-				       const struct sockaddr_rxrpc *srx)
+				       const struct sockaddr_rxrpc *srx,
+				       bool kernel)
 {
 	struct rxrpc_local *local;
 	struct rxrpc_net *rxnet = rxrpc_net(net);
@@ -197,12 +201,12 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *net,
 		if (diff != 0)
 			continue;
 
-		/* Services aren't allowed to share transport sockets, so
-		 * reject that here.  It is possible that the object is dying -
-		 * but it may also still have the local transport address that
-		 * we want bound.
+		/* Userspace services aren't allowed to share transport sockets
+		 * with kernel services, so reject that here.  It is possible
+		 * that the object is dying - but it may also still have the
+		 * local transport address that we want bound.
 		 */
-		if (srx->srx_service) {
+		if (local->kernel != kernel) {
 			local = NULL;
 			goto addr_in_use;
 		}
@@ -219,7 +223,7 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *net,
 		goto found;
 	}
 
-	local = rxrpc_alloc_local(rxnet, srx);
+	local = rxrpc_alloc_local(rxnet, srx, kernel);
 	if (!local)
 		goto nomem;
 
@@ -379,7 +383,7 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
 
 	rxrpc_clean_up_local_conns(local);
 	rxrpc_service_connection_reaper(&rxnet->service_conn_reaper);
-	ASSERT(!local->service);
+	ASSERT(list_empty(&local->services));
 
 	if (socket) {
 		local->socket = NULL;
diff --git a/net/rxrpc/net_ns.c b/net/rxrpc/net_ns.c
index 4107099a259c..524265924f9f 100644
--- a/net/rxrpc/net_ns.c
+++ b/net/rxrpc/net_ns.c
@@ -45,6 +45,7 @@ static __net_init int rxrpc_init_net(struct net *net)
 	struct rxrpc_net *rxnet = rxrpc_net(net);
 	int ret, i;
 
+	rxnet->net = net;
 	rxnet->live = true;
 	get_random_bytes(&rxnet->epoch, sizeof(rxnet->epoch));
 	rxnet->epoch |= RXRPC_RANDOM_EPOCH;
diff --git a/net/rxrpc/peer_object.c b/net/rxrpc/peer_object.c
index 26d2ae9baaf2..9f250237c622 100644
--- a/net/rxrpc/peer_object.c
+++ b/net/rxrpc/peer_object.c
@@ -149,10 +149,8 @@ struct rxrpc_peer *rxrpc_lookup_peer_rcu(struct rxrpc_local *local,
  * assess the MTU size for the network interface through which this peer is
  * reached
  */
-static void rxrpc_assess_MTU_size(struct rxrpc_sock *rx,
-				  struct rxrpc_peer *peer)
+static void rxrpc_assess_MTU_size(struct rxrpc_peer *peer)
 {
-	struct net *net = sock_net(&rx->sk);
 	struct dst_entry *dst;
 	struct rtable *rt;
 	struct flowi fl;
@@ -160,6 +158,7 @@ static void rxrpc_assess_MTU_size(struct rxrpc_sock *rx,
 #ifdef CONFIG_AF_RXRPC_IPV6
 	struct flowi6 *fl6 = &fl.u.ip6;
 #endif
+	struct net *net = peer->local->rxnet->net;
 
 	peer->if_mtu = 1500;
 
@@ -243,11 +242,10 @@ struct rxrpc_peer *rxrpc_alloc_peer(struct rxrpc_local *local, gfp_t gfp)
 /*
  * Initialise peer record.
  */
-static void rxrpc_init_peer(struct rxrpc_sock *rx, struct rxrpc_peer *peer,
-			    unsigned long hash_key)
+static void rxrpc_init_peer(struct rxrpc_peer *peer, unsigned long hash_key)
 {
 	peer->hash_key = hash_key;
-	rxrpc_assess_MTU_size(rx, peer);
+	rxrpc_assess_MTU_size(peer);
 	peer->mtu = peer->if_mtu;
 	peer->rtt_last_req = ktime_get_real();
 
@@ -292,7 +290,7 @@ static struct rxrpc_peer *rxrpc_create_peer(struct rxrpc_sock *rx,
 	peer = rxrpc_alloc_peer(local, gfp);
 	if (peer) {
 		memcpy(&peer->srx, srx, sizeof(*srx));
-		rxrpc_init_peer(rx, peer, hash_key);
+		rxrpc_init_peer(peer, hash_key);
 	}
 
 	_leave(" = %p", peer);
@@ -310,14 +308,13 @@ static void rxrpc_free_peer(struct rxrpc_peer *peer)
  * since we've already done a search in the list from the non-reentrant context
  * (the data_ready handler) that is the only place we can add new peers.
  */
-void rxrpc_new_incoming_peer(struct rxrpc_sock *rx, struct rxrpc_local *local,
-			     struct rxrpc_peer *peer)
+void rxrpc_new_incoming_peer(struct rxrpc_local *local, struct rxrpc_peer *peer)
 {
 	struct rxrpc_net *rxnet = local->rxnet;
 	unsigned long hash_key;
 
 	hash_key = rxrpc_peer_hash_key(local, &peer->srx);
-	rxrpc_init_peer(rx, peer, hash_key);
+	rxrpc_init_peer(peer, hash_key);
 
 	spin_lock(&rxnet->peer_hash_lock);
 	hash_add_rcu(rxnet->peer_hash, &peer->hash_link, hash_key);
diff --git a/net/rxrpc/proc.c b/net/rxrpc/proc.c
index b92303012338..814a6aba4c17 100644
--- a/net/rxrpc/proc.c
+++ b/net/rxrpc/proc.c
@@ -49,7 +49,6 @@ static void rxrpc_call_seq_stop(struct seq_file *seq, void *v)
 static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
 {
 	struct rxrpc_local *local;
-	struct rxrpc_sock *rx;
 	struct rxrpc_peer *peer;
 	struct rxrpc_call *call;
 	struct rxrpc_net *rxnet = rxrpc_net(seq_file_net(seq));
@@ -68,16 +67,11 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
 
 	call = list_entry(v, struct rxrpc_call, link);
 
-	rx = rcu_dereference(call->socket);
-	if (rx) {
-		local = READ_ONCE(rx->local);
-		if (local)
-			sprintf(lbuff, "%pISpc", &local->srx.transport);
-		else
-			strcpy(lbuff, "no_local");
-	} else {
-		strcpy(lbuff, "no_socket");
-	}
+	local = call->local;
+	if (local)
+		sprintf(lbuff, "%pISpc", &local->srx.transport);
+	else
+		strcpy(lbuff, "no_local");
 
 	peer = call->peer;
 	if (peer)
@@ -403,22 +397,23 @@ const struct seq_operations rxrpc_local_seq_ops = {
  */
 static int rxrpc_socket_seq_show(struct seq_file *seq, void *v)
 {
+	struct rxrpc_service_ids *ids;
+	struct rxrpc_service *b;
 	struct rxrpc_local *local;
 	struct rxrpc_sock *rx;
 	struct list_head *p;
-	char lbuff[50];
-	unsigned int nr_calls = 0, nr_acc = 0, nr_attend = 0;
+	char lbuff[50], sep;
+	unsigned int nr_calls = 0, nr_acc, nr_attend = 0, i;
 
 	if (v == SEQ_START_TOKEN) {
 		seq_puts(seq,
 			 "Proto Local                                          "
-			 " Use Svc1 Svc2 nCal nAcc nAtn\n");
+			 " Use nCal nAcc nAtn Services\n");
 		return 0;
 	}
 
 	rx = hlist_entry(v, struct rxrpc_sock, ns_link);
 	local = rx->local;
-
 	if (local)
 		sprintf(lbuff, "%pISpc", &local->srx.transport);
 	else
@@ -428,11 +423,11 @@ static int rxrpc_socket_seq_show(struct seq_file *seq, void *v)
 	list_for_each(p, &rx->sock_calls) {
 		nr_calls++;
 	}
-	list_for_each(p, &rx->to_be_accepted) {
-		nr_acc++;
-	}
 	read_unlock(&rx->call_lock);
 
+	b = rx->service;
+	nr_acc = b ? b->nr_tba : 0;
+
 	read_lock_bh(&rx->recvmsg_lock);
 	list_for_each(p, &rx->recvmsg_q) {
 		nr_attend++;
@@ -440,12 +435,23 @@ static int rxrpc_socket_seq_show(struct seq_file *seq, void *v)
 	read_unlock_bh(&rx->recvmsg_lock);
 
 	seq_printf(seq,
-		   "UDP   %-47.47s %3d %4x %4x %4u %4u %4u\n",
+		   "UDP   %-47.47s %3d %4x %4x %4u",
 		   lbuff,
 		   refcount_read(&rx->sk.sk_refcnt),
-		   rx->srx.srx_service, rx->second_service,
 		   nr_calls, nr_acc, nr_attend);
 
+	if (b) {
+		sep = ' ';
+		ids = rcu_dereference(b->ids);
+		for (i = 0; i < ids->nr_ids; i++) {
+			seq_printf(seq, "%c%x", sep, ids->ids[i].service_id);
+			if (ids->ids[i].upgrade_to)
+				seq_printf(seq, ">%x", ids->ids[i].upgrade_to);
+			sep = ',';
+		}
+	}
+
+	seq_putc(seq, '\n');
 	return 0;
 }
 
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 250f23bc1c07..2b596e2172ce 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -32,28 +32,50 @@ void rxrpc_notify_socket(struct rxrpc_call *call)
 
 	rcu_read_lock();
 
+	if (call->notify_rx) {
+		spin_lock_bh(&call->notify_lock);
+		call->notify_rx(sk, call, call->user_call_ID);
+		spin_unlock_bh(&call->notify_lock);
+		goto out;
+	}
+
 	rx = rcu_dereference(call->socket);
-	sk = &rx->sk;
-	if (rx && sk->sk_state < RXRPC_CLOSE) {
-		if (call->notify_rx) {
-			spin_lock_bh(&call->notify_lock);
-			call->notify_rx(sk, call, call->user_call_ID);
-			spin_unlock_bh(&call->notify_lock);
-		} else {
-			write_lock_bh(&rx->recvmsg_lock);
-			if (list_empty(&call->recvmsg_link)) {
-				rxrpc_get_call(call, rxrpc_call_got);
-				list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
-			}
-			write_unlock_bh(&rx->recvmsg_lock);
+	if (rx) {
+		sk = &rx->sk;
+		if (sk->sk_state >= RXRPC_CLOSE)
+			goto out;
 
-			if (!sock_flag(sk, SOCK_DEAD)) {
-				_debug("call %ps", sk->sk_data_ready);
+		write_lock_bh(&rx->recvmsg_lock);
+		if (list_empty(&call->recvmsg_link)) {
+			rxrpc_get_call(call, rxrpc_call_got);
+			list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
+		}
+		write_unlock_bh(&rx->recvmsg_lock);
+
+		if (!sock_flag(sk, SOCK_DEAD)) {
+			_debug("call %ps", sk->sk_data_ready);
+			sk->sk_data_ready(sk);
+		}
+		goto out;
+	}
+
+	if (call->service) {
+		struct rxrpc_service *b = call->service;
+
+		spin_lock_bh(&b->incoming_lock);
+		if (list_empty(&call->recvmsg_link)) {
+			list_add_tail(&call->recvmsg_link, &b->to_be_accepted);
+			b->nr_tba++;
+		}
+		list_for_each_entry(rx, &b->waiting_sockets, accepting_link) {
+			sk = &rx->sk;
+			if (!sock_flag(sk, SOCK_DEAD))
 				sk->sk_data_ready(sk);
-			}
 		}
+		spin_unlock_bh(&b->incoming_lock);
 	}
 
+out:
 	rcu_read_unlock();
 	_leave("");
 }
@@ -505,6 +527,11 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 try_again:
 	lock_sock(&rx->sk);
 
+	/* If there's a call we can accept, add that to the queue. */
+	if (rx->sk.sk_state == RXRPC_SERVER_LISTENING &&
+	    !list_empty(&rx->service->to_be_accepted))
+		rxrpc_accept_incoming_call(rx);		
+
 	/* Return immediately if a client socket has no outstanding calls */
 	if (RB_EMPTY_ROOT(&rx->calls) &&
 	    list_empty(&rx->recvmsg_q) &&
@@ -587,6 +614,12 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 			goto error_unlock_call;
 	}
 
+	if (test_and_clear_bit(RXRPC_CALL_NEWLY_ACCEPTED, &call->flags)) {
+		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, NULL);
+		if (ret < 0)
+			goto error_unlock_call;
+	}
+
 	if (msg->msg_name && call->peer) {
 		struct sockaddr_rxrpc *srx = msg->msg_name;
 		size_t len = sizeof(call->peer->srx);
@@ -622,7 +655,7 @@ int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 		if (ret < 0)
 			goto error_unlock_call;
 		if (!(flags & MSG_PEEK))
-			rxrpc_release_call(rx, call);
+			rxrpc_release_call(call);
 		msg->msg_flags |= MSG_EOR;
 		ret = 1;
 	}
diff --git a/net/rxrpc/security.c b/net/rxrpc/security.c
index 50cb5f1ee0c0..0d4c049ede58 100644
--- a/net/rxrpc/security.c
+++ b/net/rxrpc/security.c
@@ -104,7 +104,7 @@ int rxrpc_init_client_conn_security(struct rxrpc_connection *conn)
 /*
  * Set the ops a server connection.
  */
-const struct rxrpc_security *rxrpc_get_incoming_security(struct rxrpc_sock *rx,
+const struct rxrpc_security *rxrpc_get_incoming_security(struct rxrpc_service *b,
 							 struct sk_buff *skb)
 {
 	const struct rxrpc_security *sec;
@@ -123,7 +123,7 @@ const struct rxrpc_security *rxrpc_get_incoming_security(struct rxrpc_sock *rx,
 	}
 
 	if (sp->hdr.securityIndex != RXRPC_SECURITY_NONE &&
-	    !rx->securities) {
+	    !b->securities) {
 		trace_rxrpc_abort(0, "SVR",
 				  sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
 				  RX_INVALID_OPERATION, EKEYREJECTED);
@@ -143,8 +143,11 @@ struct key *rxrpc_look_up_server_security(struct rxrpc_connection *conn,
 					  u32 kvno, u32 enctype)
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
-	struct rxrpc_sock *rx;
+	const struct rxrpc_service_ids *ids;
+	const struct rxrpc_service *b;
+	const struct rxrpc_local *local = conn->params.local;
 	struct key *key = ERR_PTR(-EKEYREJECTED);
+	unsigned int i;
 	key_ref_t kref = NULL;
 	char kdesc[5 + 1 + 3 + 1 + 12 + 1 + 12 + 1];
 	int ret;
@@ -163,12 +166,17 @@ struct key *rxrpc_look_up_server_security(struct rxrpc_connection *conn,
 
 	rcu_read_lock();
 
-	rx = rcu_dereference(conn->params.local->service);
-	if (!rx)
-		goto out;
+	list_for_each_entry_rcu(b, &local->services, local_link) {
+		ids = rcu_dereference(b->ids);
+		for (i = 0; i < ids->nr_ids; i++)
+			if (ids->ids[i].service_id == sp->hdr.serviceId)
+				goto found_service;
+	}
+	goto out;
 
+found_service:
 	/* look through the service's keyring */
-	kref = keyring_search(make_key_ref(rx->securities, 1UL),
+	kref = keyring_search(make_key_ref(b->securities, 1UL),
 			      &key_type_rxrpc_s, kdesc, true);
 	if (IS_ERR(kref)) {
 		key = ERR_CAST(kref);
diff --git a/net/rxrpc/server_key.c b/net/rxrpc/server_key.c
index ee269e0e6ee8..017837940b0d 100644
--- a/net/rxrpc/server_key.c
+++ b/net/rxrpc/server_key.c
@@ -122,10 +122,11 @@ int rxrpc_server_keyring(struct rxrpc_sock *rx, sockptr_t optval, int optlen)
 {
 	struct key *key;
 	char *description;
+	int ret = -EBUSY;
 
 	_enter("");
 
-	if (optlen <= 0 || optlen > PAGE_SIZE - 1)
+	if (optlen <= 0 || optlen > PAGE_SIZE - 1 || rx->service)
 		return -EINVAL;
 
 	description = memdup_sockptr_nul(optval, optlen);
@@ -139,8 +140,13 @@ int rxrpc_server_keyring(struct rxrpc_sock *rx, sockptr_t optval, int optlen)
 		return PTR_ERR(key);
 	}
 
-	rx->securities = key;
 	kfree(description);
+
+	if (cmpxchg(&rx->service->securities, NULL, key) == NULL)
+		ret = 0;
+	else
+		key_put(key);
+
 	_leave(" = 0 [key %x]", key->serial);
-	return 0;
+	return ret;
 }


Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ