def action(self, user, channel, msg):
    """Handle an action performed by someone the bot can see.

    The nick is the part of ``user`` before the first ``!``; the action is
    always logged.  Only when ``channel`` equals ``self.nickname`` does the
    bot respond: truthy results of ``self.logic.check_update`` for the
    first-time and update messages are sent to the nick, and
    ``self.logic.sendpp`` is run in a worker thread with mode ``"np"``.
    """
    nick = user.split('!', 1)[0]
    self.logic.log(colorama.Fore.MAGENTA + "* " + nick + colorama.Fore.WHITE + " " + msg)

    # Actions seen anywhere but in a direct conversation are only logged.
    if channel != self.nickname:
        return

    first_time_note = self.logic.check_update(self.logic.FIRST_TIME_MSG, nick, "firsttime.txt")
    update_note = self.logic.check_update(self.logic.UPDATE_MSG, nick, "updates.txt")
    for note in (first_time_note, update_note):
        if note:
            self.msg(nick, note)

    # sendpp runs off the calling thread; its result is handed to
    # logCommand and any failure goes to log.err.
    deferred = threads.deferToThread(self.logic.sendpp, msg, nick, "np")
    deferred.addCallback(self.logCommand, nick)
    deferred.addErrback(log.err)
python类deferToThread()的实例源码
def cleanIfIdle(self, path=None):
    """Start a background purge of the dirty locations when nothing blocks it.

    ``path`` is marked dirty first.  Nothing happens when there is nothing
    dirty, when a cleanup is already running, or when a session exists and
    reports recordings.  Otherwise ``purge`` is run in a worker thread and
    ``self.cleanReady`` / ``self.cleanFail`` receive its outcome.
    """
    # RecordTimer calls this while preparing a recording, which is a
    # convenient moment to clean up. The path it mentions is marked dirty.
    self.markDirty(path)
    if not self.dirty:
        return
    if self.isCleaning:
        print("[Trashcan] Cleanup already running")
        return
    recording_active = (self.session is not None) and self.session.nav.getRecordings()
    if recording_active:
        return

    self.isCleaning = True
    now = time.time()
    retention_seconds = config.usage.movielist_trashcan_days.value * 3600 * 24
    ctime_limit = now - retention_seconds
    reserve_bytes = 1024*1024*1024 * int(config.usage.movielist_trashcan_reserve.value)
    # Hand the current dirty set to the purge and start collecting afresh.
    pending, self.dirty = self.dirty, set()
    purge_deferred = threads.deferToThread(purge, pending, ctime_limit, reserve_bytes)
    purge_deferred.addCallbacks(self.cleanReady, self.cleanFail)
def list(self, raw=False, **kwargs):
    """List the kernels reported by the router, excluding this kernel's key.

    This is a generator that yields Deferreds and delivers its result with
    ``returnValue`` (presumably run under ``inlineCallbacks``; the decorator
    is not visible here -- confirm).

    :param raw: when exactly ``True``, return the kernel keys as received.
    :param kwargs: only ``details`` is read; it is forwarded to
        ``self._get_kernel_names``.
    :return: the value produced by ``self._get_kernel_names`` when it is not
        ``None``; otherwise the list of kernel keys (empty when the WAMP
        call raised ``ApplicationError``).
    """
    log.msg("LIST called")
    yield self.connect(self._router_url)
    try:
        output = yield self._wamp.call(u"io.timbr.kernel.list")
        try:
            output.remove(self._kernel_key)
        except ValueError:
            # kernel key doesn't exist in the list
            pass
    except ApplicationError:
        output = []

    if raw is not True:
        prefix_map = yield threads.deferToThread(
            self._get_kernel_names, output, details=kwargs.get('details'))
        if prefix_map is not None:
            returnValue(prefix_map)
        print("Unable to access JUNO_KERNEL_URI, displaying kernel prefixes instead of kernel names")

    # Single fall-through exit: every remaining path returns the raw keys.
    returnValue(output)
