def message_received(self, unwrapped_message):
    """
    Schedule one message for delivery after a randomly chosen delay.

    ``unwrapped_message`` is of type UnwrappedMessage.  The delay comes
    from ``self._sys_rand.randint(0, self.max_delay)``; once it elapses the
    message is handed to ``self.protocol.packet_proxy``.  The Deferred for
    the pending send is tracked in ``self._pending_sends`` until it fires.
    """
    send_delay = self._sys_rand.randint(0, self.max_delay)
    send_action = start_action(
        action_type=u"send delayed message",
        delay=send_delay,
    )

    def _forget(result, pending):
        # Drop the bookkeeping entry and pass the result (or failure)
        # through unchanged.
        self._pending_sends.remove(pending)
        return result

    with send_action.context():
        pending_send = deferLater(
            self.reactor, send_delay,
            self.protocol.packet_proxy, unwrapped_message)
        DeferredContext(pending_send).addActionFinish()
    self._pending_sends.add(pending_send)
    pending_send.addBoth(_forget, pending_send)
python类deferLater()的实例源码
def sendEvents(self, events):
    """
    Send ``events`` through ``self._eventService`` in chunks of 100.

    This is a generator: after every 100th event it yields a zero-delay
    ``task.deferLater`` Deferred so whatever drives the generator can let
    the reactor run between chunks.
    NOTE(review): presumably wrapped by ``inlineCallbacks``; the decorator
    is not visible here -- confirm.

    :param events: sequence of dict-like events.  ``device`` and
        ``severity`` are filled in with defaults when missing.  Nothing is
        done (and ``self.state`` is left alone) when it is empty.
    """
    if not events:
        return
    self.state = PythonCollectionTask.STATE_SEND_EVENTS

    for i, event in enumerate(events):
        # Default event fields.
        event.setdefault('device', self.configId)
        event.setdefault('severity', ZenEventClasses.Info)

        # On CTRL-C or exit the reactor might stop before we get to this
        # call and generate a traceback.
        if reactor.running:
            self._eventService.sendEvent(event)
            # Do in chunks of 100 to give time to the reactor.  The test
            # used to be ``if i % 100``, which is true for every index
            # *except* multiples of 100 and therefore paused after almost
            # every single event.
            if i % 100 == 99:
                yield task.deferLater(reactor, 0, lambda: None)
def message_received(self, unwrapped_message):
    """
    Collect messages and release them as a shuffled, delayed batch.

    ``unwrapped_message`` is of type UnwrappedMessage.  Each message is
    appended to ``self._batch``.  Once the batch holds at least
    ``self.threshold_count`` messages it is swapped out for a fresh list,
    shuffled, and handed to ``self.batch_send`` after a delay drawn from
    ``self._sys_rand.randint(0, self.max_delay)``.  The Deferred for the
    pending batch is tracked in ``self._pending_batch_sends`` until it
    fires.
    """
    self._batch.append(unwrapped_message)  # [(destination, sphinx_packet)
    if len(self._batch) < self.threshold_count:
        return

    delay = self._sys_rand.randint(0, self.max_delay)
    action = start_action(
        action_type=u"send delayed message batch",
        delay=delay,
    )
    with action.context():
        released = self._batch
        self._batch = []
        # Shuffle with the same RNG that picks the delay instead of the
        # module-level ``random`` functions, so both random decisions come
        # from one source.
        # NOTE(review): assumes ``self._sys_rand`` is a ``random.Random``
        # style object (e.g. ``random.SystemRandom``) -- confirm.
        self._sys_rand.shuffle(released)
        d = deferLater(self.reactor, delay, self.batch_send, released)
        DeferredContext(d).addActionFinish()
    self._pending_batch_sends.add(d)

    def _remove(res, d=d):
        # Forget the finished send and pass its result through unchanged.
        self._pending_batch_sends.remove(d)
        return res
    d.addBoth(_remove)
