def datetime_from_simple_time(cls, el, datetime_field):
    """
    Convert the ``datetime_field`` entry of ``el`` into a timezone-aware
    datetime.

    The stored value must look like ``YYYY-MM-DD HH:MM:SS`` and is made
    aware with the UTC timezone.
    Returns None when the field is missing from ``el``, is None, is the
    empty string, or is the all-zero placeholder ``0000-00-00 00:00:00``.
    """
    if datetime_field not in el:
        return None

    raw = el[datetime_field]
    # None, the all-zero placeholder and the empty string all mean "no value".
    if raw is None or raw == '0000-00-00 00:00:00' or raw == '':
        return None

    parsed = datetime.datetime.strptime(
        u"{} UTC".format(raw), "%Y-%m-%d %H:%M:%S %Z")
    return timezone.make_aware(parsed, pytz.utc)
python类utc()的实例源码
def _create_daily_stats(self, perfs):
    """
    Build the daily (and cumulative) stats DataFrame from ``perfs``.

    Every packet containing a ``'daily_perf'`` entry contributes one row:
    its ``recorded_vars`` and the packet's ``cumulative_risk_metrics`` are
    merged into the daily dict, which is indexed by its ``period_close``.
    A packet without ``'daily_perf'`` is stored on ``self.risk_report``.
    """
    daily_rows = []
    # TODO: the updates below could overwrite expected properties
    # of daily_perf. Could potentially raise or log a
    # warning.
    for packet in perfs:
        if 'daily_perf' not in packet:
            self.risk_report = packet
            continue
        daily = packet['daily_perf']
        daily.update(daily.pop('recorded_vars'))
        daily.update(packet['cumulative_risk_metrics'])
        daily_rows.append(daily)

    index = [np.datetime64(row['period_close'], utc=True)
             for row in daily_rows]
    return pd.DataFrame(daily_rows, index=index)
def on_dt_changed(self, dt):
    """
    Simulation-loop callback fired whenever the current dt changes.

    Anything that must run exactly once at the start of each datetime
    group belongs here. ``dt`` must be a datetime whose tzinfo is
    ``pytz.utc``; it is stored on the algorithm and forwarded to the
    performance tracker and the blotter.
    """
    assert isinstance(dt, datetime), (
        "Attempt to set algorithm's current time with non-datetime")
    assert dt.tzinfo == pytz.utc, "Algorithm expects a utc datetime"

    self.datetime = dt
    self.perf_tracker.set_date(dt)
    self.blotter.set_date(dt)
def create_test_panel_ohlc_source(sim_params, env):
    """
    Build a synthetic single-asset OHLCV panel for tests.

    The date range comes from ``sim_params`` (first open to last close)
    when given, otherwise 1990-01-03 to 1990-01-08 UTC. Prices start at
    100 and grow by one per day returned by ``env.days_in_range``.

    Returns a ``(DataPanelSource, panel)`` tuple.
    """
    if sim_params:
        start = sim_params.first_open
    else:
        start = pd.datetime(1990, 1, 3, 0, 0, 0, 0, pytz.utc)
    if sim_params:
        end = sim_params.last_close
    else:
        end = pd.datetime(1990, 1, 8, 0, 0, 0, 0, pytz.utc)

    index = env.days_in_range(start, end)
    n_days = len(index)
    price = np.arange(0, n_days) + 100

    columns = {
        'price': price,
        'high': price * 1.05,
        'low': price * 0.95,
        # open alternates just below / just above the price
        'open': price + .1 * (price % 2 - .5),
        'volume': np.ones(n_days) * 1000,
        'arbitrary': np.ones(n_days),
    }
    frame = pd.DataFrame(columns, index=index)
    panel = pd.Panel.from_dict({0: frame})

    return DataPanelSource(panel), panel