def command(self, args, callback_trigger=None):
    """Run the tahoe executable with ``args`` against ``self.nodedir``.

    This is a generator that yields Deferreds and delivers its result with
    ``returnValue`` (presumably run under ``inlineCallbacks``; the decorator
    is not visible here -- confirm).

    :param args: list of command-line arguments appended after
        ``<exe> -d <nodedir>``.
    :param callback_trigger: passed through unchanged to
        ``self._win32_popen`` or ``CommandProtocol``.
    :return: the value produced by ``self._win32_popen`` (frozen win32
        builds) or by the ``done`` Deferred of the spawned process protocol.
    """
    exe = self.executable or which('tahoe')[0]
    args = [exe, '-d', self.nodedir] + args
    # Work on a copy so that PYTHONUNBUFFERED is set for the child only and
    # does not leak into this process's own environment.
    env = os.environ.copy()
    env['PYTHONUNBUFFERED'] = '1'
    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
        from twisted.internet.threads import deferToThread
        output = yield deferToThread(
            self._win32_popen, args, env, callback_trigger)
    else:
        protocol = CommandProtocol(self, callback_trigger)
        reactor.spawnProcess(protocol, exe, args=args, env=env)
        output = yield protocol.done
    returnValue(output)
#@inlineCallbacks
#def start_monitor(self):
# furl = os.path.join(self.nodedir, 'private', 'logport.furl')
# yield self.command(['debug', 'flogtool', 'tail', furl])
def send(self, res):
    """Buffer ``res`` and write a chunk to disk once the buffer is full.

    While the buffer holds fewer than ``self.chunk_size`` entries, the
    Deferred for the current batch is returned (a new one is created when
    there is none or the previous one has already fired).

    Once the buffer is full, its first ``chunk_size`` entries are dumped as
    JSON into a new timestamped file under ``self.out_dir`` from a worker
    thread.  The Deferred of that write is returned, and the Deferred that
    was handed out for the batch (if any) is chained to it.

    :param res: the item to buffer.
    :return: a Deferred as described above.
    """
    self.buffer.append(res)

    # buffer is not full, return deferred for current batch
    if len(self.buffer) < self.chunk_size:
        if not self.current_task or self.current_task.called:
            self.current_task = defer.Deferred()
        return self.current_task

    # buffer is full, write to disk
    chunk = self.buffer[:self.chunk_size]
    self.buffer = self.buffer[self.chunk_size:]
    log_path = os.path.join(self.out_dir, "%s-scan.json" % (datetime.datetime.utcnow().isoformat()))

    def write():
        with open(log_path, "w") as wf:
            json.dump(chunk, wf, sort_keys=True)

    batch_task = self.current_task
    self.current_task = None
    write_deferred = threads.deferToThread(write)
    # No batch Deferred exists when the buffer fills on the very first item
    # of a batch (e.g. chunk_size of 1); chaining to None would raise.
    if batch_task is not None:
        write_deferred.chainDeferred(batch_task)
    return write_deferred
def end_flush(self):
    """
    Write the buffered contents to disk.

    The write runs in a worker thread via ``deferToThread``.  When the
    buffer is empty nothing is written.  Otherwise the whole buffer is
    dumped as JSON into a new timestamped file under ``self.out_dir``.

    :return: a Deferred that fires with ``None`` once the write has
        finished or was skipped.
    """
    def flush():
        # Previously this returned the ``defer.succeed`` function object
        # itself, which then became the Deferred's result.
        if not self.buffer:
            return
        log_path = os.path.join(self.out_dir, "%s-scan.json" % (datetime.datetime.utcnow().isoformat()))
        with open(log_path, "w") as wf:
            json.dump(self.buffer, wf, sort_keys=True)
    return threads.deferToThread(flush)
