def test_verify_user(self):
    """
    verify_user should accept a user listed as unverified in the group.

    Starts from a config where "@foouser" is unverified in "foogroup",
    then checks that the call returns a truthy value and that the config
    afterwards lists the user (id and username) under the group's "users".
    """
    manager = self.__get_dummy_object()
    self.__set_config(manager, {"foogroup": {"unverified": ["@foouser"]}})
    # Replace the private load/save hooks with mocks while checking.
    with patch.object(manager, "_UserManager__load_config"):
        with patch.object(manager, "_UserManager__save_config"):
            outcome = manager.verify_user(1337, "@foouser", "foogroup")
            self.assertTrue(outcome)
            verified_entry = {"id": 1337, "username": "@foouser"}
            wanted_config = {"foogroup": {"users": [verified_entry]}}
            self.assertEqual(manager.config, wanted_config)
# python类object()的实例源码  (page title: "Example source code using Python's object()")
def test_add_user_already_in_grp(self):
    """
    add_user should return a falsy value when the group already lists
    the given username (here as an unverified member).
    """
    manager = self.__get_dummy_object()
    self.__set_config(manager, {"foogroup": {"unverified": ["@foouser"]}})
    # Replace the private load/save hooks with mocks while checking.
    with patch.object(manager, "_UserManager__load_config"):
        with patch.object(manager, "_UserManager__save_config"):
            outcome = manager.add_user("@foouser", "foogroup")
            self.assertFalse(outcome)
def test_add_user_verified(self):
    """
    add_user with both a username and a user_id, on an empty config.

    Checks that the call returns a truthy value and that the config then
    holds the user (id and username) under the group's "users" list.
    """
    manager = self.__get_dummy_object()
    self.__set_config(manager, {})
    # Replace the private load/save hooks with mocks while checking.
    with patch.object(manager, "_UserManager__load_config"):
        with patch.object(manager, "_UserManager__save_config"):
            outcome = manager.add_user("@foouser", "foogroup", user_id=1337)
            self.assertTrue(outcome)
            added_entry = {"id": 1337, "username": "@foouser"}
            wanted_config = {"foogroup": {"users": [added_entry]}}
            self.assertEqual(manager.config, wanted_config)
def make_test_handler(testcase, *args, **kwargs):
    """
    Create a TestHandler tied to the lifetime of the given test case.

    The handler can be used to inspect log messages emitted during a test.

    Parameters
    ----------
    testcase: unittest.TestCase
        The test case that will use the handler; the handler's ``close``
        is registered on it via ``addCleanup``.
    *args, **kwargs
        Passed through unchanged to the TestHandler constructor.

    Returns
    -------
    handler: logbook.TestHandler
        The newly created handler.
    """
    test_handler = TestHandler(*args, **kwargs)
    # Let the test case close the handler once the test has finished.
    testcase.addCleanup(test_handler.close)
    return test_handler
def setUp(self):
    """
    Patch wagtail's core ``urlpatterns`` with a fixed three-entry list,
    reload ``wagtailsharing.urls`` and keep its ``urlpatterns`` on
    ``self.urlpatterns``. The patch is undone through ``addCleanup``.
    """
    def test_view():
        pass  # pragma: no cover

    # NOTE(review): each pattern passes ``url`` itself as the view and
    # ``test_view`` is never referenced -- confirm this is intended.
    route_specs = (
        (r'^foo/$', 'foo'),
        (r'^((?:[\w\-]+/)*)$', 'wagtail_serve'),
        (r'^bar/$', 'bar'),
    )
    replacement_patterns = [
        url(regex, url, name=route_name) for regex, route_name in route_specs
    ]

    self.patcher = patch.object(
        wagtail.wagtailcore.urls, 'urlpatterns', replacement_patterns
    )
    self.patcher.start()
    self.addCleanup(self.patcher.stop)

    # Reload after the patch is active, then expose the resulting patterns.
    reload(wagtailsharing.urls)
    self.urlpatterns = wagtailsharing.urls.urlpatterns
