def size():
    cache_dir = cfg.cache.dir
    source_dir = cfg.source.dir
    cache_size = 0
    for f in cache_dir.walkfiles():
        cache_size += f.size
    source_size = 0
    for f in source_dir.walkfiles():
        source_size += f.size
    print("{Style.BRIGHT}Cache: {Style.RESET_ALL} {}".format(
        humanize.naturalsize(cache_size, binary=True),
        Style=Style
    ))
    print("{Style.BRIGHT}Source:{Style.RESET_ALL} {}".format(
        humanize.naturalsize(source_size, binary=True),
        Style=Style
    ))
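All of the snippets on this page revolve around humanize.naturalsize; for reference, a quick sketch of the three unit styles it supports (outputs checked against a recent humanize release):

import humanize

print(humanize.naturalsize(1000000))               # '1.0 MB'  (decimal, the default)
print(humanize.naturalsize(1048576, binary=True))  # '1.0 MiB' (IEC binary units, as above)
print(humanize.naturalsize(1048576, gnu=True))     # '1.0M'    (ls -sh style)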
def _prompt_artifact_selection(self, service_name, artifact_key, deployment_repo, env, artifacts):
    current_image = deployment_repo['tfvars'].get(artifact_key, env)
    io.info('found artifacts for "%s/%s"' % (self.config.get('dockerhub')['organization'], service_name,))
    table_data = [
        ('id', 'tag name (* = current)', 'created at', 'size',),
    ]
    for i, artifact in enumerate(artifacts, 1):
        created_at = datetime.strptime(artifact['last_updated'], '%Y-%m-%dT%H:%M:%S.%fZ')
        created_at = pretty_print_datetime(created_at)
        image_size = humanize.naturalsize(artifact['full_size'])
        image_name = artifact['name']
        if image_name in current_image:  # indicate the current artifact
            image_name += ' *'
        table_data.append((str(i), image_name, created_at, image_size,))
    io.print_table(table_data, 'recent artifacts')
    selected_artifact = io.collect_input('select the artifact you want to use [q]:', artifacts)
    # Handle the case where the selected artifact is already the current artifact.
    if selected_artifact and selected_artifact['name'] in current_image:
        io.err('selected artifact is already the current active artifact')
        return None
    return selected_artifact
def __init__(self, part: Dict[str, Any], core: ApartCore, main_view: 'MainView'):
    Gtk.Box.__init__(self)
    self.part = part
    self.core = core
    self.main_view = main_view
    self.add(key_and_val('Name', self.name()))
    self.add(key_and_val('Type', self.part.get('fstype', 'unknown')))
    self.add(key_and_val('Label', self.part.get('label', 'none')))
    self.add(key_and_val('Size', humanize.naturalsize(self.part['size'], binary=True)))
    self.clone_button = Gtk.Button("Clone", halign=Gtk.Align.END)
    self.restore_button = Gtk.Button("Restore", halign=Gtk.Align.END)
    if self.is_mounted():
        self.clone_button.set_sensitive(False)
        self.clone_button.set_tooltip_text('Partition is currently mounted')
        self.restore_button.set_sensitive(False)
        self.restore_button.set_tooltip_text('Partition is currently mounted')
    else:
        self.clone_button.connect('clicked', lambda b: self.main_view.show_new_clone())
        self.restore_button.connect('clicked', lambda b: self.main_view.show_new_restore())
    buttons = Gtk.Box(hexpand=True, halign=Gtk.Align.END)
    buttons.add(self.clone_button)
    buttons.add(self.restore_button)
    self.add(buttons)
    main_view.connect('notify::visible-child', self.on_main_view_change)
def __init__(self, resp, description=None):
    """
    Wrapper for BigQuery table resources, mainly for calculating/parsing
    job statistics into human-readable formats for logging.

    :param resp: Dictionary representation of a table resource.
    :type resp: dict
    :param description: Optional string descriptor for the table.
    """
    assert isinstance(resp, dict)
    assert resp['kind'].split('#')[-1] == 'table'
    self.resp = resp
    if description is not None:
        self.description = description.strip().title()
    try:
        self.row_count = int(self.resp['numRows'])
    except (KeyError, TypeError):
        pass
    try:
        self.size = humanize.naturalsize(int(self.resp['numBytes']))
    except (KeyError, TypeError):
        pass
def load_resp(self, resp, is_download):
    """
    Loads a JSON response from the API.

    :param resp: Response from the API.
    :type resp: dict
    :param is_download: If True, time taken is measured against the stop
        time (now); otherwise it is based on the 'updated' field of the
        response (an upload).
    :type is_download: bool
    """
    assert isinstance(resp, dict)
    self.resp = resp
    self.size = humanize.naturalsize(int(resp['size']))
    if is_download:
        updated_at = datetime.now(UTC)
    else:
        updated_at = UTC.localize(datetime.strptime(resp['updated'], '%Y-%m-%dT%H:%M:%S.%fZ'))
    elapsed = (updated_at - self.start_time).seconds if updated_at > self.start_time else 0
    self.time_taken = dict(zip(('m', 's'), divmod(elapsed, 60)))
    self.full_path = 'gs://%s/%s' % (resp['bucket'], resp['name'])
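The time_taken construction above is dense; the same divmod/zip pattern as a standalone sketch with hypothetical timestamps:

from datetime import datetime, timedelta

start_time = datetime(2021, 1, 1, 12, 0, 0)
updated_at = start_time + timedelta(seconds=125)
# divmod splits elapsed seconds into (minutes, seconds).
time_taken = dict(zip(('m', 's'), divmod((updated_at - start_time).seconds, 60)))
print(time_taken)  # {'m': 2, 's': 5}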
def build_objects(bucket):
    '''Build a list of object tuples to render as a table.

    Object structure: (name, url, size)
    '''
    objects = []
    data_dir = 'data/{}'.format(bucket)
    walk_dir = os.path.realpath('..') + '/' + data_dir
    for root, dirs, files in os.walk(walk_dir):
        for file_name in files:
            objects.append((
                file_name,
                data_dir + '/' + file_name,
                naturalsize(os.path.getsize(walk_dir + '/' + file_name))
            ))
    return objects
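build_objects joins paths by plain string concatenation; a sketch of the filesystem half using os.path.join instead (the URL half arguably should keep '/' regardless of platform):

import os
from humanize import naturalsize

def file_row(walk_dir, data_dir, file_name):
    # Join the on-disk path portably; keep '/' for the URL component.
    size = naturalsize(os.path.getsize(os.path.join(walk_dir, file_name)))
    return (file_name, data_dir + '/' + file_name, size)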
def _handle_http_url(self, url, headers):
    logging.debug("handle_http_url(url={}, headers={})".format(url, headers))
    before = time.time()
    start_offset = self.output.tell()
    for piece in self._stream(url, headers):
        self.output.write(piece)
    duration = time.time() - before
    # On Windows we seem to get 0 values for duration. Just round up to one second.
    # Rates over intervals less than this are meaningless anyway.
    duration = max(1, duration)
    size = self.output.tell() - start_offset
    rate = (size / (2**20)) / duration
    logging.info("Downloaded {} chunk in {:.2f} seconds @ {:.2f} MiB/s".format(
        humanize.naturalsize(size, binary=True), duration, rate))
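With hypothetical numbers the rate arithmetic works out as follows: bytes divided by 2**20 gives MiB, and the max() guard keeps the denominator at one second or more:

import humanize

size = 5 * 2**20        # pretend 5 MiB were written
duration = max(1, 2.5)  # elapsed wall time, floored at one second
rate = (size / (2**20)) / duration
print("Downloaded {} chunk in {:.2f} seconds @ {:.2f} MiB/s".format(
    humanize.naturalsize(size, binary=True), duration, rate))
# Downloaded 5.0 MiB chunk in 2.50 seconds @ 2.00 MiB/s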
def _format_stats(stats):
    formatted = []
    for k, v in stats.items():
        if k.endswith('_size') or k.endswith('_bytes'):
            v = naturalsize(v, binary=True)
        else:
            if k == 'cpu_used':
                k += '_msec'
            v = '{0:,}'.format(int(v))
        formatted.append((k, v))
    return tabulate(formatted)
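A hypothetical call showing how the key suffixes are dispatched (assumes tabulate is installed and the function above is in scope):

stats = {'cache_size': 1048576, 'cpu_used': 1234.0, 'requests': 42}
print(_format_stats(stats))
# Prints roughly:
# cache_size     1.0 MiB
# cpu_used_msec  1,234
# requests       42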
def download(
    url,
    local_file,
    file_size: int = None,
    hexdigest: str = None,
    title: str = '',
):
    LOGGER.info('downloading {} -> {}'.format(url, local_file))
    if not title:
        title = f'Downloading {url.split("/")[-1]}'
    Progress.start(
        title=title,
        label=f'Downloading to: {local_file}',
    )

    def _progress_hook(data):
        label = 'Time left: {} ({}/{})'.format(
            data['time'],
            humanize.naturalsize(data['downloaded']),
            humanize.naturalsize(data['total'])
        )
        Progress.set_label(label)
        Progress.set_value(data['downloaded'] / data['total'] * 100)

    # def hook(data):
    #     # I.progress_set_value(int(float(data['percent_complete'])))
    #     Progress().set_value(int(float(data['percent_complete'])))
    time.sleep(1)
    dl = Downloader(
        url=url,
        filename=local_file,
        progress_hooks=[_progress_hook],
        content_length=file_size,
        hexdigest=hexdigest
    )
    return dl.download()
def __init__(
    self,
    version: str,
    branch: str,
    download_url: str,
    remote_file_size: int,
    remote_file_name: str,
):
    self._version = version
    self._branch = branch
    self._download_url = download_url
    self._remote_file_size = remote_file_size
    self._remote_file_name = remote_file_name
    self._human_file_size = humanize.naturalsize(remote_file_size)
def parse_results_rarbg(response_json):
    global results_rarbg
    if not error_detected_rarbg:
        for post in response_json['torrent_results']:
            res = {}
            res['name'] = post['title']
            res['link'] = post['info_page']
            # Re-render the size with less precision when the string would be
            # too wide, so the size column stays aligned.
            temp_size = humanize.naturalsize(post['size'], binary=True, format='%.2f')
            s1 = temp_size.split('.')
            if len(s1[0]) == 4:
                res['size'] = humanize.naturalsize(post['size'], binary=True, format='%.0f')
            elif len(s1[1]) == 3:
                res['size'] = humanize.naturalsize(post['size'], binary=True, format='%.1f')
            else:
                res['size'] = temp_size
            # res['time'] = ...  # implement later
            res['seeders'] = post['seeders']
            res['leechers'] = post['leechers']
            try:
                res['ratio'] = format(float(res['seeders']) / float(res['leechers']), '.1f')
            except ZeroDivisionError:
                res['ratio'] = float('inf')
            res['magnet'] = post['download']
            results_rarbg.append(res)
    else:
        print("----------- " + colored.green('RARBG') + " -----------")
        print(" [No results found] ")
        return []
    return results_rarbg
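The three size branches above keep the size column a roughly constant width; an illustration of the four-digit case with made-up byte counts:

import humanize

print(humanize.naturalsize(1023 * 2**20, binary=True, format='%.2f'))  # '1023.00 MiB'
print(humanize.naturalsize(1023 * 2**20, binary=True, format='%.0f'))  # '1023 MiB' (what the 4-digit branch emits)
print(humanize.naturalsize(5 * 2**30, binary=True, format='%.2f'))     # '5.00 GiB' (falls through unchanged)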
def get(self):
    session = Session()
    service_count = session.query(Service).count()
    # Materialize the results before closing the session.
    incidents = session.query(Incident).all()
    session.close()
    memory_stats = psutil.virtual_memory()
    self.render("incidents.html", version=__version__,
                max_memory=humanize.naturalsize(memory_stats.total),
                memory_used=humanize.naturalsize(memory_stats.used),
                service_count=service_count, cpu_current=psutil.cpu_percent(),
                memory_percent=(memory_stats.used / memory_stats.total) * 100,
                incidents=incidents)
def setProgress(self, data):
    """Set the progress of the bar."""
    # TODO: What is the data structure in case of a patch?
    try:
        text = _(
            'Downloading a new version: Total file size {}, Time remaining {}.')
        text = text.format(humanize.naturalsize(
            data['total']), data['time'])
        self.setLabelText(text)
        self.setValue(int(float(data['percent_complete']) * 10))
    except Exception:
        module_logger.exception("Failed to update download progress.")
def setProgress(self, data):
    """Set the progress of the bar."""
    # TODO: What is the data structure in case of a patch?
    module_logger.info("Progress {}".format(data))
    try:
        text = _(
            'Downloading required files...: Total file size {}, Time remaining {}.')
        text = text.format(humanize.naturalsize(
            data['total']), data['time'])
        self.setLabelText(text)
        self.setValue(int(float(data['percent_complete']) * 5))
    except Exception:
        module_logger.exception("Failed to update download progress.")
def humanize_size_filter(dt: int, fmt=None):
    """Template filter: render a byte count in human-readable form, using
    the zh_CN humanize translations shipped under etc/humanize."""
    humanize.i18n.activate('zh_CN', path='etc/humanize')
    return humanize.naturalsize(dt)
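humanize ships gettext-based translations that can be toggled at runtime; a minimal sketch of the locale switch the filter performs (assumes the zh_CN catalog exists under etc/humanize):

import humanize

humanize.i18n.activate('zh_CN', path='etc/humanize')
print(humanize.naturalsize(1000000))  # unit text localized where a translation exists
humanize.i18n.deactivate()            # back to the default English
print(humanize.naturalsize(1000000))  # '1.0 MB'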
def get(self, request, imsi=None):
    """Handles GET requests."""
    user_profile = UserProfile.objects.get(user=request.user)
    network = user_profile.network
    try:
        subscriber = Subscriber.objects.get(imsi=imsi,
                                            network=network)
    except Subscriber.DoesNotExist:
        return HttpResponseBadRequest()
    # Set the context with various stats.
    context = {
        'networks': get_objects_for_user(request.user, 'view_network', klass=Network),
        'currency': CURRENCIES[network.subscriber_currency],
        'user_profile': user_profile,
        'subscriber': subscriber,
    }
    try:
        context['created'] = subscriber.usageevent_set.order_by(
            'date')[0].date
    except IndexError:
        context['created'] = None
    # Set usage info (SMS sent, total call duration, data usage).
    sms_kinds = ['free_sms', 'outside_sms', 'incoming_sms', 'local_sms',
                 'local_recv_sms', 'error_sms']
    context['num_sms'] = subscriber.usageevent_set.filter(
        kind__in=sms_kinds).count()
    call_kinds = ['free_call', 'outside_call', 'incoming_call',
                  'local_call', 'local_recv_call', 'error_call']
    calls = subscriber.usageevent_set.filter(kind__in=call_kinds)
    context['number_of_calls'] = len(calls)
    context['voice_sec'] = sum(call.voice_sec() for call in calls)
    gprs_events = subscriber.usageevent_set.filter(kind='gprs')
    up_bytes = sum(g.uploaded_bytes for g in gprs_events)
    down_bytes = sum(g.downloaded_bytes for g in gprs_events)
    context['up_bytes'] = humanize.naturalsize(up_bytes)
    context['down_bytes'] = humanize.naturalsize(down_bytes)
    context['total_bytes'] = humanize.naturalsize(up_bytes + down_bytes)
    # Render the template.
    template = get_template('dashboard/subscriber_detail/info.html')
    html = template.render(context, request)
    return HttpResponse(html)
def generate_gprs_events(start_timestamp, end_timestamp):
    """Create GPRS events from data in the GPRS DB.

    Records generated between the specified timestamps become events.
    One event is created per IMSI (not one event per record).

    Args:
        start_timestamp: seconds since epoch
        end_timestamp: seconds since epoch
    """
    gprs_db = gprs_database.GPRSDB()
    # First organize the records by IMSI.
    sorted_records = {}
    for record in gprs_db.get_records(start_timestamp, end_timestamp):
        if record['imsi'] not in sorted_records:
            sorted_records[record['imsi']] = []
        sorted_records[record['imsi']].append(record)
    # Now analyze all records that we have for each IMSI.
    for imsi in sorted_records:
        up_bytes = sum(
            r['uploaded_bytes_delta'] for r in sorted_records[imsi])
        down_bytes = sum(
            r['downloaded_bytes_delta'] for r in sorted_records[imsi])
        # Do not make an event if the byte deltas are unchanged.
        if up_bytes == 0 and down_bytes == 0:
            continue
        # For now, GPRS is free for subscribers.
        cost = 0
        reason = 'gprs_usage: %s uploaded, %s downloaded' % (
            humanize.naturalsize(up_bytes), humanize.naturalsize(down_bytes))
        timespan = int(end_timestamp - start_timestamp)
        events.create_gprs_event(
            imsi, cost, reason, up_bytes, down_bytes, timespan)
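The per-IMSI grouping above is the plain-dict form of a common pattern; the same thing with collections.defaultdict, using hypothetical records:

from collections import defaultdict

records = [{'imsi': 'IMSI001', 'uploaded_bytes_delta': 10},
           {'imsi': 'IMSI001', 'uploaded_bytes_delta': 20}]
by_imsi = defaultdict(list)  # missing keys start as empty lists
for record in records:
    by_imsi[record['imsi']].append(record)
print(sum(r['uploaded_bytes_delta'] for r in by_imsi['IMSI001']))  # 30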
def natural_content_length(self):
    return humanize.naturalsize(self.content_length)
def __init__(self,
             final_message: Dict,
             progress_view: 'ProgressAndHistoryView',
             core: ApartCore,
             z_options: List[str]):
    FinishedJob.__init__(self, final_message,
                         progress_view,
                         core,
                         icon_name='object-select-symbolic',
                         forget_on_rerun=False,
                         z_options=z_options)
    self.image_size = key_and_val('Image size', humanize.naturalsize(self.msg['image_size'],
                                                                     binary=True))
    self.filename = key_and_val('Image file', extract_filename(self.msg['destination']))
    self.source_uuid = None
    if self.msg.get('source_uuid'):
        self.source_uuid = key_and_val('Partition uuid', self.msg['source_uuid'])
    self.stats = Gtk.VBox()
    for stat in [self.filename, self.image_size, self.source_uuid, self.duration]:
        if stat:
            self.stats.add(stat)
    self.stats.get_style_context().add_class('finished-job-stats')
    self.stats.show_all()
    self.extra.add(self.stats)
    self.delete_image_btn = Gtk.Button.new_from_icon_name('user-trash-full-symbolic',
                                                          Gtk.IconSize.SMALL_TOOLBAR)
    self.delete_image_btn.set_tooltip_text(DELETE_TIP)
    self.delete_image_btn.show_all()
    self.delete_image_btn.connect('clicked', self.delete_image)
    self.buttons.add(self.delete_image_btn)
    self.buttons.reorder_child(self.delete_image_btn, 0)
def _size(*values):
    """Return the summed sizes humanized, or '-' when the sum is zero."""
    value = sum(values)
    return humanize.naturalsize(value, binary=True) if value else '-'
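Usage, assuming the helper above is in scope:

print(_size(1024, 2048))  # '3.0 KiB'
print(_size())            # '-' (an empty sum is 0, which is falsy)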
def run(self, args, config, storage, remotes):
    table_lines = [('<b>NAME</b>', '<b>TYPE</b>', '<b>LAST</b>', '<b>NEXT</b>', '<b>LAST SIZE</b>')]
    for remote in sorted(remotes.list(), key=lambda x: x.name):
        latest_ref = '%s/latest' % remote.name
        latest_backup = storage.get_backup(latest_ref)
        latest_date_text = '-'
        next_date_text = '-'
        size = '-'
        if latest_backup is None:
            if remote.scheduler is not None:
                next_date_text = '<color fg=yellow>now</color>'
        else:
            size_total = sum(latest_backup.stats.get(x, 0) for x in STATS_TOTAL)
            size_new = sum(latest_backup.stats.get(x, 0) for x in STATS_NEW)
            size = '%s (+%s)' % (humanize.naturalsize(size_total, binary=True),
                                 humanize.naturalsize(size_new, binary=True))
            latest_date_text = latest_backup.start_date.humanize()
            if remote.scheduler is not None and remote.scheduler['enabled']:
                next_date = latest_backup.start_date + datetime.timedelta(seconds=remote.scheduler['interval'] * 60)
                if next_date > arrow.now():
                    next_date_text = '<color fg=green>%s</color>' % next_date.humanize()
                else:
                    next_date_text = '<color fg=red>%s</color>' % next_date.humanize()
        table_lines.append((remote.name, remote.type, latest_date_text, next_date_text, size))
    printer.table(table_lines)
def run(self, args, config, storage, remotes):
    count, size = gc(storage, delete=not args.dry_run)
    if count:
        printer.p('Done. Deleted {n} objects, total size: {s}', n=count, s=humanize.naturalsize(size, binary=True))
    else:
        printer.p('Done. Nothing to delete.')
def _get_info(self):
    result = ''
    if (
        self.kind != HistoryItemKind.FILE and
        self.kind != HistoryItemKind.IMAGE and
        not self.content_type
    ):
        if self.kind != HistoryItemKind.LINK:
            result = '%i chars, %i lines' % (len(self.raw), self.n_lines)
        return result
    if self.n_lines > 1:
        result += _('%s items') % self.n_lines
    else:
        try:
            size = os.path.getsize(self.raw.strip())
        except FileNotFoundError:
            result += _('No such file or directory')
        else:
            result += humanize.naturalsize(size, gnu=True)
        if self._content_type:
            result += ', Type: %s' % self._content_type
    return result
def hr_size(self):
    return humanize.naturalsize(self.size)
def format_speed(value):
    return "%s/s" % naturalsize(value)
def _parse_job(self):
    try:
        # Job timestamps are in milliseconds since epoch.
        end = datetime.utcfromtimestamp(float(self.resp['statistics']['endTime']) / 1000)
        start = datetime.utcfromtimestamp(float(self.resp['statistics']['creationTime']) / 1000)
        self.time_taken = dict(zip(('m', 's'), divmod((end - start).seconds, 60)))
    except KeyError:
        pass
    if self.job_type == 'load':
        try:
            self.size = humanize.naturalsize(int(self.resp['statistics']['load']['inputFileBytes']))
        except (KeyError, TypeError):
            pass
        try:
            self.row_count = int(self.resp['statistics']['load']['outputRows'])
        except (KeyError, TypeError):
            pass
    elif self.job_type == 'query':
        try:
            self.size = humanize.naturalsize(int(self.resp['statistics']['query']['totalBytesProcessed']))
        except (KeyError, TypeError):
            pass
        try:
            self.row_count = int(self.resp['totalRows'])
        except (KeyError, TypeError):
            pass
def load_resp(self, resp, is_download=False):
    """
    Loads a JSON response from the API.

    :param resp: Response from the API.
    :type resp: dict
    :param is_download: If True, time taken is measured against the stop
        time (now); otherwise it is based on the 'modifiedTime' field of
        the response (an upload).
    :type is_download: bool
    """
    assert isinstance(resp, dict)
    self.resp = resp
    try:
        self.size = humanize.naturalsize(int(resp['size']))
    except KeyError:
        pass
    if is_download:
        updated_at = datetime.now(UTC)
    else:
        updated_at = UTC.localize(datetime.strptime(resp['modifiedTime'], '%Y-%m-%dT%H:%M:%S.%fZ'))
    elapsed = (updated_at - self.start_time).seconds if updated_at > self.start_time else 0
    self.time_taken = dict(zip(('m', 's'), divmod(elapsed, 60)))
def ts_file(self, path):
    full_path = os.path.normpath(os.path.join(TS_DIR, './%s' % path))
    inspector = Inspector.get_inspector(full_path)
    parent = inspector.get_file_paths()['web_dir']
    metadata = inspector.get_metadata()
    video_streams = []
    audio_streams = []
    other_streams = []
    # get_metadata() appears to return a small integer error code (1-4)
    # on failure, hence the range() membership test.
    if metadata not in range(1, 5) and 'streams' in metadata:
        for index, stream in enumerate(metadata['streams']):
            if 'codec_type' in stream and stream['codec_type'] == 'video':
                video_streams.append(index)
                try:
                    if 'avg_frame_rate' in stream:
                        # avg_frame_rate is a fraction string such as '30000/1001'.
                        stream['avg_frame_rate_norm'] = eval(stream['avg_frame_rate'])
                except ZeroDivisionError:
                    pass
            elif 'codec_type' in stream and stream['codec_type'] == 'audio':
                audio_streams.append(index)
            else:
                other_streams.append(index)
    # Guard against the integer error code here too, or the 'in' test raises.
    if metadata not in range(1, 5) and 'format' in metadata and 'size' in metadata['format']:
        metadata['format']['size_hr'] = humanize.naturalsize(metadata['format']['size'],
                                                             gnu=True)
    return render_template('ts_file.html', inspector=inspector,
                           screenshot=inspector.get_screenshot(),
                           metadata=metadata, parent=parent,
                           job_status=JobStatus(),
                           video_streams=video_streams, audio_streams=audio_streams,
                           other_streams=other_streams)
def verify_size(mbtiles_file, max_size, scheme):
    mbtiles = MBTiles(mbtiles_file, scheme)
    for tile in mbtiles.tiles_by_size(max_size):
        print('{}/{}/{}\t{}'.format(tile.z, tile.x, tile.y,
                                    humanize.naturalsize(tile.size)))
def bytes_to_site_size(self, byte_num):
    humanized = humanize.naturalsize(byte_num, format='%.2f', binary=True)
    if 'MiB' in humanized or 'KiB' in humanized:
        humanized = humanize.naturalsize(byte_num, format='%d', binary=True)
    return humanized
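Hypothetical inputs exercising both paths (bytes_to_site_size is a method; shown here as if bound):

print(bytes_to_site_size(5 * 2**20))  # '5 MiB'    (KiB/MiB re-rendered without decimals)
print(bytes_to_site_size(5 * 2**30))  # '5.00 GiB' (other units keep the '%.2f' format)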