python类EBUSY的实例源码

basic.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _unsafe_writes(self, src, dest, exception):
      # sadly there are some situations where we cannot ensure atomicity, but only if
      # the user insists and we get the appropriate error we update the file unsafely
      if exception.errno == errno.EBUSY:
          #TODO: issue warning that this is an unsafe operation, but doing it cause user insists
          try:
              try:
                  out_dest = open(dest, 'wb')
                  in_src = open(src, 'rb')
                  shutil.copyfileobj(in_src, out_dest)
              finally: # assuring closed files in 2.4 compatible way
                  if out_dest:
                      out_dest.close()
                  if in_src:
                      in_src.close()
          except (shutil.Error, OSError, IOError):
              e = get_exception()
              self.fail_json(msg='Could not write data to file (%s) from (%s): %s' % (dest, src, e))

      else:
          self.fail_json(msg='Could not replace file: %s to %s: %s' % (src, dest, exception))
naming.py 文件源码 项目:SameKeyProxy 作者: xzhou 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def createGroup(self,groupname):
        groupname=self.validateName(groupname)
        dirnam = self.translate(groupname)
        self.lock.acquire()
        try:
            try:
                os.mkdir(dirnam)
                self._dosynccall("createGroup",groupname)
                Log.msg('NameServer','created group',groupname)
            except OSError,x:
                if x.errno in (errno.EEXIST, errno.EBUSY):
                    raise Pyro.errors.NamingError('group already exists',groupname)
                elif x.errno == errno.ENOENT:
                    raise Pyro.errors.NamingError('(parent)group not found')
                else:
                    raise Pyro.errors.NamingError(str(x))
        finally:
            self.lock.release()
test_paste.py 文件源码 项目:modern-paste 作者: LINKIWI 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_scrub_inactive_pastes_error(self):
        pastes = [util.testing.PasteFactory.generate(expiry_time=None) for _ in range(15)]
        [util.testing.AttachmentFactory.generate(paste_id=paste.paste_id, file_name='file') for paste in pastes]
        [database.paste.deactivate_paste(paste.paste_id) for paste in pastes[:10]]

        with mock.patch.object(shutil, 'rmtree') as mock_rmtree:
            mock_rmtree.side_effect = OSError(errno.EBUSY)

            self.assertRaises(
                OSError,
                database.paste.scrub_inactive_pastes,
            )

        with mock.patch.object(shutil, 'rmtree') as mock_rmtree:
            mock_rmtree.side_effect = OSError(errno.ENOENT)

            for paste in pastes:
                self.assertIsNotNone(database.paste.get_paste_by_id(paste.paste_id))
test_os.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().
        """
        while 1:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno == errno.ECONNRESET:
                    # disconnected
                    raise
                elif err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                else:
                    raise
test_os.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().
        """
        while 1:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno == errno.ECONNRESET:
                    # disconnected
                    raise
                elif err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                else:
                    raise
test_os.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sendfile_wrapper(self, sock, file, offset, nbytes, headers=[], trailers=[]):
        """A higher level wrapper representing how an application is
        supposed to use sendfile().
        """
        while 1:
            try:
                if self.SUPPORT_HEADERS_TRAILERS:
                    return os.sendfile(sock, file, offset, nbytes, headers,
                                       trailers)
                else:
                    return os.sendfile(sock, file, offset, nbytes)
            except OSError as err:
                if err.errno == errno.ECONNRESET:
                    # disconnected
                    raise
                elif err.errno in (errno.EAGAIN, errno.EBUSY):
                    # we have to retry send data
                    continue
                else:
                    raise
test_driver.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_private_destroy_ebusy_timeout(self):
        # Tests that _destroy will retry 3 times to destroy the guest when an
        # EBUSY is raised, but eventually times out and raises the libvirtError
        ex = fakelibvirt.make_libvirtError(
                fakelibvirt.libvirtError,
                ("Failed to terminate process 26425 with SIGKILL: "
                 "Device or resource busy"),
                error_code=fakelibvirt.VIR_ERR_SYSTEM_ERROR,
                int1=errno.EBUSY)

        mock_guest = mock.Mock(libvirt_guest.Guest, id=1)
        mock_guest.poweroff = mock.Mock(side_effect=ex)

        instance = objects.Instance(**self.test_instance)
        drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)

        with mock.patch.object(drvr._host, 'get_guest',
                               return_value=mock_guest):
            self.assertRaises(fakelibvirt.libvirtError, drvr._destroy,
                              instance)

        self.assertEqual(3, mock_guest.poweroff.call_count)
