def send_image_local(recipient_id, image_path):
    """Send a local image file to a recipient as a multipart upload.

    The request is a multipart/form-data POST to the Graph API
    ``me/messages`` endpoint, authenticated with the access token read
    from the ``PAGE_ACCESS_TOKEN`` environment variable.

    Args:
        recipient_id: Value placed in the ``id`` field of the recipient.
        image_path: Path of the image on disk. The file is always sent
            with the ``image/png`` content type.

    Returns:
        The decoded JSON body of the response.

    Raises:
        KeyError: If ``PAGE_ACCESS_TOKEN`` is not set in the environment.
    """
    import json  # local import keeps this block self-contained

    params = {
        'access_token': os.environ["PAGE_ACCESS_TOKEN"]
    }
    # json.dumps instead of str(): str() of a dict yields a single-quoted
    # Python repr, which is not valid JSON.
    recipient = json.dumps({'id': recipient_id})
    message = json.dumps({
        'attachment': {
            'type': 'image',
            'payload': {}
        }
    })
    # The encoder streams the file while the request is sent, so the POST
    # has to happen while the file is still open.
    with open(image_path, 'rb') as image_file:
        data = MultipartEncoder({
            'recipient': recipient,
            'message': message,
            'filedata': (os.path.basename(image_path), image_file,
                         'image/png')
        })
        headers = {
            'Content-Type': data.content_type
        }
        return requests.post("https://graph.facebook.com/v2.6/me/messages",
                             params=params, headers=headers,
                             data=data).json()
# Example source code using the Python class MultipartEncoder()
def uploadPhoto(self, photo, caption=None, upload_id=None):
    """Upload a photo file and, on success, configure and expose it.

    The photo is sent as a multipart body to ``upload/photo/`` under
    ``self.API_URL``. When the server answers with HTTP 200,
    ``self.configure`` is called and, if it returns a truthy value,
    ``self.expose`` follows.

    Args:
        photo: Path of the photo file to upload.
        caption: Passed through to ``self.configure``.
        upload_id: Identifier for the upload; defaults to the current
            time in milliseconds, as a string.

    Returns:
        Always ``False``.
        NOTE(review): the return value never reports success -- confirm
        whether callers expect ``True`` after a successful upload.
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))
    # Serialise the body while the file is open so that the handle can be
    # closed before the request is sent (previously it was never closed).
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id': upload_id,
            '_uuid': self.uuid,
            '_csrftoken': self.token,
            'image_compression': '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo': ('pending_media_%s.jpg' % upload_id, photo_file,
                      'application/octet-stream',
                      {'Content-Transfer-Encoding': 'binary'})
        }
        m = MultipartEncoder(data, boundary=self.uuid)
        content_type = m.content_type
        body = m.to_string()
    self.s.headers.update({'X-IG-Capabilities': '3Q4=',
                           'X-IG-Connection-Type': 'WIFI',
                           'Cookie2': '$Version=1',
                           'Accept-Language': 'en-US',
                           'Accept-Encoding': 'gzip, deflate',
                           'Content-type': content_type,
                           'Connection': 'close',
                           'User-Agent': self.USER_AGENT})
    response = self.s.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, photo, caption):
            self.expose()
    return False
def uploadPhoto(self, photo, caption=None, upload_id=None):
    """Upload a photo file and, on success, configure and expose it.

    The photo is sent as a multipart body to ``upload/photo/`` under
    ``self.API_URL``. When the server answers with HTTP 200,
    ``self.configure`` is called and, if it returns a truthy value,
    ``self.expose`` follows.

    Args:
        photo: Path of the photo file to upload.
        caption: Passed through to ``self.configure``.
        upload_id: Identifier for the upload; defaults to the current
            time in milliseconds, as a string.

    Returns:
        Always ``False``.
        NOTE(review): the return value never reports success -- confirm
        whether callers expect ``True`` after a successful upload.
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))
    # Serialise the body while the file is open so that the handle can be
    # closed before the request is sent (previously it was never closed).
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id': upload_id,
            '_uuid': self.uuid,
            '_csrftoken': self.token,
            'image_compression': '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo': ('pending_media_%s.jpg' % upload_id, photo_file,
                      'application/octet-stream',
                      {'Content-Transfer-Encoding': 'binary'})
        }
        m = MultipartEncoder(data, boundary=self.uuid)
        content_type = m.content_type
        body = m.to_string()
    self.s.headers.update({'X-IG-Capabilities': '3Q4=',
                           'X-IG-Connection-Type': 'WIFI',
                           'Cookie2': '$Version=1',
                           'Accept-Language': 'en-US',
                           'Accept-Encoding': 'gzip, deflate',
                           'Content-type': content_type,
                           'Connection': 'close',
                           'User-Agent': self.USER_AGENT})
    response = self.s.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, photo, caption):
            self.expose()
    return False
