def _get_certivox_server_secret_share(self, expires):
    """Return the CertiVox server secret share decoded from hex.

    The way the hex string is obtained is selected by
    ``options.certivoxServerSecret``: the values ``'dta'``,
    ``'credentials.json'``, ``'manual'`` and ``'config'`` each pick a
    dedicated getter; any other value falls back to the ``'config'`` getter,
    which returns ``options.certivoxServerSecret`` itself.

    :param expires: passed unchanged to the selected getter.
    :raises SecretsError: if hex-decoding the value raises ``TypeError``.
    """
    selected = options.certivoxServerSecret

    def _from_prompt(x):
        # Ask the operator to type the share in.
        return raw_input('MIRACL server secret share:')

    def _from_config(x):
        # The option value itself is the hex-encoded share.
        return options.certivoxServerSecret

    getters = {
        'dta': self._get_certivox_server_secret_share_dta,
        'credentials.json': self._get_certivox_server_secret_share_credentials,
        'manual': _from_prompt,
        'config': _from_config,
    }
    getter = getters.get(selected, getters['config'])
    secret_hex = getter(expires)

    try:
        # Python 2 "hex" codec: str -> raw bytes.
        return secret_hex.decode("hex")
    except TypeError as err:
        log.error(err)
        raise SecretsError('Invalid CertiVox server secret share')
# Example source code for Python options()
def setup():
    """
    Define the common command line options, parse the command line and
    forward the parsed values to ``setup_global_config``.

    Any options of your own must be defined before calling this function.

    Returns the value returned by ``tornado.options.parse_command_line()``.
    """
    # (option name, keyword arguments for tornado.options.define)
    common_options = (
        ("host", dict(default=None, help="run on the given address", type=str)),
        ("port", dict(default=None, help="run on the given port", type=int)),
        ("base", dict(default=None, type=str)),
        ("debug", dict(default=None, type=int)),
        ("fcgi", dict(default=None, type=str)),
        ("db_path", dict(default=None, type=str)),
        ("config", dict(default=None, help='Config file', type=str)),
    )
    for option_name, spec in common_options:
        tornado.options.define(option_name, **spec)

    remaining = tornado.options.parse_command_line()
    parsed = tornado.options.options
    # Every option is passed on under its own name.
    setup_global_config(
        **{option_name: getattr(parsed, option_name)
           for option_name, _ in common_options})
    return remaining
def test_voice_base64_data(self):
    """Compare the base64 audio in a /tts response with the cached file.

    The test returns immediately when ``options.cache_dir`` is not set.
    """
    if not options.cache_dir:
        return

    response = self.fetch("/tts?text=hello")
    payload = escape.json_decode(response.body)
    self.assertEqual(response.code, 200)
    self.assertTrue(payload["success"])

    data = payload["data"]
    self.assertGreater(len(data["file"]), 0)
    # The reported voice must be the configured one.
    self.assertEqual(data["voice"], options.voice)

    # The decoded payload must match the bytes of the file named in the
    # response, looked up inside the cache directory.
    decoded = base64.b64decode(data["sound_base64"])
    cached_path = os.path.join(options.cache_dir, data["file"])
    with open(cached_path, "rb") as cached:
        on_disk = cached.read()
    self.assertEqual(on_disk, decoded)
# ????cache??
def main():
    """Start the HTTP server on ``options.port`` and run the IOLoop.

    SIGTERM and SIGINT are routed to a handler that logs the signal and
    calls ``shutdown(ioloop, server)``.
    """
    ioloop = tornado.ioloop.IOLoop.instance()
    server = tornado.httpserver.HTTPServer(Application(), xheaders=True)
    server.listen(options.port)

    def sig_handler(sig, _):
        """Log the received signal and trigger the shutdown routine."""
        # logging.warn is a deprecated alias of logging.warning.
        logging.warning("Caught signal: %s", sig)
        shutdown(ioloop, server)

    signal.signal(signal.SIGTERM, sig_handler)
    signal.signal(signal.SIGINT, sig_handler)
    ioloop.start()
