def __init__(self, opt, shared=None):
    """Initialize the parameters of the DefaultTeacher.

    Args:
        opt: option mapping. Keys read here: 'train_part', 'valid_part',
            'test_part', 'datatype', 'model_file' and, optionally,
            'teacher_seed'. The 'datafile' key is written.
        shared: optional mapping; a truthy 'metrics' entry is reused
            instead of building a new metrics object.

    Raises:
        AssertionError: if the three split fractions do not sum to 1.
    """
    import math

    # Fractions such as 0.7, 0.2 and 0.1 may differ from 1 by a rounding
    # error once summed in binary floating point, so compare with a
    # tolerance instead of using exact equality.
    split_total = opt['train_part'] + opt['test_part'] + opt['valid_part']
    assert math.isclose(split_total, 1.0, rel_tol=0.0, abs_tol=1e-9), (
        "train_part + test_part + valid_part must sum to 1, got %r"
        % (split_total,))
    self.parts = [opt['train_part'], opt['valid_part'], opt['test_part']]

    # store datatype (only the part before the first ':')
    self.dt = opt['datatype'].split(':')[0]
    self.opt = opt
    opt['datafile'] = _path(opt)

    # store identifier for the teacher in the dialog
    self.id = 'ner_teacher'

    # Capture the RNG state produced by the teacher seed without
    # disturbing the global generator used by the rest of the program.
    random_state = random.getstate()
    random.seed(opt.get('teacher_seed'))
    self.random_state = random.getstate()
    random.setstate(random_state)

    if shared and shared.get('metrics'):
        self.metrics = shared['metrics']
    else:
        self.metrics = CoNLLClassificationMetrics(opt['model_file'])

    super().__init__(opt, shared)
# Example source code for Python's setstate()
def __init__(self, opt, shared=None):
    """Set up the teacher from the options in ``opt``.

    Args:
        opt: option mapping. 'datatype' is read, 'teacher_random_seed' is
            read if present, and 'datafile' is written.
        shared: optional mapping; when truthy its 'observations' and
            'labels' entries are reused, otherwise empty lists are created.
    """
    # Keep only the part of the datatype before the first ':'.
    self.datatype_strict = opt['datatype'].split(':')[0]
    opt['datafile'] = _path(opt)

    # Identifier of this teacher in the dialog.
    self.id = 'insults_teacher'
    self.answer_candidates = ['Non-insult', "Insult"]

    # Snapshot the state produced by the teacher seed, then put the global
    # generator back exactly as it was found.
    saved_global_state = random.getstate()
    random.seed(opt.get('teacher_random_seed'))
    self.random_state = random.getstate()
    random.setstate(saved_global_state)

    super().__init__(opt, shared)

    self.observations = shared['observations'] if shared else []
    self.labels = shared['labels'] if shared else []
def adapt_z_state(self, main_rdd, cinfo, beta):
    """Blend a freshly drawn value into the ``z`` entry of every record.

    Each value of ``main_rdd`` is iterated as ``((tx, lam, state, z), index)``
    tuples. For every record the stored RNG ``state`` is restored, one
    uniform number is drawn and the advanced state is stored back. When the
    draw is below ``self.ptr`` the new value is ``-(tx * cinfo * tx.T)``
    (computed with ``np.matrix``), otherwise it is 0.0. ``z`` becomes
    ``(1 - beta) * z + beta * new_value``.

    Returns:
        The result of ``main_rdd.mapValues(...).cache()``.
    """
    inverse = cinfo

    def _update_records(records):
        refreshed = []
        for (tx, lam, state, z), index in records:
            # Continue this record's private random stream.
            random.setstate(state)
            draw = random.random()
            state = random.getstate()

            znew = (
                float(-np.matrix(tx) * inverse * np.matrix(tx).T)
                if draw < self.ptr
                else 0.0
            )
            z = (1 - beta) * z + beta * znew
            refreshed.append(((tx, lam, state, z), index))
        return refreshed

    return main_rdd.mapValues(_update_records).cache()
