def test_intersection_com_mock_2(self):
    """Check ``to_Trajectory`` against a mocked ``intersection_shapely`` result.

    ``self.traj2.intersection_shapely`` is replaced by a mock that returns a
    fixed three-point ``LineString`` whose third ordinate is used as time.
    The trajectory produced by ``to_Trajectory`` from that response must expose
    the same X, Y and time arrays as a ``Trajectory`` built directly from the
    line's coordinates.
    """
    ls = LineString([(1, 1, 9.48024060e+08), (2, 2, 9.49363260e+08),
                     (3, 1, 9.51868860e+08)])
    poly = Polygon([(1, 1), (1, 3), (4, 3), (4, 1), (1, 1)])
    self.traj2.intersection_shapely = MagicMock(return_value=ls)
    response = self.traj2.intersection_shapely(poly)
    # Convert through ``.coords``: turning a geometry into an array with
    # ``np.array(geom)`` is not supported by Shapely 2.x, while the
    # coordinate sequence converts to an (n_points, 3) array on every version.
    coords = np.array(ls.coords)
    trajMock = self.traj2.to_Trajectory(response)
    traj = Trajectory(coords[:, 0], coords[:, 1], coords[:, 2])
    assert np.array_equal(trajMock.getX(), traj.getX())
    assert np.array_equal(trajMock.getY(), traj.getY())
    assert np.array_equal(trajMock.getTime(), traj.getTime())
python类MagicMock()的实例源码
def test_fetch_from_url_or_retry_post_json(self):
    """POST variant: the mocked fetch result must decode to the expected JSON.

    ``self.fetch_from_url_or_retry`` is replaced by a mock returning the
    object built by ``response_mocker``; the test then compares the
    response's ``.json()`` with the expected dictionary.
    """
    # mocked requests
    identifier = "1csb, 2pah"
    base_url = c.http_pdbe
    endpoint_url = "api/pdb/entry/summary/"
    mocked_response = response_mocker(
        kwargs={},
        base_url=base_url,
        endpoint_url=endpoint_url,
        content_type='application/octet-stream',
        post=True,
        data=identifier,
    )
    self.fetch_from_url_or_retry = MagicMock(return_value=mocked_response)

    url = base_url + endpoint_url + identifier
    fetched = self.fetch_from_url_or_retry(
        url, json=True, post=True, data=identifier,
        header={'application/octet-stream'},
        retry_in=None, wait=0,
        n_retries=10, stream=False,
    )
    r = fetched.json()
    expected = json.loads('{"data": "some json formatted output"}')
    self.assertEqual(r, expected)
message_handlers_test.py 文件源码
项目:meetup-facebook-bot
作者: Stark-Mountain
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_handle_talk_rate_command(self, talk_class_mock, send_rate_mock):
    """A 'rate talk 1' postback checks the like state and calls the rate sender.

    The handler is expected to call ``talk.is_liked_by(sender_id, session)``
    once and then call ``send_rate_mock`` once with the access token, the
    sender id, the talk and the like state.
    """
    messaging_event = self.generate_postback('rate talk 1')

    talk_liked_mock = False
    talk_mock = MagicMock(talk_id=1)
    talk_mock.is_liked_by = MagicMock(return_value=talk_liked_mock)
    # Make the session lookup hand back the mocked talk.
    server.db_session.query(talk_class_mock).get = MagicMock(return_value=talk_mock)

    message_handlers.handle_talk_rate_command(
        messaging_event, self.access_token, server.db_session)

    talk_mock.is_liked_by.assert_called_once_with(self.sender_id, server.db_session)
    send_rate_mock.assert_called_once_with(
        self.access_token, self.sender_id, talk_mock, talk_liked_mock)
message_handlers_test.py 文件源码
项目:meetup-facebook-bot
作者: Stark-Mountain
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_handle_talk_like_command(self, talk_class_mock, send_like_confirmation_mock):
    """A 'like talk 1' quick reply reverts the like and sends a confirmation.

    The handler is expected to call ``talk.revert_like(sender_id, session)``
    once and then call ``send_like_confirmation_mock`` once with the access
    token, the sender id, the talk and the mocked like state.
    """
    messaging_event = self.generate_quick_reply('like talk 1')

    talk_liked_mock = False
    talk_mock = MagicMock(talk_id=1)
    talk_mock.revert_like = MagicMock()
    talk_mock.is_liked_by = MagicMock(return_value=talk_liked_mock)
    # Make the session lookup hand back the mocked talk.
    server.db_session.query(talk_class_mock).get = MagicMock(return_value=talk_mock)

    message_handlers.handle_talk_like_command(
        messaging_event, self.access_token, server.db_session)

    talk_mock.revert_like.assert_called_once_with(self.sender_id, server.db_session)
    send_like_confirmation_mock.assert_called_once_with(
        self.access_token, self.sender_id, talk_mock, talk_liked_mock)
