def get_proxy_request_url(thisAuth, thisContainer=None, thisObject=None):
    """Build the URL under which this API proxy reaches its Swift back end.

    Effectively the incoming request URL with the hostname swapped for the
    configured Swift store host.

    :param thisAuth: auth token/account interpolated into the store URL template
    :param thisContainer: optional container name appended as a path segment
    :param thisObject: optional object name appended as a path segment
    :return: the assembled back-end URL string
    """
    parts = [configuration.swift_store_url.format(thisAuth)]
    if thisContainer:
        parts.append(thisContainer)
    if thisObject:
        parts.append(thisObject)
    return "/".join(parts)
##############################################################################
# Frontend Pool
##############################################################################
# (scraping artifact removed: "python url() example sources" section marker)
def handle_auth():
    """Forward an auth request to Swift and rewrite the storage URL.

    Swift answers with e.g.
        'X-Storage-Url': 'http://192.168.209.204:8080/v1/AUTH_test'
    which is rewritten to point at this proxy, e.g.
        'X-Storage-Url': 'http://localhost:4000/v1/AUTH_test'

    This is the first request any client makes; the auth token Swift returns
    is passed through unchanged and used on all subsequent requests.

    :return: a Flask Response mirroring Swift's reply
    """
    status, headers, body = httpBackend.doAuthGetToken(
        reqHead=request.headers, method="GET")
    log.debug("swift response: {} {} {}".format(status, headers, body))
    if status == 200:
        # Only rewrite the storage URL on a successful auth reply.
        replaceStorageUrl(swiftResponse=headers)
        log.debug("proxy response: {} {} {}".format(status, headers, body))
    return Response(status=status, headers=headers, response=body)
def __init__(self, **kwargs):
    """Initialise a new ``Self`` link instance.

    Accepts the same keyword arguments as :class:`.Link`.

    Additional Keyword Args:
        external (bool): if true, force the link to be a fully-qualified
            URL; defaults to False.

    See Also:
        :class:`.Link`
    """
    external = kwargs.get('external', False)
    url = request.url
    # Without SERVER_NAME configured we cannot build absolute URLs reliably,
    # so fall back to a root-relative URL unless the caller forced external.
    if not external and current_app.config['SERVER_NAME'] is None:
        url = request.url.replace(request.host_url, '/')
    return super(Self, self).__init__('self', url, **kwargs)
def feed():
    """Render the Atom feed of published searches, newest first."""
    rows = query(
        '''
        SELECT * FROM searches
        WHERE published IS NOT NULL
        ORDER BY id DESC
        ''', json=True)
    site_url = 'http://' + app.config['HOSTNAME']
    feed_url = site_url + '/feed/'

    def with_url(entry):
        # Attach the public summary URL for this search.
        entry['url'] = site_url + '/summary/' + entry['date_path'] + '/'
        return entry

    entries = [with_url(_date_format(row)) for row in rows]
    # Feed 'updated' is the creation time of the newest (first) search.
    response = make_response(
        render_template(
            'feed.xml',
            updated=entries[0]['created'],
            site_url=site_url,
            feed_url=feed_url,
            searches=entries
        )
    )
    response.headers['Content-Type'] = 'application/atom+xml'
    return response
def parse_post(post, external_links=False, create_html=True):
    """Parse a blog post file into a data dict.

    :param post: filename like ``YYYY-MM-DD-slug.md`` under BLOG_CONTENT_DIR
    :param external_links: if True, generate a fully-qualified post URL
    :param create_html: if True, render the markdown body into ``data['html']``
    :return: dict of frontmatter fields plus ``url``, ``reading_time`` and
        optionally ``html``
    """
    with open(os.path.join(BLOG_CONTENT_DIR, post)) as handle:
        raw = handle.read()
    # NOTE(review): split(raw, 2) can yield three parts when the frontmatter
    # delimiter matches twice; assumes REGEX_SPLIT_FRONTMATTER is written so
    # exactly two come back -- confirm against its definition.
    frontmatter, content = REGEX_SPLIT_FRONTMATTER.split(raw, 2)
    # safe_load: yaml.load without an explicit Loader is deprecated and can
    # construct arbitrary Python objects from the frontmatter.
    data = yaml.safe_load(frontmatter)
    # Filename encodes the date and slug: YYYY-MM-DD-the-slug.md
    y, m, d, slug = post[:-3].split('-', maxsplit=3)
    if create_html:
        data['html'] = markdown.markdown(content, extensions=[
            'markdown.extensions.extra',
            'markdown.extensions.codehilite',
            'markdown.extensions.toc'
        ])
    data['url'] = url_for('blog_post', y=y, m=m, d=d, slug=slug,
                          _external=external_links)
    data['reading_time'] = reading_time(content)
    return data
