def get_reddit_submissions(subreddit):
    """Stream new submissions of a subreddit to Kafka and to a local JSONL file.

    Every submission yielded by ``reddit.subreddit(subreddit).new()`` is
    passed through ``format_submission``, serialised to JSON, sent to the
    ``data`` Kafka topic and appended as one line to ``test.jsonl``.  The
    running count is printed after each submission.

    Exceptions raised while streaming are not propagated; their type and
    message are appended to ``Errors.txt`` instead (best-effort collection).
    Exceptions from creating the producer or from ``authenticate_api`` still
    propagate to the caller.

    Args:
        subreddit: Name of the subreddit to read from.

    Returns:
        The ``subreddit`` argument, unchanged.
    """
    # Connect to Kafka
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    try:
        # Reddit API
        reddit = authenticate_api()
        submissions = 0
        try:
            # Open the output file once instead of once per submission.
            with open('test.jsonl', 'a', encoding='utf-8') as out_file:
                for submission in reddit.subreddit(subreddit).new():
                    sub = format_submission(submission)
                    # NOTE(review): `>` lets up to 1001 submissions through;
                    # confirm whether a cap of exactly 1000 was intended.
                    if submissions > 1000:
                        break
                    # Serialise once and reuse for both Kafka and the file.
                    payload = json.dumps(sub)
                    producer.send('data', payload.encode('utf-8'))
                    submissions += 1
                    print(submissions)
                    out_file.write(payload + '\n')
            # Flush kafka producer
            producer.flush()
        except Exception as e:
            # Best-effort: record the failure and keep whatever was collected.
            # Explicit UTF-8 so a non-ASCII error message cannot make the
            # handler itself fail on platforms with a narrower default.
            with open('Errors.txt', 'a', encoding='utf-8') as err_file:
                err_file.write(str(type(e)) + '\n')
                err_file.write(str(e) + '\n')
            # Flush kafka producer
            producer.flush()
    finally:
        # The producer is local to this call, so always release it on exit.
        producer.close()
    return subreddit
# Comment list
# Article table of contents