def SVAPO(df, period = 8, cutoff = 1, stdev_h = 1.5, stdev_l = 1.3, stdev_period = 100):
    """Build the SVAPO series and its standard-deviation bands from ``df``.

    The mean of the four Heikin-Ashi prices (from HEIKEN_ASHI) is smoothed
    with TEMA.  Volume is capped at twice the previous bar's ``5 * period``
    volume average.  The capped volume is counted as positive or negative
    depending on the smoothed price move and the volume-trend direction,
    summed over ``period`` bars, scaled by the volume average and smoothed
    again with TEMA.

    :param df: frame with a ``volume`` column; also handed to HEIKEN_ASHI
        and MA.
    :param period: base window for the smoothing and the rolling sum.
    :param cutoff: minimum price move, in thousandths of the previous
        smoothed price, for volume to be counted.
    :param stdev_h: multiplier of the rolling std for the upper band.
    :param stdev_l: multiplier of the rolling std for the lower band.
    :param stdev_period: window of the rolling standard deviation.
    :return: DataFrame with columns ``SVAPO_<period>``, ``SVAPO_UB<period>``
        and ``SVAPO_LB<period>``.
    """
    ha = HEIKEN_ASHI(df, 1)
    ha_mean = (ha.HAopen + ha.HAclose + ha.HAhigh + ha.HAlow) / 4.0
    ha_smooth = TEMA(ha_mean, 0.625 * period)

    # Previous bar's long volume average; volume is clipped to twice that.
    vave = MA(df, 5 * period, field = 'volume').shift(1)
    vc = pd.concat([df['volume'], vave * 2], axis=1).min(axis=1)
    vtrend = TEMA(LINEAR_REG_SLOPE(df.volume, period), period)

    up = vc.copy()
    down = -vc
    prev_price = ha_smooth.shift(1)
    prev_trend = vtrend.shift(1)
    # Up-volume is dropped when price did not rise past the cutoff or the
    # volume trend fell; down-volume is dropped in the mirrored case.
    up[(ha_smooth <= prev_price * (1 + cutoff / 1000.0)) | (vtrend < prev_trend)] = 0
    down[(ha_smooth >= prev_price * (1 - cutoff / 1000.0)) | (vtrend > prev_trend)] = 0

    # pd.rolling_sum / pd.rolling_std no longer exist in current pandas;
    # the .rolling() method API gives the same windows and defaults.
    delta_sum = (up + down).rolling(period).sum() / (vave + 1)
    svapo = pd.Series(TEMA(delta_sum, period), name = 'SVAPO_%s' % period)
    svapo_std = svapo.rolling(stdev_period).std()
    svapo_ub = pd.Series(svapo_std * stdev_h, name = 'SVAPO_UB%s' % period)
    svapo_lb = pd.Series(-svapo_std * stdev_l, name = 'SVAPO_LB%s' % period)
    return pd.concat([svapo, svapo_ub, svapo_lb], join='outer', axis=1)
python类MA的实例源码
def FOUR(closes, days=(5, 10, 20, 60)):
    """Accumulate the pairwise gaps between moving averages, scaled by close.

    For every pair of windows ``(i, j)`` with ``i <= j`` in ``days`` the
    difference ``MA_i / close - MA_j / close`` is summed, using only the bars
    from index ``max(days)`` onwards.

    :param closes: sequence of closing prices (converted with ``np.array``).
    :param days: moving-average windows, each passed to ``MA`` as ``day``.
    :return: the value produced by ``agl.array_insert`` when the summed gaps
        are inserted at the end of a NaN array of length
        ``min(max(days), len(closes))``.
    """
    closes = np.array(closes)
    max_day = max(days)
    tail_closes = closes[max_day:]

    # Each average/close ratio is loop-invariant, so compute it only once.
    ratios = [MA(closes, day=day)[max_day:] / tail_closes for day in days]

    dvs = np.zeros(len(tail_closes))
    for i, base in enumerate(ratios):
        for other in ratios[i:]:
            dvs += base - other

    # NaN padding for the leading bars that have no value.
    fours = np.full(min(max_day, len(closes)), np.nan)
    fours = agl.array_insert(fours, len(fours), np.array(dvs))
    return fours
