def get_ips_from_url(url):
    """
    Resolve the hostname contained in a URL to its IP addresses.

    :param str url: The url to resolve
    :rtype: list
    :return: the address list reported by ``socket.gethostbyname_ex`` for
        the URL's hostname; ``None`` when the URL has no hostname or when
        parsing/resolution raises one of the handled errors
    """
    handled_errors = (ValueError, socket.error, socket.gaierror,
                      socket.herror, socket.timeout)
    try:
        hostname = urlparse(url).hostname
        if not hostname:
            return None
        # NOTE: this changes the process-wide default socket timeout.
        socket.setdefaulttimeout(5)
        _canonical, _aliases, addresses = socket.gethostbyname_ex(hostname)
        return addresses
    except handled_errors:
        return None
python类setdefaulttimeout()的实例源码
def Scrape(url):
    """Download *url* and store its parsed HTML under ``Output/``.

    The page is requested with a custom User-Agent, parsed with
    BeautifulSoup and written to ``<cwd>/Output/Scraped-<token>.html``,
    where ``<token>`` is *url* with every non-word character removed.

    Side effect: sets the process-wide default socket timeout to 10 seconds.
    Network and file-system errors are not caught here.
    """
    timeout = 10
    socket.setdefaulttimeout(timeout)

    # Collecting html content.
    headers = {'User-Agent': 'TorScrapper - Onion scrapper | github.com/ConanKapoor/TorScrapper.git' }
    req = urllib.request.Request(url, None, headers)
    # The context manager closes the HTTP response even if read() fails.
    with urllib.request.urlopen(req) as response:
        # Using BeautifulSoup to parse html object response.
        page = BeautifulSoup(response.read(), 'html.parser')

    # Saving output
    token = re.sub(r'[^\w]', '', url)
    name = os.path.abspath("") + '/Output/Scraped-' + token + '.html'
    # The handle is closed even when write() raises.
    with open(name, 'w') as out_file:
        out_file.write(str(page))
# Taking input.
def run(self):
    """Worker loop: probe every (host, port) pair taken from ``self.inq``.

    For each pair a TCP connection is attempted and the tuple
    ``(host, port, 'OPEN')`` or ``(host, port, 'CLOSED')`` is put on
    ``self.outq``. The loop never terminates on its own.

    Side effect: sets the process-wide default socket timeout to TIMEOUT.
    """
    socket.setdefaulttimeout(TIMEOUT)
    while True:
        # fetch the next target from the "in" queue
        host, port = self.inq.get()
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn.connect((host, port))
            state = 'OPEN'
        except socket.error:
            state = 'CLOSED'
        # report the outcome, then release the socket
        self.outq.put((host, port, state))
        conn.close()
def _do_trakt_auth_post(self, url, data):
    """POST *data* to *url* with the Trakt authorization headers.

    The bearer token comes from ``self.get_session()`` and the API key from
    ``self.CLIENT_ID``. Returns the raw response body. An
    ``urllib2.HTTPError`` is logged and re-raised.

    Side effect: sets the process-wide default socket timeout to 5 seconds.
    """
    try:
        bearer_token = self.get_session()
        request_headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + bearer_token,
            'trakt-api-version': '2',
            'trakt-api-key': self.CLIENT_ID
        }
        # timeout in seconds
        socket.setdefaulttimeout(5)
        reply = urllib2.urlopen(urllib2.Request(url, data, request_headers)).read()
        self.logger.info('Response: {0}'.format(reply))
        return reply
    except urllib2.HTTPError as err:
        failure = 'Unable to submit post data {url} - {error}'.format(url=url, error=err.reason)
        self.logger.error(failure)
        raise
