def UseGnuGetOpt(self, use_gnu_getopt=True):
    """Switch GNU-style argument scanning on or off.

    GNU-style scanning allows flag and non-flag arguments to be mixed.
    See http://docs.python.org/library/getopt.html#getopt.gnu_getopt

    Args:
      use_gnu_getopt: whether or not to use GNU style scanning.
    """
    # The setting is written straight into the instance dictionary under the
    # literal key '__use_gnu_getopt'.
    vars(self)['__use_gnu_getopt'] = use_gnu_getopt
python类gnu_getopt()的实例源码
def parse_args(args, options):
    """Parse command-line arguments and record the results in ``options``.

    Switches store ``True`` and valued options store their argument string,
    each under the long option's name. Everything left over after option
    parsing is stored under ``'extra_args'``. ``-h``/``--help`` prints the
    module docstring and raises ``SystemExit(0)``.
    """
    long_opts = ['help', 'oauth', 'save-dir=', 'api-rate', 'timeline=', 'mentions=', 'favorites', 'follow-redirects', "redirect-sites=", 'dms=', 'isoformat']
    short_opts = "hos:at:m:vfr:d:i"

    # Options that simply switch something on.
    switch_keys = {
        '-o': 'oauth', '--oauth': 'oauth',
        '-a': 'api-rate', '--api-rate': 'api-rate',
        '-v': 'favorites', '--favorites': 'favorites',
        '-f': 'follow-redirects', '--follow-redirects': 'follow-redirects',
        '-i': 'isoformat', '--isoformat': 'isoformat',
    }
    # Options whose argument is stored unchanged.
    value_keys = {
        '-s': 'save-dir', '--save-dir': 'save-dir',
        '-t': 'timeline', '--timeline': 'timeline',
        '-m': 'mentions', '--mentions': 'mentions',
        '-r': 'redirect-sites', '--redirect-sites': 'redirect-sites',
        '-d': 'dms', '--dms': 'dms',
    }

    parsed, extra_args = getopt(args, short_opts, long_opts)
    for name, value in parsed:
        if name in ('-h', '--help'):
            print(__doc__)
            raise SystemExit(0)
        if name in switch_keys:
            options[switch_keys[name]] = True
        elif name in value_keys:
            options[value_keys[name]] = value

    options['extra_args'] = extra_args
def parse_args(args, options):
    """Parse command-line arguments and record the results in ``options``.

    Recognised options are written into ``options``; ``-e`` and ``-p`` are
    accepted but stored nowhere. The first positional argument becomes
    ``options['action']`` unless help was requested, and the remaining
    positional arguments are stored under ``'extra_args'``.
    """
    long_opts = ['help', 'format=', 'refresh', 'oauth=',
                 'refresh-rate=', 'config=', 'length=', 'timestamp',
                 'datestamp', 'no-ssl', 'force-ansi']
    short_opts = "e:p:f:h?rR:c:l:td"

    # Options whose argument is stored unchanged.
    text_keys = {
        '-f': 'format', '--format': 'format',
        '-c': 'config_filename', '--config': 'config_filename',
        '--oauth': 'oauth_filename',
    }
    # Options whose argument is stored as an integer.
    int_keys = {
        '-R': 'refresh_rate', '--refresh-rate': 'refresh_rate',
        '-l': 'length', '--length': 'length',
    }
    # Options that store a fixed value.
    fixed_values = {
        '-r': ('refresh', True), '--refresh': ('refresh', True),
        '-t': ('timestamp', True), '--timestamp': ('timestamp', True),
        '-d': ('datestamp', True), '--datestamp': ('datestamp', True),
        '-?': ('action', 'help'), '-h': ('action', 'help'),
        '--help': ('action', 'help'),
        '--no-ssl': ('secure', False),
        '--force-ansi': ('force-ansi', True),
    }

    parsed, extra_args = getopt(args, short_opts, long_opts)

    # Byte-string positionals are decoded with the locale's encoding.
    if extra_args and hasattr(extra_args[0], 'decode'):
        encoding = locale.getpreferredencoding()
        extra_args = [raw.decode(encoding) for raw in extra_args]

    for name, value in parsed:
        if name in text_keys:
            options[text_keys[name]] = value
        elif name in int_keys:
            options[int_keys[name]] = int(value)
        elif name in fixed_values:
            key, constant = fixed_values[name]
            options[key] = constant

    help_requested = 'action' in options and options['action'] == 'help'
    if extra_args and not help_requested:
        options['action'] = extra_args[0]
    options['extra_args'] = extra_args[1:]