def __init__(self, host=defaultHost, port=4400, redisPort=6379, neo4jPort=7474, initialKey=None):
    """
    Record connection settings and schedule the first background calls.

    :param host: stored on ``self.host``; defaults to ``defaultHost``.
    :param port: stored on ``self.port``.
    :param redisPort: stored on ``self.redisPort``.
    :param neo4jPort: stored on ``self.neo4jPort``.
    :param initialKey: when truthy, appended to ``self.commandKeys``.
    """
    #self.protocol = Peer(self)
    self.host = host
    self.port = port
    self.users = {} # maps user names to Chat instances
    self.redisPort = redisPort
    self.neo4jPort = neo4jPort
    # NOTE(review): ``commandKeys`` is not assigned in this method, so it
    # is presumably defined elsewhere (e.g. on the class) -- confirm.
    if initialKey: # need test case
        self.commandKeys.append(initialKey)
        # self.redis.addToKeys(initialKey)
    """Add loops to factory? why not add loops to main reactor??"""
    # Call self.ping once, 10 seconds from now; failures go to ``whoops``.
    defly = task.deferLater(reactor, 10, self.ping)
    defly.addErrback(whoops)
    #reactor.callLater(2, redis_test)
    #task.deferLater(reactor, 60, hiya).addCallback(lambda _: reactor.stop())
    # Run ``peerBeat`` in a LoopingCall started with a 15.0 interval.  The
    # Deferred returned by ``start`` gets ``peerSuccess`` as callback and
    # ``peerFailure`` as errback.
    loop = task.LoopingCall(peerBeat)
    loopDeferred = loop.start(15.0)
    loopDeferred.addCallback(peerSuccess)
    loopDeferred.addErrback(peerFailure)
# pylint: disable=no-self-use
def run_scan(state):
    """
    Fetch the check URL through every circuit and save the results.

    One ``fetch`` task is scheduled per circuit produced by
    ``ExitScan(state)``, with a new circuit popped every 0.2 seconds.  When
    the circuits are exhausted, the collected results are passed to
    ``save_results`` together with the output file, and the file is closed.

    :param state: passed to ``ExitScan`` and to every ``fetch`` call.
    :return: a Deferred that fires once the results have been saved and
        the output file closed, or fails with the error from saving.
    """
    circuits = ExitScan(state)
    url = 'https://check.torproject.org'
    outfile = open("exit-addresses.%s.json" % datetime.datetime.utcnow().isoformat(), 'w+')
    all_tasks_done = defer.Deferred()
    tasks = []

    def close_outfile(result):
        # The old callback was ``lambda _: outfile.close`` -- it returned
        # the bound method without calling it, so the file was never
        # closed.  Registered with addBoth so the file is also closed when
        # saving fails; the result or failure is passed through.
        outfile.close()
        return result

    def pop(circuits):
        try:
            circuit = circuits.next()
        except StopIteration:
            # No circuits left: wait for every fetch, then save and close.
            results = defer.DeferredList(tasks)
            results.addCallback(save_results, outfile)
            results.addBoth(close_outfile)
            results.chainDeferred(all_tasks_done)
            return
        tasks.append(task.deferLater(
            reactor, 0, fetch, circuit, url, state))
        reactor.callLater(.2, pop, circuits)

    reactor.callLater(0, pop, circuits)
    return all_tasks_done
def start_initial(game):
    """
    Put ``game`` into the 'initial' state of its next round.

    When ``get_round`` reports no round data, the game is finished: its end
    time is recorded, it is saved, and a redirect to the exit page is
    broadcast.  Otherwise the round is cached under ``game.id``,
    ``initial`` is run, and ``game_state_checker`` is scheduled one second
    later with ``twisted_error`` as errback.
    """
    round_data, users_plots = get_round(game)
    state = 'initial'

    if round_data is not None:
        cached_round = {
            'state': state,
            'round_data': round_data,
            'users_plots': users_plots,
        }
        cache.set(game.id, cached_round)
        initial(game, round_data, users_plots)
        checker = task.deferLater(
            reactor, 1, game_state_checker,
            game, state, round_data, users_plots)
        checker.addErrback(twisted_error)
        return

    # No round left to play: close the game and send everyone to the exit.
    game.end_time = timezone.now()
    game.save()
    game.broadcast(action='redirect', url=reverse('interactive:exit'))
