def cli(args):
    """Copy one ``dag_dag`` row from PostgreSQL into a Mongo ``DAG_Description``.

    The row is selected by the id given on the command line. If no Mongo
    document with ``dag_name`` equal to that id exists, a new one is created
    from the row; otherwise the first matching document has its ``raw_data``,
    ``clean_data`` and ``updated_at`` fields refreshed.

    Args:
        args: Command-line arguments without the program name. Recognised
            options are ``-i <id>`` / ``--id=<id>`` (required) and
            ``-h`` / ``--help``.

    Raises:
        SystemExit: with status 2 on unknown options or a missing id, with
            status 1 when PostgreSQL returns no row for the id, and with a
            success status after printing the help text.
    """
    usage = "usage: -i <row id> | --id=<row id> [-h | --help]"
    try:
        # "h" must be declared here, otherwise getopt rejects -h before the
        # loop below ever gets a chance to handle it.
        opts, _positional = getopt.getopt(args, "hi:", ["help", "id="])
    except getopt.GetoptError as err:
        logging.error("%s (%s)", err, usage)
        sys.exit(2)

    row_id = None
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            print('A simple script for loading data from PostgreSQL to Mongo')
            sys.exit()
        elif opt in ("-i", "--id"):
            row_id = arg

    # Without an id there is nothing to look up; fail early instead of
    # running into an unbound variable further down.
    if row_id is None:
        logging.error("Missing required option -i/--id (%s)", usage)
        sys.exit(2)

    # Connection to the PostgreSQL, to be defined in the Airflow UI
    pg_hook = PostgresHook(postgres_conn_id="postgres_data")
    # Retrieve the data stored in PostgreSQL (parameterized query)
    pg_command = """SELECT * FROM dag_dag WHERE id = %s"""
    data = pg_hook.get_records(pg_command, parameters=[row_id])
    if not data:
        logging.error("No row found in dag_dag for id %s", row_id)
        sys.exit(1)
    # NOTE(review): the positional indices below assume the dag_dag column
    # order is (id, raw_data, clean_data, vis_type, vis_title, vis_text,
    # vis_footer, created_at, updated_at) -- confirm against the table schema.
    row = data[0]

    # Connect to Mongo databases in the Docker compose
    mongoengine.connect(db="dags", host="mongo:27017", alias="default")
    # Search for existing documents with the same dag_name that a newly
    # created document would get (see dag_name=row_id below).
    dags_docs = DAG_Description.objects(dag_name=row_id)
    doc_count = dags_docs.count()

    if doc_count == 0:
        # Setup a new document for storing the data
        logging.info("Creating a new Mongo document for %s", row_id)
        dag_document = DAG_Description(
            dag_name=row_id,
            raw_data=row[1],
            clean_data=row[2],
            vis_type=row[3],
            vis_title=row[4],
            vis_text=row[5],
            vis_footer=row[6],
            created_at=row[7],
            updated_at=row[8])
        # Save the document
        dag_document.save()
    else:
        # One or several matches: in both cases the first one is updated,
        # only the log message differs.
        if doc_count > 1:
            logging.info("Updating the first Mongo document found for %s", row_id)
        else:
            logging.info("Updating the Mongo document found for %s", row_id)
        dag_document = dags_docs.first()
        # datetime.now must be *called*; passing the function object itself
        # would not store a timestamp.
        dag_document.update(
            raw_data=row[1],
            clean_data=row[2],
            updated_at=datetime.now())

    # Report the success
    logging.info("Data exported from PostgreSQL to Mongo successfully.")
评论列表
文章目录