python类tzutc()的实例源码

test_auth.py 文件源码 项目:backend.ai-client-py 作者: lablup 项目源码 文件源码 阅读 67 收藏 0 点赞 0 评论 0
def test_generate_signature(defconfig):
    """The Authorization header must mention the hash type, access key and signature."""
    request = {
        'method': 'GET',
        'version': defconfig.version,
        'endpoint': defconfig.endpoint,
        'date': datetime.now(tzutc()),
        'request_path': '/path/to/api/',
        'content': b'"test data"',
        'content_type': 'application/json',
        'access_key': defconfig.access_key,
        'secret_key': defconfig.secret_key,
        'hash_type': 'md5',
    }
    headers, signature = generate_signature(**request)

    # Every identifying piece of the request must appear in the header value.
    assert request['hash_type'].upper() in headers['Authorization']
    assert request['access_key'] in headers['Authorization']
    assert signature in headers['Authorization']
test_imports.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testTzAll(self):
        """Check that the expected public names of ``dateutil.tz`` import and are not None.

        The Windows-specific names are imported on every platform, but they
        are only asserted when ``sys.platform`` starts with ``"win"``.
        """
        from dateutil.tz import tzutc
        from dateutil.tz import tzoffset
        from dateutil.tz import tzlocal
        from dateutil.tz import tzfile
        from dateutil.tz import tzrange
        from dateutil.tz import tzstr
        from dateutil.tz import tzical
        from dateutil.tz import gettz
        from dateutil.tz import tzwin
        from dateutil.tz import tzwinlocal

        # Names asserted on every platform.
        tz_all = ["tzutc", "tzoffset", "tzlocal", "tzfile", "tzrange",
                  "tzstr", "tzical", "gettz"]

        # The Windows-only names are added to the check list on Windows only.
        tz_all += ["tzwin", "tzwinlocal"] if sys.platform.startswith("win") else []
        # Snapshot of the local namespace: the imports above are looked up
        # by name from this mapping in the loop below.
        lvars = locals()

        for var in tz_all:
            self.assertIsNot(lvars[var], None)
test_tz.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testFoldPositiveUTCOffset(self):
        """Two UTC instants one hour apart resolve to the same Sydney wall
        time but carry different UTC offsets."""
        zone_name = self._get_tzname('Australia/Sydney')

        with self._gettz_context(zone_name):
            # One tz object per conversion.
            sydney_first = self.gettz(zone_name)
            sydney_second = self.gettz(zone_name)

            # Expected to land on UTC+11 and UTC+10 respectively (see the
            # offset assertions at the end).
            utc_first = datetime(2012, 3, 31, 15, 30, tzinfo=tz.tzutc())
            utc_second = datetime(2012, 3, 31, 16, 30, tzinfo=tz.tzutc())

            local_first = utc_first.astimezone(sydney_first)
            local_second = utc_second.astimezone(sydney_second)

            # Both instants show the same naive wall-clock reading.
            self.assertEqual(local_first.replace(tzinfo=None),
                             datetime(2012, 4, 1, 2, 30))
            self.assertEqual(local_second.replace(tzinfo=None),
                             datetime(2012, 4, 1, 2, 30))

            # ...but they are told apart by their UTC offsets.
            self.assertEqual(local_first.utcoffset(), timedelta(hours=11))
            self.assertEqual(local_second.utcoffset(), timedelta(hours=10))
test_tz.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testGapPositiveUTCOffset(self):
        """Conversions on either side of a skipped hour in Sydney yield wall
        times two hours apart and the matching UTC offsets."""
        zone_name = self._get_tzname('Australia/Sydney')

        with self._gettz_context(zone_name):
            # One tz object per conversion.
            sydney_before = self.gettz(zone_name)
            sydney_after = self.gettz(zone_name)

            # Expected to land on UTC+10 and UTC+11 respectively.
            utc_before = datetime(2012, 10, 6, 15, 30, tzinfo=tz.tzutc())
            utc_after = datetime(2012, 10, 6, 16, 30, tzinfo=tz.tzutc())

            local_before = utc_before.astimezone(sydney_before)
            local_after = utc_after.astimezone(sydney_after)

            # One UTC hour apart, two wall-clock hours apart.
            self.assertEqual(local_before.replace(tzinfo=None),
                             datetime(2012, 10, 7, 1, 30))
            self.assertEqual(local_after.replace(tzinfo=None),
                             datetime(2012, 10, 7, 3, 30))

            self.assertEqual(local_before.utcoffset(), timedelta(hours=10))
            self.assertEqual(local_after.utcoffset(), timedelta(hours=11))
