python类LOCK_UN的实例源码

utils.py 文件源码 项目:Qyoutube-dl 作者: lzambella 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _unlock_file(f):
        fcntl.flock(f, fcntl.LOCK_UN)
lock.py 文件源码 项目:lemongraph 作者: NationalSecurityAgency 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _update(self, mode):
        if mode is None:
            if self.locked:
                fcntl.lockf(self.fd, fcntl.LOCK_UN)
                self.locked = False
        elif self.mode is not mode or not self.locked:
            self.mode = mode
            self.locked = True
            for offset in self.locks:
                fcntl.lockf(self.fd, self.mode, 1, offset)
locks.py 文件源码 项目:Scrum 作者: prakharchoudhary 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def unlock(f):
            ret = fcntl.flock(_fd(f), fcntl.LOCK_UN)
            return (ret == 0)
locked_file.py 文件源码 项目:office-interoperability-tools 作者: milossramek 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def unlock_and_close(self):
      """Close and unlock the file using the fcntl.lockf primitive."""
      if self._locked:
        fcntl.lockf(self._fh.fileno(), fcntl.LOCK_UN)
      self._locked = False
      if self._fh:
        self._fh.close()
process_lock.py 文件源码 项目:deb-python-fasteners 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _unlock(lockfile):
        fcntl.lockf(lockfile, fcntl.LOCK_UN)
portalocker.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def unlock(file):
        fcntl.flock(file.fileno(), fcntl.LOCK_UN)
plock.py 文件源码 项目:aquests 作者: hansroh 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def release(self):
            fcntl.flock(self.handle, fcntl.LOCK_UN)
locks.py 文件源码 项目:django 作者: alexsukhrin 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def unlock(f):
            ret = fcntl.flock(_fd(f), fcntl.LOCK_UN)
            return (ret == 0)
scheduling.py 文件源码 项目:sesame-paste-noodle 作者: aissehust 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run(self):
        """
        THIS IS BLOCKING! CALL IT AT LAST!
        """
        with open(Job.lockFileName, 'w') as f:
            rv = fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
            print("job {} is running.".format(os.getpid()))
            f.write(str(os.getpid()) + '\n')
            f.flush()
            self.action()
            fcntl.lockf(f.fileno(), fcntl.LOCK_UN)
hetr_server.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def write_server_info(filename, port):
    pid = os.getpid()
    rank = MPI.COMM_WORLD.Get_rank()
    server_info = '{}:{}:{}:{}:{}'.format(LINE_TOKEN, rank, pid, port, LINE_TOKEN).strip()
    logger.debug("write_server_info: line %s, filename %s", server_info, filename)

    time.sleep(0.1 * rank)
    with open(filename, "a") as f:
        fcntl.lockf(f, fcntl.LOCK_EX)
        f.write(server_info + '\n')
        f.flush()
        os.fsync(f.fileno())
        fcntl.lockf(f, fcntl.LOCK_UN)
    return server_info
mpilauncher.py 文件源码 项目:ngraph 作者: NervanaSystems 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_rpc_port_by_rank(self, rank, num_servers):
        if self.mpirun_proc is None:
            raise RuntimeError("Launch mpirun_proc before reading of rpc ports")

        if self._rpc_ports is not None:
            return self._rpc_ports[rank]

        server_info_pattern = re.compile("^" + LINE_TOKEN +
                                         ":([\d]+):([\d]+):([\d]+):" +
                                         LINE_TOKEN + "$")
        self._tmpfile.seek(0)
        while True:
            fcntl.lockf(self._tmpfile, fcntl.LOCK_SH)
            line_count = sum(1 for line in self._tmpfile if server_info_pattern.match(line))
            self._tmpfile.seek(0)
            fcntl.lockf(self._tmpfile, fcntl.LOCK_UN)

            if line_count == num_servers:
                break
            else:
                time.sleep(0.1)

        server_infos = [tuple([int(server_info_pattern.match(line).group(1)),
                               int(server_info_pattern.match(line).group(3))])
                        for line in self._tmpfile]
        server_infos = sorted(server_infos, key=lambda x: x[0])
        self._rpc_ports = [row[1] for row in server_infos]
        logger.debug("get_rpc_ports: ports (in MPI rank order): %s", self._rpc_ports)
        self._tmpfile.close()
        return self._rpc_ports[rank]
utils.py 文件源码 项目:youtube_downloader 作者: aksinghdce 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _unlock_file(f):
            fcntl.flock(f, fcntl.LOCK_UN)
experiment.py 文件源码 项目:chi 作者: rmst 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run_chiboard(self):
    pass
    import subprocess
    from chi import board
    from chi.board import CHIBOARD_HOME, MAGIC_PORT

    port = None
    start = False
    cbc = join(CHIBOARD_HOME, CONFIG_NAME)
    if os.path.isfile(cbc):
      with open(cbc) as f:
        import fcntl
        try:
          fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
          start = True
          fcntl.flock(f, fcntl.LOCK_UN)
        except (BlockingIOError, OSError):  # chiboard is running
          try:
            data = json.load(f)
            port = data.get('port')
          except json.JSONDecodeError:
            port = None

    else:
      start = True

    if start:
      from chi.board import main
      chiboard = main.__file__
      subprocess.check_call([sys.executable, chiboard, '--port', str(MAGIC_PORT), '--daemon'])
      port = MAGIC_PORT

    if port is None:
      logger.warning('chiboard seems to be running but port could not be read from its config')
    else:
      logger.info(f"{self.f.__name__} started. Check progress at http://localhost:{port}/exp/#/local{self.logdir}")
util.py 文件源码 项目:chi 作者: rmst 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def release(self):
    fcntl.flock(self._f, fcntl.LOCK_UN)
    self._f.close()


# def scalar_summaries(prefix='', **kwargs):
#   vs = [tf.Summary.Value(tag=prefix + '/' + name, simple_value=value) for name, value in kwargs.items()]
#   s = tf.Summary(value=vs)
#   return s
config.py 文件源码 项目:CPU-Manager-for-Kubernetes 作者: Intel-Corp 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __release(self):
        self.timer.cancel()
        self.timer = None
        fcntl.flock(self.fd, fcntl.LOCK_UN)
        os.close(self.fd)
helpers.py 文件源码 项目:appetite 作者: Bridgewater 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __exit__(self, type, value, tb): # pylint: disable=redefined-builtin
        """Exit RunSingleInstance class
        :return: None
        """
        try:
            if not self.__is_running:
                fcntl.lockf(self.__filelock, fcntl.LOCK_UN)
                self.__filelock.close()
                os.unlink(self.__lockfile)
        except Exception as err:
            logger.error("Error unlocking single instance file", error=err.message)
synchronization.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _release_all_locks(self):
        filedescriptor = self._filedesc
        if filedescriptor is not None:
            fcntl.flock(filedescriptor, fcntl.LOCK_UN)
            os.close(filedescriptor)
            self._filedescriptor.remove()
portalocker.py 文件源码 项目:true_review_web2py 作者: lucadealfaro 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def unlock(file):
        fcntl.flock(file.fileno(), fcntl.LOCK_UN)
utils.py 文件源码 项目:optimalvibes 作者: littlemika 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def _unlock_file(f):
            fcntl.flock(f, fcntl.LOCK_UN)
ftdi.py 文件源码 项目:bch-firmware-tool 作者: bigclownlabs 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _unlock(self):
        if not fcntl or not self.ser:
            return
        fcntl.flock(self.ser.fileno(), fcntl.LOCK_UN)
        logging.debug('_unlock')


问题


面经


文章

微信
公众号

扫码关注公众号