def serve_swaggerui_assets(path):
    """Serve one Swagger-UI static asset from the bundled static directory.

    Works in any mode, but outside of debug a warning is emitted since a
    front-end web server should normally serve these files instead.
    """
    from flask import send_from_directory

    if not current_app.debug:
        import warnings
        warnings.warn(
            "/swaggerui/ is recommended to be served by public-facing server (e.g. NGINX)"
        )
    return send_from_directory('../static/', path)
# Example source snippets exercising Python's app.debug flag (scraped collection).
def get_client():
    """Obtain the shared client connection to be used with orders.

    Waits (in half-second ticks, up to ``g.timeout`` ticks) for the client
    id to be released, then reconnects the shared client if its connection
    dropped.

    Raises:
        Exception: if the client id never becomes available within the
            timeout, or the client cannot (re)connect.
    """
    # Get client ID from our non-order pool list in memory
    timeout = g.timeout
    while g.clientId_in_use:
        if timeout <= 0:
            # BUG FIX: the countdown was previously never enforced, so this
            # loop could spin forever if the id was never released.
            raise Exception('Timed out waiting for clientId to become available')
        log.debug('Waiting for clientId to become available...({})'.format(timeout))
        time.sleep(0.5)
        timeout -= 1
    client = g.client_connection
    # Enable logging if we're in debug mode
    if current_app.debug is True:
        client.enableLogging()
    # Reconnect if needed
    if not client.isConnected():
        log.debug('Client {} not connected. Trying to reconnect...'.format(g.client_id))
        client.disconnect()
        time.sleep(1)
        client.connect()
    # If we failed to reconnect, be sure to put our client ID back in the pool
    if client.isConnected() is False:
        raise Exception('Client cannot connect')
    return client
def _get_wrap(self, node, classes='form-group'):
    """Build the wrapping <div> for *node*, tagging required fields."""
    # 'required' strictly speaking isn't bootstrap, but is a common
    # enough customization.
    wrapper_classes = classes + ' required' if node.flags.required else classes
    wrapper = tags.div(_class=wrapper_classes)
    # In debug mode, leave an HTML comment identifying the field.
    if current_app.debug:
        comment_text = ' Field: {} ({}) '.format(node.name, node.__class__.__name__)
        wrapper.add(tags.comment(comment_text))
    return wrapper
def recording_enabled():
    """True when query recording is on: debug mode, or the
    SQLALCHEMY_RECORD_QUERIES config flag."""
    if current_app.debug:
        return True
    return current_app.config.get('SQLALCHEMY_RECORD_QUERIES')
def _get_wrap(self, node, classes='form-group'):
    """Wrap *node* in a styled <div>, marking required fields."""
    css = classes
    if node.flags.required:
        # add required class, which strictly speaking isn't bootstrap, but
        # a common enough customization
        css = css + ' required'
    container = tags.div(_class=css)
    if current_app.debug:
        # Emit an identifying HTML comment to ease template debugging.
        container.add(tags.comment(' Field: {} ({}) '.format(
            node.name, node.__class__.__name__)))
    return container
def handle_all_exceptions(e):
    """Global exception handler: turn any exception into a JSON error response.

    HTTPExceptions keep their own status code; anything else (and any 5xx)
    gets a generated context id plus a Splunk search link so the response
    can be cross-referenced with the server logs.
    """
    # Anything that is not an HTTPException is an unexpected server fault.
    is_server_error = not isinstance(e, HTTPException)
    ret = {}
    error = {}
    ret['error'] = error
    if is_server_error or e.code >= 500:
        # Use context_id from the client if it's available, or make one if not.
        log_context = request.headers.get("Drift-Log-Context")
        log_context = json.loads(log_context) if log_context else {}
        context_id = log_context.get("request_id", str(uuid.uuid4()).replace("-", ""))
        error['context_id'] = context_id
        title = str(e) + " - [{}]".format(context_id)
        splunk_link = 'http://splunk.devnorth.dg-api.com:8000/en-US/app/search/search'
        splunk_link += '?q=search%20sourcetype%3D%22*%22%20%7C%20search%20{}'.format(context_id)
        error['link'] = splunk_link
    if is_server_error:
        # Do a traceback if caller has dev role, or we are running in debug mode.
        current_user = query_current_user()
        if (current_user and "dev" in current_user['roles']) or current_app.debug:
            # NOTE(review): cStringIO is Python 2-only; this snippet predates py3.
            sio = cStringIO.StringIO()
            ei = sys.exc_info()
            sio.write("%s: %s\n" % (type(e).__name__, e))
            traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
            error["traceback"] = sio.getvalue()
            sio.close()
            error['description'] = str(e)
        else:
            # Hide internals from callers without the dev role.
            error['description'] = "Internal Server Error"
        # The exception is logged out and picked up by Splunk or comparable tool.
        # The 'context_id' in the title enables quick cross referencing with the
        # response body below.
        log.exception(title)
        ret['status_code'] = 500
        ret['message'] = "Internal Server Error"
        error['code'] = 'internal_server_error'
    else:
        # HTTPException: propagate its code/name/description verbatim.
        ret['status_code'] = e.code
        ret['message'] = e.name
        error['code'] = 'user_error' if e.code < 500 else 'server_error'
        error['description'] = e.description
        # Support for Flask Restful 'data' property in exceptions.
        if hasattr(e, 'data') and e.data:
            error.update(e.data)
            # Legacy field 'message'. If it's in the 'data' payload, rename the field
            # to 'description'.
            if 'message' in e.data:
                error['description'] = error.pop('message')
        if e.code >= 500:
            # It's a "soft" server error. Let's log it out.
            # ('title' is defined above because e.code >= 500 on this path.)
            log.warning(title + " " + error['description'])
    return make_response(jsonify(ret), ret['status_code'])
