def logOn(self, chatui):
    """
    Start logging this account on and register every client that results.

    This overrides basesupport's implementation because C{_startLogOn}
    tends to return a DeferredList rather than a simple Deferred, and
    C{registerAccountClient} has to be called for each client.

    @returns: this breaks with L{interfaces.IAccount}
    @returntype: DeferredList of L{interfaces.IClient}s
    @raise error.ConnectionError: if the account is already connecting or
        already online.
    """
    if self._isConnecting or self._isOnline:
        raise error.ConnectionError("Connection in progress")

    self._isConnecting = 1

    def registerMany(outcomes):
        # The outcomes are unpacked as (success, value) pairs.
        for succeeded, value in outcomes:
            if not succeeded:
                log.err(value)
                continue
            chatui.registerAccountClient(value)
            self._cb_logOn(value)

    loggingOn = self._startLogOn(chatui)
    loggingOn.addErrback(self._loginFailed)
    loggingOn.addCallback(registerMany)
    return loggingOn
python类DeferredList()的实例源码
def _cbGetFileSize(self, attrs, rf, lf):
    """
    Start the parallel reads for a remote file once its attributes are known.

    If C{attrs['permissions']} does not describe a regular file, both file
    objects are closed and an error string is returned.  Otherwise the size
    is recorded on C{rf}, one C{_cbGetRead} call is issued per configured
    request, and C{_cbGetDone} is scheduled to run when all of them finish.

    @return: an error string for a non-regular file, otherwise a
        C{DeferredList} (with C{fireOnOneErrback} set) over the reads.
    """
    isRegular = stat.S_ISREG(attrs['permissions'])
    if not isRegular:
        rf.close()
        lf.close()
        return "Can't get non-regular file: %s" % rf.name

    rf.size = attrs['size']
    options = self.client.transport.conn.options
    chunkSize = options['buffersize']
    parallelReads = options['requests']
    rf.total = 0.0
    chunks = []
    began = time.time()
    # Every read shares the same chunk list and start time.
    reads = [
        self._cbGetRead('', rf, lf, chunks, 0, chunkSize, began)
        for _ in range(parallelReads)]
    finished = defer.DeferredList(reads, fireOnOneErrback=1)
    finished.addCallback(self._cbGetDone, rf, lf)
    return finished
def testResults(self):
    """
    Call several remote methods through C{self.proxy()} and check that each
    one produces the expected result.
    """
    expectations = [
        ("add", (2, 3), 5),
        ("defer", ("a",), "a"),
        ("dict", ({"a": 1}, "a"), 1),
        ("triple", ("a", 1), ["a", 1, None])]

    pending = []
    for method, arguments, expected in expectations:
        call = self.proxy().callRemote(method, *arguments)
        call.addCallback(self.assertEquals, expected)
        pending.append(call)

    # SOAPpy kinda blows: the 'complex' result has to be converted with
    # _asdict() before it can be compared against a plain dict.
    complexCall = self.proxy().callRemote('complex')
    complexCall.addCallback(lambda result: result._asdict())
    complexCall.addCallback(
        self.assertEquals, {"a": ["b", "c", 12, []], "D": "foo"})
    pending.append(complexCall)

    # We now return to our regularly scheduled program, already in progress.
    return defer.DeferredList(pending, fireOnOneErrback=True)
def testCooperation(self):
    """
    Run several iterators under one L{task.Cooperator} and check that the
    items they record come out interleaved, one item from each group in
    turn.
    """
    seen = []

    def record(items):
        for item in items:
            seen.append(item)
            yield None

    groupsOfThings = ['abc', (1, 2, 3), 'def', (4, 5, 6)]
    cooperator = task.Cooperator()
    running = [cooperator.coiterate(record(group))
               for group in groupsOfThings]

    def verify(ign):
        # zip(*groups) yields one tuple per round; summing them flattens
        # the rounds into the expected interleaved order.
        interleaved = sum(zip(*groupsOfThings), ())
        return self.assertEquals(tuple(seen), interleaved)

    return defer.DeferredList(running).addCallback(verify)
