def _song_from_info(self, info):
    """Create (or fetch from cache) a Song from a Google Music track dict.

    :param info: track metadata dict; the cache key is ``'id'`` (library
        tracks) or ``'storeId'`` (store tracks)
    :return: a cached or newly created Song
    """
    if "id" in info:
        song_id = info['id']
    else:
        song_id = info['storeId']
    songs = self._songs
    if song_id in songs:
        return songs[song_id]
    artist = info['artist']
    title = info['title']
    duration_millis = int(info['durationMillis'])
    # Format the duration arithmetically. The previous
    # datetime.fromtimestamp(...).strftime("%M:%S") interpreted the
    # duration as a local-time timestamp, which misrendered it in
    # timezones with a non-zero minute offset (e.g. UTC+5:30) and
    # wrapped tracks longer than an hour.
    minutes, seconds = divmod(duration_millis // 1000, 60)
    duration = "%02d:%02d" % (minutes, seconds)
    url = None
    if "albumArtRef" in info:
        ref = info["albumArtRef"]
        if ref:
            ref = ref[0]
            if "url" in ref:
                url = ref["url"]
    song = Song(song_id, self, title, artist, url, " - ".join([artist, title]), duration)
    songs[song_id] = song
    return song
# Example usages of datetime.fromtimestamp() collected from various projects.
def _song_from_info(self, info) -> Song:
    """
    Create a song object from the json dict returned by the HTTP API.
    :param info: the json dict
    :return: a Song object, or None if the track is neither streamable
        nor downloadable
    """
    song_id = str(info.id)
    songs = self._songs
    if song_id in songs:
        return songs[song_id]
    if not info.streamable and not info.downloadable:
        return None
    artist = info.user['username']
    title = info.title
    url = info.artwork_url
    duration_millis = info.duration
    # Compute MM:SS arithmetically. datetime.fromtimestamp() applies the
    # local timezone offset and corrupts the result in zones with a
    # fractional-hour offset; it also wraps durations over an hour.
    minutes, seconds = divmod(int(duration_millis) // 1000, 60)
    duration = "%02d:%02d" % (minutes, seconds)
    song = Song(song_id, self, title, artist, url, " - ".join([artist, title]), duration=duration)
    songs[song_id] = song
    return song
def __init__(self, id=None, created=None, shards=None, locked=None, user=None, size=None, storageSize=None):
    """Initialize bucket metadata from API response fields.

    ``created`` is an RFC 3339 timestamp string and is converted to a
    naive local ``datetime`` (or left as ``None``); ``shards`` defaults
    to a fresh empty list.
    """
    self.id = id
    self.locked = locked
    self.user = user
    self.size = size
    self.storageSize = storageSize
    if created is None:
        self.created = None
    else:
        self.created = datetime.fromtimestamp(
            strict_rfc3339.rfc3339_to_timestamp(created))
    self.shards = [] if shards is None else shards
def __init__(
    self, token=None, bucket=None, operation=None, expires=None,
    encryptionKey=None, id=None,
):
    """Initialize a transfer-token record from API response fields.

    ``expires`` is an RFC 3339 timestamp string, converted to a naive
    local ``datetime`` (or left as ``None``).
    """
    self.token = token
    self.bucket = Bucket(id=bucket)
    self.operation = operation
    self.id = id
    if expires is None:
        self.expires = None
    else:
        self.expires = datetime.fromtimestamp(
            strict_rfc3339.rfc3339_to_timestamp(expires))
    self.encryptionKey = encryptionKey
def parse_job_list_page(self, response):
self.get_connector().log(self.name, self.ACTION_CRAWL_LIST, response.url)
feed_parser = feedparser.parse(response.body)
for job_entry in feed_parser.entries:
job_url = job_entry.link
job_publication_date = datetime.fromtimestamp(mktime(job_entry.published_parsed))
job_publication_time = mktime(job_publication_date.timetuple())
last_job_publication_time = mktime(self._last_job_date.timetuple())
if job_publication_time <= last_job_publication_time:
self.get_connector().log(self.name,
self.ACTION_MARKER_FOUND,
"%s <= %s" % (job_publication_time, last_job_publication_time))
return
prepared_job = JobItem()
request = Request(job_url, self.parse_job_page)
request.meta['item'] = prepared_job
prepared_job['title'] = job_entry.title
prepared_job['description'] = job_entry.description
prepared_job['publication_datetime'] = job_publication_date
yield request
def refresh_user_token(user_social):
    """
    Utility function to refresh the access token if is (almost) expired
    Args:
        user_social (UserSocialAuth): a user social auth instance
    """
    extra = user_social.extra_data
    try:
        last_update = datetime.fromtimestamp(extra.get('updated_at'), tz=pytz.UTC)
        expires_in = timedelta(seconds=extra.get('expires_in'))
    except TypeError:
        # Timestamps missing from extra_data: refresh unconditionally.
        _send_refresh_request(user_social)
        return
    # small error margin of 5 minutes to be safe
    error_margin = timedelta(minutes=5)
    if now_in_utc() - last_update >= expires_in - error_margin:
        _send_refresh_request(user_social)
def datetime_filter(t):
    """Render timestamp ``t`` as an HTML span with a relative Chinese label.

    The span's ``title`` attribute carries the absolute time; the text
    shows "N minutes/hours/days ago" (分钟前/小时前/天前) or the full
    date for timestamps older than a week.

    Note: the original labels were mojibake (runs of '?'); they are
    restored here from the standard Chinese relative-time phrasing whose
    character counts match the corrupted text exactly.
    """
    date_time = datetime.fromtimestamp(t)
    str_date = date_time.strftime("%Y-%m-%d %X")
    delta = int(time.time() - t)
    if delta < 60:
        return u'<span title="{}">1分钟前</span>'.format(str_date)
    if delta < 3600:
        return u'<span title="{}">{}分钟前</span>'.format(str_date, delta // 60)
    if delta < 86400:
        return u'<span title="{}">{}小时前</span>'.format(str_date, delta // 3600)
    if delta < 604800:
        return u'<span title="{}">{}天前</span>'.format(str_date, delta // 86400)
    return u'<span title="{}">{}</span>'.format(str_date, date_time.strftime("%Y年%m月%d日"))
#def index(request):
#return web.Response(body=b'<h1>Awesome Python3 Web</h1>', content_type='text/html')
def walkSubNodes(self, vcn):
    """Recursively collect file-name attributes from the index block at ``vcn``.

    Sub-node entries are descended into; leaf entries with a non-empty
    key and an inode number above 16 contribute their
    NTFS_FILE_NAME_ATTR, except entries of the DOS short-name type.
    """
    logging.debug("Inside walkSubNodes: vcn %s" % vcn)
    collected = []
    for index_entry in self.parseIndexBlocks(vcn):
        if index_entry.isSubNode():
            collected.extend(self.walkSubNodes(index_entry.getVCN()))
            continue
        if len(index_entry.getKey()) > 0 and index_entry.getINodeNumber() > 16:
            name_attr = NTFS_FILE_NAME_ATTR(index_entry.getKey())
            if name_attr['FileNameType'] != FILE_NAME_DOS:
                collected.append(name_attr)
    return collected
def datetime_from_utc(cls, metadata, element=None):
    """
    Generates a human readable time from an items UTC timestamp
    :param metadata: Item metadata
    :type metadata: dict
    :param element: Fallback item details
    :type element: dict
    :returns: tuple -- Match date & match time
    """
    if metadata.get('scheduled_start'):
        container = metadata.get('scheduled_start', {})
    elif element is not None:
        container = element.get('scheduled_start', {})
    else:
        container = None
    if container is None:
        return (None, None)
    # NOTE(review): despite the name, fromtimestamp() converts to the
    # *local* timezone, not UTC — preserved as-is here.
    start = datetime.fromtimestamp(float(container.get('date')))
    return (start.strftime('%d.%m.%Y'), start.strftime('%H:%M'))
def get_task_from_cache(domain_to_request, provider):
    """
    Check if there is already a pending/resolved similar request
    """
    defendant = service = expiration = ticket = None
    queue_key = common.CDN_REQUEST_REDIS_QUEUE % provider
    for raw_entry in utils.redis.lrange(queue_key, 0, -1):
        cached = json.loads(raw_entry, object_pairs_hook=OrderedDict)
        if cached['domain'] != domain_to_request:
            continue
        defendant = Defendant.objects.filter(
            id=cached['defendant_id']
        ).last()
        service = Service.objects.filter(
            id=cached['service_id']
        ).last()
        ticket = Ticket.objects.get(
            id=cached['request_ticket_id']
        )
        expiration = datetime.fromtimestamp(cached['expiration'])
        break
    return defendant, service, ticket, expiration
def logout(request):
    """ Logout a user
    """
    try:
        token = request.environ['HTTP_X_API_TOKEN']
    except (KeyError, IndexError, TypeError):
        raise BadRequest('Missing HTTP X-Api-Token header')
    try:
        payload = jwt.decode(token, settings.SECRET_KEY)
        payload = json.loads(CRYPTO.decrypt(str(payload['data'])))
        user = User.objects.get(id=payload['id'])
        # Reset last_login to the epoch, invalidating the session token.
        user.last_login = datetime.fromtimestamp(0)
        user.save()
        return {'message': 'Logged out'}
    except (utils.CryptoException, KeyError, jwt.DecodeError,
            jwt.ExpiredSignature, User.DoesNotExist):
        raise BadRequest('Invalid token')
def load_jobs(bot, job_queue):
    """Load all existing jobs (pending reminders) into the given
    'job_queue', and apologise if we missed any.
    """
    if not os.path.isdir(bot.username):
        return
    now = datetime.now()
    for chat_id in os.listdir(bot.username):
        missed_any = False
        for reminder in os.listdir(get_user_dir(bot, chat_id)):
            reminder_path = os.path.join(bot.username, chat_id, reminder)
            # Each reminder file is named after its epoch due-time.
            due = datetime.fromtimestamp(int(reminder))
            if due <= now:
                # Reminder already expired while the bot was offline.
                missed_any = True
                os.remove(reminder_path)
            else:
                queue_message(job_queue, due,
                              int(chat_id), reminder_path)
        if missed_any:
            bot.send_message(chat_id,
                text='Oops… looks like I missed some reminders. Sorry :(')
def status(bot, update):
    """Reply with the pending-reminder count and time until the next one."""
    directory = get_user_dir(bot, update.message.chat_id)
    pending = sorted(os.listdir(directory))
    if not pending:
        update.message.reply_text('You have no pending reminders. Hooray ^_^')
        return
    next_reminder = pending[0]
    diff = format_time_diff(datetime.fromtimestamp(int(next_reminder)))
    with open(os.path.join(directory, next_reminder)) as f:
        text = f.read()
    text = ':\n' + text if text else '.'
    amount = ('{} reminders' if len(pending) > 1 else '{} reminder')\
        .format(len(pending))
    update.message.reply_text('{}. Next reminder in {}{}'
                              .format(amount, diff, text))
def save(self):
    """Saves the logger's buffered points to a CSV file. If the file exists,
    then the data points are appended.

    A write happens only if more than `self.logfreq` seconds have
    elapsed since the last save; a first save always proceeds.
    """
    from time import time
    # Compare raw epoch timestamps directly. The previous round-trip
    # through datetime.fromtimestamp() on both operands was needless and
    # could misreport the elapsed interval across a DST transition.
    if self.lastsave is not None:
        elapsed = time() - self.lastsave
    else:
        elapsed = self.logfreq + 1
    if elapsed > self.logfreq:
        self._csv_append()
        self.lastsave = time()
def createTimestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    stamp = datetime.fromtimestamp(time())
    return stamp.strftime('%Y-%m-%d %H:%M:%S')
def writeStdout(s):
    """Write ``s`` to stdout, tab-prefixed with a timestamp, then flush."""
    stamp = datetime.fromtimestamp(time()).strftime('%Y-%m-%d %H:%M:%S')
    sys.stdout.write('\t'.join([stamp, s]) + '\n')
    sys.stdout.flush()
def writeStderr(s):
    """Write ``s`` to stderr, tab-prefixed with a timestamp, then flush."""
    stamp = datetime.fromtimestamp(time()).strftime('%Y-%m-%d %H:%M:%S')
    sys.stderr.write('\t'.join([stamp, s]) + '\n')
    sys.stderr.flush()
def estimate_time(builds):
    """Update the working build with an estimated completion time.
    Takes a simple average over the previous builds, using those
    whose outcome is ``'passed'``.
    Arguments:
        builds (:py:class:`list`): All builds.
    """
    working = None
    # Look for the in-progress build among the first four entries only.
    for position, build in enumerate(builds[:4]):
        if build['outcome'] == 'working':
            working = build
            break
    if working is None:
        return  # no in-progress builds
    if working.get('started_at') is None:
        working['elapsed'] = 'estimate not available'
        return
    finished = [
        build for build in builds[position + 1:]
        if build['outcome'] == 'passed' and build['duration'] is not None
    ]
    if not finished:
        working['elapsed'] = 'estimate not available'
        return
    average_duration = int(sum(build['duration'] for build in finished) /
                           float(len(finished)))
    finish = working['started_at'] + average_duration
    remaining = (datetime.fromtimestamp(finish) -
                 datetime.now()).total_seconds()
    if remaining >= 0:
        working['elapsed'] = '{} left'.format(naturaldelta(remaining))
    else:
        working['elapsed'] = 'nearly done'
def current_time():
    """Return the current local time formatted with TIME_FORMAT."""
    now = datetime.fromtimestamp(time.time())
    return now.strftime(TIME_FORMAT)
# converts a string to a timestamp