def test_future_phases(phase_factory):
    """Check what ``Phase.objects.future_phases()`` returns as time advances.

    Two overlapping dated phases and one phase without dates are created.
    The clock is frozen at three instants and the query result is compared
    with the expected ordered list each time.
    """
    early_phase = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-10 18:00:00 UTC'),
    )
    late_phase = phase_factory(
        start_date=parse('2013-01-05 18:00:00 UTC'),
        end_date=parse('2013-01-15 18:00:00 UTC'),
    )
    undated_phase = phase_factory(start_date=None, end_date=None)
    one_minute = timedelta(minutes=1)

    # Just before the first phase starts: all three phases are expected.
    with freeze_time(early_phase.start_date - one_minute):
        upcoming = list(models.Phase.objects.future_phases())
        assert upcoming == [undated_phase, early_phase, late_phase]

    # Just before the second phase starts: the first one is no longer listed.
    with freeze_time(late_phase.start_date - one_minute):
        upcoming = list(models.Phase.objects.future_phases())
        assert upcoming == [undated_phase, late_phase]

    # At the end of the second phase: only the undated phase remains.
    with freeze_time(late_phase.end_date):
        upcoming = list(models.Phase.objects.future_phases())
        assert upcoming == [undated_phase]
python类freeze_time()的实例源码
def test_past_and_active_phases(phase_factory):
    """Check ``Phase.objects.past_and_active_phases()`` as time advances.

    Two overlapping dated phases and one undated phase are created. The
    undated phase is never expected in the result.
    """
    early_phase = phase_factory(
        start_date=parse('2013-01-01 18:00:00 UTC'),
        end_date=parse('2013-01-10 18:00:00 UTC'),
    )
    late_phase = phase_factory(
        start_date=parse('2013-01-05 18:00:00 UTC'),
        end_date=parse('2013-01-15 18:00:00 UTC'),
    )
    # Undated phase: created only so the query has something to exclude.
    phase_factory(start_date=None, end_date=None)

    # One minute before anything has started: nothing is past or active.
    with freeze_time(early_phase.start_date - timedelta(minutes=1)):
        started = list(models.Phase.objects.past_and_active_phases())
        assert started == []

    # Exactly at the first start date: the first phase counts.
    with freeze_time(early_phase.start_date):
        started = list(models.Phase.objects.past_and_active_phases())
        assert started == [early_phase]

    # At the second start date both phases are listed.
    with freeze_time(late_phase.start_date):
        started = list(models.Phase.objects.past_and_active_phases())
        assert started == [early_phase, late_phase]

    # At the end of the second phase both are still listed.
    with freeze_time(late_phase.end_date):
        started = list(models.Phase.objects.past_and_active_phases())
        assert started == [early_phase, late_phase]
def test_seconds_until_midnight(self):
    """Check ``how_many_seconds_until_midnight()`` at three frozen instants."""
    # Each case pairs a frozen wall-clock time with the expected result.
    cases = [
        ("2017-05-29 23:59:59", 1),
        ("2017-05-29 00:00:00", 0),
        ("2017-05-29 00:00:01", 86399),
    ]
    for frozen_at, expected_seconds in cases:
        with freeze_time(frozen_at):
            self.assertEqual(
                how_many_seconds_until_midnight(),
                expected_seconds
            )
test_nightly_resetting_defaultdict.py 文件源码
项目:shifthelper
作者: fact-project
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_NightlyResettingDefaultdict():
    """Check that a stored value falls back to the default after two days.

    A value is written while the clock is frozen, the clock is moved two
    days ahead, and the key is expected to yield the factory default again.
    A value written after the move is expected to be readable.
    """
    from shifthelper.tools import NightlyResettingDefaultdict

    start = datetime.datetime(2016, 1, 1)
    after_two_days = datetime.datetime(2016, 1, 3, 0, 0, 0)

    with freeze_time(start) as clock:
        max_rates = NightlyResettingDefaultdict(lambda: -np.inf)

        max_rates['foo'] = 5
        assert max_rates['foo'] == 5

        # Jump forward; the earlier value must be gone.
        clock.move_to(after_two_days)
        assert max_rates['foo'] == -np.inf

        max_rates['foo'] = 6
        assert max_rates['foo'] == 6