def __init__(self, selector):
    """Remember ``selector`` and initialise the supported-name set and feature list."""
    supported_names = (
        "ROCP", "OROCP", "HROCP", "LROCP", "MACD", "RSI",
        "VROCP", "BOLL", "MA", "VMA", "PRICE_VOLUME",
    )
    self.selector = selector
    # Names this object declares as supported.
    self.supported = set(supported_names)
    # Starts empty.
    self.feature = list()
def ma_type_test():
    """Plot talib.MA with every ``matype`` (0..8) for one stock's closes.

    Fetches daily bars for code '300580' through ``ts.get_k_data``, computes
    a 10-period average for each matype, prints the close array and shows
    all nine curves in one figure.
    """
    # MA_Type: 0=SMA, 1=EMA, 2=WMA, 3=DEMA, 4=TEMA, 5=TRIMA, 6=KAMA, 7=MAMA, 8=T3 (Default=SMA)
    df = ts.get_k_data('300580', start='2017-01-12', end='2017-05-26')
    closed = df['close'].values
    averages = [talib.MA(closed, timeperiod=10, matype=matype)
                for matype in range(9)]

    # Function-call form works on both Python 2 and 3 for a single argument.
    print(closed)

    plt.ylim([0, 40])
    # Only SMA and EMA get an explicit line format; the rest use defaults.
    line_formats = {0: 'r--', 1: 'g-*'}
    for matype, series in enumerate(averages):
        if matype in line_formats:
            plt.plot(series, line_formats[matype])
        else:
            plt.plot(series)
    plt.grid()
    plt.text(7, 30, 'BST')
    plt.show()
def pre_data(stick_code,ktype='D'):
    """Load bars for ``stick_code`` and attach indicator columns.

    The rows are fetched with ``m_db2().get_data`` and stored in the
    module-level ``df``.  CCI, MACD, OBV, volume averages (5, 20) and close
    averages (20, 60) are then added as columns, plus two zero-filled
    columns ``cwbili`` and ``pricebili``.

    :param stick_code: code spliced into the SQL text.
        NOTE(review): built by string concatenation - confirm callers never
        pass untrusted input.
    :param ktype: 'D', 'W' or 'M' selects the table; for any other value no
        query runs and whatever the global ``df`` already holds is used.
    :return: the global ``df``, or None when the query raised.
    """
    global df
    db = m_db2()
    try:
        if ktype == 'D':
            df = db.get_data("select * from t_stick_data_d where code = '"+stick_code+"' and date > '2015-09-01';")
        elif ktype == 'W':
            df = db.get_data("select * from t_stick_data_w where code = '"+stick_code+"' ;")
        elif ktype == 'M':
            df = db.get_data("select * from t_stick_data_m where code = '" + stick_code + "' ;")
    except Exception as e:
        print('ERR:',e)
        return None

    def as_double(column):
        # The indicator functions are fed double arrays throughout.
        return df[column].values.astype('double')

    high = as_double('high')
    low = as_double('low')
    close = as_double('close')
    df['cci'] = ta.CCI(high, low, close)
    df['diff'], df['dea'], df['macd'] = ta.MACD(
        close, fastperiod=12, slowperiod=26, signalperiod=9)

    vol = as_double('vol')
    df['obv'] = ta.OBV(close, vol)
    df['volma5'] = ta.MA(vol, 5)
    df['volma20'] = ta.MA(vol, 20)
    df['MA20'] = ta.MA(close, 20)
    df['MA60'] = ta.MA(close, 60)

    df['cwbili'] = 0
    df['pricebili'] = 0
    return df
