def test_is_time_to_run_after_midnight_and_after_late_metric_slack_time(
        self):
    """is_time_to_run() must be true shortly after midnight.

    The check time is 2016-11-07 00:10:00.000001 and the last-processed
    marker is set exactly one hour before it.
    """
    now = datetime.datetime(2016, 11, 7, 0, 10, 0, 1)
    one_hour_earlier = now - datetime.timedelta(hours=1)

    provider = PreHourlyProcessorUtil.get_data_provider()
    provider.set_last_processed(date_time=one_hour_earlier)

    self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(now))
# Example source code for Python's datetime() class
def test_same_time_different_day_behaviour(self):
    """is_time_to_run() must be true when the last run was 24 hours ago.

    The check time is 2016-11-07 22:10:00.000001 and the last-processed
    marker is set to the same clock time on the previous day.
    """
    now = datetime.datetime(2016, 11, 7, 22, 10, 0, 1)
    same_time_yesterday = now - datetime.timedelta(days=1)

    provider = PreHourlyProcessorUtil.get_data_provider()
    provider.set_last_processed(date_time=same_time_yesterday)

    self.assertTrue(PreHourlyProcessorUtil.is_time_to_run(now))
def test_crontab():
    """Check the action sets a Crontab reports for several time tuples."""
    tab = Crontab()
    tab.add('boo')
    tab.add('foo', 0)
    tab.add('bar', [1, 3], -5, -1, -1, 0)

    # (arguments passed to actions(), expected set of action names)
    cases = (
        ((0, 1, 1, 1, 1), {'boo', 'foo'}),
        ((1, 1, 1, 1, 1), {'boo'}),
        ((1, 5, 1, 1, 7), {'boo', 'bar'}),
        ((3, 5, 1, 1, 7), {'boo', 'bar'}),
    )
    for fields, expected in cases:
        assert tab.actions(*fields) == expected

    # The same lookup, driven by a POSIX timestamp built from local time.
    stamp = mktime(datetime(2016, 1, 17, 5, 1).timetuple())
    assert tab.actions_ts(stamp) == {'boo', 'bar'}
def _get_extensions(self):
    """Extract the extensions listed in the wheel's EXTENSIONS entry.

    The archive at ``self.dirname``/``self.filename`` is opened and the
    JSON member ``<name>-<version>.dist-info/EXTENSIONS`` is read as a
    mapping of extension name to archive-relative path. Each listed file
    is extracted below the dylib cache directory unless a copy already
    exists there whose modification time is not older than the archive
    entry's timestamp.

    Returns a list of ``(name, destination_path)`` tuples. A ``KeyError``
    raised anywhere during processing (e.g. by the zip archive for a
    missing member) is swallowed and whatever was collected so far is
    returned.
    """
    wheel_path = os.path.join(self.dirname, self.filename)
    dist_info = '%s.dist-info' % ('%s-%s' % (self.name, self.version))
    manifest_name = posixpath.join(dist_info, 'EXTENSIONS')
    utf8_reader = codecs.getreader('utf-8')
    found = []
    with ZipFile(wheel_path, 'r') as archive:
        try:
            with archive.open(manifest_name) as raw:
                extensions = json.load(utf8_reader(raw))
                cache = self._get_dylib_cache()
                cache_base = os.path.join(
                    cache.base, cache.prefix_to_dir(wheel_path))
                if not os.path.isdir(cache_base):
                    os.makedirs(cache_base)
                for name, relpath in extensions.items():
                    dest = os.path.join(cache_base, convert_path(relpath))
                    if os.path.exists(dest):
                        # Re-extract only when the archived copy is newer
                        # than the one already in the cache.
                        on_disk = datetime.datetime.fromtimestamp(
                            os.stat(dest).st_mtime)
                        in_wheel = datetime.datetime(
                            *archive.getinfo(relpath).date_time)
                        stale = in_wheel > on_disk
                    else:
                        stale = True
                    if stale:
                        archive.extract(relpath, cache_base)
                    found.append((name, dest))
        except KeyError:
            pass
    return found
def _get_x509_days_left(x509):
    """Return the day offsets from now (UTC) to a certificate's validity bounds.

    :param x509: object exposing ``get_notAfter()`` and ``get_notBefore()``,
        each returning a ``YYYYMMDDhhmmssZ`` timestamp as text or bytes.
    :returns: dict with keys ``'not_after'`` and ``'not_before'``; each value
        is the ``.days`` component of (timestamp - current UTC time), so it is
        negative for timestamps in the past.
    """
    date_fmt = '%Y%m%d%H%M%SZ'
    current_datetime = datetime.datetime.utcnow()

    def _days_from_now(stamp):
        # time.strptime() only accepts text; certificate libraries may hand
        # back bytes (as on Python 3), so decode before parsing.
        if isinstance(stamp, bytes):
            stamp = stamp.decode('ascii')
        parsed = time.strptime(stamp, date_fmt)
        return (datetime.datetime(*parsed[:6]) - current_datetime).days

    not_after_days = _days_from_now(x509.get_notAfter())
    not_before_days = _days_from_now(x509.get_notBefore())
    return {'not_after': not_after_days,
            'not_before': not_before_days}
