using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Runtime.InteropServices;

namespace DccpLib
{
    /// <summary>
    /// Client-side DCCP socket that drains an <see cref="IQueuePolicy"/> on a
    /// dedicated sender thread and hands the packets to the native libdccp
    /// send routine.
    /// </summary>
    public class DccpSocket : IDisposable
    {
        // Socket-level constants passed straight to the native calls below.
        // NOTE(review): values presumably mirror the Linux headers
        // (linux/dccp.h, sys/socket.h) -- confirm against the target platform.
        const int SOL_DCCP = 269;
        const int DCCP_SOCKOPT_QPOLICY_TXQLEN = 17;
        const int DCCP_SOCKOPT_QPOLICY_ID = 16;
        const int SOL_SOCKET = 1;
        const int SO_SNDBUF = 7;
        const int AF_INET = 2;
        const int SOCK_DCCP = 6;
        const int IPPROTO_DCCP = 33;

        // Upper bound on how long Close() waits for the sender thread to
        // notice the stop request before the descriptor is closed anyway.
        const int SenderJoinTimeoutMs = 1000;

        [DllImport("libdccp.so")]
        public extern static int dccp_send(int sockfd, byte[] buffer, int bufsize, int priority);

        [DllImport("libdccp.so", CharSet = CharSet.Ansi)]
        public extern static int dccp_connect(int sockfd, string address, string port);

        [DllImport("libc.so.6")]
        public extern static int setsockopt(int sockfd, int level, int option_name, byte[] option_value, int option_len);

        [DllImport("libc.so.6")]
        public extern static int close(int sockfd);

        [DllImport("libc.so.6")]
        public extern static int socket(int socket_family, int socket_type, int protocol);

        int sockfd;
        Thread sender;
        AutoResetEvent are;
        byte[] maxpacket = new byte[16376];
        IQueuePolicy qpolicy;

        // Shutdown state: 'closing' asks the sender loop to exit, 'closed'
        // (guarded by closeGate) makes Close() safe to call more than once.
        readonly object closeGate = new object();
        volatile bool closing;
        bool closed;

        /// <summary>
        /// Creates a DCCP socket, configures its send buffer and queueing
        /// policy, connects it and starts the sender thread.
        /// </summary>
        /// <param name="address">Remote address handed to <c>dccp_connect</c>.</param>
        /// <param name="port">Remote port handed to <c>dccp_connect</c>.</param>
        /// <param name="qp">Queue policy that stores packets until they are sent.</param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// A native call (socket, setsockopt, connect) reported failure.
        /// </exception>
        public DccpSocket(string address, string port, IQueuePolicy qp)
        {
            if (qp == null)
            {
                throw new ArgumentNullException("qp");
            }
            if (address == null)
            {
                throw new ArgumentNullException("address");
            }
            if (port == null)
            {
                throw new ArgumentNullException("port");
            }

            qpolicy = qp;
            are = new AutoResetEvent(false);

            sockfd = socket(AF_INET, SOCK_DCCP, IPPROTO_DCCP);
            if (sockfd < 0)
            {
                throw new InvalidOperationException("Cannot create DCCP socket.");
            }

            try
            {
                SetIntOption(SOL_SOCKET, SO_SNDBUF, maxpacket.Length,
                    "Setsockopt (SO_SNDBUF) call failed.");
                SetIntOption(SOL_DCCP, DCCP_SOCKOPT_QPOLICY_TXQLEN, 2,
                    "Setsockopt (DCCP_SOCKOPT_QPOLICY_TXQLEN) call failed.");
                // NOTE(review): policy id 1 is presumably the kernel's
                // priority-based qpolicy -- confirm against linux/dccp.h.
                SetIntOption(SOL_DCCP, DCCP_SOCKOPT_QPOLICY_ID, 1,
                    "Setsockopt (DCCP_SOCKOPT_QPOLICY_ID) call failed.");

                if (dccp_connect(sockfd, address, port) < 0)
                {
                    throw new InvalidOperationException("Connect call failed.");
                }

                sender = new Thread(new ThreadStart(sendPackets));
                sender.Start();
            }
            catch
            {
                // The object never escapes the constructor on failure, so
                // nobody else could release the descriptor.
                close(sockfd);
                throw;
            }
        }

        /// <summary>Sets an int-valued socket option or throws with <paramref name="failureMessage"/>.</summary>
        void SetIntOption(int level, int option, int value, string failureMessage)
        {
            byte[] raw = BitConverter.GetBytes(value);
            if (setsockopt(sockfd, level, option, raw, raw.Length) < 0)
            {
                throw new InvalidOperationException(failureMessage);
            }
        }

        /// <summary>Releases the socket; equivalent to <see cref="Close"/>.</summary>
        public void Dispose()
        {
            Close();
        }

