def setUp(self):
    """Initialize the per-test environment (timeout, temp dirs, capture)."""
    super(TestCase, self).setUp()
    # OS_TEST_TIMEOUT holds the per-test timeout in seconds; a missing or
    # malformed value disables the timeout entirely.
    try:
        timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
    except ValueError:
        timeout = 0
    if timeout > 0:
        # gentle=True raises inside the test instead of killing the process.
        self.useFixture(fixtures.Timeout(timeout, gentle=True))
    # Isolate the filesystem: temp files and $HOME are sandboxed per test.
    self.useFixture(fixtures.NestedTempfile())
    self.useFixture(fixtures.TempHomeDir())
    # Optionally capture stdout/stderr into in-memory streams.
    for env_var, stream_name in (('OS_STDOUT_CAPTURE', 'stdout'),
                                 ('OS_STDERR_CAPTURE', 'stderr')):
        if os.environ.get(env_var) in _TRUE_VALUES:
            stream = self.useFixture(
                fixtures.StringStream(stream_name)).stream
            self.useFixture(
                fixtures.MonkeyPatch('sys.%s' % stream_name, stream))
    self.log_fixture = self.useFixture(fixtures.FakeLogger())
# Python usage examples of the fixtures.FakeLogger class.
def setUp(self):
    """Run before each test method to initialize test environment."""
    super(TestCase, self).setUp()
    raw_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
    try:
        seconds = int(raw_timeout)
    except ValueError:
        # An unparsable timeout means "no timeout".
        seconds = 0
    if seconds > 0:
        self.useFixture(fixtures.Timeout(seconds, gentle=True))
    self.useFixture(fixtures.NestedTempfile())
    self.useFixture(fixtures.TempHomeDir())
    capture_stdout = os.environ.get('OS_STDOUT_CAPTURE') in _TRUE_VALUES
    if capture_stdout:
        out_stream = self.useFixture(fixtures.StringStream('stdout')).stream
        self.useFixture(fixtures.MonkeyPatch('sys.stdout', out_stream))
    capture_stderr = os.environ.get('OS_STDERR_CAPTURE') in _TRUE_VALUES
    if capture_stderr:
        err_stream = self.useFixture(fixtures.StringStream('stderr')).stream
        self.useFixture(fixtures.MonkeyPatch('sys.stderr', err_stream))
    # Capture all logging output for later assertions.
    self.log_fixture = self.useFixture(fixtures.FakeLogger())
def setup_logging(self):
    # Assign default logs to self.LOG so we can still assert on bilean logs.
    level = (logging.DEBUG
             if os.environ.get('OS_DEBUG') in _TRUE_VALUES
             else logging.INFO)
    self.LOG = self.useFixture(
        fixtures.FakeLogger(level=level, format=_LOG_FORMAT))
    # Collect the top-level namespaces of every registered logger.
    roots = {name.split('.')[0]
             for name in logging.Logger.manager.loggerDict}
    for root in roots:
        if root in TEST_DEFAULT_LOGLEVELS:
            # Known-noisy namespaces get an explicitly configured level.
            self.useFixture(fixtures.FakeLogger(
                level=TEST_DEFAULT_LOGLEVELS[root],
                name=root, format=_LOG_FORMAT))
        elif root != 'bilean':
            # Everything else (except bilean itself) gets a default capture.
            self.useFixture(fixtures.FakeLogger(
                name=root, format=_LOG_FORMAT))
