python类chisquare()的实例源码

conversation.py 文件源码 项目:facebook-message-analysis 作者: szheng17 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def n_messages_chi_square(self, time_interval):
    """
    Run a chi-square test on the message counts per value of a time interval.

    The counts returned by ``self.get_n_messages_in_time_interval`` are
    tested against the null hypothesis that messages are uniformly
    distributed over the values of ``time_interval``. The test is restricted
    to 'minute in hour', 'minute in day' and 'hour' because those intervals
    have a fixed number of values.

    Args:
        time_interval: One of 'minute in hour', 'minute in day', 'hour'.

    Returns:
        A ``(chisq, p)`` tuple: the chi-square statistic and the p-value of
        the test.

    Raises:
        ValueError: If ``time_interval`` is not one of the supported names.
    """
    allowed = ['minute in hour', 'minute in day', 'hour']
    if time_interval in allowed:
        counts = self.get_n_messages_in_time_interval(time_interval)
        outcome = chisquare(counts)
        return (outcome.statistic, outcome.pvalue)
    raise ValueError('time_interval must be in {}'.format(allowed))
test_mvkde.py 文件源码 项目:cgpm 作者: probcomp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_univariate_categorical():
    """
    Check MvKde posterior samples of a single nominal variable.

    Draws 1000 values from a 6-level categorical distribution, feeds them to
    a MultivariateKde as variable 7, simulates the same number of posterior
    samples and runs a chi-square test of the simulated level counts against
    the training level counts.
    """
    varno = 7
    n_draws = 1000
    level_probs = [.3, .1, .2, .15, .15, .1]

    # The same rng is used for the training draw and then handed to the
    # model, so the draw must happen before the model is constructed.
    rng = gu.gen_rng(2)
    training = rng.choice(range(6), p=level_probs, size=n_draws)
    distargs = {O: {ST: [C], SA: [{'k': 6}]}}
    kde = MultivariateKde([varno], None, distargs=distargs, rng=rng)

    # Incorporate observations one row at a time, then run a transition.
    for rowid, value in enumerate(training):
        kde.incorporate(rowid, {varno: value})
    kde.transition()

    # Posterior samples, tallied per level.
    posterior = kde.simulate(-1, [varno], N=n_draws)
    generated_counts = np.bincount([draw[varno] for draw in posterior])
    training_counts = np.bincount(training)

    pval = chisquare(generated_counts, training_counts)[1]
    assert pval > 0.05

    # Get some coverage on logpdf_score.
    assert kde.logpdf_score() < 0
stats.py 文件源码 项目:CHAID 作者: Rambatino 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def chisquare(n_ij, weighted):
    """
    Calculate the chi-square statistic for an ind_v x dep_v matrix, for the
    unweighted and the SPSS weighted case.

    Args:
        n_ij: 2-D numpy array of observed cell values.
        weighted: when truthy, the expected table is built by repeatedly
            rescaling rows and columns of a start table until the largest
            cell change is at most 10e-6; otherwise it is the usual
            (row total * column total) / grand total table.

    Returns:
        A ``(chi, p_val, dof)`` tuple where ``dof`` is
        ``(rows - 1) * (columns - 1)``.
    """
    if not weighted:
        row_totals = np.vstack(n_ij.sum(axis=1))
        expected = (row_totals * n_ij.sum(axis=0)) / n_ij.sum().astype(float)
    else:
        # Start table: 1 where a cell is non-zero; 0/0 cells come out as NaN.
        start = n_ij / n_ij
        # otherwise it breaks the chi-squared test
        start[np.isnan(start)] = 0.000001

        expected = start
        row_totals = n_ij.sum(axis=1)
        col_totals = n_ij.sum(axis=0)
        row_scale, col_scale, delta = 1, 1, 1
        while delta > 10e-6:
            # Rescale rows so the current table matches the observed row
            # totals, then columns so it matches the observed column totals.
            row_scale = row_scale * np.vstack(row_totals / expected.sum(axis=1))
            col_scale = col_totals / (row_scale * start).sum(axis=0)
            fitted = start * row_scale * col_scale
            delta = np.max(np.absolute(fitted - expected))
            expected = fitted

    dof = (n_ij.shape[0] - 1) * (n_ij.shape[1] - 1)
    # With axis=None scipy uses size - 1 - ddof degrees of freedom, so this
    # ddof makes the p-value use exactly `dof`.
    chi, p_val = stats.chisquare(
        n_ij, f_exp=expected, ddof=n_ij.size - 1 - dof, axis=None)

    return (chi, p_val, dof)