def throttled(func):
    """Decorator for AgentProxyMixIn.getTable to throttle requests.

    The wrapped method's first positional argument (``self``) must provide
    ``_last_request`` (a ``time.time()`` style timestamp) and
    ``throttle_delay`` (seconds).  Calls are spaced at least
    ``throttle_delay`` seconds apart: a call that arrives too early is
    postponed with ``deferLater`` and a Deferred is returned instead of the
    direct result of ``func``.
    """
    def _wrapper(*args, **kwargs):
        self = args[0]
        now = time.time()
        # Earliest moment this request is allowed to run.
        run_at = max(now, self._last_request + self.throttle_delay)
        # Record when the request will actually run, not when it was asked
        # for.  Recording the call time (as before) let a request made
        # shortly after a postponed one run right on its heels, so the
        # spacing was not enforced.
        self._last_request = run_at
        delay = run_at - now
        if delay > 0:
            _logger.debug("%ss delay due to throttling: %r", delay, self)
            return deferLater(reactor, delay, func, *args, **kwargs)
        else:
            return func(*args, **kwargs)
    return wraps(func)(_wrapper)
# pylint: disable=R0903
def test_callback(self):
    """
    The L{Deferred} returned by L{task.deferLater} fires, once the given
    delay has passed, with whatever the scheduled function returned.
    """
    calls = []
    sentinel = object()

    def record(foo, bar):
        calls.append((foo, bar))
        return sentinel

    clock = task.Clock()
    d = task.deferLater(clock, 3, record, 'foo', bar='bar')
    d.addCallback(self.assertIs, sentinel)

    # One second short of the delay: nothing may have run yet.
    clock.advance(2)
    self.assertEqual(calls, [])

    # The final second brings the clock to the scheduled time.
    clock.advance(1)
    self.assertEqual(calls, [('foo', 'bar')])
    return d
def test_cancel(self):
    """
    Cancelling the L{Deferred} returned by L{task.deferLater} prevents
    the scheduled call from ever being made.
    """
    invocations = []
    clock = task.Clock()
    scheduled = task.deferLater(clock, 1, invocations.append, None)
    scheduled.cancel()

    def verify_nothing_ran(ignored):
        # Nothing may be left waiting on the clock...
        self.assertEqual([], clock.getDelayedCalls())
        # ...and the function must not have been called in the meantime.
        self.assertFalse(invocations)

    self.assertFailure(scheduled, defer.CancelledError)
    scheduled.addCallback(verify_nothing_ran)
    return scheduled
def stopListening(self):
    """
    Stop accepting connections on this port.

    Reading is stopped right away; C{self.connectionLost} is scheduled
    through C{task.deferLater} with a delay of 0.

    @return: A L{Deferred} that fires when this port has stopped.  While a
        stop is already in progress the same L{Deferred} is returned
        again; when the port is not connected an already-fired one is
        returned.
    """
    self.stopReading()

    if self.disconnecting:
        # A stop is already under way; hand out its Deferred again.
        return self._stoppedDeferred

    if not self.connected:
        return defer.succeed(None)

    self._stoppedDeferred = task.deferLater(
        self.reactor, 0, self.connectionLost)
    self.disconnecting = True
    return self._stoppedDeferred