def get_instance_number(pid, tid):
    """
    Maps the token to an instance number for a problem.

    Args:
        pid: the problem id
        tid: the team id
    Returns:
        The instance number
    Raises:
        InternalException: if the problem has no instances.
    """

    # The value returned by seed_generator is handed to random.setstate
    # below, i.e. it is treated as the RNG state to put back afterwards.
    previous_state = seed_generator(tid, pid)
    try:
        total_instances = get_number_of_instances(pid)
        if total_instances == 0:
            raise InternalException("{} has no instances.".format(pid))
        return random.randint(0, total_instances - 1)
    finally:
        # Restore on every exit path, including the exception above, so a
        # failing lookup does not leave the global generator re-seeded.
        random.setstate(previous_state)
def random_seed(seed=None):
    """Execute code inside this with-block using the specified seed.

    If no seed is specified, nothing happens.

    Does not affect the state of the random number generator outside this
    block, also when the block exits with an exception.
    Not thread-safe.

    NOTE(review): this is a generator with a single ``yield``; it is
    presumably wrapped with ``contextlib.contextmanager`` where it is
    defined (the decorator is not visible here) -- confirm.

    Args:
        seed (int): random seed
    """
    if seed is None:
        yield
        return

    py_state = random.getstate()  # save state
    np_state = np.random.get_state()
    try:
        random.seed(seed)  # alter state
        np.random.seed(seed)
        yield
    finally:
        # Runs on normal exit and when the body raises.
        random.setstate(py_state)  # restore state
        np.random.set_state(np_state)
def random_seed(seed=None):
    """Execute code inside this with-block using the specified seed.

    If no seed is specified, nothing happens.

    Does not affect the state of the random number generator outside this
    block, also when the block exits with an exception.
    Not thread-safe.

    NOTE(review): this is a generator with a single ``yield``; it is
    presumably wrapped with ``contextlib.contextmanager`` where it is
    defined (the decorator is not visible here) -- confirm.

    Args:
        seed (int): random seed
    """
    if seed is None:
        yield
        return

    py_state = random.getstate()  # save state
    np_state = np.random.get_state()
    try:
        random.seed(seed)  # alter state
        np.random.seed(seed)
        yield
    finally:
        # Runs on normal exit and when the body raises.
        random.setstate(py_state)  # restore state
        np.random.set_state(np_state)
def random_seed(seed=42):
    """Temporarily seed Python's ``random`` module.

    The generator state found on entry is put back when the block is left,
    including when the block raises.

    Example
    -------
    >>> import random
    >>> with random_seed(seed=0):
    ...     random.randint(0, 1000)  # doctest: +SKIP
    864
    """
    saved_state = random.getstate()
    random.seed(seed)
    try:
        yield
    finally:
        random.setstate(saved_state)
def derandomize(seed=123):
    '''
    Run a block with the ``random`` module seeded to a fixed value.

    Bloom filters pick their hash seeds randomly, which makes them unstable
    between runs and complicates testing. Seeding inside this block removes
    that variation:

        with bloomfilter.util.derandomize():
            bloom_filter_1 = BloomFilter(100, 0.1)

        with bloomfilter.util.derandomize():
            bloom_filter_2 = BloomFilter(100, 0.1)

    The resulting bloom_filters are stable between runs. The generator
    state found on entry is restored when the block is left.
    '''
    previous = random.getstate()
    try:
        random.seed(seed)
        yield
    finally:
        random.setstate(previous)
def _reseed(config, offset=0):
    """Put every supported random generator into the state for one seed.

    The seed is the 'randomly_seed' option plus ``offset``. The state
    produced by seeding is cached per seed in ``random_states`` (and
    ``np_random_states`` for numpy) and reused on later calls.
    """
    seed = config.getoption('randomly_seed') + offset

    if seed in random_states:
        random.setstate(random_states[seed])
    else:
        random.seed(seed)
        random_states[seed] = random.getstate()
    cached_state = random_states[seed]

    # Optional integrations receive the same Python-level state.
    if have_factory_boy:
        factory_set_random_state(cached_state)
    if have_faker:
        faker_random.setstate(cached_state)

    if not have_numpy:
        return
    if seed in np_random_states:
        np_random.set_state(np_random_states[seed])
    else:
        np_random.seed(seed)
        np_random_states[seed] = np_random.get_state()