def setUp(self):
    """Prepare a hermetic test environment before each test method."""
    super(TestCase, self).setUp()
    deadline = os.environ.get('OS_TEST_TIMEOUT', 0)
    try:
        deadline = int(deadline)
    except ValueError:
        # A non-numeric OS_TEST_TIMEOUT disables the timeout.
        deadline = 0
    if deadline > 0:
        self.useFixture(fixtures.Timeout(deadline, gentle=True))
    # Sandbox temp files and the home directory for this test.
    self.useFixture(fixtures.NestedTempfile())
    self.useFixture(fixtures.TempHomeDir())
    env = os.environ.get
    if env('OS_STDOUT_CAPTURE') in _TRUE_VALUES:
        new_out = self.useFixture(fixtures.StringStream('stdout')).stream
        self.useFixture(fixtures.MonkeyPatch('sys.stdout', new_out))
    if env('OS_STDERR_CAPTURE') in _TRUE_VALUES:
        new_err = self.useFixture(fixtures.StringStream('stderr')).stream
        self.useFixture(fixtures.MonkeyPatch('sys.stderr', new_err))
    self.log_fixture = self.useFixture(fixtures.FakeLogger())
def test_logs_on_no_permissions(self):
    # An unreadable file yields None and logs a permission error.
    tmpdir = self.useFixture(fixtures.TempDir())
    log = self.useFixture(fixtures.FakeLogger())
    unreadable = os.path.join(tmpdir.path, 'path_not_readable')
    with open(unreadable, 'w') as f:
        f.write("# You can't read me")
    # Strip every permission bit so the subsequent open fails.
    os.chmod(unreadable, 0)
    self.assertIsNone(
        _build_doubles.extract_schemas_from_file(unreadable))
    self.assertThat(
        log.output,
        Contains('Extracting schemas from %s' % unreadable))
    self.assertThat(
        log.output,
        Contains('Cannot extract schemas: Permission denied'))
def test_logs_on_syntax_error(self):
    # A file that fails to parse yields None and logs the SyntaxError.
    tmpdir = self.useFixture(fixtures.TempDir())
    log = self.useFixture(fixtures.FakeLogger())
    source_path = os.path.join(tmpdir.path, 'foo.py')
    with open(source_path, 'w') as f:
        f.write("not valid pyton")
    self.assertIsNone(
        _build_doubles.extract_schemas_from_file(source_path))
    self.assertThat(
        log.output,
        Contains('Extracting schemas from %s' % source_path))
    self.assertThat(
        log.output,
        Contains('Cannot extract schemas: invalid syntax (foo.py, line 1)')
    )
def test_listen(self):
    """The spawned executable listens on its port and accepts connections.

    Connection attempts are retried with a growing backoff because the
    child process may not have bound the socket yet.
    """
    logger = self.useFixture(FakeLogger(format="%(asctime)s %(message)s"))
    self.executable.listen(6666)
    self.executable.sleep(1)
    self.executable.spawn()
    # Try to connect to the socket.
    sock = socket.socket()
    # Ensure the client socket is closed even if an assertion fails.
    self.addCleanup(sock.close)
    transient = (errno.ECONNREFUSED, errno.ECONNRESET)
    attempts = 10
    for i in range(1, attempts + 1):
        try:
            sock.connect(("127.0.0.1", self.executable.port))
        except socket.error as error:  # pragma: no cover
            if error.errno in transient and i != attempts:
                # Back off a little longer on each failed attempt.
                time.sleep(0.05 * i)
                continue
            logging.error("connection attempt %d failed", i)
            # Bare raise preserves the original traceback.
            raise
        break
    self.assertEqual("127.0.0.1", sock.getsockname()[0])
def test__logs_if_suggests_previously_observed_neighbour(self):
    # In 10.0.0.0/30 only 10.0.0.1 and 10.0.0.2 are usable addresses.
    subnet = self.make_Subnet(
        cidr="10.0.0.0/30", gateway_ip=None, dns_servers=None)
    rack_iface = factory.make_Interface(vlan=subnet.vlan)
    current = datetime.now()
    # 10.0.0.1 was observed just now, 10.0.0.2 a full day ago; allocation
    # picks 10.0.0.2 and logs that it was previously observed.
    factory.make_Discovery(
        ip="10.0.0.1", interface=rack_iface, updated=current)
    factory.make_Discovery(
        ip="10.0.0.2", interface=rack_iface,
        updated=current - timedelta(days=1))
    logger = self.useFixture(FakeLogger("maas"))
    allocated = subnet.get_next_ip_for_allocation()
    self.assertThat(allocated, Equals("10.0.0.2"))
    self.assertThat(logger.output, DocTestMatches(
        "Next IP address...observed previously..."
    ))
