def main():
    """Print every row of data.csv next to its 10-period SMA from talib and jhTAlib.

    The CSV is loaded through jhta.CSV2DF. The talib SMA is computed on a
    float NumPy copy of the 'Close' column, the jhTAlib SMA on the datafeed
    itself, and both values are printed per row so they can be compared.
    """
    # Load the CSV into a jhTAlib datafeed (accessed by column name below).
    df = jhta.CSV2DF('data.csv')
    # Optional Heikin-Ashi transform of the datafeed, currently disabled:
    # df = jhta.DF2HEIKIN_ASHI(df)

    # NumPy copy of every column; only 'Close' is consumed further down.
    df_numpy = {
        'datetime': np.array(df['datetime']),
        'Open': np.array(df['Open'], dtype='float'),
        'High': np.array(df['High'], dtype='float'),
        'Low': np.array(df['Low'], dtype='float'),
        'Close': np.array(df['Close'], dtype='float'),
        'Volume': np.array(df['Volume'], dtype='int'),
    }
    # Optional pandas view of the NumPy datafeed, currently disabled:
    # df_pandas = pd.Series(df_numpy)

    talib_sma = ta.SMA(df_numpy['Close'], 10)
    jhta_sma = jhta.SMA(df, 10)

    # (label, column) pairs in the order they appear on the printed row.
    labelled_columns = (
        (' Open: ', 'Open'),
        (' High: ', 'High'),
        (' Low: ', 'Low'),
        (' Close: ', 'Close'),
        (' Volume: ', 'Volume'),
    )
    for row in range(len(df['Close'])):
        # The datetime value is concatenated as-is, so it must already be a string.
        line = df['datetime'][row]
        for label, column in labelled_columns:
            line = line + label + str(df[column][row])
        print(line)
        # Show both implementations so differences are easy to spot.
        print(str(talib_sma[row]) + ' (talib)')
        print(str(jhta_sma[row]) + ' (jhTAlib)')
# Python SMA example source code (original page heading: "python类SMA的实例源码")
def boll():
    """Plot and print Bollinger Bands for the close prices of '300580'.

    Price data comes from ts.get_k_data for 2017-01-12 .. 2017-05-26. The
    bands are computed by talib.BBANDS with an SMA middle band, printed,
    plotted, and finally the upper-middle and middle-lower band distances
    are printed.
    """
    # Fetch the price history through tushare.
    df = ts.get_k_data('300580', start='2017-01-12', end='2017-05-26')
    # Close prices as a NumPy array.
    closed = df['close'].values
    upper, middle, lower = talib.BBANDS(closed, matype=talib.MA_Type.SMA)
    # print() calls instead of Python 2 print statements, which are syntax
    # errors on Python 3.
    print(upper, middle, lower)
    plt.plot(upper)
    plt.plot(middle)
    plt.plot(lower)
    plt.grid()
    plt.show()
    # Distance of each outer band from the middle band.
    diff1 = upper - middle
    diff2 = middle - lower
    print(diff1)
    print(diff2)
def isBuy(context, analysis):
    """Return True only when both the 'sma_test' and 'macd_test' values equal 1.

    Values are read through getLast(analysis, <name>). The MACD value is
    only looked up after the SMA test has passed. `context` is not used.
    """
    # Bullish SMA crossover is a precondition.
    if not (getLast(analysis, 'sma_test') == 1):
        return False
    # Bullish MACD confirms the signal.
    if getLast(analysis, 'macd_test') == 1:
        return True
    # Stochastics ('stoch_over_sold') and RSI ('rsi_over_sold') confirmations
    # were present only as commented-out checks and remain disabled.
    return False
def isSell(context, analysis):
    """Return True only when both the 'sma_test' and 'macd_test' values equal 0.

    Values are read through getLast(analysis, <name>). The MACD value is
    only looked up after the SMA test has passed. `context` is not used.
    """
    # Bearish SMA crossover is a precondition.
    if not (getLast(analysis, 'sma_test') == 0):
        return False
    # Bearish MACD confirms the signal.
    if getLast(analysis, 'macd_test') == 0:
        return True
    # Stochastics ('stoch_over_bought') and RSI ('rsi_over_bought')
    # confirmations were present only as commented-out checks and remain
    # disabled.
    return False
