def test_update_details(debug, service):
service.current_iteration = 1
service.project_version = 1
service._cached = {'foo': 'bar'}
name = 'foo'
responses.add(
responses.GET,
'https://www.pivotaltracker.com/services/v5/projects/123',
json={'current_iteration_number': 1, 'name': name},
adding_headers={'X-Tracker-Project-Version': '2'},
)
responses.add(
responses.GET,
'https://www.pivotaltracker.com/services/v5/projects/123/iterations'
'/1?fields=%3Adefault%2Cvelocity%2Cstories',
json={'velocity': 10, 'stories': []}
)
result = service.update()
debug.assert_has_calls([
mock.call('fetching Tracker project data'),
mock.call('project updated, fetching iteration details'),
])
assert result == dict(velocity=10, stories={}, name=name)
python类call()的实例源码
def test_iam_role_policy(resource_action, get_template, get_properties, get_details, construct_policy, session,
attach_profile_to_role):
"""IAM Role Policy should match deployment type."""
get_properties.return_value = {'type': 'ec2'}
get_details.return_value.iam.return_value = {'group': 1, 'policy': 2, 'profile': 3, 'role': 4, 'user': 5}
assert create_iam_resources()
get_template.assert_called_with(EC2_TEMPLATE_NAME)
calls = [
mock.call(
mock.ANY,
action='create_role',
log_format=mock.ANY,
RoleName=mock.ANY,
AssumeRolePolicyDocument=get_template.return_value)
]
resource_action.assert_has_calls(calls)
def test_create_s3_event_multiple_buckets(mock_get_properties, mock_create_s3_event, mock_remove_perms, mock_boto3, mock_arn, mock_perms):
"""Try to create a lambda with two S3 triggers from different buckets."""
triggers = TRIGGERS_BUCKET_A + TRIGGERS_BUCKET_B
properties = get_properties_with_triggers(triggers)
mock_get_properties.return_value = properties
mock_arn.return_value = 'fake_arn'
events = LambdaEvent(app='test_app', env='test_env', region='us-east-1', prop_path='other')
events.create_lambda_events()
s3_calls = [
mock.call(app_name='test_app', env='test_env', region='us-east-1', bucket='my.shared.bucket', triggers=TRIGGERS_BUCKET_A),
mock.call(app_name='test_app', env='test_env', region='us-east-1', bucket='my.other.shared.bucket', triggers=TRIGGERS_BUCKET_B)
]
mock_create_s3_event.assert_has_calls(s3_calls, any_order=True)
def test_call(self, mocked_predict, mocked_load):
mocked_load.return_value = 'model'
settings = MagicMock()
settings.UNIQUE_IDS = ['number']
settings.CLASSIFIERS = {'answer': 42, 'another': 13}
core = Core(settings, self.adapter)
core.suspicions = MagicMock()
core()
# assert load and predict was called for each classifier
mocked_load.assert_has_calls((call(42), call(13)), any_order=True)
mocked_predict.assert_has_calls((
call('model', 'answer'),
call('model', 'another')
), any_order=True)
# assert suspicions.xz was created
expected_path = os.path.join('tmp', 'test', 'suspicions.xz')
core.suspicions.to_csv.assert_called_once_with(
expected_path,
compression='xz',
encoding='utf-8',
index=False
)
def test_interactive(self, mock_print, mock_markdown,
mock_print_heading, mock_import_readline):
def MockInputFactory(return_values):
_counter = -1
def mock_input(prompt=''):
nonlocal _counter
_counter += 1
if _counter < len(return_values):
return return_values[_counter]
elif _counter == len(return_values):
raise EOFError
else:
raise KeyboardInterrupt
return mock_input
return_values = ['foo', 'bar', 'baz']
with patch('builtins.input', MockInputFactory(return_values)):
cli.interactive(sentinel.RendererCls)
mock_import_readline.assert_called_with()
mock_print_heading.assert_called_with(sentinel.RendererCls)
mock_markdown.assert_called_with(['foo\n', 'bar\n', 'baz\n'],
sentinel.RendererCls)
calls = [call('\nrendered text', end=''), call('\nExiting.')]
mock_print.assert_has_calls(calls)
def test_normal_execution(self, mock_isfile, mock_glob):
""" Test the normal behaviour of the function.
"""
# Set the mocked function returned values.
mock_isfile.side_effect = [True]
mock_glob.return_value = ["/my/path/mock_output"]
# Test execution
fslreorient2std(**self.kwargs)
self.assertEqual([
mock.call(["which", "fslreorient2std"],
env={}, stderr=-1, stdout=-1),
mock.call(["fslreorient2std",
self.kwargs["input_image"],
self.kwargs["output_image"]],
cwd=None, env={}, stderr=-1, stdout=-1)],
self.mock_popen.call_args_list)
self.assertEqual(len(self.mock_env.call_args_list), 1)
def test_get_account_info(api):
await api.get_account_info()
assert api._make_request.call_args == call('get_account_info')
#
# class CoroutineContextManager(CoroutineMock):
# async def __aexit__(self, *args, **kwargs):
# pass
#
# async def __aenter__(self, *args, **kwargs):
# return Mock()
#
#
# async def test_make_request(bot_configuration, loop):
# api = Api(bot_configuration, CoroutineContextManager(), loop)
# api._logger = Mock()
# await api._make_request(api.endpoints.SEND_MESSAGE)
def test_credentials(self, mSession):
iSession = mSession.return_value = MockSession()
access = 'XXX'
secret = 'YYY'
account = '123456'
region = 'east'
credentials = {
'aws_access_key': access,
'aws_secret_key': secret,
'aws_account_id': account,
'aws_region': region
}
session, account_id = utils.create_session(credentials = credentials)
self.assertEqual(session, iSession)
self.assertEqual(account_id, account)
self.assertEqual(iSession.region, region)
call = mock.call(aws_access_key_id = access,
aws_secret_access_key = secret)
calls = [call]
self.assertEqual(mSession.mock_calls, calls)
def test_keys(self, mSession):
iSession = mSession.return_value = MockSession()
access = 'XXX'
secret = 'YYY'
account = '123456'
region = 'east'
credentials = {
'aws_access_key': access,
'aws_secret_key': secret,
'aws_account_id': account,
'aws_region': region
}
# Note: Same as test_credentials, except passing the arguments are key word args
session, account_id = utils.create_session(**credentials)
self.assertEqual(session, iSession)
self.assertEqual(account_id, account)
self.assertEqual(iSession.region, region)
call = mock.call(aws_access_key_id = access,
aws_secret_access_key = secret)
calls = [call]
self.assertEqual(mSession.mock_calls, calls)
def test_constructor(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
client.list_state_machines.return_value = {
'stateMachines':[{
'name': 'name',
'stateMachineArn': 'XXX'
}]
}
machine = StateMachine('name')
self.assertEqual(machine.arn, 'XXX')
calls = [
mock.call.list_state_machines()
]
self.assertEqual(client.mock_calls, calls)
def test_create_exists(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
client.list_state_machines.return_value = {
'stateMachines':[{
'name': 'name',
'stateMachineArn': 'XXX'
}]
}
machine = StateMachine('name')
with self.assertRaises(Exception):
machine.create('source', 'role')
self.assertEqual(machine.arn, 'XXX')
calls = [
mock.call.list_state_machines(),
]
self.assertEqual(client.mock_calls, calls)
def test_delete_exception(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
client.list_state_machines.return_value = {
'stateMachines':[]
}
machine = StateMachine('name')
with self.assertRaises(Exception):
machine.delete(True)
self.assertEqual(machine.arn, None)
calls = [
mock.call.list_state_machines(),
]
self.assertEqual(client.mock_calls, calls)
def test_start_doesnt_exist(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
client.list_state_machines.return_value = {
'stateMachines':[]
}
machine = StateMachine('name')
with self.assertRaises(Exception):
machine.start({}, 'run')
calls = [
mock.call.list_state_machines(),
]
self.assertEqual(client.mock_calls, calls)
def test_stop(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
client.list_state_machines.return_value = {
'stateMachines':[{
'name': 'name',
'stateMachineArn': 'XXX'
}]
}
machine = StateMachine('name')
machine.stop('arn', 'error', 'cause')
calls = [
mock.call.list_state_machines(),
mock.call.stop_execution(executionArn = 'arn',
error = 'error',
cause = 'cause')
]
self.assertEqual(client.mock_calls, calls)
def test_success(self, mCreateSession):
iSession = MockSession()
client = iSession.client('stepfunctions')
mCreateSession.return_value = (iSession, '123456')
task = TaskMixin()
task.token = 'token'
self.assertEqual(task.token, 'token')
task.success(None)
self.assertEqual(task.token, None)
call = mock.call.send_task_success(taskToken = 'token',
output = 'null')
self.assertEqual(client.mock_calls, [call])
def test_failure(self, mCreateSession):
iSession = MockSession()
client = iSession.client('stepfunctions')
mCreateSession.return_value = (iSession, '123456')
task = TaskMixin()
task.token = 'token'
self.assertEqual(task.token, 'token')
task.failure(None, None)
self.assertEqual(task.token, None)
call = mock.call.send_task_failure(taskToken = 'token',
error = None,
cause = None)
self.assertEqual(client.mock_calls, [call])
def test_run_generator(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
def target(input_):
yield
yield
return
# Just make sure the target is actually a generator
self.assertEqual(type(target(None)), types.GeneratorType)
task = TaskMixin(process = target)
task.handle_task('token', None)
self.assertEqual(task.token, None)
call = mock.call.send_task_success(taskToken = 'token',
output = 'null')
call_ = mock.call.send_task_heartbeat(taskToken = 'token')
calls = [call_, call_, call]
self.assertEqual(client.mock_calls, calls)
def test_run_exception(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
target = mock.MagicMock()
target.side_effect = BossError('cause')
task = TaskMixin(process = target)
task.handle_task('token', None)
self.assertEqual(task.token, None)
call = mock.call.send_task_failure(taskToken = 'token',
error = 'BossError',
cause = 'cause')
self.assertEqual(client.mock_calls, [call])
def test_create_activity(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
client.create_activity.return_value = {
'activityArn': 'XXX'
}
activity = ActivityMixin()
activity.name = 'name'
self.assertEqual(activity.arn, None)
activity.create_activity()
self.assertEqual(activity.arn, 'XXX')
calls = [
mock.call.create_activity(name = 'name')
]
self.assertEqual(client.mock_calls, calls)
def test_delete_activity(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
activity = ActivityMixin()
activity.arn = 'XXX'
activity.delete_activity()
self.assertEqual(activity.arn, None)
calls = [
mock.call.delete_activity(activityArn = 'XXX')
]
self.assertEqual(client.mock_calls, calls)
def test_poll_task(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
client.get_activity_task.return_value = {
'taskToken': 'YYY',
'input': '{}'
}
activity = ActivityMixin()
activity.arn = 'XXX'
token, input_ = activity.poll_task('worker')
self.assertEqual(token, 'YYY')
self.assertEqual(input_, {})
calls = [
mock.call.get_activity_task(activityArn = 'XXX',
workerName = 'worker')
]
self.assertEqual(client.mock_calls, calls)
def test_poll_task_no_work(self, mCreateSession):
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
client.get_activity_task.return_value = {
'taskToken': ''
}
activity = ActivityMixin()
activity.arn = 'XXX'
token, input_ = activity.poll_task('worker')
self.assertEqual(token, None)
self.assertEqual(input_, None)
calls = [
mock.call.get_activity_task(activityArn = 'XXX',
workerName = 'worker')
]
self.assertEqual(client.mock_calls, calls)
test_get_products_from_site_use_case.py 文件源码
项目:python-webscraping
作者: pgrangeiro
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_execute_calls_save_only_if_product_has_not_saved_before(self):
def side_effect():
return [
(1, 'name', 'title', 'url'),
(2, 'Name', 'Title', 'Url'),
(1, 'name', 'title', 'url'),
]
self.crawler_uc.execute.side_effect = side_effect
GetProductsFromSiteUseCase.execute()
assert 2 == self.saveinfo_uc.execute.call_count
self.saveinfo_uc.execute.assert_has_calls([
call('name', 'title', 'url'),
call('Name', 'Title', 'Url'),
])
def test_memoize():
"""Test Caching with no Time-To-Live (TTL)."""
global x
for fn in memoize_fns:
x = 0
assert_eq(4, fn(1))
assert_eq(1, x)
assert_eq(8, fn(2, 4))
assert_eq(2, x)
# should not result in another call
assert_eq(8, fn(2, z=4))
assert_eq(2, x)
assert_eq(8, fn(y=2, z=4))
assert_eq(2, x)
fn.clear_cache()
assert_eq(4, fn(1))
assert_eq(3, x)
def test_memoize_with_ttl_unhashable():
"""Test Caching with TTL using dictionary arguments."""
global x
x = 0
assert_eq(12, cached_fn_with_ttl_unhashable(2))
assert_eq(1, x)
assert_eq(10, cached_fn_with_ttl_unhashable(1, z={'a': 2, 'b': 3, 'c': 5}))
assert_eq(2, x)
# should not result in another call
assert_eq(10, cached_fn_with_ttl_unhashable(1, z={'a': 2, 'b': 3, 'c': 5}))
assert_eq(2, x)
assert_eq(12, cached_fn_with_ttl_unhashable(2, z={'a': 1, 'b': 2, 'c': 3}))
assert_eq(2, x)
cached_fn_with_ttl_unhashable.clear_cache()
assert_eq(12, cached_fn_with_ttl_unhashable(2))
assert_eq(3, x)
def test_discriminator(discriminator, tf_module):
with TmpMock(tf_module, 'variable_scope') as mock_variable_scope:
image = tf.placeholder(tf.float32, [None, 28, 28, 3])
output, logits = discriminator(image)
_assert_tensor_shape(output, [None, 1], 'Discriminator Training(reuse=false) output')
_assert_tensor_shape(logits, [None, 1], 'Discriminator Training(reuse=false) Logits')
assert mock_variable_scope.called,\
'tf.variable_scope not called in Discriminator Training(reuse=false)'
assert mock_variable_scope.call_args == mock.call('discriminator', reuse=False), \
'tf.variable_scope called with wrong arguments in Discriminator Training(reuse=false)'
mock_variable_scope.reset_mock()
output_reuse, logits_reuse = discriminator(image, True)
_assert_tensor_shape(output_reuse, [None, 1], 'Discriminator Inference(reuse=True) output')
_assert_tensor_shape(logits_reuse, [None, 1], 'Discriminator Inference(reuse=True) Logits')
assert mock_variable_scope.called, \
'tf.variable_scope not called in Discriminator Inference(reuse=True)'
assert mock_variable_scope.call_args == mock.call('discriminator', reuse=True), \
'tf.variable_scope called with wrong arguments in Discriminator Inference(reuse=True)'
def test_generator(generator, tf_module):
with TmpMock(tf_module, 'variable_scope') as mock_variable_scope:
z = tf.placeholder(tf.float32, [None, 100])
out_channel_dim = 5
output = generator(z, out_channel_dim)
_assert_tensor_shape(output, [None, 28, 28, out_channel_dim], 'Generator output (is_train=True)')
assert mock_variable_scope.called, \
'tf.variable_scope not called in Generator Training(reuse=false)'
assert mock_variable_scope.call_args == mock.call('generator', reuse=False), \
'tf.variable_scope called with wrong arguments in Generator Training(reuse=false)'
mock_variable_scope.reset_mock()
output = generator(z, out_channel_dim, False)
_assert_tensor_shape(output, [None, 28, 28, out_channel_dim], 'Generator output (is_train=False)')
assert mock_variable_scope.called, \
'tf.variable_scope not called in Generator Inference(reuse=True)'
assert mock_variable_scope.call_args == mock.call('generator', reuse=True), \
'tf.variable_scope called with wrong arguments in Generator Inference(reuse=True)'
def test_normal_start(self, runner, process, getLogger):
assert isinstance(process, mock.MagicMock)
# Arrange
process_count = 4
logger = getLogger.return_value = mock.MagicMock()
expected_result = runner.return_value = "SOME_RETURN_VALUE"
result_backend = {
'result_backend_class': 'easy_job.result_backends.dummy.DummyBackend'
}
# Act
initializer = RabbitMQInitializer(count=process_count, logger="logger", result_backend=result_backend,
options={'serialization_method': 'json', 'queue_name': 'queue',
'rabbitmq_configs': {}})
result = initializer.start()
# Assert
logger.log.assert_called_once_with(10, "Starting {} RabbitMQ workers...".format(process_count))
process.assert_has_calls([mock.call(args=(mock.ANY,), target=mock.ANY), mock.call().start()] * process_count)
self.assertEqual(result, expected_result)
_, args, kwargs = process.mock_calls[0]
worker_object = kwargs['args'][0]
self.assertIsInstance(worker_object, RabbitMQWorker)
def test_actually_calling_retrying_library_in_a_failing_condition(self, ):
# Arrange
function_side_effects = [IndentationError(), SyntaxError(), OSError()]
func = mock.MagicMock(side_effect=function_side_effects)
from easy_job.workers.common import call_with_retry as target
retrying_params = {
"stop_max_attempt_number": 3
}
args = (1, 2, 3)
kwargs = {"p": "arameter"}
# Act
with self.assertRaises(OSError):
target(func, args, kwargs, retry_policy=retrying_params)
# Assert
func.assert_has_calls([mock.call(*args, **kwargs),
mock.call(*args, **kwargs),
mock.call(*args, **kwargs)]
)
def test_case_study_create_api_success(
mock_create_case_study, supplier_case_study_end_to_end, sso_user,
all_case_study_data, api_response_200
):
mock_create_case_study.return_value = api_response_200
response = supplier_case_study_end_to_end()
assert response.status_code == http.client.FOUND
assert response.get('Location') == reverse('company-detail')
data = {
**all_case_study_data,
'image_one': ANY, 'image_two': ANY, 'image_three': ANY,
}
# django converts uploaded files to UploadedFile, which makes
# `assert_called_once_with` tricky.
assert mock_create_case_study.call_count == 1
assert mock_create_case_study.call_args == call(
data=data,
sso_session_id=sso_user.session_id,
)