def testRebind(self):
    """
    Ensure binding the same DatagramProtocol repeatedly invokes all
    the right callbacks.

    The protocol is bound once, stopped when its C{startedDeferred} fires,
    then bound a second time and stopped again.

    @return: the L{defer.Deferred} chain driving the test.
    """
    server = Server()
    d = server.startedDeferred = defer.Deferred()
    p = reactor.listenUDP(0, server, interface="127.0.0.1")

    def cbStarted(ignored, port):
        # Stop the port that was just started.
        return port.stopListening()

    def cbStopped(ignored):
        # Install a fresh Deferred before listening again with the same
        # protocol instance, then stop the second port once it fires.
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")
        return d.addCallback(cbStarted, p)

    # cbStopped must be part of the chain; without it the protocol is only
    # ever bound once and the rebind path is never exercised.
    return d.addCallback(cbStarted, p).addCallback(cbStopped)
python类DatagramProtocol()的实例源码
def testRebind(self):
    """
    Ensure binding the same DatagramProtocol repeatedly invokes all
    the right callbacks.

    Listens, stops listening, listens again with the same protocol object
    and stops listening a second time.

    @return: the L{defer.Deferred} chain driving the test.
    """
    server = Server()
    d = server.startedDeferred = defer.Deferred()
    p = reactor.listenUDP(0, server, interface="127.0.0.1")

    def cbStarted(ignored, port):
        # Stop whichever port was passed along with the callback.
        return port.stopListening()

    def cbStopped(ignored):
        # Second round: new Deferred, new port, same protocol instance.
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")
        return d.addCallback(cbStarted, p)

    # Chain cbStopped after the first stop so the second bind really
    # happens; leaving it out makes the test pass without rebinding.
    return d.addCallback(cbStarted, p).addCallback(cbStopped)
def __init__(self, _parent, _name, _config, _logger, _port):
    """
    Store the collaborators passed in and build the initial slot state.

    @param _parent: object providing C{_gateway} and C{_gateway_port}.
    @param _name: stored as C{self._system}.
    @param _config: stored as C{self._config}.
    @param _logger: stored as C{self._logger}.
    @param _port: port to listen on for AMBE frames to transmit to all
        peers.
    """
    self._parent, self._logger, self._config = _parent, _logger, _config
    self._system = _name

    # A single (gateway, port) pair taken from the parent.
    gateway = (self._parent._gateway, self._parent._gateway_port)
    self._gateways = [gateway]

    self._ambeRxPort = _port  # Port to listen on for AMBE frames to transmit to all peers
    self._dmrgui = '127.0.0.1'
    self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self._slot = 2  # "current slot"

    # Index 0 is a placeholder so that slots 1 and 2 are addressed by number.
    self.rx = [0] + [RX_SLOT(number, 0, 0, 0, 1) for number in (1, 2)]
    self.tx = [0] + [TX_SLOT(number, 0, 0, 0, 1) for number in (1, 2)]
class UDP_IMPORT(DatagramProtocol):
    """
    Datagram protocol that hands every received datagram to a callback.
    """

    def __init__(self, callback_function):
        """
        @param callback_function: callable invoked as
            C{callback_function(data, (host, port))} for each datagram.
        """
        self.func = callback_function

    def datagramReceived(self, _data, _addr):
        """
        Forward a received datagram to the stored callback.

        @param _data: the datagram payload.
        @param _addr: the sender address, a C{(host, port)} pair.
        """
        # Tuple parameters in a signature are Python 2 only (removed by
        # PEP 3113); unpacking in the body works on both Python 2 and 3.
        _host, _port = _addr
        self.func(_data, (_host, _port))
def test_invalidDescriptor(self):
    """
    L{IReactorSocket.adoptDatagramPort} raises L{socket.error} when it is
    given an integer that is not associated with a socket.
    """
    reactor = self.buildReactor()

    # Take the descriptor number of a socket and close the socket, so the
    # number no longer refers to it.
    closedSocket = socket.socket()
    staleDescriptor = closedSocket.fileno()
    closedSocket.close()

    exc = self.assertRaises(
        socket.error,
        reactor.adoptDatagramPort,
        staleDescriptor, socket.AF_INET, DatagramProtocol())

    # The error code checked depends on the platform / Python version.
    if platform.isWindows() and _PY3:
        expectedCode = errno.WSAENOTSOCK
    else:
        expectedCode = errno.EBADF
    self.assertEqual(exc.args[0], expectedCode)
