python类mknod()的实例源码

api.py 文件源码 项目:geekcloud 作者: Mr-Linus 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def set_log(level, filename='jumpserver.log'):
    """
    Return the 'jumpserver' logger with a file handler on LOG_DIR/filename.

    level is one of 'debug', 'info', 'warning', 'error' or 'critical' and
    sets the threshold of the file handler; any other value falls back to
    DEBUG. The logger itself is always set to DEBUG.

    Calling this again for the same file reuses the handler that is already
    attached (keeping the more verbose of the two thresholds), so records
    are not written to the file twice.
    """
    log_file = os.path.join(LOG_DIR, filename)
    if not os.path.isfile(log_file):
        # Append mode creates the file without os.mknod(), which is missing
        # or privileged on some platforms and fails if the file appears
        # between the check and the call.
        open(log_file, 'a').close()
        # NOTE(review): mode kept from the original code; a world-writable
        # log file looks intentional here -- confirm it is still required.
        os.chmod(log_file, 0o777)
    log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
                       'critical': logging.CRITICAL}
    handler_level = log_level_total.get(level, logging.DEBUG)
    logger_f = logging.getLogger('jumpserver')
    logger_f.setLevel(logging.DEBUG)

    # FileHandler stores the absolute path of its target in baseFilename.
    target = os.path.abspath(log_file)
    for existing in logger_f.handlers:
        if isinstance(existing, logging.FileHandler) and existing.baseFilename == target:
            existing.setLevel(min(existing.level, handler_level))
            return logger_f

    fh = logging.FileHandler(log_file)
    fh.setLevel(handler_level)
    formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    logger_f.addHandler(fh)
    return logger_f
set_configs.py 文件源码 项目:kolla-kubernetes-personal 作者: rthallisey 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def execute_config_strategy():
    """Apply the strategy named by the KOLLA_CONFIG_STRATEGY variable.

    COPY_ALWAYS copies the loaded config every time. COPY_ONCE copies it
    only while the '/configured' marker is absent, then creates the marker;
    when the marker exists the process exits with status 0. Any other value
    is logged as an error and the process exits with status 1.
    """
    config_strategy = os.environ.get("KOLLA_CONFIG_STRATEGY")
    LOG.info("Kolla config strategy set to: %s", config_strategy)
    config = load_config()

    copy_once = config_strategy == "COPY_ONCE"
    if not copy_once and config_strategy != "COPY_ALWAYS":
        LOG.error('KOLLA_CONFIG_STRATEGY is not set properly')
        sys.exit(1)
    elif copy_once and os.path.exists('/configured'):
        LOG.info("The config strategy prevents copying new configs")
        sys.exit(0)
    else:
        copy_config(config)
        if copy_once:
            # Marker file that blocks any later COPY_ONCE run.
            os.mknod('/configured')
api.py 文件源码 项目:geekcloud 作者: GeekCloud-Team 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def set_log(level, filename='jumpserver.log'):
    """
    Return the 'jumpserver' logger with a file handler on LOG_DIR/filename.

    level is one of 'debug', 'info', 'warning', 'error' or 'critical' and
    sets the threshold of the file handler; any other value falls back to
    DEBUG. The logger itself is always set to DEBUG.

    Calling this again for the same file reuses the handler that is already
    attached (keeping the more verbose of the two thresholds), so records
    are not written to the file twice.
    """
    log_file = os.path.join(LOG_DIR, filename)
    if not os.path.isfile(log_file):
        # Append mode creates the file without os.mknod(), which is missing
        # or privileged on some platforms and fails if the file appears
        # between the check and the call.
        open(log_file, 'a').close()
        # NOTE(review): mode kept from the original code; a world-writable
        # log file looks intentional here -- confirm it is still required.
        os.chmod(log_file, 0o777)
    log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
                       'critical': logging.CRITICAL}
    handler_level = log_level_total.get(level, logging.DEBUG)
    logger_f = logging.getLogger('jumpserver')
    logger_f.setLevel(logging.DEBUG)

    # FileHandler stores the absolute path of its target in baseFilename.
    target = os.path.abspath(log_file)
    for existing in logger_f.handlers:
        if isinstance(existing, logging.FileHandler) and existing.baseFilename == target:
            existing.setLevel(min(existing.level, handler_level))
            return logger_f

    fh = logging.FileHandler(log_file)
    fh.setLevel(handler_level)
    formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    logger_f.addHandler(fh)
    return logger_f
