def test_update_and_stop_seeding_that_seeding_return_none(self, mock_get_torrent, mock_hset):
    """A seeding torrent created just over 24 hours ago makes the task return None."""
    # NOTE(review): ``mock_stop`` is not a parameter of this test; presumably it
    # is defined at module level -- confirm.
    seeding_torrent = self.torrent(
        status='seeding',
        progress=Decimal('100.00'),
        ratio=Decimal('9.99'),
        rateUpload=10500,
        rateDownload=105000,
        stop=mock_stop,
    )
    mock_get_torrent.return_value = seeding_torrent

    # Back-date the stored torrent to 24 hours and one second ago.
    age = timezone.timedelta(hours=24, seconds=1)
    self.torrent_model.created = timezone.now() - age
    self.torrent_model.save()

    result = update_and_stop_seeding(self.torrent_model.pk)

    self.assertIsNone(result)
    mock_get_torrent.assert_called_with(self.torrent_model.hash)
    hash_key = 'torrent:{}'.format(self.torrent_model.pk)
    mock_hset.assert_called_with(hash_key, 'rate_upload', 0)
# Example source code using Python's timedelta() class
def setUp(self):
    """Create a division, two jurisdictions and one successful run plan for each.

    Each run plan lasts ten minutes; the second one starts ten minutes after
    the first one ends.
    """
    usa = Division.objects.create(
        id='ocd-division/country:us', name='USA')
    missouri = Jurisdiction.objects.create(
        id="ocd-division/country:us/state:mo",
        name="Missouri State Senate",
        url="http://www.senate.mo.gov",
        division=usa,
    )
    dausa = Jurisdiction.objects.create(
        id="ocd-division/country:us/state:da",
        name="Dausa State Senate",
        url="http://www.senate.da.gov",
        division=usa,
    )

    ten_minutes = timezone.timedelta(minutes=10)
    first_start = timezone.now()
    first_end = first_start + ten_minutes
    second_start = first_end + ten_minutes
    second_end = second_start + ten_minutes

    schedule = (
        (missouri, first_start, first_end),
        (dausa, second_start, second_end),
    )
    for jurisdiction, started, ended in schedule:
        RunPlan.objects.create(jurisdiction=jurisdiction, success=True,
                               start_time=started, end_time=ended)
def setUp(self):
    """Create a division, one jurisdiction, a run plan and a legislative session."""
    usa = Division.objects.create(
        id='ocd-division/country:us', name='USA')
    missouri = Jurisdiction.objects.create(
        id="ocd-division/country:us/state:mo",
        name="Missouri State Senate",
        url="http://www.senate.mo.gov",
        division=usa,
    )

    # A successful ten-minute run starting now.
    run_started = timezone.now()
    run_ended = run_started + timezone.timedelta(minutes=10)
    RunPlan.objects.create(jurisdiction=missouri, success=True,
                           start_time=run_started, end_time=run_ended)

    session_fields = {
        'jurisdiction': missouri,
        'identifier': "2017",
        'name': "2017 Test Session",
        'start_date': "2017-06-25",
        'end_date': "2017-06-26",
    }
    LegislativeSession.objects.create(**session_fields)
def test_project_create(me, api):
    """A created project can be fetched back unchanged and is owned by ``me``."""
    payload = {'name': 'my_project', 'description': 'blah'}

    create_resp = api.post('/api/projects/', payload)
    created = create_resp.json()
    assert create_resp.status_code == 201, created

    detail_url = '/api/projects/{}/'.format(created['id'])
    fetch_resp = api.get(detail_url)
    fetched = fetch_resp.json()
    assert fetch_resp.status_code == 200, fetched

    assert fetched == created
    for field in ('name', 'description'):
        assert fetched[field] == payload[field]
    assert fetched['owner'] == me.id

    # The project must have been created within the last five seconds.
    created_at = parse_date(fetched['creationDate'])
    assert (timezone.now() - created_at) < timedelta(seconds=5)
