def build_docker(settings, student):
    """
    Creates a new docker container for the student.

    Runs ``docker build`` with the student's directory as working directory,
    tagging the result ``<username>_<project name>``.

    The command is passed to ``subprocess`` as an argument list (no shell), so
    shell metacharacters in the username, project name or Dockerfile path are
    treated as literal text instead of being interpreted.

    :param settings: Mapping read for ``settings['project']['name']``,
        ``settings['build']['dockerfile']`` and ``settings['build']['timeout']``.
    :param student: Mapping read for ``student['username']`` and
        ``student['directory']``.
    :raises subprocess.CalledProcessError: if ``docker build`` exits non-zero.
    :raises subprocess.TimeoutExpired: if the build runs longer than the
        configured timeout.
    """
    LOGGER.info('Beginning a Docker build for student %s', student['username'])
    tag = '{student}_{project}'.format(student=student['username'],
                                       project=settings['project']['name'])
    file_option = '--file={dockerfile}'.format(dockerfile=settings['build']['dockerfile'])
    cmd = ['docker', 'build', '-t', tag, file_option, '.']
    timeout = settings['build']['timeout']
    # Log the same human-readable command line as before.
    LOGGER.info(' '.join(cmd))
    subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                          timeout=timeout, cwd=student['directory'])
    LOGGER.info('Completed a Docker build for student %s', student['username'])
# Example source code using Python's DEVNULL
def clean_hg(settings, student):
    """
    Cleans a Mercurial repository so it matches the latest submission.

    Two hg commands are run in the student's directory, each with output
    discarded and each limited by ``settings['clean']['timeout']`` (a value
    of 0 falls back to 5).

    :param settings: Mapping read for ``settings['clean']['timeout']``.
    :param student: Mapping read for ``student['username']`` and
        ``student['directory']``.
    :raises subprocess.CalledProcessError: if either hg command exits non-zero.
    :raises subprocess.TimeoutExpired: if either command exceeds the timeout.
    """
    LOGGER.info("Using hg to clean up the directory for student %s", student['username'])
    steps = (
        # Remove all untracked files and directories. The purge extension is
        # enabled on the command line, overriding any other option.
        "hg --config='extensions.purge=' purge --all --dirs --files",
        # Undo any changes to tracked files made since the last commit.
        "hg update -C",
    )
    for command in steps:
        limit = int(settings['clean']['timeout']) or 5
        subprocess.check_call(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                              shell=True, timeout=limit, cwd=student['directory'])
def docker_kill_and_remove(ctr_name):
    """Best-effort kill and removal of the docker container ``ctr_name``.

    ``docker top`` is used as a probe: if it fails, nothing is killed. If it
    succeeds the container is killed. Afterwards ``docker rm`` is attempted
    only when ``docker images -q`` printed something for ``ctr_name``.

    Failures of the probes are ignored. A failure of ``docker kill`` or
    ``docker rm`` is logged, never raised. ``KeyboardInterrupt`` and
    ``SystemExit`` are no longer swallowed.

    :param ctr_name: Name passed to the docker commands.
    :return: None
    """
    try:
        try:
            check_output(['docker', 'top', ctr_name], stderr=DEVNULL)
        except Exception:
            # Probe failed: treat as "nothing to kill".
            pass
        else:
            check_output(['docker', 'kill', ctr_name], stderr=STDOUT)
        try:
            has_image = check_output(['docker', 'images', '-q', ctr_name], stderr=DEVNULL)
        except Exception:
            has_image = b''
        # Explicit truth test instead of `assert`, which is removed under -O.
        if has_image:
            check_output(['docker', 'rm', ctr_name], stderr=STDOUT)
    except Exception:
        logger.error('could not stop docker container:{}'.format(ctr_name))
def Handle_SquashFS(self, Key):
    """Extract the SquashFS image ``Key`` with ``sudo unsquashfs``.

    The image is ``<self.DestDir>/<Key>``. The output directory is the same
    path with a trailing ``.raw`` replaced by ``.extracted`` (or with
    ``.extracted`` appended when there is no ``.raw`` suffix).

    :param Key: File name of the image inside ``self.DestDir``.
    :return: 1 if ``self.CheckDependency("unsquashfs")`` is truthy (nothing is
        run), otherwise the exit status of the ``sudo unsquashfs`` call.
    """
    Path = os.path.join(self.DestDir, Key)
    # str.rstrip(".raw") removes any trailing run of the characters '.', 'r',
    # 'a', 'w' (e.g. "data.raw" -> "dat"), so cut the suffix off explicitly.
    Suffix = ".raw"
    Stem = Path[:-len(Suffix)] if Path.endswith(Suffix) else Path
    DestDir = Stem + ".extracted"
    Binary = "unsquashfs"
    if self.CheckDependency(Binary):
        return 1
    # Need root to preserve permissions.
    Command = ["sudo", Binary, "-d", DestDir, Path]
    if self.Debug:
        # In debug mode log the command and leave the tool's stdout visible.
        self.Logger.debug(' '.join(Command))
        return subprocess.call(Command)
    return subprocess.call(Command, stdout=subprocess.DEVNULL)
