def create_session(self):
    """
    Create the SparkContext used by the session manager and return it.

    A SparkConf is built from the ``settings.SPARK_*`` values (master host,
    driver cores/memory, executor cores/memory) and a new SparkContext is
    created from it and stored on ``SparkSession.spark_context``.

    This method never raises: when the context cannot be created the error
    is logged and whatever context is already stored on
    ``SparkSession.spark_context`` is returned instead.

    :return: the newly created SparkContext, or on failure the previously
        stored one (``None`` if there is none).
    """
    try:
        conf = SparkConf()
        conf.setMaster('spark://{0}'.format(settings.SPARK_HOST))
        conf.setAppName("tfmsa_session_manager")
        conf.set('spark.driver.cores', settings.SPARK_CORE)
        conf.set('spark.driver.memory', settings.SPARK_MEMORY)
        conf.set('spark.executor.cores', settings.SPARK_WORKER_CORE)
        conf.set('spark.executor.memory', settings.SPARK_WORKER_MEMORY)
        SparkSession.spark_context = SparkContext(conf=conf)
        # Log only once the context really exists.
        tfmsa_logger("Spark Session Created")
        # Return the context that was just stored; the original returned a
        # bare ``spark_context`` name that is never bound in this method.
        return SparkSession.spark_context
    except Exception as e:
        # Keep the original best-effort contract (no exception reaches the
        # caller), but record why creation failed instead of hiding it.
        tfmsa_logger("Spark Session creation failed : {0}".format(e))
        return getattr(SparkSession, 'spark_context', None)
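# Example source code using the Python class SparkConf()
# Source file: server.py
# Project: Content-Based-News-Recommendation-System-in-Spark
# Author: Labyrinth108
# Project source
# File source
# Views: 19
# Favorites: 0
# Likes: 0
# Comments: 0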
def init_spark_context():
    """Build and return the SparkContext for the recommendation server.

    The application name is set to ``movie_recommendation-server`` and the
    project's Python modules are passed via ``pyFiles``.

    :return: the newly created SparkContext.
    """
    # IMPORTANT: these additional Python modules are passed to each worker.
    worker_modules = ['engine.py', 'app.py', 'util.py']

    spark_conf = SparkConf().setAppName("movie_recommendation-server")
    return SparkContext(conf=spark_conf, pyFiles=worker_modules)
def invoke():
    """Start the ``mon_metrics_kafka`` streaming job and block until it ends.

    Creates a SparkContext and a StreamingContext (batch interval taken from
    ``cfg.CONF.service.stream_interval``), obtains the Kafka stream for
    ``cfg.CONF.messaging.topic``, registers the record-store transformation
    on it, starts streaming and waits for termination.

    If waiting for termination raises, the error is logged and the saved
    Kafka offsets are reset (including the pre-hourly processor's offsets
    when that processor is enabled); the exception is not re-raised.
    """
    # Original note: "object to keep track of offsets".
    ConfigInitializer.basic_config()

    # The application name is also the key used to reset saved offsets.
    application_name = "mon_metrics_kafka"

    conf = SparkConf().setAppName(application_name)
    sc = SparkContext(conf=conf)

    # Read at the configured interval.
    streaming_ctx = StreamingContext(sc, cfg.CONF.service.stream_interval)

    stream = MonMetricsKafkaProcessor.get_kafka_stream(
        cfg.CONF.messaging.topic, streaming_ctx)

    # Transform to recordstore.
    MonMetricsKafkaProcessor.transform_to_recordstore(stream)

    # catch interrupt, stop streaming context gracefully
    # signal.signal(signal.SIGINT, signal_handler)

    # Start processing.
    streaming_ctx.start()

    # FIXME: stop spark context to relinquish resources
    # FIXME: specify cores, so as not to use all the resources on the cluster.
    # FIXME: HA deploy multiple masters, may be one on each control node

    try:
        # Wait for the Spark driver to "finish".
        streaming_ctx.awaitTermination()
    except Exception as err:
        failure_message = (
            "Exception raised during Spark execution : " + str(err))
        MonMetricsKafkaProcessor.log_debug(failure_message)

        # One exception that can occur here is the result of the saved
        # kafka offsets being obsolete/out of range. Delete the saved
        # offsets to improve the chance of success on the next execution.
        # TODO(someone) prevent deleting all offsets for an application,
        # but just the latest revision
        MonMetricsKafkaProcessor.log_debug(
            "Deleting saved offsets for chance of success on next execution")
        MonMetricsKafkaProcessor.reset_kafka_offsets(application_name)

        # Delete pre hourly processor offsets as well, when enabled.
        pre_hourly_enabled = (
            cfg.CONF.stage_processors.pre_hourly_processor_enabled)
        if pre_hourly_enabled:
            PreHourlyProcessor.reset_kafka_offsets()
def train(sc=None, user=None, name='spark_mnist', server_host='localhost',
          server_port=10080, sync_interval=100, batch_size=100,
          num_partition=1, num_epoch=1, server_reusable=True):
    """Train a softmax-regression MNIST model through ``sps.SparkSession``.

    The training images and labels are loaded from the module-level
    ``train_image_path`` / ``train_label_path`` into RDDs, joined and cached.
    A single-layer softmax model is built in TensorFlow and trained for
    ``num_epoch`` epochs; between epochs the data is re-partitioned with a
    random partitioner. Finally the test accuracy is printed.

    :param sc: SparkContext to use; when ``None`` a new one is created and
        stopped again once training is done.
    :param user: forwarded to ``sps.SparkSession``.
    :param name: forwarded to ``sps.SparkSession``.
    :param server_host: forwarded to ``sps.SparkSession``.
    :param server_port: forwarded to ``sps.SparkSession``.
    :param sync_interval: forwarded to ``sps.SparkSession``.
    :param batch_size: forwarded to ``sps.SparkSession``.
    :param num_partition: number of partitions for the joined RDD and for
        the re-partitioning between epochs.
    :param num_epoch: number of calls to ``spark_sess.run``.
    :param server_reusable: forwarded to ``spark_sess.run``; when true the
        parameter server is stopped explicitly after the last epoch.
    """
    is_new_sc = False
    if sc is None:
        sc = pyspark.SparkContext(conf=pyspark.SparkConf())
        is_new_sc = True

    image_rdd = extract_images(sc, train_image_path)
    label_rdd = extract_labels(
        sc, train_label_path, num_class=10, one_hot=True)
    image_label_rdd = image_rdd.join(
        label_rdd, numPartitions=num_partition
    ).mapPartitions(flatten_image_label).cache()

    # Softmax regression: y = softmax(x W + b), trained on cross-entropy.
    x = tf.placeholder(tf.float32, [None, 784])
    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    y = tf.nn.softmax(tf.matmul(x, W) + b)
    y_ = tf.placeholder(tf.float32, [None, 10])
    cross_entropy = tf.reduce_mean(
        -tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
    train_step = tf.train.GradientDescentOptimizer(0.5).minimize(
        cross_entropy)

    init = tf.initialize_all_variables()
    sess = tf.Session()
    sess.run(init)

    feed_name_list = [x.name, y_.name]
    param_list = [W, b]

    spark_sess = sps.SparkSession(
        sc, sess, user=user, name=name, server_host=server_host,
        server_port=server_port, sync_interval=sync_interval,
        batch_size=batch_size)
    partitioner = par.RandomPartitioner(num_partition)
    combiner = comb.DeltaWeightCombiner()

    for i in range(num_epoch):
        spark_sess.run(
            train_step, feed_rdd=image_label_rdd,
            feed_name_list=feed_name_list, param_list=param_list,
            weight_combiner=combiner, shuffle_within_partition=True,
            server_reusable=server_reusable)
        if i != num_epoch - 1:
            # Re-partition for the next epoch; cache the new RDD before
            # releasing the old one.
            temp_image_label_rdd = image_label_rdd.partitionBy(
                num_partition, partitioner).cache()
            image_label_rdd.unpersist()
            image_label_rdd = temp_image_label_rdd

    # Since the parameter server is reusable in this spark_sess.run()
    # example, one should stop the parameter server manually when it is no
    # longer used.
    if server_reusable:
        spark_sess.stop_param_server()

    if is_new_sc:
        # Only stop a context this function created itself. SparkContext
        # has no close() method; stop() is the shutdown call.
        sc.stop()

    # Evaluate on the MNIST test set with the local TensorFlow session.
    from tensorflow.examples.tutorials.mnist import input_data
    mnist_data = input_data.read_data_sets("MNIST_data/", one_hot=True)
    correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    print(sess.run(accuracy, feed_dict={x: mnist_data.test.images,
                                        y_: mnist_data.test.labels}))

    spark_sess.close()