def _coerce_datetime(maybe_dt):
    """
    Coerce ``maybe_dt`` into a ``datetime.datetime``.

    A ``datetime.datetime`` is returned unchanged. A ``datetime.date`` or a
    3-item tuple/list ``(year, month, day)`` becomes midnight of that day
    with ``tzinfo=pytz.utc``. Anything else raises TypeError.
    """
    # datetime.datetime is a subclass of datetime.date, so test it first.
    if isinstance(maybe_dt, datetime.datetime):
        return maybe_dt

    if isinstance(maybe_dt, datetime.date):
        year, month, day = maybe_dt.year, maybe_dt.month, maybe_dt.day
    elif isinstance(maybe_dt, (tuple, list)) and len(maybe_dt) == 3:
        year, month, day = maybe_dt
    else:
        raise TypeError('Cannot coerce %s into a datetime.datetime'
                        % type(maybe_dt).__name__)

    return datetime.datetime(
        year=year,
        month=month,
        day=day,
        tzinfo=pytz.utc,
    )
def test_generator_dates(self):
    """
    Ensure the pipeline of generators are in sync, at least as far as
    their current dates.

    Runs a one-year daily simulation and checks that the generator emits
    something and that both the algorithm and its slippage model ended up
    with a latest date.
    """
    period_start = datetime(2011, 7, 30, tzinfo=pytz.utc)
    period_end = datetime(2012, 7, 30, tzinfo=pytz.utc)
    sim_params = factory.create_simulation_parameters(
        start=period_start,
        end=period_end,
        env=self.env,
    )
    algo = TestAlgo(self, sim_params=sim_params, env=self.env)
    daily_trades = factory.create_daily_trade_source(
        [8229],
        sim_params,
        env=self.env,
    )
    algo.set_sources([daily_trades])

    emitted = list(algo.get_generator())
    self.assertTrue(emitted)

    self.assertTrue(algo.slippage.latest_date)
    self.assertTrue(algo.latest_date)
def test_progress(self):
    """
    Ensure a short daily simulation reports completion: the
    second-to-last message emitted by the generator must carry a
    ``progress`` of 1.0.
    """
    period_start = datetime(2008, 1, 1, tzinfo=pytz.utc)
    period_end = datetime(2008, 1, 5, tzinfo=pytz.utc)
    sim_params = factory.create_simulation_parameters(
        start=period_start,
        end=period_end,
        env=self.env,
    )
    algo = TestAlgo(self, sim_params=sim_params, env=self.env)
    daily_trades = factory.create_daily_trade_source(
        [8229],
        sim_params,
        env=self.env,
    )
    algo.set_sources([daily_trades])

    messages = list(algo.get_generator())
    self.assertEqual(messages[-2]['progress'], 1.0)
def test_serialization(self):
    """
    Check that a PerformanceTracker built for 2008-10-09 .. 2008-10-16
    (UTC) passes ``check_perf_tracker_serialization``.
    """
    period_start = datetime(year=2008, month=10, day=9, tzinfo=pytz.utc)
    period_end = datetime(year=2008, month=10, day=16, tzinfo=pytz.utc)

    sim_params = SimulationParameters(
        period_start=period_start,
        period_end=period_end,
        env=self.env,
    )
    tracker = perf.PerformanceTracker(sim_params, env=self.env)
    check_perf_tracker_serialization(tracker)
def test_yahoo_bars_to_panel_source(self):
    """
    Load Yahoo bars for AAPL and GE, relabel the panel items with sids and
    verify every event from the resulting DataPanelSource has the expected
    fields, an integer volume and a known sid.
    """
    env = TradingEnvironment()
    finder = AssetFinder(env.engine)
    stocks = ['AAPL', 'GE']
    env.write_data(equities_identifiers=stocks)

    start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
    end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
    data = factory.load_bars_from_yahoo(stocks=stocks,
                                        indexes={},
                                        start=start,
                                        end=end)
    expected_fields = ['sid', 'open', 'high', 'low', 'close',
                       'volume', 'price']

    # Work on a copy so the loaded panel keeps its symbol labels.
    sid_panel = data.copy()
    sids = finder.map_identifier_index_to_sids(
        data.items, data.major_axis[0]
    )
    sid_panel.items = sids

    for event in DataPanelSource(sid_panel):
        for field_name in expected_fields:
            self.assertIn(field_name, event)
        self.assertIsInstance(event['volume'], integer_types)
        self.assertIn(event['sid'], sids)
