def send_attachment_message(sender, attachment_type, payload):
    """Send an attachment message to a Facebook Messenger user.

    Serializes the request body for ``sender['id']``, logs it at debug
    level and, only when ``config.PRODUCTION`` is truthy, POSTs it to the
    Graph API ``me/messages`` endpoint and logs the response body.

    Args:
        sender: mapping whose ``'id'`` entry is used as the recipient id.
        attachment_type: value stored under ``message.attachment.type``.
        payload: value stored under ``message.attachment.payload``.
    """
    attachment = {'type': attachment_type, 'payload': payload}
    body = json.dumps({
        'recipient': {'id': sender['id']},
        'message': {'attachment': attachment}
    })
    logging.debug(body)
    endpoint = ('https://graph.facebook.com/v2.6/me/messages?access_token='
                + config.FACEBOOK_PAGE_ACCESS_TOKEN)
    # Outside production the message is only logged, never sent.
    if not config.PRODUCTION:
        return
    response = urlfetch.fetch(
        endpoint,
        body,
        urlfetch.POST,
        {'Content-Type': 'application/json'}
    )
    logging.debug(response.content)
# Python example source code for POST
def send_fb_message(payload):
    """POST an already-serialized JSON message to the Messenger Send API.

    Does nothing unless ``config.PRODUCTION`` is truthy.  Errors raised by
    ``urlfetch`` are logged and swallowed, so sending is best-effort.

    Args:
        payload: string used verbatim as the request body.
    """
    if not config.PRODUCTION:
        return
    try:
        response = urlfetch.fetch(
            'https://graph.facebook.com/v2.6/me/messages?access_token=' + config.FACEBOOK_PAGE_ACCESS_TOKEN,
            payload,
            urlfetch.POST,
            {'Content-Type': 'application/json'}
        )
    except urlfetch.Error:
        # ``e.message`` is deprecated and is empty unless the exception was
        # built with exactly one argument; logging.exception always records
        # the exception type, its text and the traceback.
        logging.exception('Facebook message request failed')
    else:
        logging.debug(response.content)
def fetch(url, data=None, headers=None,
          cookie=Cookie.SimpleCookie(),
          user_agent='Mozilla/5.0'):
    """Fetch ``url`` and return the response body.

    When ``google.appengine.api.urlfetch`` can be imported it is used and
    redirects are followed by hand through the ``Location`` header, every
    hop after the first being a GET without payload.  Otherwise the request
    goes through ``urllib2``.

    Args:
        url: address to request.
        data: optional form fields; when not None they are url-encoded and
            sent as the request body.
        headers: optional dict of extra request headers.  It is copied, so
            the caller's dict is never modified.
        cookie: cookie jar whose entries are sent in the ``Cookie`` header;
            on the urlfetch path ``Set-Cookie`` response headers are loaded
            back into it.
        user_agent: value of the ``User-agent`` header; falsy to omit it.

    Returns:
        The body of the (final) response.
    """
    # NOTE: the default ``cookie`` jar is created once, when the function is
    # defined, and is therefore shared by every call that does not pass its
    # own jar.  Left as is because callers may rely on it -- TODO confirm.

    # Work on a copy: the entries added below must not leak into the
    # caller's dict.
    headers = dict(headers) if headers else {}
    if data is not None:
        data = urllib.urlencode(data)
    if user_agent:
        headers['User-agent'] = user_agent
    # Built once; cookies picked up during redirects are stored in the jar
    # but not added to the headers of the following hops.
    headers['Cookie'] = ' '.join(
        ['%s=%s;' % (c.key, c.value) for c in cookie.values()])
    try:
        from google.appengine.api import urlfetch
    except ImportError:
        req = urllib2.Request(url, data, headers)
        html = urllib2.urlopen(req).read()
    else:
        method = urlfetch.GET if data is None else urlfetch.POST
        while url is not None:
            response = urlfetch.fetch(url=url, payload=data,
                                      method=method, headers=headers,
                                      allow_truncated=False,
                                      follow_redirects=False,
                                      deadline=10)
            # next request will be a get, so no need to send the data again
            data = None
            method = urlfetch.GET
            # load cookies from the response
            cookie.load(response.headers.get('set-cookie', ''))
            # a missing Location header ends the redirect chain
            url = response.headers.get('location')
        html = response.content
    return html
