python类rrulestr()的实例源码

models.py 文件源码 项目:django-happenings 作者: natgeosociety 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def occurrences(self):
        import datetime
        from dateutil import rrule
        datelist = []
        if self.rrule:
            rr = rrule.rrulestr(self.rrule, dtstart=self.start_date)
            start = datetime.datetime.combine(self.start_date, datetime.datetime.min.time())
            end = start + datetime.timedelta(days=365)
            datelist = rr.between(start, end, inc=True)
        else:
            if self.start_date >= datetime.date.today():
                datelist.append(datetime.datetime.combine(self.start_date, datetime.time(0, 0, 0)))

        if not self.start_time:
            return datelist

        return [datetime.datetime.combine(x.date(), self.start_time) for x in datelist]
test_imports.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 57 收藏 0 点赞 0 评论 0
def testRRuleAll(self):
        from dateutil.rrule import rrule
        from dateutil.rrule import rruleset
        from dateutil.rrule import rrulestr
        from dateutil.rrule import YEARLY, MONTHLY, WEEKLY, DAILY
        from dateutil.rrule import HOURLY, MINUTELY, SECONDLY
        from dateutil.rrule import MO, TU, WE, TH, FR, SA, SU

        rr_all = (rrule, rruleset, rrulestr,
                  YEARLY, MONTHLY, WEEKLY, DAILY,
                  HOURLY, MINUTELY, SECONDLY,
                  MO, TU, WE, TH, FR, SA, SU)

        for var in rr_all:
            self.assertIsNot(var, None)

        # In the public interface but not in all
        from dateutil.rrule import weekday
        self.assertIsNot(weekday, None)
utils.py 文件源码 项目:django-happenings 作者: natgeosociety 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def rrulestr_to_text(rrule_str, dtstart=None):
    rrstr = rrule.rrulestr(rrule_str, dtstart=dtstart)
    if hasattr(rrstr, '_freq'):
        return rrule_to_text(rrule.rrulestr(rrule_str, dtstart=dtstart))
    elif hasattr(rrstr, '_rrule'):
        output = [rrule_to_text(r) for r in rrstr._rrule]
        if hasattr(rrstr, '_rdate') and rrstr._rdate:
            output.extend([' ', _('and also'), ' '])
            output.extend(_list(map(lambda d: datetime.datetime.strftime(d, '%B %d, %Y'), rrstr._rdate)))
        if hasattr(rrstr, '_exdate') and rrstr._exdate:
            output.extend([' ', _('except'), ' '])
            output.extend(_list(map(lambda d: datetime.datetime.strftime(d, DATE_FORMAT), rrstr._exdate)))
        return "".join(output)
utils.py 文件源码 项目:django-happenings 作者: natgeosociety 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def rrulestr_to_schedule(rrule_str, dtstart=None):
    rrstr = rrule.rrulestr(rrule_str, dtstart=dtstart)
    if hasattr(rrstr, '_freq'):
        return [rrule_to_schedule(rrule.rrulestr(rrule_str, dtstart=dtstart), dtstart)]
    elif hasattr(rrstr, '_rrule'):
        output = [rrule_to_schedule(r, dtstart) for r in rrstr._rrule]
        exceptions = []
        extras = []
        if hasattr(rrstr, '_rdate') and rrstr._rdate:
            extras.extend(_list(map(lambda d: datetime.datetime.strftime(d, '%a %x'), rrstr._rdate)))
            output = [x._replace(extras="".join(extras)) for x in output]
        if hasattr(rrstr, '_exdate') and rrstr._exdate:
            exceptions.extend(_list(map(lambda d: datetime.datetime.strftime(d, DATE_FORMAT), rrstr._exdate)))
            output = [x._replace(exceptions="".join(exceptions)) for x in output]
        return output
scheduler.py 文件源码 项目:oacids 作者: openaps 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def Poll (self, within_seconds=None):
    # print "poll within", within_seconds
    results = [ ]
    candidates = self.master.openaps.things.get('schedules', [ ])
    now = datetime.datetime.now( )
    # print "polling schedules", len(candidates), now.isoformat( ), 'for', self.MaxTasksAhead, 'MaxTasksAhead'
    for configured in candidates:
      # print "SCHEDULE", configured.item.fields
      # spec = recurrent.parse(configured.item.fields['rrule'], now=self.since)
      spec = configured.item.fields['rrule']
      rr = rrule.rrulestr(spec, dtstart=self.since)
      # print configured.item.fields['rrule'], spec
      upcoming = rr.after(now)
      # print "next", upcoming.isoformat( )
      # XXX: bug in making: need to fill out all events before within_seconds as well.
      # if (upcoming - now).total_seconds( ) <= within_seconds:
      for upcoming in iter_triggers(upcoming, rr, within_seconds):
        # print "ARM THING", configured.path
        # print "ATTEMPT ARM", configured.item.name, configured.path, spec
        # self.enqueue(upcoming, configured)
        trigger = Armable(upcoming, configured)
        # exists = self.schedules[(upcoming, configured.item.name)]
        results.append(trigger)


    return results
    pass
tasks.py 文件源码 项目:django-scheduler 作者: tejasjadhav 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __call__(self, *args, **kwargs):
        # Pop out custom keyword arguments added during scheduling and
        # previous run so as not to pollute the task function's
        # namespace.
        scheduled_task_id = kwargs.pop('scheduled_task_id')
        rrule_string = kwargs.pop('rrule_string')
        _first_eta = kwargs.pop('first_eta')
        _eta = kwargs.pop('eta')
        _until = kwargs.pop('until')

        first_eta = TaskScheduler.strptime(_first_eta)
        eta = TaskScheduler.strptime(_eta)
        until = TaskScheduler.strptime(_until)
        scheduled_task = ScheduledTask.objects.get(pk=scheduled_task_id)

        # If the task been manually set as ScheduledTask.STATUS_CANCELLED,
        # stop the execution.
        if scheduled_task.status == ScheduledTask.STATUS_CANCELLED:
            return

        scheduled_task.save_status(ScheduledTask.STATUS_RUNNING)
        ScheduledTaskRunLog.objects.create(task_id=self.request.id,
                                           scheduled_task=scheduled_task)

        # If a CancelSchedule exception is raied by the function,
        # cancel the schedule and exit.
        try:
            result = super(RepeatTask, self).__call__(*args, **kwargs)
        except CancelSchedule:
            TaskScheduler.cancel(scheduled_task_id=scheduled_task.id)
            return
        else:
            scheduled_task.save_status(ScheduledTask.STATUS_SUCCESS)

        # If rrule string is not specified, assume it to be a one time
        # task.
        if not rrule_string:
            return result

        # Preserve the start and end of rrule cycle.
        rrule_ = rrulestr(rrule_string).replace(dtstart=first_eta,
                                                until=until)

        next_eta = TaskScheduler.calculate_next_eta(rrule_=rrule_,
                                                    current_eta=eta)
        # If rrule does not return an ETA, assume it to be the end of
        # schedule and exit.
        if not next_eta:
            return result

        # Add custom keyword arguments again for the next run.
        kwargs.update({
            'scheduled_task_id': scheduled_task.id,
            'rrule_string': rrule_string,
            'first_eta': _first_eta,
            'eta': TaskScheduler.strftime(next_eta),
            'until': TaskScheduler.strftime(until),
        })

        self.apply_async(eta=next_eta, args=args, kwargs=kwargs)
        return result


问题


面经


文章

微信
公众号

扫码关注公众号