def cast_date(value, connection):
    """Convert the text form of a PostgreSQL date into a ``date``.

    The layout of the text depends on the server's DateStyle setting (day and
    month order is ambiguous for some styles), so the strptime pattern is
    obtained from ``connection.date_format()``.

    Values outside the range handled here are clamped: '-infinity' and
    anything ending in 'BC' become ``date.min``; 'infinity' and any date
    text longer than 10 characters become ``date.max``.
    """
    for text, limit in (('-infinity', date.min), ('infinity', date.max)):
        if value == text:
            return limit
    tokens = value.split()
    if tokens[-1] == 'BC':
        return date.min
    date_text = tokens[0]
    # More than 10 characters cannot be a date with a four-digit year.
    if len(date_text) > 10:
        return date.max
    pattern = connection.date_format()
    parsed = datetime.strptime(date_text, pattern)
    return parsed.date()
# Example source code for Python's max()
def cast_timestamp(value, connection):
    """Convert the text form of a PostgreSQL timestamp into a ``datetime``.

    The date layout comes from ``connection.date_format()``. Two layouts are
    handled: one where the date format ends in '-%Y' and the text has more
    than two tokens (the year is the last of the tokens used), and one where
    the text is a date token followed by a time token.

    '-infinity' and values ending in 'BC' become ``datetime.min``;
    'infinity' and over-long year/date tokens become ``datetime.max``.
    """
    for text, limit in (('-infinity', datetime.min),
                        ('infinity', datetime.max)):
        if value == text:
            return limit
    tokens = value.split()
    if tokens[-1] == 'BC':
        return datetime.min
    date_fmt = connection.date_format()
    if date_fmt.endswith('-%Y') and len(tokens) > 2:
        # Drop the leading token (presumably the weekday name -- confirm)
        # and keep the two date tokens, the time and the year.
        tokens = tokens[1:5]
        if len(tokens[3]) > 4:
            return datetime.max
        day_month_fmt = '%d %b' if date_fmt.startswith('%d') else '%b %d'
        # A time token longer than 8 characters carries fractional seconds.
        time_fmt = '%H:%M:%S.%f' if len(tokens[2]) > 8 else '%H:%M:%S'
        patterns = [day_month_fmt, time_fmt, '%Y']
    else:
        if len(tokens[0]) > 10:
            return datetime.max
        time_fmt = '%H:%M:%S.%f' if len(tokens[1]) > 8 else '%H:%M:%S'
        patterns = [date_fmt, time_fmt]
    return datetime.strptime(' '.join(tokens), ' '.join(patterns))
def cast_date(value, connection):
    """Convert the text form of a PostgreSQL date into a ``date``.

    The layout of the text depends on the server's DateStyle setting (day and
    month order is ambiguous for some styles), so the strptime pattern is
    obtained from ``connection.date_format()``.

    Values outside the range handled here are clamped: '-infinity' and
    anything ending in 'BC' become ``date.min``; 'infinity' and any date
    text longer than 10 characters become ``date.max``.
    """
    for text, limit in (('-infinity', date.min), ('infinity', date.max)):
        if value == text:
            return limit
    tokens = value.split()
    if tokens[-1] == 'BC':
        return date.min
    date_text = tokens[0]
    # More than 10 characters cannot be a date with a four-digit year.
    if len(date_text) > 10:
        return date.max
    pattern = connection.date_format()
    parsed = datetime.strptime(date_text, pattern)
    return parsed.date()