def request(self, operation, url, data=None, headers=None):
    """Performs an HTTP call to the server, supports GET, POST, PUT, and
    DELETE.

    Usage example, perform an HTTP GET on http://www.google.com/:
      import atom.http
      client = atom.http.HttpClient()
      http_response = client.request('GET', 'http://www.google.com/')

    Args:
      operation: str The HTTP operation to be performed. This is usually one
          of 'GET', 'POST', 'PUT', or 'DELETE'. Any other value results in
          None being passed to urlfetch as the method.
      url: The full URL to which the request should be sent. Can be a string
          or atom.url.Url; it is passed on as str(url).
      data: None, a string, a list of parts, or another object which
          __ConvertDataPart can turn into a string. Should be set to None
          when performing a GET or DELETE. The parts of a list are converted
          one by one and concatenated.
      headers: dict of strings. HTTP headers which should be sent
          in the request. They override the client's default headers.

    Returns:
      An HttpResponse wrapping the result of urlfetch.Fetch.
    """
    all_headers = self.headers.copy()
    if headers:
        all_headers.update(headers)

    # Construct the full payload.
    # Assume that data is None or a string.
    # NOTE(review): if this method lives in a class body, the double
    # underscore name below is subject to name mangling -- confirm that the
    # helper resolves.
    data_str = data
    if data:
        if isinstance(data, list):
            # If data is a list of different objects, convert them all to
            # strings and join them together.
            converted_parts = [__ConvertDataPart(x) for x in data]
            data_str = ''.join(converted_parts)
        else:
            data_str = __ConvertDataPart(data)

    # If the list of headers does not include a Content-Length, attempt to
    # calculate it based on the data object. Header values are strings, so
    # the length is stored as text rather than as an int.
    if data and 'Content-Length' not in all_headers:
        all_headers['Content-Length'] = str(len(data_str))

    # Set the content type to the default value if none was set.
    if 'Content-Type' not in all_headers:
        all_headers['Content-Type'] = 'application/atom+xml'

    # Lookup the urlfetch operation which corresponds to the desired HTTP verb.
    if operation == 'GET':
        method = urlfetch.GET
    elif operation == 'POST':
        method = urlfetch.POST
    elif operation == 'PUT':
        method = urlfetch.PUT
    elif operation == 'DELETE':
        method = urlfetch.DELETE
    else:
        method = None
    return HttpResponse(urlfetch.Fetch(url=str(url), payload=data_str,
                                       method=method, headers=all_headers))
def impersonate(self, user_id=DEFAULT):
    """
    Switch the current session to another user, or switch back.

    To use this make a POST to
    `http://..../impersonate request.post_vars.user_id=<id>`
    Set request.post_vars.user_id to 0 to restore original user.

    requires impersonator is logged in and::

        has_permission('impersonate', 'auth_user', user_id)

    Raises HTTP 401 when nobody is logged in or the requested user record
    is not found, and HTTP 403 when the permission is missing.

    Returns None after a restore request or when no switch took place, a
    `user_id` input form when called without an id and without POST vars,
    and otherwise a read-only form of the impersonated user.
    """
    req = current.request
    sess = current.session
    sess_auth = sess.auth
    users = self.table_user()
    if not self.is_logged_in():
        raise HTTP(401, "Not Authorized")
    original_id = sess_auth.user.id
    called_without_id = user_id is DEFAULT
    target = None
    if called_without_id:
        user_id = current.request.post_vars.user_id

    wants_switch = user_id and user_id != self.user.id and user_id != '0'
    if wants_switch:
        if not self.has_permission('impersonate', self.table_user(), user_id):
            raise HTTP(403, "Forbidden")
        target = users(user_id)
        if not target:
            raise HTTP(401, "Not Authorized")
        # Keep a snapshot of the whole session so it can be restored later.
        sess_auth.impersonator = pickle.dumps(sess, pickle.HIGHEST_PROTOCOL)
        sess_auth.user.update(users._filter_fields(target, True))
        self.user = sess_auth.user
        self.update_groups()
        self.log_event(self.messages['impersonate_log'],
                       dict(id=original_id, other_id=sess_auth.user.id))
        self.run_login_onaccept()
    elif user_id in (0, '0'):
        if self.is_impersonating():
            # Replace the session content with the snapshot taken when the
            # impersonation started.
            sess.clear()
            sess.update(pickle.loads(sess_auth.impersonator))
            self.user = sess.auth.user
            self.update_groups()
            self.run_login_onaccept()
        return None

    if called_without_id and not req.post_vars:
        return SQLFORM.factory(Field('user_id', 'integer'))
    if not target:
        return None
    return SQLFORM(users, target.id, readonly=True)