def testNotLoggedInReply(self):
    """When not logged in, all commands other than USER and PASS should
    get NOT_LOGGED_IN errors.
    """
    commandList = ['CDUP', 'CWD', 'LIST', 'MODE', 'PASV',
                   'PWD', 'RETR', 'STRU', 'SYST', 'TYPE']

    def checkResponse(exception):
        # The first exception argument holds the response lines; only the
        # last one is inspected.
        lastLine = exception.args[0][-1]
        self.failUnless(lastLine.startswith("530"),
                        "Response didn't start with 530: %r"
                        % (lastLine,))

    def issue(command):
        # Queue the command, require it to fail with CommandFailed, then
        # check the failure's response code.
        pending = self.client.queueStringCommand(command)
        self.assertFailure(pending, ftp.CommandFailed)
        pending.addCallback(checkResponse)
        return pending

    return defer.DeferredList(
        [issue(command) for command in commandList],
        fireOnOneErrback=True)
def setUp(self):
    """
    Start a TCP server on an ephemeral port and connect a client to it.

    Each factory gets an C{onMade} Deferred.  The returned Deferred fires
    after both have fired, at which point C{self.cli} and C{self.svr} are
    set from each factory's C{theProto}.
    """
    from twisted.internet import reactor

    serverFactory = protocol.ServerFactory()
    self.serverFactory = serverFactory
    serverFactory.protocol = self.serverProto

    clientFactory = protocol.ClientFactory()
    self.clientFactory = clientFactory
    clientFactory.protocol = self.clientProto

    clientFactory.onMade = defer.Deferred()
    serverFactory.onMade = defer.Deferred()

    # Port 0 is passed to listenTCP; the client then connects to whatever
    # port the listening port reports.
    self.serverPort = reactor.listenTCP(0, serverFactory)
    listeningOn = self.serverPort.getHost().port
    self.clientConn = reactor.connectTCP(
        '127.0.0.1', listeningOn, clientFactory)

    def getProtos(rlst):
        self.cli = self.clientFactory.theProto
        self.svr = self.serverFactory.theProto

    bothMade = defer.DeferredList(
        [clientFactory.onMade, serverFactory.onMade])
    return bothMade.addCallback(getProtos)
def testDeferredListWithAlreadyFiredDeferreds(self):
    """
    Build a DeferredList from Deferreds that have already fired and check
    that a callback added to it has run exactly once.
    """
    # Fire both Deferreds first: one with an error, one with a value.
    failed = defer.Deferred()
    succeeded = defer.Deferred()
    failed.errback(GenericError('Bang'))
    succeeded.callback(2)

    # *Then* build the DeferredList from them.
    fired = []
    combined = defer.DeferredList([failed, succeeded])
    combined.addCallback(fired.append)
    self.failUnlessEqual(1, len(fired))

    failed.addErrback(lambda e: None)  # Swallow error
def test_concurrentRetrieves(self):
    """
    Issue three retrieve calls immediately without waiting for any to
    succeed and make sure they all do succeed eventually.
    """
    p, t = setUp()

    messages = []
    for index in range(3):
        number = index + 1
        retrieval = p.retrieve(index)
        retrieval.addCallback(
            self.assertEquals,
            ["First line of %d." % (number,),
             "Second line of %d." % (number,)])
        messages.append(retrieval)

    for number in range(1, 4):
        # Each RETR is expected on the transport in turn; nothing more
        # should be written until the terminating line has been delivered.
        self.assertEquals(t.value(), "RETR %d\r\n" % (number,))
        t.clear()
        p.dataReceived("+OK 2 lines on the way\r\n")
        p.dataReceived("First line of %d.\r\n" % (number,))
        p.dataReceived("Second line of %d.\r\n" % (number,))
        self.assertEquals(t.value(), "")
        p.dataReceived(".\r\n")

    return defer.DeferredList(messages, fireOnOneErrback=True)
def stopService(self):
    """
    Stop every child service, then disconnect and stop this service.

    Child services (the items yielded by iterating C{self}) are stopped in
    reverse order.  Once all of them have finished, the connector, if there
    is one, is disconnected and removed, and the base
    C{service.Service.stopService} is called.

    @return: a C{DeferredList} that runs the final cleanup when there are
        child services; otherwise an empty list, the cleanup having already
        been performed synchronously.
    """
    def stop_cb(ignored=None):
        # Takes one optional argument so that it works both as a Deferred
        # callback (always passed the previous result) and as a plain
        # synchronous call.  Without it, addCallback(stop_cb) raised
        # TypeError and this cleanup never ran.
        if self._connector is not None:
            self._connector.disconnect()
            del self._connector
        service.Service.stopService(self)

    stopping = [defer.maybeDeferred(svc.stopService)
                for svc in reversed(list(self))]
    if not stopping:
        stop_cb()
        return stopping

    done = defer.DeferredList(stopping)
    done.addCallback(stop_cb)
    return done
