def create_sc():
    sc_conf = SparkConf()
    sc_conf.setAppName("finance-similarity-app")
    sc_conf.setMaster('spark://10.21.208.21:7077')
    sc_conf.set('spark.executor.memory', '2g')
    sc_conf.set('spark.executor.cores', '4')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print sc_conf.getAll()

    sc = None
    try:
        sc.stop()
        sc = SparkContext(conf=sc_conf)
    except:
        sc = SparkContext(conf=sc_conf)

    return sc
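For reference, the same configuration can be written with chained setters, since SparkConf's setter methods return the conf object itself. A minimal sketch, assuming the standalone master above is reachable:

from pyspark import SparkConf, SparkContext

sc_conf = (SparkConf()
           .setAppName("finance-similarity-app")
           .setMaster("spark://10.21.208.21:7077")   # same standalone master as above
           .set("spark.executor.memory", "2g")
           .set("spark.executor.cores", "4")
           .set("spark.cores.max", "40")
           .set("spark.logConf", True))
sc = SparkContext(conf=sc_conf)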
Python SparkConf() usage examples
Source file: finance_similarity.py (project: Spark-in-Finance-Quantitative-Investing, author: litaotao)
def get_rdd(es_index, es_type):
    if es_type == "":
        resource = es_index
    else:
        resource = es_index + "/" + es_type
    es_read_conf = {
        "es.nodes": ES_IP,
        "es.port": ES_PORT,
        "es.resource": resource,
        "es.index.read.missing.as.empty": 'yes'
    }
    conf = SparkConf().setAppName("Unfetter")
    sc = SparkContext(conf=conf)
    rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)
    return rdd
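A hedged usage sketch for get_rdd, assuming the elasticsearch-hadoop connector jar is on the Spark classpath and that ES_IP and ES_PORT (module-level constants in the original file) point at a reachable cluster; the values, index, and type below are placeholders:

from pyspark import SparkConf, SparkContext

ES_IP = "127.0.0.1"   # placeholder; defined elsewhere in the original module
ES_PORT = "9200"      # placeholder

rdd = get_rdd("unfetter-index", "report")   # placeholder index/type
# Each element is a (key, document-fields) pair produced by EsInputFormat.
print(rdd.take(1))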
Source file: twitterStream.py (project: Twitter-and-IMDB-Sentimental-Analytics, author: abhinandanramesh)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)   # Create a streaming context with batch interval of 10 sec
    ssc.checkpoint("checkpoint")

    pwords = load_wordlist("positive.txt")
    nwords = load_wordlist("negative.txt")
    ts = time.time()
    counts = stream(ssc, pwords, nwords, 100)
    make_plot(counts)
def getOrCreate(cls, checkpointPath, setupFunc):
    """
    Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
    recreated from the checkpoint data. If the data does not exist, then the provided setupFunc
    will be used to create a new context.

    @param checkpointPath: Checkpoint directory used in an earlier streaming program
    @param setupFunc: Function to create a new context and setup DStreams
    """
    cls._ensure_initialized()
    gw = SparkContext._gateway

    # Check whether valid checkpoint information exists in the given path
    ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
    if ssc_option.isEmpty():
        ssc = setupFunc()
        ssc.checkpoint(checkpointPath)
        return ssc

    jssc = gw.jvm.JavaStreamingContext(ssc_option.get())

    # If there is already an active instance of Python SparkContext use it, or create a new one
    if not SparkContext._active_spark_context:
        jsc = jssc.sparkContext()
        conf = SparkConf(_jconf=jsc.getConf())
        SparkContext(conf=conf, gateway=gw, jsc=jsc)

    sc = SparkContext._active_spark_context

    # update ctx in serializer
    cls._transformerSerializer.ctx = sc
    return StreamingContext(sc, None, jssc)
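A usage sketch of the checkpoint-recovery pattern this classmethod implements, calling the public StreamingContext.getOrCreate API; the checkpoint directory and batch interval are placeholders:

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "/tmp/streaming-checkpoint"   # placeholder path

def create_context():
    sc = SparkContext(conf=SparkConf().setAppName("checkpointed-app").setMaster("local[2]"))
    ssc = StreamingContext(sc, 10)   # 10-second batches
    # ... define DStreams here, before the context is started ...
    ssc.checkpoint(CHECKPOINT_DIR)
    return ssc

# Recovers from the checkpoint if one exists, otherwise calls create_context().
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()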
def main():
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    result = call_in_background(run)

    status = sc.statusTracker()
    while result.empty():
        ids = status.getJobIdsForGroup()
        for id in ids:
            job = status.getJobInfo(id)
            print("Job", id, "status: ", job.status)
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:
                    print("Stage %d: %d tasks total (%d active, %d complete)" %
                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
        time.sleep(1)

    print("Job results are:", result.get())
    sc.stop()
def benchmark_spark(ratings, factors, iterations=5):
    conf = (SparkConf()
            .setAppName("implicit_benchmark")
            .setMaster('local[*]')
            .set('spark.driver.memory', '16G'))
    context = SparkContext(conf=conf)
    spark = SparkSession(context)

    times = {}
    try:
        ratings = convert_sparse_to_dataframe(spark, context, ratings)

        for rank in factors:
            als = ALS(rank=rank, maxIter=iterations,
                      alpha=1, implicitPrefs=True,
                      userCol="row", itemCol="col", ratingCol="data")
            start = time.time()
            als.fit(ratings)
            elapsed = time.time() - start
            times[rank] = elapsed / iterations
            print("spark. factors=%i took %.3f" % (rank, elapsed / iterations))
    finally:
        spark.stop()

    return times
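The benchmark relies on a project helper, convert_sparse_to_dataframe, that is not shown here. The sketch below is a hypothetical stand-in illustrating the (row, col, data) layout that the ALS userCol/itemCol/ratingCol settings above refer to, not the project's actual implementation:

def convert_sparse_to_dataframe(spark, context, sparse_matrix):
    """Hypothetical: turn a scipy.sparse matrix into a DataFrame with row/col/data columns."""
    coo = sparse_matrix.tocoo()
    triples = [(int(r), int(c), float(v))
               for r, c, v in zip(coo.row, coo.col, coo.data)]
    return spark.createDataFrame(triples, schema=["row", "col", "data"])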
Source file: recommender.py (project: Location-based-Restaurants-Recommendation-System, author: patilankita79)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpRecommender")
    sc = SparkContext(conf=conf)
    rdd_data = readElasticSearch(sc)
    parsed_mapped_data = rdd_data.filter(location_recommender)
    sorted_data = parsed_mapped_data.top(150, key=lambda a: a[1]["stars"])
    topn_data = copyUniqueData(sorted_data, 5)
    printResult(topn_data)
    clearElasticSearch()
    sorted_rdd = sc.parallelize(topn_data)
    es_data = sorted_rdd.map(remap_es)
    es_data.saveAsNewAPIHadoopFile(path='-',
                                   outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
                                   keyClass="org.apache.hadoop.io.NullWritable",
                                   valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
                                   conf={"es.resource": "yelpreco/resturant"})
def run(self):
    self.args = self.parse_arguments()

    conf = SparkConf().setAll((
        ("spark.task.maxFailures", "10"),
        ("spark.locality.wait", "20s"),
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ))
    sc = SparkContext(
        appName=self.name,
        conf=conf)
    sqlc = SQLContext(sparkContext=sc)

    self.records_processed = sc.accumulator(0)
    self.warc_input_processed = sc.accumulator(0)
    self.warc_input_failed = sc.accumulator(0)

    self.run_job(sc, sqlc)

    sc.stop()
def main():
    conf = SparkConf().setAppName("binarize nifti")
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')

    parser = argparse.ArgumentParser(description='Binarize images using FSL installed in a Docker container')
    parser.add_argument('threshold', type=int, help="binarization threshold")
    parser.add_argument('folder_path', type=str, help='folder path containing all of the splits')
    parser.add_argument('output_path', type=str, help='output folder path')
    args = parser.parse_args()

    print args.folder_path

    client = Config().get_client('dev')

    nibRDD = sc.binaryFiles(args.folder_path)\
        .map(lambda x: get_data(x))\
        .map(lambda x: binarize(x, args.threshold))\
        .map(lambda x: copy_to_hdfs(x, args.output_path, client)).collect()
Source file: matrix_multiply_sparse.py (project: Scalable-Matrix-Multiplication-on-Apache-Spark, author: Abhishek-Arora)
def main():
    input = sys.argv[1]
    output = sys.argv[2]

    conf = SparkConf().setAppName('Sparse Matrix Multiplication')
    sc = SparkContext(conf=conf)
    assert sc.version >= '1.5.1'

    sparseMatrix = sc.textFile(input).map(lambda row: row.split(' ')).map(createCSRMatrix).map(multiplyMatrix).reduce(operator.add)
    outputFile = open(output, 'w')

    for row in range(len(sparseMatrix.indptr) - 1):
        col = sparseMatrix.indices[sparseMatrix.indptr[row]:sparseMatrix.indptr[row + 1]]
        data = sparseMatrix.data[sparseMatrix.indptr[row]:sparseMatrix.indptr[row + 1]]
        indexValuePairs = zip(col, data)
        formattedOutput = formatOutput(indexValuePairs)
        outputFile.write(formattedOutput + '\n')
def setUp(self):
    # Create a local Spark context with 4 cores
    spark_conf = SparkConf().setMaster('local[4]').\
        setAppName("monasca-transform unit tests").\
        set("spark.sql.shuffle.partitions", "10")
    self.spark_context = SparkContext.getOrCreate(conf=spark_conf)

    # quiet logging
    logger = self.spark_context._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.WARN)
def spark_context(self, application_name):
    """Create a spark context given the parameters configured in this class.

    The caller is responsible for calling ``.close`` on the resulting spark context

    Parameters
    ----------
    application_name : string

    Returns
    -------
    sc : SparkContext
    """
    # initialize the spark configuration
    self._init_spark()
    import pyspark
    import pyspark.sql

    # initialize conf
    spark_conf = pyspark.SparkConf()
    for k, v in self._spark_conf_helper._conf_dict.items():
        spark_conf.set(k, v)

    log.info("Starting SparkContext")
    return pyspark.SparkContext(appName=application_name, conf=spark_conf)
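A hypothetical call pattern for the helper above; the factory object name is a placeholder, since the surrounding class is not shown in this snippet:

# Hypothetical: `spark_factory` is an instance of the class that defines spark_context().
sc = spark_factory.spark_context("my-analysis-app")
try:
    print(sc.parallelize(range(100)).sum())   # 4950
finally:
    sc.stop()   # the docstring makes the caller responsible for shutting the context down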
Source file: Consumer.py (project: Location-based-Restaurants-Recommendation-System, author: patilankita79)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpConsumer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    kstream = KafkaUtils.createDirectStream(ssc, topics=['yelp-stream'],
                                            kafkaParams={"metadata.broker.list": 'localhost:9092'})

    parsed_json = kstream.map(lambda (k, v): json.loads(v))
    remapped_data = parsed_json.map(remap_elastic)

    remapped_data.foreachRDD(writeElasticSearch)
    ssc.start()
    ssc.awaitTermination()
def _default_sparkconf_builder():
    """
    Build a SparkConf object that can be used for the worker's SparkContext.
    """
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 60) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 3600)
def sparkconfig_builder():
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.enabled', 'true') \
        .set('spark.dynamicAllocation.schedulerBacklogTimeout', 1) \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 20) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 60)
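Either builder can be handed straight to a SparkContext. A minimal sketch; note that dynamic allocation generally only takes effect on clusters where the external shuffle service is also enabled:

from pyspark import SparkContext

sc = SparkContext(conf=sparkconfig_builder())
print(sc.getConf().get('spark.dynamicAllocation.enabled'))   # 'true'
sc.stop()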
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    # sc = SparkContext(conf=conf, pyFiles=['rec_engine.py', 'app.py'])
    sc = SparkContext(conf=conf)

    return sc
def __init__(self):
    conf = SparkConf().setAppName("ntu-speech").setMaster("local")
    self.sc = SparkContext(conf=conf)
    self.sqlCtx = SQLContext(self.sc)
def getSparkConf(mode="mesos", node=0):
    """
    get the spark configuration according to the setting
    :param mode:
    :param node:
    :return:
    """
    global options

    # get spark configuration
    sconf = SparkConf()

    # set spark configuration
    sconf.setAppName("%s" % (str(options)))
    # set run mode, now only support spark standalone and mesos coarse mode
    if (mode == 'spark'):
        sconf.setMaster(Setting.SPARK_STANDALONE_URL)
    elif (mode == 'mesos'):
        sconf.setMaster(Setting.MESOS_COARSE_URL)
        # sconf.set("spark.mesos.coarse", "false")
        sconf.set("spark.mesos.coarse", "true")
        sconf.set("spark.mesos.executor.home", Setting.SPARK_HOME)
    else:
        print("****unknown mode")
        exit(0)

    # set core limit if need
    if (0 >= node):
        print "****Spark:no cores max"
    else:
        sconf.set("spark.cores.max", "%d" % (options.cpu * node))

    return sconf
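A hedged usage sketch for the builder above, assuming the module-level options object and the Setting constants from the original project are available; the node count is a placeholder:

from pyspark import SparkContext

sconf = getSparkConf(mode="spark", node=4)   # cap cores at 4 nodes' worth of CPUs
sc = SparkContext(conf=sconf)
try:
    print(sc.parallelize(range(1000)).count())
finally:
    sc.stop()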
def main():
    conf = SparkConf().setAppName("binarize nifti")
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')

    parser = argparse.ArgumentParser(description='Binarize images')
    parser.add_argument('threshold', type=int, help="binarization threshold")
    parser.add_argument('folder_path', type=str, help='folder path containing all of the splits')
    parser.add_argument('output_path', type=str, help='output folder path')
    parser.add_argument('num', type=int, choices=[2, 4, 6, 8], help='number of binarization operations')
    parser.add_argument('-m', '--in_memory', type=bool, default=True, help='in memory computation')
    args = parser.parse_args()

    nibRDD = sc.binaryFiles(args.folder_path)\
        .map(lambda x: get_data(x))

    client = Config().get_client('dev')

    if args.in_memory:
        print "Performing in-memory computations"
        for i in xrange(args.num - 1):
            nibRDD = nibRDD.map(lambda x: binarize(x, args.threshold))
        nibRDD = nibRDD.map(lambda x: binarize_and_save(x, args.threshold, args.output_path, client)).collect()
    else:
        print "Writing intermediary results to disk and loading from disk"
        binRDD = nibRDD.map(lambda x: binarize_and_save(x, args.threshold, args.output_path + "1", client)).collect()
        for i in xrange(args.num - 1):
            binRDD = sc.binaryFiles(args.output_path + "1")\
                .map(lambda x: get_data(x))\
                .map(lambda x: binarize_and_save(x, args.threshold, args.output_path + "1", client)).collect()
def main():
    # Arguments parsing
    parser = argparse.ArgumentParser()

    # Required inputs
    parser.add_argument("bids_app_boutiques_descriptor", help="Boutiques descriptor of the BIDS App that will process the dataset.")
    parser.add_argument("bids_dataset", help="BIDS dataset to be processed.")
    parser.add_argument("output_dir", help="Output directory.")

    # Optional inputs
    parser.add_argument("--skip-participant-analysis", action='store_true', help="Skips participant analysis.")
    parser.add_argument("--skip-group-analysis", action='store_true', help="Skips groups analysis.")
    parser.add_argument("--skip-participants", metavar="FILE", type=lambda x: is_valid_file(parser, x), help="Skips participant labels in the text file.")
    parser.add_argument("--hdfs", action='store_true', help="Passes data by value rather than by reference in the pipeline. Use it with HDFS only. Requires HDFS to be started.")
    args = parser.parse_args()

    spark_bids = SparkBIDS(args.bids_app_boutiques_descriptor,
                           args.bids_dataset,
                           args.output_dir,
                           {'use_hdfs': args.hdfs,
                            'skip_participant_analysis': args.skip_participant_analysis,
                            'skip_group_analysis': args.skip_group_analysis,
                            'skip_participants_file': args.skip_participants})

    sc = None

    if spark_bids.spark_required():
        # Spark initialization
        conf = SparkConf().setAppName("BIDS pipeline")
        sc = SparkContext(conf=conf)

    # Run!
    spark_bids.run(sc)

# Execute program
Source file: twitterStream.py (project: Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka, author: sridharswamy)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)

    # Creating a streaming context with batch interval of 10 sec
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("checkpoint")
    pwords = load_wordlist("./Dataset/positive.txt")
    nwords = load_wordlist("./Dataset/negative.txt")
    counts = stream(ssc, pwords, nwords, 100)
    make_plot(counts)
def __init__(self, additional_options=None):
    os.environ['PYSPARK_PYTHON'] = sys.executable

    submit_args = [
        self._setup_repositories(),
        self._setup_packages(),
        self._setup_jars(),
        'pyspark-shell',
    ]
    os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(filter(None, submit_args))

    def _create_spark_context():
        spark_conf = SparkConf()
        spark_conf.set('spark.sql.catalogImplementation', 'hive')
        spark_conf.setAll(self._setup_options(additional_options))
        return SparkContext(conf=spark_conf)

    # If we are in instant testing mode
    if InstantTesting.is_activated():
        spark_context = InstantTesting.get_context()

        # It's the first run, so we have to create context and demonise the process.
        if spark_context is None:
            spark_context = _create_spark_context()
            if os.fork() == 0:  # Detached process.
                signal.pause()
            else:
                InstantTesting.set_context(spark_context)
    else:
        spark_context = _create_spark_context()

    # Init HiveContext
    super(SparklySession, self).__init__(spark_context)
    self._setup_udfs()

    self.read_ext = SparklyReader(self)
    self.catalog_ext = SparklyCatalog(self)

    attach_writer_to_dataframe()
def setUp(self):
    super(TestSparklySession, self).setUp()
    self.spark_conf_mock = mock.Mock(spec=SparkConf)
    self.spark_context_mock = mock.Mock(spec=SparkContext)

    self.patches = [
        mock.patch('sparkly.session.SparkConf', self.spark_conf_mock),
        mock.patch('sparkly.session.SparkContext', self.spark_context_mock),
    ]
    [p.start() for p in self.patches]
def setUpClass(cls):
    master = os.getenv('MASTER')
    assert master is not None, "Please start a Spark standalone cluster and export MASTER to your env."

    num_workers = os.getenv('SPARK_WORKER_INSTANCES')
    assert num_workers is not None, "Please export SPARK_WORKER_INSTANCES to your env."
    cls.num_workers = int(num_workers)

    spark_jars = os.getenv('SPARK_CLASSPATH')
    assert spark_jars and 'tensorflow-hadoop' in spark_jars, "Please add path to tensorflow-hadoop-*.jar to SPARK_CLASSPATH."

    cls.conf = SparkConf().set('spark.jars', spark_jars)
    cls.sc = SparkContext(master, cls.__name__, conf=cls.conf)
    cls.spark = SparkSession.builder.getOrCreate()
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    sc = SparkContext(conf=conf, pyFiles=['engine.py', 'app.py'])

    return sc
Source file: matrix_multiply.py (project: Scalable-Matrix-Multiplication-on-Apache-Spark, author: Abhishek-Arora)
def main():
    input = sys.argv[1]
    output = sys.argv[2]

    conf = SparkConf().setAppName('Matrix Multiplication')
    sc = SparkContext(conf=conf)
    assert sc.version >= '1.5.1'

    row = sc.textFile(input).map(lambda row: row.split(' ')).cache()
    ncol = len(row.take(1)[0])
    intermediateResult = row.map(permutation).reduce(add_tuples)

    outputFile = open(output, 'w')

    # Re-chunk the flattened result into rows of length ncol.
    result = [intermediateResult[x:x + ncol] for x in range(0, len(intermediateResult), ncol)]
    for row in result:
        for element in row:
            outputFile.write(str(element) + ' ')
        outputFile.write('\n')
    outputFile.close()

    # outputResult = sc.parallelize(result).coalesce(1)
    # outputResult.saveAsTextFile(output)
def _create_sql_context(self):
    """
    Create a new SQL context within a new Spark context. Import of classes from
    pyspark has to be pushed down into this method as Spark needs to be available
    in order for the libraries to be imported successfully. Since Spark is not available
    when the ETL is started initially, we delay the import until the ETL has restarted
    under Spark.

    Side-effect: Logging is configured by the time that pyspark is loaded
    so we have some better control over filters and formatting.
    """
    from pyspark import SparkConf, SparkContext, SQLContext

    if "SPARK_ENV_LOADED" not in os.environ:
        self.logger.warning("SPARK_ENV_LOADED is not set")

    self.logger.info("Starting SparkSQL context")
    conf = (SparkConf()
            .setAppName(__name__)
            .set("spark.logConf", "true"))
    sc = SparkContext(conf=conf)

    # Copy the credentials from the session into hadoop for access to S3
    session = boto3.Session()
    credentials = session.get_credentials()
    hadoopConf = sc._jsc.hadoopConfiguration()
    hadoopConf.set("fs.s3a.access.key", credentials.access_key)
    hadoopConf.set("fs.s3a.secret.key", credentials.secret_key)

    return SQLContext(sc)
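Once the s3a credentials are copied into the Hadoop configuration as above, the returned SQLContext can read directly from S3. A minimal sketch, assuming the hadoop-aws and AWS SDK jars are on the classpath; the caller object, bucket, and prefix are placeholders:

# Hypothetical caller; `etl` stands in for an instance of the (unshown) class that defines _create_sql_context().
sql_context = etl._create_sql_context()

# Read a Parquet dataset through the s3a filesystem (bucket and prefix are placeholders).
df = sql_context.read.parquet("s3a://example-bucket/events/2017/")
df.printSchema()
print(df.count())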
def spark_session_create(self, app_name):
    """
    spark Loader Class
    created for the purpose of handling Spark Jobs
    """
    tfmsa_logger("Spark Session Created")
    conf = SparkConf()
    conf.setMaster('spark://{0}'.format(settings.SPARK_HOST))
    conf.setAppName(app_name)
    conf.set('spark.driver.cores', settings.SPARK_CORE)
    conf.set('spark.driver.memory', settings.SPARK_MEMORY)
    conf.set('spark.executor.cores', settings.SPARK_WORKER_CORE)
    conf.set('spark.executor.memory', settings.SPARK_WORKER_MEMORY)
    conf.set('spark.driver.allowMultipleContexts', "true")
    return SparkContext(conf=conf)