python类utcfromtimestamp()的实例源码

test_validation.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_conversion_specific_date(self):
        dt = datetime(1981, 7, 11, microsecond=555000)

        uuid = util.uuid_from_time(dt)

        from uuid import UUID
        assert isinstance(uuid, UUID)

        ts = (uuid.time - 0x01b21dd213814000) / 1e7 # back to a timestamp
        new_dt = datetime.utcfromtimestamp(ts)

        # checks that we created a UUID1 with the proper timestamp
        assert new_dt == dt
columns.py 文件源码 项目:deb-python-cassandra-driver 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def to_python(self, value):
        if value is None:
            return
        if isinstance(value, datetime):
            if DateTime.truncate_microseconds:
                us = value.microsecond
                truncated_us = us // 1000 * 1000
                return value - timedelta(microseconds=us - truncated_us)
            else:
                return value
        elif isinstance(value, date):
            return datetime(*(value.timetuple()[:6]))

        return datetime.utcfromtimestamp(value)
test_date.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_utc_timestamping():
    assert timestamp(
        datetime(2017, 7, 14, 2, 40).replace(tzinfo=utc)
    ) == 1500000000

    for d in (
        datetime.now(),
        datetime.utcnow(),
        datetime(1999, 12, 31, 23, 59, 59),
        datetime(2000, 1, 1, 0, 0, 0)
    ):
        assert datetime.utcfromtimestamp(
            timestamp(d)) - d < timedelta(microseconds=10)
time.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _x_format(self):
        """Return the value formatter for this graph"""
        def datetime_to_str(x):
            dt = datetime.utcfromtimestamp(x)
            return self.x_value_formatter(dt)
        return datetime_to_str
wsgi.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _opener(self, filename):
        return lambda: (
            open(filename, 'rb'),
            datetime.utcfromtimestamp(os.path.getmtime(filename)),
            int(os.path.getsize(filename))
        )
itsdangerous.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def timestamp_to_datetime(self, ts):
        """Used to convert the timestamp from `get_timestamp` into a
        datetime object.
        """
        return datetime.utcfromtimestamp(ts + EPOCH)
itsdangerous.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_issue_date(self, header):
        rv = header.get('iat')
        if isinstance(rv, number_types):
            return datetime.utcfromtimestamp(int(rv))
wsgi.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _opener(self, filename):
        return lambda: (
            open(filename, 'rb'),
            datetime.utcfromtimestamp(os.path.getmtime(filename)),
            int(os.path.getsize(filename))
        )
itsdangerous.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def timestamp_to_datetime(self, ts):
        """Used to convert the timestamp from `get_timestamp` into a
        datetime object.
        """
        return datetime.utcfromtimestamp(ts + EPOCH)
itsdangerous.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def get_issue_date(self, header):
        rv = header.get('iat')
        if isinstance(rv, number_types):
            return datetime.utcfromtimestamp(int(rv))
dataaccess.py 文件源码 项目:n1mm_view 作者: n1kdo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_last_qso(cursor) :
    cursor.execute('SELECT timestamp, callsign, exchange, section, operator.name, band_id \n'
                   'FROM qso_log JOIN operator WHERE operator.id = operator_id \n'
                   'ORDER BY timestamp DESC LIMIT 1')
    last_qso_time = int(time.time()) - 60
    message = ''
    for row in cursor:
        last_qso_time = row[0]
    message = 'Last QSO: %s %s %s on %s by %s at %s' % (
        row[1], row[2], row[3], constants.Bands.BANDS_TITLE[row[5]], row[4],
        datetime.utcfromtimestamp(row[0]).strftime('%H:%M:%S'))
    logging.debug(message)
    return last_qso_time, message
dataaccess.py 文件源码 项目:n1mm_view 作者: n1kdo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_qsos_per_hour_per_band(cursor):
    qsos_per_hour = []
    qsos_by_band = [0] * constants.Bands.count()
    slice_minutes = 15
    slices_per_hour = 60 / slice_minutes
    window_seconds = slice_minutes * 60

    logging.debug('Load QSOs per Hour by Band')
    cursor.execute('SELECT timestamp / %d * %d AS ts, band_id, COUNT(*) AS qso_count \n'
                   'FROM qso_log GROUP BY ts, band_id;' % (window_seconds, window_seconds))
    for row in cursor:
        if len(qsos_per_hour) == 0:
            qsos_per_hour.append([0] * constants.Bands.count())
            qsos_per_hour[-1][0] = row[0]
        while qsos_per_hour[-1][0] != row[0]:
            ts = qsos_per_hour[-1][0] + window_seconds
            qsos_per_hour.append([0] * constants.Bands.count())
            qsos_per_hour[-1][0] = ts
        qsos_per_hour[-1][row[1]] = row[2] * slices_per_hour
        qsos_by_band[row[1]] += row[2]

    for rec in qsos_per_hour:  # FIXME
        rec[0] = datetime.utcfromtimestamp(rec[0])
        t = rec[0].strftime('%H:%M:%S')

    return qsos_per_hour, qsos_by_band
tzinfo.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def memorized_datetime(seconds):
    '''Create only one instance of each distinct datetime'''
    try:
        return _datetime_cache[seconds]
    except KeyError:
        # NB. We can't just do datetime.utcfromtimestamp(seconds) as this
        # fails with negative values under Windows (Bug #90096)
        dt = _epoch + timedelta(seconds=seconds)
        _datetime_cache[seconds] = dt
        return dt
