def add_CCI(self, timeperiod=14,
            type='line', color='secondary', **kwargs):
    """Add a Commodity Channel Index (CCI) study to the chart.

    The computed series is stored in ``self.ind`` and its display options
    in ``self.sec``, both under the key ``'CCI(<timeperiod>)'``.

    Parameters
    ----------
    timeperiod : int
        Look-back window forwarded to ``talib.CCI``.
    type : str
        Value stored under ``'type'`` for this study. A ``kind`` keyword
        argument, when given, takes precedence over it.
    color : str
        Value stored under ``'color'`` for this study.
    **kwargs
        Extra options; checked with ``utils.kwargs_check`` against
        ``VALID_TA_KWARGS``.

    Raises
    ------
    Exception
        If the chart does not have high, low and close data.
    """
    if not (self.has_high and self.has_low and self.has_close):
        # Same exception class as before, but now it says what is missing.
        raise Exception("CCI requires high, low and close data.")

    utils.kwargs_check(kwargs, VALID_TA_KWARGS)
    if 'kind' in kwargs:
        type = kwargs['kind']

    name = 'CCI({})'.format(str(timeperiod))
    self.sec[name] = dict(type=type, color=color)
    self.ind[name] = talib.CCI(self.df[self.hi].values,
                               self.df[self.lo].values,
                               self.df[self.cl].values,
                               timeperiod)
# Example source code for the Python class/function "CCI".
def CCI(security_list, timeperiod=14):
    """Compute the CCI series for one or several securities.

    ``security_list`` may be a single code given as a string or an
    iterable of codes. For each code, ``timeperiod * 2`` bars at the
    ``'1d'`` frequency are requested from ``attribute_history`` and passed
    to ``talib.CCI``.

    Returns a dict mapping each security code to its ``talib.CCI`` output.
    """
    # A bare string is treated as a one-element list of codes.
    if isinstance(security_list, str):
        security_list = [security_list]

    def _cci_for(stock):
        bars = attribute_history(
            stock, timeperiod * 2, '1d', ['close', 'high', 'low'], df=False)
        close, high, low = (bars[field] for field in ('close', 'high', 'low'))
        return talib.CCI(high, low, close, timeperiod)

    return {stock: _cci_for(stock) for stock in security_list}
# ATR
def pre_data(stick_code, ktype='D'):
    """Load bars for one stock code and attach indicator columns.

    The result is also bound to the module-level name ``df``.

    Parameters
    ----------
    stick_code : str
        Code spliced into the ``where code = '...'`` clause.
    ktype : str
        ``'D'``, ``'W'`` or ``'M'``; selects ``t_stick_data_d``,
        ``t_stick_data_w`` or ``t_stick_data_m``. Only the ``'D'`` query
        is restricted to ``date > '2015-09-01'``.

    Returns
    -------
    The queried frame with columns ``cci``, ``diff``, ``dea``, ``macd``,
    ``obv``, ``volma5``, ``volma20``, ``MA20``, ``MA60`` added and
    ``cwbili`` / ``pricebili`` initialised to 0, or ``None`` when
    ``ktype`` is unsupported or the query raises.
    """
    global df

    # (table, trailing SQL) for every supported bar period.
    sources = {
        'D': ('t_stick_data_d', " and date > '2015-09-01';"),
        'W': ('t_stick_data_w', " ;"),
        'M': ('t_stick_data_m', " ;"),
    }
    if ktype not in sources:
        # Previously an unknown ktype skipped every branch and the code
        # below ran on whatever the global ``df`` happened to hold.
        print('ERR:', 'unsupported ktype: %r' % (ktype,))
        return None
    table, tail = sources[ktype]

    db = m_db2()
    try:
        # NOTE(review): stick_code is concatenated into the SQL text; if it
        # can come from untrusted input this is injectable. Switch to bound
        # parameters if db.get_data supports them.
        df = db.get_data(
            "select * from " + table + " where code = '" + stick_code + "'" + tail)
    except Exception as e:
        print('ERR:', e)
        return None

    # The indicator calls are fed float64 arrays, hence the casts.
    high = df['high'].values.astype('double')
    low = df['low'].values.astype('double')
    close = df['close'].values.astype('double')
    vol = df['vol'].values.astype('double')

    df['cci'] = ta.CCI(high, low, close)
    df['diff'], df['dea'], df['macd'] = ta.MACD(
        close, fastperiod=12, slowperiod=26, signalperiod=9)
    df['obv'] = ta.OBV(close, vol)
    df['volma5'] = ta.MA(vol, 5)
    df['volma20'] = ta.MA(vol, 20)
    df['MA20'] = ta.MA(close, 20)
    df['MA60'] = ta.MA(close, 60)
    df['cwbili'] = 0
    df['pricebili'] = 0
    return df
