celery_test.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:settei 作者: spoqa 项目源码 文件源码
def test_worker_schedule():
    """Verify how ``celerybeat_schedule`` entries surface on the configuration.

    Three spellings of the ``schedule`` string are exercised:

    * ``timedelta(...)`` -- expected to compare equal to a
      ``datetime.timedelta``; an entry without ``args`` is expected to
      expose ``'args': ()``.
    * ``crontab(...)`` -- expected to compare equal to a ``crontab`` object;
      list ``args`` are expected to come back as a tuple.
    * ``celery.schedules:crontab(...)`` (import-path form) -- expected to
      yield the same schedule as the bare ``crontab(...)`` form.

    In every case ``worker_config['CELERYBEAT_SCHEDULE']`` must equal
    ``worker_schedule``.
    """

    def build(beat_schedule):
        # All cases share the same broker/backend settings; only the
        # beat schedule mapping varies.
        return WorkerConfiguration({
            'worker': {
                'broker_url': 'redis://',
                'celery_result_backend': 'redis://',
                'celerybeat_schedule': beat_schedule,
            }
        })

    # --- schedule given as a timedelta expression -------------------------
    timedelta_conf = build({
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': 'timedelta(seconds=30)',
        },
    })
    expected_timedelta = {
        'add-every-30-seconds': {
            'task': 'tasks.add',
            'schedule': datetime.timedelta(seconds=30),
            'args': (),
        },
    }
    assert timedelta_conf.worker_schedule == expected_timedelta
    assert (timedelta_conf.worker_config['CELERYBEAT_SCHEDULE'] ==
            timedelta_conf.worker_schedule)

    # --- schedule given as a crontab expression ---------------------------
    crontab_conf = build({
        'add-every-minute': {
            'task': 'tasks.add',
            'schedule': "crontab(minute='*')",
            'args': [16, 16],
        },
    })
    expected_crontab = {
        'add-every-minute': {
            'task': 'tasks.add',
            'schedule': crontab(minute='*'),
            'args': (16, 16),
        },
    }
    assert crontab_conf.worker_schedule == expected_crontab
    assert (crontab_conf.worker_config['CELERYBEAT_SCHEDULE'] ==
            crontab_conf.worker_schedule)

    # --- schedule given with an explicit import path ----------------------
    import_path_conf = build({
        'add-every-minute': {
            'task': 'tasks.add',
            'schedule': "celery.schedules:crontab(minute='*')",
            'args': [16, 16],
        },
    })
    assert import_path_conf.worker_schedule == crontab_conf.worker_schedule
    assert (import_path_conf.worker_config['CELERYBEAT_SCHEDULE'] ==
            import_path_conf.worker_schedule)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号