def test_invalidAddressFamily(self):
    """
    L{IReactorSocket.adoptDatagramPort} raises
    L{UnsupportedAddressFamily} when the address family it is given is one
    it does not support.
    """
    reactor = self.buildReactor()

    udpSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(udpSocket.close)

    # An arbitrary number used as the unsupported address family.
    bogusFamily = 2 ** 16 + 7
    descriptor = udpSocket.fileno()

    self.assertRaises(
        UnsupportedAddressFamily,
        reactor.adoptDatagramPort,
        descriptor, bogusFamily, DatagramProtocol())
def test_UDP(self):
    """
    L{internet.UDPServer} started on port 0 ends up with a real port
    number, and after C{stopService} a second server can be started on
    that same port number.
    """
    if not interfaces.IReactorUDP(reactor, None):
        raise unittest.SkipTest("This reactor does not support UDP sockets")

    proto = protocol.DatagramProtocol()
    firstService = internet.UDPServer(0, proto)
    firstService.startService()
    portNumber = firstService._port.getHost().port
    self.assertNotEqual(portNumber, 0)

    def restartOnSamePort(ignored):
        # Reuse the port number the first service was given.
        secondService = internet.UDPServer(portNumber, proto)
        secondService.startService()
        return secondService.stopService()

    stopping = defer.maybeDeferred(firstService.stopService)
    return stopping.addCallback(restartOnSamePort)
def test_startStop(self):
    """
    C{startProtocol} and C{stopProtocol} of a L{DatagramProtocol} are
    called when its transport starts and stops listening, respectively.
    """
    server = Server()
    started = defer.Deferred()
    server.startedDeferred = started
    port1 = reactor.listenUDP(0, server, interface="127.0.0.1")

    def checkStartedThenStop(ignored):
        # Started exactly once and not yet stopped.
        self.assertEqual(server.started, 1)
        self.assertEqual(server.stopped, 0)
        return port1.stopListening()

    def checkStopped(ignored):
        self.assertEqual(server.stopped, 1)

    started.addCallback(checkStartedThenStop)
    started.addCallback(checkStopped)
    return started
def test_rebind(self):
    """
    Re-listening with the same L{DatagramProtocol} re-invokes the
    C{startProtocol} callback.

    @return: the L{defer.Deferred} chain that completes after the second
        port has been asked to stop listening.
    """
    server = Server()
    d = server.startedDeferred = defer.Deferred()
    p = reactor.listenUDP(0, server, interface="127.0.0.1")

    def cbStarted(ignored, port):
        # Stop the port handed over as the extra callback argument.
        return port.stopListening()

    def cbStopped(ignored):
        # Listen a second time with the same protocol; the new Deferred is
        # the one expected to fire for this second start.
        d = server.startedDeferred = defer.Deferred()
        p = reactor.listenUDP(0, server, interface="127.0.0.1")
        return d.addCallback(cbStarted, p)

    # Without cbStopped in the chain the protocol is never re-listened, so
    # the behaviour described in the docstring would go unchecked.
    return d.addCallback(cbStarted, p).addCallback(cbStopped)
def __init__(self, bindAddress, proto, maxPacketSize=8192):
    """
    Set up the port in the "disconnected" state.

    @param bindAddress: stored as C{self.bindAddress}.
    @param proto: the protocol to use; must be a
        L{protocol.DatagramProtocol} instance.
    @param maxPacketSize: size passed to the reactor's
        C{AllocateReadBuffer}.
    """
    assert isinstance(proto, protocol.DatagramProtocol)
    self.state = "disconnected"
    from twisted.internet import reactor

    self.bindAddress, self._connectedAddr = bindAddress, None
    self.protocol, self.maxPacketSize = proto, maxPacketSize

    qualifiedName = reflect.qual(self.protocol.__class__)
    self.logstr = qualifiedName + " (UDP)"

    # The read operation receives this partially initialised object, so it
    # is created only after the attributes above have been set.
    self.read_op = self.read_op_class(self)
    self.readbuf = reactor.AllocateReadBuffer(maxPacketSize)
    self.reactor = reactor