def web(port=23456,
        via_cli=False,
        ):
    """
    Bind Tornado server to specified port.

    :param port: port the HTTP server binds to.
    :param via_cli: not used by this function.

    Blocks in the IOLoop; a ``KeyboardInterrupt`` is caught and reported
    with ``Exit``. ``WEB_STARTED`` is printed once the ``try`` block is left.
    """
    print('BINDING', port)
    try:
        tornado.options.parse_command_line()
        http_server = HTTPServer(Application(),
                                 xheaders=True,
                                 )
        http_server.bind(port)
        http_server.start(16)  # Forks multiple sub-processes
        tornado.ioloop.IOLoop.instance().set_blocking_log_threshold(0.5)
        IOLoop.instance().start()
    except KeyboardInterrupt:
        # Parenthesised so the line is valid on Python 3 as well; with a
        # single argument the output is the same as the print statement.
        print('Exit')
    print('WEB_STARTED')
def parse_command_line() -> Namespace:
    """Parse command line options into ``config`` and return it.

    Unknown command line options are skipped (``parse_known_args``). After
    parsing, the log level is refreshed and every ``config`` entry whose
    name starts with ``log`` is copied onto ``tornado.options.options``.
    """
    import tornado.options

    parser.parse_known_args(namespace=config)
    set_loglevel()  # apply the log level chosen on the command line

    tornado_opts = tornado.options.options
    for name, value in vars(config).items():
        if not name.startswith('log'):
            continue
        setattr(tornado_opts, name, value)
    return config
def main():
    """Define the ``port`` option, parse the command line and serve the app.

    Blocks in the Tornado IOLoop after the server starts listening on
    ``options.port``.
    """
    define(help='run on the given port', type=int, default=8000, name='port')
    tornado.options.parse_command_line()

    banner = '================ spider web server has started ================ '
    details = ' server start at port -> {}, debug mode = {}'.format(options.port, constants.DEBUG)
    logger.info(banner)
    logger.info(details)

    web_app = make_web_app()
    server = tornado.httpserver.HTTPServer(web_app)
    server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()
def override_config(config, override):
    '''Fill in ``config`` in place, giving ``override`` values priority.

    ``config`` is a nested mapping; the ``tornado`` (with ``server`` and
    ``app_settings``) and ``db`` sections are created when missing.
    ``override`` is queried with ``.get`` for ``host``, ``port``, ``base``,
    ``debug`` and ``db_path``. Nothing is returned.
    '''
    # Make sure every section touched below exists.
    if 'tornado' not in config:
        config['tornado'] = {}
    tornado_cfg = config['tornado']
    for section in ('server', 'app_settings'):
        if tornado_cfg.get(section) is None:
            tornado_cfg[section] = {}
    if 'db' not in config:
        config['db'] = {}

    server_cfg = tornado_cfg['server']
    app_settings = tornado_cfg['app_settings']
    db_cfg = config['db']

    # host / port / base are resolved with this priority:
    #   1. override (command line, via tornado.options)
    #   2. config file
    #   3. hardcoded default
    resolved = {}
    for key, fallback in (('host', DEFAULT_HOST),
                          ('port', DEFAULT_PORT),
                          ('base', DEFAULT_BASE)):
        resolved[key] = find_first(
            [override.get(key), server_cfg.get(key), fallback])
    server_cfg.update(resolved)

    # An explicit debug flag goes into app_settings; the value 2 also
    # switches db echo on.
    debug = override.get('debug')
    if debug is not None:
        app_settings['debug'] = debug
        db_cfg['echo'] = debug == 2

    # Database uri: override, then config, then the development default.
    db_cfg['uri'] = find_first(
        [override.get('db_path'), db_cfg.get('uri'), DEFAULT_DEV_DB_URI])

    # Cookie secret: keep the configured one, otherwise use the default.
    app_settings['cookie_secret'] = find_first(
        [app_settings.get('cookie_secret'), DEFAULT_COOKIE_SECRET])
