def nth_day_of_week(self, subtree):
if subtree.label().startswith('first'):
n = 0
if subtree.label().startswith('second'):
n = 1
if subtree.label().startswith('third'):
n = 2
if subtree.label().startswith('fourth'):
n = 3
if subtree.label().startswith('fifth'):
n = 4
if subtree.label().startswith('sixth'):
n = 5
if subtree.label().startswith('seventh'):
n = 6
d = dt.today()
self.dictionary['timestamp'] = self.next_weekday(d, n)
python类today()的实例源码
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_NaT_methods(self):
# GH 9513
raise_methods = ['astimezone', 'combine', 'ctime', 'dst',
'fromordinal', 'fromtimestamp', 'isocalendar',
'strftime', 'strptime', 'time', 'timestamp',
'timetuple', 'timetz', 'toordinal', 'tzname',
'utcfromtimestamp', 'utcnow', 'utcoffset',
'utctimetuple']
nat_methods = ['date', 'now', 'replace', 'to_datetime', 'today']
nan_methods = ['weekday', 'isoweekday']
for method in raise_methods:
if hasattr(NaT, method):
self.assertRaises(ValueError, getattr(NaT, method))
for method in nan_methods:
if hasattr(NaT, method):
self.assertTrue(np.isnan(getattr(NaT, method)()))
for method in nat_methods:
if hasattr(NaT, method):
self.assertIs(getattr(NaT, method)(), NaT)
# GH 12300
self.assertEqual(NaT.isoformat(), 'NaT')
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_class_ops_pytz(self):
tm._skip_if_no_pytz()
from pytz import timezone
def compare(x, y):
self.assertEqual(int(Timestamp(x).value / 1e9),
int(Timestamp(y).value / 1e9))
compare(Timestamp.now(), datetime.now())
compare(Timestamp.now('UTC'), datetime.now(timezone('UTC')))
compare(Timestamp.utcnow(), datetime.utcnow())
compare(Timestamp.today(), datetime.today())
current_time = calendar.timegm(datetime.now().utctimetuple())
compare(Timestamp.utcfromtimestamp(current_time),
datetime.utcfromtimestamp(current_time))
compare(Timestamp.fromtimestamp(current_time),
datetime.fromtimestamp(current_time))
date_component = datetime.utcnow()
time_component = (date_component + timedelta(minutes=10)).time()
compare(Timestamp.combine(date_component, time_component),
datetime.combine(date_component, time_component))
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_class_ops_dateutil(self):
tm._skip_if_no_dateutil()
from dateutil.tz import tzutc
def compare(x, y):
self.assertEqual(int(np.round(Timestamp(x).value / 1e9)),
int(np.round(Timestamp(y).value / 1e9)))
compare(Timestamp.now(), datetime.now())
compare(Timestamp.now('UTC'), datetime.now(tzutc()))
compare(Timestamp.utcnow(), datetime.utcnow())
compare(Timestamp.today(), datetime.today())
current_time = calendar.timegm(datetime.now().utctimetuple())
compare(Timestamp.utcfromtimestamp(current_time),
datetime.utcfromtimestamp(current_time))
compare(Timestamp.fromtimestamp(current_time),
datetime.fromtimestamp(current_time))
date_component = datetime.utcnow()
time_component = (date_component + timedelta(minutes=10)).time()
compare(Timestamp.combine(date_component, time_component),
datetime.combine(date_component, time_component))
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_date_range_normalize(self):
snap = datetime.today()
n = 50
rng = date_range(snap, periods=n, normalize=False, freq='2D')
offset = timedelta(2)
values = np.array([snap + i * offset for i in range(n)],
dtype='M8[ns]')
self.assert_numpy_array_equal(rng, values)
rng = date_range('1/1/2000 08:15', periods=n, normalize=False,
freq='B')
the_time = time(8, 15)
for val in rng:
self.assertEqual(val.time(), the_time)
def gerrit_search(self, instance, owner=None, reviewer=None):
max_age = datetime.today() - self.modified_after
max_age = max_age.days * 24 * 3600 + max_age.seconds
user_filter = 'owner:%s' % owner if owner else 'reviewer:%s' % reviewer
filters = ['-age:%ss' % max_age, user_filter]
# Determine the gerrit interface to use: SSH or REST API:
if 'host' in instance:
issues = self.gerrit_changes_over_ssh(instance, filters)
issues = [self.process_gerrit_ssh_issue(instance, issue)
for issue in issues]
elif 'url' in instance:
issues = self.gerrit_changes_over_rest(instance, filters)
issues = [self.process_gerrit_rest_issue(instance, issue)
for issue in issues]
else:
raise Exception('Invalid gerrit_instances configuration.')
# TODO(cjhopman): should we filter abandoned changes?
issues = filter(self.filter_issue, issues)
issues = sorted(issues, key=lambda i: i['modified'], reverse=True)
return issues
def gerrit_search(self, instance, owner=None, reviewer=None):
max_age = datetime.today() - self.modified_after
max_age = max_age.days * 24 * 3600 + max_age.seconds
user_filter = 'owner:%s' % owner if owner else 'reviewer:%s' % reviewer
filters = ['-age:%ss' % max_age, user_filter]
# Determine the gerrit interface to use: SSH or REST API:
if 'host' in instance:
issues = self.gerrit_changes_over_ssh(instance, filters)
issues = [self.process_gerrit_ssh_issue(instance, issue)
for issue in issues]
elif 'url' in instance:
issues = self.gerrit_changes_over_rest(instance, filters)
issues = [self.process_gerrit_rest_issue(instance, issue)
for issue in issues]
else:
raise Exception('Invalid gerrit_instances configuration.')
# TODO(cjhopman): should we filter abandoned changes?
issues = filter(self.filter_issue, issues)
issues = sorted(issues, key=lambda i: i['modified'], reverse=True)
return issues
def _valid_date(self):
"""Check and return a valid query date."""
date = self._parse_date(self.date)
if not date:
exit_after_echo(INVALID_DATE)
try:
date = datetime.datetime.strptime(date, '%Y%m%d')
except ValueError:
exit_after_echo(INVALID_DATE)
# A valid query date should within 50 days.
offset = date - datetime.datetime.today()
if offset.days not in range(-1, 50):
exit_after_echo(INVALID_DATE)
return date.strftime('%Y-%m-%d')
# return datetime.date.strftime(date, '%Y-%m-%d')
def _parse_date(date):
"""Parse from the user input `date`.
e.g. current year 2016:
input 6-26, 626, ... return 2016626
input 2016-6-26, 2016/6/26, ... retrun 2016626
This fn wouldn't check the date, it only gather the number as a string.
"""
result = ''.join(re.findall('\d', date))
l = len(result)
# User only input month and day, eg 6-1, 6.26, 0626...
if l in (2, 3, 4):
# year = str(datetime.today().year)
year = str(datetime.date.today().year)
return year + result # ?? ex:423--->2016423
# User input full format date, eg 201661, 2016-6-26, 20160626...
if l in (6, 7, 8):
return result
return ''
def today(self, subtree):
currentTime = dt.today()
currentTimestamp = mktime(currentTime.timetuple())
self.dictionary['timestamp'] = currentTimestamp
def tommorow(self, subtree):
dayAfterTime = dt.today() + timedelta(days=1)
dayAfterTimestamp = mktime(dayAfterTime.timetuple())
self.dictionary['timestamp'] = dayAfterTimestamp
def yesterday(self, subtree):
day_before_time = dt.today() + timedelta(days=-1)
day_before_timestamp = mktime(day_before_time.timetuple())
self.dictionary['timestamp'] = day_before_timestamp
def noon(self, subtree):
if('timestamp' not in self.dictionary):
self.dictionary['timestamp'] = mktime(dt.today().timetuple())
self.dictionary['timestamp'] += 12 * 3600
self.dictionary['hours'] = '12'
def processPage(self):
def filter_data(filter_vars, transactions, users, stock_payments, stock_payment_transactions, inventory_transactions, inventory):
import datetime
transactions = transactions.filter(timestamp__lt=datetime.date(filter_vars['year'], filter_vars['month'], filter_vars['day']))
inventory_transactions = inventory_transactions.filter(timestamp__lt=datetime.date(filter_vars['year'], filter_vars['month'], filter_vars['day']))
stock_payments = stock_payments.filter(timestamp__lt=datetime.date(filter_vars['year'], filter_vars['month'], filter_vars['day']))
stock_payment_transactions = stock_payment_transactions.filter(timestamp__lt=datetime.date(filter_vars['year'], filter_vars['month'], filter_vars['day']))
return filter_vars, transactions, users, stock_payments, stock_payment_transactions, inventory_transactions, inventory
not_enough_data = 0
float_amount = ['Float']
total_user_balance = ['User Credit']
stock_value = ['Stock Value']
owed = ['Owed Stock']
sup_float = ['Superficial Float']
for days_back in list(reversed(range(1, 29))):
d = datetime.today() - timedelta(days=days_back)
sim = FloatHistorySimulation(name='test', filter_method=filter_data, filter_vars={'year': d.year, 'month': d.month, 'day': d.day})
cel = sim.start()
try:
if cel.successful():
data = cel.get(timeout=1)
float_amount.append(float(data['float_amount']/100.0))
total_user_balance.append(float(data['total_user_balance']/100.0))
stock_value.append(float(data['stock_value']/100.0))
owed.append(float(data['owed']/100.0))
sup_float.append(float((((data['float_amount'] - data['total_user_balance']) - data['owed']) + data['stock_value'])/100.0))
elif cel.state in ('FAILURE', 'REVOKED'):
sim.delete_cache()
sim.start()
not_enough_data += 1
else:
not_enough_data += 1
except:
sim.delete_cache()
not_enough_data += 1
self.return_vars['graph_data'] = json.dumps([float_amount, total_user_balance, stock_value, owed, sup_float]) if not not_enough_data else None
self.return_vars['not_enough_data'] = not_enough_data
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_index_to_datetime(self):
idx = Index(['1/1/2000', '1/2/2000', '1/3/2000'])
result = idx.to_datetime()
expected = DatetimeIndex(datetools.to_datetime(idx.values))
self.assertTrue(result.equals(expected))
today = datetime.today()
idx = Index([today], dtype=object)
result = idx.to_datetime()
expected = DatetimeIndex([today])
self.assertTrue(result.equals(expected))
def get_credentials():
"""Gets valid user credentials from storage.
If nothing has been stored, or if the stored credentials are invalid,
the OAuth2 flow is completed to obtain the new credentials.
Returns:
Credentials, the obtained credential.
"""
home_dir = os.path.expanduser('~')
credential_dir = os.path.join(home_dir, '.credentials')
if not os.path.exists(credential_dir):
os.makedirs(credential_dir)
credential_path = os.path.join(credential_dir,
'calendar-alfred-today.json')
store = oauth2client.file.Storage(credential_path)
credentials = store.get()
if not credentials or credentials.invalid:
flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
flow.user_agent = APPLICATION_NAME
if flags:
credentials = tools.run_flow(flow, store, flags)
else: # Needed only for compatibility with Python 2.6
credentials = tools.run(flow, store)
print('Storing credentials to ' + credential_path)
return credentials
def date_is_future(value):
if isinstance(value, datetime.date):
if value <= datetime.date.today():
raise ValidationError(
_("The date entered must be greater than today.")
)
elif isinstance(value, datetime.datetime):
if value.date() <= datetime.today():
raise ValidationError(
_("The date entered must be greater than today.")
)
else:
raise ValidationError(
_("The value entered isn't a valid type of date or datetime.")
)
def date_is_present_or_future(value):
if isinstance(value, datetime.date):
if value < datetime.date.today():
raise ValidationError(
_("The date entered must be today or lesser.")
)
elif isinstance(value, datetime.datetime):
if value.date() < datetime.datetime.today():
raise ValidationError(
_("The date entered must be today or greater.")
)
else:
raise ValidationError(
_("The value entered isn't a valid type of date or datetime.")
)
def date_is_past(value):
if isinstance(value, datetime.date):
if value >= datetime.date.today():
raise ValidationError(
_("The date entered must be today or lesser.")
)
elif isinstance(value, datetime.datetime):
if value.date() >= datetime.datetime.today():
raise ValidationError(
_("The date entered must be lesser than today.")
)
else:
raise ValidationError(
_("The value entered isn't a valid type of date or datetime.")
)
def date_is_present_or_past(value):
if isinstance(value, datetime.date):
if value > datetime.date.today():
raise ValidationError(
_("The date entered must be today or lesser.")
)
elif isinstance(value, datetime.datetime):
if value.date() > datetime.datetime.today():
raise ValidationError(
_("The date entered must be today or lesser.")
)
else:
raise ValidationError(
_("The value entered isn't a valid type of date or datetime.")
)
def today(cls):
"""Return today's date."""
result = cls._now()
return date_to_fakedate(result)
def today(cls):
"""Return a datetime object representing current time."""
return cls.now(tz=None)
def test_datetime_patch_set_time():
with TimeTravel(modules_to_patch=[__name__]) as t:
assert datetime_cls.today() == datetime_cls.fromtimestamp(_t(0))
t.clock.time = _t(1000)
assert datetime_cls.today() == datetime_cls.fromtimestamp(_t(1000))
def test_patch_stop_afer_scope_end():
with TimeTravel(modules_to_patch=__name__) as t:
assert datetime_cls.now() == datetime_cls.fromtimestamp(_t(0))
t.clock.time = _t(1000)
assert datetime_cls.today() == datetime_cls.fromtimestamp(_t(1000))
assert time.time() != _t(1000)
assert datetime_cls.today() != datetime_cls.fromtimestamp(_t(1000))
def test_inner_importing_of_datetime():
with TimeTravel(modules_to_patch=__name__):
import datetime
assert datetime.date.today() == datetime.date.fromtimestamp(_t(0))
def test_sleep_changing_datetime_now():
with TimeTravel(modules_to_patch=__name__):
assert datetime_cls.today() == datetime_cls.fromtimestamp(_t(0))
time.sleep(150)
assert datetime_cls.now() == datetime_cls.fromtimestamp(_t(150))
def test_select_no_timeout():
with TimeTravel(modules_to_patch=__name__) as t:
sock = socket.socket()
t.add_future_event(2, sock, t.event_types.select.WRITE)
now = t.clock.time
assert select.select([], [sock], []) == ([], [sock], [])
assert time.time() == now + 2
assert datetime_cls.today() == datetime_cls.fromtimestamp(now + 2)
def test_select_with_timeout():
with TimeTravel(modules_to_patch=__name__) as t:
sock = socket.socket()
t.add_future_event(2, sock, t.event_types.select.EXCEPTIONAL)
now = t.clock.time
assert select.select([], [], [sock], 6) == ([], [], [sock])
assert time.time() == now + 2
assert datetime_cls.today() == datetime_cls.fromtimestamp(now + 2)
def test_select_timeout_occurring():
with TimeTravel(modules_to_patch=__name__) as t:
sock = socket.socket()
t.add_future_event(10, sock, t.event_types.select.READ)
now = t.clock.time
assert select.select([sock], [], [], 6) == ([], [], [])
assert time.time() == now + 6
assert datetime_cls.today() == datetime_cls.fromtimestamp(now + 6)
def freebusy_request(self, start, end):
"""
Search the calendar, but return only the free/busy information.
Parameters:
* start = datetime.today().
* end = same as above.
Returns:
* [FreeBusy(), ...]
"""
root = cdav.FreeBusyQuery() + [cdav.TimeRange(start, end)]
response = self._query(root, 1, 'report')
return FreeBusy(self, response.raw)