def test_service_client_with_plugins(self):
    """Build a ServiceClient whose definition declares two plugins.

    The first plugin is given as a bare type string, the second as a
    ``type``/``params`` mapping whose ``logger`` parameter is itself a
    nested definition. The test checks that both plugins end up on the
    client, in order, and that the nested logger carries one NullHandler.
    """
    logger_name = 'foo.bar.test.3'
    inner_logger_plugin = {
        'type': 'sc-plugins:InnerLogger',
        'params': {
            'logger': {
                'type': 'logging:Logger',
                'params': {
                    'name': logger_name,
                    'handlers': ['logging:NullHandler'],
                },
            },
        },
    }
    definition = {
        'name': 'test1',
        'spec': {'test': {'path': 'baz'}},
        'plugins': ['sc-plugins:PathTokens', inner_logger_plugin],
    }

    sc = self.loader.factory('sc:ServiceClient', **definition)

    # Basic attributes of the client itself.
    self.assertIsInstance(sc, ServiceClient)
    self.assertEqual(sc.name, 'test1')
    self.assertEqual(sc.spec, definition['spec'])

    # Plugins are kept in declaration order.
    plugins = sc._plugins
    self.assertEqual(len(plugins), 2)
    self.assertIsInstance(plugins[0], PathTokens)
    self.assertIsInstance(plugins[1], InnerLogger)

    # The nested logger definition resolved to the named stdlib logger.
    plugin_logger = plugins[1].logger
    self.assertEqual(plugin_logger, getLogger(logger_name))
    self.assertEqual(len(plugin_logger.handlers), 1)
    self.assertIsInstance(plugin_logger.handlers[0], NullHandler)
# Source code examples for the Python class NullHandler()
def test_exception(self):
    """Items whose worker raises are dropped; the others reach the collector.

    ``jobq.run`` is fed ``range(10)`` through ``err_on_even`` and then
    ``collect``. Only the odd numbers are expected in ``rst``.
    """
    # Add a handler, or python complains "no handler assigned
    # to...."
    jl = logging.getLogger('pykit.jobq')
    null_handler = logging.NullHandler()
    jl.addHandler(null_handler)

    rst = []

    def err_on_even(args):
        if args % 2 == 0:
            raise Exception('even number')
        else:
            return args

    def collect(args):
        rst.append(args)

    try:
        jobq.run(range(10), [err_on_even, collect])
        self.assertEqual(list(range(1, 10, 2)), rst)
    finally:
        # Remove only the NullHandler installed above, and do so even when
        # the assertion fails, so the logger is left as it was found.
        jl.removeHandler(null_handler)
def __init__(self, builders, num_workers=0, processor=None):
    """
    Set up the runner for a list of builders.

    Args:
        builders (list): list of builders
        num_workers (int): number of processes, forwarded to the
            multiprocessing processor only. Per the original note, 0 means
            "number of cpus - 1" (resolved outside this method).
        processor (BaseProcessor): optional custom processor (must subclass
            BaseProcessor); when omitted a default one is used.
    """
    self.builders = builders
    self.num_workers = num_workers

    # Logger named after the concrete class, silenced by default.
    self.logger = logging.getLogger(type(self).__name__)
    self.logger.addHandler(logging.NullHandler())

    # The default processor is always constructed, even if a custom
    # processor was supplied and ends up being the one that is used.
    if self.use_mpi:
        default_processor = MPIProcessor(builders)
    else:
        default_processor = MultiprocProcessor(builders, num_workers)

    if processor is None:
        self.processor = default_processor
    else:
        self.processor = processor

    self.dependency_graph = self._get_builder_dependency_graph()
    self.has_run = []  # for bookkeeping builder runs
def __init_logging(self):
    """Create this object's logger and give it a do-nothing handler.

    The logger is looked up by ``self.__logger_name`` and stored on
    ``self.__logger``. No level is set here, so the logger keeps whatever
    level it already has.
    """
    try:
        # logging.NullHandler is missing on Python < 2.7.
        NullHandler = logging.NullHandler
    except AttributeError:
        class NullHandler(logging.Handler):
            """Fallback handler that discards every record."""

            def emit(self, record):
                pass

    logger = logging.getLogger(self.__logger_name)
    # Avoids the "No handler found" warning when the application has not
    # configured logging itself.
    logger.addHandler(NullHandler())
    self.__logger = logger