def get_feed():
    """Build the Atom feed of recent hpfeeds sessions.

    Returns 404 when FEED_AUTH_REQUIRED is set and the user is anonymous.
    """
    from mhn.common.clio import Clio
    from mhn.auth import current_user
    if mhn.config['FEED_AUTH_REQUIRED'] and not current_user.is_authenticated():
        abort(404)
    feed = AtomFeed('MHN HpFeeds Report', feed_url=request.url,
                    url=request.url_root)
    for session in Clio().session.get(options={'limit': 1000}):
        template = u'Sensor "{identifier}" '
        template += '{source_ip}:{source_port} on sensorip:{destination_port}.'
        text = template.format(**session.to_dict())
        feed.add('Feed', text, content_type='text',
                 published=session.timestamp, updated=session.timestamp,
                 url=makeurl(url_for('api.get_session',
                                     session_id=str(session._id))))
    return feed
def after_request_log(response):
    """Log client, agent and status details for every request/response pair.

    :param response: the outgoing Flask response
    :return: the same response, unchanged
    """
    host = dns_resolve(request.remote_addr)
    agent = request.user_agent
    current_app.logger.warn(u"""[client {ip} {host}] {http} "{method} {path}" {status}
Request: {method} {path}
Version: {http}
Status: {status}
Url: {url}
IP: {ip}
Hostname: {host}
Agent: {agent_platform} | {agent_browser} | {agent_browser_version}
Raw Agent: {agent}
""".format(method=request.method,
           path=request.path,
           url=request.url,
           ip=request.remote_addr,
           host=host if host is not None else '?',
           agent_platform=agent.platform,
           agent_browser=agent.browser,
           agent_browser_version=agent.version,
           agent=agent.string,
           http=request.environ.get('SERVER_PROTOCOL'),
           status=response.status))
    return response
def get(self):
    """List the current user's bookmarks, unread first, newest first.

    Optional query params: ``url`` (exact match, fragment stripped) and
    repeated ``tag`` filters. Paginated; a ``rel="last"`` Link header is
    added when more pages exist.
    """
    requested_url = request.args.get('url')
    tags = request.args.getlist('tag')
    criteria = [db.Bookmark.user == current_user.id]
    if requested_url is not None:
        criteria.append(db.Bookmark.url == urldefrag(requested_url).url)
    # NOTE(review): source indentation was mangled; the tags filter is
    # applied unconditionally here -- confirm it wasn't meant to live
    # inside the url branch.
    criteria.append(db.Bookmark.tags.contains(tags))
    page = db.Bookmark.query.filter(*criteria) \
        .order_by(
            db.Bookmark.read.desc().nullsfirst(),
            db.Bookmark.timestamp.desc()) \
        .paginate()
    headers = {}
    links = []
    if page.has_next:
        last_url = update_query(request.url, {'page': page.pages})
        links.append(lh.Link(last_url, rel='last'))
    if links:
        headers['Link'] = lh.format_links(links)
    return [item.to_dict() for item in page.items], 200, headers
def add_entry():
    """Add a new entry to the bibliography and record the event."""
    form = BiblioForm()
    if form.validate_on_submit():
        entry = BiblioEntry(ID=form.ID.data,
                            ENTRYTYPE=form.typ.data,
                            authors=form.author.data,
                            title=form.title.data,
                            year=form.year.data,
                            school="",
                            publisher="",
                            keywords=form.keywords.data,
                            url=form.url.data,
                            journal=form.journal.data)
        db.session.add(entry)
        # Audit trail: who added which article, and when.
        db.session.add(Event(author=current_user.name,
                             article=form.ID.data,
                             event="ADD",
                             time=time.time()))
        db.session.commit()
    # Redirect back to the bibliography whether or not validation passed.
    return redirect("/biblio")