# draw
def pre_data(stick_code,ktype='D',today=''):
    """Load bars for ``stick_code`` up to ``today`` and attach indicators.

    The rows are fetched with ``m_db2().get_data`` and stored in the
    module-level ``df``.  CCI, MACD, OBV, volume averages (5, 13, 20, 34) and
    close averages (5, 13, 20, 34, 60, 89, 144) are added as columns, plus
    two zero-filled columns ``cwbili`` and ``pricebili``.

    :param stick_code: code spliced into the SQL text.
        NOTE(review): built by string concatenation - confirm callers never
        pass untrusted input.
    :param ktype: 'D', 'W' or 'M' selects the table; for any other value no
        query runs and whatever the global ``df`` already holds is used.
    :param today: upper date bound ('YYYY-MM-DD') applied to the 'D' query
        only; an empty string means the current date.
    :return: the global ``df``, or None when the query raised.
    """
    if today == '':
        today = datetime.date.today().strftime('%Y-%m-%d')

    global df
    db = m_db2()
    try:
        if ktype == 'D':
            df = db.get_data("select * from t_stick_data_d where code = '"+stick_code+"' and date > '2015-09-01' and date <='"+today+"' order by date asc;")
        elif ktype == 'W':
            df = db.get_data("select * from t_stick_data_w where code = '"+stick_code+"' ;")
        elif ktype == 'M':
            df = db.get_data("select * from t_stick_data_m where code = '" + stick_code + "' ;")
    except Exception as e:
        # Query failures are swallowed without any output here.
        return None

    def as_double(column):
        # The indicator functions are fed double arrays throughout.
        return df[column].values.astype('double')

    high = as_double('high')
    low = as_double('low')
    close = as_double('close')
    df['cci'] = ta.CCI(high, low, close)
    df['diff'], df['dea'], df['macd'] = ta.MACD(
        close, fastperiod=12, slowperiod=26, signalperiod=9)

    vol = as_double('vol')
    df['obv'] = ta.OBV(close, vol)
    for window in (5, 13, 20, 34):
        df['volma' + str(window)] = ta.MA(vol, window)
    for window in (20, 60, 5, 13, 34, 89, 144):
        df['MA' + str(window)] = ta.MA(close, window)

    df['cwbili'] = 0
    df['pricebili'] = 0
    return df
# draw
def pre_data(stick_code,ktype='D'):
    """Load bars for ``stick_code`` and attach indicator columns.

    The rows are fetched with ``m_db2().get_data`` and stored in the
    module-level ``df``.  CCI, MACD, OBV, volume averages (5, 20) and a
    20-bar close average are then added as columns.

    :param stick_code: code spliced into the SQL text.
        NOTE(review): built by string concatenation - confirm callers never
        pass untrusted input.
    :param ktype: 'D', 'W' or 'M' selects the table; for any other value no
        query runs and whatever the global ``df`` already holds is used.
    :return: the global ``df``, or None when the query raised.
    """
    global df
    db = m_db2()
    try:
        if ktype == 'D':
            df = db.get_data("select * from t_stick_data_d where code = '"+stick_code+"' and date > '2015-09-01';")
        elif ktype == 'W':
            df = db.get_data("select * from t_stick_data_w where code = '"+stick_code+"' ;")
        elif ktype == 'M':
            df = db.get_data("select * from t_stick_data_m where code = '" + stick_code + "' ;")
    except Exception as e:
        print('ERR:',e)
        return None

    high = df['high'].values.astype('double')
    low = df['low'].values.astype('double')
    close = df['close'].values.astype('double')
    df['cci'] = ta.CCI(high, low, close)
    df['diff'], df['dea'], df['macd'] = ta.MACD(
        close, fastperiod=12, slowperiod=26, signalperiod=9)

    vol = df['vol'].values.astype('double')
    df['obv'] = ta.OBV(close, vol)
    df['volma5'] = ta.MA(vol, 5)
    df['volma20'] = ta.MA(vol, 20)
    # The column is named MA20, so it uses a 20-bar window (it was 5).
    df['MA20'] = ta.MA(close, 20)
    return df