def test_replace_loggroup_section(mocked_configparser_class):
    """
    replace_loggroup_section should drop the log-groups section and leave
    the CLI favourites section on the (mocked) config parser.

    Parameters
    ----------
    mocked_configparser_class
        Mock standing in for the config parser class; its ``return_value``
        is the parser instance inspected by the final assertion.
    """
    config_dir = api_utils.user_config_dir(cli.lecli.__name__)
    # Make sure the config directory exists when no config file is present.
    if not os.path.exists(api_utils.CONFIG_FILE_PATH):
        if not os.path.exists(config_dir):
            os.makedirs(config_dir)

    loggroups_section = api_utils.LOGGROUPS_SECTION
    config_parser_mock = mocked_configparser_class.return_value
    config_parser_mock.add_section(loggroups_section)

    try:
        with patch.object(api_utils.CONFIG, 'items',
                          return_value=[('test-log-group-favs',
                                         ["test-log-key1", "test-log-key2"])]):
            api_utils.replace_loggroup_section()
            assert not api_utils.CONFIG.has_section(loggroups_section)
            assert config_parser_mock.has_section(api_utils.CLI_FAVORITES_SECTION)
    finally:
        # Clean up even when an assertion above fails, so a failing run
        # does not leave files behind. Removal stays best-effort: missing
        # files or a non-empty directory are ignored.
        try:
            os.remove(api_utils.CONFIG_FILE_PATH)
            os.rmdir(config_dir)
        except OSError:
            pass
def test_valid_server_pool_add_slot(query_mock, add_mo_mock, commit_mock):
    """
    server_pool_add_slot with default parameters returns a
    ComputePooledSlot carrying the chassis and slot ids as strings.

    The three mock parameters have their return values configured here:
    the query mock yields a ComputePool, the other two yield True.
    """
    query_mock.return_value = ComputePool(parent_mo_or_dn="org-root",
                                          name="test-pool")
    add_mo_mock.return_value = True
    commit_mock.return_value = True
    ucs_handle = UcsHandle('169.254.1.1', 'admin', 'password')

    # Scenario: only the required arguments are supplied.
    added_slot = server_pool_add_slot(ucs_handle, 1, 8)

    # The returned object must be of the pooled-slot type ...
    assert isinstance(added_slot, ComputePooledSlot)
    # ... and must carry the ids we passed in, in string form.
    assert added_slot.chassis_id == "1"
    assert added_slot.slot_id == "8"
# Patch UcsHandle.commit to simulate ucsm interaction w/o real ucsm
def test_non_backoff_exception(
    self, container_factory, rabbit_config, rpc_proxy
):
    """ An exception that is not a Backoff reaches the caller as a
    RemoteError whose ``exc_type`` is the exception's class name.
    """

    class Boom(Exception):
        pass

    class Service(object):
        name = "service"

        @rpc
        def method(self):
            raise Boom()

    service_container = container_factory(Service, rabbit_config)
    service_container.start()

    with pytest.raises(RemoteError) as raised:
        rpc_proxy.service.method()
    assert raised.value.exc_type == "Boom"
def container(self, container_factory):
    """
    Build (without starting) a container for a service whose three RPC
    methods all raise Backoff and differ only in the exceptions each one
    declares as expected.
    """

    class Service(object):
        name = "service"

        @rpc
        def nothing_expected(self):
            raise Backoff()

        @rpc(expected_exceptions=self.UserException)
        def something_expected(self):
            raise Backoff()

        @rpc(expected_exceptions=(self.UserException, Backoff))
        def backoff_expected(self):
            raise Backoff()

    return container_factory(Service, dict(AMQP_URI='memory://localhost'))
