Example source code for Python's SparkConf() class

finance_similarity.py (project: Spark-in-Finance-Quantitative-Investing, author: litaotao)
def create_sc():
    sc_conf = SparkConf()
    sc_conf.setAppName("finance-similarity-app")
    sc_conf.setMaster('spark://10.21.208.21:7077')
    sc_conf.set('spark.executor.memory', '2g')
    sc_conf.set('spark.executor.cores', '4')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print sc_conf.getAll()

    sc = None
    try:
        # sc is still None at this point, so stop() raises and execution falls
        # through to the except branch; the pattern survives from notebook-style
        # reuse where an earlier context may need to be stopped first.
        sc.stop()
        sc = SparkContext(conf=sc_conf)
    except Exception:
        sc = SparkContext(conf=sc_conf)

    return sc
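
Because every setter on SparkConf returns the conf object itself, the same configuration can also be written as a chained builder. A minimal sketch reusing the values above (the master URL is the one from the snippet), with SparkContext.getOrCreate avoiding the stop-and-retry dance:

from pyspark import SparkConf, SparkContext

def create_sc_chained():
    # Equivalent chained form of the configuration built step by step above.
    sc_conf = (SparkConf()
               .setAppName("finance-similarity-app")
               .setMaster("spark://10.21.208.21:7077")
               .set("spark.executor.memory", "2g")
               .set("spark.executor.cores", "4")
               .set("spark.cores.max", "40")
               .set("spark.logConf", True))
    # Reuse an existing context if one is already running, otherwise create one.
    return SparkContext.getOrCreate(conf=sc_conf)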

es_helper.py (project: unfetter, author: unfetter-analytic)
def get_rdd(es_index, es_type):

    if es_type == "":
        resource = es_index
    else:
        resource = es_index + "/" + es_type
    es_read_conf = {
        "es.nodes": ES_IP,
        "es.port": ES_PORT,
        "es.resource": resource,
        "es.index.read.missing.as.empty": 'yes'
    }
    conf = SparkConf().setAppName("Unfetter")
    sc = SparkContext(conf=conf)
    rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)
    return rdd
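
A hedged usage sketch (ES_IP and ES_PORT are module-level constants in the project, not shown in this excerpt); the returned RDD yields (document id, document dict) pairs from the given index:

# Hypothetical call; the index and type names are placeholders.
reports_rdd = get_rdd("unfetter-index", "report")
doc_id, doc = reports_rdd.first()
print(doc_id)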

twitterStream.py (project: Twitter-and-IMDB-Sentimental-Analytics, author: abhinandanramesh)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)   # Create a streaming context with batch interval of 10 sec
    ssc.checkpoint("checkpoint")

    pwords = load_wordlist("positive.txt")
    nwords = load_wordlist("negative.txt")
    ts = time.time()   
    counts = stream(ssc, pwords, nwords, 100)
    make_plot(counts)

context.py (project: MIT-Thesis, author: alec-heif)
def getOrCreate(cls, checkpointPath, setupFunc):
        """
        Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
        If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
        recreated from the checkpoint data. If the data does not exist, then the provided setupFunc
        will be used to create a new context.

        @param checkpointPath: Checkpoint directory used in an earlier streaming program
        @param setupFunc:      Function to create a new context and setup DStreams
        """
        cls._ensure_initialized()
        gw = SparkContext._gateway

        # Check whether valid checkpoint information exists in the given path
        ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
        if ssc_option.isEmpty():
            ssc = setupFunc()
            ssc.checkpoint(checkpointPath)
            return ssc

        jssc = gw.jvm.JavaStreamingContext(ssc_option.get())

        # If there is already an active instance of Python SparkContext use it, or create a new one
        if not SparkContext._active_spark_context:
            jsc = jssc.sparkContext()
            conf = SparkConf(_jconf=jsc.getConf())
            SparkContext(conf=conf, gateway=gw, jsc=jsc)

        sc = SparkContext._active_spark_context

        # update ctx in serializer
        cls._transformerSerializer.ctx = sc
        return StreamingContext(sc, None, jssc)
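
This classmethod backs the checkpoint-recovery entry point of StreamingContext; a usage sketch with a hypothetical checkpoint directory and setup function:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def create_context():
    # Build the context and its DStreams from scratch when no checkpoint exists.
    sc = SparkContext(appName="CheckpointedApp")
    ssc = StreamingContext(sc, 10)  # 10 second batches
    # ... define DStream transformations here ...
    return ssc

ssc = StreamingContext.getOrCreate("/tmp/streaming-checkpoint", create_context)
ssc.start()
ssc.awaitTermination()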

status_api_demo.py (project: MIT-Thesis, author: alec-heif)
def main():
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    result = call_in_background(run)
    status = sc.statusTracker()
    while result.empty():
        ids = status.getJobIdsForGroup()
        for id in ids:
            job = status.getJobInfo(id)
            print("Job", id, "status: ", job.status)
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:
                    print("Stage %d: %d tasks total (%d active, %d complete)" %
                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
        time.sleep(1)

    print("Job results are:", result.get())
    sc.stop()

benchmark_spark.py (project: implicit, author: benfred)
def benchmark_spark(ratings, factors, iterations=5):
    conf = (SparkConf()
            .setAppName("implicit_benchmark")
            .setMaster('local[*]')
            .set('spark.driver.memory', '16G')
            )
    context = SparkContext(conf=conf)
    spark = SparkSession(context)

    times = {}
    try:
        ratings = convert_sparse_to_dataframe(spark, context, ratings)

        for rank in factors:
            als = ALS(rank=rank, maxIter=iterations,
                      alpha=1, implicitPrefs=True,
                      userCol="row", itemCol="col", ratingCol="data")
            start = time.time()
            als.fit(ratings)
            elapsed = time.time() - start
            times[rank] = elapsed / iterations
            print("spark. factors=%i took %.3f" % (rank, elapsed/iterations))
    finally:
        spark.stop()

    return times
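
convert_sparse_to_dataframe is project code not included in this excerpt; a plausible sketch, assuming the ratings object is a scipy.sparse matrix and that the ALS estimator above expects the "row"/"col"/"data" column names:

def convert_sparse_to_dataframe(spark, context, sparse_matrix):
    # Hypothetical helper: flatten a scipy.sparse matrix into (row, col, value)
    # records so ALS can consume it with userCol="row", itemCol="col",
    # ratingCol="data".
    coo = sparse_matrix.tocoo()
    records = list(zip(coo.row.tolist(), coo.col.tolist(), coo.data.tolist()))
    return spark.createDataFrame(records, schema=["row", "col", "data"])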

recommender.py (project: Location-based-Restaurants-Recommendation-System, author: patilankita79)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpRecommender")
    sc = SparkContext(conf=conf)
    rdd_data = readElasticSearch(sc)
    parsed_mapped_data = rdd_data.filter(location_recommender)
    sorted_data = parsed_mapped_data.top(150, key=lambda a: a[1]["stars"])
    topn_data = copyUniqueData(sorted_data, 5)
    printResult(topn_data)
    clearElasticSearch()
    sorted_rdd = sc.parallelize(topn_data)
    es_data = sorted_rdd.map(remap_es)
    es_data.saveAsNewAPIHadoopFile(path='-',
                                   outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
                                   keyClass="org.apache.hadoop.io.NullWritable",
                                   valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
                                   conf={"es.resource": "yelpreco/resturant"})

sparkcc.py (project: cc-pyspark, author: commoncrawl)
def run(self):
        self.args = self.parse_arguments()

        conf = SparkConf().setAll((
            ("spark.task.maxFailures", "10"),
            ("spark.locality.wait", "20s"),
            ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        ))
        sc = SparkContext(
            appName=self.name,
            conf=conf)
        sqlc = SQLContext(sparkContext=sc)

        self.records_processed = sc.accumulator(0)
        self.warc_input_processed = sc.accumulator(0)
        self.warc_input_failed = sc.accumulator(0)

        self.run_job(sc, sqlc)

        sc.stop()

binarize_fsl.py (project: sim, author: big-data-lab-team)
def main():

    conf = SparkConf().setAppName("binarize nifti")
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')


    parser = argparse.ArgumentParser(description='Binarize images using FSL installed in a Docker container')
    parser.add_argument('threshold', type=int, help="binarization threshold")
    parser.add_argument('folder_path', type=str, help='folder path containing all of the splits')
    parser.add_argument('output_path', type=str, help='output folder path')

    args = parser.parse_args()

    print args.folder_path 
    client = Config().get_client('dev')

    nibRDD = sc.binaryFiles(args.folder_path)\
        .map(lambda x: get_data(x))\
        .map(lambda x: binarize(x, args.threshold))\
        .map(lambda x: copy_to_hdfs(x, args.output_path, client)).collect()

matrix_multiply_sparse.py (project: Scalable-Matrix-Multiplication-on-Apache-Spark, author: Abhishek-Arora)
def main():


    input = sys.argv[1]
    output = sys.argv[2]


    conf = SparkConf().setAppName('Sparse Matrix Multiplication')
    sc = SparkContext(conf=conf)
    assert sc.version >= '1.5.1'

    sparseMatrix = sc.textFile(input).map(lambda row : row.split(' ')).map(createCSRMatrix).map(multiplyMatrix).reduce(operator.add)
    outputFile = open(output, 'w')

    for row in range(len(sparseMatrix.indptr)-1):
        col = sparseMatrix.indices[sparseMatrix.indptr[row]:sparseMatrix.indptr[row+1]]
        data = sparseMatrix.data[sparseMatrix.indptr[row]:sparseMatrix.indptr[row+1]]
        indexValuePairs = zip(col,data)
        formattedOutput = formatOutput(indexValuePairs)
        outputFile.write(formattedOutput + '\n')

spark_context_test.py (project: monasca-transform, author: openstack)
def setUp(self):
        # Create a local Spark context with 4 cores
        spark_conf = SparkConf().setMaster('local[4]').\
            setAppName("monasca-transform unit tests").\
            set("spark.sql.shuffle.partitions", "10")
        self.spark_context = SparkContext.getOrCreate(conf=spark_conf)
        # quiet logging
        logger = self.spark_context._jvm.org.apache.log4j
        logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)
        logger.LogManager.getLogger("akka").setLevel(logger.Level.WARN)

launcher.py (project: spylon, author: maxpoint)
def spark_context(self, application_name):
        """Create a spark context given the parameters configured in this class.

        The caller is responsible for calling ``.stop`` on the resulting SparkContext.

        Parameters
        ----------
        application_name : string

        Returns
        -------
        sc : SparkContext
        """

        # initialize the spark configuration
        self._init_spark()
        import pyspark
        import pyspark.sql

        # initialize conf
        spark_conf = pyspark.SparkConf()
        for k, v in self._spark_conf_helper._conf_dict.items():
            spark_conf.set(k, v)

        log.info("Starting SparkContext")
        return pyspark.SparkContext(appName=application_name, conf=spark_conf)
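
Since SparkConf.setAll accepts any iterable of key/value pairs, the per-key loop above could equally be written as a single call; a small self-contained sketch (the conf dict is a hypothetical stand-in for self._spark_conf_helper._conf_dict):

import pyspark

conf_dict = {"spark.executor.memory": "4g", "spark.executor.cores": "2"}

spark_conf = pyspark.SparkConf()
spark_conf.setAll(conf_dict.items())  # one call instead of the per-key loop
sc = pyspark.SparkContext(appName="spylon-example", conf=spark_conf)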

Consumer.py (project: Location-based-Restaurants-Recommendation-System, author: patilankita79)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpConsumer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    kstream = KafkaUtils.createDirectStream(ssc, topics=['yelp-stream'],
                                            kafkaParams={"metadata.broker.list": 'localhost:9092'})


    parsed_json = kstream.map(lambda (k, v): json.loads(v))
    remapped_data = parsed_json.map(remap_elastic)

    remapped_data.foreachRDD(writeElasticSearch)
    ssc.start() 
    ssc.awaitTermination()

app.py (project: spark-celery, author: gregbaker)
def _default_sparkconf_builder():
    """
    Build a SparkConf object that can be used for the worker's SparkContext.
    """
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 60) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 3600)

demo.py (project: spark-celery, author: gregbaker)
def sparkconfig_builder():
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.enabled', 'true') \
        .set('spark.dynamicAllocation.schedulerBacklogTimeout', 1) \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 20) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 60)

server.py (project: Spark_Movie_recsys, author: dreamcity)
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    # sc = SparkContext(conf=conf, pyFiles=['rec_engine.py', 'app.py'])
    sc = SparkContext(conf=conf)

    return sc

context.py (project: pyspark, author: v-v-vishnevskiy)
def getOrCreate(cls, checkpointPath, setupFunc):
        """
        Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
        If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
        recreated from the checkpoint data. If the data does not exist, then the provided setupFunc
        will be used to create a new context.

        @param checkpointPath: Checkpoint directory used in an earlier streaming program
        @param setupFunc:      Function to create a new context and setup DStreams
        """
        cls._ensure_initialized()
        gw = SparkContext._gateway

        # Check whether valid checkpoint information exists in the given path
        ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
        if ssc_option.isEmpty():
            ssc = setupFunc()
            ssc.checkpoint(checkpointPath)
            return ssc

        jssc = gw.jvm.JavaStreamingContext(ssc_option.get())

        # If there is already an active instance of Python SparkContext use it, or create a new one
        if not SparkContext._active_spark_context:
            jsc = jssc.sparkContext()
            conf = SparkConf(_jconf=jsc.getConf())
            SparkContext(conf=conf, gateway=gw, jsc=jsc)

        sc = SparkContext._active_spark_context

        # update ctx in serializer
        cls._transformerSerializer.ctx = sc
        return StreamingContext(sc, None, jssc)

spark-als.py (project: ntu-summer-course, author: lucasko-tw)
def __init__(self): 
  conf = SparkConf().setAppName("ntu-speech").setMaster("local")
  self.sc = SparkContext(conf=conf)
  self.sqlCtx = SQLContext(self.sc)

pacci.py (project: PACCI, author: SymSecGroup)
def getSparkConf(mode="mesos", node=0):
    """
    get the spark configuration according to the setting
    :param mode:
    :param node:
    :return:
    """
    global options

    '''
    get spark configuration
    '''
    sconf=SparkConf()

    '''
    set spark configuration
    '''
    sconf.setAppName("%s" % (str(options)))

    # set run mode, now only support spark standalone and mesos coarse mode
    if (mode == 'spark'):
        sconf.setMaster(Setting.SPARK_STANDALONE_URL)
    elif (mode == 'mesos'):
        sconf.setMaster(Setting.MESOS_COARSE_URL)
        # sconf.set("spark.mesos.coarse", "false")
        sconf.set("spark.mesos.coarse", "true")
        sconf.set("spark.mesos.executor.home", Setting.SPARK_HOME)
    else:
        print("****unknown mode")
        exit(0)

    # set core limit if need
    if (0 >= node):
        print "****Spark:no cores max"
    else:
        sconf.set("spark.cores.max", "%d" % (options.cpu * node))

    return sconf
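
A hedged usage sketch of the helper above (Setting.* and the global options object come from the surrounding project and are assumed here):

from pyspark import SparkContext

sconf = getSparkConf(mode="mesos", node=4)  # sets spark.cores.max to options.cpu * 4
sc = SparkContext(conf=sconf)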

binarize_spark.py (project: sim, author: big-data-lab-team)
def main():

    conf = SparkConf().setAppName("binarize nifti")
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')


    parser = argparse.ArgumentParser(description='Binarize images')
    parser.add_argument('threshold', type=int, help="binarization threshold")
    parser.add_argument('folder_path', type=str, help='folder path containing all of the splits')
    parser.add_argument('output_path', type=str, help='output folder path')
    parser.add_argument('num', type=int,choices=[2,4,6,8], help='number of binarization operations')
    parser.add_argument('-m', '--in_memory', type=bool, default=True,  help='in memory computation')    

    args = parser.parse_args()

    nibRDD = sc.binaryFiles(args.folder_path)\
        .map(lambda x: get_data(x))

    client = Config().get_client('dev')

    if args.in_memory:
        print "Performing in-memory computations"

        for i in xrange(args.num - 1):
            nibRDD = nibRDD.map(lambda x: binarize(x, args.threshold))
        nibRDD = nibRDD.map(lambda x: binarize_and_save(x, args.threshold, args.output_path, client)).collect()

    else:
        print "Writing intermediary results to disk and loading from disk"

        binRDD = nibRDD.map(lambda x: binarize_and_save(x, args.threshold, args.output_path + "1", client)).collect()

        for i in xrange(args.num - 1):
            binRDD = sc.binaryFiles(args.output_path + "1")\
                       .map(lambda x: get_data(x))\
                       .map(lambda x: binarize_and_save(x, args.threshold, args.output_path + "1", client)).collect()

spark_bids.py (project: sim, author: big-data-lab-team)
def main():

    # Arguments parsing
    parser=argparse.ArgumentParser()

    # Required inputs
    parser.add_argument("bids_app_boutiques_descriptor", help="Boutiques descriptor of the BIDS App that will process the dataset.")
    parser.add_argument("bids_dataset", help="BIDS dataset to be processed.")
    parser.add_argument("output_dir", help="Output directory.")

    # Optional inputs
    parser.add_argument("--skip-participant-analysis", action = 'store_true', help="Skips participant analysis.")
    parser.add_argument("--skip-group-analysis", action = 'store_true', help="Skips groups analysis.")
    parser.add_argument("--skip-participants", metavar="FILE", type=lambda x: is_valid_file(parser, x), help="Skips participant labels in the text file.")
    parser.add_argument("--hdfs", action = 'store_true', help="Passes data by value rather than by reference in the pipeline. Use it with HDFS only. Requires HDFS to be started.")
    args=parser.parse_args()

    spark_bids = SparkBIDS(args.bids_app_boutiques_descriptor,
                           args.bids_dataset,
                           args.output_dir,
                           { 'use_hdfs': args.hdfs,
                             'skip_participant_analysis': args.skip_participant_analysis,
                             'skip_group_analysis': args.skip_group_analysis,
                             'skip_participants_file': args.skip_participants})

    sc = None

    if spark_bids.spark_required():
        # Spark initialization
        conf = SparkConf().setAppName("BIDS pipeline")
        sc = SparkContext(conf=conf)

    # Run!
    spark_bids.run(sc)

# Execute program

twitterStream.py (project: Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka, author: sridharswamy)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)

    # Creating a streaming context with batch interval of 10 sec
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("checkpoint")
    pwords = load_wordlist("./Dataset/positive.txt")
    nwords = load_wordlist("./Dataset/negative.txt")
    counts = stream(ssc, pwords, nwords, 100)
    make_plot(counts)

session.py (project: sparkly, author: Tubular)
def __init__(self, additional_options=None):
        os.environ['PYSPARK_PYTHON'] = sys.executable
        submit_args = [
            self._setup_repositories(),
            self._setup_packages(),
            self._setup_jars(),
            'pyspark-shell',
        ]
        os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(filter(None, submit_args))

        def _create_spark_context():
            spark_conf = SparkConf()
            spark_conf.set('spark.sql.catalogImplementation', 'hive')
            spark_conf.setAll(self._setup_options(additional_options))
            return SparkContext(conf=spark_conf)

        # If we are in instant testing mode
        if InstantTesting.is_activated():
            spark_context = InstantTesting.get_context()

            # It's the first run, so we have to create context and demonise the process.
            if spark_context is None:
                spark_context = _create_spark_context()
                if os.fork() == 0:  # Detached process.
                    signal.pause()
                else:
                    InstantTesting.set_context(spark_context)
        else:
            spark_context = _create_spark_context()

        # Init HiveContext
        super(SparklySession, self).__init__(spark_context)
        self._setup_udfs()

        self.read_ext = SparklyReader(self)
        self.catalog_ext = SparklyCatalog(self)

        attach_writer_to_dataframe()
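
Given the constructor signature above, a hedged usage sketch (the import path, option keys, and query are illustrative; additional_options is merged into the SparkConf inside _create_spark_context):

from sparkly import SparklySession  # assumed import path

spark = SparklySession(additional_options={
    'spark.executor.memory': '2g',
    'spark.sql.shuffle.partitions': '50',
})
df = spark.sql('SELECT 1 AS value')
df.show()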

test_session.py (project: sparkly, author: Tubular)
def setUp(self):
        super(TestSparklySession, self).setUp()
        self.spark_conf_mock = mock.Mock(spec=SparkConf)
        self.spark_context_mock = mock.Mock(spec=SparkContext)

        self.patches = [
            mock.patch('sparkly.session.SparkConf', self.spark_conf_mock),
            mock.patch('sparkly.session.SparkContext', self.spark_context_mock),
        ]
        [p.start() for p in self.patches]

test.py (project: TensorFlowOnSpark, author: yahoo)
def setUpClass(cls):
    master = os.getenv('MASTER')
    assert master is not None, "Please start a Spark standalone cluster and export MASTER to your env."

    num_workers = os.getenv('SPARK_WORKER_INSTANCES')
    assert num_workers is not None, "Please export SPARK_WORKER_INSTANCES to your env."
    cls.num_workers = int(num_workers)

    spark_jars = os.getenv('SPARK_CLASSPATH')
    assert spark_jars and 'tensorflow-hadoop' in spark_jars, "Please add path to tensorflow-hadoop-*.jar to SPARK_CLASSPATH."

    cls.conf = SparkConf().set('spark.jars', spark_jars)
    cls.sc = SparkContext(master, cls.__name__, conf=cls.conf)
    cls.spark = SparkSession.builder.getOrCreate()

server.py (project: pyspark-docker, author: thuongdinh)
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    sc = SparkContext(conf=conf, pyFiles=['engine.py', 'app.py'])

    return sc

server.py (project: flaskapp, author: hpnhxxwn)
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    sc = SparkContext(conf=conf, pyFiles=['engine.py', 'app.py'])

    return sc

matrix_multiply.py (project: Scalable-Matrix-Multiplication-on-Apache-Spark, author: Abhishek-Arora)
def main():


    input = sys.argv[1]
    output = sys.argv[2]


    conf = SparkConf().setAppName('Matrix Multiplication')
    sc = SparkContext(conf=conf)
    assert sc.version >= '1.5.1'

    row = sc.textFile(input).map(lambda row : row.split(' ')).cache()
    ncol = len(row.take(1)[0])
    intermediateResult = row.map(permutation).reduce(add_tuples)

    outputFile = open(output, 'w')

    # Reshape the flat result into rows of length ncol.
    result = [intermediateResult[x:x + ncol] for x in range(0, len(intermediateResult), ncol)]


    for row in result:
        for element in row:
            outputFile.write(str(element) + ' ')
        outputFile.write('\n')

    outputFile.close()

    # outputResult = sc.parallelize(result).coalesce(1)
    # outputResult.saveAsTextFile(output)
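
permutation and add_tuples are project helpers not included in the excerpt; plausible sketches, assuming each input line is one dense matrix row and the job accumulates the outer products of the rows (i.e. A-transpose times A) as a flat tuple:

def permutation(row):
    # Outer product of the row with itself, flattened to length ncol * ncol.
    values = [float(v) for v in row]
    return tuple(a * b for a in values for b in values)

def add_tuples(a, b):
    # Element-wise sum of two equally sized tuples.
    return tuple(x + y for x, y in zip(a, b))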

spark.py (project: arthur-redshift-etl, author: harrystech)
def _create_sql_context(self):
        """
        Create a new SQL context within a new Spark context. Import of classes from
        pyspark has to be pushed down into this method as Spark needs to be available
        in order for the libraries to be imported successfully. Since Spark is not available
        when the ETL is started initially, we delay the import until the ETL has restarted
        under Spark.

        Side-effect: Logging is configured by the time that pyspark is loaded
        so we have some better control over filters and formatting.
        """
        from pyspark import SparkConf, SparkContext, SQLContext

        if "SPARK_ENV_LOADED" not in os.environ:
            self.logger.warning("SPARK_ENV_LOADED is not set")

        self.logger.info("Starting SparkSQL context")
        conf = (SparkConf()
                .setAppName(__name__)
                .set("spark.logConf", "true"))
        sc = SparkContext(conf=conf)

        # Copy the credentials from the session into hadoop for access to S3
        session = boto3.Session()
        credentials = session.get_credentials()
        hadoopConf = sc._jsc.hadoopConfiguration()
        hadoopConf.set("fs.s3a.access.key", credentials.access_key)
        hadoopConf.set("fs.s3a.secret.key", credentials.secret_key)

        return SQLContext(sc)
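
With the s3a credentials copied into the Hadoop configuration above, the returned SQLContext can read directly from S3; a hedged sketch of a hypothetical companion method (bucket and path are placeholders):

    def load_events(self):
        # Hypothetical caller: relies on the s3a credentials wired up above.
        sql_context = self._create_sql_context()
        df = sql_context.read.parquet("s3a://example-bucket/path/to/events/")
        return df.count()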

locfile_manager.py (project: tensormsa_old, author: TensorMSA)
def spark_session_create(self, app_name):
        """
        Spark loader class,
        created for the purpose of handling Spark jobs.
        """
        tfmsa_logger("Spark Session Created")
        conf = SparkConf()
        conf.setMaster('spark://{0}'.format(settings.SPARK_HOST))
        conf.setAppName(app_name)
        conf.set('spark.driver.cores', settings.SPARK_CORE)
        conf.set('spark.driver.memory', settings.SPARK_MEMORY)
        conf.set('spark.executor.cores', settings.SPARK_WORKER_CORE)
        conf.set('spark.executor.memory', settings.SPARK_WORKER_MEMORY)
        conf.set('spark.driver.allowMultipleContexts', "true")
        return SparkContext(conf=conf)