# draw
def MAEXT(df, n, field = 'close', ma_type = 0):
    """Return talib.MA of ``df[field]`` as a Series aligned with ``df.index``.

    :param df: frame holding the ``field`` column.
    :param n: ``timeperiod`` handed to talib.MA.
    :param field: column to average.
    :param ma_type: ``matype`` handed to talib.MA.
    :return: Series named ``'MA_' + FIELD + n`` (no separator before ``n``).
    """
    averaged = talib.MA(df[field].values, timeperiod = n, matype = ma_type)
    label = 'MA_' + field.upper() + str(n)
    return pd.Series(averaged, index = df.index, name = label)
def maext(df, n, field = 'close', ma_type = 0):
    """Refresh the last row of the ``MA_<FIELD>_<n>`` column in place.

    Only the trailing ``n + 1`` values of ``df[field]`` are fed to talib.MA,
    and its final output is written into the last row of the existing
    column.

    :param df: frame holding ``field`` and the target column; modified
        in place.
    :param n: ``timeperiod`` handed to talib.MA.
    :param field: column to average.
    :param ma_type: ``matype`` handed to talib.MA.
    :raises KeyError: if the target column does not exist.
    """
    key = 'MA_' + field.upper() + '_' + str(n)
    window = df[field].iloc[-(n + 1):].values
    ma_ts = talib.MA(window, timeperiod = n, matype = ma_type)
    # A single positional write; the chained form df[key][-1] = ... can end
    # up assigning to a temporary copy instead of the frame.
    df.iloc[-1, df.columns.get_loc(key)] = float(ma_ts[-1])
def MA(df, n, field = 'close'):
    """Simple moving average of ``df[field]`` over ``n`` rows.

    :param df: frame holding the ``field`` column.
    :param n: window length; the first ``n - 1`` rows are NaN.
    :param field: column to average.
    :return: Series named ``MA_<FIELD>_<n>`` indexed like ``df``.
    """
    # pd.rolling_mean no longer exists in current pandas; the .rolling()
    # method is the equivalent call.
    averaged = df[field].rolling(n).mean()
    return pd.Series(averaged, name = 'MA_' + field.upper() + '_' + str(n), index = df.index)
def SMAVAR(df, n, field = 'close'):
    """Rolling mean and rolling variance of ``df[field]`` over ``n`` rows.

    The variance is computed as mean(x**2) - mean(x)**2 over the window.

    :param df: frame holding the ``field`` column.
    :param n: window length.
    :param field: column to analyse.
    :return: DataFrame with the ``MA`` result and ``SVAR_<FIELD>_<n>``.
    """
    ma_ts = MA(df, n, field)
    # pd.rolling_mean no longer exists in current pandas; use .rolling().
    mean_of_squares = (df[field] ** 2).rolling(n).mean()
    var_ts = pd.Series(mean_of_squares - ma_ts ** 2, name = 'SVAR_' + field.upper() + '_' + str(n))
    return pd.concat([ma_ts, var_ts], join='outer', axis=1)
def BBANDS(df, n, k = 2):
    """Bollinger band width and position of ``df['close']``.

    With ``m`` the rolling mean and ``s`` the rolling standard deviation
    over ``n`` rows:

    * ``BollingerB<n>`` = 2 * k * s / m
    * ``Bollingerb<n>`` = (close - m + k * s) / (2 * k * s)

    :param df: frame with a ``close`` column.
    :param n: window length.
    :param k: number of standard deviations for the bands.
    :return: DataFrame with the two columns above.
    """
    close = df['close']
    # pd.rolling_mean / pd.rolling_std no longer exist in current pandas.
    # The locals are also renamed so they stop shadowing the MA function.
    mean = close.rolling(n).mean()
    std = close.rolling(n).std()
    band_width = 2 * k * std
    B1 = pd.Series(band_width / mean, name = 'BollingerB' + str(n))
    B2 = pd.Series((close - mean + k * std) / band_width, name = 'Bollingerb' + str(n))
    return pd.concat([B1, B2], join='outer', axis=1)
