from contextlib import contextmanager


@contextmanager
def with_sql_context(application_name, conf=None):
    """Context manager for a Spark context.

    Returns
    -------
    sc : SparkContext
    sql_context : SQLContext

    Examples
    --------
    Use within a ``with`` statement:

    >>> with with_sql_context("MyApplication") as (sc, sql_context):
    ...     import pyspark
    ...     # Do stuff
    ...     pass
    """
    if conf is None:
        conf = default_configuration
    assert isinstance(conf, SparkConfiguration)

    sc = conf.spark_context(application_name)
    import pyspark.sql
    try:
        yield sc, pyspark.sql.SQLContext(sc)
    finally:
        sc.stop()
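A minimal usage sketch beyond the docstring example, assuming `default_configuration` points at a working local Spark setup:

# Sketch only: builds a tiny DataFrame through the yielded sql_context.
with with_sql_context("WordLengths") as (sc, sql_context):
    rows = sc.parallelize([("spark", 5), ("context", 7)])
    df = sql_context.createDataFrame(rows, ["word", "length"])
    print(df.filter(df.length > 5).collect())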
import numpy as np
from time import time
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithLBFGS


def main():
    # parameters
    num_features = 400  # vocabulary size

    # load data
    print("loading 20 newsgroups dataset...")
    categories = ['rec.autos', 'rec.sport.hockey', 'comp.graphics', 'sci.space']
    tic = time()
    dataset = fetch_20newsgroups(shuffle=True, random_state=0, categories=categories,
                                 remove=('headers', 'footers', 'quotes'))
    train_corpus = dataset.data  # a list of documents, one string per entry
    train_labels = dataset.target
    toc = time()
    print("elapsed time: %.4f sec" % (toc - tic))

    # tf-idf vectorizer
    tfidf = TfidfVectorizer(max_df=0.5, max_features=num_features,
                            min_df=2, stop_words='english', use_idf=True)
    X_tfidf = tfidf.fit_transform(train_corpus).toarray()

    # append document labels
    train_labels = train_labels.reshape(-1, 1)
    X_all = np.hstack([train_labels, X_tfidf])

    # distribute the data
    sc = SparkContext('local', 'log_reg')
    rdd = sc.parallelize(X_all)
    labeled_corpus = rdd.map(parse_doc)  # parse_doc: row -> LabeledPoint, defined elsewhere
    train_RDD, test_RDD = labeled_corpus.randomSplit([8, 2], seed=0)

    # distributed logistic regression
    print("training logistic regression...")
    model = LogisticRegressionWithLBFGS.train(train_RDD, regParam=1, regType='l1',
                                              numClasses=len(categories))

    # evaluate the model on test data
    labels_and_preds = test_RDD.map(lambda p: (p.label, model.predict(p.features)))
    test_err = labels_and_preds.filter(lambda vp: vp[0] != vp[1]).count() / float(test_RDD.count())
    print("log-reg test error: ", test_err)

    # model.save(sc, './log_reg_lbfgs_model')
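`parse_doc` is referenced above but not shown; a minimal sketch, assuming each distributed row is a NumPy array whose first element is the label and the rest are tf-idf features:

from pyspark.mllib.regression import LabeledPoint

def parse_doc(row):
    # Hypothetical helper (not part of the original source): label in column 0,
    # tf-idf feature vector in the remaining columns.
    return LabeledPoint(float(row[0]), row[1:])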
# Consumer.py, from the Location-based-Restaurants-Recommendation-System project (author: patilankita79)
import json

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpConsumer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    kstream = KafkaUtils.createDirectStream(ssc, topics=['yelp-stream'],
                                            kafkaParams={"metadata.broker.list": 'localhost:9092'})

    # Each stream element is a (key, value) pair; the Yelp record is the JSON value.
    parsed_json = kstream.map(lambda kv: json.loads(kv[1]))
    remapped_data = parsed_json.map(remap_elastic)

    remapped_data.foreachRDD(writeElasticSearch)

    ssc.start()
    ssc.awaitTermination()
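`remap_elastic` and `writeElasticSearch` are defined elsewhere in the project; a rough sketch of their shape, assuming the `elasticsearch` Python client and a made-up index name and field set:

from elasticsearch import Elasticsearch

def remap_elastic(record):
    # Hypothetical reshaping: keep only the fields to be indexed.
    return {'name': record.get('name'), 'city': record.get('city'), 'stars': record.get('stars')}

def writeElasticSearch(rdd):
    # Hypothetical sink: open one client per partition and index each document
    # (the index call follows the 7.x client; newer clients differ slightly).
    def index_partition(partition):
        es = Elasticsearch("http://localhost:9200")
        for doc in partition:
            es.index(index='yelp', body=doc)
    rdd.foreachPartition(index_partition)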
@classmethod
def setup_env(cls):
    # Assumes SparkContext, SQLContext and SparkSession are imported at module level.
    cls.sc = SparkContext('local[*]', cls.__name__)
    cls.sql = SQLContext(cls.sc)
    cls.session = SparkSession.builder.getOrCreate()
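A matching teardown (not part of the original excerpt) keeps repeated test classes from colliding on the same JVM-side context:

@classmethod
def teardown_env(cls):
    # Hypothetical counterpart to setup_env: release the session and context.
    cls.session.stop()
    cls.sc.stop()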
def _count_child(job, masterHostname):
    # noinspection PyUnresolvedReferences
    from pyspark import SparkContext

    # start spark context and connect to cluster
    sc = SparkContext(master='spark://%s:7077' % masterHostname,
                      appName='count_test')

    # create an rdd containing 0-9999 split across 10 partitions
    rdd = sc.parallelize(range(10000), 10)

    # and now, count it
    assert rdd.count() == 10000
def setup_backend():
    global backend

    import pyspark
    sc = pyspark.SparkContext()

    from abcpy.backends import BackendSpark as Backend
    backend = Backend(sc, parallelism=4)
def _default_sparkconf_builder():
    """
    Build a SparkConf object that can be used for the worker's SparkContext.
    """
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 60) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 3600)
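The worker below falls back to this builder when `self.sparkconf_builder` is unset; a custom builder can be plugged in the same way (the name and settings here are illustrative):

from pyspark import SparkConf

def my_sparkconf_builder():
    # Illustrative custom builder: different app name, fixed shuffle-partition count.
    return SparkConf().setAppName('NightlyAggregation') \
        .set('spark.sql.shuffle.partitions', 64)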
def worker_init(self, loader):
    """
    Initialize Spark config and context now.
    """
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    sparkconf_builder = self.sparkconf_builder or _default_sparkconf_builder
    self.spark_conf = sparkconf_builder()
    self.sc = SparkContext(conf=self.spark_conf)
    self.spark = SparkSession.builder.config(conf=self.spark_conf).getOrCreate()
def __init__(self, _config):
    self._links = None
    self._sources = None
    self._orchestrator = None

    self.set_links(config.instantiate_components(_config))

    def restart_spark():
        self._ssc = streamingctx.create_streaming_context(
            self._sc,
            _config)

    self._restart_spark = restart_spark

    self._sc = pyspark.SparkContext(
        appName=_config["spark_config"]["appName"])
    self._ssc = streamingctx.create_streaming_context(self._sc, _config)
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    # sc = SparkContext(conf=conf, pyFiles=['rec_engine.py', 'app.py'])
    sc = SparkContext(conf=conf)

    return sc
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def main():
    if len(sys.argv) != 4:
        print(USAGE)  # USAGE and TIMER are module-level constants defined elsewhere
        exit(-1)

    sc = SparkContext(appName="Realtime-Analytics-Engine")
    ssc = StreamingContext(sc, TIMER)

    zookeeper, in_topic, out_topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zookeeper, "analytics-engine-consumer", {in_topic: 1})
    aggRDD = aggregate(kvs)
    aggRDD.foreachRDD(lambda rec: publish(rec, out_topic))

    ssc.start()
    ssc.awaitTermination()
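`aggregate` and `publish` live elsewhere in the project; a plausible sketch, assuming JSON events on the input topic and the kafka-python producer for output (the event schema and broker address are assumptions):

import json
from kafka import KafkaProducer

def aggregate(kvs):
    # Count events per 'type' field within each batch (schema is assumed).
    return kvs.map(lambda kv: json.loads(kv[1])) \
              .map(lambda event: (event.get('type', 'unknown'), 1)) \
              .reduceByKey(lambda a, b: a + b)

def publish(rdd, out_topic):
    # Send each (key, count) pair to the output topic.
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    for key, count in rdd.collect():
        producer.send(out_topic, json.dumps({'type': key, 'count': count}).encode('utf-8'))
    producer.flush()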
def get_logger(self, spark_context=None):
    """Get logger from SparkContext or (if None) from logging module"""
    if spark_context is None:
        return logging.getLogger(self.name)
    return spark_context._jvm.org.apache.log4j.LogManager \
        .getLogger(self.name)
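Driver-side usage might look like the following; `task` stands in for an instance of the owning class and is purely illustrative:

log = task.get_logger(spark_context=sc)  # log4j logger when a SparkContext is available
log.warn("input has a single partition; repartitioning")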
def _test():
    import doctest
    from pyspark import SparkContext
    import pyspark.mllib.fpm
    globs = pyspark.mllib.fpm.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest')
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def _test():
    import doctest
    from pyspark import SparkContext
    import pyspark.mllib.evaluation
    globs = pyspark.mllib.evaluation.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest')
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
@classmethod
def load(cls, sc, path):
    """Load the GaussianMixtureModel from disk.

    :param sc: SparkContext
    :param path: str, path to where the model is stored.
    """
    model = cls._load_java(sc, path)
    wrapper = sc._jvm.GaussianMixtureModelWrapper(model)
    return cls(wrapper)
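For context, a brief save/load round trip through the public `pyspark.mllib.clustering` API (the data and path here are made up):

from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel

points = sc.parallelize([[1.0, 2.0], [1.1, 2.1], [9.0, 9.5], [9.2, 9.1]])
gmm = GaussianMixture.train(points, k=2, seed=0)           # fit a 2-component mixture
gmm.save(sc, '/tmp/gmm_model')                             # hypothetical path
same_gmm = GaussianMixtureModel.load(sc, '/tmp/gmm_model')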
@classmethod
def load(cls, sc, path):
    """Load the LDAModel from disk.

    :param sc: SparkContext
    :param path: str, path to where the model is stored.
    """
    if not isinstance(sc, SparkContext):
        raise TypeError("sc should be a SparkContext, got type %s" % type(sc))
    if not isinstance(path, basestring):
        raise TypeError("path should be a basestring, got type %s" % type(path))
    model = callMLlibFunc("loadLDAModel", sc, path)
    return LDAModel(model)
def _test():
    import doctest
    from pyspark import SparkContext
    import pyspark.mllib.clustering
    globs = pyspark.mllib.clustering.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def _test():
    import doctest
    from pyspark import SparkContext
    import pyspark.mllib.classification
    globs = pyspark.mllib.classification.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def _test():
    import doctest
    from pyspark import SparkContext
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def _test():
    import doctest
    from pyspark import SparkContext
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)