python类schedule()的实例源码

schedulers.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, model, app=None):
        super().__init__(
            name=model.name,
            task=model.task,
            last_run_at=model.meta.last_run_at,
            total_run_count=model.meta.total_run_count,
            schedule=model.schedule,
            args=model.args,
            kwargs=model.kwargs,
            options={
                'queue': model.queue,
                'exchange': model.exchange,
                'routing_key': model.routing_key,
                'expires': model.expires_at
            },

            app=app or celery_app._get_current_object()
        )

        self.model = model
schedulers.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _unpack_entry_fields(cls, schedule, args=None, kwargs=None,
                             relative=None, options=None, **entry):
        def _unpack_entry_options(queue=None, exchange=None, routing_key=None,
                                  **kwargs):
            return {
                'queue': queue,
                'exchange': exchange,
                'routing_key': routing_key
            }

        model_schedule, model_field = cls.to_model_schedule(schedule)
        entry.update(
            {
                model_field: model_schedule
            },
            args=args or [],
            kwargs=kwargs or {},
            **_unpack_entry_options(**options or {})
        )
        return entry
schedules.py 文件源码 项目:telemetry-analysis-service 作者: mozilla 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def delete(self):
        """
        If found, delete the entry from the scheduler
        """
        entry = self.get()
        if entry is None:
            return False
        else:
            logger.info(
                'Deleting running %s from schedule. '
                'Interval: %s. '
                'Starts at: %s.' %
                (self.name, self.run_every, self.run_at)
            )
            entry.delete()
            return True
serializer.py 文件源码 项目:celery-beatx 作者: mixkorshun 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def decode_schedule(obj):
    if obj is None:
        return None

    _type = obj['__type__']
    value = obj['__value__']

    if _type == 'datetime':
        return decode_datetime(value)
    elif _type == 'crontab':
        return crontab(*value.split('\t'))
    elif _type == 'solar':
        return solar(**value)
    elif _type == 'schedule':
        return schedule(**value)
    else:
        raise NotImplementedError(
            'Cannot deserialize schedule %(type)s type' % {
                'type': _type
            }
        )
schedulers.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def from_entry(cls, name, session, skip_fields=('relative', 'options'), **entry):
        """
        ??????PeriodicTask
        :param session:
        :param name:
        :param skip_fields:
        :param entry:
        :return:
        """
        fields = dict(entry)
        for skip_field in skip_fields:
            fields.pop(skip_field, None)
        schedule = fields.pop('schedule')
        model_schedule, model_field = cls.to_model_schedule(schedule, session)
        fields[model_field] = model_schedule
        fields['args'] = json.dumps(fields.get('args') or [])
        fields['kwargs'] = json.dumps(fields.get('kwargs') or {})
        model, _ = PeriodicTask.update_or_create(session, name=name, defaults=fields)
        cls.save_model(session, model)
        return cls(model)
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_schedule_changed(self):
        self.m2.args = '[16, 16]'
        self.m2.save()
        e2 = self.s.schedule[self.m2.name]
        assert e2.args == [16, 16]

        self.m1.args = '[32, 32]'
        self.m1.save()
        e1 = self.s.schedule[self.m1.name]
        assert e1.args == [32, 32]
        e1 = self.s.schedule[self.m1.name]
        assert e1.args == [32, 32]

        self.m3.delete()
        with pytest.raises(KeyError):
            self.s.schedule.__getitem__(self.m3.name)
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_sync_syncs_before_save(self):
        # Get the entry for m2
        e1 = self.s.schedule[self.m2.name]

        # Increment the entry (but make sure it doesn't sync)
        self.s._last_sync = monotonic()
        e2 = self.s.schedule[e1.name] = self.s.reserve(e1)
        assert self.s.flushed == 1

        # Fetch the raw object from db, change the args
        # and save the changes.
        m2 = PeriodicTask.objects.get(pk=self.m2.pk)
        m2.args = '[16, 16]'
        m2.save()

        # get_schedule should now see the schedule has changed.
        # and also sync the dirty objects.
        e3 = self.s.schedule[self.m2.name]
        assert self.s.flushed == 2
        assert e3.last_run_at == e2.last_run_at
        assert e3.args == [16, 16]
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_periodic_task_disabled_while_reserved(self):
        # Get the entry for m2
        e1 = self.s.schedule[self.m2.name]

        # Increment the entry (but make sure it doesn't sync)
        self.s._last_sync = monotonic()
        e2 = self.s.schedule[e1.name] = self.s.reserve(e1)
        assert self.s.flushed == 1

        # Fetch the raw object from db, change the args
        # and save the changes.
        m2 = PeriodicTask.objects.get(pk=self.m2.pk)
        m2.enabled = False
        m2.save()

        # reserve is called because the task gets called from
        # tick after the database change is made
        self.s.reserve(e2)

        # get_schedule should now see the schedule has changed.
        # and remove entry for m2
        assert self.m2.name not in self.s.schedule
        assert self.s.flushed == 2