test_tz.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testFoldNegativeUTCOffset(self):
            """Two UTC instants one hour apart resolve to the same Toronto
            wall time but carry different UTC offsets."""
            zone_name = self._get_tzname('America/Toronto')

            with self._gettz_context(zone_name):
                # A separate tz object is fetched for each conversion
                # (the original note says fromutc() alters the tzfile object).
                toronto_first = self.gettz(zone_name)
                toronto_second = self.gettz(zone_name)

                utc_first = datetime(2011, 11, 6, 5, 30, tzinfo=tz.tzutc())
                utc_second = datetime(2011, 11, 6, 6, 30, tzinfo=tz.tzutc())

                local_first = utc_first.astimezone(toronto_first)
                local_second = utc_second.astimezone(toronto_second)

                # Both instants show the same naive wall-clock reading.
                self.assertEqual(local_first.replace(tzinfo=None),
                                 datetime(2011, 11, 6, 1, 30))
                self.assertEqual(local_second.replace(tzinfo=None),
                                 datetime(2011, 11, 6, 1, 30))

                # The UTC offset is what distinguishes them.
                self.assertEqual(local_first.utcoffset(),
                                 timedelta(hours=-4.0))
                self.assertEqual(local_second.utcoffset(),
                                 timedelta(hours=-5.0))
test_tz.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def testGapNegativeUTCOffset(self):
        """Conversions on either side of a skipped hour in Toronto yield
        distinct datetimes with the matching UTC offsets."""
        zone_name = self._get_tzname('America/Toronto')

        with self._gettz_context(zone_name):
            # A separate tz object is fetched for each conversion
            # (the original note says fromutc() alters the tzfile object).
            toronto_before = self.gettz(zone_name)
            toronto_after = self.gettz(zone_name)

            utc_before = datetime(2011, 3, 13, 6, 30, tzinfo=tz.tzutc())
            utc_after = datetime(2011, 3, 13, 7, 30, tzinfo=tz.tzutc())

            local_before = utc_before.astimezone(toronto_before)
            local_after = utc_after.astimezone(toronto_after)

            # One UTC hour apart, two wall-clock hours apart.
            self.assertEqual(local_before.replace(tzinfo=None),
                             datetime(2011, 3, 13, 1, 30))
            self.assertEqual(local_after.replace(tzinfo=None),
                             datetime(2011, 3, 13, 3, 30))

            self.assertNotEqual(local_before, local_after)
            self.assertEqual(local_before.utcoffset(), timedelta(hours=-5.0))
            self.assertEqual(local_after.utcoffset(), timedelta(hours=-4.0))
test_tz.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testEqualAmbiguousComparison(self):
        """The same UTC instant converted through two separately fetched
        Sydney tz objects must compare equal."""
        tzname = self._get_tzname('Australia/Sydney')

        with self._gettz_context(tzname):
            # Two tz objects obtained independently for the same zone name.
            SYD0 = self.gettz(tzname)
            SYD1 = self.gettz(tzname)

            t0_u = datetime(2012, 3, 31, 14, 30, tzinfo=tz.tzutc())

            t0_syd0 = t0_u.astimezone(SYD0)
            t0_syd1 = t0_u.astimezone(SYD1)

            # This is considered an "inter-zone comparison" because it's an
            # ambiguous datetime.
            self.assertEqual(t0_syd0, t0_syd1)
