def serve_websocket(request, port):
    """Perform the uWSGI websocket handshake and connect to the upstream server.

    The request URL is rewritten to ``ws://localhost:<port>`` (path and query
    are kept) and a ``ProxyClient`` connection is opened to it, forwarding
    only the request headers whose lower-cased name appears in
    ``CAPTURE_CONNECT_HEADERS``.

    :param request: incoming request; ``environ``, ``url`` and ``headers``
        are read from it.
    :param port: port of the upstream server on localhost.
    :returns: ``httpexceptions.HTTPOk()``.
    """
    environ = request.environ

    # Answer the client first: HTTP 101 Switch Protocol goes downstream.
    uwsgi.websocket_handshake(
        environ['HTTP_SEC_WEBSOCKET_KEY'],
        environ.get('HTTP_ORIGIN', ''),
    )

    # Point the websocket URL at the upstream localhost instance.
    upstream = urlparse(request.url)._replace(
        scheme="ws",
        netloc="localhost:{}".format(port),
    )
    url = urlunparse(upstream)

    # Forward only the whitelisted connection headers.
    headers = []
    for header, value in request.headers.items():
        if header.lower() in CAPTURE_CONNECT_HEADERS:
            headers.append((header, value))

    logger.info("Connecting to upstream websockets: %s, headers: %s", url, headers)

    ws = ProxyClient(url, headers=headers)
    ws.connect()

    # TODO: Will complain loudly about already sent headers - how to abort?
    return httpexceptions.HTTPOk()
# Example source code for the Python class HTTPOk()
def update_user(request):
    """Update the first and last name of the user identified by ``user_id``.

    ``user_id``, ``first_name`` and ``last_name`` are read from
    ``request.matchdict``. A missing or empty name keeps the value currently
    stored for that user; ``created_at`` is written back unchanged.

    :param request: request whose ``matchdict`` carries the route values.
    :returns: ``HTTPOk()`` on success, or a 404 JSON ``Response`` when no
        user with the given id exists.
    """
    # Function-scope import so this block does not rely on a module-level
    # ``Response`` import being present.
    from pyramid.response import Response

    user_id = request.matchdict.get('user_id')
    user = DBSession.query(User).filter_by(id=user_id).first()

    # ``first()`` returns None for an unknown id; answer with 404 rather than
    # failing with AttributeError when the stored names are read below.
    if user is None:
        msg = "The User with id '{}' was not found.".format(user_id)
        return Response(status=404, json_body={'error': msg})

    # Fall back to the stored values when the route did not supply a name.
    new_first_name = request.matchdict.get('first_name') or user.first_name
    new_last_name = request.matchdict.get('last_name') or user.last_name
    created_at = user.created_at

    with transaction.manager:
        DBSession.query(User).filter_by(id=user_id).update({
            "first_name": new_first_name,
            "last_name": new_last_name,
            "created_at": created_at,
        })
    return HTTPOk()
# DELETE user
def event_reorder_up(request):
    """Swap this timeline event with the one before it.

    The event found on ``request.context._instance`` takes over its
    predecessor's ``previous_event_id``, the predecessor is re-linked to come
    after this event, and the event that used to follow this one (if any) is
    re-linked to follow the predecessor.

    :param request: request whose ``context._instance`` is the event to move.
    :returns: ``HTTPOk()``.
    """
    event = request.context._instance
    session = event.db

    follower = session.query(TimelineEvent).filter_by(
        previous_event_id=event.id,
    ).first()
    predecessor = event.previous_event
    new_previous_id = predecessor.previous_event_id

    # Clear all links and flush first to avoid index uniqueness checks while
    # the chain is being rewired.
    predecessor.previous_event_id = None
    event.previous_event_id = None
    if follower:
        follower.previous_event_id = None
    session.flush()

    # Rebuild the chain in its new order.
    event.previous_event_id = new_previous_id
    predecessor.previous_event_id = event.id
    if follower:
        follower.previous_event_id = predecessor.id

    return HTTPOk()
def update_sequence(request):
    """Update an existing Sequence from the request parameters.

    Reads ``sequence_id``, ``name``, ``code``, ``status_id`` and
    ``description`` from ``request.params``. The sequence and the status are
    looked up by id; the update is applied only when the sequence, the status,
    ``name`` and ``code`` are all present.

    :param request: the current request.
    :returns: ``HTTPOk()`` after a successful update, ``HTTPServerError()``
        when a required parameter is missing or a lookup finds nothing.
    """
    logged_in_user = get_logged_in_user(request)

    sequence_id = request.params.get('sequence_id')
    sequence = Sequence.query.filter_by(id=sequence_id).first()

    name = request.params.get('name')
    code = request.params.get('code')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    if not (sequence and code and name and status):
        logger.debug('there are missing parameters')
        logger.debug('name : %s', name)
        logger.debug('status : %s', status)
        # The error response has to be returned: merely constructing it would
        # fall through and report success to the client.
        return HTTPServerError()

    description = request.params.get('description')

    sequence.name = name
    sequence.code = code
    sequence.description = description
    sequence.status = status
    sequence.updated_by = logged_in_user
    sequence.date_updated = datetime.datetime.now()

    DBSession.add(sequence)
    return HTTPOk()
