def test_generate_signature(defconfig):
kwargs = dict(
method='GET',
version=defconfig.version,
endpoint=defconfig.endpoint,
date=datetime.now(tzutc()),
request_path='/path/to/api/',
content=b'"test data"',
content_type='application/json',
access_key=defconfig.access_key,
secret_key=defconfig.secret_key,
hash_type='md5'
)
headers, signature = generate_signature(**kwargs)
assert kwargs['hash_type'].upper() in headers['Authorization']
assert kwargs['access_key'] in headers['Authorization']
assert signature in headers['Authorization']
python类tzutc()的实例源码
def testTzAll(self):
from dateutil.tz import tzutc
from dateutil.tz import tzoffset
from dateutil.tz import tzlocal
from dateutil.tz import tzfile
from dateutil.tz import tzrange
from dateutil.tz import tzstr
from dateutil.tz import tzical
from dateutil.tz import gettz
from dateutil.tz import tzwin
from dateutil.tz import tzwinlocal
tz_all = ["tzutc", "tzoffset", "tzlocal", "tzfile", "tzrange",
"tzstr", "tzical", "gettz"]
tz_all += ["tzwin", "tzwinlocal"] if sys.platform.startswith("win") else []
lvars = locals()
for var in tz_all:
self.assertIsNot(lvars[var], None)
def testFoldPositiveUTCOffset(self):
# Test that we can resolve ambiguous times
tzname = self._get_tzname('Australia/Sydney')
with self._gettz_context(tzname):
SYD0 = self.gettz(tzname)
SYD1 = self.gettz(tzname)
t0_u = datetime(2012, 3, 31, 15, 30, tzinfo=tz.tzutc()) # AEST
t1_u = datetime(2012, 3, 31, 16, 30, tzinfo=tz.tzutc()) # AEDT
# Using fresh tzfiles
t0_syd0 = t0_u.astimezone(SYD0)
t1_syd1 = t1_u.astimezone(SYD1)
self.assertEqual(t0_syd0.replace(tzinfo=None),
datetime(2012, 4, 1, 2, 30))
self.assertEqual(t1_syd1.replace(tzinfo=None),
datetime(2012, 4, 1, 2, 30))
self.assertEqual(t0_syd0.utcoffset(), timedelta(hours=11))
self.assertEqual(t1_syd1.utcoffset(), timedelta(hours=10))
def testGapPositiveUTCOffset(self):
# Test that we don't have a problem around gaps.
tzname = self._get_tzname('Australia/Sydney')
with self._gettz_context(tzname):
SYD0 = self.gettz(tzname)
SYD1 = self.gettz(tzname)
t0_u = datetime(2012, 10, 6, 15, 30, tzinfo=tz.tzutc()) # AEST
t1_u = datetime(2012, 10, 6, 16, 30, tzinfo=tz.tzutc()) # AEDT
# Using fresh tzfiles
t0 = t0_u.astimezone(SYD0)
t1 = t1_u.astimezone(SYD1)
self.assertEqual(t0.replace(tzinfo=None),
datetime(2012, 10, 7, 1, 30))
self.assertEqual(t1.replace(tzinfo=None),
datetime(2012, 10, 7, 3, 30))
self.assertEqual(t0.utcoffset(), timedelta(hours=10))
self.assertEqual(t1.utcoffset(), timedelta(hours=11))
def testFoldNegativeUTCOffset(self):
# Test that we can resolve ambiguous times
tzname = self._get_tzname('America/Toronto')
with self._gettz_context(tzname):
# Calling fromutc() alters the tzfile object
TOR0 = self.gettz(tzname)
TOR1 = self.gettz(tzname)
t0_u = datetime(2011, 11, 6, 5, 30, tzinfo=tz.tzutc())
t1_u = datetime(2011, 11, 6, 6, 30, tzinfo=tz.tzutc())
# Using fresh tzfiles
t0_tor0 = t0_u.astimezone(TOR0)
t1_tor1 = t1_u.astimezone(TOR1)
self.assertEqual(t0_tor0.replace(tzinfo=None),
datetime(2011, 11, 6, 1, 30))
self.assertEqual(t1_tor1.replace(tzinfo=None),
datetime(2011, 11, 6, 1, 30))
self.assertEqual(t0_tor0.utcoffset(), timedelta(hours=-4.0))
self.assertEqual(t1_tor1.utcoffset(), timedelta(hours=-5.0))
def testGapNegativeUTCOffset(self):
# Test that we don't have a problem around gaps.
tzname = self._get_tzname('America/Toronto')
with self._gettz_context(tzname):
# Calling fromutc() alters the tzfile object
TOR0 = self.gettz(tzname)
TOR1 = self.gettz(tzname)
t0_u = datetime(2011, 3, 13, 6, 30, tzinfo=tz.tzutc())
t1_u = datetime(2011, 3, 13, 7, 30, tzinfo=tz.tzutc())
# Using fresh tzfiles
t0 = t0_u.astimezone(TOR0)
t1 = t1_u.astimezone(TOR1)
self.assertEqual(t0.replace(tzinfo=None),
datetime(2011, 3, 13, 1, 30))
self.assertEqual(t1.replace(tzinfo=None),
datetime(2011, 3, 13, 3, 30))
self.assertNotEqual(t0, t1)
self.assertEqual(t0.utcoffset(), timedelta(hours=-5.0))
self.assertEqual(t1.utcoffset(), timedelta(hours=-4.0))
def testEqualAmbiguousComparison(self):
tzname = self._get_tzname('Australia/Sydney')
with self._gettz_context(tzname):
SYD0 = self.gettz(tzname)
SYD1 = self.gettz(tzname)
t0_u = datetime(2012, 3, 31, 14, 30, tzinfo=tz.tzutc()) # AEST
t1_u = datetime(2012, 3, 31, 16, 30, tzinfo=tz.tzutc()) # AEDT
t0_syd0 = t0_u.astimezone(SYD0)
t0_syd1 = t0_u.astimezone(SYD1)
# This is considered an "inter-zone comparison" because it's an
# ambiguous datetime.
self.assertEqual(t0_syd0, t0_syd1)
def get_utc_transitions(self, tzi, year, gap):
dston, dstoff = tzi.transitions(year)
if gap:
t_n = dston - timedelta(minutes=30)
t0_u = t_n.replace(tzinfo=tzi).astimezone(tz.tzutc())
t1_u = t0_u + timedelta(hours=1)
else:
# Get 1 hour before the first ambiguous date
t_n = dstoff - timedelta(minutes=30)
t0_u = t_n.replace(tzinfo=tzi).astimezone(tz.tzutc())
t_n += timedelta(hours=1) # Naive ambiguous date
t0_u = t0_u + timedelta(hours=1) # First ambiguous date
t1_u = t0_u + timedelta(hours=1) # Second ambiguous date
return t_n, t0_u, t1_u
def datetime2timestamp(dt, default_timezone=None):
"""Calculate the timestamp based on the given datetime instance.
:type dt: datetime
:param dt: A datetime object to be converted into timestamp
:type default_timezone: tzinfo
:param default_timezone: If it is provided as None, we treat it as tzutc().
But it is only used when dt is a naive datetime.
:returns: The timestamp
"""
epoch = datetime.datetime(1970, 1, 1)
if dt.tzinfo is None:
if default_timezone is None:
default_timezone = tzutc()
dt = dt.replace(tzinfo=default_timezone)
d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
if hasattr(d, "total_seconds"):
return d.total_seconds() # Works in Python 2.7+
return (d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6) / 10**6
def make_analyzed_tickets(AnalyzedAgileTicket, datetime, tzutc):
"""Make ticket from a list of dicts with key data."""
from dateutil.parser import parse
default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)
def _make_analyzed_tickets(ticket_datas):
tickets = []
for data in ticket_datas:
t = AnalyzedAgileTicket(
key=data['key'],
committed=dict(state="Committed", entered_at=parse(data['committed'], default=default)),
started=dict(state="Started", entered_at=parse(data['started'], default=default)),
ended=dict(state="Ended", entered_at=parse(data['ended'], default=default))
)
tickets.append(t)
return tickets
return _make_analyzed_tickets
def weeks_of_tickets(datetime, tzutc, AnalyzedAgileTicket):
"""A bunch of tickets."""
from dateutil.parser import parse
parsed = []
default = datetime(1979, 8, 15, 0, 0, 0, tzinfo=tzutc)
current_path = path.dirname(path.abspath(__file__))
csv_file = path.join(current_path, 'data', 'weeks_of_tickets.csv')
count = 1
for row in csv.DictReader(open(csv_file, 'r')):
t = AnalyzedAgileTicket(
key="FOO-{}".format(count),
committed=dict(state="committed", entered_at=parse(row['committed'], default=default)),
started=dict(state="started", entered_at=parse(row['started'], default=default)),
ended=dict(state="ended", entered_at=parse(row['ended'], default=default))
)
parsed.append(t)
count += 1
return parsed
def __init__(self, product_id: str, sequence_id: int, order_side: OrderSide, size: str, price: str,
status: OrderStatus = OrderStatus.open, order_id: str = None, order_type: OrderType = OrderType.limit,
created_at: Optional[datetime] = None, historical: bool = False, confirmed: bool = False):
self.product_id = product_id
self.order_side = order_side
self.order_type = order_type
self.status = status
self.sequence_id = int(sequence_id)
self.size = size
self.filled_size = '0'
self.price = str(price)
self.order_id = order_id
if created_at is None:
self.created_at = datetime.now(tz.tzutc())
else:
# orders cannot be created in the future please
self.created_at = min(datetime.now(tz.tzutc()), created_at)
self.historical = historical
self.confirmed = confirmed
if float(self.size) < 0:
raise OrderException('Order size must be positive {}'.format(self.size))
def datetime2timestamp(dt, default_timezone=None):
"""Calculate the timestamp based on the given datetime instance.
:type dt: datetime
:param dt: A datetime object to be converted into timestamp
:type default_timezone: tzinfo
:param default_timezone: If it is provided as None, we treat it as tzutc().
But it is only used when dt is a naive datetime.
:returns: The timestamp
"""
epoch = datetime.datetime(1970, 1, 1)
if dt.tzinfo is None:
if default_timezone is None:
default_timezone = tzutc()
dt = dt.replace(tzinfo=default_timezone)
d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
if hasattr(d, "total_seconds"):
return d.total_seconds() # Works in Python 2.7+
return (d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6) / 10**6
def _build_datetime(parts):
timestamp = parts.get('timestamp')
if timestamp:
tz_utc = tz.tzutc()
return datetime.fromtimestamp(timestamp, tz=tz_utc)
am_pm = parts.get('am_pm')
hour = parts.get('hour', 0)
if am_pm == 'pm' and hour < 12:
hour += 12
elif am_pm == 'am' and hour == 12:
hour = 0
return datetime(year=parts.get('year', 1), month=parts.get('month', 1),
day=parts.get('day', 1), hour=hour, minute=parts.get('minute', 0),
second=parts.get('second', 0), microsecond=parts.get('microsecond', 0),
tzinfo=parts.get('tzinfo'))
def __init__(self, dt):
if not isinstance(dt, (datetime, date, timedelta, time)):
raise ValueError('You must use datetime, date, timedelta or time')
if isinstance(dt, datetime):
self.params = Parameters({'value': 'DATE-TIME'})
elif isinstance(dt, date):
self.params = Parameters({'value': 'DATE'})
elif isinstance(dt, time):
self.params = Parameters({'value': 'TIME'})
if (isinstance(dt, datetime) or isinstance(dt, time))\
and getattr(dt, 'tzinfo', False):
tzinfo = dt.tzinfo
if tzinfo is not pytz.utc and\
(tzutc is None or not isinstance(tzinfo, tzutc)):
# set the timezone as a parameter to the property
tzid = tzid_from_dt(dt)
if tzid:
self.params.update({'TZID': tzid})
self.dt = dt
def datetime2timestamp(dt, default_timezone=None):
"""Calculate the timestamp based on the given datetime instance.
:type dt: datetime
:param dt: A datetime object to be converted into timestamp
:type default_timezone: tzinfo
:param default_timezone: If it is provided as None, we treat it as tzutc().
But it is only used when dt is a naive datetime.
:returns: The timestamp
"""
epoch = datetime.datetime(1970, 1, 1)
if dt.tzinfo is None:
if default_timezone is None:
default_timezone = tzutc()
dt = dt.replace(tzinfo=default_timezone)
d = dt.replace(tzinfo=None) - dt.utcoffset() - epoch
if hasattr(d, "total_seconds"):
return d.total_seconds() # Works in Python 2.7+
return (d.microseconds + (d.seconds + d.days * 24 * 3600) * 10**6) / 10**6
def my_local_time():
# METHOD 1: Hardcode zones:
# from_zone = tz.gettz('UTC')
# to_zone = tz.gettz('America/New_York')
# METHOD 2: Auto-detect zones:
from_zone = tz.tzutc()
to_zone = tz.tzlocal()
utc = datetime.utcnow()
# utc = datetime.strptime('2011-01-21 02:37:21', '%Y-%m-%d %H:%M:%S')
# Tell the datetime object that it's in UTC time zone since
# datetime objects are 'naive' by default
utc = utc.replace(tzinfo=from_zone)
# Convert time zone
return utc.astimezone(to_zone)
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_index_convert_to_datetime_array_dateutil(self):
tm._skip_if_no_dateutil()
import dateutil
def _check_rng(rng):
converted = rng.to_pydatetime()
tm.assertIsInstance(converted, np.ndarray)
for x, stamp in zip(converted, rng):
tm.assertIsInstance(x, datetime)
self.assertEqual(x, stamp.to_pydatetime())
self.assertEqual(x.tzinfo, stamp.tzinfo)
rng = date_range('20090415', '20090519')
rng_eastern = date_range('20090415', '20090519',
tz='dateutil/US/Eastern')
rng_utc = date_range('20090415', '20090519', tz=dateutil.tz.tzutc())
_check_rng(rng)
_check_rng(rng_eastern)
_check_rng(rng_utc)
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_period_resample_with_local_timezone_dateutil(self):
# GH5430
tm._skip_if_no_dateutil()
import dateutil
local_timezone = 'dateutil/America/Los_Angeles'
start = datetime(year=2013, month=11, day=1, hour=0, minute=0,
tzinfo=dateutil.tz.tzutc())
# 1 day later
end = datetime(year=2013, month=11, day=2, hour=0, minute=0,
tzinfo=dateutil.tz.tzutc())
index = pd.date_range(start, end, freq='H')
series = pd.Series(1, index=index)
series = series.tz_convert(local_timezone)
result = series.resample('D', kind='period').mean()
# Create the expected series
# Index is moved back a day with the timezone conversion from UTC to
# Pacific
expected_index = (pd.period_range(start=start, end=end, freq='D') - 1)
expected = pd.Series(1, index=expected_index)
assert_series_equal(result, expected)
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_class_ops_dateutil(self):
tm._skip_if_no_dateutil()
from dateutil.tz import tzutc
def compare(x, y):
self.assertEqual(int(np.round(Timestamp(x).value / 1e9)),
int(np.round(Timestamp(y).value / 1e9)))
compare(Timestamp.now(), datetime.now())
compare(Timestamp.now('UTC'), datetime.now(tzutc()))
compare(Timestamp.utcnow(), datetime.utcnow())
compare(Timestamp.today(), datetime.today())
current_time = calendar.timegm(datetime.now().utctimetuple())
compare(Timestamp.utcfromtimestamp(current_time),
datetime.utcfromtimestamp(current_time))
compare(Timestamp.fromtimestamp(current_time),
datetime.fromtimestamp(current_time))
date_component = datetime.utcnow()
time_component = (date_component + timedelta(minutes=10)).time()
compare(Timestamp.combine(date_component, time_component),
datetime.combine(date_component, time_component))
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def test_cant_compare_tz_naive_w_aware_dateutil(self):
tm._skip_if_no_dateutil()
from dateutil.tz import tzutc
utc = tzutc()
# #1404
a = Timestamp('3/12/2012')
b = Timestamp('3/12/2012', tz=utc)
self.assertRaises(Exception, a.__eq__, b)
self.assertRaises(Exception, a.__ne__, b)
self.assertRaises(Exception, a.__lt__, b)
self.assertRaises(Exception, a.__gt__, b)
self.assertRaises(Exception, b.__eq__, a)
self.assertRaises(Exception, b.__ne__, a)
self.assertRaises(Exception, b.__lt__, a)
self.assertRaises(Exception, b.__gt__, a)
if sys.version_info < (3, 3):
self.assertRaises(Exception, a.__eq__, b.to_pydatetime())
self.assertRaises(Exception, a.to_pydatetime().__eq__, b)
else:
self.assertFalse(a == b.to_pydatetime())
self.assertFalse(a.to_pydatetime() == b)
def process(self, asgs):
msg_tmpl = self.data.get('message', self.default_template)
key = self.data.get('key', self.data.get('tag', DEFAULT_TAG))
op = self.data.get('op', 'suspend')
date = self.data.get('days', 4)
n = datetime.now(tz=tzutc())
stop_date = n + timedelta(days=date)
try:
msg = msg_tmpl.format(
op=op, action_date=stop_date.strftime('%Y/%m/%d'))
except Exception:
self.log.warning("invalid template %s" % msg_tmpl)
msg = self.default_template.format(
op=op, action_date=stop_date.strftime('%Y/%m/%d'))
self.log.info("Tagging %d asgs for %s on %s" % (
len(asgs), op, stop_date.strftime('%Y/%m/%d')))
self.tag(asgs, key, msg)
def fetch_credential_report(self):
client = local_session(self.manager.session_factory).client('iam')
try:
report = client.get_credential_report()
except ClientError as e:
if e.response['Error']['Code'] != 'ReportNotPresent':
raise
report = None
if report:
threshold = datetime.datetime.now(tz=tzutc()) - timedelta(
seconds=self.get_value_or_schema_default(
'report_max_age'))
if not report['GeneratedTime'].tzinfo:
threshold = threshold.replace(tzinfo=None)
if report['GeneratedTime'] < threshold:
report = None
if report is None:
if not self.get_value_or_schema_default('report_generate'):
raise ValueError("Credential Report Not Present")
client.generate_credential_report()
time.sleep(self.get_value_or_schema_default('report_delay'))
report = client.get_credential_report()
return report['Content']
def test_age(self):
now = datetime.now(tz=tz.tzutc())
three_months = now - timedelta(90)
two_months = now - timedelta(60)
one_month = now - timedelta(30)
def i(d):
return instance(LaunchTime=d)
fdata = {
'type': 'value',
'key': 'LaunchTime',
'op': 'less-than',
'value_type': 'age',
'value': 32}
self.assertFilter(fdata, i(three_months), False)
self.assertFilter(fdata, i(two_months), False)
self.assertFilter(fdata, i(one_month), True)
self.assertFilter(fdata, i(now), True)
self.assertFilter(fdata, i(now.isoformat()), True)
def test_expiration(self):
now = datetime.now(tz=tz.tzutc())
three_months = now + timedelta(90)
two_months = now + timedelta(60)
def i(d):
return instance(LaunchTime=d)
fdata = {
'type': 'value',
'key': 'LaunchTime',
'op': 'less-than',
'value_type': 'expiration',
'value': 61}
self.assertFilter(fdata, i(three_months), False)
self.assertFilter(fdata, i(two_months), True)
self.assertFilter(fdata, i(now), True)
self.assertFilter(fdata, i(now.isoformat()), True)
def test_filter_instance_age(self):
now = datetime.now(tz=tz.tzutc())
three_months = now - timedelta(90)
two_months = now - timedelta(60)
one_month = now - timedelta(30)
def i(d):
return instance(LaunchTime=d)
for ii, v in [
(i(now), False),
(i(three_months), True),
(i(two_months), True),
(i(one_month), False)
]:
self.assertFilter({'type': 'instance-uptime', 'op': 'gte', 'days': 60}, ii, v)
def filter_extant_exports(client, bucket, prefix, days, start, end=None):
"""Filter days where the bucket already has extant export keys.
"""
end = end or datetime.now()
# days = [start + timedelta(i) for i in range((end-start).days)]
try:
tag_set = client.get_object_tagging(Bucket=bucket, Key=prefix).get('TagSet', [])
except ClientError as e:
if e.response['Error']['Code'] != 'NoSuchKey':
raise
tag_set = []
tags = {t['Key']: t['Value'] for t in tag_set}
if 'LastExport' not in tags:
return sorted(days)
last_export = parse(tags['LastExport'])
if last_export.tzinfo is None:
last_export = last_export.replace(tzinfo=tzutc())
return [d for d in sorted(days) if d > last_export]
def iso8601_to_rostime(iso):
"""Converts ISO 8601 time to ROS Time.
Args:
iso: ISO 8601 encoded string.
Returns:
std_msgs/Time.
"""
# Convert to datetime in UTC.
t = dateutil.parser.parse(iso)
if not t.utcoffset():
t = t.replace(tzinfo=tzutc())
# Convert to time from epoch in UTC.
epoch = datetime.utcfromtimestamp(0)
epoch = epoch.replace(tzinfo=tzutc())
dt = t - epoch
# Create ROS message.
time = Time()
time.data.secs = int(dt.total_seconds())
time.data.nsecs = dt.microseconds * 1000
return time
def _deserialize(cls, data):
kind = data.get(cls._type_field, None)
if kind is None:
return data
value = data.get("value")
if kind == "blob":
return base64.b64decode(value.encode("utf-8"))
elif kind == "datetime":
return datetime.fromtimestamp(value, tz.tzutc())
elif kind == "model":
return cls._entity_from_dict(value)
else:
raise ValueError(f"Invalid kind {kind!r}.")
def validateLogDate(self, lines):
delta = timedelta(hours=2)
for line in lines:
if not line:
continue
elif line.startswith("# Time: "):
log_time = datetime.strptime(line[8:], "%y%m%d %H:%M:%S")
log_time = log_time.replace(tzinfo=tz.tzutc()).astimezone(
zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"]))
log_time = log_time.replace(tzinfo=None)
print(self._now, log_time)
print("diff :", self._now - log_time)
if (self._now - log_time) > delta:
return False
else:
return True
return True
# Initialization.