def sign_csr(arg_userid, arg_csr_path, arg_crt_path):
    """Sign the user's CSR with the CA key and store the resulting CRT.

    :param arg_userid: user id, used only in log messages
    :param arg_csr_path: path of the PEM CSR to sign
    :param arg_crt_path: path where the signed PEM certificate is written
    :return: True on success, False on any failure (all failures are logged)
    """
    # csr = User CSR file in internal crypto format
    (result, buffer) = get_file_contents(arg_csr_path)
    if not result:
        app.logger.error("sign_csr: cannot access CSR {%s} for user {%s}, reason: {%s}", arg_csr_path, arg_userid, buffer)
        return False
    try:
        csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, buffer)
    except Exception as e:
        app.logger.error("sign_csr: load CSR {%s} for user {%s} failed, reason: {%s}", arg_csr_path, arg_userid, repr(e))
        return False
    # CAcertificate = CA certificate in internal crypto format
    (result, buffer) = get_file_contents(CA_CRT_FILE)
    if not result:
        # BUG FIX: this path previously logged repr(e) although no exception
        # 'e' exists here (a NameError in itself); log the helper's error text.
        app.logger.error("sign_csr: cannot access CA certificate {%s} for user {%s}, reason: {%s}", CA_CRT_FILE, arg_userid, buffer)
        return False
    try:
        CAcertificate = crypto.load_certificate(crypto.FILETYPE_PEM, buffer)
        if app.debug:
            app.logger.debug("sign_csr: CA cert subject = {%s}", CAcertificate.get_subject())
    except Exception as e:
        app.logger.error("sign_csr: load CA certificate {%s} for user {%s} failed, reason: {%s}", CA_CRT_FILE, arg_userid, repr(e))
        return False
    # CAprivatekey = CA private key in internal crypto format
    (result, buffer) = get_file_contents(CA_KEY_FILE)
    if not result:
        app.logger.error("sign_csr: cannot access CA private key {%s} for user {%s}, reason: {%s}", CA_KEY_FILE, arg_userid, buffer)
        return False
    try:
        CAprivatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, buffer)
    except Exception as e:
        app.logger.error("sign_csr: load CA private key {%s} for user {%s} failed, reason: {%s}", CA_KEY_FILE, arg_userid, repr(e))
        # BUG FIX: without this return the code fell through and later raised
        # NameError on CAprivatekey when signing.
        return False
    # Sign CSR, giving the CRT
    try:
        cert = crypto.X509()
        cert.set_serial_number(42)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(EXPIRY_PERIOD)
        cert.set_issuer(CAcertificate.get_subject())
        subject = csr.get_subject()  # will log the subject later
        cert.set_subject(subject)
        cert.set_pubkey(csr.get_pubkey())
        cert.sign(CAprivatekey, DIGEST)
    except Exception as e:
        app.logger.error("sign_csr: Cannot sign CSR {%s} for user {%s}, reason: {%s}", arg_csr_path, arg_userid, repr(e))
        return False
    # Store signed CRT; 'with' guarantees the handle is closed even on error.
    try:
        with open(arg_crt_path, "w") as crt_file:
            crt_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8'))
            crt_file.flush()
            os.fsync(crt_file.fileno())
    except Exception as e:
        app.logger.error("sign_csr: Cannot store CRT {%s} for user {%s}, reason: {%s}", arg_crt_path, arg_userid, repr(e))
        return False
    # Success
    app.logger.info("sign_csr: Success with CRT {%s} for user {%s}, subject={%s}", arg_csr_path, arg_userid, subject)
    return True