def fetch_many_async(urls, callback=None, errback=None, **kwargs):
    """
    Retrieve a list of URLs asynchronously.

    Every URL is fetched with C{fetch_async}, which receives the extra
    keyword arguments unchanged.

    @param callback: Optionally, a function that will be fired one time for
        each successful URL, and will be passed its content and the URL itself.
    @param errback: Optionally, a function that will be fired one time for each
        failing URL, and will be passed the failure and the URL itself.
    @return: A C{DeferredList} whose callback chain will be fired as soon as
        all downloads have terminated. If an error occurs, the errback chain
        of the C{DeferredList} will be fired immediately.
    """
    def start(url):
        download = fetch_async(url, **kwargs)
        if callback:
            download.addCallback(callback, url)
        if errback:
            download.addErrback(errback, url)
        return download

    downloads = [start(url) for url in urls]
    return DeferredList(downloads, fireOnOneErrback=True, consumeErrors=True)
def getSystem(reactor, hostname):
    """
    Query the SNMPv2-MIB C{sysDescr} and C{sysLocation} scalars of a host.

    Each value is requested with C{getCmd} using community C{'public'}
    (C{mpModel=0}) over UDP port 161; the C{success} and C{failure}
    handlers are attached to every request together with the hostname.
    The C{reactor} argument is not used by this function.

    @return: a C{DeferredList} over the two requests.
    """
    snmpEngine = SnmpEngine()

    def getScalar(objectType):
        request = getCmd(snmpEngine,
                         CommunityData('public', mpModel=0),
                         UdpTransportTarget((hostname, 161)),
                         ContextData(),
                         objectType)
        request.addCallback(success, hostname).addErrback(failure, hostname)
        return request

    scalarNames = ('sysDescr', 'sysLocation')
    return DeferredList(
        [getScalar(ObjectType(ObjectIdentity('SNMPv2-MIB', scalar, 0)))
         for scalar in scalarNames]
    )
def logOn(self, chatui):
    """
    Start logging this account on and register every client that results.

    This overrides basesupport's implementation because C{_startLogOn}
    tends to return a DeferredList rather than a simple Deferred, and
    C{registerAccountClient} has to be called for each client.

    @returns: this breaks with L{interfaces.IAccount}
    @returntype: DeferredList of L{interfaces.IClient}s
    @raise error.ConnectionError: if the account is already connecting or
        already online.
    """
    if self._isConnecting or self._isOnline:
        raise error.ConnectionError("Connection in progress")

    self._isConnecting = 1

    def registerMany(outcomes):
        # The outcomes are unpacked as (success, value) pairs.
        for succeeded, value in outcomes:
            if not succeeded:
                log.err(value)
                continue
            chatui.registerAccountClient(value)
            self._cb_logOn(value)

    loggingOn = self._startLogOn(chatui)
    loggingOn.addErrback(self._loginFailed)
    loggingOn.addCallback(registerMany)
    return loggingOn
def _cbGetFileSize(self, attrs, rf, lf):
    """
    Start the parallel reads for a remote file once its attributes are known.

    If C{attrs['permissions']} does not describe a regular file, both file
    objects are closed and an error string is returned.  Otherwise the size
    is recorded on C{rf}, one C{_cbGetRead} call is issued per configured
    request, and C{_cbGetDone} is scheduled to run when all of them finish.

    @return: an error string for a non-regular file, otherwise a
        C{DeferredList} (with C{fireOnOneErrback} set) over the reads.
    """
    isRegular = stat.S_ISREG(attrs['permissions'])
    if not isRegular:
        rf.close()
        lf.close()
        return "Can't get non-regular file: %s" % rf.name

    rf.size = attrs['size']
    options = self.client.transport.conn.options
    chunkSize = options['buffersize']
    parallelReads = options['requests']
    rf.total = 0.0
    chunks = []
    began = time.time()
    # Every read shares the same chunk list and start time.
    reads = [
        self._cbGetRead('', rf, lf, chunks, 0, chunkSize, began)
        for _ in range(parallelReads)]
    finished = defer.DeferredList(reads, fireOnOneErrback=1)
    finished.addCallback(self._cbGetDone, rf, lf)
    return finished