#Pivot Points, Supports and Resistances
def add_MA(self, timeperiod=20, matype=0,
           type='line', color='secondary', **kwargs):
    """Moving Average (customizable).

    Registers an ``MA(<timeperiod>)`` entry in ``self.pri`` and stores the
    talib.MA output for the close column in ``self.ind``.  A ``kind``
    keyword, when given, overrides ``type``.

    Raises a bare ``Exception`` when ``self.has_close`` is false.
    """
    if not self.has_close:
        raise Exception()

    utils.kwargs_check(kwargs, VALID_TA_KWARGS)
    # 'kind' acts as an alias for the ``type`` parameter.
    type = kwargs.get('kind', type)

    name = 'MA({})'.format(str(timeperiod))
    closes = self.df[self.cl].values
    self.pri[name] = {'type': type, 'color': color}
    self.ind[name] = talib.MA(closes, timeperiod, matype)
def add_STOCH(self, fastk_period=5, slowk_period=3,
              slowk_matype=0, slowd_period=3, slowd_matype=0,
              types=['line', 'line'],
              colors=['primary', 'tertiary'],
              **kwargs):
    """Slow Stochastic Oscillator.

    Note that the first argument of types and colors refers to Slow Stoch %K,
    while second argument refers to Slow Stoch %D
    (signal line of %K obtained by MA).

    Keyword aliases: ``kinds`` replaces ``types``; ``kind``/``type`` and
    ``color`` apply one value to both lines and take precedence.

    Raises a bare ``Exception`` unless high, low and close are all present.
    """
    if not (self.has_high and self.has_low and self.has_close):
        raise Exception()

    utils.kwargs_check(kwargs, VALID_TA_KWARGS)
    if 'kind' in kwargs:
        kwargs['type'] = kwargs['kind']
    if 'kinds' in kwargs:
        # Was kwargs['type'], which raised KeyError when only 'kinds' was
        # supplied and otherwise ignored the value passed as 'kinds'.
        types = kwargs['kinds']
    if 'type' in kwargs:
        types = [kwargs['type']] * 2
    if 'color' in kwargs:
        colors = [kwargs['color']] * 2

    name = 'STOCH({},{},{})'.format(str(fastk_period),
                                    str(slowk_period),
                                    str(slowd_period))
    slowk = name + r'[%k]'
    slowd = name + r'[%d]'
    self.sec[slowk] = dict(type=types[0], color=colors[0])
    # %D is registered on top of the %K entry.
    self.sec[slowd] = dict(type=types[1], color=colors[1], on=slowk)
    self.ind[slowk], self.ind[slowd] = talib.STOCH(self.df[self.hi].values,
                                                   self.df[self.lo].values,
                                                   self.df[self.cl].values,
                                                   fastk_period, slowk_period,
                                                   slowk_matype, slowd_period,
                                                   slowd_matype)