# draw
def pre_data(stick_code, ktype='D', today=''):
    """Load bars for one stock code up to ``today`` and attach indicators.

    The result is also bound to the module-level name ``df``.

    Parameters
    ----------
    stick_code : str
        Code spliced into the ``where code = '...'`` clause.
    ktype : str
        ``'D'``, ``'W'`` or ``'M'``; selects ``t_stick_data_d``,
        ``t_stick_data_w`` or ``t_stick_data_m``.
    today : str
        Upper date bound as ``'YYYY-MM-DD'``; an empty string means the
        current date. Only the ``'D'`` query applies it (together with
        ``date > '2015-09-01'`` and ascending date order).

    Returns
    -------
    The queried frame with ``cci``, ``diff``/``dea``/``macd``, ``obv``,
    volume moving averages (5, 13, 20, 34) and close moving averages
    (20, 60, 5, 13, 34, 89, 144) added and ``cwbili`` / ``pricebili``
    initialised to 0, or ``None`` when ``ktype`` is unsupported or the
    query raises (the error is intentionally not printed here).
    """
    global df

    if ktype not in ('D', 'W', 'M'):
        # Previously an unknown ktype skipped every branch and the code
        # below ran on whatever the global ``df`` happened to hold.
        return None
    if '' == today:
        today = datetime.date.today().strftime('%Y-%m-%d')

    db = m_db2()
    try:
        # NOTE(review): stick_code / today are concatenated into the SQL
        # text; if they can come from untrusted input this is injectable.
        if ktype == 'D':
            sql = ("select * from t_stick_data_d where code = '" + stick_code
                   + "' and date > '2015-09-01' and date <='" + today
                   + "' order by date asc;")
        else:
            table = 't_stick_data_w' if ktype == 'W' else 't_stick_data_m'
            sql = "select * from " + table + " where code = '" + stick_code + "' ;"
        df = db.get_data(sql)
    except Exception:
        return None

    # The indicator calls are fed float64 arrays, hence the casts.
    high = df['high'].values.astype('double')
    low = df['low'].values.astype('double')
    close = df['close'].values.astype('double')
    vol = df['vol'].values.astype('double')

    df['cci'] = ta.CCI(high, low, close)
    df['diff'], df['dea'], df['macd'] = ta.MACD(
        close, fastperiod=12, slowperiod=26, signalperiod=9)
    df['obv'] = ta.OBV(close, vol)
    # Window order is kept as-is so the resulting column order is unchanged.
    for window in (5, 13, 20, 34):
        df['volma' + str(window)] = ta.MA(vol, window)
    for window in (20, 60, 5, 13, 34, 89, 144):
        df['MA' + str(window)] = ta.MA(close, window)
    df['cwbili'] = 0
    df['pricebili'] = 0
    return df
# draw
def pre_data(stick_code, ktype='D'):
    """Load bars for one stock code and attach a small indicator set.

    The result is also bound to the module-level name ``df``.

    Parameters
    ----------
    stick_code : str
        Code spliced into the ``where code = '...'`` clause.
    ktype : str
        ``'D'``, ``'W'`` or ``'M'``; selects ``t_stick_data_d``,
        ``t_stick_data_w`` or ``t_stick_data_m``. Only the ``'D'`` query
        is restricted to ``date > '2015-09-01'``.

    Returns
    -------
    The queried frame with columns ``cci``, ``diff``, ``dea``, ``macd``,
    ``obv``, ``volma5``, ``volma20`` and ``MA20`` added, or ``None`` when
    ``ktype`` is unsupported or the query raises.
    """
    global df

    # (table, trailing SQL) for every supported bar period.
    sources = {
        'D': ('t_stick_data_d', " and date > '2015-09-01';"),
        'W': ('t_stick_data_w', " ;"),
        'M': ('t_stick_data_m', " ;"),
    }
    if ktype not in sources:
        # Previously an unknown ktype skipped every branch and the code
        # below ran on whatever the global ``df`` happened to hold.
        print('ERR:', 'unsupported ktype: %r' % (ktype,))
        return None
    table, tail = sources[ktype]

    db = m_db2()
    try:
        # NOTE(review): stick_code is concatenated into the SQL text; if it
        # can come from untrusted input this is injectable.
        df = db.get_data(
            "select * from " + table + " where code = '" + stick_code + "'" + tail)
    except Exception as e:
        print('ERR:', e)
        return None

    # The indicator calls are fed float64 arrays, hence the casts.
    high = df['high'].values.astype('double')
    low = df['low'].values.astype('double')
    close = df['close'].values.astype('double')
    vol = df['vol'].values.astype('double')

    df['cci'] = ta.CCI(high, low, close)
    df['diff'], df['dea'], df['macd'] = ta.MACD(
        close, fastperiod=12, slowperiod=26, signalperiod=9)
    df['obv'] = ta.OBV(close, vol)
    df['volma5'] = ta.MA(vol, 5)
    df['volma20'] = ta.MA(vol, 20)
    # The column is named MA20, so use a 20-period window (it was 5).
    df['MA20'] = ta.MA(close, 20)
    return df