test_tz.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_utc_transitions(self, tzi, year, gap):
        """Return ``(t_n, t0_u, t1_u)`` around one of the year's transitions.

        ``tzi.transitions(year)`` supplies the two transition datetimes.
        With ``gap`` true the values are built around the first of them,
        otherwise around the second. ``t_n`` is a naive local datetime;
        ``t0_u`` and ``t1_u`` are UTC datetimes exactly one hour apart.
        """
        dston, dstoff = tzi.transitions(year)
        half_hour = timedelta(minutes=30)
        one_hour = timedelta(hours=1)

        if gap:
            # 30 minutes before the first transition, local and in UTC.
            t_n = dston - half_hour
            t0_u = t_n.replace(tzinfo=tzi).astimezone(tz.tzutc())
        else:
            # Start 30 minutes before the second transition, then step both
            # the naive and the UTC value forward by one hour.
            before = dstoff - half_hour
            t0_u = before.replace(tzinfo=tzi).astimezone(tz.tzutc()) + one_hour
            t_n = before + one_hour

        return t_n, t0_u, t0_u + one_hour
utils.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def datetime2timestamp(dt, default_timezone=None):
    """Convert a datetime into seconds elapsed since 1970-01-01 UTC.

    :type dt: datetime
    :param dt: The datetime to convert.
    :type default_timezone: tzinfo
    :param default_timezone: Timezone assumed for a naive ``dt``; ``None``
                             means tzutc(). Ignored when ``dt`` is aware.
    :returns: The timestamp
    """
    if dt.tzinfo is None:
        # Naive input: attach the fallback zone before reading its offset.
        assumed_zone = tzutc() if default_timezone is None else default_timezone
        dt = dt.replace(tzinfo=assumed_zone)

    unix_epoch = datetime.datetime(1970, 1, 1)
    # Shift the wall-clock value to UTC, then measure from the epoch.
    elapsed = dt.replace(tzinfo=None) - dt.utcoffset() - unix_epoch

    if not hasattr(elapsed, "total_seconds"):
        # Manual computation for timedelta objects lacking total_seconds().
        whole_seconds = elapsed.seconds + elapsed.days * 24 * 3600
        return (elapsed.microseconds + whole_seconds * 10**6) / 10**6
    return elapsed.total_seconds()  # Works in Python 2.7+
conftest.py 文件源码 项目:agile-analytics 作者: cmheisel 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def make_analyzed_tickets(AnalyzedAgileTicket, datetime, tzutc):
    """Return a factory that builds tickets from a list of dicts."""
    from dateutil.parser import parse
    default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)

    def _phase(state, raw_date):
        # Fields missing from raw_date are filled in from `default`.
        return dict(state=state, entered_at=parse(raw_date, default=default))

    def _make_analyzed_tickets(ticket_datas):
        return [
            AnalyzedAgileTicket(
                key=data['key'],
                committed=_phase("Committed", data['committed']),
                started=_phase("Started", data['started']),
                ended=_phase("Ended", data['ended'])
            )
            for data in ticket_datas
        ]

    return _make_analyzed_tickets
conftest.py 文件源码 项目:agile-analytics 作者: cmheisel 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def weeks_of_tickets(datetime, tzutc, AnalyzedAgileTicket):
    """Build tickets from ``data/weeks_of_tickets.csv`` next to this file.

    Each CSV row must have ``committed``, ``started`` and ``ended``
    columns. Rows are keyed ``FOO-1``, ``FOO-2``, ... in file order.

    :returns: list of ``AnalyzedAgileTicket`` instances, one per row.
    """
    from dateutil.parser import parse
    # Fields missing from a parsed date string are filled in from this value.
    default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)

    current_path = path.dirname(path.abspath(__file__))
    csv_file = path.join(current_path, 'data', 'weeks_of_tickets.csv')

    def _phase(state, raw_date):
        return dict(state=state, entered_at=parse(raw_date, default=default))

    parsed = []
    # The context manager closes the file even if a row fails to parse;
    # the original left the handle open.
    with open(csv_file, 'r') as handle:
        for count, row in enumerate(csv.DictReader(handle), start=1):
            parsed.append(AnalyzedAgileTicket(
                key="FOO-{}".format(count),
                committed=_phase("committed", row['committed']),
                started=_phase("started", row['started']),
                ended=_phase("ended", row['ended'])
            ))

    return parsed
