python类url()的实例源码

apiServer.py 文件源码 项目:sdos-core 作者: sdos 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_proxy_request_url(thisAuth, thisContainer=None, thisObject=None):
    """
    Build the URL under which this API proxy reaches its swift back end.

    The base comes from formatting ``configuration.swift_store_url`` with
    ``thisAuth``; container and object are appended as further path segments.

    :param thisAuth: value substituted into ``configuration.swift_store_url``
    :param thisContainer: optional container name appended to the base URL
    :param thisObject: optional object name; only appended when a container
        is given as well
    :return: the assembled back-end URL
    """
    segments = [configuration.swift_store_url.format(thisAuth)]
    if thisContainer:
        segments.append(thisContainer)
        # an object is only addressable inside a container
        if thisObject:
            segments.append(thisObject)
    return "/".join(segments)


##############################################################################
# Frontend Pool
##############################################################################
apiServer.py 文件源码 项目:sdos-core 作者: sdos 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def handle_auth():
    """
    Relay the client's auth request to swift and return swift's answer.

    On a 200 answer the response headers are handed to ``replaceStorageUrl``
    so that the storage url advertised by swift, e.g.
    'X-Storage-Url': 'http://192.168.209.204:8080/v1/AUTH_test'
    is replaced by our own, e.g.
    'X-Storage-Url': 'http://localhost:4000/v1/AUTH_test'

    This is the first request any client makes; the auth token issued by
    swift is passed on and used by the client in further requests.

    :return: a ``Response`` carrying swift's status, headers and body
    """
    status, headers, body = httpBackend.doAuthGetToken(reqHead=request.headers, method="GET")
    log.debug("swift response: {} {} {}".format(status, headers, body))
    if status == 200:
        # presumably rewrites the headers in place (return value is unused) -- TODO confirm
        replaceStorageUrl(swiftResponse=headers)
        log.debug("proxy response: {} {} {}".format(status, headers, body))
    return Response(status=status, headers=headers, response=body)
hal.py 文件源码 项目:chaos-monkey-engine 作者: BBVA 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, **kwargs):
        """Create a ``Self`` link pointing at the URL of the current request.

        Takes the same Keyword Arguments as :class:`.Link`.

        Additional Keyword Args:
            external (bool): if true, force link to be fully-qualified URL, defaults to False

        See Also:
            :class:`.Link`
        """
        href = request.url

        # The fully-qualified URL is kept when the caller asks for it or when
        # a SERVER_NAME is configured; otherwise the host part is reduced to '/'.
        keep_absolute = (kwargs.get('external', False)
                         or current_app.config['SERVER_NAME'] is not None)
        if not keep_absolute:
            href = href.replace(request.host_url, '/')

        return super(Self, self).__init__('self', href, **kwargs)
ui.py 文件源码 项目:dnflow 作者: DocNow 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def feed():
    """Render the Atom feed of published searches.

    Every published search (newest id first) gets a ``url`` pointing at its
    summary page, and the ``feed.xml`` template is rendered with them.

    :return: a response with content type ``application/atom+xml``
    """
    searches = query(
        '''
        SELECT * FROM searches 
        WHERE published IS NOT NULL 
        ORDER BY id DESC
        ''', json=True)
    site_url = 'http://' + app.config['HOSTNAME']
    feed_url = site_url + '/feed/'

    def add_url(s):
        s['url'] = site_url + '/summary/' + s['date_path'] + '/'
        return s

    searches = [add_url(_date_format(s)) for s in searches]

    # The feed's "updated" stamp is taken from the newest search.  With
    # nothing published yet there is no such entry; indexing [0] would raise
    # IndexError and turn the feed into a server error, so pass None instead.
    # NOTE(review): confirm feed.xml tolerates updated=None.
    updated = searches[0]['created'] if searches else None

    resp = make_response(
        render_template(
            'feed.xml',
            updated=updated,
            site_url=site_url,
            feed_url=feed_url,
            searches=searches
        )
    )
    resp.headers['Content-Type'] = 'application/atom+xml'
    return resp
