def auth_token(request, extra_headers=None):
    """Return an encoded token for the current user as a plain-text response.

    If the request has an ``origin`` header, CORS headers that echo that
    origin, allow credentials and expose a few standard headers are attached.
    Any ``extra_headers`` are appended to the header list afterwards.

    The token payload carries the authenticated user id (or ``Everyone`` when
    there is none) and is encoded with the ``session.secret`` setting.
    """
    response_headers = []

    request_headers = request.headers
    if 'origin' in request_headers:
        response_headers += [
            (_ac + 'Allow-Origin', request_headers['origin']),
            (_ac + 'Allow-Credentials', 'true'),
            (_ac + 'Expose-Headers', 'Location, Content-Type, Content-Length'),
        ]
    if extra_headers:
        response_headers.extend(extra_headers)

    current_user = authenticated_userid(request)
    secret = request.registry.settings['session.secret']
    token = encode_token(
        {
            'consumerKey': 'assembl',
            'userId': (current_user or Everyone),
            'ttl': 86400,
        },
        secret,
    )
    return Response(
        token, 200, response_headers,
        content_type='text/plain', charset="ascii")
# Example source code for the Python class Response()
def test_register_view_returns_response():
    """Register view returns a dict when given a bare dummy request."""
    from mood_bot.views.default import register
    result = register(testing.DummyRequest())
    assert isinstance(result, dict)
# def test_register_user_for_login(dummy_request):
# """Test that checks for user login."""
# from mood_bot.views.default import register
# from pyramid.httpexceptions import HTTPFound
# data_dict = {'username': 'kurtykurt', 'password': 'kurtkurt', 'password-check': 'kurtkurt'}
# dummy_request.POST = data_dict
# response = register(dummy_request)
# assert response.status_code == 302
# assert isinstance(response, HTTPFound)
def authorize_post_(request):
    """Handle submission of the OAuth authorization form.

    The posted ``credentials`` field must be a JSON object containing a
    ``scopes`` entry. If it is not valid JSON, is not an object, or lacks
    ``scopes``, the request is rejected with ``HTTPBadRequest``.

    When the user ticked "not me" and supplied a username, that account is
    authenticated; otherwise the already logged-in ``request.userid`` is
    used. On any login error the form is rendered again with the message.
    On success the authorization response built by ``server`` is returned.
    """
    form = request.web_input(credentials='', username='', password='', remember_me='', mobile='', not_me='')
    try:
        credentials = json.loads(form.credentials)
        # A missing key (KeyError) or a JSON value that is not an object
        # (TypeError for lists, AttributeError for scalars) is just as much
        # a malformed request as unparsable JSON.
        scopes = credentials.pop('scopes')
    except (ValueError, KeyError, TypeError, AttributeError):
        raise HTTPBadRequest()

    error = None
    if form.not_me and form.username:
        userid, error = login.authenticate_bcrypt(form.username, form.password, bool(form.remember_me))
        if error:
            # Translate the internal error code into a user-facing message.
            error = errorcode.login_errors.get(error, 'Unknown error.')
    elif not request.userid:
        error = "You must specify a username and password."
    else:
        userid = request.userid

    if error:
        return Response(render_form(request, scopes, credentials, bool(form.mobile), error,
                                    form.username, form.password, bool(form.remember_me),
                                    bool(form.not_me)))

    credentials['userid'] = userid
    response_attrs = server.create_authorization_response(
        *(extract_params(request) + (scopes, credentials)))
    return OAuthResponse(*response_attrs)
