def render_json(data=None):
    """Build an ``application/json`` ``Response`` from *data*.

    * Falsy *data*: the body is ``{"data": []}`` when *data* is a list and
      ``{}`` otherwise.
    * ``db.Model`` instance: the body is whatever its ``serialize()`` returns.
    * ``list`` or ``BaseQuery``: the body is ``{"data": [...]}`` where dict
      items are kept as they are and model items are serialized; items of
      any other type are left out.
    * Anything else is used as the payload unchanged.

    The payload is passed through ``traverse`` before being dumped to JSON.
    """
    json_mime = 'application/json'

    if not data:
        empty_body = '{"data": []}' if isinstance(data, list) else '{}'
        return Response(empty_body, mimetype=json_mime)

    # NOTE(review): the final ``else`` is taken to belong to this outer type
    # dispatch (not to the per-item check inside the loop) -- confirm.
    if isinstance(data, db.Model):
        payload = data.serialize()
    elif isinstance(data, (list, BaseQuery)):
        items = []
        for entry in data:
            if isinstance(entry, dict):
                items.append(entry)
            elif isinstance(entry, db.Model):
                items.append(entry.serialize())
        payload = {'data': items}
    else:
        payload = data

    return Response(json.dumps(traverse(payload)), mimetype=json_mime)
# Example source code for the Python class BaseQuery()
def test_get_html_report(client, monkeypatch):
    """Fetch the HTML VxStream report through the API and expect HTTP 200.

    ``VxAPIClient.get`` is patched to return a gzip-compressed HTML document
    and ``BaseQuery.first_or_404`` is patched to return ``True``.
    """
    fgz = BytesIO()
    with gzip.GzipFile('report.gz', mode='wb', fileobj=fgz) as gzipobj:
        gzipobj.write(b'<html><head></head></html>')
    # getvalue() returns the complete buffer regardless of stream position.
    # bytes(fgz) would iterate the stream from its current position (the end,
    # after the write above) and silently produce b''.
    gzipped_report = fgz.getvalue()
    monkeypatch.setattr(VxAPIClient, 'get',
                        lambda *args, **kw: gzipped_report)
    monkeypatch.setattr(BaseQuery, 'first_or_404', lambda x: True)
    rv = client.get(
        url_for('api.get_vxstream_report',
                sha256='sss', envid=1, type_='html')
    )
    assert rv.status_code == 200
def test_get_vxstream_json_summary(client, monkeypatch, malware_sample):
    """Request a VxStream analysis through the API and expect HTTP 200.

    ``VxAPIClient.get`` is patched so that its first call returns a state
    payload and its second call returns a summary payload built from the
    ``malware_sample`` fixture hashes.
    """
    state_payload = {
        'response': {'state': _state_to_name[SUCCESS]},
        'response_code': 0,
    }
    summary_payload = {
        'response': {
            "analysis_start_time": "2016-05-11 21:50:33",
            "domains": [],
            "environmentDescription": "Windows 7 x64 (Stealth)",
            "environmentId": "6",
            "hosts": [],
            "isinteresting": False,
            "isurlanalysis": False,
            "md5": malware_sample.md5,
            "sha1": malware_sample.sha1,
            "sha256": malware_sample.sha256,
            "size": 26616,
            "targeturl": "",
            "threatlevel": 0,
            "threatscore": 0,
            "type": "PE32 executable (native) Intel 80386",
        },
        'response_code': 0,
    }
    # Successive calls to the mock hand out the payloads in this order.
    vxstream = MagicMock(side_effect=[state_payload, summary_payload])

    def fake_first_or_404(_query):
        return MagicMock(filename='saved_sameple')

    def fake_get(*args, **kw):
        return vxstream()

    monkeypatch.setattr(BaseQuery, 'first_or_404', fake_first_or_404)
    monkeypatch.setattr(VxAPIClient, 'get', fake_get)

    endpoint = url_for('api.get_vxstream_analysis', sha256=1, envid=1)
    rv = client.get(endpoint)
    assert rv.status_code == 200
