def scrapeCitiesFromList(cityList, dirPrefix, threadCount, scrapers):
    """scrapes locations in cities in cityList to file in dirPrefix, spawning threadCount threads"""
    threads = []
    i = 0
    while True:
        if threading.active_count() <= threadCount:
            if i == len(cityList):  # don't start more threads than cities
                break
            for s in scrapers:  # find a free scraper
                if not s.inUse:
                    threads.append(ScrapeThread(target=scrapeCityToFile, args=(dirPrefix, cityList[i], s)))
                    threads[-1].start()
                    i += 1
                    break
        time.sleep(1)
    # once all threads are started, wait for them to finish
    for t in threads:
        t.join()
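Both scrapers on this page follow the same throttling idiom: poll threading.active_count() and only start another worker once the count drops to the limit. A minimal self-contained sketch of that idiom (worker and run_throttled are illustrative names, not part of the code above):

import threading
import time

def worker(item):
    # stand-in for the real per-item work
    time.sleep(0.5)

def run_throttled(items, thread_count):
    """Start one thread per item while keeping at most thread_count alive."""
    threads = []
    for item in items:
        # active_count() includes the main thread, so the comparison
        # mirrors the <= threadCount check used by the scrapers above
        while threading.active_count() > thread_count:
            time.sleep(0.1)
        t = threading.Thread(target=worker, args=(item,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

run_throttled(range(10), thread_count=4)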
def scrapeLocationsFromList(locList, dirPrefixes, date, timeWindow, threadCount, maxPosts, scrapers):
    """scrapes post counts at locations in locList in timeWindow before date, spawning threadCount threads, scrolling to a maximum of maxPosts"""
    threads = []
    i = 0
    while True:
        # start a new thread if fewer than threadCount are active
        if threading.active_count() <= threadCount:
            if i == len(locList):  # don't start more threads than locations
                break
            for s in scrapers:  # find a free scraper
                if not s.inUse:
                    threads.append(ScrapeThread(target=scrapeLocationToFile, args=(dirPrefixes[i], locList[i], date, timeWindow, maxPosts, s)))
                    threads[-1].start()
                    i += 1
                    break
        time.sleep(1)
    # once all threads are started, wait for them to finish
    for t in threads:
        t.join()
def run_thread(req_list, name=None, is_lock=True, limit_num=8):
    '''
    Run a list of calls on worker threads.
    - req_list: list of (function, args) pairs to run, e.g.
        [
            (func_0, (para_0_1, para_0_2, ...)),
            (func_1, (para_1_1, para_1_2, ...)),
            ...
        ]
    - name: thread name, str, defaults to None
    - is_lock: whether to block until all threads finish, bool,
      defaults to True; if False, return without waiting
    - limit_num: maximum number of live threads, int, defaults to 8
    '''
    queue = deque(req_list)
    while len(queue):
        # note: busy-waits while the thread count is at the limit
        if threading.active_count() <= limit_num:
            para = queue.popleft()
            now_thread = threading.Thread(
                target=para[0], args=para[1], name=name, daemon=True)
            now_thread.start()
    if is_lock:
        for now_thread in threading.enumerate():
            if now_thread is not threading.currentThread():
                now_thread.join()
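A hypothetical call site for run_thread, with stand-in download and parse functions (these names are not from the original):

import time

def download(url, retries):
    # stand-in worker
    time.sleep(0.2)

def parse(path, encoding):
    # stand-in worker
    time.sleep(0.1)

run_thread(
    [
        (download, ('http://example.com/a', 3)),
        (download, ('http://example.com/b', 3)),
        (parse, ('/tmp/a.html', 'utf-8')),
    ],
    name='demo',
    is_lock=True,   # block until every thread has finished
    limit_num=4,    # keep at most 4 threads alive
)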
def _threaded_perform_search(self):
    self._perform_search_complete = False
    # generate a name and ensure we never have two threads
    # with the same name
    names = [thread.name for thread in threading.enumerate()]
    for i in range(threading.active_count() + 1, 0, -1):
        thread_name = 'ThreadedQuery-%s' % i
        if thread_name not in names:
            break
    # create and start it
    t = threading.Thread(
        target=self._blocking_perform_search, name=thread_name)
    t.start()
    # don't block the UI while the thread is running
    context = GObject.main_context_default()
    while not self._perform_search_complete:
        time.sleep(0.02)  # 50 fps
        while context.pending():
            context.iteration()
    t.join()
    # call the query-complete callback
    self.emit("query-complete")
def _buildJobs(self):
    """
    Build the jobs.
    """
    self.props['channel'] = Channel.create([None] * self.size)
    rptjob = 0 if self.size == 1 else randint(0, self.size - 1)

    def bjSingle(i):
        job = Job(i, self)
        job.init()
        self.jobs[i] = job
        row = tuple(job.data['out'].values())
        self.props['channel'][i] = row

    utils.parallel(bjSingle, [(i, ) for i in range(self.size)], self.nthread)
    self.log('After job building, active threads: %s' % threading.active_count(), 'debug')
    if self.jobs[0].data['out']:
        self.channel.attach(*self.jobs[0].data['out'].keys())
    self.jobs[rptjob].report()
def _get_lrc(self):
    while True:
        print(self.q.qsize())
        if self.q.empty():
            break
        if threading.active_count() > 5:
            time.sleep(3)
            continue
        m_info = self.q.get()
        try:
            t = threading.Thread(target=self._save_lrc, args=(m_info,))
            t.start()
            # no join here: joining immediately would serialize the work
            # and defeat the active_count() throttle above
        except Exception:
            # spawning failed; put the item back on the queue to retry
            self.q.put(m_info)
def onquit(self, ev):
    "Check if program is busy before quitting"
    if active_count() > 1:
        TkDialog(None, "Please wait until conversions are complete!", "Info").run()
    elif self.rec is None:
        self.quit = True
def run(self):
    log.debug('[ Start PrintThread ]')
    # wait until GetInfoThread, GetCancelThread and GetNewsThread have finished
    while active_count() >= 3:
        time.sleep(1)
    while True:
        try:
            t = self.queue.get(block=False, timeout=None)
        except Exception:
            # the queue is empty, so all output has been printed
            log.debug('[ End PrintThread ]\n')
            break
        print(t)
def do_media_info(self):
    """ Shows basic media info. """
    if self.is_client_mod:
        self.send_private_msg('*Playlist Length:* ' + str(len(self.media.track_list)), self.active_user.nick)
        self.send_private_msg('*Track List Index:* ' + str(self.media.track_list_index), self.active_user.nick)
        self.send_private_msg('*Elapsed Track Time:* ' +
                              self.format_time(self.media.elapsed_track_time()), self.active_user.nick)
        self.send_private_msg('*Active Track:* ' + str(self.media.has_active_track()), self.active_user.nick)
        self.send_private_msg('*Active Threads:* ' + str(threading.active_count()), self.active_user.nick)
def update(haproxy_config):
    logger.debug("Started job...")
    try:
        start_time = time.time()
        resolvers = Service.query_resolvers()
        services = Service.query_services()
        Haproxy().update(resolvers=resolvers, services=services, **haproxy_config)
        metrics.info('background-refresh.duration {}'.format(time.time() - start_time))
        metrics.info('active-thread-count {}'.format(threading.active_count()))
    finally:
        logger.debug("Finished job.")
totalThreads.py, from PacktPublishing's Learning-Concurrency-in-Python examples:
def main():
    for i in range(random.randint(2, 50)):
        thread = threading.Thread(target=myThread, args=(i,))
        thread.start()
    time.sleep(4)
    print("Total Number of Active Threads: {}".format(threading.active_count()))
def _teardown(self):
    """ restore radio and wait on tuning thread """
    clean = True
    self._err = ""
    # restore the radio - this will have the side effect of
    # causing the threads to error out and quit
    try:
        if self._card:
            phy = self._card.phy
            pyw.devdel(self._card)
            card = pyw.phyadd(phy, self._dev, self._dinfo['mode'])
            pyw.up(card)
    except pyric.error as e:
        clean = False
        self._err = "ERRNO {0} {1}".format(e.errno, e.strerror)
    # join threads, waiting a short time before continuing
    try:
        self._tuner.join(5.0)
    except (AttributeError, RuntimeError):
        # either tuner is None, or it never started
        pass
    try:
        self._sniffer.join(5.0)
    except (AttributeError, RuntimeError):
        # either sniffer is None, or it never started
        pass
    # active_count() includes the calling thread, so > 1 means workers remain
    if threading.active_count() > 1:
        clean = False
        self._err += "One or more workers failed to stop"
    return clean
def handle(self):
    while True:
        self.logger.info("Handler thread name = {}/active count = {}"
                         .format(threading.current_thread().name,
                                 threading.active_count()))
        header = MessageHeader(
            self.rfile.read(MessageHeader.HEADER_LENGTH))
        self.logger.info("Message length = {}".format(header.length))
        self.data = ""
        while len(self.data) < header.length:
            buf = self.rfile.read(
                min(self.MAX_BUF_LENGTH, header.length - len(self.data)))
            self.data += str(buf, "UTF-8")
            self.logger.info("current length = {}, total length = {}"
                             .format(len(self.data), header.length))
        self.logger.info("{0} request = {1}"
                         .format(self.client_address[0], self.data))
        response = JSONRPCResponseManager.handle(self.data, dispatcher)
        self.logger.info("response for {0} = {1}"
                         .format(self.client_address[0], response.json))
        self.wfile.write(MessageHeader.create(
            len(response.json)) + bytes(response.json, "UTF-8"))
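The MessageHeader class is not shown in this excerpt. One plausible shape for it, assuming a fixed-width ASCII length prefix (the real wire format may differ), is:

class MessageHeader(object):
    """Hypothetical fixed-size header: the payload length encoded as a
    zero-padded 10-digit ASCII integer."""
    HEADER_LENGTH = 10

    def __init__(self, raw):
        # raw is the HEADER_LENGTH bytes read from the stream
        self.length = int(raw.decode("ascii"))

    @staticmethod
    def create(length):
        # encode a payload length into the header bytes for sending
        return "{:010d}".format(length).encode("ascii")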
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("config")
    args = parser.parse_args()
    config = parse_config(args.config)
    for name, repo in config.items():
        github_auth = (repo["github_username"], repo["github_password"])
        snooze_label = repo["snooze_label"]
        ignore_members_of = repo["ignore_members_of"]
        # bind the per-repo values as lambda defaults so each callback keeps
        # its own copy; a plain closure would see only the last repo's values
        callback = (lambda event, message,
                    auth=github_auth, label=snooze_label, ignore=ignore_members_of:
                    github_callback(event, message, auth, label, ignore))
        listener = RepositoryListener(
            callbacks=[callback],
            events=LISTEN_EVENTS,
            **repo)
        t = threading.Thread(target=poll_forever, args=(listener, repo["poll_interval"]))
        t.daemon = True
        t.start()
    while True:
        # wait forever for a signal or an unusual termination
        if threading.active_count() < len(config) + 1:
            logging.error("Child polling thread quit!")
            return False
        time.sleep(1)
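The default-argument binding in the lambda above matters because Python closures capture variables, not values; without it, every listener would call github_callback with the last repo's settings. A quick demonstration:

# closures in a loop share the loop variable...
fns = [lambda: i for i in range(3)]
print([f() for f in fns])       # [2, 2, 2]

# ...while a default argument freezes the value per iteration
fns = [lambda i=i: i for i in range(3)]
print([f() for f in fns])       # [0, 1, 2]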
def do_media_info(self):
    """ Shows basic media info. """
    if self.is_client_mod:
        self.send_owner_run_msg('*Playlist Length:* ' + str(len(self.media.track_list)))
        self.send_owner_run_msg('*Track List Index:* ' + str(self.media.track_list_index))
        self.send_owner_run_msg('*Elapsed Track Time:* ' +
                                self.format_time(self.media.elapsed_track_time()))
        self.send_owner_run_msg('*Active Track:* ' + str(self.media.has_active_track()))
        self.send_owner_run_msg('*Active Threads:* ' + str(threading.active_count()))
def test_starts_multiple_threads(self):
    multi_thread_handler = MultiThreadHandler(Queue())
    multi_thread_handler.start()
    thread_count = get_config()["handlers"]["multi_thread"]["thread_count"]
    try:
        wait_until_success(lambda: self.assertTrue(threading.active_count() >= thread_count + 1))
    finally:
        multi_thread_handler.stop()
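wait_until_success is a project helper that is not shown here; a retry loop along these lines (a hypothetical reimplementation) would fit the call site:

import time

def wait_until_success(assertion, timeout=5.0, interval=0.1):
    # retry a zero-argument callable until it stops raising
    # AssertionError, or re-raise once the timeout expires
    deadline = time.time() + timeout
    while True:
        try:
            return assertion()
        except AssertionError:
            if time.time() >= deadline:
                raise
            time.sleep(interval)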
def test_paralleldiagnostics_start_workers_doesnt_start_above_concurrency_limit(self):
    begin_threads = threading.active_count()
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 2, self.options, self.work_queue, self.logdir)
    self.assertEqual(len(self.workers), 2)
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 2, self.options, self.work_queue, self.logdir)
    self.assertEqual(len(self.workers), 2)
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 1, self.options, self.work_queue, self.logdir)
    self.assertEqual(len(self.workers), 2)
    # Clean up workers
    # This is tested in test_workers_lifecycle_sentinels()
    for _ in self.workers:
        self.work_queue.put(None)
def test_paralleldiagnostics_start_workers_does_start_additional_workers(self):
    begin_threads = threading.active_count()
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 2, self.options, self.work_queue, self.logdir)
    # Must have more threads than we started with, and 2 workers
    self.assertGreater(threading.active_count(), begin_threads)
    self.assertEqual(len(self.workers), 2)
    ec2rlcore.paralleldiagnostics._start_workers(self.workers, 4, self.options, self.work_queue, self.logdir)
    # Must have 4 workers
    self.assertEqual(len(self.workers), 4)
    # Clean up workers
    # This is tested in test_workers_lifecycle_sentinels()
    for _ in self.workers:
        self.work_queue.put(None)
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", "--config", type=str, help="Configuration file")
    args = parser.parse_args()
    starttime = time.time()
    # Store opened shell sessions
    shells = {}
    # FIXME redis connection settings should be in config
    redis_conn = redis.StrictRedis()
    p = redis_conn.connection_pool
    publish = gevent.spawn(publisher, redis_conn)
    # FIXME: use config
    workers = 2
    log.info(_("Spawning %s greenlets connecting to Redis..."), workers)
    redis_greenlets = [gevent.spawn(execute_workflow, redis_conn, _id, shells)
                       for _id in range(workers)]
    # Wait until all greenlets have started and connected.
    gevent.sleep(1)
    log.info(_("# active `threading` threads: %s") % threading.active_count())
    log.info(_("# Redis connections created: %s") % p._created_connections)
    log.info(_("# Redis connections in use: %s") % len(p._in_use_connections))
    log.info(_("# Redis connections available: %s") % len(p._available_connections))
    log.info(_("Waiting for Redis connection greenlets to terminate..."))
    gevent.joinall(redis_greenlets)
    d = time.time() - starttime
    log.info(_("All Redis connection greenlets terminated. Duration: %.2f s.") % d)
    publish.kill()
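Note that gevent greenlets are not OS threads, which is why this script logs the `threading` count separately from its Redis statistics. Unless gevent's monkey-patching is applied, threading.active_count() keeps reporting only real threads, as this sketch (assuming gevent is installed and not monkey-patched) illustrates:

import threading
import gevent

def idle():
    gevent.sleep(0.5)

greenlets = [gevent.spawn(idle) for _ in range(4)]
gevent.sleep(0)                     # yield so the greenlets start running
print(threading.active_count())     # 1: greenlets do not count as threads
gevent.joinall(greenlets)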