def setup_logger(log_comp, handler, level):
    """
    Setup the logger for the specified log component to add the specified
    handler (removing any NullHandler that is present) and to set it to the
    specified log level. The handler is also set to the specified log level
    because the default level of a handler is 0 which causes it to process all
    levels.

    Parameters:
      log_comp: key into ``LOGGER_NAMES`` selecting the logger name.
      handler: the ``logging.Handler`` to attach.
      level: log level applied to both the handler and the logger.

    Raises:
      KeyError: if ``log_comp`` is not a key of ``LOGGER_NAMES``.
    """
    name = LOGGER_NAMES[log_comp]
    logger = logging.getLogger(name)
    # Iterate over a snapshot: removeHandler() mutates logger.handlers, and
    # removing from the live list while iterating it skips the next entry.
    for h in list(logger.handlers):
        if isinstance(h, logging.NullHandler):
            logger.removeHandler(h)
    handler.setLevel(level)
    logger.addHandler(handler)
    logger.setLevel(level)
def test_service_client_basic(self):
    """Build a ServiceClient with parser, serializer and logger definitions.

    Checks that the string references resolve to ``json_decoder`` and
    ``json_encoder`` and that the nested logger definition yields the named
    logger with exactly one NullHandler.
    """
    logger_name = 'foo.bar.test.2'
    definition = {
        'name': 'test1',
        'spec': {'test': {'path': 'baz'}},
        'parser': 'sc:json.json_decoder',
        'serializer': 'sc:json.json_encoder',
        'logger': {
            'type': 'logging:Logger',
            'params': {
                'name': logger_name,
                'handlers': ['logging:NullHandler'],
            },
        },
    }

    sc = self.loader.factory('sc:ServiceClient', **definition)

    self.assertIsInstance(sc, ServiceClient)
    self.assertEqual(sc.name, 'test1')
    self.assertEqual(sc.spec, definition['spec'])
    self.assertEqual(sc.parser, json_decoder)
    self.assertEqual(sc.serializer, json_encoder)

    client_logger = sc.logger
    self.assertEqual(client_logger, getLogger(logger_name))
    self.assertEqual(len(client_logger.handlers), 1)
    self.assertIsInstance(client_logger.handlers[0], NullHandler)
def setup_ndr_client_config(self):
    """Prepare an ``ndr.Config`` for a test and dump it to a temp YAML file.

    Sets ``self._ncc``, ``self._created_files``, ``self._ncc_config_dir``
    and ``self._ndr_config_file`` on the test instance.
    """
    root_logger = logging.getLogger()
    root_logger.addHandler(logging.NullHandler())

    self._ncc = ndr.Config(NDR_CONFIG_FILE)
    self._ncc.logger = root_logger
    self._ncc.image_information_file = IMAGE_CONFIG
    self._created_files = []
    config_ndr_for_signing_and_local_queue(self)

    # Optional config files may be written or updated during a test, so
    # point them into a throwaway directory where they cannot conflict.
    self._ncc_config_dir = tempfile.mkdtemp()
    self._ncc.nmap_configuration_file = "{}/nmap_config.yml".format(self._ncc_config_dir)

    # Persist the config so that mainrun programs can be tested against it.
    self._ndr_config_file = create_temp_file(self)
    with open(self._ndr_config_file, 'w') as config_fh:
        config_fh.write(yaml.dump(self._ncc.to_dict()))
def setUp(self, *args, **kwargs):
    """Create the middleware, filter, logger and handler used by the tests.

    The logging module's private handler registries are swapped for empty
    ones; the originals are kept on ``self._original_handlers`` and
    ``self._original_handlerList`` (presumably restored in tearDown, which
    is not shown here).
    """
    super(LogSetupMiddlewareTest, self).setUp(*args, **kwargs)
    self.factory = RequestFactory()

    # LogSetupMiddleware only looks under this module
    self.middleware = LogSetupMiddleware(root=__name__)
    self.filter = RequestFilter(request=None)

    # Test logger: start from no filters, then attach the placeholder one.
    test_logger = logging.getLogger(__name__)
    test_logger.filters = []
    test_logger.addFilter(self.filter)
    self.logger = test_logger

    # Stub out original handlers
    self._original_handlers = logging._handlers
    self._original_handlerList = logging._handlerList
    logging._handlers = {}
    logging._handlerList = []

    # Test handler, created after the registries were stubbed.
    null_handler = logging.NullHandler()
    null_handler.filters = []
    null_handler.addFilter(self.filter)
    self.handler = null_handler
    self.logger.addHandler(null_handler)
