def create_image(self, username, clustername, containername, imagename, description, imagenum=10):
    [status, info] = self.get_clusterinfo(clustername, username)
    if not status:
        return [False, "cluster not found"]
    containers = info['containers']
    for container in containers:
        if container['containername'] == containername:
            logger.info("container: %s found" % containername)
            worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
            if worker is None:
                return [False, "The worker can't be found or has been stopped."]
            res = worker.create_image(username, imagename, containername, description, imagenum)
            container['lastsave'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            container['image'] = imagename
            break
    else:
        res = [False, "container not found"]
        logger.error("container: %s not found" % containername)
    clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
    infofile = open(clusterpath, 'w')
    infofile.write(json.dumps(info))
    infofile.close()
    return res
def stop_cluster(self, clustername, username):
    [status, info] = self.get_clusterinfo(clustername, username)
    if not status:
        return [False, "cluster not found"]
    if info['status'] == 'stopped':
        return [False, 'cluster is already stopped']
    if self.distributedgw == 'True':
        worker = self.nodemgr.ip_to_rpc(info['proxy_server_ip'])
        worker.delete_route("/" + info['proxy_public_ip'] + '/go/' + username + '/' + clustername)
    else:
        proxytool.delete_route("/" + info['proxy_public_ip'] + '/go/' + username + '/' + clustername)
    for container in info['containers']:
        self.delete_all_port_mapping(username, clustername, container['containername'])
        worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
        if worker is None:
            return [False, "The worker can't be found or has been stopped."]
        worker.stop_container(container['containername'])
    [status, info] = self.get_clusterinfo(clustername, username)
    info['status'] = 'stopped'
    info['start_time'] = "------"
    infofile = open(self.fspath + "/global/users/" + username + "/clusters/" + clustername, 'w')
    infofile.write(json.dumps(info))
    infofile.close()
    return [True, "stop cluster"]
def quotaadd(*args, **kwargs):
    form = kwargs.get('form')
    quotaname = form.get("quotaname")
    default_value = form.get("default_value")
    hint = form.get("hint")
    if quotaname is None:
        return {"success": 'false', "reason": "Empty quota name"}
    if default_value is None:
        default_value = "--"
    groupfile = open(fspath + "/global/sys/quota", 'r')
    groups = json.loads(groupfile.read())
    groupfile.close()
    for group in groups:
        group['quotas'][quotaname] = default_value
    groupfile = open(fspath + "/global/sys/quota", 'w')
    groupfile.write(json.dumps(groups))
    groupfile.close()
    quotafile = open(fspath + "/global/sys/quotainfo", 'r')
    quotas = json.loads(quotafile.read())
    quotafile.close()
    quotas['quotainfo'].append({'name': quotaname, 'hint': hint})
    quotafile = open(fspath + "/global/sys/quotainfo", 'w')
    quotafile.write(json.dumps(quotas))
    quotafile.close()
    return {"success": 'true'}
def save_billing_history(vnode_name, billing_history):
    clusters_dir = env.getenv("FS_PREFIX") + "/global/users/" + get_owner(vnode_name) + "/clusters/"
    if not os.path.exists(clusters_dir):
        return
    clusters = os.listdir(clusters_dir)
    vnode_cluster_id = get_cluster(vnode_name)
    for cluster in clusters:
        clusterpath = clusters_dir + cluster
        if not os.path.isfile(clusterpath):
            continue
        infofile = open(clusterpath, 'r')
        info = json.loads(infofile.read())
        infofile.close()
        if vnode_cluster_id != str(info['clusterid']):
            continue
        if 'billing_history' not in info:
            info['billing_history'] = {}
        info['billing_history'][vnode_name] = billing_history
        infofile = open(clusterpath, 'w')
        infofile.write(json.dumps(info))
        infofile.close()
        break
    return
def login_required(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        logger.info("get request, path: %s" % request.path)
        token = request.form.get("token", None)
        if token is None:
            logger.info("get request without token, path: %s" % request.path)
            return json.dumps({'success': 'false', 'message': 'user or key is null'})
        result = post_to_user("/authtoken/", {'token': token})
        if result.get('success') == 'true':
            username = result.get('username')
            beans = result.get('beans')
        else:
            return result
        #if (cur_user == None):
        #    return json.dumps({'success':'false', 'message':'token failed or expired', 'Unauthorized': 'True'})
        return func(username, beans, request.form, *args, **kwargs)
    return wrapper
def save_cluster(user, beans, form):
    global G_vclustermgr
    clustername = form.get('clustername', None)
    if clustername is None:
        return json.dumps({'success': 'false', 'message': 'clustername is null'})
    imagename = form.get("image", None)
    description = form.get("description", None)
    containername = form.get("containername", None)
    isforce = form.get("isforce", None)
    if not isforce == "true":
        [status, message] = G_vclustermgr.image_check(user, imagename)
        if not status:
            return json.dumps({'success': 'false', 'reason': 'exists', 'message': message})
    user_info = post_to_user("/user/selfQuery/", {'token': form.get("token")})
    [status, message] = G_vclustermgr.create_image(user, clustername, containername, imagename, description, user_info["data"]["groupinfo"]["image"])
    if status:
        logger.info("image has been saved")
        return json.dumps({'success': 'true', 'action': 'save'})
    else:
        logger.debug(message)
        return json.dumps({'success': 'false', 'reason': 'exceed', 'message': message})
def process_item(self, item, spider):
    # Convert the item to a dict and serialize it to JSON;
    # ensure_ascii=False keeps non-ASCII (e.g. Chinese) text readable.
    lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
    self.file.write(lines)
    # process_item must return the item so that later pipelines can handle it.
    return item
# called when the spider is closed
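
The snippet above assumes self.file is opened elsewhere, and its trailing comment introduces a spider-close hook that was cut off. A minimal sketch of the surrounding Scrapy pipeline class (the output file name is an assumption, not from the original):

class JsonWriterPipeline(object):
    # Hypothetical enclosing class for the process_item method above.
    def open_spider(self, spider):
        # 'items.json' is an assumed output path.
        self.file = open('items.json', 'w', encoding='utf-8')

    def close_spider(self, spider):
        # Runs when the spider closes, releasing the JSON-lines output file.
        self.file.close()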
def _submitSchema(self):
    jobIdMap = {}
    for p in self._schema["pipelines"]:  # Add all jobs to the jobs table
        jobIdMap[p["name"]] = self._pipelineDbUtils.insertJob(None, None, p["name"], p["tag"], None, 0,
                                                              p["request"]["pipelineArgs"]["logging"]["gcsPath"],
                                                              None, None, None, None, None,
                                                              json.dumps(p["request"]))
    for p in self._schema["pipelines"]:  # Add dependency info to the job dependency table
        if "children" in p.keys() and len(p["children"]) > 0:
            for c in p["children"]:
                parentId = jobIdMap[p["name"]]
                childId = jobIdMap[c]
                self._pipelineDbUtils.insertJobDependency(parentId, childId)
    for p in self._schema["pipelines"]:  # schedule pipelines
        parents = self._pipelineDbUtils.getParentJobs(jobIdMap[p["name"]])
        self._pipelineDbUtils.updateJob(jobIdMap[p["name"]], setValues={"current_status": "WAITING"},
                                        keyName="job_id")
        # if the job is a root job, send the job request to the queue
        msg = {
            "job_id": jobIdMap[p["name"]],
            "request": p["request"]
        }
        #pprint.pprint(msg)
        if len(parents) == 0:
            self._pipelineQueueUtils.publish(json.dumps(msg))
def stopPipeline(args, config):
    pipelineQueueUtils = PipelineQueueUtils('CANCEL_Q')
    pipelineDbUtils = PipelineDbUtils(config)
    if args.jobId:
        jobInfo = pipelineDbUtils.getJobInfo(select=["current_status", "operation_id", "job_id"],
                                             where={"job_id": args.jobId})
    elif args.pipeline:
        jobInfo = pipelineDbUtils.getJobInfo(select=["current_status", "operation_id", "job_id"],
                                             where={"pipeline_name": args.pipeline})
    elif args.tag:
        jobInfo = pipelineDbUtils.getJobInfo(select=["current_status", "operation_id", "job_id"],
                                             where={"tag": args.tag})
    for j in jobInfo:
        if j.current_status == "RUNNING":
            msg = {
                "job_id": j.job_id,
                "operation_id": j.operation_id
            }
            pipelineQueueUtils.publish(json.dumps(msg))
def editPipeline(args, config):
    pipelineDbUtils = PipelineDbUtils(config)
    request = json.loads(pipelineDbUtils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)
    _, tmp = mkstemp()
    with open(tmp, 'w') as f:
        f.write("{data}".format(data=json.dumps(request, indent=4)))
    if "EDITOR" in os.environ.keys():
        editor = os.environ["EDITOR"]
    else:
        editor = "/usr/bin/nano"
    if subprocess.call([editor, tmp]) == 0:
        with open(tmp, 'r') as f:
            request = json.load(f)
        pipelineDbUtils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
    else:
        print("ERROR: there was a problem editing the request")
        exit(-1)
def unset(self, key):
    """
    Remove a key from the database entirely.
    """
    self.cursor.execute('delete from kv where key=?', [key])
    if self.revision and self.cursor.rowcount:
        self.cursor.execute(
            'insert into kv_revisions values (?, ?, ?)',
            [key, self.revision, json.dumps('DELETED')])
def json(self):
    """Serialize the object to json"""
    return json.dumps(self.data)
def request(self):
    return json.dumps({'api-version': self.api_version, 'ops': self.ops,
                       'request-id': self.request_id})
def _explore(offset, time_type):
    url = '/node/ExploreAnswerListV2'
    params = {
        'params': json.dumps({
            'offset': offset,
            'type': time_type
        })
    }
    d = req.get(url, params)
    return parser.explore(d)
def send_payload(self, payload):
    self.send(json.dumps(payload))
def pretty_print(response):
    try:
        print(json.dumps(response.json(), indent=4, sort_keys=True))
    except TypeError:
        print("Error: " + response.content.decode('UTF-8'))
def make_json(self):
    d = {'first_name': self.first_name,
         'last_name': self.last_name,
         'url': self.url,
         'quality': self.overall_quality,
         'difficulty': self.level_of_difficulty,
         'tags': self.tags}
    return json.dumps(d)
def get_devices_list(self):
    # (url, access_token, api_token) = self.get_api_conf("conf/stf.conf", "renguoliang")
    api_url = self.url + "/api/v1/devices"
    token = self.access_token + " " + self.api_token
    # Call the STF API to fetch the device list; the response body is JSON.
    try:
        headers = {"Authorization": token}
        req = requests.get(api_url, headers=headers)
        # print req.text.encode('utf-8')
        req_dict = json.loads(json.dumps(req.json(), ensure_ascii=False, encoding='utf-8'))
    except Exception as e:
        print("Error: " + str(e))
        sys.exit(-1)
    device_list = req_dict["devices"]
    total_devices_num = len(device_list)
    device_status_list = []
    # Collect the status of each connected, ready device (the list feeds stf_status.mmap for STF).
    for device in device_list:
        if device['present']:
            if device['status'] == 3:
                if device['ready']:
                    device_status_list.append(
                        {'serial': device['serial'].encode('utf-8'),
                         # e.g. ws://10.60.114.29:7548
                         'display_url': device['display']['url'].encode('utf-8'),
                         'manufacturer': device['manufacturer'].encode('utf-8'),
                         'using': device['using'],
                         'owner': device['owner'],
                         'model': device['model'].encode('utf-8'),
                         'version': device['version'].encode('utf-8'),
                         'apilevel': device['sdk'].encode('utf-8')})
    return device_status_list
def publish(self, payload):
    if payload is None:
        return
    if self.config["publish_changes_only"] is False or payload != self.last_payload:
        try:
            self.gateway.publish(self.topic, json.dumps(payload))
        except Exception as e:
            import sys
            sys.print_exception(e)
        self.last_payload = payload
def do_message_callback(self, b_topic, payload):
    topic = b_topic.decode()  # convert bytes to str
    Util.log(self, "received: topic '{}' payload: '{}'".format(topic, payload))
    if topic == self.exit_topic:
        raise ExitGatewayException()
    for device in self.devices:
        if device.do_message(topic, payload):
            # Util.log(self, "consumed: topic '{}' payload: '{}' by device {}".format(topic, payload, json.dumps(device.config)))
            break