def OnBarUpdate(self, data=Data, bar=Bar):
    """Trade crossovers between two simple moving averages of self.C.

    Does nothing until self.C holds at least self.p_ma2 values. Both SMA
    arrays are stored in self.IndexDict under 'ma5' and 'ma10'. With no long
    position, an upward cross covers any short and then buys. Otherwise, with
    no short position, a downward cross sells any long and then sells short.
    All orders use self.O[-1] as price and self.p_lots as size.
    `data` and `bar` are not used in the body.
    """
    # Not enough values yet for the longer average.
    if len(self.C) < self.p_ma2:
        return
    # print('{0}-{1}'.format(self.D[-1], self.C[-1]))

    fast = talib.SMA(self.C, self.p_ma1)
    slow = talib.SMA(self.C, self.p_ma2)
    self.IndexDict['ma5'] = fast
    self.IndexDict['ma10'] = slow

    # NOTE(review): the '??' order remarks look like mis-encoded text; they
    # are runtime strings and are kept exactly as found.
    if self.PositionLong == 0:
        crossed_up = fast[-1] >= slow[-1] and fast[-2] < slow[-2]
        if crossed_up:
            if self.PositionShort > 0:
                self.BuyToCover(self.O[-1], self.p_lots, '??')
            self.Buy(self.O[-1], self.p_lots, '??')
    elif self.PositionShort == 0:
        crossed_down = fast[-1] <= slow[-1] and fast[-2] > slow[-2]
        if crossed_down:
            if self.PositionLong > 0:
                self.Sell(self.O[-1], self.p_lots, '??')
            self.SellShort(self.O[-1], self.p_lots, '??')
def ACO(self, df):
    """Helper indicator built from SMAs of the (High + Low) / 2 mid-points.

    :param df: object indexable by 'High' and 'Low' whose values support
        element-wise arithmetic.
    :return: np.diff of the final 15-period SMA (one element shorter than
        the SMA it is taken from).
    """
    midpoints = np.array(Data.toFloatArray((df['High'] + df['Low']) / 2))
    slow_avg = tl.SMA(midpoints, timeperiod=40)
    fast_avg = tl.SMA(midpoints, timeperiod=15)
    # Note the operand order: 40-period average minus 15-period average.
    spread = slow_avg - fast_avg
    spread_avg = tl.SMA(spread, timeperiod=15)
    smoothed = tl.SMA(spread_avg - spread, timeperiod=15)
    return np.diff(smoothed)
# if __name__ == "__main__":
# np.set_printoptions(threshold=np.nan)
# pd.set_option("display.max_rows", 280)
# dt = Data()
# df = dt.getCSVData()
# #ACOscillator(df)
# ACOscillator(df)
def handle_bar(context, bar_dict):
    """Per-bar handler comparing a short and a long SMA of context.s1 closes.

    Fetches context.LONGPERIOD + 1 'close' values through history_bars (using
    the name `frequency`, which is not defined in this function), plots the
    value of the current position as 'capital', then issues
    order_target_value(context.s1, 0) or order_shares(context.s1, shares)
    depending on the SMA comparison below.
    """
    closes = history_bars(context.s1, context.LONGPERIOD + 1, frequency, 'close')
    fast_ma = talib.SMA(closes, context.SHORTPERIOD)
    slow_ma = talib.SMA(closes, context.LONGPERIOD)

    # Current holding and its value at average price.
    position = context.portfolio.positions[context.s1]
    held = position.quantity
    capital = held * position.avg_price

    # Quantity the available cash buys at this bar's close.
    shares = context.portfolio.cash / bar_dict[context.s1].close

    plot('capital', capital)

    spread_now = fast_ma[-1] - slow_ma[-1]

    # NOTE(review): the previous-bar term is written long - short, so these
    # conditions hold when the short SMA is on the SAME side of the long SMA
    # on both bars. Sibling strategies use short - long at [-2] to detect a
    # cross. Confirm which was intended.
    if spread_now < 0 and 0 < slow_ma[-2] - fast_ma[-2] and held > 0:
        order_target_value(context.s1, 0)

    if spread_now > 0 and 0 > slow_ma[-2] - fast_ma[-2]:
        order_shares(context.s1, shares)
