def post_batch_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
## DELETE: api/datasource/crud/batch/:entitySetName
#@delete('/api/datasource/crud/batch/<entitySetName>')
#def delete_batch_entityset(entitySetName):
# try:
# result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService)
# response.content_type = "application/json; charset=utf-8"
# return json.dumps(result, cls=CustomEncoder)
# except dalUtils.StatusCodeError as err:
# response.status = err.value
# except:
# abort(400, 'Bad Request')
# DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
python类content_type()的实例源码
def apply(self, callback, route):
def wrapper(*a, **ka):
try:
rv = callback(*a, **ka)
except HTTPResponse as resp:
rv = resp
if isinstance(rv, dict):
json_response = dumps(rv)
response.content_type = 'application/json'
return json_response
elif isinstance(rv, HTTPResponse) and isinstance(rv.body, dict):
rv.body = dumps(rv.body)
rv.content_type = 'application/json'
return rv
return wrapper
def task_screenshots(task=0, screenshot=None):
folder_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", str(task), "shots")
if os.path.exists(folder_path):
if screenshot:
screenshot_name = "{0}.jpg".format(screenshot)
screenshot_path = os.path.join(folder_path, screenshot_name)
if os.path.exists(screenshot_path):
# TODO: Add content disposition.
response.content_type = "image/jpeg"
return open(screenshot_path, "rb").read()
else:
return HTTPError(404, screenshot_path)
else:
zip_data = StringIO()
with ZipFile(zip_data, "w", ZIP_STORED) as zip_file:
for shot_name in os.listdir(folder_path):
zip_file.write(os.path.join(folder_path, shot_name), shot_name)
# TODO: Add content disposition.
response.content_type = "application/zip"
return zip_data.getvalue()
else:
return HTTPError(404, folder_path)
def get_files(task_id):
if not task_id.isdigit():
return HTTPError(code=404, output="The specified ID is invalid")
files_path = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "files")
zip_file = os.path.join(CUCKOO_ROOT, "storage", "analyses", task_id, "files.zip")
with zipfile.ZipFile(zip_file, "w", compression=zipfile.ZIP_DEFLATED) as archive:
root_len = len(os.path.abspath(files_path))
for root, dirs, files in os.walk(files_path):
archive_root = os.path.abspath(root)[root_len:]
for f in files:
fullpath = os.path.join(root, f)
archive_name = os.path.join(archive_root, f)
archive.write(fullpath, archive_name, zipfile.ZIP_DEFLATED)
if not os.path.exists(files_path):
return HTTPError(code=404, output="Files not found")
response.content_type = "application/zip"
response.set_header("Content-Disposition", "attachment; filename=cuckoo_task_%s(not_encrypted).zip" % (task_id))
return open(zip_file, "rb").read()
def translate(self):
"""
Processes a translation request.
"""
translation_request = request_provider(self._style, request)
logging.debug("REQUEST - " + repr(translation_request))
translations = self._translator.translate(
translation_request.segments,
translation_request.settings
)
response_data = {
'status': TranslationResponse.STATUS_OK,
'segments': [translation.target_words for translation in translations],
'word_alignments': [translation.get_alignment_json(as_string=False) for translation in translations] if translation_request.settings.get_alignment else None,
'word_probabilities': [translation.target_probs for translation in translations] if translation_request.settings.get_word_probs else None,
}
translation_response = response_provider(self._style, **response_data)
logging.debug("RESPONSE - " + repr(translation_response))
response.content_type = translation_response.get_content_type()
return repr(translation_response)
def post_index():
event_type = request.get_header('X-GitHub-Event')
if not is_request_from_github():
abort(403, "Forbidden for IP %s, it's not GitHub's address" % remote_ip())
if request.content_type.split(';')[0] != 'application/json':
abort(415, "Expected application/json, but got %s" % request.content_type)
if event_type == 'ping':
return handle_ping()
elif event_type == 'push':
return handle_push()
else:
abort(400, "Unsupported event type: %s" % event_type)
def prometheus_metrics():
if not test_restricted(request['REMOTE_ADDR']):
return ''
dhcpstat = {'shared-networks': []}
dhcp6stat = {'shared-networks': []}
try:
dhcpstat = json.loads(exec_command([args.binary, '-c', args.dhcp4_config, '-l', args.dhcp4_leases, '-f', 'j']))
except:
pass
try:
dhcp6stat = json.loads(exec_command([args.binary, '-c', args.dhcp6_config, '-l', args.dhcp6_leases, '-f', 'j']))
except:
pass
data = []
for shared_network in dhcpstat['shared-networks']:
data.append('dhcp_pool_used{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],shared_network['used']))
data.append('dhcp_pool_free{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],shared_network['free']))
defined_leases = float(shared_network['defined'])
leases_used_percentage = 0
if defined_leases > 0:
leases_used_percentage = float(shared_network['used'])/defined_leases
data.append('dhcp_pool_usage{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],leases_used_percentage))
for shared_network in dhcp6stat['shared-networks']:
data.append('dhcp_pool_used{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],shared_network['used']))
data.append('dhcp_pool_free{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],shared_network['free']))
defined_leases = float(shared_network['defined'])
leases_used_percentage = 0
if defined_leases > 0:
leases_used_percentage = float(shared_network['used'])/defined_leases
data.append('dhcp_pool_usage{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],leases_used_percentage))
response.content_type = 'text/plain'
return '%s\n' % ('\n'.join(data))
def badge(number):
response.set_header('Cache-Control', 'public, max-age={}'.format(SECONDS_TO_RESCRAPE))
response.set_header('Access-Control-Allow-Origin','*')
response.content_type = "image/svg+xml;charset=utf-8"
if number < 10:
return template("First Timers-1-blue.svg", number=number)
if number < 100:
return template("First Timers-10-blue.svg", number=number)
return template("First Timers-100-blue.svg", number=number)
def query_response(result, max_results=100000):
response.content_type = 'application/json; charset=utf8'
result_len = len(result)
if result_len > max_results:
abort(403, 'Too many rows (%s)' % result_len)
else:
return [row.to_dict() for row in result]
def get_json_groups():
"""Get all driver families (JSON)"""
response.content_type = 'application/json'
families = collection.get_families()
return json.dumps(sorted(families.keys()))
def get_json_drivers():
"""Get all drivers (JSON)"""
response.content_type = 'application/json'
return json.dumps([ob.__dict__ for ob in collection.drivers])
def get_search():
try:
q = request.query["q"]
except KeyError:
return []
else:
results = graph.run(
"MATCH (movie:Movie) "
"WHERE movie.title =~ {title} "
"RETURN movie", {"title": "(?i).*" + q + ".*"})
response.content_type = "application/json"
return json.dumps([{"movie": dict(row["movie"])} for row in results])
def get_metadata():
try:
response.content_type = "application/json; charset=utf-8"
metadataClient = databaseInfo.getMetadataClient()
return json.dumps(metadataClient, cls=MetadataEncoder, indent=2)
except:
abort(500, 'Internal server error')
# GET: api/datasource/crud/:entitySetName?skip=20&top=10
def get_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleGet(entitySetName, request.query, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder, indent=2)
except:
abort(400, 'Bad Request')
# GET: api/datasource/crud/single/:entitySetName?keys=key1:{key1}
def get_single_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleGetSingle(entitySetName, request.query, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder, indent=2)
except:
abort(400, 'Bad Request')
# GET: api/datasource/crud/many/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
def get_many_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleGetMany(entitySetName, request.query, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder, indent=2)
except:
abort(400, 'Bad Request')
# PUT: api/datasource/crud/:entitySetName?keys=key1:{key1}
def put_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
# PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1}
#@patch('/api/datasource/crud/<entitySetName>')
def post_entityset(entitySetName):
# test1 = json.loads(request.body.read())
try:
result = dataProviderDto.apiProvider.handleInsertEntity(entitySetName, request.json, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
# DELETE: api/datasource/crud/:entitySetName?keys=key1:{key1}
def delete_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleDeleteEntity(entitySetName, request.query, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
# PUT: api/datasource/crud/batch/:entitySetName
def put_batch_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
# PATCH: api/datasource/crud/batch/:entitySetName
#@patch('/api/datasource/crud/batch/<entitySetName>')
def patch_batch_entityset(entitySetName):
try:
result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService)
response.content_type = "application/json; charset=utf-8"
return json.dumps(result, cls=CustomEncoder)
except dalUtils.StatusCodeError as err:
response.status = err.value
except:
abort(400, 'Bad Request')
# POST: api/datasource/crud/batch/:entitySetName
def setup(self, app):
def default_error_handler(res):
if res.content_type == "application/json":
return res.body
res.content_type = "application/json"
return dumps({'message': str(res.exception if res.exception else res.body)})
app.default_error_handler = default_error_handler
def test_jsonapi_header():
"""Make sure that the content type is set accordingly.
http://jsonapi.org/format/#content-negotiation-clients
"""
content_type = request.content_type
content_type_expected = "application/vnd.api+json"
if content_type != content_type_expected and content_type.startswith(content_type_expected):
abort(415, "The Content-Type header must be \"{}\", not \"{}\".".format(
content_type_expected, content_type))
accepts = request.headers.get("Accept", "*/*").split(",")
expected_accept = ["*/*", "application/*", "application/vnd.api+json"]
if not any([accept in expected_accept for accept in accepts]):
abort(406, "The Accept header must one of \"{}\", not \"{}\".".format(
expected_accept, ",".join(accepts)))
def get_resource_ids():
"""Return the list of current ids."""
test_jsonapi_header()
resources = get_resources()
response.content_type = 'application/vnd.api+json'
return response_object({"data": [{"type": "id", "id": _id} for _id in resources],
"links": {"self": get_location_url("ids")}})
def account_status(account_id):
result = controller.db.iter_resources(account_id)
response.content_type = "application/json"
return json.dumps(result, indent=2, cls=Encoder)
def info(account_id, resource_id):
request_data = request.query
if resource_id.startswith('sg-') and 'parent_id' not in request_data:
abort(400, "Missing required parameter parent_id")
result = controller.info(
account_id, resource_id, request_data.get('parent_id', resource_id))
response.content_type = "application/json"
return json.dumps(result, indent=2, cls=Encoder)
# this set to post to restrict permissions, perhaps another url space.
def delta(account_id):
request_data = request.json
for rp in ('region',):
if not request_data or rp not in request_data:
abort(400, "Missing required parameter %s" % rp)
result = controller.get_account_delta(
account_id, request_data['region'], api_url())
response.content_type = "application/json"
return json.dumps(result, indent=2, cls=Encoder)
def error(e):
response.content_type = "application/json"
return json.dumps({
"status": e.status,
"url": repr(request.url),
"exception": repr(e.exception),
# "traceback": e.traceback and e.traceback.split('\n') or '',
"body": repr(e.body)
}, indent=2)
def post_update(context, update):
"""
Updates a Cisco Spark room
:param context: button state and configuration
:type context: ``dict``
:param update: content of the update to be posted there
:type update: ``str`` or ``dict``
If the update is a simple string, it is sent as such to Cisco Spark.
Else if it a dictionary, then it is encoded as MIME Multipart.
"""
logging.info("Posting update to Cisco Spark room")
url = 'https://api.ciscospark.com/v1/messages'
headers = {'Authorization': 'Bearer '+context['spark']['CISCO_SPARK_BTTN_BOT']}
if isinstance(update, dict):
update['roomId'] = context['spark']['id']
payload = MultipartEncoder(fields=update)
headers['Content-Type'] = payload.content_type
else:
payload = {'roomId': context['spark']['id'], 'text': update }
response = requests.post(url=url, headers=headers, data=payload)
if response.status_code != 200:
logging.info(response.json())
raise Exception("Received error code {}".format(response.status_code))
logging.info('- done, check the room with Cisco Spark client software')
#
# handle Twilio API
#
def header_json():
response.content_type = 'application/json'