The geoip2 Python library does not work inside pySpark's map function
Posted on 2021-01-29 16:06:12
I am using the geoip2 Python library with pySpark to look up the geographic location of certain IPs. My code looks like this:
import os
import geoip2.database
from pyspark import SparkFiles

geoDBpath = 'somePath/geoDB/GeoLite2-City.mmdb'
geoPath = os.path.join(geoDBpath)
sc.addFile(geoPath)
reader = geoip2.database.Reader(SparkFiles.get(geoPath))

def ip2city(ip):
    try:
        city = reader.city(ip).city.name
    except:
        city = 'not found'
    return city
I tried
print ip2city("128.101.101.101")
and it works. But when I try to do the same thing in rdd.map:
rdd = sc.parallelize([ip1, ip2, ip3, ip3, ...])
print rdd.map(lambda x: ip2city(x))
it reports:
Traceback (most recent call last):
File "/home/worker/software/spark/python/pyspark/rdd.py", line 1299, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/home/worker/software/spark/python/pyspark/context.py", line 916, in runJob
port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/home/worker/software/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/home/worker/software/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
TypeError: Required argument 'fileno' (pos 1) not found
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Could anyone tell me how to make the ip2city function work in rdd.map()? Thanks!
1 Answer
The problem in your code appears to come from the reader object. It cannot be serialized as part of the closure and shipped to the workers: the reader memory-maps the database file, and mmap objects cannot be pickled, which is what the "Required argument 'fileno'" TypeError in your traceback points to. To handle this, the reader has to be instantiated on the workers themselves. One way to do that is with mapPartitions:
from pyspark import SparkFiles

geoDBpath = 'GeoLite2-City.mmdb'
sc.addFile(geoDBpath)

def partitionIp2city(iter):
    from geoip2 import database

    def ip2city(ip):
        try:
            city = reader.city(ip).city.name
        except:
            city = 'not found'
        return city

    # Open the reader on the worker, once per partition, instead of
    # capturing it in the closure on the driver.
    reader = database.Reader(SparkFiles.get(geoDBpath))
    return [ip2city(ip) for ip in iter]

rdd = sc.parallelize(['128.101.101.101', '85.25.43.84'])
rdd.mapPartitions(partitionIp2city).collect()
## ['Minneapolis', None]