def Handle_CramFS(self, Key):
    """Extract the CramFS image ``Key`` with ``sudo cramfsck -x``.

    The image is ``<self.DestDir>/<Key>``. The output directory is the same
    path with a trailing ``.raw`` replaced by ``.extracted`` (or with
    ``.extracted`` appended when there is no ``.raw`` suffix).

    :param Key: File name of the image inside ``self.DestDir``.
    :return: 1 if ``self.CheckDependency("cramfsck")`` is truthy (nothing is
        run), otherwise the exit status of the ``sudo cramfsck`` call.
    """
    Path = os.path.join(self.DestDir, Key)
    # str.rstrip(".raw") removes any trailing run of the characters '.', 'r',
    # 'a', 'w' (e.g. "data.raw" -> "dat"), so cut the suffix off explicitly.
    Suffix = ".raw"
    Stem = Path[:-len(Suffix)] if Path.endswith(Suffix) else Path
    DestDir = Stem + ".extracted"
    Binary = "cramfsck"
    if self.CheckDependency(Binary):
        return 1
    # Need root to preserve permissions.
    Command = ["sudo", Binary, "-x", DestDir, Path]
    if self.Debug:
        # In debug mode log the command and leave the tool's stdout visible.
        self.Logger.debug(' '.join(Command))
        return subprocess.call(Command)
    return subprocess.call(Command, stdout=subprocess.DEVNULL)
def Handle_CramFS(self, Key):
    """Build a CramFS image for ``Key`` with ``sudo mkcramfs``.

    Runs ``mkcramfs <self.Source>/<Key>.extracted <self.BuildDir>/<Key>.raw``.

    :param Key: Base name shared by the extracted directory and the image.
    :return: 1 if ``self.CheckDependency("mkcramfs")`` is truthy (nothing is
        run), otherwise the exit status of the ``sudo mkcramfs`` call.
    """
    ExtractedDir = os.path.join(self.Source, Key + ".extracted")
    DestPath = os.path.join(self.BuildDir, Key + ".raw")
    Binary = "mkcramfs"
    if self.CheckDependency(Binary):
        return 1
    # Need root to access all files.
    Command = ["sudo", Binary, ExtractedDir, DestPath]
    if self.Debug:
        # In debug mode log the command and leave the tool's stdout visible.
        self.Logger.debug(' '.join(Command))
        return subprocess.call(Command)
    return subprocess.call(Command, stdout=subprocess.DEVNULL)
def _get_gitinfo():
    """Return ``(branch, commit)`` for the git checkout containing this file.

    Both values come from ``git rev-parse`` run in this module's directory.
    ``('master', '')`` is returned when no git executable is found or when
    either ``rev-parse`` call exits non-zero.
    """
    import pygrunt.platform as platform
    import subprocess
    from pathlib import Path

    fallback = ('master', '')
    git = platform.current.find_executable('git')
    if git is None:
        # No git installed; assume we're on master
        return fallback

    here = str(Path(__file__).parent)
    answers = []
    # First query yields the branch name, second the full commit hash.
    for rev_args in (['--abbrev-ref', 'HEAD'], ['HEAD']):
        proc = subprocess.run([git, 'rev-parse'] + rev_args,
                              stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                              cwd=here, universal_newlines=True)
        if proc.returncode != 0:
            # Quietly return defaults on fail
            return fallback
        answers.append(proc.stdout)

    branch, commit = answers
    return (branch.strip(), commit.strip())
