def get_jwt(self):
exp = datetime.datetime.utcnow() + datetime.timedelta(minutes=10)
exp = calendar.timegm(exp.timetuple())
# Generate the JWT
payload = {
# issued at time
'iat': int(time.time()),
# JWT expiration time (10 minute maximum)
'exp': exp,
# Integration's GitHub identifier
'iss': options.get('github.integration-app-id'),
}
return jwt.encode(
payload, options.get('github.integration-private-key'), algorithm='RS256'
)
python类timegm()的实例源码
def delete_contact(db, cursor, timestamp, station, callsign):
"""
Delete the results of a delete in N1MM
"""
""" station_id = stations.lookup_station_id(station)
"""
logging.info('DELETEQSO: %s, timestamp = %s' % (callsign, calendar.timegm(timestamp)))
try:
cursor.execute(
"delete from qso_log where callsign = ? and timestamp = ?", (callsign, calendar.timegm(timestamp),))
db.commit()
except Exception as e:
logging.exception('Exception deleting contact from db.')
return ''
def get_timestamp(d):
"""
Returns a UTC timestamp for a C{datetime.datetime} object.
@type d: C{datetime.datetime}
@return: UTC timestamp.
@rtype: C{float}
@see: Inspiration taken from the U{Intertwingly blog
<http://intertwingly.net/blog/2007/09/02/Dealing-With-Dates>}.
"""
if isinstance(d, datetime.date) and not isinstance(d, datetime.datetime):
d = datetime.datetime.combine(d, datetime.time(0, 0, 0, 0))
msec = str(d.microsecond).rjust(6).replace(' ', '0')
return float('%s.%s' % (calendar.timegm(d.utctimetuple()), msec))
def isoparse(timestring, formats=("%Y-%m-%d %H:%M:%SZ", "%Y-%m-%d %H:%M:%S")):
"""
>>> isoparse('1970-01-01 00:00:00Z')
0
Also support backwards compatible timestamp format:
>>> isoparse('1970-01-01 00:00:00')
0
"""
for format in formats:
try:
time_tuple = time.strptime(timestring, format)
except ValueError:
continue
else:
return calendar.timegm(time_tuple)
return None
def current_timestamp(timestamp, period):
time_list = list(time.gmtime())
period = period.lower()
if period == "day":
time_list[3:] = [0] * 6
elif period == "month":
time_list[2:] = [1] + [0] * 6
elif period == "week":
time_list[2] -= time_list[6]
time_list[3:] = [0] * 6
elif period == "year":
time_list[1:] = [1, 1] + [0] * 6
else:
return None
return time.gmtime(calendar.timegm(time_list))
def db_value(self, value):
if value is None:
return
if isinstance(value, datetime.datetime):
pass
elif isinstance(value, datetime.date):
value = datetime.datetime(value.year, value.month, value.day)
else:
return int(round(value * self.resolution))
if self.utc:
timestamp = calendar.timegm(value.utctimetuple())
else:
timestamp = time.mktime(value.timetuple())
timestamp += (value.microsecond * .000001)
if self.resolution > 1:
timestamp *= self.resolution
return int(round(timestamp))
def get_signature(self, uri, method='GET', body=''):
"""Return a dictionary with the API key and API get_signature
to be sent for the given request."""
# What time is it now?
timestamp = timegm(datetime.datetime.now(tz=pytz.UTC).timetuple())
# Calculate the base string to use for the signature.
base_string = unicode(''.join((
self.auth_key.secret,
unicode(timestamp),
method.upper(),
uri,
body,
))).encode('utf-8')
# Return a dictionary with the headers to send.
return {
'HTTP_X_API_KEY': self.auth_key.auth_key,
'HTTP_X_API_SIGNATURE': sha1(base_string).hexdigest(),
'HTTP_X_API_TIMESTAMP': timestamp,
}
rhythmbox_count_rating_integrator.py 文件源码
项目:migrate-itunes-to-rhythmbox
作者: phauer
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def create_itunes_statistic_dict(itunes_songs: Dict[int, Song], itunes_library_root: str) -> Dict[str, SongStatistic]:
"""use canonical location as a common identifier. returns dict[canonical_location -> SongStatistic(play_count, rating)]"""
dict = {}
for itunes_song in itunes_songs.values():
if itunes_song.location_escaped is not None:
count = itunes_song.play_count
last_played = itunes_song.lastplayed
last_played_timestamp = calendar.timegm(last_played) if last_played is not None else None
itunes_rating = itunes_song.rating
mapped_rating = ITUNES_TO_RHYTHMBOX_RATINGS_MAP[itunes_rating]
location = itunes_song.location_escaped
canonical_location = create_canonical_location_for_itunes_location(location, itunes_library_root)
dict[canonical_location] = SongStatistic(count, mapped_rating, last_played_timestamp)
else:
print(" Can't assign the track [{} - {}] because there is no file location defined. It's probably a remote file."
.format(itunes_song.artist, itunes_song.name))
return dict
def _real_extract(self, url):
show_id = self._match_id(url)
data = self._download_json(
'http://oe1.orf.at/programm/%s/konsole' % show_id,
show_id
)
timestamp = datetime.datetime.strptime('%s %s' % (
data['item']['day_label'],
data['item']['time']
), '%d.%m.%Y %H:%M')
unix_timestamp = calendar.timegm(timestamp.utctimetuple())
return {
'id': show_id,
'title': data['item']['title'],
'url': data['item']['url_stream'],
'ext': 'mp3',
'description': data['item'].get('info'),
'timestamp': unix_timestamp
}
def _get_token_key(self):
if self.token_key is not None:
return self.token_key
timestamp = calendar.timegm(time.gmtime())
hours = int(math.floor(timestamp / 3600))
response = requests.get("https://translate.google.com/")
line = response.text.split('\n')[-1]
tkk_expr = re.search(".*?(TKK=.*?;)W.*?", line).group(1)
a = re.search("a\\\\x3d(-?\d+);", tkk_expr).group(1)
b = re.search("b\\\\x3d(-?\d+);", tkk_expr).group(1)
result = str(hours) + "." + str(int(a) + int(b))
self.token_key = result
return result
def to_epoch(now, value):
epoch = None
if value.startswith('-'):
delta_value = int(value[1:-1])
if value[-1:].lower() == 'd':
time_value = now - datetime.timedelta(days=delta_value)
epoch = calendar.timegm(time_value.utctimetuple())
elif value[-1].lower() == 'h':
time_value = now - datetime.timedelta(hours=delta_value)
epoch = calendar.timegm(time_value.utctimetuple())
elif value[-1].lower() == 'm':
time_value = now - datetime.timedelta(minutes=delta_value)
epoch = calendar.timegm(time_value.utctimetuple())
else:
epoch = value
return epoch
def fetch(self):
fetch_time = int(time.time())
feed = feedparser.parse(self.url, etag=self.status.last_result)
last_updated = self.status.updated
self.status = ChoreStatus(fetch_time, feed.get('etag'))
for e in feed.entries:
evt_time = int(calendar.timegm(e.updated_parsed))
if last_updated and evt_time > last_updated:
evturl = e.link
match = RE_BADURL.match(evturl)
if match:
evturl = urllib.parse.urljoin(self.url, match.group(1))
else:
evturl = urllib.parse.urljoin(self.url, evturl)
if not self.title_regex or self.title_regex.search(e.title):
yield Event(self.name, self.category,
evt_time, e.title, e.summary, evturl)
def fetch(self):
if self.category == 'release':
url = 'https://github.com/%s/releases.atom' % self.repo
elif self.category == 'tag':
url = 'https://github.com/%s/tags.atom' % self.repo
elif self.category == 'commit':
url = 'https://github.com/%s/commits/%s.atom' % \
(self.repo, self.branch or 'master')
else:
raise ValueError('unknown category: %s' % self.category)
fetch_time = int(time.time())
feed = feedparser.parse(url, etag=self.status.last_result)
last_updated = self.status.updated
self.status = ChoreStatus(fetch_time, feed.get('etag'))
for e in feed.entries:
evt_time = calendar.timegm(e.updated_parsed)
if last_updated and evt_time > last_updated:
yield Event(self.name, self.category,
evt_time, e.title, e.summary, e.link)
def utcnow_ts(microsecond=False):
"""Timestamp version of our utcnow function.
See :py:class:`oslo_utils.fixture.TimeFixture`.
.. versionchanged:: 1.3
Added optional *microsecond* parameter.
"""
if utcnow.override_time is None:
# NOTE(kgriffs): This is several times faster
# than going through calendar.timegm(...)
timestamp = time.time()
if not microsecond:
timestamp = int(timestamp)
return timestamp
now = utcnow()
timestamp = calendar.timegm(now.timetuple())
if microsecond:
timestamp += float(now.microsecond) / 1000000
return timestamp
def _convert_windows_times(self, t):
try:
microsec = t / 10
sec, microsec = divmod(microsec, 1000000)
days, sec = divmod(sec, 86400)
dt = (
datetime.datetime(
1601,
1,
1) +
datetime.timedelta(
days,
sec,
microsec))
return calendar.timegm(dt.timetuple()), dt.isoformat() + 'Z'
except(Exception):
return 0, ''
def time_parse(s):
try:
epoch = int(s)
return epoch
except ValueError:
pass
try:
epoch = int(calendar.timegm(time.strptime(s, '%Y-%m-%d')))
return epoch
except ValueError:
pass
try:
epoch = int(calendar.timegm(time.strptime(s, '%Y-%m-%d %H:%M:%S')))
return epoch
except ValueError:
pass
m = re.match(r'^(?=\d)(?:(\d+)w)?(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s?)?$', s, re.I)
if m:
return -1*(int(m.group(1) or 0)*604800 + \
int(m.group(2) or 0)*86400+ \
int(m.group(3) or 0)*3600+ \
int(m.group(4) or 0)*60+ \
int(m.group(5) or 0))
raise ValueError('Invalid time: "%s"' % s)
def morsel_to_cookie(morsel):
"""Convert a Morsel object into a Cookie containing the one k/v pair."""
expires = None
if morsel['max-age']:
try:
expires = int(time.time() + int(morsel['max-age']))
except ValueError:
raise TypeError('max-age: %s must be integer' % morsel['max-age'])
elif morsel['expires']:
time_template = '%a, %d-%b-%Y %H:%M:%S GMT'
expires = calendar.timegm(
time.strptime(morsel['expires'], time_template)
)
return create_cookie(
comment=morsel['comment'],
comment_url=bool(morsel['comment']),
discard=False,
domain=morsel['domain'],
expires=expires,
name=morsel.key,
path=morsel['path'],
port=None,
rest={'HttpOnly': morsel['httponly']},
rfc2109=False,
secure=bool(morsel['secure']),
value=morsel.value,
version=morsel['version'] or 0,
)
def datetime_to_header(dt):
return formatdate(calendar.timegm(dt.timetuple()))
def update_headers(self, resp):
headers = resp.headers
if 'expires' in headers:
return {}
if 'cache-control' in headers and headers['cache-control'] != 'public':
return {}
if resp.status not in self.cacheable_by_default_statuses:
return {}
if 'date' not in headers or 'last-modified' not in headers:
return {}
date = calendar.timegm(parsedate_tz(headers['date']))
last_modified = parsedate(headers['last-modified'])
if date is None or last_modified is None:
return {}
now = time.time()
current_age = max(0, now - date)
delta = date - calendar.timegm(last_modified)
freshness_lifetime = max(0, min(delta / 10, 24 * 3600))
if freshness_lifetime <= current_age:
return {}
expires = date + freshness_lifetime
return {'expires': time.strftime(TIME_FMT, time.gmtime(expires))}
def utc_to_local(utc_dt):
"""converts utc time to local time
based on the answer of J.F. Sebastian on
http://stackoverflow.com/questions/4563272/how-to-convert-a-python-utc-datetime-to-a-local-datetime-using-only-python-stand/13287083#13287083
"""
# get integer timestamp to avoid precision lost
timestamp = calendar.timegm(utc_dt.timetuple())
local_dt = datetime.datetime.fromtimestamp(timestamp)
assert utc_dt.resolution >= datetime.timedelta(microseconds=1)
return local_dt.replace(microsecond=utc_dt.microsecond)
def morsel_to_cookie(morsel):
"""Convert a Morsel object into a Cookie containing the one k/v pair."""
expires = None
if morsel['max-age']:
try:
expires = int(time.time() + int(morsel['max-age']))
except ValueError:
raise TypeError('max-age: %s must be integer' % morsel['max-age'])
elif morsel['expires']:
time_template = '%a, %d-%b-%Y %H:%M:%S GMT'
expires = calendar.timegm(
time.strptime(morsel['expires'], time_template)
)
return create_cookie(
comment=morsel['comment'],
comment_url=bool(morsel['comment']),
discard=False,
domain=morsel['domain'],
expires=expires,
name=morsel.key,
path=morsel['path'],
port=None,
rest={'HttpOnly': morsel['httponly']},
rfc2109=False,
secure=bool(morsel['secure']),
value=morsel.value,
version=morsel['version'] or 0,
)
def datetime_to_header(dt):
return formatdate(calendar.timegm(dt.timetuple()))
def update_headers(self, resp):
headers = resp.headers
if 'expires' in headers:
return {}
if 'cache-control' in headers and headers['cache-control'] != 'public':
return {}
if resp.status not in self.cacheable_by_default_statuses:
return {}
if 'date' not in headers or 'last-modified' not in headers:
return {}
date = calendar.timegm(parsedate_tz(headers['date']))
last_modified = parsedate(headers['last-modified'])
if date is None or last_modified is None:
return {}
now = time.time()
current_age = max(0, now - date)
delta = date - calendar.timegm(last_modified)
freshness_lifetime = max(0, min(delta / 10, 24 * 3600))
if freshness_lifetime <= current_age:
return {}
expires = date + freshness_lifetime
return {'expires': time.strftime(TIME_FMT, time.gmtime(expires))}
def morsel_to_cookie(morsel):
"""Convert a Morsel object into a Cookie containing the one k/v pair."""
expires = None
if morsel['max-age']:
try:
expires = int(time.time() + int(morsel['max-age']))
except ValueError:
raise TypeError('max-age: %s must be integer' % morsel['max-age'])
elif morsel['expires']:
time_template = '%a, %d-%b-%Y %H:%M:%S GMT'
expires = calendar.timegm(
time.strptime(morsel['expires'], time_template)
)
return create_cookie(
comment=morsel['comment'],
comment_url=bool(morsel['comment']),
discard=False,
domain=morsel['domain'],
expires=expires,
name=morsel.key,
path=morsel['path'],
port=None,
rest={'HttpOnly': morsel['httponly']},
rfc2109=False,
secure=bool(morsel['secure']),
value=morsel.value,
version=morsel['version'] or 0,
)
def test_startofyear(self):
sdds_soy = Time.startOfYear()
# calculate start of year
soy = datetime.datetime(*(time.strptime(self.cur_year_str+"-01-01 00:00:00",
"%Y-%m-%d %H:%M:%S")[0:6]))
soy_time=calendar.timegm(soy.timetuple())
self.assertEqual( sdds_soy, soy_time )
sdds_time = Time()
sdds_time.setFromTime(soy_time)
# calculate start of year
self.assertEqual( sdds_time.gmtime(), time.gmtime(soy_time) )
def startOfYear():
soy=datetime.datetime(datetime.date.today().year,1,1,0,0,0)
return calendar.timegm(soy.timetuple())
def datetime_to_timestamp(dt): # UTC datetime
return calendar.timegm(dt.utctimetuple())
def writestr(self, zinfo, bytes):
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
import calendar
tinfo = TarInfo(zinfo.filename)
tinfo.size = len(bytes)
tinfo.mtime = calendar.timegm(zinfo.date_time)
self.tarfile.addfile(tinfo, StringIO(bytes))
def _timegm(tt):
year, month, mday, hour, min, sec = tt[:6]
if ((year >= EPOCH_YEAR) and (1 <= month <= 12) and (1 <= mday <= 31) and
(0 <= hour <= 24) and (0 <= min <= 59) and (0 <= sec <= 61)):
return timegm(tt)
else:
return None
def morsel_to_cookie(morsel):
"""Convert a Morsel object into a Cookie containing the one k/v pair."""
expires = None
if morsel['max-age']:
try:
expires = int(time.time() + int(morsel['max-age']))
except ValueError:
raise TypeError('max-age: %s must be integer' % morsel['max-age'])
elif morsel['expires']:
time_template = '%a, %d-%b-%Y %H:%M:%S GMT'
expires = calendar.timegm(
time.strptime(morsel['expires'], time_template)
)
return create_cookie(
comment=morsel['comment'],
comment_url=bool(morsel['comment']),
discard=False,
domain=morsel['domain'],
expires=expires,
name=morsel.key,
path=morsel['path'],
port=None,
rest={'HttpOnly': morsel['httponly']},
rfc2109=False,
secure=bool(morsel['secure']),
value=morsel.value,
version=morsel['version'] or 0,
)