def recover_period(data):
    """Return True when SMA5/SMA20 and SMA20/SMA60 of sid(8554) both exceed 1.

    Uses the last 120 '1d' close values from data.history and the final
    value of each SMA. A ratio that evaluates falsy (e.g. 0) is replaced by
    1, which still fails the "> 1" test.
    """
    close = data.history(sid(8554), 'close', 120, '1d')
    sma5, sma20, sma60 = [ta.SMA(close, window)[-1] for window in (5, 20, 60)]

    short_ratio = (sma5 / sma20) or 1
    mid_ratio = (sma20 / sma60) or 1
    # long_ratio = (sma5 / sma60) or 1

    return bool((short_ratio > 1) and (mid_ratio > 1))
def handle_bar(context, bar_dict):
    """Per-bar handler for a short/long SMA crossover on context.s1.

    Fetches context.LONGPERIOD + 1 daily ('1d') closes through history_bars,
    plots the latest short and long SMA, and then:
    - calls order_target_value(context.s1, 0) when the short SMA crossed
      below the long SMA on this bar and a position is held;
    - calls order_shares(context.s1, shares) when the short SMA crossed above
      the long SMA, where shares = cash / this bar's close.
    """
    # TODO carried over from the original (its text was not recoverable).
    closes = history_bars(context.s1, context.LONGPERIOD + 1, '1d', 'close')

    # talib returns arrays; only the last two entries are inspected.
    fast_ma = talib.SMA(closes, context.SHORTPERIOD)
    slow_ma = talib.SMA(closes, context.LONGPERIOD)

    plot("short avg", fast_ma[-1])
    plot("long avg", slow_ma[-1])

    held = context.portfolio.positions[context.s1].quantity
    shares = context.portfolio.cash / bar_dict[context.s1].close

    spread_now = fast_ma[-1] - slow_ma[-1]

    # Short SMA below the long SMA now, above it on the previous bar.
    if spread_now < 0 and fast_ma[-2] - slow_ma[-2] > 0 and held > 0:
        order_target_value(context.s1, 0)

    # Short SMA above the long SMA now, below it on the previous bar.
    if spread_now > 0 and fast_ma[-2] - slow_ma[-2] < 0:
        order_shares(context.s1, shares)
def handle_bar(context, bar_dict):
    """Per-bar handler for a short/long SMA crossover on context.s1.

    Fetches context.LONGPERIOD + 1 daily ('1d') closes through history_bars,
    plots the latest short and long SMA, and then:
    - calls order_target_value(context.s1, 0) when the short SMA crossed
      below the long SMA on this bar and a position is held;
    - calls order_shares(context.s1, shares) when the short SMA crossed above
      the long SMA, where shares = cash / this bar's close.
    """
    # TODO carried over from the original (its text was not recoverable).
    price_window = history_bars(context.s1, context.LONGPERIOD + 1, '1d', 'close')

    short_sma = talib.SMA(price_window, context.SHORTPERIOD)
    long_sma = talib.SMA(price_window, context.LONGPERIOD)

    plot("short avg", short_sma[-1])
    plot("long avg", long_sma[-1])

    quantity_held = context.portfolio.positions[context.s1].quantity
    shares = context.portfolio.cash / bar_dict[context.s1].close

    # Sign of (short - long) on the current bar. The previous bar is only
    # indexed when the current-bar test has already passed.
    gap = short_sma[-1] - long_sma[-1]

    # Downward cross while holding a position.
    if gap < 0 and short_sma[-2] - long_sma[-2] > 0 and quantity_held > 0:
        order_target_value(context.s1, 0)

    # Upward cross.
    if gap > 0 and short_sma[-2] - long_sma[-2] < 0:
        order_shares(context.s1, shares)
def handle_bar(context, bar_dict):
    """Per-bar handler for a short/long SMA crossover on context.s1.

    Fetches context.LONGPERIOD + 1 daily ('1d') closes through history_bars,
    plots the latest short and long SMA, and then:
    - calls order_target_value(context.s1, 0) when the short SMA crossed
      below the long SMA on this bar and a position is held;
    - calls order_shares(context.s1, shares) when the short SMA crossed above
      the long SMA, where shares = cash / this bar's close.
    """
    # TODO carried over from the original (its text was not recoverable).
    symbol = context.s1
    closes = history_bars(symbol, context.LONGPERIOD + 1, '1d', 'close')

    fast_ma = talib.SMA(closes, context.SHORTPERIOD)
    slow_ma = talib.SMA(closes, context.LONGPERIOD)

    plot("short avg", fast_ma[-1])
    plot("long avg", slow_ma[-1])

    held = context.portfolio.positions[symbol].quantity
    shares = context.portfolio.cash / bar_dict[symbol].close

    spread_now = fast_ma[-1] - slow_ma[-1]

    # Short SMA fell through the long SMA between the previous and current bar.
    if spread_now < 0 and fast_ma[-2] - slow_ma[-2] > 0 and held > 0:
        order_target_value(symbol, 0)

    # Short SMA rose through the long SMA between the previous and current bar.
    if spread_now > 0 and fast_ma[-2] - slow_ma[-2] < 0:
        order_shares(symbol, shares)
