python类debug()的实例源码

audit.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def logging_levels():
    """
    Temporarily switch the logger returned by `getLogger()` to DEBUG.

    The `X-Request-Debug` request header is parsed with `strtobool`. When it is
    truthy, the logger's effective level is recorded, the logger is set to
    DEBUG, and the recorded level is put back after the caller's block ends,
    whether it completes normally or raises. When the header is absent or
    falsy the logger is left untouched.

    This is a generator that yields exactly once.
    NOTE(review): presumably wrapped by `contextlib.contextmanager` via a
    decorator that is not visible here -- confirm.

    """
    debug_requested = strtobool(request.headers.get("x-request-debug", "false"))
    previous_level = None

    if not debug_requested:
        # Nothing to change and nothing to restore.
        yield
        return

    try:
        previous_level = getLogger().getEffectiveLevel()
        getLogger().setLevel(DEBUG)
        yield
    finally:
        getLogger().setLevel(previous_level)
audit.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def capture_request(self):
    """
    Record the JSON request body on `self.request_body` when permitted.

    Nothing is recorded unless all of the following hold, checked in order:

    * the application is in debug mode;
    * `self.options.include_request_body` is truthy;
    * that option is `True`, or the request has no content length, or the
      content length is strictly below the option's value;
    * the body parses as (truthy) JSON.
    """
    if not current_app.debug:
        return

    limit = self.options.include_request_body
    if not limit:
        return

    # A limit of exactly True means "any size"; otherwise it is a size cap.
    size = request.content_length
    if size and limit is not True and size >= limit:
        return

    # Silent probe first, so non-JSON bodies are skipped instead of raising.
    if not request.get_json(force=True, silent=True):
        return

    self.request_body = request.get_json(force=True)
flask_sslify.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def redirect_to_ssl(self):
    """
    Return a redirect to the HTTPS form of the current URL, if one is needed.

    No redirect is produced (None is returned) when the request is already
    secure, the app is in debug mode, `X-Forwarded-Proto` is `https`,
    `self.skip` is truthy, or the URL does not start with `http://`.
    Otherwise the status is 301 when `self.permanent` is truthy, else 302.
    """
    exemptions = (
        request.is_secure,
        current_app.debug,
        request.headers.get('X-Forwarded-Proto', 'http') == 'https',
    )

    if any(exemptions) or self.skip:
        return None

    if not request.url.startswith('http://'):
        return None

    # Only the scheme prefix is rewritten; the rest of the URL is untouched.
    secure_url = request.url.replace('http://', 'https://', 1)
    status = 301 if self.permanent else 302
    return redirect(secure_url, code=status)
diyca_web_utilities.py 文件源码 项目:diyca 作者: texadactyl 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def dbuser_add(arg_userid, arg_password, arg_email):
    """
    Insert one user row into the user table and commit.

    The row holds the user id, password, e-mail address and a timestamp
    produced by `epoch2dbfmt(time.time())`.

    Args:
        arg_userid: Value stored in the user-id column.
        arg_password: Value stored in the password column, exactly as given.
        arg_email: Value stored in the e-mail column.

    Returns:
        True when the INSERT was committed; False when sqlite3 raised an
        error (the error is logged through `app.logger`).
    """
    if app.debug:
        app.logger.debug("dbuser_add: arg_userid=%s, arg_email=%s", arg_userid, arg_email)
    # Table and column names cannot be bound as parameters, so they are still
    # formatted in from the trusted `ddl` constants. The caller-supplied values
    # are bound with "?" placeholders so they can never be parsed as SQL.
    statement = "INSERT INTO {tn} ('{cn1}', '{cn2}','{cn3}','{cn4}') VALUES (?, ?, ?, ?)".format(
        tn=ddl.TBL_USER,
        cn1=ddl.FLD_USER_ID,
        cn2=ddl.FLD_USER_PSWD,
        cn3=ddl.FLD_USER_EMAIL,
        cn4=ddl.FLD_USER_TSTAMP,
    )
    try:
        dbcursor = dbconn.cursor()
        stamp = epoch2dbfmt(time.time())
        dbcursor.execute(statement, (arg_userid, arg_password, arg_email, stamp))
        dbconn.commit()
    except sqlite3.Error as e:
        app.logger.error("dbuser_add: INSERT {%s,%s} failed, reason: {%s}", arg_userid, arg_email, repr(e))
        return False
    # Success
    return True

