def get_rdd(es_index, es_type):
    """Build a Spark RDD that reads documents from an Elasticsearch resource.

    Parameters
    ----------
    es_index : str
        Name of the Elasticsearch index to read.
    es_type : str
        Document type within the index; pass "" (or any falsy value) to
        read the whole index.

    Returns
    -------
    RDD of (key, document) pairs as produced by EsInputFormat
    (NullWritable keys, LinkedMapWritable values).
    """
    # ES resource strings are either "index" or "index/type".
    # NOTE: original code used `es_type is ""`, an identity comparison that
    # only worked via CPython string interning; use a truthiness test instead.
    if not es_type:
        resource = es_index
    else:
        resource = es_index + "/" + es_type
    es_read_conf = {
        "es.nodes": ES_IP,
        "es.port": ES_PORT,
        "es.resource": resource,
        # Treat a missing index as an empty result set instead of failing.
        "es.index.read.missing.as.empty": 'yes'
    }
    conf = SparkConf().setAppName("Unfetter")
    # getOrCreate reuses an existing context; constructing SparkContext
    # directly raises "Cannot run multiple SparkContexts" on repeat calls.
    sc = SparkContext.getOrCreate(conf=conf)
    rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)
    return rdd
# (page-scrape residue, not code) 评论列表 / 文章目录
# — "comment list" / "article table of contents"; commented out to keep the module importable.