def check_user_state():
    """Reset every user whose level expired more than one day ago.

    For each such user the related ``ss_user`` is disabled, its traffic
    counters are zeroed, its quota is set to ``settings.DEFAULT_TRAFFIC`` and
    the user's level is set back to 0. A line is printed per reset user and
    once more when the whole check has finished.
    """
    for user in User.objects.filter(level__gt=0):
        # Skip users whose level expired less than a day ago (or not at all).
        one_day_ago = timezone.now() - timezone.timedelta(days=1)
        if one_day_ago <= user.level_expire_time:
            continue

        account = user.ss_user
        account.enable = False
        account.upload_traffic = 0
        account.download_traffic = 0
        account.transfer_enable = settings.DEFAULT_TRAFFIC
        account.save()

        user.level = 0
        user.save()

        today = timezone.now().strftime('%Y-%m-%d')
        logs = 'time: {} use: {} level timeout '.format(today, user.username).encode('utf8')
        print(logs)
    print('Time:{} CHECKED'.format(timezone.now()))
def delete_old(request):
    """Mark the requesting user's old messages as deleted, then show the inbox.

    A message is old when ``created_at`` is at least
    ``settings.OLD_MSG_THRESHOLD`` days in the past. Sent messages in status
    ``'1normal'`` and received messages in status ``'1normal'`` or ``'2read'``
    get the status ``'6deleted'`` on the corresponding side.
    """
    cutoff = timezone.now() - timezone.timedelta(
        days=settings.OLD_MSG_THRESHOLD)

    # Messages this user sent.
    sent_filter = (
        Q(sender__username__iexact=request.user.username)
        & Q(sender_status='1normal')
        & Q(created_at__lte=cutoff)
    )
    for sent in Msg.objects.filter(sent_filter):
        sent.sender_status = '6deleted'
        sent.save()

    # Messages this user received.
    received_filter = (
        Q(recipient__username__iexact=request.user.username)
        & (Q(recipient_status='1normal') | Q(recipient_status='2read'))
        & Q(created_at__lte=cutoff)
    )
    for received in Msg.objects.filter(received_filter):
        received.recipient_status = '6deleted'
        received.save()

    return redirect('msgs:inbox', page=1)
def shortdate(value):
    """Render ``value`` as a short relative date.

    Dates in the future, or more than six 30-day months in the past, are
    rendered with ``DATE_FORMAT``; everything in between becomes "today",
    "yesterday", "N days ago" or "N months ago".
    """
    elapsed = timezone.template_localtime(timezone.now()) - value
    if elapsed < timezone.timedelta():
        # ``value`` lies in the future.
        return date_format(value, "DATE_FORMAT")

    days = elapsed.days
    if days < 1:
        return _('today')
    if days < 2:
        return _('yesterday')
    if days < 30:
        return ungettext_lazy('%d day ago', '%d days ago') % days

    months = days // 30
    if months > 6:
        return date_format(value, "DATE_FORMAT")
    return ungettext_lazy('a month ago', '%d months ago') % months
def open_bets(request):
    """Render the open proposed bets placed by users other than the requester.

    The template receives three querysets:

    * ``open_bets`` -- bets with remaining wagers, not yet ended and without
      a winning bet, excluding the requesting user's own;
    * ``new_bets`` -- the subset created within the last 24 hours;
    * ``closing_soon_bets`` -- the subset ending within the next 24 hours.
    """
    # Take a single timestamp so all three querysets share the same cut-offs.
    now = timezone.now()
    one_day = timezone.timedelta(days=1)
    tomorrow = now + one_day
    yesterday = now - one_day

    # Every open bet placed by somebody other than the current user.
    available = ProposedBet.objects.filter(
        remaining_wagers__gt=0,
        end_date__gt=now,
        won_bet__isnull=True,
    ).exclude(user=request.user)

    context = {
        'nbar': 'open_bets',
        'open_bets': available,
        'new_bets': available.filter(created_on__gt=yesterday),
        'closing_soon_bets': available.filter(end_date__lt=tomorrow),
    }
    return render(request, 'bets/base_open_bets.html', context)