def parse_args(args, options):
    """Fill ``options`` from the command-line arguments in ``args``.

    Each recognised option is stored under its long name: ``True`` for plain
    switches, the argument string for options that take a value. Arguments
    left over after option parsing go under ``'extra_args'``. ``-h`` or
    ``--help`` prints the module docstring and raises ``SystemExit(0)``.
    """
    long_opts = ['help', 'oauth', 'save-dir=', 'api-rate', 'timeline=', 'mentions=', 'favorites', 'follow-redirects', "redirect-sites=", 'dms=', 'isoformat']
    short_opts = "hos:at:m:vfr:d:i"

    # (short form, long form, key in ``options``, takes a value?)
    table = (
        ('-o', '--oauth', 'oauth', False),
        ('-s', '--save-dir', 'save-dir', True),
        ('-a', '--api-rate', 'api-rate', False),
        ('-t', '--timeline', 'timeline', True),
        ('-m', '--mentions', 'mentions', True),
        ('-v', '--favorites', 'favorites', False),
        ('-f', '--follow-redirects', 'follow-redirects', False),
        ('-r', '--redirect-sites', 'redirect-sites', True),
        ('-d', '--dms', 'dms', True),
        ('-i', '--isoformat', 'isoformat', False),
    )
    lookup = {}
    for short_form, long_form, key, takes_value in table:
        lookup[short_form] = lookup[long_form] = (key, takes_value)

    found, extra_args = getopt(args, short_opts, long_opts)
    for flag, argument in found:
        if flag == '-h' or flag == '--help':
            print(__doc__)
            raise SystemExit(0)
        if flag in lookup:
            key, takes_value = lookup[flag]
            options[key] = argument if takes_value else True

    options['extra_args'] = extra_args
def UseGnuGetOpt(self, use_gnu_getopt=True):
    """Select whether arguments are scanned GNU-style.

    GNU-style scanning allows mixing of flag and non-flag arguments.
    See http://docs.python.org/library/getopt.html#getopt.gnu_getopt

    Args:
      use_gnu_getopt: whether or not to use GNU style scanning.
    """
    # Stored directly in the instance dictionary under the literal key
    # '__use_gnu_getopt'.
    instance_dict = self.__dict__
    instance_dict['__use_gnu_getopt'] = use_gnu_getopt
def main(argv):
    """Parse the command line in ``argv`` and call ``run`` with the settings.

    ``argv[0]`` is skipped. The four behaviour switches default to ``True``
    and are turned off when their argument is ``"off"``, ``"false"`` or
    ``"0"``; any other argument turns them on. The port defaults to 1883 and
    can only be set with ``--port`` (``-p`` is ``--publish_on_pubrel``).

    On a getopt error the error and usage text are printed and the process
    exits with status 2. ``-h``/``--help`` prints the usage text and exits.
    """
    try:
        opts, args = getopt.gnu_getopt(
            argv[1:], "hp:o:d:z:",
            ["help", "publish_on_pubrel=", "overlapping_single=",
             "dropQoS0=", "port=", "zero_length_clientids="])
    except getopt.GetoptError as err:
        print(err)  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    publish_on_pubrel = overlapping_single = dropQoS0 = zero_length_clientids = True
    port = 1883
    off_words = ("off", "false", "0")

    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-p", "--publish_on_pubrel"):
            publish_on_pubrel = a not in off_words
        elif o in ("-o", "--overlapping_single"):
            overlapping_single = a not in off_words
        elif o in ("-d", "--dropQoS0"):
            dropQoS0 = a not in off_words
        elif o in ("-z", "--zero_length_clientids"):
            zero_length_clientids = a not in off_words
        elif o == "--port":
            # Compared by equality: ("--port") is a plain string, so ``in``
            # would be a substring test rather than tuple membership.
            port = int(a)
        else:
            # Raised explicitly so the guard survives ``python -O``.
            raise AssertionError("unhandled option")

    run(publish_on_pubrel=publish_on_pubrel, overlapping_single=overlapping_single,
        dropQoS0=dropQoS0, port=port,
        zero_length_clientids=zero_length_clientids)
def main(argv):
    """Parse the command line in ``argv`` and call ``run`` with the settings.

    ``argv[0]`` is skipped. The four behaviour switches default to ``True``
    and are turned off when their argument is ``"off"``, ``"false"`` or
    ``"0"``; any other argument turns them on. The port defaults to 1883 and
    can only be set with ``--port`` (``-p`` is ``--publish_on_pubrel``).

    On a getopt error the error and usage text are printed and the process
    exits with status 2. ``-h``/``--help`` prints the usage text and exits.
    """
    try:
        opts, args = getopt.gnu_getopt(
            argv[1:], "hp:o:d:z:",
            ["help", "publish_on_pubrel=", "overlapping_single=",
             "dropQoS0=", "port=", "zero_length_clientids="])
    except getopt.GetoptError as err:
        print(err)  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    publish_on_pubrel = overlapping_single = dropQoS0 = zero_length_clientids = True
    port = 1883
    off_words = ("off", "false", "0")

    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-p", "--publish_on_pubrel"):
            publish_on_pubrel = a not in off_words
        elif o in ("-o", "--overlapping_single"):
            overlapping_single = a not in off_words
        elif o in ("-d", "--dropQoS0"):
            dropQoS0 = a not in off_words
        elif o in ("-z", "--zero_length_clientids"):
            zero_length_clientids = a not in off_words
        elif o == "--port":
            # Compared by equality: ("--port") is a plain string, so ``in``
            # would be a substring test rather than tuple membership.
            port = int(a)
        else:
            # Raised explicitly so the guard survives ``python -O``.
            raise AssertionError("unhandled option")

    run(publish_on_pubrel=publish_on_pubrel, overlapping_single=overlapping_single,
        dropQoS0=dropQoS0, port=port,
        zero_length_clientids=zero_length_clientids)