order.py 文件源码 项目:trading_package 作者: abrahamchaibi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, product_id: str, sequence_id: int, order_side: OrderSide, size: str, price: str,
                 status: OrderStatus = OrderStatus.open, order_id: str = None, order_type: OrderType = OrderType.limit,
                 created_at: Optional[datetime] = None, historical: bool = False, confirmed: bool = False):
        """Store the order fields, normalising a few of them.

        ``sequence_id`` is coerced to ``int`` and ``price`` to ``str``;
        ``filled_size`` starts at ``'0'``. ``created_at`` defaults to the
        current UTC time and is capped at it when supplied.

        Raises:
            OrderException: if ``size`` converts to a negative float.
        """
        self.product_id = product_id
        self.order_side = order_side
        self.order_type = order_type
        self.status = status
        self.sequence_id = int(sequence_id)
        self.size = size
        self.filled_size = '0'
        self.price = str(price)
        self.order_id = order_id

        # orders cannot be created in the future please
        current_utc = datetime.now(tz.tzutc())
        self.created_at = current_utc if created_at is None else min(current_utc, created_at)

        self.historical = historical
        self.confirmed = confirmed

        # Validation runs last, after all attributes have been assigned.
        size_is_negative = float(self.size) < 0
        if size_is_negative:
            raise OrderException('Order size must be positive {}'.format(self.size))
utils.py 文件源码 项目:AshsSDK 作者: thehappydinoa 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def datetime2timestamp(dt, default_timezone=None):
    """Convert a datetime into seconds elapsed since 1970-01-01 UTC.

    :type dt: datetime
    :param dt: The datetime to convert.
    :type default_timezone: tzinfo
    :param default_timezone: Timezone assumed for a naive ``dt``; ``None``
                             means tzutc(). Ignored when ``dt`` is aware.
    :returns: The timestamp
    """
    if dt.tzinfo is None:
        # Naive input: attach the fallback zone before reading its offset.
        assumed_zone = tzutc() if default_timezone is None else default_timezone
        dt = dt.replace(tzinfo=assumed_zone)

    unix_epoch = datetime.datetime(1970, 1, 1)
    # Shift the wall-clock value to UTC, then measure from the epoch.
    elapsed = dt.replace(tzinfo=None) - dt.utcoffset() - unix_epoch

    if not hasattr(elapsed, "total_seconds"):
        # Manual computation for timedelta objects lacking total_seconds().
        whole_seconds = elapsed.seconds + elapsed.days * 24 * 3600
        return (elapsed.microseconds + whole_seconds * 10**6) / 10**6
    return elapsed.total_seconds()  # Works in Python 2.7+
parser.py 文件源码 项目:Orator-Google-App-Engine 作者: MakarenaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _build_datetime(parts):
        """Build a datetime from a dict of parsed components.

        If ``parts`` has a ``timestamp`` entry, it wins and the result is
        that POSIX timestamp as a UTC-aware datetime. Otherwise the result
        is assembled from ``year``/``month``/``day`` (default 1),
        ``hour``/``minute``/``second``/``microsecond`` (default 0) and
        ``tzinfo`` (default ``None``), with ``am_pm`` applied to the hour.
        """
        timestamp = parts.get('timestamp')

        # Compare against None rather than truthiness: a timestamp of 0
        # (the epoch itself) is a valid value and must not be skipped.
        if timestamp is not None:
            tz_utc = tz.tzutc()
            return datetime.fromtimestamp(timestamp, tz=tz_utc)

        am_pm = parts.get('am_pm')
        hour = parts.get('hour', 0)

        # Convert a 12-hour clock reading to 24-hour form.
        if am_pm == 'pm' and hour < 12:
            hour += 12
        elif am_pm == 'am' and hour == 12:
            hour = 0

        return datetime(year=parts.get('year', 1), month=parts.get('month', 1),
            day=parts.get('day', 1), hour=hour, minute=parts.get('minute', 0),
            second=parts.get('second', 0), microsecond=parts.get('microsecond', 0),
            tzinfo=parts.get('tzinfo'))
