python类ETIMEDOUT的实例源码

base_clf_pn53x.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_command_with_rsp_timeout_error(self, chipset):
        cmd = HEX('0000ff 05fb d4 00 313233 96 00')
        rsp = IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
        chipset.transport.read.side_effect = [ACK(), rsp]
        with pytest.raises(IOError) as excinfo:
            chipset.command(0, b'123', 1.0)
        assert excinfo.value.errno == errno.ETIMEDOUT
        assert chipset.transport.read.mock_calls == [call(100), call(1000)]
        assert chipset.transport.write.mock_calls == [call(cmd), call(ACK())]
base_clf_pn53x.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_listen_tta_not_activated(self, device):
        device.chipset.transport.read.side_effect = [
            ACK(), RSP('09 00'),                          # WriteRegister
            ACK(), IOError(errno.ETIMEDOUT, ""),          # TgInitAsTarget
        ]
        target = nfc.clf.LocalTarget('106A')
        target.sens_res = HEX("4400")
        target.sel_res = HEX("00")
        target.sdd_res = HEX("08010203")
        assert device.listen_tta(target, 1.0) is None
base_clf_pn53x.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_listen_tta_as_tt4_initiator_cmd_empty(self, device):
        device.chipset.transport.read.side_effect = [
            ACK(), RSP('09 00'),                        # WriteRegister
            ACK(), RSP('8D 00 E0 80'),                  # TgInitAsTarget
            ACK(), RSP('91 00'),                        # TgResponseToInitiator
            ACK(), RSP('89 00'),                        # TgGetInitiatorCommand
            ACK(), IOError(errno.ETIMEDOUT, ""),        # TgInitAsTarget
        ]
        target = nfc.clf.LocalTarget('106A')
        target.sens_res = HEX("4400")
        target.sel_res = HEX("20")
        target.sdd_res = HEX("08010203")
        assert device.listen_tta(target, 1.0) is None
        assert device.chipset.transport.read.call_count == 10
        device.chipset.transport.write.assert_any_call(CMD('90 0578807002'))
base_clf_pn53x.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_listen_tta_wrong_bitrate_received(self, device):
        device.chipset.transport.read.side_effect = [
            ACK(), RSP('09 00'),                        # WriteRegister
            ACK(), RSP('8D 11 00'),                     # TgInitAsTarget
            ACK(), IOError(errno.ETIMEDOUT, ""),        # TgInitAsTarget
        ]
        target = nfc.clf.LocalTarget('106A')
        target.sens_res = HEX("4400")
        target.sel_res = HEX("40")
        target.sdd_res = HEX("08010203")
        assert device.listen_tta(target, 1.0) is None
        assert device.chipset.transport.read.call_count == 6
base_clf_pn53x.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_listen_dep_not_activated(self, device):
        device.chipset.transport.read.side_effect = [
            ACK(), RSP('09 00'),                          # WriteRegister
            ACK(), IOError(errno.ETIMEDOUT, ""),          # TgInitAsTarget
        ]
        target = nfc.clf.LocalTarget()
        target.sensf_res = HEX("01 01fe010203040506 0000000000000000 0000")
        target.sens_res = HEX("0101")
        target.sel_res = HEX("40")
        target.sdd_res = HEX("08010203")
        target.atr_res = HEX("D501 d0d1d2d3d4d5d6d7d8d9 0000000800")
        assert device.listen_dep(target, 0.001) is None
        assert device.chipset.transport.read.call_count == 4
test_clf_rcs956.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_listen_dep_not_activated(self, device):
        device.chipset.transport.read.side_effect = [
            ACK(), RSP('19'),                           # ResetMode
            ACK(), RSP('09 00'),                        # WriteRegister
            ACK(), RSP('33'),                           # RFConfiguration
            ACK(), RSP('13'),                           # SetParameters
            ACK(), RSP('09 00'),                        # WriteRegister
            ACK(), IOError(errno.ETIMEDOUT, ""),        # TgInitAsTarget
        ]
        target = nfc.clf.LocalTarget()
        target.sensf_res = HEX("01 01fe010203040506 0000000000000000 0000")
        target.sens_res = HEX("0101")
        target.sel_res = HEX("40")
        target.sdd_res = HEX("08010203")
        target.atr_res = HEX("D501 d0d1d2d3d4d5d6d7d8d9 0000000800")
        assert device.listen_dep(target, 0.001) is None
        assert device.chipset.transport.read.call_count == 12
        assert device.chipset.transport.write.mock_calls == [call(_) for _ in [
            CMD('18 01'), ACK(),                        # ResetMode
            CMD('08 630b80'),                           # WriteRegister
            CMD('32 82080208'),                         # RFConfiguration
            CMD('12 08'),                               # SetParameters
            CMD('08 63017b6302b06303b0'),               # WriteRegister
            CMD('8c 0201010102034001 fe01020304050600'
                '   0000000000000000 0001fe0102030405'
                '   060000'),                           # TgInitAsTarget
            ACK(),
        ]]
