def add_result():
    """Record a match result for a player in a league.

    Reads league id, match id, player name, team position and kill count
    from the command line, looks the player up in the Hero table, and
    stores a Result row.  Prints an error and returns early when the
    player name does not match any hero in that league.
    """
    session = make_session()
    parser = argparse.ArgumentParser()
    parser.add_argument("league", type=int, help="league id")
    parser.add_argument("match", type=int, help="match")
    parser.add_argument("player", type=str, help="player name")
    parser.add_argument("position", type=int, help="team position")
    parser.add_argument("kills", type=int, help="player kills")
    args = parser.parse_args()
    with transaction.manager:
        result_string = "%s,%s" % (args.position, args.kills)
        # The query returns a Hero row (or None), not a bare id, so the
        # old name ``hero_id`` was misleading.
        hero = (session.query(Hero)
                .filter(Hero.league == args.league)
                .filter(Hero.name == args.player)
                .first())
        if not hero:
            # Parenthesized print works under both Python 2 and 3
            # (the old ``print "..."`` statement was Python-2-only).
            print("Name wrong")
            return
        session.add(Result(args.league, hero.id, args.match, result_string,
                           time.time(), 1, 1))
        transaction.commit()
    return
# Example source code for Python's ArgumentParser class
def main():
    """Recompute hero points, league points and user rankings for a league."""
    session = make_session()
    parser = argparse.ArgumentParser()
    parser.add_argument("league", type=int, help="league id")
    args = parser.parse_args()
    league = session.query(League).filter(League.id == args.league).first()
    # Each step runs in its own transaction (as in the original code) so a
    # failure in a later step does not roll back the earlier ones.  The
    # triplicated with/print/update/commit blocks are collapsed into data.
    steps = [
        ("Updating hero points", update_hero_points),
        ("Updating league points", update_league_points),
        ("Updating user rankings", update_user_rankings),
    ]
    for message, update in steps:
        with transaction.manager:
            # Parenthesized print works under both Python 2 and 3.
            print(message)
            update(session, league)
            transaction.commit()