#=====================
# Remove a user record
#=====================
diyca_web_utilities.py 文件源码 项目:diyca 作者: texadactyl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def dbuser_remove(arg_userid):
    """
    Delete the user row(s) whose user-id column equals `arg_userid`, then commit.

    Args:
        arg_userid: User id to match.

    Returns:
        True when the DELETE was committed (even if no row matched); False
        when sqlite3 raised an error (the error is logged through
        `app.logger`).
    """
    if app.debug:
        app.logger.debug("dbuser_remove: arg_userid=%s", arg_userid)
    # Identifiers come from the trusted `ddl` constants; the user id itself is
    # bound as a parameter so it can never be parsed as SQL.
    statement = "DELETE FROM {tn} WHERE {cn1}=?".format(
        tn=ddl.TBL_USER,
        cn1=ddl.FLD_USER_ID,
    )
    try:
        dbcursor = dbconn.cursor()
        dbcursor.execute(statement, (arg_userid,))
        dbconn.commit()
    except sqlite3.Error as e:
        app.logger.error("dbuser_remove: DELETE {%s} failed, reason: {%s}", arg_userid, repr(e))
        return False
    # Success
    return True

#========================
# Initialize the database
#========================
__init__.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def delete_memoized_verhash(self, f, *args):
    """
    Remove the version hash stored for the memoized function `f`.

    `f` must be callable; anything else raises `DeprecationWarning`. Extra
    positional arguments are accepted but not used.

    ..warning::

        Keys that were created under the deleted version hash may remain in
        the cache backend. The application is responsible for giving such
        keys timeouts so that they do not stay orphaned there.

    Any exception from the deletion is re-raised when the app is in debug
    mode; otherwise it is logged and suppressed.
    """
    if not callable(f):
        reason = ("Deleting messages by relative name is no longer"
                  " reliable, please use a function reference")
        raise DeprecationWarning(reason)

    try:
        self._memoize_version(f, delete=True)
    except Exception:
        if not current_app.debug:
            logger.exception("Exception possibly due to cache backend.")
            return
        raise
__init__.py 文件源码 项目:FileStoreGAE 作者: liantian-cn 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def delete_memoized_verhash(self, f, *args):
    """
    Drop the version hash that is associated with the memoized function `f`.

    A non-callable `f` is rejected with `DeprecationWarning`. Additional
    positional arguments are accepted and ignored.

    ..warning::

        Cache keys built with this version hash can be left behind. It is up
        to the application to make sure such keys carry timeouts, so they do
        not sit orphaned in the cache backend.

    In debug mode a failure of the deletion propagates; outside debug mode
    it is logged with its traceback and swallowed.
    """
    if not callable(f):
        reason = ("Deleting messages by relative name is no longer"
                  " reliable, please use a function reference")
        raise DeprecationWarning(reason)

    try:
        self._memoize_version(f, delete=True)
    except Exception:
        if not current_app.debug:
            logger.exception("Exception possibly due to cache backend.")
            return
        raise
format_utils.py 文件源码 项目:mlab-vis-api 作者: m-lab 项目源码 文件源码 阅读 56 收藏 0 点赞 0 评论 0
def convert_to_json(data):
    '''
    Encode the data as JSON -- taken from
    flask_restplus.representations.output_json -- updated to clean the
    dictionary of nulls.

    `data` is passed through `cleandict` before being serialised. Keyword
    arguments for `dumps` come from the `RESTPLUS_JSON` app config entry.
    Returns the JSON text terminated by a newline.
    '''
    # Work on a copy: the debug defaults below must not leak into the shared
    # application config, where they would persist across requests.
    settings = dict(current_app.config.get('RESTPLUS_JSON', {}))

    # In debug mode, default to indented, key-sorted output. setdefault keeps
    # any value that was configured explicitly.
    if current_app.debug:
        settings.setdefault('indent', 4)
        settings.setdefault('sort_keys', True)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(cleandict(data), **settings) + "\n"

    return dumped