def getSummedData(self, c, num_images = 1):
    ''' Get the counts with the vertical axis summed over.

    This is a generator that yields Deferreds and delivers its result with
    ``returnValue`` (presumably run under ``inlineCallbacks``; the decorator
    is not visible here -- confirm).  ``self.lock`` is held while the data
    is fetched and processed and is always released afterwards.

    :param c: not used by this method.
    :param num_images: number of images requested from the camera.
    :return: a flat list built from the acquired data reshaped to
        ``(num_images, y_pixels, x_pixels)`` and summed over axis 1.
    '''
    print('acquiring: {}'.format(self.getSummedData.__name__))
    yield self.lock.acquire()
    try:
        print('acquired: {}'.format(self.getSummedData.__name__))
        images = yield deferToThread(self.camera.get_acquired_data, num_images)
        hbin, vbin, hstart, hend, vstart, vend = self.camera.get_image()
        x_pixels = int((hend - hstart + 1.) / hbin)
        # Same form as x_pixels: convert to int *after* dividing by the
        # bin size, so the reshape below always receives an integer.
        y_pixels = int((vend - vstart + 1.) / vbin)
        images = np.reshape(images, (num_images, y_pixels, x_pixels))
        images = images.sum(axis=1)
        images = np.ravel(images, order='C')
        images = images.tolist()
    finally:
        print('releasing: {}'.format(self.getSummedData.__name__))
        self.lock.release()
    returnValue(images)
def waitForKinetic(self, c, timeout = WithUnit(1,'s')):
    '''Waits until the given number of kinetic images are completed.

    The camera status is polled up to ``timeout['s'] / 0.050`` times, with
    ``self.lock`` held for each poll and a ``self.wait(0.050)`` between
    polls.  This is a generator that yields Deferreds and delivers its
    result with ``returnValue`` (presumably run under ``inlineCallbacks``;
    the decorator is not visible here -- confirm).

    :return: ``True`` as soon as the status is ``'DRV_IDLE'``, ``False``
        when all polls were used up.
    '''
    requestCalls = int(timeout['s'] / 0.050 ) #number of request calls
    for _ in range(requestCalls):
        print('acquiring: {}'.format(self.waitForKinetic.__name__))
        yield self.lock.acquire()
        try:
            print('acquired : {}'.format(self.waitForKinetic.__name__))
            status = yield deferToThread(self.camera.get_status)
            # useful for debugging of how many iterations have been
            # completed in case of missed trigger pulses
            done_a, done_b = yield deferToThread(self.camera.get_series_progress)
            print('{} {}'.format(done_a, done_b))
            print(status)
        finally:
            print('releasing: {}'.format(self.waitForKinetic.__name__))
            self.lock.release()
        if status == 'DRV_IDLE':
            returnValue(True)
        yield self.wait(0.050)
    returnValue(False)
def mqtt_receive(self, topic=None, payload=None, **kwargs):
    """Hand an incoming MQTT message to ``self.thimble`` for processing.

    :return: the Deferred from ``self.thimble.process_message`` with
        ``self.mqtt_receive_error`` attached as errback, or ``None`` when
        dispatching raised (the failure is logged in that case).
    """
    # Alternatives kept for reference:
    #   synchronous:  return self.process_message(topic, payload, **kwargs)
    #   reactor pool: threads.deferToThread(self.process_message, topic, payload, **kwargs)
    try:
        # Asynchronous message processing using different thread pool
        pending = self.thimble.process_message(topic, payload, **kwargs)
        pending.addErrback(self.mqtt_receive_error, topic, payload)
    except Exception:
        log.failure(u'Processing MQTT message failed. topic={topic}, payload={payload}', topic=topic, payload=payload)
    else:
        return pending
def get_summary(measurement_id):
    """
    Return a deferred for the summary of the measurement ``measurement_id``.

    The deferred errbacks with MeasurementInProgress while a
    ``measurements.njson.progress`` file exists.  When ``summary.json``
    exists it fires with the parsed JSON content; otherwise
    ``generate_summary`` is run in a worker thread and its deferred is
    returned.
    """
    measurement = FilePath(config.measurements_directory).child(measurement_id)

    still_running = measurement.child("measurements.njson.progress").exists()
    if still_running:
        # NOTE(review): the exception class (not an instance) is passed to
        # defer.fail -- confirm the error handlers expect that.
        return defer.fail(MeasurementInProgress)

    summary = measurement.child("summary.json")
    if summary.exists():
        with summary.open("r") as summary_file:
            return defer.succeed(json.load(summary_file))

    anomaly = measurement.child("anomaly")
    report_path = measurement.child("measurements.njson").path
    return deferToThread(generate_summary, report_path, summary.path, anomaly.path)