def test(host):
    """Fetch *host* over HTTP and report whether the page matches MATCH.

    Progress and results are sent as ``PRIVMSG`` lines to ``CHAN`` through
    the module-level ``s`` object; hits are also appended to
    ``foundsqli.txt``. Progress lines are only sent when ``verbose`` is 1.

    Side effect: sets the process-wide default socket timeout to 5 seconds.
    """
    socket.setdefaulttimeout(5)
    chatty = int(verbose) == 1
    if chatty:
        s.send("PRIVMSG %s :%s%s\r\n" % (CHAN, "[+] Testing:", host))
    try:
        if host[:7] != "http://":
            host = "http://"+host
        source = urllib2.urlopen(host).read()
        if re.search(MATCH, source):
            s.send("PRIVMSG %s :%s%s\r\n" % (CHAN, "[!] Found:", host))
            # The handle is closed even if write() fails.
            with open("foundsqli.txt", "a") as found_log:
                found_log.write("\n[!] Found: "+host)
        elif chatty:
            s.send("PRIVMSG %s :%s%s\r\n" % (CHAN, "[-] Not Vuln:", host))
    except (socket.gaierror, socket.timeout, socket.error) as msg:
        s.send("PRIVMSG %s :%s%s @ %s\r\n" % (CHAN, "[-] Error: ",msg, host))
    except Exception:
        # Best-effort: any other failure is ignored, but KeyboardInterrupt
        # and SystemExit are no longer swallowed.
        pass
def __setHTTPTimeout():
    """
    Normalize ``conf.timeout`` and install it as the default socket timeout.

    A falsy ``conf.timeout`` becomes 10.0; otherwise it is converted to
    float and raised to 3.0 (with a warning) when it is lower than that.
    """

    if not conf.timeout:
        conf.timeout = 10.0
    else:
        logger.debug("setting the HTTP timeout")

        conf.timeout = float(conf.timeout)

        if conf.timeout < 3.0:
            logger.warn("the minimum HTTP timeout is 3 seconds, sqlmap "
                        "will going to reset it")
            conf.timeout = 3.0

    socket.setdefaulttimeout(conf.timeout)
def check(ip, port, timeout):
    """Probe a host for a service that answers the ``envi`` command.

    :param ip: target host
    :param port: target TCP port (converted with ``int``)
    :param timeout: value installed with ``socket.setdefaulttimeout``
    :return: ``u"Zookeeper Unauthorized access"`` when the reply contains
        ``Environment``; ``None`` otherwise, including on any error
    """
    s = None
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, int(port)))
        # Other commands listed by the original author:
        # dump, reqs, ruok, stat
        flag = b"envi"
        s.send(flag)
        data = s.recv(1024)
        print(data)
        if b'Environment' in data:
            return u"Zookeeper Unauthorized access"
    except Exception:
        # Best-effort probe: any failure means "nothing found".
        pass
    finally:
        # Close the socket on every path, not only on success.
        if s is not None:
            s.close()
    return None
def run(self):
    """Consume (host, port) targets from ``self.inq`` forever.

    Each target gets one TCP connection attempt; the result is published on
    ``self.outq`` as ``(host, port, 'OPEN')`` or ``(host, port, 'CLOSED')``.

    Side effect: sets the process-wide default socket timeout to TIMEOUT.
    """
    socket.setdefaulttimeout(TIMEOUT)
    while True:
        host, port = self.inq.get()
        probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        verdict = 'OPEN'
        try:
            # connect to the given host:port
            probe.connect((host, port))
        except socket.error:
            verdict = 'CLOSED'
        self.outq.put((host, port, verdict))
        probe.close()
def checkTraficLight(self):
    """Timer callback that decides how the update dialog is opened.

    The callback removes itself from ``self.activityTimer`` and restarts
    the timer. As written, ``default`` is always True, so the disclaimer
    is shown and the message-box branch is not taken.
    """
    self.activityTimer.callback.remove(self.checkTraficLight)
    self.activityTimer.start(100, False)

    from urllib import urlopen
    import socket
    import os

    # The default timeout is lowered and immediately restored; no network
    # call happens in between in this version of the code.
    previous_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(3)
    message, picon, default = "", None, True
    socket.setdefaulttimeout(previous_timeout)

    if not default:
        message += "\n" + _("Do you want to update your receiver?")
        self.session.openWithCallback(self.startActualUpdate, MessageBox, message, default = default, picon = picon)
    else:
        self.showDisclaimer()
