def emits_warning_on(db, *messages):
"""Mark a test as emitting a warning on a specific dialect.
With no arguments, squelches all SAWarning failures. Or pass one or more
strings; these will be matched to the root of the warning description by
warnings.filterwarnings().
Note that emits_warning_on does **not** assert that the warnings
were in fact seen.
"""
@decorator
def decorate(fn, *args, **kw):
with expect_warnings_on(db, assert_=False, *messages):
return fn(*args, **kw)
return decorate
python类filterwarnings()的实例源码
def uses_deprecated(*messages):
"""Mark a test as immune from fatal deprecation warnings.
With no arguments, squelches all SADeprecationWarning failures.
Or pass one or more strings; these will be matched to the root
of the warning description by warnings.filterwarnings().
As a special case, you may pass a function name prefixed with //
and it will be re-written as needed to match the standard warning
verbiage emitted by the sqlalchemy.util.deprecated decorator.
Note that uses_deprecated does **not** assert that the warnings
were in fact seen.
"""
@decorator
def decorate(fn, *args, **kw):
with expect_deprecated(*messages, assert_=False):
return fn(*args, **kw)
return decorate
def assert_warnings(fn, warning_msgs, regex=False):
"""Assert that each of the given warnings are emitted by fn."""
from .assertions import eq_
with warnings.catch_warnings(record=True) as log:
# ensure that nothing is going into __warningregistry__
warnings.filterwarnings("always")
result = fn()
for warning in log:
popwarn = warning_msgs.pop(0)
if regex:
assert re.match(popwarn, str(warning.message))
else:
eq_(popwarn, str(warning.message))
return result
def run(): # pragma: no cover
"""Run Markdown from the command line."""
# Parse options and adjust logging level if necessary
options, logging_level = parse_options()
if not options:
sys.exit(2)
logger.setLevel(logging_level)
console_handler = logging.StreamHandler()
logger.addHandler(console_handler)
if logging_level <= WARNING:
# Ensure deprecation warnings get displayed
warnings.filterwarnings('default')
logging.captureWarnings(True)
warn_logger = logging.getLogger('py.warnings')
warn_logger.addHandler(console_handler)
# Run
markdown.markdownFromFile(**options)
def runWithWarningsSuppressed(suppressedWarnings, f, *a, **kw):
"""Run the function C{f}, but with some warnings suppressed.
@param suppressedWarnings: A list of arguments to pass to filterwarnings.
Must be a sequence of 2-tuples (args, kwargs).
@param f: A callable, followed by its arguments and keyword arguments
"""
for args, kwargs in suppressedWarnings:
warnings.filterwarnings(*args, **kwargs)
addedFilters = warnings.filters[:len(suppressedWarnings)]
try:
result = f(*a, **kw)
except:
exc_info = sys.exc_info()
_resetWarningFilters(None, addedFilters)
raise exc_info[0], exc_info[1], exc_info[2]
else:
if isinstance(result, defer.Deferred):
result.addBoth(_resetWarningFilters, addedFilters)
else:
_resetWarningFilters(None, addedFilters)
return result
def test_norm_hash_name(self):
"""norm_hash_name()"""
from itertools import chain
from passlib.crypto.digest import norm_hash_name, _known_hash_names
# snapshot warning state, ignore unknown hash warnings
ctx = warnings.catch_warnings()
ctx.__enter__()
self.addCleanup(ctx.__exit__)
warnings.filterwarnings("ignore", '.*unknown hash')
# test string types
self.assertEqual(norm_hash_name(u("MD4")), "md4")
self.assertEqual(norm_hash_name(b"MD4"), "md4")
self.assertRaises(TypeError, norm_hash_name, None)
# test selected results
for row in chain(_known_hash_names, self.norm_hash_samples):
for idx, format in enumerate(self.norm_hash_formats):
correct = row[idx]
for value in row:
result = norm_hash_name(value, format)
self.assertEqual(result, correct,
"name=%r, format=%r:" % (value,
format))
def test_90_special(self):
"""test marker option & special behavior"""
warnings.filterwarnings("ignore", "passing settings to .*.hash\(\) is deprecated")
handler = self.handler
# preserve hash if provided
self.assertEqual(handler.genhash("stub", "!asd"), "!asd")
# use marker if no hash
self.assertEqual(handler.genhash("stub", ""), handler.default_marker)
self.assertEqual(handler.hash("stub"), handler.default_marker)
self.assertEqual(handler.using().default_marker, handler.default_marker)
# custom marker
self.assertEqual(handler.genhash("stub", "", marker="*xxx"), "*xxx")
self.assertEqual(handler.hash("stub", marker="*xxx"), "*xxx")
self.assertEqual(handler.using(marker="*xxx").hash("stub"), "*xxx")
# reject invalid marker
self.assertRaises(ValueError, handler.genhash, 'stub', "", marker='abc')
self.assertRaises(ValueError, handler.hash, 'stub', marker='abc')
self.assertRaises(ValueError, handler.using, marker='abc')
def test_15_min_verify_time(self):
"""test get_min_verify_time() method"""
# silence deprecation warnings for min verify time
warnings.filterwarnings("ignore", category=DeprecationWarning)
pa = CryptPolicy()
self.assertEqual(pa.get_min_verify_time(), 0)
self.assertEqual(pa.get_min_verify_time('admin'), 0)
pb = pa.replace(min_verify_time=.1)
self.assertEqual(pb.get_min_verify_time(), 0)
self.assertEqual(pb.get_min_verify_time('admin'), 0)
#===================================================================
# serialization
#===================================================================
def setUpWarnings(self):
"""helper to init warning filters before subclass setUp()"""
if self.resetWarningState:
ctx = reset_warnings()
ctx.__enter__()
self.addCleanup(ctx.__exit__)
# ignore warnings about PasswordHash features deprecated in 1.7
# TODO: should be cleaned in 2.0, when support will be dropped.
# should be kept until then, so we test the legacy paths.
warnings.filterwarnings("ignore", r"the method .*\.(encrypt|genconfig|genhash)\(\) is deprecated")
warnings.filterwarnings("ignore", r"the 'vary_rounds' option is deprecated")
#---------------------------------------------------------------
# tweak message formatting so longMessage mode is only enabled
# if msg ends with ":", and turn on longMessage by default.
#---------------------------------------------------------------
def test_pipe_handle(self):
h, _ = windows_utils.pipe(overlapped=(True, True))
_winapi.CloseHandle(_)
p = windows_utils.PipeHandle(h)
self.assertEqual(p.fileno(), h)
self.assertEqual(p.handle, h)
# check garbage collection of p closes handle
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "", ResourceWarning)
del p
support.gc_collect()
try:
_winapi.CloseHandle(h)
except OSError as e:
self.assertEqual(e.winerror, 6) # ERROR_INVALID_HANDLE
else:
raise RuntimeError('expected ERROR_INVALID_HANDLE')
def emits_warning_on(db, *messages):
"""Mark a test as emitting a warning on a specific dialect.
With no arguments, squelches all SAWarning failures. Or pass one or more
strings; these will be matched to the root of the warning description by
warnings.filterwarnings().
Note that emits_warning_on does **not** assert that the warnings
were in fact seen.
"""
@decorator
def decorate(fn, *args, **kw):
with expect_warnings_on(db, assert_=False, *messages):
return fn(*args, **kw)
return decorate
def uses_deprecated(*messages):
"""Mark a test as immune from fatal deprecation warnings.
With no arguments, squelches all SADeprecationWarning failures.
Or pass one or more strings; these will be matched to the root
of the warning description by warnings.filterwarnings().
As a special case, you may pass a function name prefixed with //
and it will be re-written as needed to match the standard warning
verbiage emitted by the sqlalchemy.util.deprecated decorator.
Note that uses_deprecated does **not** assert that the warnings
were in fact seen.
"""
@decorator
def decorate(fn, *args, **kw):
with expect_deprecated(*messages, assert_=False):
return fn(*args, **kw)
return decorate
def test_empty_tuple_index(self):
# Empty tuple index creates a view
a = np.array([1, 2, 3])
assert_equal(a[()], a)
assert_(a[()].base is a)
a = np.array(0)
assert_(isinstance(a[()], np.int_))
# Regression, it needs to fall through integer and fancy indexing
# cases, so need the with statement to ignore the non-integer error.
with warnings.catch_warnings():
warnings.filterwarnings('ignore', '', DeprecationWarning)
a = np.array([1.])
assert_(isinstance(a[0.], np.float_))
a = np.array([np.array(1)], dtype=object)
assert_(isinstance(a[0.], np.ndarray))
def test_scalar_none_comparison(self):
# Scalars should still just return False and not give a warnings.
# The comparisons are flagged by pep8, ignore that.
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings('always', '', FutureWarning)
assert_(not np.float32(1) == None)
assert_(not np.str_('test') == None)
# This is dubious (see below):
assert_(not np.datetime64('NaT') == None)
assert_(np.float32(1) != None)
assert_(np.str_('test') != None)
# This is dubious (see below):
assert_(np.datetime64('NaT') != None)
assert_(len(w) == 0)
# For documentation purposes, this is why the datetime is dubious.
# At the time of deprecation this was no behaviour change, but
# it has to be considered when the deprecations are done.
assert_(np.equal(np.datetime64('NaT'), None))
def test_identity_equality_mismatch(self):
a = np.array([np.nan], dtype=object)
with warnings.catch_warnings():
warnings.filterwarnings('always', '', FutureWarning)
assert_warns(FutureWarning, np.equal, a, a)
assert_warns(FutureWarning, np.not_equal, a, a)
with warnings.catch_warnings():
warnings.filterwarnings('error', '', FutureWarning)
assert_raises(FutureWarning, np.equal, a, a)
assert_raises(FutureWarning, np.not_equal, a, a)
# And the other do not warn:
with np.errstate(invalid='ignore'):
np.less(a, a)
np.greater(a, a)
np.less_equal(a, a)
np.greater_equal(a, a)
def test_skip_footer_with_invalid(self):
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
basestr = '1 1\n2 2\n3 3\n4 4\n5 \n6 \n7 \n'
# Footer too small to get rid of all invalid values
assert_raises(ValueError, np.genfromtxt,
TextIO(basestr), skip_footer=1)
# except ValueError:
# pass
a = np.genfromtxt(
TextIO(basestr), skip_footer=1, invalid_raise=False)
assert_equal(a, np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]]))
#
a = np.genfromtxt(TextIO(basestr), skip_footer=3)
assert_equal(a, np.array([[1., 1.], [2., 2.], [3., 3.], [4., 4.]]))
#
basestr = '1 1\n2 \n3 3\n4 4\n5 \n6 6\n7 7\n'
a = np.genfromtxt(
TextIO(basestr), skip_footer=1, invalid_raise=False)
assert_equal(a, np.array([[1., 1.], [3., 3.], [4., 4.], [6., 6.]]))
a = np.genfromtxt(
TextIO(basestr), skip_footer=3, invalid_raise=False)
assert_equal(a, np.array([[1., 1.], [3., 3.], [4., 4.]]))
def test_version_2_0():
f = BytesIO()
# requires more than 2 byte for header
dt = [(("%d" % i) * 100, float) for i in range(500)]
d = np.ones(1000, dtype=dt)
format.write_array(f, d, version=(2, 0))
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings('always', '', UserWarning)
format.write_array(f, d)
assert_(w[0].category is UserWarning)
f.seek(0)
n = format.read_array(f)
assert_array_equal(d, n)
# 1.0 requested but data cannot be saved this way
assert_raises(ValueError, format.write_array, f, d, (1, 0))
def test_version_2_0_memmap():
# requires more than 2 byte for header
dt = [(("%d" % i) * 100, float) for i in range(500)]
d = np.ones(1000, dtype=dt)
tf = tempfile.mktemp('', 'mmap', dir=tempdir)
# 1.0 requested but data cannot be saved this way
assert_raises(ValueError, format.open_memmap, tf, mode='w+', dtype=d.dtype,
shape=d.shape, version=(1, 0))
ma = format.open_memmap(tf, mode='w+', dtype=d.dtype,
shape=d.shape, version=(2, 0))
ma[...] = d
del ma
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings('always', '', UserWarning)
ma = format.open_memmap(tf, mode='w+', dtype=d.dtype,
shape=d.shape, version=None)
assert_(w[0].category is UserWarning)
ma[...] = d
del ma
ma = format.open_memmap(tf, mode='r')
assert_array_equal(ma, d)
def test_basic(self):
a = [1, 2, 3]
assert_equal(insert(a, 0, 1), [1, 1, 2, 3])
assert_equal(insert(a, 3, 1), [1, 2, 3, 1])
assert_equal(insert(a, [1, 1, 1], [1, 2, 3]), [1, 1, 2, 3, 2, 3])
assert_equal(insert(a, 1, [1, 2, 3]), [1, 1, 2, 3, 2, 3])
assert_equal(insert(a, [1, -1, 3], 9), [1, 9, 2, 9, 3, 9])
assert_equal(insert(a, slice(-1, None, -1), 9), [9, 1, 9, 2, 9, 3])
assert_equal(insert(a, [-1, 1, 3], [7, 8, 9]), [1, 8, 2, 7, 3, 9])
b = np.array([0, 1], dtype=np.float64)
assert_equal(insert(b, 0, b[0]), [0., 0., 1.])
assert_equal(insert(b, [], []), b)
# Bools will be treated differently in the future:
# assert_equal(insert(a, np.array([True]*4), 9), [9, 1, 9, 2, 9, 3, 9])
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings('always', '', FutureWarning)
assert_equal(
insert(a, np.array([True] * 4), 9), [1, 9, 9, 9, 9, 2, 3])
assert_(w[0].category is FutureWarning)
def test_out_nan(self):
with warnings.catch_warnings(record=True):
warnings.filterwarnings('always', '', RuntimeWarning)
o = np.zeros((4,))
d = np.ones((3, 4))
d[2, 1] = np.nan
assert_equal(np.percentile(d, 0, 0, out=o), o)
assert_equal(
np.percentile(d, 0, 0, interpolation='nearest', out=o), o)
o = np.zeros((3,))
assert_equal(np.percentile(d, 1, 1, out=o), o)
assert_equal(
np.percentile(d, 1, 1, interpolation='nearest', out=o), o)
o = np.zeros(())
assert_equal(np.percentile(d, 1, out=o), o)
assert_equal(
np.percentile(d, 1, interpolation='nearest', out=o), o)
def test_basic(self):
a0 = np.array(1)
a1 = np.arange(2)
a2 = np.arange(6).reshape(2, 3)
assert_equal(np.median(a0), 1)
assert_allclose(np.median(a1), 0.5)
assert_allclose(np.median(a2), 2.5)
assert_allclose(np.median(a2, axis=0), [1.5, 2.5, 3.5])
assert_equal(np.median(a2, axis=1), [1, 4])
assert_allclose(np.median(a2, axis=None), 2.5)
a = np.array([0.0444502, 0.0463301, 0.141249, 0.0606775])
assert_almost_equal((a[1] + a[3]) / 2., np.median(a))
a = np.array([0.0463301, 0.0444502, 0.141249])
assert_equal(a[0], np.median(a))
a = np.array([0.0444502, 0.141249, 0.0463301])
assert_equal(a[-1], np.median(a))
# check array scalar result
assert_equal(np.median(a).ndim, 0)
a[1] = np.nan
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings('always', '', RuntimeWarning)
assert_equal(np.median(a).ndim, 0)
assert_(w[0].category is RuntimeWarning)
def test_inplace_addition_array_type(self):
# Test of inplace additions
for t in self.othertypes:
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings("always")
(x, y, xm) = (_.astype(t) for _ in self.uint8data)
m = xm.mask
a = arange(10, dtype=t)
a[-1] = masked
x += a
xm += a
assert_equal(x, y + a)
assert_equal(xm, y + a)
assert_equal(xm.mask, mask_or(m, a.mask))
assert_equal(len(w), 0, "Failed on type=%s." % t)
def test_inplace_subtraction_array_type(self):
# Test of inplace subtractions
for t in self.othertypes:
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings("always")
(x, y, xm) = (_.astype(t) for _ in self.uint8data)
m = xm.mask
a = arange(10, dtype=t)
a[-1] = masked
x -= a
xm -= a
assert_equal(x, y - a)
assert_equal(xm, y - a)
assert_equal(xm.mask, mask_or(m, a.mask))
assert_equal(len(w), 0, "Failed on type=%s." % t)
def test_inplace_multiplication_array_type(self):
# Test of inplace multiplication
for t in self.othertypes:
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings("always")
(x, y, xm) = (_.astype(t) for _ in self.uint8data)
m = xm.mask
a = arange(10, dtype=t)
a[-1] = masked
x *= a
xm *= a
assert_equal(x, y * a)
assert_equal(xm, y * a)
assert_equal(xm.mask, mask_or(m, a.mask))
assert_equal(len(w), 0, "Failed on type=%s." % t)
def test_inplace_floor_division_array_type(self):
# Test of inplace division
for t in self.othertypes:
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings("always")
(x, y, xm) = (_.astype(t) for _ in self.uint8data)
m = xm.mask
a = arange(10, dtype=t)
a[-1] = masked
x //= a
xm //= a
assert_equal(x, y // a)
assert_equal(xm, y // a)
assert_equal(
xm.mask,
mask_or(mask_or(m, a.mask), (a == t(0)))
)
assert_equal(len(w), 0, "Failed on type=%s." % t)
def test_issue_3(self):
""" undefined methods datetime_or_None, date_or_None """
conn = self.connections[0]
c = conn.cursor()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists issue3")
c.execute("create table issue3 (d date, t time, dt datetime, ts timestamp)")
try:
c.execute("insert into issue3 (d, t, dt, ts) values (%s,%s,%s,%s)", (None, None, None, None))
c.execute("select d from issue3")
self.assertEqual(None, c.fetchone()[0])
c.execute("select t from issue3")
self.assertEqual(None, c.fetchone()[0])
c.execute("select dt from issue3")
self.assertEqual(None, c.fetchone()[0])
c.execute("select ts from issue3")
self.assertTrue(isinstance(c.fetchone()[0], datetime.datetime))
finally:
c.execute("drop table issue3")
def test_issue_8(self):
""" Primary Key and Index error when selecting data """
conn = self.connections[0]
c = conn.cursor()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists test")
c.execute("""CREATE TABLE `test` (`station` int(10) NOT NULL DEFAULT '0', `dh`
datetime NOT NULL DEFAULT '2015-01-01 00:00:00', `echeance` int(1) NOT NULL
DEFAULT '0', `me` double DEFAULT NULL, `mo` double DEFAULT NULL, PRIMARY
KEY (`station`,`dh`,`echeance`)) ENGINE=MyISAM DEFAULT CHARSET=latin1;""")
try:
self.assertEqual(0, c.execute("SELECT * FROM test"))
c.execute("ALTER TABLE `test` ADD INDEX `idx_station` (`station`)")
self.assertEqual(0, c.execute("SELECT * FROM test"))
finally:
c.execute("drop table test")
def test_issue_13(self):
""" can't handle large result fields """
conn = self.connections[0]
cur = conn.cursor()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
cur.execute("drop table if exists issue13")
try:
cur.execute("create table issue13 (t text)")
# ticket says 18k
size = 18*1024
cur.execute("insert into issue13 (t) values (%s)", ("x" * size,))
cur.execute("select t from issue13")
# use assertTrue so that obscenely huge error messages don't print
r = cur.fetchone()[0]
self.assertTrue("x" * size == r)
finally:
cur.execute("drop table issue13")
def test_issue_17(self):
"""could not connect mysql use passwod"""
conn = self.connections[0]
host = self.databases[0]["host"]
db = self.databases[0]["db"]
c = conn.cursor()
# grant access to a table to a user with a password
try:
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists issue17")
c.execute("create table issue17 (x varchar(32) primary key)")
c.execute("insert into issue17 (x) values ('hello, world!')")
c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
conn.commit()
conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
c2 = conn2.cursor()
c2.execute("select x from issue17")
self.assertEqual("hello, world!", c2.fetchone()[0])
finally:
c.execute("drop table issue17")
def disabled_test_issue_54(self):
conn = self.connections[0]
c = conn.cursor()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists issue54")
big_sql = "select * from issue54 where "
big_sql += " and ".join("%d=%d" % (i,i) for i in range(0, 100000))
try:
c.execute("create table issue54 (id integer primary key)")
c.execute("insert into issue54 (id) values (7)")
c.execute(big_sql)
self.assertEqual(7, c.fetchone()[0])
finally:
c.execute("drop table issue54")