test_tutorial_macros.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:Eskapade 作者: KaveIO 项目源码 文件源码
def test_esk606(self):
        """Test Esk-606: Convert Spark data frame

        Runs the ``esk606_convert_spark_df.py`` macro and inspects the data
        store afterwards.  The input data frame must be stored under "df".
        For each output format (Spark data frame, RDD, list, Pandas data
        frame) the test checks that "<format>_output" exists with the
        expected type and the expected rows, and that "<format>_schema"
        matches the schema of the input data frame.
        """

        # execute the macro, then fetch the data store it filled
        self.run_eskapade('esk606_convert_spark_df.py')
        store = ProcessManager().service(DataStore)

        # per output format: (expected type, function returning its rows as a sorted list)
        expectations = {
            'df': (pyspark.sql.DataFrame, lambda data: sorted(data.rdd.map(tuple).collect())),
            'rdd': (pyspark.RDD, lambda data: sorted(data.collect())),
            'list': (list, lambda data: sorted(data)),
            'pd': (pd.DataFrame, lambda data: sorted(map(tuple, data.values))),
        }

        # the input data frame must be present and be a Spark data frame
        self.assertIn('df', store, 'no data found with key "df"')
        input_df = store['df']
        self.assertIsInstance(input_df, pyspark.sql.DataFrame, 'unexpected type for input data frame')

        # rows every output data set is expected to contain
        expected_rows = [(num, 'foo{:d}'.format(num), (num + 1) / 2.) for num in range(20, 100)]

        for fmt_key, (expected_type, get_rows) in expectations.items():
            # the converted data: presence, type and content
            output_key = '{}_output'.format(fmt_key)
            self.assertIn(output_key, store, 'no data found with key "{}"'.format(output_key))
            output_data = store[output_key]
            self.assertIsInstance(output_data, expected_type, 'unexpected type for "{}" data'.format(fmt_key))
            actual_rows = get_rows(output_data)
            self.assertListEqual(actual_rows, expected_rows, 'unexpected content for "{}" data'.format(fmt_key))

            # the stored schema must equal the schema of the input data frame
            schema_key = '{}_schema'.format(fmt_key)
            self.assertIn(schema_key, store, 'no schema found with key {}'.format(schema_key))
            stored_schema = list(store[schema_key])
            input_schema = list(input_df.schema)
            self.assertListEqual(stored_schema, input_schema, 'unexpected schema for "{}" data'.format(fmt_key))
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号