prop.py 文件源码 项目:iCalVerter 作者: amsysuk 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, dt):
        """Wrap a datetime, date, timedelta or time value.

        Sets the ``value`` parameter for datetime/date/time inputs and adds
        a ``TZID`` parameter when the value has a non-UTC tzinfo for which
        ``tzid_from_dt`` returns an id.

        Raises:
            ValueError: if ``dt`` is none of the supported types.
        """
        if not isinstance(dt, (datetime, date, timedelta, time)):
            raise ValueError('You must use datetime, date, timedelta or time')

        # datetime is tested before date so the first match decides the
        # label. No params are assigned here for a timedelta.
        for candidate, label in ((datetime, 'DATE-TIME'),
                                 (date, 'DATE'),
                                 (time, 'TIME')):
            if isinstance(dt, candidate):
                self.params = Parameters({'value': label})
                break

        zone = None
        if isinstance(dt, (datetime, time)):
            zone = getattr(dt, 'tzinfo', False)

        if zone:
            is_utc = zone is pytz.utc or (
                tzutc is not None and isinstance(zone, tzutc))
            if not is_utc:
                # set the timezone as a parameter to the property
                tzid = tzid_from_dt(dt)
                if tzid:
                    self.params.update({'TZID': tzid})

        self.dt = dt
utils.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def datetime2timestamp(dt, default_timezone=None):
    """Convert a datetime into seconds elapsed since 1970-01-01 UTC.

    :type dt: datetime
    :param dt: The datetime to convert.
    :type default_timezone: tzinfo
    :param default_timezone: Timezone assumed for a naive ``dt``; ``None``
                             means tzutc(). Ignored when ``dt`` is aware.
    :returns: The timestamp
    """
    if dt.tzinfo is None:
        # Naive input: attach the fallback zone before reading its offset.
        assumed_zone = tzutc() if default_timezone is None else default_timezone
        dt = dt.replace(tzinfo=assumed_zone)

    unix_epoch = datetime.datetime(1970, 1, 1)
    # Shift the wall-clock value to UTC, then measure from the epoch.
    elapsed = dt.replace(tzinfo=None) - dt.utcoffset() - unix_epoch

    if not hasattr(elapsed, "total_seconds"):
        # Manual computation for timedelta objects lacking total_seconds().
        whole_seconds = elapsed.seconds + elapsed.days * 24 * 3600
        return (elapsed.microseconds + whole_seconds * 10**6) / 10**6
    return elapsed.total_seconds()  # Works in Python 2.7+
main.py 文件源码 项目:EasyCrawler 作者: playwolf719 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def my_local_time():
    """Return the current time as an aware datetime in the local timezone.

    The source zone is UTC and the target zone is detected with
    ``tz.tzlocal()``.
    """
    from_zone = tz.tzutc()
    to_zone = tz.tzlocal()

    # Ask for an aware UTC "now" directly. datetime.utcnow() returns a naive
    # value that then has to be tagged by hand, and it is deprecated in
    # recent Python versions.
    utc = datetime.now(from_zone)

    # Convert time zone
    return utc.astimezone(to_zone)
test_timeseries.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_index_convert_to_datetime_array_dateutil(self):
        """to_pydatetime() must give an ndarray of datetimes that match each
        index element in value and tzinfo, for naive and dateutil-zoned ranges."""
        tm._skip_if_no_dateutil()
        import dateutil

        def _check_rng(rng):
            as_python = rng.to_pydatetime()
            tm.assertIsInstance(as_python, np.ndarray)
            for py_value, stamp in zip(as_python, rng):
                tm.assertIsInstance(py_value, datetime)
                self.assertEqual(py_value, stamp.to_pydatetime())
                self.assertEqual(py_value.tzinfo, stamp.tzinfo)

        # Naive, dateutil named zone, and dateutil UTC - all built before
        # any of them is checked.
        ranges = (
            date_range('20090415', '20090519'),
            date_range('20090415', '20090519', tz='dateutil/US/Eastern'),
            date_range('20090415', '20090519', tz=dateutil.tz.tzutc()),
        )

        for rng in ranges:
            _check_rng(rng)