def impersonate(self, user_id=DEFAULT):
    """
    Switch the current session to another user, or switch back.

    To use this make a POST to
    `http://..../impersonate request.post_vars.user_id=<id>`
    Set request.post_vars.user_id to 0 to restore original user.

    requires impersonator is logged in and::

        has_permission('impersonate', 'auth_user', user_id)

    Raises HTTP 401 when nobody is logged in or the requested user record
    is not found, and HTTP 403 when the permission is missing.

    Returns None after a restore request or when no switch took place, a
    `user_id` input form when called without an id and without POST vars,
    and otherwise a read-only form of the impersonated user.
    """
    request = current.request
    session = current.session
    auth = session.auth
    table_user = self.table_user()
    if not self.is_logged_in():
        raise HTTP(401, "Not Authorized")
    current_id = auth.user.id
    requested_id = user_id
    # Stays None when no switch happens (for instance when the id is the
    # caller's own or empty); without this default the final return raised
    # UnboundLocalError on that path.
    user = None
    if user_id is DEFAULT:
        user_id = current.request.post_vars.user_id
    if user_id and user_id != self.user.id and user_id != '0':
        if not self.has_permission('impersonate',
                                   self.table_user(),
                                   user_id):
            raise HTTP(403, "Forbidden")
        user = table_user(user_id)
        if not user:
            raise HTTP(401, "Not Authorized")
        # Keep a snapshot of the whole session so it can be restored later.
        auth.impersonator = pickle.dumps(session, pickle.HIGHEST_PROTOCOL)
        auth.user.update(
            table_user._filter_fields(user, True))
        self.user = auth.user
        self.update_groups()
        log = self.messages['impersonate_log']
        self.log_event(log, dict(id=current_id, other_id=auth.user.id))
        self.run_login_onaccept()
    elif user_id in (0, '0'):
        if self.is_impersonating():
            # NOTE: unpickles data kept in the session; this is only safe
            # while the session storage itself is trusted.
            session.clear()
            session.update(pickle.loads(auth.impersonator))
            self.user = session.auth.user
            self.update_groups()
            self.run_login_onaccept()
        return None
    if requested_id is DEFAULT and not request.post_vars:
        return SQLFORM.factory(Field('user_id', 'integer'))
    if not user:
        return None
    return SQLFORM(table_user, user.id, readonly=True)
def impersonate(self, user_id=DEFAULT):
    """
    Switch the current session to another user, or switch back.

    To use this make a POST to
    `http://..../impersonate request.post_vars.user_id=<id>`
    Set request.post_vars.user_id to 0 to restore original user.

    requires impersonator is logged in and::

        has_permission('impersonate', 'auth_user', user_id)

    Raises HTTP 401 when nobody is logged in or the requested user record
    is not found, and HTTP 403 when the permission is missing.

    Returns None after a restore request or when no switch took place, a
    `user_id` input form when called without an id and without POST vars,
    and otherwise a read-only form of the impersonated user.
    """
    req = current.request
    sess = current.session
    sess_auth = sess.auth
    users = self.table_user()
    if not self.is_logged_in():
        raise HTTP(401, "Not Authorized")
    original_id = sess_auth.user.id
    called_without_id = user_id is DEFAULT
    target = None
    if called_without_id:
        user_id = current.request.post_vars.user_id

    wants_switch = user_id and user_id != self.user.id and user_id != '0'
    if wants_switch:
        if not self.has_permission('impersonate', self.table_user(), user_id):
            raise HTTP(403, "Forbidden")
        target = users(user_id)
        if not target:
            raise HTTP(401, "Not Authorized")
        # Keep a snapshot of the whole session so it can be restored later.
        sess_auth.impersonator = pickle.dumps(sess, pickle.HIGHEST_PROTOCOL)
        sess_auth.user.update(users._filter_fields(target, True))
        self.user = sess_auth.user
        self.update_groups()
        self.log_event(self.messages['impersonate_log'],
                       dict(id=original_id, other_id=sess_auth.user.id))
        self.run_login_onaccept()
    elif user_id in (0, '0'):
        if self.is_impersonating():
            # Replace the session content with the snapshot taken when the
            # impersonation started.
            sess.clear()
            sess.update(pickle.loads(sess_auth.impersonator))
            self.user = sess.auth.user
            self.update_groups()
            self.run_login_onaccept()
        return None

    if called_without_id and not req.post_vars:
        return SQLFORM.factory(Field('user_id', 'integer'))
    if not target:
        return None
    return SQLFORM(users, target.id, readonly=True)