message_handlers_test.py 文件源码
项目:meetup-facebook-bot
作者: Stark-Mountain
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_handle_schedule_command(self, talk_class_mock, send_schedule_mock):
    """A schedule postback sends all talks with their like counts.

    Two mocked talks are returned by the session query, each reporting one
    like; ``send_schedule_mock`` must be called once with the talks, a
    ``{talk.id: 1}`` mapping and an empty list of liked ids.
    """
    messaging_event = self.generate_postback('schedule payload')

    talks_mock = [MagicMock(talk_id=1), MagicMock(talk_id=2)]
    server.db_session.query().all = MagicMock(return_value=talks_mock)
    for position, talk in enumerate(talks_mock):
        talk.id = position
        talk.count_likes = MagicMock(return_value=1)
    talk_like_numbers_mock = {talk.id: 1 for talk in talks_mock}
    talk_like_ids_mock = []

    message_handlers.handle_schedule_command(
        messaging_event, self.access_token, server.db_session)

    send_schedule_mock.assert_called_once_with(
        self.access_token,
        self.sender_id,
        talks_mock,
        talk_like_numbers_mock,
        talk_like_ids_mock,
    )
def call_runscheduler(loops=1, mock_call_command=None):
    """Run the ``runscheduler`` command with its collaborators patched.

    ``time.sleep`` is replaced by a fake that raises ``KeyboardInterrupt``
    once it has been called more than ``loops`` times, which is how the
    command is stopped. ``runscheduler.call_command`` and
    ``runscheduler.logger`` are patched for the duration of the run.

    Args:
        loops: number of sleep calls allowed before the interrupt is raised.
        mock_call_command: replacement for ``runscheduler.call_command``; a
            fresh ``MagicMock`` is created when omitted.

    Returns:
        A ``(mock_call_command, mock_logger)`` tuple.
    """
    sleep_calls = []

    def fake_sleep(seconds):
        # Record the call and interrupt once the allowed loop count is exceeded.
        sleep_calls.append(seconds)
        if len(sleep_calls) > loops:
            raise KeyboardInterrupt()

    if mock_call_command is None:
        mock_call_command = mock.MagicMock()

    with mock.patch.object(runscheduler, 'call_command', mock_call_command), \
            mock.patch.object(runscheduler, 'logger') as mock_logger, \
            mock.patch('time.sleep', fake_sleep), \
            pytest.raises(KeyboardInterrupt):
        call_command('runscheduler')

    return mock_call_command, mock_logger
def test_call(self, mocked_predict, mocked_load):
    """Calling a ``Core`` instance loads, predicts and writes suspicions.

    With two classifiers configured, the load mock must be called with each
    classifier value, the predict mock with the loaded model and each
    classifier name, and ``suspicions.to_csv`` once with the expected path
    and options.
    """
    mocked_load.return_value = 'model'
    settings = MagicMock(
        UNIQUE_IDS=['number'],
        CLASSIFIERS={'answer': 42, 'another': 13},
    )
    core = Core(settings, self.adapter)
    core.suspicions = MagicMock()

    core()

    # assert load and predict was called for each classifier
    expected_loads = (call(42), call(13))
    expected_predictions = (
        call('model', 'answer'),
        call('model', 'another'),
    )
    mocked_load.assert_has_calls(expected_loads, any_order=True)
    mocked_predict.assert_has_calls(expected_predictions, any_order=True)

    # assert suspicions.xz was created
    expected_path = os.path.join('tmp', 'test', 'suspicions.xz')
    core.suspicions.to_csv.assert_called_once_with(
        expected_path, compression='xz', encoding='utf-8', index=False)