def test_load_bars_from_yahoo(self):
    """
    Check ``load_bars_from_yahoo`` for AAPL and GE over 1993-2002: first
    and last dates, presence of every stock and OHLCV/price column, and
    that a reversed date range raises AssertionError.
    """
    stocks = ['AAPL', 'GE']
    start = pd.datetime(1993, 1, 1, 0, 0, 0, 0, pytz.utc)
    end = pd.datetime(2002, 1, 1, 0, 0, 0, 0, pytz.utc)
    data = load_bars_from_yahoo(stocks=stocks, start=start, end=end)

    first_day, last_day = data.major_axis[0], data.major_axis[-1]
    assert first_day == pd.Timestamp('1993-01-04 00:00:00+0000')
    assert last_day == pd.Timestamp('2001-12-31 00:00:00+0000')

    for symbol in stocks:
        assert symbol in data.items

    for column in ['open', 'high', 'low', 'close', 'volume', 'price']:
        assert column in data.minor_axis

    # start after end must be rejected
    np.testing.assert_raises(
        AssertionError, load_bars_from_yahoo, stocks=stocks,
        start=end, end=start
    )
def test_day_after_thanksgiving(self):
    """
    The day after Thanksgiving must be listed as an early close, both when
    it is the fourth Friday of November (2012) and the fifth (2013).
    """
    extended_end = tradingcalendar.end.replace(
        year=tradingcalendar.end.year + 1)
    early_closes = tradingcalendar.get_early_closes(
        tradingcalendar.start,
        extended_end,
    )

    #    November 2012
    # Su Mo Tu We Th Fr Sa
    #              1  2  3
    #  4  5  6  7  8  9 10
    # 11 12 13 14 15 16 17
    # 18 19 20 21 22 23 24
    # 25 26 27 28 29 30
    fourth_friday = datetime.datetime(2012, 11, 23, tzinfo=pytz.utc)

    #    November 2013
    # Su Mo Tu We Th Fr Sa
    #                 1  2
    #  3  4  5  6  7  8  9
    # 10 11 12 13 14 15 16
    # 17 18 19 20 21 22 23
    # 24 25 26 27 28 29 30
    fifth_friday = datetime.datetime(2013, 11, 29, tzinfo=pytz.utc)

    for day_after in (fourth_friday, fifth_friday):
        self.assertIn(day_after, early_closes)
def test_partial_month(self):
    """
    Build a risk report over five years starting 1991-01-01 whose return
    series has its last ten entries dropped, then run ``check_metrics``
    expecting 60 months.
    """
    start = datetime.datetime(1991, 1, 1, 0, 0, tzinfo=pytz.utc)

    # 1992 and 1996 were leap years
    span = datetime.timedelta(days=365 * 5 + 2)
    end = start + span

    sim_params90s = SimulationParameters(
        period_start=start,
        period_end=end,
        env=self.env,
    )

    full_returns = factory.create_returns_from_range(sim_params90s)
    # truncate the returns series to end mid-month
    partial_returns = full_returns[:-10]

    metrics = risk.RiskReport(partial_returns, sim_params90s, env=self.env)
    total_months = 60
    self.check_metrics(metrics, total_months, start)
