def restartJobs(args, config):  # TODO: reimplement
    pipelineDbUtils = PipelineDbUtils(config)
    pipelineQueueUtils = PipelineQueueUtils('WAIT_Q')

    if args.jobId:
        request = json.loads(pipelineDbUtils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)
        msg = {
            "job_id": args.jobId,
            "request": request
        }
        pipelineQueueUtils.publish(json.dumps(msg))

    if args.preempted:
        preempted = pipelineDbUtils.getJobInfo(select=["job_id", "request"], where={"current_status": "PREEMPTED"})
        for p in preempted:
            msg = {
                "job_id": p.job_id,
                "request": json.loads(p.request)
            }
            pipelineQueueUtils.publish(json.dumps(msg))
def fetch_data():
    try:
        r = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        r = requests.get(FALLBACK_MTG_JSON_URL)
    with closing(r), zipfile.ZipFile(io.BytesIO(r.content)) as archive:
        unzipped_files = archive.infolist()
        if len(unzipped_files) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        data = archive.read(archive.infolist()[0])
        decoded_data = data.decode('utf-8')
        sets_data = json.loads(decoded_data)
        return sets_data
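A hedged sketch of consuming the dict returned by fetch_data; the set-code keys and the "name"/"cards" fields assume the MTGJSON "AllSets" layout and are not guaranteed by the snippet above.
# Hypothetical usage; assumes MTGJSON "AllSets"-style data where top-level
# keys are set codes and each value carries "name" and "cards".
sets_data = fetch_data()
for set_code, set_info in sets_data.items():
    print(set_code, set_info.get("name"), len(set_info.get("cards", [])))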
def main():
    for url in url_list:
        try:
            r = requests.get(url)
        except Exception:
            continue
        tree = html.fromstring(r.text)
        script = tree.xpath('//script[@language="javascript"]/text()')[0]
        json_string = regex.findall(script)[0]
        json_data = json.loads(json_string)
        next_page_url = tree.xpath('//footer/a/@href')
        links = [domain + x['nodeRef'] for x in json_data]
        for link in links:
            extract(link)
def get_transform_specs_json_by_project(self):
    """get transform_specs driver table info."""
    transform_specs_json = """
    {"aggregation_params_map":{
        "aggregation_pipeline":{"source":"streaming",
                                "usage":"fetch_quantity",
                                "setters":["rollup_quantity",
                                           "set_aggregated_metric_name",
                                           "set_aggregated_period"],
                                "insert":["prepare_data",
                                          "insert_data"]},
        "aggregated_metric_name": "vcpus_agg",
        "aggregation_period": "hourly",
        "aggregation_group_by_list": ["host", "metric_id", "tenant_id"],
        "usage_fetch_operation": "latest",
        "setter_rollup_group_by_list": ["tenant_id"],
        "setter_rollup_operation": "sum",
        "dimension_list":["aggregation_period",
                          "host",
                          "project_id"]
    },
    "metric_group":"vcpus_project",
    "metric_id":"vcpus_project"}"""
    return [json.loads(transform_specs_json)]
def parse_answer(self, response):
    # Process the answers returned for a question
    ans_json = json.loads(response.text)
    is_end = ans_json["paging"]["is_end"]
    next_url = ans_json["paging"]["next"]

    # Extract the fields of each answer
    for answer in ans_json["data"]:
        answer_item = ZhihuAnswerItem()
        answer_item["zhihu_id"] = answer["id"]
        answer_item["url"] = answer["url"]
        answer_item["question_id"] = answer["question"]["id"]
        answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
        answer_item["content"] = answer["content"] if "content" in answer else None
        answer_item["parise_num"] = answer["voteup_count"]
        answer_item["comments_num"] = answer["comment_count"]
        answer_item["create_time"] = answer["created_time"]
        answer_item["update_time"] = answer["updated_time"]
        answer_item["crawl_time"] = datetime.datetime.now()
        yield answer_item

    if not is_end:
        yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
def watchJob(jobId, exchangeName):
    queue = PipelineQueue('PIPELINE_JOB_{j}'.format(j=jobId))
    queue.bindToExchange(exchangeName, jobId)

    while True:
        body, method = queue.get()
        if method:
            body = json.loads(body)
            if body["current_status"] == "SUCCEEDED":
                return jobId
            else:
                raise PipelineServiceError("Job {j} has current status {s}!".format(j=jobId, s=body["current_status"]))
        else:
            pass
def tba_get(self, path):
    """Base method for querying the TBA API. Returns the response JSON as a python dict.

    :param path: (str) Request path, without the API address prefix (https://www.thebluealliance.com/api/v2/)
    :return: A dict parsed from the response from the API.
    """
    if self.app_id['X-TBA-App-Id'] == "":
        raise Exception('An API key is required for TBA. Please use set_api_key() to set one.')

    url_str = 'https://www.thebluealliance.com/api/v2/' + path
    r = self.session.get(url_str, headers=self.app_id)
    # print(r.url)
    tba_txt = r.text

    try:
        return json.loads(tba_txt)
    except json.JSONDecodeError:
        print(url_str)
        print(tba_txt)
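A hedged usage sketch; the 'team/frc1418' request path and the client instance are illustrative guesses, not something the method above defines.
# Hypothetical call site; assumes an instance ('client') with app_id and
# session configured, and that 'team/frc1418' is a valid TBA v2 request path.
team_info = client.tba_get('team/frc1418')
print(team_info.get('nickname'))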
def get_gw_interfaces():
    '''
    Gateway node can have multiple interfaces. This function parses json
    provided in config to get all gateway interfaces for this node.
    '''
    node_interfaces = []
    try:
        all_interfaces = json.loads(config('external-interfaces'))
    except ValueError:
        raise ValueError("Invalid json provided for gateway interfaces")

    hostname = get_unit_hostname()
    if hostname in all_interfaces:
        node_interfaces = all_interfaces[hostname].split(',')
    elif 'DEFAULT' in all_interfaces:
        node_interfaces = all_interfaces['DEFAULT'].split(',')

    for interface in node_interfaces:
        if not interface_exists(interface):
            log('Provided gateway interface %s does not exist'
                % interface)
            raise ValueError('Provided gateway interface does not exist')
    return node_interfaces
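For illustration, a plausible shape for the 'external-interfaces' config value this function parses; the hostname and interface names below are invented.
import json

# Hypothetical 'external-interfaces' value: per-hostname entries override the
# DEFAULT entry, and comma-separated values map to multiple interfaces.
example_config = '{"DEFAULT": "eth1", "gateway-node-0": "eth1,eth2"}'
print(json.loads(example_config)["gateway-node-0"].split(','))  # ['eth1', 'eth2']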
def getrange(self, key_prefix, strip=False):
    """
    Get a range of keys starting with a common prefix as a mapping of
    keys to values.

    :param str key_prefix: Common prefix among all keys
    :param bool strip: Optionally strip the common prefix from the key
        names in the returned dict

    :return dict: A (possibly empty) dict of key-value mappings
    """
    self.cursor.execute("select key, data from kv where key like ?",
                        ['%s%%' % key_prefix])
    result = self.cursor.fetchall()
    if not result:
        return {}
    if not strip:
        key_prefix = ''
    return dict([
        (k[len(key_prefix):], json.loads(v)) for k, v in result])
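A hedged sketch of calling getrange on such a key-value store; the stored keys and values are invented for illustration.
# Hypothetical usage: with strip=True the shared "config." prefix is removed
# from the keys of the returned dict of JSON-decoded values.
settings = kv.getrange('config.', strip=True)  # e.g. {'debug': True, 'port': 8080}
for name, value in settings.items():
    print(name, value)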
def relation_get(attribute=None, unit=None, rid=None):
    """Get relation information"""
    _args = ['relation-get', '--format=json']
    if rid:
        _args.append('-r')
        _args.append(rid)
    _args.append(attribute or '-')
    if unit:
        _args.append(unit)
    try:
        return json.loads(subprocess.check_output(_args).decode('UTF-8'))
    except ValueError:
        return None
    except CalledProcessError as e:
        if e.returncode == 2:
            return None
        raise
def status_get():
    """Retrieve the previously set juju workload state and message

    If the status-get command is not found then assume this is juju < 1.23 and
    return 'unknown', ""
    """
    cmd = ['status-get', "--format=json", "--include-data"]
    try:
        raw_status = subprocess.check_output(cmd)
    except OSError as e:
        if e.errno == errno.ENOENT:
            return ('unknown', "")
        else:
            raise
    else:
        status = json.loads(raw_status.decode("UTF-8"))
        return (status["status"], status["message"])
def get_cache_mode(service, pool_name):
    """
    Find the current caching mode of the pool_name given.

    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: int or None
    """
    validator(value=service, valid_type=six.string_types)
    validator(value=pool_name, valid_type=six.string_types)
    out = check_output(['ceph', '--id', service, 'osd', 'dump', '--format=json'])
    try:
        osd_json = json.loads(out)
        for pool in osd_json['pools']:
            if pool['pool_name'] == pool_name:
                return pool['cache_mode']
        return None
    except ValueError:
        raise
def get_rmq_cluster_running_nodes(self, sentry_unit):
    """Parse rabbitmqctl cluster_status output string, return list of
    running rabbitmq cluster nodes.

    :param unit: sentry unit
    :returns: List containing node names of running nodes
    """
    # NOTE(beisner): rabbitmqctl cluster_status output is not
    # json-parsable, do string chop foo, then json.loads that.
    str_stat = self.get_rmq_cluster_status(sentry_unit)
    if 'running_nodes' in str_stat:
        pos_start = str_stat.find("{running_nodes,") + 15
        pos_end = str_stat.find("]},", pos_start) + 1
        str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
        run_nodes = json.loads(str_run_nodes)
        return run_nodes
    else:
        return []
def run_action(self, unit_sentry, action,
               _check_output=subprocess.check_output,
               params=None):
    """Run the named action on a given unit sentry.

    params a dict of parameters to use
    _check_output parameter is used for dependency injection.

    @return action_id.
    """
    unit_id = unit_sentry.info["unit_name"]
    command = ["juju", "action", "do", "--format=json", unit_id, action]
    if params is not None:
        for key, value in params.iteritems():
            command.append("{}={}".format(key, value))
    self.log.info("Running command: %s\n" % " ".join(command))
    output = _check_output(command, universal_newlines=True)
    data = json.loads(output)
    action_id = data[u'Action queued with id']
    return action_id
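A hedged example of queuing an action from a test; the sentry lookup, action name, and parameters are assumptions, only the run_action signature comes from the method above.
# Hypothetical call from a deployment test; 'pause' and the sentry lookup
# are illustrative, not taken from the snippet above.
unit = self.d.sentry['rabbitmq-server'][0]
action_id = self.run_action(unit, 'pause', params={'delay': '10'})
self.log.info('Queued action id: {}'.format(action_id))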
def test_view_responds_stats_on(self):
    self.get(NODEINFO_DOCUMENT_PATH)
    self.response_200()
    self.assertEqual(
        json.loads(decode_if_bytes(self.last_response.content))["usage"],
        {
            "users": {
                "total": User.objects.count(),
                "activeHalfyear": User.objects.filter(last_login__gte=now() - datetime.timedelta(days=180)).count(),
                "activeMonth": User.objects.filter(last_login__gte=now() - datetime.timedelta(days=30)).count(),
            },
            "localPosts": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.CONTENT).count(),
            "localComments": Content.objects.filter(
                author__user__isnull=False, content_type=ContentType.REPLY).count(),
        }
    )
def pull_user_data(session):
    print 'pulling users'
    user_data = requests.get(u"{}{}".format(config.prod_url, 'export/users'))
    loaded_data = json.loads(user_data.text)
    for user_dict in loaded_data:
        user = User(
            id=user_dict['id'],
            name=user_dict['name'],
            email=user_dict['email'],
            admin=user_dict['admin'],
            avatar=user_dict['avatar'],
            active=user_dict['active'],
            created_at=user_dict['created_at'],
            elo=user_dict['elo'],
            wins=user_dict['wins'],
            losses=user_dict['losses']
        )
        session.add(user)
    session.commit()
    print 'done pulling users'
def pull_game_data(session):
    print 'pulling games'
    game_data = requests.get(u"{}{}".format(config.prod_url, 'export/games'))
    loaded_data = json.loads(game_data.text)
    for game_dict in loaded_data:
        game = Game(
            id=game_dict['id'],
            created_at=game_dict['created_at'],
            deleted_at=game_dict['deleted_at'],
            winner_id=game_dict['winner_id'],
            winner_elo_score=game_dict['winner_elo_score'],
            loser_id=game_dict['loser_id'],
            loser_elo_score=game_dict['loser_elo_score'],
            submitted_by_id=game_dict['submitted_by_id']
        )
        session.add(game)
    session.commit()
    print 'done pulling games'
def generate(self, template_path, source_json_path, output_path):
    print("Generating content at %s with template at %s, using key %s" % (
        output_path, template_path, self.key_name))
    data = []
    with open(source_json_path) as f:
        for line in f:
            json_line = json.loads(line)
            data_line = '(\'%s\',\n\'%s\')' % (
                json_line[self.key_name], json.dumps(json_line))
            data.append(str(data_line))
    print(data)
    with open(template_path) as f:
        template = f.read()
    with open(output_path, 'w') as write_file:
        write_file.write(template)
        for record in data:
            write_file.write(record)
            write_file.write(',\n')
        # Drop the trailing ",\n" and terminate the statement with ";".
        write_file.seek(-2, 1)
        write_file.truncate()
        write_file.write(';')
def get_transform_specs_json_by_all(self):
    """get transform_specs driver table info."""
    transform_specs_json = """
    {"aggregation_params_map":{
        "aggregation_pipeline":{"source":"streaming",
                                "usage":"fetch_quantity",
                                "setters":["rollup_quantity",
                                           "set_aggregated_metric_name",
                                           "set_aggregated_period"],
                                "insert":["prepare_data",
                                          "insert_data"]},
        "aggregated_metric_name": "vcpus_agg",
        "aggregation_period": "hourly",
        "aggregation_group_by_list": ["host", "metric_id"],
        "usage_fetch_operation": "latest",
        "setter_rollup_group_by_list": [],
        "setter_rollup_operation": "sum",
        "dimension_list":["aggregation_period",
                          "host",
                          "project_id"]
    },
    "metric_group":"vcpus_all",
    "metric_id":"vcpus_all"}"""
    return [json.loads(transform_specs_json)]
Source: test_fetch_quantity_util_agg_second_stage.py (project: monasca-transform, author: openstack)
def get_pre_transform_specs_json(self):
    """get pre_transform_specs driver table info."""
    pre_transform_specs = ["""
    {"event_processing_params":{"set_default_zone_to":"1",
                                "set_default_geolocation_to":"1",
                                "set_default_region_to":"W"},
     "event_type":"cpu.total_logical_cores",
     "metric_id_list":["cpu_util_all"],
     "required_raw_fields_list":["creation_time"],
     "service_id":"host_metrics"}""", """
    {"event_processing_params":{"set_default_zone_to":"1",
                                "set_default_geolocation_to":"1",
                                "set_default_region_to":"W"},
     "event_type":"cpu.idle_perc",
     "metric_id_list":["cpu_util_all"],
     "required_raw_fields_list":["creation_time"],
     "service_id":"host_metrics"}"""]
    pre_transform_specs_json_list = \
        [json.loads(pre_transform_spec)
         for pre_transform_spec in pre_transform_specs]
    return pre_transform_specs_json_list
def get_transform_specs_json_invalid_name(self):
    """get transform_specs driver table info."""
    transform_specs_json = """
    {"aggregation_params_map":{
        "aggregation_pipeline":{"source":"streaming",
                                "usage":"fetch_quantity",
                                "setters":["rollup_quantity",
                                           "set_aggregated_metric_name",
                                           "set_aggregated_period"],
                                "insert":["prepare_data",
                                          "insert_data"]},
        "aggregated_metric_name": "&invalidmetricname",
        "aggregation_period": "hourly",
        "aggregation_group_by_list": ["host", "metric_id"],
        "usage_fetch_operation": "sum",
        "setter_rollup_group_by_list": ["host"],
        "setter_rollup_operation": "sum",
        "dimension_list":["aggregation_period",
                          "host",
                          "project_id"]
    },
    "metric_group":"mem_total_all",
    "metric_id":"mem_total_all"}"""
    return [json.loads(transform_specs_json)]
Source: test_fetch_quantity_util_agg.py (project: monasca-transform, author: openstack)
def get_pre_transform_specs_json(self):
    """get pre_transform_specs driver table info."""
    pre_transform_specs = ["""
    {"event_processing_params":{"set_default_zone_to":"1",
                                "set_default_geolocation_to":"1",
                                "set_default_region_to":"W"},
     "event_type":"cpu.total_logical_cores",
     "metric_id_list":["cpu_util_all"],
     "required_raw_fields_list":["creation_time"],
     "service_id":"host_metrics"}""", """
    {"event_processing_params":{"set_default_zone_to":"1",
                                "set_default_geolocation_to":"1",
                                "set_default_region_to":"W"},
     "event_type":"cpu.idle_perc",
     "metric_id_list":["cpu_util_all"],
     "required_raw_fields_list":["creation_time"],
     "service_id":"host_metrics"}"""]
    pre_transform_specs_json_list = \
        [json.loads(pre_transform_spec)
         for pre_transform_spec in pre_transform_specs]
    return pre_transform_specs_json_list
def test_error_response(self):
    self.assertEqual(self.analyzer.get_param('config.password'), "secret")
    self.assertEqual(self.analyzer.get_param('config.key'), "secret")
    self.assertEqual(self.analyzer.get_param('config.apikey'), "secret")
    self.assertEqual(self.analyzer.get_param('config.api_key'), "secret")

    # Run the error method
    with self.assertRaises(SystemExit):
        self.analyzer.error('Error', True)

    # Get the output
    output = self.analyzer.fpoutput.getvalue().strip()
    json_output = json.loads(output)
    self.assertEqual(json_output['success'], False)
    self.assertEqual(json_output['errorMessage'], 'Error')
    self.assertEqual(json_output['input']['dataType'], 'ip')
    self.assertEqual(json_output['input']['data'], '1.1.1.1')
    self.assertEqual(json_output['input']['config']['password'], 'REMOVED')
    self.assertEqual(json_output['input']['config']['key'], 'REMOVED')
    self.assertEqual(json_output['input']['config']['apikey'], 'REMOVED')
    self.assertEqual(json_output['input']['config']['api_key'], 'REMOVED')
def get_sample(self, samplehash):
    """
    Downloads information about a sample using a given hash.

    :param samplehash: hash to search for. Has to be either md5, sha1 or sha256
    :type samplehash: str
    :returns: Dictionary of results
    :rtype: dict
    """
    apiurl = '/rest/sample/'
    if len(samplehash) == 32:  # MD5
        apiurl += 'md5/'
    elif len(samplehash) == 40:  # SHA1
        apiurl += 'sha1/'
    elif len(samplehash) == 64:  # SHA256
        apiurl += 'sha256/'
    else:
        raise UnknownHashTypeError('Sample hash has an unknown length.')

    res = self.session.get(self.url + apiurl + samplehash)
    if res.status_code == 200:
        return json.loads(res.text)
    else:
        raise BadResponseError('Response from VMRay was not HTTP 200.'
                               ' Responsecode: {}; Text: {}'.format(res.status_code, res.text))
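A hedged usage sketch; the client variable and the MD5 value are placeholders, only the get_sample call mirrors the method above.
# Hypothetical call; the MD5 below is a placeholder, not a real sample hash.
sample_info = vmray_client.get_sample('d41d8cd98f00b204e9800998ecf8427e')
print(sample_info.get('data'))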
def query_job_status(self, submissionid):
    """
    Queries VMRay to check whether a job/submission has finished.

    :param submissionid: ID of the job/submission
    :type submissionid: int
    :returns: True if job finished, false if not
    :rtype: bool
    """
    apiurl = '/rest/submission/'
    result = self.session.get('{}{}{}'.format(self.url, apiurl, submissionid))
    if result.status_code == 200:
        submission_info = json.loads(result.text)
        if submission_info.get('data', {}).get('submission_finished', False):  # Or something like that
            return True
    else:
        raise UnknownSubmissionIdError('Submission id seems invalid, response was not HTTP 200.')
    return False
def __query_safebrowsing(self, search_value, search_type):
    """
    The actual query to safebrowsing api

    :param search_value: value to search for
    :type search_value: str
    :param search_type: 'url' or 'ip'
    :type search_type: str
    :return: Results
    :rtype: str
    """
    return json.loads(
        self.session.post(
            self.url,
            json=self.__prepare_body(
                search_value=search_value,
                search_type=search_type
            )
        ).text
    )
def run(self):
    data = self.getData()
    value = {
        data: {
            "type": self.data_type
        }
    }
    json_data = json.dumps(value)
    post_data = json_data.encode('utf-8')
    headers = {'Content-Type': 'application/json'}

    try:
        request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service), post_data, headers)
        response = urllib2.urlopen(request)
        report = json.loads(response.read())
        self.report(report)
    except urllib2.HTTPError:
        self.error("Hippocampe: " + str(sys.exc_info()[1]))
    except urllib2.URLError:
        self.error("Hippocampe: service is not available")
    except Exception as e:
        self.unexpectedError(e)
def logic(data):
    # Write the raw payload to a local file for debugging
    # print data
    f1 = open('/tmp/test.txt', 'a')
    f1.write(data)
    f1.write('\n')
    f1.close()

    data = json.loads(data)
    print type(data)
    mounts = data["MOUNT"]
    netifaces = data["NET"]
    for netif in netifaces:
        if netif["status"] == "up":
            ip = netif["ip"]
            # print netifaces['ip']
    for mount in mounts:
        Mount_point = mount['path']
        use_rate = mount['used_rate']
        if use_rate > '70':
            send_mail(recvmail_conf['addr'], "mount point " + Mount_point, use_rate)
    return "OK"
def _reply(self, json_reply):
    """
    Handle a reply that came in over the transport provided
    """
    if not json_reply.startswith('{'):
        self.sdata.log('Received non-JSON data: "{}"'.format(json_reply))
        return
    reply = json.loads(json_reply, object_pairs_hook=OrderedDict)
    if reply['jsonrpc'] != '2.0' or 'id' not in reply or reply['id'] is None:
        self.sdata.log('Received bad JSON-RPC reply: {}'.format(json_reply))
        if len(self.pending_reply_map) == 1:
            # lucky! can guess a pending reply to kill
            this_id = self.pending_reply_map.keys()[0]
            d = self.pending_reply_map[this_id]
            del self.pending_reply_map[this_id]
            e = JsonRpcException('Bad reply: {}'.format(json_reply))
            d.errback(e)
        return
    this_id = int(reply['id'])
    if 'method' in reply and this_id in self.pending_reply_map:
        self.sdata.log('Got echo of request for {}, ignoring'.format(this_id))
    else:
        d = self.pending_reply_map[this_id]
        del self.pending_reply_map[this_id]
        d.callback(reply)
def create_cd(col_name, type, display_name):
    url = 'https://api.kentik.com/api/v5/customdimension'
    json_template = '''
    {
        "name": "{{ column }}",
        "type": "{{ data_type }}",
        "display_name": "{{ pretty_name }}"
    }
    '''
    t = Template(json_template)
    data = json.loads(t.render(column=col_name, data_type=type, pretty_name=display_name))
    response = requests.post(url, headers=headers, data=data)
    if response.status_code != 201:
        print("Unable to create custom dimension column. Exiting.")
        print("Status code: {}".format(response.status_code))
        print("Error message: {}".format(response.json()['error']))
        exit()
    else:
        print("Custom dimension \"{}\" created as id: {}".format(
            display_name, response.json()['customDimension']['id']))
        return response.json()['customDimension']['id']
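A hedged usage sketch; the column name, type, and display name are invented, and a module-level headers dict with Kentik API credentials is assumed, as implied by the function above.
# Hypothetical call; 'c_customer' and 'Customer Name' are made-up values.
dimension_id = create_cd('c_customer', 'string', 'Customer Name')
print("New custom dimension id: {}".format(dimension_id))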