def add_STOCHF(self, fastk_period=5, fastd_period=3, fastd_matype=0,
               types=['line', 'line'],
               colors=['primary', 'tertiary'],
               **kwargs):
    """Fast Stochastic Oscillator.

    Note that the first argument of types and colors refers to Fast Stoch %K,
    while second argument refers to Fast Stoch %D
    (signal line of %K obtained by MA).

    Keyword aliases: ``kinds`` replaces ``types``; ``kind``/``type`` and
    ``color`` apply one value to both lines and take precedence.

    Raises a bare ``Exception`` unless high, low and close are all present.
    """
    if not (self.has_high and self.has_low and self.has_close):
        raise Exception()

    utils.kwargs_check(kwargs, VALID_TA_KWARGS)
    if 'kind' in kwargs:
        kwargs['type'] = kwargs['kind']
    if 'kinds' in kwargs:
        # Was kwargs['type'], which raised KeyError when only 'kinds' was
        # supplied and otherwise ignored the value passed as 'kinds'.
        types = kwargs['kinds']
    if 'type' in kwargs:
        types = [kwargs['type']] * 2
    if 'color' in kwargs:
        colors = [kwargs['color']] * 2

    name = 'STOCHF({},{})'.format(str(fastk_period),
                                  str(fastd_period))
    fastk = name + r'[%k]'
    fastd = name + r'[%d]'
    self.sec[fastk] = dict(type=types[0], color=colors[0])
    # %D is registered on top of the %K entry.
    self.sec[fastd] = dict(type=types[1], color=colors[1], on=fastk)
    self.ind[fastk], self.ind[fastd] = talib.STOCHF(self.df[self.hi].values,
                                                    self.df[self.lo].values,
                                                    self.df[self.cl].values,
                                                    fastk_period, fastd_period,
                                                    fastd_matype)
def __init__(self, selector):
    """Remember ``selector`` and initialise the supported-name set and feature list."""
    supported_names = (
        "ROCP", "OROCP", "HROCP", "LROCP", "MACD", "RSI",
        "VROCP", "BOLL", "MA", "VMA", "PRICE_VOLUME",
    )
    self.selector = selector
    # Names this object declares as supported.
    self.supported = set(supported_names)
    # Starts empty.
    self.feature = list()
prep_data_03_stock_02_OHLCV_arrays_2_features_targets_arrays.py 文件源码
项目:LIE
作者: EmbraceLife
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def __init__(self, selector):
    """Remember ``selector`` and initialise the supported-name set and feature list."""
    supported_names = (
        "ROCP", "OROCP", "HROCP", "LROCP", "MACD", "RSI",
        "VROCP", "BOLL", "MA", "VMA", "PRICE_VOLUME",
    )
    self.selector = selector
    # Names this object declares as supported.
    self.supported = set(supported_names)
    # Starts empty.
    self.feature = list()
def __init__(self, selector):
    """Remember ``selector`` and initialise the supported-name set and feature list."""
    supported_names = (
        "ROCP", "OROCP", "HROCP", "LROCP", "MACD", "RSI",
        "VROCP", "BOLL", "MA", "VMA", "PRICE_VOLUME",
    )
    self.selector = selector
    # Names this object declares as supported.
    self.supported = set(supported_names)
    # Starts empty.
    self.feature = list()
def __init__(self, selector):
    """Remember ``selector`` and initialise the supported-name set and feature list."""
    supported_names = (
        "ROCP", "OROCP", "HROCP", "LROCP", "MACD", "RSI",
        "VROCP", "BOLL", "MA", "VMA", "PRICE_VOLUME",
    )
    self.selector = selector
    # Names this object declares as supported.
    self.supported = set(supported_names)
    # Starts empty.
    self.feature = list()
def __init__(self, selector):
    """Remember ``selector`` and initialise the supported-name set and feature list."""
    supported_names = (
        "ROCP", "OROCP", "HROCP", "LROCP", "MACD", "RSI",
        "VROCP", "BOLL", "MA", "VMA", "PRICE_VOLUME",
    )
    self.selector = selector
    # Names this object declares as supported.
    self.supported = set(supported_names)
    # Starts empty.
    self.feature = list()
def moving_average(data, tp=14):
    """
    Forward ``data`` to ``MA`` with ``tp`` as its ``timeperiod``.

    :param data: ndarray
        data for MA
    :param tp: int
        time period for MA
    :return: whatever ``MA`` returns
    """
    averaged = MA(data, timeperiod=tp)
    return averaged
