def __init__(self, key, secret, options=None):
    """
    Store API credentials and connection settings on the instance.

    :param key: API key; stored as the first element of ``self.auth``.
    :param secret: API secret; stored as the second element of ``self.auth``.
    :param options: optional dict of overrides. Keys read here are
        ``hostname`` (default ``'api.mybitx.com'``), ``port`` (443),
        ``pair`` (``'XBTZAR'``), ``ca`` (``None``) and ``timeout`` (30).
    """
    # Build a fresh dict per call: a ``{}`` default is created once and
    # would be shared (via ``self.options``) by every instance that omits
    # the argument.
    if options is None:
        options = {}
    self.options = options
    self.auth = (key, secret)
    self.hostname = options.get('hostname', 'api.mybitx.com')
    self.port = options.get('port', 443)
    self.pair = options.get('pair', 'XBTZAR')
    self.ca = options.get('ca', None)
    self.timeout = options.get('timeout', 30)
    self.headers = {
        'Accept': 'application/json',
        'Accept-Charset': 'utf-8',
        'User-Agent': 'py-bitx v' + __version__,
    }
    self._executor = ThreadPoolExecutor(max_workers=5)
# Example source code using Python's ThreadPoolExecutor()
def run_in_executor(f):
    """
    Decorator that hands the wrapped method to ``self.executor``.

    The wrapper always returns ``None``. It does nothing when
    ``self.is_shutdown`` is true; otherwise it submits ``f`` to the
    executor and attaches ``_future_completed`` as a done-callback.
    Any exception raised while submitting is logged, not propagated.
    """
    @wraps(f)
    def submit_to_pool(self, *args, **kwargs):
        if not self.is_shutdown:
            try:
                pending = self.executor.submit(f, self, *args, **kwargs)
                pending.add_done_callback(_future_completed)
            except Exception:
                log.exception("Failed to submit task to executor")

    return submit_to_pool
def __init__(self, config=None):
    """
    Set up runtime state, the optional thread pool and the plugin imports.

    :param config: optional mapping. Keys read here are ``concurrency``
        (a thread pool is created only when it is greater than 1) and
        ``extra_plugins`` (paths passed to ``import_directory_modules``).
    """
    self.config = {} if config is None else config
    self.must_stop = threading.Event()
    self._consumers_queues = []

    # No pool at all for a concurrency of 1 (or less).
    workers = self.config.get("concurrency", 1)
    self._thread_pool = (
        ThreadPoolExecutor(max_workers=workers) if workers > 1 else None
    )

    for suffix in ('.plugins.ext', '.consumers.ext'):
        self.import_submodules(__name__ + suffix)
    for plugin_dir in self.config.get('extra_plugins', []):
        self.import_directory_modules(plugin_dir)

    self._current_checks = []
    self._current_checks_lock = threading.Lock()
def __init__(
        self, name, sub_jobs, pool=None,
        pool_type=ThreadPoolExecutor, join=None,
        error_action="stop", error_handler=None, error_default_value=None,
        goto=None):
    """
    Reject any error action other than ``"stop"`` or ``"continue"``.

    NOTE(review): the original docstring was lost to an encoding error
    (only ``?`` placeholders survive). The notes below are inferred from
    the parameter names and the surviving fragments -- confirm before
    relying on them.

    :param name: presumably the job name
    :param sub_jobs: presumably the jobs to run
    :param pool: presumably an existing pool; the surviving text mentions
        ``None`` together with ``pool_type``
    :param pool_type: presumably the pool class used when ``pool`` is None
    :param join: TODO confirm (surviving text mentions a context)
    :param error_action: ``"stop"`` or ``"continue"``
    :param error_handler: presumably used with ``"continue"``; TODO confirm
    :param error_default_value: presumably used with ``"continue"``;
        TODO confirm
    :param goto: TODO confirm
    :raises InvalidArgumentException: when ``self._error_action`` is
        neither ``"stop"`` nor ``"continue"``
    """
    # NOTE(review): ``self._error_action`` is read here but never assigned
    # in the visible part of this method -- verify where it is set.
    if self._error_action != "stop" and self._error_action != "continue":
        raise InvalidArgumentException(u"?????????stop??continue??")
