def _request(request, request_fallback=None):
    """Collect request fields from every available source into one plain dict.

    Sources are read in this order: ``request.json``, ``request.forms``,
    ``request.query``, ``request.files`` and finally ``request_fallback``.
    A key found in a later source overrides the same key from an earlier one.
    Sources that are ``None`` or have no ``items`` attribute are skipped.
    """
    merged = {}
    # ``None`` stands for the fallback mapping; the other entries are
    # attribute names looked up lazily on ``request``.
    for attr_name in ('json', 'forms', 'query', 'files', None):
        try:
            # Attribute access happens inside the try block because reading
            # bottle.request.* may itself raise.
            if attr_name is None:
                source = request_fallback
            else:
                source = getattr(request, attr_name)
        except KeyError:
            continue
        if source is None or not hasattr(source, 'items'):
            continue
        merged.update(source.items())
    return merged
# Example source code for Python's query()
def dtos_get_films_with_actors():
    """Return the films of the requested release year as a JSON string.

    Reads the ``releaseYear`` query parameter, builds a ``QueryObject`` that
    filters on it and expands ``FilmActors.Actor`` and ``FilmCategories``,
    and fetches the matching "Film" items through ``dataService.dataViewDto``.

    Returns:
        The JSON text (indent 2, encoded with ``CustomEncoder``), or ``None``
        with ``response.status`` set when the data layer raises
        ``dalUtils.StatusCodeError``.

    Any other error (including a missing ``releaseYear``) aborts with 400.
    """
    try:
        # NOTE(review): the raw query value is interpolated into the filter
        # expression unescaped -- confirm the data layer sanitises it.
        release_year = request.query['releaseYear']
        queryObject = QueryObject(
            filter="ReleaseYear='{0}'".format(release_year),
            expand=['FilmActors.Actor', 'FilmCategories'],
        )
        resultSerialData = dataService.dataViewDto.getItems("Film", queryObject)
        return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except``: SystemExit and KeyboardInterrupt
        # must propagate instead of being reported as a bad request.
        abort(400, 'Bad Request')
# POST: api/datasource/crud/operations/dtos/TestAction
# with: Content-Type: application/json and body - {"param1":1}
def entities_get_films_with_actors():
    """Return the films of the requested release year as a JSON string.

    Reads the ``releaseYear`` query parameter, builds a ``QueryObject`` that
    filters on it and expands ``FilmActors.Actor`` and ``FilmCategories``,
    and fetches the items through ``dataService.from_.remote.dtoView.Films``.

    Returns:
        The JSON text (indent 2, encoded with ``CustomEncoder``), or ``None``
        with ``response.status`` set when the data layer raises
        ``dalUtils.StatusCodeError``.

    Any other error (including a missing ``releaseYear``) aborts with 400.
    """
    try:
        # NOTE(review): the raw query value is interpolated into the filter
        # expression unescaped -- confirm the data layer sanitises it.
        release_year = request.query['releaseYear']
        queryObject = QueryObject(
            filter="ReleaseYear='{0}'".format(release_year),
            expand=['FilmActors.Actor', 'FilmCategories'],
        )
        resultSerialData = dataService.from_.remote.dtoView.Films.getItems(queryObject)
        return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except``: SystemExit and KeyboardInterrupt
        # must propagate instead of being reported as a bad request.
        abort(400, 'Bad Request')
# POST: api/datasource/crud/operations/entities/TestAction
# with: Content-Type: application/json and body - {"param1":1}
def _handle_cert(self, domain):
    """Fetch the certificate material for ``domain`` and return it as text.

    The ``ssl_certificate`` entry of the request environ is handed to
    ``_get_cert_config_if_allowed`` to obtain the certificate configuration.
    ``force_renew`` is passed as True only when the query string carries
    ``force_renew=true``.

    Returns:
        A dict with the decoded ``crt``, ``key`` and ``chain`` values.
    """
    client_cert = request.environ['ssl_certificate']
    cfg = self._get_cert_config_if_allowed(domain, client_cert)
    logger.debug('Fetching certificate for domain %s', domain)

    params = request.query
    force = 'force_renew' in params and params['force_renew'] == 'true'  # pylint: disable=unsupported-membership-test,unsubscriptable-object

    bundle = self.acmeproxy.get_cert(
        domain=domain,
        altname=cfg.altname,
        rekey=cfg.rekey,
        renew_margin=cfg.renew_margin,
        force_renew=force,
        auto_renew=cfg.renew_on_fetch,
    )
    key, crt, chain = bundle
    return {
        'crt': crt.decode(),
        'key': key.decode(),
        'chain': chain.decode(),
    }
