python类auth()的实例源码

main.py 文件源码 项目:SwarmOps 作者: staugur 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def before_request():
    """Attach per-request state to ``g`` before the view function runs.

    Stores the request start time and a generated request id, the
    ``sessionId`` / ``username`` / ``time`` cookies (each defaulting to an
    empty string), the result of ``isLogged_in`` for those cookies, the
    swarm handle with its service, node, network and registry managers,
    and the application metadata dictionary.
    """
    cookies = request.cookies

    # Request bookkeeping.
    g.startTime = time.time()
    g.requestId = gen_requestId()

    # Login state: the cookie values are joined as
    # "username.expires.sessionId" and handed to isLogged_in.
    session_id = cookies.get("sessionId", "")
    g.sessionId = session_id
    username = cookies.get("username", "")
    g.username = username
    expires = cookies.get("time", "")
    g.expires = expires
    g.auth = isLogged_in('.'.join((username, expires, session_id)))

    # Managers used by the views; each one receives the active swarm.
    g.swarm = swarm
    g.service = ServiceManager(ActiveSwarm=g.swarm.getActive)
    g.node = NodeManager(ActiveSwarm=g.swarm.getActive)
    g.network = NetworkManager(ActiveSwarm=g.swarm.getActive)
    g.registry = RegistryManager(Registry=REGISTRY)

    g.sysInfo = dict(
        Version=__version__,
        Author=__author__,
        Email=__email__,
        Doc=__doc__,
    )

    access_message = "Start Once Access, and this requestId is %s, auth(%s)" %(g.requestId, g.auth)
    logger.info(access_message)
    # The full URL map is written to the application log on every request.
    app.logger.info(app.url_map)

#每次返回数据中，带上响应头，包含本次请求的requestId，以及允许所有域跨域访问API, 记录访问日志.
auth.py 文件源码 项目:SwarmOps 作者: staugur 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def sso():
    """Complete an SSO login from the ``ticket`` query parameter.

    The ticket must have the form ``username.expires.sessionId`` where
    ``expires`` is either the literal string ``None`` or a ``%Y-%m-%d``
    date. On success the login cookies are set and the client is
    redirected to ``index``.

    A missing ticket, a ticket without exactly three dot-separated fields,
    or an unparseable expiry date all produce the failure page, which
    refreshes to the logout endpoint. A non-empty username outside
    ``SSO["SSO.AllowedUserList"]`` is redirected back to this endpoint
    without a ticket.
    """
    def failure_page():
        # Interstitial page that meta-refreshes to the logout endpoint.
        # NOTE(review): the visible text is mis-encoded ("?") in this copy
        # and the <b> tags are inverted; kept byte-for-byte here.
        return """<html><head><title>?????……</title><meta http-equiv="Content-Language" content="zh-CN"><meta HTTP-EQUIV="Content-Type" CONTENT="text/html; charset=utf8"><meta http-equiv="refresh" content="1.0;url={}"></head><body></b>?????, ????, ?????!<b></body></html>""".format(url_for("auth.logout"))

    ticket = request.args.get("ticket")
    if not ticket:
        logger.info("sso ticket get failed")
        return failure_page()
    logger.info("ticket: %s" %ticket)

    # The ticket comes from the query string, so it may be malformed;
    # unpacking anything but three fields raises ValueError.
    try:
        username, expires, sessionId = ticket.split('.')
    except ValueError:
        logger.info("sso ticket is malformed")
        return failure_page()

    # NOTE(review): an empty username skips the allow-list check, as in the
    # original code - confirm that this is intended.
    if username and username not in SSO["SSO.AllowedUserList"]:
        logger.info("SwarmOps is not allowed to login with {}.".format(username))
        return redirect(url_for("auth.sso"))

    if expires == 'None':
        UnixExpires = None
    else:
        try:
            UnixExpires = datetime.datetime.strptime(expires,"%Y-%m-%d")
        except ValueError:
            logger.info("sso ticket has an invalid expiry date")
            return failure_page()

    resp = make_response(redirect(url_for("index")))
    resp.set_cookie(key='logged_in', value="yes", expires=UnixExpires)
    resp.set_cookie(key='username',  value=username, expires=UnixExpires)
    resp.set_cookie(key='sessionId', value=sessionId, expires=UnixExpires)
    resp.set_cookie(key='time', value=expires, expires=UnixExpires)
    resp.set_cookie(key='Azone', value="sso", expires=UnixExpires)
    return resp