def apidocs():
    """Serve the generated swagger JSON document.

    Probes whether the service also answers on the alternate scheme
    (http vs https) and advertises both when it does.

    :return: a JSON response containing the swagger document
    """
    url = urlparse(request.url)
    if ":" in url.netloc:
        host, port = url.netloc.split(":")
    else:
        host = url.netloc
        port = "80"
    base_path = url.path.replace('/apidocs', '') if url.path != "/apidocs" else "/"
    schemes = [url.scheme]
    # '==' not 'is': identity comparison against string literals relies on
    # interning and is not guaranteed to work.
    other_scheme = "https" if url.scheme == "http" else "http"
    try:
        # NOTE(review): 'request' is the Flask request object, which has no
        # .get(); this probe presumably meant the 'requests' library --
        # confirm and import it if so.
        probe = request.get(other_scheme + "://" + url.netloc
                            + url.path.replace('/apidocs', '') + "/scheme")
        if probe.status_code == 200:
            schemes += [other_scheme]
    except Exception:
        # Best effort: the alternate scheme simply isn't advertised.
        pass
    r = make_response(swagger.json(schemes, host, port, base_path))
    r.mimetype = 'application/json'
    return r
    # return send_from_directory("www","swagger.json")
def tensorboard(logdir):
    """Redirect to a TensorBoard instance serving ``logdir``.

    Spawns one on a free port the first time a logdir is requested and
    caches the port for subsequent calls.
    """
    port = _tensorboard_dirs.get(logdir)
    if not port:
        # Ask the OS for a free port, then release it for tensorboard.
        # NOTE(review): small race -- another process could grab the port
        # between close() and tensorboard binding it.
        probe = socket.socket(socket.AF_INET)
        probe.bind(('', 0))
        port = probe.getsockname()[1]
        probe.close()
        subprocess.Popen([
            'tensorboard',
            '--logdir=' + logdir,
            '--port=' + str(port)])
        time.sleep(5)  # wait for tensorboard to spin up
        _tensorboard_dirs[logdir] = port
    redirect_url = 'http://{}:{}'.format(
        six.moves.urllib.parse.urlparse(request.url).hostname,
        port)
    logger.debug('Redirecting to ' + redirect_url)
    return redirect(redirect_url)
def make_error_page(app, name, code, sentry=None, data=None, exception=None):
    ''' creates the error page dictionary for web errors '''
    shortname = name.lower().replace(' ', '_')
    error = {
        'title': 'Marvin | {0}'.format(name),
        'page': request.url,
        'event_id': g.get('sentry_event_id', None),
        'data': data,
        'name': name,
        'code': code,
        # Werkzeug HTTP exceptions carry a human-readable description.
        'message': exception.description if exception and hasattr(exception, 'description') else None,
    }
    if app.config['USE_SENTRY'] and sentry:
        error['public_dsn'] = sentry.client.get_public_dsn('https')
    app.logger.error('{0} Exception {1}'.format(name, error))
    return render_template('errors/{0}.html'.format(shortname), **error), code
# ----------------
# Error Handling
# ----------------
def weibo_nearbytimeline_wrapper():
    """Serve nearby-weibo statistics, cached in redis keyed by request URL.

    Query args: lat, lng, starttime, endtime, range.
    :return: JSON string; aborts with 404 on any failure.
    """
    try:
        data = r.get(request.url)
        if data is None:
            data = weibo_service.get_all_weibo_nearby_async(
                request.args["lat"],
                request.args["lng"],
                int(request.args["starttime"]),
                int(request.args["endtime"]),
                int(request.args["range"])
            )
            data = json.dumps(weibo_service.nearby_weibo_statis_wrapper(data))
            r.set(request.url, data)
        return data
    except Exception:
        # 'except Exception' (not bare 'except:') so SystemExit and
        # KeyboardInterrupt still propagate; this best-effort endpoint
        # answers 404 on any other error.
        traceback.print_exc()
        abort(404)
def get_user_flow_to_html():
    """Serve hotel user-flow HTML data, cached in redis keyed by request URL.

    Query args: hotel_name, baseinfo_id, page, optional ring_str.
    :return: JSON string; aborts with 404 on any failure.
    """
    try:
        data = r.get(request.url)
        if data is None:
            # .get() returns None when 'ring_str' is absent -- replaces the
            # explicit membership test.
            ring_str = request.args.get("ring_str")
            data = hotel_data_service.get_user_flow_to_html(
                request.args["hotel_name"],
                request.args["baseinfo_id"].encode("utf-8"),
                int(request.args["page"]),
                ring_str=ring_str
            )
            data = json.dumps(data)
            r.set(request.url, data)
        return data
    except Exception:
        # 'except Exception' (not bare 'except:') so SystemExit and
        # KeyboardInterrupt still propagate.
        traceback.print_exc()
        abort(404)