test_driver.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_private_destroy_ebusy_multiple_attempt_ok(self):
        # Tests that the _destroy attempt loop is broken when EBUSY is no
        # longer raised.
        ex = fakelibvirt.make_libvirtError(
                fakelibvirt.libvirtError,
                ("Failed to terminate process 26425 with SIGKILL: "
                 "Device or resource busy"),
                error_code=fakelibvirt.VIR_ERR_SYSTEM_ERROR,
                int1=errno.EBUSY)

        mock_guest = mock.Mock(libvirt_guest.Guest, id=1)
        mock_guest.poweroff = mock.Mock(side_effect=[ex, None])

        inst_info = hardware.InstanceInfo(power_state.SHUTDOWN, id=1)
        instance = objects.Instance(**self.test_instance)
        drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)

        with mock.patch.object(drvr._host, 'get_guest',
                               return_value=mock_guest):
            with mock.patch.object(drvr, 'get_info', return_value=inst_info):
                drvr._destroy(instance)

        self.assertEqual(2, mock_guest.poweroff.call_count)
engine.py 文件源码 项目:tm-librarian 作者: FabricAttachedMemory 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def cmd_destroy_shelf(self, cmdict):
        """ For a shelf, zombify books (mark for zeroing) and remove xattrs
            In (dict)---
                path
                node
            Out (dict) ---
                shelf data
        """
        self.errno = errno.EBUSY
        shelf = self.cmd_get_shelf(cmdict)
        assert not self.db.open_count(
            shelf), '%s has active opens' % shelf.name
        bos = self._list_shelf_books(shelf)
        xattrs = self.db.list_xattrs(shelf)
        for thisbos in bos:
            self.db.delete_bos(thisbos)
            _ = self._set_book_alloc(thisbos, TMBook.ALLOC_ZOMBIE)
        for xattr in xattrs:
            self.db.remove_xattr(shelf, xattr)
        rsp = self.db.delete_shelf(shelf, commit=True)

        return rsp
kvESX.py 文件源码 项目:vsphere-storage-for-docker 作者: vmware 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load(volpath):
    """
    Load and return dictionary from the sidecar
    """
    meta_file = lib.DiskLib_SidecarMakeFileName(volpath.encode(),
                                                DVOL_KEY.encode())
    retry_count = 0
    vol_name = vmdk_utils.get_volname_from_vmdk_path(volpath)
    while True:
        try:
            with open(meta_file, "r") as fh:
                kv_str = fh.read()
            break
        except IOError as open_error:
            # This is a workaround to the timing/locking with metadata files issue #626
            if open_error.errno == errno.EBUSY and retry_count <= vmdk_utils.VMDK_RETRY_COUNT:
                logging.warning("Meta file %s busy for load(), retrying...", meta_file)
                vmdk_utils.log_volume_lsof(vol_name)
                retry_count += 1
                time.sleep(vmdk_utils.VMDK_RETRY_SLEEP)
            else:
                logging.exception("Failed to access %s", meta_file)
                return None

    try:
        return json.loads(kv_str)
    except ValueError:
        logging.exception("load:Failed to decode meta-data for %s", volpath)
        return None
kvESX.py 文件源码 项目:vsphere-storage-for-docker 作者: vmware 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def save(volpath, kv_dict, key=None, value=None):
    """
    Save the dictionary to side car.
    """
    meta_file = lib.DiskLib_SidecarMakeFileName(volpath.encode(),
                                                DVOL_KEY.encode())
    kv_str = json.dumps(kv_dict)

    retry_count = 0
    vol_name = vmdk_utils.get_volname_from_vmdk_path(volpath)
    while True:
        try:
            if key:
                with open(meta_file, "r") as rfh:
                    kv_val = rfh.read()
                    try:
                        kv_match = json.loads(kv_val)
                    except ValueError:
                        logging.exception("load:Failed to decode meta-data for %s", volpath)
                        return False

                    if key in kv_match and kv_match[key] != value:
                        return False
            with open(meta_file, "w") as fh:
                fh.write(align_str(kv_str, KV_ALIGN))
            break
        except IOError as open_error:
            # This is a workaround to the timing/locking with metadata files issue #626
            if open_error.errno == errno.EBUSY and retry_count <= vmdk_utils.VMDK_RETRY_COUNT:
                logging.warning("Meta file %s busy for save(), retrying...", meta_file)
                vmdk_utils.log_volume_lsof(vol_name)
                retry_count += 1
                time.sleep(vmdk_utils.VMDK_RETRY_SLEEP)
            else:
                logging.exception("Failed to save meta-data for %s", volpath)
                return False

    return True
test_process_lock.py 文件源码 项目:deb-python-fasteners 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_bad_acquire(self):
        lock_file = os.path.join(self.lock_dir, 'lock')
        lock = BrokenLock(lock_file, errno.EBUSY)
        self.assertRaises(threading.ThreadError, lock.acquire)