def test_key_values(self, mock_cache):
    """Check that ``ExampleKeyedConfig.key_values()`` returns unique keys.

    Four rows are saved under a frozen clock, two per (left, right) pair.
    Only the two distinct pairs, or the two distinct ``left`` values in
    flat mode, are expected back.

    ``mock_cache`` is a mock whose ``get`` is made to return ``None``
    (presumably to simulate a cache miss -- confirm against the decorator
    that injects it).
    """
    mock_cache.get.return_value = None

    with freeze_time('2012-01-01'):
        ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()
        ExampleKeyedConfig(left='left_a', right='right_a', changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', changed_by=self.user).save()

    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed in Python 3.12.
    unique_key_pairs = ExampleKeyedConfig.key_values()
    self.assertEqual(len(unique_key_pairs), 2)
    self.assertEqual(
        set(unique_key_pairs),
        {('left_a', 'right_a'), ('left_b', 'right_b')},
    )

    unique_left_keys = ExampleKeyedConfig.key_values('left', flat=True)
    self.assertEqual(len(unique_left_keys), 2)
    self.assertEqual(set(unique_left_keys), {'left_a', 'left_b'})
def test_current_set(self, mock_cache):
    """Check that ``current_set()`` holds one row per key pair.

    Each (left, right) pair is saved twice with different ``int_field``
    values. The resulting queryset is expected to contain two rows whose
    ``int_field`` values are those of the later saves (1 and 2).
    """
    mock_cache.get.return_value = None

    with freeze_time('2012-01-01'):
        ExampleKeyedConfig(left='left_a', right='right_a', int_field=0, changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', int_field=0, changed_by=self.user).save()
        ExampleKeyedConfig(left='left_a', right='right_a', int_field=1, changed_by=self.user).save()
        ExampleKeyedConfig(left='left_b', right='right_b', int_field=2, changed_by=self.user).save()

    current = ExampleKeyedConfig.objects.current_set()
    self.assertEqual(len(current.all()), 2)

    int_values = current.order_by('int_field').values_list('int_field', flat=True)
    self.assertEqual(set(int_values), {1, 2})
def test_list_view(rf, phase, module_factory, idea_factory):
    """Check that ``IdeaListView`` lists only the requested module's ideas.

    An idea in the phase's module must be in the context's ``idea_list``
    and an idea from another module must not. The first listed idea is
    expected to carry zero comment and rating counts.
    """
    module = phase.module
    project = module.project
    own_idea = idea_factory(module=module)
    foreign_module = module_factory()
    foreign_idea = idea_factory(module=foreign_module)

    with freeze_time(phase.start_date):
        list_view = views.IdeaListView.as_view()
        request = rf.get('/ideas')
        response = list_view(request, project=project, module=module)

        assert own_idea in response.context_data['idea_list']
        assert foreign_idea not in response.context_data['idea_list']

        # A fresh idea has no comments or ratings yet.
        assert response.context_data['idea_list'][0].comment_count == 0
        assert response.context_data['idea_list'][0].positive_rating_count == 0
        assert response.context_data['idea_list'][0].negative_rating_count == 0
def test_create_view(client, phase, user):
    """Walk through creating an idea via the ``idea-create`` view.

    An anonymous GET is redirected to ``account_login``. After logging in
    the form is served, and a POST redirects to ``idea-detail`` and leaves
    exactly one ``Idea`` in the database.
    """
    module = phase.module

    with freeze_time(phase.start_date):
        assert models.Idea.objects.all().count() == 0

        create_url = reverse('idea-create', kwargs={'slug': module.slug})

        # Anonymous users are sent to the login page.
        response = client.get(create_url)
        assert response.status_code == 302
        assert redirect_target(response) == 'account_login'

        client.login(username=user.email, password='password')
        response = client.get(create_url)
        assert response.status_code == 200

        payload = {'name': 'Idea', 'description': 'description'}
        response = client.post(create_url, payload)
        assert response.status_code == 302
        assert redirect_target(response) == 'idea-detail'

        assert models.Idea.objects.all().count() == 1
def test_update_view(client, phase, idea):
    """Walk through updating an idea via the ``idea-update`` view.

    The idea is first attached to the phase's module. An anonymous GET is
    redirected. After logging in as the idea's creator the form is served,
    and a POST stores the new description and answers with a redirect.
    """
    idea.module = phase.module
    idea.save()
    user = idea.creator

    with freeze_time(phase.start_date):
        url = reverse('idea-update', kwargs={'slug': idea.slug})

        # Anonymous access must be redirected.
        response = client.get(url)
        assert response.status_code == 302

        client.login(username=user.email, password='password')
        response = client.get(url)
        assert response.status_code == 200

        data = {'description': 'description', 'name': idea.name}
        response = client.post(url, data)

        # Re-read from the database; the primary key is passed directly
        # rather than through a local named ``id`` that shadows the builtin.
        updated_idea = models.Idea.objects.get(id=idea.pk)
        assert updated_idea.description == 'description'
        assert response.status_code == 302
def test_ideas_download_contains_right_data(rf, idea_factory, admin):
    """Check status and headers of the ``IdeaDownloadView`` response.

    Three ideas are created in one module. With the clock frozen, an admin
    request must yield a 200 response declaring an xlsx content type and an
    attachment filename built from the project slug and the frozen
    timestamp.
    """
    idea = idea_factory()
    module = idea.module
    idea_factory(module=module)
    idea_factory(module=module)

    now = timezone.now()
    with freeze_time(now):
        request = rf.get('/ideas/download/module/{}'.format(module.slug))
        request.user = admin
        response = views.IdeaDownloadView.as_view()(request, slug=module.slug)

        assert response.status_code == 200

        # Headers are read through the public, case-insensitive item access
        # on the response. The private ``_headers`` mapping was removed in
        # Django 3.2.
        assert (response['Content-Type'] ==
                'application/vnd.openxmlformats-officedocument'
                '.spreadsheetml.sheet')
        assert (response['Content-Disposition'] ==
                'attachment; filename="{}_{}.xlsx"'
                .format(
                    module.project.slug,
                    now.strftime('%Y%m%dT%H%M%S')))
def test_phase_dispatch_mixin_return_active_phase(
    rf,
    project_detail_view,
    phase1,
    phase2
):
    """Check that the project detail view falls back to the active phase.

    With the clock frozen at ``phase1.start_date``, both a garbage
    ``?phase=`` value and no ``phase`` parameter at all must render with
    ``FakePhase0View``'s template and not ``FakePhase1View``'s.
    """
    project = phase1.module.project
    project_url = reverse('project-detail', args=[project.slug])

    with freeze_time(phase1.start_date):
        # Requesting garbage should return the currently active phase.
        garbage = "A"*100
        garbage_request = rf.get("{0}?phase={1}".format(project_url, garbage))
        response = project_detail_view(garbage_request, slug=project.slug)
        assert FakePhase0View.template_name in response.template_name
        assert FakePhase1View.template_name not in response.template_name

        # Without any further specification via '?phase=' return the
        # active phase.
        plain_request = rf.get(project_url)
        response = project_detail_view(plain_request, slug=project.slug)
        assert FakePhase0View.template_name in response.template_name
        assert FakePhase1View.template_name not in response.template_name
def setUp(self):
    """Build default and limited-lifetime credentials and their headers.

    ``normal_creds`` / ``lasting_header`` use ``TokenCredentials`` without
    a ``token_lifetime`` argument. ``expiring_creds`` / ``expiring_header``
    are created with ``token_lifetime=self.token_lifetime`` while the clock
    is frozen at 2012-01-14.
    """
    self.normal_creds = TokenCredentials(
        self.key_path, self.key_id, self.team_id)
    self.lasting_header = self.normal_creds.get_authorization_header(
        self.topics[0])

    # Create an 'ephemeral' token so we can test token timeouts. We
    # want a timeout long enough to last the test, but we don't want to
    # slow down the tests too much either.
    with freeze_time('2012-01-14'):
        self.expiring_creds = TokenCredentials(
            self.key_path,
            self.key_id,
            self.team_id,
            token_lifetime=self.token_lifetime,
        )
        self.expiring_header = (
            self.expiring_creds.get_authorization_header(self.topics[0])
        )
def test_find_entries():
    """EntryLoader.find_entries can find entries for the specified blog.

    Three entries are created at successively earlier frozen times.
    ``find_entries`` is expected to return them in creation order, and
    only the first two when ``limit=2`` is passed.
    """
    db = DB()
    blog = create_blog()

    # Newest timestamp first, so list order matches the expected result.
    creation_times = (
        '2017-01-13 12:00:02',
        '2017-01-13 12:00:01',
        '2017-01-13 12:00:00',
    )
    entries = []
    for created_at in creation_times:
        with freeze_time(created_at):
            entries.append(create_entry(blog=blog))

    all_found = EntryLoader.find_entries(db, blog.id)
    assert len(all_found) == len(entries)
    assert [found.id for found in all_found] == [made.id for made in entries]

    limited = EntryLoader.find_entries(db, blog.id, limit=2)
    first_two = entries[0:2]
    assert len(limited) == len(first_two)
    assert [found.id for found in limited] == [made.id for made in first_two]
def test_index_with_entries():
    """Check that the index page lists the global blog's entries in order.

    Three entries are created at successively earlier frozen times. A GET
    on ``/`` must answer ``200 OK`` and the ``data-entry-id`` attributes of
    the ``.entry`` elements must match the entries' ids in creation order.
    """
    db = DB()

    with global_user(random_string(5)):
        blog = BlogAction.ensure_global_blog_created(db)

        # Newest timestamp first, so list order matches the page order.
        creation_times = (
            '2017-01-13 12:00:02',
            '2017-01-13 12:00:01',
            '2017-01-13 12:00:00',
        )
        entries = []
        for created_at in creation_times:
            with freeze_time(created_at):
                entries.append(create_entry(blog=blog))

        res = web_client().get('/')
        assert res.status == '200 OK'

        page = pq(res.data)
        rendered_ids = [
            int(page(node).attr('data-entry-id')) for node in page('.entry')
        ]
        assert rendered_ids == [made.id for made in entries]
def test_publish_update_rotating():
    """Check that ``publish update`` works for rotating publishes.

    After the rotating publishes are created, ``main`` is run with
    ``publish update`` under a frozen clock and the resulting publish map
    is compared with the expected ``current`` snapshots.
    """
    config_path = os.path.join(
        _test_base,
        b"publish-current.yml",
    )
    with test.clean_and_config(config_path) as (_tyml, config):
        do_publish_create_rotating(config)

        with freezegun.freeze_time("2012-10-11 10:10:10"):
            main(['-c', config, 'publish', 'update'])

            state = SystemStateReader()
            state.read()

            expected_publishes = {
                u'fake/current stable': {u'fake-current'},
                u'fakerepo01/current stable': {u'fakerepo01-current'},
                u'fakerepo02/current stable': {u'fakerepo02-current'},
            }
            assert expected_publishes == state.publish_map
def test_publish_snapshot_update_rotating():
    """Check that ``snapshot update`` keeps rotating publishes as expected.

    After the rotating publishes are created, ``main`` is run with
    ``snapshot update`` under a frozen clock and the resulting publish map
    is compared with the expected ``current`` snapshots.
    """
    config_path = os.path.join(
        _test_base,
        b"publish-current.yml",
    )
    with test.clean_and_config(config_path) as (_tyml, config):
        do_publish_create_rotating(config)

        with freezegun.freeze_time("2012-10-11 10:10:10"):
            main(['-c', config, 'snapshot', 'update'])

            state = SystemStateReader()
            state.read()

            expected_publishes = {
                u'fake/current stable': {u'fake-current'},
                u'fakerepo01/current stable': {u'fakerepo01-current'},
                u'fakerepo02/current stable': {u'fakerepo02-current'},
            }
            assert expected_publishes == state.publish_map
test_authentication.py 文件源码
项目:alexa-voice-service-client
作者: richtier
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_retrieve_api_token_expiring_memo(token_retrieve_200):
    """Check how many HTTP requests ``retrieve_api_token()`` triggers.

    The clock is frozen at four instants. At each one the token is
    retrieved one or more times and the length of
    ``token_retrieve_200.request_history`` is compared with the expected
    cumulative request count.
    """
    authenticator = AlexaVoiceServiceTokenAuthenticator(
        client_id='debug', secret='debug', refresh_token='debug'
    )

    # (frozen time, retrieve calls to make, total requests expected so far)
    schedule = [
        ('3012-01-14 12:00:00', 2, 1),
        ('3012-01-14 13:00:00', 1, 2),
        ('3012-01-14 13:30:00', 1, 2),
        ('3012-01-14 14:00:00', 1, 3),
    ]
    for frozen_at, calls, expected_requests in schedule:
        with freeze_time(frozen_at):
            for _ in range(calls):
                authenticator.retrieve_api_token()
            assert len(token_retrieve_200.request_history) == expected_requests
def test_edited_is_false_for_newly_created_content_within_15_minutes_grace_period(self):
    """Saving 14 minutes after creation must not flag the content as edited."""
    save_time = self.public_content.created + datetime.timedelta(minutes=14)
    with freeze_time(save_time):
        self.public_content.save()
        self.assertFalse(self.public_content.edited)
def test_edited_is_true_for_newly_created_content_after_15_minutes_grace_period(self):
    """Saving 16 minutes after creation must flag the content as edited."""
    save_time = self.public_content.created + datetime.timedelta(minutes=16)
    with freeze_time(save_time):
        self.public_content.save()
        self.assertTrue(self.public_content.edited)
def test_dict_for_view_edited_post(self):
    """Check ``dict_for_view()`` for content saved 16 minutes after creation.

    The whole dictionary is compared. ``humanized_timestamp`` is expected
    to carry an " (edited)" suffix.
    """
    content = self.public_content
    save_time = content.created + datetime.timedelta(minutes=16)

    with freeze_time(save_time):
        content.save()

        # Evaluate the view dict first, then build the expectation.
        actual = content.dict_for_view(self.user)
        author = content.author
        expected = {
            "author": content.author_id,
            "author_guid": author.guid,
            "author_handle": author.handle,
            "author_home_url": author.home_url,
            "author_image": author.safer_image_url_small,
            "author_is_local": bool(author.user),
            "author_name": author.handle,
            "author_profile_url": author.get_absolute_url(),
            "content_type": content.content_type.string_value,
            "delete_url": reverse("content:delete", kwargs={"pk": content.id}),
            "detail_url": content.get_absolute_url(),
            "formatted_timestamp": content.timestamp,
            "guid": content.guid,
            "has_shared": False,
            "humanized_timestamp": "%s (edited)" % content.humanized_timestamp,
            "id": content.id,
            "is_authenticated": True,
            "is_author": True,
            "is_following_author": False,
            "parent": "",
            "profile_id": author.id,
            "rendered": content.rendered,
            "reply_count": 0,
            "reply_url": reverse("content:reply", kwargs={"pk": content.id}),
            "shares_count": 0,
            "slug": content.slug,
            "through": content.id,
            "update_url": reverse("content:update", kwargs={"pk": content.id}),
        }
        self.assertEqual(actual, expected)