def reset(self):
    """Reshuffle the data with the teacher's own RNG and rewind the episode cursor.

    The global ``random`` state is the same after the call as before it;
    only ``self.random_state`` advances.
    """
    outer_state = random.getstate()
    random.setstate(self.random_state)
    random.shuffle(self.data.data)
    self.random_state = random.getstate()
    random.setstate(outer_state)

    self.lastY = None
    self.episode_idx = self.data_offset - self.step_size
    self.episode_done = True
    self.epochDone = False

    if self.random:
        return
    if self.data_offset >= self.data.num_episodes():
        # could have bigger batchsize then episodes... so nothing to do
        self.epochDone = True
def setup_data(self, path):
    """Read the labelled CSV at ``path`` and yield one episode per selected row.

    The first row of the file is skipped as a header; every following row
    must be ``label, text`` where ``int(label)`` indexes
    ``self.answer_candidates``.

    For every datatype except 'test' the rows are split with ``KFold``
    (seeded from the teacher's private RNG state) and only one fold is
    used: fold number ``opt['bagging_fold_index']`` (default 0), or the
    last fold if that index is past the end. 'train' takes the training
    indices of that fold, anything else the held-out indices.

    Args:
        path: path of the CSV file (provided from opt['datafile']).

    Yields:
        ``((text, [label_text]), episode_done)`` with ``episode_done``
        always True.
    """
    print('loading: ' + path)

    questions = []
    y = []

    # newline='' lets the csv module handle line endings inside quoted
    # fields itself, as the csv documentation asks for.
    with open(path, newline='') as labels_file:
        rows = csv.reader(labels_file)
        # Skip the header; the default keeps an empty file from raising
        # StopIteration inside this generator.
        next(rows, None)
        for label, text in rows:
            questions.append(text)
            y.append([self.answer_candidates[int(label)]])

    episode_done = True

    indexes = range(len(questions))
    if self.datatype_strict != 'test':
        outer_state = random.getstate()
        random.setstate(self.random_state)
        try:
            kf_seed = random.randrange(500000)
            kf = KFold(self.opt.get('bagging_folds_number'), shuffle=True,
                       random_state=kf_seed)
            wanted_fold = self.opt.get('bagging_fold_index', 0)
            # enumerate() supplies the fold counter; the previous manual
            # counter was never incremented, so any index above 0 always
            # ended on the last fold.
            for fold_no, (train_index, test_index) in enumerate(kf.split(questions)):
                indexes = train_index if self.datatype_strict == 'train' else test_index
                if fold_no >= wanted_fold:
                    break
            self.random_state = random.getstate()
        finally:
            # Leave the global generator as it was found, even on error.
            random.setstate(outer_state)

    # define iterator over all queries
    for i in indexes:
        yield (questions[i], y[i]), episode_done
def reset(self):
    """Run the parent reset, then reshuffle the data with the teacher's own RNG.

    The global ``random`` state is put back afterwards; only
    ``self.random_state`` advances.
    """
    super().reset()

    outer_state = random.getstate()
    random.setstate(self.random_state)
    random.shuffle(self.data.data)
    # Remember where the private stream stopped for the next reset.
    self.random_state = random.getstate()
    random.setstate(outer_state)
def reset(self):
    """Reset via the parent class and reshuffle with the private RNG state.

    ``self.data.data`` is shuffled in place using ``self.random_state``;
    the global ``random`` state is restored before returning.
    """
    super().reset()

    global_state = random.getstate()
    random.setstate(self.random_state)
    random.shuffle(self.data.data)
    # Store the advanced private state so successive resets differ.
    self.random_state = random.getstate()
    random.setstate(global_state)