naming.py 文件源码 项目:SameKeyProxy 作者: xzhou 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _initdb_1(self):
        # root is not a NamedTree but a directory
        try:
            os.mkdir(self.dbroot)
        except OSError,x:
            if x.errno not in (errno.EEXIST, errno.EBUSY):
                raise
test_clf_transport.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_init_fail_open(self, usb_context):
        device = self.Device(0x1000, 0x2000, 1, 2, [
            self.Settings([
                self.Endpoint(0x0004, 0x0002),
                self.Endpoint(0x0084, 0x0002),
            ])
        ])
        device.open = MagicMock()
        device.open.side_effect = [
            nfc.clf.transport.libusb.USBErrorAccess,
            nfc.clf.transport.libusb.USBErrorBusy,
            nfc.clf.transport.libusb.USBErrorNoDevice,
        ]
        usb_context.return_value.getDeviceList.return_value = [device]

        with pytest.raises(IOError) as excinfo:
            nfc.clf.transport.USB(1, 2)
        assert excinfo.value.errno == errno.EACCES

        with pytest.raises(IOError) as excinfo:
            nfc.clf.transport.USB(1, 2)
        assert excinfo.value.errno == errno.EBUSY

        with pytest.raises(IOError) as excinfo:
            nfc.clf.transport.USB(1, 2)
        assert excinfo.value.errno == errno.ENODEV
__main__.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main(args):
    print("This is the %s version of nfcpy run in Python %s\non %s" %
          (nfc.__version__, platform.python_version(), platform.platform()))
    print("I'm now searching your system for contactless devices")

    found = 0
    for vid, pid, bus, dev in nfc.clf.transport.USB.find("usb"):
        if (vid, pid) in nfc.clf.device.usb_device_map:
            path = "usb:{0:03d}:{1:03d}".format(bus, dev)
            try:
                clf = nfc.ContactlessFrontend(path)
                print("** found %s" % clf.device)
                clf.close()
                found += 1
            except IOError as error:
                if error.errno == errno.EACCES:
                    usb_device_access_denied(bus, dev, vid, pid, path)
                elif error.errno == errno.EBUSY:
                    usb_device_found_is_busy(bus, dev, vid, pid, path)

    if args.search_tty:
        for dev in nfc.clf.transport.TTY.find("tty")[0]:
            path = "tty:{0}".format(dev[8:])
            try:
                clf = nfc.ContactlessFrontend(path)
                print("** found %s" % clf.device)
                clf.close()
                found += 1
            except IOError as error:
                if error.errno == errno.EACCES:
                    print("access denied for device with path %s" % path)
                elif error.errno == errno.EBUSY:
                    print("the device with path %s is busy" % path)
    else:
        print("I'm not trying serial devices because you haven't told me")
        print("-- add the option '--search-tty' to have me looking")
        print("-- but beware that this may break other serial devs")

    if not found:
        print("Sorry, but I couldn't find any contactless device")
utils.py 文件源码 项目:charm-ceph-osd 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def umount(mount_point):
    """This function unmounts a mounted directory forcibly. This will
    be used for unmounting broken hard drive mounts which may hang.

    If umount returns EBUSY this will lazy unmount.

    :param mount_point: str. A String representing the filesystem mount point
    :returns: int. Returns 0 on success. errno otherwise.
    """
    libc_path = ctypes.util.find_library("c")
    libc = ctypes.CDLL(libc_path, use_errno=True)

    # First try to umount with MNT_FORCE
    ret = libc.umount(mount_point, 1)
    if ret < 0:
        err = ctypes.get_errno()
        if err == errno.EBUSY:
            # Detach from try. IE lazy umount
            ret = libc.umount(mount_point, 2)
            if ret < 0:
                err = ctypes.get_errno()
                return err
            return 0
        else:
            return err
    return 0
test_replace_osd.py 文件源码 项目:charm-ceph-osd 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_umount_ebusy(self):
        self.ctypes.util.find_library.return_value = 'libc.so.6'
        umount_mock = Mock()
        self.ctypes.CDLL.return_value = umount_mock
        umount_mock.umount.side_effect = umount_busy
        self.ctypes.get_errno.return_value = errno.EBUSY

        ret = ceph.umount('/some/osd/mount')
        umount_mock.assert_has_calls([
            call.umount('/some/osd/mount', 1),
            call.umount('/some/osd/mount', 2),
        ])
        assert ret == 0