def __init__(self, name, thread_num, pool=None,
             pool_type=ThreadPoolExecutor,
             start_point=None, end_point=None, context_factory=Context,
             extends_listeners=False, listeners=None, goto=None):
    """
    Make sure ``self._listeners`` is a list.

    NOTE(review): the original docstring was lost to an encoding error
    (only ``?`` placeholders survive). The notes below are inferred from
    the parameter names and the surviving fragments -- confirm before
    relying on them.

    :param name: name of the fork (the surviving text reads "Fork")
    :param thread_num: presumably the number of threads
    :param pool: presumably an existing pool
    :param pool_type: presumably the pool class used when ``pool`` is None
    :param start_point: TODO confirm
    :param end_point: TODO confirm
    :param context_factory: TODO confirm
    :param extends_listeners: TODO confirm
    :param listeners: TODO confirm
    :param goto: TODO confirm (surviving text mentions "join")
    """
    # NOTE(review): ``self._listeners`` is read here but never assigned in
    # the visible part of this method -- verify where it is set.
    if self._listeners is None:
        self._listeners = []
def __init__(self, app_id, **kwargs):
    """
    Initialize broker connection.

    :param app_id: string that identifies application
    :param kwargs: only ``url`` is read; when present it takes precedence
        over the ``broker_host`` environment variable.
    """
    self.app_id = app_id

    # Configuration: explicit keyword first, then environment, then fallback.
    self.rabbitmq_url = (
        kwargs['url'] if "url" in kwargs
        else os.environ.get("broker_host", RABBITMQ_URL_FALLBACK)
    )
    self.rabbitmq_exchange = os.environ.get(
        "broker_exchange", RABBITMQ_EXCHANGE_FALLBACK)
    self.rabbitmq_exchange_type = "topic"

    # Connection handle, filled in by the setup call below.
    self._connection = None
    self.setup_connection()

    # Worker pool and the list used to track its tasks.
    self.thrd_pool = pool.ThreadPoolExecutor(max_workers=100)
    self.tasks = []
# File: poolImprovement.py
# Project: Learning-Concurrency-in-Python
# Author: PacktPublishing
# (page links: project source, file source)
# Views: 24 | Favorites: 0 | Likes: 0 | Comments: 0
def main():
    """
    Time ``is_prime`` over ``PRIMES`` three ways and print each duration.

    The three runs are a four-worker process pool, a four-worker thread
    pool, and a plain sequential loop.
    """
    process_start = timeit.default_timer()
    with ProcessPoolExecutor(max_workers=4) as process_pool:
        verdicts = process_pool.map(is_prime, PRIMES)
        for number, prime in zip(PRIMES, verdicts):
            print('%d is prime: %s' % (number, prime))
    process_elapsed = timeit.default_timer() - process_start
    print("{} Seconds Needed for ProcessPoolExecutor".format(process_elapsed))

    thread_start = timeit.default_timer()
    with ThreadPoolExecutor(max_workers=4) as thread_pool:
        verdicts = thread_pool.map(is_prime, PRIMES)
        for number, prime in zip(PRIMES, verdicts):
            print('%d is prime: %s' % (number, prime))
    thread_elapsed = timeit.default_timer() - thread_start
    print("{} Seconds Needed for ThreadPoolExecutor".format(thread_elapsed))

    serial_start = timeit.default_timer()
    for candidate in PRIMES:
        verdict = is_prime(candidate)
        print("{} is prime: {}".format(candidate, verdict))
    serial_elapsed = timeit.default_timer() - serial_start
    print("{} Seconds needed for single threaded execution".format(serial_elapsed))
def test_double_reader_abort(self):
    """
    Writers must still get the lock after readers abort with an exception.

    Twenty tasks are submitted, alternating a reader that raises inside a
    nested read lock with a writer that records ``lock.owner``; all ten
    writers are expected to have recorded ``'w'``.
    """
    lock = fasteners.ReaderWriterLock()
    activated = collections.deque()

    def double_bad_reader():
        with lock.read_lock():
            with lock.read_lock():
                raise RuntimeError("Broken")

    def happy_writer():
        with lock.write_lock():
            activated.append(lock.owner)

    with futures.ThreadPoolExecutor(max_workers=20) as pool:
        for index in range(20):
            # Even slots get the failing reader, odd slots the writer.
            task = double_bad_reader if index % 2 == 0 else happy_writer
            pool.submit(task)

    writer_hits = sum(1 for owner in activated if owner == 'w')
    self.assertEqual(10, writer_hits)
