def test_overflow(self):
tiny = timedelta.resolution
td = timedelta.min + tiny
td -= tiny # no problem
self.assertRaises(OverflowError, td.__sub__, tiny)
self.assertRaises(OverflowError, td.__add__, -tiny)
td = timedelta.max - tiny
td += tiny # no problem
self.assertRaises(OverflowError, td.__add__, tiny)
self.assertRaises(OverflowError, td.__sub__, -tiny)
self.assertRaises(OverflowError, lambda: -timedelta.max)
day = timedelta(1)
self.assertRaises(OverflowError, day.__mul__, 10**9)
self.assertRaises(OverflowError, day.__mul__, 1e9)
self.assertRaises(OverflowError, day.__truediv__, 1e-20)
self.assertRaises(OverflowError, day.__truediv__, 1e-10)
self.assertRaises(OverflowError, day.__truediv__, 9e-10)
python类max()的实例源码
def update_heartbeat():
@retry(
stop_max_delay=30000, # 30 seconds max
wait_exponential_multiplier=100, # wait 2^i * 100 ms, on the i-th retry
wait_exponential_max=1000, # but wait 1 second per try maximum
wrap_exception=True
)
def retry_fetch_fail_after_30sec():
return requests.post(
config['webservice']['shifthelperHeartbeat'],
auth=(
config['webservice']['user'],
config['webservice']['password']
)
).json()
try:
return retry_fetch_fail_after_30sec()
except RetryError as e:
return {}
def filter_by_first_message_dt(self, start_dt=datetime.min, end_dt=datetime.max):
"""
Returns a copy of self after filtering self.conversations by
conversations whose first messages lie in a datetime interval.
Args:
start_dt: A datetime object satisfying start_dt <= end_dt.
end_dt: A datetime object satisfying start_dt <= end_dt.
Returns:
A UserConversations object that is equal to self after filtering
self.conversations such that the datetime of the first message
in each conversation is in the closed interval
[start_dt, end_dt].
Raises:
EmptyUserConversationsError: Filtering self.conversations results
in an empty list.
"""
if start_dt > end_dt:
raise ValueError('Must have start_dt <= end_dt')
conversation_filter = lambda x: x.messages[0].timestamp >= start_dt and x.messages[0].timestamp <= end_dt
return self.filter_user_conversations(conversation_filter)
def filter_by_last_message_dt(self, start_dt=datetime.min, end_dt=datetime.max):
"""
Returns a copy of self after filtering self.conversations by
conversations whose last messages lie in a datetime interval.
Args:
start_dt: A datetime object satisfying start_dt <= end_dt.
end_dt: A datetime object satisfying start_dt <= end_dt.
Returns:
A UserConversations object that is equal to self after filtering
self.conversations such that the datetime of the last message
in each conversation is in the closed interval
[start_dt, end_dt].
Raises:
EmptyUserConversationsError: Filtering self.conversations results
in an empty list.
"""
if start_dt > end_dt:
raise ValueError('Must have start_dt <= end_dt')
conversation_filter = lambda x: x.messages[-1].timestamp >= start_dt and x.messages[-1].timestamp <= end_dt
return self.filter_user_conversations(conversation_filter)
def filter_by_datetime(self, start_dt=datetime.min, end_dt=datetime.max):
"""
Returns a copy of self after filtering each conversation's messages
to only contain messages that lie in a datetime interval and removing
conversations with no messages in the datetime interval.
Args:
start_dt: A datetime object satisfying start_dt <= end_dt.
end_dt: A datetime object satisfying start_dt <= end_dt.
Returns:
A UserConversations object that is equal to self after filtering
the messages of each conversation in self.conversations to keep
messages whose datetimes are in the closed interval
[start_dt, end_dt]. Conversations with no messages in the
interval are discarded.
Raises:
EmptyUserConversationsError: Across all conversations, there is no
message that lies in the closed interval [start_dt, end_dt].
"""
if start_dt > end_dt:
raise ValueError("start_dt must be less than or equal to end_dt")
message_filter = lambda x: x.timestamp >= start_dt and x.timestamp <= end_dt
return self.filter_conversations(message_filter=message_filter)
def filter_by_datetime(self, start_dt=datetime.min, end_dt=datetime.max):
"""
Returns a copy of self after filtering self.messages by messages that
lie in a datetime interval.
Args:
start_dt: A datetime object satisfying start_dt <= end_dt.
end_dt: A datetime object satisfying start_dt <= end_dt.
Returns:
A Conversation object that is equal to self after filtering by
messages whose datetimes are in the closed interval
[start_dt, end_dt].
Raises:
EmptyConversationError: Filtering self.messages results in an empty
list.
"""
if start_dt > end_dt:
raise ValueError("start_dt must be less than or equal to end_dt")
message_filter = lambda x: x.timestamp >= start_dt and x.timestamp <= end_dt
filtered = self.filter_conversation(message_filter=message_filter)
return filtered
def cast_date(value, connection):
"""Cast a date value."""
# The output format depends on the server setting DateStyle. The default
# setting ISO and the setting for German are actually unambiguous. The
# order of days and months in the other two settings is however ambiguous,
# so at least here we need to consult the setting to properly parse values.
if value == '-infinity':
return date.min
if value == 'infinity':
return date.max
value = value.split()
if value[-1] == 'BC':
return date.min
value = value[0]
if len(value) > 10:
return date.max
fmt = connection.date_format()
return datetime.strptime(value, fmt).date()
def cast_timestamp(value, connection):
"""Cast a timestamp value."""
if value == '-infinity':
return datetime.min
if value == 'infinity':
return datetime.max
value = value.split()
if value[-1] == 'BC':
return datetime.min
fmt = connection.date_format()
if fmt.endswith('-%Y') and len(value) > 2:
value = value[1:5]
if len(value[3]) > 4:
return datetime.max
fmt = ['%d %b' if fmt.startswith('%d') else '%b %d',
'%H:%M:%S.%f' if len(value[2]) > 8 else '%H:%M:%S', '%Y']
else:
if len(value[0]) > 10:
return datetime.max
fmt = [fmt, '%H:%M:%S.%f' if len(value[1]) > 8 else '%H:%M:%S']
return datetime.strptime(' '.join(value), ' '.join(fmt))
def cast_timestamptz(value, connection):
"""Cast a timestamptz value."""
if value == '-infinity':
return datetime.min
if value == 'infinity':
return datetime.max
value = value.split()
if value[-1] == 'BC':
return datetime.min
fmt = connection.date_format()
if fmt.endswith('-%Y') and len(value) > 2:
value = value[1:]
if len(value[3]) > 4:
return datetime.max
fmt = ['%d %b' if fmt.startswith('%d') else '%b %d',
'%H:%M:%S.%f' if len(value[2]) > 8 else '%H:%M:%S', '%Y']
value, tz = value[:-1], value[-1]
else:
if fmt.startswith('%Y-'):
tz = _re_timezone.match(value[1])
if tz:
value[1], tz = tz.groups()
else:
tz = '+0000'
else:
value, tz = value[:-1], value[-1]
if len(value[0]) > 10:
return datetime.max
fmt = [fmt, '%H:%M:%S.%f' if len(value[1]) > 8 else '%H:%M:%S']
if _has_timezone:
value.append(_timezone_as_offset(tz))
fmt.append('%z')
return datetime.strptime(' '.join(value), ' '.join(fmt))
return datetime.strptime(' '.join(value), ' '.join(fmt)).replace(
tzinfo=_get_timezone(tz))
def cast_date(value, connection):
"""Cast a date value."""
# The output format depends on the server setting DateStyle. The default
# setting ISO and the setting for German are actually unambiguous. The
# order of days and months in the other two settings is however ambiguous,
# so at least here we need to consult the setting to properly parse values.
if value == '-infinity':
return date.min
if value == 'infinity':
return date.max
value = value.split()
if value[-1] == 'BC':
return date.min
value = value[0]
if len(value) > 10:
return date.max
fmt = connection.date_format()
return datetime.strptime(value, fmt).date()
def cast_timestamp(value, connection):
"""Cast a timestamp value."""
if value == '-infinity':
return datetime.min
if value == 'infinity':
return datetime.max
value = value.split()
if value[-1] == 'BC':
return datetime.min
fmt = connection.date_format()
if fmt.endswith('-%Y') and len(value) > 2:
value = value[1:5]
if len(value[3]) > 4:
return datetime.max
fmt = ['%d %b' if fmt.startswith('%d') else '%b %d',
'%H:%M:%S.%f' if len(value[2]) > 8 else '%H:%M:%S', '%Y']
else:
if len(value[0]) > 10:
return datetime.max
fmt = [fmt, '%H:%M:%S.%f' if len(value[1]) > 8 else '%H:%M:%S']
return datetime.strptime(' '.join(value), ' '.join(fmt))
def testTruncateRestart(self):
truncate = self.db.truncate
self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
query = self.db.query
self.createTable('test_table', 'n serial, t text')
for n in range(3):
query("insert into test_table (t) values ('test')")
q = "select count(n), min(n), max(n) from test_table"
r = query(q).getresult()[0]
self.assertEqual(r, (3, 1, 3))
truncate('test_table')
r = query(q).getresult()[0]
self.assertEqual(r, (0, None, None))
for n in range(3):
query("insert into test_table (t) values ('test')")
r = query(q).getresult()[0]
self.assertEqual(r, (3, 4, 6))
truncate('test_table', restart=True)
r = query(q).getresult()[0]
self.assertEqual(r, (0, None, None))
for n in range(3):
query("insert into test_table (t) values ('test')")
r = query(q).getresult()[0]
self.assertEqual(r, (3, 1, 3))
def testDate(self):
query = self.db.query
for datestyle in ('ISO', 'Postgres, MDY', 'Postgres, DMY',
'SQL, MDY', 'SQL, DMY', 'German'):
self.db.set_parameter('datestyle', datestyle)
d = date(2016, 3, 14)
q = "select $1::date"
r = query(q, (d,)).getresult()[0][0]
self.assertIsInstance(r, date)
self.assertEqual(r, d)
q = "select '10000-08-01'::date, '0099-01-08 BC'::date"
r = query(q).getresult()[0]
self.assertIsInstance(r[0], date)
self.assertIsInstance(r[1], date)
self.assertEqual(r[0], date.max)
self.assertEqual(r[1], date.min)
q = "select 'infinity'::date, '-infinity'::date"
r = query(q).getresult()[0]
self.assertIsInstance(r[0], date)
self.assertIsInstance(r[1], date)
self.assertEqual(r[0], date.max)
self.assertEqual(r[1], date.min)
def cast_date(value, connection):
"""Cast a date value."""
# The output format depends on the server setting DateStyle. The default
# setting ISO and the setting for German are actually unambiguous. The
# order of days and months in the other two settings is however ambiguous,
# so at least here we need to consult the setting to properly parse values.
if value == '-infinity':
return date.min
if value == 'infinity':
return date.max
value = value.split()
if value[-1] == 'BC':
return date.min
value = value[0]
if len(value) > 10:
return date.max
fmt = connection.date_format()
return datetime.strptime(value, fmt).date()
def cast_date(value, connection):
"""Cast a date value."""
# The output format depends on the server setting DateStyle. The default
# setting ISO and the setting for German are actually unambiguous. The
# order of days and months in the other two settings is however ambiguous,
# so at least here we need to consult the setting to properly parse values.
if value == '-infinity':
return date.min
if value == 'infinity':
return date.max
value = value.split()
if value[-1] == 'BC':
return date.min
value = value[0]
if len(value) > 10:
return date.max
fmt = connection.date_format()
return datetime.strptime(value, fmt).date()
def cast_timestamp(value, connection):
"""Cast a timestamp value."""
if value == '-infinity':
return datetime.min
if value == 'infinity':
return datetime.max
value = value.split()
if value[-1] == 'BC':
return datetime.min
fmt = connection.date_format()
if fmt.endswith('-%Y') and len(value) > 2:
value = value[1:5]
if len(value[3]) > 4:
return datetime.max
fmt = ['%d %b' if fmt.startswith('%d') else '%b %d',
'%H:%M:%S.%f' if len(value[2]) > 8 else '%H:%M:%S', '%Y']
else:
if len(value[0]) > 10:
return datetime.max
fmt = [fmt, '%H:%M:%S.%f' if len(value[1]) > 8 else '%H:%M:%S']
return datetime.strptime(' '.join(value), ' '.join(fmt))
def test_overflow(self):
tiny = timedelta.resolution
td = timedelta.min + tiny
td -= tiny # no problem
self.assertRaises(OverflowError, td.__sub__, tiny)
self.assertRaises(OverflowError, td.__add__, -tiny)
td = timedelta.max - tiny
td += tiny # no problem
self.assertRaises(OverflowError, td.__add__, tiny)
self.assertRaises(OverflowError, td.__sub__, -tiny)
self.assertRaises(OverflowError, lambda: -timedelta.max)
day = timedelta(1)
self.assertRaises(OverflowError, day.__mul__, 10**9)
self.assertRaises(OverflowError, day.__mul__, 1e9)
self.assertRaises(OverflowError, day.__truediv__, 1e-20)
self.assertRaises(OverflowError, day.__truediv__, 1e-10)
self.assertRaises(OverflowError, day.__truediv__, 9e-10)
def transaction_list(self, start=datetime.min,
end=datetime.max,
format=ReportFormat.printout,
component_path="",
output_path=None):
"""
Generate a transaction list report.
:param start: The start date to generate the report for.
:param end: The end date to generate the report for.
:param format: The format of the report.
:param component_path: The path of the component to filter the report's
transactions by.
:param output_path: The path to the file the report is written to.
If None, then the report is not written to a file.
:returns: The generated report.
"""
rpt = TransactionList(self, start, end, component_path, output_path)
return rpt.render(format)
def income_statement(self, start=datetime.min,
end=datetime.max,
format=ReportFormat.printout,
component_path="",
output_path=None):
"""
Generate a transaction list report.
:param start: The start date to generate the report for.
:param end: The end date to generate the report for.
:param format: The format of the report.
:param component_path: The path of the component to filter the report's
transactions by.
:param output_path: The path to the file the report is written to.
If None, then the report is not written to a file.
:returns: The generated report.
"""
rpt = IncomeStatement(self, start, end, component_path, output_path)
return rpt.render(format)
def __init__(self, name,
dt_account,
cr_account,
amount=0,
start=datetime.min,
end=datetime.max,
interval=1,
description=None):
"""
"""
super(BasicActivity, self).__init__(
name,
description=description,
start=start,
end=end,
interval=interval)
self.interval = interval
self.amount = amount
self.dt_account = dt_account
self.cr_account = cr_account
def test_IsoDateTimeType_model(self):
dt = IsoDateTimeType()
value = dt.to_primitive(now)
self.assertEqual(now, dt.to_native(now))
self.assertEqual(now, dt.to_native(value))
date = datetime.now()
value = dt.to_primitive(date)
self.assertEqual(date, dt.to_native(date))
self.assertEqual(now.tzinfo, dt.to_native(value).tzinfo)
# ParseError
for date in (None, '', 2017, "2007-06-23X06:40:34.00Z"):
with self.assertRaisesRegexp(ConversionError,
u'Could not parse %s. Should be ISO8601.' % date):
dt.to_native(date)
# OverflowError
for date in (datetime.max, datetime.min):
self.assertEqual(date, dt.to_native(date))
with self.assertRaises(ConversionError):
dt.to_native(dt.to_primitive(date))
def test_overflow(self):
tiny = timedelta.resolution
td = timedelta.min + tiny
td -= tiny # no problem
self.assertRaises(OverflowError, td.__sub__, tiny)
self.assertRaises(OverflowError, td.__add__, -tiny)
td = timedelta.max - tiny
td += tiny # no problem
self.assertRaises(OverflowError, td.__add__, tiny)
self.assertRaises(OverflowError, td.__sub__, -tiny)
self.assertRaises(OverflowError, lambda: -timedelta.max)
day = timedelta(1)
self.assertRaises(OverflowError, day.__mul__, 10**9)
self.assertRaises(OverflowError, day.__mul__, 1e9)
self.assertRaises(OverflowError, day.__truediv__, 1e-20)
self.assertRaises(OverflowError, day.__truediv__, 1e-10)
self.assertRaises(OverflowError, day.__truediv__, 9e-10)
def map_from_json(json_slots):
slots = []
for json_slot in json_slots:
from_long = json_slot['start'] / 1e3
if from_long < TimeSlot.time_to_long(datetime.min):
from_long = TimeSlot.time_to_long(datetime.min)
duration_long = json_slot['duration'] / 1e3
to_long = from_long + duration_long
if to_long > TimeSlot.time_to_long(datetime.max):
to_long = TimeSlot.time_to_long(datetime.max)
fr = datetime.fromtimestamp(from_long)
to = datetime.fromtimestamp(to_long)
slots.append(TimeSlot(fr, to))
return slots
def set(self, key, value):
"""Sets the key and value represented by this cache item.
Note:
The $value argument may be any item that can be serialized by python,
although the method of serialization is left up to the Implementing
Library.
Args:
value: The serializable value to be stored.
:return The invoked object.
"""
self.key = key
self.value = value
self.expire_at = datetime.max
self._setHit()
return self.key, self.value
def save(self, item):
"""Persists a cache item immediately.
:param item: The cache item to save.
:return True if the item was successfully persisted. False if there was an error.
"""
if item.expire_at != datetime.max:
expire_seconds = (item.expire_at - datetime.utcnow()).seconds
if expire_seconds > 0:
return self.client.setex(self.normalize_key(item.key),
cPickle.dumps(item), expire_seconds)
else:
return False
else:
return self.client.set(self.normalize_key(item.key),
cPickle.dumps(item))
def _load_existing_mappings(self):
"""Load the existing ARP records for this box from the db.
Returns:
A deferred whose result is a dictionary: { (ip, mac): arpid }
"""
self._logger.debug("Loading open arp records from database")
open_arp_records_queryset = manage.Arp.objects.filter(
netbox__id=self.netbox.id,
end_time__gte=datetime.max).values('id', 'ip', 'mac')
open_arp_records = yield db.run_in_thread(
storage.shadowify_queryset_and_commit,
open_arp_records_queryset)
self._logger.debug("Loaded %d open records from arp",
len(open_arp_records))
open_mappings = dict(((IP(arp['ip']), arp['mac']), arp['id'])
for arp in open_arp_records)
defer.returnValue(open_mappings)
def _make_new_mappings(self, mappings):
"""Convert a sequence of (ip, mac) tuples into a Arp shadow containers.
Arguments:
mappings -- An iterable containing tuples: (ip, mac)
"""
netbox = self.containers.factory(None, shadows.Netbox)
timestamp = datetime.now()
infinity = datetime.max
for (ip, mac) in mappings:
if not ip or not mac:
continue # Some devices seem to return empty results!
arp = self.containers.factory((ip, mac), shadows.Arp)
arp.netbox = netbox
arp.sysname = self.netbox.sysname
arp.ip = ip.strCompressed()
arp.mac = mac
arp.prefix_id = self._find_largest_matching_prefix(ip)
arp.start_time = timestamp
arp.end_time = infinity
def test_time_since(self):
def timestamp_calc(*args, **kwargs):
return datetime.now() - timedelta(*args, **kwargs)
minute = 60
hour = minute * 60
self.assertEqual(
time_since(None),
"Never")
self.assertEqual(
time_since(timestamp_calc(seconds=(10 * minute + 10))),
u"10\xa0mins")
self.assertEqual(
time_since(timestamp_calc(seconds=(1 * minute + 5))),
u"1\xa0min")
self.assertEqual(
time_since(timestamp_calc(0)),
u"Now")
self.assertEqual(
time_since(datetime.max),
u"Now")
def test_overflow(self):
tiny = timedelta.resolution
td = timedelta.min + tiny
td -= tiny # no problem
self.assertRaises(OverflowError, td.__sub__, tiny)
self.assertRaises(OverflowError, td.__add__, -tiny)
td = timedelta.max - tiny
td += tiny # no problem
self.assertRaises(OverflowError, td.__add__, tiny)
self.assertRaises(OverflowError, td.__sub__, -tiny)
self.assertRaises(OverflowError, lambda: -timedelta.max)
day = timedelta(1)
self.assertRaises(OverflowError, day.__mul__, 10**9)
self.assertRaises(OverflowError, day.__mul__, 1e9)
self.assertRaises(OverflowError, day.__truediv__, 1e-20)
self.assertRaises(OverflowError, day.__truediv__, 1e-10)
self.assertRaises(OverflowError, day.__truediv__, 9e-10)
def has_notification(self):
settings = self.settings_json
notifications = settings.get('notifications', [])
now = datetime.utcnow()
for notification in notifications:
try:
start = parse_datetime(notification['start'])
end = notification.get('end', None)
end = parse_datetime(end) if end else datetime.max
if now < start or now > end:
continue
except (ValueError, TypeError, KeyError) as e:
continue
notification_data = self.notification_data(notification)
if notification_data:
yield notification_data