def testUDP(self):
    """
    The string form of a listening UDP port contains its port number.
    """
    listeningPort = reactor.listenUDP(0, protocol.DatagramProtocol())
    number = listeningPort.getHost().port

    # str.find returns -1 when the port number is absent.
    position = str(listeningPort).find(str(number))
    message = "%d not found in %s" % (number, listeningPort)
    self.assertNotEqual(position, -1, message)

    return listeningPort.stopListening()
def testUDP(self):
    """
    A UDP service started on port 0 gets a real port, and once it has been
    stopped a second service can be started on that same port number.

    @raise unittest.SkipTest: if the reactor does not provide
        L{interfaces.IReactorUDP}.
    @return: a L{defer.Deferred} that fires after the second service has
        been stopped.
    """
    if not interfaces.IReactorUDP(reactor, None):
        # Call form of raise: the "raise Class, message" form is a syntax
        # error on Python 3.
        raise unittest.SkipTest("This reactor does not support UDP sockets")
    p = protocol.DatagramProtocol()
    # A datagram protocol has to be served by UDPServer; TCPServer would
    # hand it to listenTCP and never open a UDP socket.
    t = internet.UDPServer(0, p)
    t.startService()
    num = t._port.getHost().port
    self.assertNotEqual(num, 0)

    def onStop(ignored):
        # Bind again on the port number that was just released.
        t = internet.UDPServer(num, p)
        t.startService()
        return t.stopService()

    return defer.maybeDeferred(t.stopService).addCallback(onStop)
def get_local_ip():
    """
    Returns a deferred which will be called with a
    2-uple (lan_flag, ip_address) :
        - lan_flag:
            - True if it's a local network (RFC1918)
            - False if it's a WAN address
        - ip_address is the actual ip address

    NOTE(review): the body uses C{yield} and C{defer.returnValue}, so it
    presumably runs under C{defer.inlineCallbacks}; the decorator is not
    visible here -- confirm.

    @return: A deferred called with the above defined tuple
    @rtype: L{twisted.internet.defer.Deferred}
    @raise RuntimeError: if the address read from the connected UDP socket
        is rejected by C{is_bogus_ip}.
    """
    # first we try a connected udp socket, then via multicast
    logging.debug("Resolving dns to get udp ip")
    try:
        ipaddr = yield reactor.resolve('A.ROOT-SERVERS.NET')
    except Exception:
        # Best effort: a failed lookup falls through to multicast discovery.
        # Exception (rather than a bare except) lets GeneratorExit and
        # KeyboardInterrupt propagate out of the yield.
        pass
    else:
        udpprot = DatagramProtocol()
        port = reactor.listenUDP(0, udpprot)
        try:
            udpprot.transport.connect(ipaddr, 7)
            localip = udpprot.transport.getHost().host
        finally:
            # Release the port even when connect()/getHost() raises.
            port.stopListening()
        if is_bogus_ip(localip):
            raise RuntimeError("Invalid IP address returned")
        defer.returnValue((is_rfc1918_ip(localip), localip))
    logging.debug("Multicast ping to retrieve local IP")
    ipaddr = yield _discover_multicast()
    defer.returnValue((is_rfc1918_ip(ipaddr), ipaddr))
def get_local_ip():
    """
    Returns a deferred which will be called with a
    2-uple (lan_flag, ip_address) :
        - lan_flag:
            - True if it's a local network (RFC1918)
            - False if it's a WAN address
        - ip_address is the actual ip address

    NOTE(review): the body uses C{yield} and C{defer.returnValue}, so it
    presumably runs under C{defer.inlineCallbacks}; the decorator is not
    visible here -- confirm.

    @return: A deferred called with the above defined tuple
    @rtype: L{twisted.internet.defer.Deferred}
    @raise RuntimeError: if the address read from the connected UDP socket
        is rejected by C{is_bogus_ip}.
    """
    # first we try a connected udp socket, then via multicast
    logging.debug("Resolving dns to get udp ip")
    try:
        ipaddr = yield reactor.resolve('A.ROOT-SERVERS.NET')
    except Exception:
        # Best effort: a failed lookup falls through to multicast discovery.
        # Exception (rather than a bare except) lets GeneratorExit and
        # KeyboardInterrupt propagate out of the yield.
        pass
    else:
        udpprot = DatagramProtocol()
        port = reactor.listenUDP(0, udpprot)
        try:
            udpprot.transport.connect(ipaddr, 7)
            localip = udpprot.transport.getHost().host
        finally:
            # Release the port even when connect()/getHost() raises.
            port.stopListening()
        if is_bogus_ip(localip):
            raise RuntimeError("Invalid IP address returned")
        defer.returnValue((is_rfc1918_ip(localip), localip))
    logging.debug("Multicast ping to retrieve local IP")
    ipaddr = yield _discover_multicast()
    defer.returnValue((is_rfc1918_ip(ipaddr), ipaddr))