def query_floorstate():
    """Serve floor state for a floor/time pair, cached in redis keyed by URL.

    Query args: floornum, time.
    :return: JSON string (non-ASCII preserved); aborts with 404 on failure.
    """
    try:
        result = r.get(request.url)
        if result is None:
            result = pms_service.query_floorstate(
                request.args['floornum'].encode('utf-8'),
                request.args['time'].encode('utf-8')
            )
            result = json.dumps(result, ensure_ascii=False)
            r.set(request.url, result)
        return result
    except Exception as e:
        # Fixed Python-2-only 'except Exception, e:' / 'print e' syntax,
        # which is a SyntaxError on Python 3 (the rest of this file uses
        # Python-3-only features such as split(..., maxsplit=)).
        print(e)
        traceback.print_exc()
        abort(404)
def callback():
    """ Step 3: Retrieving an access token.
    The user has been redirected back from the provider to our registered
    callback URL with an authorization code included in the redirect URL;
    exchange that code for an access token.
    """
    mautic = OAuth2Session(client_id, redirect_uri=redirect_uri,
                           state=session['oauth_state'])
    token = mautic.fetch_token(token_url,
                               client_secret=client_secret,
                               authorization_response=request.url)
    session['oauth_token'] = token  # the session doubles as a simple DB here
    update_token_tempfile(token)    # store token in /tmp/mautic_creds.json
    return redirect(url_for('.menu'))
def paths(self):
    """
    :class:`dict`: The top level :swagger:`pathsObject`.
    """
    result = {}
    for rule in self.app.url_map.iter_rules():
        # Flask's built-in static endpoint is not part of the API surface.
        if rule.endpoint == 'static':
            continue
        log.info('Processing %r', rule)
        url, parameters = parse_werkzeug_url(rule.rule)
        entry = result.setdefault(url, {})
        if parameters:
            entry['parameters'] = parameters
        for method in rule.methods:
            # XXX Do we want to process these?
            if method in ('HEAD', 'OPTIONS'):
                continue
            entry[method.lower()] = self._process_rule(rule)
    return result
def deprecated(self, fn):
    """
    Mark an operation as deprecated.
    This will be exposed through the OpenAPI operation object.
    Additionally a warning will be emitted when the API is used.
    This can be configured using the ``OPENAPI_WARN_DEPRECATED``
    configuration option. This must be one of ``warn`` or ``log``.
    See :swagger:`operationDeprecated`.
    """
    fn.deprecated = True

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # 'warn' raises a DeprecationWarning; anything else just logs.
        mode = self._config('warn_deprecated')
        details = request.method, request.url
        if mode == 'warn':
            warnings.warn(_DEPRECATION_MESSAGE % details,
                          DeprecationWarning)
        else:
            log.warning(_DEPRECATION_MESSAGE, *details)
        return fn(*args, **kwargs)
    return wrapper
def getCurrentSpotifyPlaylistList():
    """Return track dicts (img_url, name, artist, album, duration) for the
    hard-coded Spotify playlist; empty list when no token is available."""
    tracks = []
    access_token = getSpotifyToken()
    if access_token:
        sp = spy.Spotify(access_token)
        # NOTE(review): this result is overwritten below -- presumably the
        # call is kept only to exercise/validate the token; confirm.
        results = sp.current_user()
        uri = 'spotify:user:12120746446:playlist:6ZK4Tz0ZsZuJBYyDZqlbGt'
        _, _, username, _, playlist_id = uri.split(":")
        results = sp.user_playlist(username, playlist_id)
        for item in results['tracks']['items']:
            track = item['track']
            minutes = int(track['duration_ms']) / (1000 * 60)
            tracks.append({
                'img_url': track['album']['images'][2]['url'],
                'name': track['name'],
                'artist': track['artists'][0]['name'],
                'album': track['album']['name'],
                # Duration rendered as decimal minutes, e.g. "3.75".
                'duration': "{:2.2f}".format(minutes),
            })
    return tracks
def getCurrentYoutubePlaylistList():
    """Return video dicts (name, img_url, plus placeholder artist/album/
    duration) for the configured YouTube playlist."""
    videos = []
    youtube = initYoutube()
    response = youtube.playlistItems().list(
        playlistId=constants.YOUTUBE_PLAYLIST_ID,
        part="snippet,contentDetails",
        maxResults=30
    ).execute()
    for item in response['items']:
        snippet = item['snippet']
        videos.append({
            'name': snippet['title'],
            'img_url': snippet['thumbnails']['default']['url'],
            # Metadata YouTube does not provide; placeholders keep the
            # shape consistent with the Spotify playlist entries.
            'artist': "--",
            'album': "--",
            'duration': "--:--",
        })
    return videos