def psgd_3(sgd, n_iter_per_job, n_jobs, n_syncs, X_train, y_train):
    """
    Parallel SGD implementation using multiprocessing. All workers sync n_syncs times while running SGD independently
    for n_iter_per_job iterations. Each worker will have an increased learning rate -- multiple of n_jobs.
    Parameters
    ----------
    sgd: input SGDRegressor() object
    n_iter_per_job: number of iterations per worker (int)
    n_jobs: number of parallel processes to run
    n_syncs: number of syncs
    X_train: train input data
    y_train: train target data
    Returns
    -------
    sgd: the input SGDRegressor() object with updated coef_ and intercept_
    """
    # Iterations per worker between syncs. Use floor division so n_iter is an
    # int (true division yields a float in Python 3, which SGDRegressor
    # rejects), and clamp to >= 1 so n_syncs > n_iter_per_job still trains.
    n_iter_sync = max(1, n_iter_per_job // n_syncs)
    # Scale the learning rate by the number of workers, compensating for the
    # averaging performed at each sync.
    eta = sgd.eta0 * n_jobs
    # One warm-started worker model per process; warm_start=True makes each
    # fit() continue from the coefficients set below rather than reinitialize.
    # NOTE(review): `n_iter` is deprecated/removed in scikit-learn >= 0.21
    # (renamed `max_iter`); kept here to match the version this file targets.
    sgds = [SGDRegressor(warm_start=True, n_iter=n_iter_sync, eta0=eta)
            for _ in range(n_jobs)]
    for _ in range(n_syncs):
        # Run each worker independently for n_iter_sync iterations.
        sgds = Parallel(n_jobs=n_jobs)(
            delayed(psgd_method_1)(s, X_train, y_train) for s in sgds)
        # Sync step: average parameters across workers and broadcast back.
        coef = np.array([x.coef_ for x in sgds]).mean(axis=0)
        intercept = np.array([x.intercept_ for x in sgds]).mean(axis=0)
        for s in sgds:
            s.coef_ = coef
            s.intercept_ = intercept
    # Write the final averaged parameters into the caller's estimator.
    sgd.coef_ = coef
    sgd.intercept_ = intercept
    return sgd
# (removed stray blog-scrape navigation text that was not valid Python)