def search_(request):
    """Render one page of marketplace search results.

    Reads the query, price bounds, currency, commission class (``pc`` takes
    precedence over ``c``) and offset from the request. Up to twice the page
    size is requested from ``select_commissionable``; only the first page is
    shown, and a next-page index is offered when more than one page's worth
    of rows came back.
    """
    form = request.web_input(q="", min="", max="", currency="", pc="", c="", o="")
    page_size = 30
    offset = define.get_int(form.o)
    commishclass = (form.pc or form.c).lower()

    min_price = commishinfo.parse_currency(form.min)
    max_price = commishinfo.parse_currency(form.max)
    found = commishinfo.select_commissionable(
        request.userid, form.q, commishclass, min_price, max_price,
        form.currency, offset, page_size * 2)
    found_count = len(found)

    page = found[:page_size]
    media.populate_with_user_media(page)

    # No "previous" link on the first page; otherwise step back one page,
    # clamped at the start of the result set.
    if offset == 0:
        prev_index = None
    else:
        prev_index = max(offset - page_size, 0)

    if found_count > page_size:
        next_index = offset + page_size
    else:
        next_index = None

    template_args = [
        page, form, commishinfo.CURRENCY_CHARMAP,
        commishinfo.PRESET_COMMISSION_CLASSES, prev_index, next_index,
    ]
    return Response(define.webpage(request.userid, "etc/marketplace.html", template_args))
def hello_world(request):
    """Answer any request with a fixed plain-text greeting."""
    greeting = 'Hello world from Pyramid!\n'
    return Response(greeting, content_type='text/plain')
def bookmarklet_wrapper(request, endpoint):
    """ Return text of the SciBot bookmarklet

    The bookmarklet code is built from ``bookmarklet_base`` with the
    application URL (forced to https) and ``endpoint`` filled in. A
    single-line copy with double quotes escaped as ``&quot;`` is passed to
    ``html_base`` together with the parent domain of the request host and
    the raw code, and the resulting page is returned as ``text/html``.
    """
    code = bookmarklet_base % (request.application_url.replace('http:', 'https:'), endpoint)
    # Escape double quotes as an HTML entity (the previous replace('"', '"')
    # was a no-op). NOTE(review): presumably html_base embeds this value in a
    # double-quoted attribute such as href="..." - confirm against html_base.
    bookmarklet = code.replace('"', '&quot;').replace('\n', '')
    html = html_base % (bookmarklet, request.host.split('.', 1)[-1], code)
    r = Response(html)
    r.content_type = 'text/html'
    return r
def export(request):
    """Return the export rows as a gzip-compressed CSV attachment.

    Rows and a date tag come from ``export_impl``; the rows are sorted,
    written as CSV, gzip-compressed and sent with a ``Content-Disposition``
    filename that includes the date tag.
    """
    print('starting csv export')
    rows, date_tag = export_impl()

    buffer = StringIO()
    csv.writer(buffer).writerows(sorted(rows))
    compressed = gzip.compress(buffer.getvalue().encode())

    response = Response(compressed)
    response.content_type = 'text/csv'
    download_headers = {
        'Content-Disposition': 'attachment;filename = RRID-data-%s.csv' % date_tag,
        'Content-Encoding': 'gzip',
    }
    response.headers.update(download_headers)
    return response
def export_json(request):
    """Return the JSON export as a gzip-compressed response.

    The data from ``export_json_impl`` is serialised with sorted keys and a
    4-space indent, gzip-compressed, and served as ``application/json``. The
    date value returned alongside it is not used here.
    """
    print('starting json export')
    document, _date = export_json_impl()

    serialised = json.dumps(document, sort_keys=True, indent=4)
    compressed = gzip.compress(serialised.encode())

    response = Response(compressed)
    response.content_type = 'application/json'
    response.headers.update({'Content-Encoding': 'gzip'})
    return response
def get_ticket_resolutions(self):
    """Return the ``ticket_resolutions`` entry of the Stalker defaults
    as a JSON response.
    """
    from pyramid.response import Response
    from stalker import defaults

    resolutions = defaults['ticket_resolutions']
    return Response(json_body=resolutions)