def use_testing_credentials(args, credentials):
    """
    Write fake temporary credentials instead of calling AWS.

    The fake credentials reuse the identity profile's access key and
    secret from ``credentials``, carry a dummy session token and expire
    five minutes from now. A notice is printed to stderr, the expiration
    is reported and the credentials file is updated.
    """
    print("Skipping AWS API calls because AWSMFA_TESTING_MODE is set.",
          file=sys.stderr)

    # AWS returns offset-aware UTC times, so we fake that in order to
    # verify consistent code paths between py2 and py3 datetime.
    lifetime = datetime.timedelta(minutes=5)
    fake_expiration = datetime.datetime.now(tz=pytz.utc) + lifetime

    access_key = credentials.get(args.identity_profile,
                                 'aws_access_key_id')
    secret_key = credentials.get(args.identity_profile,
                                 'aws_secret_access_key')
    fake_credentials = {
        'AccessKeyId': access_key,
        'SecretAccessKey': secret_key,
        'SessionToken': "420",
        'Expiration': fake_expiration,
    }

    print_expiration_time(fake_expiration)
    update_credentials_file(args.aws_credentials,
                            args.target_profile,
                            args.identity_profile,
                            credentials,
                            fake_credentials)
def add_emails(self, topic_ids):
    '''
    Add all emails and email_blobs.

    For each email in ``self.metadata`` one 'email' record is added with
    the non-empty header fields and the send time (original and UTC), then
    one 'email_blob' record per topic id carrying the topic probability
    from ``self.email_prob``.
    '''
    email_columns = nparray(['subject', 'sender', 'receiver', 'cc', 'bcc',
                             'send_time', 'send_time_utc'])

    for email in range(self.num_emails):  # loop over emails
        meta = self.metadata[email]
        sent_at = dateparse(meta['Date'])
        sent_at_utc = sent_at.astimezone(pytz.utc)

        values = nparray([meta['Subject'], meta['From'], meta['To'],
                          meta['Cc'], meta['Bcc'], sent_at, sent_at_utc])
        # Replace single quotes in non-empty string values with spaces.
        values = nparray([
            value.replace("'", " ")
            if (value and isinstance(value, str)) else value
            for value in values])

        # Only truthy values (and their column names) are stored.
        present = nparray([True if value else False for value in values])
        self.add_email('email', email_columns[present], values[present])

        for topic_pos, t_id in enumerate(topic_ids):  # loop over topics
            rows = nparray(['topic_id', 'topic_probability'])
            values = nparray([t_id, self.email_prob[topic_pos, email]])
            self.add_blob('email_blob', rows, values)
test_cache_project_stats.py 文件源码
项目:FRG-Crowdsourcing
作者: 97amarnathk
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_stats_hours(self):
    """Test CACHE PROJECT STATS hours works."""
    project = ProjectFactory.create()
    answered_task = TaskFactory.create(n_answers=1)
    now = datetime.now(pytz.utc)
    TaskFactory.create()
    TaskRunFactory.create(project=project, task=answered_task)
    AnonymousTaskRunFactory.create(project=project)

    (hours, hours_anon, hours_auth,
     max_hours, max_hours_anon, max_hours_auth) = stats_hours(project.id)

    # One authenticated and one anonymous run were created just now, so
    # both are expected under the current UTC hour.
    this_hour = now.strftime('%H')
    assert len(hours) == 24, len(hours)
    assert hours[this_hour] == 2, hours[this_hour]
    assert hours_anon[this_hour] == 1, hours_anon[this_hour]
    assert hours_auth[this_hour] == 1, hours_auth[this_hour]
    assert max_hours == 2
    assert max_hours_anon == 1
    assert max_hours_auth == 1