def assertRedirect(self, target, result, query=None, status=303, **args):
    """Check the response produced by ``bottle.redirect(target)``.

    Extra keyword arguments become WSGI environ entries; for names starting
    with ``wsgi`` the first underscore is turned into a dot. ``query``, when
    given, is expanded into keyword arguments for ``bottle.redirect``.

    The assertions (status code, non-empty headers, ``Location`` equal to
    ``result``) only run when the redirect raises ``bottle.HTTPResponse``.
    """
    environ = {'SERVER_PROTOCOL': 'HTTP/1.1'}
    # Snapshot the matching names first because ``args`` is mutated below.
    wsgi_names = [name for name in args if name.startswith('wsgi')]
    for name in wsgi_names:
        dotted = name.replace('_', '.', 1)
        args[dotted] = args[name]
        del args[name]
    environ.update(args)

    request.bind(environ)
    bottle.response.bind()

    redirect_kwargs = query or {}
    try:
        bottle.redirect(target, **redirect_kwargs)
    except bottle.HTTPResponse:
        r = _e()
        self.assertEqual(status, r.status_code)
        self.assertTrue(r.headers)
        self.assertEqual(result, r.headers['Location'])
def _filter_row(row, filters):
    """Return True when ``row`` satisfies every entry of ``filters``.

    ``filters`` maps a filter name to a dict whose ``'val'`` entry holds the
    value taken from the query string. Filter names are interpreted as:

    * ``"tracks"``: ``val`` is a ``|``-separated list and ``row['trackrel']``
      must be one of its members;
    * ``"<col> (min)"`` / ``"<col> (max)"``: ``row[<col>]`` must be
      ``>=`` / ``<=`` ``float(val)``;
    * any other name: ``row[name]`` must equal ``val``; when the row value
      is a bool, ``val`` is first parsed as a boolean.

    An empty ``filters`` mapping matches every row.
    """
    # Spellings that mean "false" in a query string. Plain ``bool("False")``
    # is True, which made every non-empty value match True rows.
    false_words = ('', '0', 'false', 'no', 'off', 'none')

    def as_bool(val):
        if isinstance(val, str):
            return val.strip().lower() not in false_words
        return bool(val)

    def filterfunc(row, filt):
        val = filters[filt]['val']
        if filt == "tracks":
            return row['trackrel'] in set(val.split('|'))
        if filt.endswith(" (min)"):
            return row[filt.replace(" (min)", "")] >= float(val)
        if filt.endswith(" (max)"):
            return row[filt.replace(" (max)", "")] <= float(val)
        if isinstance(row[filt], bool):
            # Query values arrive as strings; convert before comparing.
            return row[filt] == as_bool(val)
        return row[filt] == val

    return all(filterfunc(row, filt) for filt in filters)
def download():
    """Build a zip archive for the requested tracks and return it as a file object.

    The ``tracks`` query parameter is a ``|``-separated list. For each entry
    ``-track.csv`` is replaced by ``*`` and the resulting pattern is globbed
    under ``config.TRACKDIR``; every match is stored in the archive under its
    path relative to ``config.TRACKDIR``. Response headers are set so the
    result is sent as an attachment named ``data.zip``.
    """
    requested = request.query.tracks.split('|')

    # The archive lives in a spooled temporary file rather than on a named path.
    spool = tempfile.SpooledTemporaryFile()
    with zipfile.ZipFile(spool, 'w', zipfile.ZIP_DEFLATED) as archive:
        for trackrel in requested:
            pattern = trackrel.replace("-track.csv", "*")
            for match in config.TRACKDIR.glob(pattern):
                arcname = str(match.relative_to(config.TRACKDIR))
                archive.write(str(match), arcname)
    spool.seek(0)

    # Force a download with a fixed file name and the zip MIME type.
    download_headers = (
        ('Content-Disposition', 'attachment; filename="data.zip"'),
        ('Content-Type', 'application/zip'),
    )
    for header_name, header_value in download_headers:
        response.set_header(header_name, header_value)

    # The temporary file is never closed explicitly; cleanup is left to the
    # garbage collector once the response has been sent.
    return spool