def collect(self, config):
    """Run the first datasource's WBEM query in a worker thread.

    Connection settings (credentials, scheme, address, port) and the query
    parameters are all taken from ``config.datasources[0]``.

    :return: the Deferred from ``threads.deferToThread`` for the result of
        ``WBEMConnection(...).ExecQuery(...)``.
    """
    ds0 = config.datasources[0]
    credentials = (ds0.zWBEMUsername, ds0.zWBEMPassword)
    scheme = 'https' if ds0.zWBEMUseSSL else 'http'
    url = '{0}://{1}:{2}'.format(scheme, ds0.manageIp, ds0.zWBEMPort)

    def run_query():
        # Runs in the worker thread; params are read at execution time.
        connection = WBEMConnection(url, credentials)
        return connection.ExecQuery(
            ds0.params['query_language'],
            ds0.params['query'],
            namespace=ds0.params['namespace'])

    return threads.deferToThread(run_query)
def isLoopbackURL(url):
    """Checks if the specified URL refers to a loopback address.

    This is a generator: the hostname lookup is run through
    ``deferToThread`` and the answer is delivered as the generator's return
    value (presumably run under ``inlineCallbacks``; the decorator is not
    visible here -- confirm).

    :return: True if the URL refers to the loopback interface, otherwise
        False; None when ``url`` is None.
    """
    if url is None:
        # We need to pass is_loopback in, but it is only checked if url
        # is not None. None is the "I don't know and you won't ask"
        # state for this boolean.
        return None
    if url.hostname is None:
        # Empty URL == localhost.
        return True
    resolved = yield deferToThread(
        resolves_to_loopback_address, url.hostname)
    return resolved
def refresh(self):
    """Refresh the region controller.

    Only the controller whose ``system_id`` equals ``get_maas_id()`` can be
    refreshed; anything else raises ``NotImplementedError``.  The work is
    done while holding ``NamedLock('refresh')``; when that lock is not
    available the call does nothing.  This is a generator that yields
    Deferreds (presumably run under ``inlineCallbacks``; the decorator is
    not visible here -- confirm).
    """
    # XXX ltrager 2016-05-25 - MAAS doesn't have an RPC method between
    # region controllers. If this method refreshes a foreign region
    # controller the foreign region controller will contain the running
    # region's hardware and networking information.
    if self.system_id != get_maas_id():
        message = 'Can only refresh the running region controller'
        raise NotImplementedError(message)

    try:
        with NamedLock('refresh'):
            token = yield deferToDatabase(self._get_token_for_controller)
            yield deferToDatabase(self._signal_start_of_refresh)
            system_info = yield deferToThread(get_sys_info)
            yield deferToDatabase(self._process_sys_info, system_info)
            yield deferToThread(
                refresh, self.system_id,
                token.consumer.key, token.key, token.secret)
    except NamedLock.NotAvailable:
        # Refresh already running.
        pass
def loseConnection(self, reason=Failure(error.ConnectionDone())):
    """Request that the connection be dropped.

    On the first call (``self.disconnecting`` is None) a Deferred is created
    and the shutdown steps are queued on it in order: stop reading, cancel
    notification handling, stop the connection in a worker thread, report
    ``connectionLost(reason)``, and finally reset ``self.disconnecting``.
    Later calls made while that Deferred is still stored do not queue
    anything again.

    :param reason: passed to ``self.connectionLost``.  The default Failure
        is created once, when the function is defined.
    :return: the current value of ``self.disconnecting``.
    """
    if self.disconnecting is None:
        d = self.disconnecting = Deferred()
        # Each step is attached with addBoth, so it is queued for both the
        # success and the failure path of the chain.
        d.addBoth(callOut, self.stopReading)
        d.addBoth(callOut, self.cancelHandleNotify)
        d.addBoth(callOut, deferToThread, self.stopConnection)
        d.addBoth(callOut, self.connectionLost, reason)
        def done():
            # Allow a later loseConnection() call to start a fresh shutdown.
            self.disconnecting = None
        d.addBoth(callOut, done)
        if self.connecting is None:
            # Already/never connected: begin shutdown now.
            self.disconnecting.callback(None)
        else:
            # Still connecting: cancel before disconnect.
            self.connecting.addErrback(suppress, CancelledError)
            self.connecting.chainDeferred(self.disconnecting)
            self.connecting.cancel()
    # NOTE(review): if the chain above ran to completion synchronously,
    # done() has already reset this attribute to None -- confirm callers
    # cope with a None return.
    return self.disconnecting