def baseAPI():
    """Print and plot close prices of '300580' with their 5/10/20-period SMAs.

    Price data comes from ts.get_k_data for 2017-01-12 .. 2017-05-26; the
    averages are computed with talib.SMA.
    """
    # Fetch the price history through tushare.
    df = ts.get_k_data('300580', start='2017-01-12', end='2017-05-26')
    # Close prices as a NumPy array.
    closed = df['close'].values
    # Simple moving averages over 5, 10 and 20 periods.
    ma5 = talib.SMA(closed, timeperiod=5)
    ma10 = talib.SMA(closed, timeperiod=10)
    ma20 = talib.SMA(closed, timeperiod=20)
    # print() calls instead of Python 2 print statements, which are syntax
    # errors on Python 3.
    print(closed)
    print(ma5)
    print(ma10)
    print(ma20)
    # Draw prices and averages on one chart.
    plt.plot(closed)
    plt.plot(ma5)
    plt.plot(ma10)
    plt.plot(ma20)
    plt.grid()
    plt.show()
def ma_type_test():
    """Plot all nine talib.MA types (matype 0..8) for '300580' with timeperiod=10.

    MA_Type: 0=SMA, 1=EMA, 2=WMA, 3=DEMA, 4=TEMA, 5=TRIMA, 6=KAMA, 7=MAMA,
    8=T3 (Default=SMA). The close prices are printed before plotting.
    """
    df = ts.get_k_data('300580', start='2017-01-12', end='2017-05-26')
    closed = df['close'].values
    # One moving average per matype, indexed by its matype number.
    averages = [talib.MA(closed, timeperiod=10, matype=kind) for kind in range(9)]
    # print() call instead of a Python 2 print statement, which is a syntax
    # error on Python 3.
    print(closed)
    plt.ylim([0, 40])
    # SMA and EMA get explicit line styles; the rest use matplotlib defaults.
    plt.plot(averages[0], 'r--')
    plt.plot(averages[1], 'g-*')
    for series in averages[2:]:
        plt.plot(series)
    plt.grid()
    plt.text(7, 30, 'BST')
    plt.show()
def handle_bar(context, bar_dict):
    """Per-bar handler: drop holdings not in context.to_buy, buy names above their SMA.

    Holdings that are no longer in context.to_buy are targeted to 0 percent
    when trading. Stocks in context.to_buy that are not yet held are passed
    through get_trading_stocks, and each one whose latest close is above its
    context.SMAPERIOD SMA is targeted to 0.08 of the portfolio when trading.
    """
    # TODO carried over from the original (its text was not recoverable).
    wanted = set(context.to_buy)
    held = set(get_holdings(context))
    candidates = wanted - held
    # Holdings are queried a second time, exactly as in the original.
    held = set(get_holdings(context))

    for stock in held - wanted:
        if bar_dict[stock].is_trading:
            order_target_percent(stock, 0)

    candidates = get_trading_stocks(candidates, context, bar_dict)

    above_sma = []
    for stock in candidates:
        closes = history(context.SMAPERIOD + 1, '1d', 'close')[stock].values
        sma = talib.SMA(closes, context.SMAPERIOD)
        if closes[-1] > sma[-1]:
            above_sma.append(stock)

    # cash = context.portfolio.cash
    # average_value = 0.98 * cash / len(above_sma)
    for stock in above_sma:
        if bar_dict[stock].is_trading:
            # order_value(stock, average_value)
            order_target_percent(stock, 0.08)
