def _update_json_file(data_dir, data, model_str):
    if len(model_str) > 100:
        model_str = model_str[:100]
    model_dir = os.path.abspath(os.path.join(data_dir, data['model']))
    _ensure_dirs(model_dir)
    record_path = os.path.abspath(os.path.join(
        model_dir, '%s_%s.json' % (data['id'], model_str)))
    updated_paths = [record_path]
    # Name changed
    if not os.path.exists(record_path):
        for root, dirs, files in os.walk(model_dir):
            for file in files:
                if file.startswith(data['id']):
                    print(file)
                    old = os.path.abspath(os.path.join(root, file))
                    os.rename(old, record_path)
                    updated_paths.append(old)
    with open(record_path, 'w') as f:
        json.dump(data, f, indent=4, sort_keys=True)
    return updated_paths
def wait_for_processing(self, task):
    """Wait for any background processing threads to finish"""
    if self.video_processing is not None:
        logging.debug('Waiting for video processing to finish')
        self.video_processing.communicate()
        self.video_processing = None
        if not self.job['keepvideo']:
            try:
                os.remove(task['video_file'])
            except Exception:
                pass
    opt = None
    if self.optimization is not None:
        opt = self.optimization.join()
    if self.wpt_result is not None:
        self.process_optimization_results(self.wpt_result['pageData'],
                                          self.wpt_result['requests'], opt)
        if self.path_base is not None:
            devtools_file = self.path_base + '_devtools_requests.json.gz'
            with gzip.open(devtools_file, 'wb', 7) as f_out:
                json.dump(self.wpt_result, f_out)
def convert_image(options):
    print('loading cache')
    options['cache'] = json.load(open('color_cache.json'))
    print('beginning conversion')
    im = Image.open(options['filename'])
    o = convert_frame(im, options)
    o += ANSI_RESET + '\n\n'
    # Save to ANSI file and add SAUCE record
    if options['output_file'] is not sys.stdout:
        save_frame(o, options)
        add_sauce(o, options)
    # Output to console (unicode)
    else:
        print_frame(o, options)
    # json.dump(options['cache'], open('color_cache.json','w'))
# MAIN FUNCTION
def refresh(self):
    self.status_file = self.path / "status.json"
    self.status = (
        {} if not self.status_file.exists() else
        json.loads(self.status_file.open().read())
    )
    for files in self.iter_file_lists():
        for file_obj in files:
            self.archive.downloader.add_file(file_obj)
            output_dir = self.path / ts2ymd(file_obj["created"])
            if not output_dir.exists():
                output_dir.mkdir()
            output_file = output_dir / (file_obj["id"] + ".json")
            with open_atomic(str(output_file)) as f:
                json.dump(file_obj, f)
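The open_atomic helper used above (and in several snippets further down) is not defined in these excerpts. A minimal sketch of the usual write-to-a-temp-file-then-rename idiom it presumably wraps; this is an assumption about the helper, not the project's actual implementation:

import contextlib
import os
import tempfile

@contextlib.contextmanager
def open_atomic(path, mode='w'):
    # Hypothetical stand-in for the undefined open_atomic helper: write to a
    # temporary file in the target directory, then atomically replace the
    # destination so readers never observe a partially written JSON file.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or '.')
    try:
        with os.fdopen(fd, mode) as f:
            yield f
        os.replace(tmp_path, path)  # atomic rename over the destination
    except Exception:
        os.unlink(tmp_path)
        raise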
def query(self, *args, onlyCached=False, **kwargs):
    queryString, hashString = self._queryString(*args, **kwargs)
    filename = self.__cacheDir + '/' + self._prefix + '-' + self.__hash(hashString)
    if not os.path.exists(self.__cacheDir):
        os.makedirs(self.__cacheDir)
    if os.path.exists(filename):
        with open(filename, 'r') as file:
            data = ujson.load(file)
    elif onlyCached:
        print('[' + self._prefix + '] data not cached: ' + queryString)
        return None
    else:
        print('[' + self._prefix + '] downloading data: ' + queryString)
        if self._waitForReady() is None:
            if self.__lastQuery and self.__waitBetweenQueries and \
                    time.time() - self.__lastQuery < self.__waitBetweenQueries:
                time.sleep(self.__waitBetweenQueries - time.time() + self.__lastQuery)
        self.__lastQuery = time.time()
        data = self.__query(queryString)
        with open(filename, 'w') as file:
            ujson.dump(data, file)
    result = self._rawToResult(data, queryString)
    if not self._isValid(result):
        raise Exception('[' + self._prefix + '] error in result (' + filename + '): ' + queryString)
    return result
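The __hash helper that builds the cache filename is not shown in this excerpt. A plausible, purely illustrative version would simply hex-digest the query string to obtain a stable, filesystem-safe cache key (an assumption, not the library's actual code):

import hashlib

def _hash(query_string):
    # Illustrative only: derive a stable, filesystem-safe cache key from the
    # query string so repeated queries map to the same cache file.
    return hashlib.sha256(query_string.encode('utf-8')).hexdigest()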
def write(self):
    """Write out the resulting json data"""
    if self.out_file is not None and len(self.result['pageData']) and \
            len(self.result['requests']):
        try:
            _, ext = os.path.splitext(self.out_file)
            if ext.lower() == '.gz':
                with gzip.open(self.out_file, 'wb') as f_out:
                    json.dump(self.result, f_out)
            else:
                with open(self.out_file, 'w') as f_out:
                    json.dump(self.result, f_out)
        except Exception:
            logging.critical("Error writing to " + self.out_file)
def save(self):
    """Save a snapshot of the dataset file in JSON-Lines format."""
    with storage.open(self.path, 'w', self._compression_level) as fp:
        for dp in self._datapoints_query:
            json.dump((dp.pid, dp.json), fp)
            fp.write('\n')
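Because the snapshot above is written as one JSON document per line (JSON Lines), reading it back is just a line-by-line json.loads. A minimal reader sketch, assuming a plain uncompressed file (the storage wrapper above may apply compression):

import json

def load_snapshot(path):
    # Read back a JSON-Lines snapshot: one (pid, json) pair per line.
    with open(path) as fp:
        return [json.loads(line) for line in fp if line.strip()]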
def write_json_file(filename, path, result):
    """Writes the result to json with the given filename.

    Args:
        filename (str): Filename to write to (without the .json extension).
        path (str): Directory path to use.
        result: Data to serialize to JSON.
    """
    with open(path + filename + ".json", "w+") as json_file:
        ujson.dump(result, json_file)
def write_json_rows(rows, docfilepath, encode_html=True):
    """Note that we are appending."""
    with open(docfilepath, 'a') as fp:
        for row in rows:
            ujson.dump(row, fp, encode_html_chars=encode_html)
            fp.write('\n')
#-----------------------------------------------------------
#-----------------------------------------------------------
def export_data(data, output_file, format='json', compress=False):
    """Writes and optionally compresses Mixpanel data to disk in json or csv format

    :param data: A list of Mixpanel events or People profiles; if format='json', arbitrary json can be exported
    :param output_file: Name of file to write to
    :param format: Output format can be 'json' or 'csv' (Default value = 'json')
    :param compress: Option to gzip output (Default value = False)
    :type data: list
    :type output_file: str
    :type format: str
    :type compress: bool
    """
    with open(output_file, 'w+') as output:
        if format == 'json':
            json.dump(data, output)
        elif format == 'csv':
            Mixpanel._write_items_to_csv(data, output_file)
        else:
            msg = "Invalid format - must be 'json' or 'csv': format = " + str(format) + '\n' \
                  + "Dumping json to " + output_file
            Mixpanel.LOGGER.warning(msg)
            json.dump(data, output)
    if compress:
        Mixpanel._gzip_file(output_file)
        os.remove(output_file)
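A short usage sketch for the helper above, assuming export_data is exposed as a static method on the Mixpanel class (the references to Mixpanel._gzip_file and Mixpanel.LOGGER suggest it lives there); the event data is made up:

# Hypothetical call: write a list of event dicts to disk as JSON and gzip it.
events = [
    {"event": "signup", "properties": {"time": 1500000000, "distinct_id": "user-1"}},
]
Mixpanel.export_data(events, "events.json", format="json", compress=True)
# With compress=True the uncompressed file is gzipped and then removed,
# presumably leaving events.json.gz on disk.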
def _prep_event_for_import(event, token, timezone_offset):
    """Takes an event dict and modifies it to meet the Mixpanel /import HTTP spec, or dumps it to disk if it is invalid

    :param event: A Mixpanel event dict
    :param token: A Mixpanel project token
    :param timezone_offset: UTC offset (number of hours) of the timezone setting for the project that exported the
        data. Needed to convert the timestamp back to UTC prior to import.
    :type event: dict
    :type token: str
    :type timezone_offset: int | float
    :return: Mixpanel event dict with token added and timestamp adjusted to UTC
    :rtype: dict
    """
    # The /import API requires a 'time' and a 'distinct_id' property; if either of them is missing we dump the
    # event to a log of invalid events and return
    if ('time' not in event['properties']) or ('distinct_id' not in event['properties']):
        Mixpanel.LOGGER.warning('Event missing time or distinct_id property, dumping to invalid_events.txt')
        with open('invalid_events.txt', 'a') as invalid:
            json.dump(event, invalid)
            invalid.write('\n')
        return
    event_copy = deepcopy(event)
    # Transform the timestamp back to UTC
    event_copy['properties']['time'] = int(int(event['properties']['time']) - (timezone_offset * 3600))
    event_copy['properties']['token'] = token
    return event_copy
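A quick worked example of the offset arithmetic above, with made-up numbers: an event exported from a project whose timezone setting is UTC+2 carries a local timestamp, so subtracting timezone_offset * 3600 seconds recovers the UTC timestamp.

# Illustrative values only.
local_time = 1500007200        # timestamp as exported by a UTC+2 project
timezone_offset = 2            # hours
utc_time = int(local_time - timezone_offset * 3600)
assert utc_time == 1500000000  # timestamp adjusted back to UTC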
def _create_json_file(data_dir, data, model_str):
    if len(model_str) > 100:
        model_str = model_str[:100]
    model_dir = os.path.abspath(os.path.join(data_dir, data['model']))
    _ensure_dirs(model_dir)
    record_path = os.path.abspath(os.path.join(
        model_dir, '%s_%s.json' % (data['id'], model_str)))
    with open(record_path, 'w') as f:
        json.dump(data, f, indent=4, sort_keys=True)
    return [record_path]
def _update_fs(items, data_dir, create=False, update=False, delete=False):
    """
    Create-operation handling is concerned only with root models;
    non-root model items are ignored.

    We don't dump non-root models, since they belong to one root model
    anyway and their data will be saved as part of that root model.
    For example, if a user created a task (non-root model) and assigned it to a contact,
    the cached data will look like:
        {'changes': {'updated': IdentitySet([<crm.contact.models.Contact object at 0x7f5748a1f5c0>]),
                     'deleted': IdentitySet([]),
                     'created': IdentitySet([<crm.task.models.Task object at 0x7f574929dba8>])}}
    so the Contact object has been updated anyway, and we handle that update and add the task to the
    contact's data in the file system.

    :param items: list of model dictionaries that were created
    :param data_dir: where to save files
    :return: newly created file paths
    :rtype: list
    """
    all_paths = []
    for item in items:
        data = item['data']
        if not _is_root_model(data['model']):
            continue
        if create:
            paths = _create_json_file(data_dir, data, item['obj_as_str'])
        elif update:
            paths = _update_json_file(data_dir, data, item['obj_as_str'])
        elif delete:
            paths = _delete_json_file(data_dir, data, item['obj_as_str'])
        all_paths.extend(paths)
    return all_paths
def dumpdata():
    """
    Dump data table models into the filesystem.
    Only root models are dumped:
    'Company', 'Contact', 'Deal',
    'Sprint', 'Project', 'Organization', 'User'
    """
    data_dir = app.config["DATA_DIR"]
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
    for model in RootModel.__subclasses__():
        model_dir = os.path.abspath(os.path.join(data_dir, model.__name__))
        if not os.path.exists(model_dir):
            os.mkdir(model_dir)
        for obj in model.query.all():
            obj_as_str = str(obj).replace('/', '_')
            if len(obj_as_str) > 100:
                obj_as_str = obj_as_str[:100]
            record_path = os.path.abspath(os.path.join(
                model_dir, '%s_%s.json' % (obj.id, obj_as_str)))
            data = obj.as_dict()
            with open(record_path, 'w') as f:
                json.dump(data, f, indent=4, sort_keys=True)
def write(obj, fn):
    with open(fn, "w") as f:
        json.dump(obj, f, indent=4)
def process_requests(self, request_timings, task):
    """Convert all of the request and page events into the format needed for WPT"""
    result = {}
    result['requests'] = self.merge_requests(request_timings)
    result['pageData'] = self.calculate_page_stats(result['requests'])
    devtools_file = os.path.join(task['dir'], task['prefix'] + '_devtools_requests.json.gz')
    with gzip.open(devtools_file, 'wb', 7) as f_out:
        json.dump(result, f_out)
def main():
    """Main entry-point when running on the command-line"""
    import argparse
    parser = argparse.ArgumentParser(description='Chrome trace parser.',
                                     prog='trace-parser')
    parser.add_argument('-v', '--verbose', action='count',
                        help="Increase verbosity (specify multiple times for more)"
                             ". -vvvv for full debug output.")
    parser.add_argument('-l', '--logfile', help="File name for the mozilla log.")
    parser.add_argument('-s', '--start',
                        help="Start Time in UTC with microseconds YYYY-MM-DD HH:MM:SS.xxxxxx.")
    parser.add_argument('-o', '--out', help="Output requests json file.")
    options, _ = parser.parse_known_args()

    # Set up logging
    log_level = logging.CRITICAL
    if options.verbose == 1:
        log_level = logging.ERROR
    elif options.verbose == 2:
        log_level = logging.WARNING
    elif options.verbose == 3:
        log_level = logging.INFO
    elif options.verbose >= 4:
        log_level = logging.DEBUG
    logging.basicConfig(
        level=log_level, format="%(asctime)s.%(msecs)03d - %(message)s", datefmt="%H:%M:%S")

    if not options.logfile or not options.start:
        parser.error("Input devtools file or start time is not specified.")

    parser = FirefoxLogParser()
    requests = parser.process_logs(options.logfile, options.start)
    if options.out:
        with open(options.out, 'w') as f_out:
            json.dump(requests, f_out, indent=4)
def write_json(self, out_file, json_data):
    """Write out one of the internal structures as a json blob"""
    try:
        _, ext = os.path.splitext(out_file)
        if ext.lower() == '.gz':
            with gzip.open(out_file, 'wb') as f:
                json.dump(json_data, f)
        else:
            with open(out_file, 'w') as f:
                json.dump(json_data, f)
    except BaseException:
        logging.critical("Error writing to " + out_file)
def update_browser_viewport(self, task):
    """Update the browser border size based on the measured viewport"""
    if 'actual_viewport' in task and 'width' in task and 'height' in task and \
            self.job is not None and 'browser' in self.job:
        browser = self.job['browser']
        width = max(task['width'] - task['actual_viewport']['width'], 0)
        height = max(task['height'] - task['actual_viewport']['height'], 0)
        if browser not in self.margins or self.margins[browser]['width'] != width or \
                self.margins[browser]['height'] != height:
            self.margins[browser] = {"width": width, "height": height}
            if not os.path.isdir(self.persistent_dir):
                os.makedirs(self.persistent_dir)
            margins_file = os.path.join(self.persistent_dir, 'margins.json')
            with open(margins_file, 'wb') as f_out:
                json.dump(self.margins, f_out)
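To make the margin arithmetic above concrete, a tiny worked example with made-up window and viewport sizes:

# Illustrative numbers only: a 1024x768 browser window whose measured
# viewport is 1008x650 yields border margins of 16x118.
task = {'width': 1024, 'height': 768,
        'actual_viewport': {'width': 1008, 'height': 650}}
width = max(task['width'] - task['actual_viewport']['width'], 0)     # 16
height = max(task['height'] - task['actual_viewport']['height'], 0)  # 118
assert (width, height) == (16, 118)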
def process_message(self, msg):
    """Process a message from the browser
    https://trac.webkit.org/browser/webkit/trunk/Source/JavaScriptCore/inspector/protocol"""
    try:
        if 'method' in msg and self.recording:
            parts = msg['method'].split('.')
            if len(parts) >= 2:
                category = parts[0]
                event = parts[1]
                if category == 'Page':
                    self.process_page_event(event, msg)
                elif category == 'Network':
                    self.process_network_event(event, msg)
                elif category == 'Inspector':
                    self.process_inspector_event(event)
                elif category == 'Timeline':
                    self.process_timeline_event(event, msg)
                elif category == 'Console':
                    self.process_console_event(event, msg)
    except Exception:
        pass
    if self.timeline and 'method' in msg and self.recording:
        json.dump(msg, self.timeline)
        self.timeline.write(",\n")
    if 'id' in msg:
        response_id = int(re.search(r'\d+', str(msg['id'])).group())
        if response_id in self.pending_commands:
            self.pending_commands.remove(response_id)
            self.command_responses[response_id] = msg
def cache_all_colors():
    options = {}
    options['cache'] = json.load(open('color_cache.json'))
    for r in range(255):
        for g in range(255):
            for b in range(255):
                print('r: ' + str(r) + ' | g: ' + str(g) + ' | b: ' + str(b))
                desired_color = {
                    'r': r,
                    'g': g,
                    'b': b,
                }
                color_id = str(r).zfill(3) + str(g).zfill(3) + str(b).zfill(3)
                closest_dist = INFINITY
                closest_color_index = 0
                for i, block_char in enumerate(ANSI_SHADED_BLOCKS_TO_RGB):
                    block_char_color = {
                        'r': int(block_char['r']),
                        'g': int(block_char['g']),
                        'b': int(block_char['b']),
                    }
                    d = color_distance(block_char_color, desired_color)
                    if d < closest_dist:
                        closest_dist = d
                        closest_color_index = i
                # Add this index to our color cache so we don't have to look it up again
                options['cache'][color_id] = closest_color_index
    json.dump(options['cache'], open('color_cache.json', 'w'))
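The color_distance helper called above is not included in this excerpt. A minimal stand-in that would satisfy the call site is a plain Euclidean distance in RGB space; this is an assumption about the original, not its actual implementation:

import math

def color_distance(c1, c2):
    # Hypothetical helper: Euclidean distance between two {'r', 'g', 'b'} dicts.
    return math.sqrt((c1['r'] - c2['r']) ** 2 +
                     (c1['g'] - c2['g']) ** 2 +
                     (c1['b'] - c2['b']) ** 2)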
#@timing
def set_user_value(self, username, key, value):
    data = {key: value}
    try:
        self[username].update(data)
    except KeyError:
        self[username] = data
    with open(self.file, 'w') as fd:
        json.dump(self, fd)
def read_instance_types():
    redirects = read_redirects()
    subject_object = defaultdict(list)
    read_types(redirects, subject_object, 'instance_types_en.ttl')
    # read_types(redirects, subject_object, 'instance_types_transitive_en.ttl')
    # read_types(redirects, subject_object, 'instance_types_lhd_dbo_en.ttl')
    read_types(redirects, subject_object, 'instance_types_sdtyped_dbo_en.ttl')
    # read_types(redirects, subject_object, 'instance_types_dbtax_dbo_en.ttl')
    print('write json')
    with open('instance_types.json', 'w') as outfile:
        ujson.dump(subject_object, outfile)
    print('finish')
def _write_pending(self):
    try:
        self.pool.join()
    finally:
        to_write = list(self.pool.iter_incomplete())
        if not to_write:
            try:
                self.pending_file.unlink()
            except OSError:
                pass
            return
        with open_atomic(str(self.pending_file)) as f:
            json.dump(to_write, f)
def _refresh_messages(self):
    latest_archive = next(self.iter_archives(reverse=True), None)
    latest_ts = 0
    if latest_archive:
        msgs = self.load_messages(latest_archive)
        latest_ts = msgs[-1]["ts"] if msgs else 0
    slack = getattr(self.slack, self.attr)
    while True:
        resp = self._get_list(slack, latest_ts)
        assert_successful(resp)
        msgs = resp.body["messages"]
        msgs.sort(key=lambda m: m["ts"])
        if msgs and not self.path.exists():
            self.path.mkdir()
        for day, day_msgs in groupby(msgs, key=lambda m: ts2ymd(m["ts"])):
            day_msgs = list(day_msgs)
            day_archive = self.path / (day + ".json")
            cur = (
                self.load_messages(day_archive)
                if day_archive.exists() else []
            )
            cur.extend(day_msgs)
            print("%s: %s new messages in %s (saving to %s)" % (
                self.pretty_name, len(day_msgs), self.pretty_name, day_archive,
            ))
            for msg in day_msgs:
                if "file" in msg or "attachments" in msg:
                    self.downloader.add_message(msg)
            with open_atomic(str(day_archive)) as f:
                json.dump(cur, f)
            if float(day_msgs[-1]["ts"]) > float(latest_ts):
                latest_ts = day_msgs[-1]["ts"]
        if not resp.body["has_more"]:
            break
def update_status(self, x):
    self.status.update(x)
    with open_atomic(str(self.status_file)) as f:
        json.dump(self.status, f)
def save_name(self, user_data):
    data = {
        "fbid": user_data.data['fbprofile']['id'],
        "showid": user_data.meta['showid'],
        "showdate": user_data.meta['showdate'],
    }
    os.makedirs("./data/" + self.name, exist_ok=True)
    filename = "./data/{}/{}.json".format(self.name, user_data.userid)
    with open(filename, "w+") as fd:
        json.dump(data, fd)
def json_export(self, sourcenode, fname, **kargs):
    """Exports current data for later import.

    The exported data is a snapshot of the current state.

    Args:
        fname: File name for the exported data.

        sourcenode: Base of sub-tree for export.
            None for complete JSON document.

        **kargs:
            ffs.

    Returns:
        When successful returns 'True', else returns either 'False',
        or raises an exception.

    Raises:
        JSONDataTargetFile:
    """
    if not sourcenode:
        sourcenode = self.data
    try:
        with open(fname, 'w') as fp:
            #ret =
            myjson.dump(sourcenode, fp)
    except Exception as e:
        raise JSONDataTargetFile("open-" + str(e), "data.dump", str(fname))
    return True
def __repr__(self):
    """Dump data.
    """
    # io = StringIO()
    # myjson.dump(self.data, io)
    # return io.getvalue()
    return repr(self.data)