def test_multibuy_hint_two_buys_applicable(self):
member = Member.objects.get(username="jokke")
coke = Product.objects.create(
name="coke",
price=100,
active=True
)
with freeze_time(timezone.datetime(2018, 1, 1)) as frozen_time:
for i in range(1, 3):
Sale.objects.create(
member=member,
product=coke,
price=100,
)
frozen_time.tick()
give_multibuy_hint, sale_hints = stregsystem_views._multibuy_hint(timezone.datetime(2018, 1, 1, tzinfo=pytz.UTC), member)
self.assertTrue(give_multibuy_hint)
self.assertEqual(sale_hints, "{} {}:{}".format("jokke", coke.id, 2))
python类freeze_time()的实例源码
def setUp(self):
self.flan = Product.objects.create(name="FLan", price=1.0, active=True)
self.flanmad = Product.objects.create(name="FLan mad", price=2.0, active=True)
self.notflan = Product.objects.create(name="Ikke Flan", price=2.0, active=True)
self.alan = Member.objects.create(username="tester", firstname="Alan", lastname="Alansen")
self.bob = Member.objects.create(username="bob", firstname="bob", lastname="bob")
with freeze_time('2017-02-02'):
Sale.objects.create(member=self.alan, product=self.flan, price=1.0)
with freeze_time('2017-02-15'):
Sale.objects.create(member=self.alan, product=self.flan, price=1.0)
with freeze_time('2017-02-07'):
Sale.objects.create(member=self.alan, product=self.flanmad, price=1.0)
with freeze_time('2017-02-05'):
Sale.objects.create(member=self.alan, product=self.notflan, price=1.0)
def test_default_poller_interval_is_30_seconds(capsys):
class ActivateImpl(MockedImpl):
CRONTAB = [
'0 0 * * * stub.print_datetime',
]
plugin = ActivateImpl()
plugin.activate()
with freeze_time('2016-01-01 00:00:01'):
plugin.poll_crontab()
out, err = capsys.readouterr()
assert out == '2016-01-01'
with freeze_time('2016-01-01 00:00:31'):
plugin.poll_crontab()
out, err = capsys.readouterr()
assert out == ''
def test_timezone_in_config(capsys):
class MockConfig(object):
TIMEZONE = 'Asia/Tokyo'
class ActivateImpl(MockedImpl):
CRONTAB = [
'0 0 * * * .print_datetime',
]
def print_datetime(self, polled_time):
six.print_(polled_time.strftime('%Y-%m-%d'), end='')
plugin = ActivateImpl()
plugin.activate()
setattr(plugin, 'bot_config', MockConfig())
with freeze_time('2016-01-01 00:00:01'):
plugin.poll_crontab()
out, err = capsys.readouterr()
assert out != '2016-01-01'
def test_default_poller_interval_is_30_seconds(capsys):
class ActivateImpl(MockedImpl):
CRONTAB = [
'0 0 * * * stub.print_datetime',
]
plugin = ActivateImpl()
plugin.activate()
with freeze_time('2016-01-01 00:00:01'):
plugin.poll_crontab()
out, err = capsys.readouterr()
assert out == '2016-01-01'
with freeze_time('2016-01-01 00:00:31'):
plugin.poll_crontab()
out, err = capsys.readouterr()
assert out == ''
def test_timezone_in_plugin(capsys):
class ActivateImpl(MockedImpl):
TIMEZONE = 'Asia/Tokyo'
CRONTAB = [
'0 0 * * * .print_datetime',
]
def activate(self):
self.activate_crontab()
def print_datetime(self, polled_time):
six.print_(polled_time.strftime('%Y-%m-%d'), end='')
plugin = ActivateImpl()
plugin.activate()
with freeze_time('2016-01-01 00:00:01'):
plugin.poll_crontab()
out, err = capsys.readouterr()
assert out != '2016-01-01'
def test_timezone_in_config(capsys):
class MockConfig(object):
TIMEZONE = 'Asia/Tokyo'
class ActivateImpl(MockedImpl):
CRONTAB = [
'0 0 * * * .print_datetime',
]
def print_datetime(self, polled_time):
six.print_(polled_time.strftime('%Y-%m-%d'), end='')
plugin = ActivateImpl()
plugin.activate()
setattr(plugin, 'bot_config', MockConfig())
with freeze_time('2016-01-01 00:00:01'):
plugin.poll_crontab()
out, err = capsys.readouterr()
assert out != '2016-01-01'
def test_perform_block_delete(app):
content = 'foo'
eff = app.perform_block_create(EBlockCreate(content))
block_id = perform_sequence([], eff)
# Delete from new blocks
with freeze_time('2012-01-01') as frozen_datetime:
eff = app.perform_block_delete(EBlockDelete(block_id))
perform_sequence([], eff)
assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
# Delete in cache
app.block_cache[block_id] = {'foo': 'bar'}
assert app.block_cache.currsize == 1
with freeze_time('2012-01-01') as frozen_datetime:
eff = app.perform_block_delete(EBlockDelete(block_id))
perform_sequence([], eff)
assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
assert app.block_cache.currsize == 0
# Not found
with pytest.raises(BlockNotFound):
eff = app.perform_block_delete(EBlockDelete(block_id))
perform_sequence([], eff)
def test_perform_user_vlob_delete(app):
eff = app.perform_user_vlob_update(EUserVlobUpdate(1, 'foo'))
perform_sequence([], eff)
# Delete from new user vlobs
with freeze_time('2012-01-01') as frozen_datetime:
eff = app.perform_user_vlob_delete(EUserVlobDelete())
perform_sequence([], eff)
assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
# Delete in cache
app.user_vlob_cache[2] = {'foo': 'bar'}
assert app.user_vlob_cache.currsize == 1
with freeze_time('2012-01-01') as frozen_datetime:
eff = app.perform_user_vlob_delete(EUserVlobDelete(2))
perform_sequence([], eff)
assert app.last_modified == Arrow.fromdatetime(frozen_datetime())
assert app.user_vlob_cache.currsize == 0
# Not found
with pytest.raises(UserVlobNotFound):
eff = app.perform_user_vlob_delete(EUserVlobDelete(2))
perform_sequence([], eff)
def test_get(self, s3_client):
# given
data = b'#000000'
frozen_time = datetime.datetime(2016, 10, 23, 10, 30, tzinfo=datetime.timezone.utc)
with freezegun.freeze_time(frozen_time):
s3_client.boto.put_object(
Bucket=s3_client.bucket,
Key=os.path.join(s3_client.prefix, 'black.color'),
Body=data,
)
# when
output_object = s3_client.get('black.color')
# then
assert output_object.fp.read() == data
assert output_object.total_size == len(data)
assert output_object.timestamp == to_timestamp(frozen_time)
def test_execute_report_summary_already_sent(self):
with freezegun.freeze_time('2016-12-23 11:00'):
self.task.execute(self.bot, self.slack)
task = self.bot.fast_queue.pop()
self.assertIsInstance(task, pony.tasks.AskStatus)
self.assertListEqual(task.teams, ['dev_team1'])
self.assertEqual(task.user_id, '_sasha_id')
# mark day as already reported
self.bot.storage.set('report', {
date(2016, 12, 23): {
'dev_team1': {
'reported_at': datetime.utcnow(),
'reports': {}
}
}
})
self.task.execute(self.bot, self.slack)
with self.assertRaises(IndexError):
self.bot.fast_queue.pop()
def test_execute_day_is_holiday(self):
with freezegun.freeze_time('2016-12-01 11:00'):
self.task.execute(self.bot, self.slack)
task = self.bot.fast_queue.pop()
self.assertIsInstance(task, pony.tasks.SendMessage)
self.assertEqual(task.to, '#dev-team')
self.assertIn('No Standup Today', task.text)
self.assertIn('Romanian National Day', task.text)
# this is sent only once (report is marked reported)
self.task.execute(self.bot, self.slack)
with self.assertRaises(IndexError):
self.bot.fast_queue.pop()
self.assertIsNotNone(
self.bot.storage.get('report')[
date(2016, 12, 1)
]['dev_team1'].get('reported_at')
)
def test_rfc_1123_date(self) -> None:
"""
``rfc_1123_date`` returns the date formatted as required by Vuforia.
This test matches the example date set at
`<https://library.vuforia.com/articles/Training/Using-the-VWS-API>`_.
"""
date = datetime.datetime(
day=22,
month=4,
year=2012,
hour=8,
minute=49,
second=37,
)
with freeze_time(date):
assert rfc_1123_date() == 'Sun, 22 Apr 2012 08:49:37 GMT'
def test_has_timed_out_true(
self, timeout: timedelta, time_to_wait: timedelta
) -> None:
"""
:param timeout: The timeout to set and wait for
:param time_to_wait: The time to wait on top of the timeout,
in order to ensure that the server timed out
:return:
"""
self.service.timeout = timeout
self.service.check_in()
assume(
self._target_time_in_range(
datetime.utcnow(), timeout, time_to_wait
)
)
time_to_freeze_to = datetime.utcnow() + timeout + time_to_wait
with freeze_time(time_to_freeze_to): # ZA WARUDO!
self.assertTrue(self.service.has_timed_out)
def test_has_timed_out_false(
self, timeout: timedelta, time_to_wait: timedelta
) -> None:
self.service.timeout = timeout
self.service.check_in()
assume(
self._target_time_in_range(
datetime.utcnow(), timeout, time_to_wait
)
)
with freeze_time(
datetime.utcnow() + timeout + time_to_wait
): # ZA WARUDO!
self.assertFalse(self.service.has_timed_out)
test_transform_dataset_stats.py 文件源码
项目:skills-ml
作者: workforce-data-initiative
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_dataset_stats_counter_empty():
counter = DatasetStatsCounter(quarter='2013Q1', dataset_id='VA')
with moto.mock_s3():
with freeze_time('2017-01-10'):
s3_conn = boto.connect_s3()
s3_conn.create_bucket('test-bucket')
counter.save(s3_conn, 'test-bucket/stats')
key = s3_conn.get_bucket('test-bucket')\
.get_key('stats/quarterly/VA_2013Q1')
expected_stats = {
'total': 0,
'output_counts': {},
'input_counts': {},
'output_percentages': {},
'input_percentages': {},
'last_updated': '2017-01-10T00:00:00',
'quarter': '2013Q1',
}
assert json.loads(key.get_contents_as_string().decode('utf-8')) == expected_stats
test_transform_dataset_stats.py 文件源码
项目:skills-ml
作者: workforce-data-initiative
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_dataset_stats_aggregator():
with moto.mock_s3():
s3_conn = boto.connect_s3()
aggregator = DatasetStatsAggregator(dataset_id='CB', s3_conn=s3_conn)
add_s3_content(
s3_conn,
{
'test-bucket/stats/quarterly/CB_2014Q1':
json.dumps(sample_quarter_stats('2014Q1')),
'test-bucket/stats/quarterly/CB_2014Q2':
json.dumps(sample_quarter_stats('2014Q2')),
'test-bucket/stats/quarterly/VA_2014Q1':
json.dumps(sample_quarter_stats('2014Q1')),
}
)
with freeze_time('2017-01-10'):
aggregator.run('test-bucket/stats')
expected_stats = sample_dataset_stats()
key = s3_conn.get_bucket('test-bucket')\
.get_key('stats/dataset_summaries/CB.json')
assert json.loads(key.get_contents_as_string().decode('utf-8')) == expected_stats
def test_key_values(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()
ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()
unique_key_pairs = ExampleKeyedConfig.key_values()
self.assertEquals(len(unique_key_pairs), 2)
self.assertEquals(set(unique_key_pairs), set([('left_a', 'right_a'), ('left_b', 'right_b')]))
unique_left_keys = ExampleKeyedConfig.key_values('left', flat=True)
self.assertEquals(len(unique_left_keys), 2)
self.assertEquals(set(unique_left_keys), set(['left_a', 'left_b']))
def test_current_set(self, mock_cache):
mock_cache.get.return_value = None
with freeze_time('2012-01-01'):
ExampleKeyedConfig(left='left_a', right='right_a', int_field=0, changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', int_field=0, changed_by=self.user).save()
ExampleKeyedConfig(left='left_a', right='right_a', int_field=1, changed_by=self.user).save()
ExampleKeyedConfig(left='left_b', right='right_b', int_field=2, changed_by=self.user).save()
queryset = ExampleKeyedConfig.objects.current_set()
self.assertEqual(len(queryset.all()), 2)
self.assertEqual(
set(queryset.order_by('int_field').values_list('int_field', flat=True)),
set([1, 2])
)
def test_action_create(question_factory):
question = question_factory()
project = question.module.project
now = timezone.now()
with freeze_time(now):
action = Action(
actor=question.creator,
verb=Verbs.ADD.value,
obj=question,
target=project,
description='description'
)
assert action.actor == question.creator
assert action.verb == Verbs.ADD.value
assert action.obj == question
assert action.target == project
assert action.timestamp == now
assert action.public is True
assert action.description == 'description'
def test_phase_end_tomorrow(phase_factory):
phase = phase_factory(
start_date=parse('2013-01-01 17:00:00 UTC'),
end_date=parse('2013-01-01 18:00:00 UTC')
)
project = phase.module.project
action_count = Action.objects.filter(verb=SCHEDULE).count()
assert action_count == 0
with freeze_time('2013-01-01 17:30:00 UTC'):
call_command('create_system_actions')
action_count = Action.objects.filter(verb=SCHEDULE).count()
action = Action.objects.filter(verb=SCHEDULE).last()
assert action_count == 1
assert action.obj == phase
assert action.verb == SCHEDULE
assert action.project == project
def test_project_starts_later_or_earlier(phase_factory):
phase = phase_factory(
start_date=parse('2013-01-01 17:00:00 UTC'),
end_date=parse('2013-01-02 18:00:00 UTC')
)
with freeze_time(phase.start_date - timedelta(days=1)):
call_command('create_system_actions')
action_count = Action.objects.filter(verb=START).count()
assert action_count == 0
with freeze_time(phase.start_date + timedelta(days=1)):
call_command('create_system_actions')
action_count = Action.objects.filter(verb=START).count()
assert action_count == 0
def test_project_start_single_action(phase_factory):
phase = phase_factory(
start_date=parse('2013-01-01 17:00:00 UTC'),
end_date=parse('2013-01-01 18:00:00 UTC')
)
action_count = Action.objects.filter(verb=START).count()
assert action_count == 0
with freeze_time(phase.start_date + timedelta(minutes=30)):
call_command('create_system_actions')
action_count = Action.objects.filter(verb=START).count()
assert action_count == 1
# first phase starts within the last hour but script has already run
with freeze_time(phase.start_date + timedelta(minutes=45)):
call_command('create_system_actions')
action_count = Action.objects.filter(verb=START).count()
assert action_count == 1
def test_project_start_reschedule(phase_factory):
phase = phase_factory(
start_date=parse('2013-01-01 17:00:00 UTC'),
end_date=parse('2013-01-01 18:00:00 UTC')
)
# first phase starts within an hour
with freeze_time(phase.start_date + timedelta(minutes=30)):
call_command('create_system_actions')
action_count = Action.objects.filter(verb=START).count()
assert action_count == 1
# first phases start date has been moved forward
# and the start actions timestamp has to be adapted
phase.start_date = phase.start_date + timedelta(days=1)
phase.save()
with freeze_time(phase.start_date + timedelta(minutes=30)):
call_command('create_system_actions')
action_count = Action.objects.filter(verb=START).count()
assert action_count == 1
action = Action.objects.filter(verb=START).first()
assert action.timestamp == phase.start_date
def test_days_left(project, phase_factory):
phase1 = phase_factory(
start_date=parse('2013-01-01 18:00:00 UTC'),
end_date=parse('2013-01-02 18:00:00 UTC'),
module__project=project,
)
phase2 = phase_factory(
start_date=parse('2013-02-01 18:00:00 UTC'),
end_date=parse('2013-02-02 18:00:00 UTC'),
module__project=project,
)
with freeze_time(phase1.start_date):
assert project.days_left is 1
with freeze_time(phase1.end_date):
assert project.days_left is None
with freeze_time(phase2.start_date):
assert project.days_left is 1
with freeze_time(phase2.end_date):
assert project.days_left is None
def test_react_rating_anonymous(rf, question, comment):
with freeze_time('2013-01-02 18:00:00 UTC'):
user = AnonymousUser()
props = react_comment_render_for_props(rf, user, question)
comments_content_type = ContentType.objects.get_for_model(comment)
request = rf.get('/')
request.user = user
comments = ThreadSerializer(
question.comments.all().order_by('-created'),
many=True, context={'request': request}).data
assert props == {
'comments': comments,
'comments_contenttype': comments_content_type.pk,
'isAuthenticated': False,
'isModerator': False,
'isReadOnly': True,
'user_name': '',
}
def test_react_rating_user(rf, user, phase, question, comment):
with freeze_time('2013-01-02 18:00:00 UTC'):
props = react_comment_render_for_props(rf, user, question)
comments_content_type = ContentType.objects.get_for_model(comment)
request = rf.get('/')
request.user = user
comments = ThreadSerializer(
question.comments.all().order_by('-created'),
many=True, context={'request': request}).data
assert props == {
'comments': comments,
'comments_contenttype': comments_content_type.pk,
'isAuthenticated': True,
'isModerator': False,
'isReadOnly': False,
'user_name': user.username,
}
def test_manager_finished_phases(phase_factory):
old_phase = phase_factory(
start_date=parse('2013-01-01 17:00:00 UTC'),
end_date=parse('2013-01-01 18:00:00 UTC')
)
new_phase = phase_factory(
start_date=parse('2013-01-01 18:00:00 UTC'),
end_date=parse('2013-01-01 19:00:00 UTC')
)
with freeze_time(new_phase.start_date):
finished_phases = models.Phase.objects.finished_phases()
assert list(finished_phases) == [old_phase]
with freeze_time(new_phase.end_date):
finished_phases = models.Phase.objects.finished_phases()
assert list(finished_phases) == [old_phase, new_phase]
def test_manager_finish_next(phase_factory):
phase_today = phase_factory(
start_date=parse('2013-01-01 17:00:00 UTC'),
end_date=parse('2013-01-01 17:00:01 UTC')
)
phase_tomorrow = phase_factory(
start_date=parse('2013-01-01 17:00:00 UTC'),
end_date=parse('2013-01-02 17:00:00 UTC')
)
phase_factory(
start_date=parse('2013-01-01 17:00:00 UTC'),
end_date=parse('2013-01-02 17:00:03 UTC')
)
with freeze_time(phase_today.start_date):
finish_phases = models.Phase.objects.finish_next()
assert list(finish_phases) == [phase_today, phase_tomorrow]
with freeze_time(phase_today.end_date):
finish_phases = models.Phase.objects.finish_next()
assert list(finish_phases) == [phase_tomorrow]
def test_past_phases(phase_factory):
phase1 = phase_factory(
start_date=parse('2013-01-01 18:00:00 UTC'),
end_date=parse('2013-01-10 18:00:00 UTC'),
)
phase2 = phase_factory(
start_date=parse('2013-01-05 18:00:00 UTC'),
end_date=parse('2013-01-15 18:00:00 UTC'),
)
with freeze_time(phase1.start_date):
assert list(models.Phase.objects.past_phases()) == []
with freeze_time(phase1.end_date):
assert list(models.Phase.objects.past_phases()) == [phase1]
with freeze_time(phase2.end_date):
assert list(models.Phase.objects.past_phases()) == [phase1, phase2]