Example source code for the Python class SparkContext()
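All of the snippets collected below follow the same basic pattern: build a SparkConf, hand it to SparkContext, do some work, and stop the context. A minimal sketch of that pattern (the app name and master URL are placeholders):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("example-app").setMaster("local[*]")
sc = SparkContext(conf=conf)
try:
    # Trivial job to confirm the context works.
    print(sc.parallelize(range(10)).sum())
finally:
    sc.stop()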

finance_similarity.py (project: Spark-in-Finance-Quantitative-Investing, author: litaotao)
def create_sc():
    sc_conf = SparkConf()
    sc_conf.setAppName("finance-similarity-app")
    sc_conf.setMaster('spark://10.21.208.21:7077')
    sc_conf.set('spark.executor.memory', '2g')
    sc_conf.set('spark.executor.cores', '4')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print(sc_conf.getAll())

    # Stop any previously running SparkContext before creating a new one.
    sc = None
    try:
        sc.stop()
        sc = SparkContext(conf=sc_conf)
    except Exception:
        sc = SparkContext(conf=sc_conf)

    return sc
es_helper.py (project: unfetter, author: unfetter-analytic)
def get_rdd(es_index, es_type):

    if es_type == "":
        resource = es_index
    else:
        resource = es_index + "/" + es_type
    es_read_conf = {
        "es.nodes": ES_IP,
        "es.port": ES_PORT,
        "es.resource": resource,
        "es.index.read.missing.as.empty": 'yes'
    }
    conf = SparkConf().setAppName("Unfetter")
    sc = SparkContext(conf=conf)
    rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)
    return rdd
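A hedged usage sketch for the snippet above: the index name is hypothetical, and ES_IP/ES_PORT are module-level settings referenced but not shown here. Each element of the returned RDD is a (document id, document dict) pair produced by the elasticsearch-hadoop input format.

rdd = get_rdd("stix", "")  # hypothetical index name, no document type
for doc_id, source in rdd.take(3):
    print(doc_id, sorted(source.keys()))

Note that get_rdd constructs its own SparkContext, so it can only be called once per driver process.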
data_pipeline.py (project: search-MjoLniR, author: wikimedia)
def main(argv=None):
    args = parse_arguments(argv)
    if args['very_verbose']:
        logging.basicConfig(level=logging.DEBUG)
    elif args['verbose']:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig()
    del args['verbose']
    del args['very_verbose']
    sc = SparkContext(appName="MLR: data collection pipeline")
    # spark info logging is incredibly spammy. Use warn to have some hope of
    # human decipherable output
    sc.setLogLevel('WARN')
    sqlContext = HiveContext(sc)
    run_pipeline(sc, sqlContext, **args)
twitterStream.py (project: Twitter-and-IMDB-Sentimental-Analytics, author: abhinandanramesh)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)   # Create a streaming context with batch interval of 10 sec
    ssc.checkpoint("checkpoint")

    pwords = load_wordlist("positive.txt")
    nwords = load_wordlist("negative.txt")
    ts = time.time()   
    counts = stream(ssc, pwords, nwords, 100)
    make_plot(counts)
common.py (project: MIT-Thesis, author: alec-heif)
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = [_py2java(sc, x) for x in obj]
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(data)
    return obj
status_api_demo.py (project: MIT-Thesis, author: alec-heif)
def main():
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    result = call_in_background(run)
    status = sc.statusTracker()
    while result.empty():
        ids = status.getJobIdsForGroup()
        for id in ids:
            job = status.getJobInfo(id)
            print("Job", id, "status: ", job.status)
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:
                    print("Stage %d: %d tasks total (%d active, %d complete)" %
                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
        time.sleep(1)

    print("Job results are:", result.get())
    sc.stop()
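The snippet references delayed and call_in_background, which are defined elsewhere in the same example. A rough sketch of what they look like in the upstream Spark demo, written here against Python 3's queue module:

import threading
import time
from queue import Queue


def delayed(seconds):
    """Return a map function that sleeps before passing its input through."""
    def f(x):
        time.sleep(seconds)
        return x
    return f


def call_in_background(f, *args):
    """Run f(*args) on a daemon thread; the caller polls the returned queue."""
    result = Queue(1)
    t = threading.Thread(target=lambda: result.put(f(*args)))
    t.daemon = True
    t.start()
    return result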
launcher.py (project: spylon, author: maxpoint)
def sql_context(self, application_name):
        """Create a spark context given the parameters configured in this class.

        The caller is responsible for calling ``.close`` on the resulting spark context

        Parameters
        ----------
        application_name : string

        Returns
        -------
        sc : SparkContext
        sqlContext : pyspark.sql.SQLContext
        """
        sc = self.spark_context(application_name)
        import pyspark
        sqlContext = pyspark.SQLContext(sc)
        return (sc, sqlContext)
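A hedged usage sketch, assuming launcher is a configured instance of the class this method belongs to (the instance name and app name are illustrative):

sc, sqlContext = launcher.sql_context("my-analysis")
try:
    df = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
    df.show()
finally:
    sc.stop()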
benchmark_spark.py (project: implicit, author: benfred)
def benchmark_spark(ratings, factors, iterations=5):
    conf = (SparkConf()
            .setAppName("implicit_benchmark")
            .setMaster('local[*]')
            .set('spark.driver.memory', '16G')
            )
    context = SparkContext(conf=conf)
    spark = SparkSession(context)

    times = {}
    try:
        ratings = convert_sparse_to_dataframe(spark, context, ratings)

        for rank in factors:
            als = ALS(rank=rank, maxIter=iterations,
                      alpha=1, implicitPrefs=True,
                      userCol="row", itemCol="col", ratingCol="data")
            start = time.time()
            als.fit(ratings)
            elapsed = time.time() - start
            times[rank] = elapsed / iterations
            print("spark. factors=%i took %.3f" % (rank, elapsed/iterations))
    finally:
        spark.stop()

    return times
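convert_sparse_to_dataframe is not shown here; judging by the row/col/data column names it presumably accepts a SciPy sparse matrix in COO-like form. A hedged call sketch with synthetic data:

import scipy.sparse

# Hypothetical input: a random sparse user/item matrix.
ratings = scipy.sparse.random(50000, 2000, density=0.005, format="coo")
times = benchmark_spark(ratings, factors=[32, 64, 128], iterations=5)
print(times)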
recommender.py (project: Location-based-Restaurants-Recommendation-System, author: patilankita79)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpRecommender")
    sc = SparkContext(conf=conf)
    rdd_data = readElasticSearch(sc)
    parsed_mapped_data = rdd_data.filter(location_recommender)
    sorted_data = parsed_mapped_data.top(150, key=lambda a: a[1]["stars"])
    topn_data = copyUniqueData(sorted_data, 5)
    printResult(topn_data)
    clearElasticSearch()
    sorted_rdd = sc.parallelize(topn_data)
    es_data = sorted_rdd.map(remap_es)
    es_data.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf={"es.resource": "yelpreco/resturant"})
sparkcc.py (project: cc-pyspark, author: commoncrawl)
def run(self):
        self.args = self.parse_arguments()

        conf = SparkConf().setAll((
            ("spark.task.maxFailures", "10"),
            ("spark.locality.wait", "20s"),
            ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        ))
        sc = SparkContext(
            appName=self.name,
            conf=conf)
        sqlc = SQLContext(sparkContext=sc)

        self.records_processed = sc.accumulator(0)
        self.warc_input_processed = sc.accumulator(0)
        self.warc_input_failed = sc.accumulator(0)

        self.run_job(sc, sqlc)

        sc.stop()
generate_user_artist_date_feature.py (project: ProjectPOP, author: BUPT-768)
def generate_user_actions_with_artist(sc):
    '''
    Append artist_id to each record of data_source/user_actions.csv.

    Args:
        sc: pyspark.SparkContext
    '''
    hdfs_file_dir = 'hdfs:/home/ProjectPOP/data_source'
    hdfs_song_path = '%s/mars_tianchi_songs.csv' % (hdfs_file_dir)
    hdfs_action_path = '%s/mars_tianchi_user_actions.csv' % (hdfs_file_dir)

    logger.info('Start generate song_artist_dict')
    song_artist_dict = dict(sc.textFile(hdfs_song_path).map(_generate_song_artist_dict).collect())
    song_artist_dict_broadcast = sc.broadcast(song_artist_dict)

    logger.info('Start process user_actions')
    user_actions = sc.textFile(hdfs_action_path).map(lambda l: _add_artist_into_line(l, song_artist_dict_broadcast))
    logger.info(user_actions.take(5))        
    user_actions.saveAsTextFile('%s/mars_tianchi_songs_with_artist.csv' % (hdfs_file_dir))
    return True
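The helpers _generate_song_artist_dict and _add_artist_into_line are referenced but not shown. A rough sketch of what they might look like, assuming comma-separated lines with song_id in the first column of the songs file, artist_id in its second column, and song_id in the second column of the user-actions file (the column positions are assumptions):

def _generate_song_artist_dict(line):
    """Map one songs-file line to a (song_id, artist_id) pair."""
    fields = line.split(',')
    return (fields[0], fields[1])


def _add_artist_into_line(line, song_artist_dict_broadcast):
    """Append the artist_id matching the action's song_id to the line."""
    fields = line.split(',')
    song_id = fields[1]
    artist_id = song_artist_dict_broadcast.value.get(song_id, 'unknown')
    return '%s,%s' % (line, artist_id)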
binarize_fsl.py (project: sim, author: big-data-lab-team)
def main():

    conf = SparkConf().setAppName("binarize nifti")
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')


    parser = argparse.ArgumentParser(description='Binarize images using FSL installed in a Docker container')
    parser.add_argument('threshold', type=int, help="binarization threshold")
    parser.add_argument('folder_path', type=str, help='folder path containing all of the splits')
    parser.add_argument('output_path', type=str, help='output folder path')

    args = parser.parse_args()

    print(args.folder_path)
    client = Config().get_client('dev')

    nibRDD = sc.binaryFiles(args.folder_path)\
        .map(lambda x: get_data(x))\
        .map(lambda x: binarize(x, args.threshold))\
        .map(lambda x: copy_to_hdfs(x, args.output_path, client)).collect()
matrix_multiply_sparse.py (project: Scalable-Matrix-Multiplication-on-Apache-Spark, author: Abhishek-Arora)
def main():


    input = sys.argv[1]
    output = sys.argv[2]


    conf = SparkConf().setAppName('Sparse Matrix Multiplication')
    sc = SparkContext(conf=conf)
    assert sc.version >= '1.5.1'

    sparseMatrix = sc.textFile(input).map(lambda row : row.split(' ')).map(createCSRMatrix).map(multiplyMatrix).reduce(operator.add)
    outputFile = open(output, 'w')

    for row in range(len(sparseMatrix.indptr)-1):
        col = sparseMatrix.indices[sparseMatrix.indptr[row]:sparseMatrix.indptr[row+1]]
        data = sparseMatrix.data[sparseMatrix.indptr[row]:sparseMatrix.indptr[row+1]]
        indexValuePairs = zip(col,data)
        formattedOutput = formatOutput(indexValuePairs)
        outputFile.write(formattedOutput + '\n')
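createCSRMatrix, multiplyMatrix, and formatOutput come from elsewhere in the project. Since the per-row products are summed with operator.add, one plausible reading is that the job computes A^T A from rows stored as col:value tokens; a hedged sketch under that assumption (the fixed column count and token format are guesses for illustration):

from scipy.sparse import csr_matrix

MATRIX_WIDTH = 100  # assumed fixed number of columns


def createCSRMatrix(tokens):
    """Build a 1 x MATRIX_WIDTH CSR row vector from 'col:value' tokens."""
    cols, vals = [], []
    for token in tokens:
        col, val = token.split(':')
        cols.append(int(col))
        vals.append(float(val))
    return csr_matrix((vals, ([0] * len(cols), cols)), shape=(1, MATRIX_WIDTH))


def multiplyMatrix(row):
    """Outer product of one row with itself; summing these over all rows gives A^T A."""
    return row.transpose() * row


def formatOutput(indexValuePairs):
    """Render (column, value) pairs back into 'col:value' tokens."""
    return ' '.join('%d:%r' % (col, val) for col, val in indexValuePairs)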
spark_word2vec.py (project: noungroups, author: gushecht)
def main(in_loc, out_dir):
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s',
                        level=logging.INFO)

    sc = ps.SparkContext(appName='Word2Vec')
    logger.info('Distributing input data')
    raw_data = sc.textFile(in_loc).cache()
    data = raw_data.map(lambda line: line.split(' '))
    print(data.getNumPartitions())

    logger.info('Training Word2Vec model')
    model = Word2Vec().setVectorSize(128).setNumIterations(5).fit(data)

    w2v_dict = model.getVectors()
    logger.info('Saving word to vectors dictionary')
    with open(path.join(out_dir, 'w2v_dict.pkl'), 'wb') as f:
        cPickle.dump(w2v_dict, f, cPickle.HIGHEST_PROTOCOL)

    model.save(sc, out_dir)
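Once trained, the MLlib Word2VecModel can also be queried directly, which is a quick sanity check before relying on the pickled dictionary (the query token is arbitrary and assumed to be in the vocabulary):

vector = model.transform('market')          # 128-dimensional vector for the token
for word, similarity in model.findSynonyms('market', 10):
    print(word, similarity)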
spark_nounfiltersort.py (project: noungroups, author: gushecht)
def main(in_dir, out_dir):
    sc = ps.SparkContext()
    text_files = sc.textFile(in_dir)
    counts = text_files.flatMap(lambda line: line.split(' ')) \
                       .filter(lambda word: any(label in word for label in LABELS)) \
                       .map(lambda word: (word, 1)) \
                       .reduceByKey(add) \
                       .persist(storageLevel=ps.StorageLevel.MEMORY_AND_DISK)
    total_nouns = counts.values() \
                        .reduce(add)
    sorted_nouns = counts.map(lambda (word, count): (word, count / float(total_nouns))) \
                         .sortBy(lambda (word, count): count, ascending=False) \
                         .collect()
    with open(path.join(out_dir, 'sorted_nouns.txt'), 'w+') as f:
        for word in sorted_nouns:
            f.write(str(word) + '\n')
training_pipeline.py (project: search-MjoLniR, author: wikimedia)
def main(argv=None):
    args = parse_arguments(argv)
    if args['very_verbose']:
        logging.basicConfig(level=logging.DEBUG)
    elif args['verbose']:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig()
    del args['verbose']
    del args['very_verbose']
    # TODO: Set spark configuration? Some can't actually be set here though, so best might be to set all of it
    # on the command line for consistency.
    sc = SparkContext(appName="MLR: training pipeline")
    sc.setLogLevel('WARN')
    sqlContext = HiveContext(sc)

    output_dir = args['output_dir']
    if os.path.exists(output_dir):
        logging.error('Output directory (%s) already exists' % (output_dir))
        sys.exit(1)

    # Maybe this is a bit early to create the path ... but should be fine.
    # The annoyance might be that an error in training requires deleting
    # this directory to try again.
    os.mkdir(output_dir)

    try:
        run_pipeline(sc, sqlContext, **args)
    except:  # noqa: E722
        # If the directory we created is still empty delete it
        # so it doesn't need to be manually re-created
        if not len(glob.glob(os.path.join(output_dir, '*'))):
            os.rmdir(output_dir)
        raise
meteos-script-1.6.0.py (project: meteos, author: openstack)
def init_context(self):

        self.base_hostname = socket.gethostname().split(".")[0]
        master_node = 'spark://' + self.base_hostname + ':7077'
        self.context = SparkContext(master_node, 'INFO')
tests.py (project: MIT-Thesis, author: alec-heif)
def setUp(self):
        self.sc = SparkContext('local[4]', "MLlib tests")
        self.spark = SparkSession(self.sc)
clustering.py (project: MIT-Thesis, author: alec-heif)
def load(cls, sc, path):
        """Load the GaussianMixtureModel from disk.

        :param sc:
          SparkContext.
        :param path:
          Path to where the model is stored.
        """
        model = cls._load_java(sc, path)
        wrapper = sc._jvm.org.apache.spark.mllib.api.python.GaussianMixtureModelWrapper(model)
        return cls(wrapper)
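A hedged usage sketch for the loader above (the path is hypothetical and must point at a model previously written with model.save(sc, path)):

from pyspark.mllib.clustering import GaussianMixtureModel

gmm = GaussianMixtureModel.load(sc, "hdfs:///models/gmm")
print(gmm.k)        # number of mixture components
print(gmm.weights)  # component weights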
clustering.py (project: MIT-Thesis, author: alec-heif)
def load(cls, sc, path):
        """Load the LDAModel from disk.

        :param sc:
          SparkContext.
        :param path:
          Path to where the model is stored.
        """
        if not isinstance(sc, SparkContext):
            raise TypeError("sc should be a SparkContext, got type %s" % type(sc))
        if not isinstance(path, basestring):
            raise TypeError("path should be a basestring, got type %s" % type(path))
        model = callMLlibFunc("loadLDAModel", sc, path)
        return LDAModel(model)
clustering.py (project: MIT-Thesis, author: alec-heif)
def _test():
    import doctest
    import pyspark.mllib.clustering
    globs = pyspark.mllib.clustering.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
tests.py (project: MIT-Thesis, author: alec-heif)
def setUp(self):
        self.sc = SparkContext('local[4]', "MLlib tests")
        self.ssc = StreamingContext(self.sc, 1.0)
util.py (project: MIT-Thesis, author: alec-heif)
def save(self, sc, path):
        """Save this model to the given path."""
        if not isinstance(sc, SparkContext):
            raise TypeError("sc should be a SparkContext, got type %s" % type(sc))
        if not isinstance(path, basestring):
            raise TypeError("path should be a basestring, got type %s" % type(path))
        self._java_model.save(sc._jsc.sc(), path)
recommendation.py (project: MIT-Thesis, author: alec-heif)
def _test():
    import doctest
    import pyspark.mllib.recommendation
    from pyspark.sql import SQLContext
    globs = pyspark.mllib.recommendation.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
recoverable_network_wordcount.py (project: MIT-Thesis, author: alec-heif)
def createContext(host, port, outputPath):
    # If you do not see this printed, that means the StreamingContext has been loaded
    # from the new checkpoint
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    # Create a socket stream on target ip:port and count the
    # words in input stream of \n delimited text (eg. generated by 'nc')
    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
        # Get or register the blacklist Broadcast
        blacklist = getWordBlacklist(rdd.context)
        # Get or register the droppedWordsCounter Accumulator
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        # Use blacklist to drop words and use droppedWordsCounter to count them
        def filterFunc(wordCount):
            if wordCount[0] in blacklist.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            else:
                return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    wordCounts.foreachRDD(echo)
    return ssc
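In the upstream example, createContext only runs when no checkpoint exists yet; recovery after a driver restart goes through StreamingContext.getOrCreate, roughly like this (host, port, and paths are placeholders):

checkpoint_dir = "/tmp/checkpoint"
output_path = "/tmp/wordcount_out"
ssc = StreamingContext.getOrCreate(
    checkpoint_dir,
    lambda: createContext("localhost", 9999, output_path))
ssc.start()
ssc.awaitTermination()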
processor.py (project: Stock-Visualizer, author: saguo)
def __init__(self, name, broker, source_topic, destination_topic):
        sc = SparkContext("local[2]", name)
        sc.setLogLevel('ERROR')
        self.ssc = StreamingContext(sc, 5)

        directKafkaStream = KafkaUtils.createDirectStream(
            self.ssc,
            [source_topic],
            {'metadata.broker.list': broker}
        )

        producer = Producer(broker, destination_topic)
        process_stream(directKafkaStream, producer)
contenedores.py (project: data_processing_course, author: luisbelloch)
def main():
  sc = SparkContext('local', 'practicas_spark')
  pr = definir_path_resultados('./resultados')
  ejercicio_0(sc, pr)
  ejercicio_1(sc, pr)
  ejercicio_2(sc, pr)
  ejercicio_3(sc, pr)
  ejercicio_4(sc, pr)
  ejercicio_5(sc, pr)
  ejercicio_6(sc, pr)
  ejercicio_7(sc, pr)
gnosis_ref_arch.py (project: fabric8-analytics-stack-analysis, author: fabric8-analytics)
def _train_fp_growth_model(cls, data_store, eco_to_package_topic_dict, min_support_count,
                               additional_path, fp_num_partition):
        sc = SparkContext()
        manifest_file_list = data_store.list_files(
            prefix=os.path.join(additional_path, gnosis_constants.MANIFEST_FILEPATH))
        list_of_topic_list = list()
        for manifest_file in manifest_file_list:
            eco_to_package_list_json_array = data_store.read_json_file(
                manifest_file)
            for eco_to_package_list_json in eco_to_package_list_json_array:
                ecosystem = eco_to_package_list_json.get(gnosis_constants.MANIFEST_ECOSYSTEM)
                list_of_package_list = eco_to_package_list_json.get(
                    gnosis_constants.MANIFEST_PACKAGE_LIST)
                for package_list in list_of_package_list:
                    package_list_lowercase = [x.lower() for x in package_list]
                    topic_list = cls.get_topic_list_for_package_list(package_list_lowercase,
                                                                     ecosystem,
                                                                     eco_to_package_topic_dict)
                    list_of_topic_list.append(topic_list)
        transactions = sc.parallelize(list_of_topic_list)
        transactions.cache()

        min_support = float(min_support_count / float(transactions.count()))

        model = FPGrowth.train(transactions, minSupport=min_support,
                               numPartitions=fp_num_partition)

        return model
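The returned FPGrowthModel exposes the mined itemsets as an RDD; a short sketch of how a caller might inspect the result:

# Inspect the frequent topic itemsets mined above.
for itemset in model.freqItemsets().collect():
    print(itemset.items, itemset.freq)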
launcher.py (project: spylon, author: maxpoint)
def spark_context(self, application_name):
        """Create a spark context given the parameters configured in this class.

        The caller is responsible for calling ``.stop`` on the resulting spark context.

        Parameters
        ----------
        application_name : string

        Returns
        -------
        sc : SparkContext
        """

        # initialize the spark configuration
        self._init_spark()
        import pyspark
        import pyspark.sql

        # initialize conf
        spark_conf = pyspark.SparkConf()
        for k, v in self._spark_conf_helper._conf_dict.items():
            spark_conf.set(k, v)

        log.info("Starting SparkContext")
        return pyspark.SparkContext(appName=application_name, conf=spark_conf)
launcher.py (project: spylon, author: maxpoint)
def with_spark_context(application_name, conf=None):
    """Context manager for a spark context

    Parameters
    ----------
    application_name : string
    conf : string, optional

    Returns
    -------
    sc : SparkContext

    Examples
    --------
    Used within a context manager
    >>> with with_spark_context("MyApplication") as sc:
    ...     # Your Code here
    ...     pass

    """
    if conf is None:
        conf = default_configuration
    assert isinstance(conf, SparkConfiguration)

    sc = conf.spark_context(application_name)
    try:
        yield sc
    finally:
        sc.stop()