ssl.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def redirect_to_ssl(self):
    """
    Return a permanent (301) redirect to the HTTPS form of the current URL.

    None is returned instead when the User-Agent (lower-cased) starts with
    one of `self.exclude_user_agents`, when the request is already secure,
    when the app is in debug or testing mode, when `X-Forwarded-Proto` is
    `https`, or when the URL does not start with `http://`.
    """
    exemptions = (
        request.is_secure,
        current_app.debug,
        current_app.testing,
        request.headers.get('X-Forwarded-Proto', 'http') == 'https',
    )

    user_agent = request.headers.get('User-Agent', '').lower()
    if user_agent.startswith(self.exclude_user_agents):
        return None

    if any(exemptions) or not request.url.startswith('http://'):
        return None

    # Only the scheme prefix is rewritten; the rest of the URL is untouched.
    secure_url = request.url.replace('http://', 'https://', 1)
    return redirect(secure_url, code=301)
server.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
async def worker(channel, queue, token, repo_ids=None, build_ids=None):
    """
    Forward messages from `channel` to `queue` as `Event` objects.

    Declared `async` because the body awaits the channel and the queue; as a
    plain `def` it does not compile.

    A message is forwarded only when its repository id is listed in
    `token['repo_ids']` and it also passes the optional filters.

    Args:
        channel: Source with awaitable `wait_message()` and `get_json()`.
        queue: Destination with awaitable `put()` and a `qsize()` method.
        token: Mapping whose `'repo_ids'` entry lists the permitted repository ids.
        repo_ids: Optional collection; when truthy, only these repository ids pass.
        build_ids: Optional collection; when truthy, only messages whose
            `data['id']` is in it pass.
    """
    allowed_repo_ids = frozenset(token['repo_ids'])

    while (await channel.wait_message()):
        msg = await channel.get_json()
        data = msg.get('data')
        repo_id = data['repository']['id']
        # The token's repository list is always enforced, whatever the filters.
        if repo_id not in allowed_repo_ids:
            continue
        if build_ids and data['id'] not in build_ids:
            continue
        if repo_ids and repo_id not in repo_ids:
            continue
        evt = Event(
            msg.get('id'),
            msg.get('event'),
            data,
        )
        await queue.put(evt)
        current_app.logger.debug(
            'pubsub.event.received qsize=%s', queue.qsize())


# @log_errors
__init__.py 文件源码 项目:parade 作者: bailaohe 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def catch_parade_error(func):
    """
    Decorator that turns exceptions raised by `func` into `abort(...)` calls.

    * `ParadeError` -> `abort(e.status, code=e.code, message=e.reason)`
    * any other `Exception` -> `abort(500, code=0, message=str(e))`

    When the app is in debug mode the formatted traceback of the handled
    exception is passed along as `traceback=`.

    The wrapper keeps `func`'s name and docstring through `functools.wraps`,
    so frameworks that key on `__name__` still see the original function.

    Returns:
        The wrapping function; on success it returns whatever `func` returns.
    """
    from functools import wraps

    def _abort_with(status, code, message):
        # Called only from an except block, so sys.exc_info() is the handled error.
        if current_app.debug:
            stack_info = traceback.format_exception(*sys.exc_info())
            abort(status, code=code, message=message, traceback=stack_info)
        else:
            abort(status, code=code, message=message)

    @wraps(func)
    def wrapper(*args, **kw):
        try:
            return func(*args, **kw)
        except ParadeError as e:
            _abort_with(e.status, e.code, e.reason)
        except Exception as e:
            _abort_with(500, 0, str(e))
    return wrapper
