def docopt_full_help(docstring, *args, **kwargs):
    """Run docopt, exiting with the complete docstring on a usage error.

    All arguments are forwarded to ``docopt`` unchanged. If ``docopt``
    raises ``DocoptExit``, a ``SystemExit`` carrying the whole
    ``docstring`` is raised in its place.

    :param docstring: Help text handed to ``docopt``.
    :return: Whatever ``docopt`` returns.
    :raise SystemExit: With ``docstring`` as its payload when ``docopt``
        raises ``DocoptExit``.
    """
    try:
        parsed = docopt(docstring, *args, **kwargs)
    except DocoptExit:
        raise SystemExit(docstring)
    return parsed
# Example source code for the Python class DocoptExit()
def main(args=None):
    """Parse the command line, run the selected command(s) and exit.

    The process exits with status 0 when the last command that ran
    returned a truthy value, and with status 1 otherwise (no command
    selected, falsy result, or a usage error reported by docopt).

    :param args: Argument list forwarded to docopt as ``argv``.
    """
    try:
        arguments = docopt(__doc__, argv=args, version=APPVSN)
    except DocoptExit as usage:
        print(usage)
        sys.exit(1)

    path = os.getcwd()
    logger.configure(arguments['--log-level'])

    # Ordered (command key, action) pairs. Every key is looked up, in this
    # order, and a later matching command overwrites an earlier result.
    # Lambdas keep the handler lookup lazy until a command is selected.
    actions = (
        ('create', lambda: create(path, arguments)),
        ('build', lambda: build(path, arguments)),
        ('version', lambda: version(path)),
        ('deps', lambda: deps(path)),
        ('release', lambda: release(path, arguments)),
        ('package', lambda: package(path, arguments)),
        ('upgrade', lambda: upgrade(path, arguments)),
        ('add_package', lambda: add_package(path, arguments)),
        ('eunit', lambda: eunit(path, arguments)),
        ('ct', lambda: ct(path, arguments)),
        ('fetch', lambda: fetch(arguments)),
        ('install', lambda: install(arguments)),
        ('uninstall', lambda: uninstall(arguments)),
        ('installed', lambda: installed()),
    )

    result = False
    for command, run in actions:
        if arguments[command]:
            result = run()

    sys.exit(0 if result else 1)
def test_args_none(self):
    """docopt must raise DocoptExit when given None as the argument list."""
    # Build the usage text first so only the docopt call is guarded.
    usage_text = to_string(main_doc)
    with self.assertRaises(DocoptExit):
        docopt(usage_text, None, version=__version__)
def test_args_two_commands(self):
    """docopt must raise DocoptExit when both 'lsa' and 'luhn' are given."""
    # Build the usage text first so only the docopt call is guarded.
    usage_text = to_string(main_doc)
    argv = ['lsa', 'luhn']
    with self.assertRaises(DocoptExit):
        docopt(usage_text, argv, version=__version__)
def test_args_url_and_file(self):
    """docopt must raise DocoptExit when --url and --file are combined."""
    # Build the usage text first so only the docopt call is guarded.
    usage_text = to_string(main_doc)
    argv = ['lsa', '--url=URL', '--file=FILE']
    with self.assertRaises(DocoptExit):
        docopt(usage_text, argv, version=__version__)