def impersonate(self, user_id=DEFAULT):
    """
    Switch the current session to another user, or switch back.

    To use this make a POST to
    `http://..../impersonate request.post_vars.user_id=<id>`
    Set request.post_vars.user_id to 0 to restore original user.

    requires impersonator is logged in and::

        has_permission('impersonate', 'auth_user', user_id)

    Raises HTTP 401 when nobody is logged in or the requested user record
    is not found, and HTTP 403 when the permission is missing.

    Returns None after a restore request or when no switch took place, a
    `user_id` input form when called without an id and without POST vars,
    and otherwise a read-only form of the impersonated user.
    """
    req = current.request
    sess = current.session
    sess_auth = sess.auth
    users = self.table_user()
    if not self.is_logged_in():
        raise HTTP(401, "Not Authorized")
    original_id = sess_auth.user.id
    called_without_id = user_id is DEFAULT
    target = None
    if called_without_id:
        user_id = current.request.post_vars.user_id

    wants_switch = user_id and user_id != self.user.id and user_id != '0'
    if wants_switch:
        if not self.has_permission('impersonate', self.table_user(), user_id):
            raise HTTP(403, "Forbidden")
        target = users(user_id)
        if not target:
            raise HTTP(401, "Not Authorized")
        # Keep a snapshot of the whole session so it can be restored later.
        sess_auth.impersonator = pickle.dumps(sess, pickle.HIGHEST_PROTOCOL)
        sess_auth.user.update(users._filter_fields(target, True))
        self.user = sess_auth.user
        self.update_groups()
        self.log_event(self.messages['impersonate_log'],
                       dict(id=original_id, other_id=sess_auth.user.id))
        self.run_login_onaccept()
    elif user_id in (0, '0'):
        if self.is_impersonating():
            # Replace the session content with the snapshot taken when the
            # impersonation started.
            sess.clear()
            sess.update(pickle.loads(sess_auth.impersonator))
            self.user = sess.auth.user
            self.update_groups()
            self.run_login_onaccept()
        return None

    if called_without_id and not req.post_vars:
        return SQLFORM.factory(Field('user_id', 'integer'))
    if not target:
        return None
    return SQLFORM(users, target.id, readonly=True)