def uploadPhoto(self, photo, caption=None, upload_id=None):
    """Upload a photo file and, on success, configure and expose it.

    The photo is sent as a multipart body to ``upload/photo/`` under
    ``self.API_URL``. When the server answers with HTTP 200,
    ``self.configure`` is called and, if it returns a truthy value,
    ``self.expose`` follows.

    Args:
        photo: Path of the photo file to upload.
        caption: Passed through to ``self.configure``.
        upload_id: Identifier for the upload; defaults to the current
            time in milliseconds, as a string.

    Returns:
        Always ``False``.
        NOTE(review): the return value never reports success -- confirm
        whether callers expect ``True`` after a successful upload.
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))
    # Serialise the body while the file is open so that the handle can be
    # closed before the request is sent (previously it was never closed).
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id': upload_id,
            '_uuid': self.uuid,
            '_csrftoken': self.token,
            'image_compression': '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo': ('pending_media_%s.jpg' % upload_id, photo_file,
                      'application/octet-stream',
                      {'Content-Transfer-Encoding': 'binary'})
        }
        m = MultipartEncoder(data, boundary=self.uuid)
        content_type = m.content_type
        body = m.to_string()
    self.s.headers.update({'X-IG-Capabilities': '3Q4=',
                           'X-IG-Connection-Type': 'WIFI',
                           'Cookie2': '$Version=1',
                           'Accept-Language': 'en-US',
                           'Accept-Encoding': 'gzip, deflate',
                           'Content-type': content_type,
                           'Connection': 'close',
                           'User-Agent': self.USER_AGENT})
    response = self.s.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, photo, caption):
            self.expose()
    return False
def uploadPhoto(self, photo, caption=None, upload_id=None):
    """Upload a photo file and, on success, configure and expose it.

    The photo is sent through ``self.session`` as a multipart body to
    ``upload/photo/`` under ``self.API_URL``. When the server answers
    with HTTP 200, ``self.configure`` is called and, if it returns a
    truthy value, ``self.expose`` follows.

    Args:
        photo: Path of the photo file to upload.
        caption: Passed through to ``self.configure``.
        upload_id: Identifier for the upload; defaults to the current
            time in milliseconds, as a string.

    Returns:
        Always ``False``.
        NOTE(review): the return value never reports success -- confirm
        whether callers expect ``True`` after a successful upload.
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))
    # Serialise the body while the file is open so that the handle can be
    # closed before the request is sent (previously it was never closed).
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id': upload_id,
            '_uuid': self.uuid,
            '_csrftoken': self.token,
            'image_compression': '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo': ('pending_media_%s.jpg' % upload_id, photo_file,
                      'application/octet-stream',
                      {'Content-Transfer-Encoding': 'binary'})
        }
        m = MultipartEncoder(data, boundary=self.uuid)
        content_type = m.content_type
        body = m.to_string()
    self.session.headers.update({'X-IG-Capabilities': '3Q4=',
                                 'X-IG-Connection-Type': 'WIFI',
                                 'Cookie2': '$Version=1',
                                 'Accept-Language': 'en-US',
                                 'Accept-Encoding': 'gzip, deflate',
                                 'Content-type': content_type,
                                 'Connection': 'close',
                                 'User-Agent': self.USER_AGENT})
    response = self.session.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, photo, caption):
            self.expose()
    return False