def main(app):
    """Serve ``app`` over FastCGI or with Tornado's own HTTP server.

    When the ``fcgi`` option is set, ``app`` is wrapped in a WSGI adapter
    and run by flup's ``WSGIServer`` bound to that option's value.
    Otherwise an ``HTTPServer`` is stored in the module-level
    ``http_server``, signal handlers are installed and the IOLoop is
    started. On ``KeyboardInterrupt`` the app's ``shutdown_hook`` (if it
    has one) is called and the exception is re-raised.
    """
    global http_server

    fcgi_address = tornado.options.options.fcgi
    if fcgi_address:
        from tornado.wsgi import WSGIAdapter
        wsgi_app = WSGIAdapter(app)

        def fcgiapp(env, start):
            # set the script name to "" so it does not appear in the tornado path match pattern
            env['SCRIPT_NAME'] = ''
            return wsgi_app(env, start)

        from flup.server.fcgi import WSGIServer
        WSGIServer(fcgiapp, bindAddress=fcgi_address).run()
        return

    http_server = tornado.httpserver.HTTPServer(app)
    server_opts = le_config.tornado.server
    host, port, base = server_opts.host, server_opts.port, server_opts.base
    http_server.listen(port, address=host)
    tornado.log.gen_log.info(
        'HTTP Server started on http://%s:%s/%s', host, port, base)

    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, sig_handler)

    try:
        tornado.ioloop.IOLoop.instance().start()
    except KeyboardInterrupt:
        if hasattr(app, 'shutdown_hook'):
            app.shutdown_hook()
        raise
def get_app(self):
    """Return the application with the three ``/tts`` routes.

    The voice file route is given ``options.cache_dir`` as its ``path``.
    """
    routes = [
        url("/tts", server.TTSHandler),
        url("/tts/voicelist", server.TTSVoiceHandler),
        url("/tts/voice/([^/]+)", server.VoiceStaticHandler,
            {"path": options.cache_dir}),
    ]
    return Application(routes)
# log_function=lambda h: h)
def test_default_voice(self):
    """A ``/tts?text=hello`` request answers 200 with ``options.voice``."""
    response = self.fetch("/tts?text=hello")
    payload = escape.json_decode(response.body)
    self.assertEqual(response.code, 200)
    reported_voice = payload["data"]["voice"]
    self.assertEqual(reported_voice, options.voice)
def test_hello_all_voice(self):
    """Run ``_test_voice`` on "hello world" for each ``server.ALL_VOICE``.

    Does nothing unless ``options.test_all_voice`` is set.
    """
    phrase = "hello world"
    if options.test_all_voice:
        for voice in server.ALL_VOICE:
            self._test_voice(phrase, voice)
# ?????base64?????????????????
def test_voice_filename(self):
    """Check the ``type=filename`` variant of the /tts endpoint.

    Two outcomes are accepted: a 400 whose error name is
    "tts-server close cache options", or a 200 carrying a non-empty file
    name and the configured voice. Any other status code fails the test.
    """
    response = self.fetch("/tts?text=hello&type=filename")

    # Reject unexpected codes first, with a message that names the code,
    # rather than decoding a body that may not be JSON.
    if response.code not in (200, 400):
        self.fail("unexpected status code: %s" % response.code)

    status = escape.json_decode(response.body)
    if response.code == 400:
        self.assertEqual(status["error"]["name"],
                         "tts-server close cache options")
    else:
        self.assertTrue(status["success"])
        self.assertGreater(len(status["data"]["file"]), 0)
        # The reported voice must be the configured one.
        self.assertEqual(status["data"]["voice"], options.voice)
# ????web??????????????????
def test_all_voice_fmt(self):
    """Request a file name for every format in ``options.fmtlist``.

    For each non-empty format the response must be a successful 200 and the
    extension of the returned name must match the requested format.
    """
    for fmt in options.fmtlist:
        # Skip empty entries only. The previous check was inverted and
        # skipped every non-empty format.
        if not fmt:
            continue
        response = self.fetch("/tts?text=hello&type=filename&fmt=%s" % fmt)
        status = escape.json_decode(response.body)
        self.assertEqual(response.code, 200)
        self.assertTrue(status["success"])
        # splitext returns (root, ext); index the tuple, not the name.
        extension = os.path.splitext(status['data']['name'])[1]
        # NOTE(review): it is not visible here whether fmtlist entries
        # carry a leading dot, so both sides are compared without it.
        self.assertEqual(extension.lstrip('.'), fmt.lstrip('.'))