def parse_argv_module_id(argv):
    """Extract the module-identifying parameters from ``argv``.

    Every ``--name`` token in ``argv`` is accepted as a long option that
    takes a value, written either as ``--name value`` or ``--name=value``.
    Names other than the ones listed below are parsed and ignored.

    Returns:
        The tuple ``(module_name, lsid, cache_path, execution_id,
        module_libdir, cwd, sg_prepare_template)``. Defaults are ``''``,
        ``''``, ``common.cache_path``, ``'0'``, ``None``, the current
        working directory and ``''``. Passing ``--module_libdir`` also sets
        ``module_name`` to the base name of that directory.

    Raises:
        Exception: if the same parameter is given more than once.
        getopt.GetoptError: if getopt cannot parse ``argv``.
    """
    # Only the text before '=' is the option name. Without the split,
    # '--lsid=x' would register the long option 'lsid=x=' and the parameter
    # would never match any of the names handled below.
    arg_names = [token[2:].split('=', 1)[0] + '='
                 for token in argv if token[:2] == '--']
    param_value_pairs, _remnants = getopt.gnu_getopt(argv, "", arg_names)

    # Set defaults
    module_name = ''
    lsid = ''
    cache_path = common.cache_path
    execution_id = '0'
    module_libdir = None
    cwd = os.getcwd()
    sg_prepare_template = ''

    args_used = set()
    for option, param_value in param_value_pairs:
        param_name = option[2:]  # strip off '--' at start
        if param_name in args_used:
            raise Exception('duplicated parameter: %s' % param_name)
        args_used.add(param_name)

        if param_name == 'module_name':
            module_name = param_value
        elif param_name == 'lsid':
            lsid = param_value
        elif param_name == 'cache_path':
            cache_path = param_value
        elif param_name == 'execution_id':
            execution_id = param_value
        elif param_name == 'sg_prepare_template':
            sg_prepare_template = param_value
        elif param_name == 'module_libdir':
            module_libdir = param_value
            # The library directory's base name doubles as the module name.
            module_name = os.path.basename(module_libdir)
        elif param_name == 'cwd':
            cwd = param_value

    return (module_name, lsid, cache_path, execution_id, module_libdir, cwd,
            sg_prepare_template)
def main(args):
    # Command-line driver (Python 2 code: it uses file(), `except X, e` and
    # `print >>`). Reads the single source file named on the command line,
    # tokenizes and parses it, runs the context checks and then emits either
    # the tree as text (-T) or compiled output (-P, the default, or -X).
    # -o FILE redirects the output from sys.stdout to FILE.
    #
    # NOTE(review): indentation was reconstructed from statement order; the
    # nesting of the final if/elif/else under `if target != 'T'` is an
    # assumption -- confirm against the original file.
    # NOTE(review): usage() and failure() are presumably expected to terminate
    # the process; if they return, execution continues with undefined or
    # invalid state (e.g. `opts` after a GetoptError) -- verify.
    try:
        (opts, args) = getopt(args, 'o:TPX')
    except GetoptError:
        usage()
    # Exactly one positional argument (the input file) is required.
    if len(args) != 1:
        usage()
    # Project modules are imported only after the arguments have been checked.
    from tokenizer import Tokenizer
    from parser import Parser
    from error import JtError
    import context
    from os.path import abspath
    filename = abspath(args[0])
    # Despite its name this is the input source file, not standard input.
    stdin = file(filename, 'r')
    # Target defaults to 'P'; -T, -P and -X replace it with their letter.
    target = 'P'
    stdout = sys.stdout
    for (ok, ov) in opts:
        if ok in ('-T', '-P', '-X'):
            target = ok[1]
        elif ok == '-o':
            stdout = file(ov, 'w')
    contents = stdin.read()
    tokenizer = Tokenizer()
    tokenizer.build()
    tokenizer.input(contents)
    parser = Parser(tokenizer)
    result_tree = None
    try:
        result_tree = parser.parse()
    except JtError, error:
        failure(error)
    context.add_pdf(result_tree)
    # `ok` is reused here as the combined result of both context checks.
    ok = context.inspect(result_tree)
    ok &= context.validate(result_tree)
    # The textual tree is printed before the check result is acted upon.
    if target == 'T':
        print >>stdout, result_tree
    if not ok:
        failure()
    result_tree.filename = filename
    if target != 'T':
        # Compiled output is refused when the destination is a terminal.
        if stdout.isatty():
            failure('Prevented from printing binary garbage to the terminal.')
        if target == 'P':
            result_tree.compile_pyc(stdout)
        elif target == 'X':
            result_tree.compile_x86(stdout)
        else:
            raise NotImplementedError()
