python类EBUSY的实例源码

test_ossaudiodev.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, OSError) as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    support.run_unittest(__name__)
test_linuxaudiodev.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error, msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    run_unittest(LinuxAudioDevTests)
test_ossaudiodev.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError), msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    test_support.run_unittest(__name__)
handlers.py 文件源码 项目:sdk-samples 作者: cradlepoint 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def initiate_sendfile(self):
        """A wrapper around sendfile."""
        try:
            sent = sendfile(self._fileno, self._filefd, self._offset,
                            self.ac_out_buffer_size)
        except OSError as err:
            if err.errno in _ERRNOS_RETRY or err.errno == errno.EBUSY:
                return
            elif err.errno in _ERRNOS_DISCONNECTED:
                self.handle_close()
            else:
                if self.tot_bytes_sent == 0:
                    logger.warning(
                        "sendfile() failed; falling back on using plain send")
                    raise _GiveUpOnSendfile
                else:
                    raise
        else:
            if sent == 0:
                # this signals the channel that the transfer is completed
                self.discard_buffers()
                self.handle_close()
            else:
                self._offset += sent
                self.tot_bytes_sent += sent

    # --- utility methods
test_os.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise
test_ossaudiodev.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, OSError) as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    support.run_unittest(__name__)
test_config.py 文件源码 项目:maas 作者: maas 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_write_config_errors_if_unexpected_exception(self):
        dnsconfig = DNSConfig()
        exception = IOError(errno.EBUSY, factory.make_string())
        self.patch(config, 'atomic_write', Mock(side_effect=exception))
        self.assertRaises(IOError, dnsconfig.write_config)
microrepl.py 文件源码 项目:intellij-micropython 作者: vlasovskikh 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def connect_miniterm(port):
    try:
        ser = serial.Serial(port, BAUDRATE, parity=PARITY, rtscts=False,
                            xonxoff=False)
        return Miniterm(ser, echo=False)
    except serial.SerialException as e:
        if e.errno == errno.ENOENT:
            sys.stderr.write(
                "Device %r not found. Check your "
                "MicroPython device path settings.\n" % port)
        elif e.errno == errno.EBUSY:
            # Device is busy. Explain what to do.
            sys.stderr.write(
                "Found the device, but it is busy. "
                "Wait up to 20 seconds, or "
                "press the reset button on the "
                "back of the device next to the yellow light; "
                "then try again.\n"
            )
        elif e.errno == errno.EACCES:
            sys.stderr.write("Found the device, but could not connect.".format(port))
            sys.stderr.write(e)
            sys.stderr.write('On linux, try adding yourself to the "dialout" group')
            sys.stderr.write('sudo usermod -a -G dialout <your-username>')
        else:
            # Try to be as helpful as possible.
            sys.stderr.write("Found the device, but could not connect via" +
                             " port %r: %s\n" % (port, e))
            sys.stderr.write("I'm not sure what to suggest. :-(\n")
        input("Press ENTER to continue")
        sys.exit(1)
transport.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def open(self, usb_bus, dev_adr):
        self.usb_dev = None
        self.usb_out = None
        self.usb_inp = None

        for dev in self.context.getDeviceList(skip_on_error=True):
            if ((dev.getBusNumber() == usb_bus and
                 dev.getDeviceAddress() == dev_adr)):
                break
        else:
            log.error("no device {0} on bus {1}".format(dev_adr, usb_bus))
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

        try:
            first_setting = dev.iterSettings().next()
        except StopIteration:
            log.error("no usb configuration settings, please replug device")
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

        def transfer_type(x):
            return x & libusb.TRANSFER_TYPE_MASK

        def endpoint_dir(x):
            return x & libusb.ENDPOINT_DIR_MASK

        for endpoint in first_setting.iterEndpoints():
            ep_addr = endpoint.getAddress()
            ep_attr = endpoint.getAttributes()
            if transfer_type(ep_attr) == libusb.TRANSFER_TYPE_BULK:
                if endpoint_dir(ep_addr) == libusb.ENDPOINT_IN:
                    if not self.usb_inp:
                        self.usb_inp = endpoint
                if endpoint_dir(ep_addr) == libusb.ENDPOINT_OUT:
                    if not self.usb_out:
                        self.usb_out = endpoint

        if not (self.usb_inp and self.usb_out):
            log.error("no bulk endpoints for read and write")
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))

        try:
            # workaround the PN533's buggy USB implementation
            self._manufacturer_name = dev.getManufacturer()
            self._product_name = dev.getProduct()
        except libusb.USBErrorIO:
            self._manufacturer_name = None
            self._product_name = None

        try:
            self.usb_dev = dev.open()
            self.usb_dev.claimInterface(0)
        except libusb.USBErrorAccess:
            raise IOError(errno.EACCES, os.strerror(errno.EACCES))
        except libusb.USBErrorBusy:
            raise IOError(errno.EBUSY, os.strerror(errno.EBUSY))
        except libusb.USBErrorNoDevice:
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
test_ossaudiodev.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def play_sound_file(self, data, rate, ssize, nchannels):
        try:
            dsp = ossaudiodev.open('w')
        except IOError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize/8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