def use_internal_state(self):
    """Use a specific RNG state.

    While the block runs, the global ``random`` generator continues from
    ``self._random_state``. On exit the advanced state is stored back in
    ``self._random_state`` and the previous global state is restored; this
    also happens when the block raises.

    NOTE(review): single-``yield`` generator, presumably wrapped with
    ``contextlib.contextmanager`` where defined -- the decorator is not
    visible here.
    """
    old_state = random.getstate()
    random.setstate(self._random_state)
    try:
        yield
    finally:
        self._random_state = random.getstate()
        random.setstate(old_state)
def set_seed_tmp(seed=None):
    """Temporarily seed ``random`` and ``numpy.random`` for a block.

    With ``seed=None`` nothing is changed. Otherwise both generators are
    seeded with ``seed`` and put back to their previous state when the
    block is left, including when it raises.

    NOTE(review): single-``yield`` generator, presumably wrapped with
    ``contextlib.contextmanager`` where defined -- the decorator is not
    visible here.
    """
    if seed is None:
        yield
        return

    state = random.getstate()
    np_state = np.random.get_state()
    try:
        random.seed(seed)
        np.random.seed(seed)
        yield
    finally:
        np.random.set_state(np_state)
        random.setstate(state)
def do_teardown():
    """Restore the saved Python, NumPy and CuPy random states and clear the snapshots.

    Reads the module-level ``_old_*`` variables, writes them back into the
    respective generators and then resets all three variables to None.
    """
    global _old_python_random_state, _old_numpy_random_state, _old_cupy_random_states

    random.setstate(_old_python_random_state)
    numpy.random.set_state(_old_numpy_random_state)
    cupy.random.generator._random_states = _old_cupy_random_states

    # The snapshots have been consumed; drop them.
    _old_python_random_state = _old_numpy_random_state = _old_cupy_random_states = None
# In some tests (which utilize condition.repeat or condition.retry),
# setUp/tearDown is nested. _setup_random() and _teardown_random() do their
# work only in the outermost setUp/tearDown pair.
def agentRandom(self, offset=0):
    """Return a random number that is consistent between frames but can
    be offset by an integer.

    The number is the first draw of a generator seeded with
    ``hash(self.userid) - 1 + offset``; the global ``random`` state is not
    changed.
    """
    # -1 so that this number is different to the first random number
    # generated on frame 0 (if used) of the simulation
    seed = hash(self.userid) - 1 + offset
    # A private generator seeded this way yields the same first draw as
    # seeding the module-level one, without touching the shared state.
    return random.Random(seed).random()
def setUp(self):
    """Create a CLI runner, default the locale variables and rewind the RNG to START."""
    self.runner = CliRunner()
    # setdefault leaves values that are already present in the environment.
    for variable in ('LC_ALL', 'LANG'):
        os.environ.setdefault(variable, 'en_US.utf-8')
    random.setstate(START)
def test_determinism(self):
    """Two scripted sessions started from the same RNG state give the same CHANGELOG.md."""

    def run_pass():
        # Run one randomised session in a scratch directory and return
        # the text of the resulting CHANGELOG.md.
        with self.runner.isolated_filesystem():
            self.runner.invoke(cli, ['init'])
            self.runner.invoke(cli, ['new', "First Feature"])
            self.runner.invoke(cli, ['release', '--yes'])
            for _release in range(random.randrange(10)):
                for _entry in range(random.randrange(3)):
                    line = random.choice(OPTIONS)
                    args = [line[0], line[1].format(random.random())]
                    self.runner.invoke(cli, args)
                # NOTE(review): the original indentation is not available;
                # the release is assumed to close each outer iteration --
                # confirm against the upstream test.
                self.runner.invoke(cli, ['release', '--yes'])
            with open("CHANGELOG.md", 'r') as changelog:
                return changelog.read()

    first_pass = run_pass()
    # reset random so the second pass makes the same choices
    random.setstate(START)
    second_pass = run_pass()

    self.assertEqual(first_pass, second_pass)