test_schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_SolarSchedule_schedule(self):
        s = SolarSchedule(event='solar_noon', latitude=48.06, longitude=12.86)
        dt = datetime(day=26, month=7, year=2050, hour=1, minute=0)
        dt_lastrun = make_aware(dt)

        assert s.schedule is not None
        isdue, nextcheck = s.schedule.is_due(dt_lastrun)
        assert isdue is False  # False means task isn't due, but keep checking.
        assert (nextcheck > 0) and (isdue is False) or \
            (nextcheck == s.max_interval) and (isdue is True)

        s2 = SolarSchedule(event='solar_noon', latitude=48.06, longitude=12.86)
        dt2 = datetime(day=26, month=7, year=2000, hour=1, minute=0)
        dt2_lastrun = make_aware(dt2)

        assert s2.schedule is not None
        isdue2, nextcheck2 = s2.schedule.is_due(dt2_lastrun)
        assert isdue2 is True  # True means task is due and should run.
        assert (nextcheck2 > 0) and (isdue2 is True) or \
            (nextcheck2 == s2.max_interval) and (isdue2 is False)
schedulers.py 文件源码 项目:django-celery-beat 作者: celery 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def schedule(self):
        update = False
        if not self._initial_read:
            debug('DatabaseScheduler: initial read')
            update = True
            self._initial_read = True
        elif self.schedule_changed():
            info('DatabaseScheduler: Schedule changed.')
            update = True

        if update:
            self.sync()
            self._schedule = self.all_as_schedule()
            # the schedule changed, invalidate the heap in Scheduler.tick
            self._heap = None
            if logger.isEnabledFor(logging.DEBUG):
                debug('Current schedule:\n%s', '\n'.join(
                    repr(entry) for entry in values(self._schedule)),
                )
        return self._schedule
schedulers.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def to_model_schedule(cls, schedule):
        for schedule_type, model_type, model_field in cls.model_schedules:
            schedule = schedules.maybe_schedule(schedule)
            if isinstance(schedule, schedule_type):
                model_schedule = model_type.from_schedule(schedule)
                return model_schedule, model_field

        raise ValueError(
            'Cannot convert schedule type {0!r} to model'.format(schedule)
        )
schedulers.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def setup_schedule(self):
        self.install_default_entries(self.schedule)
        self.update_from_dict(self.app.conf.CELERYBEAT_SCHEDULE)
schedulers.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def install_default_entries(self, data):
        entries = {}
        if self.app.conf.CELERY_TASK_RESULT_EXPIRES:
            # Add backend clean up
            entries.setdefault(
                'celery.backend_cleanup', {
                    'task': 'celery.backend_cleanup',
                    'schedule': schedules.crontab('0', '4', '*')
                }
            )

        self.update_from_dict(entries)
schedulers.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def schedule(self):
        update = False
        if not self._initial_read:
            logger.info('DatabaseScheduler: initial read')
            update = True
            self._initial_read = True
        elif self.is_schedule_changed:
            logger.info('DatabaseScheduler: schedule changed')
            update = True

        if update:
            self._schedule = self.all_as_schedule()

        return self._schedule
models.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def schedule(self):
        return schedules.crontab(
            minute=self.minute,
            hour=self.hour,
            day_of_week=self.day_of_week,
            day_of_month=self.day_of_month,
            month_of_year=self.month_of_year
        )
models.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def from_schedule(cls, schedule):
        data = dict([
            (x, getattr(schedule, '_orig_{0}'.format(x)))
            for x in (
                'minute', 'hour', 'day_of_week', 'day_of_month', 'month_of_year'
            )
        ])
        instance = cls.query.filter(*[
            getattr(cls, k) == v for k, v in data.items()
        ]).first()
        if not instance:
            instance = cls(**data)
            db.session.add(instance)
            db.session.commit()
        return instance
models.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def schedule(self):
        return schedules.schedule(timedelta(**{self.period: self.every}))