def test__handle_uncaught_exception_logs_other_failure(self):
    # An unexpected exception reaching the handler should produce a
    # "500 Internal Server Error" log entry with the full traceback.
    handler = views.WebApplicationHandler()
    request = make_request()
    request.path = factory.make_name("path")
    # Capture an exc_info tuple with traceback.
    exc_type = factory.make_exception_type()
    exc_msg = factory.make_name("message")
    try:
        raise exc_type(exc_msg)
    except exc_type:
        exc_info = sys.exc_info()
    # Only ERROR-and-above records from the views module are captured.
    with FakeLogger(views.__name__, logging.ERROR) as logger:
        handler.handle_uncaught_exception(
            request=request, resolver=get_resolver(None),
            exc_info=exc_info)
    # DocTestMatches treats "..." as an ellipsis wildcard.
    self.assertThat(
        logger.output, DocTestMatches("""\
500 Internal Server Error @ %s
Traceback (most recent call last):
...
maastesting.factory.TestException#...: %s
""" % (request.path, exc_msg)))
def test__scan_as_admin_logs_the_fact_that_a_scan_happened(self):
    # Triggering a scan as an admin writes an audit line to the log.
    admin = factory.make_admin()
    handler = SubnetHandler(admin, {})
    subnet = factory.make_Subnet(version=4)
    rack = factory.make_RackController()
    factory.make_Interface(node=rack, subnet=subnet)
    logger = self.useFixture(FakeLogger())
    cidr = subnet.get_ipnetwork()
    handler.scan({
        "id": subnet.id,
    })
    # Use MatchesRegex here rather than DocTestMatches because usernames
    # can contain characters that confuse DocTestMatches (e.g. periods).
    expected = (
        "User '%s' initiated a neighbour discovery scan against subnet: %s"
        % (re.escape(admin.username), re.escape(str(cidr))))
    self.assertThat(logger.output, MatchesRegex(expected))
def test_raises_error_when_omshell_not_connected(self):
    # When omshell reports "not connected.", a removal failure should be
    # surfaced as the DHCP server being unreachable.
    error = ExternalProcessError(
        returncode=2, cmd=("omshell",), output="")
    self.patch(ExternalProcessError, 'output_as_unicode', 'not connected.')
    omshell = Mock()
    omshell.remove.side_effect = error
    mac = factory.make_mac_address()
    with FakeLogger("maas.dhcp") as logger:
        error = self.assertRaises(
            exceptions.CannotRemoveHostMap, dhcp._remove_host_map,
            omshell, mac)
    # The CannotRemoveHostMap exception includes a message describing the
    # problematic mapping.
    self.assertDocTestMatches(
        "Could not remove host map for %s: "
        "The DHCP server could not be reached." % (mac),
        str(error))
    # A message is also written to the maas.dhcp logger that describes the
    # problematic mapping.
    self.assertDocTestMatches(
        "Could not remove host map for %s: "
        "The DHCP server could not be reached." % (mac),
        logger.output)
def test_raises_error_when_omshell_crashes(self):
    # A crashing omshell surfaces as CannotCreateHostMap and is logged.
    failure_output = factory.make_name("error").encode("ascii")
    omshell = Mock()
    omshell.create.side_effect = ExternalProcessError(
        returncode=2, cmd=("omshell",), output=failure_output)
    mac = factory.make_mac_address()
    ip = factory.make_ip_address()
    with FakeLogger("maas.dhcp") as logger:
        error = self.assertRaises(
            exceptions.CannotCreateHostMap, dhcp._create_host_map,
            omshell, mac, ip)
    # Both the exception and the maas.dhcp log record describe the
    # mapping that could not be created.
    expected = "Could not create host map for %s -> %s: ..." % (mac, ip)
    self.assertDocTestMatches(expected, str(error))
    self.assertDocTestMatches(expected, logger.output)
