python类utc()的实例源码

utils.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def to_utc_timestamp(date_time):
    """Convert a naive or timezone-aware datetime to UTC timestamp.

    Naive datetimes are interpreted as already being expressed in UTC;
    aware datetimes are converted using their own UTC offset.

    Arguments:
      date_time (:py:class:`datetime.datetime`): The datetime to
        convert, or ``None``.

    Returns:
      :py:class:`int`: The timestamp (in seconds), or ``None`` when no
        datetime was supplied.

    """
    if date_time is None:
        return None
    # A datetime is naive exactly when utcoffset() is None. tzname() may
    # be None even for an aware datetime, so it must not be used to decide.
    if date_time.utcoffset() is None:
        date_time = date_time.replace(tzinfo=timezone.utc)
    return int(round(date_time.timestamp(), 0))
utils.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def occurred(at_):
    """Describe how long ago a service event happened.

    Arguments:
      at_ (:py:class:`str`): When the event occurred, in a format
        understood by ``parse``.

    Returns:
      :py:class:`str`: The humanized elapsed time, or
        ``'time not available'`` when ``at_`` cannot be parsed.

    """
    try:
        event_time = parse(at_)
    except (TypeError, ValueError):
        logger.warning('failed to parse occurrence time %r', at_)
        return 'time not available'
    aware_now = datetime.now(tz=timezone.utc)
    try:
        elapsed = aware_now - event_time
    except TypeError:
        # Aware and naive datetimes cannot be subtracted; the parsed value
        # is naive, so compare it against the naive local clock instead.
        elapsed = datetime.now() - event_time
    return naturaltime(elapsed.total_seconds())
core.py 文件源码 项目:flash_services 作者: textbook 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def calculate_timeout(http_date):
        """Work out a request timeout from e.g. a ``Retry-After`` header.

        Note:
          Per :rfc:`2616#section-14.37`, the ``Retry-After`` header can
          be either an integer number of seconds or an HTTP date. Both
          forms are accepted.

        Arguments:
          http_date (:py:class:`str`): The header value to interpret.

        Returns:
          :py:class:`int`: The timeout, in seconds.

        """
        try:
            seconds = int(http_date)
        except ValueError:
            # Not a plain number of seconds: fall through to date parsing.
            pass
        else:
            return seconds
        remaining = parse(http_date) - datetime.now(tz=timezone.utc)
        return int(remaining.total_seconds())
audit.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def reset(self, variantId, buildId, resultHash):
        """Reinitialise the record for a new build.

        Stores the three given identifiers, captures the ``os.uname()``
        fields and the current UTC time (ISO 8601) as build information,
        and clears every other collected field.
        """
        self.__variantId = variantId
        self.__buildId = buildId
        self.__resultHash = resultHash
        self.__recipes = None
        self.__defines = {}

        host = os.uname()
        build_info = {
            field: getattr(host, field)
            for field in ('sysname', 'nodename', 'release', 'version', 'machine')
        }
        build_info['date'] = datetime.now(timezone.utc).isoformat()
        self.__build = build_info

        self.__env = ""
        self.__metaEnv = {}
        self.__scms = []
        self.__deps = []
        self.__tools = {}
        self.__sandbox = None
        self.__id = None
test_procedures.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_throttling_compute_fine(SETTINGS):
    """compute_throttling() returns 0 when usage is behind the elapsed time."""
    SETTINGS.RATE_LIMIT_THRESHOLD = 0
    from jenkins_epo.procedures import compute_throttling

    # 1/5 of the calls are consumed while 2/3 of the time has elapsed.
    current = datetime(2017, 1, 18, 14, 40, tzinfo=timezone.utc)
    window_end = datetime(2017, 1, 18, 15, tzinfo=timezone.utc)
    rate = {
        'limit': 5000,
        'remaining': 4000,
        'reset': window_end.timestamp(),
    }
    delay = compute_throttling(now=current, rate_limit={'rate': rate})
    assert 0 == delay  # Fine !
