python类debug()的实例源码

__init__.py 文件源码 项目:netzob-webapi 作者: netzob 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def serve_swaggerui_assets(path):
    """
    Swagger-UI assets serving route.
    """
    if not current_app.debug:
        import warnings
        warnings.warn(
            "/swaggerui/ is recommended to be served by public-facing server (e.g. NGINX)"
        )
    from flask import send_from_directory
    return send_from_directory('../static/', path)
connection.py 文件源码 项目:ibrest 作者: hamx0r 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_client():
    """ Creates a client connection to be used with orders
    """
    # Get client ID from our non-order pool list in memory
    timeout = g.timeout
    while g.clientId_in_use:
        log.debug('Waiting for clientId to become available...({})'.format(timeout))
        time.sleep(0.5)
        timeout -= 1

    client = g.client_connection

    # Enable logging if we're in debug mode
    if current_app.debug is True:
        client.enableLogging()

    # Reconnect if needed
    if not client.isConnected():
        log.debug('Client {} not connected.  Trying to reconnect...'.format(g.client_id))
        client.disconnect()
        time.sleep(1)
        client.connect()
        # If we failed to reconnect, be sure to put our client ID back in the pool
        if client.isConnected() is False:
            raise Exception('Client cannot connect')
    return client
forms.py 文件源码 项目:Lixiang_zhaoxin 作者: hejaxian 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _get_wrap(self, node, classes='form-group'):
        # add required class, which strictly speaking isn't bootstrap, but
        # a common enough customization
        if node.flags.required:
            classes += ' required'

        div = tags.div(_class=classes)
        if current_app.debug:
            div.add(tags.comment(' Field: {} ({}) '.format(
                node.name, node.__class__.__name__)))

        return div
sqlalchemy.py 文件源码 项目:Hawkeye 作者: tozhengxq 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def recording_enabled():
    return (current_app.debug
            or current_app.config.get('SQLALCHEMY_RECORD_QUERIES'))
forms.py 文件源码 项目:ngx_status 作者: YoYoAdorkable 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_wrap(self, node, classes='form-group'):
        # add required class, which strictly speaking isn't bootstrap, but
        # a common enough customization
        if node.flags.required:
            classes += ' required'

        div = tags.div(_class=classes)
        if current_app.debug:
            div.add(tags.comment(' Field: {} ({}) '.format(
                node.name, node.__class__.__name__)))

        return div
apierrors.py 文件源码 项目:drift 作者: dgnorth 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle_all_exceptions(e):
    is_server_error = not isinstance(e, HTTPException)

    ret = {}
    error = {}
    ret['error'] = error

    if is_server_error or e.code >= 500:            
        # Use context_id from the client if it's available, or make one if not.
        log_context = request.headers.get("Drift-Log-Context")
        log_context = json.loads(log_context) if log_context else {}
        context_id = log_context.get("request_id", str(uuid.uuid4()).replace("-", ""))
        error['context_id'] = context_id
        title = str(e) + " - [{}]".format(context_id)
        splunk_link = 'http://splunk.devnorth.dg-api.com:8000/en-US/app/search/search'
        splunk_link += '?q=search%20sourcetype%3D%22*%22%20%7C%20search%20{}'.format(context_id)
        error['link'] = splunk_link

    if is_server_error:
        # Do a traceback if caller has dev role, or we are running in debug mode.
        current_user = query_current_user()
        if (current_user and "dev" in current_user['roles']) or current_app.debug:
            sio = cStringIO.StringIO()
            ei = sys.exc_info()
            sio.write("%s: %s\n" % (type(e).__name__, e))
            traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
            error["traceback"] = sio.getvalue()
            sio.close()
            error['description'] = str(e)
        else:
            error['description'] = "Internal Server Error"

        # The exception is logged out and picked up by Splunk or comparable tool.
        # The 'context_id' in the title enables quick cross referencing with the 
        # response body below.
        log.exception(title)

        ret['status_code'] = 500
        ret['message'] = "Internal Server Error"
        error['code'] = 'internal_server_error'
    else:
        ret['status_code'] = e.code
        ret['message'] = e.name
        error['code'] = 'user_error' if e.code < 500 else 'server_error'
        error['description'] = e.description

        # Support for Flask Restful 'data' property in exceptions.
        if hasattr(e, 'data') and e.data:
            error.update(e.data)        

            # Legacy field 'message'. If it's in the 'data' payload, rename the field
            # to 'description'.
            if 'message' in e.data:
                error['description'] = error.pop('message')

        if e.code >= 500:
            # It's a "soft" server error. Let's log it out.
            log.warning(title + " " + error['description'])

    return make_response(jsonify(ret), ret['status_code'])
