anomaly_detect_ts.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:AnomalyDetection 作者: Marcnuth 项目源码 文件源码
def _detect_anoms(data, k=0.49, alpha=0.05, num_obs_per_period=None,
                  use_decomp=True, use_esd=False, direction="pos", verbose=False):

    # validation
    assert num_obs_per_period, "must supply period length for time series decomposition"
    assert direction in ['pos', 'neg', 'both'], 'direction options: pos | neg | both'
    assert data.size >= num_obs_per_period * 2, 'Anomaly detection needs at least 2 periods worth of data'
    assert data[data.isnull()].empty, 'Data contains NA. We suggest replacing NA with interpolated values before detecting anomaly'

    # conversion
    one_tail = True if direction in ['pos', 'neg'] else False
    upper_tail = True if direction in ['pos', 'both'] else False

    n = data.size

    # -- Step 1: Decompose data. This returns a univarite remainder which will be used for anomaly detection. Optionally, we might NOT decompose.
    # Note: R use stl, but here we will use MA, the result may be different TODO.. Here need improvement
    decomposed = sm.tsa.seasonal_decompose(data, freq=num_obs_per_period, two_sided=False)
    smoothed = data - decomposed.resid.fillna(0)
    data = data - decomposed.seasonal - data.mean()

    max_outliers = int(np.trunc(data.size * k))
    assert max_outliers, 'With longterm=TRUE, AnomalyDetection splits the data into 2 week periods by default. You have {0} observations in a period, which is too few. Set a higher piecewise_median_period_weeks.'.format(data.size)

    R_idx = pd.Series()

    # Compute test statistic until r=max_outliers values have been
    # removed from the sample.

    for i in range(1, max_outliers + 1):
        if verbose:
            print(i, '/', max_outliers, ' completed')

        if not data.mad():
            break

        if not one_tail:
            ares = abs(data - data.median())
        elif upper_tail:
            ares = data - data.median()
        else:
            ares = data.median() - data

        ares = ares / data.mad()

        tmp_anom_index = ares[ares.values == ares.max()].index
        cand = pd.Series(data.loc[tmp_anom_index], index=tmp_anom_index)

        data.drop(tmp_anom_index, inplace=True)

        # Compute critical value.
        p = 1 - alpha / (n - i + 1) if one_tail else (1 - alpha / (2 * (n - i + 1)))
        t = sp.stats.t.ppf(p, n - i - 1)
        lam = t * (n - i) / np.sqrt((n - i - 1 + t ** 2) * (n - i + 1))
        if ares.max() > lam:
            R_idx = R_idx.append(cand)

    return {
        'anoms': R_idx,
        'stl': smoothed
    }
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号