test_cache_project_stats.py 文件源码
项目:FRG-Crowdsourcing
作者: 97amarnathk
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_stats_hours_with_period(self):
    """Test CACHE PROJECT STATS hours with period works."""
    project = ProjectFactory.create()
    now = datetime.now(pytz.utc)

    six_days_ago = date.today() - timedelta(days=6)
    task = TaskFactory.create(n_answers=1, created=six_days_ago)
    TaskRunFactory.create(project=project, task=task,
                          created=six_days_ago, finish_time=six_days_ago)

    sixteen_days_ago = date.today() - timedelta(days=16)
    AnonymousTaskRunFactory.create(project=project,
                                   created=sixteen_days_ago,
                                   finish_time=sixteen_days_ago)

    (hours, hours_anon, hours_auth,
     max_hours, max_hours_anon, max_hours_auth) = stats_hours(project.id)

    # Only used to build the failure messages below.
    this_hour = now.strftime('%H')
    assert len(hours) == 24, len(hours)
    # We use 00 because the created/finish values are plain dates, which
    # carry no time of day.
    assert hours['00'] == 1, hours[this_hour]
    assert hours_anon['00'] == 0, hours_anon[this_hour]
    assert hours_auth['00'] == 1, hours_auth[this_hour]
    assert max_hours == 1
    assert max_hours_anon is None
    assert max_hours_auth == 1
def test_fromutc(self):
    """
    ``self.tz.fromutc`` must give the same result for a naive datetime and
    for one localized to the same zone, and must raise ValueError for a
    datetime localized to a different zone.
    """
    # naive datetime.
    naive = datetime(2011, 10, 31)

    # localized datetime, same timezone.
    same_zone = self.tz.localize(naive)

    # Both should give the same results. Note that the standard
    # Python tzinfo.fromutc() only supports the second.
    for candidate in (naive, same_zone):
        converted = self.tz.fromutc(candidate)
        reference = pytz.utc.localize(naive).astimezone(self.tz)
        self.assertEqual(converted, reference)

    # localized datetime, different timezone.
    paris = pytz.timezone('Europe/Paris')
    self.assertTrue(self.tz is not paris)
    other_zone = paris.localize(naive)
    self.assertRaises(ValueError, self.tz.fromutc, other_zone)
def test_encode_datetime():
    """
    Round-trip a naive datetime, a UTC-aware datetime, a date and a time
    through ``dumps``/``loads`` and check each comes back as the expected
    ISO-format string (the aware value ending in ``Z``).
    """
    naive_now = datetime.utcnow()
    aware_now = naive_now.replace(tzinfo=pytz.utc)
    # Midnight of the current day: the expected form of the encoded date.
    midnight = datetime(*naive_now.timetuple()[:3])

    payload = {
        'datetime': naive_now,
        'tz': aware_now,
        'date': naive_now.date(),
        'time': naive_now.time(),
    }
    decoded = loads(dumps(payload))

    expected = {
        'datetime': naive_now.isoformat(),
        # drop the "+00:00" offset and append "Z" instead
        'tz': '{0}Z'.format(aware_now.isoformat().split('+', 1)[0]),
        'time': naive_now.time().isoformat(),
        'date': midnight.isoformat(),
    }
    assert decoded == expected
def _sync_character(server, character, data):
    """
    Copy the fields of ``data`` onto ``character``, save it, and report
    whether its position changed.

    - `server`: value assigned to ``character.server``.
    - `character`: object updated in place; its ``save()`` is called.
    - `data`: mapping with the keys ``x``, ``y``, ``z``, ``last_online``
      (a POSIX timestamp), ``conan_id``, ``name``, ``level``,
      ``is_online``, ``steam_id`` and ``last_killed_by``.

    Returns True when at least one of the x/y/z coordinates in ``data``
    differs from the character's previous coordinates, False otherwise.
    """
    # Compare against the old position before it is overwritten below.
    # The position is unchanged only if ALL three coordinates match.
    has_changes = (
        data['x'] != character.x or
        data['y'] != character.y or
        data['z'] != character.z)

    # Aware UTC datetime built directly from the timestamp.
    last_online = datetime.fromtimestamp(data['last_online'], tz=pytz.utc)

    character.server = server
    character.conan_id = data['conan_id']
    character.name = data['name']
    character.level = data['level']
    character.is_online = data['is_online']
    character.steam_id = data['steam_id']
    character.last_killed_by = data['last_killed_by']
    character.x = data['x']
    character.y = data['y']
    character.z = data['z']
    character.last_online = last_online
    character.save()

    return has_changes