def entrypoint_tracker():
    """
    Return a fresh tracker that records calls as keyword dictionaries.

    The tracker keeps every recorded call in ``calls`` (in order), reports
    their number through ``len()``, and can list the ``'result'`` or
    ``'exc_info'`` entry of each recorded call.
    """

    class CallTracker(object):

        def __init__(self):
            self.calls = []

        def __len__(self):
            recorded = self.calls
            return len(recorded)

        def track(self, **details):
            # Each call is stored as the dict of keyword arguments given.
            self.calls.append(details)

        def get_results(self):
            results = []
            for entry in self.calls:
                results.append(entry['result'])
            return results

        def get_exceptions(self):
            exceptions = []
            for entry in self.calls:
                exceptions.append(entry['exc_info'])
            return exceptions

    tracker = CallTracker()
    return tracker
def container(self, container_factory, rabbit_config, counter):
    """
    Start and return a container for a service with two RPC methods.

    Each method increments its own entry in ``counter``; while the
    incremented value is at most 1 it raises its backoff exception,
    otherwise it returns its own name.
    """

    class Service(object):
        name = "service"

        backoff = BackoffPublisher()

        @rpc
        def slow(self):
            attempt = counter["slow"].increment()
            if attempt <= 1:
                raise SlowBackoff()
            return "slow"

        @rpc
        def quick(self):
            attempt = counter["quick"].increment()
            if attempt <= 1:
                raise QuickBackoff()
            return "quick"

    started = container_factory(Service, rabbit_config)
    started.start()
    return started
def container(self, container_factory, rabbit_config, counter):
    """
    Start and return a container for a service whose single RPC method
    raises a Backoff subclass with a negative value in its schedule.
    """

    class Service(object):
        name = "service"

        backoff = BackoffPublisher()

        @rpc
        def bad(self):

            class BadBackoff(Backoff):
                schedule = (-10, )

            raise BadBackoff()

    started = container_factory(Service, rabbit_config)
    started.start()
    return started
def test_noop_model_direct_update(self):
    """ Calling update() without any real change must not hit the session. """
    model = TestUpdateModel.create(count=5, text='monkey')

    # No arguments at all.
    with patch.object(self.session, 'execute') as mock_execute:
        model.update()
        assert mock_execute.call_count == 0

    # A regular column set to the value it already has.
    with patch.object(self.session, 'execute') as mock_execute:
        model.update(count=5)
        assert mock_execute.call_count == 0

    # Key columns set to their current values; these two scenarios only
    # run the update and do not assert on the number of executes.
    with patch.object(self.session, 'execute') as mock_execute:
        model.update(partition=model.partition)

    with patch.object(self.session, 'execute') as mock_execute:
        model.update(cluster=model.cluster)
def test_noop_model_assignation_update(self):
    """ Re-assigning a field's current value and saving must not hit the session. """
    # Work on a freshly fetched copy so nothing cached on the created
    # instance can influence the outcome.
    created = TestUpdateModel.create(count=5, text='monkey')
    fetched = TestUpdateModel.get(partition=created.partition,
                                  cluster=created.cluster)

    # Saving with nothing assigned.
    with patch.object(self.session, 'execute') as mock_execute:
        fetched.save()
        assert mock_execute.call_count == 0

    # Regular column assigned the same value.
    with patch.object(self.session, 'execute') as mock_execute:
        fetched.count = 5
        fetched.save()
        assert mock_execute.call_count == 0

    # Partition column assigned the same value.
    with patch.object(self.session, 'execute') as mock_execute:
        fetched.partition = created.partition
        fetched.save()
        assert mock_execute.call_count == 0

    # Cluster column assigned the same value.
    with patch.object(self.session, 'execute') as mock_execute:
        fetched.cluster = created.cluster
        fetched.save()
        assert mock_execute.call_count == 0