def test_raises_error_when_omshell_not_connected(self):
    # _create_host_map translates "not connected." output into a
    # friendlier "server could not be reached" message.
    not_connected = ExternalProcessError(
        returncode=2, cmd=("omshell",), output="")
    self.patch(ExternalProcessError, 'output_as_unicode', 'not connected.')
    omshell = Mock()
    omshell.create.side_effect = not_connected
    mac = factory.make_mac_address()
    ip = factory.make_ip_address()
    with FakeLogger("maas.dhcp") as logger:
        error = self.assertRaises(
            exceptions.CannotCreateHostMap, dhcp._create_host_map,
            omshell, mac, ip)
    message = (
        "Could not create host map for %s -> %s: "
        "The DHCP server could not be reached." % (mac, ip))
    # The exception message and the maas.dhcp log both carry the
    # description of the problematic mapping.
    self.assertDocTestMatches(message, str(error))
    self.assertDocTestMatches(message, logger.output)
def test__does_log_other_exceptions_when_restarting(self):
    # An unexpected restart failure should still be logged to "maas".
    self.patch_sudo_write_file()
    restart_error = factory.make_exception("DHCP is on strike today")
    self.patch_restartService().side_effect = restart_error
    peers = [make_failover_peer_config()]
    networks = fix_shared_networks_failover([make_shared_network()], peers)
    with FakeLogger("maas") as log:
        with ExpectedException(exceptions.CannotConfigureDHCP):
            yield self.configure(
                factory.make_name('key'), peers, networks,
                [make_host()], [make_interface()],
                make_global_dhcp_snippets())
    self.assertDocTestMatches(
        "DHCPv... server failed to restart: "
        "DHCP is on strike today", log.output)
def test_query_all_nodes_swallows_PowerActionFail(self):
    # A PowerActionFail for one node must not prevent the other nodes
    # from being queried; the failure is logged and swallowed.
    node1, node2 = self.make_nodes(2)
    new_state_2 = self.pick_alternate_state(node2['power_state'])
    get_power_state = self.patch(power, 'get_power_state')
    error_msg = factory.make_name("error")
    # First node fails; second node succeeds with a changed state.
    get_power_state.side_effect = [
        fail(exceptions.PowerActionFail(error_msg)),
        succeed(new_state_2),
    ]
    suppress_reporting(self)
    with FakeLogger("maas.power", level=logging.DEBUG) as maaslog:
        yield power.query_all_nodes([node1, node2])
    self.assertDocTestMatches(
        """\
hostname-...: Could not query power state: %s.
hostname-...: Power state has changed from ... to ...
""" % error_msg,
        maaslog.output)
def test_query_all_nodes_swallows_PowerError(self):
    # A PowerError from one node is logged and swallowed; the remaining
    # nodes are still queried and their state changes recorded.
    node1, node2 = self.make_nodes(2)
    new_state_2 = self.pick_alternate_state(node2['power_state'])
    get_power_state = self.patch(power, 'get_power_state')
    error_msg = factory.make_name("error")
    # First node fails with PowerError; second succeeds.
    get_power_state.side_effect = [
        fail(PowerError(error_msg)),
        succeed(new_state_2),
    ]
    suppress_reporting(self)
    with FakeLogger("maas.power", level=logging.DEBUG) as maaslog:
        yield power.query_all_nodes([node1, node2])
    self.assertDocTestMatches(
        """\
%s: Could not query power state: %s.
%s: Power state has changed from %s to %s.
""" % (node1['hostname'], error_msg,
       node2['hostname'], node2['power_state'], new_state_2),
        maaslog.output)