test_clf_rcs956.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_listen_dep_ioerror_timeout_after_psl(self, device):
        atr_req = 'D400 30313233343536373839 00000000'
        atr_res = 'D501 d0d1d2d3d4d5d6d7d8d9 0000000800'
        psl_req = 'D404 00 12 03'
        device.chipset.transport.read.side_effect = [
            ACK(), RSP('19'),                           # ResetMode
            ACK(), RSP('09 00'),                        # WriteRegister
            ACK(), RSP('33'),                           # RFConfiguration
            ACK(), RSP('13'),                           # SetParameters
            ACK(), RSP('09 00'),                        # WriteRegister
            ACK(), RSP('8D 05 11' + atr_req),           # TgInitAsTarget
            ACK(), RSP('93 00'),                        # TgSetGeneralBytes
            ACK(), RSP('89 00 06' + psl_req),           # TgGetInitiatorCommand
            ACK(), self.reg_rsp('FD'),                  # ReadRegister
            ACK(), RSP('09 00'),                        # WriteRegister
            ACK(), RSP('91 00'),                        # TgResponseToInitiator
            ACK(), self.reg_rsp('FD'),                  # ReadRegister
            ACK(), RSP('09 00'),                        # WriteRegister
            ACK(), IOError(errno.ETIMEDOUT, ""),        # TgGetInitiatorCommand
        ]
        target = nfc.clf.LocalTarget()
        target.sensf_res = HEX("01 01fe010203040506 0000000000000000 0000")
        target.sens_res = HEX("0101")
        target.sel_res = HEX("40")
        target.sdd_res = HEX("08010203")
        target.atr_res = HEX(atr_res)
        assert device.listen_dep(target, 1.0) is None
        assert device.chipset.transport.read.call_count == 28
test_clf_transport.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_read(self, serial, tty):
        serial.return_value.read.side_effect = [
            HEX('0000ff00ff00'),
        ]
        assert tty.read(0) == b'\x00\x00\xff\x00\xff\x00'
        assert serial.return_value.read.mock_calls == [call(6)]
        assert tty.tty.timeout == 0.05

        serial.return_value.read.reset_mock()
        serial.return_value.read.side_effect = [
            HEX('0000ff03fbd5'), HEX('01020000'),
        ]
        assert tty.read(51) == b'\x00\x00\xff\x03\xfb\xd5\x01\x02\x00\x00'
        assert serial.return_value.read.mock_calls == [call(6), call(4)]
        assert tty.tty.timeout == 0.051

        serial.return_value.read.reset_mock()
        serial.return_value.read.side_effect = [
            HEX('0000ffffff01'), HEX('01fed5'), bytearray(256) + HEX('2b00'),
        ]
        tty.read(100)
        assert serial.return_value.read.mock_calls == [
            call(6), call(3), call(258),
        ]
        assert tty.tty.timeout == 0.1

        serial.return_value.read.reset_mock()
        serial.return_value.read.side_effect = [HEX('')]
        with pytest.raises(IOError) as excinfo:
            tty.read(1100)
        assert excinfo.value.errno == errno.ETIMEDOUT
        assert serial.return_value.read.mock_calls == [call(6)]
        assert tty.tty.timeout == 1.1

        tty.tty = None
        assert tty.read(1000) is None
test_clf_transport.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_write(self, usb):
        usb.write(b'12')
        usb.usb_dev.bulkWrite.assert_called_with(0x04, b'12', 0)

        usb.write(b'12', 100)
        usb.usb_dev.bulkWrite.assert_called_with(0x04, b'12', 100)

        usb.write(64 * b'1', 100)
        usb.usb_dev.bulkWrite.assert_has_calls([
            call(0x04, 64 * b'1', 100),
            call(0x04, b'', 100),
        ])

        usb.usb_dev.bulkWrite.side_effect = [
            nfc.clf.transport.libusb.USBErrorTimeout,
            nfc.clf.transport.libusb.USBErrorNoDevice,
            nfc.clf.transport.libusb.USBError,
        ]
        with pytest.raises(IOError) as excinfo:
            usb.write(b'12')
        assert excinfo.value.errno == errno.ETIMEDOUT

        with pytest.raises(IOError) as excinfo:
            usb.write(b'12')
        assert excinfo.value.errno == errno.ENODEV

        with pytest.raises(IOError) as excinfo:
            usb.write(b'12')
        assert excinfo.value.errno == errno.EIO

        usb.usb_out = None
        assert usb.write(b'12') is None
