def import_dashboard_via_user_pass(api_url, user, password, dashboard):
    payload = {'dashboard': dashboard,
               'overwrite': True}
    auth_string = base64.b64encode('%s:%s' % (user, password))
    headers = {'Authorization': "Basic {}".format(auth_string),
               'Content-Type': 'application/json'}
    req = urllib2.Request(api_url + 'api/dashboards/db',
                          headers=headers,
                          data=json.dumps(payload))
    try:
        resp = urllib2.urlopen(req)
        data = json.load(resp)
        return data
    except urllib2.HTTPError as error:
        data = json.load(error)
        return data
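# A minimal usage sketch for the helper above, assuming a Grafana-style
# dashboard API ('api/dashboards/db' is the classic Grafana import endpoint);
# the URL, credentials, and dashboard body are placeholders.
import base64
import json
import urllib2

example_dashboard = {'title': 'example', 'schemaVersion': 16, 'panels': []}
print import_dashboard_via_user_pass('http://localhost:3000/',
                                     'admin', 'admin', example_dashboard)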
def get(self, url, proxy=None):
    if proxy:
        proxy = urllib2.ProxyHandler({'http': proxy})
        opener = urllib2.build_opener(proxy)
        urllib2.install_opener(opener)
    try:
        response = urllib2.urlopen(url)
    except HTTPError as e:
        resp = e.read()
        self.status_code = e.code
    except URLError as e:
        # URLError carries no HTTP body or status code; record the reason
        resp = str(e.reason)
        self.status_code = None
    else:
        self.status_code = response.code
        resp = response.read()
    return resp
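# The method above installs a process-wide ProxyHandler before the request.
# A minimal standalone sketch of that same pattern (the proxy address is a
# placeholder assumption):
import urllib2
from urllib2 import HTTPError, URLError

proxy = urllib2.ProxyHandler({'http': '127.0.0.1:8080'})
opener = urllib2.build_opener(proxy)
urllib2.install_opener(opener)
try:
    print urllib2.urlopen('http://example.com/').read()[:200]
except (HTTPError, URLError) as e:
    print 'request failed:', e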
def run(self):
    data = self.getData()
    value = {
        data: {
            "type": self.data_type
        }
    }
    json_data = json.dumps(value)
    post_data = json_data.encode('utf-8')
    headers = {'Content-Type': 'application/json'}
    try:
        request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service), post_data, headers)
        response = urllib2.urlopen(request)
        report = json.loads(response.read())
        self.report(report)
    except urllib2.HTTPError:
        self.error("Hippocampe: " + str(sys.exc_info()[1]))
    except urllib2.URLError:
        self.error("Hippocampe: service is not available")
    except Exception as e:
        self.unexpectedError(e)
def send_result(email, result, title, urn):
    """
    Args:
        email (str): address to send the results
        result (obj): results to send
        title (str):
        urn (str): uniform resource name
    Returns:
        str: response from endpoint
    """
    url = 'https://mongoaud.it/results'
    headers = {'Content-type': 'application/json',
               'Accept': 'application/json'}
    values = {'email': email, 'result': result, 'title': title, 'urn': urn, 'date': get_date()}
    try:
        req = urllib2.Request(url, json.dumps(values), headers)
        response = urllib2.urlopen(req)
        return response.read()
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        return "Sadly enough, we are having technical difficulties at the moment, " \
               "please try again later.\n\n%s" % str(exc)
def check_version(version):
    # if application is binary then check for latest version
    if getattr(sys, 'frozen', False):
        try:
            url = "https://api.github.com/repos/stampery/mongoaudit/releases/latest"
            req = urllib2.urlopen(url)
            releases = json.loads(req.read())
            latest = releases["tag_name"]
            if version < latest:  # note: plain lexicographic string comparison
                print("mongoaudit version " + version)
                print("There's a new version " + latest)
                _upgrade(releases)
        except (urllib2.HTTPError, urllib2.URLError):
            print("Couldn't check for upgrades")
        except os.error:
            print("Couldn't write mongoaudit binary")
def download_songs(url):
    time.sleep(random.random() * 0.5)
    try:
        page = urllib2.urlopen(url).read()
        soup = BeautifulSoup(page, 'html.parser')
        # Get the artist name
        artist_name = soup.findAll('h1')[0].get_text()[:-7].lower().replace(' ', '_')
        # Store all songs for a given artist
        with open('artist_data/' + artist_name + '.txt', 'wb') as w:
            for song in soup.findAll('a', {'target': '_blank'}):
                if 'lyrics/' in song['href']:
                    song_url = song['href'][1:].strip()
                    w.write(song_url + '\n')
    except urllib2.HTTPError:
        print '404 not found'
def TestSite(url):
    protocheck(url)
    print "Trying: " + url
    try:
        urllib2.urlopen(url, timeout=3)
    except urllib2.HTTPError as e:
        if e.code == 405:
            print url + " found!"
            print "Now the brute force will begin! >:)"
        if e.code == 404:
            printout(str(e), YELLOW)
            print " - XMLRPC has been moved, removed, or blocked"
            sys.exit()
    except urllib2.URLError:
        printout("Could not identify XMLRPC. Please verify the domain.\n", YELLOW)
        sys.exit()
    except socket.timeout as e:
        print type(e)
        printout("The socket timed out, try it again.", YELLOW)
        sys.exit()
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--term', dest='term', default=DEFAULT_TERM,
                        type=str, help='Search term (default: %(default)s)')
    parser.add_argument('-l', '--location', dest='location',
                        default=DEFAULT_LOCATION, type=str,
                        help='Search location (default: %(default)s)')
    input_values = parser.parse_args()
    try:
        query_api(input_values.term, input_values.location)
    except HTTPError as error:
        sys.exit(
            'Encountered HTTP error {0} on {1}:\n {2}\nAbort program.'.format(
                error.code,
                error.url,
                error.read(),
            )
        )
def _do_trakt_auth_post(self, url, data):
    try:
        session = self.get_session()
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + session,
            'trakt-api-version': '2',
            'trakt-api-key': self.CLIENT_ID
        }
        # timeout in seconds
        timeout = 5
        socket.setdefaulttimeout(timeout)
        request = urllib2.Request(url, data, headers)
        response = urllib2.urlopen(request).read()
        self.logger.info('Response: {0}'.format(response))
        return response
    except urllib2.HTTPError as e:
        self.logger.error('Unable to submit post data {url} - {error}'.format(url=url, error=e.reason))
        raise
def GetThatShit(head_URL):
    source = ""
    global gets
    global proxy_num
    head_URL = head_URL.replace("+", arg_eva)
    request_web = urllib2.Request(head_URL)
    request_web.add_header('User-Agent', agent)
    while len(source) < 1:
        if arg_debug == "on":
            print "\n[proxy]:", proxy_list_count[proxy_num % proxy_len] + "\n[agent]:", agent + "\n[debug]:", head_URL, "\n"
        try:
            gets += 1
            proxy_num += 1
            source = proxy_list[proxy_num % proxy_len].open(request_web).read()
        except (KeyboardInterrupt, SystemExit):
            raise
        except urllib2.HTTPError:
            print "[-] Unexpected error:", sys.exc_info()[0], "\n[-] Trying again!"
            print "[proxy]:", proxy_list_count[proxy_num % proxy_len] + "\n[agent]:", agent + "\n[debug]:", head_URL, "\n"
            break
        except:
            print "[-] Unexpected error:", sys.exc_info()[0], "\n[-] Look at the error and try to figure it out!"
            print "[proxy]:", proxy_list_count[proxy_num % proxy_len] + "\n[agent]:", agent + "\n[debug]:", head_URL, "\n"
            raise
    return source
# the guts and glory - binary algorithm that does all the guessing for the blind methodology
def getFile(link):
    try:
        source = urllib2.urlopen(link)
    except urllib2.HTTPError as msg:
        print "\nError:", msg
        sys.exit()
    num = 1
    file = 'tmp_insidepropw_' + link.split('=')[1] + '.txt'
    while os.path.isfile(file):
        file = link.rsplit("/", 1)[1] + "." + str(num)
        num += 1
    try:
        shutil.copyfileobj(source, open(file, "w+"))
    except IOError:
        print "\nCannot write to `" + file + "' (Permission denied)."
        sys.exit(1)
    print "File downloaded", file
    newfilelist.append(file)
#say hello
def run(self):
    password = getword()
    try:
        print "-" * 12
        print "User:", username, "Password:", password
        req = urllib2.Request(sys.argv[1])
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, sys.argv[1], username, password)
        authhandler = urllib2.HTTPBasicAuthHandler(passman)
        opener = urllib2.build_opener(authhandler)
        fd = opener.open(req)
        print "\t\n\n[+] Login successful: Username:", username, "Password:", password, "\n"
        print "[+] Retrieved", fd.geturl()
        info = fd.info()
        for key, value in info.items():
            print "%s = %s" % (key, value)
        sys.exit(2)
    except (urllib2.HTTPError, socket.error):
        pass
def run(self):
    username, password = getword()
    try:
        print "-" * 12
        print "User:", username, "Password:", password
        req = urllib2.Request(sys.argv[1])
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, sys.argv[1], username, password)
        authhandler = urllib2.HTTPBasicAuthHandler(passman)
        opener = urllib2.build_opener(authhandler)
        fd = opener.open(req)
        print "\t\n\nUsername:", username, "Password:", password, "----- Login successful!!!\n\n"
        print "Retrieved", fd.geturl()
        info = fd.info()
        for key, value in info.items():
            print "%s = %s" % (key, value)
        sys.exit(2)
    except (urllib2.HTTPError, httplib.BadStatusLine, socket.error) as msg:
        print "An error occurred:", msg
def getCookie(self):
    """
    This method is the first to be called when initializing a
    Google dorking object through this library. It retrieves the
    Google session cookie needed to perform the subsequent searches.
    """
    try:
        conn = self.opener.open("http://www.google.com/ncr")
        headers = conn.info()
    except urllib2.HTTPError as e:
        headers = e.info()
    except urllib2.URLError:
        errMsg = "unable to connect to Google"
        raise sqlmapConnectionException(errMsg)
def search(self, googleDork):
    """
    This method performs the actual search on Google, using the given
    Google dork and the previously retrieved Google session cookie.
    """
    if not googleDork:
        return None
    url = "http://www.google.com/search?"
    url += "q=%s&" % urlencode(googleDork)
    url += "num=100&hl=en&safe=off&filter=0&btnG=Search"
    try:
        conn = self.opener.open(url)
        page = conn.read()
    except urllib2.HTTPError as e:
        page = e.read()
    except urllib2.URLError:
        errMsg = "unable to connect to Google"
        raise sqlmapConnectionException(errMsg)
    self.__matches = self.__parsePage(page)
    return self.__matches
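# The two methods above rely on a cookie-aware 'opener' attribute. A minimal
# standalone sketch of that setup ('urlencode' in the snippet is presumably a
# project helper; stdlib quote_plus plays the same role below, and the dork
# string is a placeholder):
import cookielib
import urllib
import urllib2

cj = cookielib.CookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
opener.open("http://www.google.com/ncr")  # pick up the Google session cookie
dork = urllib.quote_plus('inurl:index.php?id=')
page = opener.open("http://www.google.com/search?q=%s&num=100&hl=en" % dork).read()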
def _api_call(url, opener):
    """
    Makes a REST call against the Couchbase API.
    Args:
        url (str): The URL to get, including endpoint
        opener (urllib2.OpenerDirector): Opener installed before the call
    Returns:
        list: The JSON response
    """
    try:
        urllib2.install_opener(opener)
        resp = urllib2.urlopen(url, timeout=http_timeout)
    except (urllib2.HTTPError, urllib2.URLError) as e:
        collectd.error("Error making API call (%s) %s" % (e, url))
        return None
    try:
        return json.load(resp)
    except ValueError as e:
        collectd.error("Error parsing JSON for API call (%s) %s" % (e, url))
        return None
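# A hedged sketch of constructing the 'opener' argument for the helper above;
# the host, port, and credentials are placeholders for a Couchbase admin API,
# and the call assumes the plugin's http_timeout/collectd globals exist:
import urllib2

mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
mgr.add_password(None, 'http://127.0.0.1:8091', 'Administrator', 'password')
auth_opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(mgr))
stats = _api_call('http://127.0.0.1:8091/pools/default', auth_opener)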
def _woxikon_de_url_handler(target):
    '''
    Query woxikon for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'), timeout=time_out_choice)
        web_content = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # other error
    except socket.timeout:  # timeout error not captured by URLError
        return 1
    return web_content
def _jeck_ru_url_handler(target):
    '''
    Query jeck.ru for synonyms
    '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII'), timeout=time_out_choice)
        web_content = StringIO(decode_utf_8(response.read()))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # any other error
    except socket.timeout:  # timeout error not captured by URLError
        return 1
    return web_content
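# Both handlers above follow one convention: a StringIO on success, or an int
# error code (1 = retryable timeout, -1 = hard failure). A minimal caller
# sketch under that convention (the retry count is an arbitrary choice):
def query_backend(handler, word, retries=2):
    for _ in range(retries):
        result = handler(word)
        if not isinstance(result, int):
            return result.read()  # the fetched page content
        if result == -1:
            break  # hard failure; retrying won't help
    return None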
def post_request(self, request, payload=None):
    # FIXME: provide full set of ssl options instead of this hack
    if self.server_url.startswith('https'):
        import ssl
        return urllib2.urlopen(request, data=payload, timeout=self.timeout,
                               context=ssl._create_unverified_context())
    return urllib2.urlopen(request, data=payload, timeout=self.timeout)

# def post_request(self, request, payload=None):  # @UnusedVariable
#     try:
#         try:
#             _response = urllib2.urlopen(request, timeout=self.timeout)
#         except TypeError:
#             _response = urllib2.urlopen(request)
#     except urllib2.HTTPError as e:
#         logerr("post failed: %s" % e)
#         raise weewx.restx.FailedPost(e)
#     else:
#         return _response
def gethtml(url):
    try:
        request = urllib2.Request(url)
        request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0')
        request.add_header('Accept-Language', 'en-us;q=0.5,en;q=0.3')
        request.add_header('Referer', request.get_full_url())
        u = urllib2.urlopen(request, timeout=3)
        content = u.read()
        try:
            content = content.encode("utf-8")
        except:
            content = content.decode('gbk', 'ignore').encode("utf-8", 'ignore')
        return {"html": content, "code": u.code, "url": u.geturl()}
    except urllib2.HTTPError as e:
        try:
            return {"html": e.read(), "code": e.code, "url": e.geturl()}
        except:
            return {"html": '', "code": e.code, "url": e.geturl()}
    except:
        return {"html": "", "code": 404, "url": url}
def symlinks(user, repo):
    mappings = []
    url1 = 'https://api.github.com/repos/%s/%s/git/refs/heads/master' % (user, repo)
    try:
        r = urllib2.urlopen(url1)
    except urllib2.HTTPError:
        print("Invalid url %s. Leaving..." % url1)
        sys.exit(1)
    base = json.load(r)
    sha = base['object']['sha']
    url2 = 'https://api.github.com/repos/%s/%s/git/trees/%s?recursive=1' % (user, repo, sha)
    r = urllib2.urlopen(url2)
    try:
        base = json.load(r)
    except ValueError:
        return []
    for e in base['tree']:
        if e['mode'] == '120000':
            mappings.append(e['path'])
    return mappings
def __login(self):
    """Performs the login to Librus."""
    # Pick up the session cookies
    self.__opener.addheaders = [('Authorization', 'Basic MzU6NjM2YWI0MThjY2JlODgyYjE5YTMzZjU3N2U5NGNiNGY=')]
    try:
        self.__opener.open('https://synergia.librus.pl')
        list(self.__cj)[0].domain = 'api.librus.pl'
        tokens = loads(self.__opener.open('https://api.librus.pl/OAuth/Token',
                                          data=urlencode({
                                              'grant_type': 'password',
                                              'username': config.login,
                                              'password': config.password,
                                              'librus_long_term_token': '1',
                                          })).read())
    except urllib2.HTTPError as e:
        if e.getcode() == 400:
            raise WrongPasswordError('Invalid password')
        raise
    self.__opener.addheaders = [('Authorization', 'Bearer %s' % tokens['access_token'])]
def get_announcements(self):
    """
    Fetches data from https://librus.synergia.pl/ogloszenia
    :returns: list of dicts [{"author": author,
                              "title": title,
                              "time": time,
                              "content": content}]
    """
    # Load the announcements
    try:
        data = loads(self.__opener.open('https://api.librus.pl/2.0/SchoolNotices').read())
    except urllib2.HTTPError:
        raise SessionExpiredError
    print data
    return [{'author': notice[u'AddedBy'][u'Id'],
             'title': notice[u'Subject'].encode('utf-8'),
             'content': notice[u'Content'].encode('utf-8'),
             'time': notice[u'StartDate']
             } for notice in data[u'SchoolNotices']]
def basic_auth(server="http://127.0.0.1"):
    """
    to use basic login with a different server
    from gluon.contrib.login_methods.basic_auth import basic_auth
    auth.settings.login_methods.append(basic_auth('http://server'))
    """
    def basic_login_aux(username,
                        password,
                        server=server):
        key = base64.b64encode(username + ':' + password)
        headers = {'Authorization': 'Basic ' + key}
        request = urllib2.Request(server, None, headers)
        try:
            urllib2.urlopen(request)
            return True
        except (urllib2.URLError, urllib2.HTTPError):
            return False
    return basic_login_aux
# getServerSecretShare.py (project: incubator-milagro-mfa-server, author: apache)
def get_server_secret(credentials, expires):
    """ Fetch server secret from CertiVox server """
    path = 'serverSecret'
    params = urllib.urlencode({
        'app_id': credentials['app_id'],
        'expires': expires,
        'signature': sign_message(
            '{}{}{}'.format(path, credentials['app_id'], expires),
            str(credentials['app_key'])
        )
    })
    try:
        response = urllib2.urlopen('{api_url}{end_point}?{params}'.format(
            api_url=credentials['api_url'],
            end_point=path,
            params=params,
        ))
    except urllib2.HTTPError as e:
        if e.code == 408:
            print "Make sure your time is correct!"
        raise ScriptException('Response code: {} - {}'.format(e.code, e.read()))
    data = json.loads(response.read())
    return data['serverSecret']
def fetch_json(self, url):
    """Fetch remote JSON"""
    timeout = 1
    while True:
        try:
            logging.debug('Opening %s.', url)
            response = urllib2.urlopen(url)
            break
        except urllib2.HTTPError as err:
            if timeout <= MAX_TIMEOUT:
                logging.warn('Error opening %s, error code %d, reason is %s.', url, err.code, err.reason)
                logging.warn('Waiting for %ds before retrying.', timeout)
                time.sleep(timeout)
                timeout *= 2
            else:
                logging.error('Error opening %s, error code %d, reason is %s.', url, err.code, err.reason)
                raise err
    data = json.load(response)
    return data
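# The method above doubles its delay after each failed attempt. A standalone
# sketch of the same exponential-backoff pattern (the MAX_TIMEOUT value and
# function name are assumptions for illustration):
import json
import logging
import time
import urllib2

MAX_TIMEOUT = 32  # seconds; assumed cap on the retry delay

def fetch_json_with_backoff(url):
    delay = 1
    while True:
        try:
            return json.load(urllib2.urlopen(url))
        except urllib2.HTTPError as err:
            if delay > MAX_TIMEOUT:
                raise
            logging.warn('HTTP %d on %s; retrying in %ds', err.code, url, delay)
            time.sleep(delay)
            delay *= 2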
def __init__(self):
    super(SwarfarmLogger, self).__init__()
    self.plugin_enabled = True
    config_name = 'swproxy.config'
    if not os.path.exists(config_name):
        self.config = {}
    else:
        with open(config_name) as f:
            self.config = json.load(f)
    self.plugin_enabled = not self.config.get('disable_swarfarm_logger', False)
    if self.plugin_enabled:
        # Get the list of accepted commands from the server
        logger.info('SwarfarmLogger - Retrieving list of accepted log types from SWARFARM...')
        try:
            resp = urllib2.urlopen(self.commands_url)
            self.accepted_commands = json.loads(resp.readline())
            resp.close()
            logger.info('SwarfarmLogger - Looking for the following commands to log:\r\n' + ', '.join(self.accepted_commands.keys()))
        except urllib2.HTTPError:
            logger.fatal('SwarfarmLogger - Unable to retrieve accepted log types. SWARFARM logging is disabled.')
            self.plugin_enabled = False
def process_data(self, req_json, resp_json):
    command = req_json.get('command')
    if command in self.accepted_commands:
        accepted_data = self.accepted_commands[command]
        result_data = {}
        if 'request' in accepted_data:
            result_data['request'] = {item: req_json.get(item) for item in accepted_data['request']}
        if 'response' in accepted_data:
            result_data['response'] = {item: resp_json.get(item) for item in accepted_data['response']}
        if result_data:
            data = json.dumps(result_data)
            try:
                resp = urllib2.urlopen(self.log_url, data=urllib.urlencode({'data': data}))
            except urllib2.HTTPError as e:
                logger.warn('SwarfarmLogger - Error: {}'.format(e.readline()))
            else:
                resp.close()
                logger.info('SwarfarmLogger - {} logged successfully'.format(command))
def main():
    module = AnsibleModule(
        argument_spec=dict(
            url=dict(required=True),
            body=dict(required=True),
            header=dict(required=False),
        )
    )
    url = module.params['url']
    body = module.params['body']
    header = module.params['header']
    req = Request(url)
    req.add_header('Content-Type', 'application/json')
    if header:
        for k, v in header.iteritems():
            req.add_header(k, v)
    try:
        urlopen(req, json.dumps(body))
    except HTTPError as e:
        module.fail_json(msg=e.reason, code=e.code, response=e.read())
    else:
        module.exit_json(changed=True)