def test_allnans(self):
    """Check ``np.nanpercentile`` on input that contains only NaN.

    For ``axis`` in None, 0 and 1 the result must be all-NaN and the number
    of recorded warnings must be one (``axis=None``) or three (otherwise).
    A scalar NaN input evaluated afterwards must add exactly one more
    recorded warning.  The first recorded warning must be a
    ``RuntimeWarning`` at both checkpoints.
    """
    all_nan = np.array([np.nan] * 9).reshape(3, 3)
    for axis in [None, 0, 1]:
        # Warning count expected from the 3x3 call alone.
        matrix_warnings = 1 if axis is None else 3
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter('always')
            matrix_result = np.nanpercentile(all_nan, 60, axis=axis)
            assert_(np.isnan(matrix_result).all())
            assert_(len(caught) == matrix_warnings)
            assert_(issubclass(caught[0].category, RuntimeWarning))
            # Check scalar
            scalar_result = np.nanpercentile(np.nan, 60)
            assert_(np.isnan(scalar_result))
            assert_(len(caught) == matrix_warnings + 1)
            assert_(issubclass(caught[0].category, RuntimeWarning))
# Example source code using Python's simplefilter()
def test_clear_and_catch_warnings():
    """Exercise ``clear_and_catch_warnings`` with and without ``modules``.

    Each step raises an ignored warning inside the context manager and then
    inspects ``my_mod.__warningregistry__`` (directly, or through
    ``assert_warn_len_equal``) to see whether entries were kept or added.
    The steps build on each other, so their order matters.
    """
    # Initial state of module, no warnings
    # NOTE(review): presumably ``my_mod`` is the module the ``warnings.warn``
    # calls below are attributed to -- confirm against ``_get_fresh_mod``.
    my_mod = _get_fresh_mod()
    assert_equal(getattr(my_mod, '__warningregistry__', {}), {})
    with clear_and_catch_warnings(modules=[my_mod]):
        warnings.simplefilter('ignore')
        warnings.warn('Some warning')
    # With the module listed, its registry is expected to be empty afterwards.
    assert_equal(my_mod.__warningregistry__, {})
    # Without specified modules, don't clear warnings during context
    with clear_and_catch_warnings():
        warnings.simplefilter('ignore')
        warnings.warn('Some warning')
    assert_warn_len_equal(my_mod, 1)
    # Confirm that specifying module keeps old warning, does not add new
    with clear_and_catch_warnings(modules=[my_mod]):
        warnings.simplefilter('ignore')
        warnings.warn('Another warning')
    assert_warn_len_equal(my_mod, 1)
    # Another warning, no module spec does add to warnings dict, except on
    # Python 3.4 (see comments in `assert_warn_len_equal`)
    with clear_and_catch_warnings():
        warnings.simplefilter('ignore')
        warnings.warn('Another warning')
    assert_warn_len_equal(my_mod, 2)
def test_topython(self):
    """Check conversion of (masked) arrays to Python ``int`` and ``float``.

    Single-element arrays convert to scalars, multi-element arrays raise
    ``TypeError``, a masked element converts to NaN under ``float`` and
    raises ``MAError`` under ``int``.
    """
    # Tests some communication issues with Python.
    assert_equal(1, int(array(1)))
    assert_equal(1.0, float(array(1)))
    assert_equal(1, int(array([[[1]]])))
    assert_equal(1.0, float(array([[1]])))
    two_items = array([1, 1])
    with self.assertRaises(TypeError):
        float(two_items)
    with warnings.catch_warnings():
        # UserWarning is silenced while masked values are converted to float.
        warnings.simplefilter('ignore', UserWarning)
        fully_masked = array([1], mask=[1])
        assert_(np.isnan(float(fully_masked)))
        partly_masked = array([1, 2, 3], mask=[1, 0, 0])
        with self.assertRaises(TypeError):
            float(partly_masked)
        assert_equal(float(partly_masked[-1]), 3.)
        self.assertTrue(np.isnan(float(partly_masked[0])))
    with self.assertRaises(TypeError):
        int(partly_masked)
    assert_equal(int(partly_masked[-1]), 3)
    with self.assertRaises(MAError):
        int(partly_masked[0])