common.py 文件源码 项目:spider 作者: shancang 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def set_log(level, filename='spider.log'):
    """
    Return the 'spider' logger plus the descriptor of its log file.

    LOG_DIR is created when it does not exist, and the log file
    LOG_DIR/filename is created on first use. level is one of 'debug',
    'info', 'warning', 'error' or 'critical' and sets the threshold of the
    file handler; any other value falls back to DEBUG.

    Returns a tuple ``(logger, keep_fds)`` where keep_fds is a one-element
    list holding the file descriptor of the handler's open stream. Calling
    this again for the same file reuses the attached handler (keeping the
    more verbose threshold) instead of stacking a duplicate.
    """
    if not os.path.isdir(LOG_DIR):
        os.mkdir(LOG_DIR)
    log_file = os.path.join(LOG_DIR, filename)
    if not os.path.isfile(log_file):
        # Append mode creates the file without os.mknod(), which is missing
        # or privileged on some platforms.
        open(log_file, 'a').close()
        # NOTE(review): mode kept from the original code -- confirm a
        # world-writable log file is still required.
        os.chmod(log_file, 0o777)
    log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
                       'critical': logging.CRITICAL}
    handler_level = log_level_total.get(level, logging.DEBUG)
    logger_f = logging.getLogger('spider')
    logger_f.setLevel(logging.DEBUG)

    # FileHandler stores the absolute path of its target in baseFilename.
    target = os.path.abspath(log_file)
    fh = None
    for existing in logger_f.handlers:
        if (isinstance(existing, logging.FileHandler)
                and existing.baseFilename == target
                and existing.stream is not None):
            fh = existing
            break

    if fh is None:
        fh = logging.FileHandler(log_file, 'a')
        fh.setLevel(handler_level)
        formatter = logging.Formatter('%(asctime)s  %(filename)s  [line:%(lineno)d] %(levelname)s  %(message)s')
        fh.setFormatter(formatter)
        logger_f.addHandler(fh)
    else:
        fh.setLevel(min(fh.level, handler_level))

    keep_fds = [fh.stream.fileno()]
    return logger_f, keep_fds
android_dev_diag_monitor.py 文件源码 项目:mobileinsight-core 作者: mobile-insight 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _mkfifo(self, fifo_path):
        try:
            if os.path.exists(fifo_path):
                # self._run_shell_cmd("rm %s " % fifo_path, wait=True)
                os.remove(fifo_path)
            os.mknod(fifo_path, 0o666 | stat.S_IFIFO)
        except OSError as err:
            if err.errno == errno.EEXIST:   # if already exists, skip this step
                pass
                # print "Fifo file already exists, skipping..."
            elif err.errno == errno.EPERM:  # not permitted, try shell command
                # print "Not permitted to create fifo file, try to switch to
                # root..."
                retcode = self._run_shell_cmd(
                    "mknod %s p" %
                    fifo_path, wait=True)
                if retcode != 0:
                    raise RuntimeError("mknod returns %s" % str(retcode))
            else:
                raise err
test_utils.py 文件源码 项目:daisy 作者: opnfv 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_check_file_exists(mock_err_exit, tmpdir, test_file_name, include_dirname):
    """Check when check_file_exists() triggers the mocked err_exit.

    With include_dirname the path lives inside tmpdir and the file is only
    created for the 'exist_file' case, which must not call err_exit; every
    other case is expected to call it exactly once.
    """
    should_exist = test_file_name == 'exist_file'
    if not include_dirname:
        file_path = test_file_name
    else:
        file_path = os.path.join(tmpdir.dirname, tmpdir.basename, test_file_name)
        if should_exist:
            os.mknod(file_path)

    check_file_exists(file_path)

    if include_dirname is True and should_exist:
        mock_err_exit.assert_not_called()
    elif include_dirname is True or include_dirname is False:
        mock_err_exit.assert_called_once()
    tmpdir.remove()