def test_to_dict(self):
    """
    Check ``to_dict`` on a SimpleModel: with all fields, without read-only
    fields (``alias_field`` is then absent), and restricted to a subset of
    field names.
    """
    instance = SimpleModel(date_field='1973-12-06', int_field='100', float_field='7')
    epoch = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc)

    without_readonly = {
        "date_field": datetime.date(1973, 12, 6),
        "int_field": 100,
        "float_field": 7.0,
        "datetime_field": epoch,
        'str_field': 'dozo',
    }
    with_readonly = dict(without_readonly, alias_field=0.0)

    self.assertDictEqual(instance.to_dict(), with_readonly)
    self.assertDictEqual(instance.to_dict(include_readonly=False),
                         without_readonly)

    # alias_field is requested but excluded because it is read-only.
    subset = instance.to_dict(
        include_readonly=False,
        field_names=('int_field', 'alias_field', 'datetime_field'))
    self.assertDictEqual(subset, {
        "int_field": 100,
        "datetime_field": epoch,
    })
def test_datetime_field(self):
    """
    Check ``DateTimeField.to_python`` with UTC as the timezone in use.

    Every valid input must convert to a value whose tzinfo is
    ``pytz.utc`` and must survive a round trip through ``to_db_string``
    unchanged; every invalid input must raise ValueError.
    """
    f = DateTimeField()
    epoch = datetime(1970, 1, 1, tzinfo=pytz.utc)
    # Valid values
    for value in (date(1970, 1, 1), datetime(1970, 1, 1), epoch,
                  epoch.astimezone(pytz.timezone('US/Eastern')), epoch.astimezone(pytz.timezone('Asia/Jerusalem')),
                  '1970-01-01 00:00:00', '1970-01-17 00:00:17', '0000-00-00 00:00:00', 0,
                  '2017-07-26T08:31:05', '2017-07-26T08:31:05Z', '2017-07-26 08:31',
                  '2017-07-26T13:31:05+05', '2017-07-26 13:31:05+0500'):
        dt = f.to_python(value, pytz.utc)
        # assertEqual, not the deprecated assertEquals alias
        self.assertEqual(dt.tzinfo, pytz.utc)
        # Verify that conversion to and from db string does not change value
        dt2 = f.to_python(f.to_db_string(dt, quote=False), pytz.utc)
        self.assertEqual(dt, dt2)
    # Invalid values
    for value in ('nope', '21/7/1999', 0.5,
                  '2017-01 15:06:00', '2017-01-01X15:06:00', '2017-13-01T15:06:00'):
        with self.assertRaises(ValueError):
            f.to_python(value, pytz.utc)
def test_date_field(self):
    """
    Check ``DateField.to_python`` and ``DateField.validate``.

    Every valid input must convert to 1970-01-01 and survive a round trip
    through ``to_db_string``; invalid inputs must raise ValueError, and so
    must validating the dates 1900-01-01 and 2900-01-01.
    """
    f = DateField()
    epoch = date(1970, 1, 1)
    # Valid values
    for value in (datetime(1970, 1, 1), epoch, '1970-01-01', '0000-00-00', 0):
        d = f.to_python(value, pytz.utc)
        # assertEqual, not the deprecated assertEquals alias
        self.assertEqual(d, epoch)
        # Verify that conversion to and from db string does not change value
        d2 = f.to_python(f.to_db_string(d, quote=False), pytz.utc)
        self.assertEqual(d, d2)
    # Invalid values
    for value in ('nope', '21/7/1999', 0.5):
        with self.assertRaises(ValueError):
            f.to_python(value, pytz.utc)
    # Range check
    for value in (date(1900, 1, 1), date(2900, 1, 1)):
        with self.assertRaises(ValueError):
            f.validate(value)