# vim:ts=4 sts=4 sw=4 et
def main():
    """Command line interface for the ``qpass`` program."""
    # Terminal logging is set up first; -v/-q adjust its verbosity below.
    coloredlogs.install()
    # Defaults that the command line options may override.
    action = show_matching_entry
    program_opts = {}
    show_opts = {'use_clipboard': is_clipboard_supported()}
    short_names = 'elnp:vqh'
    long_names = [
        'edit', 'list', 'no-clipboard', 'password-store=',
        'verbose', 'quiet', 'help',
    ]
    # Any failure while interpreting the command line is reported as a
    # warning and ends the program with exit status 1.
    try:
        options, arguments = getopt.gnu_getopt(sys.argv[1:], short_names, long_names)
        for option, value in options:
            if option in ('-h', '--help'):
                usage(__doc__)
                return
            elif option in ('-e', '--edit'):
                action = edit_matching_entry
            elif option in ('-l', '--list'):
                action = list_matching_entries
            elif option in ('-n', '--no-clipboard'):
                show_opts['use_clipboard'] = False
            elif option in ('-p', '--password-store'):
                # Each -p adds one more password store to the list.
                program_opts.setdefault('stores', []).append(PasswordStore(directory=value))
            elif option in ('-v', '--verbose'):
                coloredlogs.increase_verbosity()
            elif option in ('-q', '--quiet'):
                coloredlogs.decrease_verbosity()
            else:
                raise Exception("Unhandled option! (programming error)")
        # Without positional arguments only the list action can proceed.
        if not arguments and action != list_matching_entries:
            usage(__doc__)
            return
    except Exception as e:
        warning("Error: %s", e)
        sys.exit(1)
    # Only the default "show" action receives the clipboard settings.
    if action == show_matching_entry:
        action_kw = show_opts
    else:
        action_kw = {}
    # Execute the requested action.
    try:
        action(QuickPass(**program_opts), arguments, **action_kw)
    except PasswordStoreError as e:
        # Known issues don't get a traceback.
        logger.error("%s", e)
        sys.exit(1)
    except KeyboardInterrupt:
        # If the user interrupted an interactive prompt they most likely did so
        # intentionally, so there's no point in generating more output here.
        sys.exit(1)
def cmd_logs(data, buffer, args):
    """List files in Weechat's log dir.

    ``args`` is the raw argument string: ``-s``/``--size`` sorts the listing
    by file size instead of by name, and any remaining words are passed to
    ``dir_list`` as the filter. ``data`` is unused and the incoming
    ``buffer`` is replaced by the one ``buffer_create()`` returns.

    Always returns ``WEECHAT_RC_OK``; an argument error is reported through
    ``error()`` and nothing is listed.
    """
    global home_dir
    cmd_init()
    try:
        opts, patterns = getopt.gnu_getopt(args.split(), 's', ['size'])
    except Exception as e:  # 'as' form parses on both Python 2.6+ and 3
        error('Argument error, %s' % e)
        return WEECHAT_RC_OK
    sort_by_size = any(opt.strip('-') in ('size', 's') for opt, _ in opts)

    # if there's a filter, filter_excludes should be False
    file_list = dir_list(home_dir, patterns, filter_excludes=not patterns)
    if sort_by_size:
        file_list.sort(key=get_size)
    else:
        file_list.sort()
    file_sizes = [human_readable_size(get_size(path)) for path in file_list]

    # Column width: the longest path plus three characters, so every row gets
    # at least three dots. Only used when there are files.
    column_len = len(max(file_list, key=len)) + 3 if file_list else 0

    buffer = buffer_create()
    if get_config_boolean('clear_buffer'):
        weechat.buffer_clear(buffer)

    # Materialised as a list so len() works where zip() returns an iterator.
    entries = list(zip(file_list, file_sizes))
    msg = 'Found %s logs.' % len(entries)
    print_line(msg, buffer, display=True)
    for path, size in entries:
        separator = '.' * (column_len - len(path))
        prnt(buffer, '%s %s %s' % (strip_home(path), separator, size))
    # The summary is repeated below a non-empty listing.
    if entries:
        print_line(msg, buffer)
    return WEECHAT_RC_OK
