def worker(work_queue, done_queue):
    spinner = spinning_cursor()
    p = current_process()
    for nif_path in iter(work_queue.get, 'STOP'):
        sys.stdout.write("\r\b\033[K{0} [{1}][{2}][{3}]".format(
            next(spinner), work_queue.qsize(), p.name, nif_path))
        sys.stdout.flush()
        assets = []
        try:
            # assets.append('DEADBEEF')
            assets = retrieve_assets_from_nif(nif_path)
        except Exception:
            pass  # a bad .nif simply yields an empty asset list
        done_queue.put((nif_path, assets))
    done_queue.put('STOP')
    return True
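A minimal driver for worker() above might look like the following sketch. It assumes worker, spinning_cursor, and retrieve_assets_from_nif are importable at module level; the worker count and the result-draining loop are illustrative, not part of the original source.

# Hypothetical driver for worker() above; num_workers and the draining
# loop are illustrative assumptions.
from multiprocessing import Process, Queue

def run_workers(nif_paths, num_workers=4):
    work_queue = Queue()
    done_queue = Queue()
    for path in nif_paths:
        work_queue.put(path)
    for _ in range(num_workers):
        work_queue.put('STOP')            # one sentinel per worker
    procs = [Process(target=worker, args=(work_queue, done_queue))
             for _ in range(num_workers)]
    for p in procs:
        p.start()
    results, stopped = {}, 0
    while stopped < num_workers:          # drain until every worker signals STOP
        item = done_queue.get()
        if item == 'STOP':
            stopped += 1
        else:
            nif_path, assets = item
            results[nif_path] = assets
    for p in procs:
        p.join()
    return results

Calling run_workers() under an if __name__ == '__main__': guard keeps the module importable by child processes on spawn-based platforms.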
def bflipper(tokens):
    mutated_tokens = []
    procnum = int(multiprocessing.current_process().name)
    threadnum = int(threading.current_thread().name)
    # Floor division so the arithmetic stays integral on Python 2 and 3
    mystart = procnum * max((config_hodor.iterations // config_hodor.procs), 8)
    # Figure out how to spread threads in a sensible manner
    for item in tokens:
        buf = bytearray(item) if isinstance(item, str) else item
        if len(buf) == 0:
            mutated_tokens.append(buf)  # Nothing to do
            continue
        # This is supposed to deal with iterations > buflen in a sane way
        # Should just loop through and flip more bits at once
        myflip = config_hodor.mutator["bflipper"]["flipmode"] + (mystart + threadnum) // (len(buf) * 8)
        flipme = (threadnum // 8) + (mystart // 8)
        if flipme >= len(buf):
            flipme = flipme % len(buf)
        for j in range(myflip):
            buf[flipme] ^= (1 << ((threadnum + j) % 8))  # Minor bug here, will do one extra xor on myflip>1
        mutated_tokens.append(buf)
    return mutated_tokens
def serve_forever(self):
    '''
    Run the server forever
    '''
    current_process()._manager_server = self
    try:
        try:
            while 1:
                try:
                    c = self.listener.accept()
                except (OSError, IOError):
                    continue
                t = threading.Thread(target=self.handle_request, args=(c,))
                t.daemon = True
                t.start()
        except (KeyboardInterrupt, SystemExit):
            pass
    finally:
        self.stop = 999
        self.listener.close()
def RebuildProxy(func, token, serializer, kwds):
    '''
    Function used for unpickling proxy objects.
    If possible the shared object is returned, or otherwise a proxy for it.
    '''
    server = getattr(current_process(), '_manager_server', None)
    if server and server.address == token.address:
        return server.id_to_obj[token.id][0]
    else:
        incref = (
            kwds.pop('incref', True) and
            not getattr(current_process(), '_inheriting', False)
        )
        return func(token, serializer, incref=incref, **kwds)
#
# Functions to create proxies and proxy types
#
def _get_listener():
    global _listener
    if _listener is None:
        _lock.acquire()
        try:
            if _listener is None:
                debug('starting listener and thread for sending handles')
                _listener = Listener(authkey=current_process().authkey)
                t = threading.Thread(target=_serve)
                t.daemon = True
                t.start()
        finally:
            _lock.release()
    return _listener
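The authkey handshake that _get_listener() relies on can be exercised directly. This standalone sketch pairs a Listener and Client authenticated with current_process().authkey; the localhost address and message are illustrative.

# Standalone sketch of the authkey handshake _get_listener() depends on.
import threading
from multiprocessing import current_process
from multiprocessing.connection import Listener, Client

def demo_authkey_roundtrip():
    authkey = current_process().authkey
    listener = Listener(('localhost', 0), authkey=authkey)

    def serve():
        with listener.accept() as conn:   # blocks until a client connects
            conn.send('hello from listener')

    t = threading.Thread(target=serve)
    t.start()
    with Client(listener.address, authkey=authkey) as conn:
        print(conn.recv())                # -> 'hello from listener'
    t.join()
    listener.close()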
def craw(self, lock, count):
    while 1:
        next_task = self.task.get()
        if next_task is None:
            self.task.task_done()
            continue
        # print(self.urls.new_urls)
        # new_url = self.urls.get_new_url()
        # print("%s craw %d : %s" % (multiprocessing.current_process().name, count, new_url))
        # new_html = self.downloader.download(new_url)
        # new_urls, new_data = self.parser.parse(new_url, new_html)
        # self.urls.add_new_urls(new_urls)
        # self.outputer.collect_data(new_data)
        # self.outputer.output_html()
        # count += 1
        new_url = next_task.a
        print("%s craw %d : %s" % (multiprocessing.current_process().name, count, new_url))
        new_html = self.downloader.download(new_url)
        new_urls, new_data = self.parser.parse(new_url, new_html)
        for url in new_urls:
            self.task.put(Task(url))
        self.outputer.collect_data(new_data)
        self.outputer.output_html()
        self.task.task_done()
        count += 1
def submit(config, user, run_id, pids):
    """
    Submits the pipeline defined by 'config' as user 'user'.
    Dumps the config in a temp file that is removed after successful completion.
    Returns stderr and stdout of the submission command.
    """
    pids[run_id] = mp.current_process().pid
    (fd, tmp_cfg) = tempfile.mkstemp(prefix='pypers_', suffix='.cfg', text=True)
    os.fchmod(fd, 0o644)  # 0o644: octal literal valid on Python 2.6+ and 3
    with os.fdopen(fd, 'w') as fh:
        json.dump(config, fh)
    cmd = [which('np_submit.py'), '-i', tmp_cfg]
    (ec, err, out) = run_as(cmd=cmd, user=user)
    if ec == 0:
        os.unlink(tmp_cfg)
        return (err, out)
    else:
        raise Exception('Unable to execute cmd %s:\n%s\n%s' % (cmd, err, out))
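submit() is written to run inside a worker process, recording its own PID in the shared pids mapping. A hedged sketch of the calling side, where launch(), the default user name, and the Manager-backed dict are assumptions:

# Hypothetical caller for submit(); launch() and its defaults are
# illustrative only.
import multiprocessing as mp

def launch(run_id, config, user='pipeline'):
    manager = mp.Manager()
    pids = manager.dict()                 # shared run_id -> pid mapping
    proc = mp.Process(target=submit, args=(config, user, run_id, pids))
    proc.start()
    proc.join()
    print('run %s executed as pid %s' % (run_id, pids.get(run_id)))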
def print_statistic(self):
    now = time.time()
    if now - self.checkpoint > self.statistic_interval:
        count = self.count.value
        self.count.value = 0
        delta = now - self.checkpoint
        self.checkpoint = now
        if delta > 3 * self.statistic_interval:
            # Far more time elapsed than the reporting interval, so a
            # computed insert rate would be misleading; log only the totals.
            # (The original compared against the just-reset checkpoint,
            # which was always false; 'delta' preserves the evident intent.)
            log.info("inserted {} rows in the past {}s".format(count, round(delta, 3)))
        else:
            log.info(
                "delta:{}s count:{} speed:{}/s qsize:{} qfull:{} P:{} Th:{}".format(
                    round(delta, 3), count, round(count / delta, 2),
                    self.queue.qsize(), self.queue.full(),
                    multiprocessing.current_process().name,
                    threading.current_thread().name,
                ))
def run(self):
    while True:
        try:
            next_task = self.task_queue.get()
            if not next_task:
                # print("%s Poisoned" % multiprocessing.current_process().name, file=sys.stderr)
                self.task_queue.task_done()
                break
            try:
                result = next_task()
                self.result_queue.put(result)
            except Exception as e:
                if self.exception_handling == ExceptionHandling.IGNORE:
                    # print("%s Exception: %s" % (multiprocessing.current_process().name, e), file=sys.stderr)
                    # print("%s IGNORE error" % multiprocessing.current_process().name, file=sys.stderr)
                    pass
                elif self.exception_handling == ExceptionHandling.THROW:  # Caution
                    self.task_queue.task_done()
                    raise e
                else:  # Special Token
                    self.result_queue.put(self.exception_handling)
            self.task_queue.task_done()
        except Exception as e:
            raise e
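The run() loop above is the body of a consumer process; wiring it up follows the usual JoinableQueue pattern with one falsy poison pill per consumer. The Consumer constructor signature and the run_pipeline() helper below are assumptions for illustration.

# Hypothetical wiring for the consumer run() loop above.
import multiprocessing

def run_pipeline(tasks, num_consumers=4):
    task_queue = multiprocessing.JoinableQueue()
    result_queue = multiprocessing.Queue()
    consumers = [Consumer(task_queue, result_queue,
                          exception_handling=ExceptionHandling.IGNORE)
                 for _ in range(num_consumers)]
    for c in consumers:
        c.start()
    for task in tasks:                    # each task is a picklable callable
        task_queue.put(task)
    for _ in range(num_consumers):
        task_queue.put(None)              # falsy poison pill -> break in run()
    task_queue.join()                     # blocks until every task_done()
    results = []
    while not result_queue.empty():       # empty() is advisory; fine for a sketch
        results.append(result_queue.get())
    return results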
def __init__(self, address=None, authkey=None, serializer='pickle'):
    if authkey is None:
        authkey = current_process().authkey
    self._address = address     # XXX not final address if eg ('', 0)
    self._authkey = AuthenticationString(authkey)
    self._state = State()
    self._state.value = State.INITIAL
    self._serializer = serializer
    self._Listener, self._Client = listener_client[serializer]
def __init__(self, token, serializer, manager=None,
             authkey=None, exposed=None, incref=True):
    BaseProxy._mutex.acquire()
    try:
        tls_idset = BaseProxy._address_to_local.get(token.address, None)
        if tls_idset is None:
            tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
            BaseProxy._address_to_local[token.address] = tls_idset
    finally:
        BaseProxy._mutex.release()

    # self._tls is used to record the connection used by this
    # thread to communicate with the manager at token.address
    self._tls = tls_idset[0]

    # self._idset is used to record the identities of all shared
    # objects for which the current process owns references and
    # which are in the manager at token.address
    self._idset = tls_idset[1]

    self._token = token
    self._id = self._token.id
    self._manager = manager
    self._serializer = serializer
    self._Client = listener_client[serializer][1]

    if authkey is not None:
        self._authkey = AuthenticationString(authkey)
    elif self._manager is not None:
        self._authkey = self._manager._authkey
    else:
        self._authkey = current_process().authkey

    if incref:
        self._incref()

    util.register_after_fork(self, BaseProxy._after_fork)
def _connect(self):
    util.debug('making connection to manager')
    name = current_process().name
    if threading.current_thread().name != 'MainThread':
        name += '|' + threading.current_thread().name
    conn = self._Client(self._token.address, authkey=self._authkey)
    dispatch(conn, None, 'accept_connection', (name,))
    self._tls.connection = conn
def rebuild_handle(pickled_data):
    address, handle, inherited = pickled_data
    if inherited:
        return handle
    sub_debug('rebuilding handle %d', handle)
    conn = Client(address, authkey=current_process().authkey)
    conn.send((handle, os.getpid()))
    new_handle = recv_handle(conn)
    conn.close()
    return new_handle
#
# Register `_multiprocessing.Connection` with `ForkingPickler`
#
def cleanup_and_exit(self, code, frame):
    if current_process().name != "MainProcess":
        return
    logging.info("Starting cleanup procedure! Stopping running threads")
    # TODO Move submodules into self that populates as used?
    submodules = ['replset', 'sharding', 'backup', 'oplogtailer', 'archive', 'upload']
    for submodule_name in submodules:
        try:
            submodule = getattr(self, submodule_name)
            if submodule:
                submodule.close()
        except Exception:
            continue
    if self.manager:
        self.manager.shutdown()
    if self.db:
        self.db.close()
    if self.notify:
        try:
            self.notify.notify("%s: backup '%s/%s' failed! Error: '%s'" % (
                self.program_name,
                self.config.backup.name,
                self.backup_time,
                self.last_error_msg
            ))
            self.notify.run()
            self.notify.close()
        except Exception as e:
            logging.error("Error from notifier: %s" % e)
    logging.info("Cleanup complete, exiting")
    if self.logger:
        self.logger.rotate()
        self.logger.close()
    self.release_lock()
    sys.exit(1)
def write(self, data):
    # note that these pids are in the form of current_process()._identity
    # rather than OS pids
    from multiprocessing import current_process
    pid = current_process()._identity
    self.__queue.put((pid, data))
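A sketch of the kind of class this write() belongs to, assuming a queue-backed writer whose parent process drains the queue and attributes each message by _identity; the QueueWriter name and the demo are assumptions, not original code.

# Minimal sketch of a queue-backed writer like the one above.
from multiprocessing import Process, Queue, current_process

class QueueWriter(object):
    def __init__(self, queue):
        self.__queue = queue

    def write(self, data):
        # _identity is the process's index tuple in the process tree,
        # not an OS pid
        self.__queue.put((current_process()._identity, data))

def _say_hello(writer, i):
    writer.write('hello from worker %d' % i)

if __name__ == '__main__':
    q = Queue()
    writer = QueueWriter(q)
    procs = [Process(target=_say_hello, args=(writer, i)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    while not q.empty():                  # drain after all writers exit
        print(q.get())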