def from_tsv(cls, line, field_names=None, timezone_in_use=pytz.utc, database=None):
    '''
    Build a model instance out of one tab-separated line (with or without a trailing newline).

    The names in `field_names` must be fields defined in the model, but need not cover all of them.
    When omitted, all model fields are assumed, in order of definition.

    - `line`: the TSV-formatted data.
    - `field_names`: names of the model fields in the data.
    - `timezone_in_use`: the timezone to use when parsing dates and datetimes.
    - `database`: if given, sets the database that this instance belongs to.
    '''
    from six import next
    if not field_names:
        field_names = [name for name, _ in cls._fields]
    raw_values = iter(parse_tsv(line))
    # Values are consumed positionally, one per requested field.
    kwargs = {
        name: getattr(cls, name).to_python(next(raw_values), timezone_in_use)
        for name in field_names
    }

    instance = cls(**kwargs)
    if database is not None:
        instance.set_database(database)
    return instance
def filter(self, message):
    """
    Return True only for messages sent on a workday, within working
    hours, whose text triggers one of the keyword detectors.
    """
    # ``message.date`` is labelled as UTC, then expressed in TZ.
    sent_at = TZ.normalize(message.date.replace(tzinfo=pytz.utc))

    # First filter by weekday: we don't work on weekends
    if not is_workday(sent_at):
        return False

    # Then filter by time: only hours in [START, END) count as working
    if not (START <= sent_at.hour < END):
        return False

    # Then filter by message text
    text = message.text
    return bool(self._bad_keyword_detect(text) or self._keyword_detect(text))
def workday_end_time(bot, update):
    """
    Reply to the message with the time remaining until 18:00 (in TZ) on
    the day the message was sent, using a randomly chosen text template.
    """
    bot.sendChatAction(
        chat_id=update.message.chat_id, action=ChatAction.TYPING
    )

    text_templates = (
        '{}?????????????',
        '?????{}',
        '??{}????',
        '??????{}',
        '???{}????',
        '????{}',
        '??????: {}',
    )

    # The message date is labelled as UTC, then expressed in TZ.
    now = TZ.normalize(update.message.date.replace(tzinfo=pytz.utc))
    end_time = TZ.localize(datetime(now.year, now.month, now.day, hour=18))
    duration = end_time - now

    hour, leftover_seconds = divmod(duration.seconds, 3600)
    minute = leftover_seconds // 60

    # The hour part is omitted when less than an hour remains.
    time_remaining = ''
    if hour:
        time_remaining = ' {} ??'.format(hour)
    time_remaining += ' {} ??'.format(minute)

    text = random.choice(text_templates).format(time_remaining)
    update.message.reply_text(text, quote=True)
def test_serial():
    """
    Check the string form ``s.serialize_value`` produces for each
    supported type, and that an unsupported value raises
    NotImplementedError.
    """
    cases = [
        (None, 'x'),
        (True, 'true'),
        (False, 'false'),
        (5, 'i:5'),
        (5.0, 'f:5.0'),
        (decimal.Decimal('5.5'), 'n:5.5'),
        ('abc', 's:abc'),
        (b'abc', 'b:YWJj'),
        (b'abc', 'b:YWJj'),
        (datetime.date(2007, 12, 5), 'd:2007-12-05'),
        (datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc),
         'dt:2007-12-05 12:30:30+00:00'),
        (datetime.time(12, 34, 56), 't:12:34:56'),
    ]
    for value, expected in cases:
        assert s.serialize_value(value) == expected

    with raises(NotImplementedError):
        s.serialize_value(csv.reader)
def test_unserial():
    """
    Check that each supported value survives
    ``serialize_value`` -> ``unserialize_value`` unchanged, and that an
    unknown type prefix raises ValueError.
    """
    def twoway(x):
        assert s.unserialize_value(s.serialize_value(x)) == x

    samples = [
        None,
        True,
        False,
        5,
        5.0,
        decimal.Decimal('5.5'),
        'abc',
        b'abc',
        b'abc',
        datetime.date(2007, 12, 5),
        datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc),
        Z('abc'),
    ]
    for sample in samples:
        twoway(sample)

    with raises(ValueError):
        s.unserialize_value('zzzz:abc')