transport.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 64 收藏 0 点赞 0 评论 0
def write(self, frame, timeout=0):
        if self.usb_out is not None:
            log.log(logging.DEBUG-1, ">>> %s", hexlify(frame))
            try:
                ep_addr = self.usb_out.getAddress()
                self.usb_dev.bulkWrite(ep_addr, bytes(frame), timeout)
                if len(frame) % self.usb_out.getMaxPacketSize() == 0:
                    self.usb_dev.bulkWrite(ep_addr, b'', timeout)
            except libusb.USBErrorTimeout:
                raise IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
            except libusb.USBErrorNoDevice:
                raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
            except libusb.USBError as error:
                log.error("%r", error)
                raise IOError(errno.EIO, os.strerror(errno.EIO))
proxylib.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def handle(self, handler, do_ssl_handshake=True):
        """strip connect"""
        logging.info('%s "STRIP %s %s:%d %s" - -', handler.address_string(), handler.command, handler.host, handler.port, handler.protocol_version)
        handler.send_response(200)
        handler.end_headers()
        if do_ssl_handshake:
            try:
                self.do_ssl_handshake(handler)
            except (socket.error, ssl.SSLError, OpenSSL.SSL.Error) as e:
                if e.args[0] not in (errno.ECONNABORTED, errno.ECONNRESET) or (e[0] == -1 and 'Unexpected EOF' in e[1]):
                    logging.exception('ssl.wrap_socket(connection=%r) failed: %s', handler.connection, e)
                return
        try:
            handler.raw_requestline = handler.rfile.readline(65537)
            if len(handler.raw_requestline) > 65536:
                handler.requestline = ''
                handler.request_version = ''
                handler.command = ''
                handler.send_error(414)
                handler.wfile.close()
                return
            if not handler.raw_requestline:
                handler.close_connection = 1
                return
            if not handler.parse_request():
                handler.send_error(400)
                handler.wfile.close()
                return
        except (socket.error, ssl.SSLError, OpenSSL.SSL.Error) as e:
            if e.args[0] in (errno.ECONNABORTED, errno.ECONNRESET, errno.EPIPE):
                handler.close_connection = 1
                return
            else:
                raise
        try:
            handler.do_METHOD()
        except (socket.error, ssl.SSLError, OpenSSL.SSL.Error) as e:
            if e.args[0] not in (errno.ECONNABORTED, errno.ETIMEDOUT, errno.EPIPE):
                raise
test_socks.py 文件源码 项目:microProxy 作者: mike820324 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_handle_stream_closed(self):
        self.layer.socks_conn = Mock()
        self.layer.socks_conn.send = Mock(side_effect=self.collect_send_event)

        socks_request = Request(
            REQ_COMMAND["CONNECT"], ADDR_TYPE["IPV4"],
            u"1.2.3.4", self.port)

        addr_not_support_status = RESP_STATUS["ADDRESS_TYPE_NOT_SUPPORTED"]
        network_unreach_status = RESP_STATUS["NETWORK_UNREACHABLE"]
        general_fail_status = RESP_STATUS["GENRAL_FAILURE"]

        error_cases = [
            (errno.ENOEXEC, addr_not_support_status),
            (errno.EBADF, addr_not_support_status),
            (errno.ETIMEDOUT, network_unreach_status),
            (errno.EADDRINUSE, general_fail_status),
            (55566, general_fail_status)]

        for code, expect_status in error_cases:
            self.layer.create_dest_stream = Mock(
                side_effect=self.create_raise_exception_function(
                    StreamClosedError((code, ))))
            result_future = self.layer.handle_request_and_create_destination(
                socks_request)
            with self.assertRaises(DestNotConnectedError):
                yield result_future

            self.assertIsNotNone(self.event)
            self.assertIsInstance(self.event, Response)
            self.assertEqual(self.event.status, expect_status)
            self.assertEqual(self.event.atyp, ADDR_TYPE["IPV4"])
            self.assertEqual(str(self.event.addr), "1.2.3.4")
            self.assertEqual(self.event.port, self.port)
socks.py 文件源码 项目:microProxy 作者: mike820324 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def handle_stream_closed_error(self, error, event):
        if error.real_error:
            err_num = abs(error.real_error[0])
            try:
                errorcode = errno.errorcode[err_num]
            except KeyError:
                errorcode = "undefined(code={0})".format(err_num)

            logger.debug("connect to {0}:{1} with error code {2}".format(
                event.addr, event.port, errorcode))
            # NOTE: if we submit an incorrect address type,
            # the error code will be:
            # - ENOEXEC in macos.
            # - EBADF in linux.
            if err_num in (errno.ENOEXEC, errno.EBADF):
                reason = "ADDRESS_TYPE_NOT_SUPPORTED"
            elif err_num == errno.ETIMEDOUT:
                reason = "NETWORK_UNREACHABLE"
            else:
                logger.error("unhandled error code {0} received".format(errorcode))
                reason = "GENRAL_FAILURE"

            yield self.send_event_to_src_conn(Response(
                RESP_STATUS[reason],
                event.atyp, event.addr, event.port), raise_exception=False)
            raise DestNotConnectedError((event.addr, event.port))
        else:  # pragma: no cover
            # TODO: StreamClosedError without real_error?
            # need check that tornado would occur this situation?
            raise