def request(self, operation, url, data=None, headers=None):
    """Performs an HTTP call to the server, supports GET, POST, PUT, and
    DELETE.

    Usage example, perform an HTTP GET on http://www.google.com/:
      import atom.http
      client = atom.http.HttpClient()
      http_response = client.request('GET', 'http://www.google.com/')

    Args:
      operation: str The HTTP operation to be performed. This is usually one
          of 'GET', 'POST', 'PUT', or 'DELETE'. Any other value results in
          None being passed to urlfetch as the method.
      url: The full URL to which the request should be sent. Can be a string
          or atom.url.Url; it is passed on as str(url).
      data: None, a string, a list of parts, or another object which
          __ConvertDataPart can turn into a string. Should be set to None
          when performing a GET or DELETE. The parts of a list are converted
          one by one and concatenated.
      headers: dict of strings. HTTP headers which should be sent
          in the request. They override the client's default headers.

    Returns:
      An HttpResponse wrapping the result of urlfetch.Fetch.
    """
    all_headers = self.headers.copy()
    if headers:
        all_headers.update(headers)

    # Construct the full payload.
    # Assume that data is None or a string.
    # NOTE(review): if this method lives in a class body, the double
    # underscore name below is subject to name mangling -- confirm that the
    # helper resolves.
    data_str = data
    if data:
        if isinstance(data, list):
            # If data is a list of different objects, convert them all to
            # strings and join them together.
            converted_parts = [__ConvertDataPart(x) for x in data]
            data_str = ''.join(converted_parts)
        else:
            data_str = __ConvertDataPart(data)

    # If the list of headers does not include a Content-Length, attempt to
    # calculate it based on the data object. Header values are strings, so
    # the length is stored as text rather than as an int.
    if data and 'Content-Length' not in all_headers:
        all_headers['Content-Length'] = str(len(data_str))

    # Set the content type to the default value if none was set.
    if 'Content-Type' not in all_headers:
        all_headers['Content-Type'] = 'application/atom+xml'

    # Lookup the urlfetch operation which corresponds to the desired HTTP verb.
    if operation == 'GET':
        method = urlfetch.GET
    elif operation == 'POST':
        method = urlfetch.POST
    elif operation == 'PUT':
        method = urlfetch.PUT
    elif operation == 'DELETE':
        method = urlfetch.DELETE
    else:
        method = None
    return HttpResponse(urlfetch.Fetch(url=str(url), payload=data_str,
                                       method=method, headers=all_headers))
def request(self, operation, url, data=None, headers=None):
    """Performs an HTTP call to the server, supports GET, POST, PUT, and
    DELETE.

    Usage example, perform an HTTP GET on http://www.google.com/:
      import atom.http
      client = atom.http.HttpClient()
      http_response = client.request('GET', 'http://www.google.com/')

    Args:
      operation: str The HTTP operation to be performed. This is usually one
          of 'GET', 'POST', 'PUT', or 'DELETE'. Any other value results in
          None being passed to urlfetch as the method.
      url: The full URL to which the request should be sent. Can be a string
          or atom.url.Url; it is passed on as str(url).
      data: None, a string, a list of parts, or another object which
          __ConvertDataPart can turn into a string. Should be set to None
          when performing a GET or DELETE. The parts of a list are converted
          one by one and concatenated.
      headers: dict of strings. HTTP headers which should be sent
          in the request. They override the client's default headers.

    Returns:
      An HttpResponse wrapping the result of urlfetch.Fetch.
    """
    all_headers = self.headers.copy()
    if headers:
        all_headers.update(headers)

    # Construct the full payload.
    # Assume that data is None or a string.
    # NOTE(review): if this method lives in a class body, the double
    # underscore name below is subject to name mangling -- confirm that the
    # helper resolves.
    data_str = data
    if data:
        if isinstance(data, list):
            # If data is a list of different objects, convert them all to
            # strings and join them together.
            converted_parts = [__ConvertDataPart(x) for x in data]
            data_str = ''.join(converted_parts)
        else:
            data_str = __ConvertDataPart(data)

    # If the list of headers does not include a Content-Length, attempt to
    # calculate it based on the data object. Header values are strings, so
    # the length is stored as text rather than as an int.
    if data and 'Content-Length' not in all_headers:
        all_headers['Content-Length'] = str(len(data_str))

    # Set the content type to the default value if none was set.
    if 'Content-Type' not in all_headers:
        all_headers['Content-Type'] = 'application/atom+xml'

    # Lookup the urlfetch operation which corresponds to the desired HTTP verb.
    if operation == 'GET':
        method = urlfetch.GET
    elif operation == 'POST':
        method = urlfetch.POST
    elif operation == 'PUT':
        method = urlfetch.PUT
    elif operation == 'DELETE':
        method = urlfetch.DELETE
    else:
        method = None
    return HttpResponse(urlfetch.Fetch(url=str(url), payload=data_str,
                                       method=method, headers=all_headers))
