def test_alice_finds_ursula(alice, ursulas):
    """
    Alice looks up one specific Ursula and recovers that Ursula's port.

    The entry stored under the chosen Ursula's key is fetched through
    ``alice.server``, the ``uaddr-`` marker bytes are stripped, the value is
    split with ``dht_value_splitter`` and the msgpack-encoded interface info
    is expected to start with ``URSULA_PORT + <index of the Ursula>``.
    """
    target = 1
    known_ursulas = blockchain_client._ursulas_on_blockchain
    lookup = alice.server.get(known_ursulas[target])
    raw_value = asyncio.get_event_loop().run_until_complete(lookup)

    # NOTE(review): bytes.lstrip() removes any leading run of the bytes
    # ``u a d r -`` rather than the literal prefix; a payload that happens to
    # start with one of those bytes would lose it as well -- confirm intent.
    stripped = value_without_marker = raw_value.lstrip(b"uaddr-")
    pieces = dht_value_splitter(stripped, return_remainder=True)
    _signature, _ursula_pubkey_sig, _hrac, interface_info = pieces

    decoded = msgpack.loads(interface_info)
    assert decoded[0] == URSULA_PORT + target
# Example source code using Python's loads()
async def test_base_api_list_filter(http_client, base_url, url_param, cnt, monkeypatch):
    """Check that a filtered list request returns ``cnt`` items.

    ``ApiListTestHandler.get_schema_input`` is replaced with an empty dict,
    then the list endpoint is fetched with ``url_param`` as its query string.
    The response must be HTTP 200 with a successful, error-free envelope
    whose ``result.items`` has exactly ``cnt`` entries.

    NOTE(review): ``url_param``/``cnt`` are presumably supplied by a
    parametrize decorator, and the coroutine presumably runs under an
    async-aware pytest marker; neither is visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    monkeypatch.setattr(ApiListTestHandler, 'get_schema_input', {})
    res = await http_client.fetch(base_url + '/test/api_test_model/?%s' % url_param)

    assert res.code == 200
    data = json.loads(res.body.decode())
    assert data['success']
    assert data['errors'] == []
    assert len(data['result']['items']) == cnt
async def test_base_api_list_prefetch(http_client, base_url, test_data, app_base_handlers, total):
    """Check that the prefetch list endpoint embeds related FK rows.

    Five ``ApiTestModelFK`` rows pointing at ``test_data[0]`` are created,
    then the prefetch endpoint is fetched with ``total`` in the query string.
    The first returned item must carry exactly five ``rel_items``.

    NOTE(review): ``total`` is presumably provided by a parametrize
    decorator, and the coroutine presumably runs under an async-aware pytest
    marker; neither is visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    # Create test FK models
    for _ in range(5):
        await app_base_handlers.objects.create(ApiTestModelFK, tf_foreign_key=test_data[0])

    res = await http_client.fetch(base_url + '/test/api_test_model_prefetch/?total=%s' % total)

    assert res.code == 200
    data = json.loads(res.body.decode())
    assert data['success']
    assert data['errors'] == []
    # Check prefetch
    assert len(data['result']['items'][0]['rel_items']) == 5
async def test_base_api_list_filter_default(http_client, base_url, monkeypatch):
    """Check the list endpoint when the handler has default filter/order.

    ``default_filter`` is patched to ``{'tf_integer__gt': 0}`` and
    ``default_order_by`` to ``('tf_text',)`` on ``ApiListTestHandler``; an
    unparameterised list request must then succeed and return two items.

    NOTE(review): the coroutine presumably runs under an async-aware pytest
    marker that is not visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    monkeypatch.setattr(ApiListTestHandler, 'default_filter', {'tf_integer__gt': 0})
    monkeypatch.setattr(ApiListTestHandler, 'default_order_by', ('tf_text',))

    res = await http_client.fetch(base_url + '/test/api_test_model/')

    assert res.code == 200
    data = json.loads(res.body.decode())
    assert data['success']
    assert data['errors'] == []
    assert len(data['result']['items']) == 2
async def test_base_api_list_force_total_header(http_client, base_url):
    """Check pagination totals when the ``X-Total`` request header is sent.

    The list endpoint is fetched with an empty ``X-Total`` header; the
    reported ``pagination.total`` must equal both the number of returned
    items and ``len(TEST_DATA)``.

    NOTE(review): the coroutine presumably runs under an async-aware pytest
    marker that is not visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    res = await http_client.fetch(base_url + '/test/api_test_model/', headers={'X-Total': ''})

    assert res.code == 200
    data = json.loads(res.body.decode())
    assert data['errors'] == []
    assert data['success']
    assert data['pagination']['total'] == len(data['result']['items']) == len(TEST_DATA)