def get_ticket_workflow(self):
    """Return the ``ticket_workflow`` entry of the Stalker defaults
    as a JSON response.
    """
    from pyramid.response import Response
    from stalker import defaults

    workflow = defaults['ticket_workflow']
    return Response(json_body=workflow)
# ACTIONS
def get_references(self):
    """Return the references linked to this project or to any of its tasks.

    Runs a single UNION query keyed on ``self.entity_id`` and responds with
    a JSON list of ``id``, ``name``, ``entity_type`` and ``$ref`` entries,
    where ``$ref`` is built from ``entity_type_to_url``.
    """
    from pyramid.response import Response
    from sqlalchemy import text
    from stalker.db.session import DBSession
    from stalker_pyramid import entity_type_to_url

    sql = """select
rse.id,
rse.name,
rse.entity_type
from "Project_References" as pr
join "SimpleEntities" as rse on pr.link_id = rse.id
where pr.project_id = :id
union
select
rse.id,
rse.name,
rse.entity_type
from "Task_References" as tr
join "Tasks" as t on tr.task_id = t.id
join "SimpleEntities" as rse on tr.link_id = rse.id
where t.project_id = :id
"""
    connection = DBSession.connection()
    rows = connection.execute(text(sql), id=self.entity_id).fetchall()

    references = []
    for ref_id, ref_name, ref_type in rows:
        references.append({
            'id': ref_id,
            'name': ref_name,
            'entity_type': ref_type,
            '$ref': '%s/%s' % (entity_type_to_url[ref_type], ref_id),
        })

    return Response(json_body=references, status=200)
def studio_scheduling_mode(request):
    """Switch the studio between "in schedule" mode and "normal" mode.

    While the studio is "in schedule" mode (``Studio.is_scheduling == True``)
    it must not be scheduled again until the running schedule has finished.

    The ``mode`` request parameter is read as an integer flag. When present,
    the studio's scheduling flag, the user who set it and the start time
    (converted to UTC) are updated. A 500 response is returned, and the
    transaction aborted, when no Studio exists.
    """
    current_user = get_logged_in_user(request)

    # There is expected to be a single studio; take the first one found.
    studio = Studio.query.first()

    mode = request.params.get('mode')
    logger.debug('schedule mode: %s' % mode)

    if not studio:
        transaction.abort()
        return Response(
            "There is no Studio instance\nPlease create a studio first",
            500
        )

    if mode:
        # The parameter arrives as text such as "0" or "1".
        mode = bool(int(mode))
        studio.is_scheduling = mode
        studio.is_scheduling_by = current_user
        started_at = local_to_utc(datetime.datetime.now())
        studio.scheduling_started_at = started_at

    message = "Successfully, set the scheduling mode to: %s" % mode
    return Response(message)
def update_entity(self):
    """Update the user, refusing a login or email that is already taken.

    Availability is checked once up front. If a ``login`` or ``email``
    parameter is supplied and flagged unavailable, a 500 response naming the
    value is returned. Otherwise the parent class performs the update; any
    exception it raises aborts the transaction and is reported as a 500.
    """
    from pyramid.response import Response

    availability = self.check_availability()
    params = self.request.params

    # (request parameter, availability flag, error message), checked in order.
    checks = (
        ('login', 'login_available', 'Login not available: %s'),
        ('email', 'email_available', 'Email not available: %s'),
    )
    for param_name, flag_key, message in checks:
        value = params.get(param_name)
        if value and not availability[flag_key]:
            return Response(message % value, status=500)

    # Delegate the actual update to the parent view.
    try:
        return super(UserViews, self).update_entity()
    except Exception as e:
        import transaction
        transaction.abort()
        return Response(body=str(e), status=500)