# draw
def CCI(df, n):
    """Return a CCI-style oscillator computed from ``df`` over ``n`` rows.

    The typical price ``(high + low + close) / 3`` is compared with its
    ``n``-row rolling mean, and the difference is divided by the ``n``-row
    rolling standard deviation and by 0.015. The first ``n - 1`` values
    are NaN because the rolling window is not yet full.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide ``'high'``, ``'low'`` and ``'close'`` columns.
    n : int
        Rolling window length.

    Returns
    -------
    pandas.Series
        Named ``'CCI<n>'`` and indexed like ``df``.
    """
    typical_price = (df['high'] + df['low'] + df['close']) / 3
    # pd.rolling_mean / pd.rolling_std no longer exist in pandas; the
    # Series.rolling API is the equivalent (same default min_periods/ddof).
    window = typical_price.rolling(n)
    values = (typical_price - window.mean()) / window.std() / 0.015
    return pd.Series(values, name='CCI' + str(n))
def cci(df, n):
    """Refresh the last row of column ``'CCI<n>'`` with the newest CCI value.

    The trailing ``n + 1`` rows of ``'high'``, ``'low'`` and ``'close'``
    are passed to ``talib.CCI`` and the final output value is written into
    the last row of ``df``. The column must already exist (``KeyError``
    otherwise). Nothing is returned; ``df`` is modified in place.
    """
    rows = n + 1

    def _tail(column):
        # Plain float64 arrays, so the result is indexed by position.
        return df[column].values[-rows:].astype('double')

    real = talib.CCI(_tail('high'), _tail('low'), _tail('close'), timeperiod=n)

    target = 'CCI' + str(n)
    # Write through the frame itself: the former df[target][-1] = ... was a
    # chained assignment, which may update a temporary copy and leave df
    # unchanged.
    df.iloc[-1, df.columns.get_loc(target)] = real[-1]
#Coppock Curve
def CCI_CN(high, low, close, timeperiod=14) :
    """Length-checked wrapper around ``tl.CCI``.

    When the three inputs do not all have the same length, a message is
    printed and a 0-d NaN array is returned; otherwise the result of
    ``tl.CCI(high, low, close, timeperiod=timeperiod)`` is returned.
    """
    sizes = (len(high), len(low), len(close))
    if len(set(sizes)) > 1:
        print ("CCI_CN input invalid for len:%s %s %s " % tuple(str(size) for size in sizes))
        return np.array(np.nan)
    return tl.CCI(high, low, close, timeperiod=timeperiod)
#MACD
def CCI_DAY(self, context, security, data={}, ref=0):
    """Return the CCI value ``ref`` entries back from the newest one.

    ``ref + 1`` values are requested from ``self.CCI_DATA_DAY``. If the
    newest value is NaN the result is 0; otherwise the element at index
    ``-1 - ref`` is returned.
    """
    series = self.CCI_DATA_DAY(context, security, data, ref + 1)
    newest_missing = np.isnan(series[-1])
    return 0 if newest_missing else series[-1 - ref]
def CCI_DATA(self, context, security, freq = 'D', data={}, dataCount=1):
    """Return the last ``dataCount`` CCI values rounded to two decimals.

    ``dataCount + 14`` bars are requested from ``self.GET_PERIOD_DATA`` and
    fed to ``self.CCI_CN``. If the newest close is NaN, a one-element NaN
    array is returned. The CCI output is trimmed to its last ``dataCount``
    entries only when it has more than 14 elements; otherwise it is used
    whole. Rounding is half-up, and the current decimal context's rounding
    mode is set to ``ROUND_HALF_UP`` as a side effect.
    """
    extra_bars = 14
    high, low, close = self.GET_PERIOD_DATA(
        context, security, freq, data, dataCount + extra_bars)
    if np.isnan(close[-1]):
        return np.array([np.nan])

    values = self.CCI_CN(high, low, close)
    if len(values) > extra_bars:
        values = values[-dataCount:]

    decimal.getcontext().rounding = decimal.ROUND_HALF_UP
    two_places = decimal.Decimal('0.00')
    rounded = [float(decimal.Decimal(value).quantize(two_places)) for value in values]
    return np.array(rounded)