def get_fit(codes, data):
    """Yield each code whose 10-bar average just crossed above its 20-bar one.

    For every code the last 21 five-minute closes are requested from
    ``data.history``.  A code is yielded when the latest 10-bar average is
    above the latest 20-bar average while the previous pair was the other
    way round.

    :param codes: iterable of codes to scan.
    :param data: object exposing ``history(code, frequency=, length=, fields=)``.
    :return: generator of matching codes; codes whose history lookup raises
        are reported with ``print`` and skipped.
    """
    for code in codes:
        try:
            his = data.history(code, frequency='5min', length=21, fields='close')
        except Exception as e:
            # Best effort: report the failure and carry on with the next
            # code.  Function-call print works on Python 2 and 3.
            print(e)
            continue

        closes = his.values
        # Index the raw arrays positionally; Series[-1] depends on the index
        # type and is ambiguous between label and position.
        fast = talib.MA(closes, timeperiod=10)
        slow = talib.MA(closes, timeperiod=20)
        if fast[-1] > slow[-1] and fast[-2] < slow[-2]:
            yield code
def handle_data(context, data):
    """Update the 10/20-bar averages and trade on a crossover.

    ``context.ma10`` and ``context.ma20`` each hold two values: slot 0 is the
    previous reading and slot 1 the current one.  When the 10-bar average
    moves from at-or-below to above the 20-bar average the target is set to
    1; on the opposite move it is set to -1.

    NOTE(review): history is read for ``context.s`` but orders are placed on
    ``context.security`` - confirm these are meant to be the same symbol.
    """
    closes = data.history(context.s, fields="close", length=20).values

    # Shift the current readings into the "previous" slot, then refresh.
    context.ma10[0], context.ma20[0] = context.ma10[1], context.ma20[1]
    context.ma10[1] = ta.MA(closes, 10)[-1]
    context.ma20[1] = ta.MA(closes, 20)[-1]

    if context.ma10[0] <= context.ma20[0] and context.ma10[1] > context.ma20[1]:
        target = 1
    elif context.ma10[0] >= context.ma20[0] and context.ma10[1] < context.ma20[1]:
        target = -1
    else:
        target = None

    if target is not None:
        print(context.ma10, context.ma20)
        order_target(context.security, target)

    # Looked up but not used afterwards.
    position = context.portfolio.positions.get(context.s, 0)
def entry(context, data, s, lot=LOT):
    """Place an order for ``s`` when its fast and slow averages cross.

    ``context.ma_fast[s]`` and ``context.ma_slow[s]`` each hold two values:
    slot 0 is the previous reading and slot 1 the current one.  A move of
    the fast average from at-or-below to above the slow one orders ``lot``;
    the opposite move orders ``-lot``.

    :param context: strategy state with ``stop_order``, ``ma_fast`` and
        ``ma_slow`` mappings.
    :param data: object exposing ``history(s, fields=, length=)``.
    :param s: symbol being evaluated.
    :param lot: order size; defaults to the module constant ``LOT``.
    """
    def order_to(n):
        # Cancel the former stop order for this symbol, if one is recorded.
        sod = context.stop_order.pop(s, None)
        if sod:
            o = get_order(sod)
            if o.is_open:
                o.cancel()
                # NOTE(review): the file's indentation was lost, so this
                # nesting is reconstructed.  The size is doubled only when an
                # open stop order was cancelled - confirm against the
                # original project.
                n *= 2
        order(s, n)

    dc = data.history(s, fields="close", length=MA_SLOW_LEN).values
    ma_fast = context.ma_fast[s]
    ma_slow = context.ma_slow[s]
    ma_fast[0] = ma_fast[1]
    ma_slow[0] = ma_slow[1]
    try:
        ma_fast[1] = ta.MA(dc, MA_FAST_LEN)[-1]
        ma_slow[1] = ta.MA(dc, MA_SLOW_LEN)[-1]
    except Exception:
        # Still best effort (skip this bar when the averages cannot be
        # computed), but no longer swallows KeyboardInterrupt / SystemExit
        # the way a bare ``except:`` did.
        return

    if ma_fast[0] <= ma_slow[0] and ma_fast[1] > ma_slow[1]:
        order_to(lot)
    elif ma_fast[0] >= ma_slow[0] and ma_fast[1] < ma_slow[1]:
        order_to(-lot)
