def testWakerOverflow(self):
    self.failure = None
    waiter = threading.Event()
    def threadedFunction():
        # Hopefully a hundred thousand queued calls is enough to
        # trigger the error condition
        for i in xrange(100000):
            try:
                reactor.callFromThread(lambda: None)
            except:
                self.failure = failure.Failure()
                break
        waiter.set()
    reactor.callInThread(threadedFunction)
    waiter.wait(120)
    if not waiter.isSet():
        self.fail("Timed out waiting for event")
    if self.failure is not None:
        return defer.fail(self.failure)
Example source code for Python's callInThread()
def sendResult(self, res, utt_idx, session_id):
    utt_id = "utt-%s-%d" % (session_id, utt_idx)
    # Update the preview doc
    doc = self.db[utt_id]
    #print 'got utt alignment'
    doc[res["type"] + "_words"] = res["words"]
    #del doc[res["type"]]
    if 'duration' in res:
        doc['duration'] = res['duration']
        # having a duration also implies that the wave file is
        # ready; import to the database
        reactor.callInThread(self._put_attachment, doc)
    else:
        self.db.onchange(None, {"type": "change",
                                "id": utt_id,
                                "doc": doc})
    # make sure "start" time is set on utterances
    self.ensure_start_times(session_id)
    self.check_pending_audio_commands()
def onchange(self, sender, change_doc):
    update = False
    if change_doc.get("doc", {}).get("type") == "command":
        # Save kaldi-sequence from the text
        seq = metasentence.MetaSentence(change_doc["doc"].get("text", ""), vocab).get_kaldi_sequence()
        change_doc["doc"]["_ks"] = seq
        self._command_seqs[change_doc["id"]] = seq
        # Set "sender" to None so that all peers get a change update
        sender = None
        update = True
    elif change_doc["type"] == 'delete' and change_doc["id"] in self._command_seqs:
        del self._command_seqs[change_doc["id"]]
        update = True
    elif change_doc.get("doc", {}).get("type") == "audio-command":
        print 'got new audio command', change_doc['doc']
        self._pending_audio_commands.append(change_doc["doc"])
        self.subdir_resources['factory'].check_pending_audio_commands()
    minidb.DBFactory.onchange(self, sender, change_doc)
    if update:
        self.create_language_model()
        reactor.callInThread(
            self.subdir_resources['factory'].re_run_everything)
def onupload(self, upl):
    session_id = upl['path']
    # Start a session, if it seems to be a valid media file
    # XXX: this list is arbitrary; use MIME or smth
    if not session_id.split('.')[-1] in ['mp3', 'mp4', 'm4v', 'wav', 'aac', 'm4a', 'mkv', 'ogg', 'ogv', 'flac']:
        return
    self.db.onchange(None, {"type": "change",
                            "id": session_id,
                            "doc": {
                                "_id": session_id,
                                "type": "session",
                                "peer": upl['filename'],
                                "filename": upl['filename'],
                                "s_time": time.time(),
                            }})
    reactor.callInThread(self._process_upload, upl, session_id)
def start_packet_out_stream(self):
    def packet_generator():
        while 1:
            try:
                packet = self.packet_out_queue.get(block=True, timeout=1.0)
            except Empty:
                if self.stopped:
                    return
            else:
                yield packet
    def stream_packets_out():
        generator = packet_generator()
        try:
            self.local_stub.StreamPacketsOut(generator)
        except _Rendezvous, e:
            if e.code() == StatusCode.UNAVAILABLE:
                os.system("kill -15 {}".format(os.getpid()))
    reactor.callInThread(stream_packets_out)
def start_packet_in_stream(self):
    def receive_packet_in_stream():
        streaming_rpc_method = self.local_stub.ReceivePacketsIn
        iterator = streaming_rpc_method(empty_pb2.Empty())
        try:
            for packet_in in iterator:
                reactor.callFromThread(self.packet_in_queue.put,
                                       packet_in)
                log.debug('enqued-packet-in',
                          packet_in=packet_in,
                          queue_len=len(self.packet_in_queue.pending))
        except _Rendezvous, e:
            if e.code() == StatusCode.UNAVAILABLE:
                os.system("kill -15 {}".format(os.getpid()))
    reactor.callInThread(receive_packet_in_stream)
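The receiver above runs the blocking gRPC iterator in a thread-pool thread and hands each packet back to the reactor with callFromThread, parking it in a queue whose .pending attribute suggests a twisted.internet.defer.DeferredQueue. A minimal consumer sketch under that assumption (the self.running flag and handle_packet_in handler are hypothetical names, not from the snippet):

    from twisted.internet.defer import inlineCallbacks

    @inlineCallbacks
    def _dispatch_packets_in(self):
        # Runs on the reactor thread; get() returns a Deferred that fires
        # once the streaming thread enqueues the next packet via callFromThread.
        while self.running:
            packet_in = yield self.packet_in_queue.get()
            self.handle_packet_in(packet_in)  # hypothetical handler name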
def start_change_event_in_stream(self):
    def receive_change_events():
        streaming_rpc_method = self.local_stub.ReceiveChangeEvents
        iterator = streaming_rpc_method(empty_pb2.Empty())
        try:
            for event in iterator:
                reactor.callFromThread(self.change_event_queue.put, event)
                log.debug('enqued-change-event',
                          change_event=event,
                          queue_len=len(self.change_event_queue.pending))
        except _Rendezvous, e:
            if e.code() == StatusCode.UNAVAILABLE:
                os.system("kill -15 {}".format(os.getpid()))
    reactor.callInThread(receive_change_events)
def start(self):
    if self.running:
        return
    log.debug('starting')
    self.running = True
    # Start monitoring the vcore grpc channel
    reactor.callInThread(self.monitor_vcore_grpc_channel)
    # Start monitoring logical devices and manage agents accordingly
    reactor.callLater(0, self.monitor_logical_devices)
    log.info('started')
    return self
def test_callInThread(self):
    """
    Test callInThread functionality: set a C{threading.Event}, and check
    that it's not in the main thread.
    """
    def cb(ign):
        waiter = threading.Event()
        result = []
        def threadedFunc():
            result.append(threadable.isInIOThread())
            waiter.set()
        reactor.callInThread(threadedFunc)
        waiter.wait(120)
        if not waiter.isSet():
            self.fail("Timed out waiting for event.")
        else:
            self.assertEqual(result, [False])
    return self._waitForThread().addCallback(cb)
def test_callFromThread(self):
    """
    Test callFromThread functionality: from the main thread, and from
    another thread.
    """
    def cb(ign):
        firedByReactorThread = defer.Deferred()
        firedByOtherThread = defer.Deferred()
        def threadedFunc():
            reactor.callFromThread(firedByOtherThread.callback, None)
        reactor.callInThread(threadedFunc)
        reactor.callFromThread(firedByReactorThread.callback, None)
        return defer.DeferredList(
            [firedByReactorThread, firedByOtherThread],
            fireOnOneErrback=True)
    return self._waitForThread().addCallback(cb)
def test_wakerOverflow(self):
    """
    Try to make an overflow on the reactor waker using callFromThread.
    """
    def cb(ign):
        self.failure = None
        waiter = threading.Event()
        def threadedFunction():
            # Hopefully a hundred thousand queued calls is enough to
            # trigger the error condition
            for i in xrange(100000):
                try:
                    reactor.callFromThread(lambda: None)
                except:
                    self.failure = failure.Failure()
                    break
            waiter.set()
        reactor.callInThread(threadedFunction)
        waiter.wait(120)
        if not waiter.isSet():
            self.fail("Timed out waiting for event")
        if self.failure is not None:
            return defer.fail(self.failure)
    return self._waitForThread().addCallback(cb)
def render_POST(self, request):
    """
    Handle a request from the client.
    """
    script_env = {
        method: api_method(request, method)
        for method in request.sdata.api.fns
    }
    # Make get do auto-formatting for convenience, even though this
    # breaks if you try to use literal '{}' named arguments
    # @@@ reconsider whether this is at all a good idea
    def get_with_formatting(path, *args):
        return api_method(request, 'get')(path.format(*args))
    script_env['get'] = get_with_formatting
    script_env['re'] = re
    script_env['dumps'] = dumps
    script_env['defaultdict'] = defaultdict
    script_env['OrderedDict'] = OrderedDict
    buf = []
    def dummy_print(*args):
        if len(args) == 1 and (isinstance(args[0], list) or isinstance(args[0], dict)):
            buf.append(dumps(args[0], indent=4))
        else:
            buf.append(' '.join(map(str, args)))
    script_env['print'] = dummy_print
    def run_script(script):
        try:
            exec script in script_env
        except:
            exception_info = sys.exc_info()
            buf.extend(traceback.format_exception(*exception_info))
        request.sdata.log('got reply {}'.format(buf))
        request.sdata.add_to_push_queue('script', text=dumps(buf))
    script = request.args['script'][0]
    reactor.callInThread(run_script, script)
def deferToThread(f, *args, **kwargs):
    """Run function in thread and return result as Deferred."""
    d = defer.Deferred()
    from twisted.internet import reactor
    reactor.callInThread(_putResultInDeferred, d, f, args, kwargs)
    return d
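The helper _putResultInDeferred is not shown in this snippet. In older Twisted releases it runs the function inside the thread-pool thread and ships the outcome back to the reactor thread with callFromThread; a sketch of that shape (the exact body may differ between versions):

    from twisted.python import failure
    from twisted.internet import reactor

    def _putResultInDeferred(deferred, f, args, kwargs):
        """Run f in the worker thread; fire the Deferred from the reactor thread."""
        try:
            result = f(*args, **kwargs)
        except:
            err = failure.Failure()
            reactor.callFromThread(deferred.errback, err)
        else:
            reactor.callFromThread(deferred.callback, result)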
def callMultipleInThread(tupleList):
    """Run a list of functions in the same thread.

    tupleList should be a list of (function, argsList, kwargsDict) tuples.
    """
    from twisted.internet import reactor
    reactor.callInThread(_runMultiple, tupleList)
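Likewise, _runMultiple is only referenced here; it simply walks the tuple list inside the single worker thread. A minimal sketch:

    def _runMultiple(tupleList):
        # Executed inside one thread-pool thread: call each function in order.
        for f, args, kwargs in tupleList:
            f(*args, **kwargs)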
def printResult(self):
    print
    print
    print "callFromThread latency:"
    sum = 0
    for t in self.from_times: sum += t
    print "%f millisecond" % ((sum / self.numRounds) * 1000)
    print "callInThread latency:"
    sum = 0
    for t in self.in_times: sum += t
    print "%f millisecond" % ((sum / self.numRounds) * 1000)
    print
    print
def testCallFromThread(self):
    for i in range(self.numRounds):
        reactor.callInThread(self.tcmf_2, time.time())
    self.wait()
    assert len(self.in_times) == len(self.from_times)
    assert len(self.in_times) == self.numRounds
    self.printResult()
def testCallInThread(self):
    waiter = threading.Event()
    result = []
    def threadedFunc():
        result.append(threadable.isInIOThread())
        waiter.set()
    reactor.callInThread(threadedFunc)
    waiter.wait(120)
    if not waiter.isSet():
        self.fail("Timed out waiting for event.")
    else:
        self.assertEquals(result, [False])
def testWakeUp(self):
    # Make sure other threads can wake up the reactor
    d = Deferred()
    def wake():
        time.sleep(0.1)
        # callFromThread will call wakeUp for us
        reactor.callFromThread(d.callback, None)
    reactor.callInThread(wake)
    return d
def AskForMoreBlocks(self):
    reactor.callInThread(self.DoAskForMoreBlocks)
def deferToThreadInReactor(reactor, f, *args, **kwargs):
    """
    Run function in thread and return result as Deferred.
    """
    d = defer.Deferred()
    reactor.callInThread(_putResultInDeferred, reactor, d, f, args, kwargs)
    return d
# uses its own reactor for the threaded calls, unlike Twisted's
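As the comment notes, this variant takes the reactor explicitly rather than importing the global one, so it also works with a privately-run reactor. A hypothetical usage sketch (blocking_work is an illustrative stand-in, and the helper _putResultInDeferred it relies on is not shown above):

    import time
    from twisted.internet import reactor

    def blocking_work():
        # Stand-in for real blocking work (file I/O, a slow library call, ...).
        time.sleep(0.5)
        return "done"

    d = deferToThreadInReactor(reactor, blocking_work)
    d.addCallback(lambda result: reactor.stop())
    reactor.run()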
def notify_listeners(self, packet):
    """
    Send data to all listeners.

    :param packet: the packet to send to all listeners.
    """
    for listener in self._listeners:
        if listener.use_main_thread:
            blockingCallFromThread(reactor, self._deliver_later, listener, packet)
        elif reactor.running:
            reactor.callInThread(self._deliver_later, listener, packet)
def send(self, socket_address, packet):
    if not self.is_open():
        return
    if reactor.running and socket_address in internet:
        reactor.callInThread(internet[socket_address].notify_listeners, (self.wan_address, packet))
    else:
        raise AssertionError("Received data from unregistered address %s" % repr(socket_address))
def render_GET(self, req):
    reactor.callInThread(self._zip, req)
    return NOT_DONE_YET
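This handler returns NOT_DONE_YET and defers the response to self._zip, which is not shown. In this pattern the worker method typically builds the payload inside the thread and then finishes the request from the reactor thread via callFromThread; a hypothetical sketch of such a method (names and body are illustrative, not the original implementation):

    from twisted.internet import reactor

    def _zip(self, req):
        # Runs in a thread-pool thread: do the blocking work here.
        data = self._build_zip_bytes(req)  # hypothetical blocking helper
        def finish():
            # Back on the reactor thread: write the response and finish it.
            req.setHeader('Content-Type', 'application/zip')
            req.write(data)
            req.finish()
        reactor.callFromThread(finish)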