def download_many(cc_list):
    """
    Download the first five entries of ``cc_list`` on a three-thread pool.

    Each scheduled future and each result is printed as it becomes
    available.

    :param cc_list: sliceable sequence of codes passed to ``download_one``.
    :return: number of results collected.
    """
    subset = cc_list[:5]  # only the first five codes are processed
    with futures.ThreadPoolExecutor(max_workers=3) as pool:
        scheduled = []
        for code in sorted(subset):
            task = pool.submit(download_one, code)
            scheduled.append(task)
            print('Scheduled for {}: {}'.format(code, task))

        collected = []
        # as_completed yields futures in completion order, not submit order.
        for task in futures.as_completed(scheduled):
            outcome = task.result()
            print('{} result: {!r}'.format(task, outcome))
            collected.append(outcome)

    return len(collected)
# END FLAGS_THREADPOOL_AS_COMPLETED
def save_month(year_month, verbose):
    """
    Save one item per day of a month concurrently and total the sizes.

    :param year_month: string of the form ``'YYYY-MM'``.
    :param verbose: forwarded unchanged to ``potd.save_one``.
    :return: ``(img_count, total_size)`` over the downloads that succeeded.
    """
    year, month = (int(part) for part in year_month.split('-'))
    dates = potd.list_days_of_month(year, month)
    img_count = 0
    total_size = 0

    with futures.ThreadPoolExecutor(max_workers=100) as pool:
        date_by_task = {
            pool.submit(potd.save_one, day, verbose): day for day in dates
        }

        for task in futures.as_completed(date_by_task):
            day = date_by_task[task]
            failure = task.exception()
            if failure is not None:
                print('%r generated an exception: %s' % (day, failure))
                continue
            size = task.result()
            total_size += size
            img_count += 1
            print('%r OK: %r' % (day, size))

    return img_count, total_size
def upload_blocks(bucket, chunk_size, max_threads, lines):
    """
    Upload one JSON block per input line to S3 through a thread pool.

    :param bucket: passed as ``Bucket`` to every ``put_object`` call.
    :param chunk_size: not used by this function; kept so existing callers
        keep working.
    :param max_threads: ``max_workers`` of the pool.
    :param lines: iterable; each item is given to ``load_json_block``,
        whose result is unpacked as ``(raw_block, key)``.
    :return: ``(finished, rate)`` -- the number of finished upload futures
        (a future that raised also counts as finished) and the integer
        number of uploads finished per second. ``rate`` is 0 when nothing
        was uploaded.
    """
    session = botocore.session.get_session()
    client = session.create_client('s3')
    start = time.perf_counter()
    uploads = []
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        for line in lines:
            raw_block, key = load_json_block(line)
            uploads.append(executor.submit(client.put_object,
                                           Bucket=bucket,
                                           Key=key,
                                           Body=raw_block,
                                           ContentEncoding='UTF-8',
                                           ContentType='application/json'))
    done, _pending = concurrent.futures.wait(uploads)
    elapsed = time.perf_counter() - start
    # An empty ``lines`` used to end in ZeroDivisionError here.
    if not done or elapsed <= 0:
        return len(done), 0
    return len(done), int(len(done) / elapsed)
def __init__(self, log_client, shard_id, consumer_name, processor, cursor_position, cursor_start_time,
             max_workers=2):
    """
    Record the consumer's collaborators and reset its bookkeeping state.

    :param log_client: stored and handed to the checkpoint tracker.
    :param shard_id: stored and handed to the checkpoint tracker.
    :param consumer_name: stored and handed to the checkpoint tracker.
    :param processor: stored as ``self.processor``.
    :param cursor_position: stored as ``self.cursor_position``.
    :param cursor_start_time: stored as ``self.cursor_start_time``.
    :param max_workers: size of the instance's ``ThreadPoolExecutor``.
    """
    # Collaborators and cursor settings supplied by the caller.
    self.log_client = log_client
    self.shard_id = shard_id
    self.consumer_name = consumer_name
    self.cursor_position = cursor_position
    self.cursor_start_time = cursor_start_time
    self.processor = processor

    # Helpers owned by this instance.
    self.checkpoint_tracker = ConsumerCheckpointTracker(
        self.log_client, self.consumer_name, self.shard_id)
    self.executor = ThreadPoolExecutor(max_workers=max_workers)
    self.logger = logging.getLogger(__name__)

    # Task and fetch state, all starting out idle.
    self.consumer_status = ConsumerStatus.INITIALIZING
    self.current_task_exist = False
    self.task_future = None
    self.fetch_data_future = None
    self.next_fetch_cursor = ''
    self.shutdown = False

    # Statistics about the most recent fetch and error log.
    self.last_fetch_log_group = None
    self.last_log_error_time = 0
    self.last_fetch_time = 0
    self.last_fetch_count = 0