def set_verbosity(self, verbose):  # pragma: no cover
    """Configure logging and tracing for the requested verbosity level.

    Levels are cumulative:
      * >= 5: print the parsed arguments
      * >= 4: enable HTTP connection debugging and urllib3 debug logs
      * >= 3: full GitPython command tracing
      * >= 2: GitPython command tracing, echoed with a '> ' prefix
      * >= 1: debug level on ``log_root`` (info level otherwise)

    A stream handler is always added to ``log``.

    :param verbose: integer verbosity level
    """
    if verbose >= 5:
        print(self.args)
    if verbose >= 4:
        import http.client
        http.client.HTTPConnection.debuglevel = 1
        urllib3_log = logging.getLogger("requests.packages.urllib3")
        urllib3_log.setLevel(logging.DEBUG)
        urllib3_log.propagate = True
    if verbose >= 2:
        # Assign the trace mode once: setting 'full' for level 3 in an
        # earlier branch was always overwritten by the level-2 ``True``.
        Git.GIT_PYTHON_TRACE = 'full' if verbose >= 3 else True
        formatter = logging.Formatter(fmt='> %(message)s')
        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        git_cmd_log = logging.getLogger('git.cmd')
        # removeHandler() only removes the instance it is given, so find
        # the NullHandler(s) actually attached rather than passing a new one.
        for attached in list(git_cmd_log.handlers):
            if isinstance(attached, logging.NullHandler):
                git_cmd_log.removeHandler(attached)
        git_cmd_log.addHandler(handler)
    if verbose >= 1:
        log_root.setLevel(logging.DEBUG)
    else:
        log_root.setLevel(logging.INFO)
    log.addHandler(logging.StreamHandler())
def get_logger():
    """Return the module-wide logger, creating it on first use.

    If a global Application is instantiated, its ``log`` is used.
    Otherwise the 'traitlets' logger is used, with a NullHandler attached.
    The choice is cached in the module-level ``_logger``.
    """
    global _logger

    if _logger is not None:
        return _logger

    from .config import Application

    if Application.initialized():
        _logger = Application.instance().log
        return _logger

    _logger = logging.getLogger('traitlets')
    # Add a NullHandler to silence warnings about not being
    # initialized, per best practice for libraries.
    _logger.addHandler(logging.NullHandler())
    return _logger
def test_workflow_folder_can_be_rendered_successfully(self, tmpdir):
    """Rendering a folder holding one template renders and writes once."""
    # Collaborators are mocks that always report success.
    fs = Mock()
    fs.write_file = Mock(return_value=True)
    templates_lib = Mock()
    templates_lib.render = Mock(return_value=True)

    # A workflow folder containing a single template file.
    workflow_dir = tmpdir.mkdir('test')
    workflow_dir.join('template.j2').write('content of the template')

    log = logging.getLogger('test')
    log.addHandler(logging.NullHandler())

    renderer = WfTemplatesRender(templates_lib, fs, log)
    renderer.render_workflow_folder(str(workflow_dir.realpath()), '', 'output', {})

    assert 1 == fs.write_file.call_count
    assert 1 == templates_lib.render.call_count
def _call_api(self, path):
    """
    Call the REST API and convert the results into JSON.

    The URL is built from the plugin's ``protocol``, ``host`` and ``port``
    settings, and the request is authenticated with ``username`` and
    ``password``.

    :param path: str, API path appended after ``/api/``
    :return: the decoded JSON payload, or None if the request failed
    """
    url = '{0}://{1}:{2}/api/{3}'.format(self.plugin_config('protocol'), self.plugin_config('host'), self.plugin_config('port'), path)
    self.log.debug('Issuing RabbitMQ API call to get data on ' + str(url))

    # Silence the "requests" logger. This method runs once per API call, so
    # only attach a NullHandler if none is there yet instead of growing the
    # handler list on every call.
    requests_log = logging.getLogger("requests")
    if not any(isinstance(h, logging.NullHandler) for h in requests_log.handlers):
        requests_log.addHandler(logging.NullHandler())
    requests_log.propagate = False

    try:
        r = requests.get(url, auth=(self.plugin_config('username'), self.plugin_config('password')))
        return json.loads(r.text)
    except requests.exceptions.RequestException as e:
        self.log.error('RabbitMQ API call error: {0}'.format(e))
        return None
