def _write_request(self, request_url, json_message, api_key=None):
data = {
'data': base64.b64encode(json_message.encode('utf8')),
'verbose': 1,
'ip': 0,
}
if api_key:
data.update({'api_key': api_key})
encoded_data = urllib.parse.urlencode(data).encode('utf8')
try:
request = urllib.request.Request(request_url, encoded_data)
# Note: We don't send timeout=None here, because the timeout in urllib2 defaults to
# an internal socket timeout, not None.
if self._request_timeout is not None:
response = urllib.request.urlopen(request, timeout=self._request_timeout).read()
else:
response = urllib.request.urlopen(request).read()
except urllib.error.URLError as e:
raise six.raise_from(MixpanelException(e), e)
try:
response = json.loads(response.decode('utf8'))
except ValueError:
raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response))
if response['status'] != 1:
raise MixpanelException('Mixpanel error: {0}'.format(response['error']))
return True
评论列表
文章目录