utils.py 文件源码 项目:charm-ceph-mon 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def umount(mount_point):
    """This function unmounts a mounted directory forcibly. This will
    be used for unmounting broken hard drive mounts which may hang.

    If umount returns EBUSY this will lazy unmount.

    :param mount_point: str. A String representing the filesystem mount point
    :returns: int. Returns 0 on success. errno otherwise.
    """
    libc_path = ctypes.util.find_library("c")
    libc = ctypes.CDLL(libc_path, use_errno=True)

    # First try to umount with MNT_FORCE
    ret = libc.umount(mount_point, 1)
    if ret < 0:
        err = ctypes.get_errno()
        if err == errno.EBUSY:
            # Detach from try. IE lazy umount
            ret = libc.umount(mount_point, 2)
            if ret < 0:
                err = ctypes.get_errno()
                return err
            return 0
        else:
            return err
    return 0
test_ossaudiodev.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    support.run_unittest(__name__)
application.py 文件源码 项目:Solfege 作者: RannyeriDev 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def display_sound_init_error_message(self, e):

        if isinstance(e, soundcard.SoundInitException):
            solfege.win.display_error_message(
            """%s""" % str(e).decode(locale.getpreferredencoding(), 'replace'))

        elif isinstance(e, ImportError):
            solfege.win.display_error_message2(str(e), _("You should configure sound from the preferences window, and try to use an external midi player. Or try to recompile the program and check for error messages to see why the module is not built."))

        elif getattr(e, 'errno', None) == errno.EACCES:
            solfege.win.display_error_message(
                "The sound init failed: %s\n"
                "The errno EACCES indicates that you don't have write "
                "permission to the device."
                % str(e).decode(locale.getpreferredencoding(), 'replace'))

        elif getattr(e, 'errno', None) == errno.EBUSY:
            solfege.win.display_error_message(
                "The sound init failed: %s\n"
                "It seems like some other program is using the device. You "
                "should try to quit that other program and restart Solfege."
                % str(e).decode(locale.getpreferredencoding(), 'replace'))

        else:
            solfege.win.display_error_message(
                "The sound init failed: %s\n"
                "You should configure sound from the 'Sound' page of "
                "the preferences window.\n\n"
                "It is also possible that the OS sound setup is incorrect."
                % str(e).decode(locale.getpreferredencoding(), 'replace'))
utils.py 文件源码 项目:charm-ceph 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def umount(mount_point):
    """This function unmounts a mounted directory forcibly. This will
    be used for unmounting broken hard drive mounts which may hang.

    If umount returns EBUSY this will lazy unmount.

    :param mount_point: str. A String representing the filesystem mount point
    :returns: int. Returns 0 on success. errno otherwise.
    """
    libc_path = ctypes.util.find_library("c")
    libc = ctypes.CDLL(libc_path, use_errno=True)

    # First try to umount with MNT_FORCE
    ret = libc.umount(mount_point, 1)
    if ret < 0:
        err = ctypes.get_errno()
        if err == errno.EBUSY:
            # Detach from try. IE lazy umount
            ret = libc.umount(mount_point, 2)
            if ret < 0:
                err = ctypes.get_errno()
                return err
            return 0
        else:
            return err
    return 0
test_linuxaudiodev.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error, msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    run_unittest(LinuxAudioDevTests)
test_ossaudiodev.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError), msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    test_support.run_unittest(__name__)
test_linuxaudiodev.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error, msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    run_unittest(LinuxAudioDevTests)
test_ossaudiodev.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError), msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    test_support.run_unittest(__name__)
test_os.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_flags(self):
                try:
                    os.sendfile(self.sockno, self.fileno, 0, 4096,
                                flags=os.SF_NODISKIO)
                except OSError as err:
                    if err.errno not in (errno.EBUSY, errno.EAGAIN):
                        raise
test_ossaudiodev.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError) as msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    support.run_unittest(__name__)
test_linuxaudiodev.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = linuxaudiodev.open('w')
    except linuxaudiodev.error, msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT, errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    run_unittest(LinuxAudioDevTests)
test_ossaudiodev.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_main():
    try:
        dsp = ossaudiodev.open('w')
    except (ossaudiodev.error, IOError), msg:
        if msg.args[0] in (errno.EACCES, errno.ENOENT,
                           errno.ENODEV, errno.EBUSY):
            raise unittest.SkipTest(msg)
        raise
    dsp.close()
    test_support.run_unittest(__name__)
test_os.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_flags(self):
        try:
            os.sendfile(self.sockno, self.fileno, 0, 4096,
                        flags=os.SF_NODISKIO)
        except OSError as err:
            if err.errno not in (errno.EBUSY, errno.EAGAIN):
                raise


问题


面经


文章

微信
公众号

扫码关注公众号