def get_arguments():
    """Build and parse the command line for FAST5 -> FASTQ extraction.

    Returns the parsed namespace with ``dir`` normalised to an absolute
    path.
    """
    arg_parser = argparse.ArgumentParser(
        description='FAST5 to FASTQ',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    arg_parser.add_argument(
        'dir', type=str,
        help='directory of FAST5 reads to extract (will be searched recursively)')
    arg_parser.add_argument(
        '--min_length', type=int, default=0,
        help='Exclude reads shorter than this length (in bp)')
    arg_parser.add_argument(
        '--min_mean_qual', type=float, default=0.0,
        help='Exclude reads with a mean qscore less than this value')
    arg_parser.add_argument(
        '--min_qual_window', type=float, default=0.0,
        help='Exclude reads where their mean qscore in a sliding window drops '
             'below this value')
    arg_parser.add_argument(
        '--window_size', type=int, default=50,
        help='The size of the sliding window used for --min_qual_window')
    arg_parser.add_argument(
        '--target_bases', type=int, default=None,
        help='If set, exclude the worst reads (as judged by their minimum qscore '
             'in a sliding window) such that only this many bases remain')
    parsed = arg_parser.parse_args()
    parsed.dir = os.path.abspath(parsed.dir)
    return parsed
def main():
    """
    Parse arguments and trigger main application
    """
    parser = argparse.ArgumentParser()
    # NOTE(review): --region is parsed but never used below -- the region is
    # taken from the JSON document instead.  Confirm whether the flag should
    # override the document value.
    parser.add_argument("--region", help="The region to operate in.", default="us-east-1")
    parser.add_argument("jsondoc", help="The json document describing the snapshot.")
    args = parser.parse_args()
    with open(args.jsondoc, 'r') as jsonf:
        config = json.load(jsonf)
    snapshot_ids = [vol["snapshot-id"] for vol in config.get("backup-volumes")]
    region = config["region"]
    try:
        if are_snapshots_tagged(region, snapshot_ids):
            return
        validate_all_collections()
        update_tags(region, snapshot_ids, True)
    except Exception:
        # Mark the snapshots as failed before propagating the error.
        update_tags(region, snapshot_ids, False)
        #TODO: Add alerting, monitoring, chaos, destruction, etc.
        # Bare ``raise`` re-raises with the original traceback intact
        # (``raise exc`` would truncate it under Python 2).
        raise
def main():
    """List or unpack an Android backup archive given on the command line."""
    parser = argparse.ArgumentParser(description=_description(),
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('-l', '--list', action='store_true')
    parser.add_argument('-p', '--password')
    parser.add_argument('-t', '--target-dir')
    parser.add_argument('IN', type=AndroidBackup)
    args = parser.parse_args()
    # The AndroidBackup instance is a context manager that owns the file.
    with args.IN as backup_file:
        if args.list:
            backup_file.list(password=args.password)
        else:
            backup_file.unpack(target_dir=args.target_dir,
                               password=args.password)
def main():
    """Pack a directory into an Android backup file, optionally encrypted."""
    parser = argparse.ArgumentParser(description=_description(),
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('OUT')
    parser.add_argument('-p', '--password')
    parser.add_argument('-s', '--source-dir')
    parser.add_argument('-e', '--encrypt', action='store_true')
    args = parser.parse_args()
    backup = android_backup.AndroidBackup()
    backup.version = 3
    backup.compression = android_backup.CompressionType.ZLIB
    # Default to no encryption; AES256 only when -e/--encrypt is given.
    if args.encrypt:
        backup.encryption = android_backup.EncryptionType.AES256
    else:
        backup.encryption = android_backup.EncryptionType.NONE
    backup.pack(fname=args.OUT,
                source_dir=args.source_dir,
                password=args.password)
def main():
    """Main entry point."""
    parser = argparse.ArgumentParser()
    parser.add_argument('action',
                        nargs='?',
                        choices=[CHECK, ADD_USER, BUILD, GEN_PSWD],
                        default=CHECK)
    parser.add_argument('--key', type=str)
    args = parser.parse_args()
    # Convert the raw key only when one was actually supplied.
    key = wrapper.convert_key(args.key) if args.key else None
    action = args.action
    if action == BUILD:
        try:
            build()
        except Exception as exc:
            _smirc("build error")
            print(str(exc))
    elif action == ADD_USER:
        add_user(key)
    elif action == GEN_PSWD:
        gen_pass(True, key)
    elif action == CHECK:
        check()
def main():
    """Open a three-way diff tool on the files given on the command line."""
    parser = argparse.ArgumentParser(description='Open a diff tool with three files')
    parser.add_argument('filename', nargs='*', help='input filename')
    # The info and debug options were inspired by rsync.
    parser.add_argument('--tool', help='path of diff tool')
    parser.add_argument('--results', action='store_true', help='show results diffs')
    args = parser.parse_args()
    # Command-line flag wins; fall back to the environment variable.
    tool = args.tool
    if tool is None:
        tool = os.environ.get(DIFF3_VARNAME)
    if tool is None:
        parser.error('Please specify the 3-way file comparison tool with --tool or set %s' %
                     DIFF3_VARNAME)
    if args.results:
        return modified_results(tool)
    if not args.filename:
        parser.error('Please specify at least one source file')
    paths = [os.path.abspath(name) for name in args.filename]
    diff_for_files(tool, paths)
    return 0
def run_with_args(args, parser):
    # type: (argparse.Namespace, argparse.ArgumentParser) -> int
    """Run the tool with parsed options, returning an exit status.

    When ``args.profile`` is set the run happens under the profiler; the
    result code is written into ``ret`` through the locals() mapping that
    ``profile`` executes its statement string against.
    """
    set_logging_parameters(args, parser)
    start_time = time.time()
    ret = OK
    try:
        if args.profile:
            outline("Profiling...")
            # ``profile`` exec's this statement string, so ``ret`` is
            # assigned via the passed-in locals()/globals() mappings.
            profile("ret = whatstyle(args, parser)", locals(), globals())
        else:
            ret = whatstyle(args, parser)
    except IOError as exc:
        # If the output is piped into a pager like 'less' we get a broken pipe when
        # the pager is quit early and that is ok.
        if exc.errno == errno.EPIPE:
            pass
        elif str(exc) == 'Stream closed':
            pass
        else:
            raise
    if not PY2:
        # NOTE(review): closing stderr here presumably suppresses late
        # broken-pipe noise at interpreter shutdown on Python 3 -- confirm.
        sys.stderr.close()
    iprint(INFO_TIME, 'Run time: %s seconds' % (time.time() - start_time))
    return ret
def arguments():
    """Parse the command line for running an AWS Lambda handler locally."""
    parser = argparse.ArgumentParser(description="Run AWS Lambda function locally")
    # Two required positionals: the handler module and the JSON event file.
    for name, text in (
            ("filename", "name of file containing Lambda function"),
            ("event", "filename of file containing JSON event data")):
        parser.add_argument(name, type=str, help=text)
    parser.add_argument("-f", "--function", metavar="HANDLER_FUNCTION",
                        dest="function_name", type=str, default="handler",
                        help="Name of handler function. Defaults to \"handler\"")
    parser.add_argument("-t", "--timeout", metavar="TIMEOUT",
                        dest="timeout", type=int, default=None,
                        help="Timeout (in seconds) for function call. If not provided, "
                             "no timeout will be used.")
    parser.add_argument("-c", "--context", metavar="CONTEXT_FILENAME", type=str, default=None,
                        dest="context_file",
                        help="Filename of file containing JSON context data")
    return parser.parse_args()
def main():
    """Start the language server over stdio or as a TCP listener."""
    parser = argparse.ArgumentParser(description="")
    parser.add_argument("--mode", default="stdio",
                        help="communication (stdio|tcp)")
    parser.add_argument("--addr", default=2087,
                        help="server listen (tcp)", type=int)
    args = parser.parse_args()
    if args.mode == "stdio":
        log("Reading on stdin, writing on stdout")
        server = LangServer(conn=ReadWriter(sys.stdin, sys.stdout))
        server.listen()
    elif args.mode == "tcp":
        host = "0.0.0.0"
        port = args.addr
        log("Accepting TCP connections on {}:{}".format(host, port))
        ThreadingTCPServer.allow_reuse_address = True
        tcp_server = ThreadingTCPServer((host, port), LangserverTCPTransport)
        try:
            tcp_server.serve_forever()
        finally:
            tcp_server.shutdown()
def get_parser():
    """Return the argument parser for model-initialisation options."""
    def dict_type(spec):
        # Turn "k1:v1,k2:v2" into {'k1': 'v1', 'k2': 'v2'}, stripping
        # whitespace around every key and value.
        entries = [item.split(':') for item in spec.split(',')]
        return dict([map(str.strip, entry) for entry in entries])
    parser = argparse.ArgumentParser()
    parser.add_argument('--proto', type=str)
    parser.add_argument('--ref-encs', type=dict_type,
                        help="Models to initialize encoders, "
                             "eg. --ref-encs=fi:file1,de:file2")
    parser.add_argument('--ref-decs', type=dict_type,
                        help="Models to initialize decoders, "
                             "eg. --ref-decs=en:file1,de:file2")
    parser.add_argument('--ref-att', type=str,
                        help="Model to initialize shared components")
    parser.add_argument('--ref-dec-embs', type=dict_type,
                        help="Models to initialize decoder embeddings, "
                             "eg. --ref-dec-embs=en:file1,de:file2")
    parser.add_argument('--ref-enc-embs', type=dict_type,
                        help="Models to initialize encoder embeddings, "
                             "eg. --ref-enc-embs=en:file1,de:file2")
    return parser
def main():
    """Run newer stuffs."""
    logging.basicConfig(format=LOGGING_FORMAT)
    log = logging.getLogger(__name__)
    parser = argparse.ArgumentParser()
    # Register the shared CLI options used across these entry points.
    for register in (add_debug, add_app, add_env, add_region, add_properties):
        register(parser)
    # Fixed typo in the user-visible help text: 'Subnetnet' -> 'Subnet'.
    parser.add_argument("--elb-subnet", help="Subnet type, e.g. external, internal", required=True)
    args = parser.parse_args()
    logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
    log.debug('Parsed arguments: %s', args)
    spinnakerapps = SpinnakerDns(
        app=args.app, env=args.env, region=args.region, prop_path=args.properties,
        elb_subnet=args.elb_subnet)
    spinnakerapps.create_elb_dns()
def main():
    """Destroy any DNS related resources of an application
    Records in any Hosted Zone for an Environment will be deleted.
    """
    logging.basicConfig(format=LOGGING_FORMAT)
    parser = argparse.ArgumentParser(description=main.__doc__)
    add_debug(parser)
    add_app(parser)
    add_env(parser)
    args = parser.parse_args()
    logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
    # ``assert destroy_dns(...)`` skipped the destroy entirely under
    # ``python -O``; call explicitly and keep AssertionError on failure so
    # existing callers see the same exception type.
    destroyed = destroy_dns(**vars(args))
    if not destroyed:
        raise AssertionError('DNS destruction failed')
def main():
    """Command to create IAM Instance Profiles, Roles, Users, and Groups.
    IAM Roles will retain any attached Managed Policies. Inline Policies that do
    not match the name *iam-project_repo_policy* will also be left untouched.
    **WARNING**: Inline Policies named *iam-project_repo_policy* will be
    rewritten.
    """
    logging.basicConfig(format=LOGGING_FORMAT)
    parser = argparse.ArgumentParser(description=main.__doc__)
    add_debug(parser)
    add_app(parser)
    add_env(parser)
    args = parser.parse_args()
    logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
    # ``assert create_iam_resources(...)`` skipped the creation entirely
    # under ``python -O``; call explicitly and keep AssertionError on
    # failure.  Also use vars(args) rather than args.__dict__ for
    # consistency with the sibling DNS command.
    created = create_iam_resources(**vars(args))
    if not created:
        raise AssertionError('IAM resource creation failed')
def main():
    """Append Application Configurations to a given file in multiple formats."""
    logging.basicConfig(format=LOGGING_FORMAT)
    parser = argparse.ArgumentParser(description=main.__doc__)
    add_debug(parser)
    parser.add_argument('-o', '--output', required=True, help='Name of environment file to append to')
    parser.add_argument(
        '-g', '--git-short', metavar='GROUP/PROJECT', required=True, help='Short name for Git, e.g. forrest/core')
    parser.add_argument('-r', '--runway-dir', help='Runway directory with app.json files, requires --git-short')
    args = parser.parse_args()
    LOG.setLevel(args.debug)
    logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
    # Normalise the Git identifier through the project's parser/generator.
    parsed_url = gogoutils.Parser(args.git_short).parse_url()
    generated = gogoutils.Generator(*parsed_url, formats=APP_FORMATS)
    git_short = generated.gitlab()['main']
    if args.runway_dir:
        configs = process_runway_configs(runway_dir=args.runway_dir)
    else:
        configs = process_git_configs(git_short=git_short)
    write_variables(app_configs=configs, out_file=args.output, git_short=git_short)
def main():
    """CLI entrypoint for scaling policy creation"""
    logging.basicConfig(format=LOGGING_FORMAT)
    log = logging.getLogger(__name__)
    parser = argparse.ArgumentParser()
    # Register the shared CLI options used across these entry points.
    for register in (add_debug, add_app, add_properties, add_env, add_region):
        register(parser)
    args = parser.parse_args()
    logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
    log.debug('Parsed arguments: %s', args)
    policy = AutoScalingPolicy(app=args.app, prop_path=args.properties,
                               env=args.env, region=args.region)
    policy.create_policy()
def main():
    """Entry point for ELB creation"""
    logging.basicConfig(format=LOGGING_FORMAT)
    # The old description ('Example with non-optional arguments') was a
    # leftover placeholder from the argparse docs; describe the command.
    parser = argparse.ArgumentParser(description='Create an ELB for an application')
    add_debug(parser)
    add_app(parser)
    add_env(parser)
    add_region(parser)
    add_properties(parser)
    args = parser.parse_args()
    logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
    elb = SpinnakerELB(app=args.app, env=args.env, region=args.region, prop_path=args.properties)
    elb.create_elb()
def main():
    """Send Slack notification to a configured channel."""
    logging.basicConfig(format=LOGGING_FORMAT)
    log = logging.getLogger(__name__)
    parser = argparse.ArgumentParser()
    add_debug(parser)
    add_app(parser)
    add_env(parser)
    add_properties(parser)
    args = parser.parse_args()
    logging.getLogger(__package__.split('.')[0]).setLevel(args.debug)
    # Fixed typo in the log message: 'arguements' -> 'arguments'.
    log.debug('Parsed arguments: %s', args)
    # Only notify for production environments.
    if 'prod' not in args.env:
        log.info('No slack message sent, not a production environment')
    else:
        log.info('Sending slack message, production environment')
        slacknotify = SlackNotification(app=args.app, env=args.env, prop_path=args.properties)
        slacknotify.post_message()