test_ossaudiodev.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def play_sound_file(self, data, rate, ssize, nchannels):
        try:
            dsp = ossaudiodev.open('w')
        except IOError, msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except TypeError:
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time > 10% off of expected time")
test_ossaudiodev.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def play_sound_file(self, data, rate, ssize, nchannels):
        try:
            dsp = ossaudiodev.open('w')
        except IOError, msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except TypeError:
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time > 10% off of expected time")
util.py 文件源码 项目:mock 作者: rpm-software-management 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def rmtree(path, selinux=False, exclude=()):
    """Version of shutil.rmtree that ignores no-such-file-or-directory errors,
       tries harder if it finds immutable files and supports excluding paths"""
    if os.path.islink(path):
        raise OSError("Cannot call rmtree on a symbolic link")
    try_again = True
    retries = 0
    failed_to_handle = False
    failed_filename = None
    if path in exclude:
        return
    while try_again:
        try_again = False
        try:
            names = os.listdir(path)
            for name in names:
                fullname = os.path.join(path, name)
                if fullname not in exclude:
                    try:
                        mode = os.lstat(fullname).st_mode
                    except OSError:
                        mode = 0
                    if stat.S_ISDIR(mode):
                        try:
                            rmtree(fullname, selinux=selinux, exclude=exclude)
                        except OSError as e:
                            if e.errno in (errno.EPERM, errno.EACCES, errno.EBUSY):
                                # we alrady tried handling this on lower level and failed,
                                # there's no point in trying again now
                                failed_to_handle = True
                            raise
                    else:
                        os.remove(fullname)
            os.rmdir(path)
        except OSError as e:
            if failed_to_handle:
                raise
            if e.errno == errno.ENOENT:  # no such file or directory
                pass
            elif exclude and e.errno == errno.ENOTEMPTY:  # there's something excluded left
                pass
            elif selinux and (e.errno == errno.EPERM or e.errno == errno.EACCES):
                try_again = True
                if failed_filename == e.filename:
                    raise
                failed_filename = e.filename
                os.system("chattr -R -i %s" % path)
            elif e.errno == errno.EBUSY:
                retries += 1
                if retries > 1:
                    raise
                try_again = True
                getLog().debug("retrying failed tree remove after sleeping a bit")
                time.sleep(2)
            else:
                raise
test_ossaudiodev.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def play_sound_file(self, data, rate, ssize, nchannels):
        try:
            dsp = ossaudiodev.open('w')
        except IOError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize/8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
cgutils.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def set_memory_hardlimit(cgrp, limit):
    """Set the cgroup hard-limits to the desired value.

    The logic here is complicated since the ordering of operations depends on
    weither we are lowering or raising the value.
    """
    def _lower_memory_hardlimit(cgrp, limit):
        """Lower the cgroup memory hardlimit."""
        cgroups.set_value('memory', cgrp,
                          'memory.limit_in_bytes', limit)
        cgroups.set_value('memory', cgrp,
                          'memory.memsw.limit_in_bytes', limit)

    def _raise_memory_hardlimit(cgrp, limit):
        """Raise the cgroup memory hardlimit."""
        cgroups.set_value('memory', cgrp,
                          'memory.memsw.limit_in_bytes', limit)
        cgroups.set_value('memory', cgrp,
                          'memory.limit_in_bytes', limit)

    memory_hardlimit_funs = [_lower_memory_hardlimit, _raise_memory_hardlimit]
    while memory_hardlimit_funs:
        try:
            memory_hardlimit_fun = memory_hardlimit_funs.pop(0)
            memory_hardlimit_fun(cgrp, limit)
            break

        except IOError as err:
            if err.errno == errno.EBUSY:
                # Resource busy, this means the cgroup is already using more
                # then the limit we are trying to set.
                raise TreadmillCgroupError('Unable to set hard limit to %d. '
                                           'Cgroup %r memory over limit.' %
                                           (limit, cgrp))

            elif err.errno == errno.EINVAL:
                # This means we did the hard limit operation in the wrong
                # order. If we have a different ordering to try, go ahead.
                if memory_hardlimit_funs:
                    continue

            # For any other case, raise the exception
            raise
