def __init__(self, *a):
Output.__init__(self, *a)
self.timer = task.LoopingCall(self.tick)
self.inter = float(self.config.get('interval', 1.0)) # tick interval
self.pressure = int(self.config.get('pressure', -1))
self.maxsize = int(self.config.get('maxsize', 250000))
self.expire = self.config.get('expire', False)
self.allow_nan = self.config.get('allow_nan', True)
self.factory = None
self.connector = None
maxrate = int(self.config.get('maxrate', 0))
if maxrate > 0:
self.queueDepth = int(maxrate * self.inter)
else:
self.queueDepth = None
self.tls = self.config.get('tls', False)
if self.tls:
self.cert = self.config['cert']
self.key = self.config['key']
python类LoopingCall()的实例源码
def __init__(self, *a):
Output.__init__(self, *a)
self.events = []
self.timer = task.LoopingCall(self.tick)
self.inter = float(self.config.get('interval', 1.0)) # tick interval
self.maxsize = int(self.config.get('maxsize', 250000))
self.user = self.config.get('user')
self.password = self.config.get('password')
self.url = self.config.get('url', 'http://localhost:9200')
maxrate = int(self.config.get('maxrate', 100))
self.index = self.config.get('index', 'duct-%Y.%m.%d')
self.client = None
if maxrate > 0:
self.queueDepth = int(maxrate * self.inter)
else:
self.queueDepth = None
def _playlist_updated(self, pl):
if pl.has_programs():
# if we got a program playlist, save it and start a program
self._program_playlist = pl
(program_url, _) = pl.get_program_playlist(self.program, self.bitrate)
l = HLS.make_url(self.url, program_url)
return self._reload_playlist(M3U8(l))
elif pl.has_files():
# we got sequence playlist, start reloading it regularly, and get files
self._file_playlist = pl
if not self._files:
self._files = pl.iter_files()
if not pl.endlist():
if not self._pl_task:
self._pl_task = task.LoopingCall(self._reload_playlist, pl)
self._pl_task.start(10, False)
if self._file_playlisted:
self._file_playlisted.callback(pl)
self._file_playlisted = None
else:
raise
return pl
def stats_setup(self):
self.stats_snapshot_previous = None
self.interval_histogram = HdrHistogram(1, 10000000, 3)
self.ttl_histogram = HdrHistogram(1, 10000000, 3)
self.msglag_histogram = HdrHistogram(1, 10000000, 3)
namespace = "stats_{0!s}_{1!s}_depth_{2!s}".format(platform.node(),
self.SenderCompID,
self.config['market_depth'])
filename = os.path.join(self.stats_dir,
namespace+datetime.strftime(datetime.utcnow(), "_%Y%m%d%H%M%S")+'.log')
self._stats_logger = Logger(
observer=passThroughFileLogObserver(io.open(filename, "a")),
namespace='')
self.stats_loop = task.LoopingCall(self.log_stats)
self.stats_loop.start(self.stats_interval)
def do_checkin(self):
'''
Called by protocol
Refresh the config, and try to connect to the server
This function is usually called using LoopingCall, so any exceptions
will be turned into log messages.
'''
# TODO: Refactor common client code - issue #121
self.refresh_config()
self.check_aggregator()
ts_ip = self.config['tally_server_info']['ip']
ts_port = self.config['tally_server_info']['port']
# turn on reconnecting mode and reset backoff
self.resetDelay()
logging.info("checking in with TallyServer at {}:{}".format(ts_ip, ts_port))
reactor.connectSSL(ts_ip, ts_port, self, ssl.ClientContextFactory()) # pylint: disable=E1101
def start(self):
'''
start the aggregator, and connect to the control port
'''
# This call can return a list of connectors, or a single connector
self.connector_list = connect(self, self.tor_control_port)
# Twisted doesn't want a list of connectors, it only wants one
self.connector = choose_a_connection(self.connector_list)
self.rotator = task.LoopingCall(self._do_rotate)
rotator_deferred = self.rotator.start(self.rotate_period, now=False)
rotator_deferred.addErrback(errorCallback)
# if we've already built the protocol before starting
if self.protocol is not None:
self.protocol.startCollection(self.collection_counters)
def startService(self):
service.Service.startService(self)
callable, args, kwargs = self.call
# we have to make a new LoopingCall each time we're started, because
# an active LoopingCall remains active when serialized. If
# LoopingCall were a _VolatileDataService, we wouldn't need to do
# this.
self._loop = task.LoopingCall(callable, *args, **kwargs)
self._loop.start(self.step, now=True).addErrback(self._failed)
def _failed(self, why):
# make a note that the LoopingCall is no longer looping, so we don't
# try to shut it down a second time in stopService. I think this
# should be in LoopingCall. -warner
self._loop.running = False
log.err(why)
def __init__(self, conn):
self.conn = conn
self.globalTimeout = None
self.lc = task.LoopingCall(self.sendGlobal)
self.lc.start(300)
def __init__(self, site, uid):
"""Initialize a session with a unique ID for that session.
"""
components.Componentized.__init__(self)
self.site = site
self.uid = uid
self.expireCallbacks = []
self.checkExpiredLoop = task.LoopingCall(self.checkExpired)
self.touch()
self.sessionNamespaces = {}
def install(widget, ms=10, reactor=None):
"""Install a Tkinter.Tk() object into the reactor."""
installTkFunctions()
global _task
_task = task.LoopingCall(widget.update)
_task.start(ms / 1000.0, False)
def testBadDelay(self):
lc = task.LoopingCall(lambda: None)
self.assertRaises(ValueError, lc.start, -1)
# Make sure that LoopingCall.stop() prevents any subsequent calls.
def testFailure(self):
def foo(x):
raise TestException(x)
lc = task.LoopingCall(foo, "bar")
return self.assertFailure(lc.start(0.1), TestException)
def testFailAndStop(self):
def foo(x):
lc.stop()
raise TestException(x)
lc = task.LoopingCall(foo, "bar")
return self.assertFailure(lc.start(0.1), TestException)
def testEveryIteration(self):
ran = []
def foo():
ran.append(None)
if len(ran) > 5:
lc.stop()
lc = task.LoopingCall(foo)
d = lc.start(0)
def stopped(ign):
self.assertEquals(len(ran), 6)
return d.addCallback(stopped)
def testStopAtOnceLater(self):
# Ensure that even when LoopingCall.stop() is called from a
# reactor callback, it still prevents any subsequent calls.
d = defer.Deferred()
def foo():
d.errback(failure.DefaultException(
"This task also should never get called."))
self._lc = task.LoopingCall(foo)
self._lc.start(1, now=False)
reactor.callLater(0, self._callback_for_testStopAtOnceLater, d)
return d
def startService(self):
service.Service.startService(self)
self.calls = [task.LoopingCall(d.transfer) for d in self.domains]
i = 0
from twisted.internet import reactor
for c in self.calls:
# XXX Add errbacks, respect proper timeouts
reactor.callLater(i, c.start, 60 * 60)
i += 1
def __init__(self, sc, bridge_nick, nickserv_pw, slack_uid, channels,
user_bots):
self.sc = sc
self.user_bots = user_bots
self.nickserv_password = nickserv_pw
self.slack_uid = slack_uid
self.users = {bot.user_id: bot for bot in user_bots}
self.channels = {channel['id']: channel for channel in channels}
self.channel_name_uid_map = {channel['name']: channel['id']
for channel in channels}
self.nickname = bridge_nick
self.message_queue = queue.PriorityQueue()
self.rtm_connect()
log.msg('Connected successfully to Slack RTM')
# Create a looping call to poll Slack for updates
rtm_loop = LoopingCall(self.check_slack_rtm)
# Slack's rate limit is 1 request per second, so set this to something
# greater than or equal to that to avoid problems
rtm_loop.start(1)
# Create another looping call which acts on messages in the queue
message_loop = LoopingCall(self.empty_queue)
message_loop.start(0.5)
def __init__(self, port=None, interface=b'*',
update_interval=1 / 60.0):
if port is not None and interface is not None:
address = enet.Address(interface, port)
else:
address = None
self.host = enet.Host(address, self.max_connections, 1)
self.host.compress_with_range_coder()
self.update_loop = LoopingCall(self.update)
self.update_loop.start(update_interval, False)
self.connections = {}
self.clients = {}
def update(self):
try:
while 1:
if self.host is None:
return
try:
event = self.host.service(0)
except IOError:
break
if event is None:
break
event_type = event.type
if event_type == enet.EVENT_TYPE_NONE:
break
peer = event.peer
is_client = peer in self.clients
if is_client:
connection = self.clients[peer]
if event_type == enet.EVENT_TYPE_CONNECT:
connection.on_connect()
connection.timeout_call.cancel()
elif event_type == enet.EVENT_TYPE_DISCONNECT:
connection.on_disconnect()
del self.clients[peer]
self.check_client()
elif event.type == enet.EVENT_TYPE_RECEIVE:
connection.loader_received(event.packet)
else:
if event_type == enet.EVENT_TYPE_CONNECT:
self.on_connect(peer)
elif event_type == enet.EVENT_TYPE_DISCONNECT:
self.on_disconnect(peer)
elif event.type == enet.EVENT_TYPE_RECEIVE:
self.data_received(peer, event.packet)
except:
# make sure the LoopingCall doesn't catch this and stops
import traceback
traceback.print_exc()