def ipkgCallback(self, event, param):
    """Handle ipkg events and record whether updates are available.

    Only ``IpkgComponent.EVENT_DONE`` is acted upon:

    * while ``self.updating`` is set, the flag is cleared and the upgrade
      list is requested;
    * when the finished command was the upgrade list, the number of fetched
      packages is stored in ``self.total_packages`` and
      ``config.softwareupdate.updatefound`` is set accordingly. If packages
      exist, the feed status URL is read into
      ``config.softwareupdate.updateisunstable`` (1 on failure);
    * otherwise ``updatefound`` is set to False.

    ``param`` is not used.
    """
    if event == IpkgComponent.EVENT_DONE:
        if self.updating:
            self.updating = False
            self.ipkg.startCmd(IpkgComponent.CMD_UPGRADE_LIST)
        elif self.ipkg.currentCommand == IpkgComponent.CMD_UPGRADE_LIST:
            self.total_packages = len(self.ipkg.getFetchedList())
            print ('[OnlineVersionCheck] %s Updates available' % self.total_packages)
            if self.total_packages:
                from urllib import urlopen
                import socket
                config.softwareupdate.updatefound.setValue(True)
                currentTimeoutDefault = socket.getdefaulttimeout()
                socket.setdefaulttimeout(3)
                try:
                    config.softwareupdate.updateisunstable.setValue(urlopen("http://odisealinux.com/feeds/" + getImageVersion() + "/status").read())
                except Exception:
                    # Status unknown: treat the feed as unstable.
                    config.softwareupdate.updateisunstable.setValue(1)
                finally:
                    # Always restore the caller's default timeout.
                    socket.setdefaulttimeout(currentTimeoutDefault)
            else:
                config.softwareupdate.updatefound.setValue(False)
        else:
            config.softwareupdate.updatefound.setValue(False)
def build_http():
    """Create an ``httplib2.Http`` object with a timeout applied.

    The timeout is the current default socket timeout when one is set,
    otherwise ``DEFAULT_HTTP_TIMEOUT_SEC``. To override it, call

        socket.setdefaulttimeout(timeout_in_sec)

    before calling this function.

    Returns:
      A httplib2.Http object, which is used to make http requests.
    """
    http_timeout = socket.getdefaulttimeout()
    if http_timeout is None:
        http_timeout = DEFAULT_HTTP_TIMEOUT_SEC
    return httplib2.Http(timeout=http_timeout)
def get_browser(self):
    '''Returns a mechanize.Browser object configured with the framework's global options.

    Side effect: the ``timeout`` option is installed as the process-wide
    default socket timeout.
    '''
    options = self._global_options
    br = mechanize.Browser()
    # user-agent header
    br.addheaders = [('User-agent', options['user-agent'])]
    # debug output at verbosity 2 and above
    if options['verbosity'] >= 2:
        br.set_debug_http(True)
        br.set_debug_redirects(True)
        br.set_debug_responses(True)
    # route both schemes through the configured proxy, if any
    proxy = options['proxy']
    if proxy:
        br.set_proxies({'http': proxy, 'https': proxy})
    # additional settings
    br.set_handle_robots(False)
    # timeout
    socket.setdefaulttimeout(options['timeout'])
    return br
def single_host(ip):
    """Scan every port in PORTS on a single IPv4 address.

    :param ip: dotted IPv4 address, validated with ``socket.inet_aton``
    :return: ``'Error: Invalid IP address.'`` for an invalid address,
        otherwise one ``'<port>/tcp <state>'`` line per port joined by
        newlines (state is ``open`` or ``closed``)

    Side effect: sets the process-wide default socket timeout to 0.5 s.
    """
    try:
        socket.inet_aton(ip)
    except socket.error:
        return 'Error: Invalid IP address.'

    # The default timeout only applies to sockets created afterwards, so it
    # must be set before the first socket exists.
    socket.setdefaulttimeout(0.5)
    lines = []
    for p in PORTS:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            c = s.connect_ex((ip, p))
        finally:
            # Release the descriptor for every probed port.
            s.close()
        state = 'open' if not c else 'closed'
        lines.append('{:>5}/tcp {:>7}'.format(p, state))
    return '\n'.join(lines)