llparser.py 文件源码 项目:Ruby-Bot 作者: ahuei123456 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def convert_time(time):
    """Parse an ISO 8601 style timestamp into a timezone-aware datetime.

    Accepted shapes (fractional seconds, if present, are ignored):
        2014-11-05T09:00:00Z
        2013-06-12T16:00:00+09:00

    The UTC offset is read from the string itself; a missing offset or a
    ``Z`` suffix means UTC.

    :param time: the timestamp text, or None
    :returns: an aware ``datetime.datetime``, or None when ``time`` is None
        or too short to contain a full ``yyyy-mm-ddThh:mm:ss`` stamp
    :raises ValueError: if a field is not numeric or the offset suffix is
        not recognised
    """
    # 19 characters are needed for yyyy-mm-ddThh:mm:ss; anything shorter
    # cannot supply the time-of-day fields sliced below.
    if time is None or len(time) < 19:
        return None

    year = int(time[:4])
    month = int(time[5:7])
    date = int(time[8:10])

    hour = int(time[11:13])
    minute = int(time[14:16])
    second = int(time[17:19])

    suffix = time[19:]
    if suffix.startswith('.'):
        # Skip fractional seconds; the result keeps microsecond=0.
        suffix = suffix[1:].lstrip('0123456789')

    if suffix in ('', 'Z', 'z'):
        tz = timezone.utc
    elif suffix[0] in '+-':
        # Accept +HH:MM, +HHMM and +HH.
        digits = suffix[1:].replace(':', '')
        offset = timedelta(hours=int(digits[:2]),
                           minutes=int(digits[2:4] or 0))
        tz = timezone(-offset if suffix[0] == '-' else offset)
    else:
        raise ValueError('unrecognised UTC offset: {!r}'.format(suffix))

    return datetime.datetime(year, month, date, hour, minute, second, 0, tz)
llparser.py 文件源码 项目:Ruby-Bot 作者: ahuei123456 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def event_remaining(dt_start, dt_end):
    """Report how much time is left in an event, as a sentence.

    :param dt_start: aware datetime at which the event starts
    :param dt_end: aware datetime at which the event ends
    :returns: a message saying the event has not started, has ended, or
        how many days/hours/minutes/seconds remain until ``dt_end``
    """
    now = datetime.datetime.now(timezone.utc)

    until_end = dt_end - now
    since_start = now - dt_start

    if since_start.total_seconds() < 0:
        return "Event has not started yet!"
    if until_end.total_seconds() < 0:
        return "Event has ended!"

    # timedelta.seconds is the sub-day remainder; split it into h/m/s.
    hours, leftover = divmod(until_end.seconds, 3600)
    minutes, seconds = divmod(leftover, 60)
    return "Event ends in {} days, {} hours, {} minutes and {} seconds.".format(until_end.days, hours, minutes, seconds)

###### Sunshine ######
make_lightcone.py 文件源码 项目:atoolbox 作者: liweitianux 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def header(self):
        """Build the FITS header from the configuration and the WCS.

        Records the data unit, redshift range and step, comoving distance
        range and step, box side length and cell count, the creation time
        (current time in the local timezone, ISO 8601), the command line
        as a history entry, and finally the cards of ``self.wcs``.
        """
        cfg = self.configs
        dDc = cfg.Dc_cell
        Dc_min, Dc_max = cfg.Dc_limit
        created = datetime.now(timezone.utc).astimezone().isoformat()
        cards = [
            ("BUNIT", cfg.unit, "Data unit"),
            ("zmin", cfg.zmin, "HI simulation minimum redshift"),
            ("zmax", cfg.zmax, "HI simulation maximum redshift"),
            ("dz", cfg.dz, "HI simulation redshift step size"),
            ("Dc_min", Dc_min, "[cMpc] comoving distance at zmin"),
            ("Dc_max", Dc_max, "[cMpc] comoving distance at zmax"),
            ("Dc_step", dDc, "[cMpc] comoving distance between slices"),
            ("Lside", cfg.Lside, "[cMpc] Simulation side length"),
            ("Nside", cfg.Nside, "Number of cells at each side"),
            ("DATE", created, "File creation date"),
        ]
        header = fits.Header()
        for keyword, value, comment in cards:
            header[keyword] = (value, comment)
        header.add_history(" ".join(sys.argv))
        header.extend(self.wcs.to_header(), update=True)
        return header
