def test_conversion_specific_date(self):
    """Round-trip a known datetime through uuid_from_time and back."""
    from uuid import UUID
    source = datetime(1981, 7, 11, microsecond=555000)
    generated = util.uuid_from_time(source)
    assert isinstance(generated, UUID)
    # UUID1 time is 100-ns intervals since 1582-10-15; shift to Unix epoch.
    seconds = (generated.time - 0x01b21dd213814000) / 1e7
    # checks that we created a UUID1 with the proper timestamp
    assert datetime.utcfromtimestamp(seconds) == source
Python example source code using `datetime.utcfromtimestamp()`
def to_python(self, value):
    """Coerce *value* into a datetime; None passes through unchanged.

    Accepts a datetime (optionally truncated to millisecond precision),
    a date (promoted to midnight), or a POSIX timestamp number.
    """
    if value is None:
        return
    if isinstance(value, datetime):
        # datetime must be tested before date: it is a date subclass.
        if not DateTime.truncate_microseconds:
            return value
        # Drop the sub-millisecond part of the microsecond field.
        excess = value.microsecond % 1000
        return value - timedelta(microseconds=excess)
    if isinstance(value, date):
        # A bare date becomes midnight of that day.
        year, month, day, hour, minute, second = value.timetuple()[:6]
        return datetime(year, month, day, hour, minute, second)
    # Anything else is treated as seconds since the Unix epoch.
    return datetime.utcfromtimestamp(value)
def test_utc_timestamping():
    """timestamp() matches a known instant and round-trips within 10 us."""
    fixed = datetime(2017, 7, 14, 2, 40).replace(tzinfo=utc)
    assert timestamp(fixed) == 1500000000
    samples = [
        datetime.now(),
        datetime.utcnow(),
        datetime(1999, 12, 31, 23, 59, 59),
        datetime(2000, 1, 1, 0, 0, 0),
    ]
    for sample in samples:
        # Converting to a timestamp and back must lose < 10 microseconds.
        drift = datetime.utcfromtimestamp(timestamp(sample)) - sample
        assert drift < timedelta(microseconds=10)
def _x_format(self):
"""Return the value formatter for this graph"""
def datetime_to_str(x):
dt = datetime.utcfromtimestamp(x)
return self.x_value_formatter(dt)
return datetime_to_str
def _opener(self, filename):
return lambda: (
open(filename, 'rb'),
datetime.utcfromtimestamp(os.path.getmtime(filename)),
int(os.path.getsize(filename))
)
def timestamp_to_datetime(self, ts):
    """Convert a value from `get_timestamp` back into a datetime object."""
    # Stored timestamps are relative to EPOCH; shift back to Unix time first.
    unix_seconds = ts + EPOCH
    return datetime.utcfromtimestamp(unix_seconds)
def get_issue_date(self, header):
    """Return the 'iat' claim as a datetime, or None when absent/non-numeric."""
    issued_at = header.get('iat')
    # Only numeric 'iat' values are trusted; anything else yields None.
    if isinstance(issued_at, number_types):
        return datetime.utcfromtimestamp(int(issued_at))
def _opener(self, filename):
return lambda: (
open(filename, 'rb'),
datetime.utcfromtimestamp(os.path.getmtime(filename)),
int(os.path.getsize(filename))
)
def timestamp_to_datetime(self, ts):
    """Map a `get_timestamp` value back onto a naive UTC datetime."""
    # EPOCH is the serializer's time origin offset from the Unix epoch.
    return datetime.utcfromtimestamp(EPOCH + ts)
def get_issue_date(self, header):
    """Extract the issued-at ('iat') claim as a datetime when it is numeric."""
    claim = header.get('iat')
    if not isinstance(claim, number_types):
        # Matches the original's implicit None for missing/invalid claims.
        return None
    return datetime.utcfromtimestamp(int(claim))
def get_last_qso(cursor):
    """Fetch the most recent QSO; return (unix time, human-readable summary).

    Falls back to (now - 60 s, '') when the log is empty.
    """
    cursor.execute('SELECT timestamp, callsign, exchange, section, operator.name, band_id \n'
                   'FROM qso_log JOIN operator WHERE operator.id = operator_id \n'
                   'ORDER BY timestamp DESC LIMIT 1')
    # Defaults used when no row comes back.
    qso_time = int(time.time()) - 60
    summary = ''
    for row in cursor:
        qso_time = row[0]
        when = datetime.utcfromtimestamp(row[0]).strftime('%H:%M:%S')
        summary = 'Last QSO: %s %s %s on %s by %s at %s' % (
            row[1], row[2], row[3], constants.Bands.BANDS_TITLE[row[5]], row[4], when)
    logging.debug(summary)
    return qso_time, summary