def single_host(ip):
    """Report the state of each port in PORTS for one IPv4 address.

    :param ip: dotted IPv4 address, validated with ``socket.inet_aton``
    :return: ``'Error: Invalid IP address.'`` for an invalid address,
        otherwise newline-separated ``'<port>/tcp <state>'`` lines, where
        state is ``open`` or ``closed``

    Side effect: sets the process-wide default socket timeout to 0.5 s.
    """
    try:
        socket.inet_aton(ip)
    except socket.error:
        return 'Error: Invalid IP address.'

    # Set the timeout before creating sockets; setting it after connect_ex()
    # left the first probe without any timeout.
    socket.setdefaulttimeout(0.5)
    report = []
    for p in PORTS:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            c = s.connect_ex((ip, p))
        finally:
            # Do not leak one descriptor per port.
            s.close()
        state = 'open' if not c else 'closed'
        report.append('{:>5}/tcp {:>7}'.format(p, state))
    return '\n'.join(report)
def __recv_msg_compat(sock,size,timeout): # compatibility implementation for non-MSG_WAITALL / M2Crypto
    """Receive exactly *size* bytes from *sock*.

    Data is read in chunks of at most 60000 bytes and joined at the end.
    *timeout* is accepted for signature compatibility but not used here.

    Raises ConnectionClosedError when the peer closes the connection before
    *size* bytes have arrived; the data received so far is attached to the
    exception as ``partialMsg``.
    """
    msglen=0
    msglist=[]
    # Receive chunks of max. 60kb size:
    # (rather arbitrary limit, but it avoids memory/buffer problems on certain OSes -- VAX/VMS, Windows)
    while msglen<size:
        chunk=sock.recv(min(60000,size-msglen))
        if not chunk:
            if hasattr(sock,'pending'):
                # m2crypto ssl socket - they have problems with a defaulttimeout
                if socket.getdefaulttimeout() is not None:
                    raise ConnectionClosedError("m2crypto SSL can't be used when socket.setdefaulttimeout() has been set")
            err = ConnectionClosedError('connection lost')
            # b'' equals '' on Python 2 and also joins bytes chunks on Python 3
            err.partialMsg=b''.join(msglist) # store the message that was received until now
            raise err
        msglist.append(chunk)
        msglen+=len(chunk)
    return b''.join(msglist)
# Send a message over a socket. Raises ConnectionClosedError if the msg
# couldn't be sent (the connection has probably been lost then).
# We need this because 'send' isn't guaranteed to send all desired
# bytes in one call, for instance, when network load is high.
def TTSBaidu(self, tid, txt, lan, spd):
    '''
    Fetch the BAIDU.COM TTS result for a text and store it in self.results.

    tid: key under which the fetched data is stored in self.results
    txt: the TTS text
    lan: language, 'en' for English or 'zh' for Chinese
    spd: the reading speed

    The request is retried until it succeeds whenever a URLError or a
    socket timeout occurs. Retrying is done in a loop, so repeated failures
    no longer deepen the call stack.

    Side effect: sets the process-wide default socket timeout to 34 s.
    '''
    socket.setdefaulttimeout(34.0)
    while True:
        try:
            ttsUrl = genTTSUrl(lan ,txt, spd)
            self.results[tid] = getpage(ttsUrl)
            return
        except urllib.error.URLError as e:
            print("error:URLError ",e," we will try again...tid:",tid)
        except socket.timeout:
            print("error: TTSBaidu time out!, we will try again...tid:",tid )
def workFunc(task):
    """Download ``task['url']`` into the file named by ``task['filename']``.

    Spaces in the file name are replaced by underscores.

    :param task: mapping with the keys ``url`` and ``filename``
    :return: ``(True, task)`` when the response status is OK and the body
        was written; ``(False, task)`` when the request fails or returns
        any other status

    Side effect: sets the process-wide default socket timeout to 10 s.
    """
    try:
        socket.setdefaulttimeout(10)
        # requests does not time out unless a timeout is passed explicitly,
        # so the 10 second limit is also given to the call itself.
        i = requests.get(task['url'], timeout=10)
    except Exception:
        return((False, task))
    if i.status_code != requests.codes.ok:
        return((False, task))
    filename = '{0}'.format(task['filename'].replace(' ', '_'))
    # The handle is closed even if write() fails.
    with iopen(filename, 'wb') as f:
        f.write(i.content)
    return((True, task))