def evaluate_tag(
        self, system_id, tag_name, tag_definition, tag_nsmap,
        credentials, nodes):
    """evaluate_tag()

    Implementation of
    :py:class:`~provisioningserver.rpc.cluster.EvaluateTag`.

    Runs the module-level ``evaluate_tag`` in a worker thread and returns a
    Deferred that fires with an empty dict once it has finished.
    """
    # Transform tag_nsmap into a format that LXML likes.
    nsmap = {entry["prefix"]: entry["uri"] for entry in tag_nsmap}
    # Parse the credential string into a 3-tuple.
    parsed_credentials = convert_string_to_tuple(credentials)
    # It's got to run in a thread because it does blocking IO.
    evaluation = deferToThread(
        evaluate_tag, system_id, nodes, tag_name, tag_definition,
        nsmap, parsed_credentials)
    # The RPC response carries no payload, so the result is replaced.
    evaluation.addCallback(lambda _: {})
    return evaluation
def refresh(self, system_id, consumer_key, token_key, token_secret):
    """RefreshRackControllerInfo()

    Implementation of
    :py:class:`~provisioningserver.rpc.cluster.RefreshRackControllerInfo`.

    Acquires ``NamedLock('refresh')``, starts the module-level ``refresh``
    in a worker thread without waiting for it, and returns a Deferred for
    ``get_sys_info`` run in another worker thread.

    :raises exceptions.RefreshAlreadyInProgress: when the lock is not
        available.
    """
    def start_refresh():
        with ClusterConfiguration.open() as config:
            return deferToThread(
                refresh, system_id, consumer_key, token_key,
                token_secret, config.maas_url)

    lock = NamedLock('refresh')
    try:
        lock.acquire()
    except lock.NotAvailable:
        # Refresh is already running, don't do anything
        raise exceptions.RefreshAlreadyInProgress()

    # Start gathering node results (lshw, lsblk, etc) but don't wait.
    # The lock is released once that background work ends either way.
    background = maybeDeferred(start_refresh)
    background.addBoth(callOut, lock.release)
    background.addErrback(
        log.err, 'Failed to refresh the rack controller.')
    return deferToThread(get_sys_info)
def callOutToThread(thing, func, *args, **kwargs):
    """Call out to the given `func` in another thread, but return `thing`.

    For example::

      d = client.fetchSomethingReallyImportant()
      d.addCallback(callOutToThread, watchTheKettleBoil)
      d.addCallback(doSomethingWithReallyImportantThing)

    Use this where a side-effect is needed when a :py:class:`~Deferred`
    fires but the result must not be clobbered. The result being passed
    through is *not* passed to `func`; only `args` and `kwargs` are.

    If the call-out raises an exception it is propagated; nothing is done
    to suppress it or to preserve the result in that case.

    :return: :class:`Deferred`.
    """
    side_effect = deferToThread(func, *args, **kwargs)
    side_effect.addCallback(lambda _: thing)
    return side_effect
def test_probe_and_enlist(self):
    """Probing creates and commissions one node per configured server."""
    expected_nodes = 100
    self.configure_vmomi_api(servers=expected_nodes)
    create_node = self.patch(vmware, 'create_node')
    new_system_id = factory.make_name('system_id')
    create_node.side_effect = asynchronous(
        lambda *args, **kwargs: new_system_id)
    commission_node = self.patch(vmware, 'commission_node')
    host = factory.make_hostname()
    username = factory.make_username()
    password = factory.make_username()
    user = factory.make_username()

    yield deferToThread(
        vmware.probe_vmware_and_enlist,
        user, host, username, password, accept_all=True)

    self.assertEqual(create_node.call_count, expected_nodes)
    self.assertEqual(commission_node.call_count, expected_nodes)