def pytest_configure(config):
    """Make the web2py root importable and set up logging.

    ``config.option.w2p_root`` is replaced by its absolute path, which is
    also put at the front of ``sys.path``. Logging is configured from the
    application's ``logging.conf`` when that file exists, and with
    ``logging.basicConfig()`` otherwise. The "pytest_web2py" logger gets a
    NullHandler and is set to DEBUG.

    :param config: the pytest config object
    """
    # allow imports from modules and site-packages
    w2pdir = os.path.abspath(config.option.w2p_root)
    config.option.w2p_root = w2pdir
    sys.path.insert(0, w2pdir)
    app_directory = _appdir(config)
    conffile = os.path.join(app_directory, "logging.conf")
    if os.path.isfile(conffile):
        # fileConfig() needs the file to read; calling it without an
        # argument raises TypeError.
        logging.config.fileConfig(conffile)
    else:
        logging.basicConfig()
    logger = logging.getLogger("pytest_web2py")
    logger.addHandler(logging.NullHandler())
    logger.setLevel(logging.DEBUG)
def setup_logging(opts):
    """Configure the root logger from ``opts.verbose``.

    A NullHandler is always added to the root logger. When ``opts.verbose``
    is truthy, a stdout handler with UTC timestamps is added as well and the
    root level is chosen from the verbosity count (1 -> CRITICAL ...
    6 -> NOTSET). Counts beyond the table use the most verbose level.

    :param opts: object with an integer ``verbose`` attribute
    """
    logging.root.addHandler(logging.NullHandler())
    if opts.verbose:
        logging_levels = {
            0: logging.CRITICAL,
            1: logging.ERROR,
            2: logging.WARNING,
            3: logging.INFO,
            4: logging.DEBUG,
            5: logging.NOTSET,
        }
        fmt = '%(asctime)-15s.%(msecs)03d [%(threadName)10.10s] [%(levelname)6.6s] %(name)s#%(funcName)s:%(lineno)s %(message)s'
        datefmt = '%Y-%m-%dT%H:%M:%S'
        formatter = logging.Formatter(fmt, datefmt=datefmt)
        formatter.converter = time.gmtime  # timestamps in UTC
        hnd = logging.StreamHandler(stream=sys.stdout)
        hnd.setFormatter(formatter)
        logging.root.addHandler(hnd)
        # Clamp into the table so that extra -v flags select the most
        # verbose level instead of raising KeyError.
        level_index = min(max(opts.verbose - 1, 0), max(logging_levels))
        logging.root.setLevel(logging_levels[level_index])
def get_logger():
    """Return the module-wide logger, creating it on first use.

    If a global Application is instantiated, its ``log`` is used.
    Otherwise the 'traitlets' logger is used, with a NullHandler attached.
    The choice is cached in the module-level ``_logger``.
    """
    global _logger

    if _logger is not None:
        return _logger

    from .config import Application

    if Application.initialized():
        _logger = Application.instance().log
        return _logger

    _logger = logging.getLogger('traitlets')
    # Add a NullHandler to silence warnings about not being
    # initialized, per best practice for libraries.
    _logger.addHandler(logging.NullHandler())
    return _logger
def __init__(self):
    """Constructor: bind the root logger and set default state."""
    self.logger = logging.getLogger()
    self.formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
    self.level = self.LEVEL_CRITICAL

    # No console or file handler is attached yet.
    self.consoleHandler = None
    self.fileHandler = None

    # Attach a NullHandler so the logger always has at least one handler.
    self.logger.addHandler(logging.NullHandler())

    # Map each level constant to the method that logs at that level.
    self.levelFunctionDict = dict([
        (self.LEVEL_DEBUG, self.debug),
        (self.LEVEL_INFO, self.info),
        (self.LEVEL_WARN, self.warn),
        (self.LEVEL_ERROR, self.error),
        (self.LEVEL_CRITICAL, self.critical),
    ])
