def _parse_args(args):
parser = argparse.ArgumentParser()
if '--version' in args:
return argparse.Namespace(version=True)
parser.add_argument('script', help='Script to run')
parser.add_argument('target', nargs='?', default='build', help='Target object to build; defaults to \'build\'')
parser.add_argument('--version', action='store_true', help='Print version info and exit')
parser.add_argument('--clear', action='store_true', help='Clear output directory')
parser.add_argument('--clear-cache', action='store_true', help='Clear cache before compiling')
parser.add_argument('--threads', '-t', type=int, help='Set thread count; defaults to cores*2')
parser.add_argument('--no-threading', '-nt', action='store_true', help='Disable multithreaded compiling')
# TODO: Make target '*' instead of '?' so multiple targets can be run from the same command
return parser.parse_args(args)
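# Usage sketch (hypothetical driver, not part of the original source): the
# early '--version' scan above short-circuits parsing, so the required
# positional 'script' argument is not demanded when only the version is asked for.
#
# args = _parse_args(['--version'])  # -> Namespace(version=True)
# args = _parse_args(['make.py'])    # -> script='make.py', target='build'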
def parse(self, argv):
"""
Parse arguments.
:param argv: arguments.
"""
values = dict()
self.data = None
self._errors = list()
for idx, param in enumerate(self.params):
try:
values[param['dest'] or param['name']] = self.parse_parameter(param, argv, idx)
except ParamException as e:
self._errors.append(str(e))
self.data = Namespace(**values)
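# A hypothetical shape for self.params that the loop above would accept:
# each entry carries a 'name' and a 'dest' key, and a falsy 'dest' falls
# back to 'name' when storing the parsed value.
#
# self.params = [
#     {'name': 'host', 'dest': None},           # stored as values['host']
#     {'name': 'port', 'dest': 'listen_port'},  # stored as values['listen_port']
# ]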
def list_action_remove(self, player, values, map_dictionary, view, **kwargs):
# Check permission.
if not await self.instance.permission_manager.has_permission(player, 'admin:remove_map'):
await self.instance.chat(
'$f00You don\'t have the permission to perform this action!',
player
)
return
# Ask for confirmation.
cancel = bool(await ask_confirmation(player, 'Are you sure you want to remove the map \'{}\'$z$s from the server?'.format(
map_dictionary['name']
), size='sm'))
if cancel:
return
# Simulate command.
await self.remove_map(player, Namespace(nr=map_dictionary['id']))
# Reload parent view.
await view.refresh(player)
def run_with_args(args, parser):
# type: (argparse.Namespace, argparse.ArgumentParser) -> int
set_logging_parameters(args, parser)
start_time = time.time()
ret = OK
try:
if args.profile:
outline("Profiling...")
profile("ret = whatstyle(args, parser)", locals(), globals())
else:
ret = whatstyle(args, parser)
except IOError as exc:
# If the output is piped into a pager like 'less' we get a broken pipe when
# the pager is quit early and that is ok.
if exc.errno == errno.EPIPE:
pass
elif str(exc) == 'Stream closed':
pass
else:
raise
if not PY2:
sys.stderr.close()
iprint(INFO_TIME, 'Run time: %s seconds' % (time.time() - start_time))
return ret
def test_start_object(self):
server = PJFServer(configuration=PJFConfiguration(Namespace(ports={"servers": {"HTTP_PORT": 8080, "HTTPS_PORT": 8443}},
html=False, level=6, command=["radamsa"], stdin=True,
json={"a": "test"}, indent=True, strong_fuzz=False, url_encode=False,
parameters=[], notify=False, debug=False, content_type="text/plain",
utf8=False, nologo=True)))
server.run()
json_http = urllib2.urlopen("http://127.0.0.1:8080").read()
try:
import requests
requests.packages.urllib3.disable_warnings()
json_https = requests.get('https://127.0.0.1:8443', verify=False).content
self.assertTrue(json_https)
except ImportError:
pass
self.assertTrue(json_http)
server.stop()
def cli_args():
"""Parse the command line arguments.
:return: The parsed arguments.
:rtype: argparse.Namespace
"""
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--check', action='store_true',
help="check if the system is vulnerable to WCry")
parser.add_argument('-m', '--mitigate', action='store_true',
help="mitigate the system's vulnerability by disabling the"
" SMBv1 protocol, if necessary; implies --check")
# The original provides no help string here; judging from --download-directory
# below, --fix downloads and applies the relevant Microsoft KB update.
parser.add_argument('-f', '--fix', action='store_true')
parser.add_argument('--download-directory',
help="Optionally specify a directory where the Microsoft"
" KB update is saved when using --fix")
if len(sys.argv) == 1:
parser.print_help()
sys.exit(1)
return parser.parse_args()
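# Hedged follow-up sketch: per the help text, --mitigate (and presumably
# --fix) imply --check, so a caller might normalize the flags after parsing.
#
# args = cli_args()
# args.check = args.check or args.mitigate or args.fix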
def register_command(handler: Callable[[argparse.Namespace], None],
main_parser: Optional[ArgParserType]=None,
) -> Callable[[argparse.Namespace], None]:
if main_parser is None:
main_parser = global_argparser
if id(main_parser) not in _subparsers:
subparsers = main_parser.add_subparsers(title='commands',
dest='command')
_subparsers[id(main_parser)] = subparsers
else:
subparsers = _subparsers[id(main_parser)]
@functools.wraps(handler)
def wrapped(args):
handler(args)
# Guard against handlers without a docstring.
doc_summary = handler.__doc__.split('\n\n')[0] if handler.__doc__ else None
inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
description=handler.__doc__,
help=doc_summary)
inner_parser.set_defaults(function=wrapped)
wrapped.register_command = functools.partial(register_command,
main_parser=inner_parser)
wrapped.add_argument = inner_parser.add_argument
return wrapped
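# Usage sketch with hypothetical names: the decorator derives the subcommand
# name from the handler ('hello_world' -> 'hello-world'), takes the help
# summary from the first docstring paragraph, and dispatches through the
# 'function' default installed by set_defaults() above.
#
# @register_command
# def hello_world(args: argparse.Namespace) -> None:
#     """Print a greeting.
#
#     A longer description used for the subparser description.
#     """
#     print('hello,', args.name)
#
# hello_world.add_argument('--name', default='world')
#
# parsed = global_argparser.parse_args(['hello-world', '--name', 'bob'])
# parsed.function(parsed)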
def __call__(
self,
parser, # type: argparse.ArgumentParser
namespace, # type: argparse.Namespace
values, # type: Union[ARGPARSE_TEXT, Sequence[Any], None]
option_string=None # type: Optional[ARGPARSE_TEXT]
):
# type: (...) -> None
"""Checks to make sure that the destination is empty before writing.
:raises parser.error: if destination is already set
"""
if getattr(namespace, self.dest) is not None: # type: ignore # typeshed doesn't know about Action.dest yet?
parser.error('{} argument may not be specified more than once'.format(option_string))
return
setattr(namespace, self.dest, values) # type: ignore # typeshed doesn't know about Action.dest yet?
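# Usage sketch: the Action class name is not shown above, so 'UniqueStoreAction'
# is an assumed placeholder. With default=None, the first occurrence stores
# the value and a repeated occurrence triggers parser.error().
#
# parser = argparse.ArgumentParser()
# parser.add_argument('--output', action=UniqueStoreAction, default=None)
# parser.parse_args(['--output', 'a', '--output', 'b'])
# # -> error: --output argument may not be specified more than once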
def start(self):
self.setup_sockets()
import StaticUPnP_Settings
permissions = Namespace(**StaticUPnP_Settings.permissions)
print(permissions)
if permissions.drop_permissions:
self.drop_privileges(permissions.user, permissions.group)
self.running = Value(ctypes.c_int, 1)
self.queue = Queue()
self.receiver_thread = Process(target=self.socket_handler, args=(self.queue, self.running))
self.receiver_thread.start()
self.schedule_thread = Process(target=self.schedule_handler, args=(self.running,))
self.schedule_thread.start()
self.response_thread = Process(target=self.response_handler, args=(self.queue, self.running))
self.response_thread.start()
def get_interface_addresses(logger):
import StaticUPnP_Settings
interface_config = Namespace(**StaticUPnP_Settings.interfaces)
ip_addresses = StaticUPnP_Settings.ip_addresses
if len(ip_addresses) == 0:
import netifaces
ifs = netifaces.interfaces()
if len(interface_config.include) > 0:
ifs = interface_config.include
if len(interface_config.exclude) > 0:
for iface in interface_config.exclude:
ifs.remove(iface)
for i in ifs:
addrs = netifaces.ifaddresses(i)
if netifaces.AF_INET in addrs:
for addr in addrs[netifaces.AF_INET]:
ip_addresses.append(addr['addr'])
logger.info("Regestering multicast on %s: %s"%(i, addr['addr']))
return ip_addresses
def parse_args() -> argparse.Namespace: # pragma: no cover
"""
Parses the Command Line Arguments using argparse
:return: The parsed arguments
"""
parser = argparse.ArgumentParser()
parser.add_argument("connection", help="The Type of Connection to use")
parser.add_argument("-v", "--verbose", action="store_true",
help="Activates verbose output")
parser.add_argument("-d", "--debug", action="store_true",
help="Activates debug-level logging output")
parser.add_argument("-q", "--quiet", action="store_true",
help="Disables all text output")
parser.add_argument("-c", "--config",
default=os.path.join(os.path.expanduser("~"),
".kudubot"),
help="Overrides the configuration directory location")
return parser.parse_args()
def test_init_2(tmpdir):
"it should open sam file if provided"
make_bam(tmpdir.strpath, """
123456789_123456789_
r1 + ...........
r1 - ......*....
r2 + .........*.
r2 - .....*.......
""")
o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)
init(o)
assert isinstance(o.cfdna, AlignmentFile)
assert o.gdna is None
def test_init_3(tmpdir):
"it should generate a proper output file name if not provided"
make_bam(tmpdir.strpath, """
123456789_123456789_
r1 + ...........
r1 - ......*....
r2 + .........*.
r2 - .....*.......
""")
o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)
init(o)
assert o.output is not None
def test_aggregate_reads_3():
"it should ignore when 3+ reads share the same name"
o = Namespace(verbos=False, qual=20, mismatch_limit=-1)
reads = (
("r1", 'A', 60, 2, 11, -1, 2, 9, False, True),
("r1", 'C', 60, 2, 11, -1, 2, -9, True, True),
("r1", 'C', 60, 2, 11, -1, 2, 9, False, True),
("r2", 'C', 60, 2, 11, -1, 0, 0, True, False)
)
unique_pairs, unique_single, _, nerror, *_ = aggregate_reads(o, reads)
assert len(unique_pairs) == 0
assert len(unique_single) == 1
assert nerror == 3
def test_aggregate_reads_4():
"it should ignore when base in overlap area inconsistent between two reads"
o = Namespace(verbos=False, qual=20, mismatch_limit=-1)
reads = (
("r1", 'A', 60, 2, 11, -1, 4, 11, False, True),
("r1", 'C', 60, 4, 13, -1, 2, -11, True, True),
("r2", 'C', 60, 3, 12, -1, 5, 11, False, True),
("r2", 'C', 60, 5, 14, -1, 3, -11, True, True)
)
unique_pairs, unique_single, *_, ninconsis = aggregate_reads(o, reads)
assert len(unique_pairs) == 1
assert ninconsis == 2
def test_aggregate_reads_5():
"it should drop reads that has too much mismatch"
o = Namespace(verbos=False, qual=20, mismatch_limit=2)
reads = (
("r1", 'C', 60, 2, 11, 1, 4, 11, False, True),
("r1", 'C', 60, 4, 13, 1, 2, -11, True, True),
("r2", 'C', 60, 3, 12, 3, 5, 11, False, True),
("r2", 'C', 60, 5, 14, 1, 3, -11, True, True),
("r3", 'C', 60, 6, 14, 3, 0, 0, True, False),
("r4", 'C', 60, 7, 14, 1, 0, 0, True, False)
)
unique_pairs, unique_single, *_, nlowq, ninconsis = aggregate_reads(o, reads)
assert len(unique_pairs) == 1
assert len(unique_single) == 1
assert nlowq == 3
def test_count_different_type_1():
"it should NOT count dna that more than 10% reads have different bases"
o = Namespace(verbos=False, allow_inconsist=False)
pair = {
('ref', 2, True): [
('A', 60),
('T', 60),
('T', 60)
],
('ref', 3, False): [
('A', 60),
('T', 60),
('T', 60)
]
}
mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa, inconsis = count_different_type(o, pair, {}, 'T', 'A')
assert inconsis == 6
assert sum((mor, mnr, msr, oor, onr, osr, moa, mna, msa, ooa, ona, osa)) == 0
def test_get_reads_1(tmpdir):
"it should get all but only the reads that covers the given position"
make_bam(tmpdir.strpath, """
123456789_123456789_12
r1 + ...........
r1 - ......*....
r2 + .........*.
r2 - .....*.......
r3 + ...........
r3 - ....*......
r4 + ...........
r4 - ...........
123456789_123456789_12
""")
o = Namespace(verbos=False, mismatch_limit=-1)
sam = AlignmentFile(tmpdir.join("test.bam").strpath)
assert sum(1 for _ in get_reads(o, sam, 'ref', '4')) == 2
assert sum(1 for _ in get_reads(o, sam, 'ref', '12')) == 7
assert sum(1 for _ in get_reads(o, sam, 'ref', '20')) == 2
def test_get_reads_2(tmpdir):
"it should read properties correctly"
make_bam(tmpdir.strpath, """
123456789_123
r1 + ...*.......
r1 - .*.........
""")
o = Namespace(verbos=False, mismatch_limit=-1)
sam = AlignmentFile(tmpdir.join("test.bam").strpath)
r = next(get_reads(o, sam, 'ref', '4'))
assert r[0] == "r1" # name
assert r[3] == 0 # 0-based pos
assert r[4] == 11 # length
assert r[5] == -1 # mismatch, not calculated
assert r[6] == 2 # mate pos
assert r[7] == 13 # template length
assert r[8] is False # is_reverse
assert r[9] is True # paired and mapped
def test_pad_softclip_2(tmpdir):
"it should ignore more than two reads which share the same name"
make_bam(tmpdir.strpath, """
r1 + __.*.......
r1 - .*.......__
r1 - .*.......__
r2 + .*.......__
r2 - .*.......__
""")
o = Namespace(verbos=False, mismatch_limit=-1)
sam = AlignmentFile(tmpdir.join("test.bam").strpath)
adjusted_pos = pad_softclip(sam)
assert sum(1 for startpos, length in adjusted_pos.values() if startpos != -1) == 1
def test_pad_softclip_3(tmpdir):
"it should pad softclipped bases"
make_bam(tmpdir.strpath, """
123456789_123
r1 + __.*.......
r1 - .*.........
r2 - ...*.......
r2 + .*.......__
""")
o = Namespace(verbos=False, mismatch_limit=-1)
sam = AlignmentFile(tmpdir.join("test.bam").strpath)
adjusted_pos = pad_softclip(sam)
assert adjusted_pos["r1"] == (0, 13) # 0-based position
assert adjusted_pos["r2"] == (0, 13)
def get_api_id(config, args):
"""
Get the API ID from Terraform, or from AWS if that fails.
:param config: configuration
:type config: :py:class:`~.Config`
:param args: command line arguments
:type args: :py:class:`argparse.Namespace`
:return: API Gateway ID
:rtype: str
"""
try:
logger.debug('Trying to get Terraform rest_api_id output')
runner = TerraformRunner(config, args.tf_path)
outputs = runner._get_outputs()
depl_id = outputs['rest_api_id']
logger.debug("Terraform rest_api_id output: '%s'", depl_id)
except Exception:
logger.info('Unable to find API rest_api_id from Terraform state;'
' querying AWS.', exc_info=1)
aws = AWSInfo(config)
depl_id = aws.get_api_id()
logger.debug("AWS API ID: '%s'", depl_id)
return depl_id
def _predict(predictors: Dict[str, str]):
def predict_inner(args: argparse.Namespace) -> None:
predictor = _get_predictor(args, predictors)
output_file = None
if args.silent and not args.output_file:
print("--silent specified without --output-file.")
print("Exiting early because no output will be created.")
sys.exit(0)
# ExitStack allows us to conditionally context-manage `output_file`, which may or may not exist
with ExitStack() as stack:
input_file = stack.enter_context(args.input_file) # type: ignore
if args.output_file:
output_file = stack.enter_context(args.output_file) # type: ignore
_run(predictor, input_file, output_file, args.batch_size, not args.silent, args.cuda_device)
return predict_inner
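# Standalone sketch of the ExitStack pattern used above (hypothetical
# function, not part of the predictor code): contexts are entered only when
# present, and everything entered is closed when the with-block exits.
from contextlib import ExitStack

def copy_lines(input_file, output_file=None):
    with ExitStack() as stack:
        fin = stack.enter_context(open(input_file))
        # Only context-manage the output file when one was requested.
        fout = stack.enter_context(open(output_file, 'w')) if output_file else None
        for line in fin:
            if fout is not None:
                fout.write(line)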
def get_parsed_args(self, comp_words):
""" gets the parsed args from a patched parser """
active_parsers = self._patch_argument_parser()
parsed_args = argparse.Namespace()
self.completing = True
if USING_PYTHON2:
# Python 2 argparse only properly works with byte strings.
comp_words = [ensure_bytes(word) for word in comp_words]
stderr = sys.stderr
sys.stderr = io.open(os.devnull, "w")
try:
active_parsers[0].parse_known_args(comp_words, namespace=parsed_args)
except BaseException:
pass
finally:
# Restore the real stderr even when parsing raises; otherwise it would
# stay pointed at os.devnull for the rest of the process.
sys.stderr.close()
sys.stderr = stderr
self.completing = False
return parsed_args
def test_with_config(self, is_file, open_):
# Create our stand-in config file.
config_file = textwrap.dedent(u"""\
---
local_paths:
reporoot: ~/Code
publish: local
""")
is_file.return_value = True
open_.return_value = io.StringIO(config_file)
# Get the config and test the result.
flags = Namespace(user_config='/bogus/file.yaml', command='not_init')
user_config = main.read_user_config(flags)
assert user_config == {
'local_paths': {'reporoot': '~/Code'},
'publish': 'local',
}
def test_with_ssh_repo(self, login):
# Set up test data to return when we attempt to make the
# pull request.
gh = mock.MagicMock(spec=github3.github.GitHub)
login.return_value = gh
url = 'https://github.com/me/repo/pulls/1/'
gh.repository().create_pull.return_value = Namespace(html_url=url)
# Run the task.
task = github.CreateGitHubPullRequest()
pr = task.execute(**self.task_kwargs)
# Assert we got the correct result.
assert pr.html_url == url
# Assert that the correct methods were called.
login.assert_called_once_with('lukesneeringer', '1335020400')
gh.repository.assert_called_with('me', 'repo')
gh.repository().create_pull.assert_called_once_with(
base='master',
body='This pull request was generated by artman. '
'Please review it thoroughly before merging.',
head='pubsub-python-v1',
title='Python GAPIC: Pubsub v1',
)
def test_with_http_url(self, login):
# Set up test data to return when we attempt to make the
# pull request.
gh = mock.MagicMock(spec=github3.github.GitHub)
login.return_value = gh
url = 'https://github.com/me/repo/pulls/1/'
gh.repository().create_pull.return_value = Namespace(html_url=url)
# Run the task.
task = github.CreateGitHubPullRequest()
pr = task.execute(**dict(self.task_kwargs, git_repo={
'location': 'https://github/me/repo/',
}))
# Assert we got the correct result.
assert pr.html_url == url
# Assert that the correct repository method was still called.
gh.repository.assert_called_with('me', 'repo')
def print_args(self):
"""Print out all the arguments in this parser."""
if not self.opt:
self.parse_args(print_args=False)
values = {}
for key, value in self.opt.items():
values[str(key)] = str(value)
for group in self._action_groups:
group_dict = {
a.dest: getattr(self.args, a.dest, None)
for a in group._group_actions
}
namespace = argparse.Namespace(**group_dict)
count = 0
for key in namespace.__dict__:
if key in values:
if count == 0:
print('[ ' + group.title + ': ] ')
count += 1
print('[ ' + key + ': ' + values[key] + ' ]')
def _test_standalone_sequana(qtbot, tmpdir):
wkdir = TemporaryDirectory()
inputdir = os.path.realpath(
sequana_data("Hm2_GTGAAA_L005_R1_001.fastq.gz")).rsplit(os.sep,1)[0]
# Standalone for sequana given a wkdir and pipeline and input_dir
args = Namespace(pipeline="quality_control", wkdir=wkdir.name,
input_directory=inputdir)
widget = sequana_gui.SequanaGUI(ipython=False, user_options=args)
qtbot.addWidget(widget)
assert widget.mode == "sequana"
widget.force = True
widget.save_project()
widget.click_run()
count = 0
while widget.process.state() and count < 5:
time.sleep(0.5)
count += 0.5
widget.click_stop()
time.sleep(1)
def _setup_arg_parser(argv):
""" Parse arguments from command line
Args:
argv list: by default it's command line arguments
Returns:
argparse.Namespace: parsed argument
"""
parser = argparse.ArgumentParser(
description='MySQL binlog to Google Cloud Pub/Sub')
parser.add_argument('conf',
help='configuration file for publishing')
parser.add_argument('--loglevel', '-l', default=None,
help='log level for root')
if os.path.isfile('logging.ini'):
_log_file = 'logging.ini'
else:
_log_file = None
parser.add_argument('--logconf', default=_log_file,
help='INI file log configuration')
args = parser.parse_args(argv)
return args
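# Typical call site (a hedged sketch): pass everything after the program name.
#
# args = _setup_arg_parser(sys.argv[1:])
# logging.basicConfig(level=args.loglevel or 'INFO')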