def login_required(perm=None):
def _dec(func):
def _view(*args, **kwargs):
s = request.environ.get('beaker.session')
if s.get("name", None) and s.get("authenticated", False):
if perm:
perms = parse_permissions(s)
if perm not in perms or not perms[perm]:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return HTTPError(403, "Forbidden")
else:
return redirect("/nopermission")
return func(*args, **kwargs)
else:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return HTTPError(403, "Forbidden")
else:
return redirect("/login")
return _view
return _dec
python类request()的实例源码
def translate(self):
"""
Processes a translation request.
"""
translation_request = request_provider(self._style, request)
logging.debug("REQUEST - " + repr(translation_request))
translations = self._translator.translate(
translation_request.segments,
translation_request.settings
)
response_data = {
'status': TranslationResponse.STATUS_OK,
'segments': [translation.target_words for translation in translations],
'word_alignments': [translation.get_alignment_json(as_string=False) for translation in translations] if translation_request.settings.get_alignment else None,
'word_probabilities': [translation.target_probs for translation in translations] if translation_request.settings.get_word_probs else None,
}
translation_response = response_provider(self._style, **response_data)
logging.debug("RESPONSE - " + repr(translation_response))
response.content_type = translation_response.get_content_type()
return repr(translation_response)
def code_verify(request):
image_str = request.forms.get("code")
typ = int(request.forms.get('type'))
filename = os.path.join(
curpath,
'temp',
'%s.png'%(
md5.md5(
str(time.time()) + str(random.random())
).hexdigest()
)
)
with open(filename, 'wb') as f:
f.write(image_str)
img = Image.open(filename)
exec '''from main import main_%s'''%(typ)
exec '''code = main_%s.verify(img, code_template_dic[typ])'''%(typ)
#exec '''code = main_%s.verify(img)'''%(typ)
if hasattr(img, 'close'):
img.close()
try:
print os.remove(filename)
except Exception as e:
print u"?????? %s"%e
return code
def prometheus_metrics():
if not test_restricted(request['REMOTE_ADDR']):
return ''
dhcpstat = {'shared-networks': []}
dhcp6stat = {'shared-networks': []}
try:
dhcpstat = json.loads(exec_command([args.binary, '-c', args.dhcp4_config, '-l', args.dhcp4_leases, '-f', 'j']))
except:
pass
try:
dhcp6stat = json.loads(exec_command([args.binary, '-c', args.dhcp6_config, '-l', args.dhcp6_leases, '-f', 'j']))
except:
pass
data = []
for shared_network in dhcpstat['shared-networks']:
data.append('dhcp_pool_used{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],shared_network['used']))
data.append('dhcp_pool_free{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],shared_network['free']))
defined_leases = float(shared_network['defined'])
leases_used_percentage = 0
if defined_leases > 0:
leases_used_percentage = float(shared_network['used'])/defined_leases
data.append('dhcp_pool_usage{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],leases_used_percentage))
for shared_network in dhcp6stat['shared-networks']:
data.append('dhcp_pool_used{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],shared_network['used']))
data.append('dhcp_pool_free{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],shared_network['free']))
defined_leases = float(shared_network['defined'])
leases_used_percentage = 0
if defined_leases > 0:
leases_used_percentage = float(shared_network['used'])/defined_leases
data.append('dhcp_pool_usage{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],leases_used_percentage))
response.content_type = 'text/plain'
return '%s\n' % ('\n'.join(data))
def auth_server():
username = request.forms.get('username')
if username == 'user_ok':
return {'access_token': 'my-nifty-access-token'}
elif username == 'auth_fail':
response.status = 400
return {'error': 'errored with HTTP 400 on request'}
elif username == 'create_fail':
return {'access_token': 'the-token-with-which-create-will-fail'}
elif username == 'delete_fail':
return {'access_token': 'the-token-with-which-delete-will-fail'}
elif username == 'empty_page':
return {'access_token': 'the-token-which-causes-an-empty-page'}
else:
return {}
def create():
auth_token = request.headers.get('Authorization').split()[1]
if auth_token == 'my-nifty-access-token':
return {'response': 'Successful creation of user.'}
elif auth_token == 'the-token-with-which-create-will-fail':
response.status = 403
return {'error': 'Permission denied'}
elif auth_token == 'the-token-which-causes-an-empty-page':
response.status = 403
return "empty"
def delete():
auth_token = request.headers.get('Authorization').split()[1]
if auth_token == 'my-nifty-access-token':
return {'response': 'Successful deletion of user.'}
elif auth_token == 'the-token-with-which-delete-will-fail':
response.status = 403
return {'error': 'Permission denied'}
def load_client(context):
"""Get an instance of a loaded client."""
username = context.getTransformSetting('username')
api_key = context.getTransformSetting('aKey')
test_status = context.getTransformSetting('test_local')
if test_status and test_status == 'True':
server = context.getTransformSetting('server')
version = context.getTransformSetting('version')
return AttributeRequest(username, api_key, server, version)
else:
return AttributeRequest(username, api_key, headers=gen_debug(request))
def load_client(context):
"""Get an instance of a loaded client."""
username = context.getTransformSetting('username')
api_key = context.getTransformSetting('aKey')
test_status = context.getTransformSetting('test_local')
if test_status and test_status == 'True':
server = context.getTransformSetting('server')
version = context.getTransformSetting('version')
return DnsRequest(username, api_key, server, version)
else:
return DnsRequest(username, api_key, headers=gen_debug(request))
def load_enrichment(context):
"""Get an instance of a loaded client."""
username = context.getTransformSetting('username')
api_key = context.getTransformSetting('aKey')
test_status = context.getTransformSetting('test_local')
if test_status and test_status == 'True':
server = context.getTransformSetting('server')
version = context.getTransformSetting('version')
return EnrichmentRequest(username, api_key, server, version, debug=True)
else:
return EnrichmentRequest(username, api_key, headers=gen_debug(request))
def load_client(context):
"""Get an instance of a loaded client."""
username = context.getTransformSetting('username')
api_key = context.getTransformSetting('aKey')
test_status = context.getTransformSetting('test_local')
if test_status and test_status == 'True':
server = context.getTransformSetting('server')
version = context.getTransformSetting('version')
return SslRequest(username, api_key, server, version)
else:
return SslRequest(username, api_key, headers=gen_debug(request))
def load_client(context):
"""Get an instance of a loaded client."""
username = context.getTransformSetting('username')
api_key = context.getTransformSetting('aKey')
test_status = context.getTransformSetting('test_local')
if test_status and test_status == 'True':
server = context.getTransformSetting('server')
version = context.getTransformSetting('version')
return ActionsClient(username, api_key, server, version)
else:
return ActionsClient(username, api_key, headers=gen_debug(request))
def load_client(context):
"""Get an instance of a loaded client."""
username = context.getTransformSetting('username')
api_key = context.getTransformSetting('aKey')
test_status = context.getTransformSetting('test_local')
if test_status and test_status == 'True':
server = context.getTransformSetting('server')
version = context.getTransformSetting('version')
return EnrichmentRequest(username, api_key, server, version)
else:
return EnrichmentRequest(username, api_key, headers=gen_debug(request))
def add_organization_html(ending):
"""Submit an organization for scraping.
This shows an html page with a link to the status of the organization.
"""
organization = request.forms.get('organization')
scraper.scrape_organization(organization)
if ending == "json":
return {"status": "ok", "urls": api.get_organization_urls(organization)}
return template("added-organization.html", organization=organization)
def get_all_organizations(ending):
"""List all organizations with links to their statuses."""
start = int(request.query.get("offset", 0))
count = min(int(request.query.get("limit", DEFAULT_PAGINATION_COUNT)), MAX_PAGINATION_COUNT)
if ending == "json":
return api.get_organizations(start, count)
return template("organizations.html", start=start, count=count)
def add_repository(ending):
"""Sumbit a repository for scraping
This shows an html page with a link to the status of the repository.
"""
repository = request.forms.get('repository')
organization_name, repository_name = repository.split("/")
scraper.scrape_repository(repository)
if ending == "json":
return {"status": "ok", "urls": api.get_repository_urls(organization_name, repository_name)}
return template("added-repository.html", repository=repository)
def add_authentication():
"""Add `username` and `password` to those usable to scrape github.
They will be tried and removed if invalid.
"""
# http://bottlepy.org/docs/dev/tutorial.html#http-request-methods
username = request.forms.get('username')
password = request.forms.get('password')
if check_login((username, password)):
credentials.add((username, password))
return static("add-github-authentication-success.html")
return static("add-github-authentication-failure.html")
def __enter__(self):
self.orig = bottle.request.environ
bottle.request.environ = self.environ
for k,v in self.extras.items():
if hasattr(bottle.request, k):
self.extra_orig[k] = getattr(bottle.request, k)
setattr(bottle.request, k, v)
setattr(bottle.BaseRequest, 'app', True)
def __exit__(self,a,b,c):
bottle.request.environ = self.orig
for k,v in self.extras.items():
if k in self.extra_orig:
setattr(bottle.request, k, self.extra_orig[k])
else:
try:
delattr(bottle.request, k)
except AttributeError:
pass
setattr(bottle.BaseRequest, 'app', self.orig_app_reader)
def testParams(self):
with boddle(params={'name':'derek'}):
self.assertEqual(bottle.request.params['name'], 'derek')
def test_no_params_no_throw(self):
with boddle():
self.assertEqual(list(bottle.request.params.items()), [])
def testGetParams(self):
with boddle(method='get', params={'name':'derek'}):
self.assertEqual(bottle.request.params['name'], 'derek')
def testPath(self):
with boddle(path='/derek'):
self.assertEqual(bottle.request.path, '/derek')
with boddle(path='/anderson'):
self.assertEqual(bottle.request.path, '/anderson')
self.assertEqual(bottle.request.path, '/derek')
def testMethod(self):
with boddle(method='post'):
self.assertEqual(bottle.request.method, 'POST')
def testHeaders(self):
with boddle(headers={'x_header':'value'}):
self.assertEqual(bottle.request.headers['X_HEADER'], 'value')
def testHyphenatedHeaders(self):
with boddle(headers={'x-header':'value'}):
self.assertEqual(bottle.request.headers['X-HEADER'], 'value')
def testExtraStuff(self):
with boddle(extra='woot'):
self.assertEqual(bottle.request.extra, 'woot')
with boddle(extra='woot2'):
self.assertEqual(bottle.request.extra, 'woot2')
self.assertFalse(hasattr(bottle.request,'extra'))
def testURL(self):
with boddle(url='https://github.com/keredson/boddle'):
self.assertEqual(bottle.request.url, 'https://github.com/keredson/boddle')
self.assertEqual(bottle.request.fullpath, '/keredson/boddle')
self.assertEqual(bottle.request.path, '/keredson/boddle')
def testRevert(self):
with boddle(params={'name':'derek'}):
self.assertEqual(bottle.request.params['name'], 'derek')
with boddle(params={'name':'anderson'}):
self.assertEqual(bottle.request.params['name'], 'anderson')
self.assertEqual(bottle.request.params['name'], 'derek')
def testBody(self):
with boddle(body='body'):
self.assertEqual(bottle.request.body.read(), b'body')
self.assertEqual(bottle.request.body.readline(), b'body')