# ----------------------------------------------------------------------
def __init__(self, hostname_or_ip_address, username, password):
    """Initialize the CiscoSwitch object.

    Only stores the connection details; the client and version start as
    None.

    :param hostname_or_ip_address: host name or ip address
    :param username: username to login with
    :param password: password to use with username
    :return: CiscoSwitch object
    """
    # Module logger, silenced unless the application configures logging.
    module_logger = logging.getLogger(__name__)
    module_logger.addHandler(logging.NullHandler())
    self._logger = module_logger

    # Connection details.
    self._host = hostname_or_ip_address
    self._username, self._password = username, password

    # Not known at construction time.
    self._client = None
    self._version = None
def _create_logger(self):
    # type: () -> logging.Logger
    """Build the 'navdoon' logger described by ``self._config``.

    Reads ``log_level`` (default 'INFO'), ``log_stderr``, ``log_file`` and
    ``log_syslog``/``syslog_socket`` from the config. A NullHandler is
    always attached; the other handlers are added on request.

    Raises ValueError when ``log_level`` is not in ``LOG_LEVEL_NAMES``.
    """
    config = self._config

    log_level_name = config.get('log_level', 'INFO')
    if log_level_name not in LOG_LEVEL_NAMES:
        raise ValueError("invalid log level " + log_level_name)

    # A stand-alone Logger instance, not one obtained from getLogger().
    logger = logging.Logger('navdoon')  # type: ignore
    logger.addHandler(logging.NullHandler())
    logger.setLevel(getattr(logging, log_level_name))

    if config.get('log_stderr'):
        logger.addHandler(logging.StreamHandler(sys.stderr))

    if config.get('log_file'):
        logger.addHandler(logging.FileHandler(config['log_file']))

    if config.get('log_syslog'):
        # "host:port" becomes a (host, port) tuple; a value without a
        # colon is passed on as a single stripped string.
        socket_parts = config['syslog_socket'].split(':')
        if len(socket_parts) >= 2:
            syslog_address = (socket_parts[0].strip(), int(socket_parts[1]))
        else:
            syslog_address = socket_parts[0].strip()
        logger.addHandler(logging.handlers.SysLogHandler(syslog_address))

    return logger
def __init__(self, suproc_command, stdin_queue, stdout_queue, parent):
    """Prepare the piper thread; nothing is started here.

    :param suproc_command: command stored for the subprocess
    :param stdin_queue: queue stored as ``_stdin_queue``
    :param stdout_queue: queue stored as ``_stdout_queue``
    :param parent: owner object stored as ``_parent``
    """
    threading.Thread.__init__(self)
    # Non-daemon: we want it to survive the parent's death so it can detect
    # inactivity and terminate the subprocess.
    self.setDaemon(False)
    self.setName('pjon_piper_thd')

    # Collaborators handed in by the caller.
    self._parent = parent
    self._subproc_command = suproc_command
    self._stdin_queue = stdin_queue
    self._stdout_queue = stdout_queue

    # Run-time state, filled in later.
    self._pipe = None
    self._birthtime = None
    self._stopped = False
    self._start_failed = False

    if sys.platform == 'win32':
        # Windows only: startup flags that request a hidden window.
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        self._startupinfo = startupinfo

    # Logger named after the thread: previous handlers are dropped and a
    # NullHandler installed. Propagation is left unchanged.
    thread_log = logging.getLogger(self.name)
    thread_log.handlers = []
    thread_log.addHandler(logging.NullHandler())
    thread_log.setLevel(logging.INFO)
    self.log = thread_log
def configure(taskoutpath=None, doSaveToDisk=0, doWriteStdOut=0):
    """Configure the module-level 'deletemove' logger.

    Args:
        taskoutpath: directory for "delete-transcript.txt"; must be given
            when doSaveToDisk is truthy.
        doSaveToDisk: if truthy, write a plain-text transcript to disk.
        doWriteStdOut: if truthy, also write messages to stdout.

    Any handlers from a previous call are removed and closed first. When
    neither output is requested a NullHandler is installed.
    """
    global Log
    Log = logging.getLogger('deletemove')
    Log.setLevel(logging.DEBUG)
    # Remove pre-existing handlers AND close them: only clearing the list
    # would leave a previous transcript file open.
    for old_handler in list(Log.handlers):
        Log.removeHandler(old_handler)
        old_handler.close()
    formatter = logging.Formatter('%(message)s')
    # Config logger to save transcript of log messages to plain-text file
    if doSaveToDisk:
        fh = logging.FileHandler(
            os.path.join(taskoutpath, "delete-transcript.txt"))
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        Log.addHandler(fh)
    # Config logger that can write to stdout
    if doWriteStdOut:
        ch = logging.StreamHandler(sys.stdout)
        ch.setLevel(logging.DEBUG + 1)
        ch.setFormatter(formatter)
        Log.addHandler(ch)
    # Config null logger, avoids error messages about no handler existing
    if not doSaveToDisk and not doWriteStdOut:
        Log.addHandler(logging.NullHandler())
