def _send(self, data, rpcID, address):
    """ Transmit the specified data over UDP, breaking it up into several
    packets if necessary

    If the data is spread over multiple UDP datagrams, the packets have the
    following structure::

        +--------------+--------------+-----------------+----------+------------+
        | Transmission | Total number | Sequence number |  RPC ID  | Header end |
        |   type ID    |  of packets  | of this packet  |          | indicator  |
        |   (1 byte)   |  (2 bytes)   |    (2 bytes)    |(20 bytes)|  (1 byte)  |
        |     0x00     |              |                 |          |    0x00    |
        +--------------+--------------+-----------------+----------+------------+

    Data that fits within C{self.msgSizeLimit} is written to the transport
    immediately and unmodified (no header is added).

    @param data: The payload to transmit
    @param rpcID: The RPC ID embedded in the header of every fragment
    @param address: The destination, passed through to C{transport.write}

    @raise ValueError: If the payload would need more fragments than the
                       2-byte packet counter can represent

    @note: The header used for breaking up large data segments will
           possibly be moved out of the KademliaProtocol class in the
           future, into something similar to a message translator/encoder
           class (see C{kademlia.msgformat} and C{kademlia.encoding}).
    """
    limit = self.msgSizeLimit
    if len(data) <= limit:
        # Small enough for a single datagram: send as-is.
        self.transport.write(data, address)
        return

    # We have to spread the data over multiple UDP datagrams, and provide
    # sequencing information. Ceiling division with ``//`` keeps the count
    # an integer even when true division is in effect.
    totalPackets = (len(data) + limit - 1) // limit
    if totalPackets > 0xffff:
        # The header only has two bytes for the packet count; fail before
        # anything is scheduled rather than emit a corrupt header.
        raise ValueError('data needs %d packets, but the 2-byte packet '
                         'counter allows at most %d' % (totalPackets, 0xffff))
    # Big-endian 16-bit encoding: high byte first, then low byte.
    encTotalPackets = chr(totalPackets >> 8) + chr(totalPackets & 0xff)
    for seqNumber in range(totalPackets):
        startPos = seqNumber * limit
        packetData = data[startPos:startPos + limit]
        encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff)
        txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData)
        # Each fragment is scheduled later than the previous one, so the
        # writes are spread out in time instead of issued back-to-back.
        reactor.callLater(self.maxToSendDelay*seqNumber+self.minToSendDelay, self.transport.write, txData, address) #IGNORE:E1101
# Example source code using Python's iterate()
def _send(self, data, rpcID, address):
    """ Transmit the specified data over UDP, breaking it up into several
    packets if necessary

    If the data is spread over multiple UDP datagrams, the packets have the
    following structure::

        +--------------+--------------+-----------------+----------+------------+
        | Transmission | Total number | Sequence number |  RPC ID  | Header end |
        |   type ID    |  of packets  | of this packet  |          | indicator  |
        |   (1 byte)   |  (2 bytes)   |    (2 bytes)    |(20 bytes)|  (1 byte)  |
        |     0x00     |              |                 |          |    0x00    |
        +--------------+--------------+-----------------+----------+------------+

    Data that fits within C{self.msgSizeLimit} is written to the transport
    immediately and unmodified (no header is added).

    @param data: The payload to transmit
    @param rpcID: The RPC ID embedded in the header of every fragment
    @param address: The destination, passed through to C{transport.write}

    @raise ValueError: If the payload would need more fragments than the
                       2-byte packet counter can represent

    @note: The header used for breaking up large data segments will
           possibly be moved out of the KademliaProtocol class in the
           future, into something similar to a message translator/encoder
           class (see C{kademlia.msgformat} and C{kademlia.encoding}).
    """
    limit = self.msgSizeLimit
    if len(data) <= limit:
        # Small enough for a single datagram: send as-is.
        self.transport.write(data, address)
        return

    # We have to spread the data over multiple UDP datagrams, and provide
    # sequencing information. Ceiling division with ``//`` keeps the count
    # an integer even when true division is in effect.
    totalPackets = (len(data) + limit - 1) // limit
    if totalPackets > 0xffff:
        # The header only has two bytes for the packet count; fail before
        # anything is scheduled rather than emit a corrupt header.
        raise ValueError('data needs %d packets, but the 2-byte packet '
                         'counter allows at most %d' % (totalPackets, 0xffff))
    # Big-endian 16-bit encoding: high byte first, then low byte.
    encTotalPackets = chr(totalPackets >> 8) + chr(totalPackets & 0xff)
    for seqNumber in range(totalPackets):
        startPos = seqNumber * limit
        packetData = data[startPos:startPos + limit]
        encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff)
        txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData)
        # Each fragment is scheduled later than the previous one, so the
        # writes are spread out in time instead of issued back-to-back.
        reactor.callLater(self.maxToSendDelay*seqNumber+self.minToSendDelay, self.transport.write, txData, address) #IGNORE:E1101
