def add_ADX(self, timeperiod=14,
            type='line', color='secondary', **kwargs):
    """Average Directional Movement Index.

    Computes ``talib.ADX`` over the high/low/close columns of ``self.df``
    and registers the result under the name ``'ADX(<timeperiod>)'`` in
    ``self.ind``, with its display settings stored in ``self.sec``.

    Parameters
    ----------
    timeperiod : int
        Look-back period forwarded to ``talib.ADX``.
    type : str
        Display type stored in ``self.sec[name]['type']``.  Overridden by
        a ``kind`` keyword argument when one is supplied.
    color : str
        Display color stored in ``self.sec[name]['color']``.
    **kwargs
        Extra keyword arguments; validated with ``utils.kwargs_check``
        against ``VALID_TA_KWARGS``.

    Raises
    ------
    Exception
        If any of ``self.has_high``, ``self.has_low`` or
        ``self.has_close`` is falsy.
    """
    if not (self.has_high and self.has_low and self.has_close):
        # Same exception type as before, but with a message so the failure
        # can be diagnosed from the traceback alone.
        raise Exception("add_ADX requires high, low and close data "
                        "(has_high, has_low and has_close must all be set)")

    utils.kwargs_check(kwargs, VALID_TA_KWARGS)
    # ``kind`` is accepted as an alias that takes precedence over ``type``.
    type = kwargs.get('kind', type)

    name = 'ADX({})'.format(str(timeperiod))
    self.sec[name] = dict(type=type, color=color)
    self.ind[name] = talib.ADX(self.df[self.hi].values,
                               self.df[self.lo].values,
                               self.df[self.cl].values,
                               timeperiod)
# Example source code for ADX in Python (original page heading: "python类ADX的实例源码")
def ADX(df, n):
    """Return the ADX of ``df`` as a pandas Series named ``'ADX_<n>'``.

    ``df`` must provide ``'high'``, ``'low'`` and ``'close'`` columns; their
    raw values are handed to ``talib.ADX`` with ``timeperiod=n`` and the
    result is re-indexed with ``df.index``.
    """
    adx_values = talib.ADX(
        df['high'].values,
        df['low'].values,
        df['close'].values,
        timeperiod=n,
    )
    series_name = 'ADX_' + str(n)
    return pd.Series(adx_values, index=df.index, name=series_name)
    # Earlier hand-rolled pandas implementation, kept for reference only
    # (unreachable; note that it refers to an undefined ``n_ADX``):
    # UpMove = df['high'] - df['high'].shift(1)
    # DoMove = df['low'].shift(1) - df['low']
    # UpD = pd.Series(UpMove)
    # DoD = pd.Series(DoMove)
    # UpD[(UpMove<=DoMove)|(UpMove <= 0)] = 0
    # DoD[(DoMove<=UpMove)|(DoMove <= 0)] = 0
    # ATRs = ATR(df,span = n, min_periods = n)
    # PosDI = pd.Series(pd.ewma(UpD, span = n, min_periods = n - 1) / ATRs)
    # NegDI = pd.Series(pd.ewma(DoD, span = n, min_periods = n - 1) / ATRs)
    # ADX = pd.Series(pd.ewma(abs(PosDI - NegDI) / (PosDI + NegDI), span = n_ADX, min_periods = n_ADX - 1), name = 'ADX' + str(n) + '_' + str(n_ADX))
    # return ADX
def _test_adx(self):
    """Manual smoke test: print ADX and TDX_ADX values for stock 300033.

    Loads bars via ``mysql.getFiveHisdat`` starting 2017-6-14, sorts them by
    index and prints the latest values of both ADX variants for comparison.
    """
    bars = mysql.getFiveHisdat('300033', start_day='2017-6-14')
    # Offline alternative: bars = pd.read_csv('../test.csv')
    bars = bars.sort_index()
    # Debug aid: agl.print_df(bars)
    highs = bars['h']
    lows = bars['l']
    closes = bars['c']

    talib_adx = ADX(highs, lows, closes)
    # Debug aid: ui.DrawTs(pl, ts=closes, high=talib_adx)
    print(talib_adx[-1])

    tdx_adx = TDX_ADX(highs, lows, closes)
    print(tdx_adx[-1], len(tdx_adx))
    print(tdx_adx[-5:])
    # Debug aid (plot the last 100 bars):
    # ui.DrawTs(pl, ts=closes[-100:], high=tdx_adx[-100:])