def load_checkpoint(self, directory: str, train_iter: data_io.BaseParallelSampleIter) -> _TrainingState:
    """
    Restores the complete training state from a checkpoint directory:
    optimizer states, bucket iterator position, random number generator
    states, monitor state and finally the trainer's own state. Note that
    params should have been loaded already by the initializer.

    :param directory: directory where the state has been saved.
    :param train_iter: training data iterator.
    :return: the value returned by ``self.load_state`` for the training
             state file.
    """
    def in_checkpoint(fname):
        return os.path.join(directory, fname)

    # Optimizer state (from mxnet)
    self.load_optimizer_states(in_checkpoint(C.OPT_STATES_LAST))

    # State of the bucket iterator
    train_iter.load_state(in_checkpoint(C.BUCKET_ITER_STATE_NAME))

    # RNG states: python's random and np.random provide functions for
    # storing the state, mxnet does not, but inside our code mxnet's RNG is
    # not used AFAIK. The file holds two pickles, read in this order.
    with open(in_checkpoint(C.RNG_STATE_NAME), "rb") as fp:
        python_rng_state = pickle.load(fp)
        random.setstate(python_rng_state)
        numpy_rng_state = pickle.load(fp)
        np.random.set_state(numpy_rng_state)

    # Monitor state, in order to get the full information about the metrics
    self.training_monitor.load_state(in_checkpoint(C.MONITOR_STATE_NAME))

    # And our own state
    return self.load_state(in_checkpoint(C.TRAINING_STATE_NAME))
def shuffle(lol):
    '''
    Shuffle every list in ``lol`` in place with the same random stream.

    The generator state is captured once and rewound before each list is
    shuffled, so every list is shuffled from an identical starting state.

    lol :: list of list as input
    '''
    starting_state = random.getstate()
    for sequence in lol:
        random.setstate(starting_state)
        random.shuffle(sequence)
def shuffle_together(self, mats):
    """
    Shuffle each of the given matrices in place, rewinding the random
    generator before every one so all of them start from the same state.

    :param mats: the matrices to shuffle
    """
    starting_state = random.getstate()
    for matrix in mats:
        # Rewind so this shuffle replays the stream used for the previous one.
        random.setstate(starting_state)
        random.shuffle(matrix)
def __setstate__(self, state):
    """Adopt ``state`` as the instance dict and restore both global RNGs from it.

    The '_random_state' and '_np_random_state' entries are removed from
    ``state`` (and therefore from the instance dict, which is the same
    object) and applied to ``random`` and ``numpy.random``.
    """
    self.__dict__ = state

    python_rng_state = state.pop('_random_state')
    random.setstate(python_rng_state)

    numpy_rng_state = state.pop('_np_random_state')
    np.random.set_state(numpy_rng_state)
def gradcheck_naive(f, x):
    """Check an analytic gradient against a central-difference estimate.

    Args:
        f: callable taking ``x`` and returning ``(value, gradient)``, e.g.
            ``lambda x: (np.sum(x ** 2), x * 2)``.
        x: numpy array at which to check. It is perturbed in place during
            the check and every element is put back to its exact value.

    Prints the outcome and returns None; stops at the first index whose
    relative difference exceeds 1e-5.
    """
    # Snapshot the generator state so every call to f sees the same random
    # numbers; this keeps the check meaningful for functions that use
    # randomness internally.
    rndstate = random.getstate()
    random.setstate(rndstate)
    _, grad = f(x)
    h = 1e-4

    # Iterate over all indexes in x
    it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])
    while not it.finished:
        ix = it.multi_index  # starts from (0, 0) then (0, 1)

        # Evaluate f at x[ix] + h and x[ix] - h. The original value is
        # saved and written back rather than undone arithmetically, which
        # would leave a small floating-point drift in x.
        original_value = x[ix]
        x[ix] = original_value + h
        random.setstate(rndstate)
        fxh, _ = f(x)
        x[ix] = original_value - h
        random.setstate(rndstate)
        fxnh, _ = f(x)
        x[ix] = original_value

        numgrad = (fxh - fxnh) / 2 / h

        # Compare the analytic gradient with the numerical one
        reldiff = abs(numgrad - grad[ix]) / max(1, abs(numgrad), abs(grad[ix]))
        if reldiff > 1e-5:
            print("Gradient check failed.")
            print("First gradient error found at index %s" % str(ix))
            print("Your gradient: %f \t Numerical gradient: %f" % (grad[ix], numgrad))
            return

        it.iternext()

    print("Gradient check passed")