def sms_view_test(request):
    """Test view: send one SMS and check the SMS service recorded it.

    Sends a fixed message through ``send_sms`` and asserts that the
    ``ISMSService`` adapter registered for the request reports it as
    ``last_outgoing``.

    :returns: ``HTTPOk()``.
    """
    send_sms(request, "+15551234", "Test message")

    # The service looked up from the registry should have recorded the send.
    sms_service = request.registry.queryAdapter(request, ISMSService)
    last_message = sms_service.last_outgoing
    assert last_message == "Test message"

    return HTTPOk()
def create_user(request):
    """Create a ``User`` from the ``first_name`` / ``last_name`` route values.

    Both names are read from ``request.matchdict`` and ``created_at`` is set
    to the current time. The user is added and flushed inside a
    ``transaction.manager`` block.

    :param request: request whose ``matchdict`` carries the route values.
    :returns: ``HTTPOk()``.
    """
    route_values = request.matchdict
    new_user = User(
        first_name=route_values.get('first_name'),
        last_name=route_values.get('last_name'),
        created_at=datetime.now(),
    )

    with transaction.manager:
        DBSession.add(new_user)
        # Flush so the pending INSERT is sent to the database right away.
        DBSession.flush()

    return HTTPOk()
# Update User
def delete_user(request):
    """Delete the user identified by the ``user_id`` route value.

    :param request: request whose ``matchdict`` carries ``user_id``.
    :returns: ``HTTPOk()`` after deleting the user, or a 404 JSON
        ``Response`` with an ``error`` message when no such user exists.
    """
    user_id = request.matchdict.get('user_id')
    user = DBSession.query(User).filter_by(id=user_id).first()

    if user:
        with transaction.manager:
            DBSession.delete(user)
        return HTTPOk()

    # Nothing matched the id: report it to the client as JSON.
    msg = "The User with id '{}' was not found.".format(user_id)
    return Response(status=404, json_body={'error': msg})
def redis_test(request):
    """Test view: round-trip a bytes value and a non-ASCII string via Redis.

    Uses the client returned by ``get_redis(request)`` and asserts that each
    value written under the key ``"foo"`` can be read back.

    :returns: ``HTTPOk()``.
    """
    client = get_redis(request)

    # Raw bytes come back unchanged.
    client.set("foo", b"bar")
    stored = client.get("foo")
    assert stored == b"bar"

    # Text is compared after decoding the stored value as UTF-8.
    client.set("foo", "ÅÄÖ")
    stored = client.get("foo")
    assert stored.decode("utf-8") == "ÅÄÖ"

    return HTTPOk()
def throttle_sample(request):
    """Return a plain ``HTTPOk`` response; the request is not inspected."""
    ok_response = HTTPOk()
    return ok_response
def traversing_test(self):
    """Return ``HTTPOk`` whose first argument is "Editing: <context id>"."""
    context_id = self.context.id
    message = "Editing: {}".format(context_id)
    return HTTPOk(message)
def traversing_test(self):
    """Return ``HTTPOk`` whose first argument is "Showing: <context id>"."""
    context_id = self.context.id
    message = "Showing: {}".format(context_id)
    return HTTPOk(message)
def update_asset(request):
    """Update an existing Asset from the request.

    The asset id comes from ``request.matchdict['id']``; ``name``, ``code``,
    ``description``, ``type_name`` and ``status_id`` come from
    ``request.params``. The asset is changed only when the asset, the status,
    ``name``, ``code`` and ``type_name`` are all present. A ``Type`` targeting
    ``'Asset'`` with the given name is looked up and created if none exists.

    :param request: the current request.
    :returns: ``HTTPOk()`` in every case, including when nothing was updated.
    """
    logger.debug('***update_asset method starts ***')

    logged_in_user = get_logged_in_user(request)

    # Collect the inputs.
    params = request.params
    asset_id = request.matchdict.get('id', -1)
    asset = Asset.query.filter_by(id=asset_id).first()

    name = params.get('name')
    code = params.get('code')
    description = params.get('description')
    type_name = params.get('type_name')

    status_id = params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    # Without the full set of inputs the asset is left untouched.
    if not (asset and name and code and type_name and status):
        return HTTPOk()

    # Reuse the matching Asset type, or make a new one named after type_name.
    asset_type = (
        Type.query
        .filter_by(target_entity_type='Asset')
        .filter_by(name=type_name)
        .first()
    )
    if asset_type is None:
        asset_type = Type(
            name=type_name,
            code=type_name,
            target_entity_type='Asset',
        )

    logger.debug('code : %s' % code)

    asset.name = name
    asset.code = code
    asset.description = description
    asset.type = asset_type
    asset.status = status
    asset.updated_by = logged_in_user
    asset.date_updated = datetime.datetime.now()

    DBSession.add(asset)
    return HTTPOk()
