def __init__(self):
    """Set the instrument, bar settings and tunable parameters of the strategy."""
    super().__init__()

    # Instrument and bar configuration.
    self.Instrument = 'rb1610'
    self.Interval = 15
    self.IntervalType = IntervalType.Minute
    self.BeginDate = '20160101'

    self.Params['Lots'] = 1
    self.SingleOrderOneBar = True

    # Remaining tunables, stored in the same order as before.
    for key, value in (
        ('RiskRatio', 1),    # % risk per N (0 - 100)
        ('ATRLength', 20),   # ATR length
        ('boLength', 20),    # breakout length
        ('fsLength', 55),    # fail-safe length
        ('teLength', 10),    # trailing exit length
    ):
        self.Params[key] = value
# python类ATR的实例源码 (Python ATR example source code)
def ATR(security_list, timeperiod=14):
    """Return a dict mapping each security to its ``talib.ATR`` result.

    :param security_list: a single security code (str) or an iterable of codes
    :param timeperiod: ATR period; ``timeperiod * 2`` daily bars are requested
        from ``attribute_history`` for every security
    :return: ``{security: talib.ATR(high, low, close, timeperiod)}``
    """
    # A bare string is treated as a one-element list.
    codes = [security_list] if isinstance(security_list, str) else security_list

    def _atr_for(code):
        bars = attribute_history(
            code, timeperiod * 2, '1d', ['close', 'high', 'low'], df=False)
        return talib.ATR(bars['high'], bars['low'], bars['close'], timeperiod)

    return {code: _atr_for(code) for code in codes}
# ???
def compute_average_true_ranges(context):
    """
    Compute the average true range (N) of every market in ``context.prices``.

    The latest ATR value of each market is stored in
    ``context.average_true_range[market]``. When ``context.is_test`` is set the
    result is asserted to be non-empty; when ``context.is_debug`` is set the
    elapsed time is logged and asserted to stay below 1024 ms.
    """
    if context.is_debug:
        start_time = time()

    window = 21  # number of trailing bars handed to ATR
    period = 20  # ATR time period

    for market in context.prices.items:
        bars = context.prices[market]
        latest = ATR(
            bars.high[-window:],
            bars.low[-window:],
            bars.close[-window:],
            timeperiod=period
        )[-1]
        context.average_true_range[market] = latest

    if context.is_test:
        assert len(context.average_true_range) > 0

    if context.is_debug:
        time_taken = (time() - start_time) * 1000
        log.debug('Executed in %f ms.' % time_taken)
        assert time_taken < 1024
def atr(self, n, array=False):
    """ATR over ``n`` periods: the whole series if ``array`` is true, else the latest value."""
    series = talib.ATR(self.high, self.low, self.close, n)
    return series if array else series[-1]
# ----------------------------------------------------------------------
def ATR(highs, lows, closes):
    """Return ``talib.ATR`` of the given high/low/close sequences.

    The inputs are copied into float64 arrays first, so plain lists, integer
    data and float32 arrays are all accepted (TA-Lib only takes double-precision
    input). No time period is passed, so TA-Lib's default applies.

    :param highs: sequence of high prices
    :param lows: sequence of low prices
    :param closes: sequence of close prices
    :return: array of ATR values as returned by ``talib.ATR``
    """
    highs = np.array(highs, dtype=np.float64)
    lows = np.array(lows, dtype=np.float64)
    closes = np.array(closes, dtype=np.float64)
    return talib.ATR(highs, lows, closes)
def history(self, databar, period, indicator):
    """Receive mock data in DataFrame form and compute one indicator from it.

    :databar: mock data bar; flexible, its shape depends on the indicator
              (one dimension or several dimensions)
    :period: time interval, flexible too; passed as ``timeperiod`` for 'sma'
             and 'atr', and indexed by 'fastperiod' / 'slowperiod' /
             'signalperiod' for 'macd'
    :indicator: one of 'sma', 'atr', 'macd'
    :return: DataFrame indexed by ``databar.date``; ``None`` when the input is
             missing a required column/key (a hint is printed) or when the
             indicator name is not recognised
    """
    try:
        if indicator == 'sma':
            sma0 = talib.SMA(np.array(databar.low), timeperiod=period)
            sma1 = talib.SMA(np.array(databar.close), timeperiod=period)
            sma2 = talib.SMA(np.array(databar.high), timeperiod=period)
            return pd.DataFrame({'sma0': sma0, 'sma1': sma1, 'sma2': sma2},
                                index=pd.DatetimeIndex(databar.date))
        if indicator == 'atr':
            atr = talib.ATR(np.array(databar.high), np.array(databar.low),
                            np.array(databar.close), timeperiod=period)
            return pd.DataFrame({'atr': atr}, index=pd.DatetimeIndex(databar.date))
        if indicator == 'macd':
            macd, macdsignal, macdhist = talib.MACD(
                databar,
                fastperiod=period['fastperiod'],
                slowperiod=period['slowperiod'],
                signalperiod=period['signalperiod'])
            return pd.DataFrame({'macd': macd, 'macdsignal': macdsignal, 'macdhist': macdhist},
                                index=pd.DatetimeIndex(databar.date))
    except (KeyError, AttributeError):
        # Attribute-style column access (databar.low, databar.date, ...) raises
        # AttributeError, not KeyError, when the column is absent, so both must
        # be caught for the hint below to be reachable.
        print('Pls check databar whether is dataframe')
        return None
    # Unknown indicator name.
    return None