models.py 文件源码 项目:flask-celery3-boilerplate 作者: sdg32 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def from_schedule(cls, schedule):
        every = max(schedule.run_every.total_seconds(), 0)
        instance = cls.query.filter_by(every=every, period='seconds')
        if not instance:
            instance = cls(every=every, period='seconds')
            db.session.add(instance)
            db.session.commit()
        return instance
schedules.py 文件源码 项目:telemetry-analysis-service 作者: mozilla 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def create(self):
        entry = RedBeatSchedulerEntry(
            name=self.name,
            task=self.task,
            schedule=schedule(
                run_every=self.run_every,
                # setting "now" to the job start datetime
                nowfun=lambda: self.run_at,
                app=celery,
            ),
            args=(self.spark_job.pk,),
            kwargs={},
            app=celery,
        )
        return entry
schedules.py 文件源码 项目:telemetry-analysis-service 作者: mozilla 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add(self):
        """
        Create and save an entry to the scheduler
        """
        logger.info(
            'Adding running %s to schedule. '
            'Interval: %s. '
            'Starts at: %s.' %
            (self.name, self.run_every, self.run_at)
        )
        entry = self.create()
        entry.save()
        return entry
serializer.py 文件源码 项目:celery-beatx 作者: mixkorshun 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def serialize_entry(entry, schedule_encoder=encode_schedule):
    return {
        'name': entry.name,
        'task': entry.task,
        'schedule': schedule_encoder(entry.schedule),
        'args': entry.args,
        'kwargs': entry.kwargs,
        'last_run_at': encode_datetime(entry.last_run_at, allow_null=True),
        'total_run_count': entry.total_run_count,
        'options': entry.options
    }
serializer.py 文件源码 项目:celery-beatx 作者: mixkorshun 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def deserialize_entry(entry, schedule_decoder=decode_schedule):
    return ScheduleEntry(
        name=entry['name'],
        task=entry['task'],
        schedule=schedule_decoder(entry['schedule']),
        args=entry['args'],
        kwargs=entry['kwargs'],
        last_run_at=decode_datetime(entry['last_run_at'], allow_null=True),
        total_run_count=entry['total_run_count'],
        options=entry['options'],
    )
model.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def schedule(self):
        return schedules.schedule(datetime.timedelta(**{self.period.code: self.every}))
model.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def from_schedule(cls, session, schedule, period='seconds'):
        every = max(schedule.run_every.total_seconds(), 0)
        obj = cls.filter_by(session, every=every, period=period).first()
        if obj is None:
            return cls(every=every, period=period)
        else:
            return obj
model.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def schedule(self):
        return schedules.crontab(minute=self.minute,
                                 hour=self.hour,
                                 day_of_week=self.day_of_week,
                                 day_of_month=self.day_of_month,
                                 month_of_year=self.month_of_year)
model.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def from_schedule(cls, session, schedule):
        spec = {'minute': schedule._orig_minute,
                'hour': schedule._orig_hour,
                'day_of_week': schedule._orig_day_of_week,
                'day_of_month': schedule._orig_day_of_month,
                'month_of_year': schedule._orig_month_of_year}
        obj = cls.filter_by(session, **spec).first()
        if obj is None:
            return cls(**spec)
        else:
            return obj
model.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def schedule(self):
        if self.crontab:
            return self.crontab.schedule
        if self.interval:
            return self.interval.schedule
schedulers.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def is_due(self):
        if not self.model.enabled:
            return False, 5.0   # 5 second delay for re-enable.
        return self.schedule.is_due(self.last_run_at)
schedulers.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def to_model_schedule(cls, schedule, session):
        """
        :param session:
        :param schedule:
        :return:
        """
        for schedule_type, model_type, model_field in cls.model_schedules:
            debug(cls.model_schedules)
            schedule = schedules.maybe_schedule(schedule)
            if isinstance(schedule, schedule_type):
                model_schedule = model_type.from_schedule(session, schedule)
                cls.save_model(session, model_schedule)
                return model_schedule, model_field
        raise ValueError('Cannot convert schedule type {0!r} to model'.format(schedule))
schedulers.py 文件源码 项目:celerybeat-sqlalchemy 作者: kindule 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __repr__(self):
        return '<ModelEntry: {0} {1}(*{2}, **{3}) {{4}}>'.format(
            safe_str(self.name), self.task, self.args, self.kwargs, self.schedule,
        )


问题


面经


文章

微信
公众号

扫码关注公众号