def call(self, milliseconds, jrdds):
    """Invoke ``self.func`` for one batch and hand back the result's ``_jrdd``.

    Args:
        milliseconds: Batch timestamp in milliseconds. It is divided by
            1000 and converted with ``datetime.fromtimestamp`` before being
            passed to ``self.func`` as the first argument.
        jrdds: Sequence of RDD handles. Each truthy entry is wrapped with
            ``self.rdd_wrap_func(jrdd, self.ctx, deserializer)``; falsy
            entries are passed to ``self.func`` as ``None``.

    Returns:
        ``r._jrdd`` where ``r`` is the value returned by ``self.func``, if
        that value is truthy. ``None`` when the context is missing or has
        no ``_jsc``, when ``self.func`` returns a falsy value, or when
        anything inside the body raised.

    Exceptions raised inside the body are not propagated: the formatted
    traceback is stored in ``self.failure`` instead. ``self.failure`` is
    reset to ``None`` at the start of every call.
    """
    # Clear the failure left over from any previous invocation.
    self.failure = None
    try:
        if self.ctx is None:
            self.ctx = SparkContext._active_spark_context
        if not self.ctx or not self.ctx._jsc:
            # stopped
            return None

        # Extend deserializers with the first one so that every entry of
        # jrdds has a partner in the zip() below.
        # NOTE(review): with a tuple this only rebinds the local name; if
        # self.deserializers were a list, += would extend it in place.
        sers = self.deserializers
        missing = len(jrdds) - len(sers)
        if missing > 0:
            sers += (sers[0],) * missing

        rdds = [
            self.rdd_wrap_func(jrdd, self.ctx, ser) if jrdd else None
            for jrdd, ser in zip(jrdds, sers)
        ]
        t = datetime.fromtimestamp(milliseconds / 1000.0)
        r = self.func(t, *rdds)
        if r:
            return r._jrdd
        return None
    except BaseException:
        # Deliberate catch-all (same coverage as the former bare except):
        # the error is reported through self.failure rather than raised.
        self.failure = traceback.format_exc()
        return None
评论列表
文章目录