def generate_processor_script(events_file, log_file=None):
    """Write a standalone Python processor script to the temp dir and return its path.

    The generated script connects to the UNIX domain socket at ``events_file``
    (up to 3 attempts, 2 seconds apart), then hands a ``receive_msg`` callback to
    ``kinesis_connector.KinesisProcessor.run_processor``. The callback writes each
    batch of records to the socket as one JSON document per line. Problems are
    appended to ``kclipy.error.log`` in the temp dir.

    :param events_file: path of the UNIX domain socket the script connects to
    :param log_file: value passed to ``run_processor(log_file=...)``; any falsy
        value is embedded as ``None``
    :returns: path of the generated script; it is made executable and also
        appended to ``TMP_FILES``
    """
    script_file = os.path.join(tempfile.gettempdir(), 'kclipy.%s.processor.py' % short_uid())
    # All values are embedded with %r so that quotes or backslashes in a path
    # still produce a valid Python literal in the generated script.
    content = """#!/usr/bin/env python
import os, sys, glob, json, socket, time, logging, tempfile
logging.basicConfig(level=logging.INFO)
for path in glob.glob(os.path.join(%r, 'lib', 'python*', 'site-packages')):
    sys.path.insert(0, path)
sys.path.insert(0, %r)
from localstack.config import DEFAULT_ENCODING
from localstack.utils.kinesis import kinesis_connector
from localstack.utils.common import timestamp
events_file = %r
log_file = %r
error_log = os.path.join(tempfile.gettempdir(), 'kclipy.error.log')


def log_error(msg):
    # Append directly rather than shelling out to echo, so quotes, backticks
    # or dollar signs in the message are never interpreted by a shell.
    try:
        with open(error_log, 'a') as f:
            f.write('%%s\\n' %% msg)
    except Exception:
        # Logging is best-effort and must not mask the error being reported.
        pass


if __name__ == '__main__':
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    num_tries = 3
    sleep_time = 2
    error = None
    for i in range(num_tries):
        try:
            sock.connect(events_file)
            error = None
            break
        except Exception as e:
            error = e
            # No point logging 'Retrying' or sleeping after the final attempt.
            if i < num_tries - 1:
                log_error('%%s: Unable to connect to UNIX socket. Retrying.' %% timestamp())
                time.sleep(sleep_time)
    if error:
        print("WARN: Unable to connect to UNIX socket after retrying: %%s" %% error)
        raise error

    def receive_msg(records, checkpointer, shard_id):
        try:
            # records is a list of amazon_kclpy.messages.Record objects -> convert to JSON
            records_dicts = [j._json_dict for j in records]
            message_to_send = {'shard_id': shard_id, 'records': records_dicts}
            string_to_send = '%%s\\n' %% json.dumps(message_to_send)
            bytes_to_send = string_to_send.encode(DEFAULT_ENCODING)
            # sendall: a plain send() may transmit only part of a large payload.
            sock.sendall(bytes_to_send)
        except Exception as e:
            msg = "WARN: Unable to forward event: %%s" %% e
            print(msg)
            log_error(msg)

    kinesis_connector.KinesisProcessor.run_processor(log_file=log_file, processor_func=receive_msg)
""" % (LOCALSTACK_VENV_FOLDER, LOCALSTACK_ROOT_FOLDER, events_file, log_file or None)
    save_file(script_file, content)
    run('chmod +x %s' % script_file)
    # Register the script so it is tracked alongside the other temp files.
    TMP_FILES.append(script_file)
    return script_file
评论列表
文章目录