def test_interp_alt_scipy(self):
tm._skip_if_no_scipy()
df = DataFrame({'A': [1, 2, np.nan, 4, 5, np.nan, 7],
'C': [1, 2, 3, 5, 8, 13, 21]})
result = df.interpolate(method='barycentric')
expected = df.copy()
expected.ix[2, 'A'] = 3
expected.ix[5, 'A'] = 6
assert_frame_equal(result, expected)
result = df.interpolate(method='barycentric', downcast='infer')
assert_frame_equal(result, expected.astype(np.int64))
result = df.interpolate(method='krogh')
expectedk = df.copy()
expectedk['A'] = expected['A']
assert_frame_equal(result, expectedk)
_skip_if_no_pchip()
import scipy
result = df.interpolate(method='pchip')
expected.ix[2, 'A'] = 3
if LooseVersion(scipy.__version__) >= '0.17.0':
expected.ix[5, 'A'] = 6.0
else:
expected.ix[5, 'A'] = 6.125
assert_frame_equal(result, expected)
python类__version__()的实例源码
test_generic.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 50
收藏 0
点赞 0
评论 0
testing.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def _skip_if_mpl_1_5():
import matplotlib
v = matplotlib.__version__
if v > LooseVersion('1.4.3') or v[0] == '0':
import nose
raise nose.SkipTest("matplotlib 1.5")
testing.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 34
收藏 0
点赞 0
评论 0
def _skip_if_no_xarray():
try:
import xarray
except ImportError:
import nose
raise nose.SkipTest("xarray not installed")
v = xarray.__version__
if v < LooseVersion('0.7.0'):
import nose
raise nose.SkipTest("xarray not version is too low: {0}".format(v))
testing.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def _incompat_bottleneck_version(method):
""" skip if we have bottleneck installed
and its >= 1.0
as we don't match the nansum/nanprod behavior for all-nan
ops, see GH9422
"""
if method not in ['sum','prod']:
return False
try:
import bottleneck as bn
return bn.__version__ >= LooseVersion('1.0')
except ImportError:
return False
def __init__(self, package=None, raise_warnings=None):
if raise_warnings is None and (
not hasattr(np, '__version__') or '.dev0' in np.__version__):
raise_warnings = "develop"
elif raise_warnings is None:
raise_warnings = "release"
package_name = None
if package is None:
f = sys._getframe(1)
package_path = f.f_locals.get('__file__', None)
if package_path is None:
raise AssertionError
package_path = os.path.dirname(package_path)
package_name = f.f_locals.get('__name__', None)
elif isinstance(package, type(os)):
package_path = os.path.dirname(package.__file__)
package_name = getattr(package, '__name__', None)
else:
package_path = str(package)
self.package_path = package_path
# Find the package name under test; this name is used to limit coverage
# reporting (if enabled).
if package_name is None:
package_name = get_package_name(package_path)
self.package_name = package_name
# Set to "release" in constructor in maintenance branches.
self.raise_warnings = raise_warnings
def _numpy_tester():
if hasattr(np, "__version__") and ".dev0" in np.__version__:
mode = "develop"
else:
mode = "release"
return NoseTester(raise_warnings=mode, depth=1)
def _numpy_tester():
if hasattr(np, "__version__") and ".dev0" in np.__version__:
mode = "develop"
else:
mode = "release"
return NoseTester(raise_warnings=mode, depth=1)
def _fix_fill(fill):
"""Helper to fix bug on old scipy"""
if LooseVersion(scipy.__version__) < LooseVersion('0.12'):
fill = fill[:, np.newaxis]
return fill
def check_cdist(self, metric, kwargs, D_true):
if metric == 'canberra' and cmp_version(scipy.__version__, '0.9') <= 0:
raise SkipTest("Canberra distance incorrect in scipy < 0.9")
dm = DistanceMetric.get_metric(metric, **kwargs)
D12 = dm.pairwise(self.X1, self.X2)
assert_array_almost_equal(D12, D_true)
def check_pdist(self, metric, kwargs, D_true):
if metric == 'canberra' and cmp_version(scipy.__version__, '0.9') <= 0:
raise SkipTest("Canberra distance incorrect in scipy < 0.9")
dm = DistanceMetric.get_metric(metric, **kwargs)
D12 = dm.pairwise(self.X1)
assert_array_almost_equal(D12, D_true)
def _numpy_tester():
if hasattr(np, "__version__") and ".dev0" in np.__version__:
mode = "develop"
else:
mode = "release"
return NoseTester(raise_warnings=mode, depth=1)
def main(pkl_list, name_list, cut=sys.maxint):
pickles = plot_util.load_pickles(name_list, pkl_list)
best_dict, idx_dict, keys = plot_util.get_best_dict(name_list, pickles,
cut=cut)
for k in keys:
sys.stdout.write("%10s: %s experiment(s)\n" % (k, len(best_dict[k])))
sys.stdout.write("Unpaired t-tests-----------------------------------------------------\n")
# TODO: replace by itertools
for idx, k in enumerate(keys):
if len(keys) > 1:
for j in keys[idx+1:]:
t_true, p_true = stats.ttest_ind(best_dict[k], best_dict[j])
rounded_t_true, rounded_p_true = stats.ttest_ind(numpy.round(best_dict[k], 3),
numpy.round(best_dict[j], 3))
sys.stdout.write("%10s vs %10s\n" % (k, j))
sys.stdout.write("Standard independent 2 sample test, equal population variance\n")
sys.stdout.write(" "*24 + " T: %10.5e, p-value: %10.5e (%5.3f%%) \n" %
(t_true, p_true, p_true*100))
sys.stdout.write("Rounded: ")
sys.stdout.write(" T: %10.5e, p-value: %10.5e (%5.3f%%)\n" %
(rounded_t_true, rounded_p_true, rounded_p_true*100))
if tuple(map(int, (scipy.__version__.split(".")))) >= (0, 11, 0):
# print scipy.__version__ >= '0.11.0'
t_false, p_false = stats.ttest_ind(best_dict[k], best_dict[j], equal_var=False)
rounded_t_false, rounded_p_false = stats.ttest_ind(numpy.round(best_dict[k], 3),
numpy.round(best_dict[j], 3),
equal_var=False)
sys.stdout.write("Welch's t-test, no equal population variance\n")
sys.stdout.write(" "*24)
sys.stdout.write(": T: %10.5e, p-value: %10.5e (%5.3f%%)\n" %
(t_false, p_false, p_false*100))
sys.stdout.write("Rounded: ")
sys.stdout.write(": T: %10.5e, p-value: %10.5e (%5.3f%%)\n" %
(rounded_t_false, rounded_p_false, rounded_p_false*100))
sys.stdout.write("\n")
sys.stdout.write("Best Value-----------------------------------------------------------\n")
for k in keys:
sys.stdout.write("%10s: %10.5f (min: %10.5f, max: %10.5f, std: %5.3f)\n" %
(k, float(numpy.mean(best_dict[k])), float(numpy.min(best_dict[k])),
numpy.max(best_dict[k]), float(numpy.std(best_dict[k]))))
sys.stdout.write("Needed Trials--------------------------------------------------------\n")
for k in keys:
sys.stdout.write("%10s: %10.5f (min: %10.5f, max: %10.5f, std: %5.3f)\n" %
(k, float(numpy.mean(idx_dict[k])), float(numpy.min(idx_dict[k])),
numpy.max(idx_dict[k]), float(numpy.std(idx_dict[k]))))
sys.stdout.write("------------------------------------------------------------------------\n")
testing.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def package_check(pkg_name, version=None, app='pandas', checker=LooseVersion,
exc_failed_import=ImportError,
exc_failed_check=RuntimeError):
"""Check that the minimal version of the required package is installed.
Parameters
----------
pkg_name : string
Name of the required package.
version : string, optional
Minimal version number for required package.
app : string, optional
Application that is performing the check. For instance, the
name of the tutorial being executed that depends on specific
packages.
checker : object, optional
The class that will perform the version checking. Default is
distutils.version.LooseVersion.
exc_failed_import : Exception, optional
Class of the exception to be thrown if import failed.
exc_failed_check : Exception, optional
Class of the exception to be thrown if version check failed.
Examples
--------
package_check('numpy', '1.3')
package_check('networkx', '1.0', 'tutorial1')
"""
if app:
msg = '%s requires %s' % (app, pkg_name)
else:
msg = 'module requires %s' % pkg_name
if version:
msg += ' with version >= %s' % (version,)
try:
mod = __import__(pkg_name)
except ImportError:
raise exc_failed_import(msg)
if not version:
return
try:
have_version = mod.__version__
except AttributeError:
raise exc_failed_check('Cannot find version for %s' % pkg_name)
if checker(have_version) < checker(version):
raise exc_failed_check(msg)
def optimal_t_compressed(self, seq_pair, multiplicity):
"""
Find the optimal distance between the two sequences
"""
def _neg_prob(t, seq_pair, multiplicity):
"""
Probability to observe child given the the parent state, transition
matrix and the time of evolution (branch length).
Parameters
----------
t : double
Branch length (time between sequences)
parent : numpy.array
Parent sequence
child : numpy.array
Child sequence
tm : GTR
Model of evolution
Returns
-------
prob : double
Negative probability of the two given sequences
to be separated by the time t.
"""
return -1.0*self.prob_t_compressed(seq_pair, multiplicity,t, return_log=True)
try:
from scipy.optimize import minimize_scalar
opt = minimize_scalar(_neg_prob,
bounds=[0,ttconf.MAX_BRANCH_LENGTH],
method='bounded',
args=(seq_pair, multiplicity), options={'xatol':1e-8})
new_len = opt["x"]
except:
import scipy
print('legacy scipy', scipy.__version__)
from scipy.optimize import fminbound
new_len = fminbound(_neg_prob,
0,ttconf.MAX_BRANCH_LENGTH,
args=(seq_pair, multiplicity))
opt={'success':True}
if new_len > .9 * ttconf.MAX_BRANCH_LENGTH:
self.logger("WARNING: GTR.optimal_t_compressed -- The branch length seems to be very long!", 4, warn=True)
if opt["success"] != True:
# return hamming distance: number of state pairs where state differs/all pairs
new_len = np.sum(multiplicity[seq_pair[:,1]!=seq_pair[:,0]])/np.sum(multiplicity)
return new_len