def KDJ_CN(high, low, close, fastk_period=9, slowk_period=3, fastd_period=3) :
len1 = len(high)
len2 = len(low)
len3 = len(close)
if len1 != len2 or len1 != len3:
print ("KDJ_CN input invalid for len:%s %s %s " %(str(len1),str(len2),str(len3)))
return np.array(np.nan),np.array(np.nan),np.array(np.nan)
kValue, dValue = tl.STOCHF(high, low, close, fastk_period, fastd_period=fastd_period, fastd_matype=0)
kValue = np.array(map(lambda x : SecurityDataSrcBase.SMA_CN(kValue[:x], slowk_period), range(1, len(kValue) + 1)))
dValue = np.array(map(lambda x : SecurityDataSrcBase.SMA_CN(kValue[:x], fastd_period), range(1, len(kValue) + 1)))
jValue = 3 * kValue - 2 * dValue
func = lambda arr : np.array([0 if x < 0 else (100 if x > 100 else x) for x in arr])
kValue = func(kValue)
dValue = func(dValue)
jValue = func(jValue)
return kValue, dValue, jValue
python类STOCHF的实例源码
def STOCH(df, n = 14, slowk_period = 3, slowd_period = 3):
fastk, fastd = talib.STOCHF(df['high'].values, df['low'].values, df['close'].values, fastk_period = n, fastd_period=slowk_period)
slowk, slowd = talib.STOCH(df['high'].values, df['low'].values, df['close'].values, fastk_period = n, slowk_period=slowk_period, slowd_period=slowd_period)
fk = pd.Series(fastk, index = df.index, name = "STOCHFK_%s_%s_%s" % (str(n), str(slowk_period), str(slowd_period)))
sk = pd.Series(slowk, index = df.index, name = "STOCHSK_%s_%s_%s" % (str(n), str(slowk_period), str(slowd_period)))
sd = pd.Series(slowd, index = df.index, name = "STOCHSD_%s_%s_%s" % (str(n), str(slowk_period), str(slowd_period)))
return pd.concat([fk, sk, sd], join='outer', axis=1)
def STOCHF(df, n = 14, fastd_period = 3):
fastk, fastd = talib.STOCHF(df['high'].values, df['low'].values, df['close'].values, fastk_period = n, fastd_period=fastd_period)
fk = pd.Series(fastk, index = df.index, name = "STOCFK_%s_%s" % (str(n), str(fastd_period)))
sk = pd.Series(fastd, index = df.index, name = "STOCSK_%s_%s" % (str(n), str(fastd_period)))
return pd.concat([fk, sk], join='outer', axis=1)
def add_STOCHF(self, fastk_period=5, fastd_period=3, fastd_matype=0,
types=['line', 'line'],
colors=['primary', 'tertiary'],
**kwargs):
"""Fast Stochastic Oscillator.
Note that the first argument of types and colors refers to Fast Stoch %K,
while second argument refers to Fast Stoch %D
(signal line of %K obtained by MA).
"""
if not (self.has_high and self.has_low and self.has_close):
raise Exception()
utils.kwargs_check(kwargs, VALID_TA_KWARGS)
if 'kind' in kwargs:
kwargs['type'] = kwargs['kind']
if 'kinds' in kwargs:
types = kwargs['type']
if 'type' in kwargs:
types = [kwargs['type']] * 2
if 'color' in kwargs:
colors = [kwargs['color']] * 2
name = 'STOCHF({},{})'.format(str(fastk_period),
str(fastd_period))
fastk = name + r'[%k]'
fastd = name + r'[%d]'
self.sec[fastk] = dict(type=types[0], color=colors[0])
self.sec[fastd] = dict(type=types[1], color=colors[1], on=fastk)
self.ind[fastk], self.ind[fastd] = talib.STOCHF(self.df[self.hi].values,
self.df[self.lo].values,
self.df[self.cl].values,
fastk_period, fastd_period,
fastd_matype)
def KDJ(security_list, fastk_period=5, slowk_period=3, fastd_period=3):
def SMA_CN(close, timeperiod):
close = np.nan_to_num(close)
return reduce(lambda x, y: ((timeperiod - 1) * x + y) / timeperiod, close)
# ????????????
if isinstance(security_list, str):
security_list = [security_list]
# ?? KDJ
n = max(fastk_period, slowk_period, fastd_period)
k = {}
d = {}
j = {}
for stock in security_list:
security_data = attribute_history(
stock, n * 2, '1d', fields=['high', 'low', 'close'], df=False)
high = security_data['high']
low = security_data['low']
close = security_data['close']
kValue, dValue = talib.STOCHF(
high, low, close, fastk_period, fastd_period, fastd_matype=0)
kValue = np.array(map(lambda x: SMA_CN(
kValue[:x], slowk_period), range(1, len(kValue) + 1)))
dValue = np.array(map(lambda x: SMA_CN(
kValue[:x], fastd_period), range(1, len(kValue) + 1)))
jValue = 3 * kValue - 2 * dValue
def func(arr): return np.array(
[0 if x < 0 else (100 if x > 100 else x) for x in arr])
k[stock] = func(kValue)
d[stock] = func(dValue)
j[stock] = func(jValue)
return k, d, j
# RSI
def test_fso_expected_with_talib(self, seed):
"""
Test the output that is returned from the fast stochastic oscillator
is the same as that from the ta-lib STOCHF function.
"""
window_length = 14
nassets = 6
rng = np.random.RandomState(seed=seed)
input_size = (window_length, nassets)
# values from 9 to 12
closes = 9.0 + (rng.random_sample(input_size) * 3.0)
# Values from 13 to 15
highs = 13.0 + (rng.random_sample(input_size) * 2.0)
# Values from 6 to 8.
lows = 6.0 + (rng.random_sample(input_size) * 2.0)
expected_out_k = []
for i in range(nassets):
fastk, fastd = talib.STOCHF(
high=highs[:, i],
low=lows[:, i],
close=closes[:, i],
fastk_period=window_length,
fastd_period=1,
)
expected_out_k.append(fastk[-1])
expected_out_k = np.array(expected_out_k)
today = pd.Timestamp('2015')
out = np.empty(shape=(nassets,), dtype=np.float)
assets = np.arange(nassets, dtype=np.float)
fso = FastStochasticOscillator()
fso.compute(
today, assets, out, closes, lows, highs
)
assert_equal(out, expected_out_k, array_decimal=6)