app.py 文件源码 项目:postmarketos.org 作者: postmarketOS 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parse_post(post, external_links=False, create_html=True):
    """Load one blog post file and return its metadata as a dict.

    The file is split into YAML front matter and content. The front matter
    becomes the returned dict, extended with ``url``, ``reading_time`` and,
    optionally, the rendered ``html``.

    :param post: file name inside ``BLOG_CONTENT_DIR``; the last three
        characters are dropped and the rest is split on ``-`` into year,
        month, day and slug
    :param external_links: passed to ``url_for`` as ``_external``
    :param create_html: when true, render the content with markdown and
        store it under ``html``
    :return: the front matter dict with the extra keys described above
    """
    with open(os.path.join(BLOG_CONTENT_DIR, post)) as handle:
        raw = handle.read()
    frontmatter, content = REGEX_SPLIT_FRONTMATTER.split(raw, 2)

    # yaml.load() without an explicit Loader can construct arbitrary Python
    # objects, is deprecated since PyYAML 5.1 and fails on PyYAML >= 6.
    # Front matter is plain data, so the safe loader is sufficient.
    data = yaml.safe_load(frontmatter)

    y, m, d, slug = post[:-3].split('-', maxsplit=3)

    if create_html:
        data['html'] = markdown.markdown(content, extensions=[
            'markdown.extensions.extra',
            'markdown.extensions.codehilite',
            'markdown.extensions.toc'
        ])

    data['url'] = url_for('blog_post', y=y, m=m, d=d, slug=slug,
                          _external=external_links)
    data['reading_time'] = reading_time(content)

    return data
__init__.py 文件源码 项目:honeyd-python 作者: sookyp 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_feed():
    """Build an Atom feed with one entry per recorded session.

    Aborts with 404 when ``FEED_AUTH_REQUIRED`` is set and the current user
    is not authenticated. At most 1000 sessions are requested from Clio.

    :return: the populated ``AtomFeed``
    """
    from mhn.common.clio import Clio
    from mhn.auth import current_user

    if mhn.config['FEED_AUTH_REQUIRED'] and not current_user.is_authenticated():
        abort(404)

    feed = AtomFeed('MHN HpFeeds Report', feed_url=request.url,
                    url=request.url_root)
    # entry text template, filled from each session's dict form
    template = (u'Sensor "{identifier}" '
                u'{source_ip}:{source_port} on sensorip:{destination_port}.')
    for s in Clio().session.get(options={'limit': 1000}):
        feed.add('Feed', template.format(**s.to_dict()), content_type='text',
                 published=s.timestamp, updated=s.timestamp,
                 url=makeurl(url_for('api.get_session', session_id=str(s._id))))
    return feed
flask_utils.py 文件源码 项目:GraphDash 作者: AmadeusITGroup 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def after_request_log(response):
    """Log a multi-line summary of the request just served.

    The entry records method, path, full URL, protocol, response status,
    client address, its resolved host name ('?' when ``dns_resolve`` returns
    None) and the user agent details.

    :param response: the response about to be sent
    :return: the same ``response``, untouched
    """
    name = dns_resolve(request.remote_addr)
    # Logger.warn() is a deprecated alias of warning() and was removed in
    # Python 3.13; warning() logs at the same level.
    current_app.logger.warning(u"""[client {ip} {host}] {http} "{method} {path}" {status}
    Request:   {method} {path}
    Version:   {http}
    Status:    {status}
    Url:       {url}
    IP:        {ip}
    Hostname:  {host}
    Agent:     {agent_platform} | {agent_browser} | {agent_browser_version}
    Raw Agent: {agent}
    """.format(method=request.method,
               path=request.path,
               url=request.url,
               ip=request.remote_addr,
               host=name if name is not None else '?',
               agent_platform=request.user_agent.platform,
               agent_browser=request.user_agent.browser,
               agent_browser_version=request.user_agent.version,
               agent=request.user_agent.string,
               http=request.environ.get('SERVER_PROTOCOL'),
               status=response.status))

    return response
views.py 文件源码 项目:heutagogy-backend 作者: heutagogy 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get(self):
        """List the current user's bookmarks, one page at a time.

        Query args read from the request:
            url: optional; restricts the result to this URL with its
                fragment removed
            tag: repeatable; the bookmark's tags must contain all of them

        Returns:
            tuple of (list of bookmark dicts, 200, headers). ``headers``
            carries a ``Link`` entry with ``rel='last'`` when further pages
            exist.
        """
        wanted_url = request.args.get('url')
        wanted_tags = request.args.getlist('tag')

        criteria = [db.Bookmark.user == current_user.id]
        if wanted_url is not None:
            criteria.append(db.Bookmark.url == urldefrag(wanted_url).url)
        criteria.append(db.Bookmark.tags.contains(wanted_tags))

        page = (
            db.Bookmark.query
            .filter(*criteria)
            .order_by(db.Bookmark.read.desc().nullsfirst(),
                      db.Bookmark.timestamp.desc())
            .paginate()
        )

        headers = {}
        if page.has_next:
            # point clients at the final page of the result set
            last_url = update_query(request.url, {'page': page.pages})
            headers['Link'] = lh.format_links([lh.Link(last_url, rel='last')])
        return [bookmark.to_dict() for bookmark in page.items], 200, headers
