def main(argv):
    """Add an event to an investigation and print the server's reply.

    ``argv`` is handed to the option parser returned by
    ``build_cli_parser()``.  The server URL, token, investigation id,
    description and start date options must all be set; otherwise a usage
    hint is printed and the process exits with status -1.
    """
    opts, _positional = build_cli_parser().parse_args(argv)

    have_all_required = (opts.server_url and opts.token
                         and opts.investigation_id and opts.description
                         and opts.start_date)
    if not have_all_required:
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    #
    api = cbapi.CbApi(opts.server_url, token=opts.token,
                      ssl_verify=opts.ssl_verify)

    added = api.event_add(opts.investigation_id, opts.description,
                          opts.start_date)

    # dump every field of the returned event, one per line
    print("")
    print("-->Event Added:")
    for field in added.keys():
        print("%-20s : %s" % (field, added[field]))
# Example source code using Python's argv
def main(argv):
    """Print a feed's requirements, then synchronize a feed.

    ``argv`` is handed to the option parser returned by
    ``build_cli_parser()``.  The server URL, token and id options must be
    set; otherwise a usage hint is printed and the process exits with
    status -1.
    """
    opts, _positional = build_cli_parser().parse_args(argv)

    if not (opts.server_url and opts.token and opts.id):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    #
    api = cbapi.CbApi(opts.server_url, token=opts.token,
                      ssl_verify=opts.ssl_verify)

    feed_requirements = api.feed_requirements(opts.id)
    for item in feed_requirements:
        print(item)

    # NOTE(review): opts.feedname is not part of the required-option check
    # above -- confirm the parser defines it.
    outcome = api.feed_synchronize(opts.feedname, True)
    print(outcome)
def main(argv):
    """Delete an event by id and print the server's reply.

    ``argv`` is handed to the option parser returned by
    ``build_cli_parser()``.  The server URL, token and id options must be
    set; otherwise a usage hint is printed and the process exits with
    status -1.
    """
    opts, _positional = build_cli_parser().parse_args(argv)

    if not (opts.server_url and opts.token and opts.id):
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    #
    api = cbapi.CbApi(opts.server_url, token=opts.token,
                      ssl_verify=opts.ssl_verify)

    reply = api.event_del(opts.id)

    # dump every field of the reply, one per line
    print("")
    for field in reply.keys():
        print("%-20s : %s" % (field, reply[field]))
def main(argv):
    """Download a sensor installer package and save it to a file.

    ``argv`` is handed to the option parser returned by
    ``build_cli_parser()``.  The url, token, type, group and filename
    options must all be set; otherwise a usage hint is printed and the
    process exits with status -1.
    """
    parser = build_cli_parser()
    opts, args = parser.parse_args(argv)
    if not opts.url or not opts.token or not opts.type or not opts.group or not opts.filename:
        print("Missing required param; run with --help for usage")
        sys.exit(-1)

    # build a cbapi object
    #
    cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify)

    # download the installer package
    #
    print("-> Downloading...")
    installer = cb.sensor_installer(opts.type, opts.group)
    print("-> Sensor Installer Package is %s bytes" % (len(installer)))
    print("-> Download complete")

    # save the installer package to disk; the context manager guarantees the
    # file is flushed and closed before "Complete" is reported
    #
    print("-> Saving to %s..." % (opts.filename))
    with open(opts.filename, 'wb') as outfile:
        outfile.write(installer)
    print("-> Complete")