def sendSparkPOST(url, data):
    """POST ``data`` to ``url`` as multipart/form-data with bearer auth.

    ``data`` is handed to ``MultipartEncoder`` as its ``fields``; the
    module-level ``bearer`` value is appended to ``"Bearer "`` to form
    the ``Authorization`` header.

    Returns:
        The response object produced by ``requests.post``.
    """
    encoder = MultipartEncoder(fields=data)
    request_headers = {
        "Content-Type": encoder.content_type,
        "Authorization": "Bearer " + bearer,
    }
    return requests.post(url, data=encoder, headers=request_headers)
def test_basic_multiple(self):
    """Three plain fields encoded together each reach their own target."""
    # (field name, value sent, bytes expected on the matching target)
    cases = (
        ('first', 'foo', b'foo'),
        ('second', 'bar', b'bar'),
        ('third', 'baz', b'baz'),
    )
    targets = [ValueTarget() for _ in cases]
    encoder = MultipartEncoder(
        fields=dict((name, text) for name, text, _ in cases))
    parser = StreamingFormDataParser(
        headers={'Content-Type': encoder.content_type})
    for (name, _, _), target in zip(cases, targets):
        parser.register(name, target)
    parser.data_received(encoder.to_string())
    for (_, _, raw), target in zip(cases, targets):
        self.assertEqual(target.value, raw)
def test_chunked_single(self):
    """A body fed to the parser in two pieces yields the full value.

    The split point is the start of ``b'world'``, i.e. inside the field
    value itself.
    """
    expected_value = 'hello world'
    encoder = MultipartEncoder(fields={'value': expected_value})
    payload = encoder.to_string()
    target = ValueTarget()
    parser = StreamingFormDataParser(
        headers={'Content-Type': encoder.content_type})
    parser.register('value', target)
    split_at = payload.index(b'world')
    for chunk in (payload[:split_at], payload[split_at:]):
        parser.data_received(chunk)
    self.assertEqual(target.value, expected_value.encode('utf-8'))
def main():
    """Encode a file plus two fixed text fields as multipart and print it.

    The file named by ``args.filename`` is attached under ``file`` with a
    content type guessed from its name. The encoded body is printed as
    UTF-8 text when ``args.decode`` is truthy, otherwise as the raw bytes
    object.
    """
    args = parse_args()
    with open(args.filename, 'rb') as file_:
        guessed = mimetypes.guess_type(args.filename)
        encoder = MultipartEncoder(fields={
            'name': 'hello world',
            'lines': 'first line\r\n\r\nsecond line',
            'file': (args.filename, file_, guessed[0]),
        })
        body = encoder.to_string()
        output = body.decode('utf-8') if args.decode else body
        print(output)