def cci(self, n, array=False):
    """CCI indicator.

    Runs ``talib.CCI`` over ``self.high``, ``self.low`` and ``self.close``
    with period ``n``. Returns the whole output when ``array`` is truthy,
    otherwise only its last element.
    """
    series = talib.CCI(self.high, self.low, self.close, n)
    return series if array else series[-1]
# ----------------------------------------------------------------------
def calculate_indicator(stock_df):
    """Add TA-Lib indicator columns to ``stock_df`` in place.

    Reads the ``close``, ``amount``, ``high``, ``low`` and ``volume``
    columns and writes, in this order: MA and EMA of close (3, 5, 10, 20,
    30, 60), MA of amount as ``AMTMA`` (5, 10, 20), ATR (5, 10, 20),
    ``ADX14``, ``MACD_DIFF`` / ``MACD_DEA`` / ``MACD_HIST`` (12, 26, 9),
    ``CCI14``, ``MFI14`` and ROCP of close (5, 10, 20). Returns ``None``.
    """
    def col(name):
        # Raw array of one input column, looked up at the point of use.
        return stock_df[name].values

    # Moving averages of the close price.
    for span in (3, 5, 10, 20, 30, 60):
        stock_df['MA%d' % span] = talib.MA(col('close'), timeperiod=span)
    for span in (3, 5, 10, 20, 30, 60):
        stock_df['EMA%d' % span] = talib.EMA(col('close'), timeperiod=span)

    # Moving averages of the traded amount.
    for span in (5, 10, 20):
        stock_df['AMTMA%d' % span] = talib.MA(col('amount'), timeperiod=span)

    # Average true range.
    for span in (5, 10, 20):
        stock_df['ATR%d' % span] = talib.ATR(
            col('high'), col('low'), col('close'), timeperiod=span)

    # ADX
    stock_df['ADX14'] = talib.ADX(
        col('high'), col('low'), col('close'), timeperiod=14)

    # MACD
    macd_diff, macd_dea, macd_hist = talib.MACD(
        col('close'), fastperiod=12, slowperiod=26, signalperiod=9)
    stock_df['MACD_DIFF'] = macd_diff
    stock_df['MACD_DEA'] = macd_dea
    stock_df['MACD_HIST'] = macd_hist

    # CCI
    stock_df['CCI14'] = talib.CCI(
        col('high'), col('low'), col('close'), timeperiod=14)

    # MFI
    stock_df['MFI14'] = talib.MFI(
        col('high'), col('low'), col('close'), col('volume'), timeperiod=14)

    # ROCP
    for span in (5, 10, 20):
        stock_df['ROCP%d' % span] = talib.ROCP(col('close'), timeperiod=span)
def calculate_indicator(stock_df):
    """Attach a fixed set of TA-Lib indicators to ``stock_df`` in place.

    Columns written, in order: ``MA<n>`` and ``EMA<n>`` of close for
    n in 3, 5, 10, 20, 30, 60; ``AMTMA<n>`` (MA of ``amount``), and
    ``ATR<n>`` for n in 5, 10, 20; ``ADX14``; ``MACD_DIFF``, ``MACD_DEA``,
    ``MACD_HIST``; ``CCI14``; ``MFI14``; ``ROCP<n>`` for n in 5, 10, 20.
    The function returns ``None``.
    """
    long_windows = [3, 5, 10, 20, 30, 60]
    short_windows = [5, 10, 20]

    # Single-input indicators: (column prefix, TA-Lib function, source, windows).
    single_input = (
        ('MA', talib.MA, 'close', long_windows),
        ('EMA', talib.EMA, 'close', long_windows),
        ('AMTMA', talib.MA, 'amount', short_windows),
    )
    for prefix, func, source, windows in single_input:
        for window in windows:
            stock_df[prefix + str(window)] = func(
                stock_df[source].values, timeperiod=window)

    # ATR
    for window in short_windows:
        stock_df['ATR' + str(window)] = talib.ATR(
            stock_df['high'].values, stock_df['low'].values,
            stock_df['close'].values, timeperiod=window)

    # ADX
    window = 14
    stock_df['ADX' + str(window)] = talib.ADX(
        stock_df['high'].values, stock_df['low'].values,
        stock_df['close'].values, timeperiod=window)

    # MACD: the three outputs are stored in the order TA-Lib returns them.
    macd_outputs = talib.MACD(
        stock_df['close'].values, fastperiod=12, slowperiod=26, signalperiod=9)
    (stock_df['MACD_DIFF'],
     stock_df['MACD_DEA'],
     stock_df['MACD_HIST']) = macd_outputs

    # CCI
    stock_df['CCI' + str(window)] = talib.CCI(
        stock_df['high'].values, stock_df['low'].values,
        stock_df['close'].values, timeperiod=window)

    # MFI
    stock_df['MFI' + str(window)] = talib.MFI(
        stock_df['high'].values, stock_df['low'].values,
        stock_df['close'].values, stock_df['volume'].values,
        timeperiod=window)

    # ROCP
    for window in short_windows:
        stock_df['ROCP' + str(window)] = talib.ROCP(
            stock_df['close'].values, timeperiod=window)