def test_AnswerMail_UserOnSpam(self):
    """A PM from a user already present in the cache triggers no calls.

    Neither the reddit mock nor the helper mock may record any method call.
    """
    r = MagicMock()
    helper = MagicMock()

    msg = MagicMock(subreddit=None, id='msgid', distinguished=None)
    msg.author.name = 'user'

    # The author is already in the cache before the message is handled.
    pmUserCache = {'user': 1234}

    # test
    hsbot.answerPM(r, msg, pmUserCache, helper)

    self.assertEqual(r.method_calls, [], 'no reddit calls')
    self.assertEqual(helper.method_calls, [], 'no helper calls')
def test_AnswerMail_Success(self):
    """A PM from an uncached user is parsed and answered.

    The author must be added to the cache, the helper must receive exactly
    one ``parseText('sub body')`` call, and the message must receive exactly
    one ``reply('text')`` call; the reddit mock must stay untouched.
    """
    r = MagicMock()

    msg = MagicMock(subreddit=None, id='msgid', distinguished=None)
    msg.author.name = 'user'
    msg.subject = 'sub'
    msg.body = 'body'

    pmUserCache = {}

    helper = MagicMock()
    helper.parseText = MagicMock(return_value=(['card'], 'text'))

    # test
    hsbot.answerPM(r, msg, pmUserCache, helper)

    self.assertTrue('user' in pmUserCache, 'user added to cache')
    self.assertEqual(r.method_calls, [], 'no reddit calls')

    expected_helper_calls = [call.parseText('sub body')]
    self.assertEqual(helper.method_calls, expected_helper_calls, 'parseText')

    expected_msg_calls = [call.reply('text')]
    self.assertEqual(msg.method_calls, expected_msg_calls, 'reply')
def test_getattr_with_under(self, *args, **kwargs):
    """An underscore-prefixed attribute on a collection matches its delegate's.

    Builds a database and a collection class for the asyncio framework on top
    of a mocked client, then checks that ``coll._get_write_mode`` equals
    ``coll.delegate._get_write_mode``.

    ``*args``/``**kwargs`` are accepted and ignored.
    """
    connection = Mock(spec=core.SanicMongoAgnosticClient)
    delegate = MagicMock()
    delegate.name = 'blabla'
    connection.delegate = delegate
    name = 'blabla'
    db = create_class_with_framework(core.SanicMongoAgnosticDatabase,
                                     asyncio_framework,
                                     self.__module__)(connection, name)
    coll = create_class_with_framework(core.SanicMongoAgnosticCollection,
                                       asyncio_framework,
                                       self.__module__)
    coll = coll(db, name)
    self.assertEqual(coll._get_write_mode, coll.delegate._get_write_mode)