def get_local_ip():
    """
    Returns a deferred which will be called with a
    2-uple (lan_flag, ip_address) :
        - lan_flag:
            - True if it's a local network (RFC1918)
            - False if it's a WAN address
        - ip_address is the actual ip address

    NOTE(review): the body uses C{yield} and C{defer.returnValue}, so it
    presumably runs under C{defer.inlineCallbacks}; the decorator is not
    visible here -- confirm.

    @return: A deferred called with the above defined tuple
    @rtype: L{twisted.internet.defer.Deferred}
    @raise RuntimeError: if the address read from the connected UDP socket
        is rejected by C{is_bogus_ip}.
    """
    # first we try a connected udp socket, then via multicast
    logging.debug("Resolving dns to get udp ip")
    try:
        ipaddr = yield reactor.resolve('A.ROOT-SERVERS.NET')
    except Exception:
        # Best effort: a failed lookup falls through to multicast discovery.
        # Exception (rather than a bare except) lets GeneratorExit and
        # KeyboardInterrupt propagate out of the yield.
        pass
    else:
        udpprot = DatagramProtocol()
        port = reactor.listenUDP(0, udpprot)
        try:
            udpprot.transport.connect(ipaddr, 7)
            localip = udpprot.transport.getHost().host
        finally:
            # Release the port even when connect()/getHost() raises.
            port.stopListening()
        if is_bogus_ip(localip):
            raise RuntimeError("Invalid IP address returned")
        defer.returnValue((is_rfc1918_ip(localip), localip))
    logging.debug("Multicast ping to retrieve local IP")
    ipaddr = yield _discover_multicast()
    defer.returnValue((is_rfc1918_ip(ipaddr), ipaddr))
def __init__(self, bindAddress, proto, maxPacketSize=8192):
    """
    Initialise the port, which starts out "disconnected".

    @param bindAddress: stored as C{self.bindAddress}.
    @param proto: a L{protocol.DatagramProtocol} instance (asserted).
    @param maxPacketSize: size requested from the reactor's
        C{AllocateReadBuffer}.
    """
    assert isinstance(proto, protocol.DatagramProtocol)
    self.state = "disconnected"
    from twisted.internet import reactor

    self.bindAddress, self._connectedAddr = bindAddress, None
    self.protocol, self.maxPacketSize = proto, maxPacketSize

    protocolName = reflect.qual(self.protocol.__class__)
    self.logstr = protocolName + " (UDP)"

    # Built after the attributes above because it is handed this object.
    self.read_op = self.read_op_class(self)
    self.readbuf = reactor.AllocateReadBuffer(maxPacketSize)
    self.reactor = reactor
def testUDP(self):
    """
    A listening UDP port mentions its port number in its C{str()}.
    """
    udpPort = reactor.listenUDP(0, protocol.DatagramProtocol())
    assigned = udpPort.getHost().port

    # -1 from str.find means the number does not appear in the string.
    foundAt = str(udpPort).find(str(assigned))
    failureMessage = "%d not found in %s" % (assigned, udpPort)
    self.assertNotEqual(foundAt, -1, failureMessage)

    return udpPort.stopListening()
def testUDP(self):
    """
    Starting a UDP service on port 0 yields a real port number, and after
    stopping it another service can be started on that port number.

    @raise unittest.SkipTest: if the reactor does not provide
        L{interfaces.IReactorUDP}.
    @return: a L{defer.Deferred} that fires after the second service has
        been stopped.
    """
    if not interfaces.IReactorUDP(reactor, None):
        # "raise Class, message" only parses on Python 2; instantiate the
        # exception instead.
        raise unittest.SkipTest("This reactor does not support UDP sockets")
    p = protocol.DatagramProtocol()
    # Serve the datagram protocol over UDP; TCPServer would pass it to
    # listenTCP, so no UDP socket would be involved at all.
    t = internet.UDPServer(0, p)
    t.startService()
    num = t._port.getHost().port
    self.assertNotEqual(num, 0)

    def onStop(ignored):
        # Start a second service on the port number just freed.
        t = internet.UDPServer(num, p)
        t.startService()
        return t.stopService()

    return defer.maybeDeferred(t.stopService).addCallback(onStop)
