def _process(self):
    """
    Pair waiting consumers with queued objects and process each pair in a thread.

    While both ``self.consumers`` and ``self.queue`` are non-empty, the oldest
    entry of each is removed and handed to ``self._process_in_thread`` through
    ``threads.deferToThread``.  Any exception raised while dispatching is
    printed and ends the loop; it is not propagated to the caller.
    """
    try:
        while self.consumers and self.queue:
            d = self.consumers.pop(0)
            obj = self.queue.pop(0)
            # The Deferred returned by deferToThread is not used here.
            threads.deferToThread(self._process_in_thread, d, obj)
    except Exception as e:
        # `as` and print(...) with one argument are valid on Python 2.6+ and 3.
        print(str(e))
# Example source code for Python's callInThread()
def visitLink(self, uid, url):
    """Run ``self.spawnModerator(uid, url)`` via ``reactor.callInThread``."""
    run_in_thread = reactor.callInThread
    run_in_thread(self.spawnModerator, uid, url)
def adminRespond(self, user):
    """Run ``self.spawnAdmin(user)`` via ``reactor.callInThread``."""
    run_in_thread = reactor.callInThread
    run_in_thread(self.spawnAdmin, user)
def gotProtocol(self, p):
    """
    Store the protocol and start a connection under this object's name.

    ``p`` is kept as ``self.protocol``; ``p.connect`` is then called with
    ``keepalive=0`` and ``cleanStart=True``, and ``self.subscribe`` is added
    as a callback on the object it returns.
    """
    log.info('gotProtocol, connecting {name}', name=self.name)
    self.protocol = p
    # NOTE: a ``prepareToPublish`` callback and delayed/threaded variants of
    # this connect step are not active in this method.
    connecting = p.connect(self.name, keepalive=0, cleanStart=True)
    connecting.addCallback(self.subscribe)
def send_packet_stream(self, stub, interval):
    """
    Feed packets to ``stub.SendPackets`` from a background thread.

    A ``Queue`` connects this generator to a worker started with
    ``reactor.callInThread``: the worker passes a blocking generator over the
    queue to ``stub.SendPackets``, while this function adds one ``Packet``
    per iteration whenever fewer than 100 are pending and then yields
    ``asleep(interval)``.

    The loop never terminates on its own.
    NOTE(review): yielding ``asleep(interval)`` suggests this generator is
    driven by something like ``inlineCallbacks`` applied outside this block
    -- confirm.

    :param stub: object exposing ``SendPackets``, called with a packet generator
    :param interval: value passed to ``asleep`` between top-ups
    """
    queue = Queue()

    def packet_generator():
        # Blocks the worker thread until a packet is available.
        while True:
            yield queue.get(block=True)

    def stream(stub):
        """This is executed on its own thread"""
        result = stub.SendPackets(packet_generator())
        # %-formatting keeps the output identical on Python 2 and Python 3.
        print('Got this after sending packets: %s %s' % (result, type(result)))
        return result

    reactor.callInThread(stream, stub)

    while True:
        # Throttle: only enqueue while fewer than 100 packets are pending.
        if queue.qsize() < 100:
            queue.put(Packet(source=42, content='beefstew'))
        yield asleep(interval)
def __init__(self):
    """
    Initialise status counters and start the background workers.

    ``status_loop`` and ``feedback_loop`` are each handed to
    ``reactor.callInThread``; ``watchdog`` is scheduled through
    ``reactor.callLater`` with a delay of 1.
    """
    self.connector = BroadcastConnector()
    self.last_status = time.time()
    self.last_nodes = self.issues = 0
    self.notifications = defaultdict(list)

    run_in_thread = reactor.callInThread
    run_in_thread(self.status_loop)
    run_in_thread(self.feedback_loop)
    reactor.callLater(1, self.watchdog)
def __init__(self):
    """
    Initialise monitoring state and start the background workers.

    ``status_loop`` and ``feedback_loop`` are each handed to
    ``reactor.callInThread``; ``watchdog`` is scheduled through
    ``reactor.callLater`` with a delay of 1.
    """
    self._monitor_tx = {}
    self._monitor_lock = threading.Lock()
    self.last_status = time.time()
    self.radar_hosts = self.issues = 0

    run_in_thread = reactor.callInThread
    run_in_thread(self.status_loop)
    run_in_thread(self.feedback_loop)
    reactor.callLater(1, self.watchdog)