def test_download_cp_html_report(client, monkeypatch):
    """Download the HTML VxStream report via ``api.get_vxstream_download``.

    ``VxAPIClient.get`` is patched to return a gzip-compressed HTML document
    and ``BaseQuery.first_or_404`` is patched to return ``True``; the
    response is expected to be HTTP 200.
    """
    fgz = BytesIO()
    with gzip.GzipFile('report.gz', mode='wb', fileobj=fgz) as gzipobj:
        gzipobj.write(b'<html><head></head></html>')
    # getvalue() returns the complete buffer regardless of stream position.
    # bytes(fgz) would iterate the stream from its current position (the end,
    # after the write above) and silently produce b''.
    gzipped_report = fgz.getvalue()
    monkeypatch.setattr(VxAPIClient, 'get',
                        lambda *args, **kw: gzipped_report)
    monkeypatch.setattr(BaseQuery, 'first_or_404', lambda x: True)
    rv = client.get(
        url_for('api.get_vxstream_download',
                sha256='sss', eid=1, ftype='html')
    )
    assert rv.status_code == 200
def test_create_av_scan(client, monkeypatch, malware_sample):
    """POST the sample to ``api.add_av_scan`` and expect HTTP 202.

    ``BaseQuery.first_or_404`` is patched to return ``True``.
    """
    def fake_first_or_404(_query):
        return True

    monkeypatch.setattr(BaseQuery, 'first_or_404', fake_first_or_404)

    endpoint = url_for('api.add_av_scan')
    payload = {'files': [malware_sample._asdict()]}
    rv = client.post(endpoint, json=payload)
    assert rv.status_code == 202
def test_add_cp_av_scan(client, monkeypatch):
    """POST a file hash to ``cp.add_cp_av_scan`` and check the reply.

    ``BaseQuery.first_or_404`` is patched to return ``True``; the response
    is checked with ``assert_msg`` for the submission message and code 202.
    """
    def fake_first_or_404(_query):
        return True

    monkeypatch.setattr(BaseQuery, 'first_or_404', fake_first_or_404)

    payload = {'files': [{'sha256': '1eedab2b09a4bf6c'}]}
    rv = client.post(url_for('cp.add_cp_av_scan'), json=payload)
    assert_msg(
        rv,
        key='message',
        value='Your files have been submitted for AV scanning',
        response_code=202,
    )
def test_download_cp_html_report(client, monkeypatch):
    """Download the HTML VxStream report via ``cp.get_cp_vxstream_download``.

    ``VxAPIClient.get`` is patched to return a gzip-compressed HTML document
    and ``BaseQuery.first_or_404`` is patched to return ``True``; the
    response is expected to be HTTP 200.
    """
    fgz = BytesIO()
    with gzip.GzipFile('report.gz', mode='wb', fileobj=fgz) as gzipobj:
        gzipobj.write(b'<html><head></head></html>')
    # getvalue() returns the complete buffer regardless of stream position.
    # bytes(fgz) would iterate the stream from its current position (the end,
    # after the write above) and silently produce b''.
    gzipped_report = fgz.getvalue()
    monkeypatch.setattr(VxAPIClient, 'get',
                        lambda *args, **kw: gzipped_report)
    monkeypatch.setattr(BaseQuery, 'first_or_404', lambda x: True)
    rv = client.get(
        url_for('cp.get_cp_vxstream_download',
                sha256='sss', eid=1, ftype='html')
    )
    assert rv.status_code == 200
def test_get_cp_html_report(client, monkeypatch):
    """Fetch the HTML VxStream report via ``cp.get_cp_vxstream_report``.

    ``VxAPIClient.get`` is patched to return a gzip-compressed HTML document
    and ``BaseQuery.first_or_404`` is patched to return ``True``; the
    response is expected to be HTTP 200.
    """
    fgz = BytesIO()
    with gzip.GzipFile('report.gz', mode='wb', fileobj=fgz) as gzipobj:
        gzipobj.write(b'<html><head></head></html>')
    # getvalue() returns the complete buffer regardless of stream position.
    # bytes(fgz) would iterate the stream from its current position (the end,
    # after the write above) and silently produce b''.
    gzipped_report = fgz.getvalue()
    monkeypatch.setattr(VxAPIClient, 'get',
                        lambda *args, **kw: gzipped_report)
    monkeypatch.setattr(BaseQuery, 'first_or_404', lambda x: True)
    rv = client.get(
        url_for('cp.get_cp_vxstream_report',
                sha256='sss', envid=1, type_='html')
    )
    assert rv.status_code == 200
def get_query_class(cls):
    """
    Return a factory that builds ``PermissionExtendedQuery`` objects for *cls*.

    The returned callable forwards its positional and keyword arguments to
    ``PermissionExtendedQuery``, placing *cls* in front of them. It is meant
    to be used as the ``query_class`` of a flask_sqlalchemy model so that the
    model's queries provide the get_or_403 method.

    Example:
        >>> class DataTransformation(db.Model):
        ...     query_class = OwnerRolePermission.get_query_class()
    """
    def build_query(*args, **kwargs):
        return PermissionExtendedQuery(cls, *args, **kwargs)

    return build_query