def _send(self, data, rpcID, address):
    """ Transmit the specified data over UDP, breaking it up into several
    packets if necessary

    If the data is spread over multiple UDP datagrams, the packets have the
    following structure::

        +--------------+--------------+-----------------+----------+------------+
        | Transmission | Total number | Sequence number |  RPC ID  | Header end |
        |   type ID    |  of packets  | of this packet  |          | indicator  |
        |   (1 byte)   |  (2 bytes)   |    (2 bytes)    |(20 bytes)|  (1 byte)  |
        |     0x00     |              |                 |          |    0x00    |
        +--------------+--------------+-----------------+----------+------------+

    Data that fits within C{self.msgSizeLimit} is written to the transport
    immediately and unmodified (no header is added).

    @param data: The payload to transmit
    @param rpcID: The RPC ID embedded in the header of every fragment
    @param address: The destination, passed through to C{transport.write}

    @raise ValueError: If the payload would need more fragments than the
                       2-byte packet counter can represent

    @note: The header used for breaking up large data segments will
           possibly be moved out of the KademliaProtocol class in the
           future, into something similar to a message translator/encoder
           class (see C{kademlia.msgformat} and C{kademlia.encoding}).
    """
    limit = self.msgSizeLimit
    if len(data) <= limit:
        # Small enough for a single datagram: send as-is.
        self.transport.write(data, address)
        return

    # We have to spread the data over multiple UDP datagrams, and provide
    # sequencing information. Ceiling division with ``//`` keeps the count
    # an integer even when true division is in effect.
    totalPackets = (len(data) + limit - 1) // limit
    if totalPackets > 0xffff:
        # The header only has two bytes for the packet count; fail before
        # anything is scheduled rather than emit a corrupt header.
        raise ValueError('data needs %d packets, but the 2-byte packet '
                         'counter allows at most %d' % (totalPackets, 0xffff))
    # Big-endian 16-bit encoding: high byte first, then low byte.
    encTotalPackets = chr(totalPackets >> 8) + chr(totalPackets & 0xff)
    for seqNumber in range(totalPackets):
        startPos = seqNumber * limit
        packetData = data[startPos:startPos + limit]
        encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff)
        txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData)
        # Each fragment is scheduled later than the previous one, so the
        # writes are spread out in time instead of issued back-to-back.
        reactor.callLater(self.maxToSendDelay*seqNumber+self.minToSendDelay, self.transport.write, txData, address) #IGNORE:E1101
