def push_to_member(self, member: Member, ignore_for_statistics=False) -> None:
    """Push to the specified member.

    Builds the data string from this node's identity, the events the
    receiver does not know yet, and the known members that have an address,
    then asks the reactor thread to open a TCP connection to the member
    using a ``PushClientFactory``.

    Args:
        member: The receiver. If ``member.address`` is ``None`` there is
            nowhere to connect to, so the push is skipped.
        ignore_for_statistics: When true, the factory is created with
            ``network=None`` instead of this object (presumably so the push
            is not counted -- confirm against ``PushClientFactory``).
    """
    # Snapshot the address once: the log line below and the reactor callback
    # must agree on it, and it has to be checked *before* it is dereferenced.
    address = member.address
    if address is None:
        bptc.logger.debug('Push to {}... skipped (no address)'.format(member.verify_key[:6]))
        return

    bptc.logger.debug('Push to {}... ({}, {})'.format(member.verify_key[:6], address.host, address.port))

    with self.hashgraph.lock:
        data_string = self.generate_data_string(
            self.hashgraph.me,
            self.hashgraph.get_unknown_events_of(member),
            filter_members_with_address(self.hashgraph.known_members.values()))

    factory = PushClientFactory(
        data_string,
        network=None if ignore_for_statistics else self,
        receiver=member)

    def push():
        reactor.connectTCP(address.host, address.port, factory)

    # Blocks the calling (non-reactor) thread until push() ran in the reactor.
    threads.blockingCallFromThread(reactor, push)
# Example source code for Python's blockingCallFromThread()
def api_method(request, method):
    """
    Build a synchronous wrapper around one method of the JSON-RPC API.

    The returned function looks up ``method`` on ``request.sdata.api`` at
    call time, runs it in the reactor thread via ``blockingCallFromThread``
    and returns the ``'result'`` entry of the response passed through
    ``m2mstr_object_hook``, or the ``'error'`` entry when a ``KeyError`` is
    raised while obtaining the result.
    """
    def method_fn(*args, **kwargs):
        target = getattr(request.sdata.api, method)
        response = threads.blockingCallFromThread(
            reactor, target, *args, **kwargs)
        try:
            return m2mstr_object_hook(response['result'])
        except KeyError:
            return response['error']

    return method_fn
def push_to(self, ip, port) -> None:
    """Push to the specified network address.

    Serializes this node's identity, its lookup table and the known members
    that have an address, then has the reactor thread open a TCP connection
    to ``ip``:``port`` with a ``PushClientFactory`` carrying that payload.
    """
    hashgraph = self.hashgraph
    # Only the payload construction is done while holding the lock.
    with hashgraph.lock:
        addressable_members = filter_members_with_address(hashgraph.known_members.values())
        payload = self.generate_data_string(hashgraph.me,
                                            hashgraph.lookup_table,
                                            addressable_members)

    push_factory = PushClientFactory(payload, network=self)

    # connectTCP is executed in the reactor thread; this call blocks the
    # current thread until it has been issued.
    threads.blockingCallFromThread(reactor, reactor.connectTCP, ip, port, push_factory)
def single_pull(self, ip_text_input, port_text_input):
    """Trigger the reactor to pull from the specified client.

    The host is read from ``ip_text_input.value`` and the port is parsed as
    an integer from ``port_text_input.value``. ``doc`` and ``ready_event``
    are taken from the enclosing scope.
    """
    host = ip_text_input.value
    port_number = int(port_text_input.value)
    pull_factory = PullClientFactory(self, doc, ready_event)
    # Issue the connect from the reactor thread and wait for it to be made.
    threads.blockingCallFromThread(
        reactor, reactor.connectTCP, host, port_number, pull_factory)
def run(self):
    """Connect repeatedly until this object reports that it is stopped.

    Each round waits for ``ready_event`` to be set, clears it, asks the
    reactor thread to open a TCP connection to ``self.ip``:``self.port``
    with ``self.factory``, and then sleeps for two seconds.
    """
    while True:
        if self.stopped():
            break
        ready_event.wait()
        ready_event.clear()
        print('Try to connect...')
        threads.blockingCallFromThread(
            reactor, reactor.connectTCP, self.ip, self.port, self.factory)
        sleep(2.0)