def calculate_indicator(stock_df):
    """Add technical-indicator columns to ``stock_df`` in place.

    Reads the ``close``, ``amount``, ``high``, ``low`` and ``volume`` columns and
    writes MA/EMA (3-60), AMTMA/ATR/ROCP (5, 10, 20), ADX14, MACD_DIFF/DEA/HIST,
    CCI14 and MFI14 columns. Returns ``None``.
    """
    close = stock_df['close'].values

    # Simple, then exponential, moving averages of the close.
    for span in (3, 5, 10, 20, 30, 60):
        stock_df['MA' + str(span)] = talib.MA(close, timeperiod=span)
    for span in (3, 5, 10, 20, 30, 60):
        stock_df['EMA' + str(span)] = talib.EMA(close, timeperiod=span)

    # Moving average of the traded amount.
    amount = stock_df['amount'].values
    for span in (5, 10, 20):
        stock_df['AMTMA' + str(span)] = talib.MA(amount, timeperiod=span)

    # Range-based indicators share the high/low/close arrays.
    high = stock_df['high'].values
    low = stock_df['low'].values
    for span in (5, 10, 20):
        stock_df['ATR' + str(span)] = talib.ATR(high, low, close, timeperiod=span)

    stock_df['ADX14'] = talib.ADX(high, low, close, timeperiod=14)

    diff, dea, hist = talib.MACD(close, fastperiod=12, slowperiod=26, signalperiod=9)
    stock_df['MACD_DIFF'] = diff
    stock_df['MACD_DEA'] = dea
    stock_df['MACD_HIST'] = hist

    stock_df['CCI14'] = talib.CCI(high, low, close, timeperiod=14)

    volume = stock_df['volume'].values
    stock_df['MFI14'] = talib.MFI(high, low, close, volume, timeperiod=14)

    # Rate of change (percentage) of the close.
    for span in (5, 10, 20):
        stock_df['ROCP' + str(span)] = talib.ROCP(close, timeperiod=span)
def calculate_indicator(stock_df):
    """Compute a fixed set of talib indicators and store them as new columns of ``stock_df``.

    The frame is modified in place and nothing is returned. Input columns used:
    ``close``, ``amount``, ``high``, ``low``, ``volume``.
    """
    def column(name):
        # Raw ndarray behind one input column.
        return stock_df[name].values

    def hlc():
        # (high, low, close) arrays in the order talib expects them.
        return column('high'), column('low'), column('close')

    long_spans = [3, 5, 10, 20, 30, 60]
    short_spans = [5, 10, 20]
    fixed_span = 14

    # MA
    for n in long_spans:
        stock_df['MA{}'.format(n)] = talib.MA(column('close'), timeperiod=n)
    # EMA
    for n in long_spans:
        stock_df['EMA{}'.format(n)] = talib.EMA(column('close'), timeperiod=n)
    # AMTMA
    for n in short_spans:
        stock_df['AMTMA{}'.format(n)] = talib.MA(column('amount'), timeperiod=n)
    # ATR
    for n in short_spans:
        stock_df['ATR{}'.format(n)] = talib.ATR(*hlc(), timeperiod=n)
    # ADX
    stock_df['ADX{}'.format(fixed_span)] = talib.ADX(*hlc(), timeperiod=fixed_span)
    # MACD
    macd_parts = talib.MACD(column('close'), fastperiod=12, slowperiod=26, signalperiod=9)
    stock_df['MACD_DIFF'], stock_df['MACD_DEA'], stock_df['MACD_HIST'] = macd_parts
    # CCI
    stock_df['CCI{}'.format(fixed_span)] = talib.CCI(*hlc(), timeperiod=fixed_span)
    # MFI
    stock_df['MFI{}'.format(fixed_span)] = talib.MFI(
        *hlc(), column('volume'), timeperiod=fixed_span)
    # ROCP
    for n in short_spans:
        stock_df['ROCP{}'.format(n)] = talib.ROCP(column('close'), timeperiod=n)