def _send(self, data, rpcID, address):
    """ Transmit the specified data over UDP, breaking it up into several
    packets if necessary

    If the data is spread over multiple UDP datagrams, the packets have the
    following structure::

        +--------------+--------------+-----------------+----------+------------+
        | Transmission | Total number | Sequence number |  RPC ID  | Header end |
        |   type ID    |  of packets  | of this packet  |          | indicator  |
        |   (1 byte)   |  (2 bytes)   |    (2 bytes)    |(20 bytes)|  (1 byte)  |
        |     0x00     |              |                 |          |    0x00    |
        +--------------+--------------+-----------------+----------+------------+

    Every datagram (fragmented or not) is handed to C{self._sendNext};
    data that fits within C{self.msgSizeLimit} is passed on unmodified
    (no header is added).

    @param data: The payload to transmit
    @param rpcID: The RPC ID embedded in the header of every fragment
    @param address: The destination, passed through to C{self._sendNext}

    @raise ValueError: If the payload would need more fragments than the
                       2-byte packet counter can represent

    @note: The header used for breaking up large data segments will
           possibly be moved out of the KademliaProtocol class in the
           future, into something similar to a message translator/encoder
           class (see C{kademlia.msgformat} and C{kademlia.encoding}).
    """
    limit = self.msgSizeLimit
    if len(data) <= limit:
        # Small enough for a single datagram: send as-is.
        self._sendNext(data, address)
        return

    # We have to spread the data over multiple UDP datagrams, and provide
    # sequencing information. Ceiling division with ``//`` keeps the count
    # an integer even when true division is in effect.
    totalPackets = (len(data) + limit - 1) // limit
    if totalPackets > 0xffff:
        # The header only has two bytes for the packet count; fail before
        # anything is sent rather than emit a corrupt header.
        raise ValueError('data needs %d packets, but the 2-byte packet '
                         'counter allows at most %d' % (totalPackets, 0xffff))
    # Big-endian 16-bit encoding: high byte first, then low byte.
    encTotalPackets = chr(totalPackets >> 8) + chr(totalPackets & 0xff)
    for seqNumber in range(totalPackets):
        startPos = seqNumber * limit
        packetData = data[startPos:startPos + limit]
        encSeqNumber = chr(seqNumber >> 8) + chr(seqNumber & 0xff)
        txData = '\x00%s%s%s\x00%s' % (encTotalPackets, encSeqNumber, rpcID, packetData)
        self._sendNext(txData, address)
def stopProtocol(self):
    """ Called when the transport is disconnected.

    Will only be called once, after all ports are disconnected.

    Empties C{self._callLaterList}: every entry whose key (compared
    against C{time.time()}) still lies in the future has its C{cancel()}
    method invoked first; entries whose key is already in the past are
    simply dropped. An error raised while cancelling is printed and does
    not stop the cleanup of the remaining entries.
    """
    # Iterate over a snapshot of the keys, because entries are deleted from
    # the mapping while we walk it.
    for key in list(self._callLaterList):
        try:
            if key > time.time():
                self._callLaterList[key].cancel()
        except Exception as e:
            # Best effort: report the problem and keep cleaning up.
            print(e)
        del self._callLaterList[key]
        #TODO: test: do we really need the reactor.iterate() call?
        # NOTE(review): kept once per removed entry; the original file's
        # indentation was not available, confirm this placement.
        reactor.iterate()
def testHiddenException(self):
    """
    Fail the test body while a scheduled DelayedCall (C{self.go}) is also
    given the chance to run during a single short reactor iteration.

    This covers the case where an error is raised in a DelayedCall and an
    error is also raised in the test itself.
    L{test_reporter.ErrorReportingTests.testHiddenException} checks that
    both errors get reported.

    Note that this behaviour is deprecated. A B{real} test would return a
    Deferred that got triggered by the callLater. This would guarantee the
    delayed call error gets reported.
    """
    maskingMessage = "Deliberate failure to mask the hidden exception"
    # Queue the delayed call with no delay, then spin the reactor once so
    # it gets a chance to run before the test body fails.
    reactor.callLater(0, self.go)
    reactor.iterate(0.01)
    self.fail(maskingMessage)
