def _get_labels_for_reactome_ids(self, reactome_ids):
labels = defaultdict(str)
if reactome_ids:
res = self._cached_search(index=self._index_reactome,
doc_type=self._docname_reactome,
body={"query": {
"ids": {
"values": reactome_ids
}
},
'_source': {"includes": ['label']},
'size': 10000,
'from': 0,
}
)
if res['hits']['total']:
for hit in res['hits']['hits']:
labels[hit['_id']] = hit['_source']['label']
return labels
python类values()的实例源码
def handle_states():
'''Returns all the states from the database as JSON objects with a GET
request, or adds a new state to the database with a POST request. Refer to
exception rules of peewee `get()` method for additional explanation of
how the POST request is handled:
http://docs.peewee-orm.com/en/latest/peewee/api.html#SelectQuery.get
'''
if request.method == 'GET':
list = ListStyle().list(State.select(), request)
return jsonify(list), 200
elif request.method == 'POST':
params = request.values
'''Check that all the required parameters are made in request.'''
required = set(["name"]) <= set(request.values.keys())
if required is False:
return jsonify(msg="Missing parameter."), 400
try:
State.select().where(State.name == request.form['name']).get()
return jsonify(code=10001, msg="State already exists"), 409
except State.DoesNotExist:
state = State.create(name=request.form['name'])
return jsonify(state.to_dict()), 201
def handle_state_id(state_id):
'''Select the state with the id from the database and store as the variable
`state` with a GET request method. Update the data of the particular state
with a PUT request method. This will take the parameters passed and update
only those values. Remove the state with this id from the database with
a DELETE request method.
Keyword arguments:
state_id: The id of the state from the database.
'''
try:
state = State.select().where(State.id == state_id).get()
except State.DoesNotExist:
return jsonify(msg="There is no state with this id."), 404
if request.method == 'GET':
return jsonify(state.to_dict()), 200
elif request.method == 'DELETE':
try:
state = State.delete().where(State.id == state_id)
except State.DoesNotExist:
raise Exception("There is no state with this id.")
state.execute()
return jsonify(msg="State deleted successfully."), 200
def api_send_message():
print ('send message', request.values.get('type'))
game.doStage(session['name'], request.values)
return jsonify({'messages': 'success'})
def verify_slack_token(http_method, allowed_token_list):
token = ''
if http_method == 'GET':
token = request.args.get('token', '')
elif http_method == 'POST':
token = request.values['token']
else:
raise Exception("Error: unsupported http_method(%s)" % (http_method))
if token not in allowed_token_list:
content = "Error: invalid token(%s)" % (token)
raise Exception(content)
return None
def get_response_url(http_method):
response_url = ''
if http_method == 'GET':
response_url = request.args.get('response_url', '')
elif http_method == 'POST':
response_url = request.values['response_url']
else:
raise Exception("Error: unsupported http_method(%s)" % (http_method))
if response_url == '':
raise Exception("Error: fail to get response_url from the http request")
return response_url
def update_job(job_id, job=None):
result = Job.update(job_id, request.values)
if result.get("updated", False):
return json_resp(result)
else:
return json_resp(result), 450
def add_user():
result = User.create(request.values)
if result.get("created", False):
result["user"] = result["user"].as_object()
return json_resp(result)
else:
return json_resp(result), 450
def update_pheno(pheno_id, pheno=None):
result = Phenotype.update(pheno_id, request.values)
if result.get("updated", False):
return json_resp(result)
else:
return json_resp(result), 450
def webhook():
"""
Twilio will make a post request to `/webhook` everytime if receives a new
text message. This endpoint should both record the texter in the database,
and use Twilio to send a response. The status code will be 201 if
successful, and have an appropriate error code if not.
Returns:
object: The rendered JSON response.
"""
# pylint: disable=unused-variable
if not request.values:
return Response(status=400)
phone_number = request.values.get("From")
message_body = request.values.get("Body")
if None in {phone_number, message_body}:
return Response(status=400)
Texter.record(phone_number)
response = get_response(message_body)
get_text_class().send_message(phone_number, response)
return Response(status=201)
def set_param(param_name):
"""
POST /api/<version>/param/<param_name> {"value": "x"}
POST to the ROS Parameter Server. Value should be in the value field
of the request body.
"""
if not "value" in request.values:
return error("No value supplied")
rospy.set_param(param_name, request.values["value"])
return "", 204
def post_topic_message(topic_name):
"""
POST /api/<version>/topic/<topic_name>
POST a message to a ROS topic by name.
Requires a JSON payload of shape: ``[x, y, z]``. The values of the JSON
array must match the argument types of the topic's type constructor.
"""
topic_name = "/" + topic_name
# Get the list of ROS topics in the system.
# Returns a list of [topic, type_string] pairs.
topic_list = rostopic_master.getTopicTypes()
# Find topic in list (this is Python's approach to find).
try:
topic_match = next(x for x in topic_list if x[0] == topic_name)
except StopIteration:
# If we can't find the topic, return early (400 bad request).
return error("Topic does not exist")
json = request.get_json(silent=True)
args = json if json else []
try:
# Get the message type constructor for the topic's type string.
MsgType = get_message_class(topic_match[1])
pub = rospy.Publisher(topic_name, MsgType, queue_size=10)
# Unpack JSON list and pass to publish.
# pub.publish() will pass the list of arguments to the message
# type constructor.
pub.publish(*args)
except Exception, e:
return error("Wrong arguments for topic type constructor")
return success("Posted message to topic")
def get_topic_data(topic_name):
"""
GET /api/<version>/topic_data/<topic_name>
Get a single data point from a topic over HTTP.
"""
topic_name = "/" + topic_name
try:
msg_class, real_topic, _ = rostopic.get_topic_class(topic_name)
except rostopic.ROSTopicIOException as e:
raise e
if not real_topic:
return error("Topic does not exist", 404)
msg = rospy.wait_for_message(real_topic, msg_class)
data = getattr(msg, "data", None)
if data is None:
json = '{"status": ['
for x in msg.status:
if x == msg.status[-1]:
json += '{"name": \"%s\", "level": %d, "message": \"%s\", "hardware_id": \"%s\", "values": %s}]}' % \
(x.name if x.name else "null", x.level, \
x.message if x.message else "null", x.hardware_id if x.hardware_id else "null", \
x.values)
else:
json += '{"name": \"%s\", "level": %d, "message": \"%s\", "hardware_id": \"%s\", "values": %s},' % \
(x.name if x.name else "null", x.level, \
x.message if x.message else "null", x.hardware_id if x.hardware_id else "null", \
x.values)
return Response(json, mimetype = 'application/json')
else:
return jsonify({"result": data})
def filter_participants(event):
include_participants = request.values.get('include_participants', '').lower()
if include_participants == 'true':
return event
else:
return {k: v for k, v in event.items() if k != 'participants'}
def events():
if 'created_at_or_after' in request.values:
date = parse_date(request.values['created_at_or_after'])
events = [event for event in api_data if created_on_or_after(event, date)]
return render_events(sort_by_start_time(events))
elif 'ids' in request.values:
ids = json.loads(request.values['ids'])
return render_events(e for e in map(find_event, ids) if e)
else:
return render_events([])
def leave(id):
event = find_event(id)
if event == None:
return not_found()
user = find_user(request.values.get('user_param'), request.values.get('user_param_value'))
if user == None:
return user_not_specified()
leave_event(user, event)
return jsonify({})
def get_request_log():
# parse args and forms
values = ''
if len(request.values) > 0:
for key in request.values:
values += key + ': ' + request.values[key] + ', '
route = '/' + request.base_url.replace(request.host_url, '')
request_log = {
'route': route,
'method': request.method,
}
# ??xml ??
category = request_log['route'].split('/')[1]
if category != "virtual_card":
return request_log
if len(request.get_data()) != 0:
body = json.loads(request.get_data())
if "password" in body:
body.pop("password")
request_log['body'] = body
if len(values) > 0:
request_log['values'] = values
return request_log
def sonarr():
from slack_posting import postMsg
postMsg(request.values, 'bots')
return "ok"
# API Stuff
def handle_books_id(place_id, book_id):
'''Returns a JSON object of the book_id with a GET request method. Updates
a booking's attributes with a POST request method. Removes a booking with a
DELETE request method.
Keyword arguments:
place_id: The id of the place with the booking.
book_id: The id of the booking.
'''
try:
book = PlaceBook.select().where(PlaceBook.id == book_id).get()
except PlaceBook.DoesNotExist:
raise Exception("There is no placebook with this id.")
if request.method == 'GET':
return jsonify(book.to_dict()), 200
elif request.method == 'PUT':
params = request.values
for key in params:
if key == 'user':
return jsonify(msg="You may not change the user."), 409
if key == 'updated_at' or key == 'created_at':
continue
setattr(book, key, params.get(key))
book.save()
return jsonify(msg="Place book information updated successfully."), 200
elif request.method == 'DELETE':
try:
book = PlaceBook.delete().where(PlaceBook.id == book_id)
except PlaceBook.DoesNotExist:
raise Exception("There is no place with this id.")
book.execute()
return jsonify(msg="Place book deleted successfully."), 200
def handle_places():
'''Returns all the places with a GET request, or adds a new city to the
database with a POST request. The parameters passed to the POST request
iterate through the data and set the corresponding attributes of the
instance to be inserted into the database. Will not set attribute passed
as `updated_at` or `created_at`.
'''
if request.method == 'GET':
list = ListStyle().list(Place.select(), request)
return jsonify(list), 200
elif request.method == 'POST':
params = request.values
place = Place()
'''Check that all the required parameters are made in request.'''
required = set(["owner", "city", "name"]) <= set(request.values.keys())
if required is False:
return jsonify(msg="Missing parameter."), 400
for key in params:
if key == 'updated_at' or key == 'created_at':
continue
setattr(place, key, params.get(key))
place.save()
return jsonify(place.to_dict()), 200