def send_message_android(destination, message):
    """Send a data-only push message through the Firebase HTTP endpoint.

    Posts a JSON payload to ``settings.FIREBASE_API_URL`` authorised with
    ``settings.FIREBASE_SERVER_KEY`` and prints the raw response body.

    Args:
        destination: Value sent in the payload's ``"to"`` field
            (presumably a device token or topic -- confirm with callers).
        message: Text sent as ``data.detail``; the title comes from
            ``config.TITLE_PUSH_NOTIFICATION``.
    """
    headers = {
        'Authorization': 'key=' + settings.FIREBASE_SERVER_KEY,
        # The key used to be 'Content - Type' (with spaces), which is not the
        # Content-Type header and so never declared the body format.
        'Content-Type': 'application/json',
    }
    payload = {
        "to": destination,
        "data": {
            "title": config.TITLE_PUSH_NOTIFICATION,
            "detail": message,
        },
    }
    response = requests.post(
        settings.FIREBASE_API_URL,
        json=payload,
        headers=headers,
    )
    print(response.text)
# Example source code for Python's post()
def send_message_ios(destination, message):
    """Send a high-priority notification message through the Firebase HTTP endpoint.

    Posts a JSON payload to ``settings.FIREBASE_API_URL`` authorised with
    ``settings.FIREBASE_SERVER_KEY`` and prints the raw response body.

    Args:
        destination: Value sent in the payload's ``"to"`` field
            (presumably a device token or topic -- confirm with callers).
        message: Text sent as ``notification.text``; the title comes from
            ``config.TITLE_PUSH_NOTIFICATION``.
    """
    headers = {
        'Authorization': 'key=' + settings.FIREBASE_SERVER_KEY,
        # The key used to be 'Content - Type' (with spaces), which is not the
        # Content-Type header and so never declared the body format.
        'Content-Type': 'application/json',
    }
    payload = {
        "to": destination,
        "priority": "high",
        "badge": 0,
        "notification": {
            "title": config.TITLE_PUSH_NOTIFICATION,
            "text": message,
            "sound": "default",
        },
    }
    response = requests.post(
        settings.FIREBASE_API_URL,
        json=payload,
        headers=headers,
    )
    print(response.text)
def obtain_access_token(config, password):
    """Obtain an access token using the OAuth-style password grant.

    Posts the credentials as form data to the URL returned by
    ``get_auth_url(config)`` and returns the ``access_token`` field of the
    JSON response.

    Args:
        config: Object providing ``username``; also passed to
            ``get_auth_url``.
        password: Password sent together with ``config.username``.

    Returns:
        The value of ``access_token`` from the response JSON.

    Raises:
        requests.HTTPError: Raised by ``raise_for_status()`` when the
            response carries an HTTP error status.
        KeyError: If the response JSON contains no ``access_token``.
    """
    info("Will now attempt to obtain an JWT...")
    auth_request_data = {'client_id': 'jumpauth',
                         'username': config.username,
                         'password': password,
                         'grant_type': 'password'}
    auth_url = get_auth_url(config)
    auth_response = requests.post(auth_url, auth_request_data)
    if auth_response.status_code not in [200, 201]:
        # An error body is not guaranteed to be a JSON object; fall back to
        # the raw text so the HTTP error below is what the caller sees.
        try:
            error_detail = auth_response.json().get("error")
        except (ValueError, AttributeError):
            error_detail = auth_response.text
        log.info("Authentication failed: {0}".format(error_detail))
        auth_response.raise_for_status()
    else:
        log.info("Authentication OK!")
    token_payload = auth_response.json()
    # Bail out with an explicit message when the server sent no token.
    if 'access_token' not in token_payload:
        raise KeyError("access_token missing from authentication response")
    access_token = token_payload['access_token']
    info("Access token was received.")
    verbose("Access token is:\n'{0}'".format(access_token))
    return access_token
def authenticate(username, password):
    """Post the login form to ``http://<Host>/wf.do`` and report the outcome.

    The form body is assembled from the encoded password, the username, the
    encoded module-level ``AccessToken`` and ``LocalIpAddress``. The request
    headers and the response text are printed for diagnostics.

    Args:
        username: Account name inserted into the form body as-is.
        password: Password; passed through ``getPasswordEnc`` before sending.

    Returns:
        bool: True when the response text contains ``'auth00'``; False
        otherwise, including when any exception is raised along the way.
    """
    global Cookies, Host
    try:
        postData = ("password=" + getPasswordEnc(password) +
                    "&clientType=android&username=" + username +
                    "&key=" + getSessionEnc(AccessToken) +
                    "&code=8&clientip=" + LocalIpAddress)
        # NOTE(review): this fixed length will not match bodies of other
        # sizes, and it mutates the shared `header` dict -- confirm it is
        # really needed.
        header['Content-Length'] = '219'
        url = 'http://' + Host + '/wf.do'
        resp = requests.post(url=url, headers=header, cookies=Cookies, data=postData)
        resp.encoding = "GBK"
        print(resp.request.headers)
        text = resp.text
        print(text)
        # Was `'auth00' in text > 0`, a chained comparison that also
        # evaluates `text > 0` (a TypeError on Python 3).
        if 'auth00' in text:
            print("????.")
            return True
        else:
            print("????[" + getErrorMsg(resp) + "]")
            return False
    except Exception as e:
        # Deliberate best-effort: any failure is reported and mapped to False.
        print(e)
        print("(6/10) ????????")
        return False
