python类tzutc()的实例源码

errorlog2es.py 文件源码 项目:rdsmant 作者: JangYoungWhan 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def validateLogDate(self, lines):
    delta = timedelta(hours=2)

    for line in lines:
      if not line:
        continue
      elif line.startswith("# Time: "):
        log_time = datetime.strptime(line[8:], "%y%m%d %H:%M:%S")
        log_time = log_time.replace(tzinfo=tz.tzutc()).astimezone(zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"]))
        log_time = log_time.replace(tzinfo=None)
        print(self._now, log_time)
        print("diff :", self._now - log_time)
        if (self._now - log_time) > delta:
          return False
        else:
          return True

    return True
errorlog2es.py 文件源码 项目:rdsmant 作者: JangYoungWhan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def validate_log_date(self, line):
    delta = timedelta(hours=2)

    m = REG_GENERAL_ERR.match(line)
    if m:
      log_time = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
      log_time = log_time.replace(tzinfo=tz.tzutc()).astimezone(zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"]))
      log_time = log_time.replace(tzinfo=None)
      if (self._now - log_time) > delta:
        return False
    elif BEGIN_DEADLOCK in line:
      m = REG_DEADLOCK.match(line)
      log_time = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S")
      log_time = log_time.replace(tzinfo=tz.tzutc()).astimezone(zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"]))
      log_time = log_time.replace(tzinfo=None)
      if (self._now - log_time) > delta:
        return False

    return True
tests.py 文件源码 项目:python-card-me 作者: tBaxter 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_recurrence(self):
        # Ensure date valued UNTILs in rrules are in a reasonable timezone,
        # and include that day (12/28 in this test)
        test_file = get_test_file("recurrence.ics")
        cal = base.readOne(test_file, findBegin=False)
        dates = list(cal.vevent.getrruleset())
        self.assertEqual(
            dates[0],
            datetime.datetime(2006, 1, 26, 23, 0, tzinfo=tzutc())
        )
        self.assertEqual(
            dates[1],
            datetime.datetime(2006, 2, 23, 23, 0, tzinfo=tzutc())
        )
        self.assertEqual(
            dates[-1],
            datetime.datetime(2006, 12, 28, 23, 0, tzinfo=tzutc())
        )
utils.py 文件源码 项目:kscore 作者: liuyichen 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def datetime2timestamp(dt, default_timezone=None):
    """Calculate the timestamp based on the given datetime instance.

    :type dt: datetime
    :param dt: A datetime object to be converted into timestamp
    :type default_timezone: tzinfo
    :param default_timezone: If it is provided as None, we treat it as tzutc().
                             But it is only used when dt is a naive datetime.
    :returns: The timestamp
    """
    epoch = datetime.datetime(1970, 1, 1)
    if dt.tzinfo is None:
        if default_timezone is None:
            default_timezone = tzutc()
        dt = dt.replace(tzinfo=default_timezone)
    d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
    if hasattr(d, "total_seconds"):
        return d.total_seconds()  # Works in Python 2.7+
    return (d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6) / 10**6
test_crud.py 文件源码 项目:catchpy 作者: nmaekawa 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_import_anno_ok_2(wa_image):
    catcha = wa_image

    now = datetime.now(tz.tzutc())

    # import first because CRUD.create changes created time in input
    catcha['id'] = 'naomi-xx-imported'
    resp = CRUD.import_annos([catcha])
    x2 = Anno._default_manager.get(pk=catcha['id'])
    assert x2 is not None
    assert Anno._default_manager.count() == 1

    # x2 was created more in the past? import preserves created date?
    delta = timedelta(hours=25)
    assert x2.created < (now - delta)

    # about to create
    catcha['id'] = 'naomi-xx'
    x1 = CRUD.create_anno(catcha)
    assert x1 is not None
    assert Anno._default_manager.count() == 2

    # x1 was created less than 1m ago?
    delta = timedelta(minutes=1)
    assert (now - delta) < x1.created
test_crud.py 文件源码 项目:catchpy 作者: nmaekawa 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_import_anno_ok(wa_image):
    catcha = wa_image
    catcha_reply = make_wa_object(
        age_in_hours=1, reply_to=catcha['id'])

    now = datetime.now(tz.tzutc())

    resp = CRUD.import_annos([catcha, catcha_reply])
    x2 = Anno._default_manager.get(pk=catcha['id'])
    assert x2 is not None

    # preserve replies?
    assert x2.total_replies == 1
    assert x2.replies[0].anno_id == catcha_reply['id']

    # import preserve created date?
    delta = timedelta(hours=25)
    assert x2.created < (now - delta)
deleted_entry_test.py 文件源码 项目:contentful.py 作者: contentful 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_deleted_entry(self):
        deleted_entry = DeletedEntry({
            "sys": {
                "space": {
                    "sys": {
                        "type": "Link",
                        "linkType": "Space",
                        "id": "cfexampleapi"
                        }
                    },
                "id": "5c6VY0gWg0gwaIeYkUUiqG",
                "type": "DeletedEntry",
                "createdAt": "2013-09-09T16:17:12.600Z",
                "updatedAt": "2013-09-09T16:17:12.600Z",
                "deletedAt": "2013-09-09T16:17:12.600Z",
                "revision": 1
            }
        })

        self.assertEqual(deleted_entry.id, '5c6VY0gWg0gwaIeYkUUiqG')
        self.assertEqual(deleted_entry.deleted_at, datetime.datetime(2013, 9, 9, 16, 17, 12, 600000, tzinfo=tzutc()))
        self.assertEqual(str(deleted_entry), "<DeletedEntry id='5c6VY0gWg0gwaIeYkUUiqG'>")
deleted_asset_test.py 文件源码 项目:contentful.py 作者: contentful 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_deleted_asset(self):
        deleted_asset = DeletedAsset({
            "sys": {
                "space": {
                    "sys": {
                        "type": "Link",
                        "linkType": "Space",
                        "id": "cfexampleapi"
                        }
                    },
                "id": "5c6VY0gWg0gwaIeYkUUiqG",
                "type": "DeletedAsset",
                "createdAt": "2013-09-09T16:17:12.600Z",
                "updatedAt": "2013-09-09T16:17:12.600Z",
                "deletedAt": "2013-09-09T16:17:12.600Z",
                "revision": 1
            }
        })

        self.assertEqual(deleted_asset.id, '5c6VY0gWg0gwaIeYkUUiqG')
        self.assertEqual(deleted_asset.deleted_at, datetime.datetime(2013, 9, 9, 16, 17, 12, 600000, tzinfo=tzutc()))
        self.assertEqual(str(deleted_asset), "<DeletedAsset id='5c6VY0gWg0gwaIeYkUUiqG'>")
utils.py 文件源码 项目:jepsen-training-vpc 作者: bloomberg 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def datetime2timestamp(dt, default_timezone=None):
    """Calculate the timestamp based on the given datetime instance.

    :type dt: datetime
    :param dt: A datetime object to be converted into timestamp
    :type default_timezone: tzinfo
    :param default_timezone: If it is provided as None, we treat it as tzutc().
                             But it is only used when dt is a naive datetime.
    :returns: The timestamp
    """
    epoch = datetime.datetime(1970, 1, 1)
    if dt.tzinfo is None:
        if default_timezone is None:
            default_timezone = tzutc()
        dt = dt.replace(tzinfo=default_timezone)
    d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
    if hasattr(d, "total_seconds"):
        return d.total_seconds()  # Works in Python 2.7+
    return (d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6) / 10**6
parser.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _build_datetime(parts):

        timestamp = parts.get('timestamp')

        if timestamp:
            tz_utc = tz.tzutc()
            return datetime.fromtimestamp(timestamp, tz=tz_utc)

        am_pm = parts.get('am_pm')
        hour = parts.get('hour', 0)

        if am_pm == 'pm' and hour < 12:
            hour += 12
        elif am_pm == 'am' and hour == 12:
            hour = 0

        return datetime(year=parts.get('year', 1), month=parts.get('month', 1),
            day=parts.get('day', 1), hour=hour, minute=parts.get('minute', 0),
            second=parts.get('second', 0), microsecond=parts.get('microsecond', 0),
            tzinfo=parts.get('tzinfo'))
default.py 文件源码 项目:repository.jlippold 作者: jlippold 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def format_event(device, event):
    from_zone = tz.tzutc()
    to_zone = tz.tzlocal()
    utc = event['created_at'].replace(tzinfo=from_zone)
    local_time = utc.astimezone(to_zone)

    local_time_string = local_time.strftime('%I:%M %p ') 
    local_time_string += ADDON.getLocalizedString(30300) 
    local_time_string += local_time.strftime(' %A %b %d %Y')

    event_name = ''
    if event['kind'] == 'on_demand':
        event_name = ADDON.getLocalizedString(30301)
    if event['kind'] == 'motion':
        event_name = ADDON.getLocalizedString(30302)
    if event['kind'] == 'ding':
        event_name = ADDON.getLocalizedString(30303)

    return ' '.join([device.name, event_name, local_time_string])
stack.py 文件源码 项目:sceptre 作者: cloudreach 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _wait_for_completion(self):
        """
        Waits for a stack operation to finish. Prints CloudFormation events
        while it waits.

        :returns: The final stack status.
        :rtype: sceptre.stack_status.StackStatus
        """
        status = StackStatus.IN_PROGRESS

        self.most_recent_event_datetime = (
            datetime.now(tzutc()) - timedelta(seconds=3)
        )
        while status == StackStatus.IN_PROGRESS:
            status = self._get_simplified_status(self.get_status())
            self._log_new_events()
            time.sleep(4)
        return status
utils.py 文件源码 项目:AWS-AutoTag 作者: cpollard0 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def datetime2timestamp(dt, default_timezone=None):
    """Calculate the timestamp based on the given datetime instance.

    :type dt: datetime
    :param dt: A datetime object to be converted into timestamp
    :type default_timezone: tzinfo
    :param default_timezone: If it is provided as None, we treat it as tzutc().
                             But it is only used when dt is a naive datetime.
    :returns: The timestamp
    """
    epoch = datetime.datetime(1970, 1, 1)
    if dt.tzinfo is None:
        if default_timezone is None:
            default_timezone = tzutc()
        dt = dt.replace(tzinfo=default_timezone)
    d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
    if hasattr(d, "total_seconds"):
        return d.total_seconds()  # Works in Python 2.7+
    return (d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6) / 10**6
reporting_daily.py 文件源码 项目:garage-door 作者: denniskline 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def combine_histories_and_messages(histories, smsMessages):
    cmdSuccessStyle = 'class="commandSuccess"'
    cmdErrorStyle = 'class="commandError"'
    basicStyle = ''

    from_zone = tz.tzutc()
    to_zone = tz.tzlocal()

    combinedList = []
    for history in histories:
        combinedList.append({"action": history.get('state'), "timestamp": history.get('changedAt'), "style": basicStyle})
    for smsMessage in smsMessages:
        style = cmdSuccessStyle if smsMessage.get('status').lower() == 'processed' else cmdErrorStyle
        createdAt = smsMessage.get('createdAt').replace(tzinfo=from_zone)
        combinedList.append({"action": smsMessage.get('body').lower(), "timestamp": createdAt.astimezone(to_zone), "style": style})

    combinedList.sort(key=lambda c: c.get("timestamp"))
    return combinedList
zenodo_spam.py 文件源码 项目:hadroid 作者: hadroid 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def time_params_to_dates(args):
    now = datetime.now(tz=tzutc())
    if args['--hours'] or args['--days']:
        if args['--days']:
            hours = int(args['--days']) * 24
        else:
            hours = int(args['--hours'])
        until_date = now
        from_date = until_date - timedelta(hours=hours)
    else:
        if args['--from']:
            from_date = parse_date(args['--from'])
            if args['--until']:
                until_date = parse_date(args['--until'])
            else:
                until_date = now
        else:
            until_date = now
            from_date = until_date - timedelta(days=1)
    return from_date, until_date
token.py 文件源码 项目:py-xbox-webapi 作者: tuxuser 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, token):
        """
        Container for storing Windows Live Refresh Token.

        Subclass of :class:`Token`

        WARNING: Only invoke when creating a FRESH token!
        Don't use to convert saved token into object
        Refresh Token usually has a lifetime of 14 days

        Args:
            token (str): The JWT Refresh-Token
        """
        date_issued = datetime.now(tzutc())
        date_valid = date_issued + timedelta(days=14)
        super(RefreshToken, self).__init__(token, date_issued, date_valid)
tests.py 文件源码 项目:vobject 作者: eventable 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_scratchbuild(self):
        """
        CreateCalendar 2.0 format from scratch
        """
        test_cal = get_test_file("simple_2_0_test.ics")
        cal = base.newFromBehavior('vcalendar', '2.0')
        cal.add('vevent')
        cal.vevent.add('dtstart').value = datetime.datetime(2006, 5, 9)
        cal.vevent.add('description').value = "Test event"
        cal.vevent.add('created').value = \
            datetime.datetime(2006, 1, 1, 10,
                              tzinfo=dateutil.tz.tzical(
                                  "test_files/timezones.ics").get('US/Pacific'))
        cal.vevent.add('uid').value = "Not very random UID"
        cal.vevent.add('dtstamp').value = datetime.datetime(2017, 6, 26, 0, tzinfo=tzutc())

        # Note we're normalizing line endings, because no one got time for that.
        self.assertEqual(
            cal.serialize().replace('\r\n', '\n'),
            test_cal.replace('\r\n', '\n')
        )
tests.py 文件源码 项目:vobject 作者: eventable 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_recurrence(self):
        """
        Ensure date valued UNTILs in rrules are in a reasonable timezone,
        and include that day (12/28 in this test)
        """
        test_file = get_test_file("recurrence.ics")
        cal = base.readOne(test_file)
        dates = list(cal.vevent.getrruleset())
        self.assertEqual(
            dates[0],
            datetime.datetime(2006, 1, 26, 23, 0, tzinfo=tzutc())
        )
        self.assertEqual(
            dates[1],
            datetime.datetime(2006, 2, 23, 23, 0, tzinfo=tzutc())
        )
        self.assertEqual(
            dates[-1],
            datetime.datetime(2006, 12, 28, 23, 0, tzinfo=tzutc())
        )
periodic_cleanup.py 文件源码 项目:rpc-gating 作者: rcbops 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def cleanup_instances(self):
        """Cleanup Instances.

        Delete instances if they are in an error state or are over the
        age defined in INSTANCE_AGE_LIMIT.

        Instances that don't match PROTECTED_PREFIX are cleaned up
        """
        current_time = datetime.datetime.now(tzutc())
        max_age = datetime.timedelta(hours=self.age_limit)

        for server in self.unprotected_servers:
            created_time = dateutil.parser.parse(server.created_at)
            age = current_time - created_time
            errored = server.status == "ERROR"
            if errored or age > max_age:
                _indp("Deleting {name} Errored: {error} Age: {age}".format(
                    name=server.name, error=errored, age=age))
                self.conn.compute.delete_server(server.id)


问题


面经


文章

微信
公众号

扫码关注公众号