def __init__(self, additional_options=None):
    # Make sure Python workers use the same interpreter as the driver.
    os.environ['PYSPARK_PYTHON'] = sys.executable

    # Build spark-submit arguments (repositories, packages, jars) and expose
    # them to PySpark before the JVM is launched.
    submit_args = [
        self._setup_repositories(),
        self._setup_packages(),
        self._setup_jars(),
        'pyspark-shell',
    ]
    os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(filter(None, submit_args))

    def _create_spark_context():
        spark_conf = SparkConf()
        spark_conf.set('spark.sql.catalogImplementation', 'hive')
        spark_conf.setAll(self._setup_options(additional_options))
        return SparkContext(conf=spark_conf)

    # If we are in instant testing mode, try to reuse an existing context.
    if InstantTesting.is_activated():
        spark_context = InstantTesting.get_context()

        # It's the first run, so we have to create the context and daemonise the process.
        if spark_context is None:
            spark_context = _create_spark_context()
            if os.fork() == 0:  # Detached process.
                signal.pause()
            else:
                InstantTesting.set_context(spark_context)
    else:
        spark_context = _create_spark_context()

    # Init HiveContext.
    super(SparklySession, self).__init__(spark_context)

    self._setup_udfs()

    self.read_ext = SparklyReader(self)
    self.catalog_ext = SparklyCatalog(self)

    attach_writer_to_dataframe()
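The constructor above assumes the usual module-level imports (os, sys, signal, pyspark's SparkConf and SparkContext, plus sparkly's InstantTesting, SparklyReader, SparklyCatalog, and attach_writer_to_dataframe helpers). As a usage illustration, here is a minimal sketch of how a session built this way is typically configured by subclassing SparklySession, following sparkly's documented pattern; the package coordinate, option values, and URL are placeholders, not part of the snippet above.

from sparkly import SparklySession

class MySession(SparklySession):
    # Maven packages are resolved at startup and passed to the JVM through
    # PYSPARK_SUBMIT_ARGS via _setup_packages(); this coordinate is only an example.
    packages = ['datastax:spark-cassandra-connector:2.0.0-M2-s_2.11']
    # Extra Spark options merged into SparkConf by _setup_options().
    options = {'spark.sql.shuffle.partitions': '10'}

# additional_options are applied on top of the class-level options.
spark = MySession(additional_options={'spark.app.name': 'my-app'})
# read_ext is the SparklyReader attached in the constructor above.
df = spark.read_ext.by_url('parquet:s3://example-bucket/example-path')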