def test_one(self):
    """Check that asking for exactly one result raises an exception."""
    # NOTE(review): nothing in this body asserts the exception and `yqlobj`
    # is not defined here -- presumably the expectation and the fixture live
    # outside this excerpt; confirm before relying on this test.
    yqlobj.one()
# Example source code for Python's raises()
def test_send_verification_with_no_number(self):
    """Call send_verification with an empty number; ValueError is expected."""
    # NOTE(review): no assertion is visible in this body -- presumably an
    # expectation decorator sits outside this excerpt; confirm.
    blank_number = ''
    self.service.send_verification(blank_number, self.request)
def test_create_url_with_no_key(self):
    """Call create_url with a key of None; ValueError is expected."""
    # NOTE(review): no assertion is visible in this body -- presumably an
    # expectation decorator sits outside this excerpt; confirm.
    missing_key = None
    self.service.create_url(self.request, missing_key)
def test_bad_watch_func2(self):
    """Register a DataWatch callback that raises once the counter is
    positive, bump the counter, then write to the watched path.
    """
    fired = 0

    @self.client.DataWatch(self.path)
    def on_change(data, stat):
        # Only misbehaves after the outer counter has been incremented.
        if fired > 0:
            raise Exception("oops")

    # NOTE(review): the callable produced here is discarded without being
    # invoked, so this line does not assert anything by itself.
    expect_exception = raises(Exception)
    expect_exception(on_change)

    fired += 1
    self.client.set(self.path, b'asdfasdf')
def test_bad_children_watch_func(self):
    """Register a ChildrenWatch callback that raises once the counter is
    positive, bump the counter, then create a child node under the path.
    """
    fired = 0

    @self.client.ChildrenWatch(self.path)
    def on_children(children):
        # Only misbehaves after the outer counter has been incremented.
        if fired > 0:
            raise Exception("oops")

    # NOTE(review): the callable produced here is discarded without being
    # invoked, so this line does not assert anything by itself.
    expect_exception = raises(Exception)
    expect_exception(on_children)

    fired += 1
    child_path = self.path + '/' + 'smith'
    self.client.create(child_path)
def test_exception(self):
    """Fetching the result of a started watcher must raise NoNodeError."""
    from kazoo.exceptions import NoNodeError

    pending = self._makeOne(self.client, self.path, 0.1).start()

    @raises(NoNodeError)
    def fetch_result():
        pending.get()

    fetch_result()
def test_get_with_no_block(self):
    """A non-blocking get() on an unset async result must raise the
    handler's timeout exception; after set(1), get() must return 1.
    """
    evt_handler = eventlet_handler.SequentialEventletHandler()

    @raises(evt_handler.timeout_exception)
    def get_without_blocking(async_result):
        async_result.get(block=False)

    with start_stop_one(evt_handler):
        pending = evt_handler.async_result()
        # Nothing has been set yet, so the non-blocking get must fail.
        get_without_blocking(pending)
        pending.set(1)
        self.assertEqual(1, pending.get())
def test_bad_deserialization(self):
    """Queue a Delete request, poke the connection's write socket, and
    expect ValueError when the queued async result is read.
    """
    pending = self.client.handler.async_result()
    queued_request = Delete(self.client.chroot, -1)
    self.client._queue.append((queued_request, pending))
    # Send a single NUL byte on the connection's write socket.
    self.client._connection._write_sock.send(b'\0')

    @raises(ValueError)
    def read_result():
        pending.get()

    read_result()
def test_read_only(self):
    """Stop cluster members 1 and 2, wait for the client to report
    CONNECTED_RO, then check that a read succeeds and a write raises
    NotReadOnlyCallError. Members 1 and 2 are started again afterwards.
    """
    from kazoo.exceptions import NotReadOnlyCallError
    from kazoo.protocol.states import KeeperState

    client = self.client
    seen_states = []
    read_only_seen = threading.Event()

    @client.add_listener
    def track_state(state):
        seen_states.append(state)
        if client.client_state == KeeperState.CONNECTED_RO:
            read_only_seen.set()

    try:
        for member in (1, 2):
            self.cluster[member].stop()

        # Give the client up to six seconds to notice the change.
        read_only_seen.wait(6)
        eq_(read_only_seen.is_set(), True)
        eq_(client.client_state, KeeperState.CONNECTED_RO)

        # A read-only command must still work.
        eq_(client.get_children('/'), [])

        # A write command must be rejected.
        @raises(NotReadOnlyCallError)
        def attempt_write():
            client.create('/fred')

        attempt_write()

        # Wait for a ping
        time.sleep(15)
    finally:
        client.remove_listener(track_state)
        for member in (1, 2):
            self.cluster[member].run()
def test_version_no_connection(self):
    """Asking a stopped client for the server version must raise
    ConnectionLoss.
    """
    @raises(ConnectionLoss)
    def query_version():
        self.client.server_version()

    # Stop the client first so the query runs without a connection.
    self.client.stop()
    query_version()
