#include #include #include #include #include #include #include #include #include #include #include struct mmsghdr { struct msghdr msg_hdr; unsigned msg_len; }; #if defined(__x86_64__) || defined(__i386__) #include "linux-2.6-tip/arch/x86/include/asm/unistd.h" #endif #ifndef NSEC_PER_MSEC #define NSEC_PER_MSEC 1000000UL #endif static inline int recvmmsg(int fd, struct mmsghdr *mmsg, unsigned vlen, unsigned flags, struct timespec *timeout) { return syscall(__NR_recvmmsg, fd, mmsg, vlen, flags, timeout); } static void print_stats_peer(struct mmsghdr *datagram, int count, int bytes) { char peer[1024]; int err = getnameinfo(datagram->msg_hdr.msg_name, datagram->msg_hdr.msg_namelen, peer, sizeof(peer), NULL, 0, 0); if (err != 0) { fprintf(stderr, "error using getnameinfo: %s\n", gai_strerror(err)); return; } printf(" %d bytes received from %s in %d datagrams\n", bytes, peer, count); } int main(int argc, char *argv[]) { struct addrinfo *host; struct addrinfo hints = { .ai_family = AF_INET, .ai_socktype = SOCK_DGRAM, .ai_protocol = IPPROTO_UDP, .ai_flags = AI_PASSIVE, }; const char *port = "5001"; int batch_size = 8; long timeout = 10 * NSEC_PER_MSEC; int err, fd; int i; if (argc > 1) port = argv[1]; if (argc > 2) batch_size = atoi(argv[2]); if (argc > 3) timeout = atol(argv[3]) * NSEC_PER_MSEC; char buf[batch_size][256]; struct iovec iovec[batch_size][1]; struct sockaddr addr[batch_size]; struct mmsghdr datagrams[batch_size]; printf("usage: recvmmsg " "\n\nWaiting for datagrams...\n"); err = getaddrinfo(NULL, port, &hints, &host); if (err != 0) { fprintf(stderr, "error using getaddrinfo: %s\n", gai_strerror(err)); goto out; } fd = socket(host->ai_family, host->ai_socktype, host->ai_protocol); if (fd < 0) { perror("socket: "); goto out_freeaddrinfo; } if (bind(fd, host->ai_addr, host->ai_addrlen) < 0) { perror("bind: "); goto out_close_server; } for (i = 0; i < batch_size; ++i) { iovec[i][0].iov_base = buf[i]; iovec[i][0].iov_len = sizeof(buf[i]); datagrams[i].msg_hdr.msg_iov = iovec[i]; datagrams[i].msg_hdr.msg_iovlen = 1; datagrams[i].msg_hdr.msg_name = &addr[i]; datagrams[i].msg_hdr.msg_namelen = sizeof(addr[i]); } struct pollfd pfds[1] = { [0] = { .fd = fd, .events = POLLIN, }, }; while (1) { struct timespec timeout = { .tv_nsec = 10 * NSEC_PER_MSEC, }; if (poll(pfds, 1, -1) < 0) { perror("poll: "); return EXIT_FAILURE; } int nr_datagrams = recvmmsg(fd, datagrams, batch_size, 0, &timeout); if (nr_datagrams == 0) { perror("recvmmsg: "); return EXIT_FAILURE; } printf("nr_datagrams received: %d, remaining: %luns\n", nr_datagrams, timeout.tv_nsec); int peer_count = 1; int peer_bytes = datagrams[0].msg_len; for (i = 1; i < nr_datagrams; ++i) { if (memcmp(datagrams[i - 1].msg_hdr.msg_name, datagrams[i].msg_hdr.msg_name, datagrams[i].msg_hdr.msg_namelen) == 0) { ++peer_count; peer_bytes += datagrams[i].msg_len; continue; } print_stats_peer(&datagrams[i - 1], peer_count, peer_bytes); peer_bytes = datagrams[i].msg_len; peer_count = 1; } print_stats_peer(&datagrams[nr_datagrams - 1], peer_count, peer_bytes); } out_close_server: close(fd); out_freeaddrinfo: freeaddrinfo(host); out: return err; }