def calculate_ma(context, data):
    """Store fast and slow talib.MA arrays for every tradable ticker.

    For each ticker in ``context.tickers`` that ``data.can_trade`` accepts,
    ``context.ma[ticker]`` is set to a dict with keys ``'fast'`` and
    ``'slow'``.  The window lengths come from the module-level names
    ``fast`` and ``slow``; one extra bar of history is requested for each.
    """
    for ticker in context.tickers:
        if not data.can_trade(ticker):
            continue
        fast_closes = data.history(ticker, fields='close', length=fast + 1).values
        fast_ma = talib.MA(fast_closes, fast)
        slow_closes = data.history(ticker, fields='close', length=slow + 1).values
        slow_ma = talib.MA(slow_closes, slow)
        context.ma[ticker] = {'fast': fast_ma, 'slow': slow_ma}
def MA_CN(close,timeperiod=5):
    """Return ``tl.MA`` of ``close`` over ``timeperiod`` with 0 as third argument."""
    averaged = tl.MA(close, timeperiod, 0)
    return averaged
def MA(self):
    """Return talib.MA of ``self.getCloses()`` using talib's default arguments."""
    return talib.MA(self.getCloses())
#
#----------------------------------------------------------------------
def MA(closes, day=5):
    """Return talib.MA of the close prices ``closes`` over ``day`` bars."""
    averaged = talib.MA(closes, day)
    return averaged
def TDX_BOLL(closes, n=20):
    """Bollinger bands following the TDX BOLL-M formula, built on talib.MA.

    Formula being reproduced (parameter N: min 2, max 250, default 20)::

        MID:=MA(C,N);
        #MID:=SMA(C,N,1);
        VART1:=POW((C-MID),2);
        VART2:=MA(VART1,N);
        VART3:=SQRT(VART2);
        UPPER:=MID+2*VART3;
        LOWER:=MID-2*VART3;
        BOLL:REF(MID,1),COLORFFFFFF;
        UB:REF(UPPER,1),COLOR00FFFF;
        LB:REF(LOWER,1),COLORFF00FF;

    NOTE(review): the one-bar ``REF(..., 1)`` shift in the last three lines
    of the formula is not applied by this function.

    :param closes: close prices; converted to a float ndarray.
    :param n: window length N; defaults to 20, the value that used to be
        hard-coded.
    :return: ``(upper, mid, lower)`` arrays.
    :raises AssertionError: if fewer than ``n`` closes are supplied.
    """
    # talib works on double arrays, so coerce integer input up front.
    closes = np.array(closes, dtype=float)
    assert len(closes) >= n

    mid = talib.MA(closes, n)
    # Squared deviation from the middle band, computed for all bars at once.
    vart1 = (closes - mid) ** 2
    vart2 = talib.MA(vart1, n)
    vart3 = np.sqrt(vart2)
    upper = mid + 2 * vart3
    lower = mid - 2 * vart3
    return upper, mid, lower
def unittest_ma():
    """Manual smoke test: print MA and FOUR for code '600100' and plot FOUR with RSI.

    FOUR is scaled by 100 and RSI is shifted down by 50 before plotting.
    """
    closes = Guider('600100').getCloses()
    print(MA(closes))

    fours = FOUR(closes)
    print( len(closes), len(fours))
    print(fours)

    rsi = RSI(closes)
    rsi -= 50
    fours *= 100

    # ``pl.figure`` was referenced without being called, so no figure was
    # created explicitly; call it.
    pl.figure()
    pl.plot(fours,'r')
    pl.plot(rsi)
    pl.show()