twitter_to_es.py source code

python

Project: ankaracloudmeetup-bigdata-demo    Author: serkan-ozal
from elasticsearch import Elasticsearch, ElasticsearchException
from elasticsearch.helpers import bulk

# index_name, doc_type, id_field, bulk_chunk_size, tweet_mapping, mapping and
# analyze_and_get_tweet are module-level definitions elsewhere in this file.
def save(tweets, es_host, es_port):
    es = Elasticsearch(host=es_host, port=es_port)

    print('Saving tweets into ElasticSearch on {}...'.format(es_host))

    if es.indices.exists(index_name):
        print('Index {} already exists'.format(index_name))
        try:
            # Try to update the mapping of the existing index in place.
            es.indices.put_mapping(doc_type, tweet_mapping, index_name)
        except ElasticsearchException as e:
            # The new mapping conflicts with the existing one, so recreate the index.
            print('Error while putting mapping:\n' + str(e))
            print('Deleting index {}...'.format(index_name))
            es.indices.delete(index_name)
            print('Creating index {}...'.format(index_name))
            es.indices.create(index_name, body={'mappings': mapping})
    else:
        print('Index {} does not exist'.format(index_name))
        print('Creating index {}...'.format(index_name))
        es.indices.create(index_name, body={'mappings': mapping})

    counter = 0
    bulk_data = []
    list_size = len(tweets)
    for doc in tweets:
        tweet = analyze_and_get_tweet(doc)
        # Build one bulk-indexing action per tweet.
        bulk_doc = {
            "_index": index_name,
            "_type": doc_type,
            "_id": tweet[id_field],
            "_source": tweet
        }
        bulk_data.append(bulk_doc)
        counter += 1

        # Flush the buffer every bulk_chunk_size documents and once at the end of the list.
        if counter % bulk_chunk_size == 0 or counter == list_size:
            print('ElasticSearch bulk index (index: {INDEX}, type: {TYPE})...'.format(INDEX=index_name, TYPE=doc_type))
            success, _ = bulk(es, bulk_data)
            print('ElasticSearch has indexed {} documents'.format(success))
            bulk_data = []
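
For reference, save() depends on several module-level names defined elsewhere in twitter_to_es.py (index_name, doc_type, id_field, bulk_chunk_size, tweet_mapping, mapping, analyze_and_get_tweet). The snippet below is a minimal sketch of how that configuration and a call to save() might look; every concrete value and the analyze_and_get_tweet stub are assumptions for illustration, not the project's actual definitions.

# Hypothetical configuration, for illustration only; the real definitions
# live elsewhere in twitter_to_es.py.
index_name = 'tweets'
doc_type = 'tweet'
id_field = 'id_str'
bulk_chunk_size = 500
tweet_mapping = {
    'properties': {
        'id_str': {'type': 'keyword'},
        'text': {'type': 'text'},
        'created_at': {'type': 'date'}
    }
}
mapping = {doc_type: tweet_mapping}

def analyze_and_get_tweet(doc):
    # Placeholder: the real function enriches/normalizes a raw tweet document.
    return doc

if __name__ == '__main__':
    sample_tweets = [
        {'id_str': '1', 'text': 'hello'},
        {'id_str': '2', 'text': 'world'}
    ]
    save(sample_tweets, 'localhost', 9200)

With this shape, the body {'mappings': mapping} passed to es.indices.create() expands to {'mappings': {doc_type: tweet_mapping}}, which is the index-creation layout used by pre-7.x Elasticsearch when document types are still in play.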