def runCommand(self, tc):
"""Called from the gui thread, pass a ThreadCommand instance to the
network"""
reactor.callFromThread(self._doRunCommand, tc)
python类callFromThread()的实例源码
def callFromThread(self, f, *args, **kw):
assert callable(f), "%s is not callable" % f
with NullContext():
# This NullContext is mainly for an edge case when running
# TwistedIOLoop on top of a TornadoReactor.
# TwistedIOLoop.add_callback uses reactor.callFromThread and
# should not pick up additional StackContexts along the way.
self._io_loop.add_callback(f, *args, **kw)
# We don't need the waker code from the super class, Tornado uses
# its own waker.
def add_callback(self, callback, *args, **kwargs):
self.reactor.callFromThread(
self._run_callback,
functools.partial(wrap(callback), *args, **kwargs))
def callFromThread(self, f, *args, **kw):
assert callable(f), "%s is not callable" % f
with NullContext():
# This NullContext is mainly for an edge case when running
# TwistedIOLoop on top of a TornadoReactor.
# TwistedIOLoop.add_callback uses reactor.callFromThread and
# should not pick up additional StackContexts along the way.
self._io_loop.add_callback(f, *args, **kw)
# We don't need the waker code from the super class, Tornado uses
# its own waker.
def add_callback(self, callback, *args, **kwargs):
self.reactor.callFromThread(
self._run_callback,
functools.partial(wrap(callback), *args, **kwargs))
def callFromThread(self, f, *args, **kw):
assert callable(f), "%s is not callable" % f
with NullContext():
# This NullContext is mainly for an edge case when running
# TwistedIOLoop on top of a TornadoReactor.
# TwistedIOLoop.add_callback uses reactor.callFromThread and
# should not pick up additional StackContexts along the way.
self._io_loop.add_callback(f, *args, **kw)
# We don't need the waker code from the super class, Tornado uses
# its own waker.
def _putResultInDeferred(deferred, f, args, kwargs):
"""Run a function and give results to a Deferred."""
from twisted.internet import reactor
try:
result = f(*args, **kwargs)
except:
f = failure.Failure()
reactor.callFromThread(deferred.errback, f)
else:
reactor.callFromThread(deferred.callback, result)
def printResult(self):
print
print
print "callFromThread latency:"
sum = 0
for t in self.from_times: sum += t
print "%f millisecond" % ((sum / self.numRounds) * 1000)
print "callInThread latency:"
sum = 0
for t in self.in_times: sum += t
print "%f millisecond" % ((sum / self.numRounds) * 1000)
print
print
def tcmf_2(self, start):
# runs in thread
self.in_times.append(time.time() - start)
reactor.callFromThread(self.tcmf_3, time.time())
def testCallFromThread(self):
firedByReactorThread = defer.Deferred()
firedByOtherThread = defer.Deferred()
def threadedFunc():
reactor.callFromThread(firedByOtherThread.callback, None)
reactor.callInThread(threadedFunc)
reactor.callFromThread(firedByReactorThread.callback, None)
return defer.DeferredList(
[firedByReactorThread, firedByOtherThread],
fireOnOneErrback=True)
def testCallMultiple(self):
L = []
N = 10
d = defer.Deferred()
def finished():
self.assertEquals(L, range(N))
d.callback(None)
threads.callMultipleInThread([
(L.append, (i,), {}) for i in xrange(N)
] + [(reactor.callFromThread, (finished,), {})])
return d
def testWakeUp(self):
# Make sure other threads can wake up the reactor
d = Deferred()
def wake():
time.sleep(0.1)
# callFromThread will call wakeUp for us
reactor.callFromThread(d.callback, None)
reactor.callInThread(wake)
return d
def _callFromThreadCallback(self, d):
reactor.callFromThread(self._callFromThreadCallback2, d)
reactor.callLater(0, self._stopCallFromThreadCallback)
def testCallFromThreadStops(self):
"""
Ensure that callFromThread from inside a callFromThread
callback doesn't sit in an infinite loop and lets other
things happen too.
"""
self.stopped = False
d = defer.Deferred()
reactor.callFromThread(self._callFromThreadCallback, d)
return d
def schedule(self, *args, **kwargs):
"""Override in subclasses."""
reactor.callFromThread(*args, **kwargs)
def __setattr__(self, name, value):
if name not in self._params:
raise AttributeError(name)
reactor.callFromThread(self._params.__setitem__, name, value)
def reset(self):
reactor.callFromThread(self._atomic_reset)
def tearDown(self):
reactor.callFromThread(reactor.stop)
def stop_reactor_thread():
"""Stop twisted's reactor."""
reactor.callFromThread(reactor.stop)
def check_storage_path(self):
if not os.path.exists(self.storage_path):
try:
os.makedirs(self.storage_path)
except OSError as e:
if e.errno == 13:
log.err("Can't create storage directory: access denied")
else:
raise e
reactor.callFromThread(self.stop)