def change(self, request):
    """Apply an edit from ``request.POST`` and archive the previous body.

    Returns 1 when the edit is refused: the object is not the user's, it is
    already flagged as edited, or the new body is empty or longer than 2200
    characters. Otherwise returns the result of ``self.save()``.
    """
    if not self.is_mine(request.user) or self.has_edit:
        return 1
    new_body = request.POST['body']
    if not 1 <= len(new_body) <= 2200:
        return 1

    # ``befores`` holds a JSON list of earlier bodies.
    history = json.loads(self.befores) if self.befores else []
    history.append(self.body)
    self.befores = json.dumps(history)

    self.body = new_body
    self.spoils = request.POST.get('is_spoiler', False)
    self.feeling = request.POST.get('feeling_id', 0)

    # Only edits made two minutes or more after creation set the edited flag.
    if timezone.now() >= self.created + timezone.timedelta(minutes=2):
        self.has_edit = True
    return self.save()
def change(self, request):
    """Replace the body with the one posted in ``request``, keeping history.

    Returns 1 if the requester does not own the object, if it has already
    been flagged as edited, or if the posted body is not between 1 and 2200
    characters long. On success the result of ``self.save()`` is returned.
    """
    if not self.is_mine(request.user) or self.has_edit:
        return 1

    submitted = request.POST['body']
    body_length = len(submitted)
    if body_length > 2200 or body_length < 1:
        return 1

    # Push the current body onto the JSON-encoded list of previous versions.
    if self.befores:
        previous_bodies = json.loads(self.befores)
    else:
        previous_bodies = []
    previous_bodies.append(self.body)
    self.befores = json.dumps(previous_bodies)

    self.body = submitted
    self.spoils = request.POST.get('is_spoiler', False)
    self.feeling = request.POST.get('feeling_id', 0)

    # Within two minutes of creation the edit does not set ``has_edit``.
    grace_ends = self.created + timezone.timedelta(minutes=2)
    if timezone.now() >= grace_ends:
        self.has_edit = True
    return self.save()
def give_notification(user, type, to, post=None, comment=None):
    """Send (or merge) a notification of ``type`` from ``user`` to ``to``.

    Returns False without notifying when, for any type other than 3, the
    sender and recipient are the same or the sender already sent a
    notification of this type (other than type 4) within the last 5 seconds.
    It also returns False when the type is 0 or 1 and ``to.let_yeahnotifs()``
    is false.

    Otherwise a matching notification from the last 8 hours is reused: the
    sender's own one is passed to ``set_unread()``, anybody else's to
    ``merge(user)``. If none exists a new notification is created. The result
    of whichever call was made is returned.
    """
    # Just keeping this simple for now, might want to make it better later
    if not type == 3:
        if user == to:
            return False
        # Rate limit: the sender notified with this type within the last 5 seconds.
        recent_cutoff = timezone.now() - timedelta(seconds=5)
        sent_recently = user.notification_sender.filter(
            created__gt=recent_cutoff, type=type).exclude(type=4)
        # exists() asks the database for one row instead of loading them all.
        if sent_recently.exists():
            return False
    # Yeah notifications are off and this is a yeah notification.
    if type in (0, 1) and not to.let_yeahnotifs():
        return False

    merge_filter = {
        'created__gt': timezone.now() - timedelta(hours=8),
        'to': to,
        'type': type,
        'context_post': post,
        'context_comment': comment,
    }
    # Search for my own notification. If it exists, set it as unread.
    # first() returns None when there is no match, so one query is enough.
    own = user.notification_sender.filter(**merge_filter).first()
    if own is not None:
        return own.set_unread()
    # Search for a notification already there so we can merge with it.
    existing = Notification.objects.filter(**merge_filter).first()
    if existing is not None:
        return existing.merge(user)
    # Nothing to merge with: create a new notification.
    return user.notification_sender.create(source=user, type=type, to=to, context_post=post, context_comment=comment)