get_slice_zfreq.py 文件源码 项目:atoolbox 作者: liweitianux 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def write_slice(self, outfile, data, z, clobber=False):
        """Write one slice to a FITS file.

        The new header copies BUNIT, Lside and Nside (values and comments)
        from ``self.header`` and adds the redshift, the frequency from
        ``z2freq(z)``, the comoving distance, the creation time and the
        command line.

        :param outfile: path of the FITS file to write
        :param data: slice data for the primary HDU
        :param z: redshift of the slice
        :param clobber: whether an existing file may be overwritten
        """
        freq = z2freq(z)
        Dc = cosmo.comoving_distance(z).value  # [Mpc]
        header = fits.Header()
        for keyword in ("BUNIT", "Lside", "Nside"):
            header[keyword] = (self.header[keyword],
                               self.header.comments[keyword])
        header["REDSHIFT"] = (z, "redshift of this slice")
        header["FREQ"] = (freq, "[MHz] observed HI signal frequency")
        header["Dc"] = (Dc, "[cMpc] comoving distance")
        created = datetime.now(timezone.utc).astimezone().isoformat()
        header["DATE"] = (created, "File creation date")
        header.add_history(" ".join(sys.argv))
        hdu = fits.PrimaryHDU(data=data, header=header)
        try:
            hdu.writeto(outfile, overwrite=clobber)
        except TypeError:
            # Fallback for a writeto() that rejects the ``overwrite``
            # keyword (presumably an older astropy -- TODO confirm).
            hdu.writeto(outfile, clobber=clobber)
        logger.info("Wrote slice to file: %s" % outfile)
21cmfast_lightcone.py 文件源码 项目:atoolbox 作者: liweitianux 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def header(self):
        """Build the FITS header from this object's attributes and WCS.

        Records the data unit, redshift range, comoving distance range and
        step, box side length and cell count, the creation time (current
        time in the local timezone, ISO 8601), the command line as a
        history entry, and finally the cards of ``self.wcs``.
        """
        dDc = self.Dc_cell
        created = datetime.now(timezone.utc).astimezone().isoformat()
        cards = [
            ("BUNIT", str(self.unit), "Data unit"),
            ("zmin", self.zmin, "HI simulation minimum redshift"),
            ("zmax", self.zmax, "HI simulation maximum redshift"),
            ("Dc_min", self.Dc_min, "[cMpc] comoving distance at zmin"),
            ("Dc_max", self.Dc_max, "[cMpc] comoving distance at zmax"),
            ("Dc_step", dDc, "[cMpc] comoving distance between slices"),
            ("Lside", self.Lside, "[cMpc] Simulation side length"),
            ("Nside", self.Nside, "Number of cells at each side"),
            ("DATE", created, "File creation date"),
        ]
        header = fits.Header()
        for keyword, value, comment in cards:
            header[keyword] = (value, comment)
        header.add_history(" ".join(sys.argv))
        header.extend(self.wcs.to_header(), update=True)
        return header
views.py 文件源码 项目:league 作者: massgo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def create_game():
    """Create a new game from the submitted form.

    Returns the created game as JSON with status 201, or the form
    validation errors as JSON with status 404.
    """
    form = GameCreateForm(request.form)
    _set_game_create_choices(form)
    if not form.validate_on_submit():
        return jsonify(**form.errors), 404

    white = Player.get_by_id(form.white_id.data)
    black = Player.get_by_id(form.black_id.data)
    # Convert the submitted time to UTC before it is stored.
    if form.played_at.data is not None:
        played_at = form.played_at.data.astimezone(timezone.utc)
    else:
        played_at = None

    fields = dict(
        white=white,
        black=black,
        winner=form.winner.data,
        handicap=form.handicap.data,
        komi=form.komi.data,
        season=form.season.data,
        episode=form.episode.data,
        played_at=played_at,
    )
    game = Game.create(**fields)
    messenger.notify_slack(_slack_game_msg(game))
    return jsonify(game.to_dict()), 201
views.py 文件源码 项目:league 作者: massgo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _slack_game_msg(game):
    """Build the Slack message announcing the result of ``game``.

    The winner is named first. The play time is embedded as a Slack
    ``<!date^...>`` token with ``game.played_at`` as fallback text.
    """
    if game.winner is Color.white:
        outcome = '<{w_url}|{w_name}> (W) defeated <{b_url}|{b_name}> (B)'
    else:
        outcome = '<{b_url}|{b_name}> (B) defeated <{w_url}|{w_name}> (W)'
    details = (' at {handicap} stones, {komi}.5 komi at <!date^{date_val}'
               '^{{time}} on {{date_num}}|{date_string}> '
               '(S{season:0>2}E{episode:0>2})')
    template = outcome + details

    # Gross hack around the fact that we retrieve as naive DateTimes:
    # the stored value is labelled as UTC before taking its timestamp.
    # See: https://github.com/massgo/league/issues/93
    epoch_seconds = int(
        game.played_at.replace(tzinfo=timezone.utc).timestamp())

    white_name = game.white.full_name
    white_url = url_for('dashboard.get_player',
                        player_id=game.white.id, _external=True)
    black_name = game.black.full_name
    black_url = url_for('dashboard.get_player',
                        player_id=game.black.id, _external=True)

    return template.format(
        w_name=white_name,
        w_url=white_url,
        b_name=black_name,
        b_url=black_url,
        handicap=game.handicap,
        komi=game.komi,
        date_string=game.played_at,
        date_val=epoch_seconds,
        season=game.season,
        episode=game.episode,
    )
