def logging_levels():
"""
Context manager to conditionally set logging levels.
Supports setting per-request debug logging using the `X-Request-Debug` header.
"""
enabled = strtobool(request.headers.get("x-request-debug", "false"))
level = None
try:
if enabled:
level = getLogger().getEffectiveLevel()
getLogger().setLevel(DEBUG)
yield
finally:
if enabled:
getLogger().setLevel(level)
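As written, logging_levels is a plain generator; it has to be wrapped with contextlib.contextmanager before it can back a with statement. A minimal usage sketch, assuming an existing Flask app object and an illustrative /health route:

from contextlib import contextmanager

# Wrap the generator above so it can be used as a context manager.
logging_levels = contextmanager(logging_levels)

@app.route("/health")
def health():
    # DEBUG logging is enabled only when the client sends a truthy
    # X-Request-Debug header; the previous level is restored afterwards.
    with logging_levels():
        app.logger.debug("verbose diagnostics for this request only")
        return "OK"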
def capture_request(self):
if not current_app.debug:
# only capture request body on debug
return
if not self.options.include_request_body:
# only capture request body if requested
return
if (
request.content_length and
self.options.include_request_body is not True and
request.content_length >= self.options.include_request_body
):
# don't capture request body if it's too large
return
if not request.get_json(force=True, silent=True):
# only capture request body if json
return
self.request_body = request.get_json(force=True)
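The include_request_body option behaves as either a flag or a byte limit here. A standalone sketch of that rule; the helper name should_capture_body is purely illustrative:

def should_capture_body(content_length, include_request_body):
    # Hypothetical helper mirroring the checks above: False/None disables
    # capture, True always captures, and an integer is treated as an upper
    # bound on the request size in bytes.
    if not include_request_body:
        return False
    if include_request_body is True:
        return True
    return not (content_length and content_length >= include_request_body)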
def redirect_to_ssl(self):
"""Redirect incoming requests to HTTPS."""
# Should we redirect?
criteria = [
request.is_secure,
current_app.debug,
request.headers.get('X-Forwarded-Proto', 'http') == 'https'
]
if not any(criteria) and not self.skip:
if request.url.startswith('http://'):
url = request.url.replace('http://', 'https://', 1)
code = 302
if self.permanent:
code = 301
r = redirect(url, code=code)
return r
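Methods like this are normally installed as a before_request hook by the extension's constructor or init_app; a sketch under that assumption (the SSLRedirect class name is invented):

class SSLRedirect(object):
    # Illustrative wrapper around the redirect_to_ssl() method shown above.
    def __init__(self, app=None, permanent=False, skip=False):
        self.permanent = permanent
        self.skip = skip
        if app is not None:
            # Returning a redirect from a before_request hook short-circuits the view.
            app.before_request(self.redirect_to_ssl)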
def dbuser_add(arg_userid, arg_password, arg_email):
global dbconn
if app.debug:
app.logger.debug("dbuser_add: arg_userid=%s, arg_email=%s", arg_userid, arg_email)
try:
dbcursor = dbconn.cursor()
stamp = epoch2dbfmt(time.time())
dbcursor.execute("INSERT INTO {tn} ('{cn1}', '{cn2}','{cn3}','{cn4}') VALUES ('{cv1}','{cv2}','{cv3}','{cv4}')" \
.format(tn=ddl.TBL_USER, \
cn1=ddl.FLD_USER_ID, cv1=arg_userid, \
cn2=ddl.FLD_USER_PSWD, cv2=arg_password, \
cn3=ddl.FLD_USER_EMAIL, cv3=arg_email, \
cn4=ddl.FLD_USER_TSTAMP, cv4=stamp))
dbconn.commit()
except sqlite3.Error as e:
app.logger.error("dbuser_add: INSERT {%s,%s} failed, reason: {%s}", arg_userid, arg_email, repr(e))
return False
# Success
return True
#=====================
# Remove a user record
#=====================
def dbuser_remove(arg_userid):
global dbconn
if app.debug:
app.logger.debug("dbuser_remove: arg_userid=%s", arg_userid)
try:
dbcursor = dbconn.cursor()
dbcursor.execute("DELETE FROM {tn} WHERE {cn1}='{cv1}'" \
.format(tn=ddl.TBL_USER, \
cn1=ddl.FLD_USER_ID, cv1=arg_userid))
dbconn.commit()
except sqlite3.Error as e:
app.logger.error("dbuser_remove: DELETE {%s} failed, reason: {%s}", arg_userid, repr(e))
return False
# Success
return True
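A brief usage sketch for the two helpers above; the user id, password hash, and email address are made-up values:

# Illustrative calls only.
if dbuser_add("alice", "pbkdf2-hash-goes-here", "alice@example.com"):
    app.logger.info("user alice created")

if not dbuser_remove("alice"):
    app.logger.warning("could not remove user alice")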
#========================
# Initialize the database
#========================
def delete_memoized_verhash(self, f, *args):
"""
Delete the version hash associated with the function.
    .. warning::
Performing this operation could leave keys behind that have
been created with this version hash. It is up to the application
to make sure that all keys that may have been created with this
version hash at least have timeouts so they will not sit orphaned
in the cache backend.
"""
if not callable(f):
raise DeprecationWarning("Deleting messages by relative name is no longer"
" reliable, please use a function reference")
try:
self._memoize_version(f, delete=True)
except Exception:
if current_app.debug:
raise
logger.exception("Exception possibly due to cache backend.")
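In Flask-Cache/Flask-Caching this method hangs off the cache object and takes the memoized function itself; a sketch of a typical call, assuming an existing cache instance and an illustrative memoized function:

# Sketch: `cache` is assumed to be a Flask-Caching/Flask-Cache instance.
@cache.memoize(timeout=300)
def user_count(team_id):
    return query_user_count(team_id)   # hypothetical expensive lookup

# Rotate the version hash so every cached user_count() result is orphaned;
# the old entries fall out of the backend via their timeouts.
cache.delete_memoized_verhash(user_count)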
def convert_to_json(data):
'''
Encode the data as JSON -- taken from
flask_restplus.representations.output_json -- updated to clean the
dictionary of nulls.
'''
settings = current_app.config.get('RESTPLUS_JSON', {})
# If we're in debug mode, and the indent is not set, we set it to a
# reasonable value here. Note that this won't override any existing value
# that was set. We also set the "sort_keys" value.
if current_app.debug:
settings.setdefault('indent', 4)
settings.setdefault('sort_keys', True)
# always end the json dumps with a new line
# see https://github.com/mitsuhiko/flask/pull/1262
dumped = dumps(cleandict(data), **settings) + "\n"
return dumped
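Because the settings dict is forwarded straight to json.dumps, serialization can be tuned through the app config; a small sketch, assuming a plain Flask app object:

# Sketch: keyword arguments forwarded to json.dumps() by convert_to_json().
app.config['RESTPLUS_JSON'] = {
    'indent': None,            # explicit value, so debug mode will not override it
    'separators': (',', ':'),  # compact separators
    'sort_keys': True,
}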
def redirect_to_ssl(self):
"""
Redirect incoming requests to HTTPS.
"""
criteria = [
request.is_secure,
current_app.debug,
current_app.testing,
request.headers.get('X-Forwarded-Proto', 'http') == 'https'
]
if request.headers.get('User-Agent', '').lower().startswith(self.exclude_user_agents):
return
if not any(criteria):
if request.url.startswith('http://'):
url = request.url.replace('http://', 'https://', 1)
r = redirect(url, code=301)
return r
async def worker(channel, queue, token, repo_ids=None, build_ids=None):
allowed_repo_ids = frozenset(token['repo_ids'])
while (await channel.wait_message()):
msg = await channel.get_json()
data = msg.get('data')
if data['repository']['id'] not in allowed_repo_ids:
continue
if build_ids and data['id'] not in build_ids:
continue
if repo_ids and data['repository']['id'] not in repo_ids:
continue
evt = Event(
msg.get('id'),
msg.get('event'),
data,
)
await queue.put(evt)
current_app.logger.debug(
'pubsub.event.received qsize=%s', queue.qsize())
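For reference, a message has to carry roughly the following shape to get past the filters above; every value here is invented:

# Hypothetical pubsub payload that would be forwarded to the queue.
example_msg = {
    "id": "evt-1",            # becomes the Event id
    "event": "build.update",  # becomes the Event name
    "data": {
        "id": 123,                 # build id, optionally filtered by build_ids
        "repository": {"id": 42},  # must be present in token['repo_ids']
    },
}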
# @log_errors
def catch_parade_error(func):
def wrapper(*args, **kw):
try:
return func(*args, **kw)
except ParadeError as e:
if current_app.debug:
exc_type, exc_value, exc_traceback = sys.exc_info()
stack_info = traceback.format_exception(exc_type, exc_value, exc_traceback)
abort(e.status, code=e.code, message=e.reason, traceback=stack_info)
else:
abort(e.status, code=e.code, message=e.reason)
except Exception as e:
if current_app.debug:
exc_type, exc_value, exc_traceback = sys.exc_info()
stack_info = traceback.format_exception(exc_type, exc_value, exc_traceback)
abort(500, code=0, message=str(e), traceback=stack_info)
else:
abort(500, code=0, message=str(e))
return wrapper
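The decorator is intended to wrap view or resource methods so that ParadeError details (plus a traceback in debug mode) come back through abort(); a usage sketch in which the view function and the ParadeError arguments are illustrative:

@catch_parade_error
def get_task(task_name):
    task = load_task(task_name)   # hypothetical lookup
    if task is None:
        # status/code/reason mirror the attributes read by the wrapper above.
        raise ParadeError(status=404, code=1001, reason='task not found')
    return task.to_dict()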
def output_json(data, code, headers=None):
"""Makes a Flask response with a JSON encoded body"""
settings = current_app.config.get('RESTFUL_JSON', {})
# If we're in debug mode, and the indent is not set, we set it to a
# reasonable value here. Note that this won't override any existing value
# that was set. We also set the "sort_keys" value.
if current_app.debug:
settings.setdefault('indent', 4)
settings.setdefault('sort_keys', True)
# always end the json dumps with a new line
# see https://github.com/mitsuhiko/flask/pull/1262
dumped = dumps(data, **settings) + "\n"
resp = make_response(dumped, code)
resp.headers.extend(headers or {})
return resp
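Flask-RESTful looks output functions up in the Api representations mapping; a minimal registration sketch assuming a standard Api instance:

from flask import Flask
from flask_restful import Api

app = Flask(__name__)
api = Api(app)
# Route application/json responses through the custom encoder above.
api.representations['application/json'] = output_json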
def setup_client(client):
""" Attach handlers to the clients
"""
#log.debug('setup_client {}'.format(client.clientId))
client.register(handlers.connection_handler, 'ManagedAccounts', 'NextValidId')
client.register(handlers.history_handler, 'HistoricalData')
client.register(handlers.order_handler, 'OpenOrder', 'OrderStatus', 'OpenOrderEnd')
client.register(handlers.portfolio_positions_handler, 'Position', 'PositionEnd')
client.register(handlers.account_summary_handler, 'AccountSummary', 'AccountSummaryEnd')
client.register(handlers.account_update_handler, 'UpdateAccountTime', 'UpdateAccountValue', 'UpdatePortfolio',
'AccountDownloadEnd')
client.register(handlers.contract_handler, 'ContractDetails')
client.register(handlers.executions_handler, 'ExecDetails', 'ExecDetailsEnd', 'CommissionsReport')
client.register(handlers.error_handler, 'Error')
# Add handlers for feeds
client.register(handlers.market_handler, 'TickSize', 'TickPrice')
# For easier debugging, register all messages with the generic handler
# client.registerAll(handlers.generic_handler)
# Be sure we're in a disconnected state
client.disconnect()
def output_json(data, code, headers=None):
"""Makes a Flask response with a JSON encoded body"""
settings = current_app.config.get('RESTFUL_JSON', {})
# If we're in debug mode, and the indent is not set, we set it to a
# reasonable value here. Note that this won't override any existing value
# that was set. We also set the "sort_keys" value.
if current_app.debug:
settings.setdefault('indent', 4)
settings.setdefault('sort_keys', False)
# always end the json dumps with a new line
# see https://github.com/mitsuhiko/flask/pull/1262
dumped = dumps(data, **settings) + "\n"
resp = make_response(dumped, code)
# resp.headers.extend(headers or {'Content-Type':'application/json'})
# Always return as JSON
resp.headers['Content-Type'] = 'application/json'
return resp
def log(self, logger):
if self.status_code == 500:
# something actually went wrong; investigate
dct = self.to_dict()
if current_app.debug or current_app.testing:
message = dct.pop("message")
logger.warning(message, extra=dct, exc_info=True)
else:
logger.warning(dct)
else:
# usually log at INFO; a raised exception can be an error or expected behavior (e.g. 404)
logger.info(self.to_dict())
def capture_response(self, response):
self.success = True
body, self.status_code, self.response_headers = parse_response(response)
if not current_app.debug:
        # only capture response body on debug
return
if not self.options.include_response_body:
# only capture response body if requested
return
if not body:
# only capture request body if there is one
return
if (
self.options.include_response_body is not True and
len(body) >= self.options.include_response_body
):
# don't capture response body if it's too large
return
try:
self.response_body = loads(body)
except (TypeError, ValueError):
# not json
pass
def verify_email_recipient(arg_recipient):
if app.debug:
app.logger.debug("verify_email_recipient: email recipient = %s", arg_recipient)
# Inspect email address
    result = re.match(r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', arg_recipient)
    if result is None:
if app.debug:
app.logger.debug("verify_email_recipient: Not an email address: %s", arg_recipient)
return False
# Extract domain name from arg_recipient
pieces = arg_recipient.split("@")
if len(pieces) != 2:
if app.debug:
app.logger.debug("verify_email_recipient: Did not split into 2 pieces: %s", arg_recipient)
return False
domain = pieces[1]
if app.debug:
app.logger.debug("verify_email_recipient: email domain = %s", domain)
# Get MX record for target domain
try:
records = dns.resolver.query(domain, 'MX')
mxRecord = str(records[0].exchange)
    except Exception:
if app.debug:
app.logger.debug("verify_email_recipient: DNS MX-query exception with %s", domain)
return False
if app.debug:
app.logger.debug("verify_email_recipient: DNS MX record = %s", mxRecord)
return True
#======================================
# Convert epoch time to database format
#======================================
def _get_wrap(self, node, classes='form-group'):
# add required class, which strictly speaking isn't bootstrap, but
# a common enough customization
if node.flags.required:
classes += ' required'
div = tags.div(_class=classes)
if current_app.debug:
div.add(tags.comment(' Field: {} ({}) '.format(
node.name, node.__class__.__name__)))
return div
def logger(node=None):
    '''
    Endpoint that accepts osquery status and result logs posted by a node.
    '''
data = request.get_json()
log_type = data['log_type']
log_level = current_app.config['DOORMAN_MINIMUM_OSQUERY_LOG_LEVEL']
if current_app.debug:
current_app.logger.debug(json.dumps(data, indent=2))
if log_type == 'status':
log_tee.handle_status(data, host_identifier=node.host_identifier)
status_logs = []
for item in data.get('data', []):
if int(item['severity']) < log_level:
continue
status_logs.append(StatusLog(node_id=node.id, **item))
else:
db.session.add(node)
db.session.bulk_save_objects(status_logs)
db.session.commit()
elif log_type == 'result':
db.session.add(node)
db.session.bulk_save_objects(process_result(data, node))
db.session.commit()
log_tee.handle_result(data, host_identifier=node.host_identifier)
analyze_result.delay(data, node.to_dict())
else:
current_app.logger.error("%s - Unknown log_type %r",
request.remote_addr, log_type
)
current_app.logger.info(json.dumps(data))
# still need to write last_checkin, last_ip
db.session.add(node)
db.session.commit()
return jsonify(node_invalid=False)
def handle_netapp_exception(error):
'''Return the error message from the filer and 500 status code'''
return_message = {'message': error.msg, "errno": error.errno}
if current_app.debug:
return_message['failing_query'] = str(error.failing_query)
return return_message, 500
def dbgdump(obj, default=None, cls=None):
if current_app.config.get('ASK_PRETTY_DEBUG_LOGS', False):
indent = 2
else:
indent = None
msg = json.dumps(obj, indent=indent, default=default, cls=cls)
logger.debug(msg)
def init_app(self, app, path='templates.yaml'):
"""Initializes Ask app by setting configuration variables, loading templates, and maps Ask route to a flask view.
The Ask instance is given the following configuration variables by calling on Flask's configuration:
`ASK_APPLICATION_ID`:
Turn on application ID verification by setting this variable to an application ID or a
list of allowed application IDs. By default, application ID verification is disabled and a
warning is logged. This variable should be set in production to ensure
requests are being sent by the applications you specify.
Default: None
`ASK_VERIFY_REQUESTS`:
Enables or disables Alexa request verification, which ensures requests sent to your skill
are from Amazon's Alexa service. This setting should not be disabled in production.
It is useful for mocking JSON requests in automated tests.
Default: True
`ASK_VERIFY_TIMESTAMP_DEBUG`:
Turn on request timestamp verification while debugging by setting this to True.
Timestamp verification helps mitigate against replay attacks. It relies on the system clock
being synchronized with an NTP server. This setting should not be enabled in production.
Default: False
`ASK_PRETTY_DEBUG_LOGS`:
Add tabs and linebreaks to the Alexa request and response printed to the debug log.
This improves readability when printing to the console, but breaks formatting when logging to CloudWatch.
Default: False
"""
if self._route is None:
raise TypeError("route is a required argument when app is not None")
app.ask = self
app.add_url_rule(self._route, view_func=self._flask_view_func, methods=['POST'])
app.jinja_loader = ChoiceLoader([app.jinja_loader, YamlLoader(app, path)])
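A minimal initialization sketch using the configuration variables described in the docstring; the skill id is a placeholder:

from flask import Flask
from flask_ask import Ask

app = Flask(__name__)
app.config['ASK_APPLICATION_ID'] = 'amzn1.ask.skill.PLACEHOLDER'  # placeholder id
app.config['ASK_VERIFY_REQUESTS'] = True
app.config['ASK_PRETTY_DEBUG_LOGS'] = app.debug

ask = Ask(route='/alexa')  # route must be supplied before init_app() is called
ask.init_app(app)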
def _alexa_request(self, verify=True):
raw_body = flask_request.data
alexa_request_payload = json.loads(raw_body)
if verify:
cert_url = flask_request.headers['Signaturecertchainurl']
signature = flask_request.headers['Signature']
        # load certificate - this verifies the certificate URL and format under the hood
cert = verifier.load_certificate(cert_url)
# verify signature
verifier.verify_signature(cert, signature, raw_body)
# verify timestamp
raw_timestamp = alexa_request_payload.get('request', {}).get('timestamp')
timestamp = self._parse_timestamp(raw_timestamp)
if not current_app.debug or self.ask_verify_timestamp_debug:
verifier.verify_timestamp(timestamp)
# verify application id
try:
application_id = alexa_request_payload['session']['application']['applicationId']
except KeyError:
application_id = alexa_request_payload['context'][
'System']['application']['applicationId']
if self.ask_application_id is not None:
verifier.verify_application_id(application_id, self.ask_application_id)
return alexa_request_payload
async def ping(loop, resp, client_guid):
    # Periodically send a ping to the browser. Any message that starts
    # with a colon (":") is ignored by the browser, so it can be used
    # as a ping message.
while True:
await asyncio.sleep(15, loop=loop)
current_app.logger.debug('pubsub.ping guid=%s', client_guid)
resp.write(b': ping\r\n\r\n')
# @log_errors
async def build_server(loop, host, port):
app = Application(loop=loop, logger=current_app.logger,
debug=current_app.debug)
app.router.add_route('GET', '/', stream)
app.router.add_route('GET', '/healthz', health_check)
return await loop.create_server(app.make_handler(), host, port)
def serve_swaggerui_assets(path):
"""
Swagger-UI assets serving route.
"""
if not current_app.debug:
import warnings
warnings.warn(
"/swaggerui/ is recommended to be served by public-facing server (e.g. NGINX)"
)
from flask import send_from_directory
return send_from_directory('../static/', path)