def get_qsos_per_hour_per_band(cursor):
    """Aggregate QSO counts into 15-minute slices, extrapolated to hourly rates.

    Returns (qsos_per_hour, qsos_by_band) where each qsos_per_hour row is
    [slice_start_datetime, rate_band_1, rate_band_2, ...] and qsos_by_band
    holds the raw per-band totals.
    """
    qsos_per_hour = []
    qsos_by_band = [0] * constants.Bands.count()
    slice_minutes = 15
    # Integer division: under Python 3 the original `60 / slice_minutes`
    # yields a float, silently turning every extrapolated rate into a float.
    slices_per_hour = 60 // slice_minutes
    window_seconds = slice_minutes * 60
    logging.debug('Load QSOs per Hour by Band')
    cursor.execute('SELECT timestamp / %d * %d AS ts, band_id, COUNT(*) AS qso_count \n'
                   'FROM qso_log GROUP BY ts, band_id;' % (window_seconds, window_seconds))
    for row in cursor:
        if len(qsos_per_hour) == 0:
            qsos_per_hour.append([0] * constants.Bands.count())
            qsos_per_hour[-1][0] = row[0]
        # Pad empty windows so the time series is contiguous up to this slice.
        while qsos_per_hour[-1][0] != row[0]:
            ts = qsos_per_hour[-1][0] + window_seconds
            qsos_per_hour.append([0] * constants.Bands.count())
            qsos_per_hour[-1][0] = ts
        # Scale the per-slice count up to an hourly rate.
        qsos_per_hour[-1][row[1]] = row[2] * slices_per_hour
        qsos_by_band[row[1]] += row[2]
    # Replace each slice's epoch seconds with a datetime for the caller.
    # (The original also built an unused strftime string here — removed.)
    for rec in qsos_per_hour:
        rec[0] = datetime.utcfromtimestamp(rec[0])
    return qsos_per_hour, qsos_by_band
def memorized_datetime(seconds):
    '''Return a shared datetime for *seconds*, one instance per distinct value.'''
    if seconds in _datetime_cache:
        return _datetime_cache[seconds]
    # NB. We can't just do datetime.utcfromtimestamp(seconds) as this
    # fails with negative values under Windows (Bug #90096), so build
    # the value with timedelta arithmetic from a fixed epoch instead.
    value = _epoch + timedelta(seconds=seconds)
    _datetime_cache[seconds] = value
    return value
def wait_rate_limit_reset(now):
    """Sleep until GitHub's rate limit resets; return the seconds waited.

    Returns 0 without sleeping when the reset time looks stale (already
    passed, or implausibly more than ~an hour away).
    """
    reset_at = datetime.utcfromtimestamp(GITHUB.x_ratelimit_reset).replace(
        tzinfo=timezone.utc)
    wait = (reset_at - now).total_seconds() + .5
    if wait < 1 or 3500 < wait:
        # Stale rate-limit data — don't block, just continue.
        return 0
    logger.warning("Waiting rate limit reset in %s seconds.", wait)
    time.sleep(wait)
    # Invalidate the cached remaining-call count so it is refetched.
    GITHUB._instance.x_ratelimit_remaining = -1
    return wait
def _yahoo(self, quote, exchange=None):
    """Fetch the current quote for *quote* from the Yahoo Finance API.

    Populates ticker, price, currency, exchange, change and related
    attributes on the instance. Raises LookupError for unknown symbols.
    """
    # Yahoo expects "SYMBOL.EXCHANGE" when an exchange is supplied.
    if exchange:
        query = quote + "." + exchange.upper()
    else:
        query = quote
    # Lazily create one shared HTTP session per instance.
    if not hasattr(self, '_session_y'):
        self._session_y = requests.Session()
    response = self._session_y.get(__class__._Y_API + query)
    if response.status_code == 404:
        raise LookupError('Ticker symbol not found.')
    response.raise_for_status()
    quote_data = response.json()['optionChain']['result'][0]['quote']
    self.ticker = quote_data['symbol']
    self._price = quote_data['regularMarketPrice']
    self.currency = quote_data['currency']
    self.exchange = quote_data['exchange']
    self.change = quote_data['regularMarketChange']
    self.cp = quote_data['regularMarketChangePercent']
    self._last_trade = datetime.utcfromtimestamp(quote_data['regularMarketTime'])
    self.name = quote_data['longName']
    self.dy = quote_data.get('trailingAnnualDividendYield', 0)