def maybeblockingCallFromThread(self, callable, *args, **kwargs):
    """
    Run ``callable(*args, **kwargs)`` in the reactor thread.

    When already in the reactor thread the callable is invoked directly and
    whatever it returns (expected to be a Deferred) is handed back to the
    caller. From any other thread the call is dispatched to the reactor and
    this method blocks on the outcome instead of returning the Deferred.
    """
    if not self.in_reactor_thread():
        # Wrap the call in a closure so that the caller's keyword arguments
        # can never collide with blockingCallFromThread's own parameters.
        def invoke():
            return callable(*args, **kwargs)

        return twisted_threads.blockingCallFromThread(self._reactor, invoke)

    # Already on the reactor thread: just call it.
    return callable(*args, **kwargs)
def _resolveQuery(self, session, objects, query):
    """Resolve a L{Query}.

    @param session: The L{FluidinfoSession} for the request.
    @param objects: The L{SecureObjectAPI} to use to fetch object IDs.
    @param query: The L{Query} to resolve.
    @raise TNonexistentTag: If the search reports an unknown path or a
        denied permission.
    @raise TParseError: If fetching the search result raises a
        L{SearchError}.
    @return: A C{list} of object ID C{str}s that match the query.
    """
    try:
        pending = objects.search([query])
    except UnknownPathError as error:
        session.log.exception(error)
        raise TNonexistentTag(error.paths[0].encode('utf-8'))
    except PermissionDeniedError as error:
        session.log.exception(error)
        # Only the path of the first (path, operation) pair is reported.
        deniedPath, _ = error.pathsAndOperations[0]
        raise TNonexistentTag(deniedPath)

    try:
        with session.timer.track('index-search'):
            # The search result is fetched in the reactor thread.
            matches = blockingCallFromThread(reactor, pending.get)
    except SearchError as error:
        session.log.exception(error)
        raise TParseError(query, error.message)

    return matches[query]
def _execute(self):
    """
    Callback fired when the associated event is set.

    Looks up the C{action} method on the wrapped descriptor and runs it in
    the main reactor thread. Whatever that call raises or returns is raised
    or returned from here, so that this event handler gets removed from
    C{self._reactor} if appropriate.
    """
    def runAction():
        # Resolved inside the reactor thread, exactly when it is invoked.
        return getattr(self._fd, self._action)()

    return blockingCallFromThread(self._reactor, runAction)
def _testBlockingCallFromThread(self, reactorFunc):
    """
    Utility method to test L{threads.blockingCallFromThread}.

    C{reactorFunc} is run through L{threads.blockingCallFromThread} from a
    reactor pool thread; its return value or the exception it raised is
    recorded.

    @param reactorFunc: The callable to run in the reactor thread.
    @return: A C{Deferred} firing with a C{(results, errors)} tuple of
        lists once the worker thread finished; the test is failed if the
        worker did not finish in time.
    """
    waiter = threading.Event()
    results = []
    errors = []

    def cb1(ign):
        def threadedFunc():
            try:
                r = threads.blockingCallFromThread(reactor, reactorFunc)
            except Exception as e:
                errors.append(e)
            else:
                results.append(r)
            # Signal completion on both the success and the error path.
            waiter.set()

        reactor.callInThread(threadedFunc)
        # Wait for the worker in another thread so the reactor keeps running.
        return threads.deferToThread(waiter.wait, self.getTimeout())

    def cb2(ign):
        # is_set() is the supported spelling; isSet() is a deprecated alias.
        if not waiter.is_set():
            self.fail("Timed out waiting for event")
        return results, errors

    return self._waitForThread().addCallback(cb1).addBoth(cb2)
def test_blockingCallFromThread(self):
    """
    Exercise the blockingCallFromThread facility: from a thread, call a
    function in the reactor using L{threads.blockingCallFromThread} and
    check the value that comes back.
    """
    def reactorFunc():
        # An already-fired Deferred.
        return defer.succeed("foo")

    def checkResult(outcome):
        results, _ = outcome
        self.assertEqual(results[0], "foo")

    d = self._testBlockingCallFromThread(reactorFunc)
    return d.addCallback(checkResult)
