Is it possible to scale data by group in Spark?

Posted on 2021-01-29 15:00:37

I want to scale my data with StandardScaler (from pyspark.mllib.feature import StandardScaler). Right now I can do this by passing the values of the RDD to the transform function, but the problem is that I want to preserve the keys. Is there any way to scale the data while keeping its keys (see the sketch after the Parseline function below)?

Sample dataset

0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,smurf.

Imports

import sys
import os
from collections import OrderedDict
from numpy import array
from math import sqrt
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.mllib.clustering import KMeans
    from pyspark.mllib.feature import StandardScaler
    from pyspark.statcounter import StatCounter

    print ("Successfully imported Spark Modules")
except ImportError as e:
    print ("Can not import Spark Modules", e)
    sys.exit(1)

Code section

    # conf and data_file are defined elsewhere in the original script.
    sc = SparkContext(conf=conf)
    raw_data = sc.textFile(data_file)
    parsed_data = raw_data.map(Parseline)

The Parseline function:

def Parseline(line):
    line_split = line.split(",")
    # Keep the first column plus the numeric columns, dropping the three
    # categorical fields (e.g. tcp, http, SF) and the trailing label.
    clean_line_split = [line_split[0]] + line_split[4:-1]
    # Return (label, feature vector) so the key stays attached to the values.
    return (line_split[-1], array([float(x) for x in clean_line_split]))
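
For reference, here is a minimal sketch (my own illustration, assuming the parsed_data RDD built in the code section above) of the value-only scaling described in the question; the label keys produced by Parseline are lost:

from pyspark.mllib.feature import StandardScaler

# Take only the feature vectors; the label keys from Parseline are dropped here.
values = parsed_data.values()

# Fit a single global scaler on all vectors and transform them.
# The result is an RDD of scaled vectors with no keys attached.
scaler = StandardScaler(withMean=True, withStd=True).fit(values)
scaled_values = scaler.transform(values)
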
1 Answer
    面试哥 · 2021-01-29

    This is not a great solution, but you can adapt my answer to a similar Scala question. Let's start with some example data:

    import numpy as np

    np.random.seed(323)

    keys = ["foo"] * 50 + ["bar"] * 50
    # 100 rows of 10 features: the first 50 ("foo") sit around -10 and the
    # last 50 ("bar") around 10, plus a little uniform noise.
    values = (
        np.vstack([np.repeat(-10, 500), np.repeat(10, 500)]).reshape(100, -1) +
        np.random.rand(100, 10)
    )

    rdd = sc.parallelize(zip(keys, values))
    

    Unfortunately, MultivariateStatisticalSummary is just a wrapper around a JVM model and it is not really Python friendly. Fortunately, with NumPy arrays we can use the standard StatCounter to compute statistics by key:

    from pyspark.statcounter import StatCounter

    def compute_stats(rdd):
        # Build one StatCounter per key: merge folds in a single vector,
        # mergeStats combines the partial StatCounters from each partition.
        return rdd.aggregateByKey(
            StatCounter(), StatCounter.merge, StatCounter.mergeStats
        ).collectAsMap()
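
    For example (my own illustration), compute_stats(rdd) returns a plain dict keyed by "foo" / "bar", whose values are StatCounter objects exposing per-key mean() and stdev() as NumPy arrays:

    stats = compute_stats(rdd)
    stats["foo"].mean()    # per-feature means for the "foo" group (NumPy array)
    stats["foo"].stdev()   # per-feature standard deviations for the "foo" group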
    

    Finally we can map to normalize:

    def scale(rdd, stats):
        def scale_(kv):
            k, v = kv
            # Standardize each vector with the mean / stdev of its own key.
            return (v - stats[k].mean()) / stats[k].stdev()
        return rdd.map(scale_)
    
    scaled = scale(rdd, compute_stats(rdd))
    scaled.first()
    
    ## array([ 1.59879188, -1.66816084,  1.38546532,  1.76122047,  1.48132643,
    ##    0.01512487,  1.49336769,  0.47765982, -1.04271866,  1.55288814])
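
    The scale_ helper above returns only the scaled vectors. Since the question asks to keep the keys, a small variation of the same map (a sketch of my own, not part of the answer above) can emit (key, scaled vector) pairs instead:

    def scale_keeping_keys(rdd, stats):
        def scale_(kv):
            k, v = kv
            # Same per-key standardization, but carry the key along.
            return (k, (v - stats[k].mean()) / stats[k].stdev())
        return rdd.map(scale_)

    scaled_with_keys = scale_keeping_keys(rdd, compute_stats(rdd))
    scaled_with_keys.first()   # ('foo', array([...]))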
    