def impersonate(self, user_id=DEFAULT):
    """
    Switch the current session to another user, or switch back.

    To use this make a POST to
    `http://..../impersonate request.post_vars.user_id=<id>`
    Set request.post_vars.user_id to 0 to restore original user.

    requires impersonator is logged in and::

        has_permission('impersonate', 'auth_user', user_id)

    Raises HTTP 401 when nobody is logged in or the requested user record
    is not found, and HTTP 403 when the permission is missing.

    Returns None after a restore request or when no switch took place, a
    `user_id` input form when called without an id and without POST vars,
    and otherwise a read-only form of the impersonated user.
    """
    request = current.request
    session = current.session
    auth = session.auth
    table_user = self.table_user()
    if not self.is_logged_in():
        raise HTTP(401, "Not Authorized")
    current_id = auth.user.id
    requested_id = user_id
    # Stays None when no switch happens (for instance when the id is the
    # caller's own or empty); without this default the final return raised
    # UnboundLocalError on that path.
    user = None
    if user_id is DEFAULT:
        user_id = current.request.post_vars.user_id
    if user_id and user_id != self.user.id and user_id != '0':
        if not self.has_permission('impersonate',
                                   self.table_user(),
                                   user_id):
            raise HTTP(403, "Forbidden")
        user = table_user(user_id)
        if not user:
            raise HTTP(401, "Not Authorized")
        # Keep a snapshot of the whole session so it can be restored later.
        auth.impersonator = pickle.dumps(session, pickle.HIGHEST_PROTOCOL)
        auth.user.update(
            table_user._filter_fields(user, True))
        self.user = auth.user
        self.update_groups()
        log = self.messages['impersonate_log']
        self.log_event(log, dict(id=current_id, other_id=auth.user.id))
        self.run_login_onaccept()
    elif user_id in (0, '0'):
        if self.is_impersonating():
            # NOTE: unpickles data kept in the session; this is only safe
            # while the session storage itself is trusted.
            session.clear()
            session.update(pickle.loads(auth.impersonator))
            self.user = session.auth.user
            self.update_groups()
            self.run_login_onaccept()
        return None
    if requested_id is DEFAULT and not request.post_vars:
        return SQLFORM.factory(Field('user_id', 'integer'))
    if not user:
        return None
    return SQLFORM(table_user, user.id, readonly=True)
def request(self, operation, url, data=None, headers=None):
    """Performs an HTTP call to the server, supports GET, POST, PUT, and
    DELETE.

    Usage example, perform an HTTP GET on http://www.google.com/:
      import atom.http
      client = atom.http.HttpClient()
      http_response = client.request('GET', 'http://www.google.com/')

    Args:
      operation: str The HTTP operation to be performed. This is usually one
          of 'GET', 'POST', 'PUT', or 'DELETE'. Any other value results in
          None being passed to urlfetch as the method.
      url: The full URL to which the request should be sent. Can be a string
          or atom.url.Url; it is passed on as str(url).
      data: None, a string, a list of parts, or another object which
          __ConvertDataPart can turn into a string. Should be set to None
          when performing a GET or DELETE. The parts of a list are converted
          one by one and concatenated.
      headers: dict of strings. HTTP headers which should be sent
          in the request. They override the client's default headers.

    Returns:
      An HttpResponse wrapping the result of urlfetch.Fetch.
    """
    all_headers = self.headers.copy()
    if headers:
        all_headers.update(headers)

    # Construct the full payload.
    # Assume that data is None or a string.
    # NOTE(review): if this method lives in a class body, the double
    # underscore name below is subject to name mangling -- confirm that the
    # helper resolves.
    data_str = data
    if data:
        if isinstance(data, list):
            # If data is a list of different objects, convert them all to
            # strings and join them together.
            converted_parts = [__ConvertDataPart(x) for x in data]
            data_str = ''.join(converted_parts)
        else:
            data_str = __ConvertDataPart(data)

    # If the list of headers does not include a Content-Length, attempt to
    # calculate it based on the data object. Header values are strings, so
    # the length is stored as text rather than as an int.
    if data and 'Content-Length' not in all_headers:
        all_headers['Content-Length'] = str(len(data_str))

    # Set the content type to the default value if none was set.
    if 'Content-Type' not in all_headers:
        all_headers['Content-Type'] = 'application/atom+xml'

    # Lookup the urlfetch operation which corresponds to the desired HTTP verb.
    if operation == 'GET':
        method = urlfetch.GET
    elif operation == 'POST':
        method = urlfetch.POST
    elif operation == 'PUT':
        method = urlfetch.PUT
    elif operation == 'DELETE':
        method = urlfetch.DELETE
    else:
        method = None
    return HttpResponse(urlfetch.Fetch(url=str(url), payload=data_str,
                                       method=method, headers=all_headers))