def test_query_all_nodes_swallows_NoSuchNode(self):
    # NoSuchNode during a power query is logged and swallowed so the
    # remaining nodes are still processed.
    node1, node2 = self.make_nodes(2)
    new_state_2 = self.pick_alternate_state(node2['power_state'])
    get_power_state = self.patch(power, 'get_power_state')
    # First node no longer exists; second succeeds with a changed state.
    get_power_state.side_effect = [
        fail(exceptions.NoSuchNode()),
        succeed(new_state_2),
    ]
    suppress_reporting(self)
    with FakeLogger("maas.power", level=logging.DEBUG) as maaslog:
        yield power.query_all_nodes([node1, node2])
    self.assertDocTestMatches(
        """\
hostname-...: Could not update power state: no such node.
hostname-...: Power state has changed from ... to ...
""",
        maaslog.output)
def test___performServiceAction_logs_error_if_action_fails(self):
    # A non-zero exit status raises ServiceActionError and logs stderr.
    service = make_fake_service(SERVICE_STATE.ON)
    monitor = self.make_service_monitor()
    exec_action = self.patch(monitor, "_execSystemDServiceAction")
    stderr_text = factory.make_name("error")
    exec_action.return_value = (1, "", stderr_text)
    action = factory.make_name("action")
    with FakeLogger(
            "maas.service_monitor", level=logging.ERROR) as maaslog:
        with ExpectedException(ServiceActionError):
            yield monitor._performServiceAction(service, action)
    self.assertDocTestMatches(
        "Service '%s' failed to %s: %s"
        % (service.name, action, stderr_text),
        maaslog.output)
def test___ensureService_logs_warning_in_mismatch_process_state(self):
    # An ON service with an unexpected process state logs a WARNING.
    service = make_fake_service(SERVICE_STATE.ON)
    monitor = self.make_service_monitor([service])
    bogus_state = factory.make_name("invalid_state")
    self.patch(monitor, "getServiceState").return_value = succeed(
        ServiceState(SERVICE_STATE.ON, bogus_state))
    with FakeLogger(
            "maas.service_monitor", level=logging.WARNING) as maaslog:
        yield monitor._ensureService(service)
    self.assertDocTestMatches(
        "Service '%s' is %s but not in the expected state of "
        "'%s', its current state is '%s'."
        % (service.service_name, SERVICE_STATE.ON.value,
           monitor.PROCESS_STATE[SERVICE_STATE.ON], bogus_state),
        maaslog.output)
def test___ensureService_logs_debug_in_expected_states(self):
    # A service in its expected state is reported at DEBUG level only.
    state = SERVICE_STATE.ON
    service = make_fake_service(state)
    monitor = self.make_service_monitor([service])
    process_state = monitor.PROCESS_STATE[state]
    self.patch(monitor, "getServiceState").return_value = succeed(
        ServiceState(SERVICE_STATE.ON, process_state))
    with FakeLogger(
            "maas.service_monitor", level=logging.DEBUG) as maaslog:
        yield monitor._ensureService(service)
    self.assertDocTestMatches(
        "Service '%s' is %s and '%s'."
        % (service.service_name, state, process_state),
        maaslog.output)
def test___ensureService_logs_mismatch_for_dead_process_state(self):
    # A DEAD service whose process state is unexpected logs a WARNING.
    service = make_fake_service(SERVICE_STATE.OFF)
    monitor = self.make_service_monitor([service])
    bogus_state = factory.make_name("invalid")
    self.patch(monitor, "getServiceState").return_value = succeed(
        ServiceState(SERVICE_STATE.DEAD, bogus_state))
    with FakeLogger(
            "maas.service_monitor", level=logging.WARNING) as maaslog:
        yield monitor._ensureService(service)
    self.assertDocTestMatches(
        "Service '%s' is %s but not in the expected state of "
        "'%s', its current state is '%s'."
        % (service.service_name, SERVICE_STATE.DEAD.value,
           monitor.PROCESS_STATE[SERVICE_STATE.DEAD], bogus_state),
        maaslog.output)
