def insert_knot(self, knot, direction=0):
""" Insert a new knot into the spline.
:param int direction: The direction to insert in
:param knot: The new knot(s) to insert
:type knot: float or [float]
:raises ValueError: For invalid direction
:return: self
"""
shape = self.controlpoints.shape
# for single-value input, wrap it into a list
knot = ensure_listlike(knot)
direction = check_direction(direction, self.pardim)
    C = np.identity(shape[direction])  # np.matrix is deprecated; use a plain array
    for k in knot:
        C = self.bases[direction].insert_knot(k) @ C
self.controlpoints = np.tensordot(C, self.controlpoints, axes=(1, direction))
self.controlpoints = self.controlpoints.transpose(transpose_fix(self.pardim, direction))
return self
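
# A minimal sketch of the tensordot step above, assuming nothing beyond NumPy:
# contracting an identity matrix against the chosen axis leaves the control
# points unchanged, which is the degenerate "no knots inserted" case.
import numpy as np

cps = np.random.rand(5, 4, 3)             # hypothetical control-point grid
C = np.identity(5)                        # insertion matrix for zero knots
out = np.tensordot(C, cps, axes=(1, 0))   # contract C's axis 1 with cps axis 0
assert out.shape == cps.shape and np.allclose(out, cps)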
def outlier_identification(self, model, x_train, y_train):
    # Split off part of the training data as an extra held-out set for outlier detection
x_train_split, x_test_split, y_train_split, y_test_split = train_test_split(x_train, y_train)
print('\nOutlier shapes')
print(np.shape(x_train_split), np.shape(x_test_split), np.shape(y_train_split), np.shape(y_test_split))
model.fit(x_train_split, y_train_split)
y_predicted = model.predict(x_test_split)
residuals = np.absolute(y_predicted - y_test_split)
rmse_pred_vs_actual = self.rmse(y_predicted, y_test_split)
outliers_mask = residuals >= rmse_pred_vs_actual
outliers_mask = np.concatenate([np.zeros((np.shape(y_train_split)[0],), dtype=bool), outliers_mask])
not_an_outlier = outliers_mask == 0
    # Reassemble the full training set from the split pieces, since the split was random
x_out = np.insert(x_train_split, np.shape(x_train_split)[0], x_test_split, axis=0)
y_out = np.insert(y_train_split, np.shape(y_train_split)[0], y_test_split, axis=0)
return x_out[not_an_outlier, ], y_out[not_an_outlier, ]
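
# Hedged aside: with a scalar index equal to the array length, np.insert on
# axis 0 is just row-wise concatenation, which is all the reassembly above does.
import numpy as np

a = np.arange(6.).reshape(3, 2)
b = np.arange(4.).reshape(2, 2)
stacked = np.insert(a, a.shape[0], b, axis=0)
assert np.array_equal(stacked, np.concatenate([a, b], axis=0))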
def arma_predictor_nonlinear(x, y, m, n, x_hat0=None):
"""
Calculate the nonlinear fit between the (*m*, *n*) ARMA model and
    the input *x* and output *y*. The optimization starts at *x_hat0*
    (a zero vector when `None`). The output is the tuple of the
*m* AR and *n* MA coefficients.
"""
if x_hat0 is None:
x_hat0 = NP.zeros(m + n)
(x_hat,
cov_x,
info,
mesg,
ier) = SP.optimize.leastsq(residual,
x_hat0,
args=(m, x, y),
Dfun=Dfun,
full_output=True)
if ier not in [1, 2, 3, 4]:
        raise RuntimeError('optimization failed (ier={}) --- {}'.format(ier,
                                                                        mesg))
a_hat = x_hat[:m]
b_hat = x_hat[m:]
a_hat = NP.insert(a_hat, 0, 1)
return a_hat, b_hat
def q(self, new_q):
# Update epsilon
new_q = np.insert(new_q, 0, 1.)
self._epsilon = new_q / fsum(new_q)
try:
if np.array(new_q).size == self._epsilon.size - 1:
# Case 3: the entire lens is defined (new_q changes
# the values of q)
pass
else:
# Case 2: the primary is defined (new_q adds masses)
if ((self._total_mass is not None) and
(self._last_mass_set != 'total_mass')):
self._total_mass = self._total_mass * fsum(new_q)
except AttributeError:
# Case 1: nothing is initialized (new_q directly sets epsilon)
pass
def _add_mass(self, new_mass, index):
"""
Private function: Updates the total_mass and adds a component
to the epsilon array if masses are added
sequentially. e.g. the lens is defined by defining mass_1 and
mass_2.
"""
if not isinstance(new_mass, u.Quantity):
new_mass *= u.solMass
elif new_mass.unit.physical_type == 'dimensionless':
new_mass *= u.solMass
elif new_mass.unit.physical_type != 'mass':
msg = 'wrong physical_type of new total_mass: {:}'
raise ValueError(msg.format(new_mass.unit.physical_type))
new_total_mass = self._total_mass + new_mass
self._epsilon = self._total_mass * self._epsilon / new_total_mass
self._epsilon = np.insert(
self._epsilon, index, new_mass / new_total_mass)
self._total_mass = new_total_mass
def xover(rate):
"""
This is a mimic of a fwdpp
recombination policy.
We return a sorted list of breakpoints
on the interval [0,1). The list is capped
with the max value of a float (C/C++ double),
which is a trick fwdpp uses.
    It happens that we generate the exact same value
    from time to time. Internally, fwdpp doesn't care,
    and recognizes that as a "double x-over". However,
    msprime cares, b/c it results in an edge with
    left == right and an Exception gets raised. So,
    we purge out double x-overs via np.unique.
"""
    nbreaks = np.random.poisson(rate)
    if nbreaks == 0:
        return np.empty([0], dtype=np.float64)  # np.float was removed from modern NumPy
    rv = np.random.random_sample(nbreaks)
    rv = np.unique(rv)
    rv = np.insert(rv, len(rv), np.finfo(np.float64).max)
    return rv
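
# Quick check of the duplicate-breakpoint purge described in the docstring:
# np.unique collapses repeated draws, and the float-max cap always lands last.
import numpy as np

bp = np.array([0.25, 0.25, 0.7])
bp = np.unique(bp)                                     # -> [0.25, 0.7]
bp = np.insert(bp, len(bp), np.finfo(np.float64).max)
assert bp[-1] == np.finfo(np.float64).max and len(bp) == 3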
def split_breakpoints(breakpoints):
"""
Take the breakpoints from a meiosis,
and return them as segments contributed
by gamete 1 and gamete 2
Note: bug source could be here. If breakpoints[0] == 0.0,
we will insert stuff 2x into s1. This needs updating,
and so does the C++ version that this is copied from...
"""
    s1 = np.array([(0.0, breakpoints[0])], dtype=[
        ('left', np.float64), ('right', np.float64)])
s2 = np.empty([0], dtype=s1.dtype)
for i in range(1, len(breakpoints)):
a = breakpoints[i - 1]
b = breakpoints[i] if i < len(breakpoints) - 1 else 1.0
        assert a != b
        if i % 2 == 0:
s1 = np.insert(s1, len(s1), (a, b))
else:
s2 = np.insert(s2, len(s2), (a, b))
return (s1, s2)
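
# Hedged usage sketch: three breakpoints split into alternating segments,
# with the final segment closed at 1.0 as in the loop above.
import numpy as np

s1, s2 = split_breakpoints(np.array([0.2, 0.5, 0.8]))
# s1 -> [(0.0, 0.2), (0.5, 1.0)]   segments from gamete 1
# s2 -> [(0.2, 0.5)]               segments from gamete 2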
def test_out(self):
mat = np.random.rand(3, 3)
nan_mat = np.insert(mat, [0, 2], np.nan, axis=1)
resout = np.zeros(3)
tgt = np.median(mat, axis=1)
res = np.nanmedian(nan_mat, axis=1, out=resout)
assert_almost_equal(res, resout)
assert_almost_equal(res, tgt)
# 0-d output:
resout = np.zeros(())
tgt = np.median(mat, axis=None)
res = np.nanmedian(nan_mat, axis=None, out=resout)
assert_almost_equal(res, resout)
assert_almost_equal(res, tgt)
res = np.nanmedian(nan_mat, axis=(0, 1), out=resout)
assert_almost_equal(res, resout)
assert_almost_equal(res, tgt)
def test_out(self):
mat = np.random.rand(3, 3)
nan_mat = np.insert(mat, [0, 2], np.nan, axis=1)
resout = np.zeros(3)
tgt = np.percentile(mat, 42, axis=1)
res = np.nanpercentile(nan_mat, 42, axis=1, out=resout)
assert_almost_equal(res, resout)
assert_almost_equal(res, tgt)
# 0-d output:
resout = np.zeros(())
tgt = np.percentile(mat, 42, axis=None)
res = np.nanpercentile(nan_mat, 42, axis=None, out=resout)
assert_almost_equal(res, resout)
assert_almost_equal(res, tgt)
res = np.nanpercentile(nan_mat, 42, axis=(0, 1), out=resout)
assert_almost_equal(res, resout)
assert_almost_equal(res, tgt)
def test_basic(self):
a = [1, 2, 3]
assert_equal(insert(a, 0, 1), [1, 1, 2, 3])
assert_equal(insert(a, 3, 1), [1, 2, 3, 1])
assert_equal(insert(a, [1, 1, 1], [1, 2, 3]), [1, 1, 2, 3, 2, 3])
assert_equal(insert(a, 1, [1, 2, 3]), [1, 1, 2, 3, 2, 3])
assert_equal(insert(a, [1, -1, 3], 9), [1, 9, 2, 9, 3, 9])
assert_equal(insert(a, slice(-1, None, -1), 9), [9, 1, 9, 2, 9, 3])
assert_equal(insert(a, [-1, 1, 3], [7, 8, 9]), [1, 8, 2, 7, 3, 9])
b = np.array([0, 1], dtype=np.float64)
assert_equal(insert(b, 0, b[0]), [0., 0., 1.])
assert_equal(insert(b, [], []), b)
# Bools will be treated differently in the future:
# assert_equal(insert(a, np.array([True]*4), 9), [9, 1, 9, 2, 9, 3, 9])
with warnings.catch_warnings(record=True) as w:
warnings.filterwarnings('always', '', FutureWarning)
assert_equal(
insert(a, np.array([True] * 4), 9), [1, 9, 9, 9, 9, 2, 3])
assert_(w[0].category is FutureWarning)
def test_place(self):
# Make sure that non-np.ndarray objects
# raise an error instead of doing nothing
assert_raises(TypeError, place, [1, 2, 3], [True, False], [0, 1])
a = np.array([1, 4, 3, 2, 5, 8, 7])
place(a, [0, 1, 0, 1, 0, 1, 0], [2, 4, 6])
assert_array_equal(a, [1, 2, 3, 4, 5, 6, 7])
place(a, np.zeros(7), [])
assert_array_equal(a, np.arange(1, 8))
place(a, [1, 0, 1, 0, 1, 0, 1], [8, 9])
assert_array_equal(a, [8, 2, 9, 4, 8, 6, 9])
assert_raises_regex(ValueError, "Cannot insert from an empty array",
lambda: place(a, [0, 0, 0, 0, 0, 1, 0], []))
def create_knots(pts, metric="DISTANCE"):
if metric == "DISTANCE":
tmp = np.linalg.norm(pts[:-1] - pts[1:], axis=1)
tknots = np.insert(tmp, 0, 0).cumsum()
tknots = tknots / tknots[-1]
elif metric == "MANHATTAN":
tmp = np.sum(np.absolute(pts[:-1] - pts[1:]), 1)
tknots = np.insert(tmp, 0, 0).cumsum()
tknots = tknots / tknots[-1]
elif metric == "POINTS":
tknots = np.linspace(0, 1, len(pts))
elif metric == "CHEBYSHEV":
tknots = np.max(np.absolute(pts[1:] - pts[:-1]), 1)
tmp = np.insert(tmp, 0, 0).cumsum()
tknots = tknots / tknots[-1]
return tknots
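
# Hedged usage sketch for the DISTANCE and POINTS metrics, assuming `pts`
# is an (n, 3) float array as the slicing above expects.
import numpy as np

pts = np.array([[0., 0., 0.], [1., 0., 0.], [3., 0., 0.]])
create_knots(pts)            # -> [0.0, 1/3, 1.0]  (chord-length parameters)
create_knots(pts, "POINTS")  # -> [0.0, 0.5, 1.0]  (uniform parameters)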
def updateIncomeProcessAlt(self):
'''
An alternative method for constructing the income process in the infinite
horizon model, where the labor supply l_bar creates a small oddity.
Parameters
----------
none
Returns
-------
none
'''
tax_rate = (self.IncUnemp*self.UnempPrb)/(self.l_bar*(1.0-self.UnempPrb))
TranShkDstn = deepcopy(approxMeanOneLognormal(self.TranShkCount,sigma=self.TranShkStd[0],tail_N=0))
TranShkDstn[0] = np.insert(TranShkDstn[0]*(1.0-self.UnempPrb),0,self.UnempPrb)
TranShkDstn[1] = np.insert(self.l_bar*TranShkDstn[1]*(1.0-tax_rate),0,self.IncUnemp)
PermShkDstn = approxMeanOneLognormal(self.PermShkCount,sigma=self.PermShkStd[0],tail_N=0)
self.IncomeDstn = [combineIndepDstns(PermShkDstn,TranShkDstn)]
self.TranShkDstn = TranShkDstn
self.PermShkDstn = PermShkDstn
self.addToTimeVary('IncomeDstn')
def updateIncomeProcess(self):
'''
An alternative method for constructing the income process in the infinite horizon model.
Parameters
----------
none
Returns
-------
none
'''
if self.cycles == 0:
tax_rate = (self.IncUnemp*self.UnempPrb)/((1.0-self.UnempPrb)*self.IndL)
TranShkDstn = deepcopy(approxMeanOneLognormal(self.TranShkCount,sigma=self.TranShkStd[0],tail_N=0))
TranShkDstn[0] = np.insert(TranShkDstn[0]*(1.0-self.UnempPrb),0,self.UnempPrb)
TranShkDstn[1] = np.insert(TranShkDstn[1]*(1.0-tax_rate)*self.IndL,0,self.IncUnemp)
PermShkDstn = approxMeanOneLognormal(self.PermShkCount,sigma=self.PermShkStd[0],tail_N=0)
self.IncomeDstn = [combineIndepDstns(PermShkDstn,TranShkDstn)]
self.TranShkDstn = TranShkDstn
self.PermShkDstn = PermShkDstn
self.addToTimeVary('IncomeDstn')
else: # Do the usual method if this is the lifecycle model
EstimationAgentClass.updateIncomeProcess(self)
def updatePermIncGrid(self):
'''
Update the grid of permanent income levels. Currently only works for
infinite horizon models (cycles=0) and lifecycle models (cycles=1). Not
clear what to do about cycles>1. Identical to version in persistent
shocks model, but pLvl=0 is manually added to the grid (because there is
no closed form lower-bounding cFunc for pLvl=0).
Parameters
----------
None
Returns
-------
None
'''
# Run basic version of this method
PersistentShockConsumerType.updatePermIncGrid(self)
    for j in range(len(self.pLvlGrid)): # Then add a point near zero to the bottom of each pLvlGrid
this_grid = self.pLvlGrid[j]
self.pLvlGrid[j] = np.insert(this_grid,0,0.0001)
def makeEndOfPrdvFunc(self,EndOfPrdvP):
'''
Construct the end-of-period value function for this period, storing it
as an attribute of self for use by other methods.
Parameters
----------
EndOfPrdvP : np.array
Array of end-of-period marginal value of assets corresponding to the
asset values in self.aNrmNow.
Returns
-------
none
'''
VLvlNext = (self.PermShkVals_temp**(1.0-self.CRRA)*\
self.PermGroFac**(1.0-self.CRRA))*self.vFuncNext(self.mNrmNext)
EndOfPrdv = self.DiscFacEff*np.sum(VLvlNext*self.ShkPrbs_temp,axis=0)
EndOfPrdvNvrs = self.uinv(EndOfPrdv) # value transformed through inverse utility
EndOfPrdvNvrsP = EndOfPrdvP*self.uinvP(EndOfPrdv)
EndOfPrdvNvrs = np.insert(EndOfPrdvNvrs,0,0.0)
EndOfPrdvNvrsP = np.insert(EndOfPrdvNvrsP,0,EndOfPrdvNvrsP[0]) # This is a very good approximation, vNvrsPP = 0 at the asset minimum
aNrm_temp = np.insert(self.aNrmNow,0,self.BoroCnstNat)
EndOfPrdvNvrsFunc = CubicInterp(aNrm_temp,EndOfPrdvNvrs,EndOfPrdvNvrsP)
self.EndOfPrdvFunc = ValueFunc(EndOfPrdvNvrsFunc,self.CRRA)
def _tipping_point_update(self, tmp, consump, peak_temp_interval=30.0):
"""Determine whether a tipping point has occurred, if so reduce consumption for
all periods after this date.
"""
draws = tmp.shape[0]
disaster = self._disaster_simulation()
disaster_cons = self._disaster_cons_simulation()
period_lengths = self.tree.decision_times[1:] - self.tree.decision_times[:-1]
tmp_scale = np.maximum(self.peak_temp, tmp)
ave_prob_of_survival = 1.0 - np.square(tmp / tmp_scale)
prob_of_survival = ave_prob_of_survival**(period_lengths / peak_temp_interval)
# this part may be done better, this takes a long time to loop over
res = prob_of_survival < disaster
rows, cols = np.nonzero(res)
row, count = np.unique(rows, return_counts=True)
    first_occurrence = zip(row, cols[np.insert(count.cumsum()[:-1], 0, 0)])
    for pos in first_occurrence:
consump[pos[0], pos[1]:] *= np.exp(-disaster_cons[pos[0]])
return consump
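
# The row/column bookkeeping above recovers the first True column per row.
# An equivalent hedged sketch with argmax, on a toy boolean matrix:
import numpy as np

res = np.array([[False, True, True],
                [False, False, False],
                [True, False, True]])
rows = np.where(res.any(axis=1))[0]     # rows with at least one hit
first_cols = res[rows].argmax(axis=1)   # first True column in each such row
# rows -> [0, 2], first_cols -> [1, 0]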
def fix_point(x, y, interval):
    x = np.insert(x, 0, 0)  # np.insert returns a new array; keep the result
    y = np.insert(y, 0, 0)
fx, fy = [], []
pointer = 0
ninterval = int(max(x) / interval + 1)
for i in range(ninterval):
tmpx = interval * i
while pointer + 1 < len(x) and tmpx > x[pointer + 1]:
pointer += 1
if pointer + 1 < len(x):
alpha = (y[pointer + 1] - y[pointer]) / \
(x[pointer + 1] - x[pointer])
tmpy = y[pointer] + alpha * (tmpx - x[pointer])
fx.append(tmpx)
fy.append(tmpy)
return fx, fy
def __init__(self, input_size, layerSize, num_of_classes, learning_rate_local=0.001, save_file='',
activation_function=0, cov_net=False):
self.covnet = cov_net
self.input_size = input_size
self.layerSize = layerSize
self.all_layer_sizes = np.copy(layerSize)
self.all_layer_sizes = np.insert(self.all_layer_sizes, 0, input_size)
self.num_of_classes = num_of_classes
self._num_of_layers = len(layerSize) + 1
self.learning_rate_local = learning_rate_local
self._save_file = save_file
self.hidden = None
self.savers = []
if activation_function == 1:
self.activation_function = tf.nn.relu
elif activation_function == 2:
self.activation_function = None
else:
self.activation_function = tf.nn.tanh
    # touching these presumably triggers lazy-property graph construction
    self.prediction
    self.optimize
    self.accuracy
def insert_zeros_evenly(input_data, number_zeros):
"""Insert zeros evenly in input_data.
    These zeros are distributed evenly throughout
    the function, to help for binning of oddly
    shaped arrays.
    @param[in] input_data 1D array to contain zeros.
    @param[in] number_zeros Number of zeros that need
    to be added.
    @returns input_data with extra zeros"""
    insert_index = np.floor(
        np.arange(
            number_zeros,
            step=1.0) * float(input_data.size) / number_zeros).astype(int)  # np.insert needs integer indices
output_data = np.insert(
input_data, insert_index,
np.zeros(number_zeros))
return output_data
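
# Quick sanity check of the even spacing: two zeros into a length-four array.
import numpy as np

out = insert_zeros_evenly(np.array([1., 2., 3., 4.]), 2)
# insert indices are floor([0, 1] * 4 / 2) = [0, 2]
# out -> [0., 1., 2., 0., 3., 4.]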
def solve_linear(model:Model.fem_model):
K_bar,F_bar,index=model.K_,model.F_,model.index
Dvec=model.D
Logger.info('Solving linear model with %d DOFs...'%model.DOF)
n_nodes=model.node_count
try:
#sparse matrix solution
        delta_bar = sl.spsolve(sp.csr_matrix(K_bar), F_bar)  # spsolve has no sym_pos argument
delta = delta_bar
#fill original displacement vector
prev = 0
for idx in index:
gap=idx-prev
if gap>0:
delta=np.insert(delta,prev,[0]*gap)
prev = idx + 1
if idx==index[-1] and idx!=n_nodes-1:
delta = np.insert(delta,prev, [0]*(n_nodes*6-prev))
delta += Dvec
except Exception as e:
print(e)
return None
model.is_solved=True
return delta
def solve_linear2(model:Model.fem_model):
K_bar,F_bar,index=model.K_,model.F_,model.index
Dvec=model.D
Logger.info('Solving linear model with %d DOFs...'%model.DOF)
n_nodes=model.node_count
#sparse matrix solution
delta_bar = sl.spsolve(sp.csc_matrix(K_bar),F_bar)
#delta_bar=linalg.solve(K_bar,F_bar,sym_pos=True)
delta = delta_bar
#fill original displacement vector
prev = 0
for idx in index:
gap=idx-prev
if gap>0:
delta=np.insert(delta,prev,[0]*gap)
prev = idx + 1
if idx==index[-1] and idx!=n_nodes-1:
delta = np.insert(delta,prev, [0]*(n_nodes*6-prev))
delta += Dvec
model.is_solved=True
return delta
def shuffle_transmat(transmat):
"""Shuffle transition probability matrix within each row, leaving self transitions in tact.
It is assumed that the transmat is stochastic-row-wise, meaning that A_{ij} = Pr(S_{t+1}=j|S_t=i).
Parameters
----------
transmat : array of size (n_states, n_states)
Transition probability matrix, where A_{ij} = Pr(S_{t+1}=j|S_t=i).
Returns
-------
shuffled : array of size (n_states, n_states)
Shuffled transition probability matrix.
"""
shuffled = transmat.copy()
nrows, ncols = transmat.shape
for rowidx in range(nrows):
all_but_diagonal = np.append(np.arange(rowidx), np.arange(rowidx+1, ncols))
shuffle_idx = np.random.permutation(all_but_diagonal)
shuffle_idx = np.insert(shuffle_idx, rowidx, rowidx)
shuffled[rowidx,:] = shuffled[rowidx, shuffle_idx]
return shuffled
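
# Hedged usage sketch: diagonals (self transitions) and row sums are preserved.
import numpy as np

A = np.array([[0.80, 0.15, 0.05],
              [0.10, 0.70, 0.20],
              [0.25, 0.25, 0.50]])
B = shuffle_transmat(A)
assert np.allclose(np.diag(A), np.diag(B))          # self transitions intact
assert np.allclose(A.sum(axis=1), B.sum(axis=1))    # rows remain stochastic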
def _within_event_coherent_shuffle(self, kind='train'):
"""Time swap on BinnedSpikeTrainArray, swapping only within each epoch."""
if kind == 'train':
bst = self.PBEs_train
elif kind == 'test':
bst = self.PBEs_test
else:
raise ValueError("kind '{}' not understood!".format(kind))
out = copy.deepcopy(bst) # should this be deep?
shuffled = np.arange(bst.n_bins)
edges = np.insert(np.cumsum(bst.lengths),0,0)
for ii in range(bst.n_epochs):
segment = shuffled[edges[ii]:edges[ii+1]]
shuffled[edges[ii]:edges[ii+1]] = np.random.permutation(segment)
out._data = out._data[:,shuffled]
if kind == 'train':
self.PBEs_train = out
else:
self.PBEs_test = out
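
# The `edges` idiom shared by these shuffle methods: per-epoch bin counts
# become [0, c0, c0+c1, ...], so edges[i]:edges[i+1] slices out epoch i.
import numpy as np

lengths = np.array([3, 2, 4])
edges = np.insert(np.cumsum(lengths), 0, 0)   # -> [0, 3, 5, 9]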
def _within_event_incoherent_shuffle(self, kind='train'):
"""Time cycle on BinnedSpikeTrainArray, cycling only within each epoch.
We cycle each unit independently, within each epoch.
"""
if kind == 'train':
bst = self.PBEs_train
elif kind == 'test':
bst = self.PBEs_test
else:
raise ValueError("kind '{}' not understood!".format(kind))
out = copy.deepcopy(bst) # should this be deep?
data = out._data
edges = np.insert(np.cumsum(bst.lengths),0,0)
for uu in range(bst.n_units):
for ii in range(bst.n_epochs):
segment = np.squeeze(data[uu, edges[ii]:edges[ii+1]])
segment = np.roll(segment, np.random.randint(len(segment)))
data[uu, edges[ii]:edges[ii+1]] = segment
if kind == 'train':
self.PBEs_train = out
else:
self.PBEs_test = out
def _within_event_unit_id_shuffle(self, kind='train'):
"""Unit ID shuffle on BinnedSpikeTrainArray, shuffling independently within each epoch."""
if kind == 'train':
bst = self.PBEs_train
elif kind == 'test':
bst = self.PBEs_test
else:
raise ValueError("kind '{}' not understood!".format(kind))
out = copy.deepcopy(bst) # should this be deep?
data = out._data
edges = np.insert(np.cumsum(bst.lengths),0,0)
unit_list = np.arange(bst.n_units)
for ii in range(bst.n_epochs):
segment = data[:, edges[ii]:edges[ii+1]]
out._data[:, edges[ii]:edges[ii+1]] = segment[np.random.permutation(unit_list)]
if kind == 'train':
self.PBEs_train = out
else:
self.PBEs_test = out
def findSignificantContours(img, sobel_8u, sobel):
    # OpenCV 3.x API: findContours returns (image, contours, hierarchy)
    image, contours, hierarchy = cv2.findContours(sobel_8u, \
                                                  cv2.RETR_EXTERNAL, \
                                                  cv2.CHAIN_APPROX_SIMPLE)
    mask = np.ones(image.shape[:2], dtype="uint8") * 255
    level1 = []
    for i, tupl in enumerate(hierarchy[0]):
if tupl[3] == -1:
tupl = np.insert(tupl, 0, [i])
level1.append(tupl)
significant = []
tooSmall = sobel_8u.size * 10 / 100
for tupl in level1:
        contour = contours[tupl[0]]
area = cv2.contourArea(contour)
if area > tooSmall:
cv2.drawContours(mask, \
[contour], 0, (0, 255, 0), \
2, cv2.LINE_AA, maxLevel=1)
significant.append([contour, area])
significant.sort(key=lambda x: x[1])
    significant = [x[0] for x in significant]
peri = cv2.arcLength(contour, True)
approx = cv2.approxPolyDP(contour, 0.02 * peri, True)
mask = sobel.copy()
mask[mask > 0] = 0
cv2.fillPoly(mask, significant, 255, 0)
mask = np.logical_not(mask)
    img[mask] = 0
return img
def diff_encode(x):
"""Encode phase differential baseband signal.
:param x: complex baseband data to encode differentially
:returns: differentially encoded complex baseband data of length len(x)+1
>>> import arlpy
>>> x = arlpy.comms.modulate(arlpy.comms.random_data(100, 4), arlpy.comms.psk(4)) # QPSK
>>> len(x)
100
>>> y = arlpy.comms.diff_encode(x) # DQPSK
>>> len(y)
101
>>> x[0]
(0.707+0.707j)
>>> y[1]/y[0]
(0.707+0.707j)
"""
x = _np.asarray(x)
y = _np.insert(x, 0, 1)
for j in range(2,len(y)):
y[j] *= y[j-1]
return y
def __init__(self, table,reg=False,lamda=0):
"""Initializes Class for Linear Regression
Parameters
----------
table : ndarray(n-rows,m-features + 1)
Numerical training data, last column as training values
    reg : Boolean
        Set True to enable regularization, false by default
    lamda : float
        Regularization strength, used only when reg is True
    """
#regularization parameters
self.reg = reg
self.lamda = lamda
self.num_training = np.shape(table)[0]
# remove the last column from training data to extract features data
self.X = np.delete(table, -1, 1)
# add a column of ones in front of the training data
self.X = np.insert(self.X, 0, np.ones(self.num_training), axis=1)
self.num_features = np.shape(self.X)[1]
# extract the values of the training set from the provided data
self.y = table[:, self.num_features - 1]
# create parameters and initialize to 1
self.theta = np.ones(self.num_features)
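
# The bias-column trick above in isolation: np.insert with axis=1 prepends a
# column of ones so theta[0] acts as the intercept.
import numpy as np

X = np.arange(6.).reshape(3, 2)
X = np.insert(X, 0, np.ones(3), axis=1)   # shape (3, 3); first column all ones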
def init_state(indata, test=False):
close = indata['close'].values
diff = np.diff(close)
diff = np.insert(diff, 0, 0)
sma15 = SMA(indata, timeperiod=15)
sma60 = SMA(indata, timeperiod=60)
rsi = RSI(indata, timeperiod=14)
atr = ATR(indata, timeperiod=14)
#--- Preprocess data
xdata = np.column_stack((close, diff, sma15, close-sma15, sma15-sma60, rsi, atr))
xdata = np.nan_to_num(xdata)
    if not test:
        scaler = preprocessing.StandardScaler()
        xdata = np.expand_dims(scaler.fit_transform(xdata), axis=1)
        joblib.dump(scaler, 'data/scaler.pkl')
    else:
        scaler = joblib.load('data/scaler.pkl')
        # transform with the saved scaler; refitting on test data would leak
        xdata = np.expand_dims(scaler.transform(xdata), axis=1)
state = xdata[0:1, 0:1, :]
return state, xdata, close
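
# np.diff shortens the series by one; inserting a 0 at the front realigns the
# returns with the original close prices, as done above.
import numpy as np

close = np.array([10.0, 11.0, 10.5])
diff = np.insert(np.diff(close), 0, 0)   # -> [0.0, 1.0, -0.5]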
#Take Action