def calculate_indicator(stock_df):
    """Add a fixed set of TA-Lib indicator columns to ``stock_df`` in place.

    Reads the ``close``, ``amount``, ``high``, ``low`` and ``volume``
    columns and writes, in this order: ``MA*``/``EMA*`` (3, 5, 10, 20, 30,
    60), ``AMTMA*``/``ATR*`` (5, 10, 20), ``ADX14``, ``MACD_DIFF``,
    ``MACD_DEA``, ``MACD_HIST``, ``CCI14``, ``MFI14`` and ``ROCP*``
    (5, 10, 20).  Nothing is returned.
    """
    long_spans = (3, 5, 10, 20, 30, 60)
    short_spans = (5, 10, 20)

    close = stock_df['close'].values

    # Simple, then exponential, moving averages of the close.
    for prefix, average in (('MA', talib.MA), ('EMA', talib.EMA)):
        for span in long_spans:
            stock_df[prefix + str(span)] = average(close, timeperiod=span)

    # Moving average of the traded amount.
    amount = stock_df['amount'].values
    for span in short_spans:
        stock_df['AMTMA' + str(span)] = talib.MA(amount, timeperiod=span)

    # Average true range.
    high = stock_df['high'].values
    low = stock_df['low'].values
    for span in short_spans:
        stock_df['ATR' + str(span)] = talib.ATR(high, low, close,
                                                timeperiod=span)

    # Single-period indicators, all on a 14-bar window.
    stock_df['ADX14'] = talib.ADX(high, low, close, timeperiod=14)

    macd_diff, macd_dea, macd_hist = talib.MACD(
        close, fastperiod=12, slowperiod=26, signalperiod=9)
    stock_df['MACD_DIFF'] = macd_diff
    stock_df['MACD_DEA'] = macd_dea
    stock_df['MACD_HIST'] = macd_hist

    stock_df['CCI14'] = talib.CCI(high, low, close, timeperiod=14)
    stock_df['MFI14'] = talib.MFI(high, low, close,
                                  stock_df['volume'].values, timeperiod=14)

    # Rate of change (as a fraction) of the close.
    for span in short_spans:
        stock_df['ROCP' + str(span)] = talib.ROCP(close, timeperiod=span)
def calculate_indicator(stock_df):
    """Compute TA-Lib indicators and store them as new ``stock_df`` columns.

    The frame is modified in place and nothing is returned.  Input columns
    used: ``close``, ``amount``, ``high``, ``low``, ``volume``.  Output
    columns, in creation order: ``MA3..MA60``, ``EMA3..EMA60``,
    ``AMTMA5/10/20``, ``ATR5/10/20``, ``ADX14``, ``MACD_DIFF``,
    ``MACD_DEA``, ``MACD_HIST``, ``CCI14``, ``MFI14``, ``ROCP5/10/20``.
    """
    wide_windows = [3, 5, 10, 20, 30, 60]
    narrow_windows = [5, 10, 20]
    window_14 = 14

    closes = stock_df['close'].values

    # MA
    for window in wide_windows:
        stock_df['MA{}'.format(window)] = talib.MA(closes, timeperiod=window)
    # EMA
    for window in wide_windows:
        stock_df['EMA{}'.format(window)] = talib.EMA(closes, timeperiod=window)
    # AMTMA: moving average of the ``amount`` column
    amounts = stock_df['amount'].values
    for window in narrow_windows:
        stock_df['AMTMA{}'.format(window)] = talib.MA(amounts, timeperiod=window)
    # ATR
    highs = stock_df['high'].values
    lows = stock_df['low'].values
    for window in narrow_windows:
        stock_df['ATR{}'.format(window)] = talib.ATR(
            highs, lows, closes, timeperiod=window)
    # ADX
    stock_df['ADX{}'.format(window_14)] = talib.ADX(
        highs, lows, closes, timeperiod=window_14)
    # MACD returns three arrays: difference line, signal line, histogram
    macd_lines = talib.MACD(closes, fastperiod=12, slowperiod=26, signalperiod=9)
    diff_line, dea_line, hist_line = macd_lines
    stock_df['MACD_DIFF'] = diff_line
    stock_df['MACD_DEA'] = dea_line
    stock_df['MACD_HIST'] = hist_line
    # CCI
    stock_df['CCI{}'.format(window_14)] = talib.CCI(
        highs, lows, closes, timeperiod=window_14)
    # MFI
    volumes = stock_df['volume'].values
    stock_df['MFI{}'.format(window_14)] = talib.MFI(
        highs, lows, closes, volumes, timeperiod=window_14)
    # ROCP
    for window in narrow_windows:
        stock_df['ROCP{}'.format(window)] = talib.ROCP(closes, timeperiod=window)