### Completion ###
def cmd_logs(data, buffer, args):
    """List files in Weechat's log dir.

    ``args`` is the raw argument string: ``-s``/``--size`` sorts the listing
    by file size instead of by name, and any remaining words are passed to
    ``dir_list`` as the filter. ``data`` is unused and the incoming
    ``buffer`` is replaced by the one ``buffer_create()`` returns.

    Always returns ``WEECHAT_RC_OK``; an argument error is reported through
    ``error()`` and nothing is listed.
    """
    global home_dir
    cmd_init()
    try:
        opts, patterns = getopt.gnu_getopt(args.split(), 's', ['size'])
    except Exception as e:  # 'as' form parses on both Python 2.6+ and 3
        error('Argument error, %s' % e)
        return WEECHAT_RC_OK
    sort_by_size = any(opt.strip('-') in ('size', 's') for opt, _ in opts)

    # if there's a filter, filter_excludes should be False
    file_list = dir_list(home_dir, patterns, filter_excludes=not patterns)
    if sort_by_size:
        file_list.sort(key=get_size)
    else:
        file_list.sort()
    file_sizes = [human_readable_size(get_size(path)) for path in file_list]

    # Column width: the longest path plus three characters, so every row gets
    # at least three dots. Only used when there are files.
    column_len = len(max(file_list, key=len)) + 3 if file_list else 0

    buffer = buffer_create()
    if get_config_boolean('clear_buffer'):
        weechat.buffer_clear(buffer)

    # Materialised as a list so len() works where zip() returns an iterator.
    entries = list(zip(file_list, file_sizes))
    msg = 'Found %s logs.' % len(entries)
    print_line(msg, buffer, display=True)
    for path, size in entries:
        separator = '.' * (column_len - len(path))
        prnt(buffer, '%s %s %s' % (strip_home(path), separator, size))
    # The summary is repeated below a non-empty listing.
    if entries:
        print_line(msg, buffer)
    return WEECHAT_RC_OK
### Completion ###
def cmd_logs(data, buffer, args):
    """List files in Weechat's log dir.

    ``args`` is the raw argument string: ``-s``/``--size`` sorts the listing
    by file size instead of by name, and any remaining words are passed to
    ``dir_list`` as the filter. ``data`` is unused and the incoming
    ``buffer`` is replaced by the one ``buffer_create()`` returns.

    Always returns ``WEECHAT_RC_OK``; an argument error is reported through
    ``error()`` and nothing is listed.
    """
    global home_dir
    cmd_init()
    try:
        opts, patterns = getopt.gnu_getopt(args.split(), 's', ['size'])
    except Exception as e:  # 'as' form parses on both Python 2.6+ and 3
        error('Argument error, %s' % e)
        return WEECHAT_RC_OK
    sort_by_size = any(opt.strip('-') in ('size', 's') for opt, _ in opts)

    # if there's a filter, filter_excludes should be False
    file_list = dir_list(home_dir, patterns, filter_excludes=not patterns)
    if sort_by_size:
        file_list.sort(key=get_size)
    else:
        file_list.sort()
    file_sizes = [human_readable_size(get_size(path)) for path in file_list]

    # Column width: the longest path plus three characters, so every row gets
    # at least three dots. Only used when there are files.
    column_len = len(max(file_list, key=len)) + 3 if file_list else 0

    buffer = buffer_create()
    if get_config_boolean('clear_buffer'):
        weechat.buffer_clear(buffer)

    # Materialised as a list so len() works where zip() returns an iterator.
    entries = list(zip(file_list, file_sizes))
    msg = 'Found %s logs.' % len(entries)
    print_line(msg, buffer, display=True)
    for path, size in entries:
        separator = '.' * (column_len - len(path))
        prnt(buffer, '%s %s %s' % (strip_home(path), separator, size))
    # The summary is repeated below a non-empty listing.
    if entries:
        print_line(msg, buffer)
    return WEECHAT_RC_OK
### Completion ###
def cmd_logs(data, buffer, args):
    """List files in Weechat's log dir.

    ``args`` is the raw argument string: ``-s``/``--size`` sorts the listing
    by file size instead of by name, and any remaining words are passed to
    ``dir_list`` as the filter. ``data`` is unused and the incoming
    ``buffer`` is replaced by the one ``buffer_create()`` returns.

    Always returns ``WEECHAT_RC_OK``; an argument error is reported through
    ``error()`` and nothing is listed.
    """
    global home_dir
    cmd_init()
    try:
        opts, patterns = getopt.gnu_getopt(args.split(), 's', ['size'])
    except Exception as e:  # 'as' form parses on both Python 2.6+ and 3
        error('Argument error, %s' % e)
        return WEECHAT_RC_OK
    sort_by_size = any(opt.strip('-') in ('size', 's') for opt, _ in opts)

    # if there's a filter, filter_excludes should be False
    file_list = dir_list(home_dir, patterns, filter_excludes=not patterns)
    if sort_by_size:
        file_list.sort(key=get_size)
    else:
        file_list.sort()
    file_sizes = [human_readable_size(get_size(path)) for path in file_list]

    # Column width: the longest path plus three characters, so every row gets
    # at least three dots. Only used when there are files.
    column_len = len(max(file_list, key=len)) + 3 if file_list else 0

    buffer = buffer_create()
    if get_config_boolean('clear_buffer'):
        weechat.buffer_clear(buffer)

    # Materialised as a list so len() works where zip() returns an iterator.
    entries = list(zip(file_list, file_sizes))
    msg = 'Found %s logs.' % len(entries)
    print_line(msg, buffer, display=True)
    for path, size in entries:
        separator = '.' * (column_len - len(path))
        prnt(buffer, '%s %s %s' % (strip_home(path), separator, size))
    # The summary is repeated below a non-empty listing.
    if entries:
        print_line(msg, buffer)
    return WEECHAT_RC_OK