views.py 文件源码 项目:league 作者: massgo 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def update_game():
    """Update an existing game from the submitted form.

    Returns the updated game as JSON with status 200, or the form
    validation errors as JSON with status 404.
    """
    form = GameUpdateForm(request.form)
    _set_game_create_choices(form)
    if not form.validate_on_submit():
        return jsonify(**form.errors), 404

    white = Player.get_by_id(form.white_id.data)
    black = Player.get_by_id(form.black_id.data)
    # Convert the submitted time to UTC before it is stored.
    if form.played_at.data is not None:
        played_at = form.played_at.data.astimezone(timezone.utc)
    else:
        played_at = None

    game = Game.get_by_id(form.game_id.data)
    changes = dict(
        white=white,
        black=black,
        winner=form.winner.data,
        handicap=form.handicap.data,
        komi=form.komi.data,
        season=form.season.data,
        episode=form.episode.data,
        played_at=played_at,
    )
    game.update(**changes)
    return jsonify(game.to_dict()), 200
factory.py 文件源码 项目:tukio 作者: optiflows 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _step(self, exc=None):
        """
        Wrapper around `Task._step()` that dispatches a
        `TaskExecState.BEGIN` event the first time it runs.

        On that first call it also records the UTC start time and builds
        the event source from the task, template and workflow ids.
        """
        first_run = not self._in_progress
        if first_run:
            self._start = datetime.now(timezone.utc)

            ids = {'task_exec_id': self.uid}
            if self._template:
                ids['task_template_id'] = self._template.uid
            if self._workflow:
                ids['workflow_template_id'] = self._workflow.template.uid
                ids['workflow_exec_id'] = self._workflow.uid
            self._source = EventSource(**ids)
            self._in_progress = True

            begin_event = {
                'type': TaskExecState.BEGIN.value,
                'content': self._inputs
            }
            topics = workflow_exec_topics(self._source._workflow_exec_id)
            self._broker.dispatch(
                begin_event,
                topics=topics,
                source=self._source,
            )
        super()._step(exc)
calendar_manager.py 文件源码 项目:grenouillebot 作者: FroggedTV 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def get_next_event(self):
        """Return the next upcoming Event in the calendar.

        Events that have already ended are removed from the front of
        ``self.event_list`` as a side effect.

        Returns:
            The first remaining event if it has not started yet; otherwise
            the event after it; or None if there is no such event.
        """
        now = datetime.now(timezone.utc)
        events = self.event_list

        # Drop finished events from the head of the list.
        while events and events[0].end < now:
            events.pop(0)

        if not events:
            return None
        if events[0].start > now:
            return events[0]
        # The head event has already started: the next one is the one after.
        return events[1] if len(events) > 1 else None
calendar_manager.py 文件源码 项目:grenouillebot 作者: FroggedTV 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_now_event(self):
        """Return the Event currently taking place in the calendar.

        Events that have already ended are removed from the front of
        ``self.event_list`` as a side effect.

        Returns:
            The first remaining event if the current time lies strictly
            between its start and end, otherwise None.
        """
        now = datetime.now(timezone.utc)
        events = self.event_list

        # Drop finished events from the head of the list.
        while events and events[0].end < now:
            events.pop(0)

        if events:
            head = events[0]
            if head.start < now < head.end:
                return head
        return None
database_handler.py 文件源码 项目:audio-feeder 作者: pganssle 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def make_new_entry(self, rel_path, id_handler):
        """
        Generates a new entry for the specified path.

        The entry's ``last_modified`` is the file's modification time and
        ``date_added`` is the current time, both as UTC-aware datetimes.

        Note: This will mutate the id_handler!

        :param rel_path: path of the item, relative to the configured
            ``media_loc`` directory
        :param id_handler: object whose ``new_id()`` supplies the entry id
        :returns: the new ``oh.Entry`` (type ``'Book'``, no ``data_id``)
        """
        e_id = id_handler.new_id()

        abs_path = os.path.join(read_from_config('media_loc').path, rel_path)
        lmtime = os.path.getmtime(abs_path)
        # Build the aware datetime directly; datetime.utcfromtimestamp() is
        # deprecated and only yields a naive value that must be patched up.
        last_modified = datetime.fromtimestamp(lmtime, timezone.utc)

        entry_obj = oh.Entry(id=e_id, path=rel_path,
            date_added=datetime.now(timezone.utc),
            last_modified=last_modified,
            type='Book', table=self.BOOK_TABLE_NAME, data_id=None,
            hashseed=_rand.randint(0, 2**32))

        return entry_obj
