def train(sc=None, user=None, name='spark_mnist', server_host='localhost', server_port=10080, sync_interval=100, batch_size=100, num_partition=1, num_epoch=1, server_reusable=True):
is_new_sc = False
if sc is None:
sc = pyspark.SparkContext(conf=pyspark.SparkConf())
is_new_sc = True
image_rdd = extract_images(sc, train_image_path)
label_rdd = extract_labels(sc, train_label_path, num_class=10, one_hot=True)
image_label_rdd = image_rdd.join(label_rdd, numPartitions=num_partition).mapPartitions(flatten_image_label).cache()
x = tf.placeholder(tf.float32, [None, 784])
W = tf.Variable(tf.zeros([784, 10]))
b = tf.Variable(tf.zeros([10]))
y = tf.nn.softmax(tf.matmul(x, W) + b)
y_ = tf.placeholder(tf.float32, [None, 10])
cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)
init = tf.initialize_all_variables()
sess = tf.Session()
sess.run(init)
feed_name_list = [x.name, y_.name]
param_list = [W, b]
spark_sess = sps.SparkSession(sc, sess, user=user, name=name, server_host=server_host, server_port=server_port, sync_interval=sync_interval, batch_size=batch_size)
partitioner = par.RandomPartitioner(num_partition)
combiner = comb.DeltaWeightCombiner()
for i in range(num_epoch):
spark_sess.run(train_step, feed_rdd=image_label_rdd, feed_name_list=feed_name_list, param_list=param_list, weight_combiner=combiner, shuffle_within_partition=True, server_reusable=server_reusable)
if i != num_epoch-1:
temp_image_label_rdd = image_label_rdd.partitionBy(num_partition, partitioner).cache()
image_label_rdd.unpersist()
image_label_rdd = temp_image_label_rdd
# Since the parameter server is reusable in this spark_sess.run() example, one should stop the parameter server manually when it is no long used.
if server_reusable:
spark_sess.stop_param_server()
if is_new_sc:
sc.close()
from tensorflow.examples.tutorials.mnist import input_data
mnist_data = input_data.read_data_sets("MNIST_data/", one_hot=True)
correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
print(sess.run(accuracy, feed_dict={x: mnist_data.test.images, y_: mnist_data.test.labels}))
spark_sess.close()
评论列表
文章目录