def __init__(self, bot_type, credentials, sheet_credentials, wit_tokens, db_url='redis://localhost:6379',
             num_thread=4):
    """
    Wire up the bot's API client, storage, controllers, queues and scheduler.

    :param bot_type: forwarded to ``api.LineApi``.
    :param credentials: forwarded to ``api.LineApi``.
    :param sheet_credentials: mapping read for ``'credential_path'`` and
        ``'name'``.
    :param wit_tokens: forwarded to ``self.gen_nlp_message_controllers``.
    :param db_url: URL used for ``db.Dao``, the sheet controller, the Redis
        connection pool and ``modules.CoscupInfoHelper``.
    :param num_thread: worker count of the ``ThreadPoolExecutor`` task pool.
    """
    self.bot_api = api.LineApi(bot_type, credentials)
    self.logger = logging.getLogger('CoscupBot')
    self.task_pool = ThreadPoolExecutor(num_thread)
    self.db_url = db_url
    self.dao = db.Dao(db_url)
    # Three delete calls on the DAO run on every construction.
    self.dao.del_all_next_command()
    self.dao.del_all_context()
    self.dao.del_all_session()
    self.nlp_message_controllers = self.gen_nlp_message_controllers(wit_tokens)
    self.command_message_controllers = self.gen_command_message_controllers(
        [LanguageCode.zh_tw, LanguageCode.en_us])
    # The sheet controller receives this instance itself as its last argument.
    self.sheet_message_controller = modules.SheetMessageController(db_url, sheet_credentials['credential_path'],
                                                                   sheet_credentials['name'], self)
    # Both queues below share this one connection pool.
    self.__mq_conn_pool = redis.ConnectionPool.from_url(url=db_url)
    self.edison_queue = utils.RedisQueue('edison', 'queue', connection_pool=self.__mq_conn_pool)
    self.realtime_msg_queue = utils.RedisQueue('realmessage', 'queue', connection_pool=self.__mq_conn_pool)
    self.job_scheduler = BackgroundScheduler()
    self.coscup_api_helper = modules.CoscupInfoHelper(db_url)
    # Called after the scheduler and API helper attributes are assigned.
    self.start_scheduler()
    self.next_step_dic = {}
    self.take_photo_sec = 6
def optimiz(currencies, debug):
    """
    Fetch data for each currency concurrently and compute portfolio weights.

    :param currencies: iterable of 2 to 10 currency identifiers; it is
        sorted before use.
    :param debug: forwarded to ``markowitz_optimization``; when true, a
        scatter plot is also written to ``chalu.png``.
    :return: ``{"result": {currency: weight}}`` on success, otherwise
        ``{"error": message}`` when the count is out of range or any
        fetched entry carries an ``'error'`` key.
    """
    currencies = sorted(currencies)
    if not 2 <= len(currencies) <= 10:
        return {"error": "2 to 10 currencies"}

    # ThreadPoolExecutor only accepts max_workers=None from Python 3.5 on.
    # Compare the whole version tuple, not just the minor number.
    max_workers = 4 if sys.version_info < (3, 5) else None
    # The context manager shuts the pool down; the bare executor used
    # before was never released.
    with ThreadPoolExecutor(max_workers) as executor:
        pending = [executor.submit(get_ochl, cur) for cur in currencies]
        # Each result is consumed as a (key, value) pair by dict().
        data = dict(future.result() for future in wait(pending).done)

    data = [data[cur] for cur in currencies]
    errors = [x['error'] for x in data if 'error' in x]
    if errors:
        return {"error": "Currencies not found : " + str(errors)}

    weights, m, s, a, b = markowitz_optimization(data, debug)
    if debug:
        import matplotlib as mpl
        mpl.use('Agg')
        import matplotlib.pyplot as plt
        fig, ax = plt.subplots()
        plt.plot(s, m, 'o', markersize=1)
        plt.plot(b, a, 'or')
        fig.savefig("chalu.png")

    result = {cur: weights[i] for i, cur in enumerate(currencies)}
    return {"result": result}