def create(controller="", method=""):
    """Run ``exec_command`` for ``controller``/``method`` with the query parameters as input."""
    query_params = request.query
    return exec_command(controller=controller, method=method, input=query_params)
def _extract_params(request_dict, param_list, param_fallback=False):
    ''' Extract pddb parameters from request.

    For every name in ``param_list`` the request is scanned for:

    * an exact key ``param``, whose value is used as is, and
    * keys of the form ``param__key``, which are gathered into
      ``{param: {key: value, ...}}``. Several such keys for the same
      parameter are merged into one dict.

    The special string ``"__null__"`` is converted to ``None`` at any
    nesting depth. When ``param_fallback`` is true and none of the
    parameters matched, the whole request is assumed to refer to the first
    parameter.

    Returns a dict holding only keys listed in ``param_list``; an empty dict
    when either argument is empty.
    '''
    if not param_list or not request_dict:
        return dict()

    def nullify(mapping):
        # Convert special string "__null__" into Python None, recursively
        return {k: (nullify(v) if isinstance(v, dict) else
                    (None if v == '__null__' else v))
                for k, v in mapping.items()}

    query = dict()
    for param in param_list:
        for query_key, query_value in request_dict.items():
            if query_key == param:
                query[param] = query_value
                continue
            head, sep, sub_key = query_key.partition('__')
            if not sep or head != param:
                continue
            # Merge into a copy of what was collected so far; a plain
            # assignment would drop earlier ``param__key`` entries, and
            # updating in place could mutate a dict owned by the caller.
            existing = query.get(param)
            merged = dict(existing) if isinstance(existing, dict) else dict()
            merged[sub_key] = query_value
            query[param] = merged

    # When fallback is enabled and no parameter matched, assume query refers to first parameter
    if param_fallback and not any(param_key in query for param_key in param_list):
        query = {param_list[0]: dict(request_dict)}

    # Return a dictionary with only the requested parameters
    return {k: v for k, v in nullify(query).items() if k in param_list}
def get_search():
    """Search movies by title and return the matches as a JSON string.

    Reads the ``q`` query parameter; when it is missing an empty list is
    returned. Otherwise a case-insensitive "contains" regular expression is
    built from ``q`` and passed to the graph query as the ``title``
    parameter. Each result row becomes ``{"movie": {...}}``.
    """
    try:
        term = request.query["q"]
    except KeyError:
        return []

    # NOTE(review): ``term`` goes into the regular expression verbatim, so
    # regex metacharacters typed by the user are interpreted as such.
    title_pattern = "(?i).*" + term + ".*"
    statement = (
        "MATCH (movie:Movie) "
        "WHERE movie.title =~ {title} "
        "RETURN movie"
    )
    rows = graph.run(statement, {"title": title_pattern})
    response.content_type = "application/json"
    payload = [{"movie": dict(row["movie"])} for row in rows]
    return json.dumps(payload)