def test_probe_and_enlist_reconfigures_boot_order_if_create_node_ok(self):
    """ReconfigVM_Task is called once per server when create_node succeeds."""
    expected_nodes = 1
    self.configure_vmomi_api(servers=expected_nodes)
    create_node = self.patch(vmware, 'create_node')
    new_system_id = factory.make_name('system_id')
    create_node.side_effect = asynchronous(
        lambda *args, **kwargs: new_system_id)
    reconfigure_vm = self.patch(FakeVmomiVM, 'ReconfigVM_Task')
    # We need to not actually try to commission any nodes...
    self.patch(vmware, 'commission_node')
    host = factory.make_hostname()
    username = factory.make_username()
    password = factory.make_username()
    user = factory.make_username()

    yield deferToThread(
        vmware.probe_vmware_and_enlist,
        user, host, username, password, accept_all=True)

    self.assertEqual(reconfigure_vm.call_count, expected_nodes)
def test_probe_and_enlist_skips_pxe_config_if_create_node_failed(self):
    """ReconfigVM_Task is never called when create_node yields None."""
    self.configure_vmomi_api(servers=1)
    create_node = self.patch(vmware, 'create_node')
    create_node.side_effect = asynchronous(
        lambda *args, **kwargs: None)
    reconfigure_vm = self.patch(FakeVmomiVM, 'ReconfigVM_Task')
    # We need to not actually try to commission any nodes...
    self.patch(vmware, 'commission_node')
    host = factory.make_hostname()
    username = factory.make_username()
    password = factory.make_username()
    user = factory.make_username()

    yield deferToThread(
        vmware.probe_vmware_and_enlist,
        user, host, username, password, accept_all=True)

    self.assertEqual(reconfigure_vm.call_count, 0)
def test_probe_and_enlist_recs_probes_and_enlists(self):
    """An amd64 RECS node is created once and commissioned for the user."""
    user = factory.make_name('user')
    ip, port, username, password, node_id, context = self.make_context()
    domain = factory.make_name('domain')
    macs = [factory.make_mac_address() for _ in range(3)]
    get_nodes = self.patch(RECSAPI, "get_nodes")
    get_nodes.return_value = {
        node_id: {'macs': macs, 'arch': 'amd64'},
    }
    self.patch(RECSAPI, "set_boot_source")
    create_node = self.patch(recs_module, "create_node")
    create_node.side_effect = asynchronous(lambda *args: node_id)
    commission_node = self.patch(recs_module, "commission_node")

    accept_all = True
    yield deferToThread(
        probe_and_enlist_recs, user, ip, int(port), username, password,
        accept_all, domain)

    created_once = MockCalledOnceWith(
        macs, 'amd64', 'recs_box', context, domain)
    self.expectThat(create_node, created_once)
    self.expectThat(
        commission_node, MockCalledOnceWith(node_id, user))
def test_probe_and_enlist_recs_probes_and_enlists_no_commission(self):
    """An 'arm' RECS node is created as 'armhf' and is not commissioned."""
    user = factory.make_name('user')
    ip, port, username, password, node_id, context = self.make_context()
    domain = factory.make_name('domain')
    macs = [factory.make_mac_address() for _ in range(3)]
    get_nodes = self.patch(RECSAPI, "get_nodes")
    get_nodes.return_value = {
        node_id: {'macs': macs, 'arch': 'arm'},
    }
    self.patch(RECSAPI, "set_boot_source")
    create_node = self.patch(recs_module, "create_node")
    create_node.side_effect = asynchronous(lambda *args: node_id)
    commission_node = self.patch(recs_module, "commission_node")

    accept_all = False
    yield deferToThread(
        probe_and_enlist_recs, user, ip, int(port), username, password,
        accept_all, domain)

    created_once = MockCalledOnceWith(
        macs, 'armhf', 'recs_box', context, domain)
    self.expectThat(create_node, created_once)
    self.expectThat(commission_node, MockNotCalled())
