def setUp(self):
self.server = Server()
self.client = Client()
# multicast won't work if we listen over loopback, apparently
self.port1 = reactor.listenMulticast(0, self.server)
self.port2 = reactor.listenMulticast(0, self.client)
self.client.transport.connect(
"127.0.0.1", self.server.transport.getHost().port)
python类listenMulticast()的实例源码
def test_multicast(self):
"""
Test that a multicast group can be joined and messages sent to and
received from it.
"""
c = Server()
p = reactor.listenMulticast(0, c)
addr = self.server.transport.getHost()
joined = self.server.transport.joinGroup("225.0.0.250")
def cbJoined(ignored):
d = self.server.packetReceived = Deferred()
c.transport.write("hello world", ("225.0.0.250", addr.port))
return d
joined.addCallback(cbJoined)
def cbPacket(ignored):
self.assertEquals(self.server.packets[0][0], "hello world")
joined.addCallback(cbPacket)
def cleanup(passthrough):
result = maybeDeferred(p.stopListening)
result.addCallback(lambda ign: passthrough)
return result
joined.addCallback(cleanup)
return joined
def test_multiListen(self):
"""
Test that multiple sockets can listen on the same multicast port and
that they both receive multicast messages directed to that address.
"""
firstClient = Server()
firstPort = reactor.listenMulticast(
0, firstClient, listenMultiple=True)
portno = firstPort.getHost().port
secondClient = Server()
secondPort = reactor.listenMulticast(
portno, secondClient, listenMultiple=True)
joined = self.server.transport.joinGroup("225.0.0.250")
def serverJoined(ignored):
d1 = firstClient.packetReceived = Deferred()
d2 = secondClient.packetReceived = Deferred()
firstClient.transport.write("hello world", ("225.0.0.250", portno))
return gatherResults([d1, d2])
joined.addCallback(serverJoined)
def gotPackets(ignored):
self.assertEquals(firstClient.packets[0][0], "hello world")
self.assertEquals(secondClient.packets[0][0], "hello world")
joined.addCallback(gotPackets)
def cleanup(passthrough):
result = gatherResults([
maybeDeferred(firstPort.stopListening),
maybeDeferred(secondPort.stopListening)])
result.addCallback(lambda ign: passthrough)
return result
joined.addBoth(cleanup)
return joined
def search_device(self):
"""
Triggers a UPnP device discovery.
The returned deferred will be called with the L{UPnPDevice} that has
been found in the LAN.
@return: A deferred called with the detected L{UPnPDevice} instance.
@rtype: L{twisted.internet.defer.Deferred}
"""
if self._discovery is not None:
raise ValueError('already used')
self._discovery = defer.Deferred()
self._discovery_timeout = reactor.callLater(6, self._on_discovery_timeout)
attempt = 0
mcast = None
while True:
try:
self.mcast = reactor.listenMulticast(1900+attempt, self)
break
except CannotListenError:
attempt = random.randint(0, 500)
# joined multicast group, starting upnp search
self.mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
return self._discovery
#Private methods
def _discover_multicast():
"""
Local IP discovery protocol via multicast:
- Broadcast 3 ping multicast packet with "ping" in it
- Wait for an answer
- Retrieve the ip address from the returning packet, which is ours
"""
nonce = str(random.randrange(2**64))
p = _LocalNetworkMulticast(nonce)
for attempt in itertools.count():
port = 11000 + random.randint(0, 5000)
try:
mcast = reactor.listenMulticast(port, p)
except CannotListenError:
if attempt >= 10:
raise
continue
else:
break
try:
yield mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
logging.debug("Sending multicast ping")
for i in xrange(3):
p.transport.write(nonce, ('239.255.255.250', port))
address, = yield p.address_received.get_deferred(5)
finally:
mcast.stopListening()
defer.returnValue(address)
def search_device(self):
"""
Triggers a UPnP device discovery.
The returned deferred will be called with the L{UPnPDevice} that has
been found in the LAN.
@return: A deferred called with the detected L{UPnPDevice} instance.
@rtype: L{twisted.internet.defer.Deferred}
"""
if self._discovery is not None:
raise ValueError('already used')
self._discovery = defer.Deferred()
self._discovery_timeout = reactor.callLater(6, self._on_discovery_timeout)
attempt = 0
mcast = None
while True:
try:
self.mcast = reactor.listenMulticast(1900+attempt, self)
break
except CannotListenError:
attempt = random.randint(0, 500)
# joined multicast group, starting upnp search
self.mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
return self._discovery
#Private methods
def _discover_multicast():
"""
Local IP discovery protocol via multicast:
- Broadcast 3 ping multicast packet with "ping" in it
- Wait for an answer
- Retrieve the ip address from the returning packet, which is ours
"""
nonce = str(random.randrange(2**64))
p = _LocalNetworkMulticast(nonce)
for attempt in itertools.count():
port = 11000 + random.randint(0, 5000)
try:
mcast = reactor.listenMulticast(port, p)
except CannotListenError:
if attempt >= 10:
raise
continue
else:
break
try:
yield mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
logging.debug("Sending multicast ping")
for i in xrange(3):
p.transport.write(nonce, ('239.255.255.250', port))
address, = yield p.address_received.get_deferred(5)
finally:
mcast.stopListening()
defer.returnValue(address)
def search_device(self):
"""
Triggers a UPnP device discovery.
The returned deferred will be called with the L{UPnPDevice} that has
been found in the LAN.
@return: A deferred called with the detected L{UPnPDevice} instance.
@rtype: L{twisted.internet.defer.Deferred}
"""
if self._discovery is not None:
raise ValueError('already used')
self._discovery = defer.Deferred()
self._discovery_timeout = reactor.callLater(6, self._on_discovery_timeout)
attempt = 0
mcast = None
while True:
try:
self.mcast = reactor.listenMulticast(1900+attempt, self)
break
except CannotListenError:
attempt = random.randint(0, 500)
# joined multicast group, starting upnp search
self.mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
return self._discovery
#Private methods
def _discover_multicast():
"""
Local IP discovery protocol via multicast:
- Broadcast 3 ping multicast packet with "ping" in it
- Wait for an answer
- Retrieve the ip address from the returning packet, which is ours
"""
nonce = str(random.randrange(2**64))
p = _LocalNetworkMulticast(nonce)
for attempt in itertools.count():
port = 11000 + random.randint(0, 5000)
try:
mcast = reactor.listenMulticast(port, p)
except CannotListenError:
if attempt >= 10:
raise
continue
else:
break
try:
yield mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
logging.debug("Sending multicast ping")
for i in xrange(3):
p.transport.write(nonce, ('239.255.255.250', port))
address, = yield p.address_received.get_deferred(5)
finally:
mcast.stopListening()
defer.returnValue(address)
def setUp(self):
self.server = Server()
self.client = Client()
# multicast won't work if we listen over loopback, apparently
self.port1 = reactor.listenMulticast(0, self.server)
self.port2 = reactor.listenMulticast(0, self.client)
self.client.transport.connect(
"127.0.0.1", self.server.transport.getHost().port)
def test_multicast(self):
"""
Test that a multicast group can be joined and messages sent to and
received from it.
"""
c = Server()
p = reactor.listenMulticast(0, c)
addr = self.server.transport.getHost()
joined = self.server.transport.joinGroup("225.0.0.250")
def cbJoined(ignored):
d = self.server.packetReceived = Deferred()
c.transport.write("hello world", ("225.0.0.250", addr.port))
return d
joined.addCallback(cbJoined)
def cbPacket(ignored):
self.assertEquals(self.server.packets[0][0], "hello world")
joined.addCallback(cbPacket)
def cleanup(passthrough):
result = maybeDeferred(p.stopListening)
result.addCallback(lambda ign: passthrough)
return result
joined.addCallback(cleanup)
return joined
def test_multiListen(self):
"""
Test that multiple sockets can listen on the same multicast port and
that they both receive multicast messages directed to that address.
"""
firstClient = Server()
firstPort = reactor.listenMulticast(
0, firstClient, listenMultiple=True)
portno = firstPort.getHost().port
secondClient = Server()
secondPort = reactor.listenMulticast(
portno, secondClient, listenMultiple=True)
joined = self.server.transport.joinGroup("225.0.0.250")
def serverJoined(ignored):
d1 = firstClient.packetReceived = Deferred()
d2 = secondClient.packetReceived = Deferred()
firstClient.transport.write("hello world", ("225.0.0.250", portno))
return gatherResults([d1, d2])
joined.addCallback(serverJoined)
def gotPackets(ignored):
self.assertEquals(firstClient.packets[0][0], "hello world")
self.assertEquals(secondClient.packets[0][0], "hello world")
joined.addCallback(gotPackets)
def cleanup(passthrough):
result = gatherResults([
maybeDeferred(firstPort.stopListening),
maybeDeferred(secondPort.stopListening)])
result.addCallback(lambda ign: passthrough)
return result
joined.addBoth(cleanup)
return joined
def main():
"""
Function to run if this is the entry point and not imported like a module
:return:
"""
consumer = ParlayConsumer()
reactor.listenMulticast(CONSUMER_RESPONSE_PORT, consumer, listenMultiple=True)
def print_and_exit():
consumer.print_output()
reactor.stop()
reactor.callLater(1.5, print_and_exit)
reactor.run()
def search_device(self):
"""
Triggers a UPnP device discovery.
The returned deferred will be called with the L{UPnPDevice} that has
been found in the LAN.
@return: A deferred called with the detected L{UPnPDevice} instance.
@rtype: L{twisted.internet.defer.Deferred}
"""
if self._discovery is not None:
raise ValueError('already used')
self._discovery = defer.Deferred()
self._discovery_timeout = reactor.callLater(6, self._on_discovery_timeout)
attempt = 0
mcast = None
while True:
try:
self.mcast = reactor.listenMulticast(1900+attempt, self)
break
except CannotListenError:
attempt = random.randint(0, 500)
# joined multicast group, starting upnp search
self.mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
return self._discovery
#Private methods
def _discover_multicast():
"""
Local IP discovery protocol via multicast:
- Broadcast 3 ping multicast packet with "ping" in it
- Wait for an answer
- Retrieve the ip address from the returning packet, which is ours
"""
nonce = str(random.randrange(2**64))
p = _LocalNetworkMulticast(nonce)
for attempt in itertools.count():
port = 11000 + random.randint(0, 5000)
try:
mcast = reactor.listenMulticast(port, p)
except CannotListenError:
if attempt >= 10:
raise
continue
else:
break
try:
yield mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
logging.debug("Sending multicast ping")
for i in xrange(3):
p.transport.write(nonce, ('239.255.255.250', port))
address, = yield p.address_received.get_deferred(5)
finally:
mcast.stopListening()
defer.returnValue(address)
def search_device(self):
"""
Triggers a UPnP device discovery.
The returned deferred will be called with the L{UPnPDevice} that has
been found in the LAN.
@return: A deferred called with the detected L{UPnPDevice} instance.
@rtype: L{twisted.internet.defer.Deferred}
"""
if self._discovery is not None:
raise ValueError('already used')
self._discovery = defer.Deferred()
self._discovery_timeout = reactor.callLater(6, self._on_discovery_timeout)
attempt = 0
mcast = None
while True:
try:
self.mcast = reactor.listenMulticast(1900+attempt, self)
break
except CannotListenError:
attempt = random.randint(0, 500)
# joined multicast group, starting upnp search
self.mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
return self._discovery
#Private methods
def _discover_multicast():
"""
Local IP discovery protocol via multicast:
- Broadcast 3 ping multicast packet with "ping" in it
- Wait for an answer
- Retrieve the ip address from the returning packet, which is ours
"""
nonce = str(random.randrange(2**64))
p = _LocalNetworkMulticast(nonce)
for attempt in itertools.count():
port = 11000 + random.randint(0, 5000)
try:
mcast = reactor.listenMulticast(port, p)
except CannotListenError:
if attempt >= 10:
raise
continue
else:
break
try:
yield mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
logging.debug("Sending multicast ping")
for i in xrange(3):
p.transport.write(nonce, ('239.255.255.250', port))
address, = yield p.address_received.get_deferred(5)
finally:
mcast.stopListening()
defer.returnValue(address)
def search_device(self):
"""
Triggers a UPnP device discovery.
The returned deferred will be called with the L{UPnPDevice} that has
been found in the LAN.
@return: A deferred called with the detected L{UPnPDevice} instance.
@rtype: L{twisted.internet.defer.Deferred}
"""
if self._discovery is not None:
raise ValueError('already used')
self._discovery = defer.Deferred()
self._discovery_timeout = reactor.callLater(6, self._on_discovery_timeout)
attempt = 0
mcast = None
while True:
try:
self.mcast = reactor.listenMulticast(1900+attempt, self)
break
except CannotListenError:
attempt = random.randint(0, 500)
# joined multicast group, starting upnp search
self.mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
self.transport.write(_UPNP_SEARCH_REQUEST, (_UPNP_MCAST, _UPNP_PORT))
return self._discovery
#Private methods
def _discover_multicast():
"""
Local IP discovery protocol via multicast:
- Broadcast 3 ping multicast packet with "ping" in it
- Wait for an answer
- Retrieve the ip address from the returning packet, which is ours
"""
nonce = str(random.randrange(2**64))
p = _LocalNetworkMulticast(nonce)
for attempt in itertools.count():
port = 11000 + random.randint(0, 5000)
try:
mcast = reactor.listenMulticast(port, p)
except CannotListenError:
if attempt >= 10:
raise
continue
else:
break
try:
yield mcast.joinGroup('239.255.255.250', socket.INADDR_ANY)
logging.debug("Sending multicast ping")
for i in xrange(3):
p.transport.write(nonce, ('239.255.255.250', port))
address, = yield p.address_received.get_deferred(5)
finally:
mcast.stopListening()
defer.returnValue(address)
def setUp(self):
self.server = Server()
self.client = Client()
# multicast won't work if we listen over loopback, apparently
self.port1 = reactor.listenMulticast(0, self.server)
self.port2 = reactor.listenMulticast(0, self.client)
self.client.transport.connect(
"127.0.0.1", self.server.transport.getHost().port)
def test_multicast(self):
"""
Test that a multicast group can be joined and messages sent to and
received from it.
"""
c = Server()
p = reactor.listenMulticast(0, c)
addr = self.server.transport.getHost()
joined = self.server.transport.joinGroup("225.0.0.250")
def cbJoined(ignored):
d = self.server.packetReceived = Deferred()
c.transport.write(b"hello world", ("225.0.0.250", addr.port))
return d
joined.addCallback(cbJoined)
def cbPacket(ignored):
self.assertEqual(self.server.packets[0][0], b"hello world")
joined.addCallback(cbPacket)
def cleanup(passthrough):
result = maybeDeferred(p.stopListening)
result.addCallback(lambda ign: passthrough)
return result
joined.addCallback(cleanup)
return joined
def test_multiListen(self):
"""
Test that multiple sockets can listen on the same multicast port and
that they both receive multicast messages directed to that address.
"""
firstClient = Server()
firstPort = reactor.listenMulticast(
0, firstClient, listenMultiple=True)
portno = firstPort.getHost().port
secondClient = Server()
secondPort = reactor.listenMulticast(
portno, secondClient, listenMultiple=True)
theGroup = "225.0.0.250"
joined = gatherResults([self.server.transport.joinGroup(theGroup),
firstPort.joinGroup(theGroup),
secondPort.joinGroup(theGroup)])
def serverJoined(ignored):
d1 = firstClient.packetReceived = Deferred()
d2 = secondClient.packetReceived = Deferred()
firstClient.transport.write(b"hello world", (theGroup, portno))
return gatherResults([d1, d2])
joined.addCallback(serverJoined)
def gotPackets(ignored):
self.assertEqual(firstClient.packets[0][0], b"hello world")
self.assertEqual(secondClient.packets[0][0], b"hello world")
joined.addCallback(gotPackets)
def cleanup(passthrough):
result = gatherResults([
maybeDeferred(firstPort.stopListening),
maybeDeferred(secondPort.stopListening)])
result.addCallback(lambda ign: passthrough)
return result
joined.addBoth(cleanup)
return joined
def __init__(self, addr):
self.addr = addr
self.ssdp = reactor.listenMulticast(ssdp_port, self, listenMultiple=True)
self.ssdp.setLoopbackMode(1)
self.ssdp.joinGroup(ssdp_addr, interface=addr)