def main():
    """Parse options, enumerate subdomains and run the optional follow-ups.

    Steps, in order:

    1. Print the banner and parse the command line with docopt. On a
       usage error the banner is printed again, the script's ``--help``
       output is shown and the process exits with status 1.
    2. Fall back to the bundled ``data/wordlist.txt`` when no
       ``--wordlist`` was given.
    3. Run ``subdomain_finder.finder``; on failure, report the error and
       exit with status 1.
    4. Depending on ``--uniq-ip``, ``--scan`` and ``--whois``: list the
       unique IPs, scan them, and run a whois lookup on the target.
    5. Print the banner once more.
    """
    try:
        cli.banner()
        arguments = docopt(__doc__, version="TWA Corp. SubDomain Finder - 2016")
        target = arguments['--target']
        wordlist = arguments['--wordlist']
        threads = arguments['--threads']
        opt_scan = arguments['--scan']
        opt_whois = arguments['--whois']
        opt_scan_ports = arguments['--scan-ports']
        opt_scan_options = arguments['--scan-options']
        opt_uniq_ips = arguments['--uniq-ip']
    except DocoptExit:
        cli.banner()
        # NOTE(review): this relies on the current working directory
        # containing subdomain_finder.py.
        os.system('python3 subdomain_finder.py --help')
        sys.exit(1)

    if not wordlist:
        wordlist = os.path.join(os.getcwd(), os.path.dirname(__file__), 'data', 'wordlist.txt')

    try:
        domains_ips = subdomain_finder.finder(threads, wordlist, target)
    except Exception as exc:
        # Catch Exception rather than everything, so Ctrl-C and
        # SystemExit are not misreported as a wordlist problem.
        print("Wordlist {0} ERROR: {1}".format(wordlist, exc))
        # A failed run must not report success to the calling shell.
        sys.exit(1)

    if opt_uniq_ips:
        print("\n IPs:")
        # Compute the unique IPs once and reuse them for count and listing.
        ips = subdomain_finder.ip_uniq(domains_ips)
        print(" Uniq: "+str(len(ips)))
        for ip in ips:
            print(" "+ip)

    if opt_scan:
        subdomain_finder.ip_scan(domains_ips, opt_scan_ports, opt_scan_options)

    if opt_whois:
        subdomain_finder.domain_whois(target)

    cli.banner()
def docopt_cmd(func):
    """
    Decorator that parses a command's argument string with docopt.

    The wrapped action's docstring is used as the docopt usage text. The
    returned wrapper takes ``(self, arg)``, parses ``arg`` and calls
    ``func(self, opt)`` with the parsed options. When parsing fails or
    docopt exits (e.g. for --help), the wrapper returns None without
    calling ``func``.

    :param func: Action taking ``(self, opt)`` whose docstring is the usage text.
    :return: Wrapper carrying ``func``'s name, docstring and attributes.
    """
    import functools

    # functools.wraps copies __name__, __doc__ and __dict__ (as the manual
    # assignments used to) and also sets __module__, __qualname__ and
    # __wrapped__ so introspection tools can reach the original action.
    @functools.wraps(func)
    def wrapper(self, arg):
        try:
            opt = docopt(wrapper.__doc__, arg)
        except DocoptExit as e:
            # The DocoptExit is thrown when the args do not match.
            # We print a message to the user and the usage block.
            # It must be handled before the SystemExit clause below.
            print(colored("Invalid Command!", "red"))
            print(e)
            return
        except SystemExit:
            # The SystemExit exception prints the usage for --help
            # We do not need to do the print here.
            return
        return func(self, opt)

    return wrapper
def values_from_file(docopt_dict, config_option, settable, booleans, repeatable):
    """Read settable option values from the configured config file.

    The config section name is the second whitespace-separated token of
    ``docopt.DocoptExit.usage``. Only options present in that section are
    returned; each value is obtained through ``get_opt``.

    :raise DocoptcfgError: If `config_option` is not a key of `docopt_dict`.
    :raise DocoptcfgFileError: If the file cannot be read or parsed, or lacks the section.

    :param dict docopt_dict: Dictionary from docopt with environment variable defaults merged in by docoptcfg().
    :param str config_option: Config option long name with file path as its value.
    :param iter settable: Option long names available to set by config file.
    :param iter booleans: Option long names of boolean/flag types.
    :param iter repeatable: Option long names of repeatable options.

    :return: Settable values found in the config file (empty if no file is configured).
    :rtype: dict
    """
    section = docopt.DocoptExit.usage.split()[1]
    # The option naming the config file can never be set from that file.
    candidates = {name for name in settable if name != config_option}
    config = ConfigParser()

    # Sanity checks.
    if config_option not in docopt_dict:
        raise DocoptcfgError
    if docopt_dict[config_option] is None or not candidates:
        return {}

    # Read config file; the path is also recorded on the error class.
    path = DocoptcfgFileError.FILE_PATH = docopt_dict[config_option]
    try:
        with open(path) as handle:
            # Use read_file when the parser has it, otherwise readfp.
            reader_name = 'read_file' if hasattr(config, 'read_file') else 'readfp'
            getattr(config, reader_name)(handle)
    except Error as exc:
        raise DocoptcfgFileError('Unable to parse config file.', str(exc))
    except IOError as exc:
        raise DocoptcfgFileError('Unable to read config file.', str(exc))

    # Make sure section is in config file.
    if not config.has_section(section):
        raise DocoptcfgFileError('Section [{0}] not in config file.'.format(section))

    # Config keys are the option names minus their first two characters.
    return {
        name: get_opt(name, config, section, booleans, repeatable)
        for name in candidates
        if config.has_option(section, name[2:])
    }