def pre_data(self, stick_code, ktype='D', beginday='', endday=''):
    """Load bars for one stock code in a date range and attach indicators.

    Parameters
    ----------
    stick_code : str
        Code spliced into the ``where code = '...'`` clause.
    ktype : str
        ``'D'``, ``'W'`` or ``'M'``; selects ``t_stick_data_d``,
        ``t_stick_data_w`` or ``t_stick_data_m``.
    beginday : str
        Exclusive lower date bound (``'YYYY-MM-DD'``); an empty string
        means 13 days before the current date.
    endday : str
        Inclusive upper date bound (``'YYYY-MM-DD'``); an empty string
        means the current date.

    Returns
    -------
    The frame returned by ``self.get_data`` (rows ordered by date) with
    columns ``cci``, ``diff``, ``dea``, ``macd``, ``obv``, ``volma5``,
    ``volma20``, ``MA20``, ``MA60`` added and ``cwbili`` / ``pricebili``
    initialised to 0, or ``None`` when ``ktype`` is unsupported or the
    query raises.

    NOTE(review): with the default 13-day range there are presumably fewer
    rows than the 20/26/60-period look-backs need - confirm that callers
    pass a wider ``beginday`` when those columns matter.
    """
    if '' == beginday:
        begindaytmp = datetime.date.today() - datetime.timedelta(days=13)
        beginday = begindaytmp.strftime('%Y-%m-%d')
    if '' == endday:
        endday = datetime.date.today().strftime('%Y-%m-%d')
    print(beginday, endday)

    tables = {
        'D': 't_stick_data_d',
        'W': 't_stick_data_w',
        'M': 't_stick_data_m',
    }
    table = tables.get(ktype)
    if table is None:
        # Previously an unknown ktype left ``df`` as an empty string and the
        # indicator code below failed with a TypeError.
        return None

    try:
        # NOTE(review): the values are concatenated into the SQL text; if
        # they can come from untrusted input this is injectable. Switch to
        # bound parameters if get_data supports them.
        df = self.get_data(
            "select * from " + table
            + " where code = '" + stick_code + "' and date > '" + beginday + "'"
            + " and date <='" + endday + "' order by date asc;")
    except Exception:
        # Best effort: a failed query yields None rather than raising.
        return None

    # The indicator calls are fed float64 arrays, hence the casts.
    high = df['high'].values.astype('double')
    low = df['low'].values.astype('double')
    close = df['close'].values.astype('double')
    vol = df['vol'].values.astype('double')

    df['cci'] = ta.CCI(high, low, close)
    df['diff'], df['dea'], df['macd'] = ta.MACD(
        close, fastperiod=12, slowperiod=26, signalperiod=9)
    df['obv'] = ta.OBV(close, vol)
    df['volma5'] = ta.MA(vol, 5)
    df['volma20'] = ta.MA(vol, 20)
    df['MA20'] = ta.MA(close, 20)
    df['MA60'] = ta.MA(close, 60)
    df['cwbili'] = 0
    df['pricebili'] = 0
    return df