def calculate_adx(self, period_name, close):
    """Store the latest 14-period ADX under ``current_indicators[period_name]['adx']``.

    The ADX is computed by ``talib.ADX`` from ``self.highs``, ``self.lows``
    and the supplied ``close`` series; only its last element is kept.
    """
    adx_series = talib.ADX(self.highs, self.lows, close, timeperiod=14)
    latest_adx = adx_series[-1]
    self.current_indicators[period_name]['adx'] = latest_adx
def adx(self, n, array=False):
    """Compute the ADX indicator over ``self.high``/``self.low``/``self.close``.

    ``n`` is passed to ``talib.ADX`` as the time period.  When ``array`` is
    truthy the full result is returned; otherwise only its last element.
    """
    adx_values = talib.ADX(self.high, self.low, self.close, n)
    return adx_values if array else adx_values[-1]
# ----------------------------------------------------------------------
def TDX_ADX(highs, lows, closes, n=14, mm=6):
    """ADX computed from the DMI formula below; ``_test_adx`` shows a usage example.

    Formula being implemented (``talib.EMA`` stands in for ``EXPMEMA``)::

        MTR:=EXPMEMA(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(REF(CLOSE,1)-LOW)),N);
        HD :=HIGH-REF(HIGH,1);
        LD :=REF(LOW,1)-LOW;
        DMP:=EXPMEMA(IF(HD>0&&HD>LD,HD,0),N);
        DMM:=EXPMEMA(IF(LD>0&&LD>HD,LD,0),N);
        PDI: DMP*100/MTR;
        MDI: DMM*100/MTR;
        ADX: EXPMEMA(ABS(MDI-PDI)/(MDI+PDI)*100,MM);
        ADXR:EXPMEMA(ADX,MM);

    Only the ADX line is returned; ADXR is not computed.

    Parameters
    ----------
    highs, lows, closes : array-like
        Price series of equal length; ``closes`` must have more than 30
        elements.
    n : int, optional
        ``N`` in the formula (smoothing period for MTR/DMP/DMM).  Default 14.
    mm : int, optional
        ``MM`` in the formula (smoothing period for ADX).  Default 6.

    Returns
    -------
    np.ndarray
        The array returned by ``talib.EMA`` for the ADX line.

    Raises
    ------
    AssertionError
        If ``closes`` has 30 or fewer elements.
    """
    assert(len(closes)>30)
    highs = np.array(highs)
    lows = np.array(lows)
    closes = np.array(closes)

    # True range; the first bar has no previous close and stays at 0.
    prev_close = closes[:-1]
    true_range = np.zeros(len(closes))
    true_range[1:] = np.maximum(
        np.maximum(highs[1:] - lows[1:], np.abs(highs[1:] - prev_close)),
        np.abs(prev_close - lows[1:]))
    mtr = talib.EMA(true_range, n)

    # Raw directional moves; the first bar has no predecessor and stays at 0.
    raw_hd = np.zeros(len(highs))
    raw_ld = np.zeros(len(highs))
    raw_hd[1:] = highs[1:] - highs[:-1]
    raw_ld[1:] = lows[:-1] - lows[1:]

    # Both filters must look at the *raw* moves.  Filtering LD against an
    # already-zeroed HD would keep LD when HD == LD > 0, whereas the formula
    # IF(LD>0&&LD>HD,LD,0) yields 0 in that case.
    hd = np.where((raw_hd > 0) & (raw_hd > raw_ld), raw_hd, 0.0)
    ld = np.where((raw_ld > 0) & (raw_ld > raw_hd), raw_ld, 0.0)

    dmp = talib.EMA(hd, n)
    dmm = talib.EMA(ld, n)
    pdi = dmp * 100 / mtr
    mdi = dmm * 100 / mtr

    # NOTE: bars where MDI + PDI is 0 divide by zero here; no special
    # handling is applied.
    dx = np.abs(mdi - pdi) / (mdi + pdi) * 100
    return talib.EMA(dx, mm)
def ADX(highs, lows, closes):
    """Return the 14-period ``talib.ADX`` of the given price series.

    Each input is converted with ``np.array`` before being passed to
    TA-Lib, so any array-like (list, Series, ndarray) is accepted.
    """
    high_arr, low_arr, close_arr = [
        np.array(series) for series in (highs, lows, closes)
    ]
    return talib.ADX(high_arr, low_arr, close_arr, timeperiod=14)
