def sql_context(self, application_name):
    """Create a SQL context given the parameters configured in this class.

    The caller is responsible for calling ``.stop`` on the resulting Spark context.

    Parameters
    ----------
    application_name : string
        Name to give the Spark application.

    Returns
    -------
    (sc, sql_context) : (SparkContext, SQLContext)
        The Spark context and a SQL context bound to it.
    """
    sc = self.spark_context(application_name)
    import pyspark
    sql_context = pyspark.SQLContext(sc)
    return (sc, sql_context)
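A minimal usage sketch, assuming `config` is an instance of the configuration class above (the name `config` is hypothetical):

# Hypothetical usage: `config` stands in for an instance of the class above.
sc, sql_ctx = config.sql_context("MyApplication")
try:
    df = sql_ctx.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
    df.show()
finally:
    sc.stop()  # the caller owns the context and must stop it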
from pyspark.sql import SQLContext

def setUp(self):
    super(TestDataDrivenSpecsRepo, self).setUp()
    # Only build a SQL context for concrete subclasses; the base class
    # itself has no spark_context to wrap.
    if type(self) is not TestDataDrivenSpecsRepo:
        self.sql_context = SQLContext(self.spark_context)
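A sketch of how a concrete subclass might plug into this pattern; the subclass name and the use of `SparkContext.getOrCreate()` are assumptions, not part of the original test suite:

from pyspark import SparkContext

class TestWithLocalSpark(TestDataDrivenSpecsRepo):  # hypothetical subclass
    def setUp(self):
        # Provide spark_context before the base setUp wraps it in a SQLContext.
        self.spark_context = SparkContext.getOrCreate()
        super(TestWithLocalSpark, self).setUp()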
from contextlib import contextmanager

@contextmanager
def with_sql_context(application_name, conf=None):
    """Context manager that yields a Spark context and a matching SQL context.

    Parameters
    ----------
    application_name : string
        Name to give the Spark application.
    conf : SparkConfiguration, optional
        Configuration to use; defaults to ``default_configuration``.

    Returns
    -------
    sc : SparkContext
    sql_context : SQLContext

    Examples
    --------
    Used within a context manager:

    >>> with with_sql_context("MyApplication") as (sc, sql_context):
    ...     import pyspark
    ...     # Do stuff
    ...     pass
    """
    if conf is None:
        conf = default_configuration
    assert isinstance(conf, SparkConfiguration)

    sc = conf.spark_context(application_name)
    import pyspark.sql
    try:
        yield sc, pyspark.sql.SQLContext(sc)
    finally:
        # Always stop the Spark context, even if the body raises.
        sc.stop()
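A usage sketch for the context manager above; the table name and sample data are made up for illustration:

# Hypothetical usage: the context manager guarantees sc.stop() on exit.
with with_sql_context("MyApplication") as (sc, sql_context):
    df = sql_context.createDataFrame([("alice", 1), ("bob", 2)], ["name", "count"])
    df.registerTempTable("people")  # Spark 1.x API; createOrReplaceTempView in 2.x+
    sql_context.sql("SELECT name FROM people WHERE count > 1").show()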
# Assumes module-level "import os" and "import boto3".
def _create_sql_context(self):
    """
    Create a new SQL context within a new Spark context.

    Imports from pyspark have to be pushed down into this method because Spark
    must be available before the libraries can be imported successfully. Since
    Spark is not available when the ETL is first started, we delay the import
    until the ETL has restarted under Spark.

    Side effect: logging is already configured by the time pyspark is loaded,
    so we have better control over filters and formatting.
    """
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    if "SPARK_ENV_LOADED" not in os.environ:
        self.logger.warning("SPARK_ENV_LOADED is not set")

    self.logger.info("Starting SparkSQL context")
    conf = (SparkConf()
            .setAppName(__name__)
            .set("spark.logConf", "true"))
    sc = SparkContext(conf=conf)

    # Copy the credentials from the boto3 session into the Hadoop
    # configuration so that Spark can access S3 via the s3a filesystem.
    session = boto3.Session()
    credentials = session.get_credentials()
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.access.key", credentials.access_key)
    hadoop_conf.set("fs.s3a.secret.key", credentials.secret_key)
    return SQLContext(sc)
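A usage sketch for the method above; the `etl` object, bucket name, and path are hypothetical:

# Hypothetical usage: read JSON from S3 through the s3a credentials set above.
sql_context = etl._create_sql_context()  # `etl` is a hypothetical ETL instance
df = sql_context.read.json("s3a://example-bucket/events/2016/")
df.printSchema()
print(df.count())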