def test_warning_raised(self):
    """Check that a decorated method warns with the expected category.

    Each case supplies the keyword arguments for ``deprecation.deprecated``
    and the warning class that a single call of the decorated method must
    produce.  The method's return value must pass through unchanged.
    """
    ret_val = "lololol"
    cases = [
        # No args just means deprecated
        {"args": {},
         "warning": deprecation.DeprecatedWarning},
        {"args": {"deprecated_in": "1.0",
                  "current_version": "2.0"},
         "warning": deprecation.DeprecatedWarning},
        {"args": {"deprecated_in": "1.0",
                  "removed_in": "2.0",
                  "current_version": "2.0"},
         "warning": deprecation.UnsupportedWarning},
    ]
    for case in cases:
        with self.subTest(**case):
            class Test(object):
                @deprecation.deprecated(**case["args"])
                def method(self):
                    return ret_val

            with warnings.catch_warnings(record=True) as recorded:
                warnings.simplefilter("always")
                instance = Test()
                self.assertEqual(ret_val, instance.method())

            self.assertEqual(len(recorded), 1)
            self.assertEqual(recorded[0].category, case["warning"])
def disable_warnings(category=exceptions.HTTPWarning):
    """
    Silence urllib3 warnings in one call.

    Installs an ``'ignore'`` filter for ``category``, which defaults to
    ``exceptions.HTTPWarning``.
    """
    warnings.simplefilter(action='ignore', category=category)
def install_warning_logger():
    """Turn on ``PipDeprecationWarning`` and route warnings via ``_showwarning``.

    The ``'default'`` filter for ``PipDeprecationWarning`` is appended on
    every call.  ``warnings.showwarning`` is only swapped while the
    module-level ``_warnings_showwarning`` is still ``None``; the previous
    handler is stored there.
    """
    global _warnings_showwarning

    # Enable our Deprecation Warnings
    warnings.simplefilter("default", PipDeprecationWarning, append=True)

    if _warnings_showwarning is not None:
        # A handler has already been saved: leave warnings.showwarning alone.
        return

    _warnings_showwarning = warnings.showwarning
    warnings.showwarning = _showwarning
def main(argv):
    """Fetch one sensor through the Cb API and pretty-print it.

    :param argv: command-line arguments handed to the option parser.

    Exits with status -1 when the URL, token or sensor option is missing.
    """
    parser = build_cli_parser()
    opts, _ = parser.parse_args(argv)
    if not opts.url or not opts.token or not opts.sensor:
        # Call form of print: valid on Python 2 (parenthesised expression)
        # and on Python 3, where the bare print statement is a syntax error.
        print("Missing required param; run with --help for usage")
        sys.exit(-1)
    cb = cbapi.CbApi(opts.url, token=opts.token, ssl_verify=opts.ssl_verify,
                     ignore_system_proxy=True)
    with warnings.catch_warnings():
        # Silence every warning raised while talking to the server.
        warnings.simplefilter("ignore")
        sensor = cb.sensor(opts.sensor)
        pprint.pprint(sensor)
def disable_warnings(category=exceptions.HTTPWarning):
    """
    Silence urllib3 warnings in one call.

    Installs an ``'ignore'`` filter for ``category``, which defaults to
    ``exceptions.HTTPWarning``.
    """
    warnings.simplefilter(action='ignore', category=category)
def disable_warnings(category=exceptions.HTTPWarning):
    """
    Silence urllib3 warnings in one call.

    Installs an ``'ignore'`` filter for ``category``, which defaults to
    ``exceptions.HTTPWarning``.
    """
    warnings.simplefilter(action='ignore', category=category)
def install_warning_logger():
    """Turn on ``PipDeprecationWarning`` and route warnings via ``_showwarning``.

    The ``'default'`` filter for ``PipDeprecationWarning`` is appended on
    every call.  ``warnings.showwarning`` is only swapped while the
    module-level ``_warnings_showwarning`` is still ``None``; the previous
    handler is stored there.
    """
    global _warnings_showwarning

    # Enable our Deprecation Warnings
    warnings.simplefilter("default", PipDeprecationWarning, append=True)

    if _warnings_showwarning is not None:
        # A handler has already been saved: leave warnings.showwarning alone.
        return

    _warnings_showwarning = warnings.showwarning
    warnings.showwarning = _showwarning