def _loopbackAsyncContinue(ignored, server, serverToClient, client,
                           clientToServer, pumpPolicy):
    """
    Reset both message queues and schedule the next round of byte-pushing.

    @return: the L{Deferred} from scheduling C{_loopbackAsyncBody} with a
        delay of 0.
    """
    # The notification Deferred on each queue has already fired and cannot
    # be reused, so drop it.
    for queue in (clientToServer, serverToClient):
        queue._notificationDeferred = None

    # Push more bytes from a fresh reactor iteration rather than right
    # here: no real transport re-enters dataReceived as a consequence of a
    # write call, and continuing synchronously could cause exactly that.
    from twisted.internet import reactor
    body_args = (server, serverToClient, client, clientToServer, pumpPolicy)
    return deferLater(reactor, 0, _loopbackAsyncBody, *body_args)
def parse(self, response):
    """
    Fill in the name input on the page and read back the rendered heading.

    Sets the input's value to "World", dispatches a DOM "change" event on
    it, yields a zero-delay ``deferLater`` so WebKit gets a chance to run,
    and then returns (via ``returnValue``) a one-item list holding an
    ``AngularJSHelloText`` built from the heading's plain text.
    """
    frame = response.webpage.mainFrame()

    # Script run on the input element to trigger the change event.
    change_event_js = """
var event = document.createEvent("HTMLEvents");
event.initEvent("change", false, true);
this.dispatchEvent(event);
"""
    name_input = frame.findFirstElement('#the-basics + div .well input')
    name_input.setAttribute('value', "World")
    name_input.evaluateJavaScript(change_event_js)

    # Let WebKit run.
    yield deferLater(reactor, 0, lambda: None)

    heading = frame.findFirstElement('#the-basics + div .well h1')
    returnValue([AngularJSHelloText(text=heading.toPlainText())])
def _run_command(self, command):
    """
    Send ``command`` to the shell and collect its output.

    The command (with a trailing CRLF) is base64-encoded and sent with a
    'send' request.  Output is then polled through ``self._get_output``
    every ``self._READ_DELAY`` seconds, at most
    ``_MAX_REQUESTS_PER_COMMAND`` times, until the last stdout item of a
    read equals ``self._prompt``.

    Returns (via ``defer.returnValue``) a ``(stdout, stderr)`` tuple of
    lists; the prompt itself is not included in stdout.
    Raises ``Exception`` when the prompt is not seen within the allowed
    number of reads.
    """
    encoded_command = base64.encodestring('{0}\r\n'.format(command))
    yield self._sender.send_request(
        'send',
        shell_id=self._shell_id,
        command_id=self._command_id,
        base64_encoded_command=encoded_command)

    stdout, stderr = [], []
    for _ in xrange(_MAX_REQUESTS_PER_COMMAND):
        out, err = yield task.deferLater(
            reactor, self._READ_DELAY, self._get_output)
        stderr.extend(err)
        if out:
            stdout.extend(out[:-1])
            last_item = out[-1]
            if last_item == self._prompt:
                # The prompt marks the end of the command's output.
                break
            stdout.append(last_item)
    else:
        # The loop ran out of attempts without ever seeing the prompt.
        raise Exception("Reached max requests per command.")
    defer.returnValue((stdout, stderr))
def _iterate(reactor, intervals, f):
    """
    Run a function repeatedly.

    This is a generator.  It yields the result of each ``f()`` call and,
    between calls, a ``deferLater`` Deferred for the remainder of the
    current interval (the time ``f`` took is subtracted; never below 0).

    :param reactor: See ``run_many_service``.
    :param intervals: iterator of interval lengths; iteration stops when
        it is exhausted.
    :param f: zero-argument callable to run.
    :return Deferred: A deferred which fires when ``f`` fails or when
        ``intervals`` is exhausted.
    """
    while True:
        started = reactor.seconds()
        yield f()
        finished = reactor.seconds()

        try:
            interval = next(intervals)
        except StopIteration:
            return

        remaining = interval - (finished - started)
        if not remaining > 0:
            remaining = 0
        yield deferLater(reactor, remaining, lambda: None)