def add_SMA(self, timeperiod=20,
            type='line', color='secondary', **kwargs):
    """Simple Moving Average.

    Registers an 'SMA(<timeperiod>)' entry in self.pri (plot type and color)
    and stores talib.SMA of the self.df[self.cl] values in self.ind under
    the same name.

    :param timeperiod: window passed to talib.SMA.
    :param type: plot type stored in self.pri; overridden by kwargs['kind'].
    :param color: color stored in self.pri.
    :param kwargs: checked against VALID_TA_KWARGS via utils.kwargs_check.
    :raises Exception: when self.has_close is falsy.
    """
    if not self.has_close:
        # Same exception type as before, now with a message explaining why.
        raise Exception("add_SMA requires close data (has_close is not set)")

    utils.kwargs_check(kwargs, VALID_TA_KWARGS)
    # 'kind' is accepted as an alias for the plot type.
    type = kwargs.get('kind', type)

    name = 'SMA({})'.format(str(timeperiod))
    self.pri[name] = dict(type=type, color=color)
    self.ind[name] = talib.SMA(self.df[self.cl].values, timeperiod)
def _moving_average(self, data, n):
    """Return talib.SMA over `data` (converted via transform2ndarray) with period `n`."""
    values = transform2ndarray(data)
    averaged = talib.SMA(values, n)
    return averaged
def calculate_avg_volume(self, period_name, volumes):
    """Store the latest 15-period SMA of `volumes` for `period_name`.

    The value is written to self.current_indicators[period_name]['avg_volume'].
    """
    latest_average = talib.SMA(volumes, timeperiod=15)[-1]
    self.current_indicators[period_name]['avg_volume'] = latest_average
def handle_bar(context, bar_dict):
    """Per-bar handler for a short/long SMA crossover on context.s1 (daily closes).

    Calls order_target_value(context.s1, 0) on a downward cross while a
    position is held, and order_shares(context.s1, cash / close) on an
    upward cross.
    """
    closes = history_bars(context.s1, context.LONGPERIOD + 1, '1d', 'close')
    # print(closes)
    fast_ma = talib.SMA(closes, context.SHORTPERIOD)
    slow_ma = talib.SMA(closes, context.LONGPERIOD)

    held = context.portfolio.positions[context.s1].quantity
    shares = context.portfolio.cash / bar_dict[context.s1].close

    # Short SMA is below the long SMA now but was above it one bar earlier.
    if slow_ma[-1] > fast_ma[-1] and slow_ma[-2] < fast_ma[-2] and held > 0:
        order_target_value(context.s1, 0)

    # Short SMA is above the long SMA now but was below it one bar earlier.
    if slow_ma[-1] < fast_ma[-1] and slow_ma[-2] > fast_ma[-2]:
        order_shares(context.s1, shares)
def handle_bar(context, bar_dict):
    """Per-bar handler for a short/long SMA crossover on context.s1 ('1h' closes).

    Calls order_target_value(context.s1, 0) on a downward cross while a
    position is held, and order_shares(context.s1, cash / close) on an
    upward cross.
    """
    closes = history_bars(context.s1, context.LONGPERIOD + 1, '1h', 'close')
    fast_ma = talib.SMA(closes, context.SHORTPERIOD)
    slow_ma = talib.SMA(closes, context.LONGPERIOD)

    held = context.portfolio.positions[context.s1].quantity
    shares = context.portfolio.cash / bar_dict[context.s1].close

    # Short SMA is below the long SMA now but was above it one bar earlier.
    if slow_ma[-1] > fast_ma[-1] and slow_ma[-2] < fast_ma[-2] and held > 0:
        order_target_value(context.s1, 0)

    # Short SMA is above the long SMA now but was below it one bar earlier.
    if slow_ma[-1] < fast_ma[-1] and slow_ma[-2] > fast_ma[-2]:
        order_shares(context.s1, shares)
def sma(self, n, array=False):
    """Simple moving average of self.close over `n` periods.

    Returns the whole talib.SMA array when `array` is truthy, otherwise only
    its last value.
    """
    averaged = talib.SMA(self.close, n)
    return averaged if array else averaged[-1]
# ----------------------------------------------------------------------
def sma(self, np_array, n, array=False):
    """Simple moving average of `np_array` over `n` periods.

    For n < 2 the input is used unchanged instead of calling talib.SMA.
    Returns the whole result when `array` is truthy, otherwise only its
    last value.
    """
    averaged = np_array if n < 2 else talib.SMA(np_array, n)
    return averaged if array else averaged[-1]