test_utils.py 文件源码 项目:daisy 作者: opnfv 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_make_file_executable(mock_err_exit, mock_getstatusoutput,
                              tmpdir, test_file_name,
                              status, include_dir):
    """Check make_file_executable() against existing and missing files.

    'no_exe_file' is created with the default mode, 'exe_file' is created
    with mode 0o700, and any other name is left uncreated. The mocked
    getstatusoutput returns ``(status, 'test_out')``; the assertions then
    check whether the mocked err_exit was called for each combination.
    """
    if include_dir:
        file_path = os.path.join(tmpdir.dirname, tmpdir.basename, test_file_name)
    else:
        file_path = test_file_name
    if test_file_name == 'no_exe_file':
        os.mknod(file_path)
    if test_file_name == 'exe_file':
        # 0o700 is valid on Python 2.6+ and 3; the bare 0700 form is a
        # syntax error on Python 3.
        os.mknod(file_path, 0o700)
    output = 'test_out'
    mock_getstatusoutput.return_value = (status, output)
    make_file_executable(file_path)
    if test_file_name == 'exe_file':
        mock_err_exit.assert_not_called()
        assert os.access(file_path, os.X_OK)
    if test_file_name == 'no_exe_file' and status is False:
        mock_err_exit.assert_not_called()
    if test_file_name == 'no_exe_file' and status is True:
        mock_err_exit.assert_called()
    if test_file_name == 'no_exist_file':
        mock_err_exit.assert_called()
    tmpdir.remove()
api.py 文件源码 项目:server 作者: sgr-smile2015 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def set_log(level, filename='jumpserver.log'):
    """
    Return the 'jumpserver' logger with a file handler on LOG_DIR/filename.

    level is one of 'debug', 'info', 'warning', 'error' or 'critical' and
    sets the threshold of the file handler; any other value falls back to
    DEBUG. The logger itself is always set to DEBUG.

    Calling this again for the same file reuses the handler that is already
    attached (keeping the more verbose of the two thresholds), so records
    are not written to the file twice.
    """
    log_file = os.path.join(LOG_DIR, filename)
    if not os.path.isfile(log_file):
        # Append mode creates the file without os.mknod(), which is missing
        # or privileged on some platforms and fails if the file appears
        # between the check and the call.
        open(log_file, 'a').close()
        # NOTE(review): mode kept from the original code; a world-writable
        # log file looks intentional here -- confirm it is still required.
        os.chmod(log_file, 0o777)
    log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
                       'critical': logging.CRITICAL}
    handler_level = log_level_total.get(level, logging.DEBUG)
    logger_f = logging.getLogger('jumpserver')
    logger_f.setLevel(logging.DEBUG)

    # FileHandler stores the absolute path of its target in baseFilename.
    target = os.path.abspath(log_file)
    for existing in logger_f.handlers:
        if isinstance(existing, logging.FileHandler) and existing.baseFilename == target:
            existing.setLevel(min(existing.level, handler_level))
            return logger_f

    fh = logging.FileHandler(log_file)
    fh.setLevel(handler_level)
    formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    logger_f.addHandler(fh)
    return logger_f