def test_only_set_values_is_updated(self):
    """
    Saving an instance whose row was deleted writes only the set columns

    @since 3.9
    @jira_ticket PYTHON-657
    @expected_result the column that was not set again is None and the
    columns that were set carry their new values

    @test_category object_mapper
    """
    ModelWithDefault.create(id=1, mf={1: 1}, dummy=1).save()
    fetched = ModelWithDefault.filter(id=1).first()
    # Remove the row behind the already-fetched instance.
    ModelWithDefault.objects(id=1).delete()

    fetched.mf = {1: 2}
    udt = UDT(age=1, mf={2:3})
    udt_default = UDT(age=1, mf={2:3})
    fetched.udt = udt
    fetched.udt_default = udt_default
    fetched.save()

    wanted_row = {'id': 1, 'dummy': None, 'mf': {1: 2},
                  "udt": udt, "udt_default": udt_default}
    self.assertEqual(ModelWithDefault.get()._as_dict(), wanted_row)
def test_speculative_and_timeout(self):
    """
    Check that the request timeout is honored under speculative execution

    @since 3.10
    @jira_ticket PYTHON-750
    @expected_result speculative retries are scheduled every fixed period, for
    at most the duration of the timeout.

    @test_category metadata
    """
    # send_msg is mocked so that no message is actually sent; otherwise a
    # response might arrive and the number of queried hosts would be unknown.
    with patch.object(Connection, "send_msg", return_value = 100) as send_msg_stub:
        insert_stmt = SimpleStatement(
            "INSERT INTO test3rf.test (k, v) VALUES (0, 1);",
            is_idempotent=True)

        # The future ends up holding an OperationTimedOut. session.execute
        # would raise it, so the request is issued asynchronously and the
        # future's event is waited on directly.
        future = self.session.execute_async(
            insert_stmt, execution_profile='spec_ep_brr_lim', timeout=2.2)
        future._event.wait(4)
        self.assertIsInstance(future._final_exception, OperationTimedOut)

        # This is because 2.2 / 0.4 + 1 = 6
        self.assertEqual(len(future.attempted_hosts), 6)
def test_get_genesis_block_whenCalled_thenCreatesAndReturnsBlockWithGenesisTransactions(self):
    """
    get_genesis_block must construct Block exactly once, with arguments
    (0, [the two genesis transactions], 0, 0, 0).
    """
    wanted_transactions = [
        Transaction(
            "0",
            "03dd1e57d05d9cab1d8d9b727568ad951ac2d9ecd082bc36f69e021b8427812924",
            500000,
            0
        ),
        Transaction(
            "0",
            "03dd1e3defd36c8c0c7282ca1a324851efdb15f742cac0c5b258ef7b290ece9e5d",
            500000,
            0
        ),
    ]

    # Blockchain.__init__ is stubbed out and Block is replaced by a mock.
    with patch.object(Blockchain, '__init__', return_value=None) as init_stub:
        with patch('crankycoin.blockchain.Block') as block_stub:
            block_stub._calculate_block_hash.return_value = "mock_block_hash"
            chain = Blockchain()

            genesis_block = chain.get_genesis_block()

            block_stub.assert_called_once_with(0, wanted_transactions, 0, 0, 0)
def test_check_hash_and_hash_pattern_whenBlockHasValidHashAndPattern_thenReturnsTrue(self):
    """
    _check_hash_and_hash_pattern on a block whose hash starts with four
    zeros, with the difficulty patched to 4.

    Despite the test's name, the assertion is that the call returns None.
    """
    block_stub = Mock(Block)
    tx_stub = Mock(Transaction)
    tx_stub.source = "from"
    tx_stub.timestamp = 1498923800
    tx_stub.destination = "to"
    tx_stub.amount = 50
    tx_stub.signature = "signature"
    tx_stub.tx_hash = "transaction_hash"

    with patch.object(Blockchain, '__init__', return_value=None) as init_stub:
        with patch.object(Blockchain, 'calculate_hash_difficulty', return_value=4) as difficulty_stub:
            block_stub.current_hash = "0000_valid_block_hash"
            block_stub.index = 35
            block_stub.previous_hash = "0000_valid_previous_hash"
            block_stub.transactions = [tx_stub]
            block_stub.nonce = 37
            block_stub.timestamp = 12341234
            chain = Blockchain()

            outcome = chain._check_hash_and_hash_pattern(block_stub)

            self.assertIsNone(outcome)
