def write_json(self, out_file, json_data):
    """Write one of the internal structures to *out_file* as a JSON blob.

    A ``.gz`` extension (case-insensitive) selects gzip-compressed output;
    any other extension gets plain text. Failures are best-effort: they are
    logged at CRITICAL level and otherwise ignored, matching the original
    behavior.

    :param out_file: destination path; ``.gz`` suffix enables gzip output
    :param json_data: any ``json``-serializable structure
    """
    try:
        _, ext = os.path.splitext(out_file)
        if ext.lower() == '.gz':
            # gzip file objects are binary-mode: serialize to text first and
            # encode explicitly, so this works on Python 3 as well as
            # Python 2 (json.dumps output is pure ASCII by default).
            with gzip.open(out_file, 'wb') as f:
                f.write(json.dumps(json_data).encode('utf-8'))
        else:
            with open(out_file, 'w') as f:
                json.dump(json_data, f)
    except Exception:
        # Narrowed from ``except BaseException`` so KeyboardInterrupt and
        # SystemExit propagate; lazy %-args avoid eager string concat.
        logging.critical("Error writing to %s", out_file)
# Python json.dump() example source code
def _send_batch(self, base_url, endpoint, batch, dataset_id=None, dataset_version=None, retries=0):
    """POST a single batch of data to a Mixpanel API and return the response

    :param base_url: The base API url
    :param endpoint: Can be 'import', 'engage', 'import-events' or 'import-people'
    :param batch: List of Mixpanel event data or People updates to import.
    :param dataset_id: Dataset name to import into, required if dataset_version is specified, otherwise optional
    :param dataset_version: Dataset version to import into, required if dataset_id is specified, otherwise
        optional
    :param retries: Max number of times to retry if we get a HTTP 503 response (Default value = 0)
    :type base_url: str
    :type endpoint: str
    :type batch: list
    :type dataset_id: str
    :type dataset_version: str
    :type retries: int
    :raise: Raises for any HTTP error other than 503
    :return: HTTP response from Mixpanel API
    :rtype: str
    """
    try:
        # Mixpanel expects the payload base64-encoded in the 'data' param.
        params = {'data': base64.b64encode(json.dumps(batch))}
        if dataset_id:
            params['dataset_id'] = dataset_id
            # NOTE(review): token only sent for dataset imports here —
            # matches the reconstructed upstream layout; confirm the other
            # endpoints carry auth elsewhere (e.g. via self.request).
            params['token'] = self.token
        if dataset_version:
            params['dataset_version'] = dataset_version
        response = self.request(base_url, [endpoint], params, 'POST')
        msg = "Sent " + str(len(batch)) + " items on " + time.strftime("%Y-%m-%d %H:%M:%S") + "!"
        Mixpanel.LOGGER.debug(msg)
        return response
    except urllib2.HTTPError as err:
        # In the event of a 503 we will try to send again
        if err.code == 503:
            if retries < self.max_retries:
                Mixpanel.LOGGER.warning("HTTP Error 503: Retry #" + str(retries + 1))
                # BUG FIX: return the recursive call's result — previously
                # the retried response was discarded and callers got None.
                return self._send_batch(base_url, endpoint, batch, dataset_id=dataset_id,
                                        dataset_version=dataset_version, retries=retries + 1)
            else:
                # Retries exhausted: dump the batch to a local backup file
                # (one JSON document per line) so data is not lost.
                Mixpanel.LOGGER.warning("Failed to import batch, dumping to file import_backup.txt")
                with open('import_backup.txt', 'a') as backup:
                    json.dump(batch, backup)
                    backup.write('\n')
        else:
            raise
def delete_old_files(self, date, confirm):
date_str = date.strftime("%Y-%m-%d")
dry_run = (
"" if confirm else
" (PREVIEW ONLY; use '--confirm-delete' to actaully delete these files)"
)
print "Deleting files created before %s... %s" %(date_str, dry_run)
def delete_file(x):
file_file, file_obj = x
try:
res = self.slack.files.delete(file_obj["id"])
assert_successful(res)
except Error as e:
print "Error deleting file %r: %s" %(file_obj["id"], e.message)
self._error_count += 1
return
self._deleted_count += 1
file_obj["_wayslack_deleted"] = True
with open_atomic(str(file_file)) as f:
json.dump(file_obj, f)
pool = Threadpool(delete_file, queue_size=1, thread_count=10)
self._deleted_count = 0
self._skipped_count = 0
self._error_count = 0
for dir in self.path.iterdir():
if dir.name >= date_str:
continue
for file_file, file_obj in self._iter_files_in_dir(dir):
if file_obj.get("_wayslack_deleted"):
continue
err, file_path = self.archive.downloader.is_file_missing(file_obj)
if err:
self._skipped_count += 1
if VERBOSE:
print "WARNING: %s: %s" %(
str(file_file),
err,
)
print " File:", file_path
print " URL:", file_obj["url_private"]
continue
self._deleted_count += 1
if confirm:
if (self._deleted_count + self._error_count + self._skipped_count) % 10 == 0:
print self._deleted_msg()
pool.put((file_file, file_obj))
pool.join()
print "Deleted files: %s%s" %(self._deleted_count, dry_run)
if self._skipped_count and self._deleted_count:
print "Skipped files: %s (this is 'normal'. See: https://stackoverflow.com/q/44742164/71522; use --verbose for more info)" %(self._skipped_count, )
if self._error_count:
print "Errors: %s" %(self._error_count, )