utils.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_token_from_request():
    """
    Pulls the auth token from the request.

    When an ``Authorization`` header is present, its second
    whitespace-separated field is the token. The ``auth_token`` cookie is
    consulted only when the header is absent.

    :return:  The auth token.
    :raises TokenException: if no token could be extracted.
    """
    try:
        auth_header = request.headers.get('Authorization')
        if auth_header is None:
            token = request.cookies.get('auth_token')
        else:
            token = auth_header.split()[1]
    except (AttributeError, IndexError):
        # Header without a second field (or not a string): treat as missing.
        token = None

    if token is not None:
        return token
    raise TokenException('Unable to get token from request.')
retailers.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_retailer(retailer_id):
    """
    Look up one retailer and return it as a JSON response.

    :param retailer_id:   The retailer's id; validated with
                          ``check_null_input`` before the lookup.

    :return: A 200 ``application/json`` response whose body is the
             service result, documented as {
        "id": "123",
        "address": {Address}
    }

    """
    check_null_input((retailer_id, 'retailer to retrieve'))

    loopback_token = g.auth['loopback_token']
    payload = retailer_service.get_retailer(token=loopback_token, retailer_id=retailer_id)
    return Response(payload, status=200, mimetype='application/json')
products.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_products():
    """
    Fetch every product and return the list as a JSON response.

    :return: A 200 ``application/json`` response whose body is the
             service result, documented as [{
        "id": "I9",
        "name": "Milk",
        "supplier": "Abbott"
    }, {...}]

    """
    loopback_token = g.auth['loopback_token']
    payload = product_service.get_products(token=loopback_token)
    return Response(payload, status=200, mimetype='application/json')
distribution_centers.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_distribution_centers():
    """
    Fetch every distribution center and return the list as a JSON response.

    :return: A 200 ``application/json`` response whose body is the
             service result, documented as [{
        "id": "D2",
        "address": {Address},
        "contact": {Contact}
    }, {...}]

    """
    loopback_token = g.auth['loopback_token']
    payload = distribution_center_service.get_distribution_centers(token=loopback_token)
    return Response(payload, status=200, mimetype='application/json')
distribution_centers.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_distribution_center(dc_id):
    """
    Look up one distribution center and return it as a JSON response.

    :param dc_id:   The distribution center's id; validated with
                    ``check_null_input`` before the lookup.

    :return: A 200 ``application/json`` response whose body is the
             service result, documented as {
        "id": "D2",
        "address": {Address},
        "contact": {Contact}
    }

    """
    check_null_input((dc_id, 'distribution center to retrieve'))

    loopback_token = g.auth['loopback_token']
    payload = distribution_center_service.get_distribution_center(token=loopback_token, dc_id=dc_id)
    return Response(payload, status=200, mimetype='application/json')
distribution_centers.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_distribution_center_inventory(dc_id):
    """
    Return the inventory held at one distribution center as a JSON response.

    :param dc_id:   The distribution center's id; validated with
                    ``check_null_input`` before the lookup.

    :return: A 200 ``application/json`` response whose body is the
             service result, documented as [{
        "id": "123",
        "quantity": 10,
        "productId": "123",
        "locationId": "123",
        "locationType": "DistributionCenter"
    }, {...}]
    """
    check_null_input((dc_id, 'distribution center whose inventory you want to retrieve'))

    loopback_token = g.auth['loopback_token']
    payload = distribution_center_service.get_distribution_center_inventory(
        token=loopback_token,
        dc_id=dc_id,
    )
    return Response(payload, status=200, mimetype='application/json')