def cast_timestamp(value, connection):
    """Convert the text form of a PostgreSQL timestamp into a ``datetime``.

    The date layout comes from ``connection.date_format()``. Two layouts are
    handled: one where the date format ends in '-%Y' and the text has more
    than two tokens (the year is the last of the tokens used), and one where
    the text is a date token followed by a time token.

    '-infinity' and values ending in 'BC' become ``datetime.min``;
    'infinity' and over-long year/date tokens become ``datetime.max``.
    """
    for text, limit in (('-infinity', datetime.min),
                        ('infinity', datetime.max)):
        if value == text:
            return limit
    tokens = value.split()
    if tokens[-1] == 'BC':
        return datetime.min
    date_fmt = connection.date_format()
    if date_fmt.endswith('-%Y') and len(tokens) > 2:
        # Drop the leading token (presumably the weekday name -- confirm)
        # and keep the two date tokens, the time and the year.
        tokens = tokens[1:5]
        if len(tokens[3]) > 4:
            return datetime.max
        day_month_fmt = '%d %b' if date_fmt.startswith('%d') else '%b %d'
        # A time token longer than 8 characters carries fractional seconds.
        time_fmt = '%H:%M:%S.%f' if len(tokens[2]) > 8 else '%H:%M:%S'
        patterns = [day_month_fmt, time_fmt, '%Y']
    else:
        if len(tokens[0]) > 10:
            return datetime.max
        time_fmt = '%H:%M:%S.%f' if len(tokens[1]) > 8 else '%H:%M:%S'
        patterns = [date_fmt, time_fmt]
    return datetime.strptime(' '.join(tokens), ' '.join(patterns))
def testTruncateRestart(self):
    """Check that truncate() only resets the serial when restart is set.

    A non-boolean ``restart`` must raise TypeError. After a plain truncate
    the serial column keeps counting (4..6); after ``restart=True`` it
    starts again at 1.
    """
    truncate = self.db.truncate
    self.assertRaises(TypeError, truncate, 'test_table', restart='invalid')
    query = self.db.query
    self.createTable('test_table', 'n serial, t text')
    stats_sql = "select count(n), min(n), max(n) from test_table"

    def insert_three():
        # Add three rows, letting the serial column assign n.
        for _ in range(3):
            query("insert into test_table (t) values ('test')")

    def stats():
        # (row count, smallest n, largest n) of the table right now.
        return query(stats_sql).getresult()[0]

    insert_three()
    self.assertEqual(stats(), (3, 1, 3))

    truncate('test_table')
    self.assertEqual(stats(), (0, None, None))
    insert_three()
    self.assertEqual(stats(), (3, 4, 6))

    truncate('test_table', restart=True)
    self.assertEqual(stats(), (0, None, None))
    insert_three()
    self.assertEqual(stats(), (3, 1, 3))
def testDate(self):
    """Round-trip dates through the server under every DateStyle.

    For each style an ordinary date must come back unchanged, while
    out-of-range and infinite dates must come back as ``date.max`` /
    ``date.min``.
    """
    run = self.db.query
    styles = ('ISO', 'Postgres, MDY', 'Postgres, DMY',
              'SQL, MDY', 'SQL, DMY', 'German')
    # Each statement returns (value clamped to date.max, value clamped
    # to date.min).
    clamped_statements = (
        "select '10000-08-01'::date, '0099-01-08 BC'::date",
        "select 'infinity'::date, '-infinity'::date",
    )
    for style in styles:
        self.db.set_parameter('datestyle', style)
        expected = date(2016, 3, 14)
        got = run("select $1::date", (expected,)).getresult()[0][0]
        self.assertIsInstance(got, date)
        self.assertEqual(got, expected)
        for sql in clamped_statements:
            row = run(sql).getresult()[0]
            self.assertIsInstance(row[0], date)
            self.assertIsInstance(row[1], date)
            self.assertEqual(row[0], date.max)
            self.assertEqual(row[1], date.min)