def test_propertyInitialization(self):
    """
    Tests for the correct initialization of 'property' kind properties
    based on whether command line is set, and overrides via launch().

    In every scenario 'initial' must not appear in ``comp.cmdline_args``
    and 'cmdline' must not appear in ``comp.initialize_props``.
    """
    spd = 'sdr/dom/components/property_init/property_init.spd.xml'

    # First, test with defaults
    comp = sb.launch(spd)
    self.assertFalse('initial' in comp.cmdline_args)
    self.assertFalse('cmdline' in comp.initialize_props)
    comp.releaseObject()

    # Test with overrides
    # assertEqual is used throughout: the assertEquals alias is deprecated
    # and no longer exists in recent unittest versions.
    comp = sb.launch(spd,
                     properties={'cmdline':'override', 'initial':'override'})
    self.assertFalse('initial' in comp.cmdline_args)
    self.assertFalse('cmdline' in comp.initialize_props)
    self.assertEqual('override', comp.cmdline)
    self.assertEqual('override', comp.initial)
    comp.releaseObject()

    # Test with overrides in deprecated 'execparams' and 'configure' arguments
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', DeprecationWarning)
        comp = sb.launch(spd,
                         execparams={'cmdline':'override'}, configure={'initial':'override'})
        self.assertFalse('initial' in comp.cmdline_args)
        self.assertFalse('cmdline' in comp.initialize_props)
        self.assertEqual('override', comp.cmdline)
        self.assertEqual('override', comp.initial)
        comp.releaseObject()

        # Test with misplaced command line property in deprecated 'configure' argument
        comp = sb.launch(spd,
                         configure={'cmdline':'override'})
        self.assertFalse('initial' in comp.cmdline_args)
        self.assertFalse('cmdline' in comp.initialize_props)
        self.assertEqual('override', comp.cmdline)
        comp.releaseObject()
def test_slider_slice_warning(self):
    """Calling the plot with a dimension name instead of its alias warns.

    Dimension 2 is aliased as 'wibble'; calling the plot with
    ``grid_longitude`` must then produce a ``UserWarning`` whose text names
    both the alias and the keyword that was used.
    """
    plot = Plot2D(self.cube, self.axes, coords=('time', 'grid_latitude'))
    plot.alias(wibble=2)
    expected_msg = ("expected to be called with alias 'wibble' for dimension 2, "
                    "rather than with 'grid_longitude'")
    with warnings.catch_warnings():
        # Cause all warnings to raise an exception.
        warnings.simplefilter('error')
        with self.assertRaisesRegexp(UserWarning, expected_msg):
            plot(grid_longitude=3)
def test_auto_deltas_fail_warn(self):
    """``from_blaze`` with ``no_deltas_rules.warn`` emits one warning.

    Exactly one warning must be recorded, its message object must be a
    ``NoDeltasWarning``, and its text must contain ``str(expr)``.
    """
    with warnings.catch_warnings(record=True) as recorded:
        warnings.simplefilter('always')
        loader = BlazeLoader()
        expr = bz.data(self.df, dshape=self.dshape)
        options = dict(
            loader=loader,
            no_deltas_rule=no_deltas_rules.warn,
            missing_values=self.missing_values,
        )
        from_blaze(expr, **options)
    self.assertEqual(len(recorded), 1)
    message = recorded[0].message
    self.assertIsInstance(message, NoDeltasWarning)
    self.assertIn(str(expr), str(message))
def setUp(self):
    """Install an ``'ignore'`` warning filter for ``ResourceWarning``."""
    # The 'ignore' filter suppresses ``ResourceWarning``.
    # An alternative is to close the pipes explicitly whenever KNP is called:
    # >> knp.subprocess.process.stdout.close()
    # >> knp.juman.subprocess.process.stdout.close()
    warnings.simplefilter(action='ignore', category=ResourceWarning)
def __init__(self):
    """Emit a deprecation warning and start with four empty lookup maps.

    A filter that always shows ``DeprecationWarning`` is installed first, so
    the warning issued here is not suppressed by it.
    """
    warnings.simplefilter('always', category=DeprecationWarning)
    deprecation_notice = "data_pipeline.schema_cache.SchematizerClient is deprecated."
    warnings.warn(deprecation_notice, DeprecationWarning)

    # All caches start out empty.
    self.schema_id_to_schema_map = dict()
    self.schema_id_to_topic_map = dict()
    self.base_to_transformed_schema_id_map = dict()
    self.schema_id_to_pii_map = dict()