def testResults(self):
    """
    Call several remote methods through C{self.proxy()} and check that each
    one produces the expected result.
    """
    expectations = [
        ("add", (2, 3), 5),
        ("defer", ("a",), "a"),
        ("dict", ({"a": 1}, "a"), 1),
        ("triple", ("a", 1), ["a", 1, None])]

    pending = []
    for method, arguments, expected in expectations:
        call = self.proxy().callRemote(method, *arguments)
        call.addCallback(self.assertEquals, expected)
        pending.append(call)

    # SOAPpy kinda blows: the 'complex' result has to be converted with
    # _asdict() before it can be compared against a plain dict.
    complexCall = self.proxy().callRemote('complex')
    complexCall.addCallback(lambda result: result._asdict())
    complexCall.addCallback(
        self.assertEquals, {"a": ["b", "c", 12, []], "D": "foo"})
    pending.append(complexCall)

    # We now return to our regularly scheduled program, already in progress.
    return defer.DeferredList(pending, fireOnOneErrback=True)
def testCooperation(self):
    """
    Run several iterators under one L{task.Cooperator} and check that the
    items they record come out interleaved, one item from each group in
    turn.
    """
    seen = []

    def record(items):
        for item in items:
            seen.append(item)
            yield None

    groupsOfThings = ['abc', (1, 2, 3), 'def', (4, 5, 6)]
    cooperator = task.Cooperator()
    running = [cooperator.coiterate(record(group))
               for group in groupsOfThings]

    def verify(ign):
        # zip(*groups) yields one tuple per round; summing them flattens
        # the rounds into the expected interleaved order.
        interleaved = sum(zip(*groupsOfThings), ())
        return self.assertEquals(tuple(seen), interleaved)

    return defer.DeferredList(running).addCallback(verify)
def testNotLoggedInReply(self):
    """When not logged in, all commands other than USER and PASS should
    get NOT_LOGGED_IN errors.
    """
    commandList = ['CDUP', 'CWD', 'LIST', 'MODE', 'PASV',
                   'PWD', 'RETR', 'STRU', 'SYST', 'TYPE']

    def checkResponse(exception):
        # The first exception argument holds the response lines; only the
        # last one is inspected.
        lastLine = exception.args[0][-1]
        self.failUnless(lastLine.startswith("530"),
                        "Response didn't start with 530: %r"
                        % (lastLine,))

    def issue(command):
        # Queue the command, require it to fail with CommandFailed, then
        # check the failure's response code.
        pending = self.client.queueStringCommand(command)
        self.assertFailure(pending, ftp.CommandFailed)
        pending.addCallback(checkResponse)
        return pending

    return defer.DeferredList(
        [issue(command) for command in commandList],
        fireOnOneErrback=True)
def setUp(self):
    """
    Start a TCP server on an ephemeral port and connect a client to it.

    Each factory gets an C{onMade} Deferred.  The returned Deferred fires
    after both have fired, at which point C{self.cli} and C{self.svr} are
    set from each factory's C{theProto}.
    """
    from twisted.internet import reactor

    serverFactory = protocol.ServerFactory()
    self.serverFactory = serverFactory
    serverFactory.protocol = self.serverProto

    clientFactory = protocol.ClientFactory()
    self.clientFactory = clientFactory
    clientFactory.protocol = self.clientProto

    clientFactory.onMade = defer.Deferred()
    serverFactory.onMade = defer.Deferred()

    # Port 0 is passed to listenTCP; the client then connects to whatever
    # port the listening port reports.
    self.serverPort = reactor.listenTCP(0, serverFactory)
    listeningOn = self.serverPort.getHost().port
    self.clientConn = reactor.connectTCP(
        '127.0.0.1', listeningOn, clientFactory)

    def getProtos(rlst):
        self.cli = self.clientFactory.theProto
        self.svr = self.serverFactory.theProto

    bothMade = defer.DeferredList(
        [clientFactory.onMade, serverFactory.onMade])
    return bothMade.addCallback(getProtos)
