def test_entry(self):
m = self.create_model_interval(schedule(timedelta(seconds=10)))
e = self.Entry(m, app=self.app)
assert e.args == [2, 2]
assert e.kwargs == {'callback': 'foo'}
assert e.schedule
assert e.total_run_count == 0
assert isinstance(e.last_run_at, datetime)
assert e.options['queue'] == 'xaz'
assert e.options['exchange'] == 'foo'
assert e.options['routing_key'] == 'cpu'
right_now = self.app.now()
m2 = self.create_model_interval(
schedule(timedelta(seconds=10)),
last_run_at=right_now,
)
assert m2.last_run_at
e2 = self.Entry(m2, app=self.app)
assert e2.last_run_at is right_now
e3 = e2.next()
assert e3.last_run_at > e2.last_run_at
assert e3.total_run_count == 1
评论列表
文章目录