api.py source code

python

Project: aschedule  Author: eightnoteight
def every(self, get_coro_or_fut, interval: _timedelta_cls, start_at: datetime,
              count=float('inf'), loop=None):
        """
        executes the given job at (start_at, start_at + interval, start_at + 2 * interval, ....)

        :param get_coro_or_fut: a callable which returns a co-routine or a future
        :param interval: a datetime.timedelta object denoting the interval between
                         consecutive job excutions
        :param start_at: a datetime.datetime object denoting the start time to start
                         this schedule
        :param count: the number of time to execute this job, by default the job will be executed
                      continously
        :param loop: event loop if the given future by get_coro_or_fut is hooked up
                     with a custom event loop
        :return: JobSchedule object so that the user can control the schedule based
                 on it's future
        """
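        # Reject negative intervals and non-positive execution counts up front.
        if interval.total_seconds() < 0:
            raise AScheduleException("error: invalid interval ({} seconds).".format(
                interval.total_seconds()))
        if count <= 0:
            raise AScheduleException("error: invalid count({}).".format(count))
        diff = start_at - datetime.now()
        if round(diff.total_seconds()) < 0:
            # start_at is in the past: skip ahead by whole intervals to the next slot.
            start_at += interval * ceil((-diff) / interval)
        # Delays in whole seconds: first the wait until start_at, then the interval
        # repeated (count - 1) times. takewhile/enumerate also handles count=inf.
        intervals = itertools.chain(
            iter([round((start_at - datetime.now()).total_seconds())]),
            map(lambda x: x[1], itertools.takewhile(
                lambda x: x[0] < (count - 1),
                enumerate(itertools.repeat(round(interval.total_seconds()))))))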
        schedule = JobSchedule(get_coro_or_fut, intervals, loop=loop)
        self.add_schedule(schedule)
        return schedule
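
The delay sequence that every() hands to JobSchedule can be looked at on its own. The following is a minimal sketch, not part of aschedule: build_intervals and its now parameter are hypothetical names introduced here so that the result is reproducible. The original calls datetime.now() twice, whereas this sketch uses a single timestamp.

```python
import itertools
from datetime import datetime, timedelta
from math import ceil


def build_intervals(interval, start_at, count=float('inf'), now=None):
    """Hypothetical helper mirroring the delay computation inside every()."""
    now = now or datetime.now()
    diff = start_at - now
    if round(diff.total_seconds()) < 0:
        # start_at is in the past: advance by whole intervals to the next slot.
        start_at += interval * ceil((-diff) / interval)
    first_delay = round((start_at - now).total_seconds())
    period = round(interval.total_seconds())
    # Yield the period (count - 1) times; this also works when count is infinite.
    repeats = itertools.takewhile(
        lambda pair: pair[0] < count - 1,
        enumerate(itertools.repeat(period)))
    return itertools.chain([first_delay], (delay for _, delay in repeats))


now = datetime(2024, 1, 1, 12, 0, 0)
# start_at lies 25 s in the past with a 10 s interval, so the next slot is 5 s away.
delays = list(build_intervals(timedelta(seconds=10),
                              now - timedelta(seconds=25), count=3, now=now))
print(delays)  # [5, 10, 10]
```

With the default count the iterator never ends, so it should be consumed lazily, for example with itertools.islice.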