github.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def wait_rate_limit_reset(now):
    reset = (
        datetime.utcfromtimestamp(GITHUB.x_ratelimit_reset)
        .replace(tzinfo=timezone.utc)
    )
    delta = reset - now
    wait = delta.total_seconds() + .5
    if wait < 1 or 3500 < wait:
        # Our data is outdated. Just go on.
        return 0

    logger.warning("Waiting rate limit reset in %s seconds.", wait)
    time.sleep(wait)
    GITHUB._instance.x_ratelimit_remaining = -1
    return wait
wallstreet.py 文件源码 项目:wallstreet 作者: mcdallas 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _yahoo(self, quote, exchange=None):
        """ Collects data from Yahoo Finance API """

        query = quote + "." + exchange.upper() if exchange else quote

        if not hasattr(self, '_session_y'):
            self._session_y = requests.Session()
        r = self._session_y.get(__class__._Y_API + query)

        if r.status_code == 404:
            raise LookupError('Ticker symbol not found.')
        else:
            r.raise_for_status()

        jayson = r.json()['optionChain']['result'][0]['quote']

        self.ticker = jayson['symbol']
        self._price = jayson['regularMarketPrice']
        self.currency = jayson['currency']
        self.exchange = jayson['exchange']
        self.change = jayson['regularMarketChange']
        self.cp = jayson['regularMarketChangePercent']
        self._last_trade = datetime.utcfromtimestamp(jayson['regularMarketTime'])
        self.name = jayson['longName']
        self.dy = jayson.get('trailingAnnualDividendYield', 0)
wallstreet.py 文件源码 项目:wallstreet 作者: mcdallas 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _yahoo(self, quote, d, m, y):
        """ Collects data from Yahoo Finance API """

        epoch = int(round(mktime(date(y, m, d).timetuple())/86400, 0)*86400)

        if not hasattr(self, '_session_y'):
            self._session_y = requests.Session()
        r = self._session_y.get(__class__._Y_API + quote + '?date=' + str(epoch))

        if r.status_code == 404:
            raise LookupError('Ticker symbol not found.')
        else:
            r.raise_for_status()

        json = r.json()

        try:
            self.data = json['optionChain']['result'][0]['options'][0]
        except IndexError:
            raise LookupError('No options listed for this stock.')

        self._exp = [datetime.utcfromtimestamp(i).date() for i in json['optionChain']['result'][0]['expirationDates']]
swift.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def swift_get_container(request, container_name, with_data=True):
    if with_data:
        headers, data = swift_api(request).get_object(container_name, "")
    else:
        data = None
        headers = swift_api(request).head_container(container_name)
    timestamp = None
    is_public = False
    public_url = None
    try:
        is_public = GLOBAL_READ_ACL in headers.get('x-container-read', '')
        if is_public:
            swift_endpoint = base.url_for(request,
                                          'object-store',
                                          endpoint_type='publicURL')
            parameters = urlparse.quote(container_name.encode('utf8'))
            public_url = swift_endpoint + '/' + parameters
        ts_float = float(headers.get('x-timestamp'))
        timestamp = datetime.utcfromtimestamp(ts_float).isoformat()
    except Exception:
        pass
    container_info = {
        'name': container_name,
        'container_object_count': headers.get('x-container-object-count'),
        'container_bytes_used': headers.get('x-container-bytes-used'),
        'timestamp': timestamp,
        'data': data,
        'is_public': is_public,
        'public_url': public_url,
    }
    return Container(container_info)
swift.py 文件源码 项目:mos-horizon 作者: Mirantis 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def swift_get_object(request, container_name, object_name, with_data=True,
                     resp_chunk_size=CHUNK_SIZE):
    if with_data:
        headers, data = swift_api(request).get_object(
            container_name, object_name, resp_chunk_size=resp_chunk_size)
    else:
        data = None
        headers = swift_api(request).head_object(container_name,
                                                 object_name)
    orig_name = headers.get("x-object-meta-orig-filename")
    timestamp = None
    try:
        ts_float = float(headers.get('x-timestamp'))
        timestamp = datetime.utcfromtimestamp(ts_float).isoformat()
    except Exception:
        pass
    obj_info = {
        'name': object_name,
        'bytes': headers.get('content-length'),
        'content_type': headers.get('content-type'),
        'etag': headers.get('etag'),
        'timestamp': timestamp,
    }
    return StorageObject(obj_info,
                         container_name,
                         orig_name=orig_name,
                         data=data)
test_validation.py 文件源码 项目:cqlmapper 作者: reddit 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_datetime_timestamp(self):
        dt_value = 1454520554
        self.DatetimeTest.objects.create(
            self.conn,
            test_id=5,
            created_at=dt_value,
        )
        dt2 = self.DatetimeTest.objects(test_id=5).first(self.conn)
        assert dt2.created_at == datetime.utcfromtimestamp(dt_value)
test_validation.py 文件源码 项目:cqlmapper 作者: reddit 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_conversion_specific_date(self):
        dt = datetime(1981, 7, 11, microsecond=555000)

        uuid = util.uuid_from_time(dt)

        from uuid import UUID
        assert isinstance(uuid, UUID)

        ts = (uuid.time - 0x01b21dd213814000) / 1e7 # back to a timestamp
        new_dt = datetime.utcfromtimestamp(ts)

        # checks that we created a UUID1 with the proper timestamp
        assert new_dt == dt


问题


面经


文章

微信
公众号

扫码关注公众号