def test_probe_and_enlist_msftocs_probes_and_enlists(self):
    """A blade reported by the OCS driver is created once and commissioned."""
    context = make_context()
    user = factory.make_name('user')
    system_id = factory.make_name('system_id')
    domain = factory.make_name('domain')
    macs = [factory.make_mac_address() for _ in range(3)]
    get_blades = self.patch(MicrosoftOCSPowerDriver, "get_blades")
    get_blades.return_value = {'%s' % context['blade_id']: macs}
    self.patch(MicrosoftOCSPowerDriver, "set_next_boot_device")
    create_node = self.patch(msftocs_module, "create_node")
    create_node.side_effect = asynchronous(lambda *args: system_id)
    commission_node = self.patch(msftocs_module, "commission_node")

    accept_all = True
    yield deferToThread(
        probe_and_enlist_msftocs, user, context['power_address'],
        int(context['power_port']), context['power_user'],
        context['power_pass'], accept_all, domain)

    created_once = MockCalledOnceWith(
        macs, 'amd64', 'msftocs', context, domain)
    self.expectThat(create_node, created_once)
    self.expectThat(
        commission_node, MockCalledOnceWith(system_id, user))
def power_state_virsh(
        self, power_address, power_id, power_pass=None, **kwargs):
    """Return the power state for the VM using virsh.

    This is a generator: the virsh login and state query each run through
    ``deferToThread`` and the mapped power state is the generator's return
    value (presumably run under ``inlineCallbacks``; the decorator is not
    visible here -- confirm).

    :raises VirshError: when the login fails, when no state is returned for
        ``power_id``, or when the state has no entry in
        ``VM_STATE_TO_POWER_STATE``.
    """
    # Force password to None if blank, as the power control
    # script will send a blank password if one is not set.
    password = None if power_pass == '' else power_pass

    conn = VirshSSH()
    logged_in = yield deferToThread(conn.login, power_address, password)
    if not logged_in:
        raise VirshError('Failed to login to virsh console.')

    state = yield deferToThread(conn.get_machine_state, power_id)
    if state is None:
        raise VirshError('Failed to get domain: %s' % power_id)

    try:
        power_state = VM_STATE_TO_POWER_STATE[state]
    except KeyError:
        raise VirshError('Unknown state: %s' % state)
    return power_state
def scan_bangumi(self):
    """
    Dispatch the feed crawling job.

    The bangumi list is visited in random order.  For every bangumi for
    which ``check_bangumi_status`` is falsy, its episodes are queried, the
    feed is scanned, the matched episodes are handed to
    ``download_episodes`` and the bangumi status is updated.  Each of those
    steps runs in a worker thread via ``deferToThread``.

    This is a generator that yields Deferreds (presumably run under
    ``inlineCallbacks``; the decorator is not visible here -- confirm).
    :return:
    """
    logger.info('scan bangumi %s', self.__class__.__name__)
    bangumi_list = yield threads.deferToThread(self.query_bangumi_list)
    # random.shuffle needs a mutable sequence; a bare range() is not one on
    # Python 3, so materialise the indices first.
    index_list = list(range(len(bangumi_list)))
    random.shuffle(index_list)
    for index in index_list:
        bangumi = bangumi_list[index]
        if not self.check_bangumi_status(bangumi):
            episode_list = yield threads.deferToThread(self.query_episode_list, bangumi.id)
            # result is an array of tuple (item, eps_no)
            scan_result = yield threads.deferToThread(self.scan_feed, bangumi, episode_list)
            if scan_result is None:
                continue
            url_eps_list = [
                (download_url, self.__find_episode_by_number(episode_list, eps_no), file_path, file_name)
                for (download_url, eps_no, file_path, file_name) in scan_result
            ]
            # this method may raise exception
            yield threads.deferToThread(self.download_episodes, url_eps_list, bangumi.id)
            yield threads.deferToThread(self.update_bangumi_status, bangumi)