def main():
    """Parse options, create the node keys and run the network engine.

    Reads ``--port`` (default 5555) and ``--connect`` from the command
    line, creates the zmq context and the two pipes, generates the RSA
    key pair and node id, then runs ``engageNet`` in the default executor
    until it completes. If ``--connect`` was given, a ``conn`` message
    for that destination is sent on the outgoing pipe first.

    Sets the module globals ``in_pipe``, ``out_pipe``, ``public_key`` and
    ``private_key``. On a usage error the docopt message is printed and
    the function returns. If running the loop raises, the exception is
    passed to ``handleException``, a ``shutdown`` message is sent and the
    process exits with status 1.
    """
    global in_pipe, out_pipe, public_key, private_key

    try:
        docopt_config = "Usage: my_program.py [--port=PORT] [--connect=PORT]"
        arguments = docopt.docopt(docopt_config)
        port = arguments["--port"]
        if port is None:
            port = 5555
        connect_dest = arguments["--connect"]
    except docopt.DocoptExit as e:
        # Python 3 exceptions have no ``.message`` attribute; printing the
        # exception itself shows the docopt usage text.
        print(e)
        return

    context = zmq.Context()
    in_pipe = zpipe(context)
    out_pipe = zpipe(context)
    loop = asyncio.get_event_loop()
    net_config = {"port": port}

    # Generate Node Keys & Id.
    private_key = enc.generate_RSA(4096)
    public_key = private_key.publickey()
    node_id = enc.generate_ID(public_key.exportKey("DER"))
    debug("node_id=[%s]." % node_id.hexdigest())

    # Start Net Engine.
    zmq_future = loop.run_in_executor(None, engageNet, loop, context, out_pipe[0], in_pipe[1], net_config)

    # Connect for testing.
    if connect_dest is not None:
        out_pipe[1].send_multipart([b"conn", "tcp://{}".format(connect_dest).encode()])

    try:
        loop.run_until_complete(zmq_future)
    except BaseException as e:
        # BaseException is caught deliberately so the engine is told to
        # shut down for any failure, including Ctrl-C.
        handleException("loop.run_until_complete()", e)
        out_pipe[1].send_multipart([b"shutdown"])
        loop.stop()
        loop.close()
        # NOTE(review): cancel() runs after loop.close(); confirm this
        # ordering is intended.
        zmq_future.cancel()
        sys.exit(1)