def main(id, checkpoint_name=None):
    """Run the asynchronous evolutionary algorithm, optionally resuming from a checkpoint.

    Args:
        id: passed through to ``paralg.myAsyncEA`` as ``id``.
        checkpoint_name: path of a pickled checkpoint. When given, the
            population, generation, hall of fame, logbook and ``random``
            state are restored from it; otherwise a fresh run is set up.

    Returns:
        ``(pop, log, hof)``: final population, the log returned by the
        algorithm and the hall of fame.
    """
    if checkpoint_name:
        # A file name has been given, then load the data from the file.
        # The with-block closes the file, which was previously left open.
        # NOTE: unpickling can execute arbitrary code; only resume from
        # checkpoint files you trust.
        with open(checkpoint_name, "rb") as cp_file:
            cp = pickle.load(cp_file)
        pop = cp["population"]
        start_gen = cp["generation"] + 1
        hof = cp["halloffame"]
        logbook = cp["logbook"]
        random.setstate(cp["rndstate"])
    else:
        pop = toolbox.population(n=Config.pop_size)
        start_gen = 0
        hof = tools.HallOfFame(1)
        logbook = tools.Logbook()

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)

    pop, log = paralg.myAsyncEA(pop, start_gen, toolbox, cxpb=0.6, mutpb=0.2, ngen=Config.ngen,
                                stats=stats, halloffame=hof, logbook=logbook, verbose=True,
                                id=id)
    return pop, log, hof
def main(id, checkpoint_name=None):
    """Run the simple evolutionary algorithm, optionally resuming from a checkpoint.

    Args:
        id: passed through to ``alg.myEASimple`` as ``id``.
        checkpoint_name: path of a pickled checkpoint. When given, the
            population, generation, hall of fame, logbook and ``random``
            state are restored from it; otherwise a fresh run is set up.

    Returns:
        ``(pop, log, hof)``: final population, the log returned by the
        algorithm and the hall of fame.
    """
    if checkpoint_name:
        # A file name has been given, then load the data from the file.
        # The with-block closes the file, which was previously left open.
        # NOTE: unpickling can execute arbitrary code; only resume from
        # checkpoint files you trust.
        with open(checkpoint_name, "rb") as cp_file:
            cp = pickle.load(cp_file)
        pop = cp["population"]
        start_gen = cp["generation"] + 1
        hof = cp["halloffame"]
        logbook = cp["logbook"]
        random.setstate(cp["rndstate"])
    else:
        pop = toolbox.population(n=Config.pop_size)
        start_gen = 0
        hof = tools.HallOfFame(1)
        logbook = tools.Logbook()

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)

    pop, log = alg.myEASimple(pop, start_gen, toolbox, cxpb=0.6, mutpb=0.2, ngen=Config.ngen,
                              stats=stats, halloffame=hof, logbook=logbook, verbose=True,
                              id=id)
    return pop, log, hof
def main(id, checkpoint_name=None):
    """Run the (mu, lambda) evolutionary algorithm, optionally resuming from a checkpoint.

    Args:
        id: passed through to ``alg.myEAMuCommaLambda`` as ``id``.
        checkpoint_name: path of a pickled checkpoint. When given, the
            population, generation, hall of fame, logbook and ``random``
            state are restored from it; otherwise a fresh run is set up.

    Returns:
        ``(pop, log, hof)``: final population, the log returned by the
        algorithm and the hall of fame.
    """
    if checkpoint_name:
        # A file name has been given, then load the data from the file.
        # The with-block closes the file, which was previously left open.
        # NOTE: unpickling can execute arbitrary code; only resume from
        # checkpoint files you trust.
        with open(checkpoint_name, "rb") as cp_file:
            cp = pickle.load(cp_file)
        pop = cp["population"]
        start_gen = cp["generation"] + 1
        hof = cp["halloffame"]
        logbook = cp["logbook"]
        random.setstate(cp["rndstate"])
    else:
        pop = toolbox.population(n=Config.MU)
        start_gen = 0
        hof = tools.HallOfFame(1)
        logbook = tools.Logbook()

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)

    pop, log = alg.myEAMuCommaLambda(pop, start_gen, toolbox, Config.MU, Config.LAMBDA,
                                     cxpb=0.6, mutpb=0.2, ngen=Config.ngen,
                                     stats=stats, halloffame=hof, logbook=logbook, verbose=True,
                                     id=id)
    return pop, log, hof
