python类schedule()的实例源码

schedulers.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setup_schedule(self):
        """Load the default entries, then the entries from the app configuration.

        Calls ``install_default_entries`` with the current schedule and then
        passes ``app.conf.CELERYBEAT_SCHEDULE`` to ``update_from_dict``.
        """
        current = self.schedule
        self.install_default_entries(current)

        configured = self.app.conf.CELERYBEAT_SCHEDULE
        self.update_from_dict(configured)
schedulers.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sync(self):
        """Persist every entry currently flagged as dirty.

        Names are popped from ``self._dirty`` one at a time until it is empty,
        and ``save()`` is called on the matching schedule entry.  A dirty name
        that has no entry in the schedule is skipped.

        Raises:
            Whatever ``save()`` raises.  The failing name has already been
            removed from ``self._dirty`` at that point.
        """
        info('Writing entries...')
        while self._dirty:
            name = self._dirty.pop()
            try:
                entry = self.schedule[name]
            except KeyError:
                # Flagged dirty but not present in the schedule: nothing to save.
                continue
            # Deliberately outside the ``try``: a KeyError raised by save()
            # itself must not be mistaken for a missing entry and dropped.
            entry.save()
schedulers.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def update_from_dict(self, dict_):
        """Merge entries built from a ``{name: entry_kwargs}`` mapping into the schedule.

        Each value is expanded as keyword arguments to
        ``self.Entry.from_entry`` together with its name and ``self.session``.
        An entry whose construction raises is reported through ``error`` and
        left out; the remaining ones are applied to ``self.schedule`` with a
        single ``update`` call.
        """
        built = {}
        for entry_name, fields in dict_.items():
            try:
                model_entry = self.Entry.from_entry(
                    entry_name, self.session, **fields)
            except Exception as exc:
                error(ADD_ENTRY_ERROR, entry_name, exc, fields)
            else:
                built[entry_name] = model_entry
        self.schedule.update(built)
schedulers.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def install_default_entries(self, data):
        """Register the ``celery.backend_cleanup`` entry when results expire.

        ``data`` is accepted but not read here.  When
        ``app.conf.CELERY_TASK_RESULT_EXPIRES`` is truthy, the cleanup task is
        added with a ``crontab('*/5', '*', '*')`` schedule and an ``expires``
        option of 12 hours (in seconds).  The resulting mapping, possibly
        empty, is always handed to ``update_from_dict``.
        """
        defaults = {}
        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
            defaults['celery.backend_cleanup'] = {
                'task': 'celery.backend_cleanup',
                'schedule': schedules.crontab('*/5', '*', '*'),
                'options': {'expires': 12 * 3600},
            }
        self.update_from_dict(defaults)
