def test_read_very_lazy_A(self):
want = ['x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual('', telnet.read_very_lazy())
data = ''
while True:
try:
read_data = telnet.read_very_lazy()
except EOFError:
break
data += read_data
if not read_data:
telnet.fill_rawq()
self.assertEqual('', telnet.cookedq)
telnet.process_rawq()
self.assertTrue(want[0].startswith(data))
self.assertEqual(data, want[0])
python类Telnet()的实例源码
def _test_command(self, data):
""" helper for testing IAC + cmd """
self.setUp()
self.dataq.put(data)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector()
telnet.set_option_negotiation_callback(nego.do_nego)
txt = telnet.read_all()
cmd = nego.seen
self.assertTrue(len(cmd) > 0) # we expect at least one command
self.assertIn(cmd[0], self.cmds)
self.assertEqual(cmd[1], tl.NOOPT)
self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
nego.sb_getter = None # break the nego => telnet cycle
self.tearDown()
def _test_read_any_eager_A(self, func_name):
"""
read_very_eager()
Read all data available already queued or on the socket,
without blocking.
"""
want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
expects = want[1] + want[2]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name)
data = ''
while True:
try:
data += func()
self.assertTrue(expects.startswith(data))
except EOFError:
break
self.assertEqual(expects, data)
def test_read_lazy_A(self):
want = ['x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual('', telnet.read_lazy())
data = ''
while True:
try:
read_data = telnet.read_lazy()
data += read_data
if not read_data:
telnet.fill_rawq()
except EOFError:
break
self.assertTrue(want[0].startswith(data))
self.assertEqual(data, want[0])
def test_read_very_lazy_A(self):
want = ['x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual('', telnet.read_very_lazy())
data = ''
while True:
try:
read_data = telnet.read_very_lazy()
except EOFError:
break
data += read_data
if not read_data:
telnet.fill_rawq()
self.assertEqual('', telnet.cookedq)
telnet.process_rawq()
self.assertTrue(want[0].startswith(data))
self.assertEqual(data, want[0])
def _test_command(self, data):
""" helper for testing IAC + cmd """
self.setUp()
self.dataq.put(data)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector()
telnet.set_option_negotiation_callback(nego.do_nego)
txt = telnet.read_all()
cmd = nego.seen
self.assertTrue(len(cmd) > 0) # we expect at least one command
self.assertIn(cmd[0], self.cmds)
self.assertEqual(cmd[1], tl.NOOPT)
self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
nego.sb_getter = None # break the nego => telnet cycle
self.tearDown()
def _establish_connection(cls, remote_access_data):
"""Establish telnet connection"""
try:
time.sleep(cls.SLEEP_TIME)
tnet = telnetlib.Telnet(remote_access_data.address, timeout=cls.TIMEOUT)
except EnvironmentError:
# print "Telnet: Error connecting to remote device"
return None
if remote_access_data.username != 'None':
tnet.read_until("login:")
tnet.write("\r\n" + remote_access_data.username + "\r\n")
if remote_access_data.identifier != 'None':
tnet.read_until("Password: ")
tnet.write("\r\n" + remote_access_data.identifier + "\r\n")
tnet.read_until(">")
return tnet
def cmd(self, cmd_str, wait_ret=True):
if not isinstance(cmd_str, str):
raise TypeError("Invalid command string", cmd_str)
if not self.is_open:
raise attenuator.Error("Telnet connection not open for commands")
cmd_str.strip(self.tx_cmd_separator)
self._tn.read_until(_ascii_string(self.prompt), 2)
self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator))
if wait_ret is False:
return None
match_idx, match_val, ret_text = self._tn.expect(
[_ascii_string("\S+" + self.rx_cmd_separator)], 1)
if match_idx == -1:
raise attenuator.Error(
"Telnet command failed to return valid data")
ret_text = ret_text.decode()
ret_text = ret_text.strip(self.tx_cmd_separator + self.rx_cmd_separator
+ self.prompt)
return ret_text
def test_telnet(hostname):
# This attempts to test the validity of a controller
# It returns a tuple (probably not the best way to do this) which is in the format:
# Initial Connection Test (bool), Version Response Test (bool), Version Response (str)
try:
tn = telnetlib.Telnet(host=hostname, timeout=3)
except socket.timeout:
return False, False, None
try:
tn.write("n\r\n")
version_string = tn.read_until("}",3)
except:
return True, False, None
return True, True, version_string
# The following was used for testing during development
def test_debuglevel_reads(self):
# test all the various places that self.msg(...) is called
given_a_expect_b = [
# Telnet.fill_rawq
(b'a', ": recv b''\n"),
# Telnet.process_rawq
(tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
(tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
(tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
(tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
(tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
]
for a, b in given_a_expect_b:
telnet = test_telnet([a])
telnet.set_debuglevel(1)
txt = telnet.read_all()
self.assertIn(b, telnet._messages)
return
def FetchCiscoLSDB(globals):
sys.stderr.write('fetch cisco LSDB')
LSADB = {}
for r in controlport:
sys.stderr.write('Fetch ' + r + '\n')
tn = telnetlib.Telnet(LXCIP, controlport[r])
LoginToRouter(r, tn)
CLIoutput = ""
for lsatype in ["router", "network"]:
#print "XXXXXXXXXXXXfetch" + lsatype
tn.write("show ip ospf database %s \n\r" % lsatype )
CLIoutput += tn.read_until("#")
tn.read_very_eager()
print "*******************" + str(r) + "**************"
print CLIoutput
print "--------------------------------------------"
LSADB[r] = ParseCLIoutput(CLIoutput,globals)
tn.close()
return LSADB
def portScanner(url,dosyaAdi):
raporIcerik=""
baslangic=int(raw_input("Start port: "))
bitis=int(raw_input("Finish port: "))
for port in range(baslangic, bitis, 1):
try:
i=str(port)
baglanti = telnetlib.Telnet(url, i)
baglanti.write("\n")
print "[+]", str(port), " - ", baglanti.read_all().splitlines()[0]
raporIcerik+="[+]", str(port), " - ", baglanti.read_all().splitlines()[0]+"\n"
baglanti.close()
except:
pass
rapor = open(dosyaAdi, "a")
rapor.write(raporIcerik)
rapor.close()
def method(url,dosyaAdi):
try:
telnetBaglanti = telnetlib.Telnet(url, 80)
telnetBaglanti.write("OPTIONS / HTTP/1.1\n")
komut = "Host: " + url + "\n\n\n\n"
telnetBaglanti.write(komut)
sayfa = telnetBaglanti.read_all()
deger = str(sayfa).find("Allow")
deger2 = str(sayfa).find("\n", deger + 1)
Metodlar = sayfa[deger:deger2]
print "Methods: ", Metodlar
raporIcerik="[+]Methods: "+Metodlar+"\n"
rapor = open(dosyaAdi, "a")
rapor.write(raporIcerik)
rapor.close()
except:
pass
def is_port_open(self, unit=None, port='3306', address=None):
if unit:
addr = unit.info['public-address']
elif address:
addr = address
else:
raise Exception('Please provide a unit or address')
try:
telnetlib.Telnet(addr, port)
return True
except socket.error as e:
if e.errno == 113:
self.log.error("could not connect to %s:%s" % (addr, port))
if e.errno == 111:
self.log.error("connection refused connecting"
" to %s:%s" % (addr,
port))
return False
def get(host, index):
index = int(index)
assert 1 <= index <= 4
tn = telnetlib.Telnet(host, 1234, 1)
tn.read_until(b"\r\n", 0.5)
tn.write(b"login admin admin\n")
tn.read_until(b"250 OK\r\n", 0.5)
tn.write("port {}\n".format(index).encode())
read = tn.read_until(b"\r\n", 0.5)
m = re.match(".*250 (\d).*", read.decode())
if m is None:
raise Exception("NetIO: could not match response")
value = m.group(1)
tn.write(b"quit\n")
tn.close()
return value == "1"
def audit(arg):
port = 23
time = 5
user = 'admin'
password = '<<< %s(un=\'%s\') = %u'
finish = '->'
try:
t = telnetlib.Telnet(arg,port, timeout=time)
t.write(user + '\n')
t.read_until('password: ')
t.write(password + '\n')
str1 = t.read_until(finish)
t.write("?\n")
str = t.read_until(finish)
t.close()
if ('->' in str) and ('exec' in str):
security_hole(arg)
return arg
except Exception, e:
pass
def audit(arg):
port = 23
time = 5
user = 'admin'
password = '<<< %s(un=\'%s\') = %u'
finish = '->'
try:
t = telnetlib.Telnet(arg,port, timeout=time)
t.write(user + '\n')
t.read_until('password: ')
t.write(password + '\n')
str1 = t.read_until(finish)
t.write("?\n")
str = t.read_until(finish)
t.close()
if ('->' in str) and ('exec' in str):
security_hole(arg)
except Exception, e:
pass
def _test_read_any_eager_A(self, func_name):
"""
read_very_eager()
Read all data available already queued or on the socket,
without blocking.
"""
want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
expects = want[1] + want[2]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name)
data = ''
while True:
try:
data += func()
self.assertTrue(expects.startswith(data))
except EOFError:
break
self.assertEqual(expects, data)
def test_read_lazy_A(self):
want = ['x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual('', telnet.read_lazy())
data = ''
while True:
try:
read_data = telnet.read_lazy()
data += read_data
if not read_data:
telnet.fill_rawq()
except EOFError:
break
self.assertTrue(want[0].startswith(data))
self.assertEqual(data, want[0])