def docopt_wrapper(usage, real_usage, **keywords):
    """ A wrapper around the real docopt parser.

    docopt is run twice with stdout silenced: once on ``real_usage`` with
    only the base subcommand, and once on the simplified ``usage`` with
    the caller's keywords. The second result is merged into the first
    and ``subcommand`` is set to True in the merged dictionary.

    On a usage error ``real_usage`` is printed to stderr and the process
    exits with status 1. When docopt exits for help or version, the real
    usage or the version string is printed before exiting.

    :param usage: The simplified usage string to parse
    :type usage: str
    :param real_usage: The original usage string to parse
    :type real_usage: str
    :param keywords: The keyword arguments to pass to docopt; must also
        contain ``base_subcommand`` and ``subcommand``, which are removed
        before docopt is called
    :type keywords: dict
    :returns: The parsed arguments
    :rtype: dict
    """
    base_subcommand = keywords.pop('base_subcommand')
    subcommand = keywords.pop('subcommand')

    # docopt falls back to sys.argv[1:] when no argv is supplied, so the
    # help/version detection below must inspect the same list.
    argv = keywords.get("argv")
    if argv is None:
        argv = sys.argv[1:]

    stdout = sys.stdout
    try:
        # We run docopt twice (once with the real usage string and
        # once with the modified usage string) in order to populate
        # the 'real' arguments properly.
        with open(os.devnull, 'w') as nullfile:
            sys.stdout = nullfile
            try:
                real_arguments = docopt.docopt(
                    real_usage,
                    argv=[base_subcommand])
                arguments = docopt.docopt(
                    usage,
                    **keywords)
            finally:
                # Restore stdout on every path so it is never left
                # pointing at the closed devnull handle.
                sys.stdout = stdout
        real_arguments.update(arguments)
        real_arguments[subcommand] = True
        return real_arguments
    except docopt.DocoptExit:
        # Must precede the SystemExit clause: DocoptExit derives from it.
        print(real_usage.strip(), file=sys.stderr)
        sys.exit(1)
    except SystemExit:
        if any(arg in ("-h", "--help") for arg in argv):
            print(real_usage.strip())
        elif "version" in keywords and any(arg == "--version" for arg in argv):
            # Compare for equality: ``v in ("--version")`` was a substring
            # test against a plain string, not tuple membership.
            print(keywords["version"].strip())
        sys.exit()
def beradio_cmd():
    """
    Usage:
      beradio forward --source=serial:///dev/ttyUSB0 --target=mqtt://localhost [--protocol=<version>] [--debug]
      beradio decode <payload> [--protocol=<version>] [--debug]
      beradio info
      beradio --version
      beradio (-h | --help)

    Options:
      --source=<source>         Data source, e.g. serial:///dev/ttyUSB0
      --target=<target>         Data sink, e.g. mqtt://localhost
      --protocol=<version>      Protocol version: 1 or 2  [default: 2]
      --version                 Show version information
      --debug                   Enable debug messages
      -h --help                 Show this screen
    """
    # The docstring above is the docopt grammar, i.e. runtime data, not
    # free-form documentation. docopt needs a blank line to end the usage
    # section and at least two spaces between an option and its
    # description; otherwise description words are parsed as part of the
    # option and the [default: 2] is lost.
    #
    # Behaviour by sub-command:
    #   forward  - serial:// or data:// source is forwarded to an mqtt://
    #              target; any other combination raises DocoptExit.
    #   decode   - returns the decoded payload as an indented JSON string.
    #   info     - writes config file path, network id and gateway id to
    #              stderr.
    options = docopt(beradio_cmd.__doc__, version=APP_NAME)

    source = options.get('--source')
    target = options.get('--target')
    protocol = options.get('--protocol')

    boot_logging(options)

    if options.get('forward'):
        if source.startswith('serial://') and target.startswith('mqtt://'):
            source = source.replace('serial://', '')
            SerialToMQTT(serial_device=source, mqtt_broker=target, protocol=protocol).setup().forward()

        elif source.startswith('data://') and target.startswith('mqtt://'):
            data = source.replace('data://', '')
            # "data://stdin" means: take the payload from standard input.
            if data == 'stdin':
                data = sys.stdin.read().strip()
            DataToMQTT(mqtt_broker=target, protocol=protocol).setup().publish(data)

        else:
            raise DocoptExit('Unable to handle combination of {} and {} in forwarding mode'.format(options.get('--source'), options.get('--target')))

    elif options.get('decode'):
        p = protocol_factory(protocol)
        payload = options.get('<payload>')
        return json.dumps(p.decode_safe(payload), indent=4)

    elif options.get('info'):
        network_id = str(NetworkIdentifier())
        gateway_id = str(GatewayIdentifier())

        def emit(line):
            # sys.stderr.write works on Python 2 and 3, unlike the
            # Python 2 only ``print >>sys.stderr`` statement.
            sys.stderr.write(line + '\n')

        emit('-' * 50)
        emit(APP_NAME.center(50))
        emit('-' * 50)
        emit('config file: {}'.format(BERadioConfig().config_file))
        emit('network_id: {}'.format(network_id))
        emit('gateway_id: {}'.format(gateway_id))