如何使用插入/修改/删除将dynamodb设计为弹性搜索

发布于 2021-01-29 18:32:58

如何使用Python将整个文档传递到弹性搜索中?这是进行弹性搜索的正确方法吗?

在dynamodb中id是主键

如何插入dynamodb下面是代码

import boto3
from boto3.dynamodb.conditions import Key, And, Attr
def lambda_handler(event, context):
    dynamodb = boto3.resource ('dynamodb')
    table =dynamodb.Table('newtable')
    with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
            batch.put_item(
                Item={
                    'id': '1',
                    'last_name': 'V',
                    'age': '2',
                }
            )
            batch.put_item(
                Item={
                    'id': '2',
                    'last_name': 'JJ',
                    'age': '7',
                }
            )
            batch.put_item(
                Item={
                    'id': '9',
                    'last_name': 'ADD',
                    'age': '95',
                }
            )
            batch.put_item(
                Item={
                    'id': '10',
                    'last_name': 'ADD',
                    'age': '95',
                }
            )
  • 如何将期望的结果推送到Elastic Search中

  • 如果dynamodb内容改变,如何在ES中自动反映

我已经通过链接https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-
with-amazon-elasticsearch-service-using-aws-
lambda/

下面是我出错的代码 ERROR: NameError("name 'event' is not defined")

码。*在此之前,从dynamodb表触发以下lambda函数

import boto3
import json
import re
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

session = boto3.session.Session()
credentials = session.get_credentials()
# s3 = session.resource('s3')
awsauth = AWS4Auth(credentials.access_key,
                   credentials.secret_key,
                   session.region_name, 'es',
                   session_token=credentials.token)
es = Elasticsearch(
    ['https://xx-east-1.es.amazonaws.com'],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)
reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                   "_timestamp", "_ttl"]


def lambda_handler(event, context):
    print(event)
    dynamodb = boto3.resource('dynamodb')

    # Loop over the DynamoDB Stream records
    for record in event['Records']:

        try:

            if record['eventName'] == "INSERT":
                insert_document(es, record)
            elif record['eventName'] == "REMOVE":
                remove_document(es, record)
            elif record['eventName'] == "MODIFY":
                modify_document(es, record)

        except Exception as e:
            print("Failed to process:")
            print(json.dumps(record))
            print("ERROR: " + repr(e))
            continue


# Process MODIFY events
def modify_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(record)
    print("KEY")
    print(docId)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document())

    print("Updated document:")
    print(doc)

    # We reindex the whole document as ES accepts partial docs
    es.index(index=table,
             body=doc,
             id=docId,
             doc_type=table,
             refresh=True)

    print("Successly modified - Index: " + table + " - Document ID: " + docId)


def remove_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(record)
    print("Deleting document ID: " + docId)

    es.delete(index=table,
              id=docId,
              doc_type=table,
              refresh=True)

    print("Successly removed - Index: " + table + " - Document ID: " + docId)


# Process INSERT events
def insert_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    # Create index if missing
    if es.indices.exists(table) == False:
        print("Create missing index: " + table)

        es.indices.create(table,
                          body='{"settings": { "index.mapping.coerce": true } }')

        print("Index created: " + table)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document())

    print("New document to Index:")
    print(doc)

    newId = docid(record)
    es.index(index=table,
             body=doc,
             id=newId,
             doc_type=table,
             refresh=True)

    print("Successly inserted - Index: " + table + " - Document ID: " + newId)


def getTable(record):
    p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
    m = p.match(record['eventSourceARN'])
    if m is None:
        raise Exception("Table not found in SourceARN")
    return m.group(1).lower()


