def __preptime(self,when):
"""
Extract information in a suitable format from when,
a datetime.datetime object.
"""
# datetime days are numbered in the Gregorian calendar
# while the calculations from NOAA are distibuted as
# OpenOffice spreadsheets with days numbered from
# 1/1/1900. The difference are those numbers taken for
# 18/12/2010
self.day = when.toordinal()-(734124-40529)
t=when.time()
self.time= (t.hour + t.minute/60.0 + t.second/3600.0)/24.0
self.timezone=0
offset=when.utcoffset()
if not offset is None:
self.timezone=offset.seconds/3600.0
python类time()的实例源码
def get_minute_in_day_to_message_statistic(self, message_statistic):
"""
Maps each minute in a day to the sum of the values of a message
statistic over all messages from that minute.
Args:
message_statistic: A function mapping a Message object to an int or
a float.
Returns:
minute_in_day_to_message_statistic: A dict mapping a time object
representing a minute in a day to the sum of the values of
message_statistic over all messages in self.messages from that
minute.
"""
minute_in_day_to_message_statistic = {}
for hour in range(self.HOURS_PER_DAY):
for minute in range(self.MINUTES_PER_HOUR):
minute_in_day = time(hour, minute)
minute_in_day_to_message_statistic[minute_in_day] = 0
for message in self.messages:
minute_in_day = time(message.timestamp.hour, message.timestamp.minute)
minute_in_day_to_message_statistic[minute_in_day] += message_statistic(message)
return minute_in_day_to_message_statistic
def n_messages_chi_square(self, time_interval):
"""
Computes a chi square test against the null hypothesis that the number
of messages is uniformly distributed across the time interval. Only
makes sense for the time intervals 'minute in hour', 'minute in day',
'hour' since those ones have a fixed number of values.
Args:
time_interval: One of 'minute in hour', 'minute in day', 'hour'.
Returns:
chisq: A float representing the chi square statistic where the
observations consist of the number of messages in each value of
time_interval and the null hypothesis is that the number of
messages is uniformly distributed.
p: A float representing the p-value of the chi square test.
"""
valid_time_intervals = ['minute in hour', 'minute in day', 'hour']
if time_interval not in valid_time_intervals:
raise ValueError('time_interval must be in {}'.format(valid_time_intervals))
result = chisquare(self.get_n_messages_in_time_interval(time_interval))
return (result.statistic, result.pvalue)
def QA_data_tick_resample(tick, type_='1min'):
data = tick['price'].resample(
type_, label='right', closed='left').ohlc()
data['volume'] = tick['vol'].resample(
type_, label='right', closed='left').sum()
data['code'] = tick['code'][0]
__data_ = pd.DataFrame()
_temp = tick.drop_duplicates('date')['date']
for item in _temp:
__data = data[item]
_data = __data[time(9, 31):time(11, 30)].append(
__data[time(13, 1):time(15, 0)])
__data_ = __data_.append(_data)
__data_['datetime'] = __data_.index
__data_['date'] = __data_['datetime'].apply(lambda x: str(x)[0:10])
return __data_.fillna(method='ffill').set_index(['datetime', 'code'], drop=False)
def _check_annotations(value):
"""
Recursively check that value is either of a "simple" type (number, string,
date/time) or is a (possibly nested) dict, list or numpy array containing
only simple types.
"""
if isinstance(value, np.ndarray):
if not issubclass(value.dtype.type, ALLOWED_ANNOTATION_TYPES):
raise ValueError("Invalid annotation. NumPy arrays with dtype %s"
"are not allowed" % value.dtype.type)
elif isinstance(value, dict):
for element in value.values():
_check_annotations(element)
elif isinstance(value, (list, tuple)):
for element in value:
_check_annotations(element)
elif not isinstance(value, ALLOWED_ANNOTATION_TYPES):
raise ValueError("Invalid annotation. Annotations of type %s are not"
"allowed" % type(value))
def _check_annotations(value):
"""
Recursively check that value is either of a "simple" type (number, string,
date/time) or is a (possibly nested) dict, list or numpy array containing
only simple types.
"""
if isinstance(value, np.ndarray):
if not issubclass(value.dtype.type, ALLOWED_ANNOTATION_TYPES):
raise ValueError("Invalid annotation. NumPy arrays with dtype %s"
"are not allowed" % value.dtype.type)
elif isinstance(value, dict):
for element in value.values():
_check_annotations(element)
elif isinstance(value, (list, tuple)):
for element in value:
_check_annotations(element)
elif not isinstance(value, ALLOWED_ANNOTATION_TYPES):
raise ValueError("Invalid annotation. Annotations of type %s are not"
"allowed" % type(value))
def __init__(self, value):
"""
Initializer value can be:
- integer_type: absolute nanoseconds in the day
- datetime.time: built-in time
- string_type: a string time of the form "HH:MM:SS[.mmmuuunnn]"
"""
if isinstance(value, six.integer_types):
self._from_timestamp(value)
elif isinstance(value, datetime.time):
self._from_time(value)
elif isinstance(value, six.string_types):
self._from_timestring(value)
else:
raise TypeError('Time arguments must be a whole number, datetime.time, or string')
def __init__(self, value):
"""
Initializer value can be:
- integer_type: absolute days from epoch (1970, 1, 1). Can be negative.
- datetime.date: built-in date
- string_type: a string time of the form "yyyy-mm-dd"
"""
if isinstance(value, six.integer_types):
self.days_from_epoch = value
elif isinstance(value, (datetime.date, datetime.datetime)):
self._from_timetuple(value.timetuple())
elif isinstance(value, six.string_types):
self._from_datestring(value)
else:
raise TypeError('Date arguments must be a whole number, datetime.date, or string')
def test_time():
"""Test a simple timeline"""
time_chart = TimeLine(truncate_label=1000)
time_chart.add('times', [
(time(1, 12, 29), 2),
(time(21, 2, 29), 10),
(time(12, 30, 59), 7)
])
q = time_chart.render_pyquery()
assert list(
map(lambda t: t.split(' ')[0],
q(".axis.x text").map(texts))) == [
'02:46:40',
'05:33:20',
'08:20:00',
'11:06:40',
'13:53:20',
'16:40:00',
'19:26:40']
def bind_processor(self, dialect):
datetime_time = datetime.time
format = self._storage_format
def process(value):
if value is None:
return None
elif isinstance(value, datetime_time):
return format % {
'hour': value.hour,
'minute': value.minute,
'second': value.second,
'microsecond': value.microsecond,
}
else:
raise TypeError("SQLite Time type only accepts Python "
"time objects as input.")
return process
def __init__(self, isolation_level=None, native_datetime=False, **kwargs):
default.DefaultDialect.__init__(self, **kwargs)
self.isolation_level = isolation_level
# this flag used by pysqlite dialect, and perhaps others in the
# future, to indicate the driver is handling date/timestamp
# conversions (and perhaps datetime/time as well on some hypothetical
# driver ?)
self.native_datetime = native_datetime
if self.dbapi is not None:
self.supports_default_values = (
self.dbapi.sqlite_version_info >= (3, 3, 8))
self.supports_cast = (
self.dbapi.sqlite_version_info >= (3, 2, 3))
self.supports_multivalues_insert = (
# http://www.sqlite.org/releaselog/3_7_11.html
self.dbapi.sqlite_version_info >= (3, 7, 11))
# see http://www.sqlalchemy.org/trac/ticket/2568
# as well as http://www.sqlite.org/src/info/600482d161
self._broken_fk_pragma_quotes = (
self.dbapi.sqlite_version_info < (3, 6, 14))
def result_processor(self, dialect, coltype):
time = datetime.time
def process(value):
# convert from a timedelta value
if value is not None:
microseconds = value.microseconds
seconds = value.seconds
minutes = seconds // 60
return time(minutes // 60,
minutes % 60,
seconds - minutes * 60,
microsecond=microseconds)
else:
return None
return process
def default(self, o,
dates=(datetime.datetime, datetime.date),
times=(datetime.time,),
textual=(decimal.Decimal, uuid.UUID, DjangoPromise),
isinstance=isinstance,
datetime=datetime.datetime,
text_type=text_type):
if isinstance(o, dates):
if not isinstance(o, datetime):
o = datetime(o.year, o.month, o.day, 0, 0, 0, 0)
r = o.isoformat()
if r.endswith("+00:00"):
r = r[:-6] + "Z"
return r
elif isinstance(o, times):
return o.isoformat()
elif isinstance(o, textual):
return text_type(o)
else:
return super(JsonEncoder, self).default(o)
def test_escape_any(self):
now = datetime.datetime.now()
self.assertEqual(escape_any('foo\n\r\\bar'), r'"foo\n\r\\bar"')
self.assertEqual(escape_any(now),
'"%s"^^xsd:dateTime' % now.isoformat())
self.assertEqual(escape_any(now.date()),
'"%s"^^xsd:date' % now.date().isoformat())
self.assertEqual(escape_any(now.time()),
'"%s"^^xsd:time' % now.time().isoformat())
self.assertEqual(escape_any(True), 'true')
self.assertEqual(escape_any(5), '5')
self.assertEqual(escape_any(Decimal(5.5)), '5.5')
self.assertEqual(escape_any(5.5), '"5.5"^^xsd:double')
self.assertEqual(escape_any(RDFTerm("raw")), 'raw')
self.assertEqual(escape_any(Node("subject", {})), 'subject')
with self.assertRaises(TypeError):
escape_any(int)
def put(self, job, next_t=None):
"""Queue a new job.
Args:
job (telegram.ext.Job): The ``Job`` instance representing the new job
next_t (Optional[int, float, datetime.timedelta, datetime.datetime, datetime.time]):
Time in or at which the job should run for the first time. This parameter will be
interpreted depending on its type.
``int`` or ``float`` will be interpreted as "seconds from now" in which the job
should run.
``datetime.timedelta`` will be interpreted as "time from now" in which the job
should run.
``datetime.datetime`` will be interpreted as a specific date and time at which the
job should run.
``datetime.time`` will be interpreted as a specific time at which the job should
run. This could be either today or, if the time has already passed, tomorrow.
"""
warnings.warn("'JobQueue.put' is being deprecated, use 'JobQueue.run_once', "
"'JobQueue.run_daily' or 'JobQueue.run_repeating' instead")
if job.job_queue is None:
job.job_queue = self
self._put(job, next_t=next_t)
def _main_loop(self):
"""
Thread target of thread ``job_queue``. Runs in background and performs ticks on the job
queue.
"""
while self._running:
# self._next_peek may be (re)scheduled during self.tick() or self.put()
with self.__next_peek_lock:
tmout = self._next_peek - time.time() if self._next_peek else None
self._next_peek = None
self.__tick.clear()
self.__tick.wait(tmout)
# If we were woken up by self.stop(), just bail out
if not self._running:
break
self.tick()
self.logger.debug('%s thread stopped', self.__class__.__name__)
def test_serial():
assert s.serialize_value(None) == 'x'
assert s.serialize_value(True) == 'true'
assert s.serialize_value(False) == 'false'
assert s.serialize_value(5) == 'i:5'
assert s.serialize_value(5.0) == 'f:5.0'
assert s.serialize_value(decimal.Decimal('5.5')) == 'n:5.5'
assert s.serialize_value('abc') == 's:abc'
assert s.serialize_value(b'abc') == 'b:YWJj'
assert s.serialize_value(b'abc') == 'b:YWJj'
assert s.serialize_value(datetime.date(2007, 12, 5)) == 'd:2007-12-05'
assert s.serialize_value(datetime.datetime(2007, 12, 5, 12, 30, 30, tzinfo=utc)) \
== 'dt:2007-12-05 12:30:30+00:00'
assert s.serialize_value(datetime.time(12, 34, 56)) == 't:12:34:56'
with raises(NotImplementedError):
s.serialize_value(csv.reader)
def __excel_date_dt(self, date):
adj = False
if isinstance(date, dt.date):
if self.__parent_wb.dates_1904:
epoch_tuple = (1904, 1, 1)
else:
epoch_tuple = (1899, 12, 31)
adj = True
if isinstance(date, dt.datetime):
epoch = dt.datetime(*epoch_tuple)
else:
epoch = dt.date(*epoch_tuple)
else: # it's a datetime.time instance
date = dt.datetime.combine(dt.datetime(1900, 1, 1), date)
epoch = dt.datetime(1900, 1, 1)
delta = date - epoch
xldate = delta.days + delta.seconds / 86400.0
# Add a day for Excel's missing leap day in 1900
if adj and xldate > 59:
xldate += 1
return xldate
def readDate(self):
"""
Reads a UTC date from the data stream. Client and servers are
responsible for applying their own timezones.
Date: C{0x0B T7 T6} .. C{T0 Z1 Z2 T7} to C{T0} form a 64 bit
Big Endian number that specifies the number of nanoseconds
that have passed since 1/1/1970 0:00 to the specified time.
This format is UTC 1970. C{Z1} and C{Z0} for a 16 bit Big
Endian number indicating the indicated time's timezone in
minutes.
"""
ms = self.stream.read_double() / 1000.0
self.stream.read_short() # tz
# Timezones are ignored
d = util.get_datetime(ms)
if self.timezone_offset:
d = d + self.timezone_offset
self.context.addObject(d)
return d
def writeDate(self, d):
"""
Writes a date to the data stream.
@type d: Instance of C{datetime.datetime}
@param d: The date to be encoded to the AMF0 data stream.
"""
if isinstance(d, datetime.time):
raise pyamf.EncodeError(
'A datetime.time instance was found but AMF0 has no way to '
'encode time objects. Please use datetime.datetime instead '
'(got:%r)' % (d,)
)
# According to the Red5 implementation of AMF0, dates references are
# created, but not used.
if self.timezone_offset is not None:
d -= self.timezone_offset
secs = util.get_timestamp(d)
tz = 0
self.writeType(TYPE_DATE)
self.stream.write_double(secs * 1000.0)
self.stream.write_short(tz)
def get_timestamp(d):
"""
Returns a UTC timestamp for a C{datetime.datetime} object.
@type d: C{datetime.datetime}
@return: UTC timestamp.
@rtype: C{float}
@see: Inspiration taken from the U{Intertwingly blog
<http://intertwingly.net/blog/2007/09/02/Dealing-With-Dates>}.
"""
if isinstance(d, datetime.date) and not isinstance(d, datetime.datetime):
d = datetime.datetime.combine(d, datetime.time(0, 0, 0, 0))
msec = str(d.microsecond).rjust(6).replace(' ', '0')
return float('%s.%s' % (calendar.timegm(d.utctimetuple()), msec))
def date2values(date):
'''
Convert a date object into values for create or write
'''
res = {}
if not isinstance(date, datetime.datetime):
res['date'] = True
res['datetime'] = datetime.datetime.combine(date,
datetime.time())
else:
res['date'] = False
if date.tzinfo:
res['datetime'] = date.astimezone(tzlocal)
else:
res['datetime'] = date
return res
def sql_format(value):
if isinstance(value, (Query, Expression)):
return value
if value is None:
return None
if isinstance(value, basestring):
year, month, day = map(int, value.split("-", 2))
return datetime.date(year, month, day)
assert(isinstance(value, datetime.date))
# Allow datetime with min time for XML-RPC
# datetime must be tested separately because datetime is a
# subclass of date
assert(not isinstance(value, datetime.datetime)
or value.time() == datetime.time())
return value
def test_parse_time_timezone(self):
self.check_time_tz("+01", 3600)
self.check_time_tz("-01", -3600)
self.check_time_tz("+01:15", 4500)
self.check_time_tz("-01:15", -4500)
# The Python datetime module does not support time zone
# offsets that are not a whole number of minutes.
# We round the offset to the nearest minute.
self.check_time_tz("+01:15:00", 60 * (60 + 15))
self.check_time_tz("+01:15:29", 60 * (60 + 15))
self.check_time_tz("+01:15:30", 60 * (60 + 16))
self.check_time_tz("+01:15:59", 60 * (60 + 16))
self.check_time_tz("-01:15:00", -60 * (60 + 15))
self.check_time_tz("-01:15:29", -60 * (60 + 15))
self.check_time_tz("-01:15:30", -60 * (60 + 16))
self.check_time_tz("-01:15:59", -60 * (60 + 16))
def check_datetime_tz(self, str_offset, offset):
from datetime import datetime, timedelta
base = datetime(2007, 1, 1, 13, 30, 29)
base_str = '2007-01-01 13:30:29'
value = self.DATETIME(base_str + str_offset, self.curs)
# Value has time zone info and correct UTC offset.
self.assertNotEqual(value.tzinfo, None),
self.assertEqual(value.utcoffset(), timedelta(seconds=offset))
# Datetime is correct.
self.assertEqual(value.replace(tzinfo=None), base)
# Conversion to UTC produces the expected offset.
UTC = FixedOffsetTimezone(0, "UTC")
value_utc = value.astimezone(UTC).replace(tzinfo=None)
self.assertEqual(base - value_utc, timedelta(seconds=offset))
def test_parse_datetime_timezone(self):
self.check_datetime_tz("+01", 3600)
self.check_datetime_tz("-01", -3600)
self.check_datetime_tz("+01:15", 4500)
self.check_datetime_tz("-01:15", -4500)
# The Python datetime module does not support time zone
# offsets that are not a whole number of minutes.
# We round the offset to the nearest minute.
self.check_datetime_tz("+01:15:00", 60 * (60 + 15))
self.check_datetime_tz("+01:15:29", 60 * (60 + 15))
self.check_datetime_tz("+01:15:30", 60 * (60 + 16))
self.check_datetime_tz("+01:15:59", 60 * (60 + 16))
self.check_datetime_tz("-01:15:00", -60 * (60 + 15))
self.check_datetime_tz("-01:15:29", -60 * (60 + 15))
self.check_datetime_tz("-01:15:30", -60 * (60 + 16))
self.check_datetime_tz("-01:15:59", -60 * (60 + 16))
def test_parse_time_timezone(self):
self.check_time_tz("+01", 3600)
self.check_time_tz("-01", -3600)
self.check_time_tz("+01:15", 4500)
self.check_time_tz("-01:15", -4500)
# The Python datetime module does not support time zone
# offsets that are not a whole number of minutes.
# We round the offset to the nearest minute.
self.check_time_tz("+01:15:00", 60 * (60 + 15))
self.check_time_tz("+01:15:29", 60 * (60 + 15))
self.check_time_tz("+01:15:30", 60 * (60 + 16))
self.check_time_tz("+01:15:59", 60 * (60 + 16))
self.check_time_tz("-01:15:00", -60 * (60 + 15))
self.check_time_tz("-01:15:29", -60 * (60 + 15))
self.check_time_tz("-01:15:30", -60 * (60 + 16))
self.check_time_tz("-01:15:59", -60 * (60 + 16))
def check_datetime_tz(self, str_offset, offset):
from datetime import datetime, timedelta
base = datetime(2007, 1, 1, 13, 30, 29)
base_str = '2007-01-01 13:30:29'
value = self.DATETIME(base_str + str_offset, self.curs)
# Value has time zone info and correct UTC offset.
self.assertNotEqual(value.tzinfo, None),
self.assertEqual(value.utcoffset(), timedelta(seconds=offset))
# Datetime is correct.
self.assertEqual(value.replace(tzinfo=None), base)
# Conversion to UTC produces the expected offset.
UTC = FixedOffsetTimezone(0, "UTC")
value_utc = value.astimezone(UTC).replace(tzinfo=None)
self.assertEqual(base - value_utc, timedelta(seconds=offset))
def test_parse_datetime_timezone(self):
self.check_datetime_tz("+01", 3600)
self.check_datetime_tz("-01", -3600)
self.check_datetime_tz("+01:15", 4500)
self.check_datetime_tz("-01:15", -4500)
# The Python datetime module does not support time zone
# offsets that are not a whole number of minutes.
# We round the offset to the nearest minute.
self.check_datetime_tz("+01:15:00", 60 * (60 + 15))
self.check_datetime_tz("+01:15:29", 60 * (60 + 15))
self.check_datetime_tz("+01:15:30", 60 * (60 + 16))
self.check_datetime_tz("+01:15:59", 60 * (60 + 16))
self.check_datetime_tz("-01:15:00", -60 * (60 + 15))
self.check_datetime_tz("-01:15:29", -60 * (60 + 15))
self.check_datetime_tz("-01:15:30", -60 * (60 + 16))
self.check_datetime_tz("-01:15:59", -60 * (60 + 16))
def escape_parameter(v):
if v is None:
return 'NULL'
t = type(v)
if t == str:
return u"'" + v.replace(u"'", u"''") + u"'"
elif t == bool:
return u"TRUE" if v else u"FALSE"
elif t == time.struct_time:
return u'%04d-%02d-%02d %02d:%02d:%02d' % (
v.tm_year, v.tm_mon, v.tm_mday, v.tm_hour, v.tm_min, v.tm_sec)
elif t == datetime.datetime:
return "timestamp '" + v.isoformat() + "'"
elif t == datetime.date:
return "date '" + str(v) + "'"
elif t == datetime.timedelta:
return u"interval '" + str(v) + "'"
elif t == int or t == float:
return str(v)
elif t == decimal.Decimal:
return "decimal '" + str(v) + "'"
else:
return "'" + str(v) + "'"