def get_local_ip():
    """
    Returns a deferred which will be called with a
    2-uple (lan_flag, ip_address) :
        - lan_flag:
            - True if it's a local network (RFC1918)
            - False if it's a WAN address
        - ip_address is the actual ip address

    NOTE(review): the body uses C{yield} and C{defer.returnValue}, so it
    presumably runs under C{defer.inlineCallbacks}; the decorator is not
    visible here -- confirm.

    @return: A deferred called with the above defined tuple
    @rtype: L{twisted.internet.defer.Deferred}
    @raise RuntimeError: if the address read from the connected UDP socket
        is rejected by C{is_bogus_ip}.
    """
    # first we try a connected udp socket, then via multicast
    logging.debug("Resolving dns to get udp ip")
    try:
        ipaddr = yield reactor.resolve('A.ROOT-SERVERS.NET')
    except Exception:
        # Best effort: a failed lookup falls through to multicast discovery.
        # Exception (rather than a bare except) lets GeneratorExit and
        # KeyboardInterrupt propagate out of the yield.
        pass
    else:
        udpprot = DatagramProtocol()
        port = reactor.listenUDP(0, udpprot)
        try:
            udpprot.transport.connect(ipaddr, 7)
            localip = udpprot.transport.getHost().host
        finally:
            # Release the port even when connect()/getHost() raises.
            port.stopListening()
        if is_bogus_ip(localip):
            raise RuntimeError("Invalid IP address returned")
        defer.returnValue((is_rfc1918_ip(localip), localip))
    logging.debug("Multicast ping to retrieve local IP")
    ipaddr = yield _discover_multicast()
    defer.returnValue((is_rfc1918_ip(ipaddr), ipaddr))
def get_local_ip():
    """
    Returns a deferred which will be called with a
    2-uple (lan_flag, ip_address) :
        - lan_flag:
            - True if it's a local network (RFC1918)
            - False if it's a WAN address
        - ip_address is the actual ip address

    NOTE(review): the body uses C{yield} and C{defer.returnValue}, so it
    presumably runs under C{defer.inlineCallbacks}; the decorator is not
    visible here -- confirm.

    @return: A deferred called with the above defined tuple
    @rtype: L{twisted.internet.defer.Deferred}
    @raise RuntimeError: if the address read from the connected UDP socket
        is rejected by C{is_bogus_ip}.
    """
    # first we try a connected udp socket, then via multicast
    logging.debug("Resolving dns to get udp ip")
    try:
        ipaddr = yield reactor.resolve('A.ROOT-SERVERS.NET')
    except Exception:
        # Best effort: a failed lookup falls through to multicast discovery.
        # Exception (rather than a bare except) lets GeneratorExit and
        # KeyboardInterrupt propagate out of the yield.
        pass
    else:
        udpprot = DatagramProtocol()
        port = reactor.listenUDP(0, udpprot)
        try:
            udpprot.transport.connect(ipaddr, 7)
            localip = udpprot.transport.getHost().host
        finally:
            # Release the port even when connect()/getHost() raises.
            port.stopListening()
        if is_bogus_ip(localip):
            raise RuntimeError("Invalid IP address returned")
        defer.returnValue((is_rfc1918_ip(localip), localip))
    logging.debug("Multicast ping to retrieve local IP")
    ipaddr = yield _discover_multicast()
    defer.returnValue((is_rfc1918_ip(ipaddr), ipaddr))