def get_query(self, product_id: int, **kwargs) -> BaseQuery:
    """Return the ``Indicator`` query filtered by the given ``product_id``."""
    indicators = Indicator.query
    return indicators.filter_by(product_id=product_id)
def get_query(self, id: int, **kwargs) -> BaseQuery:
    """Return the ``IndicatorValue`` query for one indicator.

    Rows are filtered by ``indicator_id == id`` and ordered by
    ``IndicatorValue.timestamp``.
    """
    values_of_indicator = IndicatorValue.query.filter_by(indicator_id=id)
    return values_of_indicator.order_by(IndicatorValue.timestamp)
def get_filtered_query(self, query: BaseQuery, **kwargs) -> BaseQuery:
    """Restrict *query* to ``start <= IndicatorValue.timestamp < end``.

    ``start`` and ``end`` are read from the keyword arguments; a missing key
    is passed on as ``None``.
    """
    start, end = kwargs.get('start'), kwargs.get('end')
    at_or_after_start = IndicatorValue.timestamp >= start
    before_end = IndicatorValue.timestamp < end
    return query.filter(at_or_after_start, before_end)
def get_limited_query(self, query: BaseQuery, **kwargs) -> Union[Pagination, BaseQuery]:
    """Return *query* unchanged when ``from`` is given, else paginate.

    When the keyword arguments contain a ``from`` key the query is returned
    as-is; otherwise the parent class implementation is used.
    """
    has_from = 'from' in kwargs
    if not has_from:
        return super().get_limited_query(query, **kwargs)
    return query
def get_objects(self, query: Union[Pagination, BaseQuery], **kwargs) -> List[IndicatorValue]:
    """Return the objects behind *query* as a new list.

    A ``Pagination`` contributes its ``items``; any other query object
    contributes the result of its ``all()`` method.
    """
    source = query.items if isinstance(query, Pagination) else query.all()
    return list(source)
def get_query(self, **kwargs) -> BaseQuery:
    """Return the ``Product`` query, optionally filtered by product group.

    When a ``product_group`` keyword argument is present the query is
    filtered on ``ProductGroup.name`` being equal to it.
    """
    products = Product.query
    if 'product_group' not in kwargs:
        return products
    # NOTE(review): no explicit join to ProductGroup is visible here --
    # confirm the filter relates products to their group as intended.
    group_matches = ProductGroup.name == kwargs['product_group']
    return products.filter(group_matches)
def get_query(self, product_id: int, **kwargs) -> BaseQuery:
    """Return the ``Objective`` query filtered by the given ``product_id``."""
    objectives = Objective.query
    return objectives.filter_by(product_id=product_id)
def get_query(self, **kwargs) -> BaseQuery:
    """Return the unfiltered ``ProductGroup`` query; keyword arguments are ignored."""
    all_groups = ProductGroup.query
    return all_groups
def get_query(self, slo_id: int, **kwargs) -> BaseQuery:
    """Return the ``Target`` query filtered by ``objective_id == slo_id``."""
    targets = Target.query
    return targets.filter_by(objective_id=slo_id)
def get_limited_query(self, query: BaseQuery, **kwargs) -> Union[Pagination, BaseQuery]:
    """Paginate *query* using the ``page`` and ``page_size`` keyword arguments.

    ``page_size`` defaults to ``API_DEFAULT_PAGE_SIZE`` and ``page`` to 1;
    both are converted with ``int``. A ``page`` of 0 is treated as page 1.

    Raises:
        ProblemException: if ``page`` is negative or ``page_size`` is below 1.
    """
    page_size = int(kwargs.get('page_size', API_DEFAULT_PAGE_SIZE))
    page_number = int(kwargs.get('page', 1))

    paging_is_valid = page_number >= 0 and page_size >= 1
    if not paging_is_valid:
        raise ProblemException(
            title='Invalid paging parameters',
            detail='page and page_size should be greater than 0',
        )

    return query.paginate(
        page=page_number or 1,
        per_page=page_size,
        error_out=False,
    )
def get_query(self, **kwargs) -> BaseQuery:
    """Placeholder for building the base query; this version always raises.

    Presumably meant to be overridden by concrete implementations -- confirm
    against the enclosing class.

    Raises:
        NotImplementedError: always.
    """
    # ``NotImplemented`` is a constant for binary-operator fallbacks, not an
    # exception; raising it fails with TypeError instead of signalling an
    # unimplemented method.
    raise NotImplementedError