def __init__(self, ipaddress, username = 'admin', password = ''):
    """Store the connection details and prepare the opener and URL templates.

    ipaddress: host part used to build every ``http://`` URL.
    username:  user name for basic authentication (default ``'admin'``).
    password:  password for basic authentication (default empty).

    Sets ``self.conn`` to an opener with a basic-auth handler for
    ``http://<ipaddress>``, ``self._url_snapshot`` to the snapshot URL and
    ``self._url_command`` to a template that still contains a literal
    ``{command}`` placeholder.
    """
    self.address = ipaddress
    self.username = username
    self.password = password

    # Opener that answers basic-auth challenges for this host.
    base_url = 'http://{}'.format(ipaddress)
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, base_url, username, password)
    self.conn = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))

    self._url_snapshot = 'http://{address}/snapshot.cgi'.format(address = ipaddress)

    # NOTE(review): the credentials are placed in the query string without
    # URL-escaping -- confirm that this is what the device expects.
    command_prefix = 'http://{address}/decoder_control.cgi?username={username}&password={password}&command='.format(
        address = ipaddress, username = username, password = password)
    self._url_command = command_prefix + '{command}'
# Example source code using the Python class HTTPPasswordMgrWithDefaultRealm()
def fetch_file(uri, file=None, username=None, password=None):
    """
    Fetch a file based on the URI provided and return the file object it was
    written to. If you do not pass in a file pointer a
    tempfile.NamedTemporaryFile is created and returned.
    The URI can be either an HTTP url, or "s3://bucket_name/key_name"

    For "s3://" URIs ``username``/``password`` are passed to boto as the AWS
    access key id and secret access key. For any other URI they are used for
    HTTP basic authentication, but only when both are non-empty; in that
    case the opener is installed globally via ``urllib2.install_opener``.

    The returned file is rewound to position 0. Any error raised while
    fetching is logged and then re-raised to the caller.
    """
    boto.log.info('Fetching %s' % uri)
    if file is None:
        file = tempfile.NamedTemporaryFile()
    try:
        if uri.startswith('s3://'):
            bucket_name, key_name = uri[len('s3://'):].split('/', 1)
            c = boto.connect_s3(aws_access_key_id=username,
                                aws_secret_access_key=password)
            bucket = c.get_bucket(bucket_name)
            key = bucket.get_key(key_name)
            key.get_contents_to_file(file)
        else:
            if username and password:
                passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
                passman.add_password(None, uri, username, password)
                authhandler = urllib2.HTTPBasicAuthHandler(passman)
                opener = urllib2.build_opener(authhandler)
                urllib2.install_opener(opener)
            s = urllib2.urlopen(uri)
            file.write(s.read())
        file.seek(0)
    except Exception:
        # Log first, then re-raise: the original raised before logging, which
        # left the log call unreachable.  Callers still see the exception.
        boto.log.exception('Problem Retrieving file: %s' % uri)
        raise
    return file
def readUrl(self,host,port,url,user,password):
    """Fetch ``http://<host>:<port><url>`` using HTTP basic authentication.

    The opener built for the request is installed globally through
    ``install_opener``, so it also affects later ``urlopen`` calls.

    Returns a ``(data, error)`` tuple.  On success ``data`` is the response
    body and ``error`` is False.  Any exception raised while performing the
    request is converted into a message starting with ``"ERROR: "`` and
    ``error`` is True.
    """
    error=False
    tomcatUrl = "http://"+host+":"+str(port)+url
    try:
        pwdManager = urlconnection.HTTPPasswordMgrWithDefaultRealm()
        pwdManager.add_password(None,tomcatUrl,user,password)
        authHandler = urlconnection.HTTPBasicAuthHandler(pwdManager)
        opener=urlconnection.build_opener(authHandler)
        urlconnection.install_opener(opener)
        req = urlconnection.Request(tomcatUrl)
        handle = urlconnection.urlopen(req, None)
        data = handle.read()
    except HTTPError as e:
        if(e.code==401):
            data="ERROR: Unauthorized user. Does not have permissions. %s" %(e)
        elif(e.code==403):
            data="ERROR: Forbidden, yours credentials are not correct. %s" %(e)
        else:
            data="ERROR: The server couldn\'t fulfill the request. %s" %(e)
        error=True
    except URLError as e:
        data = 'ERROR: We failed to reach a server. Reason: %s' %(e.reason)
        error = True
    except socket.timeout as e:
        data = 'ERROR: Timeout error'
        error = True
    except socket.error as e:
        # Report the host/port this call actually used; the port may be an
        # int, so convert it before concatenating.
        data = "ERROR: Unable to connect with host "+host+":"+str(port)
        error = True
    except:
        # Deliberate catch-all so the caller always receives a tuple.
        data = "ERROR: Unexpected error: %s"%(sys.exc_info()[0])
        error = True
    return data,error
