lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <20210205225204.32902-22-mgross@linux.intel.com>
Date:   Fri,  5 Feb 2021 14:51:51 -0800
From:   mgross@...ux.intel.com
To:     markgross@...nel.org, mgross@...ux.intel.com, arnd@...db.de,
        bp@...e.de, damien.lemoal@....com, dragan.cvetic@...inx.com,
        gregkh@...uxfoundation.org, corbet@....net,
        palmerdabbelt@...gle.com, paul.walmsley@...ive.com,
        peng.fan@....com, robh+dt@...nel.org, shawnguo@...nel.org,
        jassisinghbrar@...il.com
Cc:     linux-kernel@...r.kernel.org, Seamus Kelly <seamus.kelly@...el.com>
Subject: [PATCH v5 21/34] xlink-core: Enable xlink protocol over pcie

From: Seamus Kelly <seamus.kelly@...el.com>

Enable host system access to the VPU over the xlink protocol over PCIe by
enabling channel multiplexing and dispatching.  This allows for remote host
communication channels across pcie links.

    add dispatcher
    update multiplexer to utilise dispatcher

        xlink-core: Patch set 2

        Add xlink-dispatcher
                creates tx and rx threads
                enables queueing of messages for transmission and on reception

        Update multiplexer to utilise dispatcher:
                handle multiplexing channels over single interface link e.g. PCIe
                process messages received by dispatcher
                pass messages created by API calls to dispatcher for transmission


Cc: Arnd Bergmann <arnd@...db.de>
Cc: Greg Kroah-Hartman <gregkh@...uxfoundation.org>
Reviewed-by: Mark Gross <mgross@...ux.intel.com>
Signed-off-by: Mark Gross <mgross@...ux.intel.com>
Signed-off-by: Seamus Kelly <seamus.kelly@...el.com>
---
 drivers/misc/xlink-core/Makefile            |   2 +-
 drivers/misc/xlink-core/xlink-core.c        |  35 +-
 drivers/misc/xlink-core/xlink-dispatcher.c  | 441 +++++++++++++++++
 drivers/misc/xlink-core/xlink-dispatcher.h  |  26 +
 drivers/misc/xlink-core/xlink-multiplexer.c | 498 +++++++++++++++++++-
 5 files changed, 999 insertions(+), 3 deletions(-)
 create mode 100644 drivers/misc/xlink-core/xlink-dispatcher.c
 create mode 100644 drivers/misc/xlink-core/xlink-dispatcher.h

diff --git a/drivers/misc/xlink-core/Makefile b/drivers/misc/xlink-core/Makefile
index e82b7c72b6b9..ee81f9d05f2b 100644
--- a/drivers/misc/xlink-core/Makefile
+++ b/drivers/misc/xlink-core/Makefile
@@ -2,4 +2,4 @@
 # Makefile for Keem Bay xlink Linux driver
 #
 obj-$(CONFIG_XLINK_CORE) += xlink.o
-xlink-objs += xlink-core.o xlink-multiplexer.o xlink-platform.o xlink-ioctl.o
+xlink-objs += xlink-core.o xlink-multiplexer.o xlink-dispatcher.o xlink-platform.o xlink-ioctl.o
diff --git a/drivers/misc/xlink-core/xlink-core.c b/drivers/misc/xlink-core/xlink-core.c
index dd8db834c184..bdbf8c6a99ca 100644
--- a/drivers/misc/xlink-core/xlink-core.c
+++ b/drivers/misc/xlink-core/xlink-core.c
@@ -21,6 +21,7 @@
 
 #include "xlink-core.h"
 #include "xlink-defs.h"
+#include "xlink-dispatcher.h"
 #include "xlink-ioctl.h"
 #include "xlink-multiplexer.h"
 #include "xlink-platform.h"
@@ -151,6 +152,12 @@ static int kmb_xlink_probe(struct platform_device *pdev)
 		goto r_multiplexer;
 	}
 
+	// initialize dispatcher
+	rc = xlink_dispatcher_init(xlink_dev->pdev);
+	if (rc != X_LINK_SUCCESS) {
+		pr_err("Dispatcher initialization failed\n");
+		goto r_dispatcher;
+	}
 	// initialize xlink data structure
 	xlink_dev->nmb_connected_links = 0;
 	mutex_init(&xlink_dev->lock);
@@ -168,7 +175,7 @@ static int kmb_xlink_probe(struct platform_device *pdev)
 	/*Allocating Major number*/
 	if ((alloc_chrdev_region(&xdev, 0, 1, "xlinkdev")) < 0) {
 		dev_info(&pdev->dev, "Cannot allocate major number\n");
-		goto r_multiplexer;
+		goto r_dispatcher;
 	}
 	dev_info(&pdev->dev, "Major = %d Minor = %d\n", MAJOR(xdev),
 		 MINOR(xdev));
@@ -205,6 +212,8 @@ static int kmb_xlink_probe(struct platform_device *pdev)
 	class_destroy(dev_class);
 r_class:
 	unregister_chrdev_region(xdev, 1);
+r_dispatcher:
+	xlink_dispatcher_destroy();
 r_multiplexer:
 	xlink_multiplexer_destroy();
 	return -1;
@@ -220,6 +229,10 @@ static int kmb_xlink_remove(struct platform_device *pdev)
 	rc = xlink_multiplexer_destroy();
 	if (rc != X_LINK_SUCCESS)
 		pr_err("Multiplexer destroy failed\n");
+	// stop dispatchers and destroy
+	rc = xlink_dispatcher_destroy();
+	if (rc != X_LINK_SUCCESS)
+		pr_err("Dispatcher destroy failed\n");
 
 	mutex_unlock(&xlink->lock);
 	mutex_destroy(&xlink->lock);
