def rand_pop(m: int):
    """
    Create a random particle population for testing.

    Parameters
    ----------
    m: int
        the model number

    Returns
    -------
    pop: list
        a list of 3 to 12 ValidParticle objects with random parameters,
        weights, distances and summary statistics
    """
    # Each particle carries: model number, parameters, weight,
    # accepted distance(s) and a list of summary-statistics dicts.
    pop = [ValidParticle(m,
                         Parameter({"a": np.random.randint(10),
                                    "b": np.random.randn()}),
                         sp.rand() * 42,
                         [sp.rand()],
                         [{"ss_float": 0.1,
                           "ss_int": 42,
                           "ss_str": "foo bar string",
                           "ss_np": sp.rand(13, 42),
                           "ss_df": example_df()}])
           for _ in range(np.random.randint(10) + 3)]
    return pop
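# Note: these snippets are excerpted from a pyabc test suite and assume the usual
# aliases and test helpers are already in scope, roughly
#     import numpy as np
#     import scipy as sp
#     import pandas as pd
# together with ValidParticle, Parameter, History, MultivariateNormalTransition,
# example_df, etc. from pyabc and its test utilities, and rpy2's `r` / `pandas2ri`
# for the R round-trip test; the exact import paths depend on the pyabc version
# and are not reproduced here.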
def test_observed_sum_stats(history_uninitialized: History, gt_model):
    h = history_uninitialized
    obs_sum_stats = {"s1": 1,
                     "s2": 1.1,
                     "s3": np.array(.1),
                     "s4": np.random.rand(10)}
    h.store_initial_data(gt_model, {}, obs_sum_stats, {}, [""], "", "", "")

    # Re-open the database and check that the observed summary statistics
    # survive the round trip.
    h2 = History(h.db_identifier)
    loaded_sum_stats = h2.observed_sum_stat()

    for k in ["s1", "s2", "s3"]:
        assert loaded_sum_stats[k] == obs_sum_stats[k]
    assert (loaded_sum_stats["s4"] == obs_sum_stats["s4"]).all()
    assert loaded_sum_stats["s1"] == obs_sum_stats["s1"]
    assert loaded_sum_stats["s2"] == obs_sum_stats["s2"]
    assert loaded_sum_stats["s3"] == obs_sum_stats["s3"]
    # The array must be a copy loaded from the database, not the original object.
    assert loaded_sum_stats["s4"] is not obs_sum_stats["s4"]
def test_transitions_not_modified(population_strategy: PopulationStrategy):
    n = 10
    kernels = []
    test_points = pd.DataFrame([{"s": sp.rand()} for _ in range(n)])

    for _ in range(2):
        df = pd.DataFrame([{"s": sp.rand()} for _ in range(n)])
        w = sp.ones(n) / n
        kernel = MultivariateNormalTransition()
        kernel.fit(df, w)
        kernels.append(kernel)

    # Evaluate the fitted kernels on the same test points before and after
    # adapting the population size; the adaptation must leave them unchanged.
    test_weights = [k.pdf(test_points) for k in kernels]
    population_strategy.adapt_population_size(kernels, sp.array([.7, .2]))
    after_adaptation_weights = [k.pdf(test_points) for k in kernels]

    same = all([(k1 == k2).all()
                for k1, k2 in zip(test_weights, after_adaptation_weights)])
    err_msg = ("Population strategy {}"
               " modified the transitions".format(population_strategy))
    assert same, err_msg
def data(self, size=20):
    """Create some fake data in a dataframe"""
    numpy.random.seed(0)
    random.seed(0)
    x = scipy.rand(size)
    # Pairwise absolute-difference distance matrix.
    M = scipy.zeros([size, size])
    for i in range(size):
        for j in range(size):
            M[i, j] = abs(x[i] - x[j])
    df = pandas.DataFrame(M,
                          index=[names.get_last_name() for _ in range(size)],
                          columns=[names.get_first_name() for _ in range(size)])
    # Use .loc (row label, column label) instead of chained indexing, which
    # triggers SettingWithCopyWarning and may not write through.
    df.loc['Day', 'Mary'] = 1.5
    df.loc['Day', 'Issac'] = 1.0
    return df
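# Rough usage sketch for the fixture above (it relies on the third-party `names`
# package, and on the seeded draw producing the 'Mary'/'Issac'/'Day' labels that
# the two fixed cells refer to):
#     frame = self.data()                  # 20x20 pairwise-distance DataFrame
#     assert (frame.values >= 0).all()     # |x[i] - x[j]| entries are non-negative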
def test_sum_stats_save_load(history: History):
    arr = sp.random.rand(10)
    arr2 = sp.random.rand(10, 2)
    particle_population = [
        ValidParticle(0,
                      Parameter({"a": 23, "b": 12}),
                      .2,
                      [.1],
                      [{"ss1": .1, "ss2": arr2,
                        "ss3": example_df(),
                        "rdf0": r["iris"]}]),
        ValidParticle(0,
                      Parameter({"a": 23, "b": 12}),
                      .2,
                      [.1],
                      [{"ss12": .11, "ss22": arr,
                        "ss33": example_df(),
                        "rdf": r["mtcars"]}])
    ]
    history.append_population(0, 42, particle_population, 2, ["m1", "m2"])
    weights, sum_stats = history.get_sum_stats(0, 0)

    # Two particles with equal raw weight, so each stored weight is 0.5 after
    # normalization.
    assert (weights == 0.5).all()
    assert sum_stats[0]["ss1"] == .1
    assert (sum_stats[0]["ss2"] == arr2).all()
    assert (sum_stats[0]["ss3"] == example_df()).all().all()
    # R data frames are compared against their pandas conversion (rpy2 pandas2ri).
    assert (sum_stats[0]["rdf0"] == pandas2ri.ri2py(r["iris"])).all().all()
    assert sum_stats[1]["ss12"] == .11
    assert (sum_stats[1]["ss22"] == arr).all()
    assert (sum_stats[1]["ss33"] == example_df()).all().all()
    assert (sum_stats[1]["rdf"] == pandas2ri.ri2py(r["mtcars"])).all().all()
def test_adapt_single_model(population_strategy: PopulationStrategy):
    n = 10
    df = pd.DataFrame([{"s": sp.rand()} for _ in range(n)])
    w = sp.ones(n) / n
    kernel = MultivariateNormalTransition()
    kernel.fit(df, w)

    population_strategy.adapt_population_size([kernel], sp.array([1.]))
    assert population_strategy.nr_particles > 0
def test_adapt_two_models(population_strategy: PopulationStrategy):
    n = 10
    kernels = []
    for _ in range(2):
        df = pd.DataFrame([{"s": sp.rand()} for _ in range(n)])
        w = sp.ones(n) / n
        kernel = MultivariateNormalTransition()
        kernel.fit(df, w)
        kernels.append(kernel)

    population_strategy.adapt_population_size(kernels, sp.array([.7, .2]))
    assert population_strategy.nr_particles > 0
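# In both adaptation tests above, adapt_population_size is handed one fitted
# transition kernel per model together with the current model weights; the
# strategy is then expected to settle on a positive particle number for the
# next generation, which is all these tests assert.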
def test_continuous_non_gaussian(db_path, sampler):
    def model(args):
        return {"result": sp.rand() * args['u']}

    models = [model]
    models = list(map(SimpleModel, models))
    population_size = ConstantPopulationSize(250)
    parameter_given_model_prior_distribution = [
        Distribution(u=RV("uniform", 0, 1))]
    abc = ABCSMC(models, parameter_given_model_prior_distribution,
                 MinMaxDistanceFunction(measures_to_use=["result"]),
                 population_size,
                 eps=MedianEpsilon(.2),
                 sampler=sampler)
    d_observed = .5
    abc.new(db_path, {"result": d_observed})
    abc.do_not_stop_when_only_single_model_alive()

    minimum_epsilon = -1
    history = abc.run(minimum_epsilon, max_nr_populations=2)
    posterior_x, posterior_weight = history.get_distribution(0, None)
    posterior_x = posterior_x["u"].values  # .as_matrix() is deprecated/removed in pandas

    # Build the empirical posterior CDF from the weighted samples ...
    sort_indices = sp.argsort(posterior_x)
    f_empirical = sp.interpolate.interp1d(
        sp.hstack((-200, posterior_x[sort_indices], 200)),
        sp.hstack((0, sp.cumsum(posterior_weight[sort_indices]), 1)))

    # ... and compare it with the analytic posterior CDF.
    @sp.vectorize
    def f_expected(u):
        return (sp.log(u) - sp.log(d_observed)) / (-sp.log(d_observed)) \
            * (u > d_observed)

    x = sp.linspace(0.1, 1)
    max_distribution_difference = sp.absolute(f_empirical(x)
                                              - f_expected(x)).max()
    assert max_distribution_difference < 0.12
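# Why f_expected has this form (a sketch, following the test's own setup):
# the model returns result = U * u with U ~ Uniform(0, 1), so observing
# result = d has likelihood p(d | u) = 1/u for u > d and 0 otherwise.
# With a Uniform(0, 1) prior on u, the posterior density on (d, 1) is
# proportional to 1/u with normalizing constant -log(d).  Integrating gives
# the posterior CDF F(u) = (log(u) - log(d)) / (-log(d)) for d < u <= 1,
# which is exactly what f_expected computes and what the empirical CDF from
# the ABC run is compared against.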