### Completion ###
def twitter(bot, message):
    """#twitter [-p ??]
    -p : ????
    """
    # NOTE: the docstring above is sent back to the chat as the usage text
    # (``twitter.__doc__``), so it is kept verbatim.
    #
    # Chat command handler. Returns False when the message is not a
    # "<trigger>twitter" command and True once it has been handled (a reply
    # is sent in every handled case). ``-p N`` selects the day N days before
    # today (N >= 0, default 0); ``-h`` replies with the usage text.
    try:
        cmd, *args = shlex.split(message.text)
    except ValueError:
        # Unbalanced quotes, or no tokens at all.
        return False
    # An empty first token (message text ``""``) has no trigger character;
    # indexing it would raise IndexError.
    if not cmd or cmd[0] not in config['trigger']:
        return False
    if cmd[1:] != 'twitter':
        return False

    try:
        options, args = getopt.gnu_getopt(args, 'hp:')
    except getopt.GetoptError:
        # Unknown option or missing argument: reply with the usage text.
        reply(bot, message, twitter.__doc__)
        return True

    days = 0
    for o, a in options:
        if o == '-p':
            # The argument must be a non-negative integer.
            try:
                days = int(a)
                if days < 0:
                    raise ValueError
            except ValueError:
                reply(bot, message, twitter.__doc__)
                return True
        elif o == '-h':
            reply(bot, message, twitter.__doc__)
            return True

    # Read the clock once so both ends of the window use the same "today".
    today = datetime.now().date()
    window_start = today + timedelta(days=-days)
    window_end = today + timedelta(days=-days + 1)
    tweets = Twitter.objects(Q(date__gte=window_start) & Q(date__lte=window_end))
    if tweets:
        reply(bot, message, '\n---------\n'.join([str(tweet) for tweet in tweets]))
    else:
        reply(bot, message, '??????...')
    return True
def poster(bot, message):
    """#poster [-h] [-f]
    -h : ?????
    -f : ????
    """
    # NOTE: the docstring above is sent back to the chat as the usage text
    # (``poster.__doc__``), so it is kept verbatim.
    #
    # Chat command handler. Returns False when the message is not a
    # "<trigger>poster" command and True once it has been handled. Replies
    # with the event image dated to the most recent Thursday, downloading it
    # into the local 'poster' image directory when it is not there yet or
    # when ``-f`` forces a refresh.
    try:
        cmd, *args = shlex.split(message.text)
    except ValueError:
        # Unbalanced quotes, or no tokens at all.
        return False
    # An empty first token (message text ``""``) has no trigger character;
    # indexing it would raise IndexError.
    if not cmd or cmd[0] not in config['trigger']:
        return False
    if cmd[1:] != 'poster':
        return False

    try:
        options, args = getopt.gnu_getopt(args, 'hf')
    except getopt.GetoptError:
        # Unknown option: reply with the usage text.
        reply(bot, message, poster.__doc__)
        return True

    refresh = False
    for o, a in options:
        if o == '-h':
            reply(bot, message, poster.__doc__)
            return True
        elif o == '-f':
            refresh = True

    # Read the clock once so weekday and date cannot disagree around midnight.
    now = datetime.now()
    # Days since the most recent Thursday (weekday 3); 0 on a Thursday.
    days_since_thursday = (now.weekday() - 3) % 7
    thu_date = now.date() - timedelta(days=days_since_thursday)

    url = '{}event{}.jpg'.format(BASE_URL, thu_date.strftime('%Y%m%d'))
    filename = os.path.basename(url)
    poster_dir = os.path.join(config['cq_root_dir'], config['cq_image_dir'], 'poster')
    path = os.path.join(poster_dir, filename)
    # Creates missing parents too and tolerates the directory already
    # existing, which avoids the check-then-create race.
    os.makedirs(poster_dir, exist_ok=True)

    if refresh or not os.path.exists(path):
        resp = requests.get(url, timeout=60, proxies=config.get('proxies'))
        if resp.status_code != 200:
            reply(bot, message, '?????...????????????')
            return True
        with open(path, 'wb') as f:
            f.write(resp.content)

    reply(bot, message, CQImage(os.path.join('poster', filename)))
    return True
