import asyncio
import hashlib
import logging
import os
import tempfile
import time

from aiohttp import streams, web

# config, replication, generate_filename, get_timestamp_from_name and chunks are
# project-level helpers defined elsewhere in the code base; logger is the module's
# logger. Note that this handler targets the pre-async/await aiohttp API
# (streams.EOF_MARKER, request._payload), hence the yield-from style below.
logger = logging.getLogger(__name__)


@asyncio.coroutine
def put_file(request: web.Request):
    checksum = hashlib.sha1()
    # Spool the upload to memory, overflowing to disk once it exceeds 1 MiB.
    with tempfile.SpooledTemporaryFile(max_size=1024 * 1024) as tmpfile:
        try:
            # Stream the request body in 1 KiB chunks, hashing as we go.
            while True:
                chunk = yield from request._payload.read(1024)
                if chunk is streams.EOF_MARKER:
                    break
                if isinstance(chunk, asyncio.Future):
                    chunk = yield from asyncio.wait_for(chunk, timeout=60)
                if chunk:
                    checksum.update(chunk)
                    tmpfile.write(chunk)
        except asyncio.TimeoutError:
            raise web.HTTPRequestTimeout()
        calculated_hash = checksum.hexdigest()
        # If the client sent a checksum, verify the upload against it.
        if 'X-Content-SHA1' in request.headers:
            client_hash = request.headers['X-Content-SHA1'].lower()
            if calculated_hash != client_hash:
                logger.warning('SHA1 hash mismatch: %s != %s', calculated_hash, client_hash)
                raise web.HTTPBadRequest(text='SHA1 hash does not match')
        name = request.match_info.get('name').strip()
        if name in replication.dellog:
            # We know this is already deleted
            raise web.HTTPConflict(text='This file has already been deleted in the cluster.')

        # PUTs issued by other cluster nodes keep the original timestamp,
        # fresh uploads get the current time.
        is_replication = request.headers.get('User-Agent', '').startswith('cockatiel/')
        filename = generate_filename(
            name, calculated_hash,
            get_timestamp_from_name(name) if is_replication else str(int(time.time()))
        )
        filepath = os.path.join(config.args.storage, filename)

        if not os.path.exists(filepath):
            # New content: persist the spooled upload and schedule replication to the peers.
            directory, _ = os.path.split(filepath)
            os.makedirs(directory, exist_ok=True)
            tmpfile.seek(0)
            with open(filepath, 'wb') as f:
                for chunk in chunks(tmpfile):
                    f.write(chunk)
            logger.debug('Created file {}, scheduling replication.'.format(filename))
            replication.queue_operation('PUT', filename)
            return web.Response(status=201, headers={
                'Location': '/' + filename
            })
        else:
            # Identical content is already stored; point the client at the existing file.
            logger.debug('File {} already existed.'.format(filename))
            return web.Response(status=302, headers={
                'Location': '/' + filename
            })
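For illustration, a minimal client-side sketch of the protocol this handler implements. The node URL, port and file name are assumptions for the example, not taken from the code above; any HTTP client works, here the requests library is used:

import hashlib
import requests

data = b'hello world'
resp = requests.put(
    'http://localhost:8080/example.txt',  # hypothetical node address and file name
    data=data,
    headers={'X-Content-SHA1': hashlib.sha1(data).hexdigest()},
)
# Expect 201 with a Location header for new content, 302 if identical content already exists.
print(resp.status_code, resp.headers.get('Location'))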