def test_check_hash_and_hash_pattern_whenBlockHasInvalidHash_thenReturnsFalse(self):
    """
    _check_hash_and_hash_pattern must raise InvalidHash mentioning
    "Block Hash Mismatch" for the mocked block built here.
    """
    block_stub = Mock(Block)
    tx_stub = Mock(Transaction)
    tx_stub.source = "from"
    tx_stub.timestamp = 1498923800
    tx_stub.destination = "to"
    tx_stub.amount = 50
    tx_stub.signature = "signature"
    tx_stub.tx_hash = "transaction_hash"

    with patch.object(Blockchain, '__init__', return_value=None) as init_stub:
        block_stub.current_hash = "0000_valid_block_hash"
        block_stub.index = 35
        block_stub.previous_hash = "0000_valid_previous_hash"
        block_stub.transactions = [tx_stub]
        block_stub.nonce = 37
        block_stub.timestamp = 12341234
        chain = Blockchain()

        with self.assertRaises(InvalidHash) as raised:
            chain._check_hash_and_hash_pattern(block_stub)
        # Checked after the raising block so the assertion really runs.
        self.assertTrue("Block Hash Mismatch" in str(raised.exception))
def test_check_hash_and_hash_pattern_whenBlockHasInvalidPattern_thenReturnsFalse(self):
    """
    With the difficulty patched to 4 and a block hash that does not start
    with zeros, _check_hash_and_hash_pattern must raise InvalidHash
    mentioning "Incompatible Block Hash".
    """
    block_stub = Mock(Block)
    tx_stub = Mock(Transaction)
    tx_stub.source = "from"
    tx_stub.timestamp = 1498923800
    tx_stub.destination = "to"
    tx_stub.amount = 50
    tx_stub.signature = "signature"
    tx_stub.tx_hash = "transaction_hash"

    with patch.object(Blockchain, '__init__', return_value=None) as init_stub:
        with patch.object(Blockchain, 'calculate_hash_difficulty', return_value=4) as difficulty_stub:
            block_stub.current_hash = "invalid_block_hash"
            block_stub.index = 35
            block_stub.previous_hash = "0000_valid_previous_hash"
            block_stub.transactions = [tx_stub]
            block_stub.nonce = 37
            block_stub.timestamp = 12341234
            chain = Blockchain()

            with self.assertRaises(InvalidHash) as raised:
                chain._check_hash_and_hash_pattern(block_stub)
            # Checked after the raising block so the assertion really runs.
            self.assertTrue("Incompatible Block Hash" in str(raised.exception))
def test_mine_block_whenOneTransaction_andIncorrectTransactionHash_thenReturnsNone(self):
    """
    mine_block must return None when the only queued transaction carries
    the hash "incorrect_transaction_hash"; the queue is popped twice
    (the transaction, then None).
    """
    tx_stub = Mock(Transaction)
    tx_stub.source = "from"
    tx_stub.timestamp = 1498923800
    tx_stub.destination = "to"
    tx_stub.amount = 25
    tx_stub.fee = 0.1
    tx_stub.signature = "signature"
    tx_stub.tx_hash = "incorrect_transaction_hash"

    tip_block = Mock(Block)
    tip_block.index = 35
    tip_block.current_hash = "latest_block_hash"

    with patch.object(Blockchain, '__init__', return_value=None) as init_stub:
        with patch.object(Blockchain, 'get_latest_block', return_value=tip_block) as latest_stub:
            with patch.object(Blockchain, 'pop_next_unconfirmed_transaction', side_effect=[tx_stub, None]) as pop_stub:
                chain = Blockchain()

                outcome = chain.mine_block("reward_address")

                self.assertIsNone(outcome)
                self.assertEqual(pop_stub.call_count, 2)