        /// <summary>
        /// Stops the sender thread and closes the native descriptor.
        /// Calling it more than once is harmless.
        /// </summary>
        public void Close()
        {
            lock (closeGate)
            {
                if (closed)
                {
                    return;
                }
                closed = true;
            }

            // Cooperative stop instead of Thread.Abort(): raise the flag and
            // wake the sender in case it is parked on the event.
            closing = true;
            are.Set();

            // Bounded wait: a sender stuck inside a blocking native send must
            // not hang Close(); the descriptor is closed regardless.
            sender.Join(SenderJoinTimeoutMs);
            close(sockfd);
        }

        /// <summary>
        /// Hands a packet to the queue policy and wakes the sender thread.
        /// </summary>
        /// <param name="buffer">Packet payload; must not be null.</param>
        /// <param name="priority">Policy-specific priority, passed through to <see cref="IQueuePolicy.Push"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        public void Enqueue(byte[] buffer, object priority)
        {
            // The sender treats a null from Pop() as "queue empty", so a null
            // payload must never enter the queue.
            if (buffer == null)
            {
                throw new ArgumentNullException("buffer");
            }

            lock (qpolicy)
            {
                qpolicy.Push(buffer, priority);
                are.Set();
            }
        }

        /// <summary>
        /// Sender thread body: waits for a wake-up, then keeps sending the
        /// best queued packet until the policy reports an empty queue.
        /// A failed send outside of shutdown throws on this thread, as before.
        /// </summary>
        void sendPackets()
        {
            bool shouldwait = true;
            while (true)
            {
                if (closing)
                {
                    return;
                }
                if (shouldwait)
                {
                    are.WaitOne();
                }
                if (closing)
                {
                    return;
                }

                // NOTE(review): a maximum-size, zero-filled packet is sent with
                // priority 65 before every dequeue; presumably deliberate
                // filler traffic -- confirm before removing.
                if (dccp_send(sockfd, maxpacket, maxpacket.Length, 65) < 0)
                {
                    if (closing)
                    {
                        return;
                    }
                    throw new InvalidOperationException("send failed");
                }

                // Hold the lock only for the dequeue so Enqueue() callers are
                // not blocked for the duration of a native send.
                byte[] best;
                lock (qpolicy)
                {
                    best = qpolicy.Pop();
                }

                if (best == null)
                {
                    shouldwait = true;
                    continue;
                }

                if (dccp_send(sockfd, best, best.Length, 66) < 0)
                {
                    if (closing)
                    {
                        return;
                    }
                    throw new InvalidOperationException("send failed");
                }
                shouldwait = false;
            }
        }
    }

    /// <summary>
    /// Storage strategy for outgoing packets. Implementations are not
    /// required to be thread-safe; <see cref="DccpSocket"/> locks around
    /// every call.
    /// </summary>
    public interface IQueuePolicy
    {
        /// <summary>Stores a packet with a policy-specific priority.</summary>
        void Push(byte[] buffer, object priority);

        /// <summary>Removes and returns the next packet, or null when nothing is queued.</summary>
        byte[] Pop();
    }

    /// <summary>
    /// Bounded queue that sends the highest-priority packet first and, when
    /// full, drops the lowest-priority one.
    /// </summary>
    public class PriorityQP : IQueuePolicy
    {
        readonly List<KeyValuePair<int, byte[]>> list = new List<KeyValuePair<int, byte[]>>();
        readonly int maxsize;

        /// <param name="queueLength">Maximum number of packets kept after a push.</param>
        public PriorityQP(int queueLength)
        {
            maxsize = queueLength;
        }

        /// <summary>
        /// Adds a packet; if the queue then exceeds its capacity, evicts the
        /// entry with the lowest priority (the oldest one on ties), which may
        /// be the packet just added.
        /// </summary>
        /// <param name="buffer">Packet payload.</param>
        /// <param name="priority">Boxed <see cref="int"/>; larger values are sent first.</param>
        /// <exception cref="InvalidCastException"><paramref name="priority"/> is not a boxed int.</exception>
        public void Push(byte[] buffer, object priority)
        {
            list.Add(new KeyValuePair<int, byte[]>((int)priority, buffer));
            if (list.Count <= maxsize)
            {
                return;
            }

            // Select by index rather than against a fixed sentinel so that
            // every priority value, including 255 and above, can be evicted.
            int worstIndex = 0;
            for (int i = 1; i < list.Count; i++)
            {
                if (list[i].Key < list[worstIndex].Key)
                {
                    worstIndex = i;
                }
            }
            list.RemoveAt(worstIndex);
        }

        /// <summary>
        /// Removes and returns the packet with the highest priority (the most
        /// recently pushed one on ties), or null when the queue is empty.
        /// </summary>
        public byte[] Pop()
        {
            if (list.Count == 0)
            {
                return null;
            }

            // Index-based selection also lets negative priorities be dequeued.
            int bestIndex = 0;
            for (int i = 1; i < list.Count; i++)
            {
                if (list[i].Key >= list[bestIndex].Key)
                {
                    bestIndex = i;
                }
            }

            KeyValuePair<int, byte[]> best = list[bestIndex];
            list.RemoveAt(bestIndex);
            if (best.Value != null)
            {
                Console.WriteLine("Sending " + best.Key);
            }
            return best.Value;
        }
    }
}