def __call__(self):
    """Exchange the stored JWT for an access token.

    Sends ``client_id``, ``client_secret`` and ``jwt_token`` form-encoded to
    ``self.endpoint``. On success the ``expires_in`` value of the response
    is handed to ``self.set_expiry``.

    Returns:
        The ``access_token`` value of the JSON response.

    Raises:
        RuntimeError: If the response status code is not 200.
    """
    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Cache-Control": "no-cache",
    }
    form_body = urlparse.urlencode({
        "client_id": self.api_key,
        "client_secret": self.client_secret,
        "jwt_token": self.jwt_token
    })
    response = requests.post(self.endpoint, headers=request_headers, data=form_body)

    if response.status_code == 200:
        self.set_expiry(response.json()['expires_in'])
        return response.json()['access_token']

    failure_template = ("Unable to authorize against {}:\n"
                        "Response Code: {:d}, Response Text: {}\n"
                        "Response Headers: {}]")
    raise RuntimeError(failure_template.format(
        self.endpoint, response.status_code, response.text, response.headers))
def nightly_build(ctx, project_name, branch_name, circle_token):
    """Queue a CircleCI build of a branch with ``RUN_NIGHTLY_BUILD`` set.

    Posts to the CircleCI v1 project/tree endpoint and prints the HTTP
    status together with the decoded JSON response.

    Args:
        ctx: Not used by this function.
        project_name: Project path segment inserted into the URL.
        branch_name: Branch path segment inserted into the URL.
        circle_token: API token sent as the ``circle-token`` query parameter.
    """
    url_template = 'https://circleci.com/api/v1/project/{}/tree/{}?circle-token={}'
    trigger_url = url_template.format(project_name, branch_name, circle_token)

    json_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }
    build_request = {'build_parameters': {'RUN_NIGHTLY_BUILD': "true"}}

    response = requests.post(trigger_url, json=build_request, headers=json_headers)
    summary = "Nightly Build queued on CircleCI: http_status={}, response={}".format(
        response.status_code,
        response.json()
    )
    print(summary)
def start_job(self, job_id):
    """
    Ask the API to start a job by posting a ``start`` event.

    :param job_id: the id of the job to start
    :return: the decoded JSON response; per the original notes an object
             of the form {result: 'success' or 'already started'}
    :raises exceptions.ApiClientException: if the status code is not 202
    """
    # endpoint /v1/jobs/{job_id}/event
    event_url = '{0}{1}/event'.format(self.endpoint, job_id)
    event_body = json.dumps({"start": None})
    response = requests.post(event_url,
                             headers=self.headers,
                             data=event_body,
                             verify=self.verify)
    if response.status_code == 202:
        return response.json()
    raise exceptions.ApiClientException(response)
def stop_job(self, job_id):
    """
    Ask the API to stop a job by posting a ``stop`` event.

    :param job_id: the id of the job to stop
    :return: the decoded JSON response; per the original notes an object
             of the form {result: 'success' or 'already stopped'}
    :raises exceptions.ApiClientException: if the status code is not 202
    """
    # endpoint /v1/jobs/{job_id}/event
    event_url = '{0}{1}/event'.format(self.endpoint, job_id)
    event_body = json.dumps({"stop": None})
    response = requests.post(event_url,
                             headers=self.headers,
                             data=event_body,
                             verify=self.verify)
    if response.status_code == 202:
        return response.json()
    raise exceptions.ApiClientException(response)
def abort_job(self, job_id):
    """
    Ask the API to abort a job by posting an ``abort`` event.

    :param job_id: the id of the job to abort
    :return: the decoded JSON response; per the original notes an object
             of the form {result: 'success' or 'already stopped'}
    :raises exceptions.ApiClientException: if the status code is not 202
    """
    # endpoint /v1/jobs/{job_id}/event
    event_url = '{0}{1}/event'.format(self.endpoint, job_id)
    event_body = json.dumps({"abort": None})
    response = requests.post(event_url,
                             headers=self.headers,
                             data=event_body,
                             verify=self.verify)
    if response.status_code == 202:
        return response.json()
    raise exceptions.ApiClientException(response)
