def test_uint_multi_port(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
do_ports = random.sample(
[d for d in x_series_device.do_ports if d.do_port_width <= 16], 2)
total_port_width = sum([d.do_port_width for d in do_ports])
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(
flatten_channel_string([d.name for d in do_ports]),
line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)
# Generate random values to test.
values_to_test = [int(random.getrandbits(total_port_width))
for _ in range(10)]
values_read = []
for value_to_test in values_to_test:
task.write(value_to_test)
time.sleep(0.001)
values_read.append(task.read())
assert values_read == values_to_test
python类getrandbits()的实例源码
def test_one_sample_one_line(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
do_line = random.choice(x_series_device.do_lines).name
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(
do_line, line_grouping=LineGrouping.CHAN_PER_LINE)
writer = DigitalSingleChannelWriter(task.out_stream)
reader = DigitalSingleChannelReader(task.in_stream)
# Generate random values to test.
values_to_test = [bool(random.getrandbits(1)) for _ in range(10)]
values_read = []
for value_to_test in values_to_test:
writer.write_one_sample_one_line(value_to_test)
time.sleep(0.001)
values_read.append(reader.read_one_sample_one_line())
numpy.testing.assert_array_equal(values_read, values_to_test)
def test_one_sample_port_byte(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
do_port = random.choice(
[d for d in x_series_device.do_ports if d.do_port_width <= 8])
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(
do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)
# Generate random values to test.
values_to_test = [int(random.getrandbits(do_port.do_port_width))
for _ in range(10)]
writer = DigitalSingleChannelWriter(task.out_stream)
reader = DigitalSingleChannelReader(task.in_stream)
values_read = []
for value_to_test in values_to_test:
writer.write_one_sample_port_byte(value_to_test)
time.sleep(0.001)
values_read.append(reader.read_one_sample_port_byte())
numpy.testing.assert_array_equal(values_read, values_to_test)
def test_one_sample_port_uint16(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
do_port = random.choice(
[do for do in x_series_device.do_ports if do.do_port_width <= 16])
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(
do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)
# Generate random values to test.
values_to_test = [int(random.getrandbits(do_port.do_port_width))
for _ in range(10)]
writer = DigitalSingleChannelWriter(task.out_stream)
reader = DigitalSingleChannelReader(task.in_stream)
values_read = []
for value_to_test in values_to_test:
writer.write_one_sample_port_uint16(value_to_test)
time.sleep(0.001)
values_read.append(reader.read_one_sample_port_uint16())
numpy.testing.assert_array_equal(values_read, values_to_test)
def test_one_sample_port_uint32(self, x_series_device, seed):
# Reset the pseudorandom number generator with seed.
random.seed(seed)
do_port = random.choice(
[do for do in x_series_device.do_ports if do.do_port_width <= 32])
with nidaqmx.Task() as task:
task.do_channels.add_do_chan(
do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)
# Generate random values to test.
values_to_test = [int(random.getrandbits(do_port.do_port_width))
for _ in range(10)]
writer = DigitalSingleChannelWriter(task.out_stream)
reader = DigitalSingleChannelReader(task.in_stream)
values_read = []
for value_to_test in values_to_test:
writer.write_one_sample_port_uint32(value_to_test)
time.sleep(0.001)
values_read.append(reader.read_one_sample_port_uint32())
numpy.testing.assert_array_equal(values_read, values_to_test)
def random_context():
result = {}
if random.getrandbits(1):
result["function_name"] = RunLambdaCliTest.random_string()
if random.getrandbits(1):
result["function_version"] = RunLambdaCliTest.random_string()
if random.getrandbits(1):
result["invoked_function_arn"] = RunLambdaCliTest.random_string()
if random.getrandbits(1):
result["memory_limit_in_mb"] = str(random.randint(100, 200))
if random.getrandbits(1):
result["aws_request_id"] = RunLambdaCliTest.random_string()
if random.getrandbits(1):
result["log_group_name"] = RunLambdaCliTest.random_string()
if random.getrandbits(1):
result["log_stream_name"] = RunLambdaCliTest.random_string()
if random.getrandbits(1):
result["identity"] = RunLambdaCliTest.random_identity()
if random.getrandbits(1):
result["client_context"] = RunLambdaCliTest.random_client_context()
return result
def __init__(self, *args, **kwds):
## Create shared memory for rendered image
#pg.dbg(namespace={'r': self})
if sys.platform.startswith('win'):
self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
else:
self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
fd = self.shmFile.fileno()
self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
atexit.register(self.close)
GraphicsView.__init__(self, *args, **kwds)
self.scene().changed.connect(self.update)
self.img = None
self.renderTimer = QtCore.QTimer()
self.renderTimer.timeout.connect(self.renderView)
self.renderTimer.start(16)
def __init__(self, *args, **kwds):
## Create shared memory for rendered image
#pg.dbg(namespace={'r': self})
if sys.platform.startswith('win'):
self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
else:
self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
fd = self.shmFile.fileno()
self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
atexit.register(self.close)
GraphicsView.__init__(self, *args, **kwds)
self.scene().changed.connect(self.update)
self.img = None
self.renderTimer = QtCore.QTimer()
self.renderTimer.timeout.connect(self.renderView)
self.renderTimer.start(16)
def main():
port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)
socket.send_string(str('hello'))
message = '00101110'
cnt = 0
while True:
reward = socket.recv() # 1 or 0, or '-1' for None
print(reward)
msg_in = socket.recv()
print(msg_in)
# think...
msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
if cnt % 2 == 0:
msg_out = str(message[cnt % 8])
socket.send(msg_out)
cnt = cnt + 1
def gen_pseudo_word(self, L=None):
if not L:
L = random.randint(1, 8)
# generate one word that we hadn't used before
while True:
if self.readable:
# alternating between vowels and consonants, sampled with repl.
_choice, _range = random.choice, range(int(math.ceil(L / 2)))
v = [_choice(self.V) for i in _range]
c = [_choice(self.C) for i in _range]
zipped = zip(v, c) if random.getrandbits(1) else zip(c, v)
pseudo_word = ''.join([a for b in zipped for a in b])[:L]
else:
pseudo_word = ''.join(random.sample(
string.ascii_lowercase, L))
if pseudo_word not in self.inv_word_mapping:
return pseudo_word
def dated_path(obj, file_data):
try:
prefix = getattr(obj, 'model_name')
except BaseException:
prefix = "undefined"
parts = op.splitext(file_data.filename)
rand = random.getrandbits(16)
filename = u"{name}_{rand}{ext}".format(
rand=rand, name=parts[0], ext=parts[1]
)
filename = secure_filename(filename)
today = date.today()
path = u"{prefix}/{t.year}/{t.month}/{filename}".format(
prefix=prefix, t=today, filename=filename
)
return path
def action_clone_item(self, ids):
if len(ids) > 1:
flash(
"You can select only one item for this action",
'error'
)
return
model = current_app.db.get_with_content(_id=ids[0])
clone = deepcopy(model)
del clone['_id']
clone['slug'] = f'{clone["slug"]}-{random.getrandbits(32)}'
clone['_isclone'] = True
self._on_model_change(None, clone, True)
self.coll.insert(clone)
self.after_model_change(None, clone, True)
return redirect(url_for('.edit_view', id=clone['_id']))
# TODO: Serialize and activate thia action
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
"""
This function follows from the following discussion:
`http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`
It works like uuid.uuid4 except that tries to use os.urandom() if possible
and it XORs the output with the tokens uniquely associated with this machine.
"""
rand_longs = (random.getrandbits(64), random.getrandbits(64))
if HAVE_URANDOM:
urand_longs = _struct_2_long_long.unpack(fast_urandom16())
byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
else:
byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
rand_longs[1] ^ ctokens[1])
return str(uuid.UUID(bytes=byte_s, version=4))
def main():
random.seed()
# ????
print(random.getrandbits(3))
print(random.randint(200, 800))
print(2, 400, 2)
# ????
seq = range(1, 10)
print(random.choice(seq))
print(random.sample(seq, 4))
a = list(seq)
random.shuffle(a)
print(a)
# ??
print(random.random()) # [0.0, 1.0)???????
print(random.uniform(2.1, 4.99)) # ??????????
pass
def __init__(self, length=0, defined=False, value=None):
""" Creates a bit string of a certain length, using a certain underlying
implementation. """
self.length = length
self.fitness = 0
self.isCacheValid = False
self.defined = bitarray(length)
self.data = bitarray(length)
if defined:
self.defined = None
else:
self.defined.setall(False)
if value is not None:
self.data = [bool(X) for X in value]
for i in range(1, length+1):
if bool(random.getrandbits(1)):
self.flip(i)
def test_wrong_method(self):
test_host = 'frob.nitz'
test_path = '/fnord'
ws = WSConnection(SERVER)
nonce = bytes(random.getrandbits(8) for x in range(0, 16))
nonce = base64.b64encode(nonce)
request = b'POST ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
request += b'Connection: Upgrade\r\n'
request += b'Upgrade: WebSocket\r\n'
request += b'Sec-WebSocket-Version: 13\r\n'
request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
request += b'\r\n'
ws.receive_bytes(request)
event = next(ws.events())
assert isinstance(event, ConnectionFailed)
def test_bad_connection(self):
test_host = 'frob.nitz'
test_path = '/fnord'
ws = WSConnection(SERVER)
nonce = bytes(random.getrandbits(8) for x in range(0, 16))
nonce = base64.b64encode(nonce)
request = b'GET ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
request += b'Connection: Zoinks\r\n'
request += b'Upgrade: WebSocket\r\n'
request += b'Sec-WebSocket-Version: 13\r\n'
request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
request += b'\r\n'
ws.receive_bytes(request)
event = next(ws.events())
assert isinstance(event, ConnectionFailed)
def test_bad_upgrade(self):
test_host = 'frob.nitz'
test_path = '/fnord'
ws = WSConnection(SERVER)
nonce = bytes(random.getrandbits(8) for x in range(0, 16))
nonce = base64.b64encode(nonce)
request = b'GET ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
request += b'Connection: Upgrade\r\n'
request += b'Upgrade: WebPocket\r\n'
request += b'Sec-WebSocket-Version: 13\r\n'
request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
request += b'\r\n'
ws.receive_bytes(request)
event = next(ws.events())
assert isinstance(event, ConnectionFailed)
def test_missing_version(self):
test_host = 'frob.nitz'
test_path = '/fnord'
ws = WSConnection(SERVER)
nonce = bytes(random.getrandbits(8) for x in range(0, 16))
nonce = base64.b64encode(nonce)
request = b'GET ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
request += b'Connection: Upgrade\r\n'
request += b'Upgrade: WebSocket\r\n'
request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
request += b'\r\n'
ws.receive_bytes(request)
event = next(ws.events())
assert isinstance(event, ConnectionFailed)
def test_accept_wrong_subprotocol(self):
test_host = 'frob.nitz'
test_path = '/fnord'
ws = WSConnection(SERVER)
nonce = bytes(random.getrandbits(8) for x in range(0, 16))
nonce = base64.b64encode(nonce)
request = b'GET ' + test_path.encode('ascii') + b' HTTP/1.1\r\n'
request += b'Host: ' + test_host.encode('ascii') + b'\r\n'
request += b'Connection: Upgrade\r\n'
request += b'Upgrade: WebSocket\r\n'
request += b'Sec-WebSocket-Version: 13\r\n'
request += b'Sec-WebSocket-Key: ' + nonce + b'\r\n'
request += b'Sec-WebSocket-Protocol: one, two\r\n'
request += b'\r\n'
ws.receive_bytes(request)
event = next(ws.events())
assert isinstance(event, ConnectionRequested)
assert event.proposed_subprotocols == ['one', 'two']
with pytest.raises(ValueError):
ws.accept(event, 'three')
def generate(
cls,
user_id=lambda: random.getrandbits(16),
contents=lambda: random_alphanumeric_string(length=8192),
expiry_time=lambda: int(time.time() + random.getrandbits(16)),
title=lambda: random_alphanumeric_string(),
language=lambda: random.choice(['python', 'css', 'javascript', 'text']),
password=lambda: random_alphanumeric_string(),
is_api_post=lambda: False,
):
return database.paste.create_new_paste(
contents=cls.random_or_specified_value(contents),
user_id=cls.random_or_specified_value(user_id),
expiry_time=cls.random_or_specified_value(expiry_time),
title=cls.random_or_specified_value(title),
language=cls.random_or_specified_value(language),
password=cls.random_or_specified_value(password),
is_api_post=cls.random_or_specified_value(is_api_post),
)
def generate(
cls,
paste_id=lambda: random.getrandbits(16),
file_name=lambda: random_alphanumeric_string(),
file_size=lambda: random.getrandbits(16),
mime_type=lambda: 'image/png',
file_data=lambda: random_alphanumeric_string(8192)
):
with mock.patch.object(database.attachment, '_store_attachment_file'):
return database.attachment.create_new_attachment(
paste_id=cls.random_or_specified_value(paste_id),
file_name=cls.random_or_specified_value(file_name),
file_size=cls.random_or_specified_value(file_size),
mime_type=cls.random_or_specified_value(mime_type),
file_data=cls.random_or_specified_value(file_data),
)
def random_marker():
""" A random marker used to identify a pytest run.
Some tests will spawn a private chain, the private chain will be one or
more ethereum nodes on a new subprocesss. These nodes may fail to start on
concurrent test runs, mostly because of port number conflicts, but even
though the test fails to start its private chain it may run interacting
with the geth process from a different test run! This leads to
unreasonable test errors.
This fixture creates a random marker used to distinguish pytest runs and
avoid test failures. Note this could fail for other reasons and fail to
detect unwanted interations if the user sets the PYTHONHASHSEED to the same
value.
"""
random_hex = hex(random.getrandbits(100))
# strip the leading 0x and trailing L
return random_hex[2:-1]
def circlepoint(c,r):
"""Generate a point [x,y] lying on a circle by the circle equation.
Args:
c (list): The position of circle centre.
r (float): The radius of the circle.
Returns:
A point lies on a circle. The point position is random.
"""
x = random.uniform(-r,r) # Randomly find the x position.
negative = bool(random.getrandbits(1)) # Randomly set whether the point is on the positive y side or negative side.
y = math.sqrt(r**2-x**2) # The equation of the circle.
if negative:
y = -y
return [x+c[0],y+c[1]]
def encode_params(self, base_url, method, params):
params = params.copy()
if self.token:
params['oauth_token'] = self.token
params['oauth_consumer_key'] = self.consumer_key
params['oauth_signature_method'] = 'HMAC-SHA1'
params['oauth_version'] = '1.0'
params['oauth_timestamp'] = str(int(time()))
params['oauth_nonce'] = str(getrandbits(64))
enc_params = urlencode_noplus(sorted(params.items()))
key = self.consumer_secret + "&" + urllib_parse.quote(self.token_secret, safe='~')
message = '&'.join(
urllib_parse.quote(i, safe='~') for i in [method.upper(), base_url, enc_params])
signature = (base64.b64encode(hmac.new(
key.encode('ascii'), message.encode('ascii'), hashlib.sha1)
.digest()))
return enc_params + "&" + "oauth_signature=" + urllib_parse.quote(signature, safe='~')
def gouv_api_csv(csv):
output_csv = ''
for line in csv.splitlines()[1:]:
output_csv += line
# Always fills the data of the result used for tests.
if bool(random.getrandbits(1)) or "-21.9419851,64.14602" in line: #"64,1460200,-21,9419851" in line:
output_csv += ',' + ','.join([
str(fake.latitude()), # "result_latitude"
str(fake.longitude()), # "result_longitude"
fake.address().replace('\n', ' ').replace(',', ' '), # "result_label"
str(random.randint(0, 300)), # "result_distance"
"housenumber", # "result_type"
"44109_XXXX_984eec", # "result_id"
fake.building_number(), # "result_housenumber"
fake.street_name(), # "result_name", the name of the street.
'', # "result_street", empty in most case.
fake.postcode(), # "result_postcode"
fake.city(), # "result_city"
'"' + fake.postcode()[:2] + ' ' + fake.department()[1] + ', ' + fake.region() + '"', # "result_context"
fake.postcode()[:2] + str(random.randint(100, 300)), # "result_citycode"
])
else:
output_csv += ',,,,,,,,,,,,,'
output_csv += '\n'
return output_csv
def result_properties():
properties = {}
properties["access"] = "yes"
if bool(random.getrandbits(1)):
properties["name"] = fake.company()
if bool(random.getrandbits(1)):
properties["phone"] = fake.phone_number()
if bool(random.getrandbits(1)):
properties["fee"] = "yes"
elif bool(random.getrandbits(1)):
properties["fee"] = "no"
if bool(random.getrandbits(1)):
properties["opening_hours"] = "Mo-Su 09:00-19:00"
elif bool(random.getrandbits(1)):
properties["opening_hours"] = "Mo-Th 12:30-16:30"
return properties
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
"""
This function follows from the following discussion:
`http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09`
It works like uuid.uuid4 except that tries to use os.urandom() if possible
and it XORs the output with the tokens uniquely associated with this machine.
"""
rand_longs = (random.getrandbits(64), random.getrandbits(64))
if HAVE_URANDOM:
urand_longs = _struct_2_long_long.unpack(fast_urandom16())
byte_s = _struct_2_long_long.pack(rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
else:
byte_s = _struct_2_long_long.pack(rand_longs[0] ^ ctokens[0],
rand_longs[1] ^ ctokens[1])
return str(uuid.UUID(bytes=byte_s, version=4))
def web2py_uuid(ctokens=UNPACKED_CTOKENS):
"""
This function follows from the following discussion:
http://groups.google.com/group/web2py-developers/browse_thread/thread/7fd5789a7da3f09
It works like uuid.uuid4 except that tries to use os.urandom() if possible
and it XORs the output with the tokens uniquely associated with this machine.
"""
rand_longs = (random.getrandbits(64), random.getrandbits(64))
if HAVE_URANDOM:
urand_longs = struct.unpack('=QQ', fast_urandom16())
byte_s = struct.pack('=QQ',
rand_longs[0] ^ urand_longs[0] ^ ctokens[0],
rand_longs[1] ^ urand_longs[1] ^ ctokens[1])
else:
byte_s = struct.pack('=QQ',
rand_longs[0] ^ ctokens[0],
rand_longs[1] ^ ctokens[1])
return str(uuid.UUID(bytes=byte_s, version=4))
def test_random_read_write(self):
"""Test random read/write"""
q = Queue(self.path)
n = 0
for i in range(1000):
if random.random() < 0.5:
if n > 0:
q.get_nowait()
q.task_done()
n -= 1
else:
with self.assertRaises(Empty):
q.get_nowait()
else:
q.put('var%d' % random.getrandbits(16))
n += 1