def get_entityset(entitySetName):
    """Return the result of ``apiProvider.handleGet`` for an entity set as JSON.

    The entity-set name, the request's query parameters and ``dataService``
    are passed to ``dataProviderDto.apiProvider.handleGet``. The result is
    serialised with ``CustomEncoder`` (indent 2) and the response content
    type is set to UTF-8 JSON. Any error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleGet(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except Exception:
        # Deliberately not a bare ``except``: SystemExit and KeyboardInterrupt
        # must propagate instead of being reported as a bad request.
        abort(400, 'Bad Request')
# GET: api/datasource/crud/single/:entitySetName?keys=key1:{key1}
def get_single_entityset(entitySetName):
    """Return the result of ``apiProvider.handleGetSingle`` for an entity set as JSON.

    The entity-set name, the request's query parameters and ``dataService``
    are passed to ``dataProviderDto.apiProvider.handleGetSingle``. The result
    is serialised with ``CustomEncoder`` (indent 2) and the response content
    type is set to UTF-8 JSON. Any error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleGetSingle(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except Exception:
        # Deliberately not a bare ``except``: SystemExit and KeyboardInterrupt
        # must propagate instead of being reported as a bad request.
        abort(400, 'Bad Request')
# GET: api/datasource/crud/many/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def get_many_entityset(entitySetName):
    """Return the result of ``apiProvider.handleGetMany`` for an entity set as JSON.

    The entity-set name, the request's query parameters and ``dataService``
    are passed to ``dataProviderDto.apiProvider.handleGetMany``. The result
    is serialised with ``CustomEncoder`` (indent 2) and the response content
    type is set to UTF-8 JSON. Any error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleGetMany(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except Exception:
        # Deliberately not a bare ``except``: SystemExit and KeyboardInterrupt
        # must propagate instead of being reported as a bad request.
        abort(400, 'Bad Request')
# PUT: api/datasource/crud/:entitySetName?keys=key1:{key1}
def put_entityset(entitySetName):
    """Return the result of ``apiProvider.handleUpdateEntity`` as JSON.

    The entity-set name, the request's query parameters, its JSON body and
    ``dataService`` are passed to
    ``dataProviderDto.apiProvider.handleUpdateEntity``.

    Returns:
        The JSON text (encoded with ``CustomEncoder``), or ``None`` with
        ``response.status`` set when ``dalUtils.StatusCodeError`` is raised.

    Any other error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntity(
            entitySetName, request.query, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except``: SystemExit and KeyboardInterrupt
        # must propagate instead of being reported as a bad request.
        abort(400, 'Bad Request')
# PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1}
#@patch('/api/datasource/crud/<entitySetName>')
def delete_entityset(entitySetName):
    """Return the result of ``apiProvider.handleDeleteEntity`` as JSON.

    The entity-set name, the request's query parameters and ``dataService``
    are passed to ``dataProviderDto.apiProvider.handleDeleteEntity``.

    Returns:
        The JSON text (encoded with ``CustomEncoder``), or ``None`` with
        ``response.status`` set when ``dalUtils.StatusCodeError`` is raised.

    Any other error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleDeleteEntity(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except``: SystemExit and KeyboardInterrupt
        # must propagate instead of being reported as a bad request.
        abort(400, 'Bad Request')
# PUT: api/datasource/crud/batch/:entitySetName
def delete_batch_entityset(entitySetName):
    """Return the result of ``apiProvider.handleDeleteEntityBatch`` as JSON.

    The entity-set name, the request's query parameters and ``dataService``
    are passed to ``dataProviderDto.apiProvider.handleDeleteEntityBatch``.

    Returns:
        The JSON text (encoded with ``CustomEncoder``), or ``None`` with
        ``response.status`` set when ``dalUtils.StatusCodeError`` is raised.

    Any other error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Deliberately not a bare ``except``: SystemExit and KeyboardInterrupt
        # must propagate instead of being reported as a bad request.
        abort(400, 'Bad Request')
def _handle_renew_all(self):
    """Request a certificate for every listed domain and report the outcome.

    The ``ssl_certificate`` entry of the request environ is checked with
    ``_assert_admin`` first. Each certificate returned by
    ``acmeproxy.list_certificates()`` is matched against
    ``certificates_config`` by its ``cn``; domains without a configuration,
    or whose ``get_cert`` call raises, are recorded as errors.

    Returns:
        ``{'ok': [...], 'error': [...]}`` with the domain names per outcome.
    """
    self._assert_admin(request.environ['ssl_certificate'])

    succeeded = []
    failed = []
    for cert in self.acmeproxy.list_certificates():
        domain = cert['cn']
        certconfig = self.certificates_config.match(domain)
        if not certconfig:
            logger.error('No configuration found for domain %s', domain)
            failed.append(domain)
            continue

        logger.debug('Getting certificate for domain %s', domain)
        try:
            self.acmeproxy.get_cert(
                domain=domain,
                altname=certconfig.altname,
                rekey=certconfig.rekey,
                renew_margin=certconfig.renew_margin,
                force_renew=('force_renew' in request.query and request.query['force_renew'] == 'true'),  # pylint: disable=unsupported-membership-test,unsubscriptable-object
            )
        except Exception as e:
            # One failing domain must not stop the remaining ones.
            logger.error('Encountered exception while getting certificate for domain %s (%s)', domain, e)
            failed.append(domain)
        else:
            succeeded.append(domain)

    return {
        'ok': succeeded,
        'error': failed,
    }
def info(account_id, resource_id):
    """Return ``controller.info`` for a resource as an indented JSON string.

    Resource ids starting with ``sg-`` require a ``parent_id`` query
    parameter; the request is aborted with 400 when it is missing. For other
    resources ``parent_id`` defaults to the resource id itself.
    """
    params = request.query
    needs_parent = resource_id.startswith('sg-')
    if needs_parent and 'parent_id' not in params:
        abort(400, "Missing required parameter parent_id")

    parent_id = params.get('parent_id', resource_id)
    details = controller.info(account_id, resource_id, parent_id)
    response.content_type = "application/json"
    return json.dumps(details, indent=2, cls=Encoder)
# This is set to POST to restrict permissions; perhaps it should live in another URL space.
def test_get(self):
    """ Environ: GET data """
    # ``cn`` carries the percent-encoded UTF-8 bytes of U+74F6 ('瓶').
    qs = tonat(tob('a=a&a=1&b=b&c=c&cn=%e7%93%b6'), 'latin1')
    request = BaseRequest({'QUERY_STRING': qs})
    self.assertTrue('a' in request.query)
    self.assertTrue('b' in request.query)
    # Repeated keys keep every value; item access yields the last one.
    self.assertEqual(['a', '1'], request.query.getall('a'))
    self.assertEqual(['b'], request.query.getall('b'))
    self.assertEqual('1', request.query['a'])
    self.assertEqual('b', request.query['b'])
    # The expected character had been garbled to '?', which can never match
    # the decoded ``cn`` value; restore the character the bytes encode.
    self.assertEqual(tonat(tob('瓶'), 'latin1'), request.query['cn'])
    self.assertEqual(touni('瓶'), request.query.cn)
def tracks(db):
    """Render the ``tracks`` template for the filters given in the query string.

    Every query parameter with a non-empty value becomes a selected filter.
    For each one the query string *without* that parameter is stored as well,
    so the web interface can offer a link that backs the selection out.
    """
    params = request.query
    selected_filters = {}
    for name in params.keys():
        value = params.get(name)
        if value == '':
            # Unspecified values (e.g. empty min/max numeric fields) are ignored.
            continue
        remaining = '&'.join(
            "{}={}".format(other_key, other_val)
            for other_key, other_val in params.items()
            if other_key != name
        )
        selected_filters[name] = {
            'val': value,
            'querystring_without': remaining,
        }

    track_data = _get_track_data(db, selected_filters)

    # Possible filter params/values are only computed when rows were found.
    if track_data:
        possible_filters = _get_filters(track_data, selected_filters)
    else:
        possible_filters = []

    return template(
        'tracks',
        tracks=track_data,
        filters=possible_filters,
        selected=selected_filters,
        query_string=request.query_string,
        query=request.query,
    )
def jsonp(view):
    """Decorate ``view`` so it receives merged request arguments and may answer as JSONP.

    The wrapper collects all form and query fields into one dict (query
    fields override form fields) and calls ``view(args, *posargs, **kwargs)``.
    A ``KeyError`` raised by the view is turned into an error payload with
    status 403. When a ``callback`` argument is present the result is wrapped
    as ``callback(<json>);``. Successful results are returned; error results
    are sent through ``abort``.

    The wrapper keeps the name and docstring of ``view``.
    """
    import functools

    @functools.wraps(view)
    def wrapped(*posargs, **kwargs):
        args = {}
        # if we access the args via get(),
        # we can get encoding errors...
        for k in request.forms:
            args[k] = getattr(request.forms, k)
        for k in request.query:
            args[k] = getattr(request.query, k)
        callback = args.get('callback')
        status_code = 200
        try:
            result = view(args, *posargs, **kwargs)
        except KeyError as e:
            import sys
            import traceback
            traceback.print_exc(file=sys.stdout)
            result = {'status': 'error',
                      'message': 'invalid query',
                      'details': str(e)}
            status_code = 403
        if callback:
            # NOTE(review): ``callback`` is echoed into the response without
            # validation; consider restricting it to a JS identifier.
            result = '%s(%s);' % (callback, json.dumps(result))
        if status_code == 200:
            return result
        abort(status_code, result)

    return wrapped
def run_profiling(self, num_loops, num_neighbors, age_proximity):
    """Time the k_nearest_neighbors query ``num_loops`` times and summarise the timings.

    Args:
        num_loops: number of loops for which we query the server
        num_neighbors: number of neighbors to query for
        age_proximity: maximum difference between a candidate neighbor's age and the user

    Returns:
        The ``stats.describe`` summary of the per-query running times.
    """
    print('profiling over ', num_loops, ' times')
    # The three-argument form draws ``num_loops`` samples at once
    # (presumably ``numpy.random.uniform`` -- confirm against the imports).
    random_latitudes = random.uniform(-90, 90, num_loops)
    random_longitudes = random.uniform(-180, 180, num_loops)
    time_list = []
    for i in tqdm(range(len(random_latitudes))):
        # time.clock() was removed in Python 3.8; perf_counter() is the
        # monotonic high-resolution replacement for measuring elapsed time.
        start_time = time.perf_counter()
        # NOTE(review): the latitude is halved before querying -- confirm
        # this is intentional.
        kd_store.k_nearest_neighbors({'name': 'bla bla', 'age': 23, 'latitude': random_latitudes[i] / 2,
                                      'longitude': random_longitudes[i]}, num_neighbors, age_proximity)
        end_time = time.perf_counter()
        time_list.append(end_time - start_time)
    # get the timing statistics
    stats_desc = stats.describe(time_list)
    frac_times_exceeded = len(np.where(np.array(time_list) >= 1)[0]) / len(time_list)
    print('\nfraction of times with delay > 1 is: ', frac_times_exceeded, '\n')
    print('\nStats:\n', stats_desc)
    return stats_desc
def login_xenforo(user=None):
    """Complete the XenForo OAuth login and sign the user in.

    An already authenticated ``user`` is redirected to ``/``. Otherwise the
    ``code`` query parameter is exchanged for tokens at the XenForo token
    endpoint, the extended user information is fetched, and the resulting
    user record replaces the stored one in the ``users`` table before
    ``users.login`` is called.

    Raises:
        KeyError: when the ``code`` query parameter is missing.
        requests.HTTPError: when either XenForo request answers with an
            error status.
    """
    if user:
        return redirect('/')

    code = request.query['code']
    data = {'grant_type': 'authorization_code',
            'code': code,
            'client_id': config['XF_CLIENT_ID'],
            'client_secret': config['XF_SECRET_KEY'],
            'redirect_uri': '%s/login/xenforo' % config['SITE_URL']}
    res = requests.post('%s/api/index.php?oauth/token' % config['XF_URL'], data=data)
    # Check the status before parsing: an error response may not be JSON,
    # and the parse failure would otherwise hide the real HTTP error.
    res.raise_for_status()
    res_data = res.json()
    # NOTE(review): this prints the access and refresh tokens to stdout;
    # consider removing it outside of debugging.
    pprint(res_data)

    user = {
        'identity': res_data['user_id'],
        'identity_type': 'xenforo',
        'access_token': res_data['access_token'],
        'refresh_token': res_data['refresh_token'],
        'user_id': res_data['user_id'],
        'authlink': str(uuid.uuid4()),  # this is still used to unsubscribe from emails
        'subscribed': True,
    }

    # get extended information
    res = requests.get('%s/api/?users/%s&oauth_token=%s' % (config['XF_URL'], res_data['user_id'], user['access_token']))
    res.raise_for_status()
    xf_user = res.json()['user']
    user['xf_username'] = xf_user['username']
    user['email'] = xf_user['user_email']

    r.table('users').get(user['identity']).replace(user).run(conn())
    return users.login(user)
def test_list_environments(self):
    """Check the ``deploy_authorized`` flag of listed environments.

    Before ``request.account`` is changed every environment must be
    authorized; once the account is the user named "username", only the
    environment with id 1 may be.
    """
    listing = json.loads(api.environments_list(self.session))
    for environment in listing['environments']:
        self.assertTrue(environment['deploy_authorized'])

    request.account = self.session.query(m.User).filter_by(username="username").one()
    listing = json.loads(api.environments_list(self.session))
    for environment in listing['environments']:
        expect = self.assertTrue if environment['id'] == 1 else self.assertFalse
        expect(environment['deploy_authorized'])
def test_start_deployment_non_admin(self, mocked):
    """Start a deployment on environment 1 as the user named "username".

    The test has no explicit assertion; it passes when the call does not raise.
    """
    payload = {
        'target': {
            'cluster': None,
            'server': None,
        },
        'branch': 'master',
        'commit': 'abcde',
    }
    users_query = self.session.query(m.User)
    request.account = users_query.filter(m.User.username == 'username').one()
    self._set_body_json(payload)
    api.environments_start_deployment(1, self.session)
def test_start_deployment_with_impersonating(self, mocked):
    """Start a deployment on environment 1 as "impersonator" acting for "username".

    The impersonated name is supplied through the
    ``HTTP_X_IMPERSONATE_USERNAME`` environ entry. The test has no explicit
    assertion; it passes when the call does not raise.
    """
    payload = {
        'target': {
            'cluster': None,
            'server': None,
        },
        'branch': 'master',
        'commit': 'abcde',
    }
    users_query = self.session.query(m.User)
    request.account = users_query.filter(m.User.username == 'impersonator').one()
    request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'
    self._set_body_json(payload)
    api.environments_start_deployment(1, self.session)
def test_start_deployment_with_impersonating_unprivilegied_user(self, mocked):
    """Starting a deployment on environment 2 while impersonating "username" must fail with 403.

    The account is "impersonator" and the impersonated name is supplied
    through the ``HTTP_X_IMPERSONATE_USERNAME`` environ entry.
    """
    request.account = self.session.query(m.User).filter(m.User.username == 'impersonator').one()
    request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'
    self._set_body_json({
        'target': {
            'cluster': None,
            'server': None
        },
        'branch': 'master',
        'commit': 'abcde'
    })
    with self.assertRaises(HTTPError) as cm:
        api.environments_start_deployment(2, self.session)
    # Checked after the ``with`` block so it runs once the error was caught.
    # ``assertEquals`` is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(403, cm.exception.code)
def test_expunge_default_user(self):
    """The expunged "default" user must stay readable after the session is closed."""
    default_user = self.session.query(m.User).filter_by(username="default").one()
    api._expunge_user(default_user, self.session)
    self.session.close()
    # Attribute access must still work on the detached object.
    role_count = len(default_user.roles)
    default_role_count = len(default_user.default_roles)
    self.assertEqual(1, role_count)
    self.assertEqual(0, default_role_count)
def test_diff_requires_login(self):
    """Requesting a repository diff as the "default" user must fail with 403."""
    # The default user should not be able to read diff
    request.account = self.session.query(m.User).filter_by(username="default").one()
    for bound, revision in (('from', 'ab'), ('to', 'de')):
        request.query[bound] = revision
    with self.assertRaises(HTTPError) as e:
        api.repositories_diff(1, self.session)
    self.assertEqual(403, e.exception.status_code)
def test_list_deployments_requires_read(self):
    """The "default" user sees no deployments of "my repo"; "root" sees at least one."""
    users_query = self.session.query(m.User)

    request.account = users_query.filter_by(username="default").one()
    listing = json.loads(api.deployments_list("my repo", self.session))
    self.assertEqual(0, len(listing['deployments']))

    request.account = users_query.filter_by(username="root").one()
    listing = json.loads(api.deployments_list("my repo", self.session))
    self.assertGreater(len(listing['deployments']), 0)