test_timeseries.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_period_resample_with_local_timezone_dateutil(self):
        """Daily period resampling of an hourly UTC series converted to a
        dateutil Los Angeles zone (GH5430)."""
        tm._skip_if_no_dateutil()
        import dateutil

        local_timezone = 'dateutil/America/Los_Angeles'

        # Two UTC midnights, exactly one day apart.
        midnight = dict(year=2013, month=11, hour=0, minute=0)
        start = datetime(day=1, tzinfo=dateutil.tz.tzutc(), **midnight)
        end = datetime(day=2, tzinfo=dateutil.tz.tzutc(), **midnight)

        hourly_index = pd.date_range(start, end, freq='H')
        localized = pd.Series(1, index=hourly_index).tz_convert(local_timezone)
        result = localized.resample('D', kind='period').mean()

        # The expected index is the daily period range shifted back one day,
        # reflecting the UTC -> Pacific conversion.
        shifted_periods = pd.period_range(start=start, end=end, freq='D') - 1
        expected = pd.Series(1, index=shifted_periods)
        assert_series_equal(result, expected)
test_timeseries.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_class_ops_dateutil(self):
        """Check that Timestamp class-level constructors agree with their
        ``datetime`` counterparts, including ``now`` with a dateutil UTC zone.

        Each pair is evaluated back to back and compared at one-second
        resolution, so the order of the statements matters.
        """
        tm._skip_if_no_dateutil()
        from dateutil.tz import tzutc

        def compare(x, y):
            # Timestamp.value is divided by 1e9 and rounded, so the two
            # arguments only need to agree to the nearest whole unit.
            self.assertEqual(int(np.round(Timestamp(x).value / 1e9)),
                             int(np.round(Timestamp(y).value / 1e9)))

        compare(Timestamp.now(), datetime.now())
        compare(Timestamp.now('UTC'), datetime.now(tzutc()))
        compare(Timestamp.utcnow(), datetime.utcnow())
        compare(Timestamp.today(), datetime.today())
        # A fixed integer timestamp shared by the fromtimestamp checks below.
        current_time = calendar.timegm(datetime.now().utctimetuple())
        compare(Timestamp.utcfromtimestamp(current_time),
                datetime.utcfromtimestamp(current_time))
        compare(Timestamp.fromtimestamp(current_time),
                datetime.fromtimestamp(current_time))

        # combine(): the time part is taken from a moment 10 minutes later.
        date_component = datetime.utcnow()
        time_component = (date_component + timedelta(minutes=10)).time()
        compare(Timestamp.combine(date_component, time_component),
                datetime.combine(date_component, time_component))
test_timeseries.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_cant_compare_tz_naive_w_aware_dateutil(self):
        """Comparing a naive Timestamp with a dateutil-UTC-aware one must
        raise, in both directions (#1404)."""
        tm._skip_if_no_dateutil()
        from dateutil.tz import tzutc
        utc = tzutc()

        naive = Timestamp('3/12/2012')
        aware = Timestamp('3/12/2012', tz=utc)

        # Every comparison dunder, first naive-vs-aware then aware-vs-naive.
        dunder_names = ('__eq__', '__ne__', '__lt__', '__gt__')
        for left, right in ((naive, aware), (aware, naive)):
            for dunder in dunder_names:
                self.assertRaises(Exception, getattr(left, dunder), right)

        # Mixed Timestamp / stdlib datetime equality: expected to compare
        # unequal from Python 3.3 on, and to raise before that.
        if sys.version_info >= (3, 3):
            self.assertFalse(naive == aware.to_pydatetime())
            self.assertFalse(naive.to_pydatetime() == aware)
        else:
            self.assertRaises(Exception, naive.__eq__, aware.to_pydatetime())
            self.assertRaises(Exception, naive.to_pydatetime().__eq__, aware)
asg.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def process(self, asgs):
        """Tag the given ASGs with a message naming the operation and the
        date on which it will be carried out.

        The message template, tag key, operation and day count come from
        ``self.data``. If the configured template fails to format, the
        default template is used instead.
        """
        config = self.data
        msg_tmpl = config.get('message', self.default_template)
        key = config.get('key', config.get('tag', DEFAULT_TAG))
        op = config.get('op', 'suspend')
        days_ahead = config.get('days', 4)

        stop_date = datetime.now(tz=tzutc()) + timedelta(days=days_ahead)
        action_date = stop_date.strftime('%Y/%m/%d')

        try:
            msg = msg_tmpl.format(op=op, action_date=action_date)
        except Exception:
            # A user-supplied template may reference unknown fields.
            self.log.warning("invalid template %s" % msg_tmpl)
            msg = self.default_template.format(op=op, action_date=action_date)

        self.log.info("Tagging %d asgs for %s on %s" % (
            len(asgs), op, action_date))
        self.tag(asgs, key, msg)