def __add_download(self, video_file_list):
    """Start one download per distinct URL and record the torrent id.

    Video files are grouped by ``download_url``.  For each group the
    download is requested once, using the first file of the group; when a
    torrent id comes back, ``__update_video_file`` is run for the whole
    group in a worker thread.  Any exception for a group is logged and the
    remaining groups are still processed.

    This is a generator that yields Deferreds (presumably run under
    ``inlineCallbacks``; the decorator is not visible here -- confirm).
    """
    logger.debug(video_file_list)

    files_by_url = {}
    for video_file in video_file_list:
        files_by_url.setdefault(video_file.download_url, []).append(video_file)

    for download_url, grouped_files in files_by_url.iteritems():
        lead_file = grouped_files[0]
        bangumi_path = self.base_path + '/' + str(lead_file.bangumi_id)
        try:
            torrent_id = yield download_manager.download(lead_file.download_url, bangumi_path)
            logger.info(torrent_id)
            if torrent_id is not None:
                yield threads.deferToThread(self.__update_video_file, grouped_files, torrent_id)
            else:
                logger.warn('episode %s already in download queue', str(lead_file.episode_id))
        except Exception as error:
            logger.error(error, exc_info=True)
            logger.error('episode %s download failed', str(lead_file.episode_id))
def cleanIfIdle(self, path=None):
    """Start a background purge of the dirty locations when nothing blocks it.

    ``path`` is marked dirty first.  Nothing happens when there is nothing
    dirty, when a cleanup is already running, or when a session exists and
    reports recordings.  Otherwise ``purge`` is run in a worker thread and
    ``self.cleanReady`` / ``self.cleanFail`` receive its outcome.
    """
    # RecordTimer calls this while preparing a recording, which is a
    # convenient moment to clean up. The path it mentions is marked dirty.
    self.markDirty(path)
    if not self.dirty:
        return
    if self.isCleaning:
        print("[Trashcan] Cleanup already running")
        return
    recording_active = (self.session is not None) and self.session.nav.getRecordings()
    if recording_active:
        return

    self.isCleaning = True
    now = time.time()
    retention_seconds = config.usage.movielist_trashcan_days.value * 3600 * 24
    ctime_limit = now - retention_seconds
    reserve_bytes = 1024*1024*1024 * int(config.usage.movielist_trashcan_reserve.value)
    # Hand the current dirty set to the purge and start collecting afresh.
    pending, self.dirty = self.dirty, set()
    purge_deferred = threads.deferToThread(purge, pending, ctime_limit, reserve_bytes)
    purge_deferred.addCallbacks(self.cleanReady, self.cleanFail)
def _wait_request(self, request, spider):
    """Load ``request.url`` in a web driver and wait for it off-thread.

    A driver is taken from ``self.queue`` when one can be fetched without
    waiting; otherwise a new PhantomJS driver is created from
    ``self.options``.

    :return: a Deferred for ``self._wait_and_switch(driver)`` run in a
        worker thread, with ``self._response`` attached as callback.
    """
    try:
        driver = self.queue.get_nowait()
    except Exception:
        # No pooled driver could be fetched. Catching Exception rather than
        # everything keeps KeyboardInterrupt/SystemExit propagating.
        driver = webdriver.PhantomJS(**self.options)
    driver.get(request.url)
    # wait until ajax completed
    dfd = threads.deferToThread(self._wait_and_switch, driver)
    dfd.addCallback(self._response, driver, spider)
    return dfd
def process_item(self, item, spider):
    """Run ``self._process_item(item, spider)`` in a worker thread.

    :return: the Deferred produced by ``deferToThread``.
    """
    pending = deferToThread(self._process_item, item, spider)
    return pending
def initiateOp(self, sock, addr):
    """Start connecting ``sock`` to ``addr``.

    When ``have_connectex`` is truthy the connect is issued through the
    reactor with ``self.ovDone`` as completion callback.  Otherwise
    ``self.threadedThing`` is run in a worker thread, with
    ``self.threadedDone`` / ``self.threadedErr`` attached to its Deferred.
    """
    handle = sock.fileno()
    if not have_connectex:
        from twisted.internet.threads import deferToThread
        fallback = deferToThread(self.threadedThing, sock, addr)
        fallback.addCallback(self.threadedDone)
        fallback.addErrback(self.threadedErr)
        return

    # Only the address family of the socket info is needed here.
    _max_addr, family, _sock_type, _protocol = self.reactor.getsockinfo(handle)
    self.reactor.issueConnectEx(handle, family, addr, self.ovDone, (handle, sock))