def readUrl(self,host,port,url,user,password):
    """Fetch ``http://<host>:<port><url>`` using HTTP basic authentication.

    The opener built for the request is installed globally through
    ``install_opener``, so it also affects later ``urlopen`` calls.

    Returns a ``(data, error)`` tuple.  On success ``data`` is the response
    body and ``error`` is False.  Any exception raised while performing the
    request is converted into a message starting with ``"ERROR: "`` and
    ``error`` is True.
    """
    error=False
    tomcatUrl = "http://"+host+":"+str(port)+url
    try:
        pwdManager = urlconnection.HTTPPasswordMgrWithDefaultRealm()
        pwdManager.add_password(None,tomcatUrl,user,password)
        authHandler = urlconnection.HTTPBasicAuthHandler(pwdManager)
        opener=urlconnection.build_opener(authHandler)
        urlconnection.install_opener(opener)
        req = urlconnection.Request(tomcatUrl)
        handle = urlconnection.urlopen(req, None)
        data = handle.read()
    except HTTPError as e:
        if(e.code==401):
            data="ERROR: Unauthorized user. Does not have permissions. %s" %(e)
        elif(e.code==403):
            data="ERROR: Forbidden, yours credentials are not correct. %s" %(e)
        else:
            data="ERROR: The server couldn\'t fulfill the request. %s" %(e)
        error=True
    except URLError as e:
        data = 'ERROR: We failed to reach a server. Reason: %s' %(e.reason)
        error = True
    except socket.timeout as e:
        data = 'ERROR: Timeout error'
        error = True
    except socket.error as e:
        # Report the host/port this call actually used; the port may be an
        # int, so convert it before concatenating.
        data = "ERROR: Unable to connect with host "+host+":"+str(port)
        error = True
    except:
        # Deliberate catch-all so the caller always receives a tuple.
        data = "ERROR: Unexpected error: %s"%(sys.exc_info()[0])
        error = True
    return data,error
def metricCollector():
    """Collect the ``current`` values from the CouchDB stats endpoint.

    Returns a dict that always carries ``plugin_version``,
    ``heartbeat_required`` and ``units``.  Every metric whose entry has a
    non-None ``current`` value is added, renamed through
    ``METRICS_KEY_VS_NAME`` when a mapping exists.  If anything fails while
    fetching or parsing, ``status`` is set to 0 and ``msg`` to the error text.
    """
    data = {
        'plugin_version': PLUGIN_VERSION,
        'heartbeat_required': HEARTBEAT,
        'units': METRICS_UNITS,
    }
    URL = "http://"+COUCHDB_HOST+":"+COUCHDB_PORT+COUCHDB_STATS_URI
    try:
        # Basic authentication is only configured when both values are set.
        if COUCHDB_USERNAME and COUCHDB_PASSWORD:
            credentials = connector.HTTPPasswordMgrWithDefaultRealm()
            credentials.add_password(REALM, URL, COUCHDB_USERNAME, COUCHDB_PASSWORD)
            connector.install_opener(
                connector.build_opener(connector.HTTPBasicAuthHandler(credentials)))
        raw_body = connector.urlopen(URL, timeout=10).read()
        couch_dict = json.loads(raw_body.decode('UTF-8'))
        for _group, group_metrics in couch_dict.items():
            for key, stats in group_metrics.items():
                if 'current' not in stats or stats['current'] is None:
                    continue
                # NOTE(review): the source indentation was lost; this assumes
                # unmapped metrics are reported under their own key -- confirm.
                name = METRICS_KEY_VS_NAME[key] if key in METRICS_KEY_VS_NAME else key
                data[name] = stats['current']
    except Exception as e:
        data['status']=0
        data['msg']=str(e)
    return data
def metricCollector():
    """Collect the selected metrics from the Riak stats endpoint.

    Returns a dict that always carries ``plugin_version``,
    ``heartbeat_required`` and ``units``.  Every key of the stats document
    listed in ``METRICS_TO_BE_PUSHED_TO_SERVER`` is copied over; keys that
    are also in ``BYTES_TO_MB_LIST`` go through ``convertBytesToMB`` first.
    If anything fails while fetching or parsing, ``status`` is set to 0 and
    ``msg`` to the error text.
    """
    data = {
        'plugin_version': PLUGIN_VERSION,
        'heartbeat_required': HEARTBEAT,
        'units': METRICS_UNITS,
    }
    URL = "http://"+RIAK_HOST+":"+RIAK_PORT+"/"+RIAK_STATS_URI
    try:
        # Basic authentication is only configured when both values are set.
        if RIAK_USERNAME and RIAK_PASSWORD:
            credentials = connector.HTTPPasswordMgrWithDefaultRealm()
            credentials.add_password(REALM, URL, RIAK_USERNAME, RIAK_PASSWORD)
            connector.install_opener(
                connector.build_opener(connector.HTTPBasicAuthHandler(credentials)))
        raw_body = connector.urlopen(URL, timeout=10).read()
        riak_dict = json.loads(raw_body.decode('UTF-8'))
        for metric in riak_dict:
            if metric not in METRICS_TO_BE_PUSHED_TO_SERVER:
                continue
            value = riak_dict[metric]
            if metric in BYTES_TO_MB_LIST:
                value = convertBytesToMB(value)
            data[metric] = value
    except Exception as e:
        data['status']=0
        data['msg']=str(e)
    return data