diyca_web_signer.py 文件源码 项目:diyca 作者: texadactyl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sign_csr(arg_userid, arg_csr_path, arg_crt_path):
    # csr = User CSR file in internal crypto format
    (result, buffer) = get_file_contents(arg_csr_path)
    if not result:
        app.logger.error("sign_csr: cannot access CSR {%s} for user {%s}, reason: {%s}", arg_csr_path, arg_userid, buffer)
        return False
    try:
        csr = crypto.load_certificate_request(crypto.FILETYPE_PEM, buffer)
    except Exception as e:
        app.logger.error("sign_csr: load CSR {%s} for user {%s} failed, reason: {%s}", arg_csr_path, arg_userid, repr(e))
        return False
    # CAcertificate = CA certificate in internal crypto format
    (result, buffer) = get_file_contents(CA_CRT_FILE)
    if not result:
        app.logger.error("sign_csr: cannot access CA certificate {%s} for user {%s}, reason: {%s}", CA_CRT_FILE, arg_userid, repr(e))
        return False
    try:
        CAcertificate = crypto.load_certificate(crypto.FILETYPE_PEM, buffer)
        if app.debug:
            app.logger.debug("sign_csr: CA cert subject = {%s}", CAcertificate.get_subject())
    except Exception as e:
        app.logger.error("sign_csr: load CA certificate {%s} for user {%s} failed, reason: {%s}", CA_CRT_FILE, arg_userid, repr(e))
        return False
    # CAprivatekey = CA private key in internal crypto format
    (result, buffer) = get_file_contents(CA_KEY_FILE)
    if not result:
        app.logger.error("sign_csr: cannot access CA private key {%s} for user {%s}, reason: {%s}", CA_KEY_FILE, arg_userid, buffer)
        return False
    try:
        CAprivatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, buffer)
    except Exception as e:
        app.logger.error("sign_csr: load CA private key {%s} for user {%s} failed, reason: {%s}", CA_KEY_FILE, arg_userid, repr(e))
    # Sign CSR, giving the CRT
    try:
        cert = crypto.X509()
        cert.set_serial_number(42)
        cert.gmtime_adj_notBefore(0)
        cert.gmtime_adj_notAfter(EXPIRY_PERIOD)
        cert.set_issuer(CAcertificate.get_subject())
        subject = csr.get_subject() # will log the subject later
        cert.set_subject(subject)
        cert.set_pubkey(csr.get_pubkey())
        cert.sign(CAprivatekey, DIGEST)
    except Exception as e:
        app.logger.error("sign_csr: Cannot sign CSR {%s} for user {%s}, reason: {%s}", arg_csr_path, arg_userid, repr(e)) 
        return False
    # Store signed CRT
    try:
        file = open(arg_crt_path, "w")
        file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode('utf-8'))
        file.flush()
        os.fsync(file)
        file.close()
    except Exception as e:
        app.logger.error("sign_csr: Cannot store CRT {%s} for user {%s}, reason: {%s}", arg_crt_path, arg_userid, repr(e))
        return False
    # Success
    app.logger.info("sign_csr: Success with CRT {%s} for user {%s}, subject={%s}", arg_csr_path, arg_userid, subject)
    return True