def main(id, checkpoint_name=None):
    """Run the asynchronous evolutionary algorithm, optionally resuming from a checkpoint.

    Args:
        id: passed through to ``paralg.myAsyncEA`` as ``id``.
        checkpoint_name: path of a pickled checkpoint. When given, the
            population, generation, hall of fame, logbook and ``random``
            state are restored from it; otherwise a fresh run is set up.

    Returns:
        ``(pop, log, hof)``: final population, the log returned by the
        algorithm and the hall of fame.
    """
    if checkpoint_name:
        # A file name has been given, then load the data from the file.
        # The with-block closes the file, which was previously left open.
        # NOTE: unpickling can execute arbitrary code; only resume from
        # checkpoint files you trust.
        with open(checkpoint_name, "rb") as cp_file:
            cp = pickle.load(cp_file)
        pop = cp["population"]
        start_gen = cp["generation"] + 1
        hof = cp["halloffame"]
        logbook = cp["logbook"]
        random.setstate(cp["rndstate"])
    else:
        pop = toolbox.population(n=Config.pop_size)
        start_gen = 0
        hof = tools.HallOfFame(1)
        logbook = tools.Logbook()

    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)

    pop, log = paralg.myAsyncEA(pop, start_gen, toolbox, cxpb=0.6, mutpb=0.2, ngen=Config.ngen,
                                stats=stats, halloffame=hof, logbook=logbook, verbose=True,
                                id=id)
    return pop, log, hof
def gradcheck_naive(f, x):
    """
    Gradient check for a function f
    - f should be a function that takes a single argument and outputs the cost and its gradients
    - x is the point (numpy array) to check the gradient at

    The numerical gradient is a central difference,
    (f(x + h) - f(x - h)) / (2 * h). x is perturbed in place during the
    check and every element is put back to its exact value. Prints the
    outcome and returns None; stops at the first index whose relative
    difference exceeds 1e-5.
    """
    rndstate = random.getstate()
    random.setstate(rndstate)
    _, grad = f(x)  # Analytic gradient at the original point
    h = 1e-6

    # Iterate over all indexes in x
    it = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])
    while not it.finished:
        ix = it.multi_index

        # random.setstate(rndstate) is called before each f(x) so that cost
        # functions with built in randomness see the same random numbers.
        old_ix = x[ix]
        x[ix] = old_ix + h
        random.setstate(rndstate)
        fxh, _ = f(x)
        x[ix] = old_ix - h
        random.setstate(rndstate)
        fxnh, _ = f(x)
        x[ix] = old_ix

        # Central difference: its truncation error is O(h**2), whereas the
        # one-sided (f(x + h) - f(x)) / h form used before has O(h) error
        # and reported correct gradients of strongly curved functions as
        # wrong.
        numgrad = (fxh - fxnh) / (2 * h)

        # Compare gradients
        reldiff = abs(numgrad - grad[ix]) / max(1, abs(numgrad), abs(grad[ix]))
        if reldiff > 1e-5:
            print("Gradient check failed.")
            print("First gradient error found at index %s" % str(ix))
            print("Your gradient: %f \t Numerical gradient: %f" % (grad[ix], numgrad))
            return

        it.iternext()  # Step to next dimension

    print("Gradient check passed!")
def flip(self, *args):
    """Flip using this coin's own random stream.

    The global ``random`` generator is set to ``self.state`` before the
    parent ``flip`` runs, and the advanced state is stored back afterwards.
    Positional arguments are accepted and ignored.
    """
    random.setstate(self.state)
    outcome = super(SeededCoin, self).flip()
    # Remember where the stream stopped so the next flip continues from it.
    self.state = random.getstate()
    return outcome