def test_listcall(self):
a = List((1, 2, 3))
self.assertEqual(a(1), 2)
self.assertEqual(a(-1), 3)
self.assertEqual(a(-5), None)
self.assertEqual(a(-5, default='x'), 'x')
self.assertEqual(a(-3, cast=str), '1')
a.append('1234')
self.assertEqual(a(3), '1234')
self.assertEqual(a(3, cast=int), 1234)
a.append('x')
self.assertRaises(HTTP, a, 4, cast=int)
b = List()
# default is always returned when especified
self.assertEqual(b(0, cast=int, default=None), None)
self.assertEqual(b(0, cast=int, default=None, otherwise='teste'), None)
self.assertEqual(b(0, cast=int, default='a', otherwise='teste'), 'a')
# if don't have value and otherwise is especified it will called
self.assertEqual(b(0, otherwise=lambda: 'something'), 'something')
self.assertEqual(b(0, cast=int, otherwise=lambda: 'something'),
'something')
# except if default is especified
self.assertEqual(b(0, default=0, otherwise=lambda: 'something'), 0)
python类HTTP的实例源码
def check_credentials(request, other_application='admin',
expiration=60 * 60, gae_login=True):
"""Checks that user is authorized to access other_application"""
if request.env.web2py_runtime_gae:
from google.appengine.api import users
if users.is_current_user_admin():
return True
elif gae_login:
login_html = '<a href="%s">Sign in with your google account</a>.' \
% users.create_login_url(request.env.path_info)
raise HTTP(200, '<html><body>%s</body></html>' % login_html)
else:
return False
else:
t0 = time.time()
dt = t0 - expiration
s = get_session(request, other_application)
r = (s.authorized and s.last_time and s.last_time > dt)
if r:
s.last_time = t0
set_session(request, s, other_application)
return r
def test_listcall(self):
a = List((1, 2, 3))
self.assertEqual(a(1), 2)
self.assertEqual(a(-1), 3)
self.assertEqual(a(-5), None)
self.assertEqual(a(-5, default='x'), 'x')
self.assertEqual(a(-3, cast=str), '1')
a.append('1234')
self.assertEqual(a(3), '1234')
self.assertEqual(a(3, cast=int), 1234)
a.append('x')
self.assertRaises(HTTP, a, 4, cast=int)
b = List()
# default is always returned when especified
self.assertEqual(b(0, cast=int, default=None), None)
self.assertEqual(b(0, cast=int, default=None, otherwise='teste'), None)
self.assertEqual(b(0, cast=int, default='a', otherwise='teste'), 'a')
# if don't have value and otherwise is especified it will called
self.assertEqual(b(0, otherwise=lambda: 'something'), 'something')
self.assertEqual(b(0, cast=int, otherwise=lambda: 'something'),
'something')
# except if default is especified
self.assertEqual(b(0, default=0, otherwise=lambda: 'something'), 0)
def check_credentials(request, other_application='admin',
expiration=60 * 60, gae_login=True):
"""Checks that user is authorized to access other_application"""
if request.env.web2py_runtime_gae:
from google.appengine.api import users
if users.is_current_user_admin():
return True
elif gae_login:
login_html = '<a href="%s">Sign in with your google account</a>.' \
% users.create_login_url(request.env.path_info)
raise HTTP(200, '<html><body>%s</body></html>' % login_html)
else:
return False
else:
t0 = time.time()
dt = t0 - expiration
s = get_session(request, other_application)
r = (s.authorized and s.last_time and s.last_time > dt)
if r:
s.last_time = t0
set_session(request, s, other_application)
return r
def check_credentials(request, other_application='admin',
expiration=60 * 60, gae_login=True):
"""Checks that user is authorized to access other_application"""
if request.env.web2py_runtime_gae:
from google.appengine.api import users
if users.is_current_user_admin():
return True
elif gae_login:
login_html = '<a href="%s">Sign in with your google account</a>.' \
% users.create_login_url(request.env.path_info)
raise HTTP(200, '<html><body>%s</body></html>' % login_html)
else:
return False
else:
t0 = time.time()
dt = t0 - expiration
s = get_session(request, other_application)
r = (s.authorized and s.last_time and s.last_time > dt)
if r:
s.last_time = t0
set_session(request, s, other_application)
return r
def test_listcall(self):
a = List((1, 2, 3))
self.assertEqual(a(1), 2)
self.assertEqual(a(-1), 3)
self.assertEqual(a(-5), None)
self.assertEqual(a(-5, default='x'), 'x')
self.assertEqual(a(-3, cast=str), '1')
a.append('1234')
self.assertEqual(a(3), '1234')
self.assertEqual(a(3, cast=int), 1234)
a.append('x')
self.assertRaises(HTTP, a, 4, cast=int)
b = List()
# default is always returned when especified
self.assertEqual(b(0, cast=int, default=None), None)
self.assertEqual(b(0, cast=int, default=None, otherwise='teste'), None)
self.assertEqual(b(0, cast=int, default='a', otherwise='teste'), 'a')
# if don't have value and otherwise is especified it will called
self.assertEqual(b(0, otherwise=lambda: 'something'), 'something')
self.assertEqual(b(0, cast=int, otherwise=lambda: 'something'),
'something')
# except if default is especified
self.assertEqual(b(0, default=0, otherwise=lambda: 'something'), 0)
def check_credentials(request, other_application='admin',
expiration=60 * 60, gae_login=True):
"""Checks that user is authorized to access other_application"""
if request.env.web2py_runtime_gae:
from google.appengine.api import users
if users.is_current_user_admin():
return True
elif gae_login:
login_html = '<a href="%s">Sign in with your google account</a>.' \
% users.create_login_url(request.env.path_info)
raise HTTP(200, '<html><body>%s</body></html>' % login_html)
else:
return False
else:
t0 = time.time()
dt = t0 - expiration
s = get_session(request, other_application)
r = (s.authorized and s.last_time and s.last_time > dt)
if r:
s.last_time = t0
set_session(request, s, other_application)
return r
def check_credentials(request, other_application='admin',
expiration=60 * 60, gae_login=True):
"""Checks that user is authorized to access other_application"""
if request.env.web2py_runtime_gae:
from google.appengine.api import users
if users.is_current_user_admin():
return True
elif gae_login:
login_html = '<a href="%s">Sign in with your google account</a>.' \
% users.create_login_url(request.env.path_info)
raise HTTP(200, '<html><body>%s</body></html>' % login_html)
else:
return False
else:
t0 = time.time()
dt = t0 - expiration
s = get_session(request, other_application)
r = (s.authorized and s.last_time and s.last_time > dt)
if r:
s.last_time = t0
set_session(request,s,other_application)
return r
def __init__(self, trafficdb, source, destination):
http.HTTP.__init__(self, trafficdb, source, destination)
sslproto.SSLProtocol.__init__(self, trafficdb, source, destination)
self.friendly_name = "HTTPS"
self.serverPort = 443
self.name = "HTTPS"
self.log.debug("HTTPS: Initializing")
self.supports = {malloryevt.STARTS2C:True, malloryevt.STARTC2S:True,
malloryevt.CSAFTERSS:True, malloryevt.SSCREATE:True}
def callback(self):
if self.keyword in self.request.vars:
field = self.fields[0]
if is_gae:
rows = self.db(field.__ge__(self.request.vars[self.keyword]) & field.__lt__(self.request.vars[self.keyword] + u'\ufffd')).select(orderby=self.orderby, limitby=self.limitby, *(self.fields+self.help_fields))
else:
rows = self.db(field.like(self.request.vars[self.keyword] + '%')).select(orderby=self.orderby, limitby=self.limitby, distinct=self.distinct, *(self.fields+self.help_fields))
if rows:
if self.is_reference:
id_field = self.fields[1]
if self.help_fields:
options = [OPTION(
self.help_string % dict([(h.name, s[h.name]) for h in self.fields[:1] + self.help_fields]),
_value=s[id_field.name], _selected=(k == 0)) for k, s in enumerate(rows)]
else:
options = [OPTION(
s[field.name], _value=s[id_field.name],
_selected=(k == 0)) for k, s in enumerate(rows)]
raise HTTP(
200, SELECT(_id=self.keyword, _class='autocomplete',
_size=len(rows), _multiple=(len(rows) == 1),
*options).xml())
else:
raise HTTP(
200, SELECT(_id=self.keyword, _class='autocomplete',
_size=len(rows), _multiple=(len(rows) == 1),
*[OPTION(s[field.name],
_selected=(k == 0))
for k, s in enumerate(rows)]).xml())
else:
raise HTTP(200, '')
def retrieve(self, name, path=None, nameonly=False):
"""
if nameonly==True return (filename, fullfilename) instead of
(filename, stream)
"""
self_uploadfield = self.uploadfield
if self.custom_retrieve:
return self.custom_retrieve(name, path)
import http
if self.authorize or isinstance(self_uploadfield, str):
row = self.db(self == name).select().first()
if not row:
raise http.HTTP(404)
if self.authorize and not self.authorize(row):
raise http.HTTP(403)
m = REGEX_UPLOAD_PATTERN.match(name)
if not m or not self.isattachment:
raise TypeError('Can\'t retrieve %s' % name)
file_properties = self.retrieve_file_properties(name,path)
filename = file_properties['filename']
if isinstance(self_uploadfield, str): # ## if file is in DB
stream = StringIO.StringIO(row[self_uploadfield] or '')
elif isinstance(self_uploadfield,Field):
blob_uploadfield_name = self_uploadfield.uploadfield
query = self_uploadfield == name
data = self_uploadfield.table(query)[blob_uploadfield_name]
stream = StringIO.StringIO(data)
elif self.uploadfs:
# ## if file is on pyfilesystem
stream = self.uploadfs.open(name, 'rb')
else:
# ## if file is on regular filesystem
# this is intentially a sting with filename and not a stream
# this propagates and allows stream_file_or_304_or_206 to be called
fullname = pjoin(file_properties['path'],name)
if nameonly:
return (filename, fullname)
stream = open(fullname,'rb')
return (filename, stream)
def __call__(self, i, default=DEFAULT, cast=None, otherwise=None):
"""
request.args(0,default=0,cast=int,otherwise='http://error_url')
request.args(0,default=0,cast=int,otherwise=lambda:...)
"""
n = len(self)
if 0 <= i < n or -n <= i < 0:
value = self[i]
elif default is DEFAULT:
value = None
else:
value, cast = default, False
if cast:
try:
value = cast(value)
except (ValueError, TypeError):
from http import HTTP, redirect
if otherwise is None:
raise HTTP(404)
elif isinstance(otherwise, str):
redirect(otherwise)
elif callable(otherwise):
return otherwise()
else:
raise RuntimeError("invalid otherwise")
return value
def restricted(code, environment=None, layer='Unknown'):
"""
Runs code in environment and returns the output. If an exception occurs
in code it raises a RestrictedError containing the traceback. Layer is
passed to RestrictedError to identify where the error occurred.
"""
if environment is None:
environment = {}
environment['__file__'] = layer
environment['__name__'] = '__restricted__'
try:
if isinstance(code, types.CodeType):
ccode = code
else:
ccode = compile2(code, layer)
exec ccode in environment
except HTTP:
raise
except RestrictedError:
# do not encapsulate (obfuscate) the original RestrictedError
raise
except Exception, error:
# extract the exception type and value (used as output message)
etype, evalue, tb = sys.exc_info()
# XXX Show exception in Wing IDE if running in debugger
if __debug__ and 'WINGDB_ACTIVE' in os.environ:
sys.excepthook(etype, evalue, tb)
output = "%s %s" % (etype, evalue)
raise RestrictedError(layer, code, output, environment)
def retrieve(self, name, path=None, nameonly=False):
"""
If `nameonly==True` return (filename, fullfilename) instead of
(filename, stream)
"""
self_uploadfield = self.uploadfield
if self.custom_retrieve:
return self.custom_retrieve(name, path)
import http
if self.authorize or isinstance(self_uploadfield, str):
row = self.db(self == name).select().first()
if not row:
raise http.HTTP(404)
if self.authorize and not self.authorize(row):
raise http.HTTP(403)
file_properties = self.retrieve_file_properties(name, path)
filename = file_properties['filename']
if isinstance(self_uploadfield, str): # ## if file is in DB
stream = StringIO.StringIO(row[self_uploadfield] or '')
elif isinstance(self_uploadfield, Field):
blob_uploadfield_name = self_uploadfield.uploadfield
query = self_uploadfield == name
data = self_uploadfield.table(query)[blob_uploadfield_name]
stream = StringIO.StringIO(data)
elif self.uploadfs:
# ## if file is on pyfilesystem
stream = self.uploadfs.open(name, 'rb')
else:
# ## if file is on regular filesystem
# this is intentially a sting with filename and not a stream
# this propagates and allows stream_file_or_304_or_206 to be called
fullname = pjoin(file_properties['path'], name)
if nameonly:
return (filename, fullname)
stream = open(fullname, 'rb')
return (filename, stream)
def __call__(self, i, default=DEFAULT, cast=None, otherwise=None):
"""Allows to use a special syntax for fast-check of
`request.args()` validity.
:params:
i: index
default: use this value if arg not found
cast: type cast
otherwise:
will be executed when:
- casts fail
- value not found, dont have default and otherwise is
especified
can be:
- None: results in a 404
- str: redirect to this address
- callable: calls the function (nothing is passed)
Example:
You can use::
request.args(0,default=0,cast=int,otherwise='http://error_url')
request.args(0,default=0,cast=int,otherwise=lambda:...)
"""
n = len(self)
if 0 <= i < n or -n <= i < 0:
value = self[i]
elif default is DEFAULT:
value = None
else:
value, cast, otherwise = default, False, False
try:
if cast:
value = cast(value)
if not value and otherwise:
raise ValueError('Otherwise will raised.')
except (ValueError, TypeError):
from http import HTTP, redirect
if otherwise is None:
raise HTTP(404)
elif isinstance(otherwise, str):
redirect(otherwise)
elif callable(otherwise):
return otherwise()
else:
raise RuntimeError("invalid otherwise")
return value
def __call__(self, i, default=DEFAULT, cast=None, otherwise=None):
"""Allows to use a special syntax for fast-check of
`request.args()` validity.
:params:
i: index
default: use this value if arg not found
cast: type cast
otherwise:
will be executed when:
- casts fail
- value not found, dont have default and otherwise is
especified
can be:
- None: results in a 404
- str: redirect to this address
- callable: calls the function (nothing is passed)
Example:
You can use::
request.args(0,default=0,cast=int,otherwise='http://error_url')
request.args(0,default=0,cast=int,otherwise=lambda:...)
"""
n = len(self)
if 0 <= i < n or -n <= i < 0:
value = self[i]
elif default is DEFAULT:
value = None
else:
value, cast, otherwise = default, False, False
try:
if cast:
value = cast(value)
if not value and otherwise:
raise ValueError('Otherwise will raised.')
except (ValueError, TypeError):
from http import HTTP, redirect
if otherwise is None:
raise HTTP(404)
elif isinstance(otherwise, str):
redirect(otherwise)
elif callable(otherwise):
return otherwise()
else:
raise RuntimeError("invalid otherwise")
return value
def __call__(self, i, default=DEFAULT, cast=None, otherwise=None):
"""Allows to use a special syntax for fast-check of
`request.args()` validity.
:params:
i: index
default: use this value if arg not found
cast: type cast
otherwise:
will be executed when:
- casts fail
- value not found, dont have default and otherwise is
especified
can be:
- None: results in a 404
- str: redirect to this address
- callable: calls the function (nothing is passed)
Example:
You can use::
request.args(0,default=0,cast=int,otherwise='http://error_url')
request.args(0,default=0,cast=int,otherwise=lambda:...)
"""
n = len(self)
if 0 <= i < n or -n <= i < 0:
value = self[i]
elif default is DEFAULT:
value = None
else:
value, cast, otherwise = default, False, False
try:
if cast:
value = cast(value)
if not value and otherwise:
raise ValueError('Otherwise will raised.')
except (ValueError, TypeError):
from http import HTTP, redirect
if otherwise is None:
raise HTTP(404)
elif isinstance(otherwise, str):
redirect(otherwise)
elif callable(otherwise):
return otherwise()
else:
raise RuntimeError("invalid otherwise")
return value
def __call__(self, i, default=DEFAULT, cast=None, otherwise=None):
"""Allows to use a special syntax for fast-check of
`request.args()` validity.
:params:
i: index
default: use this value if arg not found
cast: type cast
otherwise:
will be executed when:
- casts fail
- value not found, dont have default and otherwise is
especified
can be:
- None: results in a 404
- str: redirect to this address
- callable: calls the function (nothing is passed)
Example:
You can use::
request.args(0,default=0,cast=int,otherwise='http://error_url')
request.args(0,default=0,cast=int,otherwise=lambda:...)
"""
n = len(self)
if 0 <= i < n or -n <= i < 0:
value = self[i]
elif default is DEFAULT:
value = None
else:
value, cast, otherwise = default, False, False
try:
if cast:
value = cast(value)
if not value and otherwise:
raise ValueError('Otherwise will raised.')
except (ValueError, TypeError):
from http import HTTP, redirect
if otherwise is None:
raise HTTP(404)
elif isinstance(otherwise, str):
redirect(otherwise)
elif callable(otherwise):
return otherwise()
else:
raise RuntimeError("invalid otherwise")
return value
def __call__(self, i, default=DEFAULT, cast=None, otherwise=None):
"""Allows to use a special syntax for fast-check of `request.args()`
validity
Args:
i: index
default: use this value if arg not found
cast: type cast
otherwise: can be:
- None: results in a 404
- str: redirect to this address
- callable: calls the function (nothing is passed)
Example:
You can use::
request.args(0,default=0,cast=int,otherwise='http://error_url')
request.args(0,default=0,cast=int,otherwise=lambda:...)
"""
n = len(self)
if 0 <= i < n or -n <= i < 0:
value = self[i]
elif default is DEFAULT:
value = None
else:
value, cast = default, False
if cast:
try:
value = cast(value)
except (ValueError, TypeError):
from http import HTTP, redirect
if otherwise is None:
raise HTTP(404)
elif isinstance(otherwise, str):
redirect(otherwise)
elif callable(otherwise):
return otherwise()
else:
raise RuntimeError("invalid otherwise")
return value