views.py 文件源码 项目:bibtex-browser 作者: frapac 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def add_entry():
    """Store a submitted bibliography entry and go back to the listing.

    On a valid form submission the entry and an "ADD" event attributed to
    the current user are committed. In every case the client is redirected
    to ``/biblio``.
    """
    form = BiblioForm()
    if not form.validate_on_submit():
        return redirect("/biblio")

    db.session.add(BiblioEntry(
        ID=form.ID.data,
        ENTRYTYPE=form.typ.data,
        authors=form.author.data,
        title=form.title.data,
        year=form.year.data,
        school="",
        publisher="",
        keywords=form.keywords.data,
        url=form.url.data,
        journal=form.journal.data,
    ))

    # record who added which article, and when
    author = current_user.name
    db.session.add(Event(author=author, article=form.ID.data,
                         event="ADD", time=time.time()))
    db.session.commit()
    return redirect("/biblio")
run.py 文件源码 项目:perseids-manifold 作者: RDACollectionsWG 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def apidocs():
    """Serve the swagger description of the API as JSON.

    Host, port and base path are derived from the URL of the current
    request; the port falls back to "80" when the URL carries none. The
    scheme of the request is always advertised; the other of http/https is
    added when a probe of ``<base>/scheme`` under it answers 200.

    :return: a response with mimetype ``application/json``
    """
    url = urlparse(request.url)
    if ":" in url.netloc:
        host, port = url.netloc.split(":")
    else:
        host = url.netloc
        port = "80"
    api_root = url.path.replace('/apidocs', '')
    base_path = api_root if url.path != "/apidocs" else "/"
    schemes = [url.scheme]
    # Strings and ints must be compared by value: `is` tests object identity,
    # which is an implementation detail for literals (SyntaxWarning on 3.8+).
    other_scheme = "https" if url.scheme == "http" else "http"
    try:
        # NOTE(review): `request` looks like the incoming-request object,
        # which has no `.get`; this was presumably meant to be an HTTP client
        # call (e.g. `requests.get`). As written the probe always fails and
        # is ignored below -- confirm and fix at the import level.
        probe = request.get(other_scheme + "://" + url.netloc + api_root + "/scheme")
        if probe.status_code == 200:
            schemes += [other_scheme]
    except Exception:
        # best effort: an unreachable alternative scheme is simply not listed
        pass
    r = make_response(swagger.json(schemes, host, port, base_path))
    r.mimetype = 'application/json'
    return r
    # return send_from_directory("www","swagger.json")
apiserver.py 文件源码 项目:studio 作者: studioml 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def tensorboard(logdir):
    """Redirect the client to a tensorboard instance serving ``logdir``.

    The port used for each log directory is remembered in
    ``_tensorboard_dirs``. For a directory without a remembered port a
    ``tensorboard`` process is started on a port obtained from the OS.

    :param logdir: log directory handed to ``tensorboard --logdir``
    :return: a redirect to ``http://<request hostname>:<port>``
    """
    known_port = _tensorboard_dirs.get(logdir)
    if known_port:
        port = known_port
    else:
        # bind to port 0 so the OS picks a free port, then release it
        probe = socket.socket(socket.AF_INET)
        probe.bind(('', 0))
        port = probe.getsockname()[1]
        probe.close()

        command = ['tensorboard', '--logdir=' + logdir, '--port=' + str(port)]
        subprocess.Popen(command)
        time.sleep(5)  # wait for tensorboard to spin up
        _tensorboard_dirs[logdir] = port

    hostname = six.moves.urllib.parse.urlparse(request.url).hostname
    redirect_url = 'http://{}:{}'.format(hostname, port)

    logger.debug('Redirecting to ' + redirect_url)
    return redirect(redirect_url)