json.py 文件源码 项目:locust-demo 作者: bmd 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def output_json(data, code, headers=None):
    """
    Makes a Flask response with a JSON encoded body.

    Args:
        data: Object serialised with `dumps`.
        code: HTTP status code for the response.
        headers: Optional extra headers added to the response.

    Returns:
        The response built by `make_response`; its body is the JSON text
        followed by a newline.
    """
    # Work on a copy: the debug defaults below must not leak into the shared
    # application config, where they would persist across requests.
    settings = dict(current_app.config.get('RESTFUL_JSON', {}))

    # In debug mode, default to indented, key-sorted output. setdefault keeps
    # any value that was configured explicitly.
    if current_app.debug:
        settings.setdefault('indent', 4)
        settings.setdefault('sort_keys', True)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(data, **settings) + "\n"

    resp = make_response(dumped, code)
    resp.headers.extend(headers or {})
    return resp
connection.py 文件源码 项目:ibrest 作者: hamx0r 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setup_client(client):
    """
    Register the message handlers on `client`, then disconnect it.

    Each handler from the `handlers` module is registered for the message
    type names listed beside it, in the order given below. The client is
    disconnected at the end so that it is in a disconnected state.
    """
    # (attribute name on `handlers`, message types it is registered for)
    registrations = (
        ('connection_handler', ('ManagedAccounts', 'NextValidId')),
        ('history_handler', ('HistoricalData',)),
        ('order_handler', ('OpenOrder', 'OrderStatus', 'OpenOrderEnd')),
        ('portfolio_positions_handler', ('Position', 'PositionEnd')),
        ('account_summary_handler', ('AccountSummary', 'AccountSummaryEnd')),
        ('account_update_handler', ('UpdateAccountTime', 'UpdateAccountValue',
                                    'UpdatePortfolio', 'AccountDownloadEnd')),
        ('contract_handler', ('ContractDetails',)),
        ('executions_handler', ('ExecDetails', 'ExecDetailsEnd', 'CommissionsReport')),
        ('error_handler', ('Error',)),
        # Handlers for feeds
        ('market_handler', ('TickSize', 'TickPrice')),
    )

    # Handlers are looked up one at a time, immediately before registration.
    for handler_name, message_types in registrations:
        client.register(getattr(handlers, handler_name), *message_types)

    # Debugging aid: every message can be routed to the generic handler with
    # client.registerAll(handlers.generic_handler)

    # Be sure we're in a disconnected state
    client.disconnect()
methods.py 文件源码 项目:flat-api 作者: harryho 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def output_json(data, code, headers=None):
    """
    Makes a Flask response with a JSON encoded body.

    Args:
        data: Object serialised with `dumps`.
        code: HTTP status code for the response.
        headers: Optional extra headers added to the response. A
            `Content-Type` given here is overridden with `application/json`.

    Returns:
        The response built by `make_response`; its body is the JSON text
        followed by a newline.
    """
    # Work on a copy: the debug defaults below must not leak into the shared
    # application config, where they would persist across requests.
    settings = dict(current_app.config.get('RESTFUL_JSON', {}))

    # In debug mode, default to indented output with keys in their original
    # order. setdefault keeps any value that was configured explicitly.
    if current_app.debug:
        settings.setdefault('indent', 4)
        settings.setdefault('sort_keys', False)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(data, **settings) + "\n"

    resp = make_response(dumped, code)
    # Keep caller-supplied headers instead of silently dropping them ...
    resp.headers.extend(headers or {})
    # ... but always return as JSON
    resp.headers['Content-Type'] = 'application/json'
    return resp
