def get(self):
    """Return the result of ``connector.ShowLists()`` as a JSON string.

    Returns:
        The ``dumps``-serialized connector result on success, otherwise a
        JSON error object of the form ``{"type":"error", "message":"..."}``.
    """
    import json  # local import: only needed to escape the error message

    try:
        return dumps(connector.ShowLists())
    except Exception:
        # Handler boundary: report the failure to the client instead of
        # raising. json.dumps quotes and escapes the message so embedded
        # quotes, backslashes or newlines cannot yield malformed JSON.
        return '{"type":"error", "message":' + json.dumps(GetException()) + '}'
# Example source code for Python dumps()
def get(self, token):
    """Delete the list identified by ``token`` through the connector.

    Args:
        token: Value passed unchanged to ``connector.DeleteList``.

    Returns:
        A ``(body, 200)`` tuple with a fixed success message, or a JSON error
        string (without an explicit status code) if anything raises.
    """
    import json  # local import: only needed to escape the error message

    try:
        # The serialized value is discarded; the call is kept so that an
        # unserializable connector result is still reported as an error.
        dumps(connector.DeleteList(token))
        return '{"type":"success", "message":"ok"}', 200
    except Exception:
        # json.dumps quotes and escapes the message so embedded quotes,
        # backslashes or newlines cannot yield malformed JSON.
        return '{"type":"error", "message":' + json.dumps(GetException()) + '}'
def get(self, list_id, scan_group_id):
    """Return ``connector.ShowScannedList(list_id, scan_group_id)`` as JSON.

    Args:
        list_id: First argument forwarded to ``connector.ShowScannedList``.
        scan_group_id: Second argument forwarded to the same call.

    Returns:
        The ``dumps``-serialized connector result on success, otherwise a
        JSON error object of the form ``{"type":"error", "message":"..."}``.
    """
    import json  # local import: only needed to escape the error message

    try:
        return dumps(connector.ShowScannedList(list_id, scan_group_id))
    except Exception:
        # json.dumps quotes and escapes the message so embedded quotes,
        # backslashes or newlines cannot yield malformed JSON.
        return '{"type":"error", "message":' + json.dumps(GetException()) + '}'
def get(self, site_id):
    """Return ``connector.GetScanGroupsBySite(site_id)`` as a JSON string.

    Args:
        site_id: Value forwarded to ``connector.GetScanGroupsBySite``.

    Returns:
        The ``dumps``-serialized connector result on success, otherwise a
        JSON error object of the form ``{"type":"error", "message":"..."}``.
    """
    import json  # local import: only needed to escape the error message

    try:
        return dumps(connector.GetScanGroupsBySite(site_id))
    except Exception:
        # json.dumps quotes and escapes the message so embedded quotes,
        # backslashes or newlines cannot yield malformed JSON.
        return '{"type":"error", "message":' + json.dumps(GetException()) + '}'
def get(self, list_id):
    """Return ``connector.GetScanGroupsByList(list_id)`` as a JSON string.

    Args:
        list_id: Value forwarded to ``connector.GetScanGroupsByList``.

    Returns:
        The ``dumps``-serialized connector result on success, otherwise a
        JSON error object of the form ``{"type":"error", "message":"..."}``.
    """
    import json  # local import: only needed to escape the error message

    try:
        return dumps(connector.GetScanGroupsByList(list_id))
    except Exception:
        # json.dumps quotes and escapes the message so embedded quotes,
        # backslashes or newlines cannot yield malformed JSON.
        return '{"type":"error", "message":' + json.dumps(GetException()) + '}'
def get(self, site_id):
    """Return ``connector.GetScanDates(site_id)`` as a JSON string.

    Args:
        site_id: Value forwarded to ``connector.GetScanDates``.

    Returns:
        The ``dumps``-serialized connector result on success, otherwise a
        JSON error object of the form ``{"type":"error", "message":"..."}``.
    """
    import json  # local import: only needed to escape the error message

    try:
        return dumps(connector.GetScanDates(site_id))
    except Exception:
        # json.dumps quotes and escapes the message so embedded quotes,
        # backslashes or newlines cannot yield malformed JSON.
        return '{"type":"error", "message":' + json.dumps(GetException()) + '}'
def get(self, list_id):
    """Return ``connector.GetToken(list_id)`` as a JSON string.

    Args:
        list_id: Value forwarded to ``connector.GetToken``.

    Returns:
        The ``dumps``-serialized connector result on success, otherwise a
        JSON error object of the form ``{"type":"error", "message":"..."}``.
    """
    import json  # local import: only needed to escape the error message

    try:
        return dumps(connector.GetToken(list_id))
    except Exception:
        # json.dumps quotes and escapes the message so embedded quotes,
        # backslashes or newlines cannot yield malformed JSON.
        return '{"type":"error", "message":' + json.dumps(GetException()) + '}'