def time_combined(year, month, day, hour, minute, meridiem):
    """Build a datetime from a 12-hour clock reading.

    Conversion rules:
        12:** AM -> hour becomes 0
         1:** AM -> hour unchanged
        12:** PM -> hour unchanged
         1:** PM -> hour + 12

    :param year, month, day, minute: passed straight to ``datetime.datetime``.
    :param hour: hour on the 12-hour clock.
    :param meridiem: ``"AM"`` or ``"PM"``; compared case-insensitively and
        with surrounding whitespace ignored. Any value other than AM is
        handled as PM.
    :returns: ``datetime.datetime`` with the hour on the 24-hour clock.
    :raises ValueError: if ``datetime.datetime`` rejects the resulting fields.
    """
    # Normalise so that "am" / " AM " are not mistaken for PM.
    if isinstance(meridiem, str):
        meridiem = meridiem.strip().upper()

    if meridiem == "AM":
        if hour == 12:
            hour = 0
    elif hour < 12:
        hour = hour + 12

    # Create the final start/end date fields
    return datetime.datetime(year, month, day, hour, minute)
def parse_neuralynx_time_string(self, time_string):
    """Parse a datetime from the time string found in Neuralynx file headers.

    The fifth whitespace-separated token is read as ``month/day/year`` and
    the last token as ``hour:minute:second.millisecond``.

    :param time_string: header line to parse.
    :returns: ``datetime.datetime`` on success; ``None`` (after emitting a
        warning) when the string cannot be parsed or holds out-of-range
        values.
    """
    try:
        tokens = time_string.split()
        tmp_date = [int(x) for x in tokens[4].split('/')]
        tmp_time = [int(x)
                    for x in tokens[-1].replace('.', ':').split(':')]
        # The last time field is in milliseconds; datetime wants microseconds.
        tmp_microsecond = tmp_time[3] * 1000
        # Built inside the guard so out-of-range fields give the same
        # warn-and-return-None result as a malformed string.
        return datetime.datetime(
            tmp_date[2], tmp_date[0], tmp_date[1],  # Year, month, day
            tmp_time[0], tmp_time[1], tmp_time[2],  # Hour, minute, second
            tmp_microsecond)
    except (AttributeError, IndexError, TypeError, ValueError):
        # str() keeps the warning itself from failing on non-string input.
        warnings.warn('Unable to parse time string from Neuralynx header: '
                      + str(time_string))
        return None
def test_get_vacations_view_is_working_properly(self):
    """testing if GET: /api/users/{id}/vacations view is working properly
    """
    from stalker import db, Vacation
    import datetime
    vac1 = Vacation(
        user=self.test_user1,
        start=datetime.datetime(2016, 4, 24, 0, 0),
        end=datetime.datetime(2016, 4, 28, 0, 0)
    )
    vac2 = Vacation(
        user=self.test_user1,
        start=datetime.datetime(2016, 7, 1, 0, 0),
        end=datetime.datetime(2016, 7, 8, 0, 0)
    )
    db.DBSession.add_all([vac1, vac2])
    db.DBSession.flush()

    import transaction
    transaction.commit()

    # Re-query the user after the commit and read the vacations from it.
    from stalker import User
    user1 = User.query.filter(User.login == self.test_user1.login).first()

    response = self.test_app.get(
        '/api/users/%s/vacations' % self.test_user1.id
    )

    expected = [
        {
            'id': v.id,
            '$ref': '/api/vacations/%s' % v.id,
            'name': v.name,
            'entity_type': v.entity_type
        } for v in [user1.vacations[0], user1.vacations[1]]
    ]

    # dicts cannot be ordered with "<" on Python 3, so sort both sides by
    # their 'id' entry to compare the lists independent of order.
    def by_id(item):
        return item['id']

    self.assertEqual(
        sorted(response.json_body, key=by_id),
        sorted(expected, key=by_id)
    )
