def _end_of_trading(self, *arg, **kwargs):
# ????????,??????(????????????)
# ???????????
# self._make_slice(self)
if self.backtest_type in ['day']:
self.now = str(self.end_real_date)
self.today = str(self.end_real_date)
elif self.backtest_type in ['1min', '5min', '15min', '30min', '60min']:
self.now = str(self.end_real_date) + ' 15:00:00'
self.today = str(self.end_real_date)
elif self.backtest_type in ['index_day']:
self.now = str(self.end_real_date)
self.today = str(self.end_real_date)
elif self.backtest_type in ['index_1min', 'index_5min', 'index_15min', 'index_30min', 'index_60min']:
self.now = str(self.end_real_date) + ' 15:00:00'
self.today = str(self.end_real_date)
self.today = self.end_real_date
self.sell_all()
self._deal_from_order_queue()
self.__sync_order_LM('daily_settle') # ????
python类now()的实例源码
def DeleteOlderFiles(workfolder, days):
"""
Used to delete older backups in a folder, days is retention days
Sample use to delete all files in C:temp with created date older than 3 days:
DeleteOlderFiles(r'c:\temp', 3)
"""
# os, time already imported
now = time.time()
cutoff = now - (days * 86400)
filelist = os.listdir(workfolder)
for x in filelist:
if os.path.isfile( workfolder + '\\' + x):
t = os.stat( workfolder + '\\' + x )
c = t.st_ctime
# delete file if older than a week
if c < cutoff:
print('deleting ' + x)
os.remove(workfolder + '\\' + x )
def Default(self):
try:
self["label2"].setText(_("Default"))
now = datetime.now()
info1 = 'Date = ' + now.strftime("%d-%B-%Y") + "\n"
info2 = 'Time = ' + now.strftime("%H:%M:%S") + "\n"
info3 = self.Do_cmd("uptime", None, None)
tmp = info3.split(",")
info3 = 'Uptime = ' + tmp[0].lstrip() + "\n"
info4 = self.Do_cmd("cat", "/etc/image-version", " | head -n 1")
info4 = info4[9:]
info4 = 'Boxtype = ' + info4 + "\n"
info5 = 'Load = ' + self.Do_cmd("cat", "/proc/loadavg", None)
info6 = self.Do_cut(info1 + info2 + info3 + info4 + info5)
self["label1"].setText(info6)
except:
self["label1"].setText(_("an internal error has occured"))
def process_request(self, request, spider):
parsed_url = urlparse.urlparse(request.url)
if not self.test_mode or not parsed_url.path in ["/", ""]:
return None
if not Domain.is_onion_url(request.url):
return None
d = Domain.find_by_url(request.url)
if d is None:
return None
now = datetime.now()
if now > d.next_scheduled_check:
return None
else:
raise IgnoreRequest('FilterNotScheduledMiddleware: %s is not scheduled to check' % d.host)
def getStockData(theTicker, startDate):
stock = Share(theTicker)
print("Getting Data for ... " + theTicker)
now = datetime.now()
DateNow = str(now.year) + "-" + str(now.month) + "-" + str(now.day)
data = stock.get_historical(startDate, DateNow)
stockData = []
for d in data:
tmp = []
volume = int(d['Volume'])
adjclose = float(d['Adj_Close'])
high = float(d['High'])
low = float(d['Low'])
close = float(d['Close'])
date = d['Date']
open = float(d['Open'])
tmp.append(date)
tmp.append(open)
tmp.append(high)
tmp.append(low)
tmp.append(close)
tmp.append(adjclose)
tmp.append(volume)
stockData.append(tmp)
return stockData
def test_astimezone(self):
# Pretty boring! The TZ test is more interesting here. astimezone()
# simply can't be applied to a naive object.
dt = self.theclass.now()
f = FixedOffset(44, "")
self.assertRaises(TypeError, dt.astimezone) # not enough args
self.assertRaises(TypeError, dt.astimezone, f, f) # too many args
self.assertRaises(TypeError, dt.astimezone, dt) # arg wrong type
self.assertRaises(ValueError, dt.astimezone, f) # naive
self.assertRaises(ValueError, dt.astimezone, tz=f) # naive
class Bogus(tzinfo):
def utcoffset(self, dt): return None
def dst(self, dt): return timedelta(0)
bog = Bogus()
self.assertRaises(ValueError, dt.astimezone, bog) # naive
class AlsoBogus(tzinfo):
def utcoffset(self, dt): return timedelta(0)
def dst(self, dt): return None
alsobog = AlsoBogus()
self.assertRaises(ValueError, dt.astimezone, alsobog) # also naive
def test_astimezone(self):
# Pretty boring! The TZ test is more interesting here. astimezone()
# simply can't be applied to a naive object.
dt = self.theclass.now()
f = FixedOffset(44, "")
self.assertRaises(TypeError, dt.astimezone) # not enough args
self.assertRaises(TypeError, dt.astimezone, f, f) # too many args
self.assertRaises(TypeError, dt.astimezone, dt) # arg wrong type
self.assertRaises(ValueError, dt.astimezone, f) # naive
self.assertRaises(ValueError, dt.astimezone, tz=f) # naive
class Bogus(tzinfo):
def utcoffset(self, dt): return None
def dst(self, dt): return timedelta(0)
bog = Bogus()
self.assertRaises(ValueError, dt.astimezone, bog) # naive
class AlsoBogus(tzinfo):
def utcoffset(self, dt): return timedelta(0)
def dst(self, dt): return None
alsobog = AlsoBogus()
self.assertRaises(ValueError, dt.astimezone, alsobog) # also naive
def observe_inventory(owner, repo_name, pulls):
for metric in ['additions', 'commits', 'deletions']:
metric_sum = None
if len(pulls) > 0:
metric_sum = sum([getattr(p, metric) for p in pulls])
else:
metric_sum = 0
logger.info(
'Observed for owner "%s", repo "%s", %d %s' % (owner, repo_name, metric_sum, metric))
CODE_INVENTORY.labels(owner, repo_name, metric).set(metric_sum)
for pull in pulls:
days_old = weekdays_between(pull.created_at, datetime.now())
logger.info(
'Observed for owner "%s", repo "%s", %.2f days old PR' % (owner, repo_name, days_old))
CODE_INVENTORY_AGE.labels(owner, repo_name).observe(days_old)
def trigger(self):
if self.type == TRAFFIC_FLOW_ONE_SHOT:
sleep(int(self.offset))
print "Started command at ", str(datetime.now())
sys.stdout.flush()
#os.system(self.cmd + " &")
try:
cmd_list = self.cmd.split(' ')
print cmd_list
print self.cmd
sys.stdout.flush()
p = subprocess.Popen(cmd_list,shell=False)
except:
print "Error running command: ", sys.exec_info()[0]
elif self.type == TRAFFIC_FLOW_EXPONENTIAL:
pass
elif self.type == TRAFFIC_FLOW_PERIODIC:
pass
def performRFClass(X_train, y_train, X_test, y_test, fout, savemodel):
"""
Random Forest Binary Classification
"""
clf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
clf.fit(X_train, y_train)
# if savemodel == True:
# fname_out = '{}-{}.pickle'.format(fout, datetime.now())
# with open(fname_out, 'wb') as f:
# cPickle.dump(clf, f, -1)
accuracy = clf.score(X_test, y_test)
return accuracy
def performSVMClass(X_train, y_train, X_test, y_test, fout, savemodel):
"""
SVM binary Classification
"""
# c = parameters[0]
# g = parameters[1]
clf = SVC()
clf.fit(X_train, y_train)
# if savemodel == True:
# fname_out = '{}-{}.pickle'.format(fout, datetime.now())
# with open(fname_out, 'wb') as f:
# cPickle.dump(clf, f, -1)
accuracy = clf.score(X_test, y_test)
return accuracy
def performAdaBoostClass(X_train, y_train, X_test, y_test, fout, savemodel):
"""
Ada Boosting binary Classification
"""
# n = parameters[0]
# l = parameters[1]
clf = AdaBoostClassifier()
clf.fit(X_train, y_train)
# if savemodel == True:
# fname_out = '{}-{}.pickle'.format(fout, datetime.now())
# with open(fname_out, 'wb') as f:
# cPickle.dump(clf, f, -1)
accuracy = clf.score(X_test, y_test)
print "AdaBoost: ", accuracy
def performRFClass(X_train, y_train, X_test, y_test, fout, savemodel):
"""
Random Forest Binary Classification
"""
clf = RandomForestClassifier(n_estimators=100, n_jobs=-1)
clf.fit(X_train, y_train)
# if savemodel == True:
# fname_out = '{}-{}.pickle'.format(fout, datetime.now())
# with open(fname_out, 'wb') as f:
# cPickle.dump(clf, f, -1)
accuracy = clf.score(X_test, y_test)
print "RF: ", accuracy
def performSVMClass(X_train, y_train, X_test, y_test, fout, savemodel):
"""
SVM binary Classification
"""
# c = parameters[0]
# g = parameters[1]
clf = SVC()
clf.fit(X_train, y_train)
# if savemodel == True:
# fname_out = '{}-{}.pickle'.format(fout, datetime.now())
# with open(fname_out, 'wb') as f:
# cPickle.dump(clf, f, -1)
accuracy = clf.score(X_test, y_test)
print "SVM: ", accuracy
def performAdaBoostClass(X_train, y_train, X_test, y_test, fout, savemodel):
"""
Ada Boosting binary Classification
"""
# n = parameters[0]
# l = parameters[1]
clf = AdaBoostClassifier()
clf.fit(X_train, y_train)
# if savemodel == True:
# fname_out = '{}-{}.pickle'.format(fout, datetime.now())
# with open(fname_out, 'wb') as f:
# cPickle.dump(clf, f, -1)
accuracy = clf.score(X_test, y_test)
print "AdaBoost: ", accuracy
def main():
now = datetime.now()
aDay = timedelta(days=-1)
now = now + aDay
yesterdaystr = now.strftime('%Y-%m-%d')
# 10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27
getYesterdaySoccer('2017-10-28')
# if sys.argv.__len__()==1:
# sys.exit('\033[0;36;40m????:\n1???:\n1:?????? 2:??????? 3:????\n??: python TodaySoccer.pyc 1\033[0m')
#
# if __name__ == '__main__':
# getTodaySoccer(sys.argv[1])
# getTodaySoccer(3)
def now(self):
"""
Creates a Ticktock object with the current time, equivalent to datetime.now()
Returns
=======
out : ticktock
Ticktock object with the current time, equivalent to datetime.now()
See Also
========
datetime.datetime.now()
"""
dt = datetime.datetime.now()
return Ticktock(dt, 'utc')
# -----------------------------------------------
def today(self):
"""
Creates a Ticktock object with the current date and time set to 00:00:00, equivalent to date.today() with time included
Returns
=======
out : ticktock
Ticktock object with the current time, equivalent to date.today() with time included
See Also
========
datetime.date.today()
"""
dt = datetime.datetime.now()
dt = dt.replace(hour=0, minute=0, second=0, microsecond=0)
return Ticktock(dt, 'utc')
# -----------------------------------------------
# End of Ticktock class
# -----------------------------------------------
def latest_post_date(self):
"""
Returns the latest item's pubdate or updateddate. If no items
have either of these attributes this returns the current UTC date/time.
"""
latest_date = None
date_keys = ('updateddate', 'pubdate')
for item in self.items:
for date_key in date_keys:
item_date = item.get(date_key)
if item_date:
if latest_date is None or item_date > latest_date:
latest_date = item_date
# datetime.now(tz=utc) is slower, as documented in django.utils.timezone.now
return latest_date or datetime.datetime.utcnow().replace(tzinfo=utc)
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_NaT_methods(self):
# GH 9513
raise_methods = ['astimezone', 'combine', 'ctime', 'dst',
'fromordinal', 'fromtimestamp', 'isocalendar',
'strftime', 'strptime', 'time', 'timestamp',
'timetuple', 'timetz', 'toordinal', 'tzname',
'utcfromtimestamp', 'utcnow', 'utcoffset',
'utctimetuple']
nat_methods = ['date', 'now', 'replace', 'to_datetime', 'today']
nan_methods = ['weekday', 'isoweekday']
for method in raise_methods:
if hasattr(NaT, method):
self.assertRaises(ValueError, getattr(NaT, method))
for method in nan_methods:
if hasattr(NaT, method):
self.assertTrue(np.isnan(getattr(NaT, method)()))
for method in nat_methods:
if hasattr(NaT, method):
self.assertIs(getattr(NaT, method)(), NaT)
# GH 12300
self.assertEqual(NaT.isoformat(), 'NaT')
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def test_class_ops_dateutil(self):
tm._skip_if_no_dateutil()
from dateutil.tz import tzutc
def compare(x, y):
self.assertEqual(int(np.round(Timestamp(x).value / 1e9)),
int(np.round(Timestamp(y).value / 1e9)))
compare(Timestamp.now(), datetime.now())
compare(Timestamp.now('UTC'), datetime.now(tzutc()))
compare(Timestamp.utcnow(), datetime.utcnow())
compare(Timestamp.today(), datetime.today())
current_time = calendar.timegm(datetime.now().utctimetuple())
compare(Timestamp.utcfromtimestamp(current_time),
datetime.utcfromtimestamp(current_time))
compare(Timestamp.fromtimestamp(current_time),
datetime.fromtimestamp(current_time))
date_component = datetime.utcnow()
time_component = (date_component + timedelta(minutes=10)).time()
compare(Timestamp.combine(date_component, time_component),
datetime.combine(date_component, time_component))
test_timeseries.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_timestamp_compare_scalars(self):
# case where ndim == 0
lhs = np.datetime64(datetime(2013, 12, 6))
rhs = Timestamp('now')
nat = Timestamp('nat')
ops = {'gt': 'lt',
'lt': 'gt',
'ge': 'le',
'le': 'ge',
'eq': 'eq',
'ne': 'ne'}
for left, right in ops.items():
left_f = getattr(operator, left)
right_f = getattr(operator, right)
expected = left_f(lhs, rhs)
result = right_f(rhs, lhs)
self.assertEqual(result, expected)
expected = left_f(rhs, nat)
result = right_f(nat, rhs)
self.assertEqual(result, expected)
test_common.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def test_isnull_datetime():
assert (not isnull(datetime.now()))
assert notnull(datetime.now())
idx = date_range('1/1/1990', periods=20)
assert (notnull(idx).all())
idx = np.asarray(idx)
idx[0] = iNaT
idx = DatetimeIndex(idx)
mask = isnull(idx)
assert (mask[0])
assert (not mask[1:].any())
# GH 9129
pidx = idx.to_period(freq='M')
mask = isnull(pidx)
assert (mask[0])
assert (not mask[1:].any())
mask = isnull(pidx[1:])
assert (not mask.any())
def checkDayAndSendMail():
todayDate = datetime.now()
start = datetime(todayDate.year, todayDate.month, todayDate.day)
end = start + timedelta(days=1)
global dateIndex
# if change date
if dateIndex < end :
dateIndex = end
# send mail notifying server still working
msg_content = {}
msg_content['Subject'] = '[Amazon Price Alert] Server working !'
msg_content['Content'] = 'Amazon Price Alert still working until %s !' % (todayDate.strftime('%Y-%m-%d %H:%M:%S'))
msg_content['Price'] = ""
msg_content['Time'] = todayDate.strftime('%Y-%m-%d %H:%M:%S')
msg_content['ServerState'] = "Working"
msg_content['code'] = 2 # 2 is server state
send_Notification(msg_content)
def log(prefix, text, line=False):
now = datetime.now()
message = ""
if prefix == '?':
c = Fore.CYAN
elif prefix == '+':
c = Fore.GREEN
elif prefix == '-':
c = Fore.RED
elif prefix == '!':
c = Fore.YELLOW
c = Style.BRIGHT + c
e = Style.RESET_ALL + Fore.RESET
if line:
print c+"["+now.strftime("%Y-%m-%d %H:%M")+"]["+prefix+"] "+text+e
else :
print "["+now.strftime("%Y-%m-%d %H:%M")+"]["+c+prefix+e+"] "+text
def fetch_price_data(stock):
utf_decoder = codecs.getreader("utf-8")
start_date = datetime.now() - timedelta(days=130)
start_date = start_date.strftime("%Y-%m-%d")
end_date = datetime.now().strftime("%Y-%m-%d")
try:
stocks_base_URL = 'https://query.yahooapis.com/v1/public/yql?q=select%20*%20from%20yahoo.finance.historicaldata%20where%20symbol%20%3D%20'
URL_end = '%20and%20startDate%20%3D%20%22' + start_date + '%22%20and%20endDate%20%3D%20%22' + end_date + '%22&format=json&diagnostics=true&env=store%3A%2F%2Fdatatables.org%2Falltableswithkeys&callback='
query = stocks_base_URL + "%22" + stock + "%22" + "%2C"
query = query[:-3] + URL_end
api_response = urllib.request.urlopen(query)
response_data = json.load(utf_decoder(api_response))['query']['results']['quote']
price_data[stock] = response_data
except:
print("ERROR fetching price data")
pdb.set_trace()
def test_astimezone(self):
# Pretty boring! The TZ test is more interesting here. astimezone()
# simply can't be applied to a naive object.
dt = self.theclass.now()
f = FixedOffset(44, "")
self.assertRaises(TypeError, dt.astimezone) # not enough args
self.assertRaises(TypeError, dt.astimezone, f, f) # too many args
self.assertRaises(TypeError, dt.astimezone, dt) # arg wrong type
self.assertRaises(ValueError, dt.astimezone, f) # naive
self.assertRaises(ValueError, dt.astimezone, tz=f) # naive
class Bogus(tzinfo):
def utcoffset(self, dt): return None
def dst(self, dt): return timedelta(0)
bog = Bogus()
self.assertRaises(ValueError, dt.astimezone, bog) # naive
class AlsoBogus(tzinfo):
def utcoffset(self, dt): return timedelta(0)
def dst(self, dt): return None
alsobog = AlsoBogus()
self.assertRaises(ValueError, dt.astimezone, alsobog) # also naive
def savealltable(self):
self.http_get()
s = req.content
s = etree.HTML(s)
x = s.xpath("//td")
messages.append("????????...")
from datetime import datetime
snow = str(datetime.now())
if os.path.exists('table') is False:
os.mkdir('table')
with open('table/'+snow+'.text', 'w') as f:
for child in x:
messages.append(child.text)
ui.showmssg.setText('\n'.join(messages))
print(child.text)
f.writelines(child.text + "\n")
def test_astimezone(self):
# Pretty boring! The TZ test is more interesting here. astimezone()
# simply can't be applied to a naive object.
dt = self.theclass.now()
f = FixedOffset(44, "")
self.assertRaises(TypeError, dt.astimezone) # not enough args
self.assertRaises(TypeError, dt.astimezone, f, f) # too many args
self.assertRaises(TypeError, dt.astimezone, dt) # arg wrong type
self.assertRaises(ValueError, dt.astimezone, f) # naive
self.assertRaises(ValueError, dt.astimezone, tz=f) # naive
class Bogus(tzinfo):
def utcoffset(self, dt): return None
def dst(self, dt): return timedelta(0)
bog = Bogus()
self.assertRaises(ValueError, dt.astimezone, bog) # naive
class AlsoBogus(tzinfo):
def utcoffset(self, dt): return timedelta(0)
def dst(self, dt): return None
alsobog = AlsoBogus()
self.assertRaises(ValueError, dt.astimezone, alsobog) # also naive
def send_updates(self):
""" Send updated to the KNX bus. """
d = datetime.now()
if self.timeaddr:
self.tunnel.group_write(self.timeaddr,
time_to_knx(d))
if self.dateaddr:
self.tunnel.group_write(self.dateaddr,
date_to_knx(d))
if self.datetimeaddr:
self.tunnel.group_write(self.datetimeaddr,
datetime_to_knx(d))
if self.daynightaddr:
from pysolar.solar import get_altitude
alt = get_altitude(self.lat, self.long, d)
if alt > 0:
self.tunnel.group_write(self.daynightaddr, 1)
else:
self.tunnel.group_write(self.daynightaddr, 0)