#!/usr/bin/python import unittest import socket import threading import struct class Sender(object): def __init__(self, group, port=43214): self.addr = group self.port = port self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) def send(self, msg): self.sock.sendto(msg, (self.addr, self.port)) class Receiver(object): def __init__(self, group, port=43214): addr = socket.inet_pton(socket.AF_INET6, group) self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) self.sock.bind((group, port)) group_addr = addr + struct.pack('@I', 0) self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, group_addr) self.data = None def recv(self): self.data = self.sock.recv(1024) class TestMulticast(unittest.TestCase): def test_same_groups(self): msg = "foo" senders = [Sender("ff15::1")] receivers = [Receiver("ff15::1"), Receiver("ff15::1")] threads = [threading.Thread(target=r.recv) for r in receivers] for t in threads: t.daemon = True t.start() for s in senders: s.send(msg) for t in threads: t.join(1) for r in receivers: self.assertEqual(r.data, msg) def test_different_groups(self): msg = "foo" senders = [Sender("ff15::1"), Sender("ff15::2")] receivers = [Receiver("ff15::1"), Receiver("ff15::2")] threads = [threading.Thread(target=r.recv) for r in receivers] for t in threads: t.daemon = True t.start() for s in senders: s.send(msg) for t in threads: t.join(1) for r in receivers: self.assertEqual(r.data, msg) if __name__ == "__main__": unittest.main()