def test_asyncBlockingCallFromThread(self):
    """
    Same as the synchronous blockingCallFromThread test, except that the
    Deferred returned in the reactor has not fired yet when it is returned.
    """
    def reactorFunc():
        pending = defer.Deferred()
        # Fire the Deferred a little later, from the reactor.
        reactor.callLater(0.1, pending.callback, "egg")
        return pending

    def checkResult(outcome):
        results, _ = outcome
        self.assertEqual(results[0], "egg")

    d = self._testBlockingCallFromThread(reactorFunc)
    return d.addCallback(checkResult)
def test_asyncErrorBlockingCallFromThread(self):
    """
    Check error reporting of blockingCallFromThread when the Deferred
    returned in the reactor has not fired yet and later fails.
    """
    def reactorFunc():
        pending = defer.Deferred()
        # Fail the Deferred a little later, from the reactor.
        reactor.callLater(0.1, pending.errback, RuntimeError("spam"))
        return pending

    def checkError(outcome):
        _, errors = outcome
        raised = errors[0]
        self.assertIsInstance(raised, RuntimeError)
        self.assertEqual(raised.args[0], "spam")

    d = self._testBlockingCallFromThread(reactorFunc)
    return d.addCallback(checkError)
def default(self, line):
    """
    Run ``line`` through ``self._run_command`` in the reactor thread and
    echo the response.

    The lines of ``response.stdout`` are written to standard output and the
    lines of ``response.stderr`` to standard error, each joined with
    newlines and followed by a trailing newline.
    """
    response = threads.blockingCallFromThread(
        reactor, self._run_command, line)
    # Explicit stream writes instead of ``print`` statements: this parses and
    # behaves the same on Python 2 and Python 3.
    sys.stdout.write('\n'.join(response.stdout) + '\n')
    sys.stderr.write('\n'.join(response.stderr) + '\n')
def captureException(self, **extra):
    """
    Report the exception currently being handled through the client.

    The capture arguments are built from ``extra`` with
    ``ravenCaptureArguments``; the client's ``captureException`` is then run
    in the reactor thread and its result returned.
    """
    captureArguments = self.ravenCaptureArguments(**extra)
    # Taken in the calling thread, where the exception is being handled.
    currentException = sys.exc_info()
    return blockingCallFromThread(
        self.reactor,
        self.client.captureException,
        currentException,
        **captureArguments
    )
def captureMessage(self, message, **extra):
    """
    Report ``message`` through the client.

    The capture arguments are built from ``extra`` with
    ``ravenCaptureArguments``; the client's ``captureMessage`` is then run
    in the reactor thread and its result returned.
    """
    captureArguments = self.ravenCaptureArguments(**extra)
    return blockingCallFromThread(
        self.reactor,
        self.client.captureMessage,
        message,
        **captureArguments
    )
def write(self, data):
    """
    The WSGI I{write} callable handed out by I{start_response}.

    Writes the given bytes to the response body; if the response has not
    been started yet, the status and headers are sent first.

    This is called in a non-I/O thread.
    """
    # PEP-3333 requires the server or gateway to transmit yielded
    # bytestrings to the client unbuffered, finishing the transmission of
    # one bytestring before asking for the next.
    #
    # Both the imperative write() API and, indirectly, the usual
    # iterable-of-bytestrings mechanism go through this method. Writes are
    # scheduled with C{blockingCallFromThread} so that exceptions from the
    # underlying HTTP implementation propagate to the caller. That
    # implementation cannot yet tell us whether written data has actually
    # been transmitted, so strictly speaking the requirement above is not
    # met.
    #
    # On the other hand, PEP-3333 also allows a server to use a different
    # thread to keep transmitting a block while the application produces
    # the next one. Since the writes happen in the reactor thread, that
    # reading suggests this is compliant after all.
    #
    # Adding some back-pressure may still be worthwhile in the future.
    def writeInReactor(alreadyStarted):
        if not alreadyStarted:
            self._sendResponseHeaders()
        self.request.write(data)

    try:
        # self.started is read here, in the calling thread, and passed along.
        return blockingCallFromThread(
            self.reactor, writeInReactor, self.started)
    finally:
        # Marked as started even if the write raised.
        self.started = True