utilities.py 文件源码 项目:pandachaika 作者: pandabuilder 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def map_external_gallery_data_to_internal(gallery_data: DataDict) -> GalleryData:
    """Convert a provider gallery dict into a ``GalleryData`` object.

    Titles are HTML-unescaped, the ``posted`` epoch value becomes a
    UTC-aware datetime and tags go through ``translate_tag_list``.
    Afterwards ``fjord`` is set when the joined tags match
    ``constants.default_fjord_tags``, and any ``constants.ex_thumb_url``
    in the thumbnail URL is replaced by ``constants.ge_thumb_url``.
    """
    posted_at = datetime.fromtimestamp(int(gallery_data['posted']), timezone.utc)
    result = GalleryData(
        gallery_data['gid'],
        token=gallery_data['token'],
        archiver_key=gallery_data['archiver_key'],
        title=unescape(gallery_data['title']),
        title_jpn=unescape(gallery_data['title_jpn']),
        thumbnail_url=gallery_data['thumb'],
        category=gallery_data['category'],
        provider=constants.provider_name,
        uploader=gallery_data['uploader'],
        posted=posted_at,
        filecount=gallery_data['filecount'],
        filesize=gallery_data['filesize'],
        expunged=gallery_data['expunged'],
        rating=gallery_data['rating'],
        tags=translate_tag_list(gallery_data['tags']),
    )

    joined_tags = ",".join(result.tags)
    if re.search(constants.default_fjord_tags, joined_tags):
        result.fjord = True

    if constants.ex_thumb_url in result.thumbnail_url:
        result.thumbnail_url = result.thumbnail_url.replace(constants.ex_thumb_url, constants.ge_thumb_url)
    return result
taskhud.py 文件源码 项目:taskhud 作者: usefulthings 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def t_date(s):
    """
    Convert a TaskWarrior timestamp to a local-time display string.

    TaskWarrior provides times as UTC timestamps in ISO 8601 basic
    format (``YYYYMMDDTHHMMSSZ``); fields are read by position.

    :param s: the timestamp text
    :returns: the same instant in the system local timezone, formatted as
        ``YYYY-MM-DD HH:MM:SS`` with no UTC offset
    :raises ValueError: if a field is not numeric or out of range
    """
    year   = int(s[0:4])
    month  = int(s[4:6])
    day    = int(s[6:8])

    hour   = int(s[9:11])
    minute = int(s[11:13])
    second = int(s[13:15])

    # This is UTC time
    utc_time = datetime(year, month, day, hour, minute, second,
                        tzinfo=timezone.utc)

    # Convert to local time
    local_time = utc_time.astimezone(tz=None)

    # Drop the tzinfo rather than slicing a fixed number of characters off
    # the text: an offset with a seconds component renders as +HH:MM:SS,
    # which a fixed-width slice would only partially remove.
    return local_time.replace(tzinfo=None).isoformat(sep=" ")

# TODO: move to separate module
datetimetester.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def convert_between_tz_and_utc(self, tz, utc):
        """Run the inside/outside-DST checks around both DST boundaries.

        For each of several step sizes, the DST start, a point one step
        after it and a point one step before the DST end go through
        ``checkinside``; the DST end and the points one step beyond either
        boundary go through ``checkoutside``.
        """
        dst_start = self.dston.replace(tzinfo=tz)
        # Because 1:MM on the day DST ends is taken as being standard time,
        # there is no spelling in tz for the last hour of daylight time.
        # For purposes of the test, the last hour of DST is 0:MM, which is
        # taken as being daylight time (and 1:MM is taken as being standard
        # time).
        dst_end = self.dstoff.replace(tzinfo=tz)

        steps = (
            timedelta(weeks=13),
            DAY,
            HOUR,
            timedelta(minutes=1),
            timedelta(microseconds=1),
        )
        for step in steps:
            # The boundary checks are repeated for every step size.
            for moment in (dst_start, dst_start + step, dst_end - step):
                self.checkinside(moment, tz, utc, dst_start, dst_end)

            for moment in (dst_end, dst_start - step, dst_end + step):
                self.checkoutside(moment, tz, utc)