iam.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def fetch_credential_report(self):
        """Return the ``Content`` of an IAM credential report.

        An existing report is used if one is present and it is not older
        than ``report_max_age`` seconds. Otherwise a new report is
        requested (if ``report_generate`` allows it), and after waiting
        ``report_delay`` seconds the report is fetched again.

        Raises:
            ClientError: any error from the first fetch other than
                ``ReportNotPresent``.
            ValueError: if no usable report exists and ``report_generate``
                is falsy.
        """
        client = local_session(self.manager.session_factory).client('iam')
        try:
            report = client.get_credential_report()
        except ClientError as e:
            # A missing report is expected; anything else is re-raised.
            if e.response['Error']['Code'] != 'ReportNotPresent':
                raise
            report = None
        if report:
            # Oldest generation time that still counts as fresh.
            threshold = datetime.datetime.now(tz=tzutc()) - timedelta(
                seconds=self.get_value_or_schema_default(
                    'report_max_age'))
            # Drop the timezone from the threshold when GeneratedTime is
            # naive, so the two values can be compared.
            if not report['GeneratedTime'].tzinfo:
                threshold = threshold.replace(tzinfo=None)
            if report['GeneratedTime'] < threshold:
                report = None
        if report is None:
            if not self.get_value_or_schema_default('report_generate'):
                raise ValueError("Credential Report Not Present")
            client.generate_credential_report()
            # Wait the configured delay before fetching the new report.
            time.sleep(self.get_value_or_schema_default('report_delay'))
            report = client.get_credential_report()
        return report['Content']
test_filters.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_age(self):
        """An 'age' value filter with op less-than 32 matches launch times
        of 30 days ago and now, but not 60 or 90 days ago."""
        now = datetime.now(tz=tz.tzutc())

        fdata = {
            'type': 'value',
            'key': 'LaunchTime',
            'op': 'less-than',
            'value_type': 'age',
            'value': 32}

        # (launch time, expected filter result); the last case passes the
        # timestamp as an ISO-format string instead of a datetime.
        cases = [
            (now - timedelta(90), False),
            (now - timedelta(60), False),
            (now - timedelta(30), True),
            (now, True),
            (now.isoformat(), True),
        ]

        for launch_time, expected in cases:
            self.assertFilter(
                fdata, instance(LaunchTime=launch_time), expected)
test_filters.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_expiration(self):
        """An 'expiration' value filter with op less-than 61 matches times
        up to 60 days ahead, but not 90 days ahead."""
        now = datetime.now(tz=tz.tzutc())

        fdata = {
            'type': 'value',
            'key': 'LaunchTime',
            'op': 'less-than',
            'value_type': 'expiration',
            'value': 61}

        # (time, expected filter result); the last case passes the
        # timestamp as an ISO-format string instead of a datetime.
        cases = [
            (now + timedelta(90), False),
            (now + timedelta(60), True),
            (now, True),
            (now.isoformat(), True),
        ]

        for moment, expected in cases:
            self.assertFilter(fdata, instance(LaunchTime=moment), expected)
test_filters.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_filter_instance_age(self):
        """The instance-uptime filter (gte, 60 days) matches instances
        launched 60 or 90 days ago, but not those launched now or 30 days ago."""
        now = datetime.now(tz=tz.tzutc())

        # (days since launch, expected filter result)
        expectations = ((0, False), (90, True), (60, True), (30, False))

        # All instances are created first, then each one is filtered.
        cases = [
            (instance(LaunchTime=now if age == 0 else now - timedelta(age)),
             matched)
            for age, matched in expectations
        ]

        for resource, matched in cases:
            self.assertFilter(
                {'type': 'instance-uptime', 'op': 'gte', 'days': 60},
                resource, matched)
