def post_file(session, domain='', file='', login=LOGIN):
""" posts file to the cloud's upload server
param: file - string filename with path
"""
assert domain is not None, 'no domain'
assert file is not None, 'no file'
filetype = guess_type(file)[0]
if not filetype:
filetype = DEFAULT_FILETYPE
if LOGGER:
LOGGER.warning('File {} type is unknown, using default: {}'.format(file, DEFAULT_FILETYPE))
filename = os.path.basename(file)
quoted_login = quote_plus(login)
timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()))) + TIME_AMEND
url = urljoin(domain, '?cloud_domain=' + str(CLOUD_DOMAIN_ORD) + '&x-email=' + quoted_login + '&fileapi' + timestamp)
m = MultipartEncoder(fields={'file': (quote_plus(filename), open(file, 'rb'), filetype)})
try:
r = session.post(url, data=m, headers={'Content-Type': m.content_type}, verify = VERIFY_SSL)
except Exception as e:
if LOGGER:
LOGGER.error('Post file HTTP request error: {}'.format(e))
return (None, None)
if r.status_code == requests.codes.ok:
if len(r.content):
hash = r.content[:40].decode()
size = int(r.content[41:-2])
return (hash, size)
elif LOGGER:
LOGGER.error('File {} post error, no hash and size received'.format(file))
elif LOGGER:
LOGGER.error('File {} post error, http code: {}, msg: {}'.format(file, r.status_code, r.text))
return (None, None)
评论列表
文章目录