init.py 文件源码 项目:chunky-pipes 作者: djf604 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def make_chunky_home(chunky_home_root):
    """Create the ChunkyPipes home layout under chunky_home_root.

    Ensures that ``.chunky/``, ``.chunky/pipelines/`` (with an empty
    ``__init__.py``) and ``.chunky/configs/`` exist, creating whatever is
    missing. Success is reported on stdout, with a reminder to set
    CHUNKY_HOME when the root is not the user's home directory. An OSError
    is reported on stderr and is not re-raised.
    """
    chunky_dir = os.path.join(chunky_home_root, '.chunky')
    pipelines_dir = os.path.join(chunky_dir, 'pipelines')
    pipelines_init = os.path.join(pipelines_dir, '__init__.py')
    configs_dir = os.path.join(chunky_dir, 'configs')
    try:
        if not os.path.exists(chunky_dir):
            os.mkdir(chunky_dir)
        if not os.path.exists(pipelines_dir):
            os.mkdir(pipelines_dir)
        if not os.path.isfile(pipelines_init):
            # os.open() creates the empty file with the requested mode
            # without os.mknod(), which is unavailable or privileged on
            # some platforms.
            os.close(os.open(pipelines_init, os.O_CREAT | os.O_WRONLY, 0o644))
        if not os.path.exists(configs_dir):
            os.mkdir(configs_dir)

        sys.stdout.write('ChunkyPipes successfully initialized at {}\n'.format(chunky_home_root))

        if chunky_home_root != os.path.expanduser('~'):
            sys.stdout.write('Please set a CHUNKY_HOME environment variable to {}\n'.format(chunky_home_root))
    except OSError as e:
        # Format the exception itself: OSError has no ``message`` attribute
        # on Python 3, so reading it would raise inside this handler.
        sys.stderr.write('An error occurred initializing ChunkyPipes at {}.\n{}\n'.format(
            chunky_home_root,
            e
        ))
test_posix.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_mknod_dir_fd(self):
    """Create a FIFO with posix.mknod() relative to a directory descriptor.

    A FIFO is the only use of mknodat() specified by POSIX. Systems that
    refuse the call must fail with EPERM or EINVAL.
    """
    support.unlink(support.TESTFN)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        try:
            posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=cwd_fd)
        except OSError as exc:
            # Some old systems don't allow unprivileged users to use
            # mknod(), or only support creating device nodes.
            self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
        else:
            created = posix.stat(support.TESTFN)
            self.assertTrue(stat.S_ISFIFO(created.st_mode))
    finally:
        posix.close(cwd_fd)
set_configs.py 文件源码 项目:kolla-images 作者: coolsvap 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def execute_config_strategy():
    """Apply the strategy named by the KOLLA_CONFIG_STRATEGY variable.

    COPY_ALWAYS copies the loaded config every time. COPY_ONCE copies it
    only while the '/configured' marker is absent, then creates the marker;
    when the marker exists the process exits with status 0. Any other value
    is logged as an error and the process exits with status 1.
    """
    config_strategy = os.environ.get("KOLLA_CONFIG_STRATEGY")
    LOG.info("Kolla config strategy set to: %s", config_strategy)
    config = load_config()

    copy_once = config_strategy == "COPY_ONCE"
    if not copy_once and config_strategy != "COPY_ALWAYS":
        LOG.error('KOLLA_CONFIG_STRATEGY is not set properly')
        sys.exit(1)
    elif copy_once and os.path.exists('/configured'):
        LOG.info("The config strategy prevents copying new configs")
        sys.exit(0)
    else:
        copy_config(config)
        if copy_once:
            # Marker file that blocks any later COPY_ONCE run.
            os.mknod('/configured')
chat.py 文件源码 项目:auto_chat 作者: SmallPond 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_tasklist():
    '''
    Read the task file and return its entries.

    Lines starting with '#' and lines of six characters or fewer (newline
    included) are skipped. If the task file cannot be opened, an empty one
    is created, the user is told about it, and an empty list is returned.

    :return: list of tasks, each one the whitespace-split fields of a line
    '''
    taskList = []
    try:
        fileHander = open(_task_file, 'r', encoding='utf-8')
    except IOError:
        print('????{0}???'.format(_task_file))
        print("## ????{0}??...".format(_task_file))
        # Append mode creates the file if it is missing, without needing
        # os.mknod(), and leaves an existing file untouched.
        open(_task_file, 'a').close()
        print("??????'{0}'?????????".format(_task_file))
        print("???????????README.txt??")
        # There is nothing to read yet; falling through would touch the
        # file handle that was never bound.
        return taskList

    with fileHander:
        for line in fileHander:
            # Skip comment lines and lines too short to hold a full task.
            if line[0] == '#' or len(line) <= 6:
                continue
            taskList.append(line.split())
    return taskList