exporter.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def filter_extant_exports(client, bucket, prefix, days, start, end=None):
    """Return the sorted days that are later than the last recorded export.

    The last export time is read from the ``LastExport`` object tag on
    ``prefix`` in ``bucket``. If the key or the tag is missing, all days
    are returned, sorted. A ``LastExport`` value parsed without timezone
    information is treated as UTC.
    """
    end = end or datetime.now()

    try:
        response = client.get_object_tagging(Bucket=bucket, Key=prefix)
        tag_set = response.get('TagSet', [])
    except ClientError as e:
        # A missing key just means nothing has been exported yet.
        if e.response['Error']['Code'] != 'NoSuchKey':
            raise
        tag_set = []

    tags = dict((tag['Key'], tag['Value']) for tag in tag_set)

    if 'LastExport' in tags:
        cutoff = parse(tags['LastExport'])
        if cutoff.tzinfo is None:
            cutoff = cutoff.replace(tzinfo=tzutc())
        return [day for day in sorted(days) if day > cutoff]

    return sorted(days)
serializers.py 文件源码 项目:ros-interop 作者: mcgill-robotics 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def iso8601_to_rostime(iso):
    """Build a ROS Time message from an ISO 8601 timestamp.

    A parsed value with no UTC offset, or a zero offset, has its tzinfo
    set to UTC.

    Args:
        iso: ISO 8601 encoded string.

    Returns:
        std_msgs/Time.
    """
    parsed = dateutil.parser.parse(iso)
    if not parsed.utcoffset():
        parsed = parsed.replace(tzinfo=tzutc())

    # Elapsed time since the Unix epoch, both ends timezone-aware.
    unix_epoch = datetime.utcfromtimestamp(0).replace(tzinfo=tzutc())
    elapsed = parsed - unix_epoch

    # Whole seconds go to secs; the sub-second part goes to nsecs.
    message = Time()
    message.data.secs = int(elapsed.total_seconds())
    message.data.nsecs = elapsed.microseconds * 1000
    return message
properties.py 文件源码 项目:anom-py 作者: Bogdanp 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _deserialize(cls, data):
        """Decode a tagged dict back into the value it represents.

        A dict without the ``cls._type_field`` key (or with it set to
        ``None``) is returned unchanged. Otherwise the tag selects how the
        ``"value"`` entry is decoded: ``"blob"`` is base64-decoded,
        ``"datetime"`` is read as a POSIX timestamp in UTC, and ``"model"``
        is passed to ``cls._entity_from_dict``.

        Raises:
            ValueError: if the tag is not one of the known kinds.
        """
        type_tag = data.get(cls._type_field, None)
        if type_tag is None:
            return data

        payload = data.get("value")

        if type_tag == "blob":
            return base64.b64decode(payload.encode("utf-8"))

        if type_tag == "datetime":
            return datetime.fromtimestamp(payload, tz.tzutc())

        if type_tag == "model":
            return cls._entity_from_dict(payload)

        raise ValueError(f"Invalid kind {type_tag!r}.")
slowquery2es.py 文件源码 项目:rdsmant 作者: JangYoungWhan 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def validateLogDate(self, lines):
        """Return whether the first timestamped log line is at most two
        hours older than ``self._now``.

        Only the first line starting with ``"# Time: "`` is examined. Its
        timestamp is read as UTC, converted to the configured timezone and
        compared as a naive value. Returns True if no such line exists.
        """
        max_age = timedelta(hours=2)
        marker = "# Time: "

        for entry in lines:
            # Skip blank lines and anything that is not a time marker.
            if not entry or not entry.startswith(marker):
                continue

            stamp = datetime.strptime(entry[8:], "%y%m%d %H:%M:%S")
            target_zone = zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"])
            stamp = stamp.replace(tzinfo=tz.tzutc()).astimezone(target_zone)
            stamp = stamp.replace(tzinfo=None)

            print(self._now, stamp)
            print("diff :", self._now - stamp)

            # The first marker decides the result.
            return not ((self._now - stamp) > max_age)

        return True

    # Initialization.


问题


面经


文章

微信
公众号

扫码关注公众号