如何使用插入/修改/删除将dynamodb设计为弹性搜索
如何使用Python将整个文档传递到弹性搜索中?这是进行弹性搜索的正确方法吗?
在dynamodb中id
是主键
如何插入dynamodb下面是代码
import boto3
from boto3.dynamodb.conditions import Key, And, Attr
def lambda_handler(event, context):
dynamodb = boto3.resource ('dynamodb')
table =dynamodb.Table('newtable')
with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
batch.put_item(
Item={
'id': '1',
'last_name': 'V',
'age': '2',
}
)
batch.put_item(
Item={
'id': '2',
'last_name': 'JJ',
'age': '7',
}
)
batch.put_item(
Item={
'id': '9',
'last_name': 'ADD',
'age': '95',
}
)
batch.put_item(
Item={
'id': '10',
'last_name': 'ADD',
'age': '95',
}
)
-
如何将期望的结果推送到Elastic Search中
-
如果dynamodb内容改变,如何在ES中自动反映
我已经通过链接https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-
with-amazon-elasticsearch-service-using-aws-
lambda/
下面是我出错的代码 ERROR: NameError("name 'event' is not defined")
码。*在此之前,从dynamodb表触发以下lambda函数
import boto3
import json
import re
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
session = boto3.session.Session()
credentials = session.get_credentials()
# s3 = session.resource('s3')
awsauth = AWS4Auth(credentials.access_key,
credentials.secret_key,
session.region_name, 'es',
session_token=credentials.token)
es = Elasticsearch(
['https://xx-east-1.es.amazonaws.com'],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
"_timestamp", "_ttl"]
def lambda_handler(event, context):
print(event)
dynamodb = boto3.resource('dynamodb')
# Loop over the DynamoDB Stream records
for record in event['Records']:
try:
if record['eventName'] == "INSERT":
insert_document(es, record)
elif record['eventName'] == "REMOVE":
remove_document(es, record)
elif record['eventName'] == "MODIFY":
modify_document(es, record)
except Exception as e:
print("Failed to process:")
print(json.dumps(record))
print("ERROR: " + repr(e))
continue
# Process MODIFY events
def modify_document(es, record):
table = getTable(record)
print("Dynamo Table: " + table)
docId = docid(record)
print("KEY")
print(docId)
# Unmarshal the DynamoDB JSON to a normal JSON
doc = json.dumps(document())
print("Updated document:")
print(doc)
# We reindex the whole document as ES accepts partial docs
es.index(index=table,
body=doc,
id=docId,
doc_type=table,
refresh=True)
print("Successly modified - Index: " + table + " - Document ID: " + docId)
def remove_document(es, record):
table = getTable(record)
print("Dynamo Table: " + table)
docId = docid(record)
print("Deleting document ID: " + docId)
es.delete(index=table,
id=docId,
doc_type=table,
refresh=True)
print("Successly removed - Index: " + table + " - Document ID: " + docId)
# Process INSERT events
def insert_document(es, record):
table = getTable(record)
print("Dynamo Table: " + table)
# Create index if missing
if es.indices.exists(table) == False:
print("Create missing index: " + table)
es.indices.create(table,
body='{"settings": { "index.mapping.coerce": true } }')
print("Index created: " + table)
# Unmarshal the DynamoDB JSON to a normal JSON
doc = json.dumps(document())
print("New document to Index:")
print(doc)
newId = docid(record)
es.index(index=table,
body=doc,
id=newId,
doc_type=table,
refresh=True)
print("Successly inserted - Index: " + table + " - Document ID: " + newId)
def getTable(record):
p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
m = p.match(record['eventSourceARN'])
if m is None:
raise Exception("Table not found in SourceARN")
return m.group(1).lower()
def document(event):
result = []
for r in event['Records']:
tmp = {}
for k, v in r['dynamodb']['NewImage'].items():
if "S" in v.keys() or "BOOL" in v.keys():
tmp[k] = v.get('S', v.get('BOOL', False))
elif 'NULL' in v:
tmp[k] = None
result.append(tmp)
for i in result:
return i
def docid(event):
result = []
for r in event['Records']:
tmp = {}
for k, v in r['dynamodb']['Keys'].items():
if "S" in v.keys() or "BOOL" in v.keys():
tmp[k] = v.get('S', v.get('BOOL', False))
elif 'NULL' in v:
tmp[k] = None
result.append(tmp)
for newId in result:
return newId
在文档和文档上获取错误
各自都在提供输出
result = []
for r in event['Records']:
tmp = {}
for k, v in r['dynamodb']['NewImage'].items():
#for k, v in r['dynamodb']['Keys'].items():
if "S" in v.keys() or "BOOL" in v.keys():
tmp[k] = v.get('S', v.get('BOOL', False))
elif 'NULL' in v:
tmp[k] = None
result.append(tmp)
for i in result:
print (i)
event = {'Records': [{'eventID': '2339bc590c21035b84f8cc602b12c1d2', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '9'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '9'}, 'age': {'S': '95'}}, 'SequenceNumber': '3100000000035684810908', 'SizeBytes': 23, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'xxxx', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '2'}}, 'NewImage': {'last_name': {'S': 'JJ'}, 'id': {'S': '2'}, 'age': {'S': '5'}}, 'SequenceNumber': '3200000000035684810954', 'SizeBytes': 21, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'a9c90c0c4a5a4b64d0314c4557e94e28', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '10'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '10'}, 'age': {'S': '95'}}, 'SequenceNumber': '3300000000035684810956', 'SizeBytes': 25, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': '288f4a424992e5917af0350b53f754dc', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '1'}}, 'NewImage': {'last_name': {'S': 'V'}, 'id': {'S': '1'}, 'age': {'S': '2'}}, 'SequenceNumber': '3400000000035684810957', 'SizeBytes': 20, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}]}
-
您可以检查以下内容。我试图 复制 该问题,并可以 确认 错误
ERROR: NameError("name 'event' is not defined")
根据您的示例和 我自己的表* ,我使用了DynamoDb流中的 模拟 INSERT :
event
*{ "Records": [ { "eventID": "b8b993cf16d1aacb61b40411b39e0b1f", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595922821.0, "Keys": { "id": { "N": "1" } }, "NewImage": { "last_name": { "S": "V" }, "id": { "N": "1" }, "age": { "S": "2" } }, "SequenceNumber": "25200000000020406897812", "SizeBytes": 22, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569" }, { "eventID": "e5d5bec988945c06ffc879cf16b89bf7", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595922821.0, "Keys": { "id": { "N": "9" } }, "NewImage": { "last_name": { "S": "ADD" }, "id": { "N": "9" }, "age": { "S": "95" } }, "SequenceNumber": "25300000000020406897813", "SizeBytes": 25, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569" }, { "eventID": "f1a7c9736253b5ef28ced38ed5ff645b", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595922821.0, "Keys": { "id": { "N": "2" } }, "NewImage": { "last_name": { "S": "JJ" }, "id": { "N": "2" }, "age": { "S": "7" } }, "SequenceNumber": "25400000000020406897819", "SizeBytes": 23, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569" }, { "eventID": "bfcbad9dc19883e4172e6dc25e66637b", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595922821.0, "Keys": { "id": { "N": "10" } }, "NewImage": { "last_name": { "S": "ADD" }, "id": { "N": "10" }, "age": { "S": "95" } }, "SequenceNumber": "25500000000020406897820", "SizeBytes": 25, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569" } ] }
修改示例
event
:{ "Records": [ { "eventID": "4e4629c88aa00e366c89a293d9c82d54", "eventName": "MODIFY", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-1", "dynamodb": { "ApproximateCreationDateTime": 1595924589.0, "Keys": { "id": { "N": "2" } }, "NewImage": { "last_name": { "S": "zhgdhfgdh" }, "id": { "N": "2" }, "age": { "S": "7" } }, "OldImage": { "last_name": { "S": "JJ" }, "id": { "N": "2" }, "age": { "S": "7" } }, "SequenceNumber": "25600000000020408264140", "SizeBytes": 49, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-1:34234:table/newtable/stream/2020-07-28T06:59:38.569" } ] }
我可以 确认 的lambda函数的修改代码现在 不会产生错误 :
import boto3 import json import re from requests_aws4auth import AWS4Auth from elasticsearch import Elasticsearch, RequestsHttpConnection session = boto3.session.Session() credentials = session.get_credentials() s3 = session.resource('s3') awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, session.region_name, 'es', session_token=credentials.token) es = Elasticsearch( ['https://vpc-test-dmamain-452frn764ggb4a.us-east-1.es.amazonaws.com'], use_ssl=True, verify_certs=True, http_auth=awsauth, connection_class=RequestsHttpConnection ) reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size", "_timestamp", "_ttl"] def lambda_handler(event, context): print(event) #dynamodb = boto3.resource('dynamodb') # Loop over the DynamoDB Stream records for record in event['Records']: if record['eventName'] == "INSERT": insert_document(event, es, record) elif record['eventName'] == "REMOVE": remove_document(event, es, record) elif record['eventName'] == "MODIFY": modify_document(event, es, record) # Process MODIFY events def modify_document(event, es, record): table = getTable(record) print("Dynamo Table: " + table) docId = docid(event, event) print("KEY") print(docId) # Unmarshal the DynamoDB JSON to a normal JSON doc = json.dumps(document(event)) print("Updated document:") print(doc) # We reindex the whole document as ES accepts partial docs es.index(index=table, body=doc, id=docId, doc_type=table, refresh=True) print("Successly modified - Index: " , table , " - Document ID: " , docId) def remove_document(event, es, record): table = getTable(record) print("Dynamo Table: " + table) docId = docid(event, event) print("Deleting document ID: ", docId) es.delete(index=table, id=docId, doc_type=table, refresh=True) print("Successly removed - Index: ", table, " - Document ID: " , docId) # Process INSERT events def insert_document(event, es, record): table = getTable(record) print("Dynamo Table: " + table) # Create index if missing if es.indices.exists(table) == False: print("Create missing index: " + table) es.indices.create(table, body='{"settings": { "index.mapping.coerce": true } }') print("Index created: " + table) # Unmarshal the DynamoDB JSON to a normal JSON doc = json.dumps(document(event)) print("New document to Index:") print(doc) newId = docid(event, record) es.index(index=table, body=doc, id=newId, doc_type=table, refresh=True) print("Successly inserted - Index: " , table + " - Document ID: " , newId) def getTable(record): p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+') m = p.match(record['eventSourceARN']) if m is None: raise Exception("Table not found in SourceARN") return m.group(1).lower() def document(event): result = [] for r in event['Records']: tmp = {} for k, v in r['dynamodb']['NewImage'].items(): if "S" in v.keys() or "BOOL" in v.keys(): tmp[k] = v.get('S', v.get('BOOL', False)) elif 'NULL' in v: tmp[k] = None result.append(tmp) for i in result: return i def docid(event, record): result = [] for r in event['Records']: tmp = {} for k, v in r['dynamodb']['Keys'].items(): if "S" in v.keys() or "BOOL" in v.keys(): tmp[k] = v.get('S', v.get('BOOL', False)) elif 'NULL' in v: tmp[k] = None result.append(tmp) for newId in result: return newId
我 尚未验证
数据是否正确写入,修改或插入了ElasticSearch。但是我运行了ES域,并在lambda中使用了它来验证lambda是否可以连接到它并运行查询。lambda的INSERT事件输出示例:
Dynamo Table: newtable New document to Index: {"last_name": "V", "age": "2"} Successly inserted - Index: newtable - Document ID: {} Dynamo Table: newtable New document to Index: {"last_name": "V", "age": "2"} Successly inserted - Index: newtable - Document ID: {} Dynamo Table: newtable New document to Index: {"last_name": "V", "age": "2"} Successly inserted - Index: newtable - Document ID: {} Dynamo Table: newtable New document to Index: {"last_name": "V", "age": "2"} Successly inserted - Index: newtable - Document ID: {} Example output from lambda from MODIFY event:
更新文件:
{ "last_name": "zhgdhfgdh", "age": "7" } Successly modified - Index: newtable - Document ID: {}
我认为
docid
如果它可以正常工作,则需要进一步调查,因为它似乎返回空dict:Document ID: {}