def main():
    """Generate the jumpmap.json with ESI."""
    try:
        all_systems = retry_get(ESI.Universe.get_universe_systems)
    except NameError:
        raise SystemExit(1)

    total = len(all_systems)
    systems = {}
    with ThreadPoolExecutor(max_workers=100) as pool:
        # map() yields each (system, result) pair in input order.
        fetched = pool.map(system_get, all_systems)
        for finished, (system, result) in enumerate(fetched, 1):
            systems[system] = result
            print("{}/{} systems complete".format(finished, total))

    serialized = json.dumps(systems)
    with open("jumpmap.json", "w") as openjumpmap:
        openjumpmap.write(serialized)
def serve():
    """
    Start the Arduino gRPC server and keep the process alive.

    The server listens on the configured ``arduinoPort`` with a ten-thread
    pool and is stopped when a ``KeyboardInterrupt`` arrives.
    """
    config = ProtoConfig.getConfig()
    first_arduino = config.arduinos[0]

    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    sensors_pb2.add_ArduinoServicer_to_server(Arduino(first_arduino), grpc_server)

    listen_port = config.ports.arduinoPort
    grpc_server.add_insecure_port('[::]:%s' % listen_port)
    grpc_server.start()
    print('Started Arduino Server on Port %s ' % listen_port)

    # Sleep in the main thread until interrupted.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)
def serve():
    """
    Start the Dao gRPC server and keep the process alive.

    The server listens on the configured ``daoPort`` with a ten-thread
    pool and is stopped when a ``KeyboardInterrupt`` arrives.
    """
    config = ProtoConfig.getConfig()
    database = Mongo()
    database.GetClient()  # initialize the DB

    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    dao_pb2.add_DaoServicer_to_server(Dao(database), grpc_server)

    listen_port = config.ports.daoPort
    grpc_server.add_insecure_port('[::]:%s' % listen_port)
    grpc_server.start()
    print('Started Dao Server on Port %s ' % listen_port)

    # Sleep in the main thread until interrupted.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)
def serve():
    """
    Start the Push gRPC server, then run the Wio event websocket client.

    The gRPC server listens on the configured ``pushPort``. The sleep loop
    is only reached once ``ws.run_forever()`` returns.
    """
    config = ProtoConfig.getConfig()
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    push_servicer = Push(accessToken=config.wioLinks['havok'].accessToken)
    sensors_pb2.add_PushServicer_to_server(push_servicer, grpc_server)

    listen_port = config.ports.pushPort
    grpc_server.add_insecure_port('[::]:%s' % listen_port)
    grpc_server.start()
    print('Started Push Server on Port %s ' % listen_port)

    websocket.enableTrace(True)
    event_socket = websocket.WebSocketApp(
        "wss://us.wio.seeed.io/v1/node/event",
        on_message=push_servicer.on_message,
        on_error=push_servicer.on_error,
        on_close=push_servicer.on_close)
    event_socket.on_open = push_servicer.on_open
    # NOTE(review): run_forever() presumably blocks until the socket
    # closes -- confirm against the websocket client in use.
    event_socket.run_forever()

    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)
# File: demo_thread_pool_executor.py
# Project: SmallReptileTraining
# Author: yanbober
# (page links: project source, file source)
# Views: 28 | Favorites: 0 | Likes: 0 | Comments: 0
def runner(self):
    """
    Fetch every URL in ``self.urls`` on a two-thread pool and report results.

    For each URL the size of the data returned by
    ``self.get_web_content`` is printed, or the error text when the call
    raised. ``'Finished!'`` is printed once all URLs are handled.
    """
    # The context manager shuts the pool down on exit, including when the
    # loop is left by an exception; the pool used to be left running.
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix='DEMO') as thread_pool:
        future_to_url = {
            thread_pool.submit(self.get_web_content, url): url
            for url in self.urls
        }
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as e:
                print('Run thread url ('+url+') error. '+str(e))
            else:
                print(url+'Request data ok. size='+str(len(data)))
    print('Finished!')