json.py 文件源码 项目:noobotkit 作者: nazroll 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def output_json(data, code, headers=None):
    """
    Makes a Flask response with a JSON encoded body.

    Args:
        data: Object serialised with `dumps`.
        code: HTTP status code for the response.
        headers: Optional extra headers added to the response.

    Returns:
        The response built by `make_response`; its body is the JSON text
        followed by a newline.
    """
    # Copy the configured options so that the debug defaults set below stay
    # local to this call and never modify the shared application config.
    settings = dict(current_app.config.get('RESTFUL_JSON', {}))

    # In debug mode, default to indented, key-sorted output. setdefault keeps
    # any value that was configured explicitly.
    if current_app.debug:
        settings.setdefault('indent', 4)
        settings.setdefault('sort_keys', True)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(data, **settings) + "\n"

    resp = make_response(dumped, code)
    resp.headers.extend(headers or {})
    return resp
json.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def output_json(data, code, headers=None):
    """
    Makes a Flask response with a JSON encoded body.

    Args:
        data: Object serialised with `dumps`.
        code: HTTP status code for the response.
        headers: Optional extra headers added to the response.

    Returns:
        The response built by `make_response`; its body is the JSON text
        followed by a newline.
    """
    # Take a private copy of the configured options; otherwise setdefault
    # below would write the debug defaults into the shared application config.
    settings = dict(current_app.config.get('RESTFUL_JSON', {}))

    # In debug mode, default to indented, key-sorted output. setdefault keeps
    # any value that was configured explicitly.
    if current_app.debug:
        settings.setdefault('indent', 4)
        settings.setdefault('sort_keys', True)

    # always end the json dumps with a new line
    # see https://github.com/mitsuhiko/flask/pull/1262
    dumped = dumps(data, **settings) + "\n"

    resp = make_response(dumped, code)
    resp.headers.extend(headers or {})
    return resp
audit.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def log(self, logger):
    """
    Emit this record (as produced by `self.to_dict()`) through `logger`.

    * Status other than 500: logged at INFO. A raised exception can be an
      error or expected behaviour (e.g. 404), so INFO is the usual level.
    * Status 500 in debug/testing mode: logged at WARNING using the record's
      "message" entry as the text, the remaining entries as `extra`, and
      exception info attached.
    * Status 500 otherwise: the whole dict is logged at WARNING.
    """
    if self.status_code != 500:
        logger.info(self.to_dict())
        return

    # something actually went wrong; investigate
    record = self.to_dict()

    if not (current_app.debug or current_app.testing):
        logger.warning(record)
        return

    text = record.pop("message")
    logger.warning(text, extra=record, exc_info=True)
audit.py 文件源码 项目:microcosm-flask 作者: globality-corp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def capture_response(self, response):
    """
    Record the outcome of a successful call on this object.

    Always sets `self.success`, `self.status_code` and
    `self.response_headers` (the latter two from `parse_response`). The
    decoded body is stored on `self.response_body` only when, checked in
    order: the app is in debug mode, `self.options.include_response_body`
    is truthy, the body is non-empty, the option is `True` or the body is
    strictly shorter than the option's value, and the body decodes via
    `loads`.
    """
    self.success = True

    body, self.status_code, self.response_headers = parse_response(response)

    if not current_app.debug:
        return

    limit = self.options.include_response_body
    if not limit or not body:
        return

    # A limit of exactly True means "any size"; otherwise it is a length cap.
    if limit is not True and len(body) >= limit:
        return

    try:
        self.response_body = loads(body)
    except (TypeError, ValueError):
        # Body is not JSON; leave response_body as it was.
        pass