def test_mine_block_whenOneDuplicateTransaction_thenReturnsNone(self):
    """
    mine_block must return None when the only queued transaction is
    reported as a duplicate by find_duplicate_transactions.

    Also checks that the queue is popped twice (the transaction, then
    None) and that the duplicate lookup is made exactly once with the
    transaction's hash.
    """
    transaction = Mock(Transaction)
    transaction.source = "from"
    transaction.timestamp = 1498923800
    transaction.destination = "to"
    transaction.amount = 25
    transaction.fee = 0.1
    transaction.signature = "signature"
    transaction.tx_hash = "transaction_hash"
    block_id_with_same_transaction = 38
    latest_block = Mock(Block)
    latest_block.index = 35
    latest_block.current_hash = "latest_block_hash"

    with patch.object(Blockchain, '__init__', return_value=None) as patched_init, \
            patch.object(Blockchain, 'get_latest_block', return_value=latest_block) as patched_get_latest_block, \
            patch.object(Blockchain, 'pop_next_unconfirmed_transaction', side_effect=[transaction, None]) as patched_pop_next_unconfirmed_transaction, \
            patch.object(Blockchain, 'find_duplicate_transactions', return_value=block_id_with_same_transaction) as patched_find_duplicate_transactions:
        subject = Blockchain()

        resp = subject.mine_block("reward_address")

        self.assertIsNone(resp)
        self.assertEqual(patched_pop_next_unconfirmed_transaction.call_count, 2)
        # Previously misspelt as ``asssert_called_once_with``: the mock
        # auto-created that attribute, so nothing was actually verified.
        patched_find_duplicate_transactions.assert_called_once_with("transaction_hash")
def test_push_unconfirmed_transaction_thenPushesTransactionAndReturnsNone(self):
    """
    push_unconfirmed_transaction must return a truthy value and leave
    exactly one entry in the unconfirmed-transactions queue.
    """
    pending_tx = dict([
        ('from', 'from'),
        ('timestamp', 1498923800),
        ('to', 'to'),
        ('amount', 1),
        ('signature', 'signature_one'),
        ('hash', "transaction_hash_one"),
    ])

    with patch.object(Blockchain, '__init__', return_value=None) as init_stub:
        chain = Blockchain()
        # __init__ is stubbed out, so the queue has to be provided by hand.
        chain.unconfirmed_transactions = Queue()

        outcome = chain.push_unconfirmed_transaction(pending_tx)

        self.assertTrue(outcome)
        self.assertEqual(chain.unconfirmed_transactions.qsize(), 1)
def test_broadcast_transaction_thenBroadcastsToAllNodes(self):
    """
    broadcast_transaction must refresh the node list once and POST the
    serialized transaction to each of the three known full nodes.
    """
    # The same JSON payload is expected in every POST.
    wanted_payload = {'transaction': '{"amount": 0, "destination": "destination", "fee": 0, "signature": null, "source": "source", "timestamp": 1508823223, "tx_hash": null}'}

    with patch.object(FullNode, '__init__', return_value=None) as init_stub:
        with patch.object(FullNode, 'request_nodes_from_all') as refresh_stub:
            with patch("crankycoin.time.time", return_value="1508823223") as time_stub:
                with patch("crankycoin.requests.post") as post_stub:
                    tx = Transaction("source", "destination", 0, 0)
                    full_node = FullNode("127.0.0.1", "reward_address")
                    full_node.full_nodes = {"127.0.0.1", "127.0.0.2", "127.0.0.3"}

                    full_node.broadcast_transaction(tx)

                    refresh_stub.assert_called_once()
                    wanted_calls = [
                        call("http://127.0.0.1:30013/transactions", json=wanted_payload),
                        call("http://127.0.0.2:30013/transactions", json=wanted_payload),
                        call("http://127.0.0.3:30013/transactions", json=wanted_payload),
                    ]
                    # Second positional argument: the calls may come in any order.
                    post_stub.assert_has_calls(wanted_calls, True)
