def __init__(
    self,
    worker_class,
    concurrent_workers,
):
    self.logger = logger.logger.Logger(
        logger_name='Supervisor',
    )
    self.worker_class = worker_class
    self.concurrent_workers = concurrent_workers
    self.task = self.worker_class()
    self.workers_processes = []
    self.should_work_event = threading.Event()
    self.should_work_event.set()
    multiprocessing.set_start_method(
        method='spawn',
        force=True,
    )
Python set_start_method() examples
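The snippets on this page are excerpts from larger projects. As a baseline, here is a minimal, self-contained sketch of the documented usage pattern: set the start method once, under the `if __name__ == '__main__':` guard, and pass force=True only when a method may already have been chosen (the square() worker below is purely illustrative):

import multiprocessing


def square(n):
    # with the 'spawn' start method this runs in a fresh interpreter
    return n * n


if __name__ == '__main__':
    # force=True avoids a RuntimeError if a start method was already chosen
    multiprocessing.set_start_method('spawn', force=True)
    with multiprocessing.Pool(processes=2) as pool:
        print(pool.map(square, range(4)))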
def test_set_get(self):
    multiprocessing.set_forkserver_preload(PRELOAD)
    count = 0
    old_method = multiprocessing.get_start_method()
    try:
        for method in ('fork', 'spawn', 'forkserver'):
            try:
                multiprocessing.set_start_method(method, force=True)
            except ValueError:
                continue
            self.assertEqual(multiprocessing.get_start_method(), method)
            ctx = multiprocessing.get_context()
            self.assertEqual(ctx.get_start_method(), method)
            self.assertTrue(type(ctx).__name__.lower().startswith(method))
            self.assertTrue(
                ctx.Process.__name__.lower().startswith(method))
            self.check_context(multiprocessing)
            count += 1
    finally:
        multiprocessing.set_start_method(old_method, force=True)
    self.assertGreaterEqual(count, 1)
def match_game(self):
    multiprocessing.set_start_method('spawn')
    # TODO restart runners for running games?
    logger.info('Matchmaker started')
    while True:
        wait = gen.sleep(5)
        starttime = ioloop.IOLoop.current().time()
        players = yield gamequeue.find().sort([('_id', 1)]).limit(10).to_list(length=10)
        while len(players) >= 2:
            random.shuffle(players)
            p0, p1, players = players[0], players[1], players[2:]
            p0['token'], p1['token'] = create_token(), create_token()
            queue_ids = [p0.pop('_id'), p1.pop('_id')]
            game = {'player0': p0, 'player1': p1, 'turn': 0, 'status': 'new'}
            insert_result = yield games.insert_one(game)
            game_idstr = str(insert_result.inserted_id)
            runner_name = 'runner-%s' % game_idstr
            logger.info('Launching Process "%s"', runner_name)
            p = multiprocessing.Process(target=Runner.start_game, args=(game_idstr,), name=runner_name, daemon=True)
            p.start()
            # TODO keep track of spawned runner processes
            yield gamequeue.delete_many({'_id': {'$in': queue_ids}})
        endtime = ioloop.IOLoop.current().time()
        logger.debug('MatchMaker ran for %.3fms', 1000 * (endtime - starttime))
        yield wait
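One way the "keep track of spawned runner processes" TODO above could be handled is to keep the Process objects in a list and join or reap them later. A standalone sketch of that idea, where _run_game merely stands in for Runner.start_game:

import multiprocessing


def _run_game(game_idstr):
    # stand-in for Runner.start_game
    print('running game', game_idstr)


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn', force=True)
    runner_processes = []
    for game_idstr in ('42', '43'):
        p = multiprocessing.Process(target=_run_game, args=(game_idstr,),
                                    name='runner-%s' % game_idstr, daemon=True)
        p.start()
        runner_processes.append(p)
    # reap finished runners instead of forgetting the Process objects
    for p in runner_processes:
        p.join()
        print(p.name, 'exited with code', p.exitcode)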
def start(self, slave_addr, task):
    self._task = task

    def _start(id, slave_addr, task):
        from multiprocessing import Process
        import multiprocessing
        # multiprocessing.set_start_method('spawn')
        Process(target=_worker_main, args=(id, slave_addr, task)).start()

    from concurrent.futures import ProcessPoolExecutor
    print("[Worker {0}] Create".format(self.id))
    _start(self.id, slave_addr, task)
    # executor = ProcessPoolExecutor()
    # loop = asyncio.get_event_loop()
    # asyncio.ensure_future(loop.run_in_executor(ProcessPoolExecutor(), _worker_main, self.id, slave_addr, task))
    # asyncio.ensure_future(_start(self.id, slave_addr, task))
    # yield from asyncio.sleep(10)
    print("***")
def __init__(self):
    # multiprocessing.set_start_method('spawn')
    self.layers = {}
    self.stop_event = multiprocessing.get_context('spawn').Event()
    self.input_prompts = multiprocessing.get_context('spawn').Queue()
    self.show_monitor = False
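Using get_context('spawn') for the Event and Queue keeps them tied to the same start method as the processes they will be shared with, without changing the global default. A minimal standalone sketch of that pattern (the worker function is illustrative):

import multiprocessing


def worker(stop_event, prompts):
    prompts.put('worker started')
    stop_event.wait()


if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')
    stop_event = ctx.Event()
    prompts = ctx.Queue()
    p = ctx.Process(target=worker, args=(stop_event, prompts))
    p.start()
    print(prompts.get())
    stop_event.set()
    p.join()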
def run():
    # ask user for difficulty
    q_app = QtWidgets.QApplication([])
    q_widget = QtWidgets.QWidget()
    dialog = QtWidgets.QMessageBox(q_widget)
    dialog.addButton('Easy', QtWidgets.QMessageBox.ActionRole)
    dialog.addButton('Medium', QtWidgets.QMessageBox.ActionRole)
    dialog.addButton('Hard', QtWidgets.QMessageBox.ActionRole)
    dialog.addButton('Impossible', QtWidgets.QMessageBox.ActionRole)
    dialog.setText('Choose difficulty:')
    ret = dialog.exec_()
    easy, medium, hard, impossible = range(4)
    sim_time = None
    if ret == easy:
        sim_time = 1
    elif ret == medium:
        sim_time = 3
    elif ret == hard:
        sim_time = 5
    elif ret == impossible:
        sim_time = 8
    mp.set_start_method('spawn')
    gui_process = mp.Process(target=start_client.main)
    gui_process.start()
    run_game.main(BlackAgent='human', WhiteAgent='monte_carlo',
                  sim_time=sim_time, gui=True)
def main(args):
    import multiprocessing
    try:
        multiprocessing.set_start_method("fork")
    except RuntimeError as ex:
        log.warn("multiprocessing.set_start_method: " + str(ex))

    if not args.command:
        # No command was given.
        args.app.arg_parser.print_help()
        return 1

    # fileConfig may have undone the command-line logging options, which take
    # precedence, so reapply them.
    args.applyLoggingOpts(args.log_levels, args.log_files)

    if args.db_url:
        args.config.set(MAIN_SECT, SA_KEY, args.db_url)
        # Don't want commands and such to use this directly, so reset it.
        args.db_url = None
    elif "MISHMASH_DBURL" in os.environ:
        log.verbose("Using environment MISHMASH_DBURL over configuration: {}"
                    .format(os.environ["MISHMASH_DBURL"]))
        args.config.set(MAIN_SECT, SA_KEY, os.environ["MISHMASH_DBURL"])

    try:
        # Run the command.
        retval = args.command_func(args, args.config) or 0
    except (KeyboardInterrupt, PromptExit) as ex:
        # PromptExit is raised on CTRL+D during a prompt, or when prompts are disabled.
        retval = 0
    except (sql_exceptions.ArgumentError,
            sql_exceptions.OperationalError) as ex:
        _pErr("Database error")
        retval = 1
    except Exception as ex:
        log.exception(ex)
        _pErr("General error")
        retval = 2

    return retval
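set_start_method() raises RuntimeError once a start method has already been fixed, which is why the call above is wrapped in try/except. An alternative, sketched here rather than taken from the project, is to check first; note that 'fork' is not available on Windows:

import multiprocessing

if multiprocessing.get_start_method(allow_none=True) is None:
    # only set the start method if nothing has chosen one yet
    multiprocessing.set_start_method("fork")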
def test_context(self):
    for method in ('fork', 'spawn', 'forkserver'):
        try:
            ctx = multiprocessing.get_context(method)
        except ValueError:
            continue
        self.assertEqual(ctx.get_start_method(), method)
        self.assertIs(ctx.get_context(), ctx)
        self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
        self.assertRaises(ValueError, ctx.set_start_method, None)
        self.check_context(ctx)
def test_semaphore_tracker(self):
    import subprocess
    cmd = '''if 1:
        import multiprocessing as mp, time, os
        mp.set_start_method("spawn")
        lock1 = mp.Lock()
        lock2 = mp.Lock()
        os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
        os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
        time.sleep(10)
    '''
    r, w = os.pipe()
    p = subprocess.Popen([sys.executable,
                          '-c', cmd % (w, w)],
                         pass_fds=[w],
                         stderr=subprocess.PIPE)
    os.close(w)
    with open(r, 'rb', closefd=True) as f:
        name1 = f.readline().rstrip().decode('ascii')
        name2 = f.readline().rstrip().decode('ascii')
    _multiprocessing.sem_unlink(name1)
    p.terminate()
    p.wait()
    time.sleep(2.0)
    with self.assertRaises(OSError) as ctx:
        _multiprocessing.sem_unlink(name2)
    # docs say it should be ENOENT, but OSX seems to give EINVAL
    self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
    err = p.stderr.read().decode('utf-8')
    p.stderr.close()
    expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
    self.assertRegex(err, expected)
    self.assertRegex(err, 'semaphore_tracker: %r: \[Errno' % name1)

#
# Mixins
#
def run(args):
    # create dummy environment to be able to create model
    env = gym.make(args.environment)
    assert isinstance(env.observation_space, Box)
    assert isinstance(env.action_space, Discrete)
    print("Observation space:", env.observation_space)
    print("Action space:", env.action_space)

    # create main model
    model = create_model(env, args)
    model.summary()
    env.close()

    # force runner processes to use cpu
    os.environ["CUDA_VISIBLE_DEVICES"] = ""

    # for better compatibility with Theano and Tensorflow
    multiprocessing.set_start_method('spawn')

    # create shared buffer for sharing weights
    blob = pickle.dumps(model.get_weights(), pickle.HIGHEST_PROTOCOL)
    shared_buffer = Array('c', len(blob))
    shared_buffer.raw = blob

    # create fifos and processes for all runners
    fifos = []
    for i in range(args.num_runners):
        fifo = Queue(args.queue_length)
        fifos.append(fifo)
        process = Process(target=runner, args=(shared_buffer, fifo, args))
        process.start()

    # start trainer in main thread
    trainer(model, fifos, shared_buffer, args)
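The shared_buffer above works because Array('c', n) is a synchronized character array whose raw attribute exposes the underlying bytes, so pickled weights written by the parent can be read back inside a spawned runner. A stripped-down sketch of that mechanism with a placeholder payload instead of model weights:

import pickle
import multiprocessing
from multiprocessing import Array, Process


def child(shared_buffer):
    # unpickle the payload written by the parent
    print(pickle.loads(shared_buffer.raw))


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn', force=True)
    blob = pickle.dumps({'weights': [1.0, 2.0, 3.0]}, pickle.HIGHEST_PROTOCOL)
    shared_buffer = Array('c', len(blob))
    shared_buffer.raw = blob
    p = Process(target=child, args=(shared_buffer,))
    p.start()
    p.join()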
def main():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    asmbot.log("ASM version {} booting".format(asmbot.__version__), tag="ASM LDR")

    asmbot.log("Loading config", tag="PAM LDR")
    with open("config.json", "r") as f:
        cts = f.read()
        tkd = json.loads(cts)

    asmbot.log("Launching ASM", tag="ASM LDR")
    mp.set_start_method("spawn")
    executor = ThreadPoolExecutor(max_workers=int(tkd["shard_count"]))
    processes = []
    for i in range(0, int(tkd["shard_count"])):
        args = {
            "token": tkd["token"],
            "shard_id": i,
            "shard_count": int(tkd["shard_count"]),
            "script": tkd["script"],
            "guild_blacklist": tkd["guild_blacklist"],
            "guild_exempt_list": tkd["guild_exempt_list"]
        }
        loop.create_task(launch_process(executor, asmbotlauncher.initialize_asmbot, **args))
        if i != int(tkd["shard_count"]) - 1:
            loop.run_until_complete(asyncio.sleep(10))

    asmbot.log("Running", tag="ASM LDR")
    try:
        loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks(loop)))
    except KeyboardInterrupt:
        pass
    finally:
        asmbot.log("Shutting down", tag="ASM LDR")
        for process in processes:
            process.join()
        asmbot.log("Shutdown finalized", tag="ASM LDR")