def readUrl(self,host,port,url,user,password):
    """Fetch ``http://<host>:<port><url>`` using HTTP basic authentication.

    The opener built for the request is installed globally through
    ``install_opener``, so it also affects later ``urlopen`` calls.

    Returns a ``(data, error)`` tuple.  On success ``data`` is the response
    body and ``error`` is False.  Any exception raised while performing the
    request is converted into a message starting with ``"ERROR: "`` and
    ``error`` is True.
    """
    error=False
    tomcatUrl = "http://"+host+":"+str(port)+url
    try:
        pwdManager = urlconnection.HTTPPasswordMgrWithDefaultRealm()
        pwdManager.add_password(None,tomcatUrl,user,password)
        authHandler = urlconnection.HTTPBasicAuthHandler(pwdManager)
        opener=urlconnection.build_opener(authHandler)
        urlconnection.install_opener(opener)
        req = urlconnection.Request(tomcatUrl)
        handle = urlconnection.urlopen(req, None)
        data = handle.read()
    except HTTPError as e:
        if(e.code==401):
            data="ERROR: Unauthorized user. Does not have permissions. %s" %(e)
        elif(e.code==403):
            data="ERROR: Forbidden, yours credentials are not correct. %s" %(e)
        else:
            data="ERROR: The server couldn\'t fulfill the request. %s" %(e)
        error=True
    except URLError as e:
        data = 'ERROR: We failed to reach a server. Reason: %s' %(e.reason)
        error = True
    except socket.timeout as e:
        data = 'ERROR: Timeout error'
        error = True
    except socket.error as e:
        # Report the host/port this call actually used; the port may be an
        # int, so convert it before concatenating.
        data = "ERROR: Unable to connect with host "+host+":"+str(port)
        error = True
    except:
        # Deliberate catch-all so the caller always receives a tuple.
        data = "ERROR: Unexpected error: %s"%(sys.exc_info()[0])
        error = True
    return data,error
def __build_url_opener(self, uri):
    """
    Return an opener that handles HTTP Basic Authentication for ``uri``.

    ``self.client_id`` and ``self.client_secret`` are registered as the
    user name and password for the default realm.
    """
    credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credentials.add_password(None, uri, self.client_id, self.client_secret)
    return urllib2.build_opener(urllib2.HTTPBasicAuthHandler(credentials))
def __init__(self, username, password, #pylint: disable=E1002
             verbose=0, use_datetime=False, https_handler=None):
    """Initialize the transport.

    username, password: credentials; a digest-auth handler and its password
        manager are created only when both are truthy.
    verbose: stored as ``self.verbose``.
    use_datetime: stored as ``self._use_datetime``.
    https_handler: optional handler; when given it is placed first in
        ``self._handlers`` and the scheme becomes ``'https'``, otherwise
        the scheme is ``'http'``.
    """
    # The base class is initialised with use_datetime=False; the caller's
    # value is applied to self._use_datetime right afterwards.
    if not PY2:
        super().__init__(use_datetime=False)
    else:
        Transport.__init__(self, use_datetime=False)

    self._use_datetime = use_datetime
    self.verbose = verbose
    self._username = username
    self._password = password

    if self._username and self._password:
        self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr)
    else:
        self._passmgr = None
        self._auth_handler = None

    if https_handler:
        self._handlers = [https_handler]
        self._scheme = 'https'
    else:
        self._handlers = []
        self._scheme = 'http'

    # The auth handler, when present, goes after the HTTPS handler.
    if self._auth_handler:
        self._handlers.append(self._auth_handler)
def __setHTTPAuthentication():
    """
    Validate the configured HTTP authentication type (Basic or Digest) and
    credentials, then build the matching urllib2 handler.

    Does nothing when neither ``conf.aType`` nor ``conf.aCred`` is set.
    Raises sqlmapSyntaxException when only one of them is set, when the
    type is not Basic/Digest, or when the credentials are not in
    ``username:password`` form.  On success the handler is stored in the
    module-level ``authHandler``.
    """

    global authHandler

    if not conf.aCred:
        if not conf.aType:
            return
        raise sqlmapSyntaxException("you specified the HTTP Authentication type, but "
                                    "did not provide the credentials")

    if not conf.aType:
        raise sqlmapSyntaxException("you specified the HTTP Authentication credentials, "
                                    "but did not provide the type")

    parseTargetUrl()

    logger.debug("setting the HTTP Authentication type and credentials")

    handlerByType = {
        "basic": urllib2.HTTPBasicAuthHandler,
        "digest": urllib2.HTTPDigestAuthHandler,
    }

    aTypeLower = conf.aType.lower()

    if aTypeLower not in ( "basic", "digest" ):
        raise sqlmapSyntaxException("HTTP Authentication type value must be "
                                    "Basic or Digest")

    # Non-greedy first group: the user name ends at the first colon.
    credMatch = re.search("^(.*?)\:(.*?)$", conf.aCred)

    if not credMatch:
        raise sqlmapSyntaxException("HTTP Authentication credentials value must be "
                                    "in format username:password")

    authUsername, authPassword = credMatch.group(1), credMatch.group(2)

    passwordMgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    passwordMgr.add_password(None, "%s://%s" % (conf.scheme, conf.hostname), authUsername, authPassword)

    authHandler = handlerByType[aTypeLower](passwordMgr)