def ACOscillator(df):
    """Compute an oscillator signal from SMAs of the (High + Low) / 2 mid-points.

    Takes the 40-period minus the 15-period SMA of the mid-points, subtracts
    that spread from its own 15-period SMA, smooths the result with another
    15-period SMA and returns its first difference (np.diff).
    """
    bar_midpoints = (df['High'] + df['Low']) / 2
    midpoint_array = np.array(Data.toFloatArray(bar_midpoints))

    spread = (tl.SMA(midpoint_array, timeperiod=40)
              - tl.SMA(midpoint_array, timeperiod=15))
    smoothed_spread = tl.SMA(spread, timeperiod=15)
    oscillator = tl.SMA(smoothed_spread - spread, timeperiod=15)
    return np.diff(oscillator)
def history(self, databar, period, indicator):
    """Compute a talib indicator over mock bar data and return it as a DataFrame.

    :databar: mock data bar; its low/close/high/date attributes are read
        depending on the indicator
    :period: time period for 'sma' and 'atr'; for 'macd' a mapping with the
        keys 'fastperiod', 'slowperiod' and 'signalperiod'
    :indicator: one of 'sma', 'atr', 'macd'
    :return: DataFrame indexed by pd.DatetimeIndex(databar.date) with columns
        sma0/sma1/sma2 (low/close/high), atr, or macd/macdsignal/macdhist.
        None for an unknown indicator, or when a required attribute or key
        is missing (a message is printed in that case).
    """
    try:
        if indicator == 'sma':
            # Read the inputs first so a databar without these attributes is
            # reported before any indicator work starts.
            low = np.array(databar.low)
            close = np.array(databar.close)
            high = np.array(databar.high)
            columns = {
                'sma0': talib.SMA(low, timeperiod=period),
                'sma1': talib.SMA(close, timeperiod=period),
                'sma2': talib.SMA(high, timeperiod=period),
            }
        elif indicator == 'atr':
            high = np.array(databar.high)
            low = np.array(databar.low)
            close = np.array(databar.close)
            columns = {'atr': talib.ATR(high, low, close, timeperiod=period)}
        elif indicator == 'macd':
            # NOTE(review): databar itself is passed to talib.MACD, unlike
            # the other branches which pass a single price array. Confirm
            # that callers hand in a 1-D series here.
            macd, macdsignal, macdhist = talib.MACD(
                databar,
                fastperiod=period['fastperiod'],
                slowperiod=period['slowperiod'],
                signalperiod=period['signalperiod'])
            columns = {'macd': macd, 'macdsignal': macdsignal, 'macdhist': macdhist}
        else:
            return None
        return pd.DataFrame(columns, index=pd.DatetimeIndex(databar.date))
    except (KeyError, AttributeError):
        # Attribute-style access (databar.low, ...) raises AttributeError
        # rather than KeyError when the field is missing, so both are
        # handled; previously only KeyError was caught.
        print('Pls check databar whether is dataframe')
        return None
def __init__(self, marketevent):
    """Initialise the base class with `marketevent` and register the SMA alias."""
    super(Indicator, self).__init__(marketevent)
    # Shortcut: self.SMA(...) is the same bound method as
    # self.SimpleMovingAverage(...).
    self.SMA = self.SimpleMovingAverage
def SimpleMovingAverage(self, period, index=-1):
    """Return the `period` SMA of preloaded 'close' data, selected via return_NaN.

    The close data is obtained from self.get_preload(period, index, 'close');
    the tb.SMA result and `index` are handed to return_NaN, whose result is
    returned.
    """
    closes = self.get_preload(period, index, 'close')
    return return_NaN(tb.SMA(closes, period), index)
def handle_bar(context, bar_dict):
    """Per-bar handler for a short/long SMA crossover on context.s1.

    Fetches context.LONGPERIOD + 1 daily ('1d') closes through history_bars,
    plots the latest short and long SMA, and then:
    - calls order_target_value(context.s1, 0) when the short SMA crossed
      below the long SMA on this bar and a position is held;
    - calls order_shares(context.s1, shares) when the short SMA crossed above
      the long SMA, where shares = cash / this bar's close.
    """
    # TODO carried over from the original (its text was not recoverable).
    closes = history_bars(context.s1, context.LONGPERIOD + 1, '1d', 'close')

    fast_ma = talib.SMA(closes, context.SHORTPERIOD)
    slow_ma = talib.SMA(closes, context.LONGPERIOD)

    plot("short avg", fast_ma[-1])
    plot("long avg", slow_ma[-1])

    held = context.portfolio.positions[context.s1].quantity
    shares = context.portfolio.cash / bar_dict[context.s1].close

    # Current-bar spread. The previous bar is only indexed once the
    # current-bar test has passed.
    spread_now = fast_ma[-1] - slow_ma[-1]

    # Downward cross while holding a position.
    if spread_now < 0 and fast_ma[-2] - slow_ma[-2] > 0 and held > 0:
        order_target_value(context.s1, 0)

    # Upward cross.
    if spread_now > 0 and fast_ma[-2] - slow_ma[-2] < 0:
        order_shares(context.s1, shares)