def auth_callback_provider( self ):
    """Finish the Twitter OAuth 1.0 sign-in (step 3).

    Exchanges the request token kept in the session plus the
    `oauth_verifier` request parameter for an access token.  When the reply
    carries both `user_id` and `oauth_token`, the account's email is looked
    up through `account/verify_credentials` and the login info is handed to
    `provider_authenticated_callback`; otherwise the request is aborted
    with 401.
    """
    # STEP 3
    oauth_verifier = self.request.get( 'oauth_verifier' )
    params = [( 'oauth_consumer_key' , settings.secrets.CLIENT_ID_TWITTER ),
              ( 'oauth_nonce' , webapp2_extras.security.generate_random_string( length = 42, pool = webapp2_extras.security.ALPHANUMERIC ).encode( 'utf-8' )),
              ( 'oauth_signature_method' , "HMAC-SHA1" ),
              ( 'oauth_timestamp' , str( int( time.time()))),
              ( 'oauth_token', self.session.get( 'twitter_oauth_token' )),
              ( 'oauth_version' , "1.0" )]
    normalised_url = 'https://api.twitter.com/oauth/access_token/'
    # The signature is computed before it and the verifier are appended.
    oauth_signature = self.auth_sign( normalised_url, params, self.session.get( 'twitter_oauth_token_secret') )
    params.append(( 'oauth_signature', oauth_signature ))
    params.append(( 'oauth_verifier', oauth_verifier ))
    url_params = enki.libutil.urlencode( params )
    result = self.urlfetch_safe( url = normalised_url, payload = url_params, method = urlfetch.POST )
    response = self.process_result_as_query_string( result )
    oauth_token = response.get( 'oauth_token' )
    oauth_token_secret = response.get('oauth_token_secret')
    user_id = response.get( 'user_id')
    if user_id and oauth_token:
        # get email address if we can
        verify_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
        verify_params = [('include_email', 'true'),
                         ('include_entities','false'),
                         ('oauth_consumer_key', settings.secrets.CLIENT_ID_TWITTER ),
                         ('oauth_nonce', webapp2_extras.security.generate_random_string( length = 42, pool = webapp2_extras.security.ALPHANUMERIC ).encode( 'utf-8' )),
                         ('oauth_signature_method', "HMAC-SHA1"),
                         ('oauth_timestamp', str(int(time.time()))),
                         ('oauth_token', oauth_token ),
                         ('oauth_version', "1.0"),
                         ('skip_status', 'true')]
        verify_oauth_signature = self.auth_sign( verify_url, verify_params, oauth_token_secret, method_get=True )
        verify_params.append(('oauth_signature', verify_oauth_signature))
        verify_url_params = enki.libutil.urlencode( verify_params )
        full_url = verify_url + '?' + verify_url_params
        verify_credentials_result_json = self.urlfetch_safe( url = full_url, method = urlfetch.GET )
        verify_credentials_result = self.process_result_as_JSON(verify_credentials_result_json)
        # The email is optional: a reply without the key must not abort a
        # login that has already been authenticated.
        try:
            response['email'] = verify_credentials_result['email']
        except KeyError:
            response['email'] = None
        response['email_verified'] = True
        # Maps login info fields to the keys of `response` that hold them.
        loginInfoSettings = { 'provider_uid': 'user_id',
                              'email': 'email',
                              'email_verified': 'email_verified' }
        loginInfo = self.process_login_info( loginInfoSettings, response )
        self.provider_authenticated_callback( loginInfo )
    else:
        self.abort( 401 )