test_posix.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_mknod(self):
    """Create a FIFO with posix.mknod(), positionally and by keyword.

    A FIFO is the only use of mknod() specified by POSIX. Systems that
    refuse the call must fail with EPERM or EINVAL.
    """
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    tolerated = (errno.EPERM, errno.EINVAL)

    support.unlink(support.TESTFN)
    try:
        posix.mknod(support.TESTFN, fifo_mode, 0)
    except OSError as exc:
        # Some old systems don't allow unprivileged users to use
        # mknod(), or only support creating device nodes.
        self.assertIn(exc.errno, tolerated)
    else:
        created = posix.stat(support.TESTFN)
        self.assertTrue(stat.S_ISFIFO(created.st_mode))

    # The same call spelled with keyword arguments must be accepted too.
    support.unlink(support.TESTFN)
    try:
        posix.mknod(path=support.TESTFN, mode=fifo_mode, device=0,
                    dir_fd=None)
    except OSError as exc:
        self.assertIn(exc.errno, tolerated)
test_posix.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_mknod_dir_fd(self):
    """Create a FIFO with posix.mknod() relative to a directory descriptor.

    A FIFO is the only use of mknodat() specified by POSIX. Systems that
    refuse the call must fail with EPERM or EINVAL.
    """
    support.unlink(support.TESTFN)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        try:
            posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=cwd_fd)
        except OSError as exc:
            # Some old systems don't allow unprivileged users to use
            # mknod(), or only support creating device nodes.
            self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
        else:
            created = posix.stat(support.TESTFN)
            self.assertTrue(stat.S_ISFIFO(created.st_mode))
    finally:
        posix.close(cwd_fd)
rpath.py 文件源码 项目:rdiff-backup 作者: sol1 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def makedev(self, type, major, minor):
    """Make a special file with specified type, and major/minor nums

    type is 'c' for a character device or 'b' for a block device; any
    other value raises RPathException. The node is created with mode 0600
    through the connection's os module. If mknod is missing
    (AttributeError) or refused with EPERM, the failure is logged and
    touch() is used instead; other OSErrors are ignored, as in the original
    best-effort behaviour. setdata() is called afterwards in every
    non-raising case.
    """
    if type == 'c':
        mode = stat.S_IFCHR | 0o600
    elif type == 'b':
        mode = stat.S_IFBLK | 0o600
    else:
        raise RPathException
    try:
        self.conn.os.mknod(self.path, mode, self.conn.os.makedev(major, minor))
    except (OSError, AttributeError) as e:
        if isinstance(e, AttributeError) or e.errno == errno.EPERM:
            # AttributeError will be raised by Python 2.2, which
            # doesn't have os.mknod
            log.Log("unable to mknod %s -- using touch instead" % self.path, 4)
            self.touch()
    self.setdata()