def openWebIF(self, part = None, reader = None):
    """Request ``oscamapi.html`` from the web interface and return the reply.

    Connection data comes from ``self.getUserData()`` when
    ``config.oscaminfo.userdatafromconf`` is set (host is then 127.0.0.1),
    otherwise from the ``config.oscaminfo`` ip/port/username/password
    entries.  A port that starts with ``+`` selects HTTPS.

    part:   value of the ``part`` query parameter; ``status`` when None.
    reader: optional value of the ``label`` parameter, used only with part.

    Returns ``(True, data)`` on success and ``(False, message)`` on failure.
    The digest-auth opener is installed globally via ``install_opener``.
    """
    self.proto = "http"
    if config.oscaminfo.userdatafromconf.value:
        self.ip = "127.0.0.1"
        udata = self.getUserData()
        if isinstance(udata, str):
            # A string result is an error message unless it only concerns a
            # missing user name or password.
            # NOTE(review): self.port is not assigned on this path -- confirm
            # it is always set elsewhere before this is reached.
            if "httpuser" in udata:
                self.username=""
            elif "httppwd" in udata:
                self.password = ""
            else:
                return False, udata
        else:
            self.port = udata[2]
            self.username = udata[0]
            self.password = udata[1]
    else:
        self.ip = ".".join("%d" % d for d in config.oscaminfo.ip.value)
        self.port = str(config.oscaminfo.port.value)
        self.username = str(config.oscaminfo.username.value)
        self.password = str(config.oscaminfo.password.value)

    if self.port.startswith( '+' ):
        self.proto = "https"
        # str.replace returns a new string; the result must be stored or the
        # '+' marker stays in the URL.
        self.port = self.port.replace("+","")

    if part is None:
        self.url = "%s://%s:%s/oscamapi.html?part=status" % ( self.proto, self.ip, self.port )
    elif reader is None:
        self.url = "%s://%s:%s/oscamapi.html?part=%s" % ( self.proto, self.ip, self.port, part )
    else:
        self.url = "%s://%s:%s/oscamapi.html?part=%s&label=%s" % ( self.proto, self.ip, self.port, part, reader )

    pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
    pwman.add_password( None, self.url, self.username, self.password )
    handlers = urllib2.HTTPDigestAuthHandler( pwman )
    opener = urllib2.build_opener( urllib2.HTTPHandler, handlers )
    urllib2.install_opener( opener )
    request = urllib2.Request( self.url )
    err = False
    try:
        data = urllib2.urlopen( request ).read()
    except urllib2.URLError as e:
        if hasattr(e, "reason"):
            err = str(e.reason)
        elif hasattr(e, "code"):
            err = str(e.code)
    if err is not False:
        print("[openWebIF] Fehler: %s" % err)
        return False, err
    else:
        return True, data
def openWebIF(self, part = None, reader = None):
    """Request ``ncamapi.html`` from the web interface and return the reply.

    Connection data comes from ``self.getUserData()`` when
    ``config.ncaminfo.userdatafromconf`` is set (host is then 127.0.0.1),
    otherwise from the ``config.ncaminfo`` ip/port/username/password
    entries.  A port that starts with ``+`` selects HTTPS.

    part:   value of the ``part`` query parameter; ``status`` when None.
    reader: optional value of the ``label`` parameter, used only with part.

    Returns ``(True, data)`` on success and ``(False, message)`` on failure.
    The digest-auth opener is installed globally via ``install_opener``.
    """
    self.proto = "http"
    if config.ncaminfo.userdatafromconf.value:
        self.ip = "127.0.0.1"
        udata = self.getUserData()
        if isinstance(udata, str):
            # A string result is an error message unless it only concerns a
            # missing user name or password.
            # NOTE(review): self.port is not assigned on this path -- confirm
            # it is always set elsewhere before this is reached.
            if "httpuser" in udata:
                self.username=""
            elif "httppwd" in udata:
                self.password = ""
            else:
                return False, udata
        else:
            self.port = udata[2]
            self.username = udata[0]
            self.password = udata[1]
    else:
        self.ip = ".".join("%d" % d for d in config.ncaminfo.ip.value)
        self.port = str(config.ncaminfo.port.value)
        self.username = str(config.ncaminfo.username.value)
        self.password = str(config.ncaminfo.password.value)

    if self.port.startswith( '+' ):
        self.proto = "https"
        # str.replace returns a new string; the result must be stored or the
        # '+' marker stays in the URL.
        self.port = self.port.replace("+","")

    if part is None:
        self.url = "%s://%s:%s/ncamapi.html?part=status" % ( self.proto, self.ip, self.port )
    elif reader is None:
        self.url = "%s://%s:%s/ncamapi.html?part=%s" % ( self.proto, self.ip, self.port, part )
    else:
        self.url = "%s://%s:%s/ncamapi.html?part=%s&label=%s" % ( self.proto, self.ip, self.port, part, reader )

    pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
    pwman.add_password( None, self.url, self.username, self.password )
    handlers = urllib2.HTTPDigestAuthHandler( pwman )
    opener = urllib2.build_opener( urllib2.HTTPHandler, handlers )
    urllib2.install_opener( opener )
    request = urllib2.Request( self.url )
    err = False
    try:
        data = urllib2.urlopen( request ).read()
    except urllib2.URLError as e:
        if hasattr(e, "reason"):
            err = str(e.reason)
        elif hasattr(e, "code"):
            err = str(e.code)
    if err is not False:
        print("[openWebIF] Fehler: %s" % err)
        return False, err
    else:
        return True, data