datetimetester.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_easy(self):
        """Check conversions between pairs of zones around DST changes."""
        # Despite the name of this test, the endcases are excruciating.
        #
        # The last two pairs are really dancing near the edge.  They work
        # because Pacific and Eastern are far enough apart that their
        # "problem hours" don't overlap.
        pairs = (
            (Eastern, utc_real),
            (Pacific, utc_real),
            (Eastern, utc_fake),
            (Pacific, utc_fake),
            (Eastern, Pacific),
            (Pacific, Eastern),
        )
        for zone, reference in pairs:
            self.convert_between_tz_and_utc(zone, reference)
        # OTOH, Eastern vs Central (in either order) fails!  Don't add it.
        # The difficulty is that the edge case tests assume that every hour
        # is representable in the "utc" class.  This is always true for a
        # fixed-offset tzinfo class (like utc_real and utc_fake), but not
        # for Eastern or Central.  For these adjacent DST-aware time zones,
        # the range of time offsets tested ends up creating hours in the one
        # that aren't representable in the other.  For the same reason, we
        # would see failures in the Eastern vs Pacific tests too if we added
        # 3*HOUR to the list of offset deltas in
        # convert_between_tz_and_utc().
schedule.py 文件源码 项目:scrapy-soccerway 作者: tvl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def parse_match(self, response):
        """Scrape a match page and yield a single ``MatchInfo`` item.

        The id, area and competition come from the sub-navigation links,
        the team names and last-five form from the left/right containers,
        and the remaining fields from the "details" definition list.

        :param response: the response for the match page; must support
            ``xpath(...).extract()`` / ``.extract_first()``
        :raises IndexError: if the sub-navigation has fewer links than
            expected
        :raises TypeError: if the page has no "Date" value
        """
        def detail(label, tail):
            # Value under the <dd> that directly follows the <dt> *label*.
            query = ('//div[@class="details clearfix"]/dl/dt[.="{0}"]'
                     '/following-sibling::dd'
                     '[preceding-sibling::dt[1]/text()="{0}"]{1}')
            return response.xpath(query.format(label, tail)).extract_first()

        # Run each repeated query once and reuse the extracted lists.
        subnav = '//div[@class="clearfix subnav level-1"]//li//a'
        nav_links = response.xpath(subnav + '/@href').extract()
        nav_labels = response.xpath(subnav + '/text()').extract()
        home_texts = response.xpath(
            '//div[@class="container left"]//a/text()').extract()
        away_texts = response.xpath(
            '//div[@class="container right"]//a/text()').extract()

        item = MatchInfo()
        item['id'] = parse_qs(nav_links[3])['id'][0]
        item['area'] = nav_labels[1]
        item['competition'] = nav_labels[2]
        # First link text is the team name (None if absent); the next five
        # are joined into the last-five form string.
        item['home_team'] = home_texts[0] if home_texts else None
        item['away_team'] = away_texts[0] if away_texts else None
        item['ht_last5'] = ''.join(home_texts[1:6])
        item['at_last5'] = ''.join(away_texts[1:6])
        kickoff_epoch = int(detail('Date', '//span/@data-value'))
        item['datetime'] = datetime.fromtimestamp(
            kickoff_epoch, timezone.utc).isoformat(' ')
        item['game_week'] = detail('Game week', '/text()')
        item['kick_off'] = detail('Kick-off', '//span/text()')
        item['venue'] = detail('Venue', '//a/text()')
        # Same naive-UTC text as before, without the deprecated utcnow().
        item['updated'] = datetime.now(timezone.utc).replace(
            tzinfo=None).isoformat(' ')
        # A generator callback only yields; it must not also return a value.
        yield item
__init__.py 文件源码 项目:arduino-ciao-meteor-ddp-connector 作者: andrea689 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _decode(self, o):
        """Recursively decode an already-parsed JSON value.

        Single-key dicts ``$escape``, ``$date`` (milliseconds since the
        epoch, returned as a UTC-aware datetime) and ``$binary`` (base64)
        are converted. A ``{"$type", "$value"}`` dict is passed to the
        matching entry of ``self.custom_type_hooks``. Other dicts and
        lists/tuples are decoded element by element; anything else is
        returned unchanged.

        :raises UnknownTypeError: if ``$type`` has no registered hook
        """
        if isinstance(o, (list, tuple)):
            return [self._decode(element) for element in o]
        if not isinstance(o, dict):
            return o

        size = len(o)
        if size == 1:
            if "$escape" in o:
                return self._decode_escaped(o['$escape'])
            if "$date" in o:
                return datetime.fromtimestamp(o["$date"] / 1000.0, timezone.utc)
            if "$binary" in o:
                return b64decode(o['$binary'])
        elif size == 2 and "$type" in o and "$value" in o:
            try:
                reviver = self.custom_type_hooks[o['$type']]
            except KeyError:
                raise UnknownTypeError(o["$type"])
            return reviver(o["$value"])

        # Ordinary mapping: decode the values, keep the keys.
        if self.object_pairs_hook is not None:
            return self.object_pairs_hook((k, self._decode(v)) for k, v in o.items())
        return {k: self._decode(v) for k, v in o.items()}