error_handlers.py 文件源码 项目:marvin 作者: sdss 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def make_error_page(app, name, code, sentry=None, data=None, exception=None):
    ''' Build the context dictionary for a web error, log it and render the
    matching ``errors/<name>.html`` template.

    ``name`` is lower-cased with spaces turned into underscores to pick the
    template. Returns a ``(rendered page, code)`` tuple.
    '''
    template_key = name.lower().replace(' ', '_')
    error = {
        'title': 'Marvin | {0}'.format(name),
        'page': request.url,
        'event_id': g.get('sentry_event_id', None),
        'data': data,
        'name': name,
        'code': code,
        'message': None,
    }
    # only exceptions that carry a description contribute a message
    if exception and hasattr(exception, 'description'):
        error['message'] = exception.description
    if app.config['USE_SENTRY'] and sentry:
        error['public_dsn'] = sentry.client.get_public_dsn('https')
    app.logger.error('{0} Exception {1}'.format(name, error))
    page = render_template('errors/{0}.html'.format(template_key), **error)
    return page, code

# ----------------
# Error Handling
# ----------------
app.py 文件源码 项目:ugc.aggregator 作者: Dreamcatcher-GIS 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def weibo_nearbytimeline_wrapper():
    """Return nearby-weibo statistics as JSON, cached under the request URL.

    Reads ``lat``, ``lng``, ``starttime``, ``endtime`` and ``range`` from
    the query string. On a cache miss the result is computed, serialised and
    stored in ``r``. Any failure is printed and answered with 404.
    """
    try:
        data = r.get(request.url)
        if data is None:
            data = weibo_service.get_all_weibo_nearby_async(
                request.args["lat"],
                request.args["lng"],
                int(request.args["starttime"]),
                int(request.args["endtime"]),
                int(request.args["range"])
            )
            data = json.dumps(weibo_service.nearby_weibo_statis_wrapper(data))
            r.set(request.url, data)
        return data
    except Exception:
        # A bare `except:` would also trap SystemExit / KeyboardInterrupt;
        # only ordinary errors should become a 404.
        traceback.print_exc()
        abort(404)
app.py 文件源码 项目:ugc.aggregator 作者: Dreamcatcher-GIS 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_user_flow_to_html():
    """Return the user-flow data of a hotel as JSON, cached under the request URL.

    Reads ``hotel_name``, ``baseinfo_id`` and ``page`` from the query
    string, plus the optional ``ring_str``. On a cache miss the result is
    computed, serialised and stored in ``r``. Any failure is printed and
    answered with 404.
    """
    try:
        data = r.get(request.url)
        if data is None:
            # optional parameter: None when the client did not send it
            ring_str = request.args.get("ring_str")
            data = hotel_data_service.get_user_flow_to_html(
                request.args["hotel_name"],
                request.args["baseinfo_id"].encode("utf-8"),
                int(request.args["page"]),
                ring_str=ring_str
            )
            data = json.dumps(data)
            r.set(request.url, data)
        return data
    except Exception:
        # A bare `except:` would also trap SystemExit / KeyboardInterrupt;
        # only ordinary errors should become a 404.
        traceback.print_exc()
        abort(404)
app.py 文件源码 项目:ugc.aggregator 作者: Dreamcatcher-GIS 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def query_floorstate():
    """Return the state of one floor as JSON, cached under the request URL.

    Reads ``floornum`` and ``time`` from the query string. On a cache miss
    the result is computed, serialised without ASCII escaping and stored in
    ``r``. Any failure is printed and answered with 404.
    """
    try:
        result = r.get(request.url)
        if result is None:
            result = pms_service.query_floorstate(
                request.args['floornum'].encode('utf-8'),
                request.args['time'].encode('utf-8')
            )
            result = json.dumps(result, ensure_ascii=False)
            r.set(request.url, result)
        return result
    except Exception as e:
        # `except Exception, e` and the print statement are Python-2-only
        # syntax; this spelling works on Python 2.6+ and Python 3 alike.
        print(e)
        traceback.print_exc()
        abort(404)
oauth2_app.py 文件源码 项目:python-mautic 作者: divio 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def callback():
    """ Step 3: Retrieving an access token.

    The provider has redirected the user back to the registered callback
    URL, and that redirect URL carries an authorization code. The full
    request URL is handed to ``fetch_token`` to exchange it for an access
    token, which is then kept in the session and written out via
    ``update_token_tempfile`` before redirecting to the menu.
    """
    oauth = OAuth2Session(
        client_id,
        redirect_uri=redirect_uri,
        state=session['oauth_state'],
    )
    token = oauth.fetch_token(
        token_url,
        client_secret=client_secret,
        authorization_response=request.url,
    )

    # The session doubles as a simple DB for this example.
    session['oauth_token'] = token
    update_token_tempfile(token)  # store token in /tmp/mautic_creds.json

    return redirect(url_for('.menu'))