def create_jira_issue(self, server_url, username, password, issue_summary, issue_description, project_key, issue_type='Bug'):
    """
    connect to jira server and create an issue under a specific project

    Arguments:
        server_url: base URL of the server; '/rest/api/2/issue/' is appended
        username, password: credentials sent as HTTP basic authentication
        issue_summary: summary text (double quotes are replaced by spaces)
        issue_description: description text (double quotes are replaced by '-')
        project_key: key of the project the issue is created under
        issue_type: name of the issue type, 'Bug' by default

    Returns:
        True when the issue was created, False otherwise. On success the
        new key is stored as "issue_id" in the dict handed to
        Utils.data_Utils.update_datarepository.
    """
    status = True
    output_dict = {}
    wdesc = "Creates a JIRA issue"
    pSubStep(wdesc)
    # Kept so that issues look the same as before; the quotes no longer need
    # to be removed for the payload to be valid.
    issue_summary = issue_summary.replace('"', " ")
    issue_description = issue_description.replace('"', "-")
    postdata_url = server_url + '/rest/api/2/issue/'
    # Serialise with json.dumps so newlines, backslashes and other special
    # characters are escaped instead of producing invalid JSON.
    postdata = json.dumps({
        "fields": {
            "project": {"key": project_key},
            "summary": issue_summary,
            "description": issue_description,
            "issuetype": {"name": issue_type},
        }
    })
    credential_handler = urllib2.HTTPPasswordMgrWithDefaultRealm()
    credential_handler.add_password(None, postdata_url, username, password)
    auth = urllib2.HTTPBasicAuthHandler(credential_handler)
    # Install the authenticating opener used by urlopen below
    urllib2.install_opener(urllib2.build_opener(auth))
    # The credentials are also sent up front in the Authorization header
    auth_token = base64.b64encode(username + ":" + password)
    headers = {"Authorization" : "Basic " + auth_token, "Content-Type": "application/json"}
    # Create a POST request
    request = urllib2.Request(str(postdata_url), postdata, headers)
    try:
        handler = urllib2.urlopen(request)
        extension = json.loads(handler.read())
        issue_id = str(extension['key'])
        pNote("JIRA Issue Created. Issue-Id: {0}".format(issue_id))
        output_dict["issue_id"] = issue_id
    except Exception as e:
        status = False
        pNote("Problem creating JIRA issue." , "error")
        pNote("JIRA Error Code: ({0})".format(e) , "error")
    Utils.data_Utils.update_datarepository(output_dict)
    Utils.testcase_Utils.report_substep_status(status)
    return status
