python类SparkConf()的实例源码

spark_session.py 文件源码 项目:tensormsa_old 作者: TensorMSA 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def create_session(self):
        """
        spark Loader Class
        creadted for the purpose of handling Spark Jobs
        """
        try :
            tfmsa_logger("Spark Session Created")

            # #tfmsa_logger("spark_context : {0}".format(spark_context))
            # if (isinstance(spark_context, (SparkContext))):
            #     return spark_context

            conf = SparkConf()
            conf.setMaster('spark://{0}'.format(settings.SPARK_HOST))
            conf.setAppName("tfmsa_session_manager")
            conf.set('spark.driver.cores', settings.SPARK_CORE)
            conf.set('spark.driver.memory', settings.SPARK_MEMORY)
            conf.set('spark.executor.cores', settings.SPARK_WORKER_CORE)
            conf.set('spark.executor.memory', settings.SPARK_WORKER_MEMORY)
            #conf.set('spark.driver.allowMultipleContexts', "true")
            SparkSession.spark_context = SparkContext(conf=conf)
            return spark_context
        except Exception as e :
            # tfmsa_logger(e)
            # raise Exception(e)
            return spark_context
server.py 文件源码 项目:Content-Based-News-Recommendation-System-in-Spark 作者: Labyrinth108 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass aditional Python modules to each worker
    sc = SparkContext(conf=conf, pyFiles=['engine.py', 'app.py', 'util.py'])

    return sc
mon_metrics_kafka.py 文件源码 项目:monasca-transform 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def invoke():
    # object to keep track of offsets
    ConfigInitializer.basic_config()

    # app name
    application_name = "mon_metrics_kafka"

    my_spark_conf = SparkConf().setAppName(application_name)

    spark_context = SparkContext(conf=my_spark_conf)

    # read at the configured interval
    spark_streaming_context = \
        StreamingContext(spark_context, cfg.CONF.service.stream_interval)

    kafka_stream = MonMetricsKafkaProcessor.get_kafka_stream(
        cfg.CONF.messaging.topic,
        spark_streaming_context)

    # transform to recordstore
    MonMetricsKafkaProcessor.transform_to_recordstore(kafka_stream)

    # catch interrupt, stop streaming context gracefully
    # signal.signal(signal.SIGINT, signal_handler)

    # start processing
    spark_streaming_context.start()

    # FIXME: stop spark context to relinquish resources

    # FIXME: specify cores, so as not to use all the resources on the cluster.

    # FIXME: HA deploy multiple masters, may be one on each control node

    try:
        # Wait for the Spark driver to "finish"
        spark_streaming_context.awaitTermination()
    except Exception as e:
        MonMetricsKafkaProcessor.log_debug(
            "Exception raised during Spark execution : " + str(e))
        # One exception that can occur here is the result of the saved
        # kafka offsets being obsolete/out of range.  Delete the saved
        # offsets to improve the chance of success on the next execution.

        # TODO(someone) prevent deleting all offsets for an application,
        # but just the latest revision
        MonMetricsKafkaProcessor.log_debug(
            "Deleting saved offsets for chance of success on next execution")

        MonMetricsKafkaProcessor.reset_kafka_offsets(application_name)

        # delete pre hourly processor offsets
        if cfg.CONF.stage_processors.pre_hourly_processor_enabled:
            PreHourlyProcessor.reset_kafka_offsets()
spark_mnist.py 文件源码 项目:tensoronspark 作者: liangfengsid 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def train(sc=None, user=None, name='spark_mnist', server_host='localhost', server_port=10080, sync_interval=100, batch_size=100, num_partition=1, num_epoch=1, server_reusable=True):
    is_new_sc = False
    if sc is None:
        sc = pyspark.SparkContext(conf=pyspark.SparkConf())
        is_new_sc = True

    image_rdd = extract_images(sc, train_image_path)
    label_rdd = extract_labels(sc, train_label_path, num_class=10, one_hot=True)
    image_label_rdd = image_rdd.join(label_rdd, numPartitions=num_partition).mapPartitions(flatten_image_label).cache()

    x = tf.placeholder(tf.float32, [None, 784])
    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    y = tf.nn.softmax(tf.matmul(x, W) + b)
    y_ = tf.placeholder(tf.float32, [None, 10])
    cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
    train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)
    init = tf.initialize_all_variables()
    sess = tf.Session()
    sess.run(init)

    feed_name_list = [x.name, y_.name]
    param_list = [W, b]

    spark_sess = sps.SparkSession(sc, sess, user=user, name=name, server_host=server_host, server_port=server_port, sync_interval=sync_interval, batch_size=batch_size)
    partitioner = par.RandomPartitioner(num_partition)
    combiner = comb.DeltaWeightCombiner()
    for i in range(num_epoch):
        spark_sess.run(train_step, feed_rdd=image_label_rdd, feed_name_list=feed_name_list, param_list=param_list, weight_combiner=combiner, shuffle_within_partition=True, server_reusable=server_reusable)
        if i != num_epoch-1:
            temp_image_label_rdd = image_label_rdd.partitionBy(num_partition, partitioner).cache()
            image_label_rdd.unpersist()
            image_label_rdd = temp_image_label_rdd

    # Since the parameter server is reusable in this spark_sess.run() example, one should stop the parameter server manually when it is no long used. 
    if server_reusable:
        spark_sess.stop_param_server()

    if is_new_sc:
        sc.close()

    from tensorflow.examples.tutorials.mnist import input_data
    mnist_data = input_data.read_data_sets("MNIST_data/", one_hot=True)
    correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    print(sess.run(accuracy, feed_dict={x: mnist_data.test.images, y_: mnist_data.test.labels}))

    spark_sess.close()


问题


面经


文章

微信
公众号

扫码关注公众号