def loop_until_success(predicate, timeout=None, message="task"):
    """
    Call predicate every second, until it fires a non-failed Deferred, or hits
    the timeout.

    :param predicate: Callable returning termination condition.
    :type predicate: 0-argument callable returning a Deferred.
    :param timeout: seconds (measured with ``time.time()``) after which a
        failure is no longer retried; a falsy value means retry forever.
    :param message: label used in the "Retrying ..." progress line.
    :return: A ``Deferred`` firing with the first non-failed Deferred from
        ``predicate``, or, if predicate didn't fire with non-``Failure``-y
        thing within the timeout, returns the ``Failure``.
    """
    d = maybeDeferred(predicate)
    then = time.time()

    def loop(failure):
        if timeout and time.time() - then > timeout:
            # propagate the failure
            return failure
        # Parenthesised single-argument form: prints the same text under
        # Python 2 and is valid syntax under Python 3.
        print("Retrying %s given result %r..." % (message, failure.getErrorMessage()))
        retry = deferLater(reactor, 1.0, predicate)
        retry.addErrback(loop)
        return retry

    d.addErrback(loop)
    return d
def loop_until(predicate, timeout=None, message="task"):
    """
    Call predicate every second, until it returns something ``Truthy``.

    :param predicate: Callable returning termination condition.
    :type predicate: 0-argument callable returning a Deferred.
    :param timeout: seconds (measured with ``time.time()``) after which
        ``TimeoutError`` is raised instead of retrying; a falsy value
        means retry forever.
    :param message: label used in the "Retrying ..." progress line.
    :return: A ``Deferred`` firing with the first ``Truthy`` response from
        ``predicate``, or, if predicate didn't fire truthfully within the
        timeout, raise TimeoutError().
    """
    d = maybeDeferred(predicate)
    then = time.time()

    def loop(result):
        if timeout and time.time() - then > timeout:
            raise TimeoutError()
        if not result:
            # Parenthesised single-argument form: prints the same text
            # under Python 2 and is valid syntax under Python 3.
            print("Retrying %s given result %r..." % (message, result))
            retry = deferLater(reactor, 1.0, predicate)
            retry.addCallback(loop)
            return retry
        return result

    d.addCallback(loop)
    return d
def test__handles_timeout(self):
    """
    A rack client that does not answer within the timeout is reported as
    failed and the power state comes back as UNKNOWN.
    """
    node, power_info = yield deferToDatabase(
        self.make_node_with_power_info)

    def never_in_time(*args, **kwargs):
        # A Deferred that only fires after 60 * 60 seconds (one hour),
        # far beyond the 0.5 second timeout used below.
        return deferLater(reactor, 60 * 60, lambda: None)

    rack_id = factory.make_name("system_id")
    slow_client = Mock()
    slow_client.ident = rack_id
    slow_client.side_effect = never_in_time
    self.patch(power_module, "getAllClients").return_value = [slow_client]

    power_state, success_racks, failed_racks = yield power_query_all(
        node.system_id, node.hostname, power_info, timeout=0.5)

    self.assertEqual(POWER_STATE.UNKNOWN, power_state)
    self.assertItemsEqual([], success_racks)
    self.assertItemsEqual([rack_id], failed_racks)
def test__handles_timeout(self):
    """
    A rack client that does not answer within the timeout yields no
    discovery result and is reported with a CancelledError.
    """
    def never_in_time(*args, **kwargs):
        # A Deferred that only fires after 60 * 60 seconds (one hour),
        # far beyond the 0.5 second timeout used below.
        return deferLater(reactor, 60 * 60, lambda: None)

    rack_id = factory.make_name("system_id")
    slow_client = Mock()
    slow_client.ident = rack_id
    slow_client.side_effect = never_in_time
    self.patch(pods_module, "getAllClients").return_value = [slow_client]

    discovered = yield discover_pod(
        factory.make_name("pod"), {}, timeout=0.5)

    successes, failures = discovered[0], discovered[1]
    self.assertThat(successes, Equals({}))
    self.assertThat(failures, MatchesDict({
        rack_id: IsInstance(CancelledError),
    }))