def _yahoo(self, quote, d, m, y):
    """Fetch the option chain for *quote* expiring d/m/y from Yahoo Finance.

    Sets self.data to the option chain and self._exp to the available
    expiration dates. Raises LookupError for unknown symbols or when no
    options are listed.
    """
    # Round the local-midnight epoch to a whole day to match Yahoo's keys.
    epoch = int(round(mktime(date(y, m, d).timetuple()) / 86400, 0) * 86400)
    if not hasattr(self, '_session_y'):
        self._session_y = requests.Session()
    response = self._session_y.get(__class__._Y_API + quote + '?date=' + str(epoch))
    if response.status_code == 404:
        raise LookupError('Ticker symbol not found.')
    response.raise_for_status()
    payload = response.json()
    try:
        # IndexError here means either no result entry or no options listed.
        self.data = payload['optionChain']['result'][0]['options'][0]
    except IndexError:
        raise LookupError('No options listed for this stock.')
    self._exp = [datetime.utcfromtimestamp(stamp).date()
                 for stamp in payload['optionChain']['result'][0]['expirationDates']]
def swift_get_container(request, container_name, with_data=True):
    """Look up a Swift container, optionally downloading its root data.

    Returns a Container carrying object count, byte usage, creation
    timestamp, and — when the container is world-readable — its public URL.
    Header parsing is best-effort: failures leave the defaults in place.
    """
    if with_data:
        headers, data = swift_api(request).get_object(container_name, "")
    else:
        data = None
        headers = swift_api(request).head_container(container_name)
    timestamp = None
    is_public = False
    public_url = None
    try:
        is_public = GLOBAL_READ_ACL in headers.get('x-container-read', '')
        if is_public:
            swift_endpoint = base.url_for(request,
                                          'object-store',
                                          endpoint_type='publicURL')
            parameters = urlparse.quote(container_name.encode('utf8'))
            public_url = swift_endpoint + '/' + parameters
        ts_float = float(headers.get('x-timestamp'))
        timestamp = datetime.utcfromtimestamp(ts_float).isoformat()
    except Exception:
        # Deliberate swallow: partial metadata is acceptable here.
        pass
    return Container({
        'name': container_name,
        'container_object_count': headers.get('x-container-object-count'),
        'container_bytes_used': headers.get('x-container-bytes-used'),
        'timestamp': timestamp,
        'data': data,
        'is_public': is_public,
        'public_url': public_url,
    })
def swift_get_object(request, container_name, object_name, with_data=True,
                     resp_chunk_size=CHUNK_SIZE):
    """Retrieve a Swift object's metadata (and its body unless with_data=False)."""
    if with_data:
        headers, data = swift_api(request).get_object(
            container_name, object_name, resp_chunk_size=resp_chunk_size)
    else:
        data = None
        headers = swift_api(request).head_object(container_name,
                                                 object_name)
    orig_name = headers.get("x-object-meta-orig-filename")
    timestamp = None
    try:
        # x-timestamp is epoch seconds as a string; tolerate absence/garbage.
        timestamp = datetime.utcfromtimestamp(
            float(headers.get('x-timestamp'))).isoformat()
    except Exception:
        pass
    info = {
        'name': object_name,
        'bytes': headers.get('content-length'),
        'content_type': headers.get('content-type'),
        'etag': headers.get('etag'),
        'timestamp': timestamp,
    }
    return StorageObject(info,
                         container_name,
                         orig_name=orig_name,
                         data=data)
def test_datetime_timestamp(self):
    """A numeric created_at round-trips as the corresponding UTC datetime."""
    epoch_seconds = 1454520554
    self.DatetimeTest.objects.create(
        self.conn,
        test_id=5,
        created_at=epoch_seconds,
    )
    fetched = self.DatetimeTest.objects(test_id=5).first(self.conn)
    assert fetched.created_at == datetime.utcfromtimestamp(epoch_seconds)
def test_conversion_specific_date(self):
    """uuid_from_time must embed the exact source timestamp in the UUID."""
    from uuid import UUID
    original = datetime(1981, 7, 11, microsecond=555000)
    produced = util.uuid_from_time(original)
    assert isinstance(produced, UUID)
    # Shift UUID1 time (100-ns units since 1582-10-15) back to Unix seconds.
    unix_seconds = (produced.time - 0x01b21dd213814000) / 1e7
    # checks that we created a UUID1 with the proper timestamp
    assert datetime.utcfromtimestamp(unix_seconds) == original