def removeDirtyDir(self):
    """Delete ``self.cmake_build_info["build_dir"]`` with ``rm -rf``.

    Does nothing when that path is not an existing directory. The exit
    status and output of ``rm`` are ignored.
    """
    build_dir = self.cmake_build_info["build_dir"]
    if not build_dir.is_dir():
        return
    print("Cleaning up Build Directory")
    subprocess.call(["rm", "-rf", str(build_dir)],
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL)
def removeOldCMakeFiles(self):
    """Delete leftovers of a previous CMake run.

    Removes ``self.cmake_build_info["old_cmake_dir"]`` (recursively) when it
    is a directory, then every entry of
    ``self.cmake_build_info["old_cmake_files"]`` that is an existing file.
    The exit status and output of ``rm`` are ignored.
    """
    silent = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}

    stale_dir = self.cmake_build_info["old_cmake_dir"]
    if stale_dir.is_dir():
        print("Cleaning up Old CMake Directory")
        subprocess.call(["rm", "-rf", str(stale_dir)], **silent)

    for stale_file in self.cmake_build_info["old_cmake_files"]:
        if not stale_file.is_file():
            continue
        # The message is printed once per removed file.
        print("Cleaning up Old CMake Files")
        subprocess.call(["rm", str(stale_file)], **silent)