def test___ensureService_performs_start_for_off_service(self):
    # When a service that should be ON is found OFF, _ensureService
    # issues a "start" action and logs both the intent and the result.
    service = make_fake_service(SERVICE_STATE.ON)
    service_monitor = self.make_service_monitor([service])
    mock_getServiceState = self.patch(
        service_monitor, "getServiceState")
    # First poll: service is off; second poll (after start): running.
    mock_getServiceState.side_effect = [
        succeed(ServiceState(SERVICE_STATE.OFF, "waiting")),
        succeed(ServiceState(SERVICE_STATE.ON, "running")),
    ]
    mock_performServiceAction = self.patch(
        service_monitor, "_performServiceAction")
    mock_performServiceAction.return_value = succeed(None)
    with FakeLogger(
            "maas.service_monitor", level=logging.INFO) as maaslog:
        yield service_monitor._ensureService(service)
    self.assertThat(
        mock_performServiceAction, MockCalledOnceWith(service, "start"))
    self.assertDocTestMatches(
        """\
Service '%s' is not on, it will be started.
Service '%s' has been started and is 'running'.
""" % (service.service_name, service.service_name),
        maaslog.output)
def test__ensureService_performs_stop_for_on_service(self):
    # When a service that should be OFF is found ON, _ensureService
    # issues a "stop" action and logs both the intent and the result.
    service = make_fake_service(SERVICE_STATE.OFF)
    service_monitor = self.make_service_monitor([service])
    mock_getServiceState = self.patch(
        service_monitor, "getServiceState")
    # First poll: service is running; second poll (after stop): waiting.
    mock_getServiceState.side_effect = [
        succeed(ServiceState(SERVICE_STATE.ON, "running")),
        succeed(ServiceState(SERVICE_STATE.OFF, "waiting")),
    ]
    mock_performServiceAction = self.patch(
        service_monitor, "_performServiceAction")
    mock_performServiceAction.return_value = succeed(None)
    with FakeLogger(
            "maas.service_monitor", level=logging.INFO) as maaslog:
        yield service_monitor._ensureService(service)
    self.assertThat(
        mock_performServiceAction, MockCalledOnceWith(service, "stop"))
    self.assertDocTestMatches(
        """\
Service '%s' is not off, it will be stopped.
Service '%s' has been stopped and is 'waiting'.
""" % (service.service_name, service.service_name),
        maaslog.output)
def test_logs_other_errors(self):
    # Unexpected download failures are logged to both the "maas" logger
    # and the Twisted log; the deferred still resolves to None.
    service = ImageDownloadService(
        sentinel.rpc, sentinel.tftp_root, Clock())
    maybe_start_download = self.patch(service, "maybe_start_download")
    maybe_start_download.return_value = defer.fail(
        ZeroDivisionError("Such a shame I can't divide by zero"))
    with FakeLogger("maas") as maaslog, TwistedLoggerFixture() as logger:
        d = service.try_download()
        self.assertEqual(None, extract_result(d))
    self.assertDocTestMatches(
        "Failed to download images: "
        "Such a shame I can't divide by zero",
        maaslog.output)
    self.assertDocTestMatches(
        """\
Downloading images failed.
Traceback (most recent call last):
Failure: builtins.ZeroDivisionError: Such a shame ...
""",
        logger.output)
def test_try_query_nodes_logs_other_errors(self):
    # Unexpected errors from query_nodes are logged, not propagated.
    service = self.make_monitor_service()
    self.patch(npms, "getRegionClient").return_value = sentinel.client
    sentinel.client.localIdent = factory.make_UUID()
    self.patch(service, "query_nodes").return_value = fail(
        ZeroDivisionError("Such a shame I can't divide by zero"))
    with FakeLogger("maas") as maaslog, TwistedLoggerFixture():
        d = service.try_query_nodes()
        self.assertEqual(None, extract_result(d))
    self.assertDocTestMatches(
        "Failed to query nodes' power status: "
        "Such a shame I can't divide by zero",
        maaslog.output)