def adosc_bloomberg(my_close, my_high, my_low, my_volume, fastperiod, slowperiod):
    """Accumulation/distribution oscillator using SMA smoothing.

    Money-flow volume per bar is
    ((close - low) - (high - close)) / (high - low) * volume. Its running sum
    (ADL) is smoothed with SMA(fastperiod) and SMA(slowperiod), and the
    difference fast - slow is returned.

    Bars with high == low contribute zero money flow. Dividing by their zero
    range would otherwise yield NaN/inf and contaminate every later value of
    the cumulative sum.

    Inputs are expected to be pandas objects (the result of np.cumsum must
    expose `.values`).
    """
    bar_range = my_high - my_low
    MFV = (((my_close - my_low) - (my_high - my_close)) / bar_range) * my_volume
    # Keep the computed value wherever the range is non-zero; use 0 elsewhere.
    MFV = MFV.where(bar_range != 0, 0.0)
    ADL = np.cumsum(MFV)
    ADOSC = SMA(ADL.values, timeperiod=fastperiod) - SMA(ADL.values, timeperiod=slowperiod)
    return ADOSC
def overlap_process(event):
    """Widget callback: plot `close` with Bollinger Bands plus the selected overlap study.

    The selection is read from event.widget.get(). The upper panel always
    shows `close` and its ta.BBANDS bands; the lower panel shows the study
    matching the selected label, if any. `close` and `periods` are read from
    the enclosing scope.
    """
    print(event.widget.get())
    overlap = event.widget.get()

    upperband, middleband, lowerband = ta.BBANDS(close, timeperiod=5, nbdevup=2, nbdevdn=2, matype=0)
    fig, axes = plt.subplots(2, 1, sharex=True)
    price_ax, study_ax = axes[0], axes[1]

    price_ax.plot(close, 'rd-', markersize=3)
    for band, band_style in ((upperband, 'y-'), (middleband, 'b-'), (lowerband, 'y-')):
        price_ax.plot(band, band_style)
    price_ax.set_title(overlap, fontproperties="SimHei")

    # NOTE(review): the option labels below look mis-encoded ('?'), and
    # several are now identical, so only the first branch of each duplicate
    # can match. They are runtime strings and are kept exactly as found, in
    # their original order.
    curves = []  # (series, format) pairs for the lower panel
    if overlap == u'???':
        pass
    elif overlap == u'????????':
        curves.append((ta.DEMA(close, timeperiod=30), 'r-'))
    elif overlap == u'??????? ':
        curves.append((ta.EMA(close, timeperiod=30), 'r-'))
    elif overlap == u'??????——?????':
        curves.append((ta.HT_TRENDLINE(close), 'r-'))
    elif overlap == u'???????????':
        curves.append((ta.KAMA(close, timeperiod=30), 'r-'))
    elif overlap == u'?????':
        curves.append((ta.MA(close, timeperiod=30, matype=0), 'r-'))
    elif overlap == u'MESA???????':
        mama, fama = ta.MAMA(close, fastlimit=0, slowlimit=0)
        curves.append((mama, 'r-'))
        curves.append((fama, 'g-'))
    elif overlap == u'????????':
        curves.append((ta.MAVP(close, periods, minperiod=2, maxperiod=30, matype=0), 'r-'))
    elif overlap == u'???????':
        curves.append((ta.SMA(close, timeperiod=30), 'r-'))
    elif overlap == u'????????(T3)':
        curves.append((ta.T3(close, timeperiod=5, vfactor=0), 'r-'))
    elif overlap == u'????????':
        curves.append((ta.TEMA(close, timeperiod=30), 'r-'))
    elif overlap == u'?????? ':
        curves.append((ta.TRIMA(close, timeperiod=30), 'r-'))
    elif overlap == u'???????':
        curves.append((ta.WMA(close, timeperiod=30), 'r-'))

    for series, series_style in curves:
        study_ax.plot(series, series_style)
    plt.show()
# ????