def callMultipleInThread(tupleList):
    """
    Execute several functions one after another inside a single thread.

    C{tupleList} is a list of C{(function, argsList, kwargsDict)} tuples; the
    whole list is handed to C{_runMultiple} via C{reactor.callInThread}.
    """
    from twisted.internet import reactor as _reactor
    _reactor.callInThread(_runMultiple, tupleList)
def _testBlockingCallFromThread(self, reactorFunc):
    """
    Utility method to test L{threads.blockingCallFromThread}.

    @param reactorFunc: callable passed to L{threads.blockingCallFromThread}
        from a function started with C{reactor.callInThread}.
    @return: the result of chaining C{cb1} and C{cb2} onto
        C{self._waitForThread()}; C{cb2} returns a C{(results, errors)}
        tuple holding the values returned and the exceptions raised by the
        blocking call.
    """
    waiter = threading.Event()
    results = []
    errors = []

    def cb1(ign):
        def threadedFunc():
            try:
                r = threads.blockingCallFromThread(reactor, reactorFunc)
            except Exception as e:
                errors.append(e)
            else:
                results.append(r)
            # Signal completion whether the call succeeded or raised.
            waiter.set()

        reactor.callInThread(threadedFunc)
        return threads.deferToThread(waiter.wait, self.getTimeout())

    def cb2(ign):
        # is_set() replaces the deprecated camelCase alias isSet().
        if not waiter.is_set():
            self.fail("Timed out waiting for event")
        return results, errors

    return self._waitForThread().addCallback(cb1).addBoth(cb2)
def puller(self):
    """Run ``self._puller`` via ``reactor.callInThread``."""
    run_in_thread = reactor.callInThread
    run_in_thread(self._puller)
def interactive_main(args):
    """
    Create a remote shell and run an interactive command loop against it.

    The shell is built from ``args.conn_info``; the ``stdout`` lines of the
    yielded ``create()`` result are joined with newlines and passed as the
    intro to ``WinrsCmd.cmdloop``, which is run via ``reactor.callInThread``.
    """
    remote_shell = create_remote_shell(args.conn_info)
    created = yield remote_shell.create()
    banner = '\n'.join(created.stdout)
    command_loop = WinrsCmd(remote_shell)
    reactor.callInThread(command_loop.cmdloop, banner)
def do_search_concurrently(search):
    """
    Schedule a search task on another thread and record it.

    :param search: search task; its ``start`` method is passed to
        ``reactor.callInThread`` and the task is then appended to
        ``SEARCH_TASKS``
    """
    logging.debug("Scheduling search to run concurrently.")
    run_in_thread = reactor.callInThread
    run_in_thread(search.start)
    SEARCH_TASKS.append(search)
def startService(self):
    """Pass ``writeForever`` to ``reactor.callInThread``, then call ``Service.startService``."""
    run_in_thread = reactor.callInThread
    run_in_thread(writeForever)
    Service.startService(self)
def log(sess_id, host, user, data):
    """Pass ``Logger.background_log`` and the four arguments to ``reactor.callInThread``."""
    run_in_thread = reactor.callInThread
    run_in_thread(Logger.background_log, sess_id, host, user, data)
def log(sess_id, host, user, data):
    """Pass ``Logger.background_log`` and the four arguments to ``reactor.callInThread``."""
    run_in_thread = reactor.callInThread
    run_in_thread(Logger.background_log, sess_id, host, user, data)
