spark_mnist.py 文件源码

python
阅读 18 收藏 0 点赞 0 评论 0

项目:tensoronspark 作者: liangfengsid 项目源码 文件源码
def train(sc=None, user=None, name='spark_mnist', server_host='localhost', server_port=10080, sync_interval=100, batch_size=100, num_partition=1, num_epoch=1, server_reusable=True):
    is_new_sc = False
    if sc is None:
        sc = pyspark.SparkContext(conf=pyspark.SparkConf())
        is_new_sc = True

    image_rdd = extract_images(sc, train_image_path)
    label_rdd = extract_labels(sc, train_label_path, num_class=10, one_hot=True)
    image_label_rdd = image_rdd.join(label_rdd, numPartitions=num_partition).mapPartitions(flatten_image_label).cache()

    x = tf.placeholder(tf.float32, [None, 784])
    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    y = tf.nn.softmax(tf.matmul(x, W) + b)
    y_ = tf.placeholder(tf.float32, [None, 10])
    cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
    train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)
    init = tf.initialize_all_variables()
    sess = tf.Session()
    sess.run(init)

    feed_name_list = [x.name, y_.name]
    param_list = [W, b]

    spark_sess = sps.SparkSession(sc, sess, user=user, name=name, server_host=server_host, server_port=server_port, sync_interval=sync_interval, batch_size=batch_size)
    partitioner = par.RandomPartitioner(num_partition)
    combiner = comb.DeltaWeightCombiner()
    for i in range(num_epoch):
        spark_sess.run(train_step, feed_rdd=image_label_rdd, feed_name_list=feed_name_list, param_list=param_list, weight_combiner=combiner, shuffle_within_partition=True, server_reusable=server_reusable)
        if i != num_epoch-1:
            temp_image_label_rdd = image_label_rdd.partitionBy(num_partition, partitioner).cache()
            image_label_rdd.unpersist()
            image_label_rdd = temp_image_label_rdd

    # Since the parameter server is reusable in this spark_sess.run() example, one should stop the parameter server manually when it is no long used. 
    if server_reusable:
        spark_sess.stop_param_server()

    if is_new_sc:
        sc.close()

    from tensorflow.examples.tutorials.mnist import input_data
    mnist_data = input_data.read_data_sets("MNIST_data/", one_hot=True)
    correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    print(sess.run(accuracy, feed_dict={x: mnist_data.test.images, y_: mnist_data.test.labels}))

    spark_sess.close()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号