async def test_base_api_list_force_total_query(http_client, base_url):
    """Check pagination totals when ``total=1`` is passed in the query.

    The reported ``pagination.total`` must equal both the number of returned
    items and ``len(TEST_DATA)``.

    NOTE(review): the coroutine presumably runs under an async-aware pytest
    marker that is not visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    res = await http_client.fetch(base_url + '/test/api_test_model/?total=1')

    assert res.code == 200
    data = json.loads(res.body.decode())
    assert data['errors'] == []
    assert data['success']
    assert data['pagination']['total'] == len(data['result']['items']) == len(TEST_DATA)
async def test_base_api_list_filter_bad_request1(http_client, base_url, url_param):
    """Check that an invalid list query is rejected with HTTP 400.

    The fetch must raise ``HTTPError`` with code 400 and an error envelope
    holding a single error whose message is ``'Bad query arguments'``.

    NOTE(review): another function with this exact name is defined directly
    below in the file and will shadow this one at import time -- confirm
    whether one of them should be renamed. ``url_param`` is presumably
    parametrized and the coroutine presumably runs under an async-aware
    pytest marker; neither is visible here.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    with pytest.raises(HTTPError) as e:
        await http_client.fetch(base_url + '/test/api_test_model/?%s' % url_param)

    assert e.value.code == 400
    data = json.loads(e.value.response.body.decode())
    assert data['result'] is None
    assert not data['success']
    assert len(data['errors']) == 1
    assert data['errors'][0]['message'] == 'Bad query arguments'
async def test_base_api_list_filter_bad_request1(http_client, base_url, url_param):
    """Check the error detail returned for an invalid list query.

    The fetch must raise ``HTTPError`` with code 400 and an error envelope
    holding a single error whose ``detail`` contains both ``<`` and ``>``.

    NOTE(review): a function with this exact name is also defined directly
    above in the file; this definition shadows it at import time -- confirm
    whether one of them should be renamed. ``url_param`` is presumably
    parametrized and the coroutine presumably runs under an async-aware
    pytest marker; neither is visible here.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    with pytest.raises(HTTPError) as e:
        await http_client.fetch(base_url + '/test/api_test_model/?%s' % url_param)

    assert e.value.code == 400
    data = json.loads(e.value.response.body.decode())
    assert data['result'] is None
    assert not data['success']
    assert len(data['errors']) == 1
    detail = data['errors'][0]['detail']
    assert '<' in detail
    assert '>' in detail
async def test_base_api_list_bad_request(http_client, base_url, body, message):
    """Check that POSTing ``body`` to the list endpoint yields HTTP 400.

    Every error in the response envelope must carry ``message``.

    NOTE(review): ``body``/``message`` are presumably parametrized and the
    coroutine presumably runs under an async-aware pytest marker; neither is
    visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    with pytest.raises(HTTPError) as e:
        await http_client.fetch(base_url + '/test/api_test_model/', method='POST', body=body)

    assert e.value.code == 400
    data = json.loads(e.value.response.body.decode())
    assert data['result'] is None
    assert not data['success']
    for error in data['errors']:
        # Printed so the offending error is visible in the captured output.
        print(error)
        assert error['message'] == message
async def test_base_api_list_bad_fk(http_client, base_url):
    """Check that creating an FK model with ``tf_foreign_key=1`` fails.

    The POST must raise ``HTTPError`` with code 400 and a single error whose
    message is ``'Invalid parameters'``.

    NOTE(review): the coroutine presumably runs under an async-aware pytest
    marker that is not visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    # Create model with invalid FK
    payload = {
        'tf_foreign_key': 1,
    }
    with pytest.raises(HTTPError) as e:
        await http_client.fetch(base_url + '/test/api_test_model_fk/', method='POST',
                                body=json.dumps(payload).encode())

    assert e.value.code == 400
    data = json.loads(e.value.response.body.decode())
    assert data['result'] is None
    assert not data['success']
    assert len(data['errors']) == 1
    assert data['errors'][0]['message'] == 'Invalid parameters'
async def test_base_api_list_bad_fk_invalid_integer(http_client, base_url):
    """Check that an empty-string foreign key is rejected by validation.

    The POST must raise ``HTTPError`` with code 400 and a single error whose
    message is ``'Validation failed'``.

    NOTE(review): the coroutine presumably runs under an async-aware pytest
    marker that is not visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    # Create model with invalid FK
    payload = {
        'tf_foreign_key': '',
    }
    with pytest.raises(HTTPError) as e:
        await http_client.fetch(base_url + '/test/api_test_model_fk/', method='POST',
                                body=json.dumps(payload).encode())

    assert e.value.code == 400
    data = json.loads(e.value.response.body.decode())
    assert data['result'] is None
    assert not data['success']
    assert len(data['errors']) == 1
    assert data['errors'][0]['message'] == 'Validation failed'