def testDeferredListWithAlreadyFiredDeferreds(self):
    """
    Build a DeferredList from Deferreds that have already fired and check
    that a callback added to it has run exactly once.
    """
    # Fire both Deferreds first: one with an error, one with a value.
    failed = defer.Deferred()
    succeeded = defer.Deferred()
    failed.errback(GenericError('Bang'))
    succeeded.callback(2)

    # *Then* build the DeferredList from them.
    fired = []
    combined = defer.DeferredList([failed, succeeded])
    combined.addCallback(fired.append)
    self.failUnlessEqual(1, len(fired))

    failed.addErrback(lambda e: None)  # Swallow error
def test_concurrentRetrieves(self):
    """
    Issue three retrieve calls immediately without waiting for any to
    succeed and make sure they all do succeed eventually.
    """
    p, t = setUp()

    messages = []
    for index in range(3):
        number = index + 1
        retrieval = p.retrieve(index)
        retrieval.addCallback(
            self.assertEquals,
            ["First line of %d." % (number,),
             "Second line of %d." % (number,)])
        messages.append(retrieval)

    for number in range(1, 4):
        # Each RETR is expected on the transport in turn; nothing more
        # should be written until the terminating line has been delivered.
        self.assertEquals(t.value(), "RETR %d\r\n" % (number,))
        t.clear()
        p.dataReceived("+OK 2 lines on the way\r\n")
        p.dataReceived("First line of %d.\r\n" % (number,))
        p.dataReceived("Second line of %d.\r\n" % (number,))
        self.assertEquals(t.value(), "")
        p.dataReceived(".\r\n")

    return defer.DeferredList(messages, fireOnOneErrback=True)
def testMultipleSimultaneousCreateSameAbout(self):
    """
    Create 100 objects with the same about value at once and check that
    every successful creation reports the same object id.

    Failures are logged, and the test fails if there were any.
    NOTE(review): this is a generator; presumably it is wrapped by an
    inlineCallbacks-style decorator outside this view -- confirm.
    """
    creations = [self.createObject(about='xx') for _ in range(100)]
    results = yield defer.DeferredList(creations, consumeErrors=True)

    failed = False
    objectId = None
    for succeeded, value in results:
        if not succeeded:
            failed = True
            log.err(value)
        elif objectId is None:
            # The first success establishes the id all others must match.
            objectId = value
        else:
            self.assertEqual(objectId, value)

    if failed:
        self.fail()
def testMultipleSimultaneousUpdateSameAbout(self):
    """
    Create an object, then issue 100 simultaneous creations with the same
    about value and check that each one reports the original object id.

    Failures are logged, and the test fails if there were any.
    NOTE(review): this is a generator; presumably it is wrapped by an
    inlineCallbacks-style decorator outside this view -- confirm.
    """
    aboutValue = 'yy'
    objectId = yield self.createObject(about=aboutValue)

    creations = [self.createObject(about=aboutValue) for _ in range(100)]
    results = yield defer.DeferredList(creations, consumeErrors=True)

    failed = False
    for succeeded, value in results:
        if succeeded:
            self.assertEqual(objectId, value)
            continue
        failed = True
        log.err(value)

    if failed:
        self.fail()
def testMultipleSimultaneousCreates(self):
    """
    Set several different values on the same tag of one object at once.

    Failures are logged, and the test fails if there were any.  The tag
    value is deleted afterwards whether or not the test passed.
    NOTE(review): this is a generator; presumably it is wrapped by an
    inlineCallbacks-style decorator outside this view -- confirm.
    """
    path = 'fluiddb/testing/test1'
    objectId = yield self.createObject()
    try:
        values = (3, None, True, 5.4, False, 'hey', ['a', 'b'])
        writes = [self.setTagValue(path, objectId, value)
                  for value in values]
        results = yield defer.DeferredList(writes, consumeErrors=True)

        problems = [value for succeeded, value in results if not succeeded]
        for problem in problems:
            log.err(problem)
        if problems:
            self.fail()
    finally:
        yield self.deleteTagValue(path, objectId)
def testMultipleSimultaneousUpdates(self):
    """
    Give a tag an initial value, then overwrite it many times at once
    (ten rounds of seven different values).

    Failures are logged, and the test fails if there were any.  The tag
    value is deleted afterwards whether or not the test passed.
    NOTE(review): this is a generator; presumably it is wrapped by an
    inlineCallbacks-style decorator outside this view -- confirm.
    """
    path = 'fluiddb/testing/test1'
    objectId = yield self.createObject()
    yield self.setTagValue(path, objectId, 5)
    try:
        values = (7, True, None, 4.8, False, 'hi', ['aa', 'bb'])
        writes = [self.setTagValue(path, objectId, value)
                  for _ in range(10)
                  for value in values]
        results = yield defer.DeferredList(writes, consumeErrors=True)

        problems = [value for succeeded, value in results if not succeeded]
        for problem in problems:
            log.err(problem)
        if problems:
            self.fail()
    finally:
        yield self.deleteTagValue(path, objectId)