def getloc():
    """
    Collect POI locations page by page and pickle them to ``allloc1.pk``.

    Pages 1 to 45 are requested concurrently through ``load_url``. The
    ``'location'`` of every entry under ``'pois'`` in each response is
    gathered into one list, which is then pickled.

    Reference left by the original author:
    http://lbs.amap.com/api/webservice/guide/api/search/#text
    """
    allloc = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        # NOTE(review): this is the documentation URL given in the original
        # reference, not obviously a service endpoint -- confirm.
        url = 'http://lbs.amap.com/api/webservice/guide/api/search/#text'
        param = {
            # NOTE(review): API key is hard-coded in source; consider moving
            # it to configuration.
            'key': '22d6f93f929728c10ed86258653ae14a',
            # NOTE(review): keyword text was lost to an encoding error.
            'keywords': u'??',
            'city': '027',
            'citylimit': 'true',
            'output': 'json',
            'page': '',
        }
        future_to_url = {
            executor.submit(load_url, url, merge_dicts(param, {'page': i}), 60): url
            for i in range(1, 46)
        }
        for future in futures.as_completed(future_to_url):
            # Futures from as_completed are already done, so only the
            # failed/succeeded distinction matters.
            error = future.exception()
            if error is not None:
                print(error)
            else:
                data = future.result()['pois']
                allloc.extend([x['location'] for x in data])
    with open('allloc1.pk', 'wb') as f:
        pickle.dump(allloc, f, True)
def mobai(loc):
    """
    Query nearby-bike data for every coordinate and store each batch.

    :param loc: iterable of ``'longitude,latitude'`` strings.

    Each coordinate is posted through ``load_url``. The list under
    ``'object'`` in every successful response is added to a local list and
    inserted into ``collection``. Errors are printed and skipped.
    """
    allmobai = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do'
        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Mobile/14E304 MicroMessenger/6.5.7 NetType/WIFI Language/zh_CN',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Referer': 'https://servicewechat.com/wx80f809371ae33eda/23/page-frame.html',
        }
        data = {
            'longitude': '',
            'latitude': '',
            'citycode': '027',
        }
        future_to_url = {
            executor.submit(load_url, url, merge_dicts(data, {'longitude': i.split(',')[0]}, {'latitude': i.split(',')[1]}), 60, headers): url for i in loc}
        for future in futures.as_completed(future_to_url):
            error = future.exception()
            if error is not None:
                print(error)
                continue
            # A separate name keeps the request template ``data`` intact
            # instead of overwriting it with a response.
            bikes = future.result()['object']
            allmobai.extend(bikes)
            # Store in MongoDB. Empty batches are skipped.
            # NOTE(review): assumes ``collection`` is a PyMongo collection,
            # whose insert_many rejects an empty list -- confirm.
            if bikes:
                collection.insert_many(bikes)
def test_node_specific_thread_pool_executor():
    """
    Nodes bound to a named thread-pool executor must run concurrently.

    Ten nodes that each sleep 0.2 s are assigned to executor ``'foo'``;
    computing all of them has to take less than nine sleeps' worth of time.
    """
    n = 10
    sleep_time = 0.2

    def wait(c):
        sleep(sleep_time)
        return c

    comp = Computation(executor_map={'foo': ThreadPoolExecutor(n)})
    started = datetime.utcnow()
    for node in range(n):
        comp.add_node(node, wait, kwds={'c': C(node)}, executor='foo')
    comp.compute_all()
    finished = datetime.utcnow()

    elapsed = (finished - started).total_seconds()
    assert elapsed < (n-1) * sleep_time
def run(self):
    """Concurrently invoke `get_response` for all of instance's `requests`.

    Each successful result is appended to ``self.responses`` and its
    request is removed from ``self.pending_requests``. A result is skipped
    when its request carries an error or its ``err`` equals ``'skwarg'``.
    ``self.is_done`` is set to ``True`` at the end, also when there was
    nothing to do.
    """
    if not self.requests:
        # min(max_workers, 0) would ask for a pool with zero workers,
        # which ThreadPoolExecutor rejects with ValueError.
        self.is_done = True
        return

    with futures.ThreadPoolExecutor(
        max_workers=min(self.max_workers, len(self.requests))
    ) as executor:
        to_do = [
            executor.submit(self.get_response, request, i)
            for i, request in enumerate(self.requests)
        ]
        for future in futures.as_completed(to_do):
            result = future.result()
            # `responses` and `pending_requests` are instance properties, which means
            # client code can inspect instance to read responses as they are completed
            if result.req.error is not None or result.err == 'skwarg':
                continue
            try:
                self.pending_requests.remove(result.req)
            except KeyError:
                print('{} was not in pending requests, this is weird...'.format(result.req))
            self.responses.append(result)
    self.is_done = True
