# Imports needed to run this snippet; it is taken from the vobject test suite and
# assumes a test_files/timezones.ics fixture containing the named VTIMEZONEs.
import datetime

import dateutil.tz
from dateutil.rrule import rrule, rruleset, WEEKLY, MONTHLY

from vobject import base, icalendar


def test_timezone_serializing():
    """
    Serializing with timezones test
    """
    tzs = dateutil.tz.tzical("test_files/timezones.ics")
    pacific = tzs.get('US/Pacific')
    cal = base.Component('VCALENDAR')
    cal.setBehavior(icalendar.VCalendar2_0)
    ev = cal.add('vevent')
    ev.add('dtstart').value = datetime.datetime(2005, 10, 12, 9,
                                                tzinfo=pacific)
    evruleset = rruleset()
    evruleset.rrule(rrule(WEEKLY, interval=2, byweekday=[2, 4],
                          until=datetime.datetime(2005, 12, 15, 9)))
    evruleset.rrule(rrule(MONTHLY, bymonthday=[-1, -5]))
    evruleset.exdate(datetime.datetime(2005, 10, 14, 9, tzinfo=pacific))
    ev.rruleset = evruleset
    ev.add('duration').value = datetime.timedelta(hours=1)
    # Re-point dtstart at a different named zone from the same .ics fixture.
    apple = tzs.get('America/Montreal')
    ev.dtstart.value = datetime.datetime(2005, 10, 12, 9, tzinfo=apple)
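A minimal usage sketch (not part of the vobject test above): the same timezone-aware dtstart can be built with dateutil.tz.gettz when the timezones.ics fixture is not available, assuming 'US/Pacific' exists in the local tz database.

import datetime
import dateutil.tz

pacific = dateutil.tz.gettz('US/Pacific')  # assumes the system tz database is present
dtstart = datetime.datetime(2005, 10, 12, 9, tzinfo=pacific)
print(dtstart.isoformat())  # 2005-10-12T09:00:00-07:00 (PDT in October 2005)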
import dateutil.tz
from datetime import datetime

def now():
    return datetime.now(dateutil.tz.tzlocal())

def as_local(stamp):
    return stamp.astimezone(dateutil.tz.tzlocal())
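A quick usage sketch of the two helpers above (arbitrary example timestamp): any aware datetime can be rendered in the machine's local zone, and now() always returns an aware value.

import datetime
import dateutil.tz

utc_stamp = datetime.datetime(2021, 3, 14, 15, 9, 26, tzinfo=dateutil.tz.tzutc())
print(as_local(utc_stamp))       # same instant, expressed in the local zone
print(now().tzinfo is not None)  # True -- now() is always timezone-aware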
# Imports used by this handler; _get_role_arn, _format_iso, _credential_map and
# _refresh_timeout are defined elsewhere in the same module (ectou-metadata).
import datetime
import json

import boto3.session
import botocore.session
import bottle
import dateutil.tz


def security_credentials_role_name():
    role_arn = _get_role_arn()
    credentials = _credential_map.get(role_arn)

    # Refresh credentials if they are going to expire soon.
    now = datetime.datetime.now(tz=dateutil.tz.tzutc())
    if not credentials or credentials['Expiration'] < now + _refresh_timeout:
        try:
            # Use any boto3 credential provider except the instance metadata provider.
            botocore_session = botocore.session.Session()
            botocore_session.get_component('credential_provider').remove('iam-role')
            session = boto3.session.Session(botocore_session=botocore_session)
            credentials = session.client('sts').assume_role(
                RoleArn=role_arn,
                RoleSessionName="ectou-metadata")['Credentials']
            credentials['LastUpdated'] = now
            _credential_map[role_arn] = credentials
        except Exception as e:
            bottle.response.status = 404
            bottle.response.content_type = 'text/plain'  # EC2 serves json as text/plain
            return json.dumps({
                'Code': 'Failure',
                'Message': str(e),  # str(e): the Python 2-only e.message attribute is gone in Python 3
            }, indent=2)

    # Return the current credentials.
    bottle.response.content_type = 'text/plain'  # EC2 serves json as text/plain
    return json.dumps({
        'Code': 'Success',
        'LastUpdated': _format_iso(credentials['LastUpdated']),
        'Type': 'AWS-HMAC',
        'AccessKeyId': credentials['AccessKeyId'],
        'SecretAccessKey': credentials['SecretAccessKey'],
        'Token': credentials['SessionToken'],
        'Expiration': _format_iso(credentials['Expiration']),
    }, indent=2)
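On the client side, a consumer of such a metadata-style document would typically parse the ISO-formatted Expiration back into an aware datetime. A small sketch with a hypothetical response body:

import json
import dateutil.parser

doc = json.loads('{"Code": "Success", "Expiration": "2017-05-04T18:00:00Z"}')  # hypothetical response
expires_at = dateutil.parser.parse(doc['Expiration'])
print(expires_at.tzinfo is not None)  # True -- the 'Z' suffix yields an aware UTC datetime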
def datetime_now():
    # type: () -> datetime.datetime
    """Return a timezone-aware datetime instance with local offset
    :rtype: datetime.datetime
    :return: datetime now with local tz
    """
    return datetime.datetime.now(tz=dateutil.tz.tzlocal())
# Test helper; mock, pathlib, dateutil and datetime are ordinary imports, while
# ops, azmodels, azops and util are the blobxfer module aliases used throughout
# this test module.
import datetime
import pathlib
from unittest import mock  # or the standalone 'mock' package on Python 2

import dateutil.tz


def _create_downloader_for_start(td):
    d = ops.Downloader(mock.MagicMock(), mock.MagicMock(), mock.MagicMock())
    d._cleanup_temporary_files = mock.MagicMock()
    d._download_start = datetime.datetime.now(tz=dateutil.tz.tzlocal())
    d._initialize_transfer_threads = mock.MagicMock()
    d._general_options.concurrency.crypto_processes = 1
    d._general_options.concurrency.md5_processes = 1
    d._general_options.concurrency.disk_threads = 1
    d._general_options.concurrency.transfer_threads = 1
    d._general_options.resume_file = pathlib.Path(str(td.join('rf')))
    d._spec.sources = []
    d._spec.options = mock.MagicMock()
    d._spec.options.chunk_size_bytes = 1
    d._spec.options.mode = azmodels.StorageModes.Auto
    d._spec.options.overwrite = True
    d._spec.options.rename = False
    d._spec.skip_on = mock.MagicMock()
    d._spec.skip_on.md5_match = False
    d._spec.skip_on.lmt_ge = False
    d._spec.skip_on.filesize_match = False
    d._spec.destination = mock.MagicMock()
    d._spec.destination.path = pathlib.Path(str(td))
    d._download_start_time = util.datetime_now()
    d._pre_md5_skip_on_check = mock.MagicMock()
    d._check_download_conditions = mock.MagicMock()
    d._all_remote_files_processed = False
    p = '/cont/remote/path'
    asp = azops.SourcePath()
    asp.add_path_with_storage_account(p, 'sa')
    d._spec.sources.append(asp)
    return d
# Methods of a bot plugin class (note the `self` parameter); the imports below
# cover what the three checks use.
import dateutil.parser
import dateutil.tz
from datetime import datetime, timedelta


def is_time_to_send_summary(self, bot, report_by):
    tz = dateutil.tz.gettz(bot.plugin_config['timezone'])
    report_by = dateutil.parser.parse(report_by).replace(tzinfo=tz)
    now = datetime.now(dateutil.tz.tzlocal())
    return now >= report_by


def is_last_call(self, bot, report_by):
    tz = dateutil.tz.gettz(bot.plugin_config['timezone'])
    report_by = dateutil.parser.parse(report_by).replace(tzinfo=tz)
    last_call = dateutil.parser.parse(bot.plugin_config['last_call'])
    last_call = timedelta(hours=last_call.hour, minutes=last_call.minute)
    if not last_call:
        return
    now = datetime.now(dateutil.tz.tzlocal())
    return report_by - now < last_call


def is_too_early_to_ask(self, bot, ask_earliest):
    tz = dateutil.tz.gettz(bot.plugin_config['timezone'])
    ask_earliest = dateutil.parser.parse(ask_earliest).replace(tzinfo=tz)
    now = datetime.now(dateutil.tz.tzlocal())
    return now < ask_earliest
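The three checks above share one pattern: parse a wall-clock string from the plugin config, attach the configured zone, and compare against an aware local "now". A standalone sketch of that pattern with hypothetical config values:

import datetime
import dateutil.parser
import dateutil.tz

tz = dateutil.tz.gettz('Europe/London')                        # hypothetical configured timezone
report_by = dateutil.parser.parse('17:30').replace(tzinfo=tz)  # today at 17:30 in that zone
now = datetime.datetime.now(dateutil.tz.tzlocal())
print(now >= report_by)  # True once the deadline has passed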
def inject_globals():
    # `app` is the Flask application object defined elsewhere in this module; this
    # dict-returning function is typically registered as a template context processor.
    import datetime
    import dateutil.tz
    import pytz

    utc = datetime.datetime.now(tz=pytz.utc)
    sa_tz = pytz.timezone('Africa/Johannesburg')
    sa = utc.astimezone(sa_tz)
    local_tz = dateutil.tz.tzlocal()
    local = utc.astimezone(local_tz)

    cti = utc.strftime('%Y-%m-%d %H:%M') + ' (UTC) • ' + sa.strftime('%H:%M (%Z)')
    if local.tzname() not in ('UTC', sa.tzname()):
        cti += ' • ' + local.strftime('%H:%M (%Z)')

    vi = 'Librarian %s (%s)' % (app.config['_version_string'], app.config['_git_hash'])

    lds_info = app.config.get('local_disk_staging')
    if lds_info is not None:
        staging_available = True
        staging_dest_displayed = lds_info['displayed_dest']
        staging_dest_path = lds_info['dest_prefix']
        staging_username_placeholder = lds_info['username_placeholder']
    else:
        staging_available = False
        staging_dest_displayed = None
        staging_dest_path = None
        staging_username_placeholder = None

    return {
        'current_time_info': cti,
        'version_info': vi,
        'staging_available': staging_available,
        'staging_dest_displayed': staging_dest_displayed,
        'staging_dest_path': staging_dest_path,
        'staging_username_placeholder': staging_username_placeholder,
    }
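If the snippet above is indeed used as a Flask context processor (an assumption; the registration is not shown here), wiring it up is a one-liner:

app.context_processor(inject_globals)  # the returned dict becomes available in every template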
The remaining snippets come from test_timeseries.py in the PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda project (author: SignalMedia); they are pandas time-series tests exercising timezone handling.
def test_index_unique(self):
    uniques = self.dups.index.unique()
    expected = DatetimeIndex([datetime(2000, 1, 2), datetime(2000, 1, 3),
                              datetime(2000, 1, 4), datetime(2000, 1, 5)])
    self.assertEqual(uniques.dtype, 'M8[ns]')  # sanity
    self.assertTrue(uniques.equals(expected))
    self.assertEqual(self.dups.index.nunique(), 4)

    # #2563
    self.assertTrue(isinstance(uniques, DatetimeIndex))

    dups_local = self.dups.index.tz_localize('US/Eastern')
    dups_local.name = 'foo'
    result = dups_local.unique()
    expected = DatetimeIndex(expected).tz_localize('US/Eastern')
    self.assertTrue(result.tz is not None)
    self.assertEqual(result.name, 'foo')
    self.assertTrue(result.equals(expected))

    # NaT, note this is excluded
    arr = [1370745748 + t for t in range(20)] + [iNaT]
    idx = DatetimeIndex(arr * 3)
    self.assertTrue(idx.unique().equals(DatetimeIndex(arr)))
    self.assertEqual(idx.nunique(), 20)
    self.assertEqual(idx.nunique(dropna=False), 21)

    arr = [Timestamp('2013-06-09 02:42:28') + timedelta(seconds=t)
           for t in range(20)] + [NaT]
    idx = DatetimeIndex(arr * 3)
    self.assertTrue(idx.unique().equals(DatetimeIndex(arr)))
    self.assertEqual(idx.nunique(), 20)
    self.assertEqual(idx.nunique(dropna=False), 21)
def test_recreate_from_data(self):
    freqs = ['M', 'Q', 'A', 'D', 'B', 'BH', 'T', 'S', 'L', 'U', 'H', 'N',
             'C']
    for f in freqs:
        org = DatetimeIndex(start='2001/02/01 09:00', freq=f, periods=1)
        idx = DatetimeIndex(org, freq=f)
        self.assertTrue(idx.equals(org))

        org = DatetimeIndex(start='2001/02/01 09:00', freq=f,
                            tz='US/Pacific', periods=1)
        idx = DatetimeIndex(org, freq=f, tz='US/Pacific')
        self.assertTrue(idx.equals(org))
def assert_range_equal(left, right):
    assert (left.equals(right))
    assert (left.freq == right.freq)
    assert (left.tz == right.tz)
def test_timestamp_to_datetime(self):
    tm._skip_if_no_pytz()
    rng = date_range('20090415', '20090519', tz='US/Eastern')

    stamp = rng[0]
    dtval = stamp.to_pydatetime()
    self.assertEqual(stamp, dtval)
    self.assertEqual(stamp.tzinfo, dtval.tzinfo)
def test_timestamp_to_datetime_explicit_pytz(self):
    tm._skip_if_no_pytz()
    import pytz
    rng = date_range('20090415', '20090519',
                     tz=pytz.timezone('US/Eastern'))

    stamp = rng[0]
    dtval = stamp.to_pydatetime()
    self.assertEqual(stamp, dtval)
    self.assertEqual(stamp.tzinfo, dtval.tzinfo)
def test_timestamp_to_datetime_explicit_dateutil(self):
    tm._skip_if_windows_python_3()
    tm._skip_if_no_dateutil()
    from pandas.tslib import _dateutil_gettz as gettz
    rng = date_range('20090415', '20090519', tz=gettz('US/Eastern'))

    stamp = rng[0]
    dtval = stamp.to_pydatetime()
    self.assertEqual(stamp, dtval)
    self.assertEqual(stamp.tzinfo, dtval.tzinfo)
def test_to_datetime_utc_is_true(self):
    # See gh-11934
    start = pd.Timestamp('2014-01-01', tz='utc')
    end = pd.Timestamp('2014-01-03', tz='utc')
    date_range = pd.bdate_range(start, end)

    result = pd.to_datetime(date_range, utc=True)
    expected = pd.DatetimeIndex(data=date_range)
    tm.assert_index_equal(result, expected)
def test_to_datetime_tz_psycopg2(self):
    # xref 8260
    try:
        import psycopg2
    except ImportError:
        raise nose.SkipTest("no psycopg2 installed")

    # misc cases
    tz1 = psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None)
    tz2 = psycopg2.tz.FixedOffsetTimezone(offset=-240, name=None)
    arr = np.array([datetime(2000, 1, 1, 3, 0, tzinfo=tz1),
                    datetime(2000, 6, 1, 3, 0, tzinfo=tz2)],
                   dtype=object)

    result = pd.to_datetime(arr, errors='coerce', utc=True)
    expected = DatetimeIndex(['2000-01-01 08:00:00+00:00',
                              '2000-06-01 07:00:00+00:00'],
                             dtype='datetime64[ns, UTC]', freq=None)
    tm.assert_index_equal(result, expected)

    # dtype coercion
    i = pd.DatetimeIndex([
        '2000-01-01 08:00:00+00:00'
    ], tz=psycopg2.tz.FixedOffsetTimezone(offset=-300, name=None))
    self.assertFalse(com.is_datetime64_ns_dtype(i))

    # tz coercion
    result = pd.to_datetime(i, errors='coerce')
    tm.assert_index_equal(result, i)

    result = pd.to_datetime(i, errors='coerce', utc=True)
    expected = pd.DatetimeIndex(['2000-01-01 13:00:00'],
                                dtype='datetime64[ns, UTC]')
    tm.assert_index_equal(result, expected)
def test_to_datetime_freq(self):
    xp = bdate_range('2000-1-1', periods=10, tz='UTC')
    rs = xp.to_datetime()
    self.assertEqual(xp.freq, rs.freq)
    self.assertEqual(xp.tzinfo, rs.tzinfo)
def test_to_period_tz_pytz(self):
    tm._skip_if_no_pytz()
    from dateutil.tz import tzlocal
    from pytz import utc as UTC

    xp = date_range('1/1/2000', '4/1/2000').to_period()

    ts = date_range('1/1/2000', '4/1/2000', tz='US/Eastern')
    result = ts.to_period()[0]
    expected = ts[0].to_period()
    self.assertEqual(result, expected)
    self.assertTrue(ts.to_period().equals(xp))

    ts = date_range('1/1/2000', '4/1/2000', tz=UTC)
    result = ts.to_period()[0]
    expected = ts[0].to_period()
    self.assertEqual(result, expected)
    self.assertTrue(ts.to_period().equals(xp))

    ts = date_range('1/1/2000', '4/1/2000', tz=tzlocal())
    result = ts.to_period()[0]
    expected = ts[0].to_period()
    self.assertEqual(result, expected)
    self.assertTrue(ts.to_period().equals(xp))