# read in tasking for files not already downloaded
def run():
    """Send ``HEAD /index.html`` to the configured target and print the headers.

    The target and timeout are read from ``variables['target'][0]`` and
    ``variables['timeout'][0]``.

    :return: the list of response headers on success; a ``ModuleError`` for
        an invalid timeout, a resolution failure or a timeout; the plain
        string ``"invalid url"`` for an invalid URL

    Side effect: installs the configured timeout as the process-wide
    default socket timeout.
    """
    conn = None
    try:
        try:
            socket.setdefaulttimeout(float(variables['timeout'][0]))
        except ValueError:
            printError('invalid timeout')
            return ModuleError("invalid timeout")
        conn = http.client.HTTPConnection(variables['target'][0])
        conn.request("HEAD","/index.html")
        res = conn.getresponse()
        results = res.getheaders()
        print('')
        for item in results:
            print(colors.yellow+item[0], item[1]+colors.end)
        print('')
        return results
    except http.client.InvalidURL:
        printError('invalid url')
        # NOTE(review): the other error paths return ModuleError; the plain
        # string is kept so existing callers see the same value.
        return ("invalid url")
    except socket.gaierror:
        printError('name or service not known')
        return ModuleError("name or service not known")
    except socket.timeout:
        printError('timeout')
        return ModuleError("timeout")
    finally:
        # Release the connection on every path.
        if conn is not None:
            conn.close()
def getLatestImageTimestamp(self):
    """Return the formatted timestamp of the latest image, or "" on failure.

    The timestamp is downloaded from openpli.org for the current device
    model and image version. The default socket timeout is set to 3 seconds
    for the duration of the call and restored afterwards.
    """
    saved_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(3)
    try:
        # TODO: Use Twisted's URL fetcher, urlopen is evil. And it can
        # run in parallel to the package update.
        from time import strftime
        from datetime import datetime
        version_number = int(about.getImageTypeString().split(" ")[1])
        # versions below 5 are rendered with one decimal place
        version_text = ("%.1f" if version_number < 5 else "%s") % version_number
        url = "http://openpli.org/download/timestamp/%s~%s" % (HardwareInfo().get_device_model(), version_text)

        def fetch_stamp(**extra):
            raw = urlopen(url, timeout=5, **extra).read()
            return datetime.fromtimestamp(int(raw)).strftime(_("%Y-%m-%d %H:%M"))

        try:
            latestImageTimestamp = fetch_stamp()
        except:
            # OpenPli 5.0 uses python 2.7.11 and here we need to bypass the certificate check
            from ssl import _create_unverified_context
            latestImageTimestamp = fetch_stamp(context=_create_unverified_context())
    except:
        latestImageTimestamp = ""
    socket.setdefaulttimeout(saved_timeout)
    return latestImageTimestamp
def start(self,stopEvent,port=PORT_NUMBER):
    """Serve HTTP requests on ``HOST_NAME:port`` until *stopEvent* is set.

    :param stopEvent: event object polled via ``isSet()`` between requests;
        also stored in the module-level ``g_stopEvent``
    :param port: TCP port to listen on

    Side effect: sets the process-wide default socket timeout to 10 s.
    The server socket is closed even if handling a request raises.
    """
    global g_stopEvent
    # Single-argument print calls behave the same on Python 2 and 3.
    print("port %s HOST_NAME %s" % (port, HOST_NAME))
    g_stopEvent = stopEvent
    socket.setdefaulttimeout(10)
    server_class = ThreadedHTTPServer
    MyHandler.protocol_version = "HTTP/1.1"
    httpd = server_class((HOST_NAME, port), MyHandler)
    print("XBMCLocalProxy Starts - %s:%s" % (HOST_NAME, port))
    try:
        while not stopEvent.isSet():
            httpd.handle_request()
    finally:
        httpd.server_close()
    print("XBMCLocalProxy Stops %s:%s" % (HOST_NAME, port))
