def test_handle_message_heartbeat():
    """Check that a gateway heartbeat reaches the gateway's heartbeat hook.

    A gateway is registered on the client under its own sid, a "heartbeat"
    message for that sid is fed to ``handle_message``, and the test asserts
    that ``on_device_heartbeat`` is invoked exactly once with the model, the
    sid, the decoded ``data`` payload and the token from the message.
    """
    sid = "123456"
    address = "10.10.10.10"
    token = "ffffff"
    expected_data = {"ip": address}

    client = AqaraClient()
    gateway = AqaraGateway(client, sid, address, None)
    # Replace the real hook with a mock so the call can be inspected.
    gateway.on_device_heartbeat = MagicMock()

    # Register the gateway under its own sid in both client lookup tables
    # (presumably so handle_message can resolve the sender -- verify against
    # AqaraClient).
    client._gateways[sid] = gateway
    client._device_to_gw[sid] = gateway

    client.handle_message(
        {
            "cmd": "heartbeat",
            "model": "gateway",
            "sid": sid,
            "token": token,
            # The payload travels as a JSON-encoded string inside the message.
            "data": json.dumps(expected_data),
        },
        address,
    )

    gateway.on_device_heartbeat.assert_called_once_with(
        "gateway", sid, expected_data, token
    )
评论列表
文章目录