fake_data_generator.py 文件源码 项目:partycrasher 作者: naturalness 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test(self):
    """
    Compare ``self.histogram`` with a Poisson(``self.lambda_``) distribution.

    For every index of the histogram the expected count is the Poisson pmf
    at that index times the total of the histogram. Only indices where both
    the observed and the expected count reach 10 are kept for the
    chi-square test. Summary statistics and the test result are printed, and
    the kept observed/expected counts are plotted in a blocking window.

    Returns:
        The result of ``stats.chisquare`` on the kept counts.
    """
    total = sum(self.histogram)
    threshold = 10
    kept_indices, kept_observed, kept_expected = [], [], []
    for index in range(len(self.histogram)):
        seen = self.histogram[index]
        predicted = stats.poisson.pmf(index, self.lambda_) * total
        if not (seen >= threshold and predicted >= threshold):
            continue
        kept_observed.append(seen)
        kept_expected.append(predicted)
        kept_indices.append(index)

    results = stats.chisquare(kept_observed, kept_expected)

    expected_summary = (self.expected_mean(), self.expected_variance())
    print("expected: mean %f variance %f" % expected_summary)
    actual_summary = (self.mean(), self.variance())
    print("actual: mean %f variance %f" % actual_summary)
    print(len(kept_expected))
    print(results)

    # Plotting libraries are imported lazily, only when the test is run.
    from matplotlib import pyplot
    import matplotlib
    pyplot.switch_backend('Qt5Agg')
    actual_plot, = pyplot.plot(
        kept_indices, kept_observed, label='actual')
    expected_plot, = pyplot.plot(
        kept_indices, kept_expected, 'r', linewidth=1, label='expected')
    matplotlib.interactive(True)
    #pyplot.ylabel("People at Table")
    #pyplot.xlabel("Table Number")
    #pyplot.title("Chinese Restaurant Process Unit Test")
    pyplot.legend()
    pyplot.show(block=True)
    return results
disabled_test_simulate_univariate.py 文件源码 项目:cgpm 作者: probcomp 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def two_sample_test(cctype, X, Y):
    """
    Return the p-value of a two-sample test comparing X and Y.

    When the class looked up for ``cctype`` reports itself as numeric, a
    two-sample Kolmogorov-Smirnov test is used. Otherwise both samples are
    binned with ``aligned_bincount``, bins that are empty in both samples
    are dropped, each histogram is rescaled to sum to 1000, and a chi-square
    test of Y's counts against X's counts is run.
    """
    if cu.cctype_class(cctype).is_numeric():
        # XXX WRONG CHOICE FOR DISCRETE NUMERIC XXX
        return ks_2samp(X, Y)[1]

    counts_x, counts_y = aligned_bincount([X, Y])
    # Keep only bins that are populated in at least one of the samples.
    occupied = np.logical_not(np.logical_and(counts_x == 0, counts_y == 0))
    counts_x, counts_y = counts_x[occupied], counts_y[occupied]
    expected = counts_x / float(sum(counts_x)) * 1000
    observed = counts_y / float(sum(counts_y)) * 1000
    return chisquare(observed, f_exp=expected)[1]