def get_friendships(user, limit=50, offset=0, latest=False, online_only=False):
    """Return friendships in which ``user`` is either the source or the target.

    With a falsy ``limit`` the whole queryset is returned, newest first.
    Otherwise the result is the ``[offset:offset + limit]`` window, ordered by
    ``-latest`` when ``latest`` is set and by ``-created`` when it is not.
    ``online_only`` (honoured only together with ``latest``) returns a list
    restricted to friends whose ``last_login`` is within the last 48 seconds.
    """
    involving_user = Friendship.objects.filter(Q(source=user) | Q(target=user))
    if not limit:
        return involving_user.order_by('-created')
    if not latest:
        return involving_user.order_by('-created')[offset:offset + limit]
    if not online_only:
        return involving_user.order_by('-latest')[offset:offset + limit]

    # TODO: do this filtering in the database instead of in Python.
    delta = timezone.now() - timedelta(seconds=48)
    page = involving_user.order_by('-latest')[offset:offset + limit]
    return [
        friendship for friendship in page
        if friendship.other(user).last_login > delta
    ]
def make_message(self, request):
    """Create a message in this object's ``message_set`` from ``request``.

    Returns the new message (with ``mine`` set to True) or an integer code:

    * 6 -- a URL or screenshot was sent but ``request.user.has_freedom()`` is false;
    * 3 -- the user created another message within the last 2 seconds;
    * 1 -- the body is longer than 50000 characters, or empty for a
      non-painting post;
    * 2 -- an image upload returned 1, or a painting post carried no painting.
    """
    has_attachment = request.POST.get('url') or request.FILES.get('screen')
    if not request.user.has_freedom() and has_attachment:
        return 6

    flood_cutoff = timezone.now() - timedelta(seconds=2)
    if Message.real.filter(creator=request.user, created__gt=flood_cutoff).exists():
        return 3

    is_painting = request.POST.get('_post_type') == 'painting'
    body_length = len(request.POST['body'])
    if body_length > 50000 or (body_length < 1 and not is_painting):
        return 1

    screenshot = None
    painting = None
    if request.FILES.get('screen'):
        screenshot = util.image_upload(request.FILES['screen'], True)
        if screenshot == 1:
            return 2
    if is_painting:
        if not request.POST.get('painting'):
            return 2
        painting = util.image_upload(request.POST['painting'], False, True)
        if painting == 1:
            return 2

    new_post = self.message_set.create(body=request.POST.get('body'), creator=request.user, feeling=int(request.POST.get('feeling_id', 0)), drawing=painting, screenshot=screenshot)
    new_post.mine = True
    return new_post
def __init__(self, begin=None, end=None):
    """Load the events between ``begin`` and ``end``.

    ``begin`` and ``end`` are optional ``'%Y-%m-%d'`` strings interpreted in
    ``settings.TIME_ZONE``; both are floored to the start of their day and
    converted to UTC datetimes. A missing bound is derived from the other one
    using ``settings.EVENTS_CALENDAR_PERIOD`` days; when neither is given the
    period starts today. The result is stored in ``self.begin``, ``self.end``
    and ``self.events``.
    """
    def parse_day(text):
        # Start of the given day, expressed as a UTC datetime.
        parsed = arrow.Arrow.strptime(text, '%Y-%m-%d', settings.TIME_ZONE)
        return parsed.floor('day').to('UTC').datetime

    # Parse each bound at most once.
    begin_dt = parse_day(begin) if begin else None
    end_dt = parse_day(end) if end else None

    if begin_dt is not None:
        self.begin = begin_dt
    elif end_dt is not None:
        # Only the end is known: count the period backwards from it.
        self.begin = end_dt - timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)
    else:
        self.begin = arrow.now().floor('day').to('UTC').datetime

    if end_dt is not None:
        self.end = end_dt
    else:
        self.end = self.begin + timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)

    self.events = Event.objects.get_by_dates(begin=self.begin, end=self.end)
