def getHostByName(self, name, timeout = (1, 3, 11, 45)):
if timeout:
timeoutDelay = reduce(operator.add, timeout)
else:
timeoutDelay = 60
userDeferred = defer.Deferred()
lookupDeferred = threads.deferToThread(socket.gethostbyname, name)
cancelCall = self.reactor.callLater(
timeoutDelay, self._cleanup, name, lookupDeferred)
self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
return userDeferred
python类deferToThread()的实例源码
def testDeferredResult(self):
d = threads.deferToThread(lambda x, y=5: x + y, 3, y=4)
d.addCallback(self.assertEquals, 7)
return d
def testDeferredFailure(self):
class NewError(Exception):
pass
def raiseError():
raise NewError
d = threads.deferToThread(raiseError)
return self.assertFailure(d, NewError)
def testDeferredFailure2(self):
# set up a condition that causes cReactor to hang. These conditions
# can also be set by other tests when the full test suite is run in
# alphabetical order (test_flow.FlowTest.testThreaded followed by
# test_internet.ReactorCoreTestCase.testStop, to be precise). By
# setting them up explicitly here, we can reproduce the hang in a
# single precise test case instead of depending upon side effects of
# other tests.
#
# alas, this test appears to flunk the default reactor too
d = threads.deferToThread(lambda: None)
d.addCallback(lambda ign: threads.deferToThread(lambda: 1/0))
return self.assertFailure(d, ZeroDivisionError)
def pamAuthenticate(service, user, conv):
return threads.deferToThread(pamAuthenticateThread, service, user, conv)
def symbolscheck(self):
threads.deferToThread(self.JobTask)
self.timer.startLongTimer(POLLTIME)
def _run(self):
from twisted.internet import threads
from enigma import eTimer
self.aborted = False
self.pos = 0
threads.deferToThread(self.work).addBoth(self.onComplete)
self.timer = eTimer()
self.timer.callback.append(self.onTimer)
self.timer.start(5)
def iconcheck(self):
try:
threads.deferToThread(self.JobTask)
except:
pass
self.timer.startLongTimer(30)
def main():
params, pipes, _ = command.configure()
def run_shell():
shell_vars = {
'p': ParamsProxy(params),
'm': MeterProxy(pipes),
}
code.interact(banner=BANNER, local=shell_vars)
deferred = threads.deferToThread(run_shell)
deferred.addCallback(lambda result: reactor.stop())
reactor.run()
def from_thread(func, *args, **kwargs):
call = lambda: deferToThread(func, *args, **kwargs)
return cpu_core_semaphore.run(call)
def camcheck(self):
global isBusy
isBusy = True
threads.deferToThread(self.JobTask)
self.timer.startLongTimer(POLLTIME)
def _run(self):
from twisted.internet import threads
from enigma import eTimer
self.aborted = False
self.pos = 0
threads.deferToThread(self.work).addBoth(self.onComplete)
self.timer = eTimer()
self.timer.callback.append(self.onTimer)
self.timer.start(5)
def iconcheck(self):
threads.deferToThread(self.JobTask)
self.timer.startLongTimer(30)
def fetch_async(*args, **kwargs):
"""Retrieve a URL asynchronously.
@return: A C{Deferred} resulting in the URL content.
"""
return deferToThread(fetch, *args, **kwargs)
def call_in_thread(self, callback, errback, f, *args, **kwargs):
"""
Execute a callable object in a new separate thread.
@param callback: A function to call in case C{f} was successful, it
will be passed the return value of C{f}.
@param errback: A function to call in case C{f} raised an exception,
it will be pass a C{(type, value, traceback)} tuple giving
information about the raised exception (see L{sys.exc_info}).
@note: Both C{callback} and C{errback} will be executed in the
the parent thread.
"""
def on_success(result):
if callback:
return callback(result)
def on_failure(failure):
exc_info = (failure.type, failure.value, failure.tb)
if errback:
errback(*exc_info)
else:
logging.error(exc_info[1], exc_info=exc_info)
deferred = deferToThread(f, *args, **kwargs)
deferred.addCallback(on_success)
deferred.addErrback(on_failure)
def run(self):
if not self._should_run():
return
self._monitor.ping()
deferred = threads.deferToThread(self._perform_rados_call)
deferred.addCallback(self._handle_usage)
return deferred
def run(self):
if not self._should_run():
return
self._monitor.ping()
host = self._get_recon_host()
deferred = threads.deferToThread(self._perform_recon_call, host)
deferred.addCallback(self._handle_usage)
return deferred
def request_with_payload(self, payload):
resource = DataCollectingResource()
port = reactor.listenTCP(
0, server.Site(resource), interface="127.0.0.1")
self.ports.append(port)
transport = HTTPTransport(
None, "http://localhost:%d/" % (port.getHost().port,))
result = deferToThread(transport.exchange, payload, computer_id="34",
exchange_token="abcd-efgh", message_api="X.Y")
def got_result(ignored):
try:
get_header = resource.request.requestHeaders.getRawHeaders
except AttributeError:
# For backwards compatibility with Twisted versions
# without requestHeaders
def get_header(header):
return [resource.request.received_headers[header]]
self.assertEqual(get_header(u"x-computer-id"), ["34"])
self.assertEqual(get_header("x-exchange-token"), ["abcd-efgh"])
self.assertEqual(
get_header("user-agent"), ["landscape-client/%s" % (VERSION,)])
self.assertEqual(get_header("x-message-api"), ["X.Y"])
self.assertEqual(bpickle.loads(resource.content), payload)
result.addCallback(got_result)
return result
def test_ssl_verification_negative(self):
"""
If the SSL server provides a key which is not verified by the
specified public key, then the client should immediately end
the connection without uploading any message data.
"""
self.log_helper.ignore_errors(PyCurlError)
r = DataCollectingResource()
context_factory = DefaultOpenSSLContextFactory(
BADPRIVKEY, BADPUBKEY)
port = reactor.listenSSL(0, server.Site(r), context_factory,
interface="127.0.0.1")
self.ports.append(port)
transport = HTTPTransport(None, "https://localhost:%d/"
% (port.getHost().port,), pubkey=PUBKEY)
result = deferToThread(transport.exchange, "HI", computer_id="34",
message_api="X.Y")
def got_result(ignored):
self.assertIs(r.request, None)
self.assertIs(r.content, None)
self.assertTrue("server certificate verification failed"
in self.logfile.getvalue())
result.addErrback(got_result)
return result
def onChallenge(self, challenge):
logger.debug('Received CHALLENGE: %s' % challenge)
# `sync_session._on_challenge` should resolve `self.on_challenge_defer`
threads.deferToThread(partial(self._sync_session._on_challenge, challenge))
return self.on_challenge_defer