def check_aggregator(self):
    '''
    Log a diagnostic message when the aggregator has started but is not
    connected to the control port, or is not receiving events.

    Nothing is logged while there is no aggregator, while its start is
    still pending, or before its expected start time.

    This function is sometimes called using deferLater, so any exceptions
    will be handled by errorCallback.
    '''
    if (self.aggregator is None or self.is_aggregator_pending or
            self.expected_aggregator_start_time is None or
            not self.expected_aggregator_start_time < time()):
        return

    aggregator_live_time = time() - self.expected_aggregator_start_time

    # Extra hint appended to the "no events" messages.
    flag_list = self.get_flag_list()
    if len(flag_list) > 0:
        flag_message = "Consensus flags: {}".format(" ".join(flag_list))
    else:
        flag_message = "Is your relay in the Tor consensus?"

    # Missing events only deserve a warning when events are expected.
    log_fn = logging.warning if self.are_dc_events_expected() else logging.info

    not_processing = (self.aggregator.protocol is None or
                      self.aggregator.protocol.state != "processing")
    if not_processing and aggregator_live_time > EXPECTED_CONTROL_ESTABLISH_MAX:
        running_for = format_elapsed_time_since(
            self.expected_aggregator_start_time, 'since')
        logging.warning("Aggregator has been running {}, but is not connected to the control port. Is your control port working?"
                        .format(running_for))
    elif self.aggregator.last_event_time is None:
        # No event has been seen at all since the aggregator started.
        if aggregator_live_time > EXPECTED_EVENT_INTERVAL_MAX:
            running_for = format_elapsed_time_since(
                self.expected_aggregator_start_time, 'since')
            log_fn("Aggregator has been running {}, but has not seen a tor event. {}"
                   .format(running_for, flag_message))
    elif self.aggregator.last_event_time < time() - EXPECTED_EVENT_INTERVAL_MAX:
        # Events were seen before, but none recently.
        last_seen = format_last_event_time_since(
            self.aggregator.last_event_time)
        log_fn("Aggregator has not received any events recently, {}. {}"
               .format(last_seen, flag_message))
def _start_aggregator_deferred(self):
    '''
    Start the pending aggregator and schedule a one-off connection check.

    Does nothing unless self.is_aggregator_pending is set.

    This function is called using deferLater, so any exceptions will be
    handled by errorCallback.
    '''
    # NOTE(review): the block structure below was reconstructed; the check
    # is assumed to be scheduled only when the aggregator is actually
    # started here -- confirm against the original file.
    if self.is_aggregator_pending:
        # Clear the flag before starting the aggregator.
        self.is_aggregator_pending = False
        self.aggregator.start()
        # schedule a once-off check that the aggregator has connected
        # (one second after EXPECTED_CONTROL_ESTABLISH_MAX has passed)
        check_aggregator_deferred = task.deferLater(
            reactor,
            EXPECTED_CONTROL_ESTABLISH_MAX + 1.0,
            self.check_aggregator)
        check_aggregator_deferred.addErrback(errorCallback)
def _flush_later(self, msg):
    '''
    Flush with msg via _flush_now, then call _inject_events.

    This function is called using deferLater, so any exceptions will be
    handled by errorCallback.
    '''
    # The flush must come first; injection follows it.
    self._flush_now(msg)
    self._inject_events()