def get_local_ip():
    """
    Returns a deferred which will be called with a
    2-uple (lan_flag, ip_address) :
        - lan_flag:
            - True if it's a local network (RFC1918)
            - False if it's a WAN address
        - ip_address is the actual ip address

    NOTE(review): the body uses C{yield} and C{defer.returnValue}, so it
    presumably runs under C{defer.inlineCallbacks}; the decorator is not
    visible here -- confirm.

    @return: A deferred called with the above defined tuple
    @rtype: L{twisted.internet.defer.Deferred}
    @raise RuntimeError: if the address read from the connected UDP socket
        is rejected by C{is_bogus_ip}.
    """
    # first we try a connected udp socket, then via multicast
    logging.debug("Resolving dns to get udp ip")
    try:
        ipaddr = yield reactor.resolve('A.ROOT-SERVERS.NET')
    except Exception:
        # Best effort: a failed lookup falls through to multicast discovery.
        # Exception (rather than a bare except) lets GeneratorExit and
        # KeyboardInterrupt propagate out of the yield.
        pass
    else:
        udpprot = DatagramProtocol()
        port = reactor.listenUDP(0, udpprot)
        try:
            udpprot.transport.connect(ipaddr, 7)
            localip = udpprot.transport.getHost().host
        finally:
            # Release the port even when connect()/getHost() raises.
            port.stopListening()
        if is_bogus_ip(localip):
            raise RuntimeError("Invalid IP address returned")
        defer.returnValue((is_rfc1918_ip(localip), localip))
    logging.debug("Multicast ping to retrieve local IP")
    ipaddr = yield _discover_multicast()
    defer.returnValue((is_rfc1918_ip(ipaddr), ipaddr))
def test_stopOnlyCloses(self):
    """
    Stopping the L{IListeningPort} returned by
    L{IReactorSocket.adoptDatagramPort} with C{stopListening} closes the
    underlying socket without shutting it down, so another holder of the
    socket can keep reading from and writing to it.
    """
    reactor = self.buildReactor()

    portSocket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    self.addCleanup(portSocket.close)
    portSocket.bind(("127.0.0.1", 0))
    portSocket.setblocking(False)

    # The file descriptor is duplicated by adoptDatagramPort
    adopted = reactor.adoptDatagramPort(
        portSocket.fileno(), portSocket.family, DatagramProtocol())
    stopping = adopted.stopListening()

    def checkStillReadable(ignored):
        # Should still be possible to recv on portSocket.  If
        # it was shutdown, the exception would be EINVAL instead.
        exc = self.assertRaises(socket.error, portSocket.recvfrom, 1)
        if platform.isWindows() and _PY3:
            expectedCode = errno.WSAEWOULDBLOCK
        else:
            expectedCode = errno.EAGAIN
        self.assertEqual(exc.args[0], expectedCode)

    stopping.addCallback(checkStillReadable)
    stopping.addErrback(err, "Failed to read on original port.")

    def stopReactor(ignored):
        return reactor.stop()

    def stopWhenDone():
        return stopping.addCallback(stopReactor)

    needsRunningReactor(reactor, stopWhenDone)
    reactor.run()
def test_listenMode(self):
    """
    L{IReactorUNIXDatagram.listenUNIXDatagram} creates its UNIX socket
    with the mode that was specified.
    """
    methodName = 'listenUNIXDatagram'
    self._modeTest(methodName, self.mktemp(), DatagramProtocol())
def test_listenOnLinuxAbstractNamespace(self):
    """
    On Linux, a UNIX socket path that begins with a NUL byte names a socket
    in the abstract namespace.  L{IReactorUNIX.listenUNIXDatagram} accepts
    such a path.
    """
    path = _abstractPath(self)
    reactor = self.buildReactor()

    # The leading NUL byte selects the abstract namespace.
    abstractAddress = '\0' + path
    port = reactor.listenUNIXDatagram(abstractAddress, DatagramProtocol())

    self.assertEqual(port.getHost(), UNIXAddress('\0' + path))
def test_oldAddress(self):
    """
    The host address of a listening L{DatagramProtocol}'s transport has
    C{"UDP"} as its C{type}.
    """
    server = Server()
    started = defer.Deferred()
    server.startedDeferred = started
    listeningPort = reactor.listenUDP(0, server, interface="127.0.0.1")

    def checkTypeThenStop(ignored):
        hostAddress = listeningPort.getHost()
        self.assertEqual(hostAddress.type, 'UDP')
        return listeningPort.stopListening()

    started.addCallback(checkTypeThenStop)
    return started