def gather(self, futures, consume_exceptions=True):
    """
    Wait for all of the given futures and deliver their values as one list.

    The futures are wrapped in a C{DeferredList}.  Its (ok, value) pairs
    are unpacked into a plain list of values; when C{consume_exceptions}
    is false, the first failed entry encountered is re-raised instead.

    @return: the C{DeferredList}, with the unpacking callback attached.
    """
    def completed(res):
        values = []
        for ok, value in res:
            values.append(value)
            if not (ok or consume_exceptions):
                value.raiseException()
        return values

    # XXX if consume_exceptions is False in asyncio.gather(), it will
    # abort on the first raised exception -- should we set
    # fireOnOneErrback=True (if consume_exceptions=False?) -- but then
    # we'll have to wrap the errback() to extract the "real" failure
    # from the FirstError that gets thrown if you set that ...
    combined = DeferredList(list(futures), consumeErrors=consume_exceptions)

    # we unpack the (ok, value) tuples into just a list of values, so
    # that the callback() gets the same value in asyncio and Twisted.
    add_callbacks(combined, completed, None)
    return combined
def run_scan(self):
    """
    Launch a measurement over every circuit produced by C{TwoHop}.

    Circuits are fetched one at a time, C{self.circuit_launch_delay} apart.
    When the circuit iterator is exhausted, the results are flushed once
    all of C{self.tasks} have completed.  If C{self.scan_continuous} is
    set, another scan is started after this one finishes.

    @return: a Deferred chained to the completion of all tasks and the
        final flush.
    """
    finished = defer.Deferred()
    if self.scan_continuous:
        finished.addCallback(lambda ign: self.run_scan())

    self.circuits = TwoHop(self.state,
                           partitions=self.partitions,
                           this_partition=self.this_partition)

    def finish_scan():
        # All circuit measurement tasks have been setup. Now wait for
        # all tasks to complete before writing results, and firing
        # the finished deferred.
        pending = defer.DeferredList(self.tasks)
        pending.addCallback(lambda _: self.result_sink.end_flush())
        pending.chainDeferred(finished)

    def scan_over_next_circuit():
        try:
            self.fetch(self.circuits.next())
        except StopIteration:
            finish_scan()
            return
        # We have circuits left, schedule scan on the next circuit
        self.clock.callLater(self.circuit_launch_delay,
                             scan_over_next_circuit)

    # Scan the first circuit
    self.clock.callLater(0, scan_over_next_circuit)
    return finished
def run_scan(state):
    """
    Fetch the check URL through every circuit from C{ExitScan} and save the
    results to a timestamped JSON file.

    One fetch is scheduled every 0.2 seconds until the circuit iterator is
    exhausted.  Once all fetches have finished, C{save_results} is called
    with the output file, and the file is then closed.

    @return: a Deferred that fires after the results have been saved and
        the output file closed; it receives the failure if saving failed.
    """
    circuits = ExitScan(state)
    url = 'https://check.torproject.org'
    outfile = open("exit-addresses.%s.json" % datetime.datetime.utcnow().isoformat(), 'w+')
    all_tasks_done = defer.Deferred()
    tasks = []

    def close_after_error(failure):
        # Close the file on the error path too, then keep propagating the
        # failure to all_tasks_done.
        outfile.close()
        return failure

    def pop(circuits):
        try:
            tasks.append(task.deferLater(
                reactor, 0, fetch, circuits.next(), url, state))
            reactor.callLater(.2, pop, circuits)
        except StopIteration:
            results = defer.DeferredList(tasks)
            results.addCallback(save_results, outfile)
            # Actually call close(); the original returned the bound method
            # without invoking it, so the file was never closed.
            results.addCallbacks(lambda _: outfile.close(), close_after_error)
            results.chainDeferred(all_tasks_done)

    reactor.callLater(0, pop, circuits)
    return all_tasks_done
