def run(self, q_job):
"""Run circuits in q_job"""
# Generating a string id for the job
job_id = str(uuid.uuid4())
result_list = []
qobj = q_job.qobj
self._sim = Simulator(gate_fusion=True)
if 'seed' in qobj['config']:
self._seed = qobj['config']['seed']
self._sim._simulator = CppSim(self._seed)
else:
self._seed = random.getrandbits(32)
self._shots = qobj['config']['shots']
for circuit in qobj['circuits']:
result_list.append(self.run_circuit(circuit))
return Result({'job_id': job_id, 'result': result_list,
'status': 'COMPLETED'},
qobj)
python类getrandbits()的实例源码
def test_append_rules(self, asa):
rules = []
for i in range(1, 351):
protocol = choice(["tcp", "udp"])
if bool(getrandbits(1)):
src = IPAddress(randint(0, 4294967295))
else:
src = IPNetwork(f"{IPAddress(randint(0, 4294967295))}/{randint(0, 31)}").cidr
if bool(getrandbits(1)):
dst = IPAddress(randint(0, 4294967295))
else:
dst = IPNetwork(f"{IPAddress(randint(0, 4294967295))}/{randint(0, 31)}").cidr
dst_port = randint(1, 65535)
src_comp = choice([comp for comp in ServiceComparator])
dst_comp = choice([comp for comp in ServiceComparator])
rule = RuleTCPUDP(protocol=protocol, src=src, dst=dst, src_port=i, dst_port=dst_port,
src_comp=src_comp, dst_comp=dst_comp)
rules.append(rule)
asa.acl.append_rules(settings.test_acl, rules)
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
"""
This function follows from the following discussion:
`http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`
It works like uuid.uuid4 except that tries to use os.urandom() if possible
and it XORs the output with the tokens uniquely associated with this machine.
"""
rand_longs = (random.getrandbits(64), random.getrandbits(64))
if HAVE_URANDOM:
urand_longs = _struct_2_long_long.unpack(fast_urandom16())
byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
else:
byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
rand_longs[1] ^ ctokens[1])
return str(uuid.UUID(bytes=byte_s, version=4))
fake_hsm_coordinator.py 文件源码
项目:intel-manager-for-lustre
作者: intel-hpdd
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def run(self):
if not self.coordinator.enabled:
return
log.debug("Coordinator thread for %s starting: %s" %
(self.coordinator.filesystem, threading.current_thread()))
while not self._stopping.is_set():
# Keep agents busy
self.generate_actions_for_agents()
# Stack up waiting actions
if bool(random.getrandbits(1)):
self.unassigned_requests.put(self.get_random_action())
self.cull_completed_requests()
# TODO: Very infrequently, randomly cancel an agent action
self._stopping.wait(HSM_COORDINATOR_LOOP_INTERVAL)
ilsvrc_cls_multithread_scipy.py 文件源码
项目:tensorflow_yolo2
作者: wenxichen
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def image_read(self, imname):
image = misc.imread(imname, mode='RGB').astype(np.float)
r,c,ch = image.shape
if r < 299 or c < 299:
# TODO: check too small images
# print "##too small!!"
image = misc.imresize(image, (299, 299, 3))
elif r > 299 or c > 299:
image = image[(r-299)/2 : (r-299)/2 + 299, (c-299)/2 : (c-299)/2 + 299, :]
# print r, c, image.shape
assert image.shape == (299, 299, 3)
image = (image / 255.0) * 2.0 - 1.0
if self.random_noise:
add_noise = bool(random.getrandbits(1))
if add_noise:
eps = random.choice([4.0, 8.0, 12.0, 16.0]) / 255.0 * 2.0
noise_image = image + eps * np.random.choice([-1, 1], (299,299,3))
image = np.clip(noise_image, -1.0, 1.0)
return image
def AdminLogin(req,onlyhead):
global currentcookie
passwd = req.query.get('passwd',[''])[0]
salt = Config.conf['AdministratorPassword'][:2]
passwd = crypt.crypt(passwd,salt)
if passwd != Config.conf['AdministratorPassword']:
return Delegate('/errors/wrongpasswd.html',req,onlyhead)
random.seed(os.urandom(200));
currentcookie = str(random.getrandbits(200))
handler = EH_Generic_class()
handler.iamadmin = 1
(header,content) = Site['/adminmenu.html'].getresult(req,onlyhead,handler)
header['Set-Cookie'] = 'OKUSON='+currentcookie+ \
';Path=/;Max-Age=3600;Version=1'
# Max-Age is one hour
#header['Location'] = '/adminmenu.html'
# Taken out to please opera, which does not get the cookie for the
# login with this header. Max.
return (header,content)
def uniform_random(lower_bound, upper_bound):
"""
Question 5.10: Generates uniform random number
within lower and upper bounds, inclusive
"""
num_outcomes = upper_bound - lower_bound + 1
result = None
while True:
result = 0
i = 0
while 1 << i < num_outcomes:
result = (result << 1) | getrandbits(1)
i += 1
if result < num_outcomes:
break
return result + lower_bound
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
"""
This function follows from the following discussion:
`http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`
It works like uuid.uuid4 except that tries to use os.urandom() if possible
and it XORs the output with the tokens uniquely associated with this machine.
"""
rand_longs = (random.getrandbits(64), random.getrandbits(64))
if HAVE_URANDOM:
urand_longs = _struct_2_long_long.unpack(fast_urandom16())
byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
else:
byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
rand_longs[1] ^ ctokens[1])
return str(uuid.UUID(bytes=byte_s, version=4))
def generateChallengeResponseV2(password, user, server_challenge, server_info, domain = '', client_challenge = None):
client_timestamp = '\0' * 8
if not client_challenge:
client_challenge = ''
for i in range(0, 8):
client_challenge += chr(random.getrandbits(8))
assert len(client_challenge) == 8
d = MD4()
d.update(password.encode('UTF-16LE'))
ntlm_hash = d.digest() # The NT password hash
response_key = hmac.new(ntlm_hash, (user.upper() + domain).encode('UTF-16LE')).digest() # The NTLMv2 password hash. In [MS-NLMP], this is the result of NTOWFv2 and LMOWFv2 functions
temp = client_timestamp + client_challenge + domain.encode('UTF-16LE') + server_info
nt_challenge_response = hmac.new(response_key, server_challenge + temp).digest()
lm_challenge_response = hmac.new(response_key, server_challenge + client_challenge).digest() + client_challenge
session_key = hmac.new(response_key, nt_challenge_response).digest()
return nt_challenge_response, lm_challenge_response, session_key
################
# NTLMv1 Methods
################
def __init__(self, url, mode = "r"):
import random
try:
import xbmc
except:
xbmc = None
self.url = url
self.remote, self.share, self.path = path = connect(url)
self.mode = mode
self.binary = False
self.canread = False
self.canwrite = False
self.closed = True
self.size = 0
self.pos = 0
if xbmc:
self.tmp_path = os.path.join(xbmc.translatePath("special://temp/"), "%08x" % (random.getrandbits(32)))
else:
self.tmp_path = os.path.join(os.getenv("TEMP") or os.getenv("TMP") or os.getenv("TMPDIR"), "%08x" % (random.getrandbits(32)))
self.tmp_file = None
self.__get_mode__()
def dialog_select(self, heading, list):
ID = "%032x" %(random.getrandbits(128))
response = '<?xml version="1.0" encoding="UTF-8" ?>\n'
response +='<rss version="2.0" xmlns:dc="http://purl.org/dc/elements/1.1/">\n'
response +='<channel>\n'
response +='<link>/rss</link>\n'
response +='<title>'+heading+'</title>\n'
for option in list:
response += '<item>\n'
response += '<title>'+option +'</title>\n'
response += '<image>%s</image>\n'
response += '<link>http://'+self.controller.host+'/data/'+threading.current_thread().name +'/'+ID+'/'+str(list.index(option))+'</link>\n'
response += '</item>\n\n'
response += '</channel>\n'
response += '</rss>\n'
self.controller.send_data(response)
self.handler.server.shutdown_request(self.handler.request)
while not self.controller.get_data(ID):
continue
return int(self.controller.get_data(ID))
def test_bool_1_chan_1_samp(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
do_line = random.choice(x_series_device.do_lines).name
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(
do_line, line_grouping=LineGrouping.CHAN_PER_LINE)
# Generate random values to test.
values_to_test = [bool(random.getrandbits(1)) for _ in range(10)]
values_read = []
for value_to_test in values_to_test:
task.write(value_to_test)
time.sleep(0.001)
values_read.append(task.read())
assert values_read == values_to_test
# Verify setting number_of_samples_per_channel (even to 1)
# returns a list.
value_read = task.read(number_of_samples_per_channel=1)
assert isinstance(value_read, list)
assert len(value_read) == 1
def test_bool_n_chan_1_samp(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
number_of_channels = random.randint(2, len(x_series_device.do_lines))
do_lines = random.sample(x_series_device.do_lines, number_of_channels)
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(
flatten_channel_string([d.name for d in do_lines]),
line_grouping=LineGrouping.CHAN_PER_LINE)
# Generate random values to test.
values_to_test = [bool(random.getrandbits(1)) for _ in
range(number_of_channels)]
task.write(values_to_test)
time.sleep(0.001)
values_read = task.read()
assert values_read == values_to_test
# Verify setting number_of_samples_per_channel (even to 1)
# returns a list of lists.
value_read = task.read(number_of_samples_per_channel=1)
assert isinstance(value_read, list)
assert isinstance(value_read[0], list)
def test_uint_1_chan_1_samp(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
do_port = random.choice(x_series_device.do_ports)
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(
do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)
# Generate random values to test.
values_to_test = [int(random.getrandbits(do_port.do_port_width))
for _ in range(10)]
values_read = []
for value_to_test in values_to_test:
task.write(value_to_test)
time.sleep(0.001)
values_read.append(task.read())
assert values_read == values_to_test
# Verify setting number_of_samples_per_channel (even to 1)
# returns a list.
value_read = task.read(number_of_samples_per_channel=1)
assert isinstance(value_read, list)
assert len(value_read) == 1
def test_many_sample_port_byte(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
number_of_samples = random.randint(2, 20)
do_port = random.choice(
[d for d in x_series_device.do_ports if d.do_port_width <= 8])
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(
do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)
# Generate random values to test.
values_to_test = numpy.array(
[int(random.getrandbits(do_port.do_port_width))
for _ in range(number_of_samples)], dtype=numpy.uint8)
writer = DigitalSingleChannelWriter(task.out_stream)
reader = DigitalSingleChannelReader(task.in_stream)
task.start()
writer.write_many_sample_port_byte(values_to_test)
time.sleep(0.001)
# Since we're writing to and reading from ONLY the digital
# output lines, we can't use sample clocks to correlate the
# read and write sampling times. Thus, we essentially read
# the last value written multiple times.
values_read = numpy.zeros(number_of_samples, dtype=numpy.uint8)
reader.read_many_sample_port_byte(
values_read, number_of_samples_per_channel=number_of_samples)
expected_values = [
values_to_test[-1] for _ in range(number_of_samples)]
numpy.testing.assert_array_equal(values_read, expected_values)