def parser(self):
    """
    Build a function that parses a datagram read from a I{tun} device.

    @return: A function taking a datagram exactly as it might be read from
        a I{tun} device.  The datagram is expected to ultimately carry a
        UDP datagram.  Calling it returns a L{list} of L{tuple}s, each
        holding the UDP application data first and the sender address
        second.
    """
    captured = []

    # The receiver only records the arguments of each delivery.
    sink = DatagramProtocol()
    sink.datagramReceived = lambda *args: captured.append(args)

    # Stack: IP (protocol 17) -> raw UDP (port 12345) -> sink.
    udpLayer = RawUDPProtocol()
    udpLayer.addProto(12345, sink)
    ipLayer = IPProtocol()
    ipLayer.addProto(17, udpLayer)

    def parse(data):
        # TUN devices omit the ethernet framing so we can start parsing
        # right at the IP layer.
        ipLayer.datagramReceived(data, False, None, None, None)
        return captured

    return parse
def parser(self):
    """
    Build a function that parses a datagram read from a I{tap} device.

    @return: A function taking a datagram exactly as it might be read from
        a I{tap} device.  The datagram is expected to ultimately carry a
        UDP datagram.  Calling it returns a L{list} of L{tuple}s, each
        holding the UDP application data first and the sender address
        second.
    """
    captured = []

    # The receiver only records the arguments of each delivery.
    sink = DatagramProtocol()
    sink.datagramReceived = lambda *args: captured.append(args)

    # Stack: ethernet (0x800) -> IP (protocol 17) -> raw UDP (port 12345)
    # -> sink.
    udpLayer = RawUDPProtocol()
    udpLayer.addProto(12345, sink)
    ipLayer = IPProtocol()
    ipLayer.addProto(17, udpLayer)
    etherLayer = EthernetProtocol()
    etherLayer.addProto(0x800, ipLayer)

    def parser(datagram):
        # TAP devices might include a PI header.  Strip that off if we
        # expect it to be there.
        payload = datagram[_PI_SIZE:] if self.pi else datagram

        # TAP devices include ethernet framing so start parsing at the
        # ethernet layer.
        etherLayer.datagramReceived(payload)
        return captured

    return parser
def testAddingBadProtos_WrongLevel(self):
    """Adding a wrong level protocol raises an exception."""
    proto = rawudp.RawUDPProtocol()
    expectedArgs = ('Added protocol must be an instance of DatagramProtocol',)
    try:
        proto.addProto(42, "silliness")
    except TypeError as error:
        # Any other TypeError is unexpected and is re-raised unchanged.
        if error.args != expectedArgs:
            raise
    else:
        raise AssertionError('addProto must raise an exception for bad protocols')
def testAddingBadProtos_TooSmall(self):
    """Adding a protocol with a negative number raises an exception."""
    proto = rawudp.RawUDPProtocol()
    expectedArgs = ('Added protocol must be positive or zero',)
    try:
        proto.addProto(-1, protocol.DatagramProtocol())
    except TypeError as error:
        # Any other TypeError is unexpected and is re-raised unchanged.
        if error.args != expectedArgs:
            raise
    else:
        raise AssertionError('addProto must raise an exception for bad protocols')
def testAddingBadProtos_TooBig(self):
    """Adding a protocol with a number >=2**16 raises an exception."""
    proto = rawudp.RawUDPProtocol()
    expectedArgs = ('Added protocol must fit in 16 bits',)
    try:
        proto.addProto(2**16, protocol.DatagramProtocol())
    except TypeError as error:
        # Any other TypeError is unexpected and is re-raised unchanged.
        if error.args != expectedArgs:
            raise
    else:
        raise AssertionError('addProto must raise an exception for bad protocols')
def test_init(self):
    """
    A new L{LeaseSocketService} is both a L{Service} and a
    L{DatagramProtocol}, and stores the client service, the reactor and
    the socket path it was built with.
    """
    socket_path = self.patch_socket_path()
    service = LeaseSocketService(
        sentinel.service, sentinel.reactor)
    self.assertIsInstance(service, Service)
    self.assertIsInstance(service, DatagramProtocol)
    self.assertIs(service.reactor, sentinel.reactor)
    self.assertIs(service.client_service, sentinel.service)
    # assertEqual, not the deprecated assertEquals alias, which was removed
    # from unittest in Python 3.12.
    self.assertEqual(socket_path, service.address)