def test_get_for_closing(self):
    """``get_for_closing`` returns the past event but not the future one."""
    today = arrow.now().floor('day').to('UTC').datetime

    past_event = Event()
    past_event.begin = today - timezone.timedelta(days=3)
    past_event.end = today - timezone.timedelta(days=4)
    past_event.title = 'test_title_now'
    past_event.status = 'open'
    past_event.save()

    # Clone the event as a new row with dates in the future.
    future_event = copy.copy(past_event)
    future_event.id = None
    future_event.begin = today + timezone.timedelta(days=3)
    future_event.end = today + timezone.timedelta(days=4)
    future_event.save()

    closing = Event.objects.get_for_closing()

    self.assertIn(past_event, closing)
    self.assertNotIn(future_event, closing)
    self.assertEqual(closing.count(), 2)
    for candidate in closing:
        self.assertLess(candidate.end, today)
        self.assertGreaterEqual(candidate.paid, candidate.total)
def test_calendar(self):
    """A default calendar spans the configured period and places events in it."""
    calendar = Calendar()
    today = arrow.now().floor('day').to('UTC').datetime
    period_end = today + timezone.timedelta(days=settings.EVENTS_CALENDAR_PERIOD)
    self.assertEqual(today, calendar.begin)
    self.assertEqual(period_end, calendar.end)

    # A one-day event starting three days from today.
    event = Event()
    event.begin = today + timezone.timedelta(days=3)
    event.end = today + timezone.timedelta(days=4)
    event.title = 'test_title_now'
    event.status = 'open'
    event.save()

    days = calendar.get_days()
    self.assertEqual(settings.EVENTS_CALENDAR_PERIOD, len(days))

    def covers(moment):
        return event.begin <= moment < event.end

    # Every day and hour slot inside the event's span must list the event.
    for day in days:
        if covers(day.date):
            self.assertIn(event, day.events)
        for hour in day.hours:
            if covers(hour.date):
                self.assertIn(event, hour.events)
def test_check_callback_and_save(self):
    """A sender ends up with 50 messages after 100 attempts; old messages free the quota."""
    # we ensure a user may by default not send more than 50 messages a day
    for _attempt in range(100):
        try:
            Message(sender=self.participant1, thread=self.thread1, body="hi").save()
        except Exception:
            # Saves beyond the limit are presumably rejected with an
            # exception; that is expected here and deliberately ignored.
            pass
    # only 50 messages exist for the sender (this count includes those from setUp)
    self.assertEqual(50, Message.objects.filter(sender=self.participant1).count())

    # the limit is only in the last 24 hours: push every message just past it
    Message.objects.filter(sender=self.participant1).update(sent_at=now() - timedelta(days=1, seconds=1))
    last = Message.objects.filter(sender=self.participant1).latest('id')
    new = Message.objects.create(sender=self.participant1, thread=self.thread1, body="hi")
    self.assertEqual(new.id, last.id + 1)
def test_get_lasts_messages_of_threads_check_who_read(self):
    """With ``check_who_read`` the returned messages carry their readers."""
    # Participants 1 and 2 get a last-check date one day in the future;
    # participation3.date_last_check is deliberately left untouched, so
    # participant 3 counts as not having read the messages.
    for participation in (self.participation1, self.participation2):
        participation.date_last_check = now() + timedelta(days=1)
        participation.save()

    with self.assertNumQueries(5):
        last_messages = Message.managers.get_lasts_messages_of_threads(
            self.participant1.id, check_who_read=True, check_is_notification=False)

    # the ordering has not been modified
    expected_order = [self.m33.id, self.m22.id, self.m11.id]
    self.assertEqual(expected_order, [message.id for message in last_messages])

    # participant 1 and 2 have read Thread 1
    thread1_readers = last_messages[2].readers
    self.assertIn(self.participant1.id, thread1_readers)
    self.assertIn(self.participant2.id, thread1_readers)
    self.assertNotIn(self.participant3.id, last_messages[0].readers)
