How to create a custom Estimator in PySpark

Posted on 2021-01-29 19:36:05

I am trying to build a simple custom Estimator in PySpark MLlib. I know it is possible to write a custom Transformer, but I am not sure how to do the same for an Estimator. I also do not understand what @keyword_only does, and why I need so many setters and getters. Scikit-learn seems to have proper documentation for custom models (see here), but PySpark does not.

Pseudocode for an example model:

class NormalDeviation():
    def __init__(self, threshold=3):
        self.threshold = threshold

    def fit(self, x, y=None):
        self.model = {'mean': x.mean(), 'std': x.std()}

    def predict(self, x):
        return (x - self.model['mean']) > self.threshold * self.model['std']

    def decision_function(self, x):  # does ml-lib support this?
        pass
1 Answer
  • 面试哥 2021-01-29

    Generally speaking, there is no documentation because, as of Spark 1.6 / 2.0, most of the relevant API is not intended to be public. It should change in Spark 2.1.0 (see SPARK-7146).

    The API is relatively complex because it has to follow specific conventions to make a given Transformer or Estimator compatible with the Pipeline API. Some of these methods may be required for features such as reading/writing and grid search. Others, such as keyword_only, are just simple helpers and are not strictly required.
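
    For instance, keyword_only only forces the decorated method to be called with keyword arguments and records them in self._input_kwargs, so they can be forwarded to _set in a single call. A minimal sketch of the pattern (the Scaled class and its scale parameter are made up for illustration):

    from pyspark import keyword_only
    from pyspark.ml.param import Param, Params, TypeConverters

    class Scaled(Params):

        scale = Param(Params._dummy(), "scale", "scale",
            typeConverter=TypeConverters.toFloat)

        @keyword_only
        def __init__(self, scale=1.0):
            super(Scaled, self).__init__()
            # keyword_only rejects positional arguments and records the
            # keyword arguments that were actually passed in _input_kwargs
            kwargs = self._input_kwargs
            self._set(**kwargs)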

    Suppose you have defined the following mixin for the mean parameter:

    from pyspark.ml.pipeline import Estimator, Model, Pipeline
    from pyspark.ml.param.shared import *
    from pyspark.sql.functions import avg, stddev_samp
    
    
    class HasMean(Params):
    
        mean = Param(Params._dummy(), "mean", "mean", 
            typeConverter=TypeConverters.toFloat)
    
        def __init__(self):
            super(HasMean, self).__init__()
    
        def setMean(self, value):
            return self._set(mean=value)
    
        def getMean(self):
            return self.getOrDefault(self.mean)
    

    One for the standard deviation parameter:

    class HasStandardDeviation(Params):
    
        standardDeviation = Param(Params._dummy(),
            "standardDeviation", "standardDeviation", 
            typeConverter=TypeConverters.toFloat)
    
        def __init__(self):
            super(HasStandardDeviation, self).__init__()
    
        def setStddev(self, value):
            return self._set(standardDeviation=value)
    
        def getStddev(self):
            return self.getOrDefault(self.standardDeviation)
    

    And one for the threshold:

    class HasCenteredThreshold(Params):
    
        centeredThreshold = Param(Params._dummy(),
                "centeredThreshold", "centeredThreshold",
                typeConverter=TypeConverters.toFloat)
    
        def __init__(self):
            super(HasCenteredThreshold, self).__init__()
    
        def setCenteredThreshold(self, value):
            return self._set(centeredThreshold=value)
    
        def getCenteredThreshold(self):
            return self.getOrDefault(self.centeredThreshold)
    

    With those in place, you can create a basic Estimator like this:

    from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable 
    from pyspark import keyword_only  
    
    class NormalDeviation(Estimator, HasInputCol, 
            HasPredictionCol, HasCenteredThreshold,
            # Available in PySpark >= 2.3.0 
            # Credits https://stackoverflow.com/a/52467470
            # by https://stackoverflow.com/users/234944/benjamin-manns
            DefaultParamsReadable, DefaultParamsWritable):
    
        @keyword_only
        def __init__(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
            super(NormalDeviation, self).__init__()
            kwargs = self._input_kwargs
            self.setParams(**kwargs)
    
        # Required in Spark >= 3.0
        def setInputCol(self, value):
            """
            Sets the value of :py:attr:`inputCol`.
            """
            return self._set(inputCol=value)
    
        # Required in Spark >= 3.0
        def setPredictionCol(self, value):
            """
            Sets the value of :py:attr:`predictionCol`.
            """
            return self._set(predictionCol=value)
    
        @keyword_only
        def setParams(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
            kwargs = self._input_kwargs
            return self._set(**kwargs)        
    
        def _fit(self, dataset):
            c = self.getInputCol()
            mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
            return NormalDeviationModel(
                inputCol=c, mean=mu, standardDeviation=sigma, 
                centeredThreshold=self.getCenteredThreshold(),
                predictionCol=self.getPredictionCol())
    
    
    class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
            HasMean, HasStandardDeviation, HasCenteredThreshold,
            DefaultParamsReadable, DefaultParamsWritable):
    
        @keyword_only
        def __init__(self, inputCol=None, predictionCol=None,
                    mean=None, standardDeviation=None,
                    centeredThreshold=None):
            super(NormalDeviationModel, self).__init__()
            kwargs = self._input_kwargs
            self.setParams(**kwargs)  
    
        @keyword_only
        def setParams(self, inputCol=None, predictionCol=None,
                    mean=None, standardDeviation=None,
                    centeredThreshold=None):
            kwargs = self._input_kwargs
            return self._set(**kwargs)           
    
        def _transform(self, dataset):
            x = self.getInputCol()
            y = self.getPredictionCol()
            threshold = self.getCenteredThreshold()
            mu = self.getMean()
            sigma = self.getStddev()
    
            return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)     
    

    Finally, it can be used as follows:

    df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])
    
    normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
    model  = Pipeline(stages=[normal_deviation]).fit(df)
    
    model.transform(df).show()
    ## +---+----+----------+
    ## | id|   x|prediction|
    ## +---+----+----------+
    ## |  1| 2.0|     false|
    ## |  2| 3.0|     false|
    ## |  3| 0.0|     false|
    ## |  4|99.0|      true|
    ## +---+----+----------+
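
    Because both classes mix in DefaultParamsReadable and DefaultParamsWritable, the fitted pipeline can also be saved and reloaded. A minimal sketch, assuming PySpark >= 2.3.0; the path is made up, and the NormalDeviation / NormalDeviationModel classes must be importable in the session that loads the model:

    from pyspark.ml import PipelineModel

    # hypothetical output path
    model.write().overwrite().save("/tmp/normal_deviation_pipeline")

    reloaded = PipelineModel.load("/tmp/normal_deviation_pipeline")
    reloaded.transform(df).show()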
    