schedulers.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def schedule(self):
        """Return the current schedule, rebuilding it when needed.

        A rebuild happens on the first call and whenever
        ``schedule_changed()`` reports a change: dirty entries are written
        with ``sync()`` and ``_schedule`` is replaced by the result of
        ``all_as_schedule()``.  Otherwise the cached ``_schedule`` is returned
        as is.
        """
        update = False
        if not self._initial_read:
            debug('DatabaseScheduler: intial read')
            update = True
            self._initial_read = True
        elif self.schedule_changed():
            info('DatabaseScheduler: Schedule changed.')
            update = True

        if update:
            self.sync()
            self._schedule = self.all_as_schedule()
            # ``values()`` works on Python 2 and Python 3 mappings alike;
            # ``itervalues()`` raises AttributeError on a Python 3 dict.
            debug('Current schedule:\n%s',
                  '\n'.join(repr(entry) for entry in self._schedule.values()))
        return self._schedule
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_model_interval(self, schedule, **kwargs):
        """Create a model attached to a freshly saved ``IntervalSchedule``.

        The schedule row is built with ``IntervalSchedule.from_schedule`` and
        saved; any extra keyword arguments are forwarded to ``create_model``.
        """
        interval_row = IntervalSchedule.from_schedule(schedule)
        interval_row.save()
        return self.create_model(interval=interval_row, **kwargs)
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_model_crontab(self, schedule, **kwargs):
        """Create a model attached to a freshly saved ``CrontabSchedule``.

        The schedule row is built with ``CrontabSchedule.from_schedule`` and
        saved; any extra keyword arguments are forwarded to ``create_model``.
        """
        crontab_row = CrontabSchedule.from_schedule(schedule)
        crontab_row.save()
        return self.create_model(crontab=crontab_row, **kwargs)
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_model_solar(self, schedule, **kwargs):
        """Create a model attached to a freshly saved ``SolarSchedule``.

        The schedule row is built with ``SolarSchedule.from_schedule`` and
        saved; any extra keyword arguments are forwarded to ``create_model``.
        """
        solar_row = SolarSchedule.from_schedule(schedule)
        solar_row.save()
        return self.create_model(solar=solar_row, **kwargs)
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_conf_entry(self):
        """Return a ``(name, entry)`` pair with a unique entry name and task name.

        Two consecutive values are drawn from ``_ids``: the first goes into
        the entry name, the second into the task name.
        """
        entry_name = 'thefoo{0}'.format(next(_ids))
        task_name = 'djcelery.unittest.add{0}'.format(next(_ids))
        entry = {
            'task': task_name,
            'schedule': timedelta(0, 600),
            'args': (),
            'relative': False,
            'kwargs': {},
            'options': {'queue': 'extra_queue'},
        }
        return entry_name, entry
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_entry(self):
        """Check an Entry built from an interval model, and the entry ``next()`` returns."""
        model = self.create_model_interval(schedule(timedelta(seconds=10)))
        entry = self.Entry(model, app=self.app)

        # Call arguments and run bookkeeping exposed by the entry.
        assert entry.args == [2, 2]
        assert entry.kwargs == {'callback': 'foo'}
        assert entry.schedule
        assert entry.total_run_count == 0
        assert isinstance(entry.last_run_at, datetime)

        # Routing options.
        assert entry.options['queue'] == 'xaz'
        assert entry.options['exchange'] == 'foo'
        assert entry.options['routing_key'] == 'cpu'

        # A model created with an explicit ``last_run_at`` hands that very
        # object to its entry.
        right_now = self.app.now()
        stamped_model = self.create_model_interval(
            schedule(timedelta(seconds=10)),
            last_run_at=right_now,
        )
        assert stamped_model.last_run_at

        stamped_entry = self.Entry(stamped_model, app=self.app)
        assert stamped_entry.last_run_at is right_now

        # ``next()`` yields an entry with a later timestamp and one more run.
        following = stamped_entry.next()
        assert following.last_run_at > stamped_entry.last_run_at
        assert following.total_run_count == 1
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_periodic_task_model_disabled_schedule(self):
        """With ``m1`` disabled, a new scheduler's schedule holds only the cleanup entry."""
        self.m1.enabled = False
        self.m1.save()

        scheduler = self.Scheduler(app=self.app)
        current = scheduler.schedule

        assert current
        assert len(current) == 1
        assert 'celery.backend_cleanup' in current
        assert self.entry_name not in current
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setup_scheduler(self, app):
        """Create the fixture models and the scheduler under test.

        Stores ``app`` with an empty ``beat_schedule``, creates four saved and
        refreshed models ``m1``..``m4`` (two interval, one crontab, one solar)
        plus one disabled interval model, and finally builds ``self.s``.
        """
        self.app = app
        self.app.conf.beat_schedule = {}

        every_ten_seconds = schedule(timedelta(seconds=10))
        self.m1 = self.create_model_interval(every_ten_seconds)
        self.m1.save()
        self.m1.refresh_from_db()

        every_twenty_minutes = schedule(timedelta(minutes=20))
        self.m2 = self.create_model_interval(every_twenty_minutes)
        self.m2.save()
        self.m2.refresh_from_db()

        selected_minutes = crontab(minute='2,4,5')
        self.m3 = self.create_model_crontab(selected_minutes)
        self.m3.save()
        self.m3.refresh_from_db()

        noon_event = solar('solar_noon', 48.06, 12.86)
        self.m4 = self.create_model_solar(noon_event)
        self.m4.save()
        self.m4.refresh_from_db()

        # Disabled on purpose: this one should not be in the schedule.
        disabled = self.create_model_interval(schedule(timedelta(seconds=1)))
        disabled.enabled = False
        disabled.save()

        # Built last, once every model above has been saved.
        self.s = self.Scheduler(app=self.app)
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_all_as_schedule(self):
        """The schedule has five entries, all ``Entry`` instances, including the cleanup one."""
        current = self.s.schedule

        assert current
        assert len(current) == 5
        assert 'celery.backend_cleanup' in current

        for _name, entry in current.items():
            assert isinstance(entry, self.s.Entry)
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_reserve(self):
        """Reserve two entries in a row and check the flush counter and the dirty set."""
        first = self.s.schedule[self.m1.name]
        first_reserved = self.s.reserve(first)
        self.s.schedule[self.m1.name] = first_reserved
        assert self.s.flushed == 1

        second = self.s.schedule[self.m2.name]
        second_reserved = self.s.reserve(second)
        self.s.schedule[self.m2.name] = second_reserved

        # The counter has not moved, and m2 is now waiting in the dirty set.
        assert self.s.flushed == 1
        assert self.m2.name in self.s._dirty
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_periodic_task_disabled_and_enabled(self):
        """Toggling ``enabled`` on the stored task removes, then restores, its schedule entry."""
        # Start from the schedule entry that belongs to m2.
        entry = self.s.schedule[self.m2.name]

        # Reserve the entry; the sync timestamp is reset first so that this
        # does not trigger a sync.
        self.s._last_sync = monotonic()
        reserved = self.s.reserve(entry)
        self.s.schedule[entry.name] = reserved
        assert self.s.flushed == 1

        # Load the stored task by primary key, disable it and save.
        stored = PeriodicTask.objects.get(pk=self.m2.pk)
        stored.enabled = False
        stored.save()

        # Reading the schedule now notices the change and drops m2's entry.
        assert self.m2.name not in self.s.schedule
        assert self.s.flushed == 2

        stored.enabled = True
        stored.save()

        # Reading the schedule again notices the change and restores m2's entry.
        assert self.m2.name in self.s.schedule
        assert self.s.flushed == 3
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_sync_rollback_on_save_error(self):
        """``sync()`` raises RuntimeError when the dirty entry is an ``EntrySaveRaises``."""
        # NOTE(review): EntrySaveRaises is presumably an entry whose save()
        # raises RuntimeError -- confirm against its definition.
        failing_entry = EntrySaveRaises(self.m1, app=self.app)
        self.s.schedule[self.m1.name] = failing_entry
        self.s._dirty.add(self.m1.name)

        with pytest.raises(RuntimeError):
            self.s.sync()
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_PeriodicTask_unicode_interval(self):
        """The text form of an interval-based task is ``<name>: every 10.0 seconds``."""
        task = self.create_model_interval(schedule(timedelta(seconds=10)))
        expected = '{0}: every 10.0 seconds'.format(task.name)
        assert text_t(task) == expected
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_PeriodicTask_unicode_no_schedule(self):
        """The text form of a task without a schedule is ``<name>: {no schedule}``."""
        task = self.create_model()
        expected = '{0}: {{no schedule}}'.format(task.name)
        assert text_t(task) == expected
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_CrontabSchedule_schedule(self):
        """Comma-separated field strings are exposed as integer sets on ``.schedule``."""
        fields = {
            'minute': '3, 7',
            'hour': '3, 4',
            'day_of_week': '*',
            'day_of_month': '1, 16',
            'month_of_year': '1, 7',
        }
        model = CrontabSchedule(**fields)

        assert model.schedule.minute == {3, 7}
        assert model.schedule.hour == {3, 4}
        # The '*' wildcard covers all seven weekdays.
        assert model.schedule.day_of_week == {0, 1, 2, 3, 4, 5, 6}
        assert model.schedule.day_of_month == {1, 16}
        assert model.schedule.month_of_year == {1, 7}
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_track_changes(self):
        """``PeriodicTasks.last_change()`` is unset at first and moves forward on each save."""
        assert PeriodicTasks.last_change() is None

        task = self.create_model_interval(schedule(timedelta(seconds=10)))
        task.save()
        first_change = PeriodicTasks.last_change()
        assert first_change

        # Saving again after an edit must yield a strictly later value.
        task.args = '(23, 24)'
        task.save()
        second_change = PeriodicTasks.last_change()
        assert second_change
        assert second_change > first_change


问题


面经


文章

微信
公众号

扫码关注公众号