Example source code for Python nan()

optflow_utils.py (project: pybot, author: spillai)
def sparse_optical_flow(im1, im2, pts, fb_threshold=-1, 
                        window_size=15, max_level=2, 
                        criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)): 

    # Forward flow
    p1, st, err = cv2.calcOpticalFlowPyrLK(im1, im2, pts, None, 
                                           winSize=(window_size, window_size), 
                                           maxLevel=max_level, criteria=criteria )

    # Backward flow
    if fb_threshold > 0:     
        p0r, st0, err = cv2.calcOpticalFlowPyrLK(im2, im1, p1, None, 
                                           winSize=(window_size, window_size), 
                                           maxLevel=max_level, criteria=criteria)
        p0r[st0 == 0] = np.nan

        # Keep only points that pass the forward-backward check
        fb_good = (np.fabs(p0r - pts) < fb_threshold).all(axis=1)

        p1[~fb_good] = np.nan
        st = np.bitwise_and(st, st0)
        err[~fb_good] = np.nan

    return p1, st, err
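A hypothetical usage sketch (file names and detector settings are illustrative; points must be float32 and shaped (N, 2) so the forward-backward check above can reduce over axis 1):

import cv2
import numpy as np

im1 = cv2.imread('frame1.png', cv2.IMREAD_GRAYSCALE)
im2 = cv2.imread('frame2.png', cv2.IMREAD_GRAYSCALE)
pts = cv2.goodFeaturesToTrack(im1, maxCorners=200, qualityLevel=0.01,
                              minDistance=7).reshape(-1, 2).astype(np.float32)
p1, st, err = sparse_optical_flow(im1, im2, pts, fb_threshold=1.0)
# points that fail the forward-backward check come back as np.nan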
tasks.py (project: sensu_drive, author: ilavender)
def y_sum_by_time(x_arr, y_arr, top=None):
    df = pd.DataFrame({'Timestamp': pd.to_datetime(x_arr, unit='s'), 'Status': y_arr})
    df['Date'] = df['Timestamp'].apply(lambda x: "%d/%d/%d" % (x.day, x.month, x.year))
    df['Hour'] = df['Timestamp'].apply(lambda x: "%d" % (x.hour))
    df['Weekday'] = df['Timestamp'].apply(lambda x: x.day_name())  # weekday_name was removed in pandas 1.0

    times = ['Hour', 'Weekday', 'Date']

    result = {}

    for groupby in times:

        df_group = df.groupby(groupby, as_index=False).agg({'Status': np.sum})

        if top is not None and top > 0:
            # keep the `top` rows with the largest Status sums
            idx = df_group.index.isin(df_group.nlargest(top, 'Status').index)
        else:
            idx = df_group['Status'].max() == df_group['Status']

        result[groupby] = {k: g['Status'].replace(np.nan, 'None').tolist()
                           for k, g in df_group[idx].groupby(groupby)}

    return result
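A hypothetical call with epoch-second timestamps (values illustrative):

y_sum_by_time([1500000000, 1500003600, 1500086400], [1, 2, 3], top=2)
# -> {'Hour': {...}, 'Weekday': {...}, 'Date': {...}} with Status sums per bucket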
format_test.py (project: treecat, author: posterior)
def test_pd_outer_join():
    dfs = [
        pd.DataFrame({
            'id': [0, 1, 2, 3],
            'a': ['foo', 'bar', 'baz', np.nan],
            'b': ['panda', 'zebra', np.nan, np.nan],
        }),
        pd.DataFrame({
            'id': [1, 2, 3, 4],
            'b': ['mouse', np.nan, 'tiger', 'egret'],
            'c': ['toe', 'finger', 'nose', np.nan],
        }),
    ]
    expected = pd.DataFrame({
        'id': [0, 1, 2, 3, 4],
        'a': ['foo', 'bar', 'baz', np.nan, np.nan],
        'b': ['panda', 'zebra', np.nan, 'tiger', 'egret'],
        'c': [np.nan, 'toe', 'finger', 'nose', np.nan],
    }).set_index('id')
    actual = pd_outer_join(dfs, on='id')
    print(expected)
    print(actual)
    assert expected.equals(actual)
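pd_outer_join itself is treecat's; a plausible sketch that satisfies this test, assuming overlapping columns are combined with first-frame values winning:

import pandas as pd

def pd_outer_join_sketch(dfs, on):
    result = dfs[0].set_index(on)
    for df in dfs[1:]:
        result = result.combine_first(df.set_index(on))
    return result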
test_stats.py (project: npstreams, author: LaurentRDC)
def test_against_numpy_nanstd(self):
        source = [np.random.random((16, 12, 5)) for _ in range(10)]
        for arr in source:
            arr[randint(0, 15), randint(0, 11), randint(0, 4)] = np.nan
        stack = np.stack(source, axis = -1)

        for axis in (0, 1, 2, None):
            for ddof in range(4):
                with self.subTest('axis = {}, ddof = {}'.format(axis, ddof)):
                    from_numpy = np.nanstd(stack, axis = axis, ddof = ddof)
                    from_ivar = last(istd(source, axis = axis, ddof = ddof, ignore_nan = True))
                    self.assertSequenceEqual(from_numpy.shape, from_ivar.shape)
                    self.assertTrue(np.allclose(from_ivar, from_numpy))
history_container.py (project: zipline-chinese, author: zhanghan1990)
def frame_from_bardata(self, data, algo_dt):
        """
        Create a DataFrame from the given BarData and algo dt.
        """
        data = data._data
        frame_data = np.empty((len(self.fields), len(self.sids))) * np.nan

        for j, sid in enumerate(self.sids):
            sid_data = data.get(sid)
            if not sid_data:
                continue
            if algo_dt != sid_data['dt']:
                continue
            for i, field in enumerate(self.fields):
                frame_data[i, j] = sid_data.get(field, np.nan)

        return pd.DataFrame(
            frame_data,
            index=self.fields.copy(),
            columns=self.sids.copy(),
        )
cumulative.py (project: zipline-chinese, author: zhanghan1990)
def information_ratio(algo_volatility, algorithm_return, benchmark_return):
    """
    http://en.wikipedia.org/wiki/Information_ratio

    Args:
        algo_volatility (float): Annualized volatility of the algorithm.
        algorithm_return (float): Algorithm return percentage.
        benchmark_return (float): Benchmark return percentage.

    Returns:
        float. Information ratio.
    """
    if zp_math.tolerant_equals(algo_volatility, 0):
        return np.nan

    # The volatility in the denominator is annualized, so it already
    # carries a factor of sqrt(annual factor). For the ratio as a whole
    # to carry the correct sqrt(annual factor), the numerator must carry
    # the annual factor itself: dividing the annual factor by its square
    # root leaves sqrt(annual factor).
    return (algorithm_return - benchmark_return) / algo_volatility
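A quick numeric check of the annualization argument above (illustrative numbers only):

import numpy as np

A, mean_excess, daily_vol = 252, 0.001, 0.02   # hypothetical annual factor and daily stats
annual_vol = np.sqrt(A) * daily_vol
assert np.isclose((A * mean_excess) / annual_vol,          # annual factor over annualized vol...
                  np.sqrt(A) * (mean_excess / daily_vol))  # ...equals sqrt(A) times the daily ratio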
risk.py (project: zipline-chinese, author: zhanghan1990)
def sharpe_ratio(algorithm_volatility, algorithm_return, treasury_return):
    """
    http://en.wikipedia.org/wiki/Sharpe_ratio

    Args:
        algorithm_volatility (float): Algorithm volatility.
        algorithm_return (float): Algorithm return percentage.
        treasury_return (float): Treasury return percentage.

    Returns:
        float. The Sharpe ratio.
    """
    if zp_math.tolerant_equals(algorithm_volatility, 0):
        return np.nan

    return (algorithm_return - treasury_return) / algorithm_volatility
test_sources.py (project: zipline-chinese, author: zhanghan1990)
def test_nan_filter_panel(self):
        dates = pd.date_range('1/1/2000', periods=2, freq='B', tz='UTC')
        df = pd.Panel(np.random.randn(2, 2, 2),
                      major_axis=dates,
                      items=[4, 5],
                      minor_axis=['price', 'volume'])
        # should be filtered
        df.loc[4, dates[0], 'price'] = np.nan
        # should not be filtered, should have been ffilled
        df.loc[5, dates[1], 'price'] = np.nan
        source = DataPanelSource(df)
        event = next(source)
        self.assertEqual(5, event.sid)
        event = next(source)
        self.assertEqual(4, event.sid)
        self.assertRaises(StopIteration, next, source)
test_algorithm.py (project: zipline-chinese, author: zhanghan1990)
def _algo_record_float_magic_should_pass(self, var_type):
        test_algo = TradingAlgorithm(
            script=record_float_magic % var_type,
            sim_params=self.sim_params,
            env=self.env,
        )
        set_algo_instance(test_algo)

        self.zipline_test_config['algorithm'] = test_algo
        self.zipline_test_config['trade_count'] = 200

        zipline = simfactory.create_test_zipline(
            **self.zipline_test_config)
        output, _ = drain_zipline(self, zipline)
        self.assertEqual(len(output), 252)
        incr = []
        for o in output[:200]:
            incr.append(o['daily_perf']['recorded_vars']['data'])
        np.testing.assert_array_equal(incr, [np.nan] * 200)
test_transforms.py (project: zipline-chinese, author: zhanghan1990)
def initialize_with(test_case, tfm_name, days):
    def initialize(context):
        context.test_case = test_case
        context.days = days
        context.mins_for_days = []
        context.price_bars = (None, [np.nan], [np.nan], [np.nan])
        context.vol_bars = (None, [np.nan], [np.nan], [np.nan])
        if context.days:
            context.warmup = days + 1
        else:
            context.warmup = 2

        context.current_date = None

        context.last_close_prices = [np.nan, np.nan, np.nan, np.nan]
        add_transform(tfm_name, days)

    return initialize
test_munge.py (project: zipline-chinese, author: zhanghan1990)
def test_ffill(self):
        # test ndim=1
        N = 100
        s = pd.Series(np.random.randn(N))
        mask = random.sample(range(N), 10)
        s.iloc[mask] = np.nan

        correct = s.ffill().values
        test = ffill(s.values)
        assert_almost_equal(correct, test)

        # test ndim=2
        df = pd.DataFrame(np.random.randn(N, N))
        df.iloc[mask] = np.nan
        correct = df.ffill().values
        test = ffill(df.values)
        assert_almost_equal(correct, test)
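The ffill under test is zipline's own; a minimal NumPy-only sketch of the 1-D case, assuming pd.Series.ffill semantics (leading NaNs stay NaN):

import numpy as np

def ffill_sketch(a):
    idx = np.where(~np.isnan(a), np.arange(len(a)), 0)
    np.maximum.accumulate(idx, out=idx)   # index of the last valid observation
    return a[idx]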
tracker_utils.py (project: pybot, author: spillai)
def track(self, im0, im1, p0): 
        """
        Main tracking method using sparse optical flow (LK)
        """
        if p0 is None or not len(p0): 
            return np.array([])

        # Forward flow
        p1, st1, err1 = cv2.calcOpticalFlowPyrLK(im0, im1, p0, None, **self.lk_params_)
        p1[st1 == 0] = np.nan

        if self.fb_check_: 
            # Backward flow
            p0r, st0, err0 = cv2.calcOpticalFlowPyrLK(im1, im0, p1, None, **self.lk_params_)
            p0r[st0 == 0] = np.nan

            # Set only good
            fb_good = (np.fabs(p0r-p0) < 3).all(axis=1)
            p1[~fb_good] = np.nan

        return p1
recovery.py (project: astrobase, author: waqasbhatti)
def matthews_correl_coeff(ntp, ntn, nfp, nfn):
    '''
    This calculates the Matthews correlation coefficient.

    https://en.wikipedia.org/wiki/Matthews_correlation_coefficient

    '''

    mcc_top = (ntp*ntn - nfp*nfn)
    mcc_bot = msqrt((ntp + nfp)*(ntp + nfn)*(ntn + nfp)*(ntn + nfn))

    if mcc_bot > 0:
        return mcc_top/mcc_bot
    else:
        return np.nan
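A hypothetical numeric check, including the degenerate case where a zero denominator yields np.nan:

matthews_correl_coeff(ntp=50, ntn=40, nfp=10, nfn=5)   # ~0.72
matthews_correl_coeff(ntp=50, ntn=0, nfp=0, nfn=0)     # np.nan (mcc_bot == 0)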



#######################################
## VARIABILITY RECOVERY (PER MAGBIN) ##
#######################################
checkplotlist.py (project: astrobase, author: waqasbhatti)
def key_worker(task):
    '''
    This gets the required keys from the requested file.

    '''
    cpf, keys = task


    cpd = checkplot._read_checkplot_picklefile(cpf)

    resultkeys = []

    for k in keys:

        try:
            resultkeys.append(dict_get(cpd, k))
        except Exception:
            resultkeys.append(np.nan)

    return resultkeys


############
## CONFIG ##
############
hatlc.py (project: astrobase, author: waqasbhatti)
def smartcast(castee, caster, subval=None):
    '''
    This just tries to apply the caster function to castee.

    On failure, returns nan for numeric casters, '' for str, and subval otherwise.

    '''

    try:
        return caster(castee)
    except Exception as e:
        if caster is float or caster is int:
            return nan
        elif caster is str:
            return ''
        else:
            return subval
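Hypothetical calls covering each fallback branch (the bare nan here comes from a module-level from numpy import nan):

smartcast('3.14', float)               # -> 3.14
smartcast('N/A', int)                  # -> nan
smartcast('N/A', complex, subval=0)    # -> 0, the subval fallback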



# these are the keys used in the metadata section of the CSV LC
test_PlotCurveItem.py (project: NeoAnalysis, author: neoanalysis)
def test_PlotCurveItem():
    p = pg.GraphicsWindow()
    p.ci.layout.setContentsMargins(4, 4, 4, 4)  # default margins vary by platform
    v = p.addViewBox()
    p.resize(200, 150)
    data = np.array([1,4,2,3,np.inf,5,7,6,-np.inf,8,10,9,np.nan,-1,-2,0])
    c = pg.PlotCurveItem(data)
    v.addItem(c)
    v.autoRange()

    # Check auto-range works. Some platform differences may be expected..
    checkRange = np.array([[-1.1457564053237301, 16.145756405323731], [-3.076811473165955, 11.076811473165955]])
    assert np.allclose(v.viewRange(), checkRange)

    assertImageApproved(p, 'plotcurveitem/connectall', "Plot curve with all points connected.")

    c.setData(data, connect='pairs')
    assertImageApproved(p, 'plotcurveitem/connectpairs', "Plot curve with pairs connected.")

    c.setData(data, connect='finite')
    assertImageApproved(p, 'plotcurveitem/connectfinite', "Plot curve with finite points connected.")

    c.setData(data, connect=np.array([1,1,1,0,1,1,0,0,1,0,0,0,1,1,0,0]))
    assertImageApproved(p, 'plotcurveitem/connectarray', "Plot curve with connection array.")
utils.py (project: kaggle-review, author: daxiongshu)
def rank_cat(df_tr,ycol,df_te=None,cols=None,rank=True,tag=''):
    if cols is None:
        cols = [i for i in df_tr.columns.values if df_tr[i].dtype=='object']
    if len(cols)==0:
        print("no cat cols found")
        return
    for col in cols:
        dic = df_tr.groupby(col)[ycol].mean().to_dict()
        if rank:
            ks = [i for i in dic]
            vs = np.array([dic[i] for i in ks]).argsort().argsort()
            dic = {i:j for i,j in zip(ks,vs)}
        df_tr[tag+col] = df_tr[col].apply(lambda x: dic[x])
        if df_te is not None:
            df_te[tag+col] = df_te[col].apply(lambda x: dic.get(x,np.nan))

#overfitting! try LOO!
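A hypothetical usage showing the unseen-category path in df_te mapping to np.nan:

import pandas as pd

df_tr = pd.DataFrame({'city': ['a', 'b', 'a', 'c'], 'y': [1, 0, 1, 0]})
df_te = pd.DataFrame({'city': ['b', 'd']})
rank_cat(df_tr, 'y', df_te=df_te, cols=['city'], tag='rk_')
df_te['rk_city'].tolist()   # [0, nan]: 'd' was never seen in training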
analysis.py (project: risk-slim, author: ustunb)
def get_calibration_metrics(model, data):
    scores = (data['X'] * data['Y']).dot(model)

    #distinct scores

    #compute calibration error at each score

    full_metrics = {
        'scores': float('nan'),
        'count': float('nan'),
        'predicted_risk': float('nan'),
        'empirical_risk': float('nan')
    }

    # The expression below is kept only as a placeholder; one plausible reading
    # is a count-weighted RMS gap between predicted and empirical risk (see the
    # hedged sketch after this stub).
    # cal_error = np.sqrt(np.sum(a * (a - b) ** 2))

    summary_metrics = {
        'mean_calibration_error': float('nan')
    }

    #counts
    #metrics
    #mean calibration error across all scores

    pass
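A minimal sketch of what this stub outlines, assuming logistic predicted risk and binary labels; every name below is hypothetical, not risk-slim's API:

import numpy as np

def calibration_error_sketch(scores, y):
    distinct = np.unique(scores)
    counts = np.array([np.sum(scores == s) for s in distinct])
    predicted = 1.0 / (1.0 + np.exp(-distinct))                  # predicted risk per score
    empirical = np.array([np.mean(y[scores == s] == 1) for s in distinct])
    # count-weighted RMS gap between predicted and empirical risk
    return np.sqrt(np.sum(counts * (predicted - empirical) ** 2) / np.sum(counts))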
lattice_cpa.py (project: risk-slim, author: ustunb)
def round_solution_pool(pool, constraints):

    pool.distinct().sort()
    P = pool.P
    L0_reg_ind = np.isnan(constraints['coef_set'].C_0j)
    L0_max = constraints['L0_max']
    rounded_pool = SolutionPool(P)

    for solution in pool.solutions:
        # sort from largest to smallest coefficients
        feature_order = np.argsort([-abs(x) for x in solution])
        rounded_solution = np.zeros(shape=(1, P))
        l0_norm_count = 0
        for k in range(0, P):
            j = feature_order[k]
            if not L0_reg_ind[j]:
                rounded_solution[0, j] = np.round(solution[j], 0)
            elif l0_norm_count < L0_max:
                rounded_solution[0, j] = np.round(solution[j], 0)
                l0_norm_count += L0_reg_ind[j]

        rounded_pool.add(objvals=np.nan, solutions=rounded_solution)

    rounded_pool.distinct().sort()
    return rounded_pool
utils.py (project: ssbio, author: SBRG)
def clean_df(df, fill_nan=True, drop_empty_columns=True):
    """Clean a pandas dataframe by:
        1. Filling empty values with Nan
        2. Dropping columns with all empty values

    Args:
        df: Pandas DataFrame
        fill_nan (bool): If any empty values (strings, None, etc) should be replaced with NaN
        drop_empty_columns (bool): If columns whose values are all empty should be dropped

    Returns:
        DataFrame: cleaned DataFrame

    """
    if fill_nan:
        df = df.fillna(value=np.nan)
    if drop_empty_columns:
        df = df.dropna(axis=1, how='all')
    return df.sort_index()
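Hypothetical usage:

import pandas as pd

df = pd.DataFrame({'a': [1, None], 'b': [None, None]})
clean_df(df)   # drops the all-empty 'b'; the gap in 'a' becomes NaN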
quality.py (project: ssbio, author: SBRG)
def parse_psqs(psqs_results_file):
    """Parse a PSQS result file and returns a Pandas DataFrame of the results

    Args:
        psqs_results_file: Path to psqs results file

    Returns:
        Pandas DataFrame: Summary of PSQS results

    """

    # TODO: generalize column names for all results, save as dict instead

    psqs_results = pd.read_csv(psqs_results_file, sep='\t', header=None)
    # str.strip('.pdb') would strip any of the characters '.', 'p', 'd', 'b',
    # so remove the extension explicitly instead
    psqs_results['pdb_file'] = psqs_results[0].apply(lambda x: str(x).lstrip('./').replace('.pdb', ''))
    psqs_results = psqs_results.rename(columns = {1:'psqs_local', 2:'psqs_burial', 3:'psqs_contact', 4:'psqs_total'}).drop(0, axis=1)
    psqs_results['u_pdb'] = psqs_results['pdb_file'].apply(lambda x: x.upper() if len(x)==4 else np.nan)
    psqs_results['i_entry_name'] = psqs_results['pdb_file'].apply(lambda x: x.split('_model1')[0] if len(x)>4 else np.nan)
    psqs_results = psqs_results[pd.notnull(psqs_results.psqs_total)]

    return psqs_results
HBLRWrapper.py (project: PersonalizedMultitaskLearning, author: mitmedialab)
def getAccuracyAucOnAllTasks(self, task_list):
        all_task_Y = []
        all_preds = []
        for i in range(len(task_list)):
            preds, task_Y = self.getPredsTrueOnOneTask(task_list,i)
            if preds is None:
                # Skipping task because it does not have valid data
                continue
            if len(task_Y)>0:
                all_task_Y.extend(task_Y)
                all_preds.extend(preds)
        if not helper.containsEachLabelType(all_preds):
            print "for some bizarre reason, the preds for all tasks are the same class"
            print "preds", all_preds
            print "true_y", all_task_Y
            auc = np.nan
        else:
            auc=roc_auc_score(all_task_Y, all_preds)
        acc=hblr.getBinaryAccuracy(all_preds,all_task_Y)
        return acc,auc
MTMKL.py (project: PersonalizedMultitaskLearning, author: mitmedialab)
def getAccuracyAucOnOneTask(self, task_list, task, debug=False):
        X_t, y_t = self.extractTaskData(task_list,task)
        if len(X_t) == 0:
            return np.nan, np.nan

        preds = self.internal_predict(X_t, int(task))

        if debug:
            print "y_t:", y_t
            print "preds:", preds

        acc = helper.getBinaryAccuracy(preds,y_t)
        if len(y_t) > 1 and helper.containsEachSVMLabelType(y_t) and helper.containsEachSVMLabelType(preds):
            auc = roc_auc_score(y_t, preds)
        else:
            auc = np.nan

        return acc, auc
MTMKLWrapper.py (project: PersonalizedMultitaskLearning, author: mitmedialab)
def sweepAllParameters(self):
        print "\nSweeping all parameters!"

        self.calcNumSettingsDesired()
        print "\nYou have chosen to test a total of", self.num_settings, "settings"
        sys.stdout.flush()

        #sweep all possible combinations of parameters
        for C in self.c_vals:
            for v in self.v_vals:
                for regularizer in self.regularizers:
                    for kernel in self.kernels:
                        if kernel == 'linear':
                            self.testOneSetting(C, np.nan, kernel, v, regularizer)
                        else:
                            for beta in self.beta_vals:
                                self.testOneSetting(C, beta, kernel, v, regularizer)
        self.val_results_df.to_csv(self.results_path + self.save_prefix + '.csv')
test_dc_stat_think.py (project: dc_stat_think, author: justinbois)
def test_ecdf_formal_custom():
    assert dcst.ecdf_formal(0.1, [0, 1, 2, 3]) == 0.25
    assert dcst.ecdf_formal(-0.1, [0, 1, 2, 3]) == 0.0
    assert dcst.ecdf_formal(0.1, [3, 2, 0, 1]) == 0.25
    assert dcst.ecdf_formal(-0.1, [3, 2, 0, 1]) == 0.0
    assert dcst.ecdf_formal(2, [3, 2, 0, 1]) == 0.75
    assert dcst.ecdf_formal(1, [3, 2, 0, 1]) == 0.5
    assert dcst.ecdf_formal(3, [3, 2, 0, 1]) == 1.0
    assert dcst.ecdf_formal(0, [3, 2, 0, 1]) == 0.25

    with pytest.raises(RuntimeError) as excinfo:
        dcst.ecdf_formal([np.nan, np.inf], [0, 1, 2, 3])
    excinfo.match('Input cannot have NaNs.')

    correct = np.array([1.0, 1.0])
    result = dcst.ecdf_formal([3.1, np.inf], [3, 2, 0, 1])
    assert np.allclose(correct, result, atol=atol)
test_dc_stat_think.py (project: dc_stat_think, author: justinbois)
def test_draw_bs_pairs_linreg_nan():
    x = np.array([])
    y = np.array([])
    with pytest.raises(RuntimeError) as excinfo:
        dcst.draw_bs_pairs_linreg(x, y, size=1)
    excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')

    x = np.array([np.nan])
    y = np.array([np.nan])
    with pytest.raises(RuntimeError) as excinfo:
        dcst.draw_bs_pairs_linreg(x, y, size=1)
    excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')

    x = np.array([np.nan, 1])
    y = np.array([1, np.nan])
    with pytest.raises(RuntimeError) as excinfo:
        dcst.draw_bs_pairs_linreg(x, y, size=1)
    excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')

    x = np.array([0, 1, 5])
    y = np.array([1, np.inf, 3])
    with pytest.raises(RuntimeError) as excinfo:
        dcst.draw_bs_pairs_linreg(x, y, size=1)
    excinfo.match('All entries in arrays must be finite.')
test_dc_stat_think.py (project: dc_stat_think, author: justinbois)
def test_pearson_r_edge():
    x = np.array([])
    y = np.array([])
    with pytest.raises(RuntimeError) as excinfo:
        dcst.pearson_r(x, y)
    excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')

    x = np.array([np.nan])
    y = np.array([np.nan])
    with pytest.raises(RuntimeError) as excinfo:
        dcst.pearson_r(x, y)
    excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')

    x = np.array([np.nan, 1])
    y = np.array([1, np.nan])
    with pytest.raises(RuntimeError) as excinfo:
        dcst.pearson_r(x, y)
    excinfo.match('Arrays must have at least 2 mutual non-NaN entries.')

    x = np.array([0, 1, 5])
    y = np.array([1, np.inf, 3])
    with pytest.raises(RuntimeError) as excinfo:
        dcst.pearson_r(x, y)
    excinfo.match('All entries in arrays must be finite.')
dc_stat_think.py (project: dc_stat_think, author: justinbois)
def studentized_diff_of_means(data_1, data_2):
    """
    Studentized difference in means of two arrays.

    Parameters
    ----------
    data_1 : array_like
        One-dimensional array of data.
    data_2 : array_like
        One-dimensional array of data.

    Returns
    -------
    output : float
        Studentized difference of means.

    Notes
    -----
    .. If the variance of both `data_1` and `data_2` is zero, returns
       np.nan.
    """
    data_1 = _convert_data(data_1)
    data_2 = _convert_data(data_2)

    return _studentized_diff_of_means(data_1, data_2)
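The helper _studentized_diff_of_means lives elsewhere in the module; a minimal NumPy sketch of the quantity it computes (names hypothetical):

import numpy as np

def studentized_diff_sketch(data_1, data_2):
    n1, n2 = len(data_1), len(data_2)
    denom = np.sqrt(np.var(data_1, ddof=1) / n1 + np.var(data_2, ddof=1) / n2)
    if denom == 0:
        return np.nan   # both samples have zero variance
    return (np.mean(data_1) - np.mean(data_2)) / denom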
filters.py (project: pypiv, author: jr7)
def outlier_from_local_median(piv, threshold=2.0):
    """Outlier detection algorithm for mask creation.

    The calculated residual is compared to a threshold which produces a mask.
    The mask consists of nan values at the outlier positions.
    This mask can be interpolated to remove the outliers.

    :param object piv: Piv Class Object
    :param double threshold: threshold for identifying outliers

    """
    u_res = get_normalized_residual(piv.u)
    v_res = get_normalized_residual(piv.v)
    res_total = np.sqrt(u_res**2 + v_res**2)
    mask = res_total > threshold
    piv.u[mask] = np.nan
    piv.v[mask] = np.nan
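A hypothetical usage sketch, assuming a minimal Piv-like object with u and v arrays (get_normalized_residual is pypiv's; this is illustrative only):

import numpy as np

class _FakePiv(object):
    def __init__(self, u, v):
        self.u, self.v = u, v

piv = _FakePiv(np.random.randn(16, 16), np.random.randn(16, 16))
outlier_from_local_median(piv, threshold=2.0)
np.isnan(piv.u).sum()   # number of masked outliers in u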
test_basc.py (project: PyBASC, author: AkiNikolaidis)
def test_timeseries_bootstrap():
    """
    Tests the timeseries_bootstrap method of BASC workflow
    """
    np.random.seed(27)
    #np.set_printoptions(threshold=np.nan)

    # Create a 10x5 matrix which counts up by column-wise
    x = np.arange(50).reshape((5,10)).T
    actual= timeseries_bootstrap(x,3)
    desired = np.array([[ 4, 14, 24, 34, 44],
                       [ 5, 15, 25, 35, 45],
                       [ 6, 16, 26, 36, 46],
                       [ 8, 18, 28, 38, 48],
                       [ 9, 19, 29, 39, 49],
                       [ 0, 10, 20, 30, 40],
                       [ 7, 17, 27, 37, 47],
                       [ 8, 18, 28, 38, 48],
                       [ 9, 19, 29, 39, 49],
                       [ 8, 18, 28, 38, 48]])
    np.testing.assert_equal(actual, desired)
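timeseries_bootstrap is PyBASC's; one plausible circular block-bootstrap sketch consistent with the blocks visible in desired above (not guaranteed to reproduce the seeded output):

import numpy as np

def timeseries_bootstrap_sketch(x, block_size):
    n = x.shape[0]
    n_blocks = int(np.ceil(n / float(block_size)))
    starts = np.random.randint(0, n, size=n_blocks)   # random block starts
    idx = np.concatenate([np.arange(s, s + block_size) for s in starts]) % n
    return x[idx[:n]]                                 # trim to original length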