def testTriggerSystemEvent1(self):
    """
    Exercise system event triggers in three scenarios.

    1. "test": one trigger per phase ("before", "during", "after"); after
       firing the event and iterating the reactor once, all three must
       have run.
    2. "defer": two "before" triggers return Deferreds; the "during" and
       "after" triggers must not run until both Deferreds have fired.
    3. "remove": of two "before" triggers, the one that is removed again
       before the event fires must not run.

    Triggers are registered through C{self.addTrigger} and removed through
    C{self.removeTrigger}, helpers provided by the test case.
    """
    fired = []
    fired2 = []
    d = Deferred()
    d2 = Deferred()

    def _returnDeferred():
        return d

    def _returnDeferred2():
        return d2

    def _appendToList():
        fired.append(1)

    def _appendToList2():
        fired2.append(1)

    r = reactor

    # Scenario 1: plain triggers in every phase.
    self.addTrigger("before", "test", _appendToList)
    self.addTrigger("during", "test", _appendToList)
    self.addTrigger("after", "test", _appendToList)
    self.assertEqual(len(fired), 0, "Nothing happened yet.")
    r.fireSystemEvent("test")
    r.iterate()
    self.assertEqual(len(fired), 3, "Should have filled the list.")

    # Scenario 2: "before" triggers that return Deferreds hold back the
    # later phases until every Deferred has fired.
    fired[:] = []
    self.addTrigger("before", "defer", _returnDeferred)
    self.addTrigger("before", "defer", _returnDeferred2)
    self.addTrigger("during", "defer", _appendToList)
    self.addTrigger("after", "defer", _appendToList)
    r.fireSystemEvent("defer")
    self.assertEqual(len(fired), 0, "Event should not have fired yet.")
    d.callback(None)
    self.assertEqual(len(fired), 0, "Event still should not have fired yet.")
    d2.callback(None)
    self.assertEqual(len(fired), 2)

    # Scenario 3: a removed trigger must not be called.
    fired[:] = []
    self.addTrigger("before", "remove", _appendToList)
    b = self.addTrigger("before", "remove", _appendToList2)
    self.removeTrigger(b)
    r.fireSystemEvent("remove")
    self.assertEqual(len(fired), 1)
    self.assertEqual(len(fired2), 0)
def testTriggerSystemEvent1(self):
    """
    Exercise system event triggers in three scenarios.

    1. "test": one trigger per phase ("before", "during", "after"); after
       firing the event and iterating the reactor once, all three must
       have run.
    2. "defer": two "before" triggers return Deferreds; the "during" and
       "after" triggers must not run until both Deferreds have fired.
    3. "remove": of two "before" triggers, the one that is removed again
       before the event fires must not run.

    Triggers are registered through C{self.addTrigger} and removed through
    C{self.removeTrigger}, helpers provided by the test case.
    """
    fired = []
    fired2 = []
    d = Deferred()
    d2 = Deferred()

    def _returnDeferred():
        return d

    def _returnDeferred2():
        return d2

    def _appendToList():
        fired.append(1)

    def _appendToList2():
        fired2.append(1)

    r = reactor

    # Scenario 1: plain triggers in every phase.
    self.addTrigger("before", "test", _appendToList)
    self.addTrigger("during", "test", _appendToList)
    self.addTrigger("after", "test", _appendToList)
    self.assertEqual(len(fired), 0, "Nothing happened yet.")
    r.fireSystemEvent("test")
    r.iterate()
    self.assertEqual(len(fired), 3, "Should have filled the list.")

    # Scenario 2: "before" triggers that return Deferreds hold back the
    # later phases until every Deferred has fired.
    fired[:] = []
    self.addTrigger("before", "defer", _returnDeferred)
    self.addTrigger("before", "defer", _returnDeferred2)
    self.addTrigger("during", "defer", _appendToList)
    self.addTrigger("after", "defer", _appendToList)
    r.fireSystemEvent("defer")
    self.assertEqual(len(fired), 0, "Event should not have fired yet.")
    d.callback(None)
    self.assertEqual(len(fired), 0, "Event still should not have fired yet.")
    d2.callback(None)
    self.assertEqual(len(fired), 2)

    # Scenario 3: a removed trigger must not be called.
    fired[:] = []
    self.addTrigger("before", "remove", _appendToList)
    b = self.addTrigger("before", "remove", _appendToList2)
    self.removeTrigger(b)
    r.fireSystemEvent("remove")
    self.assertEqual(len(fired), 1)
    self.assertEqual(len(fired2), 0)