extension.py 文件源码 项目:flask-openapi 作者: remcohaszing 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def paths(self):
        """
        :class:`dict`: The top level :swagger:`pathsObject`.

        Built from every rule in the app's URL map except the ``static``
        endpoint. Each URL maps to its parameters (when there are any) and
        to one processed entry per HTTP method, keyed by the lower-cased
        method name.

        """
        collected = {}
        for rule in self.app.url_map.iter_rules():
            if rule.endpoint == 'static':
                continue
            log.info('Processing %r', rule)
            url, parameters = parse_werkzeug_url(rule.rule)
            # several rules may share one URL; reuse its entry
            operations = collected.setdefault(url, {})
            if parameters:
                operations['parameters'] = parameters
            for method in rule.methods:
                # XXX Do we want to process HEAD and OPTIONS?
                if method not in ('HEAD', 'OPTIONS'):
                    operations[method.lower()] = self._process_rule(rule)
        return collected
extension.py 文件源码 项目:flask-openapi 作者: remcohaszing 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def deprecated(self, fn):
        """
        Flag an operation as deprecated.

        Sets ``fn.deprecated = True`` so the flag can be exposed through the
        OpenAPI operation object, and returns a wrapper that reports every
        use of the operation before delegating to it. How it is reported is
        governed by the ``OPENAPI_WARN_DEPRECATED`` configuration option,
        which must be one of ``warn`` or ``log``.

        See :swagger:`operationDeprecated`.

        """
        fn.deprecated = True

        def wrapper(*args, **kwargs):
            how = self._config('warn_deprecated')
            details = (request.method, request.url)
            if how != 'warn':
                # anything other than 'warn' goes to the log
                log.warning(_DEPRECATION_MESSAGE, *details)
            else:
                warnings.warn(_DEPRECATION_MESSAGE % details,
                              DeprecationWarning)
            return fn(*args, **kwargs)

        return functools.wraps(fn)(wrapper)
views.py 文件源码 项目:YoCoolSpotify 作者: shashanksurya 项目源码 文件源码 阅读 62 收藏 0 点赞 0 评论 0
def getCurrentSpotifyPlaylistList():
    """Return the tracks of the hard-coded Spotify playlist.

    Each track is a dict with ``img_url``, ``name``, ``artist``, ``album``
    and ``duration`` (the track length in minutes, two decimals). The list
    is empty when no access token is available.
    """
    ret_list = []
    access_token = getSpotifyToken()
    if not access_token:
        return ret_list

    sp = spy.Spotify(access_token)
    # result is not used; presumably a check that the token works -- TODO confirm
    sp.current_user()

    uri = 'spotify:user:12120746446:playlist:6ZK4Tz0ZsZuJBYyDZqlbGt'
    _, _, username, _, playlist_id = uri.split(":")
    playlist = sp.user_playlist(username, playlist_id)

    for entry in playlist['tracks']['items']:
        track = entry['track']
        ret_list.append({
            'img_url': track['album']['images'][2]['url'],
            'name': track['name'],
            'artist': track['artists'][0]['name'],
            'album': track['album']['name'],
            # milliseconds -> minutes
            'duration': "{:2.2f}".format(int(track['duration_ms']) / (1000 * 60)),
        })

    return ret_list
views.py 文件源码 项目:YoCoolSpotify 作者: shashanksurya 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def getCurrentYoutubePlaylistList():
    """Return the videos of the configured YouTube playlist.

    Up to 30 playlist items are requested. Each video is a dict with
    ``name`` and ``img_url`` taken from the item's snippet; ``artist``,
    ``album`` and ``duration`` are filled with placeholders.
    """
    youtube = initYoutube()
    response = youtube.playlistItems().list(
        playlistId=constants.YOUTUBE_PLAYLIST_ID,
        part="snippet,contentDetails",
        maxResults=30,
    ).execute()

    return [
        {
            'name': item['snippet']['title'],
            'img_url': item['snippet']['thumbnails']['default']['url'],
            'artist': "--",
            'album': "--",
            'duration': "--:--",
        }
        for item in response['items']
    ]


问题


面经


文章

微信
公众号

扫码关注公众号