def process_warcs(self, id_, iterator):
    """Yield the results of ``self.process_record`` for every record of each WARC.

    Each URI produced by ``iterator`` is opened as a binary stream and read
    with ``ArchiveIterator``. Every record is passed to
    ``self.process_record``, and its results are yielded one by one.

    URI handling:

    * ``s3://bucket/key`` -- downloaded with an unsigned (anonymous) S3
      request into a temporary file created in ``self.args.local_temp_dir``.
    * ``hdfs://...`` -- not implemented; logged as an error and skipped.
    * anything else -- treated as a local path, optionally prefixed with
      ``file:``; relative paths are resolved against this module's directory.

    If a download, open, or WARC parse fails (``ClientError``, ``IOError``,
    ``ArchiveLoadFailed``), the error is logged, ``self.warc_input_failed``
    is incremented, and processing continues with the next URI. Every opened
    stream is closed once its records have been consumed, including when
    parsing fails or the consumer stops iterating early.

    :param id_: not used in this method (presumably a partition index
        supplied by the caller -- confirm)
    :param iterator: iterable of WARC URIs (strings)
    :return: generator over the items yielded by ``self.process_record``
    """
    s3pattern = re.compile('^s3://([^/]+)/(.+)')
    base_dir = os.path.abspath(os.path.dirname(__file__))
    # S3 client (not thread-safe, initialize outside parallelized loop)
    no_sign_request = botocore.client.Config(
        signature_version=botocore.UNSIGNED)
    s3client = boto3.client('s3', config=no_sign_request)
    for uri in iterator:
        self.warc_input_processed.add(1)
        if uri.startswith('s3://'):
            self.get_logger().info('Reading from S3 {}'.format(uri))
            s3match = s3pattern.match(uri)
            if s3match is None:
                self.get_logger().error("Invalid S3 URI: " + uri)
                continue
            bucketname = s3match.group(1)
            path = s3match.group(2)
            warctemp = TemporaryFile(mode='w+b',
                                     dir=self.args.local_temp_dir)
            try:
                s3client.download_fileobj(bucketname, path, warctemp)
            except botocore.client.ClientError as exception:
                # Closing a TemporaryFile also deletes it; without this the
                # partially downloaded file would linger until GC.
                warctemp.close()
                self.get_logger().error(
                    'Failed to download {}: {}'.format(uri, exception))
                self.warc_input_failed.add(1)
                continue
            # Rewind so the archive reader starts at the first byte.
            warctemp.seek(0)
            stream = warctemp
        elif uri.startswith('hdfs://'):
            self.get_logger().error("HDFS input not implemented: " + uri)
            continue
        else:
            self.get_logger().info('Reading local stream {}'.format(uri))
            if uri.startswith('file:'):
                uri = uri[5:]
            # os.path.join leaves absolute paths unchanged and anchors
            # relative ones at this module's directory.
            uri = os.path.join(base_dir, uri)
            try:
                stream = open(uri, 'rb')
            except IOError as exception:
                self.get_logger().error(
                    'Failed to open {}: {}'.format(uri, exception))
                self.warc_input_failed.add(1)
                continue
        no_parse = (not self.warc_parse_http_header)
        try:
            for record in ArchiveIterator(stream,
                                          no_record_parse=no_parse):
                for res in self.process_record(record):
                    yield res
                self.records_processed.add(1)
        except ArchiveLoadFailed as exception:
            self.warc_input_failed.add(1)
            self.get_logger().error(
                'Invalid WARC: {} - {}'.format(uri, exception))
        finally:
            # Release the file handle (and delete the S3 temp file) whether
            # parsing succeeded, failed, or the generator was closed early.
            stream.close()
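# NOTE(review): web-page scrape residue, not code -- "评论列表" ("Comment list"),
# "文章目录" ("Table of contents"). Kept as a comment so the module parses.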