def get_java_db_connection_string(config=cfg.CONF):
    ssl_params = ''
    if config.database.use_ssl:
        ssl_params = "&useSSL=%s&requireSSL=%s" % (
            config.database.use_ssl, config.database.use_ssl
        )
    # FIXME I don't like this, find a better way of managing the conn
    return 'jdbc:%s://%s/%s?user=%s&password=%s%s' % (
        config.database.server_type,
        config.database.host,
        config.database.database_name,
        config.database.username,
        config.database.password,
        ssl_params,
    )
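# Hypothetical usage sketch: register the [database] options this helper
# reads, then build the URL. The option names and defaults below are
# assumptions inferred from the attributes accessed above, not confirmed
# project configuration.
db_opts = [
    cfg.StrOpt('server_type', default='mysql'),
    cfg.StrOpt('host', default='localhost'),
    cfg.StrOpt('database_name', default='monasca'),
    cfg.StrOpt('username', default='monasca'),
    cfg.StrOpt('password', default='secret', secret=True),
    cfg.BoolOpt('use_ssl', default=False),
]
cfg.CONF.register_opts(db_opts, group='database')
# -> jdbc:mysql://localhost/monasca?user=monasca&password=secret
print(get_java_db_connection_string())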
def load_repositories_options():
    repo_opts = [
        cfg.StrOpt(
            'offsets',
            default='monasca_transform.offset_specs:JSONOffsetSpecs',
            help='Repository for offset persistence'
        ),
        cfg.StrOpt(
            'data_driven_specs',
            default='monasca_transform.data_driven_specs.'
                    'json_data_driven_specs_repo:JSONDataDrivenSpecsRepo',
            help='Repository for metric and event data_driven_specs'
        ),
        cfg.IntOpt('offsets_max_revisions', default=10,
                   help="Max revisions of offsets for each application")
    ]
    repo_group = cfg.OptGroup(name='repositories', title='repositories')
    cfg.CONF.register_group(repo_group)
    cfg.CONF.register_opts(repo_opts, group=repo_group)
def load_messaging_options():
    messaging_options = [
        cfg.StrOpt('adapter',
                   default='monasca_transform.messaging.adapter:'
                           'KafkaMessageAdapter',
                   help='Message adapter implementation'),
        cfg.StrOpt('topic', default='metrics',
                   help='Messaging topic'),
        cfg.StrOpt('brokers',
                   default='192.168.10.4:9092',
                   help='Messaging brokers'),
        cfg.StrOpt('publish_kafka_project_id',
                   default='111111',
                   help='Tenant under which aggregated metrics are '
                        'published'),
        cfg.StrOpt('adapter_pre_hourly',
                   default='monasca_transform.messaging.adapter:'
                           'KafkaMessageAdapterPreHourly',
                   help='Pre-hourly message adapter implementation'),
        cfg.StrOpt('topic_pre_hourly', default='metrics_pre_hourly',
                   help='Pre-hourly messaging topic')
    ]
    messaging_group = cfg.OptGroup(name='messaging', title='messaging')
    cfg.CONF.register_group(messaging_group)
    cfg.CONF.register_opts(messaging_options, group=messaging_group)
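# Hypothetical usage sketch: once the loaders above have run, the grouped
# options are readable from cfg.CONF. Defaults are shown; a real deployment
# would load values from a config file instead of the empty arg list.
load_repositories_options()
load_messaging_options()
cfg.CONF([], project='monasca_transform')
print(cfg.CONF.repositories.offsets_max_revisions)  # -> 10
print(cfg.CONF.messaging.topic)                     # -> 'metrics'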
def _do_work(self, params, fn):
    pod_name = params.args.K8S_POD_NAME
    timeout = CONF.cni_daemon.vif_annotation_timeout

    # In case of KeyError retry for `timeout` s, wait 1 s between tries.
    @retrying.retry(stop_max_delay=(timeout * 1000), wait_fixed=1000,
                    retry_on_exception=lambda e: isinstance(e, KeyError))
    def find():
        return self.registry[pod_name]

    try:
        d = find()
        pod = d['pod']
        vif = base.VersionedObject.obj_from_primitive(d['vif'])
    except KeyError:
        raise exceptions.ResourceNotReady(pod_name)
    fn(vif, self._get_inst(pod), params.CNI_IFNAME, params.CNI_NETNS)
    return vif
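# Standalone sketch of the retry pattern used in _do_work, assuming the
# `retrying` library: keep re-reading a registry until the key appears,
# giving up after `timeout` seconds. All names below are illustrative.
import retrying

registry = {}
timeout = 5  # seconds

@retrying.retry(stop_max_delay=timeout * 1000, wait_fixed=1000,
                retry_on_exception=lambda e: isinstance(e, KeyError))
def wait_for_entry(name):
    return registry[name]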
def run(self):
    server_pair = CONF.cni_daemon.bind_address
    LOG.info('Starting server on %s.', server_pair)
    try:
        address, port = server_pair.split(':')
        port = int(port)
    except ValueError:
        LOG.exception('Cannot start server on %s.', server_pair)
        raise
    try:
        self.application.run(address, port,
                             processes=CONF.cni_daemon.worker_num)
    except Exception:
        LOG.exception('Failed to start kuryr-daemon.')
        raise
def reboot(self, task):
    """Cycles the power to the task's node.

    :param task: a TaskManager instance containing the node to act on.
    :raises: InvalidParameterValue if iboot parameters are invalid.
    :raises: MissingParameterValue if required iboot parameters are
        missing.
    :raises: PowerStateFailure if the final state of the node is not
        POWER_ON.
    """
    driver_info = _parse_driver_info(task.node)
    _switch(driver_info, False)
    _sleep_switch(CONF.iboot.reboot_delay)
    _switch(driver_info, True)
    _check_power_state(driver_info, states.POWER_ON)
def setup_log():
    logging.register_options(CONF)
    conf_kwargs = dict(args=[], project=DOMAIN, version=VERSION)
    callback_config = parse_callback_config()
    if callback_config['ironic_config']:
        conf_kwargs['default_config_files'] = [
            callback_config['ironic_config']]
    CONF(**conf_kwargs)
    if callback_config['use_journal']:
        CONF.set_override('use_journal', True)
    if callback_config['use_syslog']:
        CONF.set_override('use_syslog', True)
    if callback_config['ironic_log_file']:
        CONF.set_override("log_file", callback_config['ironic_log_file'])
    logging.setup(CONF, DOMAIN)
def __init__(self, name):
    self._host = CONF.api_listen
    self._port = CONF.api_listen_port
    if platform.system() == "Windows":
        self._workers = 1
    else:
        self._workers = (
            CONF.api_workers or processutils.get_worker_count())
    self._loader = wsgi.Loader(CONF)
    self._app = self._loader.load_app(name)
    self._server = wsgi.Server(CONF,
                               name,
                               self._app,
                               host=self._host,
                               port=self._port)
def push_metrics(token, ceilometer_data, counter_name):
    json_data = json.dumps(ceilometer_data)
    headers = {
        "Content-Type": 'application/json',
        'X-Auth-Token': token,
        'Content-Length': len(json_data)
    }
    ceilometer_url = client.concatenate_url(
        CONF.gexporter.ceilometer_endpoint, meters + counter_name)
    resp = client.http_request("POST", ceilometer_url, headers, json_data,
                               None, None)
    return resp.status_code == 200
def validate_token(token):
    headers = {
        "Content-Type": 'application/json',
        'X-Auth-Token': token
    }
    keystone_token_validation_url = client.concatenate_url(
        CONF.gexporter.keystone_endpoint, tenants)
    resp = client.http_request("GET", keystone_token_validation_url,
                               headers, None, None, None)
    return resp.status_code == 200
def get_token():
    keystone_token_request_url = client.concatenate_url(
        CONF.gexporter.keystone_endpoint, tokens)
    headers = {
        "Content-Type": "application/json"
    }
    auth_data = {
        "auth": {
            "tenantName": CONF.gexporter.tenant_name,
            "passwordCredentials": {
                "username": CONF.gexporter.username,
                "password": CONF.gexporter.password
            }
        }
    }
    resp = client.http_request("POST", keystone_token_request_url, headers,
                               json.dumps(auth_data), None, None)
    json_resp = json.loads(resp.text)
    auth_token = json_resp["access"]["token"]["id"]
    tenant_id = json_resp["access"]["token"]["tenant"]["id"]
    return auth_token, tenant_id
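# Hypothetical end-to-end flow for the three helpers above, assuming the
# same `client` module and [gexporter] CONF options they rely on. The
# sample payload is illustrative only.
token, tenant_id = get_token()
if validate_token(token):
    sample = [{"counter_name": "cpu_util", "counter_volume": 42}]
    pushed = push_metrics(token, sample, "cpu_util")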
def before(self, state):
    headers = state.request.headers
    is_public_api = state.request.environ.get(
        'is_public_api', False)
    ctx = context.RequestContext.from_environ(
        state.request.environ,
        is_public_api=is_public_api,
        project_id=headers.get('X-Project-Id'),
        user_id=headers.get('X-User-Id'),
    )
    # Do not pass any token with context for noauth mode
    if cfg.CONF.auth_strategy == 'noauth':
        ctx.auth_token = None
    creds = ctx.to_policy_values()
    is_admin = policy.check('is_admin', creds, creds)
    ctx.is_admin = is_admin
    state.request.context = ctx
def start(self):
    super(RPCService, self).start()
    admin_context = context.RequestContext('admin', 'admin', is_admin=True)
    target = messaging.Target(topic=self.topic, server=self.host)
    endpoints = [self.manager]
    serializer = objects_base.IotronicObjectSerializer()
    self.rpcserver = rpc.get_server(target, endpoints, serializer)
    self.rpcserver.start()
    self.handle_signal()
    self.manager.init_host()
    self.tg.add_dynamic_timer(
        self.manager.periodic_tasks,
        periodic_interval_max=cfg.CONF.periodic_interval,
        context=admin_context)
    LOG.info(_LI('Created RPC server for service %(service)s on host '
                 '%(host)s.'),
             {'service': self.topic, 'host': self.host})
def prepare_service(argv=None):
    # NOTE: avoid a mutable default argument; fall back to an empty list.
    argv = argv or []
    log.register_options(cfg.CONF)
    log.set_defaults(default_log_levels=['amqp=WARN',
                                         'amqplib=WARN',
                                         'qpid.messaging=INFO',
                                         'oslo.messaging=INFO',
                                         'sqlalchemy=WARN',
                                         'keystoneclient=INFO',
                                         'stevedore=INFO',
                                         'eventlet.wsgi.server=WARN',
                                         'iso8601=WARN',
                                         'paramiko=WARN',
                                         'requests=WARN',
                                         'neutronclient=WARN',
                                         'glanceclient=WARN',
                                         'iotronic.openstack.common=WARN',
                                         'urllib3.connectionpool=WARN',
                                         ])
    config.parse_args(argv)
    log.setup(cfg.CONF, 'iotronic')
def get_oslo_policy_enforcer():
    # This method is for use by oslopolicy CLI scripts. Those scripts need
    # the 'output-file' and 'namespace' options, but having those in
    # sys.argv means loading the Ironic config options will fail as those
    # are not expected to be present. So we pass in an arg list with those
    # stripped out.
    conf_args = []
    # Start at 1 because cfg.CONF expects the equivalent of sys.argv[1:]
    i = 1
    while i < len(sys.argv):
        if sys.argv[i].strip('-') in ['namespace', 'output-file']:
            i += 2
            continue
        conf_args.append(sys.argv[i])
        i += 1
    cfg.CONF(conf_args, project='ironic')
    return get_enforcer()
# NOTE(deva): We can't call these methods from within decorators because the
# 'target' and 'creds' parameters must be fetched from the call-time
# context-local pecan.request magic variable, but decorators are compiled
# at module-load time.
def authorize(rule, target, creds, *args, **kwargs):
    """A shortcut for policy.Enforcer.authorize()

    Checks authorization of a rule against the target and credentials, and
    raises an exception if the rule is not defined.
    Always returns True if CONF.auth_strategy == noauth.

    Beginning with the Newton cycle, this should be used in place of
    'enforce'.
    """
    if CONF.auth_strategy == 'noauth':
        return True
    enforcer = get_enforcer()
    try:
        return enforcer.authorize(rule, target, creds, do_raise=True,
                                  *args, **kwargs)
    except policy.PolicyNotAuthorized:
        raise exception.HTTPForbidden(resource=rule)
def enforce(rule, target, creds, do_raise=False, exc=None, *args, **kwargs):
    """A shortcut for policy.Enforcer.enforce()

    Checks authorization of a rule against the target and credentials.
    Always returns True if CONF.auth_strategy == noauth.
    """
    # NOTE(deva): this method is obsoleted by authorize(), but retained for
    # backwards compatibility in case it has been used downstream.
    # It may be removed in the Pike cycle.
    LOG.warning(_LW(
        "Deprecation warning: calls to ironic.common.policy.enforce() "
        "should be replaced with authorize(). This method may be removed "
        "in a future release."))
    if CONF.auth_strategy == 'noauth':
        return True
    enforcer = get_enforcer()
    return enforcer.enforce(rule, target, creds, do_raise=do_raise,
                            exc=exc, *args, **kwargs)
def create_service():
    LOG.debug(_('create xjob server'))
    xmanager = XManager()
    xservice = XService(
        host=CONF.host,
        binary="xjob",
        topic=topics.TOPIC_XJOB,
        manager=xmanager,
        periodic_enable=True,
        report_interval=_TIMER_INTERVAL,
        periodic_interval_max=_TIMER_INTERVAL_MAX,
        serializer=Serializer()
    )
    xservice.start()
    return xservice
def main():
    config.init(xservice.common_opts, sys.argv[1:])
    host = CONF.host
    workers = CONF.workers
    if workers < 1:
        LOG.warning(_LW("Invalid worker number, workers = %(workers)s"),
                    {'workers': workers})
        workers = 1
    LOG.info(_LI("XJob Server on http://%(host)s with %(workers)s workers"),
             {'host': host, 'workers': workers})
    xservice.serve(xservice.create_service(), workers)
    LOG.info(_LI("Configuration:"))
    CONF.log_opt_values(LOG, std_logging.INFO)
    xservice.wait()
def main():
    core.initialize()
    logging.register_options(CONF)
    logging.setup(CONF, 'trio2o-db-manage')
    CONF.register_cli_opt(command_opt)
    version_info = pbr.version.VersionInfo('trio2o')
    try:
        CONF(sys.argv[1:], project='trio2o', prog='trio2o-db-manage',
             version=version_info.version_string())
    except RuntimeError as e:
        sys.exit("ERROR: %s" % e)
    try:
        CONF.command.func()
    except Exception as e:
        sys.exit("ERROR: %s" % e)
def main():
    config.init(app.common_opts, sys.argv[1:])
    application = app.setup_app()
    host = CONF.bind_host
    port = CONF.bind_port
    workers = CONF.api_workers
    if workers < 1:
        LOG.warning(_LW("Invalid worker number, workers = %(workers)s"),
                    {'workers': workers})
        workers = 1
    LOG.info(_LI("Cinder_APIGW on http://%(host)s:%(port)s with "
                 "%(workers)s workers"),
             {'host': host, 'port': port, 'workers': workers})
    service = wsgi.Server(CONF, 'Trio2o Cinder_APIGW',
                          application, host, port)
    restapp.serve(service, CONF, workers)
    LOG.info(_LI("Configuration:"))
    CONF.log_opt_values(LOG, std_logging.INFO)
    restapp.wait()
def main():
    config.init(app.common_opts, sys.argv[1:])
    application = app.setup_app()
    host = CONF.bind_host
    port = CONF.bind_port
    workers = CONF.api_workers
    if workers < 1:
        LOG.warning(_LW("Invalid worker number, workers = %(workers)s"),
                    {'workers': workers})
        workers = 1
    LOG.info(_LI("Admin API on http://%(host)s:%(port)s with "
                 "%(workers)s workers"),
             {'host': host, 'port': port, 'workers': workers})
    service = wsgi.Server(CONF, 'Trio2o Admin_API', application, host, port)
    restapp.serve(service, CONF, workers)
    LOG.info(_LI("Configuration:"))
    CONF.log_opt_values(LOG, std_logging.INFO)
    restapp.wait()
def __init__(self):
    super(XJobAPI, self).__init__()
    rpc.init(CONF)
    target = messaging.Target(topic=topics.TOPIC_XJOB, version='1.0')
    upgrade_level = CONF.upgrade_levels.xjobapi
    if upgrade_level == 'auto':
        version_cap = self._determine_version_cap(target)
    else:
        version_cap = self.VERSION_ALIASES.get(upgrade_level,
                                               upgrade_level)
    serializer = Serializer()
    self.client = rpc.get_client(target,
                                 version_cap=version_cap,
                                 serializer=serializer)
# TODO: handle version compatibility in a future release
def errors(self):
    """Get all of the errors from the log files."""
    error_found = 0
    if CONF.log_dir:
        logs = [x for x in os.listdir(CONF.log_dir) if x.endswith('.log')]
        for log_name in logs:
            log_file = os.path.join(CONF.log_dir, log_name)
            with open(log_file, "r") as f:
                lines = [line.strip() for line in f]
            lines.reverse()
            print_name = 0
            for index, line in enumerate(lines):
                if line.find(" ERROR ") > 0:
                    error_found += 1
                    if print_name == 0:
                        print(log_file + ":-")
                        print_name = 1
                    print("Line %d : %s" % (len(lines) - index, line))
    if error_found == 0:
        print("No errors in logfiles!")
def service_get_all_learning_sorted(context):
    session = get_session()
    with session.begin():
        topic = CONF.learning_topic
        label = 'learning_gigabytes'
        subq = model_query(context, models.Share,
                           func.sum(models.Share.size).label(label),
                           session=session, read_deleted="no").\
            join(models.ShareInstance,
                 models.ShareInstance.learning_id == models.Share.id).\
            group_by(models.ShareInstance.host).\
            subquery()
        return _service_get_all_topic_subquery(context,
                                               session,
                                               topic,
                                               subq,
                                               label)
def __init__(self, host, binary, topic, manager, report_interval=None,
             periodic_interval=None, periodic_fuzzy_delay=None,
             service_name=None, *args, **kwargs):
    super(Service, self).__init__()
    if not rpc.initialized():
        rpc.init(CONF)
    self.host = host
    self.binary = binary
    self.topic = topic
    self.manager_class_name = manager
    manager_class = importutils.import_class(self.manager_class_name)
    self.manager = manager_class(host=self.host,
                                 service_name=service_name,
                                 *args, **kwargs)
    self.report_interval = report_interval
    self.periodic_interval = periodic_interval
    self.periodic_fuzzy_delay = periodic_fuzzy_delay
    self.saved_args, self.saved_kwargs = args, kwargs
    self.timers = []
def wait():
    LOG.debug('Full set of CONF:')
    for flag in CONF:
        flag_get = CONF.get(flag, None)
        # hide flag contents from log if it contains a password
        # should use the secret flag when switching over to openstack-common
        if ("_password" in flag or "_key" in flag or
                (flag == "sql_connection" and "mysql:" in flag_get)):
            LOG.debug('%(flag)s : FLAG SET ', {"flag": flag})
        else:
            LOG.debug('%(flag)s : %(flag_get)s',
                      {"flag": flag, "flag_get": flag_get})
    try:
        _launcher.wait()
    except KeyboardInterrupt:
        _launcher.stop()
    rpc.cleanup()
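# Minimal standalone sketch of the masking rule used in wait(): option
# names containing '_password' or '_key' are logged as set-but-hidden.
# Purely illustrative; not part of the original module.
def _format_flag(flag, value):
    if '_password' in flag or '_key' in flag:
        return '%s : FLAG SET' % flag
    return '%s : %s' % (flag, value)

print(_format_flag('db_password', 'hunter2'))  # -> 'db_password : FLAG SET'
print(_format_flag('bind_host', '0.0.0.0'))    # -> 'bind_host : 0.0.0.0'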
def limited_by_marker(items, request, max_limit=CONF.osapi_max_limit):
    """Return a slice of items according to the requested marker and limit."""
    params = get_pagination_params(request)
    limit = params.get('limit', max_limit)
    marker = params.get('marker')
    limit = min(max_limit, limit)
    start_index = 0
    if marker:
        start_index = -1
        for i, item in enumerate(items):
            if 'flavorid' in item:
                if item['flavorid'] == marker:
                    start_index = i + 1
                    break
            elif item['id'] == marker or item.get('uuid') == marker:
                start_index = i + 1
                break
        if start_index < 0:
            msg = _('marker [%s] not found') % marker
            raise webob.exc.HTTPBadRequest(explanation=msg)
    range_end = start_index + limit
    return items[start_index:range_end]
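# Pure-Python sketch of the marker scan above, with the webob request and
# pagination parsing replaced by inline assumed values:
items = [{'id': 'a'}, {'id': 'b'}, {'id': 'c'}, {'id': 'd'}]
marker, limit = 'b', 2
start = next(i + 1 for i, item in enumerate(items) if item['id'] == marker)
print(items[start:start + limit])  # -> [{'id': 'c'}, {'id': 'd'}]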
def saharaclient(context):
    deprecated_opts_for_v2 = {
        'auth_url': CONF.sahara.auth_url,
        'token': context.auth_token,
        'tenant_id': context.tenant,
    }
    opts_for_v3 = {
        'auth_url': CONF.sahara.auth_url,
        'token': context.auth_token,
        'project_id': context.tenant,
    }
    auth_obj = client_auth.AuthClientLoader(
        client_class=sahara_client.Client,
        exception_module=sahara_exception,
        cfg_group=SAHARA_GROUP,
        deprecated_opts_for_v2=deprecated_opts_for_v2,
        opts_for_v3=opts_for_v3,
        url=CONF.sahara.auth_url,
        token=context.auth_token)
    return auth_obj.get_client(context)
def __init__(self, db_session, db_migrate, sql_connection, sqlite_db,
             sqlite_clean_db):
    self.sql_connection = sql_connection
    self.sqlite_db = sqlite_db
    self.sqlite_clean_db = sqlite_clean_db
    self.engine = db_session.get_engine()
    self.engine.dispose()
    conn = self.engine.connect()
    if sql_connection == "sqlite://":
        self.setup_sqlite(db_migrate)
    else:
        testdb = os.path.join(CONF.state_path, sqlite_db)
        db_migrate.upgrade('head')
        if os.path.exists(testdb):
            return
    if sql_connection == "sqlite://":
        conn = self.engine.connect()
        self._DB = "".join(line for line in conn.connection.iterdump())
        self.engine.dispose()
    else:
        cleandb = os.path.join(CONF.state_path, sqlite_clean_db)
        shutil.copyfile(testdb, cleandb)