def oscillator2(data):
    """Backtest an ADX/DX/RSI gap strategy on ``df``, print and plot the trades.

    Indicators (14-period ADX, DX and MINUS_DI, 4-period RSI) are computed
    from the ``High``/``Low``/``Close`` columns.  Bars are scanned from
    index 40 up to ``len - 2`` with a three-state machine:

    * flat (``flag == 0``): open a ``'Long'`` when ADX > 20, DX > MINUS_DI,
      the next open is more than 1.8 above this close and RSI < 50; open a
      ``'Short'`` on the mirrored condition (DX < MINUS_DI, next open more
      than 1.8 below this close).  Entries are priced at the next bar's
      close.
    * long (``flag == 1``): exit when the close reaches +1% or -10% of the
      entry price, or more than 5 days have passed; recorded as a
      ``'Short'`` at the next bar's open, dated with the current bar.
    * short (``flag == 2``): exit when the close reaches -1% or +10% of the
      entry price, or more than 5 days have passed; recorded as a
      ``'Long'`` at the next bar's close, dated with the next bar.

    Signals are paired (entry, exit) to compute per-trade profits, which
    are printed, and the signals are plotted over the close series.
    Returns ``None``.

    Written so that it runs unchanged on Python 2 and Python 3.
    """
    # NOTE(review): ``data`` is never read; all prices come from the
    # module-level ``df``.  Confirm with callers before switching to ``data``.
    float_close = Data.toFloatArray(df['Close'])
    float_high = Data.toFloatArray(df['High'])
    float_low = Data.toFloatArray(df['Low'])

    highs = np.array(float_high)
    lows = np.array(float_low)
    closes = np.array(float_close)

    adx_values = tl.ADX(highs, lows, closes, timeperiod=14)
    dmi = tl.DX(highs, lows, closes, timeperiod=14)
    mdmi = tl.MINUS_DI(highs, lows, closes, timeperiod=14)
    rsi = tl.RSI(closes, timeperiod=4)

    signals = []
    flag = 0  # 0 = flat, 1 = long position open, 2 = short position open
    # Stop two bars early: the rules below look at bar i + 1.
    for i in range(40, len(adx_values) - 2):
        if flag == 0:
            if adx_values[i] > 20 and dmi[i] > mdmi[i] and df.loc[i+1, 'Open'] > (df.loc[i, 'Close']+1.8) and rsi[i] < 50:
                signal = ['HSI', df.loc[i+1, 'Date'], 'Long', df.loc[i+1, 'Close']]
                flag = 1
                signals.append(signal)
            if adx_values[i] > 20 and dmi[i] < mdmi[i] and df.loc[i+1, 'Open'] < (df.loc[i, 'Close']-1.8) and rsi[i] < 50:
                signal = ['HSI', df.loc[i+1, 'Date'], 'Short', df.loc[i+1, 'Close']]
                flag = 2
                signals.append(signal)
        elif flag == 1:
            # ``signal`` still holds the entry: [code, date, action, price].
            if df.loc[i, 'Close'] >= signal[3]*1.01 or df.loc[i, 'Close'] <= signal[3]*0.90 or (df.loc[i, 'Date']-signal[1]) > timedelta(days=5):
                signal = ['HSI', df.loc[i, 'Date'], 'Short', df.loc[i+1, 'Open']]
                flag = 0
                signals.append(signal)
        elif flag == 2:
            if df.loc[i, 'Close'] <= signal[3]*0.99 or df.loc[i, 'Close'] >= signal[3]*1.10 or (df.loc[i, 'Date']-signal[1]) > timedelta(days=5):
                signal = ['HSI', df.loc[i+1, 'Date'], 'Long', df.loc[i+1, 'Close']]
                flag = 0
                signals.append(signal)

    sig = pd.DataFrame(signals, columns=['Code', 'Time', 'Action', 'Price'])
    # Debug output: time between the first and the eleventh signal.  Skipped
    # when there are fewer than 11 signals instead of raising a lookup error.
    if len(sig) > 10:
        print(sig['Time'][10] - sig['Time'][0])

    profits = []
    print(sig)
    # Signals alternate entry/exit; a trailing unmatched entry is ignored.
    # Floor division keeps the range bound an int on Python 3 as well.
    for k in range(0, len(signals) // 2):
        if sig['Action'][k*2] == "Long":
            profit = sig['Price'][k*2+1] - sig['Price'][k*2]
        else:
            profit = sig['Price'][k*2] - sig['Price'][k*2+1]
        profits.append(profit)
    print(np.sum(profits))
    print(profits)

    ###### PLOT #######
    longSignals = sig[sig['Action'] == 'Long']
    shortSignals = sig[sig['Action'] == 'Short']
    plt.plot(df['Date'], df['Close'], longSignals['Time'], longSignals['Price'], 'r^', shortSignals['Time'],
             shortSignals['Price'], 'gv', markersize=10)
    red_patch = mpatches.Patch(color='red', label='Long')
    green_patch = mpatches.Patch(color='green', label='Short')
    plt.legend(handles=[red_patch, green_patch])
    plt.grid()
    plt.show()
    ###### PLOT #######