def test_esk606(self):
    """Test Esk-606: Convert Spark data frame.

    Runs the ``esk606_convert_spark_df.py`` macro and then inspects the
    data store:

    * the input under key ``df`` must be a Spark data frame;
    * for every output format (``df``, ``rdd``, ``list``, ``pd``) the entry
      ``<format>_output`` must exist, have the expected container type and
      hold exactly the expected rows;
    * for every output format the entry ``<format>_schema`` must exist and
      equal the schema of the input data frame.
    """
    # execute the macro under test and fetch the resulting data store
    self.run_eskapade('esk606_convert_spark_df.py')
    manager = ProcessManager()
    store = manager.service(DataStore)

    # per output format: the expected container type, paired with a function
    # that turns the stored object into a sorted list of row tuples
    formats = {
        'df': (pyspark.sql.DataFrame, lambda data: sorted(data.rdd.map(tuple).collect())),
        'rdd': (pyspark.RDD, lambda data: sorted(data.collect())),
        'list': (list, lambda data: sorted(data)),
        'pd': (pd.DataFrame, lambda data: sorted(map(tuple, data.values))),
    }

    # the input data frame has to be present before anything else is checked
    self.assertIn('df', store, 'no data found with key "df"')
    self.assertIsInstance(store['df'], pyspark.sql.DataFrame, 'unexpected type for input data frame')

    # rows every converted data set is expected to contain, in sorted order
    expected_rows = [(num, 'foo{:d}'.format(num), (num + 1) / 2.) for num in range(20, 100)]

    for name, (expected_type, to_sorted_rows) in formats.items():
        # converted data: presence, container type, then content
        output_key = '{}_output'.format(name)
        missing_data_msg = 'no data found with key "{}"'.format(output_key)
        self.assertIn(output_key, store, missing_data_msg)
        wrong_type_msg = 'unexpected type for "{}" data'.format(name)
        self.assertIsInstance(store[output_key], expected_type, wrong_type_msg)
        wrong_content_msg = 'unexpected content for "{}" data'.format(name)
        self.assertListEqual(to_sorted_rows(store[output_key]), expected_rows, wrong_content_msg)

        # stored schema: presence, then equality with the input frame's schema
        schema_key = '{}_schema'.format(name)
        missing_schema_msg = 'no schema found with key {}'.format(schema_key)
        self.assertIn(schema_key, store, missing_schema_msg)
        wrong_schema_msg = 'unexpected schema for "{}" data'.format(name)
        self.assertListEqual(list(store[schema_key]), list(store['df'].schema), wrong_schema_msg)
评论列表
文章目录