def end_session(self, session_id, job_id, session_tag, result):
    """
    Report to the service that a job belonging to a session has ended.

    Posts an ``end`` action carrying the job id, the session tag and the
    job result to the session's action endpoint.

    :param session_id: id of the session; used to build the endpoint URL
    :param job_id: sent as ``job_id`` in the action body
    :param session_tag: sent as ``current_tag`` in the action body
    :param result: sent as ``result`` in the action body
    :return: the decoded JSON response
    :raises exceptions.ApiClientException: if the status code is not 202
    """
    # endpoint /v1/sessions/{sessions_id}/action
    action_url = '{0}{1}/action'.format(self.endpoint, session_id)
    end_details = {
        "job_id": job_id,
        "current_tag": session_tag,
        "result": result
    }
    action_body = json.dumps({"end": end_details})
    response = requests.post(action_url,
                             headers=self.headers,
                             data=action_body,
                             verify=self.verify)
    if response.status_code == 202:
        return response.json()
    raise exceptions.ApiClientException(response)
def stop_job(self, job_id):
    """
    Post a ``stop`` event for the given job.

    :param job_id: the id of the job to stop
    :return: the decoded JSON response; per the original notes an object
             of the form {result: 'success' or 'already stopped'}
    :raises exceptions.ApiClientException: if the status code is not 202
    """
    # endpoint /v1/jobs/{job_id}/event
    target = '{0}{1}/event'.format(self.endpoint, job_id)
    serialized_event = json.dumps({"stop": None})
    reply = requests.post(target,
                          headers=self.headers,
                          data=serialized_event,
                          verify=self.verify)
    if reply.status_code == 202:
        return reply.json()
    raise exceptions.ApiClientException(reply)
def abort_job(self, job_id):
    """
    Post an ``abort`` event for the given job.

    :param job_id: the id of the job to abort
    :return: the decoded JSON response; per the original notes an object
             of the form {result: 'success' or 'already stopped'}
    :raises exceptions.ApiClientException: if the status code is not 202
    """
    # endpoint /v1/jobs/{job_id}/event
    target = '{0}{1}/event'.format(self.endpoint, job_id)
    serialized_event = json.dumps({"abort": None})
    reply = requests.post(target,
                          headers=self.headers,
                          data=serialized_event,
                          verify=self.verify)
    if reply.status_code == 202:
        return reply.json()
    raise exceptions.ApiClientException(reply)
def end_session(self, session_id, job_id, session_tag, result):
    """
    Post an ``end`` action for a session once one of its jobs has finished.

    The action body carries the job id, the current session tag and the
    job's result.

    :param session_id: id of the session; used to build the endpoint URL
    :param job_id: sent as ``job_id`` in the action body
    :param session_tag: sent as ``current_tag`` in the action body
    :param result: sent as ``result`` in the action body
    :return: the decoded JSON response
    :raises exceptions.ApiClientException: if the status code is not 202
    """
    # endpoint /v1/sessions/{sessions_id}/action
    target = '{0}{1}/action'.format(self.endpoint, session_id)
    serialized_action = json.dumps({
        "end": {
            "job_id": job_id,
            "current_tag": session_tag,
            "result": result
        }
    })
    reply = requests.post(target,
                          headers=self.headers,
                          data=serialized_action,
                          verify=self.verify)
    if reply.status_code == 202:
        return reply.json()
    raise exceptions.ApiClientException(reply)
def send(notif_type, **params):
    ''' Post a Slack notification of the given type.

    The message text and colour come from ``notification.get``; they are
    sent as a single attachment to the URL formed from the ``base_url``
    and ``endpoint`` entries of ``config()``. Extra keyword arguments are
    forwarded to ``notification.get``.
    '''
    webhook_url = config()['base_url'] + config()['endpoint']
    rendered = notification.get(
        notif_type,
        config=get_config(),
        notif_config=config(),
        create_link=create_link,
        **params
    )
    text, color = rendered
    attachment = {'color': color, 'text': text}
    requests.post(webhook_url, json={'attachments': [attachment]})
def send(notif_type, **params):
    ''' Post a HipChat notification of the given type.

    The target URL is ``API_BASE_URL`` filled in with the ``company_name``,
    ``room_id`` and ``auth_token`` entries of ``config()``. The message text
    and colour come from ``notification.get``; extra keyword arguments are
    forwarded to it. The message is sent with ``message_format`` 'html'.
    '''
    room_url = API_BASE_URL.format(
        company_name=config()['company_name'],
        room_id=config()['room_id'],
        auth_token=config()['auth_token']
    )
    rendered = notification.get(
        notif_type,
        config=get_config(),
        notif_config=config(),
        create_link=create_link,
        **params
    )
    text, color = rendered
    message = {
        'color': color,
        'message': text,
        'notify': config()['notify'],
        'message_format': 'html'
    }
    requests.post(room_url, json=message)