def start(self,stopEvent,port=PORT_NUMBER):
    """Run the local proxy server until *stopEvent* is set.

    :param stopEvent: event object checked with ``isSet()`` before each
        request; also published as the module-level ``g_stopEvent``
    :param port: TCP port the server binds to on ``HOST_NAME``

    Side effect: sets the process-wide default socket timeout to 10 s.
    ``server_close()`` runs even when ``handle_request()`` raises.
    """
    global g_stopEvent
    # Formatted single-argument prints work on both Python 2 and 3.
    print("port %s HOST_NAME %s" % (port, HOST_NAME))
    g_stopEvent = stopEvent
    socket.setdefaulttimeout(10)
    MyHandler.protocol_version = "HTTP/1.1"
    httpd = ThreadedHTTPServer((HOST_NAME, port), MyHandler)
    print("XBMCLocalProxy Starts - %s:%s" % (HOST_NAME, port))
    try:
        while not stopEvent.isSet():
            httpd.handle_request()
    finally:
        httpd.server_close()
    print("XBMCLocalProxy Stops %s:%s" % (HOST_NAME, port))
def build_http():
    """Builds httplib2.Http object with a timeout.

    The current default socket timeout is used when one is set; otherwise
    DEFAULT_HTTP_TIMEOUT_SEC applies. To choose a different value, call

        socket.setdefaulttimeout(timeout_in_sec)

    before calling this function.

    Returns:
      A httplib2.Http object, which is used to make http requests.
    """
    configured = socket.getdefaulttimeout()
    return httplib2.Http(
        timeout=DEFAULT_HTTP_TIMEOUT_SEC if configured is None else configured)
banner_grabber.py 文件源码
项目:Python-Penetration-Testing-for-Developers
作者: PacktPublishing
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def main():
    """Grab service banners from ports 21, 23 and 22 on 192.168.195.0-254.

    For every address/port pair a TCP connection is attempted with a one
    second timeout, and either the first 1024 bytes received or a failure
    message is printed.

    Side effect: sets the process-wide default socket timeout to 1 second.
    """
    ports = [21,23,22]
    ips = "192.168.195."
    socket.setdefaulttimeout(1)
    for octet in range(0,255):
        ip = ips + str(octet)
        for port in ports:
            # Created outside the try block so that `s` is always bound
            # when the finally clause runs.
            s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            try:
                s.connect((ip,port))
                output = s.recv(1024)
                # Format inside the call: `print(...) % (...)` applies % to
                # print's return value (None) on Python 3.
                print("[+] The banner: %s for IP: %s at Port: %s" % (output,ip,port))
            except Exception:
                print("[-] Failed to Connect to %s:%s" % (ip, port))
            finally:
                s.close()
portsc14.py 文件源码
项目:Python-Penetration-Testing-for-Developers
作者: PacktPublishing
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def scantcp(threadName,rmip,r1,r2,c):
    """Scan TCP ports ``r1`` to ``r2 - 1`` on *rmip* and print the open ones.

    :param threadName: not used by this function
    :param rmip: target host
    :param r1: first port to scan
    :param r2: end of the port range (exclusive)
    :param c: timeout installed with ``socket.setdefaulttimeout``

    Open ports are printed with their description from the module-level
    ``data`` mapping. On interrupt or socket errors a message is printed and
    ``sys.exit()`` is called. After a complete scan the module-level
    ``shelf`` is closed.
    """
    try:
        # The default timeout only affects sockets created afterwards, so it
        # is set before the first socket is created.
        socket.setdefaulttimeout(c)
        for port in range(r1,r2):
            sock= socket.socket(socket.AF_INET,socket.SOCK_STREAM)
            #sock= socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
            try:
                result = sock.connect_ex((rmip,port))
            finally:
                # Close the socket even when connect_ex() raises.
                sock.close()
            if result==0:
                # Same text as the Python 2 print statement produced, built
                # explicitly so it is identical on Python 3.
                print("Port Open:---->\t%s -- %s" % (port, data.get(port, "Not in Database")))
    except KeyboardInterrupt:
        print("You stop this ")
        sys.exit()
    except socket.gaierror:
        print("Hostname could not be resolved")
        sys.exit()
    except socket.error:
        print("could not connect to server")
        sys.exit()
    shelf.close()