diyca_web_utilities.py 文件源码 项目:diyca 作者: texadactyl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def verify_email_recipient(arg_recipient):
    """
    Check that `arg_recipient` looks like an e-mail address whose domain has
    an MX record.

    Args:
        arg_recipient: Candidate address. Only lower-case letters, digits,
            "_", "-" and "." are accepted, with a 2-4 letter final label.

    Returns:
        True when the address matches the pattern and the MX lookup for its
        domain succeeds; False otherwise (including any lookup failure).
    """
    if app.debug:
        app.logger.debug("verify_email_recipient: email recipient = %s", arg_recipient)
    # Inspect email address. Raw string keeps the "\." escapes valid; "\Z"
    # (instead of "$") refuses an address that ends in a newline.
    result = re.match(r'^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})\Z', arg_recipient)
    if result is None:
        if app.debug:
            app.logger.debug("verify_email_recipient: Not an email address: %s", arg_recipient)
        return False
    # Extract domain name from arg_recipient
    pieces = arg_recipient.split("@")
    if len(pieces) != 2:
        if app.debug:
            app.logger.debug("verify_email_recipient: Did not split into 2 pieces: %s", arg_recipient)
        return False
    domain = pieces[1]
    if app.debug:
        app.logger.debug("verify_email_recipient: email domain = %s", domain)
    # Get MX record for target domain
    try:
        records = dns.resolver.query(domain, 'MX')
        mxRecord = str(records[0].exchange)
    except Exception:
        # Any lookup failure means "not verifiable". KeyboardInterrupt and
        # SystemExit are deliberately no longer swallowed here.
        if app.debug:
            app.logger.debug("verify_email_recipient: DNS MX-query exception with %s", domain)
        return False
    if app.debug:
        app.logger.debug("verify_email_recipient: DNS MX record = %s", mxRecord)
    return True

#======================================
# Convert epoch time to database format
#======================================
forms.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_wrap(self, node, classes='form-group'):
    """
    Build the wrapping `div` for the form field `node`.

    The div's class is `classes`, with ' required' appended when the field
    is flagged as required (not strictly bootstrap, but a common enough
    customization). In debug mode a comment naming the field and its class
    is added to the div.
    """
    css_classes = (classes + ' required') if node.flags.required else classes

    wrapper = tags.div(_class=css_classes)
    if current_app.debug:
        note = ' Field: {} ({}) '.format(node.name, node.__class__.__name__)
        wrapper.add(tags.comment(note))

    return wrapper
api.py 文件源码 项目:doorman 作者: mwielgoszewski 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def logger(node=None):
    '''
    Handle a log submission from `node` and persist it.

    The request JSON's `log_type` selects the handling:

    * 'status': entries under `data` whose integer severity is not below
      `DOORMAN_MINIMUM_OSQUERY_LOG_LEVEL` are stored as `StatusLog` rows.
    * 'result': rows from `process_result` are stored and analysis is queued.
    * anything else: an error is logged; the node is still saved so that
      last_checkin / last_ip get written.

    Always responds with `jsonify(node_invalid=False)`.
    '''
    payload = request.get_json()
    kind = payload['log_type']
    min_severity = current_app.config['DOORMAN_MINIMUM_OSQUERY_LOG_LEVEL']

    if current_app.debug:
        current_app.logger.debug(json.dumps(payload, indent=2))

    if kind == 'status':
        log_tee.handle_status(payload, host_identifier=node.host_identifier)
        # Keep only the entries at or above the configured minimum severity.
        status_logs = [
            StatusLog(node_id=node.id, **entry)
            for entry in payload.get('data', [])
            if not (int(entry['severity']) < min_severity)
        ]
        db.session.add(node)
        db.session.bulk_save_objects(status_logs)
        db.session.commit()

    elif kind == 'result':
        db.session.add(node)
        db.session.bulk_save_objects(process_result(payload, node))
        db.session.commit()
        log_tee.handle_result(payload, host_identifier=node.host_identifier)
        analyze_result.delay(payload, node.to_dict())

    else:
        current_app.logger.error("%s - Unknown log_type %r",
            request.remote_addr, kind
        )
        current_app.logger.info(json.dumps(payload))
        # still need to write last_checkin, last_ip
        db.session.add(node)
        db.session.commit()

    return jsonify(node_invalid=False)