benchmark-api-heartbeat.py 文件源码 项目:aw-server 作者: ActivityWatch 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def benchmark():
    """Send 120 heartbeats to a testing ServerAPI, 0.1 s apart.

    Uses a testing Peewee-backed datastore. A failure to create the
    bucket is printed and otherwise ignored.
    """
    datastore = aw_datastore.Datastore(aw_datastore.storages.PeeweeStorage, testing=True)
    api = aw_server.api.ServerAPI(datastore, testing=True)

    print(api.get_info())

    bucket_id = "test-benchmark"

    try:
        api.create_bucket(bucket_id, "test", "test", "test")
    except Exception as e:
        # Best effort: report the problem and carry on.
        print(e)

    print("Benchmarking... this will take 30 seconds")
    for i in range(120):
        sleep(0.1)
        event = Event(timestamp=datetime.now(tz=tz.utc), data={"test": str(i)})
        api.heartbeat(bucket_id, event, pulsetime=0.3)
test_client.py 文件源码 项目:aw-server 作者: ActivityWatch 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _create_heartbeat_events(start=None,
                             delta=timedelta(seconds=1)):
    """Create two test events whose timestamps are ``delta`` apart.

    :param start: timestamp of the first event; defaults to the current
        UTC time at the moment of the call
    :param delta: time between the first and the second event
    :returns: tuple ``(e1, e2)`` of events labelled ``"test"``
    """
    # Resolve the default per call: a ``datetime.now()`` default in the
    # signature is evaluated once at import time and then goes stale.
    if start is None:
        start = datetime.now(tz=timezone.utc)

    e1_ts = start
    e2_ts = e1_ts + delta

    # Needed since server (or underlying datastore) drops precision up to milliseconds.
    # Update: Even with millisecond precision it sometimes fails. (tried using `round` and `int`)
    #         Now rounding down to 10ms precision to prevent random failure.
    #         10ms precision at least seems to work well.
    # TODO: Figure out why it sometimes fails with millisecond precision. Would probably
    #       be useful to find the microsecond values where it consistently always fails.
    # NOTE(review): the factor 100 yields 0..9900 microseconds, which is not
    #       a truncation to 10ms (that would be `* 10000`). Left unchanged
    #       because existing tests may depend on it -- confirm the intent.
    e1_ts = e1_ts.replace(microsecond=int(e1_ts.microsecond / 10000) * 100)
    e2_ts = e2_ts.replace(microsecond=int(e2_ts.microsecond / 10000) * 100)

    e1 = Event(timestamp=e1_ts, data={"label": "test"})
    e2 = Event(timestamp=e2_ts, data={"label": "test"})

    return e1, e2
test_client.py 文件源码 项目:aw-server 作者: ActivityWatch 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_midnight_heartbeats(client, bucket):
    """Heartbeats spanning midnight are merged and can be queried by start.

    Sends 20 one-minute-spaced heartbeats starting at 23:50 UTC today,
    with labels cycling through a ring that has one repeated entry.
    """
    # Despite the name this is 23:50, so the series crosses midnight.
    midnight = datetime.now(tz=timezone.utc).replace(hour=23, minute=50)
    events = _create_periodic_events(20, start=midnight, delta=timedelta(minutes=1))

    label_ring = ["1", "1", "2", "3", "4"]
    ring_size = len(label_ring)
    for index, event in enumerate(events):
        event.data["label"] = label_ring[index % ring_size]
        client.heartbeat(bucket, event, pulsetime=90)

    # Two of every five consecutive events share a label and get merged.
    recv_events_merged = client.get_events(bucket, limit=-1)
    assert len(recv_events_merged) == 4 / 5 * len(events)

    after_midnight = midnight + timedelta(minutes=10)
    recv_events_after_midnight = client.get_events(bucket, start=after_midnight)
    pprint(recv_events_after_midnight)
    assert len(recv_events_after_midnight) == int(len(recv_events_merged) / 2)