def openWebIF(self, part = None, reader = None):
    """Request ``oscamapi.html`` over HTTP and return the reply.

    Connection data comes from ``self.getUserData()`` when
    ``config.oscaminfo.userdatafromconf`` is set (host is then 127.0.0.1),
    otherwise from the ``config.oscaminfo`` ip/port/username/password
    entries.

    part:   value of the ``part`` query parameter; ``status`` when None.
    reader: optional value of the ``label`` parameter, used only with part.

    Returns ``(True, data)`` on success and ``(False, message)`` on failure.
    The digest-auth opener is installed globally via ``install_opener``.
    """
    if not config.oscaminfo.userdatafromconf.getValue():
        self.ip = ".".join("%d" % d for d in config.oscaminfo.ip.getValue())
        self.port = config.oscaminfo.port.getValue()
        self.username = config.oscaminfo.username.getValue()
        self.password = config.oscaminfo.password.getValue()
    else:
        self.ip = "127.0.0.1"
        udata = self.getUserData()
        if not isinstance(udata, str):
            self.port = udata[2]
            self.username = udata[0]
            self.password = udata[1]
        elif "httpuser" in udata:
            # A string result is an error message unless it only concerns a
            # missing user name or password.
            self.username=""
        elif "httppwd" in udata:
            self.password = ""
        else:
            return (False, udata)

    if part is None:
        self.url = "http://%s:%s/oscamapi.html?part=status" % ( self.ip, self.port )
    elif reader is None:
        self.url = "http://%s:%s/oscamapi.html?part=%s" % (self.ip, self.port, part )
    else:
        self.url = "http://%s:%s/oscamapi.html?part=%s&label=%s" % ( self.ip, self.port, part, reader )
    print("URL=%s" % self.url)

    password_manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
    password_manager.add_password( None, self.url, self.username, self.password )
    digest_handler = urllib2.HTTPDigestAuthHandler( password_manager )
    urllib2.install_opener( urllib2.build_opener( urllib2.HTTPHandler, digest_handler ) )
    request = urllib2.Request( self.url )

    failure = False
    try:
        data = urllib2.urlopen( request ).read()
    except urllib2.URLError as e:
        if hasattr(e, "reason"):
            failure = str(e.reason)
        elif hasattr(e, "code"):
            failure = str(e.code)

    if failure is False:
        return (True, data)
    print("[openWebIF] Fehler: %s" % failure)
    return (False, failure)
def SHOWS(url):
    """Fetch ``url``, split the page into show tiles and list each show.

    When the ``proxy_use`` setting is ``'true'`` an HTTP proxy opener with
    basic authentication is installed globally first.  Each tile yields a
    link, an image, a title and an episode note.  Tiles whose note contains
    'day left' / 'days left' are added with ``addDir2``; other tiles are
    added with ``addDir`` unless the note contains 'no episodes'.  Tiles
    that do not match the expected markup are skipped silently.
    """
    if __settings__.getSetting('proxy_use') == 'true':
        # Defaults that remain in place for any setting that cannot be read.
        proxy_server = None
        proxy_type_id = 0
        proxy_port = 8080
        proxy_user = None
        proxy_pass = None
        try:
            proxy_server = __settings__.getSetting('proxy_server')
            proxy_type_id = __settings__.getSetting('proxy_type')
            proxy_port = __settings__.getSetting('proxy_port')
            proxy_user = __settings__.getSetting('proxy_user')
            proxy_pass = __settings__.getSetting('proxy_pass')
        except:
            pass
        proxy_details = 'http://' + proxy_server + ':' + proxy_port
        credentials = urllib2.HTTPPasswordMgrWithDefaultRealm()
        credentials.add_password(None, proxy_details, proxy_user, proxy_pass)
        opener = urllib2.build_opener(
            urllib2.ProxyHandler({"http" : proxy_details}),
            urllib2.ProxyBasicAuthHandler(credentials))
        urllib2.install_opener(opener)

    page = urllib2.urlopen(url)
    buf = page.read()
    # NOTE(review): as written this substitution replaces '&' with itself and
    # changes nothing -- possibly an HTML entity was lost; confirm.
    buf = re.sub('&','&',buf)
    buf = re.sub('·','',buf)
    page.close()

    tiles = buf.split('grid-list__item width--one-half width--custard--one-third')
    for tile in tiles:
        try:
            linkurl = re.findall('href="(.+?)"', tile)[0]
            # Request the larger rendition of the thumbnail.
            image = re.findall('srcset="(.+?)"', tile)[0].replace('w=304&h=174','w=800&h=450')
            name = re.findall('"tout__title complex-link__target theme__target">(.+?)</h3', tile, re.DOTALL)[0].strip()
            episodes = re.findall('"tout__meta theme__meta">(.+?)</p', tile, re.DOTALL)[0].strip()
            label = name+' - '+episodes
            if 'day left' in episodes or 'days left' in episodes:
                addDir2(label,linkurl,3,'', '',image,'',isFolder=False)
            elif 'no episodes' not in episodes.lower():
                addDir(label,linkurl,2,image)
        except:
            # Best effort: chunks without the expected markup are ignored.
            pass
    setView('tvshows', 'show')