def close(self):
    """
    close all http connections.
    returns a deferred that fires once they're all closed.

    After the pool has closed its cached connections, the reactor's
    readers and writers are polled (once per reactor iteration) until none
    of them is a ``Client`` connected to this client's host and port.
    """

    def validate_client(client, target):
        """
        Validate that the connection is for the current client
        :param client: descriptor whose ``addr`` is a (host, port) pair
        :param target: parsed form of ``self._hostname``
        :return: True when host and port both match ``target``
        """
        host, port = client.addr
        return host == target.hostname and port == target.port

    # read https://github.com/twisted/treq/issues/86
    # to understand the following...
    def _check_fds(_):
        # Parse the URL once per pass instead of once per descriptor.
        target = urlparse(self._hostname)
        # Look at readers AND writers.  This used to add getReaders() to
        # itself, so a connection registered only as a writer was never
        # seen and the Deferred could fire while it was still open.
        fds = set(reactor.getReaders() + reactor.getWriters())
        still_open = any(
            isinstance(fd, Client) and validate_client(fd, target)
            for fd in fds)
        if not still_open:
            return
        # Something is still open: look again on the next iteration.
        return deferLater(reactor, 0, _check_fds, None)

    pool = self._async_http_client_params["pool"]
    return pool.closeCachedConnections().addBoth(_check_fds)
def tick_fine(self):
    """
    Advance the fine time wheel one step and schedule the next fine tick.

    The Deferred for the next tick is kept on ``self.tick_fine_d`` and gets
    ``Utils.nop`` attached as both callback and errback.
    """
    self.time_wheel_fine.moveNext()
    next_tick = task.deferLater(
        self.reactor, Config.TIMER_FINE_GRANULARITY, self.tick_fine)
    self.tick_fine_d = next_tick
    next_tick.addBoth(Utils.nop)
def tick_coarse(self):
    """
    Advance the coarse time wheel one step and schedule the next coarse
    tick.

    The Deferred for the next tick is kept on ``self.tick_coarse_d`` and
    gets ``Utils.nop`` attached as both callback and errback.
    """
    self.time_wheel_coarse.moveNext()
    next_tick = task.deferLater(
        self.reactor, Config.TIMER_COARSE_GRANULARITY, self.tick_coarse)
    self.tick_coarse_d = next_tick
    next_tick.addBoth(Utils.nop)
def send_key(self):
    """
    Send user's key to provider.

    Public key bound to user's is sent to provider, which will
    replace any prior keys for the same address in its database.

    While no token is available, a new attempt is scheduled five seconds
    later and its result is returned; the key is uploaded only by the
    attempt that finds a token.

    :return: A Deferred which fires when the key is sent, or which fails
             with KeyNotFound if the key was not found in local database.
    :rtype: Deferred

    :raise UnsupportedKeyTypeError: if invalid key type
    """
    if not self.token:
        self.log.debug(
            'Token not available, scheduling '
            'a new key sending attempt...')
        # The rescheduled call performs the whole upload itself.  Return
        # its result here; previously execution fell through after the
        # retry finished and uploaded the key a second time (once more for
        # every retry that had been needed).
        key = yield task.deferLater(reactor, 5, self.send_key)
        defer.returnValue(key)

    self.log.info('Sending public key to server')
    key = yield self.get_key(self._address, fetch_remote=False)
    yield self._nicknym.put_key(self.uid, key.key_data,
                                self._api_uri, self._api_version)
    emit_async(catalog.KEYMANAGER_DONE_UPLOADING_KEYS,
               self._address)
    self.log.info('Key sent to server')
    defer.returnValue(key)
def retry_connect(self):
    """
    Schedule a reconnect attempt unless one is already pending.

    Under ``self._lock``: when no connection retry is flagged in
    ``self._in_retry``, the retry interval is increased by 2, the retry is
    flagged, and ``self.do_connect`` is scheduled after that interval with
    ``self.failed`` as errback.
    """
    with self._lock:
        already_retrying = ('connection' in self._in_retry and
                            self._in_retry['connection'])
        if already_retrying:
            return

        self.conn_retry_interval += 2
        log.err("Connection Closed! retry connecting in %s seconds..." % self.conn_retry_interval)
        self._in_retry['connection'] = True
        reconnect = task.deferLater(reactor, self.conn_retry_interval, self.do_connect)
        reconnect.addErrback(self.failed)