api.py 文件源码 项目:jumpsever 作者: jacksonyoudi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def set_log(level, filename='jumpserver.log'):
    """
    Return the 'jumpserver' logger with a file handler on LOG_DIR/filename.

    level is one of 'debug', 'info', 'warning', 'error' or 'critical' and
    sets the threshold of the file handler; any other value falls back to
    DEBUG. The logger itself is always set to DEBUG.

    Calling this again for the same file reuses the handler that is already
    attached (keeping the more verbose of the two thresholds), so records
    are not written to the file twice.
    """
    log_file = os.path.join(LOG_DIR, filename)
    if not os.path.isfile(log_file):
        # Append mode creates the file without os.mknod(), which is missing
        # or privileged on some platforms and fails if the file appears
        # between the check and the call.
        open(log_file, 'a').close()
        # NOTE(review): mode kept from the original code; a world-writable
        # log file looks intentional here -- confirm it is still required.
        os.chmod(log_file, 0o777)
    log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
                       'critical': logging.CRITICAL}
    handler_level = log_level_total.get(level, logging.DEBUG)
    logger_f = logging.getLogger('jumpserver')
    logger_f.setLevel(logging.DEBUG)

    # FileHandler stores the absolute path of its target in baseFilename.
    target = os.path.abspath(log_file)
    for existing in logger_f.handlers:
        if isinstance(existing, logging.FileHandler) and existing.baseFilename == target:
            existing.setLevel(min(existing.level, handler_level))
            return logger_f

    fh = logging.FileHandler(log_file)
    fh.setLevel(handler_level)
    formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    logger_f.addHandler(fh)
    return logger_f
test_posix.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def test_mknod_dir_fd(self):
    """Create a FIFO with posix.mknod() relative to a directory descriptor.

    A FIFO is the only use of mknodat() specified by POSIX. Systems that
    refuse the call must fail with EPERM or EINVAL.
    """
    support.unlink(support.TESTFN)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        try:
            posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=cwd_fd)
        except OSError as exc:
            # Some old systems don't allow unprivileged users to use
            # mknod(), or only support creating device nodes.
            self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
        else:
            created = posix.stat(support.TESTFN)
            self.assertTrue(stat.S_ISFIFO(created.st_mode))
    finally:
        posix.close(cwd_fd)
set_configs.py 文件源码 项目:kolla 作者: sieve-microservices 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def execute_config_strategy(config):
    """Apply the strategy named by the KOLLA_CONFIG_STRATEGY variable.

    COPY_ALWAYS copies config and applies permissions every time.
    COPY_ONCE does the same only while the '/configured' marker is absent,
    then creates the marker; when the marker exists ImmutableConfig is
    raised with exit_code=0. Any other value raises InvalidConfig.
    """
    config_strategy = os.environ.get("KOLLA_CONFIG_STRATEGY")
    LOG.info("Kolla config strategy set to: %s", config_strategy)

    copy_once = config_strategy == "COPY_ONCE"
    if not copy_once and config_strategy != "COPY_ALWAYS":
        raise InvalidConfig('KOLLA_CONFIG_STRATEGY is not set properly')
    if copy_once and os.path.exists('/configured'):
        raise ImmutableConfig(
            "The config strategy prevents copying new configs",
            exit_code=0)

    copy_config(config)
    handle_permissions(config)
    if copy_once:
        # Marker file that blocks any later COPY_ONCE run.
        os.mknod('/configured')
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
tarfile.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
tarfile.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
tarfile.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
tarfile.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
tarfile.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
blobfs.py 文件源码 项目:blobfs 作者: mbartoli 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def mknod(self, path, mode, dev):
    """Create a filesystem node at the backing path for ``path``.

    ``path`` is translated with self._full_path(); mode and dev are passed
    to os.mknod() unchanged and its result is returned.
    """
    backing_path = self._full_path(path)
    return os.mknod(backing_path, mode, dev)
tarfile.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
tarfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
tarfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
tarfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)
tarfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def makedev(self, tarinfo, targetpath):
    """Create the character or block device node targetpath.

    The node takes its permission bits from tarinfo.mode and its device
    number from tarinfo.devmajor/devminor. A member that is not a block
    device is created as a character device. Raises ExtractError when the
    os module has no mknod() or makedev().
    """
    for required in ("mknod", "makedev"):
        if not hasattr(os, required):
            raise ExtractError("special devices not supported by system")

    node_mode = tarinfo.mode | (stat.S_IFBLK if tarinfo.isblk() else stat.S_IFCHR)
    device = os.makedev(tarinfo.devmajor, tarinfo.devminor)
    os.mknod(targetpath, node_mode, device)


问题


面经


文章

微信
公众号

扫码关注公众号