def main():
    """Walk a measurement directory and print the time column of each service.

    The directory is taken from ``sys.argv[1]``.  Its ``metadata.json`` is
    loaded, a summary line is printed, and then every file listed under
    ``metadata["services"]`` is read as a tab-separated CSV and the ``time``
    field of each row is printed.  Exits with status 1 when no directory is
    given.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: %s measurement\n" % sys.argv[0])
        sys.exit(1)

    path = sys.argv[1]
    with open(os.path.join(path, "metadata.json")) as f:
        metadata = json.load(f)

    # The last character of each timestamp is dropped before conversion
    # (presumably a trailing "Z" -- TODO confirm against the data format).
    start = date(metadata["start"][:-1])
    # The end of the range used to be read from "start" as well, so both
    # bounds were always equal.  Use "end" when it is present and fall back
    # to the old value otherwise.
    # NOTE(review): confirm the metadata schema provides an "end" key.
    end = date((metadata.get("end") or metadata["start"])[:-1])

    # Apply the format string with %; passing the values as extra print()
    # arguments printed the raw template followed by the values.
    print('open measurement "%s" from "%s" to "%s"' % (metadata["name"], start, end))

    for service in metadata["services"]:
        print('open service "%s"' % service["name"])
        with open(os.path.join(path, service["filename"])) as csvfile:
            r = csv.DictReader(csvfile, dialect=csv.excel_tab)
            for row in r:
                print(row["time"])
def sms():
    """Command-line entry point: ``send <number> <text>`` or ``credits``.

    Reads its arguments from ``sys.argv``.  With no sub-command the help
    text is printed.  ``send`` validates the number and text and then calls
    ``send_sms``; ``credits`` calls ``get_credits``.  Always returns None.

    NOTE(review): the user-facing messages below appear mis-encoded ("?"
    runs) in this file; they are left untouched.
    """
    if len(sys.argv) < 2:
        print(help_text)
        return

    command = sys.argv[1]

    if command == "send":
        # Both the number (argv[2]) and the text (argv[3]) are needed;
        # checking for fewer than 3 entries let argv[3] raise IndexError.
        if len(sys.argv) < 4:
            print("????? ????? ?? ?? ???.")
            return

        number = sys.argv[2]
        # Accept "+98" or "0" followed by 9 and further digits only.  The
        # previous pattern used a character class ("[\+98|0]"), which
        # matched any single one of those characters instead of the prefix.
        if not re.match(r"(?:\+98|0)9[0-9]*$", number):
            print("????? ???? ??? ?????? ???.tetete")
            return

        # Normalise the international prefix to the local leading zero.
        # (The old code swapped re.match's arguments and passed the
        # unescaped "+98" to re.sub, which is not a valid pattern.)
        if number.startswith("+98"):
            number = "0" + number[len("+98"):]

        text = sys.argv[3]
        if len(text) > 100:
            print("????? ??????? ??? ??? ????? ???? ???.")
            return

        send_sms(number, text, str(time.time()))
        return

    if command == "credits":
        get_credits()
        return
def __init__(self, config):
    """Read the 'web' and 'devices' configuration sections and set up state.

    ``config`` must offer ``get(section, option)`` and ``items(section)``.
    Port options are converted to int; every value in the 'devices' section
    is parsed as JSON (key order preserved) and stored per device id.  A
    database object is opened on the configured ``dbFile``.
    """
    self.service = None
    self.webServer = None
    self.config = config

    # settings from the [web] section
    self.httpsPort = int(self.config.get('web', 'httpsPort'))
    self.httpPort = int(self.config.get('web', 'httpPort'))
    self.adminPasswordHash = self.config.get('web', 'adminPasswordHash')
    self.apiSecret = self.config.get('web', 'apiSecret')
    self.uploadDir = self.config.get('web', 'uploadDir')
    self.dbFile = self.config.get('web', 'dbFile')
    self.httpsCertFile = self.config.get('web', 'httpsCertFile')
    self.httpsKeyFile = self.config.get('web', 'httpsKeyFile')
    self.httpsChainFile = self.config.get('web', 'httpsChainFile')
    self.localVideoPort = int(self.config.get('web', 'localVideoPort'))

    self.database = database.Database(self.dbFile)

    # one JSON document per device id; items() works on Python 2 and 3,
    # unlike the Python-2-only iteritems()
    self.deviceConfig = dict()
    for deviceId, jsonConf in dict(self.config.items('devices')).items():
        self.deviceConfig[deviceId] = json.loads(jsonConf, object_pairs_hook=OrderedDict)

    self.trends = dict()
    self.lock = threading.Lock()
def main(argv=sys.argv):
    """Set up the database from a config file and create the default data.

    ``argv`` must hold exactly two entries, the second being the config
    URI; otherwise ``usage(argv)`` is called (presumably it exits -- TODO
    confirm).  Logging and the ``stalker`` db layer are configured from
    that file, then statuses, status lists and ticket types are created.
    """
    if len(argv) != 2:
        usage(argv)

    from pyramid.paster import get_appsettings
    from pyramid.paster import setup_logging
    from stalker import db

    ini_path = argv[1]
    setup_logging(ini_path)
    db.setup(get_appsettings(ini_path))
    db.init()

    # create statuses
    create_statuses_and_status_lists()
    create_ticket_types()
def run_setup(setup_script, args):
    """Execute a distutils setup script, sandboxed in its own directory

    ``sys.argv`` is replaced by the script path followed by ``args`` and the
    script's directory is put first on ``sys.path`` before the script is
    executed as ``__main__``.  A SystemExit whose first argument is falsy
    (or absent) is treated as a normal exit; any other is re-raised.
    """
    script_dir = os.path.abspath(os.path.dirname(setup_script))
    with setup_context(script_dir):
        try:
            sys.argv[:] = [setup_script] + list(args)
            sys.path.insert(0, script_dir)
            # reset to include setup dir, w/clean callback list
            working_set.__init__()
            working_set.callbacks.append(lambda dist: dist.activate())

            # __file__ should be a byte string on Python 2 (#712)
            if isinstance(setup_script, str):
                file_attr = setup_script
            else:
                file_attr = setup_script.encode(sys.getfilesystemencoding())

            with DirectorySandbox(script_dir):
                script_globals = {
                    '__file__': file_attr,
                    '__name__': '__main__',
                }
                _execfile(setup_script, script_globals)
        except SystemExit as exit_exc:
            exit_args = exit_exc.args
            if exit_args and exit_args[0]:
                raise
            # Normal exit, just return
def run():
    """
    Run the script in sys.argv[1] as if it had
    been invoked naturally.

    The script sees itself as ``__main__`` with ``__file__`` set to its
    path, and ``sys.argv`` is shifted left by one so that the script path
    becomes ``sys.argv[0]``.
    """
    __builtins__
    script_name = sys.argv[1]
    namespace = dict(
        __file__=script_name,
        __name__='__main__',
        __doc__=None,
    )
    sys.argv[:] = sys.argv[1:]

    # tokenize.open (where available) honours the source encoding
    # declaration; plain open is the fallback.
    open_ = getattr(tokenize, 'open', open)
    # Close the script file deterministically instead of leaving the handle
    # to the garbage collector.
    with open_(script_name) as script_file:
        script = script_file.read()
    # NOTE(review): this replaces the literal four-character sequence
    # backslash-r-backslash-n, not CR LF line endings; kept unchanged.
    norm_script = script.replace('\\r\\n', '\\n')
    code = compile(norm_script, script_name, 'exec')
    exec(code, namespace)
def main(argv=None, **kw):
    """Run the ``easy_install`` command through ``setuptools.setup``.

    ``argv`` defaults to ``sys.argv[1:]`` and is appended to the fixed
    ``-q easy_install -v`` arguments.  Extra keyword arguments are passed
    straight to ``setup``.  Help output is produced inside
    ``_patch_usage()``.
    """
    from setuptools import setup
    from setuptools.dist import Distribution

    class DistributionWithoutHelpCommands(Distribution):
        # no common usage text for this distribution class
        common_usage = ""

        def _show_help(self, *help_args, **help_kw):
            with _patch_usage():
                Distribution._show_help(self, *help_args, **help_kw)

    argv = sys.argv[1:] if argv is None else argv

    with _patch_usage():
        setup(
            script_args=['-q', 'easy_install', '-v'] + argv,
            script_name=sys.argv[0] or 'easy_install',
            distclass=DistributionWithoutHelpCommands,
            **kw
        )
def __init__(self, methodName='runTest', orbArgs=None):
    """Initialise the test case, the ORB and the bookkeeping registries.

    ``methodName`` is forwarded to ``unittest.TestCase``.  ``orbArgs`` is an
    optional list of extra arguments appended to ``sys.argv`` for
    ``CORBA.ORB_init``; None (the default) means no extra arguments.
    """
    unittest.TestCase.__init__(self, methodName)

    # A None sentinel replaces the former mutable ``[]`` default so that no
    # list object is shared between calls.
    if orbArgs is None:
        orbArgs = []

    self.debuglevel = 3
    for arg in sys.argv:
        if '--debuglevel' in arg:
            # NOTE(review): this stores the text after the last '=' as a
            # string, while the default above is the int 3.
            self.debuglevel = arg.split('=')[-1]

    self._orb = CORBA.ORB_init(sys.argv + orbArgs, CORBA.ORB_ID)
    self._poa = self._orb.resolve_initial_references("RootPOA")
    self._poa._get_the_POAManager().activate()
    self._ns = self._orb.resolve_initial_references("NameService")
    self._root = self._ns._narrow(CosNaming.NamingContext)

    # Maintain a registry of the DomainManager (there should normally be just one)
    # and all spawned DeviceManagers, for easy cleanup.
    self._domainBooter = None
    self._domainManager = None

    self._deviceLock = threading.Lock()
    self._deviceBooters = []
    self._deviceManagers = []
    self._execparams = ""
def main():
    """Configure console logging, parse the options and show a BrowseWindow.

    Recognised options are ``--domainname=<name>`` and ``-v``/``--verbose``;
    they are passed to ``BrowseWindow`` as ``domainName`` and ``verbose``.
    The Qt event loop runs until the application quits.
    """
    # Set up a console logger.
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)-12s:%(levelname)-8s: %(message)s"))
    root_logger = logging.getLogger()
    root_logger.addHandler(handler)
    root_logger.setLevel(logging.INFO)

    window_kwargs = {}
    parsed, _remaining = getopt.getopt(
        sys.argv[1:], 'v', ['domainname=', 'verbose'])
    for flag, value in parsed:
        if flag == '--domainname':
            window_kwargs['domainName'] = value
        elif flag in ('-v', '--verbose'):
            window_kwargs['verbose'] = True

    app = QApplication(sys.argv)
    # quit the application once its last window has been closed
    QObject.connect(app, SIGNAL("lastWindowClosed()"), app, SLOT("quit()"))
    window = BrowseWindow(**window_kwargs)
    window.show()
    app.exec_()
def hook_scope(self, name=""):
    """Record a hook execution and scope later interactions to its revision.

    A row holding the hook name (``name``, or ``sys.argv[0]`` when empty)
    and the current UTC time is inserted into the hooks table; its row id
    becomes ``self.revision`` and is yielded.  When the generator is resumed
    normally the revision is cleared and ``flush()`` is called; when an
    Exception is thrown in, ``flush(False)`` is called, the revision is
    cleared and the exception is re-raised.
    """
    assert not self.revision

    hook = name or sys.argv[0]
    stamp = datetime.datetime.utcnow().isoformat()
    self.cursor.execute(
        'insert into hooks (hook, date) values (?, ?)', (hook, stamp))
    self.revision = self.cursor.lastrowid

    try:
        yield self.revision
    except Exception:
        self.flush(False)
        self.revision = None
        raise
    else:
        self.revision = None
        self.flush()
def hook_scope(self, name=""):
    """Record a hook execution and scope later interactions to its revision.

    A row holding the hook name (``name``, or ``sys.argv[0]`` when empty)
    and the current UTC time is inserted into the hooks table; its row id
    becomes ``self.revision`` and is yielded.  When the generator is resumed
    normally the revision is cleared and ``flush()`` is called; when an
    Exception is thrown in, ``flush(False)`` is called, the revision is
    cleared and the exception is re-raised.
    """
    assert not self.revision

    hook = name or sys.argv[0]
    stamp = datetime.datetime.utcnow().isoformat()
    self.cursor.execute(
        'insert into hooks (hook, date) values (?, ?)', (hook, stamp))
    self.revision = self.cursor.lastrowid

    try:
        yield self.revision
    except Exception:
        self.flush(False)
        self.revision = None
        raise
    else:
        self.revision = None
        self.flush()
def _init_global_state(self):
    """Copy the parsed options onto the instance and load the package config.

    Raises ValueError when no database option was given.  As a last step
    ``sys.argv`` is cut down to its first entry.
    """
    opts = self.options
    if opts.database is None:
        raise ValueError("--database is required to be defined")

    self.cluster = opts.cluster
    self.database = opts.database

    source_namespace = DBSourcedNamespace(
        cluster=self.cluster,
        database=self.database
    )
    self.namespace = source_namespace.get_name()

    self.config_path = opts.config_path
    self.dry_run = opts.dry_run
    self.per_source_throughput_cap = opts.per_source_throughput_cap
    self.total_throughput_cap = opts.total_throughput_cap

    load_package_config(self.config_path)
    self.refresh_runner_path = self.get_refresh_runner_path()

    # Removing the cmd line arguments to prevent child process error.
    sys.argv = sys.argv[:1]
def main():
    """
    The user can provide the location of the config file as an argument.
    If no location is specified, the default config file (experiment_parameters.cfg) is used.
    """
    try:
        config_filepath = sys.argv[1]
    except IndexError:
        # Only a missing argument selects the default; a bare ``except:``
        # would also have swallowed KeyboardInterrupt and SystemExit.
        print("\nUsing the default config file: experiment_parameters.cfg\n")
        config_filepath = "experiment_parameters.cfg"
    run_experiment(config_filepath)
def main():
    """Run the hook selected by ``sys.argv``; log and skip unknown hooks."""
    try:
        hooks.execute(sys.argv)
    except UnregisteredHookError as unknown_hook:
        message = 'Unknown hook {} - skipping.'.format(unknown_hook)
        log(message)
def hook_name():
    """Return the name of the hook that is currently executing.

    The JUJU_HOOK_NAME environment variable wins when it is set; otherwise
    the base name of ``sys.argv[0]`` is used.
    """
    # computed up front, exactly as a default argument would be
    script_basename = os.path.basename(sys.argv[0])
    return os.environ.get('JUJU_HOOK_NAME', script_basename)
def cli(argv=sys.argv[1:]):
    """Command-line interface: convert an RS3 file to HTML or PNG.

    ``argv`` is the list of command-line arguments without the program name.
    With ``--output-format png`` an output file is mandatory and the process
    exits afterwards (status 0 on success, 1 when the file is missing).
    Otherwise the HTML is written UTF-8 encoded to the output file, or to
    standard output when no output file is given.

    NOTE(review): the default for ``argv`` is captured once, when the
    function is defined.
    """
    parser = argparse.ArgumentParser(
        description="Convert an RS3 file into an HTML file containing the RST tree.")
    parser.add_argument('rs3_file')
    parser.add_argument('output_file', nargs='?')
    parser.add_argument(
        '-f', '--output-format', nargs='?', default='html',
        help="output format: html (default), png")
    # This option used to repeat the --output-format help text.
    parser.add_argument(
        '-d', '--debug', action='store_true',
        help="start the pudb debugger before converting")

    args = parser.parse_args(argv)

    if args.debug:
        import pudb; pudb.set_trace()

    if args.output_format == 'png':
        if args.output_file:
            rs3topng(args.rs3_file, args.output_file)
            sys.exit(0)
        else:
            sys.stderr.write("No PNG output file given.\n")
            sys.exit(1)

    if args.output_file:
        with codecs.open(args.output_file, 'w', 'utf8') as outfile:
            outfile.write(rs3tohtml(args.rs3_file))
    else:
        html_bytes = rs3tohtml(args.rs3_file).encode('utf8')
        # On Python 3 encoded bytes must go to the binary layer of stdout;
        # on Python 2 there is no ``buffer`` attribute and stdout itself
        # accepts the byte string.
        getattr(sys.stdout, 'buffer', sys.stdout).write(html_bytes)