def get_user_to_damped_n_messages(self, dt_max, alpha):
    """
    Compute, per user, an exponentially damped count of the messages sent
    up to a reference datetime.

    Args:
        dt_max: The reference datetime; only messages at or before it are
            considered.
        alpha: The damping factor, a nonnegative float.
    Returns:
        A dict from each user to the damped message count. Every message
        contributes ``self.exp_damped_day_difference(dt_max, timestamp,
        alpha)``, described by the original author as exp(-alpha * t) with
        t the difference in days between dt_max and the message. If no
        message falls at or before dt_max, every user maps to 0.
    Raises:
        ValueError: If alpha is negative.
    """
    if alpha < 0:
        raise ValueError('Must have alpha >= 0')

    try:
        # Drop everything later than the reference datetime.
        up_to_cutoff = self.filter_by_datetime(end_dt=dt_max)
    except EmptyConversationError:
        # Nothing left after filtering: all users get a count of 0.
        return self.get_user_to_message_statistic(lambda x: 0)

    def damped_weight(x):
        # x is a single message; its weight decays with its age.
        return self.exp_damped_day_difference(dt_max, x.timestamp, alpha)

    return up_to_cutoff.get_user_to_message_statistic(damped_weight)
def tslice(unit, start=None, end=None, step=1, count=float('inf')):
    """
    tslice(unit, start=None, end=None, step=1, count=inf) -> generator of Blackhole objects

    An xrange-like generator over points in time.

    :param unit: one of 'year', 'month', 'day', 'hour', 'minute', 'second',
        'microsecond' (must be in ``Blackhole._units``)
    :param start: first value to yield, as a Blackhole object or a string
        accepted by ``ben()``; ``None`` (the default) means ``ben()``
        evaluated when iteration starts, not once at import time
    :param end: exclusive bound, as a Blackhole object or a string; a falsy
        value means ``ben(datetime.max)`` for a positive step and
        ``ben(datetime.min)`` for a negative step
    :param step: number of units to shift per iteration; a step of 0
        yields nothing
    :param count: maximum number of values to yield
    :return: generator of Blackhole objects
    :raises AttributeError: if ``unit`` is not a known unit (raised when the
        generator is first advanced)
    """
    if unit not in Blackhole._units:
        raise AttributeError('unknown unit: %r' % (unit,))
    if start is None:
        # Resolved per call: a ``start=ben()`` default would be evaluated
        # only once, when the function is defined.
        start = ben()
    elif isinstance(start, basestring):
        # NOTE(review): ``basestring`` only exists on Python 2.
        start = ben(start)
    if isinstance(end, basestring):
        end = ben(end)

    cnt = 0
    if step > 0:
        end = end or ben(datetime.max)
        while start < end and cnt < count:
            yield start
            start = start.shifted(**{unit: step})
            cnt += 1
    elif step < 0:
        end = end or ben(datetime.min)
        while start > end and cnt < count:
            yield start
            start = start.shifted(**{unit: step})
            cnt += 1