def check(ip, port, timeout):
    """Probe a host with two fixed binary requests and inspect the replies.

    The first payload is sent and the reply searched for ``ismaster``; if
    found, the second payload is sent and its reply searched for
    ``totalLinesWritten``.

    :param ip: target host
    :param port: target TCP port (converted with ``int``)
    :param timeout: value installed with ``socket.setdefaulttimeout``
    :return: a unicode finding message when both markers are seen, else
        ``None`` (also on any error)
    """
    s = None
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, int(port)))
        data = binascii.a2b_hex(
            "3a000000a741000000000000d40700000000000061646d696e2e24636d640000000000ffffffff130000001069736d6173746572000100000000")
        s.send(data)
        result = s.recv(1024)
        if b"ismaster" in result:
            getlog_data = binascii.a2b_hex(
                "480000000200000000000000d40700000000000061646d696e2e24636d6400000000000100000021000000026765744c6f670010000000737461727475705761726e696e67730000")
            s.send(getlog_data)
            result = s.recv(1024)
            if b"totalLinesWritten" in result:
                # NOTE(review): the message text looks mis-encoded in the
                # source; it is kept unchanged.
                return u"?????"
    except Exception:
        # Best-effort probe: any failure means "nothing found".
        pass
    finally:
        # The original never closed the socket.
        if s is not None:
            s.close()
    return None
def check(ip, port, timeout):
    """Test whether an HTTP server accepts a PUT upload of ``/vultest.txt``.

    A raw PUT request is sent over a socket; when the reply contains
    ``PUT``, the file is requested back over HTTP and checked for the
    uploaded marker ``xxscan0``.

    :param ip: target host
    :param port: target TCP port (must be an integer, it is formatted
        with ``%d``)
    :param timeout: socket default timeout and HTTP request timeout
    :return: a unicode finding message when the marker is read back, else
        ``None`` (also on any error)
    """
    s = None
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, port))
        flag = "PUT /vultest.txt HTTP/1.1\r\nHost: %s:%d\r\nContent-Length: 9\r\n\r\nxxscan0\r\n\r\n" % (ip, port)
        s.send(flag)
        # give the server a moment to process the upload
        time.sleep(1)
        data = s.recv(1024)
        s.close()
        if 'PUT' in data:
            url = 'http://' + ip + ":" + str(port) + '/vultest.txt'
            request = urllib2.Request(url)
            res_html = urllib2.urlopen(request, timeout=timeout).read(204800)
            if 'xxscan0' in res_html:
                return u"iis webdav??"
    except Exception:
        # Best-effort probe: any failure means "nothing found".
        pass
    finally:
        # Safety net for the error paths; closing twice is harmless.
        if s is not None:
            s.close()
    return None
def check(ip, port, timeout):
    """Try the passwords in PASSWORD_DIC for user ``root`` against a server.

    For each candidate a new connection is opened, the greeting packet is
    parsed with ``get_scramble`` and the answer built by ``get_auth_data``
    is sent. The fixed 11-byte reply compared below is treated as success.

    :param ip: target host
    :param port: target TCP port (converted with ``int``)
    :param timeout: value installed with ``socket.setdefaulttimeout``
    :return: a unicode finding message containing user and password on
        success, otherwise ``None``. The scan stops early when the
        connection is refused or times out.
    """
    import errno  # local import: only needed for the refusal check below

    socket.setdefaulttimeout(timeout)
    user_list = ['root']
    for user in user_list:
        for pass_ in PASSWORD_DIC:
            sock = None
            try:
                pass_ = str(pass_.replace('{user}', user))
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((ip, int(port)))
                packet = sock.recv(254)
                # print packet
                plugin, scramble = get_scramble(packet)
                auth_data = get_auth_data(user, pass_, scramble, plugin)
                sock.send(auth_data)
                result = sock.recv(1024)
                if result == "\x07\x00\x00\x02\x00\x00\x00\x02\x00\x00\x00":
                    return u"?????????%s????%s" % (user, pass_)
            except Exception as e:
                # "Errno 10061" is the Windows text for a refused
                # connection; the errno check covers other platforms too.
                refused = getattr(e, 'errno', None) in (errno.ECONNREFUSED, 10061)
                if refused or "Errno 10061" in str(e) or "timed out" in str(e):
                    return
            finally:
                # One socket per attempt: close it so the loop does not
                # exhaust file descriptors.
                if sock is not None:
                    sock.close()