def main():
    """
    Run the TCP and UDP socket listeners until ``core.wait()`` returns.

    Both listeners are started on a two-thread pool. After ``core.wait()``
    returns they are shut down and any future still running is asked to
    cancel.
    """
    config = core.get_config()
    core.init_logging("pynetsim", log_level=getattr(logging, config.get("main").get("log_level", "debug").upper()))
    log.debug("Starting socket listeners")
    listener_pool = ThreadPoolExecutor(max_workers=2)
    futures = []
    tcp_listener = TCPSocketListener(config)
    udp_listener = UDPSocketListener(config)
    futures.append(listener_pool.submit(tcp_listener.start))
    futures.append(listener_pool.submit(udp_listener.start))
    core.wait()
    log.debug("Stopping socket listeners")
    tcp_listener.shutdown()
    udp_listener.shutdown()
    for future in futures:
        # NOTE(review): Future.cancel() cannot cancel a call that is already
        # running, so this has no effect on a running listener.
        if future.running():
            future.cancel()
    # NOTE(review): second shutdown pass; the source's indentation was lost,
    # so whether this belongs inside the loop above is unconfirmed.
    tcp_listener.shutdown()
    udp_listener.shutdown()
    log.debug("Exiting...")
def main():
    """
    Write the CSV header, then crawl every model from the resume point on.

    The module-level ``executor`` is created here and is always shut down
    before returning, also when the crawl raises.

    :raises StopIteration: when the hard-coded resume model
        (``'DBT'``, ``'120'``) is not among the parsed models.
    """
    global executor
    os.makedirs(localstor, exist_ok=True)
    # newline='' is what the csv module asks for on files it writes to.
    with open('tsd_dlink_filelist.csv', 'w', newline='') as fout:
        cw = csv.writer(fout)
        cw.writerow(['model', 'rev', 'fw_ver', 'fw_url', 'date', 'fsize', 'sha1', 'md5'])
    executor = futures.ThreadPoolExecutor()
    try:
        models = parse_models()
        # Resume point: skip every model before DBT-120.
        startI = next(i for i, sp in enumerate(models) if sp[0] == 'DBT' and sp[1] == '120')
        for model in models[startI:]:
            pfx, sfx = model[0], model[1]
            selectModel(pfx, sfx)
    finally:
        # Release the worker threads even when the crawl fails.
        print('wait for Executor shutdown')
        executor.shutdown(True)
def main():
    """
    Write the CSV header, then crawl until ``main1`` reports completion.

    Optional command-line arguments 1 to 3 give the category, family and
    product indices to start from (each defaults to 0). ``main1`` returns
    the indices to continue from, and a ``None`` category index ends the
    crawl. Any exception is printed as a traceback rather than propagated.
    """
    # Created before the ``try`` so the ``finally`` clause can never see an
    # unbound ``executor``.
    executor = ThreadPoolExecutor()
    try:
        os.makedirs(dlDir, exist_ok=True)
        # newline='' is what the csv module asks for on files it writes to.
        with open('netgear_filelist.csv', 'w', newline='') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fw_ver', 'fileName', 'fw_url', 'fw_date', 'fileSize', 'sha1', 'md5'])
        catIdx = int(sys.argv[1]) if len(sys.argv) > 1 else 0
        famIdx = int(sys.argv[2]) if len(sys.argv) > 2 else 0
        prdIdx = int(sys.argv[3]) if len(sys.argv) > 3 else 0
        while True:
            catIdx, famIdx, prdIdx = main1(catIdx, famIdx, prdIdx, executor)
            if catIdx is None:
                return
            assert famIdx is not None
            assert prdIdx is not None
            print("\n[main] Continue from cat,fam,prd=(%d,%d,%d)\n" %
                  (catIdx, famIdx, prdIdx))
    except BaseException:
        # Deliberate best effort: report the failure and fall through to
        # the shutdown below.
        traceback.print_exc()
    finally:
        executor.shutdown(True)