@@ -314,6 +327,14 @@ enum xlink_error xlink_connect(struct xlink_handle *handle)
 		link->handle = *handle;
 		xlink->nmb_connected_links++;
 		kref_init(&link->refcount);
+		if (interface != IPC_INTERFACE) {
+			// start dispatcher
+			rc = xlink_dispatcher_start(link->id, &link->handle);
+			if (rc) {
+				pr_err("dispatcher start failed\n");
+				goto r_cleanup;
+			}
+		}
 		// initialize multiplexer connection
 		rc = xlink_multiplexer_connect(link->id);
 		if (rc) {
@@ -649,6 +670,7 @@ EXPORT_SYMBOL_GPL(xlink_release_data);
 enum xlink_error xlink_disconnect(struct xlink_handle *handle)
 {
 	struct xlink_link *link;
+	int interface = NULL_INTERFACE;
 	enum xlink_error rc = X_LINK_ERROR;
 
 	if (!xlink || !handle)
@@ -661,6 +683,17 @@ enum xlink_error xlink_disconnect(struct xlink_handle *handle)
 	// decrement refcount, if count is 0 lock mutex and disconnect
 	if (kref_put_mutex(&link->refcount, release_after_kref_put,
 			   &xlink->lock)) {
+		// stop dispatcher
+		interface = get_interface_from_sw_device_id(link->handle.sw_device_id);
+		if (interface != IPC_INTERFACE) {
+			// stop dispatcher
+			rc = xlink_dispatcher_stop(link->id);
+			if (rc != X_LINK_SUCCESS) {
+				pr_err("dispatcher stop failed\n");
+				mutex_unlock(&xlink->lock);
+				return X_LINK_ERROR;
+			}
+		}
 		// deinitialize multiplexer connection
 		rc = xlink_multiplexer_disconnect(link->id);
 		if (rc) {
diff --git a/drivers/misc/xlink-core/xlink-dispatcher.c b/drivers/misc/xlink-core/xlink-dispatcher.c
new file mode 100644
index 000000000000..11ef8e4110ca
--- /dev/null
+++ b/drivers/misc/xlink-core/xlink-dispatcher.c
@@ -0,0 +1,441 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * xlink Dispatcher.
+ *
+ * Copyright (C) 2018-2019 Intel Corporation
+ *
+ */
+#include <linux/init.h>
+#include <linux/module.h>
+#include <linux/device.h>
+#include <linux/kernel.h>
+#include <linux/slab.h>
+#include <linux/kthread.h>
+#include <linux/list.h>
+#include <linux/semaphore.h>
+#include <linux/mutex.h>
+#include <linux/completion.h>
+#include <linux/sched/signal.h>
+#include <linux/platform_device.h>
+
+#include "xlink-dispatcher.h"
+#include "xlink-multiplexer.h"
+#include "xlink-platform.h"
+
+#define DISPATCHER_RX_TIMEOUT_MSEC 0
+
+/* state of a dispatcher servicing a link to a device*/
+enum dispatcher_state {
+	XLINK_DISPATCHER_INIT,		/* initialized but not used */
+	XLINK_DISPATCHER_RUNNING,	/* currently servicing a link */
+	XLINK_DISPATCHER_STOPPED,	/* no longer servicing a link */
+	XLINK_DISPATCHER_ERROR,		/* fatal error */
+};
+
+/* queue for dispatcher tx thread event handling */
+struct event_queue {
+	u32 count;		/* number of events in the queue */
+	u32 capacity;		/* capacity of events in the queue */
+	struct list_head head;	/* head of event linked list */
+	struct mutex lock;	/* locks queue while accessing */
+};
+
+/* dispatcher servicing a single link to a device */
+struct dispatcher {
+	u32 link_id;			/* id of link being serviced */
+	enum dispatcher_state state;	/* state of the dispatcher */
+	struct xlink_handle *handle;	/* xlink device handle */
+	int interface;			/* underlying interface of link */
+	struct task_struct *rxthread;	/* kthread servicing rx */
+	struct task_struct *txthread;	/* kthread servicing tx */
+	struct event_queue queue;	/* xlink event queue */
+	struct semaphore event_sem;	/* signals tx kthread of events */
+	struct completion rx_done;	/* sync start/stop of rx kthread */
+	struct completion tx_done;	/* sync start/stop of tx thread */
+};
+
+/* xlink dispatcher system component */
+struct xlink_dispatcher {
+	struct dispatcher dispatchers[XLINK_MAX_CONNECTIONS];	/* disp queue */
+	struct device *dev;					/* deallocate data */
+	struct mutex lock;					/* locks when start new disp */
+};
+
+/* global reference to the xlink dispatcher data structure */
+static struct xlink_dispatcher *xlinkd;
+
+/*
+ * Dispatcher Internal Functions
+ *
+ */
+
+static struct dispatcher *get_dispatcher_by_id(u32 id)
+{
+	if (!xlinkd)
+		return NULL;
+
+	if (id >= XLINK_MAX_CONNECTIONS)
+		return NULL;
+
+	return &xlinkd->dispatchers[id];
+}
+
+static u32 event_generate_id(void)
+{
+	static u32 id = 0xa; // TODO: temporary solution
+
+	return id++;
+}
+
+static struct xlink_event *event_dequeue(struct event_queue *queue)
+{
+	struct xlink_event *event = NULL;
+
+	mutex_lock(&queue->lock);
+	if (!list_empty(&queue->head)) {
+		event = list_first_entry(&queue->head, struct xlink_event,
+					 list);
+		list_del(&event->list);
+		queue->count--;
+	}
+	mutex_unlock(&queue->lock);
+	return event;
+}
+
+static int event_enqueue(struct event_queue *queue, struct xlink_event *event)
+{
+	int rc = -1;
+
+	mutex_lock(&queue->lock);
+	if (queue->count < ((queue->capacity / 10) * 7)) {
+		list_add_tail(&event->list, &queue->head);
+		queue->count++;
+		rc = 0;
+	}
+	mutex_unlock(&queue->lock);
+	return rc;
+}
+
+static struct xlink_event *dispatcher_event_get(struct dispatcher *disp)
+{
+	int rc = 0;
+	struct xlink_event *event = NULL;
+
+	// wait until an event is available
+	rc = down_interruptible(&disp->event_sem);
+	// dequeue and return next event to process
+	if (!rc)
+		event = event_dequeue(&disp->queue);
+	return event;
+}
+
+static int is_valid_event_header(struct xlink_event *event)
+{
+	if (event->header.magic != XLINK_EVENT_HEADER_MAGIC)
+		return 0;
+	else
+		return 1;
+}
+
+static int dispatcher_event_send(struct xlink_event *event)
+{
+	size_t event_header_size = sizeof(event->header);
+	int rc;
+
+	// write event header
+	// printk(KERN_DEBUG "Sending event: type = 0x%x, id = 0x%x\n",
+			// event->header.type, event->header.id);
+	rc = xlink_platform_write(event->interface,
+				  event->handle->sw_device_id, &event->header,
+				  &event_header_size, event->header.timeout, NULL);
+	if (rc || event_header_size != sizeof(event->header)) {
+		pr_err("Write header failed %d\n", rc);
+		return rc;
+	}
+	if (event->header.type == XLINK_WRITE_REQ ||
+	    event->header.type == XLINK_WRITE_VOLATILE_REQ) {
+		// write event data
+		rc = xlink_platform_write(event->interface,
+					  event->handle->sw_device_id, event->data,
+					  &event->header.size, event->header.timeout,
+					  NULL);
+		if (rc)
+			pr_err("Write data failed %d\n", rc);
+		if (event->user_data == 1) {
+			if (event->paddr != 0) {
+				xlink_platform_deallocate(xlinkd->dev,
+							  event->data, event->paddr,
+							  event->header.size,
+							  XLINK_PACKET_ALIGNMENT,
+							  XLINK_CMA_MEMORY);
+			} else {
+				xlink_platform_deallocate(xlinkd->dev,
+							  event->data, event->paddr,
+							  event->header.size,
+							  XLINK_PACKET_ALIGNMENT,
+							  XLINK_NORMAL_MEMORY);
+			}
+		}
+	}
+	return rc;
+}
+
+static int xlink_dispatcher_rxthread(void *context)
+{
+	struct dispatcher *disp = (struct dispatcher *)context;
+	struct xlink_event *event;
+	size_t size;
+	int rc;
+
+	// printk(KERN_DEBUG "dispatcher rxthread started\n");
+	event = xlink_create_event(disp->link_id, 0, disp->handle, 0, 0, 0);
+	if (!event)
+		return -1;
+
+	allow_signal(SIGTERM); // allow thread termination while waiting on sem
+	complete(&disp->rx_done);
+	while (!kthread_should_stop()) {
+		size = sizeof(event->header);
+		rc = xlink_platform_read(disp->interface,
+					 disp->handle->sw_device_id,
+					 &event->header, &size,
+					 DISPATCHER_RX_TIMEOUT_MSEC, NULL);
+		if (rc || size != (int)sizeof(event->header))
+			continue;
+		if (is_valid_event_header(event)) {
+			event->link_id = disp->link_id;
+			rc = xlink_multiplexer_rx(event);
+			if (!rc) {
+				event = xlink_create_event(disp->link_id, 0,
+							   disp->handle, 0, 0,
+							   0);
+				if (!event)
+					return -1;
+			}
+		}
+	}
+	// printk(KERN_INFO "dispatcher rxthread stopped\n");
+	complete(&disp->rx_done);
+	do_exit(0);
+	return 0;
+}
+
+static int xlink_dispatcher_txthread(void *context)
+{
+	struct dispatcher *disp = (struct dispatcher *)context;
+	struct xlink_event *event;
+
+	// printk(KERN_DEBUG "dispatcher txthread started\n");
+	allow_signal(SIGTERM); // allow thread termination while waiting on sem
+	complete(&disp->tx_done);
+	while (!kthread_should_stop()) {
+		event = dispatcher_event_get(disp);
+		if (!event)
+			continue;
+
+		dispatcher_event_send(event);
+		xlink_destroy_event(event); // free handled event
+	}
+	// printk(KERN_INFO "dispatcher txthread stopped\n");
+	complete(&disp->tx_done);
+	do_exit(0);
+	return 0;
+}
+
+/*
+ * Dispatcher External Functions
+ *
+ */
+
+enum xlink_error xlink_dispatcher_init(void *dev)
+{
+	struct platform_device *plat_dev = (struct platform_device *)dev;
+	int i;
+
+	xlinkd = kzalloc(sizeof(*xlinkd), GFP_KERNEL);
+	if (!xlinkd)
+		return X_LINK_ERROR;
+
+	xlinkd->dev = &plat_dev->dev;
+	for (i = 0; i < XLINK_MAX_CONNECTIONS; i++) {
+		xlinkd->dispatchers[i].link_id = i;
+		sema_init(&xlinkd->dispatchers[i].event_sem, 0);
+		init_completion(&xlinkd->dispatchers[i].rx_done);
+		init_completion(&xlinkd->dispatchers[i].tx_done);
+		INIT_LIST_HEAD(&xlinkd->dispatchers[i].queue.head);
+		mutex_init(&xlinkd->dispatchers[i].queue.lock);
+		xlinkd->dispatchers[i].queue.count = 0;
+		xlinkd->dispatchers[i].queue.capacity =
+				XLINK_EVENT_QUEUE_CAPACITY;
+		xlinkd->dispatchers[i].state = XLINK_DISPATCHER_INIT;
+	}
+	mutex_init(&xlinkd->lock);
+
+	return X_LINK_SUCCESS;
+}
+
+enum xlink_error xlink_dispatcher_start(int id, struct xlink_handle *handle)
+{
+	struct dispatcher *disp;
+
+	mutex_lock(&xlinkd->lock);
+	// get dispatcher by link id
+	disp = get_dispatcher_by_id(id);
+	if (!disp)
+		goto r_error;
+
+	// cannot start a running or failed dispatcher
+	if (disp->state == XLINK_DISPATCHER_RUNNING ||
+	    disp->state == XLINK_DISPATCHER_ERROR)
+		goto r_error;
+
+	// set the dispatcher context
+	disp->handle = handle;
+	disp->interface = get_interface_from_sw_device_id(handle->sw_device_id);
+
+	// run dispatcher thread to handle and write outgoing packets
+	disp->txthread = kthread_run(xlink_dispatcher_txthread,
+				     (void *)disp, "txthread");
+	if (!disp->txthread) {
+		pr_err("xlink txthread creation failed\n");
+		goto r_txthread;
+	}
+	wait_for_completion(&disp->tx_done);
+	disp->state = XLINK_DISPATCHER_RUNNING;
+	// run dispatcher thread to read and handle incoming packets
+	disp->rxthread = kthread_run(xlink_dispatcher_rxthread,
+				     (void *)disp, "rxthread");
+	if (!disp->rxthread) {
+		pr_err("xlink rxthread creation failed\n");
+		goto r_rxthread;
+	}
+	wait_for_completion(&disp->rx_done);
+	mutex_unlock(&xlinkd->lock);
+
+	return X_LINK_SUCCESS;
+
+r_rxthread:
+	kthread_stop(disp->txthread);
+r_txthread:
+	disp->state = XLINK_DISPATCHER_STOPPED;
+r_error:
+	mutex_unlock(&xlinkd->lock);
+	return X_LINK_ERROR;
+}
+
+enum xlink_error xlink_dispatcher_event_add(enum xlink_event_origin origin,
+					    struct xlink_event *event)
+{
+	struct dispatcher *disp;
+	int rc;
+
+	// get dispatcher by handle
+	disp = get_dispatcher_by_id(event->link_id);
+	if (!disp)
+		return X_LINK_ERROR;
+
+	// only add events if the dispatcher is running
+	if (disp->state != XLINK_DISPATCHER_RUNNING)
+		return X_LINK_ERROR;
+
+	// configure event and add to queue
+	if (origin == EVENT_TX)
+		event->header.id = event_generate_id();
+	event->origin = origin;
+	rc = event_enqueue(&disp->queue, event);
+	if (rc)
+		return X_LINK_CHAN_FULL;
+
+	// notify dispatcher tx thread of new event
+	up(&disp->event_sem);
+	return X_LINK_SUCCESS;
+}
+
+enum xlink_error xlink_dispatcher_stop(int id)
+{
+	struct dispatcher *disp;
+	int rc;
+
+	mutex_lock(&xlinkd->lock);
+	// get dispatcher by link id
+	disp = get_dispatcher_by_id(id);
+	if (!disp)
+		goto r_error;
+
+	// don't stop dispatcher if not started
+	if (disp->state != XLINK_DISPATCHER_RUNNING)
+		goto r_error;
+
+	if (disp->rxthread) {
+		// stop dispatcher rx thread
+		send_sig(SIGTERM, disp->rxthread, 0);
+		rc = kthread_stop(disp->rxthread);
+		if (rc)
+			goto r_thread;
+	}
+	wait_for_completion(&disp->rx_done);
+	if (disp->txthread) {
+		// stop dispatcher tx thread
+		send_sig(SIGTERM, disp->txthread, 0);
+		rc = kthread_stop(disp->txthread);
+		if (rc)
+			goto r_thread;
+	}
+	wait_for_completion(&disp->tx_done);
+	disp->state = XLINK_DISPATCHER_STOPPED;
+	mutex_unlock(&xlinkd->lock);
+	return X_LINK_SUCCESS;
+
+r_thread:
+	// dispatcher now in error state and cannot be used
+	disp->state = XLINK_DISPATCHER_ERROR;
+r_error:
+	mutex_unlock(&xlinkd->lock);
+	return X_LINK_ERROR;
+}
+
+enum xlink_error xlink_dispatcher_destroy(void)
+{
+	enum xlink_event_type type;
+	struct xlink_event *event;
+	struct dispatcher *disp;
+	int i;
+
+	for (i = 0; i < XLINK_MAX_CONNECTIONS; i++) {
+		// get dispatcher by link id
+		disp = get_dispatcher_by_id(i);
+		if (!disp)
+			continue;
+
+		// stop all running dispatchers
+		if (disp->state == XLINK_DISPATCHER_RUNNING)
+			xlink_dispatcher_stop(i);
+
+		// empty queues of all used dispatchers
+		if (disp->state == XLINK_DISPATCHER_INIT)
+			continue;
+
+		// deallocate remaining events in queue
+		while (!list_empty(&disp->queue.head)) {
+			event = event_dequeue(&disp->queue);
+			if (!event)
+				continue;
+			type = event->header.type;
+			if (type == XLINK_WRITE_REQ ||
+			    type == XLINK_WRITE_VOLATILE_REQ) {
+				// deallocate event data
+				xlink_platform_deallocate(xlinkd->dev,
+							  event->data,
+							  event->paddr,
+							  event->header.size,
+							  XLINK_PACKET_ALIGNMENT,
+							  XLINK_NORMAL_MEMORY);
+			}
+			xlink_destroy_event(event);
+		}
+		// destroy dispatcher
+		mutex_destroy(&disp->queue.lock);
+	}
+	mutex_destroy(&xlinkd->lock);
+	return X_LINK_SUCCESS;
+}
diff --git a/drivers/misc/xlink-core/xlink-dispatcher.h b/drivers/misc/xlink-core/xlink-dispatcher.h
new file mode 100644
index 000000000000..d1458e7a4ab7
--- /dev/null
+++ b/drivers/misc/xlink-core/xlink-dispatcher.h
@@ -0,0 +1,26 @@
+/* SPDX-License-Identifier: GPL-2.0-only */
+/*
+ * xlink Dispatcher.
+ *
+ * Copyright (C) 2018-2019 Intel Corporation
+ *
+ */
+#ifndef __XLINK_DISPATCHER_H
+#define __XLINK_DISPATCHER_H
+
+#include "xlink-defs.h"
+
+enum xlink_error xlink_dispatcher_init(void *dev);
+
+enum xlink_error xlink_dispatcher_start(int id, struct xlink_handle *handle);
+
+enum xlink_error xlink_dispatcher_event_add(enum xlink_event_origin origin,
+					    struct xlink_event *event);
+
+enum xlink_error xlink_dispatcher_stop(int id);
+
+enum xlink_error xlink_dispatcher_destroy(void);
+
+enum xlink_error xlink_dispatcher_ipc_passthru_event_add(struct xlink_event *event);
+
+#endif /* __XLINK_DISPATCHER_H */
diff --git a/drivers/misc/xlink-core/xlink-multiplexer.c b/drivers/misc/xlink-core/xlink-multiplexer.c
index 9b1ed008bb56..339734826f3e 100644
--- a/drivers/misc/xlink-core/xlink-multiplexer.c
+++ b/drivers/misc/xlink-core/xlink-multiplexer.c
@@ -27,6 +27,7 @@
 #include <linux/xlink-ipc.h>
 #endif
 
+#include "xlink-dispatcher.h"
 #include "xlink-multiplexer.h"
 #include "xlink-platform.h"
 
@@ -165,6 +166,32 @@ static int is_channel_for_device(u16 chan, u32 sw_device_id,
 	return 0;
 }
 
+static int is_enough_space_in_channel(struct open_channel *opchan,
+				      u32 size)
+{
+	if (opchan->tx_packet_level >= ((XLINK_PACKET_QUEUE_CAPACITY / 100) * THR_UPR)) {
+		pr_info("Packet queue limit reached\n");
+		return 0;
+	}
+	if (opchan->tx_up_limit == 0) {
+		if ((opchan->tx_fill_level + size)
+				> ((opchan->chan->size / 100) * THR_UPR)) {
+			opchan->tx_up_limit = 1;
+			return 0;
+		}
+	}
+	if (opchan->tx_up_limit == 1) {
+		if ((opchan->tx_fill_level + size)
+				< ((opchan->chan->size / 100) * THR_LWR)) {
+			opchan->tx_up_limit = 0;
+			return 1;
+		} else {
+			return 0;
+		}
+	}
+	return 1;
+}
+
 static int is_control_channel(u16 chan)
 {
 	if (chan == IP_CONTROL_CHANNEL || chan == VPU_CONTROL_CHANNEL)
@@ -173,6 +200,51 @@ static int is_control_channel(u16 chan)
 		return 0;
 }
 
+static struct open_channel *get_channel(u32 link_id, u16 chan)
+{
+	if (!xmux->channels[link_id][chan].opchan)
+		return NULL;
+	mutex_lock(&xmux->channels[link_id][chan].opchan->lock);
+	return xmux->channels[link_id][chan].opchan;
+}
+
+static void release_channel(struct open_channel *opchan)
+{
+	if (opchan)
+		mutex_unlock(&opchan->lock);
+}
+
+static int add_packet_to_channel(struct open_channel *opchan,
+				 struct packet_queue *queue,
+				 void *buffer, u32 size,
+				 dma_addr_t paddr)
+{
+	struct packet *pkt;
+
+	if (queue->count < queue->capacity) {
+		pkt = kzalloc(sizeof(*pkt), GFP_KERNEL);
+		if (!pkt)
+			return X_LINK_ERROR;
+		pkt->data = buffer;
+		pkt->length = size;
+		pkt->paddr = paddr;
+		list_add_tail(&pkt->list, &queue->head);
+		queue->count++;
+		opchan->rx_fill_level += pkt->length;
+	}
+	return X_LINK_SUCCESS;
+}
+
+static struct packet *get_packet_from_channel(struct packet_queue *queue)
+{
+	struct packet *pkt = NULL;
+	// get first packet in queue
+	if (!list_empty(&queue->head))
+		pkt = list_first_entry(&queue->head, struct packet, list);
+
+	return pkt;
+}
+
 static int release_packet_from_channel(struct open_channel *opchan,
 				       struct packet_queue *queue,
 				       u8 * const addr,	u32 *size)
@@ -355,15 +427,46 @@ enum xlink_error xlink_multiplexer_destroy(void)
 	return X_LINK_SUCCESS;
 }
 
+static int compl_wait(struct completion *compl, struct open_channel *opchan)
+{
+	int rc;
+	unsigned long tout = msecs_to_jiffies(opchan->chan->timeout);
+
+	if (opchan->chan->timeout == 0) {
+		mutex_unlock(&opchan->lock);
+		rc = wait_for_completion_interruptible(compl);
+		mutex_lock(&opchan->lock);
+		if (rc < 0)	// wait interrupted
+			rc = X_LINK_ERROR;
+	} else {
+		mutex_unlock(&opchan->lock);
+		rc = wait_for_completion_interruptible_timeout(compl, tout);
+		mutex_lock(&opchan->lock);
+		if (rc == 0)
+			rc = X_LINK_TIMEOUT;
+		else if (rc < 0)	// wait interrupted
+			rc = X_LINK_ERROR;
+		else if (rc > 0)
+			rc = X_LINK_SUCCESS;
+	}
+	return rc;
+}
+
 enum xlink_error xlink_multiplexer_tx(struct xlink_event *event,
 				      int *event_queued)
 {
+	struct open_channel *opchan = NULL;
+	struct packet *pkt = NULL;
 	int rc = X_LINK_SUCCESS;
+	u32 link_id = 0;
+	u32 size = 0;
 	u16 chan = 0;
+	u32 save_timeout = 0;
 
 	if (!xmux || !event)
 		return X_LINK_ERROR;
 
+	link_id = event->link_id;
 	chan = event->header.chan;
 
 	// verify channel ID is in range
@@ -379,9 +482,402 @@ enum xlink_error xlink_multiplexer_tx(struct xlink_event *event,
 	if (is_control_channel(chan))
 		return X_LINK_ERROR;
 
-	if (chan < XLINK_IPC_MAX_CHANNELS && event->interface == IPC_INTERFACE)
+	if (chan < XLINK_IPC_MAX_CHANNELS && event->interface == IPC_INTERFACE) {
 		// event should be handled by passthrough
 		rc = xlink_passthrough(event);
+		return rc;
+	}
+	// event should be handled by dispatcher
+	switch (event->header.type) {
+	case XLINK_WRITE_REQ:
+	case XLINK_WRITE_VOLATILE_REQ:
+		opchan = get_channel(link_id, chan);
+		if (!opchan || opchan->chan->status != CHAN_OPEN) {
+			rc = X_LINK_COMMUNICATION_FAIL;
+		} else {
+			event->header.timeout = opchan->chan->timeout;
+			while (!is_enough_space_in_channel(opchan,
+							   event->header.size)) {
+				if (opchan->chan->mode == RXN_TXB ||
+				    opchan->chan->mode == RXB_TXB) {
+					// channel is blocking,
+					// wait for packet to be released
+					rc = compl_wait(&opchan->pkt_released, opchan);
+				} else {
+					rc = X_LINK_CHAN_FULL;
+					break;
+				}
+			}
+			if (rc == X_LINK_SUCCESS) {
+				opchan->tx_fill_level += event->header.size;
+				opchan->tx_packet_level++;
+				xlink_dispatcher_event_add(EVENT_TX, event);
+				*event_queued = 1;
+				if (opchan->chan->mode == RXN_TXB ||
+				    opchan->chan->mode == RXB_TXB) {
+					// channel is blocking,
+					// wait for packet to be consumed
+					mutex_unlock(&opchan->lock);
+					rc = compl_wait(&opchan->pkt_consumed, opchan);
+					mutex_lock(&opchan->lock);
+				}
+			}
+		}
+		release_channel(opchan);
+		break;
+	case XLINK_READ_REQ:
+		opchan = get_channel(link_id, chan);
+		if (!opchan || opchan->chan->status != CHAN_OPEN) {
+			rc = X_LINK_COMMUNICATION_FAIL;
+		} else {
+			event->header.timeout = opchan->chan->timeout;
+			if (opchan->chan->mode == RXB_TXN ||
+			    opchan->chan->mode == RXB_TXB) {
+				// channel is blocking, wait for packet to become available
+				mutex_unlock(&opchan->lock);
+				rc = compl_wait(&opchan->pkt_available, opchan);
+				mutex_lock(&opchan->lock);
+			}
+			if (rc == X_LINK_SUCCESS) {
+				pkt = get_packet_from_channel(&opchan->rx_queue);
+				if (pkt) {
+					*(u32 **)event->pdata = (u32 *)pkt->data;
+					*event->length = pkt->length;
+					xlink_dispatcher_event_add(EVENT_TX, event);
+					*event_queued = 1;
+				} else {
+					rc = X_LINK_ERROR;
+				}
+			}
+		}
+		release_channel(opchan);
+		break;
+	case XLINK_READ_TO_BUFFER_REQ:
+		opchan = get_channel(link_id, chan);
+		if (!opchan || opchan->chan->status != CHAN_OPEN) {
+			rc = X_LINK_COMMUNICATION_FAIL;
+		} else {
+			event->header.timeout = opchan->chan->timeout;
+			if (opchan->chan->mode == RXB_TXN ||
+			    opchan->chan->mode == RXB_TXB) {
+				// channel is blocking, wait for packet to become available
+				mutex_unlock(&opchan->lock);
+				rc = compl_wait(&opchan->pkt_available, opchan);
+				mutex_lock(&opchan->lock);
+			}
+			if (rc == X_LINK_SUCCESS) {
+				pkt = get_packet_from_channel(&opchan->rx_queue);
+				if (pkt) {
+					memcpy(event->data, pkt->data, pkt->length);
+					*event->length = pkt->length;
+					xlink_dispatcher_event_add(EVENT_TX, event);
+					*event_queued = 1;
+				} else {
+					rc = X_LINK_ERROR;
+				}
+			}
+		}
+		release_channel(opchan);
+		break;
+	case XLINK_RELEASE_REQ:
+		opchan = get_channel(link_id, chan);
+		if (!opchan) {
+			rc = X_LINK_COMMUNICATION_FAIL;
+		} else {
+			rc = release_packet_from_channel(opchan,
+							 &opchan->rx_queue,
+							 event->data,
+							 &size);
+			if (rc) {
+				rc = X_LINK_ERROR;
+			} else {
+				event->header.size = size;
+				xlink_dispatcher_event_add(EVENT_TX, event);
+				*event_queued = 1;
+			}
+		}
+		release_channel(opchan);
+		break;
+	case XLINK_OPEN_CHANNEL_REQ:
+		if (xmux->channels[link_id][chan].status == CHAN_CLOSED) {
+			xmux->channels[link_id][chan].size = event->header.size;
+			xmux->channels[link_id][chan].timeout = event->header.timeout;
+			xmux->channels[link_id][chan].mode = (uintptr_t)event->data;
+			rc = multiplexer_open_channel(link_id, chan);
+			if (rc) {
+				rc = X_LINK_ERROR;
+			} else {
+				opchan = get_channel(link_id, chan);
+				if (!opchan) {
+					rc = X_LINK_COMMUNICATION_FAIL;
+				} else {
+					xlink_dispatcher_event_add(EVENT_TX, event);
+					*event_queued = 1;
+					mutex_unlock(&opchan->lock);
+					save_timeout = opchan->chan->timeout;
+					opchan->chan->timeout = OPEN_CHANNEL_TIMEOUT_MSEC;
+					rc = compl_wait(&opchan->opened, opchan);
+					opchan->chan->timeout = save_timeout;
+					if (rc == 0) {
+						xmux->channels[link_id][chan].status = CHAN_OPEN;
+						release_channel(opchan);
+					} else {
+						multiplexer_close_channel(opchan);
+					}
+				}
+			}
+		} else if (xmux->channels[link_id][chan].status == CHAN_OPEN_PEER) {
+			/* channel already open */
+			xmux->channels[link_id][chan].status = CHAN_OPEN; // opened locally
+			xmux->channels[link_id][chan].size = event->header.size;
+			xmux->channels[link_id][chan].timeout = event->header.timeout;
+			xmux->channels[link_id][chan].mode = (uintptr_t)event->data;
+			rc = multiplexer_open_channel(link_id, chan);
+		} else {
+			/* channel already open */
+			rc = X_LINK_ALREADY_OPEN;
+		}
+		break;
+	case XLINK_CLOSE_CHANNEL_REQ:
+		if (xmux->channels[link_id][chan].status == CHAN_OPEN) {
+			opchan = get_channel(link_id, chan);
+			if (!opchan)
+				return X_LINK_COMMUNICATION_FAIL;
+			rc = multiplexer_close_channel(opchan);
+			if (rc)
+				rc = X_LINK_ERROR;
+			else
+				xmux->channels[link_id][chan].status = CHAN_CLOSED;
+		} else {
+			/* can't close channel not open */
+			rc = X_LINK_ERROR;
+		}
+		break;
+	case XLINK_PING_REQ:
+		break;
+	case XLINK_WRITE_RESP:
+	case XLINK_WRITE_VOLATILE_RESP:
+	case XLINK_READ_RESP:
+	case XLINK_READ_TO_BUFFER_RESP:
+	case XLINK_RELEASE_RESP:
+	case XLINK_OPEN_CHANNEL_RESP:
+	case XLINK_CLOSE_CHANNEL_RESP:
+	case XLINK_PING_RESP:
+	default:
+		rc = X_LINK_ERROR;
+	}
+return rc;
+}
+
+enum xlink_error xlink_multiplexer_rx(struct xlink_event *event)
+{
+	struct xlink_event *passthru_event = NULL;
+	struct open_channel *opchan = NULL;
+	int rc = X_LINK_SUCCESS;
+	dma_addr_t paddr = 0;
+	void *buffer = NULL;
+	size_t size = 0;
+	u32 link_id;
+	u16 chan;
+
+	if (!xmux || !event)
+		return X_LINK_ERROR;
+
+	link_id = event->link_id;
+	chan = event->header.chan;
+
+	switch (event->header.type) {
+	case XLINK_WRITE_REQ:
+	case XLINK_WRITE_VOLATILE_REQ:
+		opchan = get_channel(link_id, chan);
+		if (!opchan) {
+			// if we receive data on a closed channel - flush/read the data
+			buffer = xlink_platform_allocate(xmux->dev, &paddr,
+							 event->header.size,
+							 XLINK_PACKET_ALIGNMENT,
+							 XLINK_NORMAL_MEMORY);
+			if (buffer) {
+				size = event->header.size;
+				xlink_platform_read(event->interface,
+						    event->handle->sw_device_id,
+						    buffer, &size, 1000, NULL);
+				xlink_platform_deallocate(xmux->dev, buffer,
+							  paddr,
+							  event->header.size,
+							  XLINK_PACKET_ALIGNMENT,
+							  XLINK_NORMAL_MEMORY);
+			} else {
+				pr_err("Fatal error: can't allocate memory in line:%d func:%s\n", __LINE__, __func__);
+			}
+			rc = X_LINK_COMMUNICATION_FAIL;
+		} else {
+			event->header.timeout = opchan->chan->timeout;
+			buffer = xlink_platform_allocate(xmux->dev, &paddr,
+							 event->header.size,
+							 XLINK_PACKET_ALIGNMENT,
+							 XLINK_NORMAL_MEMORY);
+			if (buffer) {
+				size = event->header.size;
+				rc = xlink_platform_read(event->interface,
+							 event->handle->sw_device_id,
+							 buffer, &size,
+							 opchan->chan->timeout,
+							 NULL);
+				if (rc || event->header.size != size) {
+					xlink_platform_deallocate(xmux->dev, buffer,
+								  paddr,
+								  event->header.size,
+								  XLINK_PACKET_ALIGNMENT,
+								  XLINK_NORMAL_MEMORY);
+					rc = X_LINK_ERROR;
+					release_channel(opchan);
+					break;
+				}
+				event->paddr = paddr;
+				event->data = buffer;
+				if (add_packet_to_channel(opchan, &opchan->rx_queue,
+							  event->data,
+							  event->header.size,
+							  paddr)) {
+					xlink_platform_deallocate(xmux->dev,
+								  buffer, paddr,
+								  event->header.size,
+								  XLINK_PACKET_ALIGNMENT,
+								  XLINK_NORMAL_MEMORY);
+					rc = X_LINK_ERROR;
+					release_channel(opchan);
+					break;
+				}
+				event->header.type = XLINK_WRITE_VOLATILE_RESP;
+				xlink_dispatcher_event_add(EVENT_RX, event);
+				//complete regardless of mode/timeout
+				complete(&opchan->pkt_available);
+			} else {
+				// failed to allocate buffer
+				rc = X_LINK_ERROR;
+			}
+		}
+		release_channel(opchan);
+		break;
+	case XLINK_READ_REQ:
+	case XLINK_READ_TO_BUFFER_REQ:
+		opchan = get_channel(link_id, chan);
+		if (!opchan) {
+			rc = X_LINK_COMMUNICATION_FAIL;
+			break;
+		}
+		event->header.timeout = opchan->chan->timeout;
+		event->header.type = XLINK_READ_TO_BUFFER_RESP;
+		xlink_dispatcher_event_add(EVENT_RX, event);
+		//complete regardless of mode/timeout
+		complete(&opchan->pkt_consumed);
+		release_channel(opchan);
+		break;
+	case XLINK_RELEASE_REQ:
+		opchan = get_channel(link_id, chan);
+		if (!opchan) {
+			rc = X_LINK_COMMUNICATION_FAIL;
+		} else {
+			event->header.timeout = opchan->chan->timeout;
+			opchan->tx_fill_level -= event->header.size;
+			opchan->tx_packet_level--;
+			event->header.type = XLINK_RELEASE_RESP;
+			xlink_dispatcher_event_add(EVENT_RX, event);
+			//complete regardless of mode/timeout
+			complete(&opchan->pkt_released);
+		}
+		release_channel(opchan);
+		break;
+	case XLINK_OPEN_CHANNEL_REQ:
+		if (xmux->channels[link_id][chan].status == CHAN_CLOSED) {
+			xmux->channels[link_id][chan].size = event->header.size;
+			xmux->channels[link_id][chan].timeout = event->header.timeout;
+			//xmux->channels[link_id][chan].mode = *(enum xlink_opmode *)event->data;
+			rc = multiplexer_open_channel(link_id, chan);
+			if (rc) {
+				rc = X_LINK_ERROR;
+			} else {
+				opchan = get_channel(link_id, chan);
+				if (!opchan) {
+					rc = X_LINK_COMMUNICATION_FAIL;
+				} else {
+					xmux->channels[link_id][chan].status = CHAN_OPEN_PEER;
+					complete(&opchan->opened);
+					passthru_event = xlink_create_event(link_id,
+									    XLINK_OPEN_CHANNEL_RESP,
+									    event->handle,
+									    chan,
+									    0,
+									    opchan->chan->timeout);
+					if (!passthru_event) {
+						rc = X_LINK_ERROR;
+						release_channel(opchan);
+						break;
+					}
+					xlink_dispatcher_event_add(EVENT_RX,
+								   passthru_event);
+				}
+				release_channel(opchan);
+			}
+		} else {
+			/* channel already open */
+			opchan = get_channel(link_id, chan);
+			if (!opchan) {
+				rc = X_LINK_COMMUNICATION_FAIL;
+			} else {
+				passthru_event = xlink_create_event(link_id,
+								    XLINK_OPEN_CHANNEL_RESP,
+								    event->handle,
+								    chan, 0, 0);
+				if (!passthru_event) {
+					release_channel(opchan);
+					rc = X_LINK_ERROR;
+					break;
+				}
+				xlink_dispatcher_event_add(EVENT_RX,
+							   passthru_event);
+			}
+			release_channel(opchan);
+		}
+		rc = xlink_passthrough(event);
+		if (rc == 0)
+			xlink_destroy_event(event); // event is handled and can now be freed
+		break;
+	case XLINK_CLOSE_CHANNEL_REQ:
+	case XLINK_PING_REQ:
+		break;
+	case XLINK_WRITE_RESP:
+	case XLINK_WRITE_VOLATILE_RESP:
+		opchan = get_channel(link_id, chan);
+		if (!opchan)
+			rc = X_LINK_COMMUNICATION_FAIL;
+		else
+			xlink_destroy_event(event); // event is handled and can now be freed
+		release_channel(opchan);
+		break;
+	case XLINK_READ_RESP:
+	case XLINK_READ_TO_BUFFER_RESP:
+	case XLINK_RELEASE_RESP:
+		xlink_destroy_event(event); // event is handled and can now be freed
+		break;
+	case XLINK_OPEN_CHANNEL_RESP:
+		opchan = get_channel(link_id, chan);
+		if (!opchan) {
+			rc = X_LINK_COMMUNICATION_FAIL;
+		} else {
+			xlink_destroy_event(event); // event is handled and can now be freed
+			complete(&opchan->opened);
+		}
+		release_channel(opchan);
+		break;
+	case XLINK_CLOSE_CHANNEL_RESP:
+	case XLINK_PING_RESP:
+		xlink_destroy_event(event); // event is handled and can now be freed
+		break;
+	default:
+		rc = X_LINK_ERROR;
+	}
+
 	return rc;
 }
 
-- 
2.17.1

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