def test_uint_multi_port(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    do_ports = random.sample(
        [d for d in x_series_device.do_ports if d.do_port_width <= 16], 2)
    total_port_width = sum([d.do_port_width for d in do_ports])

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            flatten_channel_string([d.name for d in do_ports]),
            line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Generate random values to test.
        values_to_test = [int(random.getrandbits(total_port_width))
                          for _ in range(10)]

        values_read = []
        for value_to_test in values_to_test:
            task.write(value_to_test)
            time.sleep(0.001)
            values_read.append(task.read())

        assert values_read == values_to_test

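# Hedged aside (not part of the original test): a minimal sketch of the
# flatten_channel_string helper used above. It joins a list of physical
# channel names into the single comma-delimited string that add_do_chan
# expects, collapsing consecutive names into a range. The device and port
# names below are hypothetical.
def _demo_flatten_channel_string():
    from nidaqmx.utils import flatten_channel_string

    # Consecutive ports should collapse into a range expression,
    # e.g. 'Dev1/port0:1'.
    print(flatten_channel_string(['Dev1/port0', 'Dev1/port1']))
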
def test_one_sample_one_line(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    do_line = random.choice(x_series_device.do_lines).name

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_line, line_grouping=LineGrouping.CHAN_PER_LINE)

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        # Generate random values to test.
        values_to_test = [bool(random.getrandbits(1)) for _ in range(10)]

        values_read = []
        for value_to_test in values_to_test:
            writer.write_one_sample_one_line(value_to_test)
            time.sleep(0.001)
            values_read.append(reader.read_one_sample_one_line())

        numpy.testing.assert_array_equal(values_read, values_to_test)

def test_one_sample_port_byte(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    do_port = random.choice(
        [d for d in x_series_device.do_ports if d.do_port_width <= 8])

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Generate random values to test.
        values_to_test = [int(random.getrandbits(do_port.do_port_width))
                          for _ in range(10)]

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        values_read = []
        for value_to_test in values_to_test:
            writer.write_one_sample_port_byte(value_to_test)
            time.sleep(0.001)
            values_read.append(reader.read_one_sample_port_byte())

        numpy.testing.assert_array_equal(values_read, values_to_test)

def test_one_sample_port_uint32(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    do_port = random.choice(
        [do for do in x_series_device.do_ports if do.do_port_width <= 32])

    with nidaqmx.Task() as task:
        task.do_channels.add_do_chan(
            do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

        # Generate random values to test.
        values_to_test = [int(random.getrandbits(do_port.do_port_width))
                          for _ in range(10)]

        writer = DigitalSingleChannelWriter(task.out_stream)
        reader = DigitalSingleChannelReader(task.in_stream)

        values_read = []
        for value_to_test in values_to_test:
            writer.write_one_sample_port_uint32(value_to_test)
            time.sleep(0.001)
            values_read.append(reader.read_one_sample_port_uint32())

        numpy.testing.assert_array_equal(values_read, values_to_test)

def test_insufficient_numpy_write_data(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    # Randomly select physical channels to test.
    number_of_channels = random.randint(
        2, len(x_series_device.ao_physical_chans))
    channels_to_test = random.sample(
        x_series_device.ao_physical_chans, number_of_channels)

    with nidaqmx.Task() as task:
        task.ao_channels.add_ao_voltage_chan(
            flatten_channel_string([c.name for c in channels_to_test]),
            max_val=10, min_val=-10)

        # Deliberately write fewer samples than there are channels in the
        # task, which should raise a DAQmx error.
        number_of_samples = random.randint(1, number_of_channels - 1)
        values_to_test = numpy.float64([
            random.uniform(-10, 10) for _ in range(number_of_samples)])

        with pytest.raises(DaqError) as e:
            task.write(values_to_test, auto_start=True)
        assert e.value.error_code == -200524

def test_create_ai_voltage_chan(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    ai_phys_chan = random.choice(x_series_device.ai_physical_chans).name

    with nidaqmx.Task() as task:
        ai_channel = task.ai_channels.add_ai_voltage_chan(
            ai_phys_chan, name_to_assign_to_channel="VoltageChannel",
            terminal_config=TerminalConfiguration.NRSE, min_val=-20.0,
            max_val=20.0, units=VoltageUnits.FROM_CUSTOM_SCALE,
            custom_scale_name="double_gain_scale")

        assert ai_channel.physical_channel.name == ai_phys_chan
        assert ai_channel.name == "VoltageChannel"
        assert ai_channel.ai_term_cfg == TerminalConfiguration.NRSE
        assert ai_channel.ai_min == -20.0
        assert ai_channel.ai_max == 20.0
        assert (ai_channel.ai_voltage_units ==
                VoltageUnits.FROM_CUSTOM_SCALE)
        assert (ai_channel.ai_custom_scale.name ==
                "double_gain_scale")

def test_create_ai_resistance_chan(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    ai_phys_chan = random.choice(x_series_device.ai_physical_chans).name

    with nidaqmx.Task() as task:
        ai_channel = task.ai_channels.add_ai_resistance_chan(
            ai_phys_chan, name_to_assign_to_channel="ResistanceChannel",
            min_val=-1000.0, max_val=1000.0, units=ResistanceUnits.OHMS,
            resistance_config=ResistanceConfiguration.TWO_WIRE,
            current_excit_source=ExcitationSource.EXTERNAL,
            current_excit_val=0.002, custom_scale_name="")

        assert ai_channel.physical_channel.name == ai_phys_chan
        assert ai_channel.name == "ResistanceChannel"
        assert numpy.isclose(ai_channel.ai_min, -1000.0, atol=1)
        assert numpy.isclose(ai_channel.ai_max, 1000.0, atol=1)
        assert ai_channel.ai_resistance_units == ResistanceUnits.OHMS
        assert (ai_channel.ai_resistance_cfg ==
                ResistanceConfiguration.TWO_WIRE)
        assert ai_channel.ai_excit_src == ExcitationSource.EXTERNAL
        assert ai_channel.ai_excit_val == 0.002

def test_watchdog_expir_state(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    do_line = random.choice(x_series_device.do_lines)

    with nidaqmx.system.WatchdogTask(
            x_series_device.name, timeout=0.1) as task:
        expir_states = [DOExpirationState(
            physical_channel=do_line.name,
            expiration_state=Level.TRISTATE)]
        task.cfg_watchdog_do_expir_states(expir_states)

        expir_state_obj = task.expiration_states[do_line.name]
        assert expir_state_obj.expir_states_do_state == Level.TRISTATE

        expir_state_obj.expir_states_do_state = Level.LOW
        assert expir_state_obj.expir_states_do_state == Level.LOW

def test_arm_start_trigger(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    counter = random.choice(self._get_device_counters(x_series_device))

    with nidaqmx.Task() as task:
        task.co_channels.add_co_pulse_chan_freq(counter)

        task.triggers.arm_start_trigger.trig_type = (
            TriggerType.DIGITAL_EDGE)
        assert (task.triggers.arm_start_trigger.trig_type ==
                TriggerType.DIGITAL_EDGE)

        task.triggers.arm_start_trigger.trig_type = (
            TriggerType.NONE)
        assert (task.triggers.arm_start_trigger.trig_type ==
                TriggerType.NONE)

def test_pause_trigger(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    counter = random.choice(self._get_device_counters(x_series_device))

    with nidaqmx.Task() as task:
        task.co_channels.add_co_pulse_chan_freq(counter)
        task.timing.cfg_implicit_timing(
            sample_mode=AcquisitionType.CONTINUOUS)

        task.triggers.pause_trigger.trig_type = (
            TriggerType.DIGITAL_LEVEL)
        assert (task.triggers.pause_trigger.trig_type ==
                TriggerType.DIGITAL_LEVEL)

        task.triggers.pause_trigger.trig_type = (
            TriggerType.NONE)
        assert (task.triggers.pause_trigger.trig_type ==
                TriggerType.NONE)

def test_int_property(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    with nidaqmx.Task() as task:
        task.ci_channels.add_ci_count_edges_chan(
            x_series_device.ci_physical_chans[0].name)

        # Test property default value.
        assert task.in_stream.offset == 0

        # Test property setter and getter.
        value_to_test = random.randint(0, 100)
        task.in_stream.offset = value_to_test
        assert task.in_stream.offset == value_to_test

        value_to_test = random.randint(-100, 0)
        task.in_stream.offset = value_to_test
        assert task.in_stream.offset == value_to_test

        # Test property deleter.
        del task.in_stream.offset
        assert task.in_stream.offset == 0

def test_uint_property(self, x_series_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    with nidaqmx.Task() as task:
        task.ai_channels.add_ai_voltage_chan(
            x_series_device.ai_physical_chans[0].name)
        task.timing.cfg_samp_clk_timing(1000)

        # Test property initial value.
        assert task.timing.samp_clk_timebase_div == 100000

        # Test property setter and getter.
        value_to_test = random.randint(500, 10000)
        task.timing.samp_clk_timebase_div = value_to_test
        assert task.timing.samp_clk_timebase_div == value_to_test

        # Test property deleter.
        del task.timing.samp_clk_timebase_div
        assert task.timing.samp_clk_timebase_div == 100000

def test_list_of_floats_property(self, bridge_device, seed):
    # Reset the pseudorandom number generator with seed.
    random.seed(seed)

    with nidaqmx.Task() as task:
        ai_channel = task.ai_channels.add_ai_bridge_chan(
            bridge_device.ai_physical_chans[0].name)

        # Test default property value.
        assert isinstance(ai_channel.ai_bridge_poly_forward_coeff, list)
        assert len(ai_channel.ai_bridge_poly_forward_coeff) == 0

        # Test property setter and getter.
        value_to_test = [random.randint(-10, 10) for _ in
                         range(random.randint(2, 5))]
        ai_channel.ai_bridge_poly_forward_coeff = value_to_test
        assert ai_channel.ai_bridge_poly_forward_coeff == value_to_test

        # Test property deleter.
        del ai_channel.ai_bridge_poly_forward_coeff
        assert isinstance(ai_channel.ai_bridge_poly_forward_coeff, list)
        assert len(ai_channel.ai_bridge_poly_forward_coeff) == 0

def __init__(self, database, windows, left, right, cluster, table_output,
             consensus_threshold, v_error_rate, downsample,
             cluster_subsample_size, approx_columns, max_n_bases, exact_copies,
             d_coverage, d_evalue, seed):
    self.database = database
    self.windows = windows
    self.left = left
    self.right = right
    self.cluster = cluster
    self.table_output = table_output
    self.consensus_threshold = consensus_threshold
    self.v_error_rate = v_error_rate
    self.downsample = downsample
    self.cluster_subsample_size = cluster_subsample_size
    self.approx_columns = approx_columns
    self.max_n_bases = max_n_bases
    self.exact_copies = exact_copies
    self.d_coverage = d_coverage
    self.d_evalue = d_evalue
    self.seed = seed

def minutes_for_days():
    """
    500 randomly selected days.

    This is used to make sure our test coverage is not biased towards any
    particular rules. We use a random sample because testing on all of the
    trading days took around 180 seconds on my laptop, which is far too
    much for normal unit testing.

    We manually set the seed so that this will be deterministic. Results
    of multiple runs were compared to make sure that this is actually true.

    This returns a generator of tuples, each wrapping a single generator.
    Iterating over this yields a single day; iterating over the day yields
    the minutes for that day.
    """
    env = TradingEnvironment()
    random.seed('deterministic')
    return ((env.market_minutes_for_day(random.choice(env.trading_days)),)
            for _ in range(500))

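# Hedged aside: the determinism claimed in the docstring above rests on
# random.seed() accepting any hashable value (here a string) and making
# every subsequent draw reproducible. A tiny self-contained check:
def _demo_deterministic_seed():
    import random

    random.seed('deterministic')
    first = [random.randrange(100) for _ in range(5)]
    random.seed('deterministic')
    second = [random.randrange(100) for _ in range(5)]
    assert first == second  # re-seeding replays the exact same sequence
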
def loadLogoSet(path, rows, cols, test_data_rate=0.15):
    random.seed(612)
    _, imgID = readItems('data.txt')
    y, _ = modelDict(path)
    nPics = len(y)
    faceassset = np.zeros((nPics, rows, cols), dtype=np.uint8)  # gray images
    noImg = []
    for i in range(nPics):
        temp = cv2.imread(path + 'logo/' + imgID[i] + '.jpg', 0)
        # cv2.imread returns None on failure; compare with `is`, not `==`,
        # since `==` on a NumPy array is elementwise.
        if temp is None:
            noImg.append(i)
        elif temp.size < 1000:
            noImg.append(i)
        else:
            temp = cv2.resize(temp, (cols, rows), interpolation=cv2.INTER_CUBIC)
            faceassset[i, :, :] = temp
    y = np.delete(y, noImg, 0)
    faceassset = np.delete(faceassset, noImg, 0)
    nPics = len(y)
    # random.sample needs a plain sequence, so use range() over np.arange().
    index = random.sample(range(nPics), int(nPics * test_data_rate))
    x_test = faceassset[index, :, :]
    x_train = np.delete(faceassset, index, 0)
    y_test = y[index]
    y_train = np.delete(y, index, 0)
    return (x_train, y_train), (x_test, y_test)

def plot_labeled_images_random(image_list, label_list, categories, n,
                               title_str, ypixels, xpixels, seed, filename):
    random.seed(seed)
    index_sample = random.sample(range(len(image_list)), n)
    plt.figure(figsize=(2 * n, 2))
    # plt.suptitle(title_str)
    for i, ind in enumerate(index_sample):
        ax = plt.subplot(1, n, i + 1)
        plt.imshow(image_list[ind].reshape(ypixels, xpixels))
        plt.gray()
        ax.set_title(categories[label_list[ind]], fontsize=20)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    if 1:  # save to file; flip to 0 to display interactively instead
        pylab.savefig(filename, bbox_inches='tight')
    else:
        plt.show()

# plot_unlabeled_images_random: plots unlabeled images at random
def plot_unlabeled_images_random(image_list, n, title_str, ypixels, xpixels,
                                 seed, filename):
    random.seed(seed)
    index_sample = random.sample(range(len(image_list)), n)
    plt.figure(figsize=(2 * n, 2))
    plt.suptitle(title_str)
    for i, ind in enumerate(index_sample):
        ax = plt.subplot(1, n, i + 1)
        plt.imshow(image_list[ind].reshape(ypixels, xpixels))
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    if 1:  # save to file; flip to 0 to display interactively instead
        pylab.savefig(filename, bbox_inches='tight')
    else:
        plt.show()

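# Hedged usage sketch for the helper above, with synthetic flattened 8x8
# "images" so it runs without a dataset; the output filename is made up.
def _demo_plot_unlabeled():
    import numpy as np

    fake_images = [np.random.rand(64) for _ in range(20)]
    plot_unlabeled_images_random(
        fake_images, n=5, title_str='random patches',
        ypixels=8, xpixels=8, seed=0, filename='patches.png')
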
def generate_graphs(self, n_edges_list, use_seed=True):
    """For each number of edges in n_edges_list, create an Erdos-Renyi
    precision graph that we can sample from later.

    Parameters
    ----------
    n_edges_list : list[int] or int
        list of the number of edges for each graph, or a scalar
        if only one graph is wanted
    use_seed : bool
        indicates whether the seed shall be reset
    """
    if use_seed and self.seed is not None:
        random.seed(self.seed)

    n_edges = (n_edges_list if type(n_edges_list) is list
               else [n_edges_list])
    self.graphs = [ErdosRenyiPrecisionGraph(self.n_vertices, n_es)
                   for n_es in n_edges]

def make_list(args):
    image_list = list_image(args.root, args.recursive, args.exts)
    image_list = list(image_list)
    if args.shuffle is True:
        random.seed(100)
        random.shuffle(image_list)
    N = len(image_list)
    # Integer ceiling division so every image lands in some chunk.
    chunk_size = (N + args.chunks - 1) // args.chunks
    for i in xrange(args.chunks):
        chunk = image_list[i * chunk_size:(i + 1) * chunk_size]
        if args.chunks > 1:
            str_chunk = '_%d' % i
        else:
            str_chunk = ''
        sep = int(chunk_size * args.train_ratio)
        sep_test = int(chunk_size * args.test_ratio)
        if args.train_ratio == 1.0:
            write_list(args.prefix + str_chunk + '.lst', chunk)
        else:
            if args.test_ratio:
                write_list(args.prefix + str_chunk + '_test.lst', chunk[:sep_test])
            if args.train_ratio + args.test_ratio < 1.0:
                write_list(args.prefix + str_chunk + '_val.lst', chunk[sep_test + sep:])
            write_list(args.prefix + str_chunk + '_train.lst', chunk[sep_test:sep_test + sep])

def get_random_string(length=12,
                      allowed_chars='abcdefghijklmnopqrstuvwxyz'
                                    'ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'):
    """
    Returns a securely generated random string.
    The default length of 12 with the a-z, A-Z, 0-9 character set returns
    a 71-bit value: log_2((26+26+10)^12) =~ 71 bits.
    """
    if not using_sysrandom:
        # This is ugly, and a hack, but it makes things better than
        # the alternative of predictability. This re-seeds the PRNG
        # using a value that is hard for an attacker to predict, every
        # time a random string is required. This may change the
        # properties of the chosen random sequence slightly, but this
        # is better than absolute predictability.
        random.seed(
            hashlib.sha256(
                ("%s%s" % (random.getstate(), time.time())).encode('utf-8')
            ).digest())
    return ''.join(random.choice(allowed_chars) for i in range(length))

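# Hedged aside: this helper predates Python's secrets module (3.6+), which
# is now the standard way to produce cryptographically strong tokens. A
# rough modern equivalent, for comparison only; the legacy call assumes the
# module-level using_sysrandom flag from the original project is defined.
def _demo_random_strings():
    import secrets
    import string

    legacy = get_random_string(12)  # helper defined above
    modern = ''.join(secrets.choice(string.ascii_letters + string.digits)
                     for _ in range(12))
    return legacy, modern
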
def write_parameter_log(options, output_dir):
    """
    Write parameter values to a log file, named by the current time.
    """
    merge_method_dict = {1: 'narrowPeak', 2: 'broadPeak'}
    correction_method_dict = {1: 'Bonferroni', 2: 'BH_FDR'}
    with open(output_dir + '/CLAM_Peaker.Parameters.' +
              strftime("%Y%m%d_%H%M") + '.txt', 'w') as log:
        log.write('CLAM Peaker ' + __version__ + '\n')
        log.write('resume: ' + str(options.resume) + '\n')
        log.write('verbose: ' + str(options.verbose) + '\n')
        log.write('output_dir: ' + str(options.output_dir) + '\n')
        log.write('tmp_dir: ' + str(options.tmp_dir) + '\n')
        log.write('peak_file: ' + str(options.peak_file) + '\n')
        log.write('is_stranded: ' + str(options.is_stranded) + '\n')
        log.write('extend: ' + str(options.extend) + '\n')
        log.write('pval_cutoff: ' + str(options.pval_cutoff) + '\n')
        log.write('merge_size: ' + str(options.merge_size) + '\n')
        log.write('max_iter: ' + str(options.max_iter) + '\n')
        log.write('gtf: ' + str(options.gtf) + '\n')
        log.write('seed: ' + str(options.seed) + '\n')
        log.write('merge_method: ' + merge_method_dict[options.merge_method] + '\n')
        log.write('correction_method: ' + correction_method_dict[options.correction_method] + '\n')
        log.write('thread: ' + str(options.nb_proc) + '\n')

def split_keys(profiles, bin_sites, random_state=1234):
    """Balanced split over binding/non-binding sequences."""
    random.seed(random_state)
    # Materialize the key views as lists so they can be shuffled (Python 3).
    pos_keys = list(bin_sites.keys())
    neg_keys = list(set(profiles.keys()) - set(pos_keys))
    random.shuffle(pos_keys)
    random.shuffle(neg_keys)
    len_pos = len(pos_keys)
    pos_keys1 = pos_keys[:len_pos // 2]
    pos_keys2 = pos_keys[len_pos // 2:]
    len_neg = len(neg_keys)
    neg_keys1 = neg_keys[:len_neg // 2]
    neg_keys2 = neg_keys[len_neg // 2:]
    return [pos_keys1, pos_keys2, neg_keys1, neg_keys2]

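# Hedged usage sketch for split_keys with toy dictionaries; the keys
# 'a'..'d' are made up. Binding keys ('a', 'b') and non-binding keys
# ('c', 'd') are each shuffled and halved into two balanced folds.
def _demo_split_keys():
    profiles = {k: None for k in ('a', 'b', 'c', 'd')}
    bin_sites = {'a': None, 'b': None}
    pos1, pos2, neg1, neg2 = split_keys(profiles, bin_sites)
    assert sorted(pos1 + pos2) == ['a', 'b']
    assert sorted(neg1 + neg2) == ['c', 'd']
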
def get_batches(data, batch_size, vocabulary, pos_vocabulary):
    '''
    Get batches without any restrictions on the number of antecedents and
    negative candidates.
    '''
    random.seed(24)
    random.shuffle(data)
    data_size = len(data)
    if data_size % batch_size == 0:
        num_batches = data_size // batch_size
    else:
        num_batches = data_size // batch_size + 1
    batches = []
    for batch_num in range(num_batches):
        start_index = batch_num * batch_size
        end_index = min((batch_num + 1) * batch_size, data_size)
        batch = pad_batch(data[start_index:end_index], vocabulary, pos_vocabulary)
        batches.append(batch)
    logging.info('Data size: %s' % len(data))
    logging.info('Number of batches: %s' % len(batches))
    return batches

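# Hedged aside: the batch-count branch above is just a ceiling division;
# an equivalent Python 3 computation, shown for clarity only:
def _num_batches(data_size, batch_size):
    import math
    return math.ceil(data_size / batch_size)
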
def split(flags):
    if os.path.exists(flags.split_path):
        # allow_pickle is required to load a saved dict on NumPy >= 1.16.4.
        return np.load(flags.split_path, allow_pickle=True).item()
    folds = flags.folds
    path = flags.input_path
    random.seed(6)
    img_list = ["%s/%s" % (path, img) for img in os.listdir(path)]
    random.shuffle(img_list)
    dic = {}
    n = len(img_list)
    num = (n + folds - 1) // folds
    for i in range(folds):
        s, e = i * num, min(i * num + num, n)
        dic[i] = img_list[s:e]
    np.save(flags.split_path, dic)
    return dic

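# Hedged usage sketch for split(); the paths are hypothetical and `flags`
# can be any object with these attributes (an argparse.Namespace works).
def _demo_split():
    from types import SimpleNamespace

    flags = SimpleNamespace(
        split_path='splits.npy',  # cache file for the fold dict (made up)
        input_path='images',      # directory of images (made up)
        folds=5)
    return split(flags)           # {fold_index: [image paths]}
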
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any], training_share: float = .9) -> Callable[
        [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
    def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
        examples_by_directory = group(examples, key=key_from_example)
        directories = examples_by_directory.keys()

        # The split must be the same every time:
        random.seed(42)
        # random.sample needs a sequence, not a dict view (Python 3).
        keys = set(random.sample(list(directories), int(training_share * len(directories))))

        training_examples = [example for example in examples if key_from_example(example) in keys]
        test_examples = [example for example in examples if key_from_example(example) not in keys]
        return training_examples, test_examples

    return split

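# Hedged usage sketch: LabeledExample and group() come from elsewhere in
# the original project (group() is assumed to bucket examples by key, like
# a dict-returning itertools.groupby), and the .directory attribute on the
# examples is hypothetical. The point is that all examples sharing a key
# land entirely on one side of the split:
def _demo_randomly_grouped_by(examples):
    split = randomly_grouped_by(lambda e: e.directory, training_share=0.8)
    training, test = split(examples)
    train_keys = {e.directory for e in training}
    test_keys = {e.directory for e in test}
    assert not (train_keys & test_keys)  # no group straddles the split
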
def gen_training_data(
    num_features,
    num_training_samples,
    num_outputs,
    noise_scale=0.1,
):
    np.random.seed(0)
    random.seed(1)
    input_distribution = stats.norm()
    training_inputs = input_distribution.rvs(
        size=(num_training_samples, num_features)
    ).astype(np.float32)
    weights = np.random.normal(size=(num_outputs, num_features)
                               ).astype(np.float32).transpose()
    noise = np.multiply(
        np.random.normal(size=(num_training_samples, num_outputs)), noise_scale
    )
    training_outputs = (np.dot(training_inputs, weights) +
                        noise).astype(np.float32)
    return training_inputs, training_outputs, weights, input_distribution

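# Hedged usage sketch: recover the generating weights from the synthetic
# data by ordinary least squares; with the default low noise_scale the
# estimate should land very close to the true weight matrix.
def _demo_gen_training_data():
    import numpy as np

    inputs, outputs, weights, _ = gen_training_data(
        num_features=4, num_training_samples=1000, num_outputs=2)
    estimate, *_ = np.linalg.lstsq(inputs, outputs, rcond=None)
    assert np.allclose(estimate, weights, atol=0.05)
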
def test(args=None, BSTtype=BST):
    import random, sys
    random.seed(19920206)
    if not args:
        args = sys.argv[1:]
    if not args:
        print 'usage: %s <number-of-random-items | item item item ...>' % \
            sys.argv[0]
        sys.exit()
    elif len(args) == 1:
        items = (random.randrange(100) for i in xrange(int(args[0])))
    else:
        items = [int(i) for i in args]

    tree = BSTtype()
    source = []
    for item in items:
        tree.insert(item)
        source += [str(item)]
    print ' '.join(source)
    print tree

def generate():
    import random, sys
    random.seed(19920206)
    Lmin = 2 ** 2 - 1
    Lmax = 2 ** 4 - 1
    Xnum = 1000000
    voc = 26
    wfile = open('/home/thoma/Work/Dial-DRL/dataset/BST_1M.txt', 'w')
    for id in xrange(Xnum):
        tree = BST()
        items = (random.randrange(voc) for i in
                 xrange(random.randint(Lmin, Lmax)))
        source = []
        for item in items:
            item = chr(item + 65)
            tree.insert(item)
            source += [str(item)]
        source = ' '.join(source)
        target = str(tree)
        line = '{0} -> {1}'.format(source, target)
        wfile.write(line + '\n')
        if id % 10000 == 0:
            print id