def main():
    """
    Parse command-line options, apply the network settings and run the prompt.

    ``--config`` cannot be combined with ``--mainnet``/``--privnet``, and
    ``--mainnet`` cannot be combined with ``--privnet``; either conflict
    prints a message and exits with status 1.  After the settings are applied
    the blockchain is registered, the prompt interface is started via
    ``reactor.callInThread``, the node leader is started and the reactor is
    run.
    """
    # Local import: sys.exit replaces the site-provided exit() helper, which
    # is intended for the interactive shell and is not guaranteed to exist.
    import sys

    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--mainnet", action="store_true", default=False,
                        help="Use MainNet instead of the default TestNet")
    parser.add_argument("-p", "--privnet", action="store_true", default=False,
                        help="Use PrivNet instead of the default TestNet")
    parser.add_argument("-c", "--config", action="store", help="Use a specific config file")
    parser.add_argument("-t", "--set-default-theme", dest="theme",
                        choices=["dark", "light"], help="Set the default theme to be loaded from the config file. Default: 'dark'")
    parser.add_argument('--version', action='version',
                        version='neo-python v{version}'.format(version=__version__))

    args = parser.parse_args()

    if args.config and (args.mainnet or args.privnet):
        print("Cannot use both --config and --mainnet/--privnet arguments, please use only one.")
        sys.exit(1)

    if args.mainnet and args.privnet:
        print("Cannot use both --mainnet and --privnet arguments")
        sys.exit(1)

    # Setup depending on command line arguments. By default, the testnet settings are already loaded.
    if args.config:
        settings.setup(args.config)
    elif args.mainnet:
        settings.setup_mainnet()
    elif args.privnet:
        settings.setup_privnet()

    if args.theme:
        preferences.set_theme(args.theme)

    # Instantiate the blockchain and subscribe to notifications
    blockchain = LevelDBBlockchain(settings.LEVELDB_PATH)
    Blockchain.RegisterBlockchain(blockchain)

    # Start the prompt interface
    cli = PromptInterface()

    # Run
    reactor.suggestThreadPoolSize(15)
    reactor.callInThread(cli.run)
    NodeLeader.Instance().Start()
    reactor.run()
def reconcile(self, device):
# Reconcile this handler with an existing device record: validate the
# device address, reconnect to the OLT, start the heartbeat if needed, set
# up PM configuration, then reconcile the logical device and child devices.
# NOTE(review): the indentation of this block is not visible here, so it is
# unclear whether the PM-configuration steps below are nested under the
# heartbeat check or run unconditionally -- confirm against the original file.
self.log.info('reconciling-asfvolt16-starts',device=device)
# Without an address there is nothing to connect to: mark the device FAILED,
# record the reason, publish the update and stop.
if not device.host_and_port:
device.oper_status = OperStatus.FAILED
device.reason = 'No host_and_port field provided'
self.adapter_agent.update_device(device)
return
try:
# Establishing connection towards OLT
self.bal.connect_olt(device.host_and_port, self.device_id,is_init=False)
device.connect_status = ConnectStatus.REACHABLE
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
# get_indication_info is handed to reactor.callInThread rather than called inline.
reactor.callInThread(self.bal.get_indication_info, self.device_id)
except Exception as e:
# Any failure above marks the device UNREACHABLE/UNKNOWN and ends reconciliation.
self.log.exception('device-unreachable', error=e)
device.connect_status = ConnectStatus.UNREACHABLE
device.oper_status = OperStatus.UNKNOWN
self.adapter_agent.update_device(device)
return
# The heartbeat is started only when is_heartbeat_started equals 0.
if self.is_heartbeat_started == 0:
self.log.info('heart-beat-is-not-yet-started-starting-now')
self.start_heartbeat()
# Now set the initial PM configuration for this device
self.pm_metrics=Asfvolt16OltPmMetrics(device)
pm_config = self.pm_metrics.make_proto()
self.log.info("initial-pm-config", pm_config=pm_config)
self.adapter_agent.update_device_pm_config(pm_config,init=True)
# Apply the PM configuration
self.update_pm_config(device, pm_config)
# Request PM counters from OLT device.
self._handle_pm_counter_req_towards_device(device)
# Set the logical device id
# The device is re-read from the adapter agent before parent_id is inspected.
device = self.adapter_agent.get_device(device.id)
if device.parent_id:
self.logical_device_id = device.parent_id
self.log.info("reconcile-logical-device")
self.adapter_agent.reconcile_logical_device(device.parent_id)
else:
self.log.info('no-logical-device-set')
# Reconcile child devices
self.log.info("reconcile-all-child-devices")
self.adapter_agent.reconcile_child_devices(device.id)
self.log.info('reconciling-asfvolt16-device-ends',device=device)