def mfe_mae(h, days=200, period=5):
    """
    Compute the E-ratio curve ``ln(1 + mfe) + ln(1 - mae)`` after buy signals.

    For every holding horizon ``e`` in ``range(0, days, period)`` and every bar
    flagged in ``h.buy``, the highest high and lowest low over bars
    ``d .. d + e`` are compared with the entry cost ``h.cost`` on a log scale.
    The two sums are added and averaged over the number of usable buys.

    :param h: DataFrame with ``high``, ``low``, ``close``, ``cost`` and ``buy``
        columns; it is copied, the caller's frame is not modified
    :param days: exclusive upper bound of the holding horizons
    :param period: step between consecutive horizons
    :return: tuple ``(DataFrame with an "E-ratio" column indexed by horizon,
        copy of h with an added "ATR" column)``; the E-ratio is NaN when there
        is no usable buy
    """
    days_list = list(range(0, days, period))
    h = h.copy()
    h["ATR"] = tl.ATR(*np.asarray(h[["high", "low", "close"]].T))

    size = h.shape[0]
    # Positional indices of the buy bars (element-wise comparison on purpose).
    buy_index = np.where(h.buy == True)[0]  # noqa: E712
    # A buy on the very last bar has nothing after it to measure. Dropping it
    # once, up front, keeps the divisor identical for every horizon.
    buy_index = buy_index[buy_index < size - 1]
    buy_times = len(buy_index)

    e_ratios = []
    for e_days in days_list:
        mfe_total = 0
        mae_total = 0
        for d in buy_index:
            # buy_index is positional, so every lookup goes through .iloc.
            cost = h.cost.iloc[d]
            window = slice(d, d + e_days + 1)
            mfe_total += math.log(h.high.iloc[window].max() / cost)
            mae_total += math.log(h.low.iloc[window].min() / cost)
        if buy_times:
            e_ratios.append((mfe_total + mae_total) / buy_times)
        else:
            e_ratios.append(float("nan"))

    return pd.DataFrame({"E-ratio": e_ratios}, index=days_list), h
def onFiveBar(self, bar):
    """Process a new bar: refresh buffers and the ATR channel, then place orders.

    Steps: cancel every order id in ``self.orderList``, roll the close/high/low
    buffers, and return early until ``bufferSize`` bars have been counted.
    Afterwards the channel (moving average +/- ATR * ``kkDev``) is recomputed
    and orders are placed according to the sign of ``self.pos``.
    """
    # Withdraw whatever is still pending before placing new orders.
    for pending_id in self.orderList:
        self.cancelOrder(pending_id)
    self.orderList = []

    # Shift each buffer one slot to the left and append the latest value.
    size = self.bufferSize
    for buf, latest in ((self.closeArray, bar.close),
                        (self.highArray, bar.high),
                        (self.lowArray, bar.low)):
        buf[0:size - 1] = buf[1:size]
        buf[-1] = latest

    self.bufferCount += 1
    if self.bufferCount < self.bufferSize:
        return

    # Channel values from the most recent ATR and moving average.
    self.atrValue = talib.ATR(self.highArray,
                              self.lowArray,
                              self.closeArray,
                              self.kkLength)[-1]
    self.kkMid = talib.MA(self.closeArray, self.kkLength)[-1]
    band = self.atrValue * self.kkDev
    self.kkUp = self.kkMid + band
    self.kkDown = self.kkMid - band

    position = self.pos
    if position == 0:
        # Flat: reset the intra-trade extremes and bracket the channel.
        self.intraTradeHigh = bar.high
        self.intraTradeLow = bar.low
        self.sendOcoOrder(self.kkUp, self.kkDown, self.fixedSize)
    elif position > 0:
        # Long: trail below the highest high seen during the trade.
        self.intraTradeHigh = max(self.intraTradeHigh, bar.high)
        self.intraTradeLow = bar.low
        # NOTE(review): the trailing True presumably marks a stop order - confirm.
        orderID = self.sell(self.intraTradeHigh*(1-self.trailingPrcnt/100),
                            abs(position), True)
        self.orderList.append(orderID)
    elif position < 0:
        # Short: trail above the lowest low seen during the trade.
        self.intraTradeHigh = bar.high
        self.intraTradeLow = min(self.intraTradeLow, bar.low)
        orderID = self.cover(self.intraTradeLow*(1+self.trailingPrcnt/100),
                             abs(position), True)
        self.orderList.append(orderID)

    self.putEvent()