def unalias_command(bot, message):
    """#unalias [-h] ??
    -h : ?????
    ?? : ??????
    """
    # NOTE: the docstring above is sent back to the chat as the usage text
    # (``unalias_command.__doc__``), so it is kept verbatim.
    #
    # Chat command handler. Returns False when the message is not a
    # "<trigger>unalias" command and True once it has been handled. Looks up
    # the alias named by the first positional argument, reports it and
    # deletes it; replies with the usage text when no name is given.
    try:
        cmd, *args = shlex.split(message.text)
    except ValueError:
        # Unbalanced quotes, or no tokens at all.
        return False
    # An empty first token (message text ``""``) has no trigger character;
    # indexing it would raise IndexError.
    if not cmd or cmd[0] not in config['trigger']:
        return False
    if cmd[1:] != 'unalias':
        return False

    try:
        # '-d' (with an argument) is accepted by the parser but has no effect.
        options, args = getopt.gnu_getopt(args, 'hd:')
    except getopt.GetoptError:
        # Unknown option or missing argument: reply with the usage text.
        reply(bot, message, unalias_command.__doc__)
        return True

    for o, a in options:
        if o == '-h':
            reply(bot, message, unalias_command.__doc__)
            return True

    # The alias name is required.
    if not args:
        reply(bot, message, unalias_command.__doc__)
        return True

    command = args[0]
    alias = Alias.objects(alias=command)
    if not alias:
        reply(bot, message, '?????{}'.format(command))
        # Stop here: there is nothing to report or delete.
        return True

    # NOTE(review): if ``Alias.objects(...)`` returns a query set rather than
    # a single record, ``alias.alias`` / ``alias.origin`` will not resolve;
    # confirm against the Alias model (a single-record lookup may be needed).
    reply(bot, message, '????? {} => {}'.format(alias.alias, alias.origin))
    alias.delete()
    return True
def main():
    """Post the command-line words as a status update.

    The positional arguments are joined with spaces to form the message.
    Each credential is taken from its command-line flag, falling back to the
    matching ``Get...Env()`` helper and then to the ``TweetRc`` value.
    ``PrintUsageAndExit()`` is called on a getopt error, on ``-h``/``--help``,
    when the message is empty, or when any credential is missing. A
    ``UnicodeDecodeError`` while posting prints a hint about ``--encoding``
    and exits with status 2.
    """
    try:
        shortflags = 'h'
        longflags = ['help', 'consumer-key=', 'consumer-secret=',
                     'access-key=', 'access-secret=', 'encoding=']
        opts, args = getopt.gnu_getopt(sys.argv[1:], shortflags, longflags)
    except getopt.GetoptError:
        PrintUsageAndExit()

    consumer_keyflag = None
    consumer_secretflag = None
    access_keyflag = None
    access_secretflag = None
    encoding = None
    for o, a in opts:
        # Long options are compared by equality: ("--x") is a plain string,
        # so ``in`` would be a substring test rather than tuple membership.
        if o in ("-h", "--help"):
            PrintUsageAndExit()
        elif o == "--consumer-key":
            consumer_keyflag = a
        elif o == "--consumer-secret":
            consumer_secretflag = a
        elif o == "--access-key":
            access_keyflag = a
        elif o == "--access-secret":
            access_secretflag = a
        elif o == "--encoding":
            encoding = a

    message = ' '.join(args)
    if not message:
        PrintUsageAndExit()

    rc = TweetRc()
    consumer_key = consumer_keyflag or GetConsumerKeyEnv() or rc.GetConsumerKey()
    consumer_secret = consumer_secretflag or GetConsumerSecretEnv() or rc.GetConsumerSecret()
    access_key = access_keyflag or GetAccessKeyEnv() or rc.GetAccessKey()
    access_secret = access_secretflag or GetAccessSecretEnv() or rc.GetAccessSecret()
    if not consumer_key or not consumer_secret or not access_key or not access_secret:
        PrintUsageAndExit()

    api = twitter.Api(consumer_key=consumer_key, consumer_secret=consumer_secret,
                      access_token_key=access_key, access_token_secret=access_secret,
                      input_encoding=encoding)
    try:
        status = api.PostUpdate(message)
    except UnicodeDecodeError:
        # Single-argument print(...) produces the same output on Python 2 and 3.
        print("Your message could not be encoded. Perhaps it contains non-ASCII characters? ")
        print("Try explicitly specifying the encoding with the --encoding flag")
        sys.exit(2)
    print("%s just posted: %s" % (status.user.name, status.text))