def test_broadcast_transaction_whenRequestException_thenFailsGracefully(self):
    """
    When every POST raises RequestException, broadcast_transaction must
    still refresh the node list once and attempt a POST to each of the
    three known full nodes.
    """
    # The same JSON payload is expected in every POST.
    wanted_payload = {'transaction': '{"amount": 0, "destination": "destination", "fee": 0, "signature": null, "source": "source", "timestamp": 1508823223, "tx_hash": null}'}

    with patch.object(FullNode, '__init__', return_value=None) as init_stub:
        with patch.object(FullNode, 'request_nodes_from_all') as refresh_stub:
            with patch("crankycoin.time.time", return_value="1508823223") as time_stub:
                with patch("crankycoin.requests.post", side_effect=requests.exceptions.RequestException()) as post_stub:
                    tx = Transaction("source", "destination", 0, 0)
                    full_node = FullNode("127.0.0.1", "reward_address")
                    full_node.full_nodes = {"127.0.0.1", "127.0.0.2", "127.0.0.3"}

                    full_node.broadcast_transaction(tx)

                    refresh_stub.assert_called_once()
                    wanted_calls = [
                        call("http://127.0.0.1:30013/transactions", json=wanted_payload),
                        call("http://127.0.0.2:30013/transactions", json=wanted_payload),
                        call("http://127.0.0.3:30013/transactions", json=wanted_payload),
                    ]
                    # Second positional argument: the calls may come in any order.
                    post_stub.assert_has_calls(wanted_calls, True)
def test_request_block_whenIndexIsNumeric_thenRequestsCorrectBlockFromNode(self):
    """
    request_block with a numeric index must GET the block URL of the
    given node and return a block built from the JSON response.
    """
    response_stub = Mock()
    response_stub.status_code = 200
    response_stub.json.return_value = '{"nonce": 12345, "index": 29, "transactions": [], "timestamp": 1234567890, "current_hash": "current_hash", "previous_hash": "previous_hash"}'

    with patch.object(FullNode, '__init__', return_value=None) as init_stub:
        with patch("crankycoin.node.Block.current_hash", new_callable=PropertyMock) as hash_property_stub:
            with patch("crankycoin.requests.get", return_value=response_stub) as get_stub:
                # The block's current_hash property is mocked to a fixed value.
                hash_property_stub.return_value = "current_hash"
                full_node = FullNode("127.0.0.1", "reward_address")

                fetched = full_node.request_block("127.0.0.2", "30013", 29)

                self.assertIsNotNone(fetched)
                self.assertEqual(fetched.index, 29)
                self.assertEqual(fetched.transactions, [])
                self.assertEqual(fetched.previous_hash, "previous_hash")
                self.assertEqual(fetched.current_hash, "current_hash")
                self.assertEqual(fetched.timestamp, 1234567890)
                self.assertEqual(fetched.nonce, 12345)
                get_stub.assert_called_once_with('http://127.0.0.2:30013/block/29')
def test__enter__(slot):
    """
    Entering the slot returns the slot itself, opens two connections, and
    starts the keepalive only when a keepalive window is configured.

    Parameters
    ----------
    slot
        The slot object under test (supplied by the test harness).
    """
    # ``side_effect`` (singular) is the Mock option that hands out one list
    # item per call. The former ``side_effects`` spelling merely set an
    # unused attribute, so the two distinct connection mocks were never used.
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]) as mock_gc, \
            patch.object(slot, '_send_keepalive') as mock_ka:
        assert slot == slot.__enter__(), 'Returns itself'
        assert mock_gc.call_count == 2

        assert call.set_isolation_level(0) in slot._normal_conn.method_calls, 'make sure we are in autocommit'
        assert call.cursor() in slot._repl_conn.method_calls, 'we opened a cursor'

        assert not mock_ka.called, "with no window we didn't start keep alive"

    # With a keepalive window set, entering must start the keepalive.
    slot._keepalive_window = 1
    with patch.object(slot, '_get_connection', side_effect=[Mock(), Mock()]) as mock_gc, \
            patch.object(slot, '_send_keepalive') as mock_ka:
        slot.__enter__()
        assert mock_ka.called, " we started up keepalive"