stats.py 文件源码 项目:CHAID 作者: Rambatino 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def best_cat_split(self, ind, dep):
    """
    Determine the best categorical variable split.

    For each independent variable, every grouping yielded by
    ``all_combinations()`` is turned into a contingency table (groups x
    dependent values) and scored with ``chisquare``. Within a variable the
    grouping with the lowest p-value wins (ties broken by the larger
    chi-square score), provided every group reaches
    ``self.min_child_node_size`` and the p-value is below
    ``self.alpha_merge``. Across variables the same ordering decides which
    candidate becomes the returned split; candidates scoring at least
    ``self.split_threshold * split.score`` are attached as surrogates.

    Args:
        ind: iterable of independent-variable columns. Each is used through
            ``deep_copy()``, ``arr``, ``name``, ``metadata``,
            ``possible_groupings()`` and ``all_combinations()``.
        dep: dependent-variable column exposing ``arr`` and ``weights``
            (``None`` for the unweighted case).

    Returns:
        The best ``Split`` found; it is the initial empty
        ``Split(None, None, None, None, 0)`` when no grouping qualified.
    """
    split = Split(None, None, None, None, 0)
    all_dep = np.unique(dep.arr)
    for i, ind_var in enumerate(ind):
        ind_var = ind_var.deep_copy()
        unique = np.unique(ind_var.arr)

        # freq[category][dependent value] -> count, or summed weight in the
        # weighted case.
        freq = {}
        if dep.weights is None:
            for col in unique:
                counts = np.unique(np.compress(ind_var.arr == col, dep.arr), return_counts=True)
                freq[col] = cl.defaultdict(int)
                freq[col].update(np.transpose(counts))
        else:
            for col in unique:
                freq[col] = cl.defaultdict(int)
                for dep_v in all_dep:
                    freq[col][dep_v] = dep.weights[(ind_var.arr == col) * (dep.arr == dep_v)].sum()

        if len(list(ind_var.possible_groupings())) == 0:
            split.invalid_reason = InvalidSplitReason.PURE_NODE

        choice, highest_p_join, split_chi, dof = None, None, None, None
        # Degrees of freedom of the chosen grouping; `dof` itself is
        # overwritten on every iteration and so reflects the last grouping.
        chosen_dof = None
        for comb in ind_var.all_combinations():
            # Merge the per-category frequencies of each group in the grouping.
            freqs = [sum([cl.Counter(freq[key]) for key in c], cl.Counter()) for c in comb]
            keys = set(sum([list(f.keys()) for f in freqs], []))

            n_ij = np.array(
                [[col.get(k, 0) for k in keys] for col in freqs]
            )

            chi, p_split, dof = chisquare(n_ij, dep.weights is not None)

            if (choice is None or p_split < highest_p_join or (p_split == highest_p_join and chi > split_chi)) and (n_ij.sum(axis=1) >= self.min_child_node_size).all() and p_split < self.alpha_merge:
                choice, highest_p_join, split_chi = comb, p_split, chi
                chosen_dof = dof

        # Report the dof that belongs to the chosen grouping. When nothing
        # was chosen, keep passing the last computed value as before.
        temp_dof = dof if choice is None else chosen_dof
        temp_split = Split(i, choice, split_chi, highest_p_join, temp_dof, split_name=ind_var.name)

        # Compare the *chosen* grouping's statistics with the current best
        # split. The loop variables `p_split`/`chi` only describe the last
        # grouping iterated, which need not be the chosen one. `choice` is
        # checked first so None statistics are never compared.
        better_split = choice is not None and (
            not split.valid()
            or highest_p_join < split.p
            or (highest_p_join == split.p and split_chi > split.score)
        )
        if better_split:
            split, temp_split = temp_split, split

        if split.valid() and choice is not None:
            chi_threshold = self.split_threshold * split.score

            # `temp_split` is now the losing candidate (either this
            # variable's or the previous best); keep it as a surrogate when
            # it scores close enough to the winner.
            if temp_split.valid() and temp_split.score >= chi_threshold:
                for sur in temp_split.surrogates:
                    if sur.column_id != i and sur.score >= chi_threshold:
                        split.surrogates.append(sur)

                temp_split.surrogates = []
                split.surrogates.append(temp_split)

            split.sub_split_values(ind[split.column_id].metadata)

    return split


问题


面经


文章

微信
公众号

扫码关注公众号