def options(self):
    """Answer with status 204 and finish the request without a body."""
    no_content = 204
    self.set_status(no_content)
    self.finish()
def __init__(self):
    '''
    Initialise the Tornado application with the routes in
    ``auto_route_handlers`` and settings derived from ``options``.
    '''
    app_settings = dict(
        xsrf_cookies=False,
        site_title='demo',
        debug=options.debug,
        # static files and templates live under the project path
        static_path=os.path.join(options.project_path, 'static'),
        template_path=os.path.join(options.project_path, 'tpl'),
    )
    routes = auto_route_handlers
    logging.info("----> %s", routes)
    tornado.web.Application.__init__(self, routes, **app_settings)
def __init__(self):
    """Initialise the Tornado application from ``handlers`` and ``settings``."""
    tornado.web.Application.__init__(self, handlers, **settings)
    # Disabled session manager / database setup, kept for reference:
    # self.session_manager = session.TornadoSessionManager(settings["session_secret"], settings["session_dir"])
    # self.db = dbutils.Connection(
    #     host=options.DATABASE_HOST, database=options.DATABASE_NAME,
    #     user=options.DATABASE_USER, password=options.DATABASE_PASSWORD)
def main(port):
    """Load the grocery data, then serve the application on ``port``.

    :param port: port the HTTP server listens on.

    Blocks in the Tornado IOLoop once the server is listening.
    """
    #train()
    load_grocery()
    tornado.options.parse_command_line()
    # Parenthesised so the line is valid on Python 3 as well; with a single
    # argument the output is the same as the former print statement.
    print("start on port %s..." % port)
    http_server = tornado.httpserver.HTTPServer(Application(), xheaders=True)
    http_server.listen(port)
    #tornado.autoreload.start()
    tornado.ioloop.IOLoop.instance().start()
def options(self):
    """Add the CORS header via ``_add_CORS_header`` and write an empty object."""
    # add CORS headers if TabPy has a cors_origin specified
    self._add_CORS_header()
    empty_body = {}
    self.write(empty_body)
def options(self, pred_name):
    """Add the CORS header via ``_add_CORS_header`` and write an empty object.

    :param pred_name: accepted but not used by this method.
    """
    # add CORS headers if TabPy has a cors_origin specified
    self._add_CORS_header()
    empty_body = {}
    self.write(empty_body)
def __init__(self, *args, **kwargs):
    """Initialise the docker settings from the process environment.

    The values set here are defaults taken from ``DOCKER_HOST``,
    ``DOCKER_TLS_VERIFY`` and ``DOCKER_CERT_PATH``; they may be overridden
    later by entries in the configuration file.
    """
    super().__init__(*args, **kwargs)

    environ = os.environ

    self.docker_host = environ.get("DOCKER_HOST", "")
    if not self.docker_host:
        # Unset or empty: use the local docker socket.
        self.docker_host = "unix://var/run/docker.sock"

    # Any non-empty value counts as True, so DOCKER_TLS_VERIFY=0 still
    # enables verification. This is consistent with both docker behavior
    # and general shell practice.
    self.tls_verify = bool(environ.get("DOCKER_TLS_VERIFY", ""))

    # There is no environment variable that says "TLS = True", so TLS
    # follows the TLS_VERIFY status.
    self.tls = self.tls_verify

    default_cert_dir = os.path.join(os.path.expanduser("~"), ".docker")
    cert_dir = environ.get("DOCKER_CERT_PATH", default_cert_dir)
    for attribute, filename in (("tls_cert", 'cert.pem'),
                                ("tls_key", 'key.pem'),
                                ("tls_ca", 'ca.pem')):
        setattr(self, attribute, os.path.join(cert_dir, filename))

    if self.tls:
        self.docker_host = self.docker_host.replace('tcp://', 'https://')