test_timezone.py 文件源码 项目:pendulum 作者: sdispater 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_astimezone(self):
        """astimezone() accepts zone names and stdlib timezone objects."""
        d = Pendulum(2015, 1, 15, 18, 15, 34)
        now = Pendulum(2015, 1, 15, 18, 15, 34)
        year, month, day = now.year, now.month, now.day
        hour, minute = now.hour, now.minute

        self.assertEqual('UTC', d.timezone_name)
        self.assertPendulum(d, year, month, day, hour, minute)

        # Paris is one hour ahead of UTC on this date.
        d = d.astimezone('Europe/Paris')
        self.assertEqual('Europe/Paris', d.timezone_name)
        self.assertPendulum(d, year, month, day, hour + 1, minute)

        if sys.version_info < (3, 2):
            # The timezone-object checks only run on Python 3.2 and later.
            return

        d = d.astimezone(timezone.utc)
        self.assertEqual('+00:00', d.timezone_name)
        self.assertPendulum(d, year, month, day, hour, minute)

        d = d.astimezone(timezone(timedelta(hours=-8)))
        self.assertEqual('-08:00', d.timezone_name)
        self.assertPendulum(d, year, month, day, hour - 8, minute)
datetimetester.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def convert_between_tz_and_utc(self, tz, utc):
        """Run the inside/outside-DST checks around both DST boundaries.

        For each of several step sizes, the DST start, a point one step
        after it and a point one step before the DST end go through
        ``checkinside``; the DST end and the points one step beyond either
        boundary go through ``checkoutside``.
        """
        dst_start = self.dston.replace(tzinfo=tz)
        # Because 1:MM on the day DST ends is taken as being standard time,
        # there is no spelling in tz for the last hour of daylight time.
        # For purposes of the test, the last hour of DST is 0:MM, which is
        # taken as being daylight time (and 1:MM is taken as being standard
        # time).
        dst_end = self.dstoff.replace(tzinfo=tz)

        steps = (
            timedelta(weeks=13),
            DAY,
            HOUR,
            timedelta(minutes=1),
            timedelta(microseconds=1),
        )
        for step in steps:
            # The boundary checks are repeated for every step size.
            for moment in (dst_start, dst_start + step, dst_end - step):
                self.checkinside(moment, tz, utc, dst_start, dst_end)

            for moment in (dst_end, dst_start - step, dst_end + step):
                self.checkoutside(moment, tz, utc)
datetimetester.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_easy(self):
        """Check conversions between pairs of zones around DST changes."""
        # Despite the name of this test, the endcases are excruciating.
        #
        # The last two pairs are really dancing near the edge.  They work
        # because Pacific and Eastern are far enough apart that their
        # "problem hours" don't overlap.
        pairs = (
            (Eastern, utc_real),
            (Pacific, utc_real),
            (Eastern, utc_fake),
            (Pacific, utc_fake),
            (Eastern, Pacific),
            (Pacific, Eastern),
        )
        for zone, reference in pairs:
            self.convert_between_tz_and_utc(zone, reference)
        # OTOH, Eastern vs Central (in either order) fails!  Don't add it.
        # The difficulty is that the edge case tests assume that every hour
        # is representable in the "utc" class.  This is always true for a
        # fixed-offset tzinfo class (like utc_real and utc_fake), but not
        # for Eastern or Central.  For these adjacent DST-aware time zones,
        # the range of time offsets tested ends up creating hours in the one
        # that aren't representable in the other.  For the same reason, we
        # would see failures in the Eastern vs Pacific tests too if we added
        # 3*HOUR to the list of offset deltas in
        # convert_between_tz_and_utc().
parser.py 文件源码 项目:twtxt 作者: buckket 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def parse_tweets(raw_tweets, source, now=None):
    """
        Turns raw tweet lines from a twtxt file into :class:`Tweet`
        objects, skipping lines that cannot be parsed.

        :param list raw_tweets: list of raw tweet lines
        :param Source source: the source of the given tweets
        :param Datetime now: the current datetime (defaults to the
            current UTC time)

        :returns: a list of parsed tweets :class:`Tweet` objects
        :rtype: list
    """
    if now is None:
        now = datetime.now(timezone.utc)

    parsed = []
    for raw_line in raw_tweets:
        try:
            parsed.append(parse_tweet(raw_line, source, now))
        except (ValueError, OverflowError) as e:
            # An unparseable line is logged at debug level and dropped.
            logger.debug("{0} - {1}".format(source.url, e))

    return parsed


问题


面经


文章

微信
公众号

扫码关注公众号