def configure(taskoutpath, doSaveToDisk=0, doWriteStdOut=0):
    """ Configure log message writing to disk/saving to file.

    Sets up the module-level 'mergemove' logger.

    Args:
        taskoutpath: directory for "merge-transcript.txt"; only used when
            doSaveToDisk is truthy.
        doSaveToDisk: if truthy, write a plain-text transcript to disk.
        doWriteStdOut: if truthy, also write messages to stdout.

    Any handlers from a previous call are removed and closed first. When
    neither output is requested a NullHandler is installed.
    """
    global Log
    Log = logging.getLogger('mergemove')
    Log.setLevel(logging.DEBUG)
    # Remove pre-existing handlers AND close them: only clearing the list
    # would leave a previous transcript file open.
    for old_handler in list(Log.handlers):
        Log.removeHandler(old_handler)
        old_handler.close()
    formatter = logging.Formatter('%(message)s')
    # Config logger to save transcript of log messages to plain-text file
    if doSaveToDisk:
        fh = logging.FileHandler(
            os.path.join(taskoutpath, "merge-transcript.txt"))
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        Log.addHandler(fh)
    # Config logger that can write to stdout
    if doWriteStdOut:
        ch = logging.StreamHandler(sys.stdout)
        ch.setLevel(logging.INFO)
        ch.setFormatter(formatter)
        Log.addHandler(ch)
    # Config null logger, avoids error messages about no handler existing
    if not doSaveToDisk and not doWriteStdOut:
        Log.addHandler(logging.NullHandler())
def configure(taskoutpath=None, doSaveToDisk=0, doWriteStdOut=0):
    """Configure the module-level 'localstep' logger.

    Args:
        taskoutpath: directory for "localstep-transcript.txt"; must be
            given when doSaveToDisk is truthy.
        doSaveToDisk: if truthy, write a plain-text transcript to disk.
        doWriteStdOut: if truthy, also write messages to stdout.

    Any handlers from a previous call are removed and closed first. When
    neither output is requested a NullHandler is installed.
    """
    global Log
    Log = logging.getLogger('localstep')
    Log.setLevel(logging.DEBUG)
    # Remove pre-existing handlers AND close them: only clearing the list
    # would leave a previous transcript file open.
    for old_handler in list(Log.handlers):
        Log.removeHandler(old_handler)
        old_handler.close()
    formatter = logging.Formatter('%(message)s')
    # Config logger to save transcript of log messages to plain-text file
    if doSaveToDisk:
        logfile = os.path.join(taskoutpath, "localstep-transcript.txt")
        fh = logging.FileHandler(logfile)
        fh.setLevel(logging.DEBUG)
        fh.setFormatter(formatter)
        Log.addHandler(fh)
    # Config logger that can write to stdout
    if doWriteStdOut:
        ch = logging.StreamHandler(sys.stdout)
        ch.setLevel(logging.DEBUG + 1)
        ch.setFormatter(formatter)
        Log.addHandler(ch)
    # Config null logger, avoids error messages about no handler existing
    if not doSaveToDisk and not doWriteStdOut:
        Log.addHandler(logging.NullHandler())
