python类lockf()的实例源码

recipe-578476.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def do_magic(self):
        if OS_WIN:
            try:
                if os.path.exists(LOCK_PATH):
                    os.unlink(LOCK_PATH)
                self.fh = os.open(LOCK_PATH, os.O_CREAT | os.O_EXCL | os.O_RDWR)
            except EnvironmentError as err:
                if err.errno == 13:
                    self.is_running = True
                else:
                    raise
        else:
            try:
                self.fh = open(LOCK_PATH, 'w')
                fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except EnvironmentError as err:
                if self.fh is not None:
                    self.is_running = True
                else:
                    raise
lock.py 文件源码 项目:Citation-Fetcher 作者: andrewhead 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def lock_method(lock_filename):
    ''' Use an OS lock such that a method can only be called once at a time. '''

    def decorator(func):

        @functools.wraps(func)
        def lock_and_run_method(*args, **kwargs):

            # Only run this program if it's not already running
            # Snippet based on
            # http://linux.byexamples.com/archives/494/how-can-i-avoid-running-a-python-script-multiple-times-implement-file-locking/
            fp = open(lock_filename, 'w')
            try:
                fcntl.lockf(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                raise SystemExit(
                    "This program is already running.  Please stop the current process or " +
                    "remove " + lock_filename + " to run this script."
                )

            return func(*args, **kwargs)

        return lock_and_run_method

    return decorator
sock.py 文件源码 项目:chihu 作者: yelongyu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, addr, conf, log, fd=None):
        if fd is None:
            try:
                st = os.stat(addr)
            except OSError as e:
                if e.args[0] != errno.ENOENT:
                    raise
            else:
                if stat.S_ISSOCK(st.st_mode):
                    os.remove(addr)
                else:
                    raise ValueError("%r is not a socket" % addr)
        self.parent = os.getpid()
        super(UnixSocket, self).__init__(addr, conf, log, fd=fd)
        # each arbiter grabs a shared lock on the unix socket.
        fcntl.lockf(self.sock, fcntl.LOCK_SH | fcntl.LOCK_NB)
minc_tools.py 文件源码 项目:nist_mni_pipelines 作者: vfonov 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def cache(self,name,suffix=''):
        """Allocate a name in cache, if cache was setup
        also lock the file , so that another process have to wait before using the same file name

        Important: call unlock() on result
        """
        #TODO: something more clever here?
        fname=''
        if self.work_dir is not None:
            fname=self.cache_dir+os.sep+name+suffix
            lock_name=fname+'.lock'
            f=self._locks[lock_name]=open(lock_name, 'a')
            fcntl.lockf(f.fileno(), fcntl.LOCK_EX )
        else:
            fname=self.tmp(name+suffix)

        return fname
minc_tools.py 文件源码 项目:nist_mni_pipelines 作者: vfonov 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def unlock(self,fname):
        #TODO: something more clever here?
        lock_name=fname+'.lock'
        try:
            f=self._locks[lock_name]

            if f is not None:
                fcntl.lockf(f.fileno(), fcntl.LOCK_UN)
                f.close()

            del self._locks[lock_name]

#            try:
#                os.unlink(lock_name)
#            except OSError:
                #probably somebody else is blocking
#               pass

        except KeyError:
            pass


    #def __del__(self):
        #self.do_cleanup()
    #    pass