#----------------------------------------------------------------------
def exit_trades(self, signals, price_df):
    """
    Apply the ATR-based exit criteria to every entry signal.

    Signals on rows whose ATR is NaN, and signals that would overlap a trade
    that is still open, are zeroed. Note that ``signals`` is modified in place
    and an ``atr`` column is added to the caller's ``price_df``.

    :param signals: ndarray
        the signals array, one value per row of ``price_df``
    :param price_df:
        ohlc dataframe
    :return:
        ``price_df`` joined with ``entries``, ``exits``, ``profits``,
        ``commission``, ``units`` and ``signal`` columns
    """
    atr = talib.ATR(np.array(price_df['high']), np.array(price_df['low']),
                    np.array(price_df['close']),
                    timeperiod=self.atr_stops_period)
    log.info('Starting to calculate profit/loss')
    price_df['atr'] = atr
    price = price_df['close'].values

    entries = np.zeros_like(atr)
    exits = np.zeros_like(atr)
    profits = np.zeros_like(atr)
    units = np.zeros_like(atr)
    commission = np.zeros_like(atr)
    _exit = 0

    # Calculate profit, entry/exit (hold time) based on atr criteria.
    # In this loop we make sure there are no overlapping trades.
    for i, s in enumerate(signals):
        # Read the ATR array directly by position; this is the value stored in
        # price_df['atr'] and avoids the `.ix` indexer removed from pandas.
        if np.isnan(atr[i]):
            signals[i] = 0
            continue
        if s and _exit <= i:
            _exit, profit, unit = self._exit_trailing_atr(price_df, i, s)
            entries[i] = i
            exits[i] = _exit
            # Commission is recorded in its own column; it is not deducted
            # from the profit here.
            profits[i] = profit
            commission[i] = (unit * (price[i] +
                                     price[_exit])) * self.commission
            units[i] = unit
        else:
            signals[i] = 0

    exits_df = pd.DataFrame({'entries': entries,
                             'exits': exits,
                             'profits': profits,
                             'commission': commission,
                             'units': units,
                             'signal': signals},
                            index=price_df.index)
    price_df = price_df.join(exits_df)
    return price_df
def BarUpdate(self):
    """Evaluate channel breakout entries, add-on entries and trailing exits on the latest bar.

    Nothing happens until ``CurrentBar`` reaches ``Params['fsLength']``. Channel
    levels and N (the ATR) are taken from the second-to-last element of each
    talib series and compared with the last high/low/open.
    """
    params = self.Params
    if self.CurrentBar < params['fsLength']:
        return

    # NOTE(review): the ATR period is hard-coded to 14 in this method; it is
    # not read from Params.
    atr_series = talib.ATR(self.H, self.L, self.C, 14)
    lots = params['Lots']

    def channel(length_key):
        # (highest high, lowest low) over the configured length, second-to-last value.
        upper = talib.MAX(self.H, params[length_key])[-2]
        lower = talib.MIN(self.L, params[length_key])[-2]
        return upper, lower

    breakout_hi, breakout_lo = channel('boLength')
    failsafe_hi, failsafe_lo = channel('fsLength')
    exit_hi, exit_lo = channel('teLength')

    if len(atr_series) < 2:
        return
    n_value = atr_series[-2]

    high, low, open_ = self.H[-1], self.L[-1], self.O[-1]
    position = self.Position

    if position == 0:
        # Flat: regular breakout first, then the longer fail-safe breakout.
        if high > breakout_hi:
            self.Buy(max(min(high, breakout_hi), open_), lots, '??')
        elif low < breakout_lo:
            self.SellShort(min(max(low, breakout_lo), open_), lots, '??')
        elif high > failsafe_hi:
            self.Buy(max(min(high, failsafe_hi), open_), lots, '??-fs')
        elif low < failsafe_lo:
            self.SellShort(min(max(low, failsafe_lo), open_), lots, '??-fs')
    elif position > 0:
        if low < exit_lo:
            # Trailing exit for the long position.
            self.Sell(min(open_, max(low, exit_lo)), self.PositionLong, 'exit-?????')
        else:
            # Add to the long once price is half an N above the last entry.
            add_level = self.LastEntryPriceLong + 0.5 * n_value
            if high >= add_level:
                self.Buy(max(open_, add_level), lots,
                         '{0},{1}'.format(n_value, '??-?'))
    elif position < 0:
        if high > exit_hi:
            # Trailing exit for the short position.
            self.BuyToCover(max(open_, min(high, exit_hi)), self.PositionShort, 'exit')
        else:
            # Add to the short once price is half an N below the last entry.
            add_level = self.LastEntryPriceShort - 0.5 * n_value
            if low <= add_level:
                self.SellShort(min(open_, add_level), lots,
                               '{0},{1}'.format(n_value, '??-?'))