def uploadPhoto(self, photo, caption=None, upload_id=None):
    """Upload a photo file and, on success, configure and expose it.

    The photo is sent as a multipart body to ``upload/photo/`` under
    ``self.API_URL``. When the server answers with HTTP 200,
    ``self.configure`` is called and, if it returns a truthy value,
    ``self.expose`` follows.

    Args:
        photo: Path of the photo file to upload.
        caption: Passed through to ``self.configure``.
        upload_id: Identifier for the upload; defaults to the current
            time in milliseconds, as a string.

    Returns:
        Always ``False``.
        NOTE(review): the return value never reports success -- confirm
        whether callers expect ``True`` after a successful upload.
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))
    # Serialise the body while the file is open so that the handle can be
    # closed before the request is sent (previously it was never closed).
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id': upload_id,
            '_uuid': self.uuid,
            '_csrftoken': self.token,
            'image_compression': '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo': ('pending_media_%s.jpg' % upload_id, photo_file,
                      'application/octet-stream',
                      {'Content-Transfer-Encoding': 'binary'})
        }
        m = MultipartEncoder(data, boundary=self.uuid)
        content_type = m.content_type
        body = m.to_string()
    self.s.headers.update({'X-IG-Capabilities': '3Q4=',
                           'X-IG-Connection-Type': 'WIFI',
                           'Cookie2': '$Version=1',
                           'Accept-Language': 'en-US',
                           'Accept-Encoding': 'gzip, deflate',
                           'Content-type': content_type,
                           'Connection': 'close',
                           'User-Agent': self.USER_AGENT})
    response = self.s.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, photo, caption):
            self.expose()
    return False
def http_send_attachmail(to, cc, sub, content, filelist=[], mail_format="html", mail_from=mail_from):
    """Send a mail with file attachments through the HTTP mail API.

    The text fields and the attachments are combined into one multipart
    body and POSTed to ``mail_api``. Attachments are added under the keys
    ``attach1``, ``attach2``, ... in the order given.

    Args:
        to: Recipient value sent as ``tos``.
        cc: Carbon-copy value sent as ``cc``.
        sub: Subject line.
        content: Mail body.
        filelist: Paths of the files to attach (never mutated here).
        mail_format: Value sent as ``format``.
        mail_from: Sender value sent as ``from``.

    Returns:
        The decoded JSON response; its ``status`` and ``msg`` entries are
        also written to the send log.
    """
    fields = {"tos": to, "cc": cc, "subject": sub, "content": content,
              "from": mail_from, "format": mail_format,
              "attachNum": str(len(filelist))}
    handles = []
    try:
        for index, attach in enumerate(filelist, 1):
            handle = open(attach, "rb")
            handles.append(handle)
            fields['attach' + str(index)] = (attach, handle)
        m = MultipartEncoder(fields)
        headers = {"content-type": m.content_type}
        # The encoder reads the files while the request is being sent, so
        # they can only be closed once the POST has returned.
        r = requests.post(mail_api, data=m, headers=headers)
    finally:
        for handle in handles:
            handle.close()
    ret = r.json()
    status = str(ret['status']) + "-" + ret['msg']
    sendlog(status, to + "|cc:" + cc, sub)
    return ret
def upload_data(gpu_ip, job_hash, data_path):
    """Upload a data file to a GPU host while showing a progress bar.

    The file is POSTed as multipart/form-data to the ``runJobDecorator``
    endpoint on ``gpu_ip`` (port ``settings.GPU_PORT``). After a completed
    request the local file is removed and the response is passed to
    ``status_check``.

    Args:
        gpu_ip: Host part of the target URL.
        job_hash: Value sent in the ``hash`` form field.
        data_path: Path of the file to upload; deleted after the request
            completes.
    """
    url = 'http://%s:%s/runJobDecorator' % (gpu_ip, settings.GPU_PORT)
    file_size = path.getsize(data_path)
    pbar = tqdm(total=file_size, unit='B', unit_scale=True)

    def callback(monitor):
        # The bar is advanced by the number of bytes read since the
        # previous call, tracked on the function object itself.
        progress = monitor.bytes_read - callback.last_bytes_read
        pbar.update(progress)
        callback.last_bytes_read = monitor.bytes_read
    callback.last_bytes_read = 0

    # try/finally guarantees the bar is closed even when the upload is
    # interrupted or fails; the file is only removed on the success path.
    try:
        with open(data_path, 'rb') as f:
            data = {
                'file': ('uploads.pkl', f, 'application/octet-stream'),
                'hash': job_hash
            }
            encoder = MultipartEncoder(
                fields=data
            )
            monitor = MultipartEncoderMonitor(encoder, callback)
            r = requests.post(url, data=monitor, headers={
                'Content-Type': monitor.content_type})
        remove(data_path)
    finally:
        pbar.close()
    status_check(r)
def do_artifacts_artifact_add(opts):
    """Upload an artifact file together with its name and description.

    A multipart/form-data body is built from ``opts.name``,
    ``opts.description``, the size of ``opts.infile`` and the file itself,
    then POSTed to the artifacts URL of ``opts.service``. When stderr is a
    terminal and the optional progress-bar packages can be imported, the
    upload is wrapped in a progress monitor.

    Args:
        opts: Parsed options; ``service``, ``name``, ``description`` and
            ``infile`` are read here, and the object is also passed to
            ``api_from_opts``.
    """
    logging.debug('add artifact %r', opts)
    url = artifacts_url(opts.service)
    image = {
        'name': opts.name,
        'description': opts.description,
    }
    # build contents of multipart/form-data, image meta must come first, hence
    # we use an OrderedDict to preserve the order
    files = OrderedDict()
    for k, v in image.items():
        files[k] = (None, io.StringIO(v))
    # followed by firmware data
    # but first, try to find out the size of firmware data
    files['size'] = str(os.stat(opts.infile).st_size)
    # Keep the artifact open only for the duration of the upload; the
    # handle was previously never closed.
    with open(opts.infile, 'rb') as artifact_file:
        files['artifact'] = (opts.infile, artifact_file,
                             "application/octet-stream", {})
        encoder = MultipartEncoder(files)
        if sys.stderr.isatty():
            try:
                from requests_toolbelt import MultipartEncoderMonitor
                from clint.textui.progress import Bar as ProgressBar
                pb = ProgressBar(expected_size=encoder.len, filled_char='=',
                                 every=1024*1024)
                monitor = MultipartEncoderMonitor(
                    encoder, lambda mon: pb.show(mon.bytes_read))
                encoder = monitor
            except ImportError:
                # progress display is optional; upload without it
                pass
        with api_from_opts(opts) as api:
            rsp = api.post(url, data=encoder,
                           headers={'Content-Type': encoder.content_type})
            if rsp.status_code == 201:
                # created
                location = rsp.headers.get('Location', '')
                print("created with URL: {}".format(location))
                print('artifact ID: ', location.rsplit('/')[-1])
            else:
                errorprinter(rsp)
def send_attachment(self, recipient_id, attachment_type, attachment_path,
                    notification_type=NotificationType.regular):
    """Send an attachment to the specified recipient using local path.
    Input:
        recipient_id: recipient id to send to
        attachment_type: type of attachment (image, video, audio, file)
        attachment_path: Path of attachment
        notification_type: value sent in the 'notification_type' field
    Output:
        Response from API as <dict>
    """
    import json  # local import keeps this block self-contained

    # The attachment is streamed during the request, so the POST must run
    # while the file is open; the handle is closed afterwards.
    with open(attachment_path, 'rb') as attachment_file:
        payload = {
            # Serialised with json.dumps: the previous `{ {...} }` literal
            # was a set containing a dict and raised TypeError.
            'recipient': json.dumps(
                {
                    'id': recipient_id
                }
            ),
            # NOTE(review): sent as-is; confirm that NotificationType
            # members are plain strings the encoder can accept.
            'notification_type': notification_type,
            'message': json.dumps(
                {
                    'attachment': {
                        'type': attachment_type,
                        'payload': {}
                    }
                }
            ),
            'filedata': (os.path.basename(attachment_path), attachment_file)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        return requests.post(self.graph_url, data=multipart_data,
                             params=self.auth_args,
                             headers=multipart_header).json()
def post_file(session, domain='', file='', login=LOGIN):
    """Post a file to the cloud's upload server.

    The content type is guessed from the file name, falling back to
    ``DEFAULT_FILETYPE`` (with a warning when a logger is configured).

    Args:
        session: Object whose ``post`` method performs the HTTP request.
        domain: Base URL of the upload server.
        file: File name with path.
        login: Account e-mail, URL-quoted into the ``x-email`` parameter.

    Returns:
        ``(hash, size)`` parsed from the response body on success: the
        first 40 bytes decoded as the hash and bytes 41 up to the last two
        as the integer size. ``(None, None)`` on a request error, a non-OK
        status or an empty body.
    """
    assert domain is not None, 'no domain'
    assert file is not None, 'no file'
    filetype = guess_type(file)[0]
    if not filetype:
        filetype = DEFAULT_FILETYPE
        if LOGGER:
            LOGGER.warning('File {} type is unknown, using default: {}'.format(file, DEFAULT_FILETYPE))
    filename = os.path.basename(file)
    quoted_login = quote_plus(login)
    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()))) + TIME_AMEND
    url = urljoin(domain, '?cloud_domain=' + str(CLOUD_DOMAIN_ORD) + '&x-email=' + quoted_login + '&fileapi' + timestamp)
    # open() stays outside the try block so that a missing or unreadable
    # file still raises, as before; the handle is now closed on every path.
    with open(file, 'rb') as upload:
        m = MultipartEncoder(fields={'file': (quote_plus(filename), upload, filetype)})
        try:
            r = session.post(url, data=m, headers={'Content-Type': m.content_type}, verify=VERIFY_SSL)
        except Exception as e:
            if LOGGER:
                LOGGER.error('Post file HTTP request error: {}'.format(e))
            return (None, None)
    if r.status_code == requests.codes.ok:
        if len(r.content):
            file_hash = r.content[:40].decode()
            size = int(r.content[41:-2])
            return (file_hash, size)
        elif LOGGER:
            LOGGER.error('File {} post error, no hash and size received'.format(file))
    elif LOGGER:
        LOGGER.error('File {} post error, http code: {}, msg: {}'.format(file, r.status_code, r.text))
    return (None, None)
def __repr__(self):
    """Return a debug string showing the repr of ``self.fields``."""
    template = '<MultipartEncoder: {0!r}>'
    return template.format(self.fields)
def __init__(self, encoder, callback=None):
    """Wrap ``encoder`` so that reads from it can be observed.

    ``callback`` falls back to ``IDENTITY`` when it is falsy.
    """
    #: Instance of the :class:`MultipartEncoder` being monitored
    self.encoder = encoder

    #: Optionally function to call after a read
    self.callback = callback if callback else IDENTITY

    #: Number of bytes already read from the :class:`MultipartEncoder`
    #: instance
    self.bytes_read = 0

    #: Avoid the same problem in bug #80
    self.len = encoder.len
def from_fields(cls, fields, boundary=None, encoding='utf-8',
                callback=None):
    """Build an instance of ``cls`` directly from multipart fields.

    A ``MultipartEncoder`` is created from ``fields``, ``boundary`` and
    ``encoding`` and passed to ``cls`` together with ``callback``.
    """
    return cls(MultipartEncoder(fields, boundary, encoding), callback)
def __repr__(self):
    """Return ``<MultipartEncoder: ...>`` with the repr of the fields."""
    fields = self.fields
    return '<MultipartEncoder: {0!r}>'.format(fields)
def __init__(self, encoder, callback=None):
    """Set up monitoring state around ``encoder``.

    A falsy ``callback`` is replaced by ``IDENTITY``.
    """
    if not callback:
        callback = IDENTITY

    #: Instance of the :class:`MultipartEncoder` being monitored
    self.encoder = encoder
    #: Optionally function to call after a read
    self.callback = callback
    #: Number of bytes already read from the :class:`MultipartEncoder`
    #: instance
    self.bytes_read = 0
    #: Avoid the same problem in bug #80
    self.len = encoder.len
def from_fields(cls, fields, boundary=None, encoding='utf-8',
                callback=None):
    """Create an instance of ``cls`` from raw fields.

    The fields are first wrapped in a ``MultipartEncoder`` (using
    ``boundary`` and ``encoding``); that encoder and ``callback`` are
    then passed to ``cls``.
    """
    wrapped = MultipartEncoder(fields, boundary, encoding)
    instance = cls(wrapped, callback)
    return instance
def send_image(self, recipient_id, image_path):
    '''Send an image to the specified recipient.
    Image must be PNG or JPEG or GIF (more might be supported).
    https://developers.facebook.com/docs/messenger-platform/send-api-reference/image-attachment
    Input:
        recipient_id: recipient id to send to
        image_path: path to image to be sent
    Output:
        Response from API as <dict>
    '''
    # The encoder streams the file during the request, so the POST runs
    # inside the ``with`` block; the handle is closed afterwards instead
    # of being leaked.
    with open(image_path, 'rb') as image_file:
        payload = {
            'recipient': json.dumps(
                {
                    'id': recipient_id
                }
            ),
            'message': json.dumps(
                {
                    'attachment': {
                        'type': 'image',
                        'payload': {}
                    }
                }
            ),
            'filedata': (image_path, image_file)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        return requests.post(self.base_url, data=multipart_data,
                             headers=multipart_header).json()
def send_audio(self, recipient_id, audio_path):
    '''Send audio to the specified recipient.
    Audio must be MP3 or WAV
    https://developers.facebook.com/docs/messenger-platform/send-api-reference/audio-attachment
    Input:
        recipient_id: recipient id to send to
        audio_path: path to audio to be sent
    Output:
        Response from API as <dict>
    '''
    # Opens ``audio_path``: the previous code opened the undefined name
    # ``image_path`` and failed with NameError. The POST runs while the
    # file is open because the encoder streams it during the request.
    with open(audio_path, 'rb') as audio_file:
        payload = {
            'recipient': json.dumps(
                {
                    'id': recipient_id
                }
            ),
            'message': json.dumps(
                {
                    'attachment': {
                        'type': 'audio',
                        'payload': {}
                    }
                }
            ),
            'filedata': (audio_path, audio_file)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        return requests.post(self.base_url, data=multipart_data,
                             headers=multipart_header).json()
def send_file(self, recipient_id, file_path):
    '''Send file to the specified recipient.
    https://developers.facebook.com/docs/messenger-platform/send-api-reference/file-attachment
    Input:
        recipient_id: recipient id to send to
        file_path: path to file to be sent
    Output:
        Response from API as <dict>
    '''
    # Opens ``file_path``: the previous code opened the undefined name
    # ``image_path`` and failed with NameError. The POST runs while the
    # file is open because the encoder streams it during the request.
    with open(file_path, 'rb') as attached_file:
        payload = {
            'recipient': json.dumps(
                {
                    'id': recipient_id
                }
            ),
            'message': json.dumps(
                {
                    'attachment': {
                        'type': 'file',
                        'payload': {}
                    }
                }
            ),
            'filedata': (file_path, attached_file)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        return requests.post(self.base_url, data=multipart_data,
                             headers=multipart_header).json()
def create_upload(self, filepath):
    """Build a multipart encoder carrying ``filepath`` as its ``file`` field.

    The file is opened in binary mode and attached under its base name
    with ``self.file_type`` as content type. The handle is left open
    because the returned encoder holds it.
    """
    upload_name = os.path.basename(filepath)
    stream = open(filepath, 'rb')
    file_field = (upload_name, stream, self.file_type)
    return MultipartEncoder({'file': file_field})
def _upload(title, author, text,
            author_url='', tph_uuid=None, page_id=None, user_agent=default_user_agent, convert_html=True,
            clean_html=True):
    """Post a page to the save endpoint and return the parsed result.

    Args:
        title: Page title; must be non-empty.
        author: Author name.
        text: Page content; must be non-empty.
        author_url: Author URL field.
        tph_uuid: Value for the ``tph_uuid`` cookie; only sent when
            ``page_id`` is also given.
        page_id: Identifier of an existing page; ``'0'`` is sent when it
            is falsy.
        user_agent: Value of the ``User-Agent`` header.
        convert_html: When true, ``text`` is passed through
            ``convert_html_to_telegraph_format`` before sending.
        clean_html: Forwarded to ``convert_html_to_telegraph_format``.

    Returns:
        The decoded JSON result, extended with ``tph_uuid`` and ``url``.

    Raises:
        TitleRequiredError: If ``title`` is empty.
        TextRequiredError: If ``text`` is empty.
        TelegraphError: If the result contains no ``path``.
    """
    if not title:
        raise TitleRequiredError('Title is required')
    if not text:
        raise TextRequiredError('Text is required')
    content = convert_html_to_telegraph_format(text, clean_html) if convert_html else text
    cookies = dict(tph_uuid=tph_uuid) if tph_uuid and page_id else None
    fields = {
        'Data': ('content.html', content, 'plain/text'),
        'title': title,
        'author': author,
        'author_url': author_url,
        'page_id': page_id or '0'
    }
    m = MultipartEncoder(fields, boundary='TelegraPhBoundary21')
    headers = {
        'Content-Type': m.content_type,
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'User-Agent': user_agent
    }
    # The session is used as a context manager so that its connection
    # pool is released; previously it was never closed.
    with requests.Session() as session:
        session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))
        response = session.post(save_url, timeout=4, headers=headers,
                                cookies=cookies, data=m.to_string())
    result = json.loads(response.text)
    if 'path' in result:
        result['tph_uuid'] = response.cookies.get('tph_uuid') or tph_uuid
        result['url'] = base_url + '/' + result['path']
        return result
    error_msg = result['error'] if 'error' in result else ''
    raise TelegraphError(error_msg)
def handle(self, *args, **options):
    """Upload the database file at ``DB_PATH`` to the central server.

    The file is sent as ``db.sqlite3`` together with
    ``options['project']`` in a multipart POST to
    ``CENTRAL_SERVER_DB_UPLOAD_URL``; the callback built by
    ``create_callback`` observes the upload. The response status is
    printed at the end.
    """
    self.stdout.write("Uploading database to central server...\n")
    # The encoder streams the file during the request, so the POST runs
    # inside the ``with`` block; the handle is closed afterwards instead
    # of being leaked.
    with open(DB_PATH, "rb") as db_file:
        encoder = MultipartEncoder({
            "project": options['project'],
            "file": ("db.sqlite3", db_file, "application/octet-stream")
        })
        monitor = MultipartEncoderMonitor(encoder, create_callback(encoder))
        r = requests.post(CENTRAL_SERVER_DB_UPLOAD_URL, data=monitor,
                          headers={"Content-Type": monitor.content_type})
    print("\nUpload finished! (Returned status {0} {1})".format(r.status_code, r.reason))
def api_upload(service, encData, encMeta, keys):
    '''
    Uploads data to Send.
    Caution! Data is uploaded as given, this function will not encrypt it for you

    Input:
        service: base URL of the service; 'api/upload' is appended to it
        encData: data sent as the 'file' part of the multipart body
        encMeta: metadata, base64-encoded into the X-File-Metadata header
        keys: object providing authKey and secretKey
    Output:
        (secretUrl, fileId, fileNonce, owner_token)
    Raises:
        whatever r.raise_for_status() raises for an error response
    '''
    service += 'api/upload'
    files = requests_toolbelt.MultipartEncoder(fields={'file': ('blob', encData, 'application/octet-stream')})
    pbar = progbar(files.len)
    # The monitor passes itself to the callback, so the argument is used
    # directly instead of closing over the not-yet-assigned ``monitor``.
    monitor = requests_toolbelt.MultipartEncoderMonitor(
        files, lambda mon: pbar.update(mon.bytes_read - pbar.n))
    headers = {
        'X-File-Metadata': unpadded_urlsafe_b64encode(encMeta),
        'Authorization': 'send-v1 ' + unpadded_urlsafe_b64encode(keys.authKey),
        'Content-type': monitor.content_type
    }
    # Close the progress bar on failure as well as on success.
    try:
        r = requests.post(service, data=monitor, headers=headers, stream=True)
        r.raise_for_status()
    finally:
        pbar.close()
    body_json = r.json()
    secretUrl = body_json['url'] + '#' + unpadded_urlsafe_b64encode(keys.secretKey)
    fileId = body_json['id']
    fileNonce = unpadded_urlsafe_b64decode(r.headers['WWW-Authenticate'].replace('send-v1 ', ''))
    # Fall back to the 'delete' key when 'owner' is absent; only the
    # missing-key case is handled, instead of swallowing every exception.
    try:
        owner_token = body_json['owner']
    except KeyError:
        owner_token = body_json['delete']
    return secretUrl, fileId, fileNonce, owner_token
def __repr__(self):
    """Return a short description built from the repr of ``self.fields``."""
    description = '<MultipartEncoder: {0!r}>'.format(self.fields)
    return description
def __init__(self, encoder, callback=None):
    """Store ``encoder`` and the read callback and reset the byte counter.

    When ``callback`` is falsy, ``IDENTITY`` is used in its place.
    """
    #: Number of bytes already read from the :class:`MultipartEncoder`
    #: instance
    self.bytes_read = 0

    #: Instance of the :class:`MultipartEncoder` being monitored
    self.encoder = encoder

    #: Avoid the same problem in bug #80
    self.len = encoder.len

    #: Optionally function to call after a read
    self.callback = callback or IDENTITY