def create_shot(request):
    """Create a new Shot from the request parameters.

    Reads ``name``, ``code``, ``status_id``, ``project_id``, ``description``
    and ``sequence_id`` from ``request.params``. The shot is created only when
    ``name``, ``code``, the status and the project are all present.

    :param request: the current request.
    :returns: ``HTTPOk()`` after the shot was added to the session;
        ``HTTPServerError`` when a required parameter is missing or when no
        ``StatusList`` targeting ``'Shot'`` exists.
    :raises KeyError: if ``sequence_id`` is absent from ``request.params``
        while the other required parameters are present.
    """
    logged_in_user = get_logged_in_user(request)

    name = request.params.get('name')
    code = request.params.get('code')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    project_id = request.params.get('project_id')
    project = Project.query.filter_by(id=project_id).first()
    logger.debug('project_id : %s', project_id)

    if not (name and code and status and project):
        logger.debug('there are missing parameters')
        logger.debug('name : %s', name)
        logger.debug('code : %s', code)
        logger.debug('status : %s', status)
        logger.debug('project : %s', project)
        # The error response has to be returned: merely constructing it would
        # fall through and report success to the client.
        return HTTPServerError()

    description = request.params.get('description')

    sequence_id = request.params['sequence_id']
    sequence = Sequence.query.filter_by(id=sequence_id).first()

    status_list = StatusList.query.filter_by(
        target_entity_type='Shot'
    ).first()

    # there should be a status_list
    # TODO: you should think about how much possible this is
    if status_list is None:
        return HTTPServerError(detail='No StatusList found')

    new_shot = Shot(
        name=name,
        code=code,
        description=description,
        sequence=sequence,
        status_list=status_list,
        status=status,
        created_by=logged_in_user,
        project=project
    )

    DBSession.add(new_shot)
    return HTTPOk()
def update_shot(request):
    """Update an existing Shot from the request parameters.

    Reads ``shot_id``, ``name``, ``code``, ``cut_in``, ``cut_out``,
    ``status_id``, ``description`` and ``sequence_id`` from
    ``request.params``. ``cut_in`` and ``cut_out`` default to 1 and are
    converted with ``int``. The update is applied only when the shot, the
    status, ``name`` and ``code`` are all present.

    :param request: the current request.
    :returns: ``HTTPOk()`` after a successful update, ``HTTPServerError()``
        when a required parameter is missing or a lookup finds nothing.
    :raises ValueError: if ``cut_in`` or ``cut_out`` is not an integer string.
    :raises KeyError: if ``sequence_id`` is absent from ``request.params``
        while the other required parameters are present.
    """
    logged_in_user = get_logged_in_user(request)

    shot_id = request.params.get('shot_id')
    shot = Shot.query.filter_by(id=shot_id).first()

    name = request.params.get('name')
    code = request.params.get('code')

    cut_in = int(request.params.get('cut_in', 1))
    cut_out = int(request.params.get('cut_out', 1))

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    if not (shot and code and name and status):
        logger.debug('there are missing parameters')
        logger.debug('name : %s', name)
        logger.debug('status : %s', status)
        # The error response has to be returned: merely constructing it would
        # fall through and report success to the client.
        return HTTPServerError()

    description = request.params.get('description')

    sequence_id = request.params['sequence_id']
    sequence = Sequence.query.filter_by(id=sequence_id).first()

    shot.name = name
    shot.code = code
    shot.description = description
    shot.sequences = [sequence]
    shot.status = status
    shot.updated_by = logged_in_user
    shot.date_updated = datetime.datetime.now()
    shot.cut_in = cut_in
    shot.cut_out = cut_out

    DBSession.add(shot)
    return HTTPOk()