def Send(self, uri, query_headers, query_data,ID):
    """Send a request to ``<proto>://<host><uri>`` and return the reply.

    uri:           path appended to ``self.proto`` and ``self.host``.
    query_headers: dict of request headers.
    query_data:    when truthy, JSON-encoded and sent as the request body.
    ID:            stored as ``self.ID``.

    Returns the response object when ``self.Raw`` is set, otherwise the
    response body.  Errors from ``urlopen`` propagate to the caller.

    Side effects: sets the global socket default timeout to 5 seconds,
    replaces the default HTTPS context with an unverified one for https
    and, when ``self.credentials`` is set, installs a basic-auth opener
    globally.
    """
    self.uri = uri
    self.query_headers = query_headers
    self.query_data = query_data
    self.ID = ID

    # Connect-timeout in seconds
    timeout = 5
    socket.setdefaulttimeout(timeout)

    url = '{}://{}{}'.format(self.proto, self.host, self.uri)

    if self.verbose:
        print("[Verbose] Sending: %s" % (url,))

    if self.proto == 'https':
        if hasattr(ssl, '_create_unverified_context'):
            print("[i] Creating SSL Unverified Context")
            ssl._create_default_https_context = ssl._create_unverified_context

    if self.credentials:
        # Split on the first colon only, so a password containing ':' is
        # kept whole.
        Basic_Auth = self.credentials.split(':', 1)
        if self.verbose:
            print("[Verbose] User: %s Password: %s" % (Basic_Auth[0], Basic_Auth[1]))
        try:
            pwd_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
            pwd_mgr.add_password(None, url, Basic_Auth[0], Basic_Auth[1])
            auth_handler = urllib2.HTTPBasicAuthHandler(pwd_mgr)
            opener = urllib2.build_opener(auth_handler)
            urllib2.install_opener(opener)
        except Exception as e:
            print("[!] Basic Auth Error: %s" % (e,))
            sys.exit(1)

    if self.query_data:
        request = urllib2.Request(url, data=json.dumps(self.query_data), headers=self.query_headers)
    else:
        request = urllib2.Request(url, None, headers=self.query_headers)
    response = urllib2.urlopen(request)
    if response:
        print("[<] {} OK".format(response.code))

    if self.Raw:
        return response
    else:
        html = response.read()
        return html
def Send(self, uri):
    """Send a GET request to ``<proto>://<host><uri>`` and return the body.

    When ``self.noexploit`` is set and ``self.verbose`` is not, nothing is
    sent and the string "Not sending any data" is returned instead.
    Errors from ``urlopen`` propagate to the caller.

    Side effects: sets the global socket default timeout to 5 seconds,
    replaces the default HTTPS context with an unverified one for https
    and, when ``self.credentials`` is set, installs a basic-auth opener
    globally.
    """
    # The SSI daemon are looking for this, and opens a new FD (5), but this does'nt actually
    # matter for the functionality of this exploit, only for future references.
    headers = {
        'User-Agent' : 'MSIE',
    }

    # Connect-timeout in seconds
    timeout = 5
    socket.setdefaulttimeout(timeout)

    url = '%s://%s%s' % (self.proto, self.host, uri)

    if self.verbose:
        print("[Verbose] Sending: %s" % (url,))

    if self.proto == 'https':
        if hasattr(ssl, '_create_unverified_context'):
            print("[i] Creating SSL Default Context")
            ssl._create_default_https_context = ssl._create_unverified_context

    if self.credentials:
        # Split on the first colon only, so a password containing ':' is
        # kept whole.
        Basic_Auth = self.credentials.split(':', 1)
        if self.verbose:
            print("[Verbose] User: %s Password: %s" % (Basic_Auth[0], Basic_Auth[1]))
        try:
            pwd_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
            pwd_mgr.add_password(None, url, Basic_Auth[0], Basic_Auth[1])
            auth_handler = urllib2.HTTPBasicAuthHandler(pwd_mgr)
            opener = urllib2.build_opener(auth_handler)
            urllib2.install_opener(opener)
        except Exception as e:
            print("[!] Basic Auth Error: %s" % (e,))
            sys.exit(1)

    if self.noexploit and not self.verbose:
        print("[<] 204 Not Sending!")
        html = "Not sending any data"
    else:
        data = None
        req = urllib2.Request(url, data, headers)
        rsp = urllib2.urlopen(req)
        if rsp:
            print("[<] %s OK" % rsp.code)
            html = rsp.read()
    return html