singleton.py 文件源码 项目:Linalfred 作者: PeterHo 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def isSingleInstance(flavor_id=""):
    global fp
    basename = os.path.splitext(os.path.abspath(sys.argv[0]))[0].replace(
        "/", "-").replace(":", "").replace("\\", "-") + '-%s' % flavor_id + '.lock'
    lockfile = os.path.normpath(tempfile.gettempdir() + '/' + basename)

    if sys.platform == 'win32':
        try:
            if os.path.exists(lockfile):
                os.unlink(lockfile)
            fp = os.open(
                lockfile, os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except OSError:
            return False

    else:  # non Windows
        fp = open(lockfile, 'w')
        try:
            fcntl.lockf(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            return False

    return True
I2C_mutex.py 文件源码 项目:GoPiGo3 作者: DexterInd 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def acquire(self):
        if self.mutex_debug:
            print("I2C mutex acquire")

        acquired = False
        while not acquired:
            try:
                self.DexterLockI2C_handle = open('/run/lock/DexterLockI2C', 'w')
                # lock
                fcntl.lockf(self.DexterLockI2C_handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
                acquired = True
            except IOError: # already locked by a different process
                time.sleep(0.001)
            except Exception as e:
                print(e)

        if self.mutex_debug:
            print("I2C mutex acquired {}".format(time.time()))
package.py 文件源码 项目:builds 作者: open-power-host-os 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def lock(self):
        """
        Locks the package to avoid concurrent operations on its shared
        resources.
        Currently, the only resource shared among scripts executed from
        different directories is the repository.
        """
        if not self.locking_enabled:
            LOG.debug("This package has no shared resources to lock")
            return

        LOG.debug("Checking for lock on file {}.".format(self.lock_file_path))
        self.lock_file = open(self.lock_file_path, "w")
        try:
            fcntl.lockf(self.lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as exc:
            RESOURCE_UNAVAILABLE_ERROR = 11
            if exc.errno == RESOURCE_UNAVAILABLE_ERROR:
                LOG.info("Waiting for other process to finish operations "
                         "on {}.".format(self.name))
            else:
                raise
        fcntl.lockf(self.lock_file, fcntl.LOCK_EX)
config.py 文件源码 项目:webmon 作者: KarolBedkowski 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _try_lock():
    """Check and create lock file - prevent running application twice.

    Return lock file handler.
    """
    lock_file_path = _find_config_file("app.lock", False)
    _check_dir_for_file(lock_file_path)
    try:
        if fcntl is not None:
            lock_file = open(lock_file_path, "w")
            fcntl.lockf(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        else:
            if os.path.isfile(lock_file_path):
                _LOG.error("another instance detected (lock file exists) "
                           "- exiting")
                return None
            lock_file = open(lock_file_path, "w")
        return lock_file
    except IOError as err:
        import errno
        if err.errno == errno.EAGAIN:
            _LOG.error("another instance detected - exiting")
        else:
            _LOG.exception("locking failed: %s", err)
    return None
_logs.py 文件源码 项目:porcupine 作者: Akuli 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _lock(fileno):
    """Try to lock a file. Return True on success."""
    # closing the file unlocks it, so we don't need to unlock here
    if platform.system() == 'Windows':
        try:
            msvcrt.locking(fileno, msvcrt.LK_NBLCK, 10)
            return True
        except PermissionError:
            return False
    else:
        try:
            fcntl.lockf(fileno, fcntl.LOCK_EX | fcntl.LOCK_NB)
            return True
        # the docs recommend catching both of these
        except (BlockingIOError, PermissionError):
            return False
tendo_singleton.py 文件源码 项目:QTodoTxt2 作者: QTodoTxt 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __del__(self):
        import sys
        import os
        if not self.initialized:
            return
        try:
            if sys.platform == 'win32':
                if hasattr(self, 'fd'):
                    os.close(self.fd)
                    os.unlink(self.lockfile)
            else:
                import fcntl
                fcntl.lockf(self.fp, fcntl.LOCK_UN)
                # os.close(self.fp)
                if os.path.isfile(self.lockfile):
                    os.unlink(self.lockfile)
        except Exception as e:
            raise
lockfile.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def unlock(self):
                """Unlocks the LockFile."""

                if self._fileobj:
                        # To avoid race conditions with the next caller
                        # waiting for the lock file, it is simply
                        # truncated instead of removed.
                        try:
                                fcntl.lockf(self._fileobj, fcntl.LOCK_UN)
                                self._fileobj.truncate(0)
                                self._fileobj.close()
                                self._lock.release()
                        except EnvironmentError:
                                # If fcntl, or the file operations returned
                                # an exception, drop the lock. Do not catch
                                # the exception that could escape from
                                # releasing the lock.
                                self._lock.release()
                                raise
                        finally:
                                self._fileobj = None
                else:
                        if self._provide_mutex:
                                assert not self._lock.locked
systemlist.py 文件源码 项目:lib9 作者: Jumpscale 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def acquire_lock(path):
    """
        little tool to do EAGAIN until lockfile released
    :param path:
    :return: path
    """
    lock_file = open(path, 'w')
    while True:
        send_to_syslog("attempting to acquire lock %s" % path)
        try:
            fcntl.lockf(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            send_to_syslog("acquired lock %s" % path)
            return lock_file
        except IOError as e:
            send_to_syslog("failed to acquire lock %s because '%s' - waiting 1 second" % (path, e))
            time.sleep(1)
rbuserdb.py 文件源码 项目:useradm 作者: redbrick 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def uidNumber_getnext(self):
        """Get the next available uidNumber for adding a new user.
        Locks uidNumber file, reads number. Returns (file descriptor,
        uidNumber). uidNumber_savenext() must be called once the
        uidNumber is used successfully."""

        uid_num_file = os.open(rbconfig.file_uidNumber, os.O_RDWR)
        retries = 0

        while 1:
            try:
                fcntl.lockf(uid_num_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                retries += 1
                if retries == 20:
                    raise RBFatalError(
                        ('Could not lock uidNumber.txt file after 20 attempts.'
                         'Please try again!'))
                time.sleep(0.5)
            else:
                break
        num_uid = int(os.read(uid_num_file, 32))
        return uid_num_file, num_uid
metadata.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def update(path):
    """
    allow concurrent update of metadata
    """
    p = os.path.join(path, "metadata.json")
    # we have to open writeable to get a lock
    with open(p, "a") as f:
        fcntl.lockf(f, fcntl.LOCK_EX)
        data = load(path)
        yield(data)
        save(path, data)
        fcntl.lockf(f, fcntl.LOCK_UN)
mailbox.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _unlock_file(f):
    """Unlock file f using lockf and dot locking."""
    if fcntl:
        fcntl.lockf(f, fcntl.LOCK_UN)
    if os.path.exists(f.name + '.lock'):
        os.remove(f.name + '.lock')
locked_file.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def unlock_and_close(self):
            """Close and unlock the file using the fcntl.lockf primitive."""
            if self._locked:
                fcntl.lockf(self._fh.fileno(), fcntl.LOCK_UN)
            self._locked = False
            if self._fh:
                self._fh.close()
recipe-578476.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def clean_up(self):
        # this is not really needed
        try:
            if self.fh is not None:
                if OS_WIN:
                    os.close(self.fh)
                    os.unlink(LOCK_PATH)
                else:
                    fcntl.lockf(self.fh, fcntl.LOCK_UN)
                    self.fh.close() # ???
                    os.unlink(LOCK_PATH)
        except Exception as err:
            # logger.exception(err)
            raise # for debugging porpuses, do not raise it on production
daemon.py 文件源码 项目:shadowsocksR-b 作者: hao35954514 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def write_pid_file(pid_file, pid):
    import fcntl
    import stat

    try:
        fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                     stat.S_IRUSR | stat.S_IWUSR)
    except OSError as e:
        shell.print_exception(e)
        return -1
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert flags != -1
    flags |= fcntl.FD_CLOEXEC
    r = fcntl.fcntl(fd, fcntl.F_SETFD, flags)
    assert r != -1
    # There is no platform independent way to implement fcntl(fd, F_SETLK, &fl)
    # via fcntl.fcntl. So use lockf instead
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        r = os.read(fd, 32)
        if r:
            logging.error('already started at pid %s' % common.to_str(r))
        else:
            logging.error('already started')
        os.close(fd)
        return -1
    os.ftruncate(fd, 0)
    os.write(fd, common.to_bytes(str(pid)))
    return 0
locked_file.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def unlock_and_close(self):
      """Close and unlock the file using the fcntl.lockf primitive."""
      if self._locked:
        fcntl.lockf(self._fh.fileno(), fcntl.LOCK_UN)
      self._locked = False
      if self._fh:
        self._fh.close()
cache.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def close(self):
        self.save()
        self.storage.close()
        if self.lock:
            fcntl.lockf(self.lock, fcntl.LOCK_UN)
            self.lock.close()
            os.unlink(self.lock.name)
        self.opened = False
daemonize.py 文件源码 项目:pykit 作者: baishancloud 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def trylock_or_exit(self, timeout=10):

        interval = 0.1
        n = int(timeout / interval) + 1
        flag = fcntl.LOCK_EX | fcntl.LOCK_NB

        for ii in range(n):

            fd = os.open(self.lockfile, os.O_RDWR | os.O_CREAT)

            fcntl.fcntl(fd, fcntl.F_SETFD,
                        fcntl.fcntl(fd, fcntl.F_GETFD, 0)
                        | fcntl.FD_CLOEXEC)

            try:
                fcntl.lockf(fd, flag)

                self.lockfp = os.fdopen(fd, 'w+r')
                break

            except IOError as e:
                os.close(fd)
                if e[0] == errno.EAGAIN:
                    time.sleep(interval)
                else:
                    raise

        else:
            logger.info("Failure acquiring lock %s" % (self.lockfile, ))
            sys.exit(1)

        logger.info("OK acquired lock %s" % (self.lockfile))
daemonize.py 文件源码 项目:pykit 作者: baishancloud 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def unlock(self):

        if self.lockfp is None:
            return

        fd = self.lockfp.fileno()
        fcntl.lockf(fd, fcntl.LOCK_UN)
        self.lockfp.close()
        self.lockfp = None
mailbox.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _unlock_file(f):
    """Unlock file f using lockf and dot locking."""
    if fcntl:
        fcntl.lockf(f, fcntl.LOCK_UN)
    if os.path.exists(f.name + '.lock'):
        os.remove(f.name + '.lock')
process_lock.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def trylock(self):
        fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
process_lock.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def unlock(self):
        fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
test.py 文件源码 项目:contrail-ansible-internal 作者: Juniper 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self):
        self.fh = None
        self.is_running = False
        try:
            self.fh = open(LOCK_PATH, 'w')
            fcntl.lockf(self.fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except EnvironmentError as err:
            if self.fh is not None:
                self.is_running = True
            else:
                raise
test.py 文件源码 项目:contrail-ansible-internal 作者: Juniper 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, traceback):
        if self.fh is not None:
            fcntl.lockf(self.fh, fcntl.LOCK_UN)
            self.fh.close()
            os.unlink(LOCK_PATH)
daemon.py 文件源码 项目:shadowsocksr 作者: shadowsocksr-backup 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def write_pid_file(pid_file, pid):
    import fcntl
    import stat

    try:
        fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                     stat.S_IRUSR | stat.S_IWUSR)
    except OSError as e:
        shell.print_exception(e)
        return -1
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert flags != -1
    flags |= fcntl.FD_CLOEXEC
    r = fcntl.fcntl(fd, fcntl.F_SETFD, flags)
    assert r != -1
    # There is no platform independent way to implement fcntl(fd, F_SETLK, &fl)
    # via fcntl.fcntl. So use lockf instead
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        r = os.read(fd, 32)
        if r:
            logging.error('already started at pid %s' % common.to_str(r))
        else:
            logging.error('already started')
        os.close(fd)
        return -1
    os.ftruncate(fd, 0)
    os.write(fd, common.to_bytes(str(pid)))
    return 0
daemon.py 文件源码 项目:ShadowSocks 作者: immqy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def write_pid_file(pid_file, pid):
    import fcntl
    import stat

    try:
        fd = os.open(pid_file, os.O_RDWR | os.O_CREAT,
                     stat.S_IRUSR | stat.S_IWUSR)
    except OSError as e:
        shell.print_exception(e)
        return -1
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    assert flags != -1
    flags |= fcntl.FD_CLOEXEC
    r = fcntl.fcntl(fd, fcntl.F_SETFD, flags)
    assert r != -1
    # There is no platform independent way to implement fcntl(fd, F_SETLK, &fl)
    # via fcntl.fcntl. So use lockf instead
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB, 0, 0, os.SEEK_SET)
    except IOError:
        r = os.read(fd, 32)
        if r:
            logging.error('already started at pid %s' % common.to_str(r))
        else:
            logging.error('already started')
        os.close(fd)
        return -1
    os.ftruncate(fd, 0)
    os.write(fd, common.to_bytes(str(pid)))
    return 0


问题


面经


文章

微信
公众号

扫码关注公众号