storage.py 文件源码 项目:storage-api 作者: cerndb 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def handle_netapp_exception(error):
    '''
    Build the (body, status) pair reported for a filer error.

    The body carries the error's `msg` and `errno`; in debug mode it also
    carries the string form of `error.failing_query`. The status is always
    500.
    '''
    body = {'message': error.msg, "errno": error.errno}

    if not current_app.debug:
        return body, 500

    body['failing_query'] = str(error.failing_query)
    return body, 500
forms.py 文件源码 项目:python_ddd_flask 作者: igorvinnicius 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_wrap(self, node, classes='form-group'):
    """
    Create the `div` that wraps the form field `node`.

    ' required' is appended to `classes` for fields flagged as required;
    strictly speaking that isn't bootstrap, but it is a common enough
    customization. When the app is in debug mode, a comment with the field's
    name and class name is added to the div.
    """
    css_classes = (classes + ' required') if node.flags.required else classes

    wrapper = tags.div(_class=css_classes)
    if current_app.debug:
        note = ' Field: {} ({}) '.format(node.name, node.__class__.__name__)
        wrapper.add(tags.comment(note))

    return wrapper
core.py 文件源码 项目:flask-ask 作者: johnwheeler 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def dbgdump(obj, default=None, cls=None):
    """
    Log `obj` as JSON at DEBUG level.

    The output is indented by 2 when the `ASK_PRETTY_DEBUG_LOGS` config
    entry is truthy and left compact otherwise. `default` and `cls` are
    passed straight through to `json.dumps`.
    """
    pretty = current_app.config.get('ASK_PRETTY_DEBUG_LOGS', False)
    logger.debug(json.dumps(obj, indent=2 if pretty else None, default=default, cls=cls))
core.py 文件源码 项目:flask-ask 作者: johnwheeler 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def init_app(self, app, path='templates.yaml'):
    """Attach this Ask instance to a Flask app: URL rule plus template loading.

    The app gains an `ask` attribute, a POST-only URL rule at `self._route`
    bound to `self._flask_view_func`, and a jinja loader that tries the app's
    existing loader first and a `YamlLoader` for `path` second.

    Raises `TypeError` when `self._route` is None.

    Configuration variables read from Flask's configuration:

    `ASK_APPLICATION_ID`:

        An application ID, or a list of allowed application IDs, that turns
        on application ID verification. Verification is disabled by default
        and a warning is logged. Set this in production so that requests are
        only accepted from the applications you specify.
        Default: None

    `ASK_VERIFY_REQUESTS`:

        Enables or disables Alexa request verification, which ensures that
        requests sent to your skill come from Amazon's Alexa service. Do not
        disable it in production. Disabling is useful for mocking JSON
        requests in automated tests.
        Default: True

    `ASK_VERIFY_TIMESTAMP_DEBUG`:

        Set to True to verify request timestamps while debugging. Timestamp
        verification helps mitigate replay attacks and relies on the system
        clock being synchronized with an NTP server. Do not enable it in
        production.
        Default: False

    `ASK_PRETTY_DEBUG_LOGS`:

        Adds tabs and linebreaks to the Alexa request and response printed
        to the debug log. This is easier to read on the console, but breaks
        formatting when logging to CloudWatch.
        Default: False
    """
    if self._route is None:
        raise TypeError("route is a required argument when app is not None")

    app.ask = self

    app.add_url_rule(self._route, view_func=self._flask_view_func, methods=['POST'])

    # The app's existing loader keeps priority; the YAML templates are the fallback.
    loader_chain = [app.jinja_loader, YamlLoader(app, path)]
    app.jinja_loader = ChoiceLoader(loader_chain)
