def set_log(level, filename='jumpserver.log'):
"""
return a log file object
??????log??
"""
log_file = os.path.join(LOG_DIR, filename)
if not os.path.isfile(log_file):
os.mknod(log_file)
os.chmod(log_file, 0777)
log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
'critical': logging.CRITICAL}
logger_f = logging.getLogger('jumpserver')
logger_f.setLevel(logging.DEBUG)
fh = logging.FileHandler(log_file)
fh.setLevel(log_level_total.get(level, logging.DEBUG))
formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger_f.addHandler(fh)
return logger_f
python类mknod()的实例源码
def execute_config_strategy():
config_strategy = os.environ.get("KOLLA_CONFIG_STRATEGY")
LOG.info("Kolla config strategy set to: %s", config_strategy)
config = load_config()
if config_strategy == "COPY_ALWAYS":
copy_config(config)
elif config_strategy == "COPY_ONCE":
if os.path.exists('/configured'):
LOG.info("The config strategy prevents copying new configs")
sys.exit(0)
else:
copy_config(config)
os.mknod('/configured')
else:
LOG.error('KOLLA_CONFIG_STRATEGY is not set properly')
sys.exit(1)
def set_log(level, filename='jumpserver.log'):
"""
return a log file object
??????log??
"""
log_file = os.path.join(LOG_DIR, filename)
if not os.path.isfile(log_file):
os.mknod(log_file)
os.chmod(log_file, 0777)
log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
'critical': logging.CRITICAL}
logger_f = logging.getLogger('jumpserver')
logger_f.setLevel(logging.DEBUG)
fh = logging.FileHandler(log_file)
fh.setLevel(log_level_total.get(level, logging.DEBUG))
formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger_f.addHandler(fh)
return logger_f
def set_log(level, filename='spider.log'):
"""
return a log file object
??????log??
"""
if not os.path.isdir(LOG_DIR):
os.mkdir(LOG_DIR)
log_file = os.path.join(LOG_DIR, filename)
if not os.path.isfile(log_file):
os.mknod(log_file)
os.chmod(log_file, 0777)
log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
'critical': logging.CRITICAL}
logger_f = logging.getLogger('spider')
logger_f.setLevel(logging.DEBUG)
fh = logging.FileHandler(log_file,'a')
fh.setLevel(log_level_total.get(level, logging.DEBUG))
formatter = logging.Formatter('%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s')
fh.setFormatter(formatter)
logger_f.addHandler(fh)
keep_fds = [fh.stream.fileno()]
return logger_f,keep_fds
android_dev_diag_monitor.py 文件源码
项目:mobileinsight-core
作者: mobile-insight
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def _mkfifo(self, fifo_path):
try:
if os.path.exists(fifo_path):
# self._run_shell_cmd("rm %s " % fifo_path, wait=True)
os.remove(fifo_path)
os.mknod(fifo_path, 0o666 | stat.S_IFIFO)
except OSError as err:
if err.errno == errno.EEXIST: # if already exists, skip this step
pass
# print "Fifo file already exists, skipping..."
elif err.errno == errno.EPERM: # not permitted, try shell command
# print "Not permitted to create fifo file, try to switch to
# root..."
retcode = self._run_shell_cmd(
"mknod %s p" %
fifo_path, wait=True)
if retcode != 0:
raise RuntimeError("mknod returns %s" % str(retcode))
else:
raise err
def test_check_file_exists(mock_err_exit, tmpdir, test_file_name, include_dirname):
if include_dirname:
file_path = os.path.join(tmpdir.dirname, tmpdir.basename, test_file_name)
if test_file_name == 'exist_file':
os.mknod(file_path)
else:
file_path = test_file_name
check_file_exists(file_path)
if include_dirname is True:
if test_file_name == 'exist_file':
mock_err_exit.assert_not_called()
else:
mock_err_exit.assert_called_once()
if include_dirname is False:
mock_err_exit.assert_called_once()
tmpdir.remove()
def test_make_file_executable(mock_err_exit, mock_getstatusoutput,
tmpdir, test_file_name,
status, include_dir):
if include_dir:
file_path = os.path.join(tmpdir.dirname, tmpdir.basename, test_file_name)
else:
file_path = test_file_name
if test_file_name == 'no_exe_file':
os.mknod(file_path)
if test_file_name == 'exe_file':
os.mknod(file_path, 0700)
output = 'test_out'
mock_getstatusoutput.return_value = (status, output)
make_file_executable(file_path)
if test_file_name == 'exe_file':
mock_err_exit.assert_not_called()
assert os.access(file_path, os.X_OK)
if test_file_name == 'no_exe_file' and status is False:
mock_err_exit.assert_not_called()
if test_file_name == 'no_exe_file' and status is True:
mock_err_exit.assert_called()
if test_file_name == 'no_exist_file':
mock_err_exit.assert_called()
tmpdir.remove()
def set_log(level, filename='jumpserver.log'):
"""
return a log file object
??????log??
"""
log_file = os.path.join(LOG_DIR, filename)
if not os.path.isfile(log_file):
os.mknod(log_file)
os.chmod(log_file, 0777)
log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
'critical': logging.CRITICAL}
logger_f = logging.getLogger('jumpserver')
logger_f.setLevel(logging.DEBUG)
fh = logging.FileHandler(log_file)
fh.setLevel(log_level_total.get(level, logging.DEBUG))
formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger_f.addHandler(fh)
return logger_f
def make_chunky_home(chunky_home_root):
try:
if not os.path.exists(os.path.join(chunky_home_root, '.chunky')):
os.mkdir(os.path.join(chunky_home_root, '.chunky'))
if not os.path.exists(os.path.join(chunky_home_root, '.chunky', 'pipelines')):
os.mkdir(os.path.join(chunky_home_root, '.chunky', 'pipelines'))
if not os.path.isfile(os.path.join(chunky_home_root, '.chunky', 'pipelines', '__init__.py')):
os.mknod(os.path.join(chunky_home_root, '.chunky', 'pipelines', '__init__.py'), 0o644)
if not os.path.exists(os.path.join(chunky_home_root, '.chunky', 'configs')):
os.mkdir(os.path.join(chunky_home_root, '.chunky', 'configs'))
sys.stdout.write('ChunkyPipes successfully initialized at {}\n'.format(chunky_home_root))
if chunky_home_root != os.path.expanduser('~'):
sys.stdout.write('Please set a CHUNKY_HOME environment variable to {}\n'.format(chunky_home_root))
except OSError as e:
sys.stderr.write('An error occurred initializing ChunkyPipes at {}.\n{}\n'.format(
chunky_home_root,
e.message
))
def test_mknod_dir_fd(self):
# Test using mknodat() to create a FIFO (the only use specified
# by POSIX).
support.unlink(support.TESTFN)
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.mknod(support.TESTFN, mode, 0, dir_fd=f)
except OSError as e:
# Some old systems don't allow unprivileged users to use
# mknod(), or only support creating device nodes.
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
else:
self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
finally:
posix.close(f)
def execute_config_strategy():
config_strategy = os.environ.get("KOLLA_CONFIG_STRATEGY")
LOG.info("Kolla config strategy set to: %s", config_strategy)
config = load_config()
if config_strategy == "COPY_ALWAYS":
copy_config(config)
elif config_strategy == "COPY_ONCE":
if os.path.exists('/configured'):
LOG.info("The config strategy prevents copying new configs")
sys.exit(0)
else:
copy_config(config)
os.mknod('/configured')
else:
LOG.error('KOLLA_CONFIG_STRATEGY is not set properly')
sys.exit(1)
def get_tasklist():
'''
:return: ???????????List
'''
taskList = []
try:
fileHander = open(_task_file, 'r', encoding='utf-8') # ????????
except IOError as e:
print('????{0}???'.format(_task_file))
print("## ????{0}??...".format(_task_file))
os.mknod(_task_file) # ????
print("??????'{0}'?????????".format(_task_file))
print("???????????README.txt??")
fileList = fileHander.readlines()
# ????
for line in fileList:
if line[0] == '#': # ????????‘#’??????
continue
else:
if len(line) > 6: # ?? ?? ?? ???6???
taskList.append(line.split()) # ?????
else:
continue # ????????????
return taskList
def test_mknod(self):
# Test using mknod() to create a FIFO (the only use specified
# by POSIX).
support.unlink(support.TESTFN)
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
try:
posix.mknod(support.TESTFN, mode, 0)
except OSError as e:
# Some old systems don't allow unprivileged users to use
# mknod(), or only support creating device nodes.
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
else:
self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
# Keyword arguments are also supported
support.unlink(support.TESTFN)
try:
posix.mknod(path=support.TESTFN, mode=mode, device=0,
dir_fd=None)
except OSError as e:
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
def test_mknod_dir_fd(self):
# Test using mknodat() to create a FIFO (the only use specified
# by POSIX).
support.unlink(support.TESTFN)
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.mknod(support.TESTFN, mode, 0, dir_fd=f)
except OSError as e:
# Some old systems don't allow unprivileged users to use
# mknod(), or only support creating device nodes.
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
else:
self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
finally:
posix.close(f)
def makedev(self, type, major, minor):
"""Make a special file with specified type, and major/minor nums"""
if type == 'c':
datatype = 'chr'
mode = stat.S_IFCHR | 0600
elif type == 'b':
datatype = 'blk'
mode = stat.S_IFBLK | 0600
else: raise RPathException
try: self.conn.os.mknod(self.path, mode, self.conn.os.makedev(major, minor))
except (OSError, AttributeError), e:
if isinstance(e, AttributeError) or e.errno == errno.EPERM:
# AttributeError will be raised by Python 2.2, which
# doesn't have os.mknod
log.Log("unable to mknod %s -- using touch instead" % self.path, 4)
self.touch()
self.setdata()
def set_log(level, filename='jumpserver.log'):
"""
return a log file object
??????log??
"""
log_file = os.path.join(LOG_DIR, filename)
if not os.path.isfile(log_file):
os.mknod(log_file)
os.chmod(log_file, 0777)
log_level_total = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARN, 'error': logging.ERROR,
'critical': logging.CRITICAL}
logger_f = logging.getLogger('jumpserver')
logger_f.setLevel(logging.DEBUG)
fh = logging.FileHandler(log_file)
fh.setLevel(log_level_total.get(level, logging.DEBUG))
formatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger_f.addHandler(fh)
return logger_f
def test_mknod_dir_fd(self):
# Test using mknodat() to create a FIFO (the only use specified
# by POSIX).
support.unlink(support.TESTFN)
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.mknod(support.TESTFN, mode, 0, dir_fd=f)
except OSError as e:
# Some old systems don't allow unprivileged users to use
# mknod(), or only support creating device nodes.
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
else:
self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
finally:
posix.close(f)
def execute_config_strategy(config):
config_strategy = os.environ.get("KOLLA_CONFIG_STRATEGY")
LOG.info("Kolla config strategy set to: %s", config_strategy)
if config_strategy == "COPY_ALWAYS":
copy_config(config)
handle_permissions(config)
elif config_strategy == "COPY_ONCE":
if os.path.exists('/configured'):
raise ImmutableConfig(
"The config strategy prevents copying new configs",
exit_code=0)
else:
copy_config(config)
handle_permissions(config)
os.mknod('/configured')
else:
raise InvalidConfig('KOLLA_CONFIG_STRATEGY is not set properly')
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def mknod(self, path, mode, dev):
return os.mknod(self._full_path(path), mode, dev)
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makedev(self, tarinfo, targetpath):
"""Make a character or block device called targetpath.
"""
if not hasattr(os, "mknod") or not hasattr(os, "makedev"):
raise ExtractError("special devices not supported by system")
mode = tarinfo.mode
if tarinfo.isblk():
mode |= stat.S_IFBLK
else:
mode |= stat.S_IFCHR
os.mknod(targetpath, mode,
os.makedev(tarinfo.devmajor, tarinfo.devminor))