def __init__(self, args, manager, loop):
    """Store the agent's collaborators and read its settings from ``args``.

    Args:
        args (dict): agent arguments. ``iotile_id`` is required and is
            parsed with ``int(value, 0)``. Optional keys: ``prefix``,
            ``trace_throttle_interval``, ``progress_throttle_interval``
            and ``client_timeout``.
        manager: stored as ``_manager``.
        loop: stored as ``_loop``.

    Raises:
        ArgumentError: if ``iotile_id`` is missing from ``args``.
    """
    self._args = args
    self._manager = manager
    self._loop = loop

    # Connection state, nothing established yet.
    self.client = None
    self.slug = None
    self.topics = None
    self._disconnector = None
    self._connections = {}

    module_logger = logging.getLogger(__name__)
    module_logger.addHandler(logging.NullHandler())
    module_logger.setLevel(logging.DEBUG)
    self._logger = module_logger

    # A non-empty prefix always ends with exactly one added '/'.
    prefix = args.get('prefix', '')
    if len(prefix) > 0 and not prefix.endswith('/'):
        prefix += '/'
    self.prefix = prefix

    if 'iotile_id' not in args:
        raise ArgumentError("No iotile_id in awsiot gateway agent argument", args=args)

    # Base 0 lets the id be written as decimal, hex (0x..) and so on.
    self.iotile_id = int(args['iotile_id'], 0)

    self.throttle_trace = args.get('trace_throttle_interval', 5.0)
    self.throttle_progress = args.get('progress_throttle_interval', 2.0)
    self.client_timeout = args.get('client_timeout', 60.0)
def __init__(self, adapter_id):
    """Constructor.

    Args:
        adapter_id (int): Since the ConnectionManager responds to callbacks on behalf
            of a DeviceAdapter, it needs to know what adapter_id to send with the
            callbacks.
    """
    super(ConnectionManager, self).__init__()

    # Our thread should be a daemon so that we don't block exiting the program if we hang
    self.daemon = True

    self.id = adapter_id

    # Bookkeeping, plus the event, queue and lock created for this manager.
    self._connections = {}
    self._int_connections = {}
    self._actions = Queue.Queue()
    self._stop_event = threading.Event()
    self._data_lock = threading.Lock()

    module_logger = logging.getLogger(__name__)
    module_logger.addHandler(logging.NullHandler())
    module_logger.setLevel(logging.INFO)
    self._logger = module_logger
def __init__(self, service, config, signer, type_mapping):
    """Set up a client for ``service`` from a validated ``config``.

    :param service: service name used to resolve the endpoint
    :param config: configuration mapping; ``region``, ``endpoint``,
        ``additional_user_agent`` and ``log_requests`` are read here
    :param signer: stored as ``self.signer``
    :param type_mapping: complex type mappings, merged with the primitive
        type map

    Note: HTTPConnection.debuglevel is a process-wide setting and is set
    to 1 or 0 here depending on ``log_requests``.
    """
    validate_config(config)
    self.signer = signer

    region = config.get("region")
    endpoint = config.get("endpoint")
    self.endpoint = regions.endpoint_for(service, region=region, endpoint=endpoint)

    self.complex_type_mappings = type_mapping
    self.type_mappings = merge_type_mappings(self.primitive_type_map, type_mapping)
    self.session = requests.Session()

    additional_user_agent = get_config_value_or_default(config, "additional_user_agent")
    self.user_agent = build_user_agent(additional_user_agent)

    # One logger per client instance, silent unless request logging is on.
    logger_name = "{}.{}".format(__name__, id(self))
    self.logger = logging.getLogger(logger_name)
    self.logger.addHandler(logging.NullHandler())

    log_requests = get_config_value_or_default(config, "log_requests")
    if log_requests:
        self.logger.setLevel(logging.DEBUG)
    six.moves.http_client.HTTPConnection.debuglevel = 1 if log_requests else 0
def setup_sdk_logging(logfile=None, loglevel=logging.INFO):
    """
    Set the root logger to DEBUG and attach a NullHandler to it. If
    ``logfile`` is passed, additionally attach a FileHandler that uses the
    default log formatter and the ``loglevel`` level.

    Args:
        logfile(str): A path to setup a log file.
        loglevel(int): :mod:`logging` log level.

    Returns:
        None
    """
    root = logging.root
    root.setLevel(logging.DEBUG)
    root.addHandler(logging.NullHandler())

    if not logfile:
        return

    file_handler = logging.FileHandler(logfile)
    file_handler.setLevel(loglevel)
    file_handler.setFormatter(get_default_log_formatter())
    root.addHandler(file_handler)
