def __enter__(self):
# let parent class archive filter state
ret = super(reset_warnings, self).__enter__()
# reset the filter to list everything
if self._reset_filter:
warnings.resetwarnings()
warnings.simplefilter(self._reset_filter)
# archive and clear the __warningregistry__ key for all modules
# that match the 'reset' pattern.
pattern = self._reset_registry
if pattern:
backup = self._orig_registry = {}
for name, mod in list(sys.modules.items()):
if mod is None or not pattern.match(name):
continue
reg = getattr(mod, "__warningregistry__", None)
if reg:
backup[name] = reg.copy()
reg.clear()
return ret
python类resetwarnings()的实例源码
def package():
# Apply gevent monkey patches as early as possible during tests.
from gevent.monkey import patch_all
patch_all()
# Enable all warnings in test mode.
warnings.resetwarnings()
warnings.simplefilter('default')
# Look for memory leaks.
gc.set_debug(gc.DEBUG_UNCOLLECTABLE)
yield None
# Print memory leaks.
if gc.garbage: # pragma: no cover
print('Uncollectable objects found:')
for obj in gc.garbage:
print(obj)
def _run_generated_code(self, code, globs, locs,
fails_under_py3k=True,
):
import warnings
#from guillotina.component._compat import PYTHON3
PYTHON3 = False
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
if not PYTHON3:
exec(code, globs, locs)
self.assertEqual(len(log), 0) # no longer warn
return True
else:
try:
exec(code, globs, locs)
except TypeError:
return False
else:
if fails_under_py3k:
self.fail("Didn't raise TypeError")
def test_called_from_function(self):
import warnings
from guillotina.component._declaration import adapts
from zope.interface import Interface
class IFoo(Interface):
pass
globs = {'adapts': adapts, 'IFoo': IFoo}
locs = {}
CODE = "\n".join([
'def foo():',
' adapts(IFoo)'
])
if self._run_generated_code(CODE, globs, locs, False):
foo = locs['foo']
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
self.assertRaises(TypeError, foo)
self.assertEqual(len(log), 0) # no longer warn
def test_called_twice_from_class(self):
import warnings
from guillotina.component._declaration import adapts
from zope.interface import Interface
from zope.interface._compat import PYTHON3
class IFoo(Interface):
pass
class IBar(Interface):
pass
globs = {'adapts': adapts, 'IFoo': IFoo, 'IBar': IBar}
locs = {}
CODE = "\n".join([
'class Foo(object):',
' adapts(IFoo)',
' adapts(IBar)',
])
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
try:
exec(CODE, globs, locs)
except TypeError:
if not PYTHON3:
self.assertEqual(len(log), 0) # no longer warn
else:
self.fail("Didn't raise TypeError")
def test_catch_string(self):
# Catching a string should trigger a DeprecationWarning.
with warnings.catch_warnings():
warnings.resetwarnings()
warnings.filterwarnings("error")
str_exc = "spam"
with self.assertRaises(DeprecationWarning):
try:
raise StandardError
except str_exc:
pass
# Make sure that even if the string exception is listed in a tuple
# that a warning is raised.
with self.assertRaises(DeprecationWarning):
try:
raise StandardError
except (AssertionError, str_exc):
pass
def test_catch_string(self):
# Catching a string should trigger a DeprecationWarning.
with warnings.catch_warnings():
warnings.resetwarnings()
warnings.filterwarnings("error")
str_exc = "spam"
with self.assertRaises(DeprecationWarning):
try:
raise StandardError
except str_exc:
pass
# Make sure that even if the string exception is listed in a tuple
# that a warning is raised.
with self.assertRaises(DeprecationWarning):
try:
raise StandardError
except (AssertionError, str_exc):
pass
def __enter__(self):
# let parent class archive filter state
ret = super(reset_warnings, self).__enter__()
# reset the filter to list everything
if self._reset_filter:
warnings.resetwarnings()
warnings.simplefilter(self._reset_filter)
# archive and clear the __warningregistry__ key for all modules
# that match the 'reset' pattern.
pattern = self._reset_registry
if pattern:
orig = self._orig_registry = {}
for name, mod in sys.modules.items():
if pattern.match(name):
reg = getattr(mod, "__warningregistry__", None)
if reg:
orig[name] = reg.copy()
reg.clear()
return ret
def test_ExceptionsAsConnectionAttributes(self):
# OPTIONAL EXTENSION
# Test for the optional DB API 2.0 extension, where the exceptions
# are exposed as attributes on the Connection object
# I figure this optional extension will be implemented by any
# driver author who is using this test suite, so it is enabled
# by default.
warnings.simplefilter("ignore")
con = self._connect()
drv = self.driver
self.assertEqual(con.Warning is drv.Warning, True)
self.assertEqual(con.Error is drv.Error, True)
self.assertEqual(con.InterfaceError is drv.InterfaceError, True)
self.assertEqual(con.DatabaseError is drv.DatabaseError, True)
self.assertEqual(con.OperationalError is drv.OperationalError, True)
self.assertEqual(con.IntegrityError is drv.IntegrityError, True)
self.assertEqual(con.InternalError is drv.InternalError, True)
self.assertEqual(con.ProgrammingError is drv.ProgrammingError, True)
self.assertEqual(con.NotSupportedError is drv.NotSupportedError, True)
warnings.resetwarnings()
con.close()
def _run_generated_code(self, code, globs, locs,
fails_under_py3k=True,
):
import warnings
from zope.interface._compat import PYTHON3
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
if not PYTHON3:
exec(code, globs, locs)
self.assertEqual(len(log), 0) # no longer warn
return True
else:
try:
exec(code, globs, locs)
except TypeError:
return False
else:
if fails_under_py3k:
self.fail("Didn't raise TypeError")
def test_called_from_function(self):
import warnings
from zope.interface.declarations import implements
from zope.interface.interface import InterfaceClass
IFoo = InterfaceClass("IFoo")
globs = {'implements': implements, 'IFoo': IFoo}
locs = {}
CODE = "\n".join([
'def foo():',
' implements(IFoo)'
])
if self._run_generated_code(CODE, globs, locs, False):
foo = locs['foo']
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
self.assertRaises(TypeError, foo)
self.assertEqual(len(log), 0) # no longer warn
def test_called_twice_from_class(self):
import warnings
from zope.interface.declarations import implements
from zope.interface.interface import InterfaceClass
from zope.interface._compat import PYTHON3
IFoo = InterfaceClass("IFoo")
IBar = InterfaceClass("IBar")
globs = {'implements': implements, 'IFoo': IFoo, 'IBar': IBar}
locs = {}
CODE = "\n".join([
'class Foo(object):',
' implements(IFoo)',
' implements(IBar)',
])
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
try:
exec(CODE, globs, locs)
except TypeError:
if not PYTHON3:
self.assertEqual(len(log), 0) # no longer warn
else:
self.fail("Didn't raise TypeError")
def test_called_from_function(self):
import warnings
from zope.interface.declarations import classProvides
from zope.interface.interface import InterfaceClass
from zope.interface._compat import PYTHON3
IFoo = InterfaceClass("IFoo")
globs = {'classProvides': classProvides, 'IFoo': IFoo}
locs = {}
CODE = "\n".join([
'def foo():',
' classProvides(IFoo)'
])
exec(CODE, globs, locs)
foo = locs['foo']
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
self.assertRaises(TypeError, foo)
if not PYTHON3:
self.assertEqual(len(log), 0) # no longer warn
def test_catch_string(self):
# Catching a string should trigger a DeprecationWarning.
with warnings.catch_warnings():
warnings.resetwarnings()
warnings.filterwarnings("error")
str_exc = "spam"
with self.assertRaises(DeprecationWarning):
try:
raise StandardError
except str_exc:
pass
# Make sure that even if the string exception is listed in a tuple
# that a warning is raised.
with self.assertRaises(DeprecationWarning):
try:
raise StandardError
except (AssertionError, str_exc):
pass
def test_catch_string(self):
# Catching a string should trigger a DeprecationWarning.
with warnings.catch_warnings():
warnings.resetwarnings()
warnings.filterwarnings("error")
str_exc = "spam"
with self.assertRaises(DeprecationWarning):
try:
raise StandardError
except str_exc:
pass
# Make sure that even if the string exception is listed in a tuple
# that a warning is raised.
with self.assertRaises(DeprecationWarning):
try:
raise StandardError
except (AssertionError, str_exc):
pass
def __enter__(self):
# let parent class archive filter state
ret = super(reset_warnings, self).__enter__()
# reset the filter to list everything
if self._reset_filter:
warnings.resetwarnings()
warnings.simplefilter(self._reset_filter)
# archive and clear the __warningregistry__ key for all modules
# that match the 'reset' pattern.
pattern = self._reset_registry
if pattern:
orig = self._orig_registry = {}
for name, mod in sys.modules.items():
if pattern.match(name):
reg = getattr(mod, "__warningregistry__", None)
if reg:
orig[name] = reg.copy()
reg.clear()
return ret
def setUp(self):
self.server = self.server_class()
self.server.start()
self.client = self.client_class(timeout=TIMEOUT)
self.client.encoding = 'utf8' # PY3 only
self.client.connect(self.server.host, self.server.port)
self.client.login(USER, PASSWD)
if PY3:
safe_mkdir(bytes(TESTFN_UNICODE, 'utf8'))
touch(bytes(TESTFN_UNICODE_2, 'utf8'))
self.utf8fs = TESTFN_UNICODE in os.listdir('.')
else:
warnings.filterwarnings("ignore")
safe_mkdir(TESTFN_UNICODE)
touch(TESTFN_UNICODE_2)
self.utf8fs = unicode(TESTFN_UNICODE, 'utf8') in os.listdir(u('.'))
warnings.resetwarnings()
def __enter__(self):
# let parent class archive filter state
ret = super(reset_warnings, self).__enter__()
# reset the filter to list everything
if self._reset_filter:
warnings.resetwarnings()
warnings.simplefilter(self._reset_filter)
# archive and clear the __warningregistry__ key for all modules
# that match the 'reset' pattern.
pattern = self._reset_registry
if pattern:
backup = self._orig_registry = {}
for name, mod in list(sys.modules.items()):
if mod is None or not pattern.match(name):
continue
reg = getattr(mod, "__warningregistry__", None)
if reg:
backup[name] = reg.copy()
reg.clear()
return ret
def _run_generated_code(self, code, globs, locs,
fails_under_py3k=True,
):
import warnings
from zope.interface._compat import PYTHON3
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
if not PYTHON3:
exec(code, globs, locs)
self.assertEqual(len(log), 0) # no longer warn
return True
else:
try:
exec(code, globs, locs)
except TypeError:
return False
else:
if fails_under_py3k:
self.fail("Didn't raise TypeError")
def test_called_from_function(self):
import warnings
from zope.interface.declarations import implements
from zope.interface.interface import InterfaceClass
IFoo = InterfaceClass("IFoo")
globs = {'implements': implements, 'IFoo': IFoo}
locs = {}
CODE = "\n".join([
'def foo():',
' implements(IFoo)'
])
if self._run_generated_code(CODE, globs, locs, False):
foo = locs['foo']
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
self.assertRaises(TypeError, foo)
self.assertEqual(len(log), 0) # no longer warn
def test_called_twice_from_class(self):
import warnings
from zope.interface.declarations import implements
from zope.interface.interface import InterfaceClass
from zope.interface._compat import PYTHON3
IFoo = InterfaceClass("IFoo")
IBar = InterfaceClass("IBar")
globs = {'implements': implements, 'IFoo': IFoo, 'IBar': IBar}
locs = {}
CODE = "\n".join([
'class Foo(object):',
' implements(IFoo)',
' implements(IBar)',
])
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
try:
exec(CODE, globs, locs)
except TypeError:
if not PYTHON3:
self.assertEqual(len(log), 0) # no longer warn
else:
self.fail("Didn't raise TypeError")
def test_called_from_function(self):
import warnings
from zope.interface.declarations import classProvides
from zope.interface.interface import InterfaceClass
from zope.interface._compat import PYTHON3
IFoo = InterfaceClass("IFoo")
globs = {'classProvides': classProvides, 'IFoo': IFoo}
locs = {}
CODE = "\n".join([
'def foo():',
' classProvides(IFoo)'
])
exec(CODE, globs, locs)
foo = locs['foo']
with warnings.catch_warnings(record=True) as log:
warnings.resetwarnings()
self.assertRaises(TypeError, foo)
if not PYTHON3:
self.assertEqual(len(log), 0) # no longer warn
def __enter__(self):
# let parent class archive filter state
ret = super(reset_warnings, self).__enter__()
# reset the filter to list everything
if self._reset_filter:
warnings.resetwarnings()
warnings.simplefilter(self._reset_filter)
# archive and clear the __warningregistry__ key for all modules
# that match the 'reset' pattern.
pattern = self._reset_registry
if pattern:
orig = self._orig_registry = {}
for name, mod in sys.modules.items():
if pattern.match(name):
reg = getattr(mod, "__warningregistry__", None)
if reg:
orig[name] = reg.copy()
reg.clear()
return ret
def __enter__(self):
# let parent class archive filter state
ret = super(reset_warnings, self).__enter__()
# reset the filter to list everything
if self._reset_filter:
warnings.resetwarnings()
warnings.simplefilter(self._reset_filter)
# archive and clear the __warningregistry__ key for all modules
# that match the 'reset' pattern.
pattern = self._reset_registry
if pattern:
orig = self._orig_registry = {}
for name, mod in sys.modules.items():
if pattern.match(name):
reg = getattr(mod, "__warningregistry__", None)
if reg:
orig[name] = reg.copy()
reg.clear()
return ret
def clean_warning_registry():
"""Safe way to reset warnings """
warnings.resetwarnings()
reg = "__warningregistry__"
bad_names = ['MovedModule'] # this is in six.py, and causes bad things
for mod in list(sys.modules.values()):
if mod.__class__.__name__ not in bad_names and hasattr(mod, reg):
getattr(mod, reg).clear()
# hack to deal with old scipy/numpy in tests
if os.getenv('TRAVIS') == 'true' and sys.version.startswith('2.6'):
warnings.simplefilter('default')
try:
np.rank([])
except Exception:
pass
warnings.simplefilter('always')
def __enter__(self):
# The __warningregistry__'s need to be in a pristine state for tests
# to work properly.
warnings.resetwarnings()
for v in list(values(sys.modules)):
# do not evaluate Django moved modules and other lazily
# initialized modules.
if v and not _is_magic_module(v):
# use raw __getattribute__ to protect even better from
# lazily loaded modules
try:
object.__getattribute__(v, '__warningregistry__')
except AttributeError:
pass
else:
object.__setattr__(v, '__warningregistry__', {})
self.warnings_manager = warnings.catch_warnings(record=True)
self.warnings = self.warnings_manager.__enter__()
warnings.simplefilter('always', self.expected)
return self
def create_db(instance, db):
""" Create a database if it does not already exist
Args:
instance - a hostAddr object
db - the name of the to be created
"""
conn = connect_mysql(instance)
cursor = conn.cursor()
sql = ('CREATE DATABASE IF NOT EXISTS ' '`{db}`;'.format(db=db))
log.info(sql)
# We don't care if the db already exists and this was a no-op
warnings.filterwarnings('ignore', category=MySQLdb.Warning)
cursor.execute(sql)
warnings.resetwarnings()
def test_deprecated(self):
import warnings
warnings.resetwarnings() # in case we've instantiated one before
# set up warning filters to allow all, set up restore when this test is done
filters_backup, warnings.filters = warnings.filters, []
self.addCleanup(setattr, warnings, 'filters', filters_backup)
with warnings.catch_warnings(record=True) as caught_warnings:
WhiteListRoundRobinPolicy([])
self.assertEqual(len(caught_warnings), 1)
warning_message = caught_warnings[-1]
self.assertEqual(warning_message.category, DeprecationWarning)
self.assertIn('4.0', warning_message.message.args[0])
def error_on_ResourceWarning():
"""This fixture captures ResourceWarning's and reports an "error"
describing the file handles left open.
This is shown regardless of how successful the test was, if a test fails
and leaves files open then those files will be reported. Ideally, even
those files should be closed properly after a test failure or exception.
Since only Python 3 and PyPy3 have ResourceWarning's, this context will
have no effect when running tests on Python 2 or PyPy.
Because of autouse=True, this function will be automatically enabled for
all test_* functions in this module.
This code is primarily based on the examples found here:
https://stackoverflow.com/questions/24717027/convert-python-3-resourcewarnings-into-exception
"""
try:
ResourceWarning
except NameError:
# Python 2, PyPy
yield
return
# Python 3, PyPy3
with warnings.catch_warnings(record=True) as caught:
warnings.resetwarnings() # clear all filters
warnings.simplefilter('ignore') # ignore all
warnings.simplefilter('always', ResourceWarning) # add filter
yield # run tests in this context
gc.collect() # run garbage collection (for pypy3)
if not caught:
return
pytest.fail('The following file descriptors were not closed properly:\n' +
'\n'.join((str(warning.message) for warning in caught)),
pytrace=False)
def SetDebugCallback(option, opt, value, parser, *args, **kwargs):
setBasicLoggerDEBUG()
warnings.resetwarnings()