async def test_base_api_list_post(http_client, base_url, app_base_handlers):
    """Check that POSTing ``TEST_INVALID_DATA[0]`` is rejected with HTTP 400.

    The error envelope must hold a single error whose message is
    ``'Invalid parameters'``.

    NOTE(review): another function named ``test_base_api_list_post`` is
    defined directly below in the file and shadows this one at import time
    -- confirm whether this test should be renamed. The coroutine presumably
    runs under an async-aware pytest marker that is not visible here.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    payload = TEST_INVALID_DATA[0]

    # The assertions below read ``e.value``; bind ``e`` by capturing the
    # HTTPError the same way the other 400-response tests in this file do.
    with pytest.raises(HTTPError) as e:
        await http_client.fetch(base_url + '/test/api_test_model/', method='POST',
                                body=json.dumps(payload, default=json_serial).encode())

    assert e.value.code == 400
    data = json.loads(e.value.response.body.decode())
    assert data['result'] is None
    assert not data['success']
    assert len(data['errors']) == 1
    assert data['errors'][0]['message'] == 'Invalid parameters'
async def test_base_api_list_post(http_client, base_url, app_base_handlers):
    """Check that POSTing ``TEST_DATA[0]`` creates a retrievable item.

    The response must be a successful, error-free envelope; the returned
    ``result.id`` is then looked up through ``app_base_handlers.objects`` as
    an ``ApiTestModel``.

    NOTE(review): a function with this exact name is also defined directly
    above in the file; this definition shadows it at import time -- confirm
    whether one of them should be renamed. The coroutine presumably runs
    under an async-aware pytest marker that is not visible here.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    payload = TEST_DATA[0]
    resp = await http_client.fetch(base_url + '/test/api_test_model/', method='POST',
                                   body=json.dumps(payload, default=json_serial).encode())

    assert resp.code == 200
    data = json.loads(resp.body.decode())
    assert data['errors'] == []
    assert data['success']
    item_id = data['result']['id']
    # Fetch item from database
    await app_base_handlers.objects.get(ApiTestModel, id=item_id)
async def test_base_api_item_get(http_client, base_url, test_data):
    """Check that fetching one item returns the fields of ``TEST_DATA[0]``.

    ``datetime`` values are compared against their ``isoformat()`` string;
    every other value is compared as is.

    NOTE(review): the coroutine presumably runs under an async-aware pytest
    marker that is not visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    resp = await http_client.fetch(base_url + '/test/api_test_model/%s' % test_data[0].id)

    assert resp.code == 200
    data = json.loads(resp.body.decode())
    assert data['success']
    assert data['errors'] == []
    for k, v in TEST_DATA[0].items():
        if isinstance(v, datetime.datetime):
            assert data['result'][k] == v.isoformat()
        else:
            assert data['result'][k] == v
async def test_base_api_list_overridden_orderby(http_client, base_url):
    """Check the overridden-order-by endpoint with ``order_by=ololo``.

    ``TEST_DATA[0]`` is POSTed first; the subsequent list request must
    succeed with ``result`` equal to ``{'items': []}``.

    NOTE(review): the coroutine presumably runs under an async-aware pytest
    marker that is not visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    payload = TEST_DATA[0]
    await http_client.fetch(base_url + '/test/api_test_model/', method='POST',
                            body=json.dumps(payload, default=json_serial).encode())

    res = await http_client.fetch(base_url + '/test/api_test_model_overridden_orderby/?order_by=ololo')

    assert res.code == 200
    data = json.loads(res.body.decode())
    assert data['result'] == {'items': []}
    assert data['success']
