def impl(self, x):
# If x is an int8 or uint8, numpy.expm1 will compute the result in
# half-precision (float16), where we want float32.
x_dtype = str(getattr(x, 'dtype', ''))
if x_dtype in ('int8', 'uint8'):
return numpy.expm1(x, sig='f')
return numpy.expm1(x)
python类expm1()的实例源码
def c_code(self, node, name, inputs, outputs, sub):
(x,) = inputs
(z,) = outputs
if node.inputs[0].type in complex_types:
raise NotImplementedError('type not supported', type)
return "%(z)s = expm1(%(x)s);" % locals()
def __init__(self, daily_returns, benchmark_daily_returns, risk_free_rate, days, period=DAILY):
assert(len(daily_returns) == len(benchmark_daily_returns))
self._portfolio = daily_returns
self._benchmark = benchmark_daily_returns
self._risk_free_rate = risk_free_rate
self._annual_factor = _annual_factor(period)
self._daily_risk_free_rate = self._risk_free_rate / self._annual_factor
self._alpha = None
self._beta = None
self._sharpe = None
self._return = np.expm1(np.log1p(self._portfolio).sum())
self._annual_return = (1 + self._return) ** (365 / days) - 1
self._benchmark_return = np.expm1(np.log1p(self._benchmark).sum())
self._benchmark_annual_return = (1 + self._benchmark_return) ** (365 / days) - 1
self._max_drawdown = None
self._volatility = None
self._annual_volatility = None
self._benchmark_volatility = None
self._benchmark_annual_volatility = None
self._information_ratio = None
self._sortino = None
self._tracking_error = None
self._annual_tracking_error = None
self._downside_risk = None
self._annual_downside_risk = None
self._calmar = None
self._avg_excess_return = None
def mape_ln(y,d):
c=d.get_label()
result=np.sum(np.abs(np.expm1(y)-np.abs(np.expm1(c)))/np.abs(np.expm1(c)))/len(c)
return "mape",result
def transform_back(self, transformed_target):
return np.expm1(transformed_target)
def process(self, **kwargs):
"""Process module."""
Transform.process(self, **kwargs)
self._kappa = kwargs[self.key('kappa')]
self._kappa_gamma = kwargs[self.key('kappagamma')]
self._m_ejecta = kwargs[self.key('mejecta')]
self._v_ejecta = kwargs[self.key('vejecta')]
self._tau_diff = np.sqrt(self.DIFF_CONST * self._kappa *
self._m_ejecta / self._v_ejecta) / DAY_CGS
self._trap_coeff = (
self.TRAP_CONST * self._kappa_gamma * self._m_ejecta /
(self._v_ejecta ** 2)) / DAY_CGS ** 2
td2, A = self._tau_diff ** 2, self._trap_coeff # noqa: F841
new_lums = np.zeros_like(self._times_to_process)
if len(self._dense_times_since_exp) < 2:
return {self.dense_key('luminosities'): new_lums}
min_te = min(self._dense_times_since_exp)
tb = max(0.0, min_te)
linterp = interp1d(
self._dense_times_since_exp, self._dense_luminosities, copy=False,
assume_sorted=True)
uniq_times = np.unique(self._times_to_process[
(self._times_to_process >= tb) & (
self._times_to_process <= self._dense_times_since_exp[-1])])
lu = len(uniq_times)
num = int(round(self.N_INT_TIMES / 2.0))
lsp = np.logspace(
np.log10(self._tau_diff /
self._dense_times_since_exp[-1]) +
self.MIN_LOG_SPACING, 0, num)
xm = np.unique(np.concatenate((lsp, 1 - lsp)))
int_times = np.clip(
tb + (uniq_times.reshape(lu, 1) - tb) * xm, tb,
self._dense_times_since_exp[-1])
int_te2s = int_times[:, -1] ** 2
int_lums = linterp(int_times) # noqa: F841
int_args = int_lums * int_times * np.exp(
(int_times ** 2 - int_te2s.reshape(lu, 1)) / td2)
int_args[np.isnan(int_args)] = 0.0
uniq_lums = np.trapz(int_args, int_times)
uniq_lums *= -2.0 * np.expm1(-A / int_te2s) / td2
new_lums = uniq_lums[np.searchsorted(uniq_times,
self._times_to_process)]
return {self.key('tau_diffusion'): self._tau_diff,
self.dense_key('luminosities'): new_lums}
def gpinv(p, k, sigma):
"""Inverse Generalised Pareto distribution function."""
x = np.empty(p.shape)
x.fill(np.nan)
if sigma <= 0:
return x
ok = (p > 0) & (p < 1)
if np.all(ok):
if np.abs(k) < np.finfo(float).eps:
np.negative(p, out=x)
np.log1p(x, out=x)
np.negative(x, out=x)
else:
np.negative(p, out=x)
np.log1p(x, out=x)
x *= -k
np.expm1(x, out=x)
x /= k
x *= sigma
else:
if np.abs(k) < np.finfo(float).eps:
# x[ok] = - np.log1p(-p[ok])
temp = p[ok]
np.negative(temp, out=temp)
np.log1p(temp, out=temp)
np.negative(temp, out=temp)
x[ok] = temp
else:
# x[ok] = np.expm1(-k * np.log1p(-p[ok])) / k
temp = p[ok]
np.negative(temp, out=temp)
np.log1p(temp, out=temp)
temp *= -k
np.expm1(temp, out=temp)
temp /= k
x[ok] = temp
x *= sigma
x[p == 0] = 0
if k >= 0:
x[p == 1] = np.inf
else:
x[p == 1] = -sigma / k
return x
common.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_numpy_ufuncs(self):
# test ufuncs of numpy 1.9.2. see:
# http://docs.scipy.org/doc/numpy/reference/ufuncs.html
# some functions are skipped because it may return different result
# for unicode input depending on numpy version
for name, idx in compat.iteritems(self.indices):
for func in [np.exp, np.exp2, np.expm1, np.log, np.log2, np.log10,
np.log1p, np.sqrt, np.sin, np.cos, np.tan, np.arcsin,
np.arccos, np.arctan, np.sinh, np.cosh, np.tanh,
np.arcsinh, np.arccosh, np.arctanh, np.deg2rad,
np.rad2deg]:
if isinstance(idx, pd.tseries.base.DatetimeIndexOpsMixin):
# raise TypeError or ValueError (PeriodIndex)
# PeriodIndex behavior should be changed in future version
with tm.assertRaises(Exception):
func(idx)
elif isinstance(idx, (Float64Index, Int64Index)):
# coerces to float (e.g. np.sin)
result = func(idx)
exp = Index(func(idx.values), name=idx.name)
self.assert_index_equal(result, exp)
self.assertIsInstance(result, pd.Float64Index)
else:
# raise AttributeError or TypeError
if len(idx) == 0:
continue
else:
with tm.assertRaises(Exception):
func(idx)
for func in [np.isfinite, np.isinf, np.isnan, np.signbit]:
if isinstance(idx, pd.tseries.base.DatetimeIndexOpsMixin):
# raise TypeError or ValueError (PeriodIndex)
with tm.assertRaises(Exception):
func(idx)
elif isinstance(idx, (Float64Index, Int64Index)):
# results in bool array
result = func(idx)
exp = func(idx.values)
self.assertIsInstance(result, np.ndarray)
tm.assertNotIsInstance(result, Index)
else:
if len(idx) == 0:
continue
else:
with tm.assertRaises(Exception):
func(idx)
def nonnegative_bandpass_filter(data,fa=None,fb=None,
Fs=1000.,order=4,zerophase=True,bandstop=False,
offset=1.0):
'''
For filtering data that must remain non-negative. Due to ringing
conventional fitering can create values less than zero for non-
negative real inputs. This may be unrealistic for some data.
To compensate, this performs the filtering on the natural
logarithm of the input data. For small numbers, this can lead to
numeric underflow, so an offset parameter (default 1) is added
to the data for stability.
Parameters
----------
data (ndarray):
data, filtering performed over last dimension
fa (number):
low-freq cutoff Hz. If none, lowpass at fb
fb (number):
high-freq cutoff Hz. If none, highpass at fa
Fs (int):
Sample rate in Hz
order (1..6):
butterworth filter order. Default 4
zerophase (boolean):
Use forward-backward filtering? (true)
bandstop (boolean):
Do band-stop rather than band-pass
offset (positive number):
Offset data to avoid underflow (1)
Returns
-------
filtered :
Filtered signal
'''
offset -= 1.0
data = np.log1p(data+offset)
filtered = bandpass_filter(data,
fa=fa, fb=fb, Fs=Fs,
order=order,
zerophase=zerophase,
bandstop=bandstop)
return np.expm1(filtered)