def refreshDeviceLists(self):
    """Ask all GPIB bus servers for their available GPIB devices.

    A server qualifies when its name contains 'GPIB Bus' or 'gpib_bus' and
    its settings contain 'List Devices' or 'list_devices'.  Every address
    reported by a server is passed to C{gpib_device_connect}.
    NOTE(review): this is a generator; presumably it is wrapped by an
    inlineCallbacks-style decorator outside this view -- confirm.
    """
    def is_gpib_bus(name, server):
        named_like_bus = ('GPIB Bus' in name) or ('gpib_bus' in name)
        return named_like_bus and (('List Devices' in server.settings) or
                                   ('list_devices' in server.settings))

    servers = [server for name, server in self.client.servers.items()
               if is_gpib_bus(name, server)]
    serverNames = [server.name for server in servers]
    print('Pinging servers: %s' % (serverNames,))

    resp = yield DeferredList([server.list_devices() for server in servers])
    for serverName, (success, addrs) in zip(serverNames, resp):
        if success:
            print('Server %s has devices: %s' % (serverName, addrs))
            for addr in addrs:
                self.gpib_device_connect(serverName, addr)
        else:
            print('Failed to get device list for: %s' % (serverName,))
def resolve(self, names):
    """Resolves DNS names in parallel

    Every Deferred returned by C{self.lookup(name)} gets the record
    extraction callback and the error handler attached.  The reactor is
    then iterated by hand until C{self._finished} becomes true.

    Returns a plain dict copy of C{self.results}.
    """
    self._finished = False
    self.results = defaultdict(list)

    lookups = []
    for name in names:
        for lookup in self.lookup(name):
            lookup.addCallback(self._extract_records, name)
            lookup.addErrback(self._errback, name)
            lookups.append(lookup)

    everything = defer.DeferredList(lookups)
    everything.addCallback(self._parse_result)
    everything.addCallback(self._finish)

    while not self._finished:
        reactor.iterate()

    # Although the results are in at this point, we may need an extra
    # iteration to ensure the resolver library closes its UDP sockets
    reactor.iterate()
    return dict(self.results)
def collect(self, collectors):
    """Collects timestamp results in parallel, using a DeferredList.

    The result of C{self.snmpv2mib.get_timestamp_and_uptime()} always comes
    first, followed by the results of the given collectors.  The values are
    stored as a tuple in C{self.collected_times} and also returned through
    C{defer.returnValue}.  The first failed entry is re-raised.

    :param collectors: A list of deferreds to wait for - the deferreds
                       should return integer results.
    """
    pending = [self.snmpv2mib.get_timestamp_and_uptime()]
    pending.extend(collectors)
    result = yield defer.DeferredList(pending)

    times = []
    for succeeded, value in result:
        if not succeeded:
            value.raiseException()
        times.append(value)

    self.collected_times = tuple(times)
    defer.returnValue(self.collected_times)
# We must ignore pickle load failures by catching the Exception base class
# pylint: disable=W0703
def send_catch_log_deferred(signal=Any, sender=Anonymous, *arguments, **named):
    """Like send_catch_log but supports returning deferreds on signal handlers.

    Returns a deferred that gets fired once all signal handlers deferreds were
    fired.  It fires with a list of (receiver, result) pairs, one per live
    receiver; the result is the Failure when the handler failed.

    The optional ``dont_log`` keyword argument is removed from ``named``
    before the handlers are called; failures whose value is an instance of
    it are not logged.
    """
    dont_log = named.pop('dont_log', None)

    def logerror(failure, recv):
        if dont_log is None or not isinstance(failure.value, dont_log):
            log.err(failure, "Error caught on signal handler: %s" % recv)
        return failure

    dfds = []
    for receiver in liveReceivers(getAllReceivers(sender, signal)):
        d = maybeDeferred(robustApply, receiver, signal=signal, sender=sender,
                          *arguments, **named)
        d.addErrback(logerror, receiver)
        # Bind the current receiver as a default argument.  A plain closure
        # would look the loop variable up when the deferred fires, pairing
        # every late result with the last receiver of the loop.
        d.addBoth(lambda result, receiver=receiver: (receiver, result))
        dfds.append(d)

    d = DeferredList(dfds)
    # Drop the DeferredList success flag, keeping the (receiver, result) pair.
    d.addCallback(lambda out: [x[1] for x in out])
    return d