async def test_base_api_item_delete(http_client, base_url, app_base_handlers, test_data):
    """Check that DELETE on an item removes it.

    The response must be a successful envelope with ``result`` equal to
    ``'Item deleted'``; looking the item up afterwards must raise
    ``ApiTestModel.DoesNotExist``.

    NOTE(review): the coroutine presumably runs under an async-aware pytest
    marker that is not visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    item_id = test_data[0].id
    resp = await http_client.fetch(base_url + '/test/api_test_model/%s' % item_id, method='DELETE')

    assert resp.code == 200
    data = json.loads(resp.body.decode())
    assert data['success']
    assert data['errors'] == []
    assert data['result'] == 'Item deleted'

    # Check that item has been deleted
    with pytest.raises(ApiTestModel.DoesNotExist):
        await app_base_handlers.objects.get(ApiTestModel, id=test_data[0].id)
async def test_api_list_validate_get(http_client, base_url, monkeypatch):
    """Check that GET arguments are validated against the input schema.

    ``get_schema_input`` is patched to an object schema with no properties
    and ``additionalProperties: False``; a request with ``?a=1`` must then
    raise ``HTTPError`` with code 400 and a single ``'Validation failed'``
    error.

    NOTE(review): the coroutine presumably runs under an async-aware pytest
    marker that is not visible here -- confirm.
    """
    # Declared ``async`` because ``await`` is only valid inside a coroutine.
    strict_empty_schema = {
        'type': 'object',
        'additionalProperties': False,
        'properties': {},
    }
    monkeypatch.setattr(ApiListTestHandler, 'get_schema_input', strict_empty_schema)

    with pytest.raises(HTTPError) as e:
        await http_client.fetch(base_url + '/test/api_test_model/?a=1')

    assert e.value.code == 400
    data = json.loads(e.value.response.body.decode())
    assert not data['success']
    assert len(data['errors']) == 1
    assert data['errors'][0]['message'] == 'Validation failed'
def _record_pupil(self):
    """Append pupil-diameter samples to ``self.samples`` while running.

    Each iteration reads a string frame followed by a binary frame from
    ``self.socket``, deserialises the binary frame and appends
    ``('pupil', local_clock(), payload['diameter'])`` to ``self.samples``
    under ``samples_lock``. The loop ends once ``self.running`` is false.
    """
    while self.running:
        # The first frame is read only to advance to the payload frame;
        # its value (presumably the topic name) is not used.
        self.socket.recv_string()
        payload = serializer.loads(self.socket.recv(), encoding='utf-8')

        # ``with`` guarantees the lock is released even if building the
        # sample raises (e.g. the payload has no 'diameter' key), which the
        # bare acquire()/release() pair did not.
        # NOTE(review): assumes samples_lock supports the context-manager
        # protocol, as threading/multiprocessing locks do -- confirm.
        with samples_lock:
            self.samples.append(('pupil', local_clock(), payload['diameter']))

    print('Terminating pupil tracker recording.')
def reset(self):
    """Reset the receive counters and start a background receiver thread.

    Sets ``status`` to ``READY``, zeroes ``_num_recv``, clears ``_drained``
    and starts a ``threading.Thread`` (stored in ``self._thread``) that pulls
    msgpack-encoded samples from ``self._address`` into ``self._queue``.
    """
    self.status = READY
    self._num_recv = 0
    self._drained = False

    def run():
        """Receive samples until ``self.status`` equals ``DRAINED``."""
        context = zmq.Context()
        socket = context.socket(zmq.PULL)
        socket.set_hwm(32)
        # Short receive timeout so recv() raises Again instead of blocking.
        socket.RCVTIMEO = 1
        socket.connect(self._address)
        # NOTE(review): nothing in this function assigns DRAINED to
        # ``self.status``; presumably it is derived from ``_drained`` or set
        # elsewhere -- confirm, otherwise this loop never ends.
        while self.status != DRAINED:
            try:
                packet = socket.recv(copy=False)
                sample = msgpack.loads(packet)
            except zmq.error.Again:
                # Nothing arrived within the timeout: back off briefly.
                sleep(0.1)
                sample = None
            if sample == b'END':
                # End-of-stream marker from the sender.
                self._drained = True
            elif sample is not None:
                self._num_recv += 1
                # Tag the sample with the receiving process and its ordinal.
                sample['__process_id__'] = mp.current_process().name
                sample['__recv_count__'] = self._num_recv
                self._queue.put(sample)
        socket.close()

    self._thread = threading.Thread(target=run)
    self._thread.start()
def get(self):
    """Try to receive one msgpack-encoded sample from ``self._socket``.

    Returns the decoded sample, or ``None`` when the receive raised
    ``zmq.error.Again`` or when the ``b'END'`` marker was received (in which
    case ``_drained`` is set). A real sample is tagged with the current
    process name and the running receive count before being returned.
    """
    try:
        sample = self._socket.recv(copy=False)
        sample = msgpack.loads(sample)
    except zmq.error.Again:
        # No message available.
        sample = None
    if sample == b'END':
        # End-of-stream marker: report "no sample" and remember the drain.
        sample = None
        self._drained = True
    elif sample is not None:
        self._num_recv += 1
        sample['__process_id__'] = mp.current_process().name
        sample['__recv_count__'] = self._num_recv
        # NOTE(review): the original nesting of this assignment is not
        # recoverable from the source as received; it is placed in the
        # "sample received" branch here -- confirm against the upstream file.
        self.status = WAITING
    return sample