python类flock()的实例源码

pidfile.py 文件源码 项目:pscheduler 作者: perfsonar 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Take ownership of the PID file and record this process's PID.

        When ``self.path`` is None no file is used and ``self.pidfile`` is
        returned unchanged.  Otherwise the file at ``self.path`` is opened,
        locked exclusively without blocking, overwritten with the current
        PID and returned positioned at its start.

        Raises:
            SystemExit: if the exclusive lock cannot be obtained.
        """
        if self.path is None:
            return self.pidfile

        # "a+" creates the file when missing but does not truncate it before
        # the lock is held.
        self.pidfile = open(self.path, "a+")
        try:
            fcntl.flock(self.pidfile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Close the handle explicitly instead of just dropping the
            # reference, so the descriptor is released deterministically.
            self.pidfile.close()
            self.pidfile = None
            raise SystemExit("Already running according to " + self.path)
        # The lock is ours: replace the previous contents with our PID.
        self.pidfile.seek(0)
        self.pidfile.truncate()
        self.pidfile.write(str(os.getpid()))
        self.pidfile.flush()
        self.pidfile.seek(0)
        return self.pidfile
cache.py 文件源码 项目:jenkins-epo 作者: peopledoc 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def open(self):
        """Open the shelve-backed cache, read-write when the lock is free.

        A non-blocking exclusive lock is attempted on
        ``SETTINGS.CACHE_PATH + '.lock'``.  When it is obtained the cache is
        opened in mode ``'c'``; otherwise the lock handle is closed and the
        cache is opened in mode ``'r'``.  Does nothing if already opened.

        Raises:
            Exception: whatever ``shelve.open`` raises in read-only mode, or
                on the second attempt in read-write mode.
        """
        if self.opened:
            return

        self.lock = open(SETTINGS.CACHE_PATH + '.lock', 'ab')
        try:
            fcntl.flock(self.lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
            mode = 'c'
        except IOError:
            # Logger.warn() is a deprecated alias; use warning().
            logger.warning("Cache locked, using read-only")
            mode = 'r'
            self.lock.close()
            self.lock = None

        try:
            self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)
        except Exception as e:
            if mode != 'c':
                raise
            logger.warning("Dropping corrupted cache on %s", e)
            # NOTE(review): this truncates the *lock* file, not the cache
            # database itself -- confirm whether the retry below can succeed
            # on a genuinely corrupted cache.
            self.lock.truncate(0)
            self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)
        self.opened = True
storeProxy.py 文件源码 项目:searchForAll 作者: MemoryAndDream 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def store(proxys):
    """Append one proxy, or a list of proxies, to the proxy file.

    The file at ``proxyFilePath`` is opened for appending and an exclusive,
    non-blocking lock is attempted up to 10 times, sleeping 3 seconds after
    each failed attempt.  If the lock is never obtained nothing is written.

    :param proxys: a single proxy string or a list of proxy strings; each is
        written on its own line.
    """
    if isinstance(proxys, list):
        lines = [proxy + '\n' for proxy in proxys]
    else:
        lines = [proxys + '\n']

    pidfile = open(proxyFilePath, "a")
    try:
        for _ in range(10):
            try:
                # LOCK_EX: exclusive lock; LOCK_NB: fail immediately instead
                # of blocking when another process holds the lock.
                fcntl.flock(pidfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except (IOError, OSError):
                # Another instance holds the lock; wait and retry.
                time.sleep(3)
                continue
            pidfile.writelines(lines)
            break
    finally:
        # Closing also releases the lock, even when every attempt failed.
        pidfile.close()
queue.py 文件源码 项目:temboard-agent 作者: dalibo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_content_all_messages(self):
        """
        Return the decoded JSON content of every message in the queue file.

        The file is read under a shared lock.  Rows that fail to parse or
        decode are skipped.  Any other failure (for example a missing file)
        yields None.
        """
        try:
            contents = []
            with open(self.file_path, 'r') as queue_file:
                fcntl.flock(queue_file, fcntl.LOCK_SH)
                rows = queue_file.readlines()
                for row in rows:
                    try:
                        parsed = self.parse_row_message(row)
                        contents.append(json.loads(parsed.content))
                    except Exception:
                        # Unreadable rows are ignored on purpose.
                        continue
                fcntl.flock(queue_file, fcntl.LOCK_UN)
            return contents
        except Exception:
            return None
queue.py 文件源码 项目:temboard-agent 作者: dalibo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def push(self, message):
        """ Push a new message.

        In 'drop' overflow mode the message is discarded when the queue has
        already reached ``max_length`` or ``max_size`` (a limit of -1 is
        ignored).  In 'slide' mode the message is appended and the queue is
        then shifted until it is back within the single configured limit.

        :param message: object whose ``serialize()`` result is appended to
            the queue file.
        """
        if self.overflow_mode == 'drop':
            if self.max_length > -1 and self.get_length() >= self.max_length:
                return
            if self.max_size > -1 and self.get_size() >= self.max_size:
                return

        with open(self.file_path, 'a') as fd:
            # Let's hold an exclusive lock.
            fcntl.flock(fd, fcntl.LOCK_EX)
            try:
                fd.write(message.serialize())
                # Flush while the lock is still held; otherwise the buffered
                # data would only reach the file at close(), after unlocking.
                fd.flush()
            finally:
                fcntl.flock(fd, fcntl.LOCK_UN)

        if self.overflow_mode == 'slide':
            if self.max_size == -1 and self.max_length > -1:
                while self.get_length() > self.max_length:
                    self.shift()
            elif self.max_size > -1 and self.max_length == -1:
                while self.get_size() > self.max_size:
                    self.shift()
engine.py 文件源码 项目:cobra 作者: wufeifei 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init_list(self, data=None):
        """
        Create the ``<sid>_list`` file if it does not exist yet.

        The new file holds a JSON object with an empty ``sids`` mapping and
        ``total_target_num`` set to ``len(data)`` for a list, or 1 otherwise.
        :param data: list or a string
        :return:
        """
        file_path = os.path.join(running_path, '{sid}_list'.format(sid=self.sid))
        if os.path.exists(file_path):
            return

        total_target_num = len(data) if isinstance(data, list) else 1
        with open(file_path, 'w') as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            initial_state = {
                'sids': {},
                'total_target_num': total_target_num,
            }
            f.write(json.dumps(initial_state))
engine.py 文件源码 项目:cobra 作者: wufeifei 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def list(self, data=None):
        """
        Read or update the ``<sid>_list`` file.

        With no ``data`` the first line of the file is decoded as JSON and
        returned.  With ``data`` the stored JSON is loaded (an empty file
        counts as ``{'sids': {}}``), ``data[1]`` is stored under
        ``sids[data[0]]`` and the file is rewritten; nothing is returned.
        :param data: tuple (s_sid, target)
        :return:
        """
        file_path = os.path.join(running_path, '{sid}_list'.format(sid=self.sid))

        if data is not None:
            # 'r+' keeps the existing content readable; 'w+' would wipe it.
            with open(file_path, 'r+') as handle:
                fcntl.flock(handle, fcntl.LOCK_EX)
                raw = handle.read()
                state = {'sids': {}} if raw == '' else json.loads(raw)
                state['sids'][data[0]] = data[1]
                handle.seek(0)
                handle.truncate()
                handle.write(json.dumps(state))
            return None

        with open(file_path, 'r') as handle:
            fcntl.flock(handle, fcntl.LOCK_EX)
            first_line = handle.readline()
            return json.loads(first_line)
lock.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def lock_path(path, timeout=0):
    """Take an exclusive flock on *path* and return a function releasing it.

    The file is created if missing and its descriptor is marked
    close-on-exec.  The lock is attempted every 0.1 seconds.

    :param path: file to lock.
    :param timeout: seconds to keep retrying before giving up.
    :return: a zero-argument callable that unlocks and closes the file.
    :raises LockError: if the lock is still unavailable after *timeout*.
    """
    fd = os.open(path, os.O_CREAT)
    acquired = False
    try:
        flags = fcntl.fcntl(fd, fcntl.F_GETFD, 0)
        flags |= fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)

        started = time.time()

        while True:
            try:
                fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                if started < time.time() - timeout:
                    raise LockError("Couldn't obtain lock")
            else:
                break
            time.sleep(0.1)
        acquired = True
    finally:
        # Do not leak the descriptor when locking fails or times out.
        if not acquired:
            os.close(fd)

    def unlock_path():
        fcntl.flock(fd, fcntl.LOCK_UN)
        os.close(fd)

    return unlock_path
flock_tool.py 文件源码 项目:node-ninja 作者: CodeJockey 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ExecFlock(self, lockfile, *cmd_list):
    """Emulates the most basic behavior of Linux's flock(1).

    Takes a write lock on *lockfile* (created if missing), runs *cmd_list*
    as a subprocess and returns its exit status.
    """
    # Rely on exception handling to report errors.
    # Note that the stock python on SunOS has a bug
    # where fcntl.flock(fd, LOCK_EX) always fails
    # with EBADF, that's why we use this F_SETLK
    # hack instead.
    # 0o666 is the octal spelling accepted by both Python 2.6+ and Python 3;
    # the bare 0666 form is a syntax error on Python 3.
    fd = os.open(lockfile, os.O_WRONLY | os.O_NOCTTY | os.O_CREAT, 0o666)
    try:
        if sys.platform.startswith('aix'):
            # Python on AIX is compiled with LARGEFILE support, which
            # changes the struct size.
            op = struct.pack('hhIllqq', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
        else:
            op = struct.pack('hhllhhl', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
        fcntl.fcntl(fd, fcntl.F_SETLK, op)
        return subprocess.call(cmd_list)
    finally:
        # Release the lock and the descriptor once the command has finished.
        os.close(fd)
myirc.py 文件源码 项目:minqlx-plugins 作者: dsverdlo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, address, nickname, msg_handler, perform_handler, raw_handler=None, stop_event=None, ident=None):
        """Store the connection settings and handlers for this client.

        :param address: ``host`` or ``host:port``; the port defaults to 6667.
        :param nickname: nickname to use.
        :param msg_handler: stored as ``self.msg_handler``.
        :param perform_handler: stored as ``self.perform_handler``.
        :param raw_handler: optional, stored as ``self.raw_handler``.
        :param stop_event: optional ``threading.Event``; a fresh one is
            created per instance when omitted.
        :param ident: ident string; falls back to *nickname* when empty.
        """
        split_addr = address.split(":")
        self.host = split_addr[0]
        self.port = int(split_addr[1]) if len(split_addr) > 1 else 6667
        self.nickname = nickname
        self.msg_handler = msg_handler
        self.perform_handler = perform_handler
        self.raw_handler = raw_handler
        # A default of threading.Event() in the signature is evaluated once
        # and would be shared by every instance created without the argument.
        self.stop_event = threading.Event() if stop_event is None else stop_event
        self.reader = None
        self.writer = None
        self.server_options = {}
        super().__init__()

        self._lock = threading.Lock()
        self._old_nickname = self.nickname

        # support for ident server oidentd
        self.idnt       = ident if ident else nickname
        self.ifile_buf  = None
        self.flock      = FLock(LOCKFILE)
myirc.py 文件源码 项目:minqlx-plugins 作者: dsverdlo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def writeIdentFile(self):
        """Replace oidentd's user cfg file content with our ident entry.

        The previous lines are kept in ``self.ifile_buf`` so they can be
        restored later.  Nothing happens when the cfg file does not exist.
        """
        ident_file_exists = os.path.isfile(IDENTFILE)
        if not ident_file_exists:
            return

        # The lock is taken here for the connection phase; it is not
        # released in this method.
        self.flock.acquire()
        try:
            with open(IDENTFILE, 'r') as current:
                self.ifile_buf = current.readlines()
            replacement = open(IDENTFILE, 'w')
            with replacement:
                replacement.write(IDENTFMT.format(self.idnt))
        except Exception:
            minqlx.log_exception()
watch.py 文件源码 项目:OperatingSystemLab 作者: wzc1995 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def edit_hosts():
    """Rebuild /etc/hosts from the keys etcdctl lists under ``/hosts``.

    The last path component of every listed key is written to /tmp/hosts as
    ``<ip> cluster-<n>`` (a leading '0' character is dropped), after a fixed
    localhost line.  The result is then copied over /etc/hosts while the
    exclusive lock on /tmp/hosts is still held.
    """
    with os.popen('/usr/local/bin/etcdctl ls --sort --recursive /hosts') as f:
        hosts_str = f.read()

    hosts_arr = hosts_str.strip('\n').split('\n')

    with open('/tmp/hosts', 'w') as hosts_fd:
        fcntl.flock(hosts_fd.fileno(), fcntl.LOCK_EX)

        hosts_fd.write('127.0.0.1 localhost cluster' + '\n')
        index = 0
        for host_key in hosts_arr:
            host_ip = host_key[host_key.rfind('/') + 1:]
            if not host_ip:
                # Empty etcdctl output (or a key ending in '/') yields an
                # empty name; indexing it would raise IndexError.
                continue
            if host_ip[0] == '0':
                host_ip = host_ip[1:]
            hosts_fd.write(host_ip + ' cluster-' + str(index) + '\n')
            index += 1

        # Make sure everything is on disk before the file is copied.
        hosts_fd.flush()
        os.system('/bin/cp /tmp/hosts /etc/hosts')
utils.py 文件源码 项目:django-gateone 作者: jimmy201602 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def write_pid(path):
    """Writes our PID to *path*.

    The file is locked exclusively (non-blocking) while it is rewritten.
    Any failure is logged and the original exception is re-raised.
    """
    try:
        pid = os.getpid()
        # Append mode creates the file without truncating it, so a process
        # that fails to get the lock cannot wipe another process's PID.
        with io.open(path, mode='a', encoding='utf-8') as pidfile:
            # Get a non-blocking exclusive lock
            fcntl.flock(pidfile.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            pidfile.seek(0)
            pidfile.truncate(0)
            # A unicode literal works on both Python 2 and Python 3, unlike
            # the Python 2 only unicode() builtin.
            pidfile.write(u"%d" % pid)
    except Exception:
        logging.error(_("Could not write PID file: %s") % path)
        raise # This raises the original exception
    # No explicit close is needed: the with-statement already closed the file.
flock_tool.py 文件源码 项目:sublime-bem-create 作者: bem-tools 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ExecFlock(self, lockfile, *cmd_list):
    """Emulates the most basic behavior of Linux's flock(1).

    Takes a write lock on *lockfile* (created if missing), runs *cmd_list*
    as a subprocess and returns its exit status.
    """
    # Rely on exception handling to report errors.
    # Note that the stock python on SunOS has a bug
    # where fcntl.flock(fd, LOCK_EX) always fails
    # with EBADF, that's why we use this F_SETLK
    # hack instead.
    # 0o666 is the octal spelling accepted by both Python 2.6+ and Python 3;
    # the bare 0666 form is a syntax error on Python 3.
    fd = os.open(lockfile, os.O_WRONLY | os.O_NOCTTY | os.O_CREAT, 0o666)
    try:
        if sys.platform.startswith('aix'):
            # Python on AIX is compiled with LARGEFILE support, which
            # changes the struct size.
            op = struct.pack('hhIllqq', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
        else:
            op = struct.pack('hhllhhl', fcntl.F_WRLCK, 0, 0, 0, 0, 0, 0)
        fcntl.fcntl(fd, fcntl.F_SETLK, op)
        return subprocess.call(cmd_list)
    finally:
        # Release the lock and the descriptor once the command has finished.
        os.close(fd)
lock.py 文件源码 项目:karton 作者: karton 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def release(self):
        '''
        Unlock and close the lock file taken by an earlier acquisition.

        The lock must currently be held.
        '''
        assert self._locked
        assert self._lock_file

        held_file = self._lock_file

        # The lock file itself stays on disk: removing it without introducing
        # a race is not trivial.
        fcntl.flock(held_file, fcntl.LOCK_UN)
        message = 'Lock "%s" released.' % self._lock_file_path
        verbose(message)

        self._locked = False

        held_file.close()
        self._lock_file = None
Quads.py 文件源码 项目:quads 作者: redhat-performance 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def write_data(self):
        """Write the in-memory data to the config file as YAML.

        If the config file is newer than the loaded data, the data is
        re-read instead and nothing is written.

        :return: True when the file was written and re-read, False when the
            write was skipped or failed (failures are logged).
        """
        if self.config_newer_than_data():
            self.read_data()
            return False

        try:
            self.data = {"clouds":self.quads.clouds.data, "hosts":self.quads.hosts.data, "history":self.quads.history.data, "cloud_history":self.quads.cloud_history.data}
            # Serialize first so a dump failure cannot leave the file empty.
            payload = yaml.dump(self.data, default_flow_style=False)
            # Append mode does not truncate on open; the old content is only
            # discarded once the non-blocking lock has been obtained.
            with open(self.config, 'a') as yaml_file:
                fcntl.flock(yaml_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
                yaml_file.seek(0)
                yaml_file.truncate()
                yaml_file.write(payload)
                # Push the data out before the lock is released.
                yaml_file.flush()
                fcntl.flock(yaml_file, fcntl.LOCK_UN)
            self.read_data()
            return True
        except Exception as ex:
            self.logger.error("There was a problem with your file %s" % ex)
            return False
common.py 文件源码 项目:ops_agent 作者: sjqzhang 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def getLock(self,lock_path,force=False,timeout=30,filename="easyops.lock"):
        """Try to take an exclusive lock on ``lock_path/filename``.

        The opened lock file is kept in ``self.fp`` under its full path.
        A failed attempt is followed by a one second gevent sleep.

        :param force: when True, report success even though the lock is held
            elsewhere.
        :param timeout: attempt budget; attempts stop once the attempt
            counter exceeds it.
        :return: True when locked (or forced), False when the budget ran out.
        """
        import fcntl
        lockFile = os.path.join(lock_path,filename)
        try:
            # Best effort: make an existing lock file accessible to everyone.
            if os.path.isfile(lockFile):
                os.chmod(lockFile, 0o777)
        except:
            pass
        handle = open(lockFile,'w')
        self.fp[lockFile] = handle
        attempts = 0
        while attempts <= timeout:
            attempts += 1
            try:
                fcntl.flock(self.fp[lockFile],fcntl.LOCK_EX|fcntl.LOCK_NB)
            except IOError:
                if force == True:
                    return True
                gevent.sleep(1)
                continue
            return True
        return False
onion.py 文件源码 项目:ooniprobe-debian 作者: TheTorProject 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def is_tor_data_dir_usable(tor_data_dir):
    """
    Checks if the Tor data dir specified is usable. This means that
     it is not being locked and we have permissions to write to it.

    A directory that does not exist yet counts as usable.  Returns False
    when the ``lock`` file inside it cannot be opened or locked.
    """
    if not os.path.exists(tor_data_dir):
        return True

    try:
        # The with-statement closes the probe handle (and so drops the probe
        # lock) deterministically instead of relying on garbage collection.
        with open(os.path.join(tor_data_dir, 'lock'), 'w') as lock_file:
            fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return True
    except (IOError, OSError):
        # EACCES: permission error; EAGAIN: file locked.  Any other OS error
        # is reported as "not usable" too, as an explicit False instead of
        # an implicit None.
        return False
ceph_rgw.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    """Write the collector's metrics to stdout, allowing one instance only.

    An exclusive non-blocking lock on ``/var/lock/<script name>.lock``
    guards the export.  The process exits with status 0 after a successful
    export and 1 otherwise; errors are reported through syslog.
    """
    exit_status = 1
    # Bound up front so the cleanup below is safe even when parse_args() or
    # os.open() raises before they would otherwise be assigned.
    lock_file = None
    lock_fd = None
    lock_success = False
    try:
        args = parse_args()
        # Make sure the exporter is only running once.
        lock_file = '/var/lock/{}.lock'.format(os.path.basename(sys.argv[0]))
        lock_fd = os.open(lock_file, os.O_CREAT)
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            lock_success = True
        except IOError:
            msg = 'Failed to export metrics, another instance is running.'
            syslog.syslog(syslog.LOG_INFO, msg)
            sys.stderr.write(msg + '\n')
        if lock_success:
            # Create a new registry, otherwise unwanted default collectors are
            # added automatically.
            registry = prometheus_client.CollectorRegistry()
            # Register our own collector and write metrics to STDOUT.
            registry.register(CephRgwCollector(**vars(args)))
            sys.stdout.write(prometheus_client.generate_latest(registry))
            sys.stdout.flush()
            # Unlock the lock file.
            fcntl.flock(lock_fd, fcntl.LOCK_UN)
            exit_status = 0
    except Exception as e:
        syslog.syslog(syslog.LOG_ERR, str(e))
    # Cleanup
    if lock_fd is not None:
        os.close(lock_fd)
    if lock_success:
        try:
            os.unlink(lock_file)
        except OSError:
            # Best effort: the lock file may already be gone.
            pass
    sys.exit(exit_status)
recipe-577404.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _lockonly(file):
    # Try to take an exclusive, non-blocking flock on the open *file*.
    # Reports progress through _msg and returns True when the lock was
    # obtained, False when the attempt raised IOError.
    _msg('got file #', file.fileno())
    operation = LOCK_EX | LOCK_NB
    try:
        flock(file, operation)
    except IOError:
        _msg('failed to lock')
        return False
    _msg('locked successfully')
    return True
recipe-577404.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def lockfile(file):
    "flock a given file, then unflock it immediately"
    # Nothing to undo when the lock could not be taken.
    acquired = _lockonly(file)
    if not acquired:
        return
    flock(file, LOCK_UN)

# Options
recipe-577404.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def main(program, option='', path='test.flock'):
    "Do one of the tests or print a short help"
    # Module-level functions are looked up by option name.
    registry = globals()
    chosen = option.lstrip('-')
    if chosen and chosen in OPTIONS:
        registry[chosen](path)
        return

    # Unknown or missing option: print the module docstring and the usage.
    _help(__doc__.lstrip())
    usage = 'Usage: {0} OPTION [PATH]'.format(basename(program))
    summaries = ['-{0}  {1}'.format(name, registry[name].__doc__)
                 for name in OPTIONS]
    _help(usage, 'Default PATH: test.flock', 'OPTIONS:', *summaries)
recipe-576891.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def lock(self):
        '''
        Creates and holds on to the lock file with exclusive access.
        Returns True if lock successful, False if it is not, and raises
        an exception upon operating system errors encountered creating the
        lock file.
        '''
        try:
            #
            # Create or else open and truncate lock file, in read-write mode.
            #
            # A crashed app might not delete the lock file, so the
            # os.O_CREAT | os.O_EXCL combination that guarantees
            # atomic create isn't useful here.  That is, we don't want to
            # fail locking just because the file exists.
            #
            self.lockfd = os.open(self.lockfile,
                                  os.O_TRUNC | os.O_CREAT | os.O_RDWR)

            # Acquire exclusive lock on the file, but don't block waiting for it
            fcntl.flock(self.lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB)

            # Purely informational content.  os.write() needs bytes on
            # Python 3; a b"" literal is also a plain str on Python 2.6+.
            os.write(self.lockfd, b"My Lockfile")

            return True
        except (OSError, IOError) as e:
            # Lock cannot be acquired is okay, everything else reraise exception
            # NOTE(review): self.lockfd stays open on this path -- confirm the
            # companion unlock/cleanup code closes it.
            if e.errno in (errno.EACCES, errno.EAGAIN):
                return False
            else:
                raise
recipe-577911.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Lock the PID file, store our PID in it and return the open file.

        Raises SystemExit when the exclusive lock is not available.
        """
        self.pidfile = open(self.path, "a+")
        exclusive_nonblocking = fcntl.LOCK_EX | fcntl.LOCK_NB
        try:
            fcntl.flock(self.pidfile.fileno(), exclusive_nonblocking)
        except IOError:
            raise SystemExit("Already running according to " + self.path)

        # Overwrite whatever was recorded before, then rewind for the caller.
        pidfile = self.pidfile
        pidfile.seek(0)
        pidfile.truncate()
        current_pid = str(os.getpid())
        pidfile.write(current_pid)
        pidfile.flush()
        pidfile.seek(0)
        return pidfile