def get_options():
    '''
    Extracts command line arguments

    Reads ``sys.argv[1:]`` and returns the tuple
    ``(input_fname, output_fname, test_fname, max_candidates, check_only,
    silent)``. File names default to ``None``, ``max_candidates`` to
    ``MAX_CANDIDATES`` and both switches to ``False``.

    Every invalid combination writes a message to stderr and exits with
    status 1: a getopt error, a non-integer or below-1 ``--max-candidates``,
    a test or output file given together with ``--check-only``, or a missing
    test file otherwise. ``-h``/``--help`` prints the usage text and exits.
    '''
    input_fname = None
    output_fname = None
    test_fname = None
    max_candidates = MAX_CANDIDATES
    check_only = False
    silent = False

    try:
        # 'max-candidates=' must be declared here, otherwise getopt rejects
        # the option and its branch below can never run.
        opts, args = getopt.gnu_getopt(
            sys.argv[1:], 'hi:o:t:',
            ['help', 'input-file=', 'output-file=', 'test-file=',
             'max-candidates=', 'check-only', 'silent'])
    except getopt.GetoptError as err:
        sys.stderr.write('Error: %s\n' % err)
        usage()
        sys.exit(1)

    for o, a in opts:
        if o in ('-i', '--input-file'):
            input_fname = a
        elif o in ('-o', '--output-file'):
            output_fname = a
        elif o in ('-t', '--test-file'):
            test_fname = a
        elif o in ('-h', '--help'):
            usage()
            sys.exit()
        elif o == '--check-only':
            check_only = True
        elif o == '--silent':
            silent = True
        elif o == '--max-candidates':
            try:
                max_candidates = int(a)
            except ValueError:
                sys.stderr.write('Error: --max-candidates takes integer argument (you provided %s).\n' % a)
                sys.exit(1)
            if max_candidates < 1:
                sys.stderr.write('Error: --max-candidates must be above 0.\n')
                sys.exit(1)
        else:
            sys.stderr.write('Error: unknown option %s. Type --help to see the options.\n' % o)
            sys.exit(1)

    if check_only:
        # Checking the input format needs neither a test nor an output file.
        if test_fname or output_fname:
            sys.stderr.write('No test file or output file is required to check the input format.\n')
            sys.exit(1)
    elif not test_fname:
        sys.stderr.write('Error: no test file provided.\n')
        sys.exit(1)

    return input_fname, output_fname, test_fname, max_candidates, check_only, silent
def main(argv):
    """Parse the command line in ``argv`` and call ``run`` with the settings.

    ``argv[0]`` is skipped. The four behaviour switches default to ``True``
    and are turned off when their argument is ``"off"``, ``"false"`` or
    ``"0"``; any other argument turns them on. The numeric settings default
    to topicAliasMaximum=2, maximumPacketSize=1000, receiveMaximum=20 and
    serverKeepAlive=60; a non-decimal argument is ignored and the default is
    kept. The port defaults to 1883 and can only be set with ``--port``
    (``-p`` is ``--publish_on_pubrel``).

    On a getopt error the error and usage text are printed and the process
    exits with status 2. ``-h``/``--help`` prints the usage text and exits.
    """
    try:
        # Every long option needs its own list entry: a missing comma would
        # concatenate two adjacent names into one unusable option.
        opts, args = getopt.gnu_getopt(argv[1:], "hp:o:d:z:t:m:r:s:",
                                       ["help", "port=",
                                        "publish_on_pubrel=",
                                        "overlapping_single=",
                                        "dropQoS0=",
                                        "zero_length_clientids=",
                                        "topicAliasMaximum=",
                                        "maximumPacketSize=",
                                        "receiveMaximum=",
                                        "serverKeepAlive="])
    except getopt.GetoptError as err:
        print(err)  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    publish_on_pubrel = overlapping_single = dropQoS0 = zero_length_clientids = True
    topicAliasMaximum = 2
    maximumPacketSize = 1000
    receiveMaximum = 20
    serverKeepAlive = 60
    port = 1883
    off_words = ("off", "false", "0")

    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-p", "--publish_on_pubrel"):
            publish_on_pubrel = a not in off_words
        elif o in ("-o", "--overlapping_single"):
            overlapping_single = a not in off_words
        elif o in ("-d", "--dropQoS0"):
            dropQoS0 = a not in off_words
        elif o in ("-z", "--zero_length_clientids"):
            zero_length_clientids = a not in off_words
        # Numeric settings are converted so ``run`` receives ints, the same
        # type as the defaults above.
        elif o in ("-t", "--topicAliasMaximum"):
            if a.isdecimal():
                topicAliasMaximum = int(a)
        elif o in ("-m", "--maximumPacketSize"):
            if a.isdecimal():
                maximumPacketSize = int(a)
        elif o in ("-r", "--receiveMaximum"):
            if a.isdecimal():
                receiveMaximum = int(a)
        elif o in ("-s", "--serverKeepAlive"):
            if a.isdecimal():
                serverKeepAlive = int(a)
        elif o == "--port":
            # Compared by equality: ("--port") is a plain string, so ``in``
            # would be a substring test rather than tuple membership.
            port = int(a)
        else:
            # Raised explicitly so the guard survives ``python -O``.
            raise AssertionError("unhandled option")

    run(publish_on_pubrel=publish_on_pubrel,
        overlapping_single=overlapping_single,
        dropQoS0=dropQoS0,
        port=port,
        zero_length_clientids=zero_length_clientids,
        topicAliasMaximum=topicAliasMaximum,
        maximumPacketSize=maximumPacketSize,
        receiveMaximum=receiveMaximum,
        serverKeepAlive=serverKeepAlive)