# -------------------------------------------------------------------------
# Public
def parse_config(self, config_file):
    """Parse ``config_file`` and assign its values to our local traits.

    :param config_file: path handed to ``OptionParser.parse_config_file``.
    :raises tornado.options.Error: if the file does not exist.
    """
    # A private parser keeps the file options isolated from the global
    # tornado parser.
    file_parser = tornado.options.OptionParser()

    for name, trait in self.traits().items():
        # Each trait becomes an option typed after its current value, so
        # the parser performs the type validation.
        current = getattr(self, name)
        file_parser.define(
            name,
            default=current,
            type=type(current),
            help=trait.help)

    # The config file must always be present, even if empty; a missing
    # file is reported as a tornado options error.
    try:
        file_parser.parse_config_file(config_file)
    except FileNotFoundError:
        raise tornado.options.Error(
            'Could not find specified configuration'
            ' file "{}"'.format(config_file))

    set_traits_from_dict(self, file_parser.as_dict())

    if self.tls or self.tls_verify:
        self.docker_host = self.docker_host.replace('tcp://', 'https://')
def get_app(self):
    """Point logging at test files, set it up and return the application."""
    test_settings = (
        ('access_log_file_path', 'test.access.log'),
        ('application_log_file_path', 'test.application.log'),
        ('logging', 'debug'),
    )
    for option_name, value in test_settings:
        setattr(options, option_name, value)
    log.setup_logging()
    return application.TailSocketApplication()
def tearDown(self):
    """Delete the access and application log files if they exist."""
    for option_name in ('access_log_file_path', 'application_log_file_path'):
        log_path = getattr(tornado.options.options, option_name)
        if os.path.exists(log_path):
            os.remove(log_path)
def test_home_page_returns_200_and_content_and_logs(self):
    """``GET /`` answers 200 with a body and leaves a non-empty access log."""
    response = self.fetch('/')
    assert response.code == 200
    assert response.body is not None
    access_log = tornado.options.options.access_log_file_path
    assert os.stat(access_log).st_size > 0
def _get_customer_server_secret_share(self, expires):
    """Fetch the customer server secret share from the local TA server.

    A signed request is sent to ``<options.DTALocalURL>/serverSecret``. Up
    to 30 attempts are made, sleeping 2 seconds after each failure.

    :param expires: expiry value included in the request and its signature.
    :returns: the ``serverSecret`` field of the JSON response, hex-decoded.
    :raises SecretsError: if every attempt fails, the response body is not
        valid JSON, or it has no ``serverSecret`` entry.
    """
    import socket

    path = 'serverSecret'
    url_params = url_concat(
        '{0}/{1}'.format(options.DTALocalURL, path),
        {
            'app_id': self.app_id,
            'expires': expires,
            'signature': signMessage('{0}{1}{2}'.format(path, self.app_id, expires), self.app_key)
        })
    log.debug('customer server secret request: {0}'.format(url_params))

    httpclient = tornado.httpclient.HTTPClient()
    try:
        # Make at most 30 attempts to get server secret from local TA
        for _ in range(30):
            try:
                response = httpclient.fetch(url_params)
            except (tornado.httpclient.HTTPError, socket.error) as e:
                log.error(e)
                log.error(
                    'Unable to get Server Secret from the customer TA server. '
                    'Retying...')
                time.sleep(2)
                continue
            break
        else:
            # Max attempts reached
            raise SecretsError(
                'Unable to get Server Secret from the customer TA server.')
    finally:
        # Close the client on the failure path too, not only on success.
        httpclient.close()

    try:
        data = json.loads(response.body)
    except ValueError:
        raise SecretsError('TA server response contains invalid JSON')

    if 'serverSecret' not in data:
        raise SecretsError('serverSecret not in response from TA server')

    # Python 2 "hex" codec: str -> raw bytes.
    return data["serverSecret"].decode("hex")