def check(ip, port, timeout):
    """Test whether a PUT through ``/fileserver/`` can create a file under ``/styles/``.

    A raw PUT request with a random file name is sent over a socket; the
    file is then requested over HTTP and checked for the marker
    ``xxscan0``.

    :param ip: target host
    :param port: target TCP port
    :param timeout: socket default timeout and HTTP request timeout
    :return: a unicode finding message followed by the file URL when the
        marker is read back, else ``None`` (also on any error)
    """
    s = None
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((ip, port))
        filename = random_str(6)
        flag = "PUT /fileserver/sex../../..\\styles/%s.txt HTTP/1.0\r\nContent-Length: 9\r\n\r\nxxscan0\r\n\r\n"%(filename)
        s.send(flag)
        # give the server a moment to process the upload
        time.sleep(1)
        s.recv(1024)
        s.close()
        url = 'http://' + ip + ":" + str(port) + '/styles/%s.txt'%(filename)
        res_html = urllib2.urlopen(url, timeout=timeout).read(1024)
        if 'xxscan0' in res_html:
            return u"???????????" + url
    except Exception:
        # Best-effort probe: ordinary failures mean "nothing found", while
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        pass
    finally:
        # Safety net for the error paths; closing twice is harmless.
        if s is not None:
            s.close()
    return None
def check(ip, port, timeout):
    """Probe a server with ``INFO`` and, if it asks for authentication, try PASSWORD_DIC.

    :param ip: target host
    :param port: target TCP port (converted with ``int``)
    :param timeout: value installed with ``socket.setdefaulttimeout``
    :return: a unicode finding message when the ``INFO`` reply contains
        ``redis_version``, or a message including the password when an
        ``AUTH`` attempt is answered with ``+OK``; ``None`` otherwise
        (also on any error)
    """
    try:
        socket.setdefaulttimeout(timeout)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect((ip, int(port)))
            s.send("INFO\r\n")
            result = s.recv(1024)
        finally:
            # The first connection is no longer needed after its reply.
            s.close()
        if "redis_version" in result:
            return u"?????"
        elif "Authentication" in result:
            for pass_ in PASSWORD_DIC:
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    s.connect((ip, int(port)))
                    s.send("AUTH %s\r\n" % (pass_))
                    result = s.recv(1024)
                finally:
                    # One socket per attempt: close it before the next one.
                    s.close()
                if '+OK' in result:
                    return u"?????????%s" % (pass_)
    except Exception:
        # Best-effort probe: any failure means "nothing found".
        pass
    return None
def audit(arg):
    """Look for a known plugin page on port 9200 of a host.

    For each plugin name a ``GET /_plugin/<name>/`` request is sent over a
    new socket. The first plugin whose reply starts with
    ``HTTP/1.0 200 OK`` is passed to ``grab`` and the search stops. Any
    error aborts the search silently.

    :param arg: target host

    Side effect: sets the process-wide default socket timeout to 3 seconds.
    """
    port =9200
    host = arg
    pluginList = ['test','kopf', 'HQ', 'marvel', 'bigdesk' ,'head' ]
    try:
        for plugin in pluginList:
            socket.setdefaulttimeout(3)
            s = socket.socket()
            try:
                s.connect((host,port))
                s.send("GET /_plugin/%s/ HTTP/1.0\n"
                       "Host: %s\n\n" % (plugin, host))
                reply = s.recv(16)
            finally:
                # Close every socket, not only the last one created.
                s.close()
            if ("HTTP/1.0 200 OK" in reply):
                grab(plugin,host,port)
                break
    except Exception:
        pass