def append_entities_to_entity(request):
    """Append entities to a collection attribute of another entity.

    For example, appends Projects to ``user.projects``. The target entity id
    comes from ``request.matchdict['id']`` and the ids to append from the
    ``selected_ids`` request value. The collection attribute is named after
    the lower-cased ``plural_class_name`` of the first entity found.

    :param request: the current request.
    :returns: ``HTTPOk()`` when the target entity and a non-empty id list
        were supplied; ``HTTPServerError()`` otherwise.
    """
    # Imports are kept function-local, as in the original layout of this view.
    from pyramid.httpexceptions import HTTPOk, HTTPServerError
    from stalker import Entity
    from stalker.db.session import DBSession
    from stalker_pyramid.views import EntityViewBase

    logger.debug('append_class_to_entity is running')

    entity_id = request.matchdict.get('id', -1)
    entity = Entity.query.filter_by(id=entity_id).first()

    selected_list = EntityViewBase.get_multi_integer(request, 'selected_ids')
    logger.debug('selected_list: %s', selected_list)

    if not (entity and selected_list):
        logger.debug('not all parameters are in request.params')
        # The error response has to be returned: merely constructing it would
        # fall through and report success to the client.
        return HTTPServerError()

    appended_entities = Entity.query\
        .filter(Entity.id.in_(selected_list)).all()

    if appended_entities:
        attr_name = appended_entities[0].plural_class_name.lower()
        # Plain attribute lookup; no need to eval a generated code string.
        getattr(entity, attr_name).extend(appended_entities)
        DBSession.add(entity)

        logger.debug('entity is updated successfully')

        request.session.flash(
            'success:User <strong>%s</strong> is updated successfully' %
            entity.name
        )
        logger.debug('***append_entities_to_entity method ends ***')

    return HTTPOk()
def create_studio(request):
    """Create a Studio with per-day working hours from the request.

    ``name`` and ``dwh`` (daily working hours) are read from
    ``request.params``. For each day ``mon`` .. ``sun`` the ``<day>_start``
    and ``<day>_end`` values are fetched with ``get_time``. A studio is
    created only when both ``name`` and ``dwh`` are given.

    A day whose start equals its end gets no working hours; otherwise it gets
    a single ``[start, end]`` range in whole minutes.

    NOTE(review): ``get_time`` is assumed to return ``timedelta``-like objects
    (only ``.seconds`` and ``!=`` are used here) - confirm against its
    definition.

    :param request: the current request.
    :returns: ``HTTPOk()`` in every case, including when nothing was created.
    :raises ValueError: if ``dwh`` is given but is not an integer string.
    """
    name = request.params.get('name', None)
    dwh = request.params.get('dwh', None)

    # Fetch every start/end pair up front, in mon..sun order, whether or not
    # a studio ends up being created.
    days = ('mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun')
    day_ranges = [
        (day, get_time(request, day + '_start'), get_time(request, day + '_end'))
        for day in days
    ]

    if name and dwh:
        studio = Studio(
            name=name,
            daily_working_hours=int(dwh)
        )

        wh = WorkingHours()
        for day, start, end in day_ranges:
            if start != end:
                # Floor division keeps the minute values integral on
                # Python 3, where ``/`` would produce floats.
                wh[day] = [[start.seconds // 60, end.seconds // 60]]
            else:
                wh[day] = []

        studio.working_hours = wh

        DBSession.add(studio)
        # Commit will be handled by the zope transaction extension

    return HTTPOk()
def create_sequence(request):
    """Create a new Sequence from the request parameters.

    Reads ``name``, ``code``, ``status_id``, ``project_id`` and
    ``description`` from ``request.params``. The sequence is created only
    when ``name``, ``code``, the status and the project are all present.

    :param request: the current request.
    :returns: ``HTTPOk()`` after the sequence was added to the session;
        ``HTTPServerError`` when a required parameter is missing or when no
        ``StatusList`` targeting ``'Sequence'`` exists.
    """
    logged_in_user = get_logged_in_user(request)

    name = request.params.get('name')
    code = request.params.get('code')

    status_id = request.params.get('status_id')
    status = Status.query.filter_by(id=status_id).first()

    project_id = request.params.get('project_id')
    project = Project.query.filter_by(id=project_id).first()
    logger.debug('project_id : %s', project_id)

    if not (name and code and status and project):
        logger.debug('there are missing parameters')
        logger.debug('name : %s', name)
        logger.debug('code : %s', code)
        logger.debug('status : %s', status)
        logger.debug('project : %s', project)
        # The error response has to be returned: merely constructing it would
        # fall through and report success to the client.
        return HTTPServerError()

    description = request.params.get('description')

    status_list = StatusList.query.filter_by(
        target_entity_type='Sequence'
    ).first()

    # there should be a status_list
    # TODO: you should think about how much possible this is
    if status_list is None:
        return HTTPServerError(detail='No StatusList found')

    new_sequence = Sequence(
        name=name,
        code=code,
        description=description,
        status_list=status_list,
        status=status,
        created_by=logged_in_user,
        project=project
    )

    DBSession.add(new_sequence)
    return HTTPOk()