test_ossaudiodev.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def play_sound_file(self, data, rate, ssize, nchannels):
        try:
            dsp = ossaudiodev.open('w')
        except IOError, msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except TypeError:
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time > 10% off of expected time")
test_ossaudiodev.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def play_sound_file(self, data, rate, ssize, nchannels):
        try:
            dsp = ossaudiodev.open('w')
        except OSError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize/8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
test_ossaudiodev.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def play_sound_file(self, data, rate, ssize, nchannels):
        try:
            dsp = ossaudiodev.open('w')
        except IOError, msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except TypeError:
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize//8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time > 10% off of expected time")
test_ossaudiodev.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def play_sound_file(self, data, rate, ssize, nchannels):
        try:
            dsp = ossaudiodev.open('w')
        except OSError as msg:
            if msg.args[0] in (errno.EACCES, errno.ENOENT,
                               errno.ENODEV, errno.EBUSY):
                raise unittest.SkipTest(msg)
            raise

        # at least check that these methods can be invoked
        dsp.bufsize()
        dsp.obufcount()
        dsp.obuffree()
        dsp.getptr()
        dsp.fileno()

        # Make sure the read-only attributes work.
        self.assertFalse(dsp.closed)
        self.assertEqual(dsp.name, "/dev/dsp")
        self.assertEqual(dsp.mode, "w", "bad dsp.mode: %r" % dsp.mode)

        # And make sure they're really read-only.
        for attr in ('closed', 'name', 'mode'):
            try:
                setattr(dsp, attr, 42)
            except (TypeError, AttributeError):
                pass
            else:
                self.fail("dsp.%s not read-only" % attr)

        # Compute expected running time of sound sample (in seconds).
        expected_time = float(len(data)) / (ssize/8) / nchannels / rate

        # set parameters based on .au file headers
        dsp.setparameters(AFMT_S16_NE, nchannels, rate)
        self.assertTrue(abs(expected_time - 3.51) < 1e-2, expected_time)
        t1 = time.time()
        dsp.write(data)
        dsp.close()
        t2 = time.time()
        elapsed_time = t2 - t1

        percent_diff = (abs(elapsed_time - expected_time) / expected_time) * 100
        self.assertTrue(percent_diff <= 10.0,
                        "elapsed time (%s) > 10%% off of expected time (%s)" %
                        (elapsed_time, expected_time))
directory.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def remove(self, pkgplan):
                path = self.get_installed_path(pkgplan.image.get_root())
                try:
                        os.rmdir(path)
                except OSError as e:
                        if e.errno == errno.ENOENT:
                                pass
                        elif e.errno in (errno.EEXIST, errno.ENOTEMPTY):
                                # Cannot remove directory since it's
                                # not empty.
                                pkgplan.salvage(path)
                        elif e.errno == errno.ENOTDIR:
                                # Either the user or another package has changed
                                # this directory into a link or file.  Salvage
                                # what's there and drive on.
                                pkgplan.salvage(path)
                        elif e.errno == errno.EBUSY and os.path.ismount(path):
                                # User has replaced directory with mountpoint,
                                # or a package has been poorly implemented.
                                if not self.attrs.get("implicit"):
                                        err_txt = _("Unable to remove {0}; it is "
                                            "in use as a mountpoint. To "
                                            "continue, please unmount the "
                                            "filesystem at the target "
                                            "location and try again.").format(
                                            path)
                                        raise apx.ActionExecutionError(self,
                                            details=err_txt, error=e,
                                            fmri=pkgplan.origin_fmri) 
                        elif e.errno == errno.EBUSY:
                                # os.path.ismount() is broken for lofs
                                # filesystems, so give a more generic
                                # error.
                                if not self.attrs.get("implicit"):
                                        err_txt = _("Unable to remove {0}; it "
                                            "is in use by the system, another "
                                            "process, or as a "
                                            "mountpoint.").format(path)
                                        raise apx.ActionExecutionError(self,
                                            details=err_txt, error=e,
                                            fmri=pkgplan.origin_fmri)
                        elif e.errno != errno.EACCES: # this happens on Windows
                                raise


问题


面经


文章

微信
公众号

扫码关注公众号