def setup_rtags_daemon(self):
    """Run the RTags shutdown command, then the ``rdm`` start command.

    A failing shutdown command is tolerated (its captured output is printed).
    A failing start command is reported and the error re-raised.

    :raises subprocess.CalledProcessError: if ``cmake_cmd_info["rdm_cmd"]``
        exits non-zero.
    """
    print("Initializing RTags Daemon")

    # Step 1: run the shutdown command; failure here is not fatal.
    try:
        subprocess.check_output(self.cmake_cmd_info["rtags_shutdwn"],
                                stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError as shutdown_error:
        print(shutdown_error.output)

    # Step 2: run the start command; failure here propagates to the caller.
    try:
        subprocess.check_call(self.cmake_cmd_info["rdm_cmd"],
                              stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError as start_error:
        print(start_error.output)
        raise
def connect_rtags_client(self):
    """Run the RTags client command if the compilation database exists.

    When ``cmake_build_info["comp_data_cmake"]`` is not a file an error line
    is printed and nothing is run. Otherwise ``cmake_cmd_info["rc_cmd"]`` is
    executed with its output discarded, and a non-zero exit status is
    reported with an informational line.

    :return: None in every case.
    """
    print("Connecting RTags Client")
    if not self.cmake_build_info["comp_data_cmake"].is_file():
        print("Error Generating Compilation Database With CMake")
        return

    exit_status = subprocess.call(self.cmake_cmd_info["rc_cmd"],
                                  stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL)
    if exit_status != 0:
        print("Info: RTags Daemon Not Running")
def rtags_set_file(self, arg):
    """Run the ``cmake_cmd_info["rtags_file"]`` command with ``arg`` appended.

    The command's output is discarded and its exit status is ignored.
    ``subprocess.call`` never raises ``CalledProcessError``, so the handler
    that used to wrap this call was unreachable and has been removed.

    :param arg: Value concatenated onto the base command with ``+``.
        NOTE(review): it must therefore be the same sequence type as
        ``cmake_cmd_info["rtags_file"]`` -- confirm against the caller.
    :return: None
    """
    subprocess.call(
        self.cmake_cmd_info["rtags_file"] + arg,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL)
def update_rtags_buffers(self, args):
    """Run the ``cmake_cmd_info["rtags_buffer"]`` command for C/C++ buffers.

    Buffers whose string form ends in ``.cpp``, ``.cc``, ``.c``, ``.h`` or
    ``.hpp`` are appended to the base command. The command is run even when
    no buffer matches. Its output is discarded and its exit status ignored.

    :param args: Iterable of buffers; each is matched via ``str(buffer)``.
    :return: None
    """
    cpp_suffixes = ('.cpp', '.cc', '.c', '.h', '.hpp')
    # Comparing the last four characters against the list could only ever
    # match four-character suffixes, silently dropping .cc, .c and .h files.
    cpp_buffers = [buffer for buffer in args
                   if str(buffer).endswith(cpp_suffixes)]
    # subprocess.call does not raise CalledProcessError, so no handler is
    # needed around it.
    subprocess.call(
        self.cmake_cmd_info["rtags_buffer"] + cpp_buffers,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL)
#@classmethod
def rtags_tagrun(self, cmd):
    """Run ``cmd`` and turn its tab-separated output into formatted tags.

    Each non-empty output line is split on tabs, the resulting fields are
    reversed, and the reversed list is passed to ``self.format_rtags``.

    The previous body could not run: it called undefined names ``split`` and
    ``reverse``, split bytes with a str separator, slice-assigned onto a
    ``map`` object, and used ``tagrun`` even when the command had failed.

    :param cmd: Command (argument list) handed to ``subprocess.check_output``.
    :return: List of ``self.format_rtags(fields)`` results, one per non-empty
        line; an empty list when the command exits non-zero.
    """
    try:
        # Text mode so the output can be split with str separators.
        tagrun = subprocess.check_output(cmd, stderr=subprocess.DEVNULL,
                                         universal_newlines=True)
    except subprocess.CalledProcessError as e:
        print(e.output)
        return []
    tags = []
    for line in tagrun.splitlines():
        if not line:
            continue
        fields = line.split("\t")
        fields.reverse()
        tags.append(self.format_rtags(fields))
    return tags
def __exit__(self, exc_type, exc_value, traceback):
    """Upload the artifact on clean exit and always delete the local file.

    When the ``with`` body finished without an exception, ``self.uploadCmd``
    is run through ``/bin/bash -ec`` in ``/tmp``. Its environment holds only
    the variables from ``os.environ`` named in ``self.whiteList`` plus
    ``BOB_LOCAL_ARTIFACT`` and ``BOB_REMOTE_ARTIFACT``. ``self.name`` is
    unlinked in every case.

    :return: False, so an exception from the ``with`` body is not suppressed.
    :raises ArtifactUploadError: if the upload command exits non-zero.
    """
    try:
        if exc_type is not None:
            # The body failed: skip the upload (the finally clause still runs).
            return False
        upload_env = {}
        for key, value in os.environ.items():
            if key in self.whiteList:
                upload_env[key] = value
        upload_env["BOB_LOCAL_ARTIFACT"] = self.name
        upload_env["BOB_REMOTE_ARTIFACT"] = self.remoteName
        status = subprocess.call(["/bin/bash", "-ec", self.uploadCmd],
                                 stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                                 cwd="/tmp", env=upload_env)
        if status != 0:
            raise ArtifactUploadError("command return with status {}".format(status))
    finally:
        os.unlink(self.name)
    return False
def predictLiveBuildId(self):
    """Return the commit id this checkout would resolve to, as a one-item list.

    A configured commit is returned directly (decoded from hex). Otherwise
    ``git ls-remote`` is asked about the configured tag or branch on the
    remote URL.

    :return: ``[bytes]`` holding the raw commit id, or ``[None]`` when
        ``git ls-remote`` fails, prints nothing, or lists none of the wanted
        refs.
    """
    if self.__commit:
        return [ bytes.fromhex(self.__commit) ]

    if self.__tag:
        # Annotated tags are objects themselves. We need the commit object,
        # so the peeled ref ("^{}") is preferred over the tag ref itself.
        tagRef = "refs/tags/" + self.__tag
        refs = [tagRef + '^{}', tagRef]
    else:
        refs = ["refs/heads/" + self.__branch]

    try:
        listing = subprocess.check_output(['git', 'ls-remote', self.__url] + refs,
                                          universal_newlines=True,
                                          stderr=subprocess.DEVNULL).strip()
    except subprocess.CalledProcessError:
        return [None]

    # have we found anything at all?
    if not listing:
        return [None]

    # Git prints one "<sha1>\t<refname>" pair per line. Index the commits by
    # refname, skipping every line that does not have exactly two fields.
    found = {}
    for line in listing.split('\n'):
        fields = line.split('\t')
        if len(fields) == 2:
            found[fields[1].strip()] = bytes.fromhex(fields[0].strip())

    # The first wanted ref that the remote listed wins.
    for ref in refs:
        if ref in found:
            return [found[ref]]

    # uhh, should not happen...
    return [None]
def load_mirror_index(self, remote_mirror):
    """Fetch the base branch of a mirror into ``<conf_dir>/mirror-index``.

    ``git ls-remote`` is tried against ``remote_mirror`` and, if that fails,
    against ``remote_mirror + "/.git"``. If one of them answers, the local
    ``mirror-index`` directory is created and initialised when missing, the
    base branch is fetched into it, checked out and hard-reset.

    Failures are caught with ``except Exception`` rather than a bare
    ``except`` so that ``KeyboardInterrupt`` / ``SystemExit`` still abort.
    NOTE(review): this assumes ``utils_setup.run_cmd`` signals failure with
    an ``Exception`` subclass -- confirm.

    :param remote_mirror: Location of the candidate mirror.
    :return: Path of the local ``mirror-index`` directory, or None when no
        mirror answers or the fetch fails.
    """
    # See if there is a mirror index available from the BASE_URL
    mirror_index = os.path.join(self.conf_dir, 'mirror-index')

    for candidate in (remote_mirror, remote_mirror + "/.git"):
        try:
            cmd = [self.tools['git'], 'ls-remote', candidate, self.base_branch]
            utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=self.project_dir,
                                stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)
        except Exception:
            continue
        remote_mirror = candidate
        break
    else:
        # No mirror, return
        return None

    logger.plain('Loading the mirror index from %s (%s)...' % (remote_mirror, self.base_branch))

    # This MIGHT be a valid mirror..
    if not os.path.exists(mirror_index):
        os.makedirs(mirror_index)
        cmd = [self.tools['git'], 'init' ]
        utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=mirror_index)

    try:
        cmd = [self.tools['git'], 'fetch', '-f', '-n', '-u', remote_mirror,
               self.base_branch + ':' + self.base_branch]
        utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=mirror_index)
    except Exception:
        # Could not fetch, return
        return None

    logger.debug('Found mirrored index.')
    cmd = [self.tools['git'], 'checkout', self.base_branch ]
    utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=mirror_index)
    cmd = [self.tools['git'], 'reset', '--hard' ]
    utils_setup.run_cmd(cmd, log=2, environment=self.env, cwd=mirror_index)

    return mirror_index