def setUp(self):
    """Run before each test method to initialize test environment."""
    super(TestCase, self).setUp()

    def env_flag(name):
        # True when the named environment toggle holds a truthy value.
        return os.environ.get(name) in _TRUE_VALUES

    try:
        timeout_secs = int(os.environ.get('OS_TEST_TIMEOUT', 0))
    except ValueError:
        # Ignore malformed timeout settings entirely.
        timeout_secs = 0
    if timeout_secs > 0:
        self.useFixture(fixtures.Timeout(timeout_secs, gentle=True))
    self.useFixture(fixtures.NestedTempfile())
    self.useFixture(fixtures.TempHomeDir())
    if env_flag('OS_STDOUT_CAPTURE'):
        fake_stdout = self.useFixture(
            fixtures.StringStream('stdout')).stream
        self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
    if env_flag('OS_STDERR_CAPTURE'):
        fake_stderr = self.useFixture(
            fixtures.StringStream('stderr')).stream
        self.useFixture(fixtures.MonkeyPatch('sys.stderr', fake_stderr))
    self.log_fixture = self.useFixture(fixtures.FakeLogger())
def setUp(self):
    """Capture all logging for the test: root at DEBUG, collected here."""
    super(StandardLogging, self).setUp()
    # Open the root logger wide; the handlers below do the filtering.
    root = std_logging.getLogger()
    root.setLevel(std_logging.DEBUG)
    # OS_DEBUG opts local runs into collecting DEBUG output.
    level = (std_logging.DEBUG
             if os.environ.get('OS_DEBUG') in _TRUE_VALUES
             else std_logging.INFO)
    # Collect logs.
    log_format = '%(asctime)s %(levelname)s [%(name)s] %(message)s'
    self.logger = self.useFixture(
        fixtures.FakeLogger(format=log_format, level=None))
    # TODO(sdague): why can't we send level through the fake
    # logger? Tests prove that it breaks, but it's worth getting
    # to the bottom of.
    root.handlers[0].setLevel(level)
    if level > std_logging.DEBUG:
        # Format DEBUG records (to surface formatting bugs) without
        # actually saving them anywhere.
        null = NullHandler()
        self.useFixture(fixtures.LogHandler(null, nuke_handlers=False))
        null.setLevel(std_logging.DEBUG)
    # Don't log every single DB migration step.
    std_logging.getLogger(
        'migrate.versioning.api').setLevel(std_logging.WARNING)
def setUp(self):
    """Initialize the Deckhand test environment with captured logging."""
    super(DeckhandTestCase, self).setUp()
    # Capture everything logged under the 'deckhand' namespace.
    self.useFixture(fixtures.FakeLogger('deckhand'))
def setUp(self):
    """Capture all logging for the test via a root-level FakeLogger."""
    super(StandardLogging, self).setUp()
    # Set the root logger to DEBUG; handlers below do the filtering.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    # OS_DEBUG opts local runs into collecting DEBUG output.
    if os.environ.get('OS_DEBUG') in _TRUE_VALUES:
        capture_level = logging.DEBUG
    else:
        capture_level = logging.INFO
    # Collect logs.
    record_format = '%(asctime)s %(levelname)s [%(name)s] %(message)s'
    self.logger = self.useFixture(
        fixtures.FakeLogger(format=record_format, level=None))
    # TODO(sdague): why can't we send level through the fake
    # logger? Tests prove that it breaks, but it's worth getting
    # to the bottom of.
    root_logger.handlers[0].setLevel(capture_level)
    if capture_level > logging.DEBUG:
        # Format DEBUG records (to surface formatting bugs) without
        # saving them anywhere.
        sink = NullHandler()
        self.useFixture(fixtures.LogHandler(sink, nuke_handlers=False))
        sink.setLevel(logging.DEBUG)
def setUp(self):
    """Capture logging and printed output for each test."""
    super().setUp()
    # Capture logging at DEBUG level.
    self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
    # Capture printed output by monkey-patching sys.stdout.
    out = self.useFixture(fixtures.StringStream('stdout')).stream
    self.useFixture(fixtures.MonkeyPatch('sys.stdout', out))