def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.url or not opts.token:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify)
# enumerate all watchlists
#
watchlists = cb.watchlist()
print "%-4s | %-32s | %s" % ('id', 'name', 'query')
print "%-4s + %-32s + %s" % ('-' * 4, '-' * 32, '-' * 60)
# for each result
for watchlist in watchlists:
print "%-4s | %-32s | %s" % (watchlist['id'], watchlist['name'], truncate(opts.fulloutput, watchlist['search_query'], 57))
python类argv()的实例源码
def LoadPlugins(plugins, verbose):
if plugins == '':
return
scriptPath = os.path.dirname(sys.argv[0])
for plugin in sum(map(ProcessAt, plugins.split(',')), []):
try:
if not plugin.lower().endswith('.py'):
plugin += '.py'
if os.path.dirname(plugin) == '':
if not os.path.exists(plugin):
scriptPlugin = os.path.join(scriptPath, plugin)
if os.path.exists(scriptPlugin):
plugin = scriptPlugin
exec(open(plugin, 'r').read())
except Exception as e:
print('Error loading plugin: %s' % plugin)
if verbose:
raise e
def build_wheel(wheel_directory, config_settings=None,
metadata_directory=None):
config_settings = _fix_config(config_settings)
wheel_directory = os.path.abspath(wheel_directory)
sys.argv = sys.argv[:1] + ['bdist_wheel'] + \
config_settings["--global-option"]
_run_setup()
if wheel_directory != 'dist':
shutil.rmtree(wheel_directory)
shutil.copytree('dist', wheel_directory)
wheels = [f for f in os.listdir(wheel_directory)
if f.endswith('.whl')]
assert len(wheels) == 1
return wheels[0]
def hook_scope(self, name=""):
"""Scope all future interactions to the current hook execution
revision."""
assert not self.revision
self.cursor.execute(
'insert into hooks (hook, date) values (?, ?)',
(name or sys.argv[0],
datetime.datetime.utcnow().isoformat()))
self.revision = self.cursor.lastrowid
try:
yield self.revision
self.revision = None
except:
self.flush(False)
self.revision = None
raise
else:
self.flush()
def kas(argv):
"""
The main entry point of kas.
"""
create_logger()
parser = kas_get_argparser()
args = parser.parse_args(argv)
if args.debug:
logging.getLogger().setLevel(logging.DEBUG)
logging.info('%s %s started', os.path.basename(sys.argv[0]), __version__)
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, interruption)
atexit.register(_atexit_handler)
for plugin in getattr(kasplugin, 'plugins', []):
if plugin().run(args):
return
parser.print_help()
def _main():
"""
For testing. The command-line argument should be the name of the
function that should be called.
"""
map = {
'readInt': readInt, 'readAllInts': readAllInts,
'readFloat': readFloat, 'readAllFloats': readAllFloats,
'readBool': readBool, 'readAllBools': readAllBools,
'readString': readString, 'readAllStrings': readAllStrings,
'readLine': readLine, 'readAllLines' : readAllLines,
'readAll': readAll }
testId = sys.argv[1]
if testId == 'write':
_testWrite()
else:
writeln(map[testId]())
def parse_argv(tokens, options, options_first=False):
"""Parse command-line argument vector.
If options_first:
argv ::= [ long | shorts ]* [ argument ]* [ '--' [ argument ]* ] ;
else:
argv ::= [ long | shorts | argument ]* [ '--' [ argument ]* ] ;
"""
parsed = []
while tokens.current() is not None:
if tokens.current() == '--':
return parsed + [Argument(None, v) for v in tokens]
elif tokens.current().startswith('--'):
parsed += parse_long(tokens, options)
elif tokens.current().startswith('-') and tokens.current() != '-':
parsed += parse_shorts(tokens, options)
elif options_first:
return parsed + [Argument(None, v) for v in tokens]
else:
parsed.append(Argument(None, tokens.move()))
return parsed
def run_setup(setup_script, args):
"""Run a distutils setup script, sandboxed in its directory"""
setup_dir = os.path.abspath(os.path.dirname(setup_script))
with setup_context(setup_dir):
try:
sys.argv[:] = [setup_script] + list(args)
sys.path.insert(0, setup_dir)
# reset to include setup dir, w/clean callback list
working_set.__init__()
working_set.callbacks.append(lambda dist: dist.activate())
# __file__ should be a byte string on Python 2 (#712)
dunder_file = (
setup_script
if isinstance(setup_script, str) else
setup_script.encode(sys.getfilesystemencoding())
)
with DirectorySandbox(setup_dir):
ns = dict(__file__=dunder_file, __name__='__main__')
_execfile(setup_script, ns)
except SystemExit as v:
if v.args and v.args[0]:
raise
# Normal exit, just return
def run():
"""
Run the script in sys.argv[1] as if it had
been invoked naturally.
"""
__builtins__
script_name = sys.argv[1]
namespace = dict(
__file__=script_name,
__name__='__main__',
__doc__=None,
)
sys.argv[:] = sys.argv[1:]
open_ = getattr(tokenize, 'open', open)
script = open_(script_name).read()
norm_script = script.replace('\\r\\n', '\\n')
code = compile(norm_script, script_name, 'exec')
exec(code, namespace)
def main(argv=None, **kw):
from setuptools import setup
from setuptools.dist import Distribution
class DistributionWithoutHelpCommands(Distribution):
common_usage = ""
def _show_help(self, *args, **kw):
with _patch_usage():
Distribution._show_help(self, *args, **kw)
if argv is None:
argv = sys.argv[1:]
with _patch_usage():
setup(
script_args=['-q', 'easy_install', '-v'] + argv,
script_name=sys.argv[0] or 'easy_install',
distclass=DistributionWithoutHelpCommands,
**kw
)
def main():
"""Small main program"""
import sys, getopt
try:
opts, args = getopt.getopt(sys.argv[1:], 'deut')
except getopt.error as msg:
sys.stdout = sys.stderr
print(msg)
print("""usage: %s [-d|-e|-u|-t] [file|-]
-d, -u: decode
-e: encode (default)
-t: encode and decode string 'Aladdin:open sesame'"""%sys.argv[0])
sys.exit(2)
func = encode
for o, a in opts:
if o == '-e': func = encode
if o == '-d': func = decode
if o == '-u': func = decode
if o == '-t': test(); return
if args and args[0] != '-':
with open(args[0], 'rb') as f:
func(f, sys.stdout.buffer)
else:
func(sys.stdin.buffer, sys.stdout.buffer)
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token or not opts.group_id:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
#check if the given group_id truly corresponds to one of the existing sensor groups
does_exist = False
for group in cb.group_enum():
if int(opts.group_id) == int(group['id']):
does_exist = True
if does_exist:
config = cb.group_datasharing_del_all(opts.group_id)
for key in config.keys():
print "%-20s : %s" % (key, config[key])
else:
sys.exit(-1)
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
builds = cb.get_builds()
for build in builds:
print ""
for key in build.keys():
print "%-20s : %s" % (key, build[key])
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.url or not opts.token or not opts.fname or not opts.type:
print "Missing required param."
sys.exit(-1)
if not opts.type in ["md5", "domain", "ipaddr"]:
print "Unknown type: ", opts.type
sys.exit(-1)
# setup the CbApi object
cb = CBQuery(opts.url, opts.token, ssl_verify=opts.ssl_verify)
# get the IOCs to check; this is a list of strings, one indicator
# per line. strip off the newlines as they come in
vals = [val.strip() for val in open(opts.fname, "r")]
# check each!
cb.check(vals, opts.type, opts.detail)
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
# enumerate configured feeds
#
feeds = cb.feed_enum()
# output a banner
#
print "%-3s %-25s %-8s %s" % ("Id", "Name", "Enabled", "Url")
print "%s+%s+%s+%s" % ("-"*3, "-"*27, "-"*10, "-"*31)
# output a row about each feed
#
for feed in feeds:
print "%-3s| %-25s | %-8s | %s" % (feed['id'], feed['name'], feed['enabled'], feed['feed_url'])
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
builds = cb.get_builds()
for build in builds:
print ""
for key in build.keys():
print "%-20s : %s" % (key, build[key])
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.url or not opts.token or not opts.id or not opts.query:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify)
# edit the search query of the just-added watchlist
#
watchlist = { 'search_query': opts.query }
print "-> Modifying the watchlist query..."
cb.watchlist_modify(opts.id, watchlist)
print "-> Watchlist modified"
# get record describing this watchlist
#
print "-> Querying for watchlist information..."
watchlist = cb.watchlist(opts.id)
print "-> Watchlist queried; details:"
watchlist_output(watchlist)
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.url or not opts.token or not opts.sensorid:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify)
# enumerate sensors
#
sensor = cb.sensor(opts.sensorid)
# output
#
for key in sensor.keys():
print "%-35s : %s" % (key, sensor[key])
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.url or not opts.token:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify)
# if a period is specified, handle that specially
#
if 0 != opts.interval:
return query_forever(cb, opts.interval, opts.udp)
# grab the global statistics
#
backlog = cb.sensor_backlog()
# output
#
for key in backlog.keys():
print "%-35s : %s" % (key, backlog[key])
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token or not opts.id:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
actions = cb.feed_action_enum(opts.id)
count = 1
for action in actions:
print ""
print "Action number: %s" % count
print "-"*50
count += 1
for key in action.keys():
print "%-20s : %s" % (key, action[key])
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.url or not opts.token or opts.md5 is None:
print "Missing required param; run with --help for usage"
sys.exit(-1)
if opts.filename is None:
opts.filename = "%s.zip" % (opts.md5,)
# build a cbapi object
#
cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify)
# perform a single binary search
#
binary = cb.binary(opts.md5)
# open the file and write out the contents
#
open(opts.filename, "w").write(binary)
print "-> Downloaded binary %s [%u bytes]" % (opts.md5, len(binary))
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token or not opts.feedname:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
id = cb.feed_get_id_by_name(opts.feedname)
if id is None:
print "-> No configured feed with name '%s' found!" % (opts.feedname)
sys.exit(-1)
sync_result = cb.feed_synchronize(opts.feedname, True)
print sync_result
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.url or not opts.token or not opts.id:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify)
# delete the watchlist
# for the purposes of this test script, hardcode the watchlist type, name, and query string
#
print "-> Deleting watchlist [id=%s]..." % (opts.id,)
watchlist = cb.watchlist_del(opts.id)
print "-> Watchlist deleted"
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.url or not opts.token or not opts.fname or not opts.id:
print "Missing required param."
sys.exit(-1)
# setup the CbApi object
#
cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify)
print "-> querying for report for id '%s'..." % (opts.id)
report = cb.process_report(opts.id, opts.segment)
print "-> writing report to file '%s'..." % (opts.fname)
open(opts.fname, "w").write(report)
print "-> Complete"
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token or not opts.id:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
events = cb.event_info(opts.id)
print events
print ""
count = 1
for event in events:
print ""
print "Event Number: %s" % count
count = count + 1
for field in event:
print "%-20s : %s" % (field, event[field])
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token or (not opts.feedname and not opts.feedid):
print "Missing required param; run with --help for usage"
print "One of -f or -i must be specified"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
if not opts.feedid:
id = cb.feed_get_id_by_name(opts.feedname)
if id is None:
print "-> No configured feed with name '%s' found!" % (opts.feedname)
return
else:
id = opts.feedid
old_feed = cb.feed_info(id)
#create a new updated feed based on user input
# create
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
builds = cb.get_builds()
for build in builds:
print ""
for key in build.keys():
print "%-20s : %s" % (key, build[key])
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token or not opts.group_id or not opts.config_id:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
#check if the given group_id truly corresponds to one of the existing sensor groups
does_exist = False
for group in cb.group_enum():
if int(opts.group_id) == int(group['id']):
does_exist = True
if does_exist:
datasharing_config = cb.group_datasharing_info(opts.group_id, opts.config_id)
for key in datasharing_config.keys():
print "%-20s : %s" % (key, datasharing_config[key])
else:
sys.exit(-1)
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token or not opts.id or not opts.description:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
event = cb.event_update(opts.id, opts.description)
print ""
for key in event.keys():
print "%-20s : %s" % (key, event[key])
def main(argv):
parser = build_cli_parser()
opts, args = parser.parse_args(argv)
if not opts.server_url or not opts.token:
print "Missing required param; run with --help for usage"
sys.exit(-1)
# build a cbapi object
#
cb = cbapi.CbApi(opts.server_url, token=opts.token, ssl_verify=opts.ssl_verify)
start = 0
pagesize=100
while True:
results = cb.alert_search(opts.query, rows=int(pagesize), start=start)
if len(results['results']) == 0: break
for result in results['results']:
pprint.pprint(result)
start = start + int(pagesize)