def main():
    """Set up logging, the shelf database, the web application and the
    serial driver/controller stack, then run the Tornado IOLoop.

    Assigns the module-level ``DB``, ``DRIVER``, ``NODESET`` and
    ``CONTROLLER``. Returns 0 once the IOLoop stops.
    """
    global DRIVER, CONTROLLER, NODESET, DB
    # NOTE(review): the original comment here says this makes sure there is
    # at least one logging handler; presumably a side effect of tornado's
    # command line parsing. Confirm.
    tornado.options.parse_command_line()
    logger = logging.getLogger()
    # The original set INFO, WARNING and ERROR in a row; only the last call
    # takes effect, so the root logger ends up at ERROR.
    # NOTE(review): at this level the logging.info / logging.warning calls
    # below are filtered out by the root logger. Confirm that is intended.
    logger.setLevel(logging.ERROR)
    for h in logger.handlers:
        h.setFormatter(MyFormatter())

    logging.info("opening shelf: %s", OPTIONS.db)
    DB = Db(OPTIONS.db)

    application = tornado.web.Application(
        HANDLERS,
        debug=True,
        task_pool=multiprocessing.Pool(OPTIONS.tasks),
        # map static/xxx to Static/xxx
        static_path="Static/",
    )

    logging.info("opening serial")
    MQ = zmessage.MessageQueue()
    device = zdriver.MakeSerialDevice(OPTIONS.serial_port)
    DRIVER = zdriver.Driver(device, MQ)
    NODESET = znode.NodeSet(MQ, NodeEventCallback, OPTIONS.node_auto_refresh_secs)
    CONTROLLER = zcontroller.Controller(MQ,
                                        ControllerEventCallback,
                                        pairing_timeout_secs=OPTIONS.pairing_timeout_secs)
    CONTROLLER.Initialize()
    CONTROLLER.WaitUntilInitialized()
    CONTROLLER.UpdateRoutingInfo()
    time.sleep(2)
    print(CONTROLLER)

    # Initialise the controller's own node from the controller properties,
    # then ping every node the controller lists.
    n = NODESET.GetNode(CONTROLLER.GetNodeId())
    n.InitializeExternally(CONTROLLER.props.product, CONTROLLER.props.library_type, True)
    for num in CONTROLLER.nodes:
        n = NODESET.GetNode(num)
        n.Ping(3, False)

    logging.warning("listening on port %d", OPTIONS.port)
    application.listen(OPTIONS.port)
    tornado.ioloop.IOLoop.instance().start()
    return 0
async def late_init(self, db: Gino, *, loop=None, options=_options):
    """
    Initialize this application with a database object.

    This method does a few things to setup application for working with
    the database:

    - it enables task local storage;
    - creates a connection pool and binds it to the passed database object;
    - populates :py:attr:`~.db`.

    This is a coroutine (it awaits ``db.create_pool``) and must be awaited.

    :param db: the :py:class:`gino.ext.tornado.Gino()` class instance that
        will be used in this application.
    :param loop: io loop that will be used to run http server, either
        tornado's or asyncio's.
    :param options: a tornado's ``OptionParser()`` instance or any
        dictionary-like object with the database settings. Default is to
        use ``tornado.options.options`` global.
    :raises RuntimeError: if ``loop`` is neither a tornado asyncio-based
        loop nor an asyncio event loop.
    """
    if loop is None:
        loop = tornado.ioloop.IOLoop.current()

    # Resolve the underlying asyncio loop whichever kind was supplied.
    if isinstance(loop, tornado.platform.asyncio.BaseAsyncIOLoop):
        asyncio_loop = loop.asyncio_loop
    elif isinstance(loop, asyncio.BaseEventLoop):
        asyncio_loop = loop
    else:
        raise RuntimeError('AsyncIOLoop is required to run GINO')

    _enable_task_local(asyncio_loop)

    self.db: Gino = db

    await db.create_pool(
        host=options['db_host'],
        port=options['db_port'],
        user=options['db_user'],
        password=options['db_password'],
        database=options['db_database'],
        min_size=options['db_pool_min_size'],
        max_size=options['db_pool_max_size'],
        max_inactive_connection_lifetime=(
            options['db_pool_max_inactive_conn_lifetime']
        ),
        max_queries=options['db_pool_max_queries'],
        loop=asyncio_loop
    )
# noinspection PyAbstractClass