def __preptime(self,when):
"""
Extract information in a suitable format from when,
a datetime.datetime object.
"""
# datetime days are numbered in the Gregorian calendar
# while the calculations from NOAA are distibuted as
# OpenOffice spreadsheets with days numbered from
# 1/1/1900. The difference are those numbers taken for
# 18/12/2010
self.day = when.toordinal()-(734124-40529)
t=when.time()
self.time= (t.hour + t.minute/60.0 + t.second/3600.0)/24.0
self.timezone=0
offset=when.utcoffset()
if not offset is None:
self.timezone=offset.seconds/3600.0
python类time()的实例源码
def timedur_to_IB(timedur: str) -> str:
"""
Convert a user-input ambiguous time duration string to IB-style
durationString and check validility.
:param timedur: A user-input ambiguous time duration string,
like '1 min', '5days',etc.
:returns: IB-style durationString
"""
tdur = timedur_standardize(timedur)
t_num = re.findall('\d+', tdur)[0]
t_unit = tdur[-1]
if t_unit in ['m', 'h']:
multip = {'m': 60, 'h': 3600}[t_unit]
t_num = str(multip * int(t_num))
t_unit = 's'
if t_unit in ['s', 'd', 'W', 'M', 'Y']:
return t_num + ' ' + t_unit.upper()
else:
raise TypeError(
"Invalid input time duration string: {}!".format(timedur))
def from_frontend_value(key, value):
"""Returns a `SiteConfiguration` object value for the relevant `key` and
JSON-serializable `value`, applying any transformation reversed by to_frontend_value."""
if key == NICETIES_OPEN:
from datetime import timedelta
return timedelta(days=value)
elif key == CLOSING_TIME:
from datetime import datetime
return datetime.strptime(value, '%H:%M').time()
elif key == CLOSING_BUFFER:
from datetime import timedelta
return timedelta(minutes=value)
elif key == CACHE_TIMEOUT:
from datetime import timedelta
return timedelta(seconds=value)
elif key == INCLUDE_FACULTY:
return value
elif key == INCLUDE_RESIDENTS:
return value
else:
raise ValueError('No such config key!')
def _DATETIME_to_python(self, value, dsc=None):
"""Connector/Python always returns naive datetime.datetime
Connector/Python always returns naive timestamps since MySQL has
no time zone support. Since Django needs non-naive, we need to add
the UTC time zone.
Returns datetime.datetime()
"""
if not value:
return None
dt = MySQLConverter._DATETIME_to_python(self, value)
if dt is None:
return None
if settings.USE_TZ and timezone.is_naive(dt):
dt = dt.replace(tzinfo=timezone.utc)
return dt
def _DATETIME_to_python(self, value, dsc=None):
"""Connector/Python always returns naive datetime.datetime
Connector/Python always returns naive timestamps since MySQL has
no time zone support. Since Django needs non-naive, we need to add
the UTC time zone.
Returns datetime.datetime()
"""
if not value:
return None
dt = MySQLConverter._DATETIME_to_python(self, value)
if dt is None:
return None
if settings.USE_TZ and timezone.is_naive(dt):
dt = dt.replace(tzinfo=timezone.utc)
return dt
def __init__(self, dt):
if not isinstance(dt, (datetime, date, timedelta, time)):
raise ValueError('You must use datetime, date, timedelta or time')
if isinstance(dt, datetime):
self.params = Parameters({'value': 'DATE-TIME'})
elif isinstance(dt, date):
self.params = Parameters({'value': 'DATE'})
elif isinstance(dt, time):
self.params = Parameters({'value': 'TIME'})
if (isinstance(dt, datetime) or isinstance(dt, time))\
and getattr(dt, 'tzinfo', False):
tzinfo = dt.tzinfo
if tzinfo is not pytz.utc and\
(tzutc is None or not isinstance(tzinfo, tzutc)):
# set the timezone as a parameter to the property
tzid = tzid_from_dt(dt)
if tzid:
self.params.update({'TZID': tzid})
self.dt = dt
def from_ical(cls, ical, timezone=None):
if isinstance(ical, cls):
return ical.dt
u = ical.upper()
if u.startswith(('P', '-P', '+P')):
return vDuration.from_ical(ical)
if len(ical) in (15, 16):
return vDatetime.from_ical(ical, timezone=timezone)
elif len(ical) == 8:
return vDate.from_ical(ical)
elif len(ical) in (6, 7):
return vTime.from_ical(ical)
else:
raise ValueError(
"Expected datetime, date, or time, got: '%s'" % ical
)
def __init__(self, manager):
# Do a delayed import to prevent possible circular import errors.
from boto.sdb.db.model import Model
self.model_class = Model
self.manager = manager
self.type_map = {bool: (self.encode_bool, self.decode_bool),
int: (self.encode_int, self.decode_int),
float: (self.encode_float, self.decode_float),
self.model_class: (
self.encode_reference, self.decode_reference
),
Key: (self.encode_reference, self.decode_reference),
datetime: (self.encode_datetime, self.decode_datetime),
date: (self.encode_date, self.decode_date),
time: (self.encode_time, self.decode_time),
Blob: (self.encode_blob, self.decode_blob),
str: (self.encode_string, self.decode_string),
}
if six.PY2:
self.type_map[long] = (self.encode_long, self.decode_long)
def decode_time(self, value):
""" converts strings in the form of HH:MM:SS.mmmmmm
(created by datetime.time.isoformat()) to
datetime.time objects.
Timzone-aware strings ("HH:MM:SS.mmmmmm+HH:MM") won't
be handled right now and will raise TimeDecodeError.
"""
if '-' in value or '+' in value:
# TODO: Handle tzinfo
raise TimeDecodeError("Can't handle timezone aware objects: %r" % value)
tmp = value.split('.')
arg = map(int, tmp[0].split(':'))
if len(tmp) == 2:
arg.append(int(tmp[1]))
return time(*arg)
def _connect(self):
args = dict(aws_access_key_id=self.db_user,
aws_secret_access_key=self.db_passwd,
is_secure=self.enable_ssl)
try:
region = [x for x in boto.sdb.regions() if x.endpoint == self.db_host][0]
args['region'] = region
except IndexError:
pass
self._sdb = boto.connect_sdb(**args)
# This assumes that the domain has already been created
# It's much more efficient to do it this way rather than
# having this make a roundtrip each time to validate.
# The downside is that if the domain doesn't exist, it breaks
self._domain = self._sdb.lookup(self.db_name, validate=False)
if not self._domain:
self._domain = self._sdb.create_domain(self.db_name)
feature_engineering.py 文件源码
项目:smart-battery-for-smart-energy-usage
作者: AnatolyPavlov
项目源码
文件源码
阅读 34
收藏 0
点赞 0
评论 0
def transform(self, df):
'''Transform input
time series into a set of time series for
each hourly-slice in a day, where each time
series for a given hour has days as its agument.'''
days = extract_days(df)
day = days[0]
next_day = days[1]
time_intervs_in_day = []
for i, datetime in enumerate(df.query('index >= @day and index < @next_day').index):
if datetime.time() not in time_intervs_in_day:
time_intervs_in_day.append(datetime.time())
#
df['time'] = [d.time() for i, d in enumerate(df.index)]# Adding time only column
time_intervs_data = {} # key=time interval, values = pd.DataFrame with daily time series
for time_intv in time_intervs_in_day:
time_intervs_data[time_intv] = df[df['time']==time_intv].drop('time', axis=1)
#
return time_intervs_data
feature_engineering.py 文件源码
项目:smart-battery-for-smart-energy-usage
作者: AnatolyPavlov
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def transform_inverse(self, df):
price = pd.read_csv(self.path_to_price, parse_dates=True, index_col='Unnamed: 0')
#
index = []
values = []
days = extract_days(df)
time_intervs_in_day = [d.time() for i, d in enumerate(price.index)]
for day in days:
i = 0
for time_intv in time_intervs_in_day:
if time_intv <= df.index[i].time():
values.append(df.values[i][0])
index.append(datetime.combine(day, time_intv))
else:
i+=1
values.append(df.values[i][0])
index.append(datetime.combine(day, time_intv))
df_out = pd.DataFrame(values, columns=[df.columns[0]], index=index)
return df_out
def sign_out(entry, time_out=None, forgot=False):
"""Sign out of an existing entry in the timesheet. If the user
forgot to sign out, flag the entry.
:param entry: `models.Entry` object. The entry to sign out.
:param time_out: (optional) `datetime.time` object. Specify the sign out time.
:param forgot: (optional) If true, user forgot to sign out. Entry will be flagged as forgotten.
:return: The signed out entry.
""" # noqa
if time_out is None:
time_out = datetime.today().time()
if forgot:
entry.forgot_sign_out = True
logger.info(
'{} forgot to sign out on {}.'.format(entry.user_id, entry.date)
)
else:
entry.time_out = time_out
logger.info('{} ({}) signed out.'.format(entry.user_id, entry.user_type))
return entry
def __init__(self, manager):
# Do a delayed import to prevent possible circular import errors.
from boto.sdb.db.model import Model
self.model_class = Model
self.manager = manager
self.type_map = {bool: (self.encode_bool, self.decode_bool),
int: (self.encode_int, self.decode_int),
float: (self.encode_float, self.decode_float),
self.model_class: (
self.encode_reference, self.decode_reference
),
Key: (self.encode_reference, self.decode_reference),
datetime: (self.encode_datetime, self.decode_datetime),
date: (self.encode_date, self.decode_date),
time: (self.encode_time, self.decode_time),
Blob: (self.encode_blob, self.decode_blob),
str: (self.encode_string, self.decode_string),
}
if six.PY2:
self.type_map[long] = (self.encode_long, self.decode_long)
def decode_time(self, value):
""" converts strings in the form of HH:MM:SS.mmmmmm
(created by datetime.time.isoformat()) to
datetime.time objects.
Timzone-aware strings ("HH:MM:SS.mmmmmm+HH:MM") won't
be handled right now and will raise TimeDecodeError.
"""
if '-' in value or '+' in value:
# TODO: Handle tzinfo
raise TimeDecodeError("Can't handle timezone aware objects: %r" % value)
tmp = value.split('.')
arg = map(int, tmp[0].split(':'))
if len(tmp) == 2:
arg.append(int(tmp[1]))
return time(*arg)
def _connect(self):
args = dict(aws_access_key_id=self.db_user,
aws_secret_access_key=self.db_passwd,
is_secure=self.enable_ssl)
try:
region = [x for x in boto.sdb.regions() if x.endpoint == self.db_host][0]
args['region'] = region
except IndexError:
pass
self._sdb = boto.connect_sdb(**args)
# This assumes that the domain has already been created
# It's much more efficient to do it this way rather than
# having this make a roundtrip each time to validate.
# The downside is that if the domain doesn't exist, it breaks
self._domain = self._sdb.lookup(self.db_name, validate=False)
if not self._domain:
self._domain = self._sdb.create_domain(self.db_name)
index.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 35
收藏 0
点赞 0
评论 0
def snap(self, freq='S'):
"""
Snap time stamps to nearest occurring frequency
"""
# Superdumb, punting on any optimizing
freq = to_offset(freq)
snapped = np.empty(len(self), dtype=_NS_DTYPE)
for i, v in enumerate(self):
s = v
if not freq.onOffset(s):
t0 = freq.rollback(s)
t1 = freq.rollforward(s)
if abs(s - t0) < abs(t1 - s):
s = t0
else:
s = t1
snapped[i] = s
# we know it conforms; skip check
return DatetimeIndex(snapped, freq=freq, verify_integrity=False)
test_sql.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 42
收藏 0
点赞 0
评论 0
def test_datetime_time(self):
# test support for datetime.time
df = DataFrame([time(9, 0, 0), time(9, 1, 30)], columns=["a"])
df.to_sql('test_time', self.conn, index=False)
res = read_sql_table('test_time', self.conn)
tm.assert_frame_equal(res, df)
# GH8341
# first, use the fallback to have the sqlite adapter put in place
sqlite_conn = TestSQLiteFallback.connect()
sql.to_sql(df, "test_time2", sqlite_conn, index=False)
res = sql.read_sql_query("SELECT * FROM test_time2", sqlite_conn)
ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
tm.assert_frame_equal(ref, res) # check if adapter is in place
# then test if sqlalchemy is unaffected by the sqlite adapter
sql.to_sql(df, "test_time3", self.conn, index=False)
if self.flavor == 'sqlite':
res = sql.read_sql_query("SELECT * FROM test_time3", self.conn)
ref = df.applymap(lambda _: _.strftime("%H:%M:%S.%f"))
tm.assert_frame_equal(ref, res)
res = sql.read_sql_table("test_time3", self.conn)
tm.assert_frame_equal(df, res)
def _is_between(check_time, start_time, end_time):
"""
check if a given time is between an interval
:param check_time: datetime.time obj
:param start_time: datetime.time obj
:param end_time: datetime.time obj
:return:
"""
if start_time.hour < check_time.hour < end_time.hour:
return True
elif check_time.hour == start_time.hour and check_time.hour < \
end_time.hour:
return check_time.minute >= start_time.minute
elif check_time.hour > start_time.hour and check_time.hour == \
end_time.hour:
return check_time.minute <= end_time.minute
elif check_time.hour == start_time.hour and check_time.hour == \
end_time.hour:
return start_time.minute <= check_time.minute <= end_time.minute
else:
return False
def next_class(self, time):
"""
get the class next to a given time
:param time: datetime.time object
:return: [course, course time]
"""
day = datetime.now().weekday()
weekdays = ['monday', 'tuesday', 'wednesday',
'thursday', 'friday']
courses = self.all_classes(weekdays[day])
for c in courses:
if self._is_before(time, c[duration][0]):
# return the first class that follows given time
return c
# no class after given time
return None
def _resolve_time(t):
"""
Resolves the time string into datetime.time. This method
is needed because Caltrain arrival/departure time hours
can exceed 23 (e.g. 24, 25), to signify trains that arrive
after 12 AM. The 'day' variable is incremented from 0 in
these situations, and the time resolved back to a valid
datetime.time (e.g. 24:30:00 becomes days=1, 00:30:00).
:param t: the time to resolve
:type t: str or unicode
:returns: tuple of days and datetime.time
"""
hour, minute, second = [int(x) for x in t.split(":")]
day, hour = divmod(hour, 24)
r = _BASE_DATE + timedelta(hours=hour,
minutes=minute,
seconds=second)
return day, r.time()
def _resolve_duration(start, end):
"""
Resolves the duration between two times. Departure/arrival
times that exceed 24 hours or cross a day boundary are correctly
resolved.
:param start: the time to resolve
:type start: Stop
:param end: the time to resolve
:type end: Stop
:returns: tuple of days and datetime.time
"""
start_time = _BASE_DATE + timedelta(hours=start.departure.hour,
minutes=start.departure.minute,
seconds=start.departure.second,
days=start.departure_day)
end_time = _BASE_DATE + timedelta(hours=end.arrival.hour,
minutes=end.arrival.minute,
seconds=end.arrival.second,
days=end.departure_day)
return end_time - start_time
def _pre_TIMESTAMP_LTZ_to_python(self, value, ctx):
"""
TIMESTAMP LTZ to datetime
This takes consideration of the session parameter TIMEZONE if
available. If not, tzlocal is used
"""
microseconds, fraction_of_nanoseconds = \
self._extract_timestamp(value, ctx)
tzinfo_value = self._get_session_tz()
try:
t0 = ZERO_EPOCH + timedelta(seconds=(microseconds))
t = pytz.utc.localize(t0, is_dst=False).astimezone(tzinfo_value)
return t, fraction_of_nanoseconds
except OverflowError:
logger.debug(
"OverflowError in converting from epoch time to "
"timestamp_ltz: %s(ms). Falling back to use struct_time."
)
return time.gmtime(microseconds), fraction_of_nanoseconds
def _TIME_to_python(self, ctx):
"""
TIME to formatted string, SnowflakeDateTime, or datetime.time
No timezone is attached.
"""
scale = ctx['scale']
conv0 = lambda value: datetime.utcfromtimestamp(float(value)).time()
def conv(value):
microseconds = float(value[0:-scale + 6])
return datetime.utcfromtimestamp(microseconds).time()
return conv if scale > 6 else conv0
def _derive_offset_timestamp(self, value, is_utc=False):
"""
Derives TZ offset and timestamp from the datatime object
"""
tzinfo = value.tzinfo
if tzinfo is None:
# If no tzinfo is attached, use local timezone.
tzinfo = self._get_session_tz() if not is_utc else pytz.UTC
t = pytz.utc.localize(value, is_dst=False).astimezone(tzinfo)
else:
# if tzinfo is attached, just covert to epoch time
# as the server expects it in UTC anyway
t = value
offset = tzinfo.utcoffset(
t.replace(tzinfo=None)).total_seconds() / 60 + 1440
return offset, t
def __init__(self, manager):
# Do a delayed import to prevent possible circular import errors.
from boto.sdb.db.model import Model
self.model_class = Model
self.manager = manager
self.type_map = {bool: (self.encode_bool, self.decode_bool),
int: (self.encode_int, self.decode_int),
float: (self.encode_float, self.decode_float),
self.model_class: (
self.encode_reference, self.decode_reference
),
Key: (self.encode_reference, self.decode_reference),
datetime: (self.encode_datetime, self.decode_datetime),
date: (self.encode_date, self.decode_date),
time: (self.encode_time, self.decode_time),
Blob: (self.encode_blob, self.decode_blob),
str: (self.encode_string, self.decode_string),
}
if six.PY2:
self.type_map[long] = (self.encode_long, self.decode_long)
def decode_time(self, value):
""" converts strings in the form of HH:MM:SS.mmmmmm
(created by datetime.time.isoformat()) to
datetime.time objects.
Timzone-aware strings ("HH:MM:SS.mmmmmm+HH:MM") won't
be handled right now and will raise TimeDecodeError.
"""
if '-' in value or '+' in value:
# TODO: Handle tzinfo
raise TimeDecodeError("Can't handle timezone aware objects: %r" % value)
tmp = value.split('.')
arg = map(int, tmp[0].split(':'))
if len(tmp) == 2:
arg.append(int(tmp[1]))
return time(*arg)
def _connect(self):
args = dict(aws_access_key_id=self.db_user,
aws_secret_access_key=self.db_passwd,
is_secure=self.enable_ssl)
try:
region = [x for x in boto.sdb.regions() if x.endpoint == self.db_host][0]
args['region'] = region
except IndexError:
pass
self._sdb = boto.connect_sdb(**args)
# This assumes that the domain has already been created
# It's much more efficient to do it this way rather than
# having this make a roundtrip each time to validate.
# The downside is that if the domain doesn't exist, it breaks
self._domain = self._sdb.lookup(self.db_name, validate=False)
if not self._domain:
self._domain = self._sdb.create_domain(self.db_name)
def isoformat(self, sep='T'):
"""
Formats the date as "%Y-%m-%d %H:%M:%S" with the sep param between the
date and time portions
:param set:
A single character of the separator to place between the date and
time
:return:
The formatted datetime as a unicode string in Python 3 and a byte
string in Python 2
"""
if self.microsecond == 0:
return self.strftime('0000-%%m-%%d%s%%H:%%M:%%S' % sep)
return self.strftime('0000-%%m-%%d%s%%H:%%M:%%S.%%f' % sep)
def isoformat(self, sep='T'):
"""
Formats the date as "%Y-%m-%d %H:%M:%S" with the sep param between the
date and time portions
:param set:
A single character of the separator to place between the date and
time
:return:
The formatted datetime as a unicode string in Python 3 and a byte
string in Python 2
"""
if self.microsecond == 0:
return self.strftime('0000-%%m-%%d%s%%H:%%M:%%S' % sep)
return self.strftime('0000-%%m-%%d%s%%H:%%M:%%S.%%f' % sep)