def index_worker(self, queue, size=200):
    """Drain ``queue`` and send the items to Elasticsearch as bulk updates.

    Each queue item is an ``(id_submission, analysis)`` pair that becomes an
    ``update`` action on the ``fcc-comments`` index.  A ``None`` item ends
    the loop, after which whatever is still buffered is sent in one final
    request.

    :param queue: object whose ``get()`` returns the next item.
    :param size: number of buffered actions that triggers a bulk request.
    """
    actions = []
    indexed = 0
    while True:
        item = queue.get()
        if item is None:
            break
        id_submission, analysis = item
        doc = {
            '_index': 'fcc-comments',
            '_type': 'document',
            '_op_type': 'update',
            '_id': id_submission,
            'doc': {'analysis': analysis},
        }
        actions.append(doc)
        # ``>=`` rather than ``==``: after a timed-out request the buffer is
        # kept and keeps growing, so an equality test would never fire again
        # and nothing more would be sent until the final flush.
        if len(actions) >= size:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    response = bulk(self.es, actions)
                except ConnectionTimeout:
                    # Keep the buffer; it is retried on the next item.
                    print('error indexing: connection timeout')
                else:
                    indexed += response[0]
                    print('\tanalyzed %s/%s\t%s%%' % (indexed, self.limit,
                        int(indexed / self.limit * 100)))
                    actions = []
    # Final flush of the remaining buffered actions.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        response = bulk(self.es, actions)
        indexed += response[0]
        print('indexed %s' % (indexed))
def bulk_index(self, queue, size=20):
    """Drain ``queue`` of document ids and flag each one via bulk updates.

    Each queue item is a document id; the matching ``fcc-comments`` document
    gets ``analysis.sentiment_sig_terms_ordered`` set to ``True``.  A
    ``None`` item ends the loop, after which whatever is still buffered is
    sent in one final request.

    :param queue: object whose ``get()`` returns the next document id.
    :param size: number of buffered actions that triggers a bulk request.
    """
    actions = []
    indexed = 0
    while True:
        item = queue.get()
        if item is None:
            break
        doc_id = item
        doc = {
            '_index': 'fcc-comments',
            '_type': 'document',
            '_op_type': 'update',
            '_id': doc_id,
            'doc': {'analysis.sentiment_sig_terms_ordered': True},
        }
        actions.append(doc)
        # ``>=`` rather than ``==``: after a timed-out request the buffer is
        # kept and keeps growing, so an equality test would never fire again
        # and nothing more would be sent until the final flush.
        if len(actions) >= size:
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    response = bulk(self.es, actions)
                except ConnectionTimeout:
                    # Keep the buffer; it is retried on the next item.
                    print('error indexing: connection timeout')
                else:
                    indexed += response[0]
                    # Progress is reported only on multiples of 200.
                    if not indexed % 200:
                        print('\tindexed %s/%s\t%s%%' % (indexed, self.limit,
                            int(indexed / self.limit * 100)))
                    actions = []
    # Final flush of the remaining buffered actions.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        response = bulk(self.es, actions)
        indexed += response[0]
        print('indexed %s' % (indexed))
def make_table(self, *args, **kwargs):
    # type: (*Any, **Any) -> str
    """Build the table through the parent class, then unescape markers.

    In the parent's output, every U+0002 control character followed by one
    of the digits 0, 1 or 2 is replaced by the character whose code point
    equals that digit.
    """
    with warnings.catch_warnings():
        # PendingDeprecationWarning acknowledged.
        warnings.simplefilter("ignore")
        raw_table = super(HtmlMultiDiff, self).make_table(*args, **kwargs)
    return re.sub(
        '\x02([012])',
        lambda match: unichr(ord(match.group(1)) - ord('0')),
        raw_table,
    )
def disable_warnings(category=exceptions.HTTPWarning):
    """
    Silence urllib3 warnings in one call.

    Installs an ``'ignore'`` filter for ``category``, which defaults to
    ``exceptions.HTTPWarning``.
    """
    warnings.simplefilter(action='ignore', category=category)
def get_random_giphy(phrase):
    """Pick a random GIF matching ``phrase`` and return its media URL.

    The search is issued with ``limit=100``.  Raises ``ValueError`` when the
    search yields no results.
    """
    with warnings.catch_warnings():
        # Warnings raised while creating the client are suppressed.
        warnings.simplefilter('ignore')
        client = giphypop.Giphy()

    matches = client.search_list(phrase=phrase, limit=100)
    if matches:
        return random.choice(matches).media_url
    raise ValueError('There were no results for that phrase')