# TASKS
def test_update_entity_is_working_properly(self):
    """testing if update_entity() method is working properly
    """
    import datetime
    from stalker import db, TimeLog
    from stalker_pyramid.testing import DummyRequest, DummyMultiDict
    from stalker_pyramid.views import EntityViewBase

    begin = datetime.datetime(2016, 7, 26, 16)
    finish = datetime.datetime(2016, 7, 26, 17)
    updated_finish = datetime.datetime(2016, 7, 26, 18)

    db.DBSession.flush()
    db.DBSession.commit()

    # store a time log that the view will then modify
    log = TimeLog(
        task=self.test_task1,
        resource=self.test_user1,
        start=begin,
        end=finish,
        created_by=self.test_user2
    )
    db.DBSession.add(log)
    db.DBSession.commit()

    # build a request that only changes the "end" value
    request = DummyRequest()
    request.matchdict['id'] = log.id
    request.params = DummyMultiDict()
    request.params['end'] = \
        EntityViewBase.milliseconds_since_epoch(updated_finish)
    self.patch_logged_in_user(request)

    time_log.TimeLogViews(request).update_entity()

    stored = TimeLog.query.filter(TimeLog.name == log.name).first()
    self.assertEqual(stored.end, updated_finish)
def test_delete_entity_is_working_properly(self):
    """testing if delete_entity() method is working properly
    """
    import datetime
    from stalker import db, TimeLog
    from stalker_pyramid.testing import DummyRequest

    begin = datetime.datetime(2016, 7, 26, 16)
    finish = datetime.datetime(2016, 7, 26, 17)

    db.DBSession.flush()
    db.DBSession.commit()

    # store a time log that the view will then remove
    log = TimeLog(
        task=self.test_task1,
        resource=self.test_user1,
        start=begin,
        end=finish,
        created_by=self.test_user2
    )
    db.DBSession.add(log)
    db.DBSession.commit()

    request = DummyRequest()
    request.matchdict['id'] = log.id
    time_log.TimeLogViews(request).delete_entity()

    # no time log may remain for this task/resource pair
    remaining = TimeLog.query \
        .filter(TimeLog.task == self.test_task1) \
        .filter(TimeLog.resource == self.test_user1) \
        .first()
    self.assertIsNone(remaining)
def test_update_entity_is_working_properly_with_post(self):
    """testing if POST: /api/time_logs/{id} view is working properly
    """
    import datetime
    from stalker import db, TimeLog
    from stalker_pyramid.views import EntityViewBase

    begin = datetime.datetime(2016, 7, 26, 16)
    finish = datetime.datetime(2016, 7, 26, 17)
    updated_finish = datetime.datetime(2016, 7, 26, 18)

    db.DBSession.flush()
    db.DBSession.commit()

    # store a time log that the POST request will then modify
    log = TimeLog(
        task=self.test_task1,
        resource=self.test_user1,
        start=begin,
        end=finish,
        created_by=self.test_user2
    )
    db.DBSession.add(log)
    db.DBSession.commit()

    self.admin_login()
    payload = {
        'end': EntityViewBase.milliseconds_since_epoch(updated_finish)
    }
    self.test_app.post(
        '/api/time_logs/%s' % log.id,
        params=payload,
        status=200
    )

    stored = TimeLog.query.filter(TimeLog.name == log.name).first()
    self.assertEqual(stored.end, updated_finish)
def test_delete_entity_is_working_properly(self):
    """testing if DELETE: /api/time_logs/{id} view is working properly
    """
    import datetime
    from stalker import db, TimeLog

    begin = datetime.datetime(2016, 7, 26, 16)
    finish = datetime.datetime(2016, 7, 26, 17)

    db.DBSession.flush()
    db.DBSession.commit()

    # store a time log that the DELETE request will then remove
    log = TimeLog(
        task=self.test_task1,
        resource=self.test_user1,
        start=begin,
        end=finish,
        created_by=self.test_user2
    )
    db.DBSession.add(log)
    db.DBSession.commit()

    self.test_app.delete('/api/time_logs/%s' % log.id, status=200)

    # no time log may remain for this task/resource pair
    remaining = TimeLog.query \
        .filter(TimeLog.task == self.test_task1) \
        .filter(TimeLog.resource == self.test_user1) \
        .first()
    self.assertIsNone(remaining)
def test_create_entity_with_invalid_user_id(self):
    """testing if create_entity() method is working properly with invalid
    user_id parameter
    """
    import datetime
    from pyramid.httpexceptions import HTTPServerError
    from stalker_pyramid.testing import DummyRequest, DummyMultiDict
    from stalker_pyramid.views import EntityViewBase

    begin = datetime.datetime(2016, 4, 22, 10)
    finish = datetime.datetime(2016, 4, 22, 16)

    # request carrying a user_id of -1 together with start/end values
    request = DummyRequest()
    request.params = DummyMultiDict()
    request.params['user_id'] = -1
    request.params['start'] = \
        EntityViewBase.milliseconds_since_epoch(begin)
    request.params['end'] = EntityViewBase.milliseconds_since_epoch(finish)

    views = vacation.VacationViews(request)
    with self.assertRaises(HTTPServerError) as cm:
        views.create_entity()

    # checked after the with-block so the assertion actually runs
    self.assertEqual(
        str(cm.exception),
        'Missing "user_id" parameter'
    )