views.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_instance(auth, owner, package_name, package_hash):
    """Return the instance of ``owner/package_name`` identified by ``package_hash``.

    Only instances whose package has an access row for ``auth.user`` or for
    ``PUBLIC`` are considered. The deferred ``contents`` column is loaded
    together with the row.

    Raises ``ApiException`` (not found) when no such instance matches.
    """
    query = Instance.query.filter_by(hash=package_hash)
    # Contents is deferred by default.
    query = query.options(undefer('contents'))
    query = query.join(Instance.package).filter_by(owner=owner, name=package_name)
    query = query.join(Package.access).filter(Access.user.in_([auth.user, PUBLIC]))

    found = query.one_or_none()
    if found is not None:
        return found
    raise ApiException(
        requests.codes.not_found,
        "Package hash does not exist"
    )
views.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def user_packages(owner):
    """List ``owner``'s packages that the current user or ``PUBLIC`` can access.

    Returns ``{"packages": [{"name": ..., "is_public": ...}, ...]}`` ordered
    by package name. ``is_public`` is the per-package maximum of
    ``Access.user == PUBLIC`` over the matching access rows.
    """
    query = db.session.query(Package, sa.func.max(Access.user == PUBLIC))
    query = query.filter_by(owner=owner)
    query = query.join(Package.access)
    query = query.filter(Access.user.in_([g.auth.user, PUBLIC]))
    query = query.group_by(Package.id).order_by(Package.name)

    summaries = []
    for package, is_public in query.all():
        summaries.append({'name': package.name, 'is_public': is_public})
    return {'packages': summaries}
views.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def version_list(owner, package_name):
    """List the versions of a package together with their instance hashes.

    The package is resolved through ``_get_package`` with the current
    ``g.auth``. Returns ``{"versions": [{"version": ..., "hash": ...}, ...]}``
    ordered by each ``Version``'s ``sort_key()``.
    """
    package = _get_package(g.auth, owner, package_name)

    query = db.session.query(Version, Instance).filter_by(package=package)
    rows = query.join(Version.instance).all()

    def by_version_key(row):
        return row.Version.sort_key()

    entries = []
    for version, instance in sorted(rows, key=by_version_key):
        entries.append({'version': version.user_version, 'hash': instance.hash})
    return {'versions': entries}
views.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def tag_list(owner, package_name):
    """List the tags of a package together with their instance hashes.

    The package is resolved through ``_get_package`` with the current
    ``g.auth``. Returns ``{"tags": [{"tag": ..., "hash": ...}, ...]}``
    ordered by tag name.
    """
    package = _get_package(g.auth, owner, package_name)

    query = db.session.query(Tag, Instance).filter_by(package=package)
    query = query.order_by(Tag.tag).join(Tag.instance)

    entries = []
    for tag, instance in query.all():
        entries.append({'tag': tag.tag, 'hash': instance.hash})
    return {'tags': entries}
views.py 文件源码 项目:quilt 作者: quiltdata 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def access_get(owner, package_name, user):
    """Check that ``user`` has an access row for ``owner/package_name``.

    Only the package owner may ask. Returns an empty dict when the access
    row exists.

    Raises ``ApiException`` (forbidden) when the caller is not the owner,
    and ``PackageNotFoundException`` when no matching access row exists.
    """
    if g.auth.user != owner:
        raise ApiException(
            requests.codes.forbidden,
            "Only the package owner can view access"
        )

    query = db.session.query(Access).filter_by(user=user)
    query = query.join(Access.package).filter_by(owner=owner, name=package_name)
    if query.one_or_none() is None:
        raise PackageNotFoundException(owner, package_name)

    return {}
__init__.py 文件源码 项目:fallball-connector 作者: ingrammicro 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_reseller_info():
    """Build the ``ResellerInfo`` describing the caller of this request.

    The reseller id comes from the ``Aps-Instance-Id`` header. ``is_new`` is
    true when ``g.endpoint`` equals the lower-cased ``ApplicationList``
    class name.
    """
    instance_id = request.headers.get('Aps-Instance-Id')
    targets_application_list = g.endpoint == ApplicationList.__name__.lower()
    return ResellerInfo(
        id=instance_id,
        name=set_name_for_reseller(instance_id),
        is_new=targets_application_list,
        auth=get_oauth(),
    )
