def setUp(self):
super(TestDictCursor, self).setUp()
self.conn = conn = self.connections[0]
c = conn.cursor(self.cursor_type)
# create a table ane some data to query
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists dictcursor")
# include in filterwarnings since for unbuffered dict cursor warning for lack of table
# will only be propagated at start of next execute() call
c.execute("""CREATE TABLE dictcursor (name char(20), age int , DOB datetime)""")
data = [("bob", 21, "1990-02-06 23:04:56"),
("jim", 56, "1955-05-09 13:12:45"),
("fred", 100, "1911-09-12 01:01:01")]
c.executemany("insert into dictcursor values (%s,%s,%s)", data)
python类filterwarnings()的实例源码
def test_issue_3(self):
""" undefined methods datetime_or_None, date_or_None """
conn = self.connections[0]
c = conn.cursor()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists issue3")
c.execute("create table issue3 (d date, t time, dt datetime, ts timestamp)")
try:
c.execute("insert into issue3 (d, t, dt, ts) values (%s,%s,%s,%s)", (None, None, None, None))
c.execute("select d from issue3")
self.assertEqual(None, c.fetchone()[0])
c.execute("select t from issue3")
self.assertEqual(None, c.fetchone()[0])
c.execute("select dt from issue3")
self.assertEqual(None, c.fetchone()[0])
c.execute("select ts from issue3")
self.assertTrue(isinstance(c.fetchone()[0], datetime.datetime))
finally:
c.execute("drop table issue3")
def test_issue_8(self):
""" Primary Key and Index error when selecting data """
conn = self.connections[0]
c = conn.cursor()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists test")
c.execute("""CREATE TABLE `test` (`station` int(10) NOT NULL DEFAULT '0', `dh`
datetime NOT NULL DEFAULT '2015-01-01 00:00:00', `echeance` int(1) NOT NULL
DEFAULT '0', `me` double DEFAULT NULL, `mo` double DEFAULT NULL, PRIMARY
KEY (`station`,`dh`,`echeance`)) ENGINE=MyISAM DEFAULT CHARSET=latin1;""")
try:
self.assertEqual(0, c.execute("SELECT * FROM test"))
c.execute("ALTER TABLE `test` ADD INDEX `idx_station` (`station`)")
self.assertEqual(0, c.execute("SELECT * FROM test"))
finally:
c.execute("drop table test")
def test_issue_13(self):
""" can't handle large result fields """
conn = self.connections[0]
cur = conn.cursor()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
cur.execute("drop table if exists issue13")
try:
cur.execute("create table issue13 (t text)")
# ticket says 18k
size = 18*1024
cur.execute("insert into issue13 (t) values (%s)", ("x" * size,))
cur.execute("select t from issue13")
# use assertTrue so that obscenely huge error messages don't print
r = cur.fetchone()[0]
self.assertTrue("x" * size == r)
finally:
cur.execute("drop table issue13")
def test_issue_17(self):
"""could not connect mysql use passwod"""
conn = self.connections[0]
host = self.databases[0]["host"]
db = self.databases[0]["db"]
c = conn.cursor()
# grant access to a table to a user with a password
try:
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists issue17")
c.execute("create table issue17 (x varchar(32) primary key)")
c.execute("insert into issue17 (x) values ('hello, world!')")
c.execute("grant all privileges on %s.issue17 to 'issue17user'@'%%' identified by '1234'" % db)
conn.commit()
conn2 = pymysql.connect(host=host, user="issue17user", passwd="1234", db=db)
c2 = conn2.cursor()
c2.execute("select x from issue17")
self.assertEqual("hello, world!", c2.fetchone()[0])
finally:
c.execute("drop table issue17")
def disabled_test_issue_54(self):
conn = self.connections[0]
c = conn.cursor()
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists issue54")
big_sql = "select * from issue54 where "
big_sql += " and ".join("%d=%d" % (i,i) for i in range(0, 100000))
try:
c.execute("create table issue54 (id integer primary key)")
c.execute("insert into issue54 (id) values (7)")
c.execute(big_sql)
self.assertEqual(7, c.fetchone()[0])
finally:
c.execute("drop table issue54")
def setUp(self):
super(TestDictCursor, self).setUp()
self.conn = conn = self.connections[0]
c = conn.cursor(self.cursor_type)
# create a table ane some data to query
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
c.execute("drop table if exists dictcursor")
# include in filterwarnings since for unbuffered dict cursor warning for lack of table
# will only be propagated at start of next execute() call
c.execute("""CREATE TABLE dictcursor (name char(20), age int , DOB datetime)""")
data = [("bob", 21, "1990-02-06 23:04:56"),
("jim", 56, "1955-05-09 13:12:45"),
("fred", 100, "1911-09-12 01:01:01")]
c.executemany("insert into dictcursor values (%s,%s,%s)", data)
def expect_warnings(*messages):
"""Context manager to expect warnings with the given messages."""
filters = [dict(action='ignore',
category=sa_exc.SAPendingDeprecationWarning)]
if not messages:
filters.append(dict(action='ignore',
category=sa_exc.SAWarning))
else:
filters.extend(dict(action='ignore',
message=message,
category=sa_exc.SAWarning)
for message in messages)
for f in filters:
warnings.filterwarnings(**f)
try:
yield
finally:
resetwarnings()
def emits_warning_on(db, *warnings):
"""Mark a test as emitting a warning on a specific dialect.
With no arguments, squelches all SAWarning failures. Or pass one or more
strings; these will be matched to the root of the warning description by
warnings.filterwarnings().
"""
spec = db_spec(db)
@decorator
def decorate(fn, *args, **kw):
if isinstance(db, util.string_types):
if not spec(config._current):
return fn(*args, **kw)
else:
wrapped = emits_warning(*warnings)(fn)
return wrapped(*args, **kw)
else:
if not _is_excluded(*db):
return fn(*args, **kw)
else:
wrapped = emits_warning(*warnings)(fn)
return wrapped(*args, **kw)
return decorate
def uses_deprecated(*messages):
"""Mark a test as immune from fatal deprecation warnings.
With no arguments, squelches all SADeprecationWarning failures.
Or pass one or more strings; these will be matched to the root
of the warning description by warnings.filterwarnings().
As a special case, you may pass a function name prefixed with //
and it will be re-written as needed to match the standard warning
verbiage emitted by the sqlalchemy.util.deprecated decorator.
"""
@decorator
def decorate(fn, *args, **kw):
with expect_deprecated(*messages):
return fn(*args, **kw)
return decorate
def run(): # pragma: no cover
"""Run Markdown from the command line."""
# Parse options and adjust logging level if necessary
options, logging_level = parse_options()
if not options:
sys.exit(2)
logger.setLevel(logging_level)
console_handler = logging.StreamHandler()
logger.addHandler(console_handler)
if logging_level <= WARNING:
# Ensure deprecation warnings get displayed
warnings.filterwarnings('default')
logging.captureWarnings(True)
warn_logger = logging.getLogger('py.warnings')
warn_logger.addHandler(console_handler)
# Run
markdown.markdownFromFile(**options)
def fit(self, X, Y=None):
import scipy.sparse
import sklearn.decomposition
self.preprocessor = sklearn.decomposition.KernelPCA(
n_components=self.n_components, kernel=self.kernel,
degree=self.degree, gamma=self.gamma, coef0=self.coef0,
remove_zero_eig=True)
if scipy.sparse.issparse(X):
X = X.astype(np.float64)
with warnings.catch_warnings():
warnings.filterwarnings("error")
self.preprocessor.fit(X)
# Raise an informative error message, equation is based ~line 249 in
# kernel_pca.py in scikit-learn
if len(self.preprocessor.alphas_ / self.preprocessor.lambdas_) == 0:
raise ValueError('KernelPCA removed all features!')
return self
def fit(self, X, Y=None):
import sklearn.decomposition
self.preprocessor = sklearn.decomposition.FastICA(
n_components=self.n_components, algorithm=self.algorithm,
fun=self.fun, whiten=self.whiten, random_state=self.random_state
)
# Make the RuntimeWarning an Exception!
with warnings.catch_warnings():
warnings.filterwarnings("error")
try:
self.preprocessor.fit(X)
except ValueError as e:
if 'array must not contain infs or NaNs' in e.args[0]:
raise ValueError("Bug in scikit-learn: https://github.com/scikit-learn/scikit-learn/pull/2738")
else:
import traceback
traceback.format_exc()
raise ValueError()
return self
def expect_warnings(*messages):
"""Context manager to expect warnings with the given messages."""
filters = [dict(action='ignore',
category=sa_exc.SAPendingDeprecationWarning)]
if not messages:
filters.append(dict(action='ignore',
category=sa_exc.SAWarning))
else:
filters.extend(dict(action='ignore',
message=message,
category=sa_exc.SAWarning)
for message in messages)
for f in filters:
warnings.filterwarnings(**f)
try:
yield
finally:
resetwarnings()
def emits_warning_on(db, *warnings):
"""Mark a test as emitting a warning on a specific dialect.
With no arguments, squelches all SAWarning failures. Or pass one or more
strings; these will be matched to the root of the warning description by
warnings.filterwarnings().
"""
spec = db_spec(db)
@decorator
def decorate(fn, *args, **kw):
if isinstance(db, util.string_types):
if not spec(config._current):
return fn(*args, **kw)
else:
wrapped = emits_warning(*warnings)(fn)
return wrapped(*args, **kw)
else:
if not _is_excluded(*db):
return fn(*args, **kw)
else:
wrapped = emits_warning(*warnings)(fn)
return wrapped(*args, **kw)
return decorate
def uses_deprecated(*messages):
"""Mark a test as immune from fatal deprecation warnings.
With no arguments, squelches all SADeprecationWarning failures.
Or pass one or more strings; these will be matched to the root
of the warning description by warnings.filterwarnings().
As a special case, you may pass a function name prefixed with //
and it will be re-written as needed to match the standard warning
verbiage emitted by the sqlalchemy.util.deprecated decorator.
"""
@decorator
def decorate(fn, *args, **kw):
with expect_deprecated(*messages):
return fn(*args, **kw)
return decorate
def _mkstemp(*args, **kw):
old_open = os.open
try:
# temporarily bypass sandboxing
os.open = os_open
return tempfile.mkstemp(*args, **kw)
finally:
# and then put it back
os.open = old_open
# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
def _mkstemp(*args, **kw):
old_open = os.open
try:
# temporarily bypass sandboxing
os.open = os_open
return tempfile.mkstemp(*args, **kw)
finally:
# and then put it back
os.open = old_open
# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
def _mkstemp(*args, **kw):
old_open = os.open
try:
# temporarily bypass sandboxing
os.open = os_open
return tempfile.mkstemp(*args, **kw)
finally:
# and then put it back
os.open = old_open
# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.
def _mkstemp(*args, **kw):
old_open = os.open
try:
# temporarily bypass sandboxing
os.open = os_open
return tempfile.mkstemp(*args, **kw)
finally:
# and then put it back
os.open = old_open
# Silence the PEP440Warning by default, so that end users don't get hit by it
# randomly just because they use pkg_resources. We want to append the rule
# because we want earlier uses of filterwarnings to take precedence over this
# one.