redis_cache_decorator.py 文件源码 项目:Plog 作者: thundernet8 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def redis_cached(timeout=None, key_prefix='view/%s', unless=None):
    """
    ?????????
    :param timeout: ??????, ????????3600?
    :param key_prefix: ???
    :param unless: ??????????
    :return: ???????????
    """
    def decorator(f):
        @functools.wraps(f)  # ??????
        def decorated_function(*args, **kwargs):
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)

            if kwargs.get('nocache'):
                return f(*args, **kwargs)  # ????????? nocache ???????

            try:
                cache_key = decorated_function.make_cache_key(*args, **kwargs)
                cache_key = urllib.quote(cache_key, safe='')
                rv = redis_get(cache_key)
            except Exception:
                if current_app.debug:
                    raise
                return f(*args, **kwargs)

            if rv is None:
                rv = f(*args, **kwargs)
                try:
                    redis_set(cache_key, rv, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
                    return f(*args, **kwargs)
            return rv

        def make_cache_key(*args, **kwargs):
            if callable(key_prefix):
                cache_key = key_prefix()
            elif '%s' in key_prefix:
                cache_key = key_prefix % (request.url+'_uid_'+str(current_user.get_id()))
            else:
                cache_key = key_prefix
            cache_key = hashlib.md5(cache_key.encode('utf-8')).hexdigest()
            cache_key = '_'.join((get_version(level='day'), cache_key))
            return cache_key

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key

        return decorated_function
    return decorator
redis_cache_decorator.py 文件源码 项目:Plog 作者: thundernet8 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def redis_memoize(timeout=100, make_name=None, unless=None):
    """
    ?????????
    :param timeout: ??????, ??100s
    :param make_name: ????,???????????,???????????,?????????,??????????
    :param unless: ??????????
    :return: ???????????
    """
    def decorator(f):
        @functools.wraps(f)  # ??????
        def decorated_function(*args, **kwargs):
            if callable(unless) and unless() is True:
                return f(*args, **kwargs)

            if kwargs.get('nocache'):
                return f(*args, **kwargs)  # ????????? nocache ???????

            try:
                cache_key = decorated_function.make_cache_key(make_name, args, kwargs)
                rv = redis_get(cache_key)
            except Exception:
                if current_app.debug:
                    raise
                return f(*args, **kwargs)

            if rv is None:
                rv = f(*args, **kwargs)
                try:
                    redis_set(cache_key, rv, timeout=decorated_function.cache_timeout)
                except Exception:
                    if current_app.debug:
                        raise
                    return f(*args, **kwargs)
            return rv

        def make_cache_key(make_name, keyargs, keykwargs):
            fname = f.__name__
            if callable(make_name):
                fname = make_name(fname)
            if isinstance(make_name, str):
                fname = make_name
            alt_fname = '.'.join((f.__module__, fname))
            try:
                origin_str = "{0}{1}{2}".format(alt_fname, keyargs, keykwargs)
            except AttributeError:
                origin_str = "%s%s%s" % (alt_fname, keyargs, keykwargs)
            cache_key = hashlib.md5(origin_str.encode('utf-8')).hexdigest()
            cache_key = '_'.join((get_version(level='day'), cache_key))
            return cache_key

        decorated_function.uncached = f
        decorated_function.cache_timeout = timeout
        decorated_function.make_cache_key = make_cache_key

        return decorated_function
    return decorator
api.py 文件源码 项目:doorman 作者: mwielgoszewski 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def distributed_write(node=None):
    '''
    '''
    data = request.get_json()

    if current_app.debug:
        current_app.logger.debug(json.dumps(data, indent=2))

    queries = data.get('queries', {})
    statuses = data.get('statuses', {})

    for guid, results in queries.items():
        task = DistributedQueryTask.query.filter(
            DistributedQueryTask.guid == guid,
            DistributedQueryTask.status == DistributedQueryTask.PENDING,
            DistributedQueryTask.node == node,
        ).first()

        if not task:
            current_app.logger.error(
                "%s - Got result for distributed query not in PENDING "
                "state: %s: %s",
                request.remote_addr, guid, json.dumps(data)
            )
            continue

        # non-zero status indicates sqlite errors

        if not statuses.get(guid, 0):
            status = DistributedQueryTask.COMPLETE
        else:
            current_app.logger.error(
                "%s - Got non-zero status code (%d) on distributed query %s",
                request.remote_addr, statuses.get(guid), guid
            )
            status = DistributedQueryTask.FAILED

        for columns in results:
            result = DistributedQueryResult(
                columns,
                distributed_query=task.distributed_query,
                distributed_query_task=task
            )
            db.session.add(result)
        else:
            task.status = status
            db.session.add(task)

    else:
        # need to write last_checkin, last_ip on node
        db.session.add(node)
        db.session.commit()

    return jsonify(node_invalid=False)


问题


面经


文章

微信
公众号

扫码关注公众号