def test_get_lasts_messages_of_threads_check_is_notification_check_who_read(self):
    """Both ``check_who_read`` and ``check_is_notification`` can be combined."""
    # Participants 1 and 2 get a last-check date one day in the future;
    # participant 3 is left as is.
    for participation in (self.participation1, self.participation2):
        participation.date_last_check = now() + timedelta(days=1)
        participation.save()

    # the notification check costs one extra query
    with self.assertNumQueries(6):
        last_messages = Message.managers.get_lasts_messages_of_threads(
            self.participant1.id, check_who_read=True, check_is_notification=True)

    # the ordering has not been modified
    expected_order = [self.m33.id, self.m22.id, self.m11.id]
    self.assertEqual(expected_order, [message.id for message in last_messages])

    # the second conversation has no notification because last reply comes from the current participant
    self.assertEqual(last_messages[1].is_notification, False)
    # the third (first in ordering) conversation has new messages
    self.assertEqual(last_messages[0].is_notification, True)
    # participant 1 and 2 have read Thread 1
    self.assertIn(self.participant1.id, last_messages[2].readers)
def test_list_messages_in_thread(self):
    """Listing a thread's messages enforces access rules and reports readers."""
    def thread_url(thread):
        return "{0}{1}/list_messages_in_thread/".format(self.url, thread.id)

    # no authentication
    response = self.client_unauthenticated.get(thread_url(self.thread1))
    self.assertEqual(403, response.status_code)
    # no permission
    response = self.client_authenticated.get(thread_url(self.thread_unrelated))
    self.assertEqual(403, response.status_code)

    # ok
    # participant 3 last checked one day ago, participant 1 two days ago
    participation3 = Participation.objects.create(participant=self.participant3, thread=self.thread3)
    participation3.date_last_check = now() - timedelta(days=1)
    participation3.save()
    participation1 = Participation.objects.create(participant=self.participant1, thread=self.thread3)
    participation1.date_last_check = now() - timedelta(days=2)
    participation1.save()

    # we change the date of the messages: m31 was sent before both checks,
    # m32 only before participant 3's check
    self.m31.sent_at = now() - timedelta(days=3)
    self.m31.save()
    self.m32.sent_at = now() - timedelta(days=1, hours=12)
    self.m32.save()

    response = self.client_authenticated.get(thread_url(self.thread3))
    messages = parse_json_response(response.data)["results"]
    self.assertEqual([self.m33.id, self.m32.id, self.m31.id], [m["id"] for m in messages])
    # newest message: no reader; m32: participant 3 only; m31: both
    self.assertEqual(
        [set(), {self.participant3.id}, {self.participant1.id, self.participant3.id}],
        [set(m["readers"]) for m in messages],
    )
def form_valid(self, form):
    """Render the five events of the coming week closest to the submitted point."""
    latitude = form.cleaned_data['latitude']
    longitude = form.cleaned_data['longitude']

    # Search window: from now until one week from now.
    window_start = timezone.now()
    window_end = window_start + timezone.timedelta(weeks=1)

    # Point takes x (longitude) first, then y (latitude).
    origin = Point(longitude, latitude, srid=4326)

    nearest_events = (
        Event.objects
        .filter(datetime__gte=window_start, datetime__lte=window_end)
        .annotate(distance=Distance('venue__location', origin))
        .order_by('distance')[:5]
    )
    context = {'events': nearest_events}
    return render_to_response('gigs/lookupresults.html', context)
