def MA_RIBBON(df, ma_series):
ma_array = np.zeros([len(df), len(ma_series)])
ema_list = []
for idx, ma_len in enumerate(ma_series):
ema_i = EMA(df, n = ma_len, field = 'close')
ma_array[:, idx] = ema_i
ema_list.append(ema_i)
corr = np.empty([len(df)])
pval = np.empty([len(df)])
dist = np.empty([len(df)])
corr[:] = np.NAN
pval[:] = np.NAN
dist[:] = np.NAN
max_n = max(ma_series)
for idy in range(len(df)):
if idy >= max_n - 1:
corr[idy], pval[idy] = stats.spearmanr(ma_array[idy,:], range(len(ma_series), 0, -1))
dist[idy] = max(ma_array[idy,:]) - min(ma_array[idy,:])
corr_ts = pd.Series(corr*100, index = df.index, name = "MARIBBON_CORR")
pval_ts = pd.Series(pval*100, index = df.index, name = "MARIBBON_PVAL")
dist_ts = pd.Series(dist, index = df.index, name = "MARIBBON_DIST")
return pd.concat([corr_ts, pval_ts, dist_ts] + ema_list, join='outer', axis=1)
python类EMA的实例源码
def getStrategy_GD(start_trading_day,end_trading_day,_time,_close):
point = []
iday = _time.index(start_trading_day)
eday = _time.index(end_trading_day)
short_ema = ta.EMA(np.array(_close,dtype='f8'),timeperiod=10)
long_ema = ta.EMA(np.array(_close,dtype='f8'),timeperiod=25)
short_ema = short_ema[iday:eday]
long_ema = long_ema[iday:eday]
point.append(0)
for i in range(1,len(short_ema)):
if (short_ema[i-1] <= long_ema[i-1]) and (short_ema[i] >= long_ema[i]):
point.append(1)
elif (short_ema[i-1] >= long_ema[i-1]) and (short_ema[i] <= long_ema[i]):
point.append(-1)
else:
point.append(0)
return point
def getStrategy_GD(start_trading_day,end_trading_day,_time,_close):
point = []
iday = _time.index(start_trading_day)
eday = _time.index(end_trading_day)
short_ema = ta.EMA(np.array(_close,dtype='f8'),timeperiod=10)
long_ema = ta.EMA(np.array(_close,dtype='f8'),timeperiod=25)
short_ema = short_ema[iday:eday]
long_ema = long_ema[iday:eday]
point.append(0)
for i in range(1,len(short_ema)):
if (short_ema[i-1] <= long_ema[i-1]) and (short_ema[i] >= long_ema[i]):
point.append(1)
elif (short_ema[i-1] >= long_ema[i-1]) and (short_ema[i] <= long_ema[i]):
point.append(-1)
else:
point.append(0)
return point
def atr(df, n = 20):
new_tr = max(df['high'][-1]-df['low'][-1], abs(df['high'][-1] - df['close'][-2]), abs(df['low'][-1] - df['close'][-2]))
alpha = 2.0/(n+1)
df['ATR'+str(n)][-1] = df['ATR'+str(n)][-2] * (1-alpha) + alpha * new_tr
# talib matype: 0=SMA, 1=EMA, 2=WMA, 3=DEMA, 4=TEMA, 5=TRIMA, 6=KAMA, 7=MAMA, 8=T3
def EMA(df, n, field = 'close'):
return pd.Series(talib.EMA(df[field].values, n), name = 'EMA_' + field.upper() + '_' + str(n), index = df.index)
def EMAVAR(df, n, field = 'close'):
ema_ts = EMA(df, n, field)
alpha = 2.0 / (n + 1)
var_adj = (1-alpha) * (df[field] - ema_ts.shift(1).fillna(0))**2
evar_ts = pd.Series(talib.EMA(var_adj.values, n), name = 'EVAR_' + field.upper() + '_' + str(n), index = df.index)
return pd.concat([ema_ts, evar_ts], join='outer', axis=1)
def TEMA(ts, n):
n = int(n)
ts_ema1 = pd.Series( pd.ewma(ts, span = n, adjust = False), name = 'EMA' + str(n) )
ts_ema2 = pd.Series( pd.ewma(ts_ema1, span = n, adjust = False), name = 'EMA2' + str(n) )
ts_ema3 = pd.Series( pd.ewma(ts_ema2, span = n, adjust = False), name = 'EMA3' + str(n) )
ts_tema = pd.Series( 3 * ts_ema1 - 3 * ts_ema2 + ts_ema3, name = 'TEMA' + str(n) )
return ts_tema
def add_EMA(self, timeperiod=26,
type='line', color='secondary', **kwargs):
"""Exponential Moving Average."""
if not self.has_close:
raise Exception()
utils.kwargs_check(kwargs, VALID_TA_KWARGS)
if 'kind' in kwargs:
type = kwargs['kind']
name = 'EMA({})'.format(str(timeperiod))
self.pri[name] = dict(type=type, color=color)
self.ind[name] = talib.EMA(self.df[self.cl].values,
timeperiod)
def add_TRIX(self, timeperiod=15,
type='area', color='secondary', **kwargs):
"""1-day Rate of Change of Triple Smooth EMA."""
if not self.has_close:
raise Exception()
utils.kwargs_check(kwargs, VALID_TA_KWARGS)
if 'kind' in kwargs:
type = kwargs['kind']
name = 'TRIX({})'.format(str(timeperiod))
self.sec[name] = dict(type=type, color=color)
self.ind[name] = talib.TRIX(self.df[self.cl].values,
timeperiod)
def calculate_obv(self, period_name, closing_prices, volumes):
obv = talib.OBV(closing_prices, volumes)
obv_ema = talib.EMA(obv, timeperiod=3)
self.current_indicators[period_name]['obv_ema'] = obv_ema[-1]
self.current_indicators[period_name]['obv'] = obv[-1]
def ema(self, np_array, n, array=False):
"""????"""
if n < 2:
result = np_array
else:
result = talib.EMA(np_array, n)
if array:
return result
return result[-1]
def ema(self, np_array, n, array=False):
"""????"""
if n < 2:
result = np_array
else:
result = talib.EMA(np_array, n)
if array:
return result
return result[-1]
def MACD(self):
"""???????????????
MACD: (12-day EMA - 26-day EMA) ??? ??
Signal Line: 9-day EMA of MACD ??, ??
MACD Histogram: MACD - Signal Line ???, ???
return : macd, macdsignal, macdhist(??)"""
closes = self.getCloses()
return talib.MACD(closes)
def MACD(closes):
"""???????????????
MACD: (12-day EMA - 26-day EMA) ??? ??
Signal Line: 9-day EMA of MACD ??, ??
MACD Histogram: MACD - Signal Line ???, ???
return : macd, macdsignal, macdhist(??)"""
return talib.MACD(closes)
def BOLL(closes, matype=MA_Type.EMA):
"""???
matype: ???????? ????????????? EMA??2??SMA?4?????????
return upper, middle, lower"""
closes = np.array(closes)
return talib.BBANDS(closes, timeperiod=20, matype=matype)
def EMA(security_list, timeperiod=30):
# ????????????
if isinstance(security_list, str):
security_list = [security_list]
# ?? EMA
security_data = history(timeperiod * 2, '1d', 'close',
security_list, df=False, skip_paused=True)
ema = {}
for stock in security_list:
ema[stock] = talib.EMA(security_data[stock], timeperiod)
return ema
# DMA
def macd(close, previous_macds=[], fast_period=12, slow_period=26, signal_period=9):
"""
MACD - Moving Average Convergence Divergence
previous_macd: numpy.ndarray of previous MACDs
Returns:
- macd
- macd_line
"""
dataset_size = close.size
if dataset_size < slow_period-1:
print('Error in macd.py: passed not enough data! Required: ' + str(slow_period) +
' passed: ' + str(dataset_size))
return None, None
try:
ema_slow = talib.EMA(close, timeperiod=slow_period)[-1]
ema_fast = talib.EMA(close[-fast_period:], timeperiod=fast_period)[-1]
macd_value = ema_fast - ema_slow
# print('previous_macds:', previous_macds)
if len(previous_macds) < signal_period:
signal_line = None
else:
signal_line = talib.EMA(previous_macds[-signal_period:], timeperiod=signal_period)[-1]
except Exception as e:
print('Got Exception in macd.py. Details: ' + str(e) + '. Data: ' + str(previous_macds))
return None, None
return macd_value, signal_line
def calculate_indicator(stock_df):
periods = [3, 5, 10, 20, 30, 60]
# MA
for period in periods:
stock_df['MA' + str(period)] = talib.MA(stock_df['close'].values, timeperiod=period)
# EMA
periods = [3, 5, 10, 20, 30, 60]
for period in periods:
stock_df['EMA' + str(period)] = talib.EMA(stock_df['close'].values, timeperiod=period)
# AMTMA
periods = [5, 10, 20]
for period in periods:
stock_df['AMTMA' + str(period)] = talib.MA(stock_df['amount'].values, timeperiod=period)
# ATR
periods = [5, 10, 20]
for period in periods:
stock_df['ATR' + str(period)] = talib.ATR(stock_df['high'].values, stock_df['low'].values,
stock_df['close'].values, timeperiod=period)
# ADX
period = 14
stock_df['ADX' + str(period)] = talib.ADX(stock_df['high'].values, stock_df['low'].values,
stock_df['close'].values, timeperiod=period)
# MACD
stock_df['MACD_DIFF'], stock_df['MACD_DEA'], stock_df['MACD_HIST'] = talib.MACD(
stock_df['close'].values, fastperiod=12, slowperiod=26, signalperiod=9)
# CCI
period = 14
stock_df['CCI' + str(period)] = talib.CCI(stock_df['high'].values, stock_df['low'].values,
stock_df['close'].values, timeperiod=period)
# MFI
period = 14
stock_df['MFI' + str(period)] = talib.MFI(stock_df['high'].values, stock_df['low'].values,
stock_df['close'].values, stock_df['volume'].values,
timeperiod=period)
# ROCP
periods = [5, 10, 20]
for period in periods:
stock_df['ROCP' + str(period)] = talib.ROCP(stock_df['close'].values, timeperiod=period)
def calculate_indicator(stock_df):
periods = [3, 5, 10, 20, 30, 60]
# MA
for period in periods:
stock_df['MA' + str(period)] = talib.MA(stock_df['close'].values, timeperiod=period)
# EMA
periods = [3, 5, 10, 20, 30, 60]
for period in periods:
stock_df['EMA' + str(period)] = talib.EMA(stock_df['close'].values, timeperiod=period)
# AMTMA
periods = [5, 10, 20]
for period in periods:
stock_df['AMTMA' + str(period)] = talib.MA(stock_df['amount'].values, timeperiod=period)
# ATR
periods = [5, 10, 20]
for period in periods:
stock_df['ATR' + str(period)] = talib.ATR(stock_df['high'].values, stock_df['low'].values,
stock_df['close'].values, timeperiod=period)
# ADX
period = 14
stock_df['ADX' + str(period)] = talib.ADX(stock_df['high'].values, stock_df['low'].values,
stock_df['close'].values, timeperiod=period)
# MACD
stock_df['MACD_DIFF'], stock_df['MACD_DEA'], stock_df['MACD_HIST'] = talib.MACD(
stock_df['close'].values, fastperiod=12, slowperiod=26, signalperiod=9)
# CCI
period = 14
stock_df['CCI' + str(period)] = talib.CCI(stock_df['high'].values, stock_df['low'].values,
stock_df['close'].values, timeperiod=period)
# MFI
period = 14
stock_df['MFI' + str(period)] = talib.MFI(stock_df['high'].values, stock_df['low'].values,
stock_df['close'].values, stock_df['volume'].values,
timeperiod=period)
# ROCP
periods = [5, 10, 20]
for period in periods:
stock_df['ROCP' + str(period)] = talib.ROCP(stock_df['close'].values, timeperiod=period)
def adosc_chartschool(my_close, my_high, my_low, my_volume, fastperiod, slowperiod):
MFV = (((my_close - my_low)-(my_high - my_close)) / (my_high - my_low)) * my_volume
ADL = np.cumsum(MFV)
ADOSC = EMA(ADL.values, timeperiod=fastperiod) - EMA(ADL.values, timeperiod=slowperiod)
return ADOSC
def __recountEma(self):
"""??K??EMA1 ?EMA2"""
l = len(self.lineBar)
# 1?lineBar?????????
if len(self.lineBar) < max(7, self.inputEma1Len, self.inputEma2Len)+2:
self.debugCtaLog(u'?????,??Bar?????{0}???EMA???{1}'.
format(len(self.lineBar), max(7, self.inputEma1Len, self.inputEma2Len)+2))
return
# ?????EMA??
if self.inputEma1Len > 0:
if self.inputEma1Len > l:
ema1Len = l
else:
ema1Len = self.inputEma1Len
# 3????InputN??(??????????????
listClose=[x.close for x in self.lineBar[-ema1Len - 1:-1]]
barEma1 = ta.EMA(numpy.array(listClose, dtype=float), ema1Len)[-1]
barEma1 = round(float(barEma1), 3)
if len(self.lineEma1) > self.inputEma1Len*8:
del self.lineEma1[0]
self.lineEma1.append(barEma1)
# ?????EMA??
if self.inputEma2Len > 0:
if self.inputEma2Len > l:
ema2Len = l
else:
ema2Len = self.inputEma2Len
# 3????InputN??(??????????????
listClose=[x.close for x in self.lineBar[-ema2Len - 1:-1]]
barEma2 = ta.EMA(numpy.array(listClose, dtype=float), ema2Len)[-1]
barEma2 = round(float(barEma2), 3)
if len(self.lineEma2) > self.inputEma1Len*8:
del self.lineEma2[0]
self.lineEma2.append(barEma2)
def overlap_process(event):
print(event.widget.get())
overlap = event.widget.get()
upperband, middleband, lowerband = ta.BBANDS(close, timeperiod=5, nbdevup=2, nbdevdn=2, matype=0)
fig, axes = plt.subplots(2, 1, sharex=True)
ax1, ax2 = axes[0], axes[1]
axes[0].plot(close, 'rd-', markersize=3)
axes[0].plot(upperband, 'y-')
axes[0].plot(middleband, 'b-')
axes[0].plot(lowerband, 'y-')
axes[0].set_title(overlap, fontproperties="SimHei")
if overlap == u'???':
pass
elif overlap == u'????????':
real = ta.DEMA(close, timeperiod=30)
axes[1].plot(real, 'r-')
elif overlap == u'??????? ':
real = ta.EMA(close, timeperiod=30)
axes[1].plot(real, 'r-')
elif overlap == u'??????——?????':
real = ta.HT_TRENDLINE(close)
axes[1].plot(real, 'r-')
elif overlap == u'???????????':
real = ta.KAMA(close, timeperiod=30)
axes[1].plot(real, 'r-')
elif overlap == u'?????':
real = ta.MA(close, timeperiod=30, matype=0)
axes[1].plot(real, 'r-')
elif overlap == u'MESA???????':
mama, fama = ta.MAMA(close, fastlimit=0, slowlimit=0)
axes[1].plot(mama, 'r-')
axes[1].plot(fama, 'g-')
elif overlap == u'????????':
real = ta.MAVP(close, periods, minperiod=2, maxperiod=30, matype=0)
axes[1].plot(real, 'r-')
elif overlap == u'???????':
real = ta.SMA(close, timeperiod=30)
axes[1].plot(real, 'r-')
elif overlap == u'????????(T3)':
real = ta.T3(close, timeperiod=5, vfactor=0)
axes[1].plot(real, 'r-')
elif overlap == u'????????':
real = ta.TEMA(close, timeperiod=30)
axes[1].plot(real, 'r-')
elif overlap == u'?????? ':
real = ta.TRIMA(close, timeperiod=30)
axes[1].plot(real, 'r-')
elif overlap == u'???????':
real = ta.WMA(close, timeperiod=30)
axes[1].plot(real, 'r-')
plt.show()
# ????
def TDX_ADX(highs, lows, closes):
"""??????ADX, ?????test_adx
return: np.ndarray
MTR:=EXPMEMA(MAX(MAX(HIGH-LOW,ABS(HIGH-REF(CLOSE,1))),ABS(REF(CLOSE,1)-LOW)),N);
HD :=HIGH-REF(HIGH,1);
LD :=REF(LOW,1)-LOW;
DMP:=EXPMEMA(IF(HD>0&&HD>LD,HD,0),N);
DMM:=EXPMEMA(IF(LD>0&&LD>HD,LD,0),N);
PDI: DMP*100/MTR;
MDI: DMM*100/MTR;
ADX: EXPMEMA(ABS(MDI-PDI)/(MDI+PDI)*100,MM);
ADXR:EXPMEMA(ADX,MM);
"""
assert(len(closes)>30)
highs = np.array(highs)
lows = np.array(lows)
closes = np.array(closes)
mtr = np.zeros(len(closes))
for i, v in np.ndenumerate(closes):
i = i[0]
if i>0:
y = closes[i-1]
mtr[i] = max(max(highs[i]-lows[i], abs(highs[i]-y)), abs(y-lows[i]))
n = 14
mm = 6
mtr = talib.EMA(mtr, n)
hd = np.zeros(len(highs))
ld = np.zeros(len(highs))
for i, v in np.ndenumerate(highs):
i = i[0]
if i>0:
hd[i] = highs[i] - highs[i-1]
ld[i] = lows[i-1] - lows[i]
if not (hd[i] > 0 and hd[i]>ld[i]):
hd[i] = 0
if not (ld[i]>0 and ld[i]>hd[i]):
ld[i] = 0
dmp = talib.EMA(hd, n)
dmm = talib.EMA(ld, n)
pdi = dmp * 100 / mtr
mdi = dmm * 100 / mtr
adx = np.zeros(len(mdi))
for i, v in np.ndenumerate(mdi):
i = i[0]
adx[i] = abs(mdi[i]-pdi[i]) / (mdi[i]+pdi[i])*100
adx = talib.EMA(adx, mm)
return adx
def calculate_features(df):
"""
Method which calculates and generates features
"""
close = df['close'].values
high = df['high'].values
low = df['low'].values
volume = df['volume'].values
last_row = df.tail(1).copy()
# ************** Calc EMAs
ema_periods = [2, 4, 8, 12, 16, 20]
for ema_period in ema_periods:
ema = talib.EMA(close[-ema_period:], timeperiod=ema_period)[-1]
last_row['ema' + str(ema_period)] = ema
# ************** Calc RSIs
rsi_periods = [5]
for rsi_period in rsi_periods:
rsi = talib.RSI(close[-rsi_period:], timeperiod=rsi_period-1)[-1]
last_row['rsi' + str(rsi_period)] = rsi
last_row['rsi_above_50' + str(rsi_period)] = int(rsi > 50.0)
# ************** Calc CCIs
cci_periods = [5]
for cci_period in cci_periods:
cci = talib.CCI(high[-cci_period:],
low[-cci_period:],
close[-cci_period:],
timeperiod=cci_period)[-1]
last_row['cci' + str(cci_period)] = cci
# ************** Calc MACD 1
macd_periods = [34]
for macd_period in macd_periods:
macd, macd_signal, _ = talib.MACD(close[-macd_period:],
fastperiod=12,
slowperiod=26,
signalperiod=9)
macd = macd[-1]
signal_line = macd_signal[-1]
last_row['macd_above_signal' + str(macd_period)] = int(macd > signal_line)
last_row['macd_above_zero' + str(macd_period)] = int(macd > 0.0)
# ************** Calc OBVs
obv_periods = [2, 4, 8, 12, 16, 20]
for obv_period in obv_periods:
obv = talib.OBV(close[-obv_period:], volume[-obv_period:])[-1]
last_row['obv' + str(obv_period)] = obv
return last_row
def calculate_features(df):
"""
Method which calculates and generates features
"""
close = df['close'].values
high = df['high'].values
low = df['low'].values
volume = df['volume'].values
last_row = df.tail(1).copy()
# ************** Calc EMAs
ema_periods = [2, 4, 8, 12, 16, 20]
for ema_period in ema_periods:
ema = talib.EMA(close[-ema_period:], timeperiod=ema_period)[-1]
last_row['ema' + str(ema_period)] = ema
# ************** Calc RSIs
rsi_periods = [5]
for rsi_period in rsi_periods:
rsi = talib.RSI(close[-rsi_period:], timeperiod=rsi_period-1)[-1]
last_row['rsi' + str(rsi_period)] = rsi
last_row['rsi_above_50' + str(rsi_period)] = int(rsi > 50.0)
# ************** Calc CCIs
cci_periods = [5]
for cci_period in cci_periods:
cci = talib.CCI(high[-cci_period:],
low[-cci_period:],
close[-cci_period:],
timeperiod=cci_period)[-1]
last_row['cci' + str(cci_period)] = cci
# ************** Calc MACD 1
macd_periods = [34]
for macd_period in macd_periods:
macd, macd_signal, _ = talib.MACD(close[-macd_period:],
fastperiod=12,
slowperiod=26,
signalperiod=9)
macd = macd[-1]
signal_line = macd_signal[-1]
last_row['macd_above_signal' + str(macd_period)] = int(macd > signal_line)
last_row['macd_above_zero' + str(macd_period)] = int(macd > 0.0)
# ************** Calc OBVs
obv_periods = [2, 4, 8, 12, 16, 20]
for obv_period in obv_periods:
obv = talib.OBV(close[-obv_period:], volume[-obv_period:])[-1]
last_row['obv' + str(obv_period)] = obv
return last_row
def calculate(self, look_back, wallet):
"""
Main strategy logic (the meat of the strategy)
"""
(dataset_cnt, _) = common.get_dataset_count(look_back, self.group_by_field)
# Wait until we have enough data
if dataset_cnt < self.min_history_ticks:
print('dataset_cnt:', dataset_cnt)
return self.actions
self.actions.clear()
# Calculate indicators
df = look_back.tail(self.min_history_ticks)
close = df['close'].values
# ************** Calc EMA20
ema20_period = 25
ema20 = talib.EMA(close[-ema20_period:], timeperiod=ema20_period)[-1]
close_price = self.get_price(TradeState.none, df.tail(), self.pair)
print('close_price:', close_price, 'ema:', ema20)
if close_price <= ema20:
new_action = TradeState.sell
else:
new_action = TradeState.buy
# ************** Calc EMA Death Cross
ema_interval_short = 6
ema_interval_long = 25
ema_short = talib.EMA(close[-ema_interval_short:], timeperiod=ema_interval_short)[-1]
ema_long = talib.EMA(close[-ema_interval_long:], timeperiod=ema_interval_long)[-1]
if ema_short <= ema_long: # If we are below death cross, sell
new_action = TradeState.sell
trade_price = self.get_price(new_action, df.tail(), self.pair)
action = TradeAction(self.pair,
new_action,
amount=None,
rate=trade_price,
buy_sell_mode=self.buy_sell_mode)
self.actions.append(action)
return self.actions
def calculate(self, look_back, wallet):
"""
Main strategy logic (the meat of the strategy)
"""
(dataset_cnt, _) = common.get_dataset_count(look_back, self.group_by_field)
# Wait until we have enough data
if dataset_cnt < self.min_history_ticks:
print('dataset_cnt:', dataset_cnt)
return self.actions
self.actions.clear()
# Calculate indicators
df = look_back.tail(self.min_history_ticks)
close = df['close'].values
volume = df['volume'].values
new_action = TradeState.none
close_price = self.get_price(TradeState.none, look_back, self.pair)
# ************** OBV (On Balance Volume)
obv = talib.OBV(close, volume)[-1]
print('obv:', obv)
if obv >= 100.0:
new_action = TradeState.buy
elif obv < 100.0:
new_action = TradeState.sell
# ************** Calc EMA
ema_period = 6
ema = talib.EMA(close[-ema_period:], timeperiod=ema_period)[-1]
if close_price <= ema:
new_action = TradeState.sell
if new_action == TradeState.none:
return self.actions
trade_price = self.get_price(new_action, df.tail(), self.pair)
action = TradeAction(self.pair,
new_action,
amount=None,
rate=trade_price,
buy_sell_mode=self.buy_sell_mode)
self.actions.append(action)
return self.actions