# xx=m_db2();
# df=xx.pre_data('000157',ktype='W')
# print(df)
# xx.insert_data('t_stick_data_m_test',df.head(20).as_matrix())
# xx.commit()
def calculate_features(df):
    """Build a one-row feature frame from the most recent bars of ``df``.

    A copy of the last row of ``df`` is extended with the latest value of
    several TA-Lib indicators, each computed on only the trailing slice of
    the ``close`` / ``high`` / ``low`` / ``volume`` columns:

    * ``ema<n>`` for n in 2, 4, 8, 12, 16, 20
    * ``rsi5`` (RSI with timeperiod 4 over the last 5 closes) and
      ``rsi_above_505`` (1 when that RSI exceeds 50, else 0)
    * ``cci5``
    * ``macd_above_signal34`` / ``macd_above_zero34`` from a 12/26/9 MACD
      over the last 34 closes
    * ``obv<n>`` for n in 2, 4, 8, 12, 16, 20

    Returns the extended one-row copy; ``df`` itself is not modified.
    """
    close = df['close'].values
    high = df['high'].values
    low = df['low'].values
    volume = df['volume'].values
    features = df.tail(1).copy()

    # Exponential moving averages.
    for span in (2, 4, 8, 12, 16, 20):
        features['ema' + str(span)] = talib.EMA(close[-span:], timeperiod=span)[-1]

    # RSI: the slice has ``span`` points and the RSI period is one less.
    for span in (5,):
        latest_rsi = talib.RSI(close[-span:], timeperiod=span - 1)[-1]
        features['rsi' + str(span)] = latest_rsi
        features['rsi_above_50' + str(span)] = int(latest_rsi > 50.0)

    # CCI
    for span in (5,):
        features['cci' + str(span)] = talib.CCI(
            high[-span:], low[-span:], close[-span:], timeperiod=span)[-1]

    # MACD flags
    for span in (34,):
        macd_line, macd_signal, _ = talib.MACD(
            close[-span:], fastperiod=12, slowperiod=26, signalperiod=9)
        latest_macd = macd_line[-1]
        latest_signal = macd_signal[-1]
        features['macd_above_signal' + str(span)] = int(latest_macd > latest_signal)
        features['macd_above_zero' + str(span)] = int(latest_macd > 0.0)

    # On-balance volume.
    for span in (2, 4, 8, 12, 16, 20):
        features['obv' + str(span)] = talib.OBV(close[-span:], volume[-span:])[-1]

    return features
def calculate_features(df):
    """Compute the latest indicator readings and return them as one row.

    Starting from a copy of the last row of ``df``, adds: ``ema<n>`` and
    ``obv<n>`` (n in 2, 4, 8, 12, 16, 20), ``rsi5`` with the flag
    ``rsi_above_505``, ``cci5``, and the MACD flags
    ``macd_above_signal34`` / ``macd_above_zero34``. Every indicator is
    evaluated on just the trailing rows it is given, and only its final
    output value is kept. ``df`` is left untouched.
    """
    close = df['close'].values
    high = df['high'].values
    low = df['low'].values
    volume = df['volume'].values
    row = df.tail(1).copy()

    def last(count, series):
        # Trailing ``count`` elements of an input array.
        return series[-count:]

    ema_and_obv_windows = [2, 4, 8, 12, 16, 20]

    # ---- EMA
    for window in ema_and_obv_windows:
        ema_values = talib.EMA(last(window, close), timeperiod=window)
        row['ema{}'.format(window)] = ema_values[-1]

    # ---- RSI (period is window - 1) plus an above-50 flag
    for window in [5]:
        rsi_values = talib.RSI(last(window, close), timeperiod=window - 1)
        rsi_now = rsi_values[-1]
        row['rsi{}'.format(window)] = rsi_now
        row['rsi_above_50{}'.format(window)] = int(rsi_now > 50.0)

    # ---- CCI
    for window in [5]:
        cci_values = talib.CCI(last(window, high),
                               last(window, low),
                               last(window, close),
                               timeperiod=window)
        row['cci{}'.format(window)] = cci_values[-1]

    # ---- MACD: only two boolean-as-int flags are stored
    for window in [34]:
        outputs = talib.MACD(last(window, close),
                             fastperiod=12,
                             slowperiod=26,
                             signalperiod=9)
        macd_now = outputs[0][-1]
        signal_now = outputs[1][-1]
        row['macd_above_signal{}'.format(window)] = int(macd_now > signal_now)
        row['macd_above_zero{}'.format(window)] = int(macd_now > 0.0)

    # ---- OBV
    for window in ema_and_obv_windows:
        obv_values = talib.OBV(last(window, close), last(window, volume))
        row['obv{}'.format(window)] = obv_values[-1]

    return row