def __init__(self, *args, **kwargs):
"""
Initialization method.
Note that we can't call reactor methods directly here because
it's not thread-safe, so we schedule the reactor/connection
stuff to be run from the event loop thread when it gets the
chance.
"""
Connection.__init__(self, *args, **kwargs)
self.is_closed = True
self.connector = None
reactor.callFromThread(self.add_connection)
self._loop.maybe_start()
python类callFromThread()的实例源码
def startLogging(logfilename, sysLog, prefix, nodaemon):
if logfilename == '-':
if not nodaemon:
print 'daemons cannot log to stdout'
os._exit(1)
logFile = sys.stdout
elif sysLog:
syslog.startLogging(prefix)
elif nodaemon and not logfilename:
logFile = sys.stdout
else:
logFile = app.getLogFile(logfilename or 'twistd.log')
try:
import signal
except ImportError:
pass
else:
def rotateLog(signal, frame):
from twisted.internet import reactor
reactor.callFromThread(logFile.rotate)
signal.signal(signal.SIGUSR1, rotateLog)
if not sysLog:
log.startLogging(logFile)
sys.stdout.flush()
def stop_reactor():
"""Stop the reactor and join the reactor thread until it stops.
Call this function in teardown at the module or package level to
reset the twisted system after your tests. You *must* do this if
you mix tests using these tools and tests using twisted.trial.
"""
global _twisted_thread
def stop_reactor():
'''Helper for calling stop from withing the thread.'''
reactor.stop()
reactor.callFromThread(stop_reactor)
reactor_thread.join()
for p in reactor.getDelayedCalls():
if p.active():
p.cancel()
_twisted_thread = None
def MessageReceived(self, m):
# self.Log("Messagereceived and processed ...: %s " % m.Command)
if m.Command == 'verack':
self.HandleVerack()
elif m.Command == 'version':
self.HandleVersion(m.Payload)
elif m.Command == 'getaddr':
self.SendPeerInfo()
elif m.Command == 'getdata':
self.HandleGetDataMessageReceived(m.Payload)
elif m.Command == 'inv':
self.HandleInvMessage(m.Payload)
elif m.Command == 'block':
self.HandleBlockReceived(m.Payload)
elif m.Command == 'headers':
reactor.callFromThread(self.HandleBlockHeadersReceived, m.Payload)
# self.HandleBlockHeadersReceived(m.Payload)
elif m.Command == 'addr':
self.HandlePeerInfoReceived(m.Payload)
else:
self.Log("Command %s not implemented " % m.Command)
def startLogging(logfilename, sysLog, prefix, nodaemon):
if logfilename == '-':
if not nodaemon:
print 'daemons cannot log to stdout'
os._exit(1)
logFile = sys.stdout
elif sysLog:
syslog.startLogging(prefix)
elif nodaemon and not logfilename:
logFile = sys.stdout
else:
logFile = app.getLogFile(logfilename or 'twistd.log')
try:
import signal
except ImportError:
pass
else:
def rotateLog(signal, frame):
from twisted.internet import reactor
reactor.callFromThread(logFile.rotate)
signal.signal(signal.SIGUSR1, rotateLog)
if not sysLog:
log.startLogging(logFile)
sys.stdout.flush()
def stop_reactor():
"""Stop the reactor and join the reactor thread until it stops.
Call this function in teardown at the module or package level to
reset the twisted system after your tests. You *must* do this if
you mix tests using these tools and tests using twisted.trial.
"""
global _twisted_thread
def stop_reactor():
'''Helper for calling stop from withing the thread.'''
reactor.stop()
reactor.callFromThread(stop_reactor)
reactor_thread.join()
for p in reactor.getDelayedCalls():
if p.active():
p.cancel()
_twisted_thread = None
def blockingCallFromThread(reactor, f, *args, **kwargs):
"""
Improved version of twisted's blockingCallFromThread that shows the complete
stacktrace when an exception is raised on the reactor's thread.
If being called from the reactor thread already, just return the result of execution of the callable.
"""
if isInIOThread():
return f(*args, **kwargs)
else:
queue = Queue.Queue()
def _callFromThread():
result = defer.maybeDeferred(f, *args, **kwargs)
result.addBoth(queue.put)
reactor.callFromThread(_callFromThread)
result = queue.get()
if isinstance(result, failure.Failure):
other_thread_tb = traceback.extract_tb(result.getTracebackObject())
this_thread_tb = traceback.extract_stack()
logger.error("Exception raised on the reactor's thread %s: \"%s\".\n Traceback from this thread:\n%s\n"
" Traceback from the reactor's thread:\n %s", result.type.__name__, result.getErrorMessage(),
''.join(traceback.format_list(this_thread_tb)), ''.join(traceback.format_list(other_thread_tb)))
result.raiseException()
return result
def threaded_reactor():
"""
Start the Twisted reactor in a separate thread, if not already done.
Returns the reactor.
"""
global _twisted_thread
if not _twisted_thread:
from threading import Thread
_twisted_thread = Thread(target=lambda: reactor.run(installSignalHandlers=False), name="Twisted")
_twisted_thread.setDaemon(True)
_twisted_thread.start()
def hook_observer():
observer = log.PythonLoggingObserver()
observer.start()
import logging
log.msg("PythonLoggingObserver hooked up", logLevel=logging.DEBUG)
reactor.callFromThread(hook_observer)
return reactor, _twisted_thread
def re_run(self, utt):
if 'wavpath' not in utt:
return
k = Kaldi(
get_resource('data/nnet_a_gpu_online'),
self.gen_hclg_filename,
get_resource('PROTO_LANGDIR'))
audio = numm3.sound2np(
os.path.join(self.resources['attach'].attachdir, utt['wavpath']),
nchannels=1,
R=8000)
k.push_chunk(audio.tostring())
wds = k.get_final()
k.stop()
for wd in wds:
del wd['phones']
utt['command_words'] = wds
utt['command'] = ' '.join([X['word'] for X in wds])
reactor.callFromThread(self.db.onchange, None, {"type": "change",
"id": utt["_id"],
"doc": utt})
def spawnAdmin(self, user):
if user.permission==0:
time.sleep(5)
else:
proc = subprocess.Popen(['phantomjs', 'phantom/checkMessages.js',
globalVals.args.domain+':'+str(globalVals.args.port), ADMIN_PASS],
stdout=subprocess.PIPE
)
log,_ = proc.communicate()
print log
f = open('phantom/messages.log','a')
f.write(log)
f.close()
time.sleep(5)
reactor.callFromThread(self.adminSendResponse, user)
def connect(self):
"""
Connect to MQTT broker.
"""
# TODO: This is currently done synchronous which could have issues in timeout situations
# because it would block other subsystems.
# => Check if we can do asynchronous connection establishment.
self.client = mqtt.Client(client_id=self.name, clean_session=True, userdata={'foo': 'bar'})
if self.broker_username:
self.client.username_pw_set(self.broker_username, self.broker_password)
self.client.on_connect = lambda *args: reactor.callFromThread(self.on_connect, *args)
self.client.on_message = lambda *args: reactor.callFromThread(self.on_message, *args)
self.client.on_log = lambda *args: reactor.callFromThread(self.on_log, *args)
# Connect with retry
self.connect_loop = LoopingCall(self.connect_with_retry)
self.connect_loop.start(self.retry_interval, now=True)
def subscribe(self, *args):
#d = self.protocol.subscribe("foo/bar/baz", 0)
log.info(u"Subscribing to topics {subscriptions}. protocol={protocol}", subscriptions=self.subscriptions, protocol=self.protocol)
for topic in self.subscriptions:
log.info(u"Subscribing to topic '{topic}'", topic=topic)
# Topic name **must not** be unicode, so casting to string
e = self.protocol.subscribe(str(topic), 0)
log.info(u"Setting callback handler: {callback}", callback=self.callback)
self.protocol.setPublishHandler(self.on_message_twisted)
"""
def cb(*args, **kwargs):
log.info('publishHandler got called: name={name}, args={args}, kwargs={kwargs}', name=self.name, args=args, kwargs=kwargs)
return reactor.callFromThread(self.callback, *args, **kwargs)
self.protocol.setPublishHandler(cb)
"""
def start_packet_in_stream(self):
def receive_packet_in_stream():
streaming_rpc_method = self.local_stub.ReceivePacketsIn
iterator = streaming_rpc_method(empty_pb2.Empty())
try:
for packet_in in iterator:
reactor.callFromThread(self.packet_in_queue.put,
packet_in)
log.debug('enqued-packet-in',
packet_in=packet_in,
queue_len=len(self.packet_in_queue.pending))
except _Rendezvous, e:
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
reactor.callInThread(receive_packet_in_stream)
def start_change_event_in_stream(self):
def receive_change_events():
streaming_rpc_method = self.local_stub.ReceiveChangeEvents
iterator = streaming_rpc_method(empty_pb2.Empty())
try:
for event in iterator:
reactor.callFromThread(self.change_event_queue.put, event)
log.debug('enqued-change-event',
change_event=event,
queue_len=len(self.change_event_queue.pending))
except _Rendezvous, e:
if e.code() == StatusCode.UNAVAILABLE:
os.system("kill -15 {}".format(os.getpid()))
reactor.callInThread(receive_change_events)
def recv(self):
"""Called on the select thread when a packet arrives"""
try:
frame = self.rcv_frame()
except RuntimeError as e:
# we observed this happens sometimes right after the socket was
# attached to a newly created veth interface. So we log it, but
# allow to continue.
log.warn('afpacket-recv-error', code=-1)
return
log.debug('frame-received', iface=self.iface_name, len=len(frame),
hex=hexify(frame))
self.received +=1
dispatched = False
for proxy in self.proxies:
if proxy.filter is None or proxy.filter(frame):
log.debug('frame-dispatched')
dispatched = True
reactor.callFromThread(self._dispatch, proxy, frame)
if not dispatched:
self.discarded += 1
log.debug('frame-discarded')
def receive_twisted(self, channels):
"""Twisted-native implementation of receive."""
deferred = defer.Deferred()
def resolve_deferred(future):
reactor.callFromThread(deferred.callback, future.result())
future = self.thread.twisted_schedule(RECEIVE_TWISTED, channels)
future.add_done_callback(resolve_deferred)
defer.returnValue((yield deferred))
# TODO: Is it optimal to read bytes from content frame, call python
# decode method to convert it to string and than parse it with
# msgpack? We should minimize useless work on message receive.
def work(self):
batch, consumed = self.collect_batch()
self.states_context.fetch()
self.process_batch(batch)
self.update_score.flush()
self.states_context.release()
# Exiting, if crawl is finished
if self.strategy.finished():
logger.info("Successfully reached the crawling goal.")
logger.info("Closing crawling strategy.")
self.strategy.close()
logger.info("Finishing.")
reactor.callFromThread(reactor.stop)
self.stats['last_consumed'] = consumed
self.stats['last_consumption_run'] = asctime()
self.stats['consumed_since_start'] += consumed
def test_callFromThread(self):
"""
Test callFromThread functionality: from the main thread, and from
another thread.
"""
def cb(ign):
firedByReactorThread = defer.Deferred()
firedByOtherThread = defer.Deferred()
def threadedFunc():
reactor.callFromThread(firedByOtherThread.callback, None)
reactor.callInThread(threadedFunc)
reactor.callFromThread(firedByReactorThread.callback, None)
return defer.DeferredList(
[firedByReactorThread, firedByOtherThread],
fireOnOneErrback=True)
return self._waitForThread().addCallback(cb)
def test_wakerOverflow(self):
"""
Try to make an overflow on the reactor waker using callFromThread.
"""
def cb(ign):
self.failure = None
waiter = threading.Event()
def threadedFunction():
# Hopefully a hundred thousand queued calls is enough to
# trigger the error condition
for i in xrange(100000):
try:
reactor.callFromThread(lambda: None)
except:
self.failure = failure.Failure()
break
waiter.set()
reactor.callInThread(threadedFunction)
waiter.wait(120)
if not waiter.isSet():
self.fail("Timed out waiting for event")
if self.failure is not None:
return defer.fail(self.failure)
return self._waitForThread().addCallback(cb)
def _puller(self):
logger.debug('Starting puller loop')
while True:
if not reactor.running or self._stop:
logger.debug('Puller loop dying')
reactor.callFromThread(self.stopped.callback, None)
return
channels = [self.send_channel] + list(self._pull_channels)
if not channels:
time.sleep(0.05)
continue
channel, message = self.channel_layer.receive(channels, block=False)
if not channel:
time.sleep(0.01)
continue
logger.debug('We got message on channel: %s' % (channel, ))
reactor.callFromThread(self.handle_reply, channel, message)
def deferred_from_future(future):
"""Converts a concurrent.futures.Future object to a twisted.internet.defer.Deferred obejct.
See: https://twistedmatrix.com/pipermail/twisted-python/2011-January/023296.html
"""
d = Deferred()
def callback(future):
e = future.exception()
if e:
if DEFERRED_RUN_IN_REACTOR_THREAD:
reactor.callFromThread(d.errback, e)
else:
d.errback(e)
else:
if DEFERRED_RUN_IN_REACTOR_THREAD:
reactor.callFromThread(d.callback, future.result())
else:
d.callback(future.result())
future.add_done_callback(callback)
return d
def execute_from_command_line():
# Limit concurrency in all thread-pools to ONE.
from maasserver.utils import threads
threads.install_default_pool(maxthreads=1)
threads.install_database_unpool(maxthreads=1)
# Disable all database connections in the reactor.
from maasserver.utils import orm
from twisted.internet import reactor
assert not reactor.running, "The reactor has been started too early."
reactor.callFromThread(orm.disable_all_database_connections)
# Configure logging; Django is no longer responsible for this. Behave as
# if we're always at an interactive terminal (i.e. do not wrap stdout or
# stderr with log machinery).
from provisioningserver import logger
logger.configure(mode=logger.LoggingMode.COMMAND)
# Hand over to Django.
from django.core import management
management.execute_from_command_line()
def deferred_from_future(future):
"""Converts a concurrent.futures.Future object to a twisted.internet.defer.Deferred obejct.
See: https://twistedmatrix.com/pipermail/twisted-python/2011-January/023296.html
"""
d = Deferred()
def callback(future):
e = future.exception()
if e:
if DEFERRED_RUN_IN_REACTOR_THREAD:
reactor.callFromThread(d.errback, e)
else:
d.errback(e)
else:
if DEFERRED_RUN_IN_REACTOR_THREAD:
reactor.callFromThread(d.callback, future.result())
else:
d.callback(future.result())
future.add_done_callback(callback)
return d
def run(self):
req = self._req
code = http.OK
try: ret = self._fnc(req)
except Exception as e:
ret = str(e)
code = http.INTERNAL_SERVER_ERROR
def finishRequest():
req.setResponseCode(code)
if code == http.OK:
req.setHeader('Content-type', 'application/xhtml+xml')
req.setHeader('charset', 'UTF-8')
req.write(ret)
req.finish()
if self._stillAlive:
reactor.callFromThread(finishRequest)
def render(self, req):
self._req = req
self._stillAlive = True
if hasattr(req, 'notifyFinish'):
req.notifyFinish().addErrback(self.connectionLost)
d = autotimer.parseEPGAsync().addCallback(self.epgCallback).addErrback(self.epgErrback)
def timeout():
if not d.called and self._stillAlive:
reactor.callFromThread(lambda: req.write("<ignore />"))
reactor.callLater(50, timeout)
reactor.callLater(50, timeout)
req.setResponseCode(http.OK)
req.setHeader('Content-type', 'application/xhtml+xml')
req.setHeader('charset', 'UTF-8')
req.write("""<?xml version=\"1.0\" encoding=\"UTF-8\" ?><e2simplexmlresult>""")
return server.NOT_DONE_YET
def run(self):
req = self._req
if self._stillAlive:
req.setResponseCode(http.OK)
req.setHeader('Content-type', 'application/xhtml+xml')
req.setHeader('charset', 'UTF-8')
reactor.callFromThread(lambda: req.write("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n<e2autotimersimulate api_version=\"" + str(API_VERSION) + "\">\n"))
def finishRequest():
req.write('</e2autotimersimulate>')
req.finish()
try: autotimer.parseEPG(simulateOnly=True, callback=self.intermediateWrite)
except Exception as e:
def finishRequest():
req.write('<exception>'+str(e)+'</exception><|PURPOSEFULLYBROKENXML<')
req.finish()
if self._stillAlive:
reactor.callFromThread(finishRequest)
def __init__(self, *args, **kwargs):
"""
Initialization method.
Note that we can't call reactor methods directly here because
it's not thread-safe, so we schedule the reactor/connection
stuff to be run from the event loop thread when it gets the
chance.
"""
Connection.__init__(self, *args, **kwargs)
self.is_closed = True
self.connector = None
reactor.callFromThread(self.add_connection)
self._loop.maybe_start()
def _cleanup(self):
if self._thread:
reactor.callFromThread(reactor.stop)
self._thread.join(timeout=1.0)
if self._thread.is_alive():
log.warning("Event loop thread could not be joined, so "
"shutdown may not be clean. Please call "
"Cluster.shutdown() to avoid this.")
log.debug("Event loop thread was joined")
def add_timer(self, timer):
self._timers.add_timer(timer)
# callFromThread to schedule from the loop thread, where
# the timeout task can safely be modified
reactor.callFromThread(self._schedule_timeout, timer.end)
def push(self, data):
"""
This function is called when outgoing data should be queued
for sending.
Note that we can't call transport.write() directly because
it is not thread-safe, so we schedule it to run from within
the event loop when it gets the chance.
"""
reactor.callFromThread(self.connector.transport.write, data)