def redis_cached(timeout=None, key_prefix='view/%s', unless=None):
    """Cache a view function's return value in redis.

    :param timeout: cache lifetime in seconds; backend default when None
    :param key_prefix: key prefix; may be a callable, or contain '%s' which
                       is filled from the request URL and current user id
    :param unless: optional callable; a True result bypasses the cache
    :return: the decorating function
    """
    def decorator(f):
        @functools.wraps(f)  # preserve the wrapped function's metadata
        def decorated_function(*args, **kwargs):
            # Bypass caching when asked to, globally via `unless` or
            # per-call via a truthy 'nocache' keyword argument.
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)
            if kwargs.get('nocache'):
                return f(*args, **kwargs)
            try:
                key = decorated_function.make_cache_key(*args, **kwargs)
                key = urllib.quote(key, safe='')
                cached = redis_get(key)
            except Exception:
                # Backend failure: fail hard in debug, degrade to a
                # direct call otherwise.
                if current_app.debug:
                    raise
                return f(*args, **kwargs)
            if cached is None:
                cached = f(*args, **kwargs)
                try:
                    redis_set(key, cached, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
                    return f(*args, **kwargs)
            return cached

        def make_cache_key(*args, **kwargs):
            # Resolve the raw key, then hash and version-stamp it.
            if callable(key_prefix):
                raw = key_prefix()
            elif '%s' in key_prefix:
                raw = key_prefix % (request.url + '_uid_' + str(current_user.get_id()))
            else:
                raw = key_prefix
            digest = hashlib.md5(raw.encode('utf-8')).hexdigest()
            return '_'.join((get_version(level='day'), digest))

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key
        return decorated_function
    return decorator
def redis_memoize(timeout=100, make_name=None, unless=None):
    """Memoize a function's result in redis, keyed by its arguments.

    :param timeout: cache lifetime in seconds, default 100
    :param make_name: optional str or callable used to derive the cache
                      name; a callable receives the function's name
    :param unless: optional callable; a True result bypasses the cache
    :return: the decorating function
    """
    def decorator(f):
        @functools.wraps(f)  # preserve the wrapped function's metadata
        def decorated_function(*args, **kwargs):
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)
            if kwargs.get('nocache'):
                # A truthy 'nocache' keyword bypasses the cache entirely.
                return f(*args, **kwargs)
            try:
                key = decorated_function.make_cache_key(make_name, args, kwargs)
                hit = redis_get(key)
            except Exception:
                # Backend failure: raise in debug, fall back otherwise.
                if current_app.debug:
                    raise
                return f(*args, **kwargs)
            if hit is None:
                hit = f(*args, **kwargs)
                try:
                    redis_set(key, hit, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
                    return f(*args, **kwargs)
            return hit

        def make_cache_key(make_name, keyargs, keykwargs):
            # A callable make_name maps the function name; a plain string
            # replaces it outright (the string check intentionally runs last).
            fname = f.__name__
            if callable(make_name):
                fname = make_name(fname)
            if isinstance(make_name, str):
                fname = make_name
            alt_fname = '.'.join((f.__module__, fname))
            try:
                origin_str = "{0}{1}{2}".format(alt_fname, keyargs, keykwargs)
            except AttributeError:
                origin_str = "%s%s%s" % (alt_fname, keyargs, keykwargs)
            digest = hashlib.md5(origin_str.encode('utf-8')).hexdigest()
            return '_'.join((get_version(level='day'), digest))

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key
        return decorated_function
    return decorator
def distributed_write(node=None):
    '''
    Ingest distributed-query results posted by *node*: store each result
    row and mark the matching PENDING task COMPLETE or FAILED.
    '''
    data = request.get_json()
    if current_app.debug:
        current_app.logger.debug(json.dumps(data, indent=2))
    # 'queries' maps task guid -> result rows; 'statuses' maps guid -> exit status.
    queries = data.get('queries', {})
    statuses = data.get('statuses', {})
    for guid, results in queries.items():
        # Only accept results for a PENDING task that belongs to this node.
        task = DistributedQueryTask.query.filter(
            DistributedQueryTask.guid == guid,
            DistributedQueryTask.status == DistributedQueryTask.PENDING,
            DistributedQueryTask.node == node,
        ).first()
        if not task:
            current_app.logger.error(
                "%s - Got result for distributed query not in PENDING "
                "state: %s: %s",
                request.remote_addr, guid, json.dumps(data)
            )
            continue
        # non-zero status indicates sqlite errors
        if not statuses.get(guid, 0):
            status = DistributedQueryTask.COMPLETE
        else:
            current_app.logger.error(
                "%s - Got non-zero status code (%d) on distributed query %s",
                request.remote_addr, statuses.get(guid), guid
            )
            status = DistributedQueryTask.FAILED
        for columns in results:
            result = DistributedQueryResult(
                columns,
                distributed_query=task.distributed_query,
                distributed_query_task=task
            )
            db.session.add(result)
        else:
            # for/else with no break: always runs after the (possibly empty)
            # result set, recording the task's final status.
            task.status = status
            db.session.add(task)
    else:
        # need to write last_checkin, last_ip on node
        # (for/else with no break: always re-adds the node to the session)
        db.session.add(node)
    db.session.commit()
    return jsonify(node_invalid=False)