def create_logset(logset_name=None, params=None):
    """
    Add a new logset to the current account.

    If ``params`` is given, it is sent unchanged as the JSON request body.
    Otherwise a body of the form ``{'logset': {'name': logset_name}}`` is
    built from ``logset_name``.

    The response is passed to ``handle_response`` with an expected status
    of 201. If the request itself raises a ``RequestException``, the error
    is written to stderr and the process exits with status 1.
    """
    if params is not None:
        request_params = params
    else:
        request_params = {
            'logset': {
                'name': logset_name
            }
        }
    headers = api_utils.generate_headers('rw')
    try:
        response = requests.post(_url()[1], json=request_params, headers=headers)
        handle_response(response, 'Creating logset failed.\n', 201)
    except requests.exceptions.RequestException as error:
        # stderr.write() needs a string; passing the exception object itself
        # raised TypeError and hid the real failure.
        sys.stderr.write(str(error))
        sys.exit(1)
def create(payload):
    """
    Create an api key from the given request payload.

    Headers are generated for the 'owner' key from the JSON-serialised
    payload and the action returned by ``_url()``. A 201 response is passed
    to ``handle_api_key_response``. If ``response_utils.response_error``
    reports an error, or the request raises a ``RequestException``, a
    message is written to stderr and the process exits with status 1.
    """
    action, url = _url()
    headers = api_utils.generate_headers('owner', method='POST', body=json.dumps(payload),
                                         action=action)
    try:
        response = requests.post(url, headers=headers, json=payload)
        if response_utils.response_error(response):
            sys.stderr.write('Create api key failed.')
            sys.exit(1)
        elif response.status_code == 201:
            handle_api_key_response(response)
    except requests.exceptions.RequestException as error:
        # stderr.write() needs a string; passing the exception object itself
        # raised TypeError and hid the real failure.
        sys.stderr.write(str(error))
        sys.exit(1)
def execute(self, document, variable_values=None, timeout=None):
    """Send a query document to ``self.url`` and wrap the response.

    The document is rendered with ``print_ast`` and posted together with
    the variables, either as JSON or as form data depending on
    ``self.use_json``.

    Args:
        document: Query AST accepted by ``print_ast``.
        variable_values: Optional variables; an empty dict is sent when
            this is falsy.
        timeout: Request timeout; ``self.default_timeout`` is used when
            this is falsy.

    Returns:
        ExecutionResult: Built from the ``errors`` and ``data`` entries of
        the response JSON.

    Raises:
        requests.HTTPError: Raised by ``raise_for_status()`` on an HTTP
            error status.
        AssertionError: If the response JSON has neither ``errors`` nor
            ``data``.
    """
    query_str = print_ast(document)
    payload = {
        'query': query_str,
        'variables': variable_values or {}
    }
    data_key = 'json' if self.use_json else 'data'
    post_args = {
        'headers': self.headers,
        'auth': self.auth,
        'cookies': self.cookies,
        'timeout': timeout or self.default_timeout,
        data_key: payload
    }
    response = requests.post(self.url, **post_args)
    response.raise_for_status()
    result = response.json()
    # Raised explicitly rather than with `assert`, so the check still runs
    # under `python -O`; the exception type is unchanged for callers.
    if 'errors' not in result and 'data' not in result:
        raise AssertionError('Received non-compatible response "{}"'.format(result))
    return ExecutionResult(
        errors=result.get('errors'),
        data=result.get('data')
    )
def refresh_access_token(self, refresh_token):
    """ Exchange a refresh token at the Spotify accounts token endpoint.
    Args:
        refresh_token (string): The refresh token, sent with grant type
            'refresh_token'.
    Returns:
        The JSON-decoded body of the token endpoint's response.
    """
    auth_headers = self._make_authorization_headers()
    form_fields = {
        'grant_type': 'refresh_token',
        'refresh_token': refresh_token
    }
    response = requests.post(
        'https://accounts.spotify.com/api/token',
        data=form_fields,
        headers=auth_headers
    )
    return response.json()
def request_access_token(self, token):
    """ Request a session from Last.fm using the 'auth.getSession' method.
    Args:
        token (string): Token from redirect.
    Return:
        LastfmCredentials: Built from the session key in the response.
    """
    session_request = {
        'api_key': self.__key,
        'method': 'auth.getSession',
        'token': token
    }
    # The signature is computed before 'format' is added, so 'format' is
    # not part of the signed parameters.
    signature = self.sign(session_request)
    session_request.update(api_sig=signature, format='json')
    reply = requests.post(
        'https://ws.audioscrobbler.com/2.0/', params=session_request)
    body = reply.json()
    return LastfmCredentials(body['session']['key'])