sigma.py 文件源码 项目:apex-sigma-core 作者: lu-ci 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def init_database(self):
        self.log.info('Connecting to Database...')
        self.db = Database(self, self.cfg.db)
        try:
            await self.db[self.db.db_cfg.database].collection.find_one({})
        except pymongo.errors.ServerSelectionTimeoutError:
            self.log.error('A Connection To The Database Host Failed!')
            exit(errno.ETIMEDOUT)
        except pymongo.errors.OperationFailure:
            self.log.error('Database Access Operation Failed!')
            exit(errno.EACCES)
        self.log.info('Successfully Connected to Database')
simple_irc_bot.py 文件源码 项目:irc_bot 作者: cvium 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def handle_expt_event(self):
        error = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if error == errno.ECONNREFUSED:
            log.error('Connection refused. Check server connection config.')
            self.close()
            return
        elif error == errno.ETIMEDOUT:
            log.error('Connection timed out.')
        log.error(error)
        self.handle_error()
transport.py 文件源码 项目:bitpay-brick 作者: javgh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def read(self, timeout):
        if self.tty is not None:
            self.tty.timeout = max(timeout / 1000.0, 0.05)
            frame = bytearray(self.tty.read(6))
            if frame is None or len(frame) == 0:
                raise IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
            if frame.startswith("\x00\x00\xff\x00\xff\x00"):
                return frame
            LEN = frame[3]
            if LEN == 0xFF:
                frame += self.tty.read(3)
                LEN = frame[5]<<8 | frame[6]
            frame += self.tty.read(LEN + 1)
            log.debug("<<< " + str(frame).encode("hex"))
            return frame
transport.py 文件源码 项目:bitpay-brick 作者: javgh 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _PYUSB0_read(self, timeout):
        if self.usb_inp is not None:
            try:
                frame = self.usb_dev.bulkRead(self.usb_inp, 300, timeout)
            except self.usb.USBError as error:
                if error.message == "Connection timed out":
                    ETIMEDOUT = errno.ETIMEDOUT
                    raise IOError(ETIMEDOUT, os.strerror(ETIMEDOUT))
                else:
                    log.error("{0!r}".format(error))
                    raise IOError(errno.EIO, os.strerror(errno.EIO))
            else:
                frame = bytearray(frame)
                log.debug("<<< " + str(frame).encode("hex"))
                return frame
transport.py 文件源码 项目:bitpay-brick 作者: javgh 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _PYUSB1_read(self, timeout):
        if self.usb_inp is not None:
            try:
                frame = self.usb_inp.read(300, timeout)
            except self.usb_core.USBError as error:
                if error.errno != errno.ETIMEDOUT:
                    log.error("{0!r}".format(error))
                raise error
            else:
                frame = bytearray(frame)
                log.debug("<<< " + str(frame).encode("hex"))
                return frame
transport.py 文件源码 项目:bitpay-brick 作者: javgh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _PYUSB0_write(self, frame):
        if self.usb_out is not None:
            log.debug(">>> " + str(frame).encode("hex"))
            try:
                self.usb_dev.bulkWrite(self.usb_out, frame)
                if len(frame) % 64 == 0: # must end bulk transfer
                    self.usb_dev.bulkWrite(self.usb_out, '')
            except self.usb.USBError as error:
                if error.message == "Connection timed out":
                    ETIMEDOUT = errno.ETIMEDOUT
                    raise IOError(ETIMEDOUT, os.strerror(ETIMEDOUT))
                else:
                    log.error("{0!r}".format(error))
                    raise IOError(errno.EIO, os.strerror(errno.EIO))
transport.py 文件源码 项目:bitpay-brick 作者: javgh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _PYUSB1_write(self, frame):
        if self.usb_out is not None:
            log.debug(">>> " + str(frame).encode("hex"))
            try:
                self.usb_out.write(frame)
                if len(frame) % self.usb_out.wMaxPacketSize == 0:
                    self.usb_out.write('') # end bulk transfer
            except self.usb_core.USBError as error:
                if error.errno != errno.ETIMEDOUT:
                    log.error("{0!r}".format(error))
                raise error


问题


面经


文章

微信
公众号

扫码关注公众号