def startService(self):
    """
    Start this service, open the connection, and start the child services
    once the connector reports the 'connected' state.

    The connector state is checked immediately and then once per second
    until it is 'connected'.
    """
    service.Service.startService(self)
    self._connector = self._connectMethod(*self._args, factory=self._factory, **self._kwargs)

    def _poll_connector():
        if self._connector.state != 'connected':
            # Not connected yet: look again in one second.
            from twisted.internet import reactor
            task.deferLater(reactor, 1, _poll_connector)
            return

        log.msg('Starting child services now.', level=logging.DEBUG)
        # noinspection PyTypeChecker
        for svc in self:
            svc.startService()

    _poll_connector()
def render_GET(self, request):
    """
    Handle an HTTP GET: set up the session and dispatch to a handler.

    A session not yet present in ``self.sessions`` is initialised (its
    expiry timer is restarted with ``HTTP_SESSION_TIMEOUT``, its update
    list and authentication state are reset) and registered.  The handler
    registered for ``request.path`` is then called 0.1 seconds later with
    the request.

    :return: ``TWebServer.NOT_DONE_YET`` when a handler was scheduled,
        otherwise an "ERROR :: ..." string (authentication required, or no
        handler for the path).
    """
    session = request.getSession()
    session.touch()
    log.debug("HTTP Request from %s:%s (%s) to %s", request.client.host, request.client.port, session.uid, request.uri)

    # ``in`` instead of ``dict.has_key``: same result on Python 2, and
    # ``has_key`` does not exist on Python 3 dicts.
    if session.uid not in self.sessions:
        log.info("New Client Session: %s" % session.uid)
        # Replace the pending expiry call with one using our own timeout.
        session._expireCall.cancel()
        session.sessionTimeout = HTTP_SESSION_TIMEOUT
        session.startCheckingExpiration()
        session.notifyOnExpire(self._expireSession)
        session.updates = []
        session.isAuthenticated = not self.monast.authRequired
        session.username = None
        self.sessions[session.uid] = session

    if not session.isAuthenticated and request.path != "/doAuthentication":
        return "ERROR :: Authentication Required"

    handler = self.handlers.get(request.path)
    if handler:
        # Run the handler from a later reactor iteration; failures are
        # reported through _onRequestFailure together with the request.
        d = task.deferLater(reactor, 0.1, lambda: request)
        d.addCallback(handler)
        d.addErrback(self._onRequestFailure, request)
        return TWebServer.NOT_DONE_YET

    return "ERROR :: Request Not Found"
def test_usr1_rotates_logs(self):
    """
    SIGUSR1 should cause logs to be reopened.
    """
    def file_handler_streams():
        # Streams of every FileHandler attached to the root logger.
        return [h.stream for h in logging.getLogger().handlers
                if isinstance(h, logging.FileHandler)]

    logging.getLogger().addHandler(logging.FileHandler(self.makeFile()))
    # Remember the streams in use before the signal is delivered.
    original_streams = file_handler_streams()

    # Instantiating LandscapeService should register the handler
    TestService(self.config)

    # Invoke the registered handler directly rather than raising a signal.
    handler = signal.getsignal(signal.SIGUSR1)
    self.assertTrue(handler)
    handler(None, None)

    def check(ign):
        for stream in file_handler_streams():
            self.assertTrue(stream not in original_streams)

    # We need to give some room for the callFromThread to run
    d = deferLater(reactor, 0, lambda: None)
    return d.addCallback(check)
test_client_protocol.py 文件源码
项目:joinmarket-clientserver
作者: JoinMarket-Org
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_waiter(self):
    """
    Return a Deferred that calls ``self._called_by_deffered`` after three
    seconds.
    """
    print("test_main()")
    delayed_call = task.deferLater(reactor, 3, self._called_by_deffered)
    return delayed_call