def init_es(timeout=TIMEOUT):
    log.info("connecting to %s %s", settings.ELASTICSEARCH_URL, settings.ELASTICSEARCH_PORT)
    auth = AWSRequestsAuth(aws_access_key=settings.AWS_ACCESS_KEY_ID,
                           aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
                           aws_host=settings.ELASTICSEARCH_URL,
                           aws_region='us-west-1',
                           aws_service='es')
    auth.encode = lambda x: bytes(x.encode('utf-8'))
    es = Elasticsearch(host=settings.ELASTICSEARCH_URL,
                       port=settings.ELASTICSEARCH_PORT,
                       connection_class=RequestsHttpConnection,
                       timeout=timeout,
                       max_retries=10, retry_on_timeout=True,
                       http_auth=auth)
    return es
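The snippet above assumes that settings, log, TIMEOUT, AWSRequestsAuth, Elasticsearch and RequestsHttpConnection are already defined in the surrounding module. A minimal sketch of those assumed imports and a call site (the TIMEOUT value is a placeholder):

# Sketch only: imports and a call site assumed by init_es() above.
from aws_requests_auth.aws_auth import AWSRequestsAuth
from elasticsearch import Elasticsearch, RequestsHttpConnection

TIMEOUT = 30  # placeholder default, in seconds

es = init_es()
print(es.info())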
Python RequestsHttpConnection() usage examples
def elasticsearch_client(conf):
    """ returns an Elasticsearch instance configured using an es_conn_config """
    es_conn_conf = build_es_conn_config(conf)
    auth = Auth()
    es_conn_conf['http_auth'] = auth(host=es_conn_conf['es_host'],
                                     username=es_conn_conf['es_username'],
                                     password=es_conn_conf['es_password'],
                                     aws_region=es_conn_conf['aws_region'],
                                     boto_profile=es_conn_conf['boto_profile'])
    return Elasticsearch(host=es_conn_conf['es_host'],
                         port=es_conn_conf['es_port'],
                         url_prefix=es_conn_conf['es_url_prefix'],
                         use_ssl=es_conn_conf['use_ssl'],
                         verify_certs=es_conn_conf['verify_certs'],
                         connection_class=RequestsHttpConnection,
                         http_auth=es_conn_conf['http_auth'],
                         timeout=es_conn_conf['es_conn_timeout'],
                         send_get_body_as=es_conn_conf['send_get_body_as'])
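The Auth() helper above is project-specific; in general, the http_auth value that elasticsearch-py hands to RequestsHttpConnection can be either an HTTP Basic Auth pair or a requests-compatible signer object. A minimal sketch of both variants (hosts and credentials are placeholders):

# Illustration only; endpoints and credentials are placeholders.
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

# HTTP Basic Auth: a (username, password) tuple.
basic_client = Elasticsearch(host='es.example.com', port=9200,
                             connection_class=RequestsHttpConnection,
                             http_auth=('elastic', 'changeme'))

# IAM-signed requests against an Amazon Elasticsearch Service domain.
signed_client = Elasticsearch(host='search-demo.us-east-1.es.amazonaws.com', port=443,
                              use_ssl=True, verify_certs=True,
                              connection_class=RequestsHttpConnection,
                              http_auth=AWS4Auth('ACCESS_KEY', 'SECRET_KEY', 'us-east-1', 'es'))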
def main():
    parser = argparse.ArgumentParser(description='Download items from ES index')
    arg = parser.add_argument
    arg('output', help='output in .jl.gz format')
    arg('index', help='ES index name')
    arg('--domain', help='url.domain to filter')
    arg('--id', help='record id')
    arg('--host', default='localhost', help='ES host in host[:port] format')
    arg('--user', help='HTTP Basic Auth user')
    arg('--password', help='HTTP Basic Auth password')
    arg('--chunk-size', type=int, default=100, help='download chunk size')
    args = parser.parse_args()

    kwargs = {}
    if args.user or args.password:
        kwargs['http_auth'] = (args.user, args.password)
    client = elasticsearch.Elasticsearch(
        [args.host],
        connection_class=elasticsearch.RequestsHttpConnection,
        timeout=600,
        **kwargs)
    print(client.info())

    search = Search(using=client, index=args.index)
    if args.domain:
        search = search.filter('term', **{'url.domain': args.domain})
    if args.id:
        search = search.filter('term', **{'_id': args.id})

    total = 0
    with tqdm.tqdm(total=search.count()) as pbar:
        with gzip.open(args.output, 'wt') as f:
            for x in search.params(size=args.chunk_size).scan():
                total += 1
                pbar.update(1)
                f.write(json.dumps(x.to_dict()))
                f.write('\n')
    print('{:,} items downloaded to {}'.format(total, args.output))
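A hypothetical invocation of the script above (the file name download_items.py is an assumption; host, index, and credentials are placeholders):

    python download_items.py items.jl.gz my-index --host es.example.com:9200 \
        --user elastic --password changeme --domain example.com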
def get(logger):
    elasticsearch_endpoint = os.getenv("DSS_ES_ENDPOINT", "localhost")
    elasticsearch_port = int(os.getenv("DSS_ES_PORT", "443"))
    client = ElasticsearchClient._es_client.get((elasticsearch_endpoint, elasticsearch_port), None)
    if client is None:
        try:
            logger.debug("Connecting to Elasticsearch at host: {}".format(elasticsearch_endpoint))
            if elasticsearch_endpoint.endswith(".amazonaws.com"):
                session = boto3.session.Session()
                # TODO (akislyuk) Identify/resolve why use of AWSV4Sign results in an AWS auth error
                # when Elasticsearch scroll is used. Work around this by using the
                # requests_aws4auth package as described here:
                # https://elasticsearch-py.readthedocs.io/en/master/#running-on-aws-with-iam
                # es_auth = AWSV4Sign(session.get_credentials(), session.region_name, service="es")
                # Begin workaround
                current_credentials = session.get_credentials().get_frozen_credentials()
                es_auth = AWS4Auth(current_credentials.access_key, current_credentials.secret_key,
                                   session.region_name, "es", session_token=current_credentials.token)
                # End workaround
                client = Elasticsearch(
                    hosts=[{'host': elasticsearch_endpoint, 'port': elasticsearch_port}],
                    use_ssl=True,
                    verify_certs=True,
                    connection_class=RequestsHttpConnection,
                    http_auth=es_auth)
            else:
                client = Elasticsearch(
                    [{'host': elasticsearch_endpoint, 'port': elasticsearch_port}],
                    use_ssl=False
                )
            ElasticsearchClient._es_client[(elasticsearch_endpoint, elasticsearch_port)] = client
        except Exception as ex:
            logger.error("Unable to connect to Elasticsearch endpoint {}. Exception: {}".format(
                elasticsearch_endpoint, ex))
            raise ex
    return client
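The get() method above reads from and writes to ElasticsearchClient._es_client, a class-level cache keyed by (endpoint, port). A minimal sketch of the enclosing class this assumes:

# Sketch only: the enclosing class assumed by get() above.
class ElasticsearchClient:
    _es_client = {}  # maps (endpoint, port) -> Elasticsearch client

    @staticmethod
    def get(logger):
        ...  # body as shown above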
lambda_function.py, from project lambda-cloudfront-log-ingester by dbnegative
def lambda_handler(event, context):
    '''Invoke Lambda'''
    # load config from json file in s3 bucket
    config = load_config(context)
    # create ES connection with STS auth
    es_client = Elasticsearch(host=config['es_host'],
                              port=80,
                              connection_class=RequestsHttpConnection,
                              http_auth=sts_auth(config),
                              timeout=config['es_connection_timeout'])
    # create new index with custom mappings from config, ignore if it's already created;
    # a new index is created for every day, YMMV
    suffix = datetime.strftime(datetime.now(), '%Y-%m-%d')
    resp = es_client.indices.create(index="cloudfrontlog-" + suffix,
                                    body=config['es_mapping'],
                                    ignore=400)
    print(resp)
    # create an s3 boto client
    s3_client = boto3.client('s3')
    # split bucket and file path into variables
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = event['Records'][0]['s3']['object']['key']
    # set the file path
    file_path = '/tmp/cflogfile.gz'
    # download the gzip log from s3
    s3_client.download_file(bucket, key, file_path)
    # parse the log
    record_set = parse_log('/tmp/cflogfile.gz')
    # write the dict to ES
    resp = write_bulk(record_set, es_client, config)
    print(resp)
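The sts_auth(config) helper used above is not shown. A minimal sketch of one way it could be implemented, assuming it signs requests with the Lambda execution role's temporary credentials via aws-requests-auth (the aws_region config key is an assumption):

# Hypothetical helper; the real project's implementation may differ.
import boto3
from aws_requests_auth.aws_auth import AWSRequestsAuth

def sts_auth(config):
    creds = boto3.session.Session().get_credentials().get_frozen_credentials()
    return AWSRequestsAuth(aws_access_key=creds.access_key,
                           aws_secret_access_key=creds.secret_key,
                           aws_token=creds.token,
                           aws_host=config['es_host'],
                           aws_region=config.get('aws_region', 'us-east-1'),
                           aws_service='es')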
def main():
    parser = argparse.ArgumentParser(
        description='Download item hashes from ES index')
    arg = parser.add_argument
    arg('output', help='output in .csv format')
    arg('index', help='ES index name')
    arg('--domain', help='url.domain to filter')
    arg('--host', default='localhost', help='ES host in host[:port] format')
    arg('--user', help='HTTP Basic Auth user')
    arg('--password', help='HTTP Basic Auth password')
    arg('--chunk-size', type=int, default=100, help='download chunk size')
    args = parser.parse_args()

    kwargs = {}
    if args.user or args.password:
        kwargs['http_auth'] = (args.user, args.password)
    client = elasticsearch.Elasticsearch(
        [args.host],
        connection_class=elasticsearch.RequestsHttpConnection,
        timeout=600,
        **kwargs)
    print(client.info())

    search = Search(using=client, index=args.index)
    if args.domain:
        search = search.filter('term', **{'url.domain': args.domain})

    total = 0
    with tqdm.tqdm(total=search.count()) as pbar:
        with open(args.output, 'wt') as f:
            writer = csv.writer(f)
            for x in search.params(size=args.chunk_size).scan():
                total += 1
                pbar.update(1)
                x = x.to_dict()
                writer.writerow([
                    x['timestamp_crawl'],
                    hashlib.sha1((x['raw_content'] or '').encode('utf8')).hexdigest(),
                    x['team'],
                    x['url'],
                    canonicalize_url(x['url'], keep_fragments=True),
                ])
    print('{:,} items downloaded to {}'.format(total, args.output))
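A hypothetical invocation of the hashes script above (the file name download_hashes.py is an assumption; host and index are placeholders):

    python download_hashes.py hashes.csv my-index --host es.example.com:9200 --domain example.com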