def bsonjs_dumps(doc):
    """Serialize ``doc`` with bsonjs, offering the same call shape as json_util.dumps."""
    # bsonjs.dumps is fed the output of to_bson, so convert first.
    raw_bson = to_bson(doc)
    return bsonjs.dumps(raw_bson)
def round_trip(self, doc):
    """Assert that ``doc`` survives BSON/JSON conversion through bsonjs and json_util."""
    encoded = to_bson(doc)

    # bsonjs on its own: dumping and re-loading must reproduce the same bytes.
    via_bsonjs = bsonjs.loads(bsonjs.dumps(encoded))
    self.assertEqual(encoded, via_bsonjs)

    # Compatibility, direction 1: JSON written by bsonjs is read by json_util.
    decoded_doc = json_util.loads(
        bsonjs.dumps(encoded),
        json_options=json_util.STRICT_JSON_OPTIONS,
    )
    self.assertEqual(doc, decoded_doc)

    # Compatibility, direction 2: JSON written by json_util is read by bsonjs.
    strict_json = json_util.dumps(
        doc,
        json_options=json_util.STRICT_JSON_OPTIONS,
    )
    self.assertEqual(encoded, bsonjs.loads(strict_json))
def test_dumps_multiple_bson_documents(self):
    """dumps() on two concatenated documents must equal the JSON of one document."""
    expected_json = '{ "test" : "me" }'
    single_doc = bsonjs.loads(expected_json)
    doubled = single_doc + single_doc
    self.assertEqual(expected_json, bsonjs.dumps(doubled))
def test_response_json(self):
    """The middleware must extract type, status code and content from a JSON response."""
    # Build an HttpResponse carrying a small JSON payload.
    payload = {'response': 'test'}
    response = HttpResponse(dumps(payload), content_type='application/json')

    extracted = self.middleware._extract_response_data(response)

    self.assertIn('json', extracted['type'])
    self.assertEqual(200, extracted['status_code'])
    self.assertEqual(payload, extracted['content'])
def convert_to_json_list(db_cursor):
    """Return the contents of ``db_cursor`` as a list of JSON-decoded values.

    The cursor is serialized with ``json_util.dumps`` and parsed back with
    the plain ``json`` module before being copied into a list.
    """
    serialized = json_util.dumps(db_cursor)
    parsed = json.loads(serialized)
    return list(parsed)
def hoplite_dumps(obj, *args, **kwargs):
    """
    Serialize ``obj`` by delegating to ``dumps``.

    This is a thin pass-through: every positional and keyword argument is
    handed to ``dumps`` unchanged and its result is returned as-is.

    :param obj: Object to be serialized (a Python dictionary in hoplite's use)
    :param args: Extra positional arguments; see the documentation for
        bson.json_util.dumps and json.dumps
    :param kwargs: Extra keyword arguments; see the documentation for
        bson.json_util.dumps and json.dumps
    :return: the serialized form of ``obj`` as produced by ``dumps``
    """
    serialized = dumps(obj, *args, **kwargs)
    return serialized
def list_apps():
    """Return all apps from the mongo collection wrapped as ``{"apps": [...] }`` with status 200."""
    apps = mongo_list_apps(mongo_collection)
    # Wrap the serialized app list in a single-key JSON object.
    body = "{\"apps\": " + dumps(apps)
    body = body + " }"
    return body, 200
# get app info
def get_app(app_name):
    """Return the stored document for ``app_name``.

    Returns:
        ``(json, 200)`` when the lookup reports the app exists,
        ``('{"app_exists": "False"}', 403)`` when it reports it does not.
        Any other lookup flag falls through and yields ``None``.
    """
    found, app_doc = mongo_get_app(mongo_collection, app_name)
    if found is True:
        return dumps(app_doc), 200
    if found is False:
        return "{\"app_exists\": \"False\"}", 403
    # Neither True nor False: no explicit return, matching the original contract.
# set json header
def bson_renderer(data, template=None, ctx=None):
    """Render ``data`` by serializing it with ``dumps``.

    ``template`` and ``ctx`` are accepted but not used by this renderer.
    """
    rendered = dumps(data)
    return rendered