def test_000_set_interfaces(self):
    """Send an UPDATE_CONFIG set message for one interface to the server.

    The client-side model carries interface gi0/0/0 with one IPv4 and one
    IPv6 address on subinterface 0. No assertions are made yet.
    """
    server_ph = YANGPathHelper()
    # Server-side model bound to the path helper; kept referenced for the
    # duration of the test.
    server_model = openconfig_interfaces(path_helper=server_ph)

    # Client-side model holding the configuration to push.
    client_model = openconfig_interfaces()
    interface = client_model.interfaces.interface.add("gi0/0/0")
    subinterface = interface.subinterfaces.subinterface.add(0)

    v4_address = subinterface.ipv4.addresses.address.add("192.0.2.1")
    v4_address.config.prefix_length = 24
    v6_address = subinterface.ipv6.addresses.address.add("2001:db8::1")
    v6_address.config.prefix_length = 64

    transaction = [PyNMSConfigOperation(client_model.interfaces, 'UPDATE_CONFIG')]

    logger = logging.getLogger('PyNMSGPRC_OCMessageTests_Interfaces')
    logger.addHandler(logging.NullHandler())

    set_msg = PyNMSClientGRPCMethods.generate_set_message(transaction)
    ret_msg = PyNMSServerGRPCMethods.service_set_request(set_msg, server_ph, logger=logger)
    # TODO assertions
    del server_ph
def test_001_get_interfaces(self):
    """Serve a get request for "/interfaces" and print the reply.

    The server-side model holds interface gi0/0/0 with one IPv4 and one
    IPv6 address on subinterface 0. No assertions are made yet.
    """
    server_ph = YANGPathHelper()
    s = openconfig_interfaces(path_helper=server_ph)

    gi0 = s.interfaces.interface.add(u"gi0/0/0")
    subint0 = gi0.subinterfaces.subinterface.add(0)
    ip4 = subint0.ipv4.addresses.address.add(u"192.0.2.1")
    ip4.config.prefix_length = 24
    ip6 = subint0.ipv6.addresses.address.add(u"2001:db8::1")
    ip6.config.prefix_length = 64

    logger = logging.getLogger('PyNMSGPRC_OCMessageTests_Interfaces')
    logger.addHandler(logging.NullHandler())

    get_msg = PyNMSClientGRPCMethods.generate_get_message(["/interfaces"], 42)
    ret_msg = PyNMSServerGRPCMethods.service_get_request(get_msg, server_ph, logger=logger)
    # Parenthesised single-argument form: same output on Python 2 and valid
    # syntax on Python 3.
    print(ret_msg)
    #
    # TODO: assertions
    #
    del server_ph
def test_003_delete_interface(self):
    """Send a DELETE_CONFIG set message for gi0/0/0 and print the outcome.

    The server-side model is populated with the interface first. The
    request is built from a separate client-side model. The set message
    and the server's remaining interfaces are printed; no assertions are
    made.
    """
    server_ph = YANGPathHelper()
    s = openconfig_interfaces(path_helper=server_ph)

    # Set up the logger once; repeating this would attach a second
    # NullHandler to the same logger.
    logger = logging.getLogger('PyNMSGPRC_OCMessageTests_Interfaces')
    logger.addHandler(logging.NullHandler())

    # Server-side state that the delete operates on.
    gi0 = s.interfaces.interface.add(u"gi0/0/0")
    subint0 = gi0.subinterfaces.subinterface.add(0)
    ip4 = subint0.ipv4.addresses.address.add(u"192.0.2.1")
    ip4.config.prefix_length = 24

    # Client-side object naming the interface to delete.
    ocif = openconfig_interfaces()
    gi0 = ocif.interfaces.interface.add(u"gi0/0/0")
    transaction = [PyNMSConfigOperation(gi0, 'DELETE_CONFIG')]

    set_msg = PyNMSClientGRPCMethods.generate_set_message(transaction)
    # Parenthesised single-argument print: same output on Python 2 and
    # valid syntax on Python 3.
    print(set_msg)
    ret_msg = PyNMSServerGRPCMethods.service_set_request(set_msg, server_ph, logger=logger)
    print(s.interfaces.get(filter=True))
def get_logger():
    """Return the module-wide logger, creating it on first use.

    If a global Application is instantiated, its ``log`` is used.
    Otherwise the 'traitlets' logger is used, with a NullHandler attached.
    The choice is cached in the module-level ``_logger``.
    """
    global _logger

    if _logger is not None:
        return _logger

    from .config import Application

    if Application.initialized():
        _logger = Application.instance().log
        return _logger

    _logger = logging.getLogger('traitlets')
    # Add a NullHandler to silence warnings about not being
    # initialized, per best practice for libraries.
    _logger.addHandler(logging.NullHandler())
    return _logger