def test_bad_creates(self):
    """Every malformed argument tuple passed to a fresh transaction's
    create() must raise TypeError.
    """
    bad_calls = [
        (True,),
        ('/smith', 0),
        ('/smith', b'', 'bleh'),
        ('/smith', b'', None, 'fred'),
        ('/smith', b'', None, True, 'fred'),
    ]

    @raises(TypeError)
    def attempt(call_args):
        txn = self.client.transaction()
        txn.create(*call_args)

    for call_args in bad_calls:
        attempt(call_args)
def test_bad_deletes(self):
    """Every malformed argument tuple passed to a fresh transaction's
    delete() must raise TypeError.
    """
    bad_calls = [
        (True,),
        ('/smith', 'woops'),
    ]

    @raises(TypeError)
    def attempt(call_args):
        txn = self.client.transaction()
        txn.delete(*call_args)

    for call_args in bad_calls:
        attempt(call_args)
def test_bad_sets(self):
    """Every malformed argument tuple passed to a fresh transaction's
    set_data() must raise TypeError.
    """
    bad_calls = [
        (42, 52),
        ('/smith', False),
        ('/smith', b'', 'oops'),
    ]

    @raises(TypeError)
    def attempt(call_args):
        txn = self.client.transaction()
        txn.set_data(*call_args)

    for call_args in bad_calls:
        attempt(call_args)
def test_bad_checks(self):
    """Every malformed argument tuple passed to a fresh transaction's
    check() must raise TypeError.
    """
    bad_calls = [
        (42, 52),
        ('/smith', 'oops'),
    ]

    @raises(TypeError)
    def attempt(call_args):
        txn = self.client.transaction()
        txn.check(*call_args)

    for call_args in bad_calls:
        attempt(call_args)
def test_bad_commit(self):
    """Committing a transaction whose ``committed`` flag is already True
    must raise ValueError.
    """
    txn = self.client.transaction()

    @raises(ValueError)
    def commit_again():
        txn.commit()

    # Mark the transaction as committed before attempting the commit.
    txn.committed = True
    commit_again()
def test_exception_raising(self):
    """The handler's timeout_exception can be raised with a message and
    is recognised by ``raises``.
    """
    handler = self._makeOne()

    @raises(handler.timeout_exception)
    def raise_timeout():
        message = "This is a timeout"
        raise handler.timeout_exception(message)

    raise_timeout()
def test_get_with_nowait(self):
    """Both get(block=False) and get_nowait() on an unset async result
    must raise the handler's timeout exception.
    """
    mock_handler = mock.Mock()
    # `async` is a reserved keyword since Python 3.7 and cannot be used as
    # a variable name, so the object under test gets a different local name.
    async_result = self._makeOne(mock_handler)
    timeout = self._makeHandler().timeout_exception

    @raises(timeout)
    def test_it():
        async_result.get(block=False)

    test_it()

    @raises(timeout)
    def test_nowait():
        async_result.get_nowait()

    test_nowait()
def test_exception_raising(self):
    """Raise the handler's timeout_exception inside a ``raises`` wrapper
    that expects exactly that exception type.
    """
    under_test = self._makeOne()

    def fail_with_timeout():
        raise under_test.timeout_exception("This is a timeout")

    # Apply the expectation explicitly rather than with decorator syntax.
    fail_with_timeout = raises(under_test.timeout_exception)(fail_with_timeout)
    fail_with_timeout()
def test_DatasetBase():
    """segments_info and video_info on a bare DatasetBase must raise
    NotImplementedError when called.
    """
    ds = DatasetBase()
    # `raises(...)` only builds a decorator and never calls its arguments,
    # so the previous form asserted nothing. `assert_raises` actually
    # invokes the callable and checks the exception type.
    # NOTE(review): assumes `nt` is nose.tools (or numpy.testing), both of
    # which provide assert_raises -- confirm against the file's imports.
    nt.assert_raises(NotImplementedError, ds.segments_info)
    nt.assert_raises(NotImplementedError, ds.video_info)
def test_custom_encoding_text(self):
    """Text written as UTF-16 reads back unchanged as UTF-16, and reading
    the same file as UTF-8 raises UnicodeDecodeError.
    """
    target = self.test_dir / 'test_file.txt'

    with stor.open(target, mode='w', encoding='utf-16') as writer:
        writer.write(STRING_STRING)

    with stor.open(target, mode='r', encoding='utf-16') as reader:
        round_tripped = reader.read()
    assert round_tripped == STRING_STRING

    # Decoding the UTF-16 bytes with the wrong codec must fail.
    with pytest.raises(UnicodeDecodeError):
        with stor.open(target, mode='r', encoding='utf-8') as reader:
            reader.read()