def _detect_anoms(data, k=0.49, alpha=0.05, num_obs_per_period=None,
                  use_decomp=True, use_esd=False, direction="pos", verbose=False):
    """Detect anomalies in a seasonal series with a generalized-ESD style test.

    The series is de-seasonalised with a moving-average seasonal
    decomposition and centred on its mean.  Then, up to ``k * len(data)``
    times, the most extreme remaining observation is removed and its
    standardised deviation from the median is compared with a Student-t
    based critical value.  The number of anomalies is the largest removal
    step whose statistic exceeded its critical value; every observation
    removed up to and including that step is reported.

    Args:
        data: pandas Series of observations with no missing values.  The
            caller's object is not modified; the function works on a derived
            series.
        k: Maximum fraction of the observations that may be flagged.
        alpha: Significance level used for the critical values.
        num_obs_per_period: Number of observations in one seasonal period.
            Required (must be truthy).
        use_decomp: Accepted for interface compatibility; currently ignored
            (the decomposition is always applied).
        use_esd: Accepted for interface compatibility; currently ignored.
        direction: ``'pos'`` tests values above the median, ``'neg'`` values
            below it, ``'both'`` absolute deviations (two-tailed).
        verbose: If true, print a progress line for every removal step.

    Returns:
        A dict with two entries:
            ``'anoms'``: Series of the flagged observations, in order of
            removal, indexed by their original labels.  The values are the
            de-seasonalised, centred values, not the raw input.  Empty when
            nothing is flagged.
            ``'stl'``: the input minus the decomposition residual (missing
            residuals are treated as 0).

    Raises:
        AssertionError: If ``num_obs_per_period`` is missing, ``direction``
            is not one of the accepted values, fewer than two periods of data
            are supplied, the data contains missing values, or ``k`` is so
            small that no observation may be flagged.  Note that asserts are
            skipped when Python runs with ``-O``.
    """
    # validation
    assert num_obs_per_period, "must supply period length for time series decomposition"
    assert direction in ['pos', 'neg', 'both'], 'direction options: pos | neg | both'
    assert data.size >= num_obs_per_period * 2, 'Anomaly detection needs at least 2 periods worth of data'
    assert not data.isnull().any(), 'Data contains NA. We suggest replacing NA with interpolated values before detecting anomaly'

    # conversion
    one_tail = direction in ('pos', 'neg')
    upper_tail = direction in ('pos', 'both')
    n = data.size

    # -- Step 1: decompose the data.  The de-seasonalised remainder is what
    # the anomaly test runs on.
    # Note: the R implementation uses STL; a moving-average decomposition is
    # used here, so results may differ.  TODO: needs improvement.
    # The period is passed positionally ("additive" and None are the
    # defaults of `model` and `filt`) because statsmodels renamed the keyword
    # from `freq` to `period`; the positional form works with both.
    decomposed = sm.tsa.seasonal_decompose(
        data, "additive", None, num_obs_per_period, two_sided=False)
    smoothed = data - decomposed.resid.fillna(0)
    data = data - decomposed.seasonal - data.mean()

    max_outliers = int(np.trunc(data.size * k))
    assert max_outliers, 'With longterm=TRUE, AnomalyDetection splits the data into 2 week periods by default. You have {0} observations in a period, which is too few. Set a higher piecewise_median_period_weeks.'.format(data.size)

    # -- Step 2: remove the most extreme observation one at a time and record
    # the last step at which the test statistic beat its critical value.
    removed = []    # one single-element Series per removal step, in order
    num_anoms = 0   # largest step i whose statistic exceeded the critical value
    for i in range(1, max_outliers + 1):
        if verbose:
            print(i, '/', max_outliers, ' completed')

        center = data.median()
        # Mean absolute deviation around the mean -- the quantity the removed
        # pandas method `Series.mad()` returned; kept to preserve the scale
        # this function has always used.
        scale = (data - data.mean()).abs().mean()
        if not scale > 0:
            # Constant (or degenerate) remainder: nothing more can be tested.
            break

        if not one_tail:
            ares = (data - center).abs()
        elif upper_tail:
            ares = data - center
        else:
            ares = center - data
        ares = ares / scale

        # Remove exactly one observation per step (the first maximum on
        # ties) so the sample size matches the `n - i` terms below.
        pos = int(np.nanargmax(ares.values))
        stat = ares.values[pos]
        removed.append(data.iloc[[pos]])
        data = data[np.arange(data.size) != pos]

        # Compute critical value.
        p = 1 - alpha / (n - i + 1) if one_tail else (1 - alpha / (2 * (n - i + 1)))
        t = sp.stats.t.ppf(p, n - i - 1)
        lam = t * (n - i) / np.sqrt((n - i - 1 + t ** 2) * (n - i + 1))
        if stat > lam:
            # Everything removed up to this step counts as anomalous, even
            # if an earlier step did not exceed its own critical value
            # (a larger outlier can mask a smaller one).
            num_anoms = i

    if num_anoms:
        anoms = pd.concat(removed[:num_anoms])
    else:
        anoms = pd.Series(dtype=float)

    return {
        'anoms': anoms,
        'stl': smoothed
    }
# Web-page residue (not code): "Comment list"
# Web-page residue (not code): "Table of contents"