def dump_to_json(self, output_file):
    """Append every item of the collection to ``output_file`` as JSON lines.

    Each item yielded by ``self.collection.get_iterator()`` is serialized
    with ``json_util.dumps`` and written on its own line. The file is opened
    in append mode, so existing content is preserved.

    Args:
        output_file: Path of the file to append to.
    """
    # The context manager closes the handle even if iteration or
    # serialization raises part-way through.
    with open(output_file, 'a') as filehandle:
        for tweet in self.collection.get_iterator():
            filehandle.write(json_util.dumps(tweet) + '\n')
def Addtask():
    """Create one task document per selected plugin from the submitted form.

    Form fields read from ``request.form``:
        title, plugin (comma-separated), condition (URL-quoted,
        ``;``-separated), plan, ids (comma-separated ``ip:port`` pairs),
        isupdate, resultcheck.

    Returns:
        ``'success'`` when a plugin was given and every insert returned a
        truthy value, otherwise ``'fail'``.
    """
    title = request.form.get('title', '')
    plugin = request.form.get('plugin', '')
    condition = unquote(request.form.get('condition', ''))
    plan = request.form.get('plan', 0)
    ids = request.form.get('ids', '')
    isupdate = request.form.get('isupdate', '0')
    resultcheck = request.form.get('resultcheck', '0')

    if not plugin:
        return 'fail'

    conditions = condition.strip().split(';')
    targets = []
    if resultcheck == 'true':
        # Targets are every ip/port matching the search condition.
        query = querylogic(conditions)
        for record in Mongo.coll['Info'].find(query):
            targets.append([record['ip'], record['port']])
    else:
        # Targets are the explicitly submitted "ip:port" entries.
        for entry in ids.split(','):
            parts = entry.split(':')
            targets.append([parts[0], int(parts[1])])
        query = querylogic(conditions)

    # The query depends only on the condition, so serialize it once instead
    # of rebuilding it for every plugin.
    # NOTE(review): assumes querylogic has no side effects - confirm.
    serialized_query = dumps(query)

    all_inserted = True
    for plugin_name in plugin.split(','):
        item = {'status': 0, 'title': title, 'plugin': plugin_name, 'condition': condition,
                'time': datetime.now(), 'target': targets, 'plan': int(plan),
                'isupdate': int(isupdate), 'query': serialized_query}
        # Keep inserting the remaining plugins even if one insert fails.
        if not Mongo.coll['Task'].insert(item):
            all_inserted = False

    return 'success' if all_inserted else 'fail'
# ??????
def CheckUpdate():
    """Return the updates with ``isInstall == 0`` as a JSON array.

    Records are read from the ``Update`` collection sorted by ``unicode``
    in descending order. Each entry exposes ``unicode``, ``name``, ``info``,
    ``time`` (taken from ``pushtime``) and ``author``.
    """
    pending = Mongo.coll['Update'].find({'isInstall': 0}).sort('unicode', -1)
    updates = [
        {'unicode': record['unicode'], 'name': record['name'], 'info': record['info'],
         'time': record['pushtime'], 'author': record['author']}
        for record in pending
    ]
    return dumps(updates)
# ?????????
def delete_mainstage(request, id):
    """Delete a main stage and all of its sub-stages in one transaction.

    A sub-stage that has a ``stage_forms`` relation gets that form flagged
    ``is_deleted`` and detached (``stage = None``) before the sub-stage
    itself is deleted.

    Args:
        request: Incoming request; not read by the active code.
        id: Primary key of the main ``Stage`` to delete.

    Returns:
        An empty ``Response`` with HTTP 200 on success, or a ``Response``
        carrying ``{'error': <message>}`` with HTTP 400 if anything raises.
    """
    try:
        with transaction.atomic():
            stage = Stage.objects.get(pk=id)
            # Only the disabled notification code used this value. The lookup
            # is kept so a stage without a resolvable organization still
            # fails the request exactly as before.
            org = stage.site.project.organization if stage.site else stage.project.organization
            for sub_stage in Stage.objects.filter(stage=stage):
                if hasattr(sub_stage, 'stage_forms'):
                    old_fsxf = sub_stage.stage_forms
                    old_fsxf.is_deleted = True
                    old_fsxf.stage = None
                    old_fsxf.save()
                    # NOTE: a "form Deleted" log/channel notification was
                    # drafted here but is currently disabled.
                sub_stage.delete()
            stage.delete()
        return Response({}, status=status.HTTP_200_OK)
    except Exception as e:
        # Exceptions have no ``.message`` attribute on Python 3; str(e)
        # works on both Python 2 and 3.
        return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)