def find_max_value(test_func, initial_value):
    """
    Return the largest value for which the test function does not yet fail.

    The search starts at ``initial_value`` (which must be a positive int),
    doubles the probe until a failure is seen, and then narrows the last
    doubling interval with ``binary_search``.
    """
    assert isinstance(initial_value, int) and initial_value > 0

    fails = FailsArray(test_func)

    # Grow exponentially until the probe lies beyond the maximum.
    probe = initial_value
    while fails[probe] == 0:
        probe *= 2

    # The 0 -> 1 transition of the fails array lies between the last
    # passing probe and the first failing one; 0.5 is the boundary value
    # handed to binary_search.
    first_failing = binary_search(fails, 0.5, probe // 2, probe)
    max_value = first_failing - 1

    # Make sure the result really is the last passing value.
    assert fails[max_value] == 0 and fails[max_value + 1] == 1, \
        "max_value={}, fails[+-2]: {}, {}, {}, {}, {}".format(
            max_value,
            fails[max_value - 2], fails[max_value - 1], fails[max_value],
            fails[max_value + 1], fails[max_value + 2])

    return max_value
def x_test_print_datetime_max(self):
    """Write the repr of ``datetime.max`` to stdout and flush it."""
    template = "\nMax value for Python datetime (datetime.max): {!r}"
    message = template.format(datetime.max)
    print(message)
    sys.stdout.flush()
def test_datetime_max(self):
    """Check that timestamp_from_datetime() accepts ``datetime.max``."""
    # Passing simply means that no exception is raised; the returned
    # value is not inspected.
    largest = datetime.max
    timestamp_from_datetime(largest)
def _get_iteration_params(cls, end, limit):
    """Resolve the ``(end, limit)`` pair used to bound an iteration.

    An explicit ``end`` wins and is paired with ``sys.maxsize`` as the
    limit (any given ``limit`` is ignored). Without ``end``, iteration runs
    up to ``cls.max`` and is bounded by ``limit``.

    Raises:
        Exception: If both ``end`` and ``limit`` are None.
    """
    if end is not None:
        return end, sys.maxsize
    if limit is None:
        raise Exception('one of \'end\' or \'limit\' is required')
    return cls.max, limit
def testTimestamp(self):
    """Round-trip timestamps through the server under every DateStyle.

    Ordinary timestamps (date only, with seconds, with microseconds) must
    come back unchanged; out-of-range and infinite timestamps must come
    back as ``datetime.max`` / ``datetime.min``.
    """
    run = self.db.query
    styles = ('ISO', 'Postgres, MDY', 'Postgres, DMY',
              'SQL, MDY', 'SQL, DMY', 'German')
    samples = (
        datetime(2016, 3, 14),
        datetime(2016, 3, 14, 15, 9, 26),
        datetime(2016, 3, 14, 15, 9, 26, 535897),
    )
    # Each statement returns (value clamped to datetime.max, value clamped
    # to datetime.min).
    clamped_statements = (
        ("select '10000-08-01 AD'::timestamp,"
         " '0099-01-08 BC'::timestamp"),
        "select 'infinity'::timestamp, '-infinity'::timestamp",
    )
    for style in styles:
        self.db.set_parameter('datestyle', style)
        for expected in samples:
            got = run("select $1::timestamp", (expected,)).getresult()[0][0]
            self.assertIsInstance(got, datetime)
            self.assertEqual(got, expected)
        for sql in clamped_statements:
            row = run(sql).getresult()[0]
            self.assertIsInstance(row[0], datetime)
            self.assertIsInstance(row[1], datetime)
            self.assertEqual(row[0], datetime.max)
            self.assertEqual(row[1], datetime.min)
def testTimestamptz(self):
    """Round-trip timestamptz values under several zones and DateStyles.

    For every combination of session time zone and DateStyle, aware
    datetimes must come back unchanged, and out-of-range or infinite values
    must come back as ``datetime.max`` / ``datetime.min``.
    """
    run = self.db.query
    hour_offsets = {'CET': 1, 'EET': 2, 'EST': -5, 'UTC': 0}
    styles = ('ISO', 'Postgres, MDY', 'Postgres, DMY',
              'SQL, MDY', 'SQL, DMY', 'German')
    # Each statement returns (value clamped to datetime.max, value clamped
    # to datetime.min).
    clamped_statements = (
        ("select '10000-08-01 AD'::timestamptz,"
         " '0099-01-08 BC'::timestamptz"),
        "select 'infinity'::timestamptz, '-infinity'::timestamptz",
    )
    for zone_name in sorted(hour_offsets):
        offset_text = '%+03d00' % hour_offsets[zone_name]
        try:
            zone = datetime.strptime(offset_text, '%z').tzinfo
        except ValueError:  # Python < 3.2
            zone = pg._get_timezone(offset_text)
        self.db.set_parameter('timezone', zone_name)
        for style in styles:
            self.db.set_parameter('datestyle', style)
            samples = (
                datetime(2016, 3, 14, tzinfo=zone),
                datetime(2016, 3, 14, 15, 9, 26, tzinfo=zone),
                datetime(2016, 3, 14, 15, 9, 26, 535897, zone),
            )
            for expected in samples:
                got = run("select $1::timestamptz",
                          (expected,)).getresult()[0][0]
                self.assertIsInstance(got, datetime)
                self.assertEqual(got, expected)
            for sql in clamped_statements:
                row = run(sql).getresult()[0]
                self.assertIsInstance(row[0], datetime)
                self.assertIsInstance(row[1], datetime)
                self.assertEqual(row[0], datetime.max)
                self.assertEqual(row[1], datetime.min)
def cast_timestamptz(value, connection):
    """Cast a timestamptz value.

    Parses the text form of a timestamp with time zone into a ``datetime``.
    The date layout is taken from ``connection.date_format()``.

    '-infinity' and values ending in 'BC' are returned as ``datetime.min``,
    'infinity' and over-long year/date tokens as ``datetime.max``; these
    clamped results are returned as-is, without time zone information.
    Otherwise the result carries the zone found in the text, attached
    either by parsing with '%z' or through ``_get_timezone(tz)``.
    """
    # Infinite values are clamped to the representable range.
    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    value = value.split()
    # Values ending in 'BC' are clamped to the minimum.
    if value[-1] == 'BC':
        return datetime.min
    fmt = connection.date_format()
    if fmt.endswith('-%Y') and len(value) > 2:
        # Year-last layout: the first token is dropped (presumably the
        # weekday name -- confirm); the fourth remaining token is the year.
        value = value[1:]
        if len(value[3]) > 4:
            return datetime.max
        fmt = ['%d %b' if fmt.startswith('%d') else '%b %d',
               '%H:%M:%S.%f' if len(value[2]) > 8 else '%H:%M:%S', '%Y']
        # The last token is the time zone.
        value, tz = value[:-1], value[-1]
    else:
        if fmt.startswith('%Y-'):
            # Year-first layout: the zone, if any, is part of the time
            # token and is split off with the module-level _re_timezone.
            tz = _re_timezone.match(value[1])
            if tz:
                value[1], tz = tz.groups()
            else:
                # No zone found in the time token: fall back to '+0000'.
                tz = '+0000'
        else:
            # Otherwise the zone is a separate trailing token.
            value, tz = value[:-1], value[-1]
        if len(value[0]) > 10:
            return datetime.max
        # A time token longer than 8 characters has fractional seconds.
        fmt = [fmt, '%H:%M:%S.%f' if len(value[1]) > 8 else '%H:%M:%S']
    # _has_timezone is defined elsewhere in the module; presumably it says
    # whether strptime() supports '%z' on this Python -- confirm.
    if _has_timezone:
        value.append(_timezone_as_offset(tz))
        fmt.append('%z')
        return datetime.strptime(' '.join(value), ' '.join(fmt))
    # Without '%z', parse a naive value and attach the zone afterwards.
    return datetime.strptime(' '.join(value), ' '.join(fmt)).replace(
        tzinfo=_get_timezone(tz))
def cast_timestamptz(value, connection):
    """Cast a timestamptz value.

    Parses the text form of a timestamp with time zone into a ``datetime``.
    The date layout is taken from ``connection.date_format()``.

    '-infinity' and values ending in 'BC' are returned as ``datetime.min``,
    'infinity' and over-long year/date tokens as ``datetime.max``; these
    clamped results are returned as-is, without time zone information.
    Otherwise the result carries the zone found in the text, attached
    either by parsing with '%z' or through ``_get_timezone(tz)``.
    """
    # Infinite values are clamped to the representable range.
    if value == '-infinity':
        return datetime.min
    if value == 'infinity':
        return datetime.max
    value = value.split()
    # Values ending in 'BC' are clamped to the minimum.
    if value[-1] == 'BC':
        return datetime.min
    fmt = connection.date_format()
    if fmt.endswith('-%Y') and len(value) > 2:
        # Year-last layout: the first token is dropped (presumably the
        # weekday name -- confirm); the fourth remaining token is the year.
        value = value[1:]
        if len(value[3]) > 4:
            return datetime.max
        fmt = ['%d %b' if fmt.startswith('%d') else '%b %d',
               '%H:%M:%S.%f' if len(value[2]) > 8 else '%H:%M:%S', '%Y']
        # The last token is the time zone.
        value, tz = value[:-1], value[-1]
    else:
        if fmt.startswith('%Y-'):
            # Year-first layout: the zone, if any, is part of the time
            # token and is split off with the module-level _re_timezone.
            tz = _re_timezone.match(value[1])
            if tz:
                value[1], tz = tz.groups()
            else:
                # No zone found in the time token: fall back to '+0000'.
                tz = '+0000'
        else:
            # Otherwise the zone is a separate trailing token.
            value, tz = value[:-1], value[-1]
        if len(value[0]) > 10:
            return datetime.max
        # A time token longer than 8 characters has fractional seconds.
        fmt = [fmt, '%H:%M:%S.%f' if len(value[1]) > 8 else '%H:%M:%S']
    # _has_timezone is defined elsewhere in the module; presumably it says
    # whether strptime() supports '%z' on this Python -- confirm.
    if _has_timezone:
        value.append(_timezone_as_offset(tz))
        fmt.append('%z')
        return datetime.strptime(' '.join(value), ' '.join(fmt))
    # Without '%z', parse a naive value and attach the zone afterwards.
    return datetime.strptime(' '.join(value), ' '.join(fmt)).replace(
        tzinfo=_get_timezone(tz))
def __init__(self, batch_name, sd, mimetype_files=None):
    """Set up empty statistics for one batch.

    ``sd`` is kept on the instance and is also asked, via ``get_obj``, for
    the 'output_queue' and 'output_domain' objects. ``mimetype_files`` is
    accepted but not used here.
    """
    self.sd = sd
    self.batch = batch_name
    self.log_fp = None

    # Counters start at zero.
    self.num_files = self.total_time = 0

    # Extremes start at the opposite end of their range, so that the first
    # real sample replaces them.
    self.min_time, self.max_time = timedelta.max, timedelta.min
    self.earliest_time, self.latest_time = datetime.max, datetime.min

    self.queue = self.sd.get_obj('output_queue')
    self.domain = self.sd.get_obj('output_domain')
def calculateID(self, file_name_fullpath):
    """Return an instance ID for the audit file at *file_name_fullpath*.

    The ID is the ``created`` attribute of the first ``PersistenceItem``
    element, parsed as "%Y-%m-%dT%H:%M:%SZ". An unparseable date yields
    ``datetime.max`` (with a warning). If the file cannot be loaded or
    parsed, or the element is missing, the traceback is printed to stdout
    and ``datetime.min`` is returned.
    """
    # Get the creation date for the first PersistenceItem in the audit (they will all be the same)
    instanceID = datetime.min
    try:
        file_object = loadFile(file_name_fullpath)
        try:
            root = ET.parse(file_object).getroot()
        finally:
            # Release the handle even when the XML cannot be parsed.
            file_object.close()
        reg_key = root.find('PersistenceItem')
        reg_modified = reg_key.get('created')
        try:
            tmp_instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            tmp_instanceID = datetime.max
            logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
        instanceID = tmp_instanceID
    except Exception:
        traceback.print_exc(file=sys.stdout)

    # If we found no PersistenceItem date we go with plan B (but most probably this is corrupt and will fail later)
    # NOTE(review): instanceID starts as datetime.min and is only ever
    # replaced by another datetime, so this branch cannot currently run.
    # Enabling it would change the return value on failure from
    # datetime.min to an MD5 hex string -- confirm the intent with callers
    # before doing so.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        try:
            content = file_object.read()
        finally:
            file_object.close()
        instanceID = hashlib.md5(content).hexdigest()
    return instanceID
def calculateID(self, file_name_fullpath):
    """Return an instance ID for the audit file at *file_name_fullpath*.

    The ID is the ``created`` attribute of the first
    ``AppCompatItemExtended`` element, parsed as "%Y-%m-%dT%H:%M:%SZ". An
    unparseable date yields ``datetime.max`` (with a warning). If the file
    cannot be loaded or parsed, or the element is missing, the traceback is
    printed to stdout and ``datetime.min`` is returned.
    """
    # Get the creation date for the first AppCompatItemExtended in the audit (they will all be the same)
    instanceID = datetime.min
    try:
        file_object = loadFile(file_name_fullpath)
        try:
            root = ET.parse(file_object).getroot()
        finally:
            # Release the handle even when the XML cannot be parsed.
            file_object.close()
        reg_key = root.find('AppCompatItemExtended')
        reg_modified = reg_key.get('created')
        try:
            tmp_instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            tmp_instanceID = datetime.max
            logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
        instanceID = tmp_instanceID
    except Exception:
        traceback.print_exc(file=sys.stdout)

    # If we found no AppCompatItemExtended date we go with plan B (but most probably this is corrupt and will fail later)
    # NOTE(review): instanceID starts as datetime.min and is only ever
    # replaced by another datetime, so this branch cannot currently run.
    # Enabling it would change the return value on failure from
    # datetime.min to an MD5 hex string -- confirm the intent with callers
    # before doing so.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        try:
            content = file_object.read()
        finally:
            file_object.close()
        instanceID = hashlib.md5(content).hexdigest()
    return instanceID
# Source file: appcompat_mirregistryaudit.py
# Project: appcompatprocessor
# Author: mbevilacqua
# Project source
# File source
# Views: 26
# Favorites: 0
# Likes: 0
# Comments: 0
def calculateID(self, file_name_fullpath):
    """Return an instance ID for the registry audit at *file_name_fullpath*.

    The ID is the latest ``Modified`` date among all ``RegistryItem``
    elements, each parsed as "%Y-%m-%dT%H:%M:%SZ". An unparseable date
    counts as ``datetime.max`` (with a warning); items without a
    ``Modified`` child are skipped with a warning. If the file cannot be
    loaded or parsed, the error is logged and whatever was found so far is
    returned (``datetime.min`` when nothing was found).
    """
    instanceID = datetime.min
    try:
        file_object = loadFile(file_name_fullpath)
        try:
            root = ET.parse(file_object).getroot()
        finally:
            # Release the handle even when the XML cannot be parsed.
            file_object.close()
        for reg_key in root.findall('RegistryItem'):
            tmp_reg_key = reg_key.find('Modified')
            if tmp_reg_key is None:
                logger.warning("Found RegistryItem with no Modified date (Mir bug?): %s" % file_name_fullpath)
                continue
            reg_modified = tmp_reg_key.text
            try:
                tmp_instanceID = datetime.strptime(reg_modified, "%Y-%m-%dT%H:%M:%SZ")
            except ValueError:
                tmp_instanceID = datetime.max
                logger.warning("Invalid reg_modified date found!: %s (%s)" % (reg_modified, file_name_fullpath))
            # Keep the most recent date seen so far.
            if instanceID < tmp_instanceID:
                instanceID = tmp_instanceID
    except Exception:
        logger.exception("Error on calculateID for: %s" % file_name_fullpath)

    # If we found no Modified date in any of the RegistryItems we go with plan B (but most probably ShimCacheParser will fail to parse anyway)
    # NOTE(review): instanceID starts as datetime.min and is only ever
    # replaced by another datetime, so this branch cannot currently run.
    # Enabling it would change the return value on failure from
    # datetime.min to an MD5 hex string -- confirm the intent with callers
    # before doing so.
    if instanceID is None:
        file_object = loadFile(file_name_fullpath)
        try:
            content = file_object.read()
        finally:
            file_object.close()
        instanceID = hashlib.md5(content).hexdigest()
    return instanceID