def test_getitem(self, *args, **kwargs):
    """Indexing a collection returns an instance of the same collection class."""
    name = 'blabla'

    # Mocked client whose delegate carries the same name.
    connection = Mock(spec=core.SanicMongoAgnosticClient)
    delegate = MagicMock()
    delegate.name = 'blabla'
    connection.delegate = delegate

    database_cls = create_class_with_framework(
        core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
    db = database_cls(connection, name)

    collection_cls = create_class_with_framework(
        core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)
    coll = collection_cls(db, name)

    other_coll = coll['other']
    self.assertTrue(isinstance(other_coll, type(coll)))
def test_find(self, *args, **kwargs):
    """``coll.find()`` returns an instance of the framework cursor class."""
    name = 'blabla'

    # Mocked client whose delegate carries the same name.
    connection = Mock(spec=core.SanicMongoAgnosticClient)
    delegate = MagicMock()
    delegate.name = 'blabla'
    connection.delegate = delegate

    database_cls = create_class_with_framework(
        core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
    db = database_cls(connection, name)

    collection_cls = create_class_with_framework(
        core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)
    coll = collection_cls(db, name)

    cursor = create_class_with_framework(
        core.SanicMongoAgnosticCursor, asyncio_framework, self.__module__)

    self.assertIsInstance(coll.find(), cursor)
def test_get_item(self, *args, **kwargs):
    """Indexing a database returns an instance of the framework collection class."""
    name = 'blabla'

    connection = Mock(spec=core.SanicMongoAgnosticClient)
    connection.delegate = MagicMock()

    database_cls = create_class_with_framework(
        core.SanicMongoAgnosticDatabase, asyncio_framework, self.__module__)
    db = database_cls(connection, name)

    # Class the looked-up collection is compared against.
    comp_coll = create_class_with_framework(
        core.SanicMongoAgnosticCollection, asyncio_framework, self.__module__)

    coll = db['bla']
    self.assertTrue(isinstance(coll, comp_coll))
def test_background_queue():
    """Exercise a ``TaskQueue`` task directly, delayed, and via an event.

    A direct call returns the function's value, ``delay()`` returns ``True``
    and publishes the pickled ``(path, args, kwargs)`` event, and processing
    that event runs the function a second time.
    """
    publish_event = MagicMock(return_value='aaa')
    task_queue = TaskQueue(publish_event)

    @task_queue.task()
    def funcy():
        # Counts invocations in a module-level global defined outside this
        # block (presumably initialised to 0 -- verify against the module).
        global funcy_called
        funcy_called += 1
        return "blah"

    direct_result = funcy()
    assert direct_result == "blah"

    delayed_result = funcy.delay()
    assert delayed_result is True

    event = jsonpickle.dumps((funcy.path, (), {}))
    publish_event.assert_called_once_with(event)

    task_queue.process_event(event)
    assert funcy_called == 2
def test_reset_kill_reconnect(self):
    """``run()`` works after ``connect()`` and again after a reconnect.

    The callback disconnects the client on its first message, so each
    ``run()`` is expected to invoke the callback exactly once.
    """
    client = AprsClient(aprs_user='testuser', aprs_filter='')
    client.connect()

    def disconnect_on_message(raw_msg):
        return client.disconnect()

    # .run() should be allowed to execute after .connect()
    mock_callback = mock.MagicMock(side_effect=disconnect_on_message)

    self.assertFalse(client._kill)
    client.run(callback=mock_callback, autoreconnect=True)

    # After .disconnect(), client._kill should be True
    self.assertTrue(client._kill)
    self.assertEqual(mock_callback.call_count, 1)

    # After we reconnect, .run() should be able to run again
    mock_callback.reset_mock()
    client.connect()
    client.run(callback=mock_callback, autoreconnect=True)
    self.assertEqual(mock_callback.call_count, 1)
def test_handle_message_iam():
    """Test if a gateway is created and discover_devices is called with gateway IP
    after message "iam" is received"""
    src_addr = "10.10.10.10"
    gateway_sid = "123456"
    msg_iam = {"cmd": "iam", "ip": src_addr, "sid": gateway_sid}

    client = AqaraClient()
    mock_handler = MagicMock()
    client.subscribe(mock_handler)

    client.handle_message(msg_iam, src_addr)

    # Exactly one gateway is registered, keyed by the sid from the message.
    assert len(client.gateways.keys()) == 1
    new_gateway = client.gateways["123456"]
    mock_handler.assert_called_once_with(
        sender=client,
        gateway=new_gateway,
        signal=AQARA_EVENT_NEW_GATEWAY,
    )
def test_handle_message_device_list():
    """Test if client maps all sids to the gateway and call gateway.on_devices_discovered"""
    gw_addr = "10.10.10.10"
    gw_sid = "123456"
    msg_get_id_list_ack = {
        "cmd": "get_id_list_ack",
        "sid": gw_sid,
        "data": json.dumps(["1", "2", "3"])
    }
    mock_client = AqaraClient()
    mock_client.read_device = MagicMock()
    mock_gateway = AqaraGateway(mock_client, gw_sid, gw_addr, None)
    mock_client._gateways[gw_sid] = mock_gateway
    mock_gateway.on_devices_discovered = MagicMock()
    mock_client.handle_message(msg_get_id_list_ack, gw_addr)
    # Use the real assertion helper: ``called_once_with`` is not part of the
    # mock API, so the previous spelling never verified the call (and raises
    # AttributeError on recent Python versions).
    mock_gateway.on_devices_discovered.assert_called_once_with(["1", "2", "3"])
    assert len(mock_client._device_to_gw.keys()) == 3
def test_handle_message_heartbeat():
    """Test if the gateway token is updated on a gateway heartbeat"""
    gw_addr = "10.10.10.10"
    gw_sid = "123456"
    heartbeat_data = {"ip": gw_addr}
    msg_heartbeat = {
        "cmd": "heartbeat",
        "model": "gateway",
        "sid": gw_sid,
        "token": "ffffff",
        "data": json.dumps(heartbeat_data),
    }

    mock_client = AqaraClient()
    mock_gateway = AqaraGateway(mock_client, gw_sid, gw_addr, None)
    mock_gateway.on_device_heartbeat = MagicMock()
    # Register the gateway both as a gateway and as its own device owner.
    mock_client._gateways[gw_sid] = mock_gateway
    mock_client._device_to_gw[gw_sid] = mock_gateway

    mock_client.handle_message(msg_heartbeat, gw_addr)

    mock_gateway.on_device_heartbeat.assert_called_once_with(
        "gateway", gw_sid, {"ip": gw_addr}, "ffffff")
def test_run_activity_error(self, mCreateSession):
    """An ``ActivityError`` from the process target is reported as a task failure.

    The stepfunctions client must record exactly one ``send_task_failure``
    call carrying the error and cause of the exception, and the task token
    must be cleared afterwards.
    """
    iSession = MockSession()
    mCreateSession.return_value = (iSession, '123456')
    client = iSession.client('stepfunctions')

    target = mock.MagicMock(side_effect=ActivityError('error', 'cause'))
    task = TaskMixin(process=target)

    task.handle_task('token', None)

    self.assertEqual(task.token, None)
    expected_call = mock.call.send_task_failure(
        taskToken='token', error='error', cause='cause')
    self.assertEqual(client.mock_calls, [expected_call])
def test_run_exception(self, mCreateSession):
    """A ``BossError`` from the process target is reported as a task failure.

    The stepfunctions client must record exactly one ``send_task_failure``
    call with error ``'BossError'`` and the exception's cause, and the task
    token must be cleared afterwards.
    """
    iSession = MockSession()
    mCreateSession.return_value = (iSession, '123456')
    client = iSession.client('stepfunctions')

    target = mock.MagicMock(side_effect=BossError('cause'))
    task = TaskMixin(process=target)

    task.handle_task('token', None)

    self.assertEqual(task.token, None)
    expected_call = mock.call.send_task_failure(
        taskToken='token', error='BossError', cause='cause')
    self.assertEqual(client.mock_calls, [expected_call])
def test_intersection_com_mock(self):
    """Check ``to_Trajectory`` against a mocked ``intersection_shapely`` result.

    ``self.traj.intersection_shapely`` is replaced by a mock that returns a
    fixed four-point ``LineString`` whose third ordinate is used as time.
    The trajectory produced by ``to_Trajectory`` from that response must expose
    the same X, Y and time arrays as a ``Trajectory`` built directly from the
    line's coordinates.
    """
    ls = LineString([(1.5, 1, 9.48024060e+08), (2, 2, 9.49363260e+08),
                     (3, 2, 9.51868860e+08), (4, 3, 9.53208060e+08)])
    poly = Polygon([(1, 1), (1, 3), (4, 3), (4, 1), (1, 1)])
    self.traj.intersection_shapely = MagicMock(return_value=ls)
    response = self.traj.intersection_shapely(poly)
    # Convert through ``.coords``: turning a geometry into an array with
    # ``np.array(geom)`` is not supported by Shapely 2.x, while the
    # coordinate sequence converts to an (n_points, 3) array on every version.
    coords = np.array(ls.coords)
    trajMock = self.traj.to_Trajectory(response)
    traj = Trajectory(coords[:, 0], coords[:, 1], coords[:, 2])
    assert np.array_equal(trajMock.getX(), traj.getX())
    assert np.array_equal(trajMock.getY(), traj.getY())
    assert np.array_equal(trajMock.getTime(), traj.getTime())
def test_multiple_instances():
    """A watcher added on ``Dummy.prop`` fires for every instance.

    The callback must be called with each newly assigned value, and must not
    be called again when the same value is assigned a second time.
    """
    # Reset all watchers
    Dummy.prop._watchers = []

    first = Dummy()
    second = Dummy()

    callback = MagicMock()
    Dummy.prop.add_watcher(callback)
    assert callback.call_count == 0

    first.prop = 'test'
    assert callback.call_count == 1
    callback.assert_called_with('test')

    second.prop = 'caribou'
    assert callback.call_count == 2
    callback.assert_called_with('caribou')

    # Re-assigning the same value must not trigger the watcher again.
    second.prop = 'caribou'
    assert callback.call_count == 2
def test_getExutor(self, mock_sparksession, mock_kafka_utils):
    """``ReadFactory.get_executor`` returns a ``StreamingExecutor``.

    The Spark session builder chain and the Kafka direct stream are replaced
    by mocks before the factory is built from ``CONFIG_PATH``.
    """
    mock_context = MagicMock()
    mock_context.addFile.return_value = "test"
    mock_spark = MagicMock()
    mock_spark.sparkContext.return_value = mock_context
    mock_builder = MagicMock()
    mock_builder.getOrCreate.return_value = mock_spark
    mock_sparksession.builder.return_value = mock_builder
    mock_dstream = MagicMock()
    mock_dstream.map.return_value = None
    mock_kafka_utils.createDirectStream.return_value = mock_dstream
    config = Config(CONFIG_PATH)
    factory = ReadFactory(config)
    test_executor = factory.get_executor()
    # NOTE(review): the failure message mentions a csv file and
    # BatchExecutable while the assertion checks StreamingExecutor --
    # confirm the intended wording.
    self.assertIsInstance(test_executor, StreamingExecutor,
                          "When read csv file executor should be instance of BatchExecutable")
def test_define_services(uuid4):
    """Only services registered in ``SERVICES`` are defined.

    With just ``'bar'`` registered, the ``'baz'`` entry is expected to be
    dropped: ``uuid4`` is called once and the result maps its hex value to
    the instance returned by ``from_config`` of the registered service.
    """
    mock_service = mock.MagicMock()
    requested = [{'name': 'bar'}, {'name': 'baz'}]

    with mock.patch.dict(SERVICES, {'bar': mock_service}, clear=True):
        result = define_services(requested)

    uuid4.assert_called_once_with()
    expected = {uuid4().hex: mock_service.from_config.return_value}
    assert result == expected
def test_fetch_from_url_or_retry_get_text(self):
    """GET variant: the mocked fetch result's content decodes to the expected text.

    ``self.fetch_from_url_or_retry`` is replaced by a mock returning the
    object built by ``response_mocker``.
    """
    # mocked requests
    identifier = "P00439"
    base_url = c.http_uniprot
    endpoint_url = "{}.fasta".format(identifier)
    mocked_response = response_mocker(
        kwargs={"P00439.fasta"},
        base_url=base_url,
        endpoint_url="",
        content_type='text/plain',
    )
    self.fetch_from_url_or_retry = MagicMock(return_value=mocked_response)

    url = base_url + endpoint_url
    fetched = self.fetch_from_url_or_retry(url, json=True, header={'text/plain'})
    r = fetched.content
    self.assertEqual(str(r, 'utf-8'),
                     "Some text-based content\n spanning multiple lines")
def test_fetch_from_url_or_retry_get_json(self):
    """GET variant: the mocked fetch result must decode to the expected JSON.

    ``self.fetch_from_url_or_retry`` is replaced by a mock returning the
    object built by ``response_mocker``.
    """
    # mocked requests
    identifier = "2pah"
    base_url = c.http_pdbe
    endpoint_url = "api/pdb/entry/summary/"
    mocked_response = response_mocker(
        kwargs={identifier},
        base_url=base_url,
        endpoint_url=endpoint_url,
        content_type='application/json',
    )
    self.fetch_from_url_or_retry = MagicMock(return_value=mocked_response)

    url = base_url + endpoint_url + identifier
    fetched = self.fetch_from_url_or_retry(
        url, json=True, header={'application/json'})
    r = fetched.json()
    expected = json.loads('{"data": "some json formatted output"}')
    self.assertEqual(r, expected)
def test_fetch_from_url_or_retry_get_binary(self):
    """GET variant: the mocked fetch result's content equals the expected bytes.

    ``self.fetch_from_url_or_retry`` is replaced by a mock returning the
    object built by ``response_mocker``.
    """
    # mocked requests
    identifier = "P00439"
    base_url = c.http_uniprot
    endpoint_url = "{}.fasta".format(identifier)
    mocked_response = response_mocker(
        kwargs={"P00439.fasta"},
        base_url=base_url,
        endpoint_url="",
        content_type='application/octet-stream',
    )
    self.fetch_from_url_or_retry = MagicMock(return_value=mocked_response)

    url = base_url + endpoint_url
    fetched = self.fetch_from_url_or_retry(
        url, json=True,
        header={'application/octet-stream'},
        retry_in=None, wait=0,
        n_retries=10, stream=False,
    )
    r = fetched.content
    self.assertEqual(r, b"Some other binary stuff...")
def test_fetch_from_url_or_retry_get_404(self):
    """GET variant: a mocked 404 response reports its status and is not ``ok``.

    ``self.fetch_from_url_or_retry`` is replaced by a mock returning the
    object built by ``response_mocker`` with ``status=404``.
    """
    # mocked requests
    identifier = "P00439"
    base_url = c.http_uniprot
    endpoint_url = "{}.fasta".format(identifier)
    mocked_response = response_mocker(
        kwargs={"P00439.fasta"},
        base_url=base_url,
        endpoint_url="",
        content_type='text/plain',
        status=404,
    )
    self.fetch_from_url_or_retry = MagicMock(return_value=mocked_response)

    url = base_url + endpoint_url
    r = self.fetch_from_url_or_retry(
        url, json=True, header={'text/plain'},
        retry_in=None, wait=0,
        n_retries=10, stream=False,
    )
    self.assertEqual(r.status_code, 404)
    self.assertFalse(r.ok)
def testCreateSQLQueryModelWithUserInputNoError(self):
    """Tests _CreateSQLQueryModelWithUserInput with an error-free query.

    The prompts written to the output file are compared with the expected
    text, and the returned model must keep the query and have an empty name.
    """
    query_data = sql_query_data.SQLQueryData(has_error=False)
    fake_execution = fake_sqlite_query_execution.SQLQueryExecution(query_data)
    sql_query = 'SELECT createdDate FROM Users ORDER BY createdDate'
    name = 'Contact'

    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()

        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(),
            confirm=True, confirm_amount_same=2, prompt_info=name)
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        # Timestamp detection is stubbed to return two empty lists.
        controller.GetTimestamps = mock.MagicMock(return_value=([], []))

        actual = controller._CreateSQLQueryModelWithUserInput(
            sql_query, False, fake_execution)

        prompt_output_actual = self._ReadFromFile(path)
        prompt_output_expected = (
            'The SQL query was ok.'
            'Do you want to name the query parse row: ?'
            'Does the event need customizing?'
            'Enter columns that are customizable [columnName,aliasName...] '
            'or [abort]'
            'Added: Failed: Contact'
            'Do you want to add more columns that are customizable?')

        expected = sql_query_model.SQLQueryModel(
            sql_query, name, [], [], False, 0)

        self.assertEqual(actual.name, '')
        self.assertEqual(expected.query, actual.query)
        self.assertEqual(prompt_output_expected, prompt_output_actual)
def testCreateSQLQueryModelWithUserInputWithExamplesAndFourDataExamples(self):
    """Tests _CreateSQLQueryModelWithUserInput with example output enabled.

    The fake execution provides four data rows and two columns; the expected
    prompt text lists the column names and the first three rows.
    """
    id_column = type('columns', (object,), {'sql_column': 'id'})
    name_column = type('columns', (object,), {'sql_column': 'name'})
    query_data = sql_query_data.SQLQueryData(
        has_error=False,
        data=['first', 'second', 'third', 'fourth'],
        columns=[id_column, name_column])
    fake_execution = fake_sqlite_query_execution.SQLQueryExecution(query_data)
    sql_query = 'SELECT id from Users'
    name = 'Contact'

    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()

        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(),
            confirm=True, confirm_amount_same=2, prompt_info=name)
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=True)
        controller = sqlite_controller.SQLiteController(
            output_handler, plugin_helper)
        # Timestamp detection is stubbed to return two empty lists.
        controller.GetTimestamps = mock.MagicMock(return_value=([], []))

        actual = controller._CreateSQLQueryModelWithUserInput(
            sql_query, True, fake_execution)

        prompt_output_actual = self._ReadFromFile(path)
        prompt_output_expected = (
            'Your query output could look like this.'
            '[\'id\', \'name\']'
            'first'
            'second'
            'third'
            'Do you want to add this query?'
            'Do you want to name the query parse row: ?'
            'Does the event need customizing?')

        expected = sql_query_model.SQLQueryModel(
            sql_query, name, [], [], False, 0)

        self.assertEqual(actual.name, '')
        self.assertEqual(expected.query, actual.query)
        self.assertEqual(prompt_output_expected, prompt_output_actual)