services.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def lock_file_nonblocking(fileobj):
    """Try to take an exclusive lock on *fileobj* without blocking.

    Returns True when the lock was obtained and False when it is held
    elsewhere (EACCES or EAGAIN).  Any other IOError is re-raised.
    """
    # Use fcntl.flock instead of fcntl.lockf. lockf on pypy 1.7 seems
    # to ignore existing locks.

    try:
        fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError as ioe:
        # "except X as name" works on Python 2.6+ and Python 3; the comma
        # form is a syntax error on Python 3.
        if ioe.errno not in (errno.EACCES, errno.EAGAIN):
            raise
        return False
    return True
services.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def unlock_file(fileobj):
    """Release the flock held on *fileobj*."""
    operation = fcntl.LOCK_UN
    fcntl.flock(fileobj, operation)
maildirbot.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def lockfile(filename):
    """Yield whether an exclusive lock on *filename* could be taken.

    The file is opened in "wb" mode.  True is yielded while the lock is
    held (it is released when the generator resumes or is closed); False is
    yielded when the lock is held elsewhere.  Other IOErrors propagate.
    """
    contention_errnos = (errno.EACCES, errno.EAGAIN)
    with open(filename, "wb") as handle:
        descriptor = handle.fileno()
        try:
            fcntl.flock(descriptor, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError as error:
            if error.errno not in contention_errnos:
                raise
            yield False
        else:
            try:
                yield True
            finally:
                fcntl.flock(descriptor, fcntl.LOCK_UN)
locks.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def lock(f, flags):
            """Apply the flock operation *flags* to *f*; return True on success.

            fcntl.flock() returns None on success and raises on failure, so
            comparing its result with 0 reported False even for a lock that
            had been taken.  Failures still propagate as exceptions.
            """
            fcntl.flock(_fd(f), flags)
            return True
locks.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def unlock(f):
            """Release the flock held on *f*; return True on success.

            fcntl.flock() returns None on success and raises on failure, so
            comparing its result with 0 always reported False.  Failures
            still propagate as exceptions.
            """
            fcntl.flock(_fd(f), fcntl.LOCK_UN)
            return True
locks.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def lock(f, flags):
            """Apply the flock operation *flags* to *f*; return True on success.

            fcntl.flock() returns None on success and raises on failure, so
            comparing its result with 0 reported False even for a lock that
            had been taken.  Failures still propagate as exceptions.
            """
            fcntl.flock(_fd(f), flags)
            return True


问题


面经


文章

微信
公众号

扫码关注公众号