def openWebIF(self, part = None, reader = None):
    """Request ``oscamapi.html`` from the web interface and return the reply.

    Connection data comes from ``self.getUserData()`` when
    ``config.oscaminfo.userdatafromconf`` is set (host is then 127.0.0.1),
    otherwise from the ``config.oscaminfo`` ip/port/username/password
    entries.  A port that starts with ``+`` selects HTTPS.

    part:   value of the ``part`` query parameter; ``status`` when None.
    reader: optional value of the ``label`` parameter, used only with part.

    Returns ``(True, data)`` on success and ``(False, message)`` on failure.
    The digest-auth opener is installed globally via ``install_opener``.
    """
    self.proto = "http"
    if config.oscaminfo.userdatafromconf.value:
        self.ip = "127.0.0.1"
        udata = self.getUserData()
        if isinstance(udata, str):
            # A string result is an error message unless it only concerns a
            # missing user name or password.
            # NOTE(review): self.port is not assigned on this path -- confirm
            # it is always set elsewhere before this is reached.
            if "httpuser" in udata:
                self.username=""
            elif "httppwd" in udata:
                self.password = ""
            else:
                return False, udata
        else:
            self.port = udata[2]
            self.username = udata[0]
            self.password = udata[1]
    else:
        self.ip = ".".join("%d" % d for d in config.oscaminfo.ip.value)
        self.port = str(config.oscaminfo.port.value)
        self.username = str(config.oscaminfo.username.value)
        self.password = str(config.oscaminfo.password.value)

    if self.port.startswith( '+' ):
        self.proto = "https"
        # str.replace returns a new string; the result must be stored or the
        # '+' marker stays in the URL.
        self.port = self.port.replace("+","")

    if part is None:
        self.url = "%s://%s:%s/oscamapi.html?part=status" % ( self.proto, self.ip, self.port )
    elif reader is None:
        self.url = "%s://%s:%s/oscamapi.html?part=%s" % ( self.proto, self.ip, self.port, part )
    else:
        self.url = "%s://%s:%s/oscamapi.html?part=%s&label=%s" % ( self.proto, self.ip, self.port, part, reader )

    pwman = urllib2.HTTPPasswordMgrWithDefaultRealm()
    pwman.add_password( None, self.url, self.username, self.password )
    handlers = urllib2.HTTPDigestAuthHandler( pwman )
    opener = urllib2.build_opener( urllib2.HTTPHandler, handlers )
    urllib2.install_opener( opener )
    request = urllib2.Request( self.url )
    err = False
    try:
        data = urllib2.urlopen( request ).read()
    except urllib2.URLError as e:
        if hasattr(e, "reason"):
            err = str(e.reason)
        elif hasattr(e, "code"):
            err = str(e.code)
    if err is not False:
        print("[OScaminfo] Fehler: %s" % err)
        return False, err
    else:
        return True, data
def send(address, message, requestType, useAuth=False, username="", password=""):
    """
    Used for sending requests that do not require message body, like GET and DELETE.
    Params: address of the webservice (string); "http://" is prepended
                unless it already starts with "http://" or "https://"
            message to the webservice (string), appended to the address
            request type for the message (string, GET or DELETE)
            useAuth: send HTTP basic authentication when True
            username, password: credentials used when useAuth is True
    Returns: the response body with newlines replaced by spaces, parsed as
             JSON when possible. For an HTTP error the result is a fixed
             message (status 401) or the error response body (any other
             status). Other errors, such as an unreachable host, are raised.
    """
    try:
        # Do not prepend "http://" to an address that already names https.
        if not address.startswith(("http://", "https://")):
            address = "http://"+address
        url = address + message
        if useAuth:
            password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
            password_mgr.add_password(None, url, username, password)
            handler = urllib2.HTTPBasicAuthHandler(password_mgr)
            opener = urllib2.build_opener(handler)
        else:
            opener = urllib2.build_opener(urllib2.HTTPHandler)
        request = urllib2.Request(url)
        # Override the HTTP verb chosen by urllib2.
        request.get_method = lambda: requestType
        response = opener.open(request)
        data = response.read()
        data = data.replace('\n',' ')
    except urllib2.HTTPError as err:
        if err.code == 401:
            data = "Error: HTTP Status Code 401. Authentication with the Web Service failed. Please ensure that the authentication credentials are set, are correct, and that authentication mode is enabled."
        else:
            data = err.read()
    # Hand back parsed JSON when the body is JSON, the raw text otherwise.
    try:
        data = json.loads(data)
    except (TypeError, ValueError):
        pass
    return data
def pSend(address, message, requestType, body, useAuth=False, username="", password=""):
    """
    Used for sending requests that require a message body, like PUT and POST.
    Params: address of the webservice (string); "http://" is prepended
                unless it already starts with "http://" or "https://"
            message to the webservice (string), appended to the address
            request type for the message (string, PUT or POST)
            message body for the request (string, JSON object)
            useAuth: send HTTP basic authentication when True
            username, password: credentials used when useAuth is True
    Returns: the response body, parsed as JSON when possible. For an HTTP
             error the result is a fixed message (status 401) or the error
             response body (any other status). Other errors, such as an
             unreachable host, are raised.
    """
    try:
        # Do not prepend "http://" to an address that already names https.
        if not address.startswith(("http://", "https://")):
            address = "http://"+address
        url = address + message
        if useAuth:
            password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
            password_mgr.add_password(None, url, username, password)
            handler = urllib2.HTTPBasicAuthHandler(password_mgr)
            opener = urllib2.build_opener(handler)
        else:
            opener = urllib2.build_opener(urllib2.HTTPHandler)
        request = urllib2.Request(url, data=body)
        # Override the HTTP verb chosen by urllib2.
        request.get_method = lambda: requestType
        response = opener.open(request)
        data = response.read()
    except urllib2.HTTPError as err:
        if err.code == 401:
            data = "Error: HTTP Status Code 401. Authentication with the Web Service failed. Please ensure that the authentication credentials are set, are correct, and that authentication mode is enabled."
        else:
            data = err.read()
    # Hand back parsed JSON when the body is JSON, the raw text otherwise.
    try:
        data = json.loads(data)
    except (TypeError, ValueError):
        pass
    return data