def init_state(data):
close = data
diff = np.diff(data)
diff = np.insert(diff, 0, 0)
#--- Preprocess data
xdata = np.column_stack((close, diff))
xdata = np.nan_to_num(xdata)
scaler = preprocessing.StandardScaler()
xdata = scaler.fit_transform(xdata)
state = xdata[0:1, :]
return state, xdata
#Take Action
python类diff()的实例源码
def test_topk_invariants():
s = SpaceSaving(capacity=5, dtype='f8')
s.update(data_f8)
for k in [0, 5]:
top = s.topk(k)
assert isinstance(top, np.ndarray)
dtype = np.dtype([('item', 'f8'), ('count', 'i8'), ('error', 'i8')])
assert top.dtype == dtype
assert len(top) == k
assert (np.diff(top['count']) <= 0).all()
top2 = s.topk(k, astuples=True)
assert len(top2) == k
np.testing.assert_equal(top['item'], [i.item for i in top2])
np.testing.assert_equal(top['count'], [i.count for i in top2])
np.testing.assert_equal(top['error'], [i.error for i in top2])
with pytest.raises(ValueError):
s.topk(-1)
def plot_change(results):
''' This plot shows how each algorithm changes after each iteration. '''
f, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
n_range = np.linspace(0, 50, 11)
model_names = results[0].keys()
model_range = range(len(model_names))
for idx, model in enumerate(model_names):
if idx == 0:
pass
else:
ax1.plot(n_range, np.insert(np.absolute(np.diff(results[0][model])), 0, results[0][model][0]), label=model)
ax2.plot(n_range, np.insert(np.absolute(np.diff(results[1][model])), 0, results[1][model][0]), label=model)
ax1.set_title('Root Mean Squared Error')
ax2.set_title('Time in Seconds')
plt.xlabel('Number of Iterations')
plt.legend()
plt.show()
def test_search_document_id(app):
dsid, lsi_id, _, input_ds = get_features_lsi_cached(app, hashed=False)
parent_id = lsi_id
max_results = 2
query_document_id = 3844
pars = dict(parent_id=parent_id,
max_results=max_results,
sort=True,
query_document_id=query_document_id)
data = app.post_check(V01 + "/search/", json=pars)
assert sorted(data.keys()) == ['data', 'pagination']
data = data['data']
for row in data:
assert dict2type(row) == {'score': 'float',
'document_id': 'int'}
scores = np.array([row['score'] for row in data])
assert (np.diff(scores) <= 0).all()
assert len(data) == min(max_results, len(input_ds['dataset']))
# assert data[0]['document_id'] == query_document_id
# assert data[0]['score'] >= 0.99
def plot_numvisc(diagfile):
plt.figure()
nc = Dataset(diagfile)
t=nc.variables['t'][:]
ke=nc.variables['ke'][:]
dkdt=np.diff(ke)/np.diff(t)
ens=nc.variables['enstrophy'][:]
ensm=0.5*(ens[1:]+ens[:-1])
# deltake[visc,res]=-(ke[-1]-ke[0])
# deltaens[visc,res]=max(medfilt(ens,21))-ens[5]
visc_tseries = -dkdt/ensm*4.4*np.pi
visc_num = max(visc_tseries[t[1:]>0.02])
#print('N=%4i / visc = %4.1e / num = %4.2e'%(N[res],Kdiff[visc],visc_num[res]))
plt.semilogy(t[1:],visc_tseries)
plt.xlabel('time')
plt.ylabel('viscosity (-(1/2V)dE/dt)')
plt.grid('on')
plt.show()
def get_beats(x, sr):
"""Track beats in an audio excerpt, using librosa's standard
beat tracker.
Args:
x (1d-array) audio signal, mono
sr (int): sample rate
Returns:
2d-array: beat times and beat intervals
"""
_, beat_frames = librosa.beat.beat_track(x, sr=sr)
beat_times = librosa.frames_to_time(beat_frames, sr=sr)
t = beat_times[:-1,]
beat_intervals = np.diff(beat_times)
return t, beat_intervals
def get_onsets(x, sr):
"""Compute inter-onset intervals (IOI) from audio, using librosa.
Args:
x (1d-array) audio signal, mono
sr (int): sample rate
Returns:
2d-array: onset times and IOI
"""
onset_frames = librosa.onset.onset_detect(x, sr=sr)
onset_times = librosa.frames_to_time(onset_frames, sr=sr)
t = onset_times[:-1,]
onset_intervals = np.diff(onset_times)
return t, onset_intervals
def calcsteps(current_time=current_time, speedparms=speedparms):
speed=np.linspace(speedparms[0], speedparms[1], len(current_time))
expected=np.multiply(current_time,speed)
steps=np.diff(np.floor(expected))
steplocs=np.where(steps !=0)[0]
steptimes=current_time[steplocs]
stepdelta=np.diff(np.insert(steptimes,0,0))
stepdir=speed[steplocs]>0
deltap=np.sum(stepdir)-np.sum(np.invert(stepdir))
full=False
retval={'steplocs':steplocs,
'steptimes':current_time[steplocs],
'speeds': speed[steplocs],
'stepdelta': stepdelta,
'stepdir':stepdir,
'deltap': deltap}
#logging.debug('steplocs %r' %steplocs)
#logging.debug('steptimes %r' %current_time[steplocs])
#logging.debug('speeds %r' %speed[steplocs])
#logging.debug('retval %r' %retval)
return retval
#return steps, current_time
def existing_index_and_interval(self):
indices = [i for i, f in self.existing_indices_and_files()]
if len(indices) == 0:
return None, 1
elif len(indices) == 1:
return indices[0], 1
indices.sort()
diff = np.diff(indices)
interval = diff[0]
return max(indices), interval
def existing_index_and_interval(self):
indices = [i for i, f in self.existing_indices_and_files()]
if len(indices) == 0:
return None, 1
elif len(indices) == 1:
return indices[0], 1
indices.sort()
diff = np.diff(indices)
interval = diff[0]
return max(indices), interval
def existing_index_and_interval(self):
indices = [i for i, f in self.existing_indices_and_files()]
if len(indices) == 0:
return None, 1
elif len(indices) == 1:
return indices[0], 1
indices.sort()
diff = np.diff(indices)
interval = diff[0]
return max(indices), interval
def existing_index_and_interval(self):
indices = [i for i, f in self.existing_indices_and_files()]
if len(indices) == 0:
return None, 1
elif len(indices) == 1:
return indices[0], 1
indices.sort()
diff = np.diff(indices)
interval = diff[0]
return max(indices), interval
def existing_index_and_interval(self):
indices = [i for i, f in self.existing_indices_and_files()]
if len(indices) == 0:
return None, 1
elif len(indices) == 1:
return indices[0], 1
indices.sort()
diff = np.diff(indices)
interval = diff[0]
return max(indices), interval
def hit_object_angles(hit_objects, *, double_time=False, half_time=False):
"""Compute the angle from one hit object to the next in 3d space with time
along the Z axis.
Parameters
----------
hit_objects : iterable[HitObject]
The hit objects to compute the angles about.
double_time : bool, optional
Apply double time compression to the Z axis.
half_time : bool, optional
Apply half time expansion to the Z axis.
Returns
-------
angles : ndarray[float]
An array shape (3, len(hit_objects) - 1) of pitch, roll, and yaw
between each hit object. All angles are measured in radians.
"""
coords = hit_object_coordinates(
hit_objects,
double_time=double_time,
half_time=half_time,
)
diff = np.diff(coords, axis=1)
# (pitch, roll, yaw) x transitions
out = np.empty((3, len(hit_objects) - 1), dtype=np.float64)
np.arctan2(diff[Axis.y], diff[Axis.z], out=out[Angle.pitch])
np.arctan2(diff[Axis.y], diff[Axis.x], out=out[Angle.roll])
np.arctan2(diff[Axis.z], diff[Axis.x], out=out[Angle.yaw])
return out
def sparse_temporal_forward_pass(inputs, weights, biases = None, scales = None, hidden_activations='relu', output_activations = 'relu', quantization_method = 'herd', rng=None):
"""
Feed a sequence of inputs into a sparse temporal difference net and get the resulting activations.
:param inputs: A (n_frames, n_dims_in) array
:param weights: A list of (n_dim_in, n_dim_out) weight matrices
:param biases: An optional (len(weights)) list of (w.shape[1]) biases for each weight matrix
:param scales: An optional (len(weights)) list of (w.shape[0]) scales to scale each layer before rounding.
:param hidden_activations: Indicates the hidden layer activation function
:param output_activations: Indicates the output layer activation function
:return: activations:
A len(weights)*3+1 list of (n_frames, n_dims) activations.
Elements [::3] will be a length(w)+1 list containing the input to each rounding unit, and the final output
Elements [1::3] will be the length(w) rounded "spike" signal.
Elements [2::3] will be the length(w) inputs to each nonlinearity
"""
activations = [inputs]
if biases is None:
biases = [0]*len(weights)
if scales is None:
scales = [1.]*len(weights)
else:
assert len(scales) in (len(weights), len(weights)+1)
real_activations = inputs
for w, b, k in zip(weights, biases, scales):
deltas = np.diff(np.insert(real_activations, 0, 0, axis=0), axis=0) # (n_steps, n_in)
spikes = quantize_sequence(k*deltas, method=quantization_method, rng=rng) # (n_steps, n_in)
delta_inputs = (spikes/k).dot(w) # (n_steps, n_out)
cumulated_inputs = np.cumsum(delta_inputs, axis=0)+b # (n_steps, n_out)
real_activations = activation_function(cumulated_inputs, output_activations if w is weights[-1] else hidden_activations) # (n_steps, n_out)
activations += [spikes, cumulated_inputs, real_activations]
if len(scales)==len(weights)+1:
activations[-1]*=scales[-1]
return activations
def __diff(x):
"""
First derivative/diff (while keeping same size as input)
Args:
x (array): numpy array of data
Returns:
dx (array): numpy array of first derivative of data
(same size as x)
"""
dx = np.diff(x)
dx = np.concatenate((dx[0], dx)) # output len == input len
return dx
def _fourierTransform(self, x, y):
## Perform fourier transform. If x values are not sampled uniformly,
## then use np.interp to resample before taking fft.
dx = np.diff(x)
uniform = not np.any(np.abs(dx-dx[0]) > (abs(dx[0]) / 1000.))
if not uniform:
x2 = np.linspace(x[0], x[-1], len(x))
y = np.interp(x2, x, y)
x = x2
f = np.fft.fft(y) / len(y)
y = abs(f[1:len(f)/2])
dt = x[-1] - x[0]
x = np.linspace(0, 0.5*len(x)/dt, len(y))
return x, y
def __find_temperature(self,tree,mintemp,maxtemp,tempstep,min_clus):
num_temp = int(floor(float(maxtemp-mintemp)/tempstep))
aux = np.diff(tree[:,4])
aux1 = np.diff(tree[:,5])
aux2 = np.diff(tree[:,6])
aux3 = np.diff(tree[:,7])
temp=0;
for t in range(0,num_temp-1):
if(aux[t] > min_clus or aux1[t] > min_clus or aux2[t] > min_clus or aux3[t] >min_clus):
temp=t+1
if (temp==0 and tree[temp][5]<min_clus):
temp=1
return temp
def test_sorting(self):
"""
Test if presorting of columns work properly.
"""
result = self.testIO.get_columns(sorting_columns=0)
assert len(result) > 0
assert all(np.diff(result[:, 0]) >= 0)
def _fourierTransform(self, x, y):
## Perform fourier transform. If x values are not sampled uniformly,
## then use np.interp to resample before taking fft.
dx = np.diff(x)
uniform = not np.any(np.abs(dx-dx[0]) > (abs(dx[0]) / 1000.))
if not uniform:
x2 = np.linspace(x[0], x[-1], len(x))
y = np.interp(x2, x, y)
x = x2
f = np.fft.fft(y) / len(y)
y = abs(f[1:len(f)/2])
dt = x[-1] - x[0]
x = np.linspace(0, 0.5*len(x)/dt, len(y))
return x, y