def exec_cmd(bus, *cmd_line):
    r"""
    Executes an external process using the given arguments.

    Note that the backslash (`\`) character must be escaped, because the
    arguments are written within a Python string: a total of 2 backslashes
    equals 1 real backslash.

    For example, to run a command prompt with fancy colors and start it in the
    root of the file system when you press <kbd>F1</kbd>, use:

        'f1 => cmd cmd.exe /c start cmd.exe /E:ON /V:ON /T:17 /K cd \\'

    The process is started with its output discarded and is not waited for.

    :param bus: Object whose ``fire`` method is called with a ``LOG__INFO``
        event carrying the new process id and the command line.
    :param cmd_line: The command-line string equivalent to run, given as a
        space-split set of arguments.
    :return: None
    """
    # The cmd_line is a space-split set of arguments.  Because we're running a
    # command line, and should allow all kinds of inputs, we'll join it back
    # together.
    full_cmd = " ".join(cmd_line)
    # We won't use shlex, because, for Windows, splitting is just unnecessary.
    print('CMD Running `' + full_cmd + '`')
    # TODO look at making this a bus command, or at least a bus announcement.
    proc = subprocess.Popen(full_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    bus.fire(event_ids.LOG__INFO, target_ids.LOGGER, {
        'message': '{0}: {1}'.format(proc.pid, full_cmd)
    })
def get_last_update(self, path):
    """Return the modification time of ``<path>/.git/FETCH_HEAD``.

    The time comes from ``stat -c %y`` and is cut to its first 19 characters
    (``YYYY-MM-DD HH:MM:SS``).

    ``stat`` is run with ``cwd=path`` instead of calling ``os.chdir(path)``,
    so the working directory of the calling process is no longer changed as
    a side effect.

    :param path: Directory containing the ``.git`` folder.
    :return: Timestamp string of 19 characters.
    :raises subprocess.CalledProcessError: if ``stat`` exits non-zero, e.g.
        because ``.git/FETCH_HEAD`` does not exist.
    :raises OSError: if ``path`` cannot be used as a working directory.
    """
    cmd = ["stat", "-c", "%y", ".git/FETCH_HEAD"]
    output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, cwd=path)
    return output.decode()[:19]
def git_version():
    '''Return the git revision as a string

    Runs ``git rev-parse --verify --quiet --short HEAD`` in the current
    working directory with a minimal, C-locale environment.

    Returns
    -------
    git_version : str
        The current git revision, or ``'UNKNOWN'`` when git exits with an
        error or cannot be executed at all (e.g. it is not installed).
    '''
    def _minimal_ext_cmd(cmd):
        # construct minimal environment
        env = {}
        for k in ['SYSTEMROOT', 'PATH']:
            v = os.environ.get(k)
            if v is not None:
                env[k] = v
        # LANGUAGE is used on win32
        env['LANGUAGE'] = 'C'
        env['LANG'] = 'C'
        env['LC_ALL'] = 'C'
        output = subprocess.check_output(cmd,
                                         stderr=subprocess.DEVNULL,
                                         env=env)
        return output

    try:
        out = _minimal_ext_cmd(['git', 'rev-parse', '--verify', '--quiet', '--short', 'HEAD'])
        GIT_REVISION = out.strip().decode('ascii')
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a git executable that is missing or not runnable.
        GIT_REVISION = 'UNKNOWN'

    return GIT_REVISION