def test_delete_slot(slot):
    """
    delete_slot swallows an "undefined object" ProgrammingError from
    drop_replication_slot and propagates any other error.

    Parameters
    ----------
    slot
        The slot object under test (supplied by the test harness).
    """
    # Scenario 1: the drop fails with pgcode UNDEFINED_OBJECT -> no error
    # escapes delete_slot.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=psycopg2.errorcodes.UNDEFINED_OBJECT):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
        slot.delete_slot()
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')

    # Scenario 2: the drop fails with some other pgcode -> the error is raised.
    # The side effect is now set on drop_replication_slot, the method that
    # delete_slot is asserted to call. It was previously set on
    # create_replication_slot, so the scenario passed only because of the
    # mock left over from scenario 1.
    with patch.object(psycopg2.ProgrammingError, 'pgcode',
                      new_callable=PropertyMock,
                      return_value=-1):
        pe = psycopg2.ProgrammingError()
        slot._repl_cursor.drop_replication_slot = Mock(side_effect=pe)
        with pytest.raises(psycopg2.ProgrammingError) as e_info:
            slot.delete_slot()
        slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
        # Read while the pgcode patch is still active.
        assert e_info.value.pgcode == -1

    # Scenario 3: a generic exception from the drop is propagated as well.
    slot._repl_cursor.drop_replication_slot = Mock(side_effect=Exception)
    with pytest.raises(Exception):
        slot.delete_slot()
    slot._repl_cursor.drop_replication_slot.assert_called_with('pg2kinesis')
def test__init__():
    """
    StreamWriter construction against a mocked boto3 client.

    A 'ResourceInUseException' from create_stream is tolerated and the
    'stream_exists' waiter is requested; any other ClientError propagates
    and the waiter is never requested.
    """
    client_stub = Mock()

    def failing_create_stream(error_code):
        # Mock whose call raises a ClientError carrying the given code.
        failure = ClientError({'Error': {'Code': error_code}}, 'create_stream')
        return Mock(side_effect=failure)

    with patch.object(boto3, 'client', return_value=client_stub):
        # Stream already exists: handled by waiting for it.
        client_stub.create_stream = failing_create_stream('ResourceInUseException')

        StreamWriter('blah')
        assert client_stub.create_stream.called
        assert call.get_waiter('stream_exists') in client_stub.method_calls, "We handled stream existence"

        # Any other error code: the ClientError must escape.
        client_stub.create_stream = failing_create_stream('Something else')
        client_stub.reset_mock()

        with pytest.raises(ClientError):
            StreamWriter('blah')
        assert client_stub.create_stream.called
        assert call.get_waiter('stream_exists') not in client_stub.method_calls, "never reached"
def test_noop_model_direct_update(self):
    """
    Calling update() without any real change must not hit the session,
    and passing a key column to update() must raise ValidationError.
    """
    model = TestUpdateModel.create(self.conn, count=5, text='monkey')

    # No changes at all.
    with patch.object(self.conn.session, 'execute') as mock_execute:
        model.update(self.conn)
        assert mock_execute.call_count == 0

    # A regular column set to the value it already has.
    with patch.object(self.conn.session, 'execute') as mock_execute:
        model.update(self.conn, count=5)
        assert mock_execute.call_count == 0

    # Key columns are rejected even when given their current value.
    with self.assertRaises(ValidationError):
        model.update(self.conn, partition=model.partition)

    with self.assertRaises(ValidationError):
        model.update(self.conn, cluster=model.cluster)