def utc_week_limits(utc_dt):
    """Returns US/Pacific start (12:00 am Sunday) and end (11:59 pm Saturday) of the week containing utc_dt, in UTC.

    - `utc_dt`: a datetime interpreted as UTC (its tzinfo, if any, is replaced by UTC).

    Returns a `(utc_week_start, utc_week_end)` tuple of naive datetimes expressed in UTC.
    """
    pacific = pytz.timezone('US/Pacific')
    local_now = utc_dt.replace(tzinfo=pytz.utc).astimezone(pacific)

    # weekday() is Monday=0 .. Sunday=6, so the number of days since the
    # most recent Sunday is (weekday + 1) % 7. The modulo keeps a Sunday
    # inside its own week instead of jumping back seven days.
    days_since_sunday = (local_now.weekday() + 1) % 7

    # Do the calendar arithmetic on naive wall-clock values and localize
    # afterwards: adding a timedelta to a pytz-aware datetime keeps the old
    # UTC offset, which is wrong when the week spans a DST change.
    local_midnight = local_now.replace(
        tzinfo=None, hour=0, minute=0, second=0, microsecond=0)
    naive_week_start = local_midnight - timedelta(days=days_since_sunday)
    naive_week_end = naive_week_start + timedelta(days=7, minutes=-1)

    local_week_start = pacific.localize(naive_week_start)
    local_week_end = pacific.localize(naive_week_end)

    utc_week_start = local_week_start.astimezone(pytz.utc).replace(tzinfo=None)
    utc_week_end = local_week_end.astimezone(pytz.utc).replace(tzinfo=None)
    return (utc_week_start, utc_week_end)
patient_evaluation_report.py 文件源码
项目:health-mosconi
作者: GNUHealth-Mosconi
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def parse(cls, report, objects, data, localcontext):
    """
    Add the print date and time to ``localcontext`` and delegate to the
    parent report's ``parse``.

    ``print_date`` is the current time expressed in the timezone of the
    company found in the transaction context; when there is no company or
    the company has no timezone, it stays in UTC. ``print_time`` is the
    time-of-day part of ``print_date``.
    """
    Company = Pool().get('company.company')

    timezone = None
    company_id = Transaction().context.get('company')
    if company_id:
        company = Company(company_id)
        if company.timezone:
            timezone = pytz.timezone(company.timezone)

    # Take the current instant as an aware UTC value rather than labelling
    # a naive local ``now()`` as UTC.
    print_date = datetime.now(pytz.utc)
    # Convert only when a company timezone is known; astimezone(None)
    # raises on Python 2 and silently uses the system zone on Python 3.
    if timezone is not None:
        print_date = print_date.astimezone(timezone)

    localcontext['print_date'] = print_date
    localcontext['print_time'] = print_date.time()

    return super(PatientEvaluationReport, cls).parse(report, objects, data,
                                                     localcontext)
def get_date_ymd(dt):
    """Get a UTC datetime from a datetime, a keyword or a date string.

    Args:
        dt (str or datetime): a datetime (converted with ``as_utc``), one
            of the keywords 'today', 'tomorrow' or 'yesterday' (midnight
            UTC of that day), or any string ``dateutil`` can parse,
            such as 'Year-month-day'.

    Returns:
        datetime: a datetime object
    """
    assert dt

    if isinstance(dt, datetime):
        return as_utc(dt)

    # Keyword -> offset in days from the current UTC date.
    for keyword, day_offset in (('today', 0), ('tomorrow', 1), ('yesterday', -1)):
        if dt == keyword:
            moment = datetime.utcnow() + timedelta(day_offset)
            midnight = datetime(moment.year, moment.month, moment.day)
            return pytz.utc.localize(midnight)

    return as_utc(dateutil.parser.parse(dt))