def document(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['NewImage'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
        for i in result:
            return i


def docid(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['Keys'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for newId in result:
        return newId

在文档和文档上获取错误

各自都在提供输出

result = []
for r in event['Records']:
    tmp = {}

    for k, v in r['dynamodb']['NewImage'].items():
    #for k, v in r['dynamodb']['Keys'].items():
        if "S" in v.keys() or "BOOL" in v.keys():
            tmp[k] = v.get('S', v.get('BOOL', False))
        elif 'NULL' in v:
            tmp[k] = None

    result.append(tmp)
for i in result:
    print (i)

event = {'Records': [{'eventID': '2339bc590c21035b84f8cc602b12c1d2', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '9'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '9'}, 'age': {'S': '95'}}, 'SequenceNumber': '3100000000035684810908', 'SizeBytes': 23, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'xxxx', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '2'}}, 'NewImage': {'last_name': {'S': 'JJ'}, 'id': {'S': '2'}, 'age': {'S': '5'}}, 'SequenceNumber': '3200000000035684810954', 'SizeBytes': 21, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'a9c90c0c4a5a4b64d0314c4557e94e28', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '10'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '10'}, 'age': {'S': '95'}}, 'SequenceNumber': '3300000000035684810956', 'SizeBytes': 25, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': '288f4a424992e5917af0350b53f754dc', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '1'}}, 'NewImage': {'last_name': {'S': 'V'}, 'id': {'S': '1'}, 'age': {'S': '2'}}, 'SequenceNumber': '3400000000035684810957', 'SizeBytes': 20, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}]}
关注者
0
被浏览
43
1 个回答
  • 面试哥
    面试哥 2021-01-29
    为面试而生,有面试问题,就找面试哥。

    您可以检查以下内容。我试图 复制 该问题,并可以 确认 错误

    ERROR: NameError("name 'event' is not defined")
    

    根据您的示例和 我自己的表* ,我使用了DynamoDb流中的 模拟 INSERT :event *

    {
      "Records": [
        {
          "eventID": "b8b993cf16d1aacb61b40411b39e0b1f",
          "eventName": "INSERT",
          "eventVersion": "1.1",
          "eventSource": "aws:dynamodb",
          "awsRegion": "us-east-1",
          "dynamodb": {
            "ApproximateCreationDateTime": 1595922821.0,
            "Keys": {
              "id": {
                "N": "1"
              }
            },
            "NewImage": {
              "last_name": {
                "S": "V"
              },
              "id": {
                "N": "1"
              },
              "age": {
                "S": "2"
              }
            },
            "SequenceNumber": "25200000000020406897812",
            "SizeBytes": 22,
            "StreamViewType": "NEW_AND_OLD_IMAGES"
          },
          "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
        },
        {
          "eventID": "e5d5bec988945c06ffc879cf16b89bf7",
          "eventName": "INSERT",
          "eventVersion": "1.1",
          "eventSource": "aws:dynamodb",
          "awsRegion": "us-east-1",
          "dynamodb": {
            "ApproximateCreationDateTime": 1595922821.0,
            "Keys": {
              "id": {
                "N": "9"
              }
            },
            "NewImage": {
              "last_name": {
                "S": "ADD"
              },
              "id": {
                "N": "9"
              },
              "age": {
                "S": "95"
              }
            },
            "SequenceNumber": "25300000000020406897813",
            "SizeBytes": 25,
            "StreamViewType": "NEW_AND_OLD_IMAGES"
          },
          "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
        },
        {
          "eventID": "f1a7c9736253b5ef28ced38ed5ff645b",
          "eventName": "INSERT",
          "eventVersion": "1.1",
          "eventSource": "aws:dynamodb",
          "awsRegion": "us-east-1",
          "dynamodb": {
            "ApproximateCreationDateTime": 1595922821.0,
            "Keys": {
              "id": {
                "N": "2"
              }
            },
            "NewImage": {
              "last_name": {
                "S": "JJ"
              },
              "id": {
                "N": "2"
              },
              "age": {
                "S": "7"
              }
            },
            "SequenceNumber": "25400000000020406897819",
            "SizeBytes": 23,
            "StreamViewType": "NEW_AND_OLD_IMAGES"
          },
          "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
        },
        {
          "eventID": "bfcbad9dc19883e4172e6dc25e66637b",
          "eventName": "INSERT",
          "eventVersion": "1.1",
          "eventSource": "aws:dynamodb",
          "awsRegion": "us-east-1",
          "dynamodb": {
            "ApproximateCreationDateTime": 1595922821.0,
            "Keys": {
              "id": {
                "N": "10"
              }
            },
            "NewImage": {
              "last_name": {
                "S": "ADD"
              },
              "id": {
                "N": "10"
              },
              "age": {
                "S": "95"
              }
            },
            "SequenceNumber": "25500000000020406897820",
            "SizeBytes": 25,
            "StreamViewType": "NEW_AND_OLD_IMAGES"
          },
          "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
        }
      ]
    }
    

    修改示例event

    {
      "Records": [
        {
          "eventID": "4e4629c88aa00e366c89a293d9c82d54",
          "eventName": "MODIFY",
          "eventVersion": "1.1",
          "eventSource": "aws:dynamodb",
          "awsRegion": "us-east-1",
          "dynamodb": {
            "ApproximateCreationDateTime": 1595924589.0,
            "Keys": {
              "id": {
                "N": "2"
              }
            },
            "NewImage": {
              "last_name": {
                "S": "zhgdhfgdh"
              },
              "id": {
                "N": "2"
              },
              "age": {
                "S": "7"
              }
            },
            "OldImage": {
              "last_name": {
                "S": "JJ"
              },
              "id": {
                "N": "2"
              },
              "age": {
                "S": "7"
              }
            },
            "SequenceNumber": "25600000000020408264140",
            "SizeBytes": 49,
            "StreamViewType": "NEW_AND_OLD_IMAGES"
          },
          "eventSourceARN": "arn:aws:dynamodb:us-east-1:34234:table/newtable/stream/2020-07-28T06:59:38.569"
        }
      ]
    }
    

    我可以 确认 的lambda函数的修改代码现在 不会产生错误

    import boto3
    import json
    import re
    
    from requests_aws4auth import AWS4Auth
    from elasticsearch import Elasticsearch, RequestsHttpConnection
    
    session = boto3.session.Session()
    credentials = session.get_credentials()
    
    s3 = session.resource('s3')
    
    awsauth = AWS4Auth(credentials.access_key,
                      credentials.secret_key,
                      session.region_name, 'es',
                      session_token=credentials.token)
    
    
    es = Elasticsearch(
        ['https://vpc-test-dmamain-452frn764ggb4a.us-east-1.es.amazonaws.com'],
        use_ssl=True,
        verify_certs=True,
        http_auth=awsauth,
        connection_class=RequestsHttpConnection
    )
    reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                       "_timestamp", "_ttl"]
    
    
    def lambda_handler(event, context):
        print(event)
        #dynamodb = boto3.resource('dynamodb')
    
        # Loop over the DynamoDB Stream records
        for record in event['Records']:
    
            if record['eventName'] == "INSERT":
                insert_document(event, es, record)
            elif record['eventName'] == "REMOVE":
                remove_document(event, es, record)
            elif record['eventName'] == "MODIFY":
                modify_document(event, es, record)
    
    
    # Process MODIFY events
    def modify_document(event, es, record):
        table = getTable(record)
        print("Dynamo Table: " + table)
    
        docId = docid(event, event)
        print("KEY")
        print(docId)
    
        # Unmarshal the DynamoDB JSON to a normal JSON
        doc = json.dumps(document(event))
    
        print("Updated document:")
        print(doc)
    
        # We reindex the whole document as ES accepts partial docs
        es.index(index=table,
                 body=doc,
                 id=docId,
                 doc_type=table,
                 refresh=True)
    
        print("Successly modified - Index: " , table , " - Document ID: " , docId)
    
    
    def remove_document(event, es, record):
    
        table = getTable(record)
    
        print("Dynamo Table: " + table)
    
        docId = docid(event, event)
        print("Deleting document ID: ", docId)
    
        es.delete(index=table,
                  id=docId,
                  doc_type=table,
                  refresh=True)
    
        print("Successly removed - Index: ", table, " - Document ID: " , docId)
    
    
    # Process INSERT events
    def insert_document(event, es, record):
        table = getTable(record)
        print("Dynamo Table: " + table)
    
        # Create index if missing
        if es.indices.exists(table) == False:
            print("Create missing index: " + table)
    
            es.indices.create(table,
                              body='{"settings": { "index.mapping.coerce": true } }')
    
            print("Index created: " + table)
    
        # Unmarshal the DynamoDB JSON to a normal JSON
        doc = json.dumps(document(event))
    
        print("New document to Index:")
        print(doc)
    
        newId = docid(event, record)
    
        es.index(index=table,
                 body=doc,
                 id=newId,
                 doc_type=table,
                 refresh=True)
    
        print("Successly inserted - Index: " , table + " - Document ID: " , newId)
    
    
    def getTable(record):
        p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
        m = p.match(record['eventSourceARN'])
        if m is None:
            raise Exception("Table not found in SourceARN")
        return m.group(1).lower()
    
    
    def document(event):
        result = []
        for r in event['Records']:
            tmp = {}
            for k, v in r['dynamodb']['NewImage'].items():
                if "S" in v.keys() or "BOOL" in v.keys():
                    tmp[k] = v.get('S', v.get('BOOL', False))
                elif 'NULL' in v:
                    tmp[k] = None
            result.append(tmp)
            for i in result:
                return i
    
    
    def docid(event, record):
        result = []
        for r in event['Records']:
            tmp = {}
            for k, v in r['dynamodb']['Keys'].items():
                if "S" in v.keys() or "BOOL" in v.keys():
                    tmp[k] = v.get('S', v.get('BOOL', False))
                elif 'NULL' in v:
                    tmp[k] = None
            result.append(tmp)
        for newId in result:
            return newId
    

    尚未验证
    数据是否正确写入,修改或插入了ElasticSearch。但是我运行了ES域,并在lambda中使用了它来验证lambda是否可以连接到它并运行查询。

    lambda的INSERT事件输出示例:

    Dynamo Table: newtable
    New document to Index:
    {"last_name": "V", "age": "2"}
    Successly inserted - Index:  newtable - Document ID:  {}
    Dynamo Table: newtable
    New document to Index:
    {"last_name": "V", "age": "2"}
    Successly inserted - Index:  newtable - Document ID:  {}
    Dynamo Table: newtable
    New document to Index:
    {"last_name": "V", "age": "2"}
    Successly inserted - Index:  newtable - Document ID:  {}
    Dynamo Table: newtable
    New document to Index:
    {"last_name": "V", "age": "2"}
    Successly inserted - Index:  newtable - Document ID:  {}
    
    Example output from lambda from MODIFY event:
    

    更新文件:

    {
        "last_name": "zhgdhfgdh",
        "age": "7"
    }
    Successly modified - Index:  newtable  - Document ID:  
    {}
    

    我认为docid如果它可以正常工作,则需要进一步调查,因为它似乎返回空dict:

     Document ID:  {}
    


知识点
面圈网VIP题库

面圈网VIP题库全新上线,海量真题题库资源。 90大类考试,超10万份考试真题开放下载啦

去下载看看