__init__.py 文件源码 项目:fallball-connector 作者: ingrammicro 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def before_request():
    """Prepare logging and reseller context on ``g`` before each request.

    Steps, in order:
      * start the per-request log with an empty ``out`` list and the
        logged request;
      * store the endpoint name with the blueprint prefix (and any leading
        dots) removed;
      * resolve the reseller; with no reseller name, only public endpoints
        are allowed and processing stops here;
      * abort with 401 when the OAuth signature check fails;
      * load the reseller and abort with 403 when it has no token, unless
        the request is flagged as new.
    """
    g.log = {'out': []}
    g.log['request'] = log_request(request)

    g.endpoint = request.endpoint
    blueprint = request.blueprint
    if blueprint:
        # Drop the "<blueprint>." prefix from the endpoint name.
        g.endpoint = g.endpoint[len(blueprint):].lstrip('.')

    info = get_reseller_info()
    g.reseller_name = info.name
    g.company_name = 'N/A'

    if not info.name:
        allow_public_endpoints_only()
        return

    if not check_oauth_signature(request):
        abort(401)

    g.auth = info.auth

    g.reseller = Reseller(info.name, info.id, None)
    g.reseller.refresh()

    if not (g.reseller.token or info.is_new):
        abort(403)
public.py 文件源码 项目:SwarmOps 作者: staugur 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def login_required(f):
    """Decorator: run the view only when ``g.auth`` is truthy.

    Otherwise the client is redirected to the ``auth.login`` endpoint.
    The wrapped function's metadata is preserved via ``functools.wraps``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if g.auth:
            return f(*args, **kwargs)
        return redirect(url_for('auth.login'))
    return guarded_view
auth.py 文件源码 项目:SwarmOps 作者: staugur 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def login():
    """Send the user to the SSO login page, or to ``index`` if already logged in.

    The SSO URL carries this application's ``/sso/`` callback URL, the
    configured project name, and an md5 of ``"<project>:<callback>"``.
    """
    if g.auth:
        return redirect(url_for("index"))

    callback_url = SpliceURL.Modify(request.url_root, "/sso/").geturl
    project = SSO["SSO.PROJECT"]
    query = {
        "sso": True,
        "sso_r": callback_url,
        "sso_p": project,
        "sso_t": md5("%s:%s" %(project, callback_url)),
    }
    SSOLoginURL = SpliceURL.Modify(url=SSO["SSO.URL"], path="/login/", query=query).geturl
    logger.info("User request login to SSO: %s" %SSOLoginURL)
    return redirect(SSOLoginURL)
utils.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def logged_in(func, *args, **kwargs):
    """
    Call ``func`` with the given arguments only when a session exists.

    :param func:  The callable to invoke.
    :return:      Whatever ``func`` returns.
    :raises AuthorizationException: if ``g.auth`` is None.
    """
    if g.auth is not None:
        return func(*args, **kwargs)
    raise AuthorizationException('No existing session.')
retailers.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_retailers():
    """
    Fetch every retailer and return the list as a JSON response.

    :return: A 200 ``application/json`` response whose body is the
             service result, documented as [{
        "id": "123",
        "address": {Address}
    }, {...}]

    """
    loopback_token = g.auth['loopback_token']
    payload = retailer_service.get_retailers(token=loopback_token)
    return Response(payload, status=200, mimetype='application/json')
retailers.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_retailer_shipments(retailer_id):
    """
    Return the shipments heading to one retailer as a JSON response.

    The optional ``status`` query parameter is passed through to the
    shipment service.

    :param retailer_id:   The retailer's id; validated with
                          ``check_null_input`` before the lookup.

    :return: A 200 ``application/json`` response whose body is the
             service result, documented as [{
        "id": "123",
        "status": "SHIPPED",
        "createdAt": "2015-11-05T22:00:51.692765",
        "updatedAt": "2015-11-08T22:00:51.692765",
        "deliveredAt": "2015-11-08T22:00:51.692765",
        "estimatedTimeOfArrival": "2015-11-07T22:00:51.692765",
        "currentLocation": {Address},
        "fromId": "D2",
        "toId": "123"
    }, {...}]

    """
    check_null_input((retailer_id, 'retailer whose shipments you want to retrieve'))
    status_filter = request.args.get('status')

    loopback_token = g.auth['loopback_token']
    payload = shipment_service.get_shipments(
        token=loopback_token,
        retailer_id=retailer_id,
        status=status_filter,
    )
    return Response(payload, status=200, mimetype='application/json')
weather.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_recommendations():
    """
    Return the recommendations for the current ``g.auth['guid']``.

    :return: A 200 ``application/json`` response whose body is the list
             of recommendations produced by the weather service.

    """
    guid = g.auth['guid']
    payload = weather_service.get_recommendations(guid)
    return Response(payload, status=200, mimetype='application/json')
weather.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def acknowledge_recommendation():
    """
    Acknowledge a recommendation.

    The recommendation id is read from the ``id`` field of the JSON
    request body.

    :return: A 200 ``application/json`` response whose body is the
             weather service result.
    """
    body = get_json_data(request)

    guid = g.auth['guid']
    recommendation_id = body.get('id')
    payload = weather_service.acknowledge_recommendation(guid, recommendation_id)
    return Response(payload, status=200, mimetype='application/json')
weather.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def trigger_simulation():
    """
    Trigger a simulation for the demo identified by ``g.auth['guid']``.

    :return: A 200 ``application/json`` response whose body is the
             weather service result.
    """
    guid = g.auth['guid']
    payload = weather_service.trigger_simulation(guid)
    return Response(payload, status=200, mimetype='application/json')
shipments.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_shipments():
    """
    Fetch shipments and return the list as a JSON response.

    The optional query parameters ``status``, ``rid`` (retailer id) and
    ``did`` (distribution center id) are passed through to the shipment
    service.

    :return: A 200 ``application/json`` response whose body is the
             service result, documented as [{
        "id": "123",
        "status": "SHIPPED",
        "createdAt": "2015-11-05T22:00:51.692765",
        "updatedAt": "2015-11-08T22:00:51.692765",
        "deliveredAt": "2015-11-08T22:00:51.692765",
        "estimatedTimeOfArrival": "2015-11-07T22:00:51.692765",
        "currentLocation": {Address},
        "fromId": "D2",
        "toId": "123"
    }, {...}]

    """
    args = request.args
    filters = {
        'status': args.get('status'),
        'retailer_id': args.get('rid'),
        'dc_id': args.get('did'),
    }
    payload = shipment_service.get_shipments(token=g.auth['loopback_token'], **filters)
    return Response(payload, status=200, mimetype='application/json')
shipments.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_shipment(shipment_id):
    """
    Look up one shipment and return it as a JSON response.

    The optional ``include_items`` query parameter is passed through to
    the shipment service.

    :param shipment_id:   The shipment's id; validated with
                          ``check_null_input`` before the lookup.

    :return: A 200 ``application/json`` response whose body is the
             service result, documented as {
        "id": "123",
        "status": "SHIPPED",
        "createdAt": "2015-11-05T22:00:51.692765",
        "updatedAt": "2015-11-08T22:00:51.692765",
        "deliveredAt": "2015-11-08T22:00:51.692765",
        "estimatedTimeOfArrival": "2015-11-07T22:00:51.692765",
        "currentLocation": {Address},
        "fromId": "123",
        "toId": "123",
        "items": [{LineItem}]
    }

    """
    items_flag = request.args.get('include_items')
    check_null_input((shipment_id, 'shipment to retrieve'))

    loopback_token = g.auth['loopback_token']
    payload = shipment_service.get_shipment(
        token=loopback_token,
        shipment_id=shipment_id,
        include_items=items_flag,
    )
    return Response(payload, status=200, mimetype='application/json')
shipments.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def delete_shipment(shipment_id):
    """
    Delete a single shipment object.

    :param shipment_id:   The shipment's id; validated with
                          ``check_null_input`` before the deletion.
    :return: An empty body with status 204.

    """
    check_null_input((shipment_id, 'shipment to delete'))

    loopback_token = g.auth['loopback_token']
    shipment_service.delete_shipment(token=loopback_token, shipment_id=shipment_id)
    return '', 204
shipments.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def update_shipment(shipment_id):
    """
    Update one shipment with the JSON request body and return the result.

    :param shipment_id:   The shipment's id; validated with
                          ``check_null_input`` before the update.

    The request body is the shipment to store, documented as {
        "id": "123",
        "status": "SHIPPED",
        "createdAt": "2015-11-05T22:00:51.692765",
        "updatedAt": "2015-11-08T22:00:51.692765",
        "deliveredAt": "2015-11-08T22:00:51.692765",
        "estimatedTimeOfArrival": "2015-11-07T22:00:51.692765",
        "currentLocation": {Address},
        "fromId": "D2",
        "toId": "123"
    }

    :return: A 200 ``application/json`` response whose body is the
             service result, documented with the same shape as the
             request body.

    """
    check_null_input((shipment_id, 'shipment to update'))

    new_values = get_json_data(request)
    loopback_token = g.auth['loopback_token']
    payload = shipment_service.update_shipment(
        token=loopback_token,
        shipment_id=shipment_id,
        shipment=new_values,
    )
    return Response(payload, status=200, mimetype='application/json')
demos.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setup_auth_from_request():
    """
    Get the Auth data from the request and store it on the g object
    for later use.

    ``g.auth`` is assigned on every path: the detokenized auth data when a
    token is found and resolves, otherwise ``None``.
    """
    # Assign the default first so g.auth exists even when the token is
    # None; otherwise later reads of g.auth would raise AttributeError.
    g.auth = None
    try:
        token = web_utils.get_token_from_request()
        if token is not None:
            g.auth = web_utils.detokenize(token)
    except (TokenException, ResourceDoesNotExistException):
        # No usable token on the request: stay unauthenticated.
        g.auth = None
demos.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def deauthenticate(token):
    """
    Logout the current user.

    The logout call is made only when ``token`` equals the token carried
    by the request; otherwise nothing is done.

    :param token:  Current web token
    :return: An empty body with status 204 in both cases.
    """
    # Only allow deletion of a web token if the token belongs to the current user
    if web_utils.get_token_from_request() == token:
        loopback_token = g.auth['loopback_token']
        user_service.logout(token=loopback_token)
    return '', 204
demos.py 文件源码 项目:logistics-wizard-controller 作者: IBM-Cloud 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def load_admin_data():
    """
    Load the admin data view for the currently logged in user.

    Shipments, distribution centers and retailers are fetched through a
    ``Pool`` with one worker per ERP call, each call receiving the
    loopback token from ``g.auth``. The pool is closed and joined whether
    or not the calls succeed.

    :return: A 200 ``application/json`` response with body {
        "shipments": [{Shipments}],
        "distribution-centers": [{Distribution Center}],
        "retailers": [{Retailer}]
    }
    :raises APIException: if retrieving any of the three results fails.
    """

    # Specify functions and corresponding arguments to call to retrieve ERP data
    loopback_token = g.auth['loopback_token']
    erp_calls = [(shipment_service.get_shipments, loopback_token),
                 (distribution_center_service.get_distribution_centers, loopback_token),
                 (retailer_service.get_retailers, loopback_token)]
    pool = Pool(processes=len(erp_calls))

    # Make the calls through the pool and wait for all of them to finish
    try:
        results = pool.map(async_helper, erp_calls)
    except Exception as e:
        raise APIException('Error retrieving admin data view', internal_details=str(e))
    finally:
        # Release the workers on the error path too; previously a failing
        # map() skipped close()/join() and leaked the pool.
        pool.close()
        pool.join()

    # Results come back in the order of erp_calls; each is parsed as JSON
    # and re-serialized into one combined document for the client.
    return Response(json.dumps({
                        "shipments": json.loads(results[0]),
                        "distribution-centers": json.loads(results[1]),
                        "retailers": json.loads(results[2])
                    }),
                    status=200,
                    mimetype='application/json')


问题


面经


文章

微信
公众号

扫码关注公众号