core.py 文件源码 项目:flask-ask 作者: johnwheeler 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _alexa_request(self, verify=True):
    """
    Parse the current Flask request body as JSON and optionally verify it.

    With `verify` falsy the parsed payload is returned without any checks.
    Otherwise these checks run in order, each through `verifier`:

    1. the certificate at the `Signaturecertchainurl` header is loaded;
    2. the `Signature` header is verified against the raw body;
    3. the request timestamp is verified, unless the app is in debug mode
       and `self.ask_verify_timestamp_debug` is falsy;
    4. the application id is verified when `self.ask_application_id` is set.

    Returns the parsed payload.
    """
    body = flask_request.data
    payload = json.loads(body)

    if not verify:
        return payload

    cert_url = flask_request.headers['Signaturecertchainurl']
    signature = flask_request.headers['Signature']

    # Loading the certificate is itself a check performed by the verifier.
    certificate = verifier.load_certificate(cert_url)
    verifier.verify_signature(certificate, signature, body)

    request_section = payload.get('request', {})
    timestamp = self._parse_timestamp(request_section.get('timestamp'))

    if not current_app.debug or self.ask_verify_timestamp_debug:
        verifier.verify_timestamp(timestamp)

    # The application id lives under 'session' when present, else under 'context'.
    try:
        app_id = payload['session']['application']['applicationId']
    except KeyError:
        app_id = payload['context']['System']['application']['applicationId']

    if self.ask_application_id is not None:
        verifier.verify_application_id(app_id, self.ask_application_id)

    return payload
server.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
async def ping(loop, resp, client_guid):
    """
    Write a keep-alive comment to `resp` every 15 seconds, forever.

    Declared `async` because the body awaits `asyncio.sleep`; as a plain
    `def` it does not compile.

    Args:
        loop: Kept for interface compatibility; no longer passed on, because
            `asyncio.sleep` lost its `loop` argument in Python 3.10 and
            always uses the running loop.
        resp: Object whose `write()` receives the ping bytes.
        client_guid: Identifier included in the debug log line.
    """
    # periodically send ping to the browser. Any message that
    # starts with ":" colon ignored by a browser and could be used
    # as ping message.
    while True:
        await asyncio.sleep(15)
        current_app.logger.debug('pubsub.ping guid=%s', client_guid)
        resp.write(b': ping\r\n\r\n')


# @log_errors
server.py 文件源码 项目:zeus 作者: getsentry 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
async def build_server(loop, host, port):
    """
    Create the pub/sub web application and start serving it on `host`:`port`.

    Declared `async` because the body awaits `loop.create_server`; as a plain
    `def` it does not compile.

    The application reuses the Flask app's logger and debug flag and exposes
    two GET routes: `/` (handled by `stream`) and `/healthz` (handled by
    `health_check`).

    Returns:
        Whatever `loop.create_server(...)` resolves to.
    """
    app = Application(loop=loop, logger=current_app.logger,
                      debug=current_app.debug)
    app.router.add_route('GET', '/', stream)
    app.router.add_route('GET', '/healthz', health_check)

    return await loop.create_server(app.make_handler(), host, port)
__init__.py 文件源码 项目:ddots-api-server 作者: frol 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def serve_swaggerui_assets(path):
    """
    Serve the Swagger-UI asset `path` from the `../static/` directory.

    Outside debug mode a warning is issued first, recommending that these
    assets be served by a public-facing server instead.
    """
    if not current_app.debug:
        import warnings
        advice = "/swaggerui/ is recommended to be served by public-facing server (e.g. NGINX)"
        warnings.warn(advice)

    from flask import send_from_directory
    asset_response = send_from_directory('../static/', path)
    return asset_response
forms.py 文件源码 项目:flask-zhenai-mongo-echarts 作者: Fretice 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _get_wrap(self, node, classes='form-group'):
    """
    Return the `div` used to wrap the form field `node`.

    Required fields get ' required' appended to `classes` (not strictly
    bootstrap, but a common enough customization). In debug mode the div
    also receives a comment stating the field's name and class name.
    """
    css_classes = (classes + ' required') if node.flags.required else classes

    wrapper = tags.div(_class=css_classes)
    if current_app.debug:
        note = ' Field: {} ({}) '.format(node.name, node.__class__.__name__)
        wrapper.add(tags.comment(note))

    return wrapper


问题


面经


文章

微信
公众号

扫码关注公众号