def get_vacations(self):
    """Return the vacations of the user identified by ``self.entity_id``.

    Responds with a JSON list of ``id``, ``$ref``, ``name`` and
    ``entity_type`` entries, where ``$ref`` is built from
    ``entity_type_to_url``.
    """
    from pyramid.response import Response
    from sqlalchemy import text
    from stalker.db.session import DBSession
    from stalker_pyramid import entity_type_to_url

    sql = """
select
"Vacations".id,
"SimpleEntities".name,
"SimpleEntities".entity_type
from "Vacations"
join "SimpleEntities" on "Vacations".id = "SimpleEntities".id
where "Vacations".user_id = :id
"""
    connection = DBSession.connection()
    cursor = connection.execute(text(sql), id=self.entity_id)

    vacations = []
    for vacation_id, vacation_name, vacation_type in cursor.fetchall():
        vacations.append({
            'id': vacation_id,
            '$ref': '%s/%s' % (entity_type_to_url[vacation_type], vacation_id),
            'name': vacation_name,
            'entity_type': vacation_type,
        })

    return Response(json_body=vacations, status=200)
# User <-> Task
def server_error(exc, request):
    """Turn an exception into a 500 response and abort the transaction.

    The response text is ``Server Error:`` followed by the exception's first
    argument, or by nothing when the exception has no arguments.
    """
    if exc.args:
        detail = exc.args[0]
    else:
        detail = ''
    error_response = Response('Server Error: %s' % detail, 500)
    transaction.abort()
    return error_response
def timeleft_view(self):
    """Return the remaining purchase time of the queue as plain text.

    When the queue is disabled in the root properties, the bare value ``1``
    is returned instead of a response, unchanged from the original
    behaviour. Otherwise the value of ``Queue.purchase_time_left()`` is sent
    as a ``text/plain`` response.
    """
    if not self.request.root.properties[PROP_KEYS.QUEUE_ENABLED]:
        return 1
    queue = Queue(self.request)
    # "text/plain" used to be passed positionally, which made it the body
    # rather than the content type. Passing the text to the constructor also
    # avoids assigning a text string to ``Response.body``, which only
    # accepts bytes on Python 3.
    return Response(
        str(queue.purchase_time_left()),
        content_type="text/plain",
    )
def check_username_view(self):
    """Report whether the lower-cased ``user_id`` from the route is a known user.

    The response body is the literal text ``true`` or ``false``.
    """
    wanted = self.request.matchdict["user_id"].lower()
    known = wanted in self.request.root.users
    return Response("true" if known else "false")
def my_view(request):
    """Look up the ``MyModel`` row named ``one`` and return template values.

    A ``DBAPIError`` during the query yields a plain-text 500 response
    containing ``conn_err_msg``.
    """
    try:
        query = DBSession.query(MyModel).filter(MyModel.name == 'one')
        one = query.first()
    except DBAPIError:
        return Response(conn_err_msg, content_type='text/plain', status_int=500)
    return dict(one=one, project='source')
def picture_handler(request):
    """Serve pictures from database binaries.

    The ``db_id`` route segment selects the model (``add`` -> AddressBook,
    ``cat`` -> Category, ``att`` -> Attribute) and ``pic_id`` the row. The
    row's ``picture`` is returned with its ``pic_mime`` as the content type.

    Raises:
        HTTPNotFound: if ``db_id`` is not one of the known segments or no
            row exists for ``pic_id``. Previously these cases failed with
            an unbound local / attribute error.
    """
    from pyramid.httpexceptions import HTTPNotFound

    models = {"add": AddressBook, "cat": Category, "att": Attribute}
    model = models.get(request.matchdict["db_id"])
    if model is None:
        raise HTTPNotFound()

    picture_data = request.dbsession.query(model).get(request.matchdict['pic_id'])
    if picture_data is None:
        raise HTTPNotFound()

    mime_type = picture_data.pic_mime
    if sys.version_info[0] < 3:
        # On Python 2 the MIME type is encoded to a byte string first.
        mime_type = mime_type.encode('utf-8')
    return Response(content_type=mime_type, body=picture_data.picture)
def head(self):
    """Answer with an empty default response."""
    empty_response = Response()
    return empty_response