def parse_daily_schedule(self, table, date):
    """Get daily schedule for t.cast channel.

    For each of the 24 hours, the ``td`` cell whose id is the date as
    ``YYYYMMDD`` followed by the two-digit hour is looked up in ``table``, and
    every ``div`` with class ``con active`` inside it is passed to
    ``self.parse_schedule_item``. Items in hours before ``self.start_hour``
    are parsed with the following day's date, all others with ``date``.

    ``date`` may be a ``date`` or a ``datetime``.

    Returns the list of parsed items, or None as soon as one hourly cell is
    missing from ``table``.
    """
    day_prefix = date.strftime("%Y%m%d")
    next_date = date + timezone.timedelta(days=1)
    daily_schedule = []
    for hour in range(24):
        cell = table.find('td', id='{}{:02d}'.format(day_prefix, hour))
        if cell is None:
            return None
        # Hours before the channel's start hour are the next day's schedule.
        item_date = next_date if hour < self.start_hour else date
        for schedule in cell.find_all('div', class_='con active'):
            daily_schedule.append(self.parse_schedule_item(schedule, item_date))
    return daily_schedule
def test_access_is_forbidden_when_interviewer_for_already_passed_course(self):
    """An interviewer whose only course has a finished application period gets a 403."""
    def days_ago(count):
        return timezone.now() - timezone.timedelta(days=count)

    # Application and interview periods that both ended in the past.
    old_app_info = ApplicationInfoFactory(
        start_date=days_ago(10),
        end_date=days_ago(5),
        start_interview_date=days_ago(4),
        end_interview_date=days_ago(2),
    )
    other_user = BaseUserFactory(password=self.test_password)
    other_interviewer = Interviewer.objects.create_from_user(other_user)
    add_course_to_interviewer_courses(interviewer=other_interviewer,
                                      course=old_app_info.course)

    with self.login(email=other_interviewer.email, password=self.test_password):
        response = self.get(self.url)
        self.response_403(response)
def test_can_add_free_time_if_interviewer(self):
    """An interviewer can post a free-time slot for tomorrow."""
    initial_count = InterviewerFreeTime.objects.count()

    start_time = faker.time_object()
    # The slot ends one second after it starts.
    # NOTE(review): a start_time of 23:59:59 wraps end_time to 00:00:00 --
    # confirm this cannot make the test flaky.
    slot_end = timezone.datetime.combine(timezone.now(), start_time) + timezone.timedelta(seconds=1)
    end_time = slot_end.time()

    data = {
        'date': (timezone.now() + timezone.timedelta(days=1)).date(),
        'start_time': start_time,
        'end_time': end_time,
    }
    # The remaining fields are posted with their model defaults.
    for field_name in ('interview_time_length', 'break_time'):
        data[field_name] = InterviewerFreeTime._meta.get_field(field_name).get_default()

    with self.login(email=self.interviewer.email, password=self.test_password):
        response = self.post(self.url, data=data)
        self.assertEqual(302, response.status_code)
        self.assertEqual(initial_count + 1, InterviewerFreeTime.objects.count())
        self.assertIsNotNone(self.interviewer.free_time_slots)
def test_creates_competition_when_data_is_valid(self):
    """Posting valid data as the superuser creates exactly one competition."""
    initial_count = Competition.objects.count()

    # A two-day competition starting today.
    starts = timezone.now()
    ends = starts + timezone.timedelta(days=2)
    data = {
        'name': faker.name(),
        'start_date': starts.date(),
        'end_date': ends.date(),
        'slug_url': faker.slug(),
    }

    with self.login(email=self.superuser.email, password=self.test_password):
        response = self.post(url_name=self.url, data=data)
        self.response_302(response)
        self.assertEqual(initial_count + 1, Competition.objects.count())
