def init_audit(log_group):
def audit(f):
@functools.wraps(f)
def handle(account_id, *args, **kw):
envelope = {
'timestamp': int(time.time() * 1000),
'message': json.dumps({
'user': request.environ.get('REMOTE_USER', ''),
'url': request.url,
'path': request.path,
'method': request.method,
'pid': os.getpid(),
'account_id': account_id,
'ip': request.remote_addr})
}
transport.send_group("%s=%s" % (log_group, account_id), [envelope])
return f(account_id, *args, **kw)
return handle
return audit
python类url()的实例源码
def test_url(self):
""" Environ: URL building """
request = BaseRequest({'HTTP_HOST':'example.com'})
self.assertEqual('http://example.com/', request.url)
request = BaseRequest({'SERVER_NAME':'example.com'})
self.assertEqual('http://example.com/', request.url)
request = BaseRequest({'SERVER_NAME':'example.com', 'SERVER_PORT':'81'})
self.assertEqual('http://example.com:81/', request.url)
request = BaseRequest({'wsgi.url_scheme':'https', 'SERVER_NAME':'example.com'})
self.assertEqual('https://example.com/', request.url)
request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/path',
'QUERY_STRING':'1=b&c=d', 'SCRIPT_NAME':'/sp'})
self.assertEqual('http://example.com/sp/path?1=b&c=d', request.url)
request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/pa th',
'SCRIPT_NAME':'/s p'})
self.assertEqual('http://example.com/s%20p/pa%20th', request.url)
def callback():
""" Step 3: Retrieving an access token.
The user has been redirected back from the provider to your registered
callback URL. With this redirection comes an authorization code included
in the redirect URL. We will use that to obtain an access token.
NOTE: your server name must be correctly configured in order for this to
work, do this by adding the headers at your http layer, in particular:
X_FORWARDED_HOST, X_FORWARDED_PROTO so that bottle can render the correct
url and links for you.
"""
oauth2session = OAuth2Session(settings.SLACK_OAUTH['client_id'],
state=request.GET['state'])
#PII - update privacy policy if oauth2 token is stored.
token = oauth2session.fetch_token(
settings.SLACK_OAUTH['token_url'],
client_secret=settings.SLACK_OAUTH['client_secret'],
authorization_response=request.url
)
# we don't need the token, we just need the user to have installed the app
# in the future, if we need tokens, we'll get them.
redirect('/?added_to_slack=true')
def mmrz():
username = request.get_cookie('username')
password = request.get_cookie('password')
password = urllib.unquote(password) if password else None
if not verify_login(username, password):
redirect('/')
# need_https = "localhost" not in request.url
need_https = False
return_dict = dict(universal_ROUTE_dict)
return_dict.update(dict(need_https=need_https))
return return_dict
def dict_short():
query = request.urlparts.query
url = '/dictionary?{0}'.format(query) if query else '/dictionary'
redirect(url)
def pre_processor():
s = request.environ.get('beaker.session')
user = parse_userdata(s)
perms = parse_permissions(s)
status = {}
captcha = False
update = False
plugins = False
if user["is_authenticated"]:
status = PYLOAD.statusServer()
info = PYLOAD.getInfoByPlugin("UpdateManager")
captcha = PYLOAD.isCaptchaWaiting()
# check if update check is available
if info:
if info["pyload"] == "True": update = True
if info["plugins"] == "True": plugins = True
return {"user": user,
'status': status,
'captcha': captcha,
'perms': perms,
'url': request.url,
'update': update,
'plugins': plugins}
def info(account_id, resource_id):
request_data = request.query
if resource_id.startswith('sg-') and 'parent_id' not in request_data:
abort(400, "Missing required parameter parent_id")
result = controller.info(
account_id, resource_id, request_data.get('parent_id', resource_id))
response.content_type = "application/json"
return json.dumps(result, indent=2, cls=Encoder)
# this set to post to restrict permissions, perhaps another url space.
def api_url():
parsed = request.urlparts
url = "%s://%s%s" % (parsed.scheme, parsed.netloc, request.script_name)
return url
def error(e):
response.content_type = "application/json"
return json.dumps({
"status": e.status,
"url": repr(request.url),
"exception": repr(e.exception),
# "traceback": e.traceback and e.traceback.split('\n') or '',
"body": repr(e.body)
}, indent=2)
def dispatch(url):
"""
This class is the beginning of all entrypoints in the Ray API. Here, each url
will be redirect to the right handler
"""
url = bottle_req.path
log.info('request: %s', bottle_req.url)
if url[-1] == '/':
url = url[:-1]
response_code = 200
try:
processed = process(url, bottle_req, bottle_resp)
try:
from_func, http_status = processed[0], processed[1]
bottle_resp.status = http_status
return from_func
except:
return processed
except exceptions.RayException as e:
log.exception('ray exception: ')
response_code = e.http_code
except:
log.exception('exception:')
raise
bottle_resp.status = response_code
def __handle_action(url):
# url e.g: /api/user/123/action
arg = None
if len(url.split('/')) >= 5: # indicate that has an id between endpoint and action_name
arg = http.param_at(url, -2)
return Action(url, arg, bottle_req).process_action()
def query_hujiang(key_word):
if not key_word:
return []
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1',
'Accept-Encoding': 'gzip, deflate, sdch',
}
url = "https://m.hujiang.com/d/dict_jp_api.ashx?type=jc&w={0}".format(urllib.quote(key_word))
proxies = {}
response = requests.get(url, headers=headers, verify=False, proxies=proxies)
try:
defines = response.json()
except:
return []
for i in range(len(defines)):
Comment = defines[i]["Comment"]
comments = re.findall("<br/>([^a-zA-Z]+)<br/>", Comment)
tmp = ", ".join(comments)
# ??tmp??, ??????????
# tmp = Comment if not tmp else tmp
# ???????????
# mch = re.search(u"(?.+??)", tmp)
# tmp = mch.group(1) if mch else tmp
# ??????
tmp = re.sub(u"\?.+?\?", "", tmp)
tmp = re.sub(u"\(.+?\)", "", tmp)
tmp = re.sub(u"?+?", "", tmp)
defines[i]["Comment"] = tmp
defines[i]["PronounceJp"] = re.sub("\[|\]", "", defines[i]["PronounceJp"])
return defines
### static files
def get_hujiang_tts():
key_word = request.params.get('key_word', None)
job_id = request.params.get('job_id', None)
if not key_word:
return "key_word is null"
headers = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.89 Safari/537.36',
'Accept-Encoding': 'gzip, deflate, sdch',
}
url = "http://dict.hjenglish.com/jp/jc/" + urllib.quote(key_word)
req = urllib2.Request(url, None, headers)
response = urllib2.urlopen(req)
compressedData = response.read()
compressedStream = StringIO.StringIO(compressedData)
gzipper = gzip.GzipFile(fileobj=compressedStream)
html = gzipper.read()
soup = BeautifulSoup(html, "html.parser")
ret_info = {
"found": False,
"message_str": "",
"tts_url": "",
"job_id": job_id,
}
jpSound_list = soup.select('span[class=jpSound]')
if len(jpSound_list) < 1:
ret_info["found"] = False
ret_info["message_str"] = "jpSound not found"
return json.dumps(ret_info)
jpSound = str(jpSound_list[0])
mc = re.search("GetTTSVoice\(\"(.*?)\"\)", jpSound)
if not mc:
ret_info["found"] = False
ret_info["message_str"] = "tts_url not found"
return json.dumps(ret_info)
tts_url = mc.group(1)
ret_info["found"] = True
ret_info["message_str"] = "tts_url is found"
ret_info["tts_url"] = tts_url
return json.dumps(ret_info)