def set_welcome_message(fb_page_id):
    """Register the welcome message shown in new Messenger threads.

    When ``config.PRODUCTION`` is truthy, POSTs a ``call_to_actions``
    thread setting with one generic-template card to the page's
    ``thread_settings`` endpoint and logs the response body at debug
    level.  Otherwise nothing is sent.

    Args:
        fb_page_id: page id string inserted into the endpoint URL.
    """
    endpoint = 'https://graph.facebook.com/v2.6/' + fb_page_id + '/thread_settings?access_token=' + config.FACEBOOK_PAGE_ACCESS_TOKEN
    if not config.PRODUCTION:
        return

    # The card is assembled inside out: element, template, message, setting.
    card = {
        'title': 'Welcome to %s' % config.FACEBOOK_BOT_NAME,
        'item_url': 'https://gilacoolbot.appspot.com',
        'image_url': 'http://messengerdemo.parseapp.com/img/rift.png',
        'subtitle': 'This is a subtitle',
        'buttons': [
            {
                'type': 'web_url',
                'title': 'View website',
                'url': 'https://gilacoolbot.appspot.com'
            },
            {
                'type': 'postback',
                'title': 'Start chatting',
                'payload': 'DEVELOPER_DEFINED_PAYLOAD',
            }
        ]
    }
    template = {
        'type': 'template',
        'payload': {
            'template_type': 'generic',
            'elements': [card]
        }
    }
    welcome = {
        'text': 'Hello there!',
        'attachment': template
    }
    body = json.dumps({
        'setting_type': 'call_to_actions',
        'thread_state': 'new_thread',
        'call_to_actions': [{'message': welcome}]
    })
    response = urlfetch.fetch(
        endpoint,
        body,
        urlfetch.POST,
        {'Content-Type': 'application/json'}
    )
    logging.debug(response.content)
def impersonate(self, user_id=DEFAULT):
    """
    Switch the current session to another user, or switch back.

    To use this make a POST to
    `http://..../impersonate request.post_vars.user_id=<id>`
    Set request.post_vars.user_id to 0 to restore original user.

    requires impersonator is logged in and::

        has_permission('impersonate', 'auth_user', user_id)

    Raises HTTP 401 when nobody is logged in or the requested user record
    is not found, and HTTP 403 when the permission is missing.

    Returns None after a restore request or when no switch took place, a
    `user_id` input form when called without an id and without POST vars,
    and otherwise a read-only form of the impersonated user.
    """
    request = current.request
    session = current.session
    auth = session.auth
    table_user = self.table_user()
    if not self.is_logged_in():
        raise HTTP(401, "Not Authorized")
    current_id = auth.user.id
    requested_id = user_id
    # Stays None when no switch happens (for instance when the id is the
    # caller's own or empty); without this default the final return raised
    # UnboundLocalError on that path.
    user = None
    if user_id is DEFAULT:
        user_id = current.request.post_vars.user_id
    if user_id and user_id != self.user.id and user_id != '0':
        if not self.has_permission('impersonate',
                                   self.table_user(),
                                   user_id):
            raise HTTP(403, "Forbidden")
        user = table_user(user_id)
        if not user:
            raise HTTP(401, "Not Authorized")
        # Keep a snapshot of the whole session so it can be restored later.
        auth.impersonator = pickle.dumps(session, pickle.HIGHEST_PROTOCOL)
        auth.user.update(
            table_user._filter_fields(user, True))
        self.user = auth.user
        self.update_groups()
        log = self.messages['impersonate_log']
        self.log_event(log, dict(id=current_id, other_id=auth.user.id))
        self.run_login_onaccept()
    elif user_id in (0, '0'):
        if self.is_impersonating():
            # NOTE: unpickles data kept in the session; this is only safe
            # while the session storage itself is trusted.
            session.clear()
            session.update(pickle.loads(auth.impersonator))
            self.user = session.auth.user
            self.update_groups()
            self.run_login_onaccept()
        return None
    if requested_id is DEFAULT and not request.post_vars:
        return SQLFORM.factory(Field('user_id', 'integer'))
    if not user:
        return None
    return SQLFORM(table_user, user.id, readonly=True)