def test_course_is_created_successfully_on_post(self):
    """Posting valid data creates the first course and redirects to its detail page."""
    self.assertEqual(0, Course.objects.count())

    start_date = faker.date_object()
    data = {
        'name': faker.word(),
        'start_date': start_date,
        # The course ends a random number of days after it starts.
        'end_date': start_date + timezone.timedelta(days=faker.pyint()),
        'repository': faker.url(),
        'video_channel': faker.url(),
        'facebook_group': faker.url(),
        'slug_url': faker.slug(),
    }

    with self.login(email=self.user.email, password=self.test_password):
        response = self.post(self.url, data=data)
        detail_url = reverse('dashboard:education:user-course-detail',
                             kwargs={'course_id': Course.objects.last().id})
        self.assertRedirects(response, expected_url=detail_url)
        self.assertEqual(1, Course.objects.count())
def test_create_application_info_creates_application_info_when_data_is_valid(self):
    """``create_application_info`` adds one ApplicationInfo for valid future dates."""
    def days_from_today(count):
        return timezone.now().date() + timezone.timedelta(days=count)

    # Both the application and the interview period run from tomorrow
    # until the day after.
    self.start_date = days_from_today(1)
    self.end_date = days_from_today(2)
    self.start_interview_date = days_from_today(1)
    self.end_interview_date = days_from_today(2)

    initial_count = ApplicationInfo.objects.count()
    create_application_info(
        start_date=self.start_date,
        end_date=self.end_date,
        course=self.course,
        start_interview_date=self.start_interview_date,
        end_interview_date=self.end_interview_date,
    )
    self.assertEqual(initial_count + 1, ApplicationInfo.objects.count())
def test_post_creates_instance_when_data_is_valid(self):
    """A teacher posting valid dates creates an ApplicationInfo and is redirected."""
    def days_from_today(count):
        return timezone.now().date() + timezone.timedelta(days=count)

    add_teacher(course=self.course, teacher=self.teacher)
    # The course runs from tomorrow until the day after.
    self.course.start_date = days_from_today(1)
    self.course.end_date = days_from_today(2)
    self.course.save()

    initial_count = ApplicationInfo.objects.count()
    # The application period covers the same two days.
    self.start_date = days_from_today(1)
    self.end_date = days_from_today(2)
    data = {'start_date': self.start_date, 'end_date': self.end_date}

    with self.login(email=self.user.email, password=self.test_password):
        response = self.post(self.url, data=data)
        detail_url = reverse('dashboard:education:user-course-detail',
                             kwargs={'course_id': self.course.id})
        self.assertRedirects(response, expected_url=detail_url)
        self.assertEqual(initial_count + 1, ApplicationInfo.objects.count())
def test_post_successfully_creates_application_when_apply_is_open(self):
    """Posting while the application period is open creates an application."""
    # The period opens today and closes in two days.
    today = timezone.now().date()
    self.app_info.start_date = today
    self.app_info.end_date = timezone.now().date() + timezone.timedelta(days=2)
    self.app_info.save()

    initial_count = Application.objects.count()
    data = {
        'phone': faker.phone_number(),
        'works_at': faker.job(),
        'skype': faker.word(),
    }

    with self.login(email=self.user.email, password=self.test_password):
        response = self.post(self.url, data=data)
        applications_url = reverse('dashboard:applications:user-applications')
        self.assertRedirects(response, expected_url=applications_url)
        self.assertEqual(initial_count + 1, Application.objects.count())
def test_post_does_not_create_application_when_apply_is_closed(self):
    """Posting after the application period has closed is rejected by the form."""
    def days_before_today(count):
        return timezone.now().date() - timezone.timedelta(days=count)

    # The period closed yesterday.
    self.app_info.start_date = days_before_today(2)
    self.app_info.end_date = days_before_today(1)
    self.app_info.save()

    initial_count = Application.objects.count()
    data = {
        'phone': faker.phone_number(),
        'works_at': faker.job(),
    }

    with self.login(email=self.user.email, password=self.test_password):
        response = self.post(self.url, data=data)
        submitted_form = response.context_data['form']
        self.assertFalse(submitted_form.is_valid())
        self.assertEqual(initial_count, Application.objects.count())