def main():
    """
    Write the CSV header, then walk the download files of every model.

    The module-level ``executor`` is created here and always shut down
    before returning. Any exception during the crawl is printed as a
    traceback rather than propagated.
    """
    global executor
    # Created before the ``try`` so the ``finally`` clause never meets a
    # missing ``executor``.
    executor = ThreadPoolExecutor()
    try:
        session = requests.Session()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://www.zyxel.com/us/en/support/download_landing.shtml'
        # newline='' is what the csv module asks for on files it writes to.
        with open('zyxel_us_filelist.csv', 'w', newline='') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
        resp = session.get(url=url)
        root = html.fromstring(resp.text)
        models = get_all_models(root)
        for modelName in sorted(models):
            kbid = models[modelName]
            resp2 = session.get(url='http://www.zyxel.com/us/en/support/DownloadLandingSR.shtml',
                                params=dict(c="us", l="en", kbid=kbid, md=modelName))
            walkFiles(modelName, session, resp2)
    except BaseException:
        # Deliberate best effort: report the failure and fall through to
        # the shutdown below.
        traceback.print_exc()
    finally:
        print('Wait for exeuctor shuddown')
        executor.shutdown(True)
def main():
    """
    Write the CSV header, open the advanced search form and walk categories.

    The module-level ``executor`` is created here and always shut down
    before returning. Any exception during the crawl is printed as a
    traceback rather than propagated.
    """
    global executor
    # Created before the ``try`` so the ``finally`` clause never meets a
    # missing ``executor``.
    executor = ThreadPoolExecutor()
    try:
        session = requests.Session()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://downloadcenter.netgear.com'
        # newline='' is what the csv module asks for on files it writes to.
        with open('netgear_filelist.csv', 'w', newline='') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fw_ver', 'fileName', 'fw_url', 'fw_date', 'fileSize', 'sha1', 'md5'])
        response = session.get(url=url)
        root = html.fromstring(response.text)
        # The advanced-search link's href is submitted as the form's
        # __EVENTTARGET.
        href = root.xpath(".//a[@id='ctl00_ctl00_ctl00_mainContent_localizedContent_bodyCenter_BasicSearchPanel_btnAdvancedSearch']/@href")
        href = strip_js(href[0])
        formdata = {"__EVENTTARGET": href}
        resp2 = form_submit(session, root, url,
                            "aspnetForm",
                            formdata,
                            {"Referer": url})
        walkCategories(session, resp2)
    except BaseException:
        # Deliberate best effort: report the failure and fall through to
        # the shutdown below.
        traceback.print_exc()
    finally:
        executor.shutdown(True)
def main():
    """
    Write the CSV header, then walk the three download catalogue pages.

    The module-level ``executor`` is created here and always shut down
    before returning. Any exception during the crawl is printed as a
    traceback rather than propagated.
    """
    global executor
    # Created before the ``try`` so the ``finally`` clause never meets a
    # missing ``executor``. The unused local ``session`` was dropped.
    executor = ThreadPoolExecutor()
    try:
        os.makedirs(dlDir, exist_ok=True)
        # newline='' is what the csv module asks for on files it writes to.
        with open('tenda_filelist.csv', 'w', newline='') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
        walkFiles('http://www.tendacn.com/en/service/download-cata-11.html')
        walkFiles('http://tendacn.com/en/service/download-cata-11-2.html')
        walkFiles('http://www.tendacn.com/en/service/download-cata-11-3.html')
    except BaseException:
        # Deliberate best effort: report the failure and fall through to
        # the shutdown below.
        traceback.print_exc()
    finally:
        print('Wait for exeuctor shuddown')
        executor.shutdown(True)
def main():
    """
    Write the CSV header, list all models and crawl each product page.

    The module-level ``executor`` is created here and is always shut down
    before returning, also when the crawl raises.
    """
    global executor
    executor = ThreadPoolExecutor()
    try:
        os.makedirs(localstor, exist_ok=True)
        # newline='' is what the csv module asks for on files it writes to.
        with open('us_dlink_filelist.csv', 'w', newline='') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'rev', 'fw_ver', 'fw_url', 'fsize', 'fdate', 'sha1', 'md5'])
        start_url = "http://support.dlink.com/AllPro.aspx?type=all"
        d = pq(url=start_url)
        # all 442 models
        models = [_.text_content().strip() for _ in d('tr > td:nth-child(1) > .aRedirect')]
        for model in models:
            prod_url = "http://support.dlink.com/ProductInfo.aspx?m=%s" % parse.quote(model)
            crawl_prod(prod_url, model)
    finally:
        # Release the worker threads even when the crawl fails.
        executor.shutdown(True)