def _git_update_requirements(venv, package_dir, reqs_dir):
"""
Update from global requirements.
Update an OpenStack git directory's requirements.txt and
test-requirements.txt from global-requirements.txt.
"""
orig_dir = os.getcwd()
os.chdir(reqs_dir)
python = os.path.join(venv, 'bin/python')
cmd = [python, 'update.py', package_dir]
try:
subprocess.check_call(cmd)
except subprocess.CalledProcessError:
package = os.path.basename(package_dir)
error_out("Error updating {} from "
"global-requirements.txt".format(package))
os.chdir(orig_dir)
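# The save/chdir/restore dance above (and in most of the snippets below) can be
# factored into a small context manager. This is an illustrative sketch, not
# part of any of the quoted projects; it assumes only the standard library.
import contextlib
import os


@contextlib.contextmanager
def working_directory(path):
    """Temporarily chdir to `path`, restoring the previous cwd on exit."""
    previous = os.getcwd()
    os.chdir(path)
    try:
        yield path
    finally:
        os.chdir(previous)


# Example: the original directory is restored even if the body raises.
# with working_directory(reqs_dir):
#     subprocess.check_call(cmd)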
def build(self, file):
if self.built:
raise PermissionError("You cannot build multiple times!")
if not self.loaded:
self.load(file)
old = os.getcwd()
    sys.path.append(os.path.dirname(os.path.abspath(file)))  # for module imports that aren't "include" calls
try:
content = open(file, "rb").read()
os.chdir(os.path.dirname(os.path.abspath(file))) # set the current working directory, for open() etc.
exec(compile(content, file, 'exec'), self.user_functions)
except Exception as err:
print("An exception occured while building: ", file=sys.stderr)
lines = traceback.format_exc(None, err).splitlines()
print(" " + lines[-1], file=sys.stderr)
for l in lines[3:-1]:
print(l, file=sys.stderr)
exit(1)
os.chdir(old)
sys.path.remove(os.path.dirname(os.path.abspath(file)))
self.built = True
def load(self, file):
if self.loaded:
return
    sys.path.append(os.path.dirname(os.path.abspath(file)))  # for module imports that aren't "include" calls
old = os.getcwd()
try:
content = open(file, "rb").read()
os.chdir(os.path.dirname(os.path.abspath(file))) # set the current working directory, for open() etc.
exec(compile(content, file, 'exec'), self.user_functions)
except Exception as err:
print("An exception occured while loading: ", file=sys.stderr)
lines = traceback.format_exc(None, err).splitlines()
print(" " + lines[-1], file=sys.stderr)
for l in lines[3:-1]:
print(l, file=sys.stderr)
exit(1)
os.chdir(old)
sys.path.remove(os.path.dirname(os.path.abspath(file)))
self.loaded = True
self.mem_offset = 0
def archive_context(filename):
# extracting the archive
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with get_zip_class()(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def init_work_dir(self):
    retval = os.getcwd()
    print('#current dir is : ' + retval)
    # directory where the images will be stored
    store_dir = retval + os.sep + 'tmp'
    print('#all imgs are going to be stored in dir : ' + store_dir)
    if not os.path.exists(store_dir):
        print('#tmp dir does not exist, attempting to mkdir')
        os.mkdir(store_dir)
        print('#mkdir succeeded')
    else:
        print('#tmp dir already exists')
    self.store_dir = store_dir
    # print('#now change current dir to tmp')
    # os.chdir(store_dir)  # not necessary
    # print(os.getcwd())
def _run_local_lambda(self, lambda_config):
prev_folder = os.getcwd()
os.chdir(self.config.get_projectdir())
sys.path.append(self.config.get_projectdir())
lambda_name = lambda_config["FunctionName"]
lambda_handler = self.import_function(lambda_config["Handler"])
# Run and set a counter
start = time.time()
results = lambda_handler({}, MockContext(lambda_name))
end = time.time()
# restore folder
os.chdir(prev_folder)
# Print results
logger.info("{0}".format(results))
logger.info("\nexecution time: {:.8f}s\nfunction execution "
"timeout: {:2}s".format(end - start, lambda_config["Timeout"]))
def _changing_cd(f, *args, **kwargs):
def inner(*args, **kwargs):
try:
state = State(args[0].view)
except AttributeError:
state = State(args[0].window.active_view())
old = os.getcwd()
try:
# FIXME: Under some circumstances, like when switching projects to
# a file whose _cmdline_cd has not been set, _cmdline_cd might
# return 'None'. In such cases, change to the actual current
# directory as a last measure. (We should probably fix this anyway).
os.chdir(state.settings.vi['_cmdline_cd'] or old)
f(*args, **kwargs)
finally:
os.chdir(old)
return inner
def main(args):
global HTTPD, CREDENTIALS
if args:
load_settings(args[0])
print("Starting server")
server_address = (LISTENIP, LISTENPORT)
if CREDENTIALS:
CREDENTIALS = base64.b64encode(bytes(CREDENTIALS, "utf-8"))
Handler = AuthHandler
else:
Handler = RequestHandler
if not SSL_CERTIFICATE:
HTTPD = HTTPServer(server_address, Handler)
else:
HTTPD = socketserver.TCPServer(server_address, Handler)
HTTPD.socket = ssl.wrap_socket(HTTPD.socket,
certfile=SSL_CERTIFICATE,
keyfile=SSL_KEY,
server_side=True)
print('Listening on: %s://%s:%i' % ('https' if SSL_CERTIFICATE else 'http',
LISTENIP,
LISTENPORT))
if BASEPATH:
os.chdir(BASEPATH)
HTTPD.serve_forever()
def copy_abd(tmp_dir_loc, repo_dir, pkg_info):
if pkg_info['SUBDIR'] != '':
try:
os.chdir(pkg_info['SUBDIR'])
except:
err_msg('Failed to enter sub-directory!')
return False
else:
try:
os.chdir(pkg_info['NAME'] + '-' + pkg_info['VER'])
except:
try:
os.chdir(pkg_info['NAME'])
except:
err_msg(
'Failed to determine sub-directory, please specify manually.')
return False
try:
shutil.copytree(repo_dir,
os.path.abspath(os.path.curdir) + '/autobuild/', symlinks=True)
except:
err_msg('Error occurred when copying files from tree!')
return False
return True
def start_ab3(tmp_dir_loc, repo_dir, pkg_info, rm_abdir=False):
start_time = int(time.time())
os.chdir(tmp_dir_loc)
if not copy_abd(tmp_dir_loc, repo_dir, pkg_info):
return False
# For logging support: ptyprocess.PtyProcessUnicode.spawn(['autobuild'])
shadow_defines_loc = os.path.abspath(os.path.curdir)
if not parser_pass_through(pkg_info, shadow_defines_loc):
return False
try:
subprocess.check_call(['autobuild'])
except:
return False
time_span = int(time.time()) - start_time
print('>>>>>>>>>>>>>>>>>> Time for building\033[36m {} \033[0m:\033[36m {} \033[0mseconds'.format(
pkg_info['NAME'], time_span))
if rm_abdir is True:
shutil.rmtree(os.path.abspath(os.path.curdir) + '/autobuild/')
# Will get better display later
return True
def init_env(tree=['default']):
dump_loc = '/var/cache/acbs/tarballs/'
tmp_loc = '/var/cache/acbs/build/'
print("----- Welcome to ACBS - %s -----" % (acbs_version))
try:
if not os.path.isdir(dump_loc):
os.makedirs(dump_loc)
if not os.path.isdir(tmp_loc):
os.makedirs(tmp_loc)
except:
raise IOError('\033[93mFailed to make work directory\033[0m!')
if os.path.exists('/etc/acbs_forest.conf'):
tree_loc = parse_acbs_conf(tree[0])
if tree_loc is not None:
os.chdir(tree_loc)
else:
sys.exit(1)
else:
if not write_acbs_conf():
sys.exit(1)
return
def archive_context(filename):
"""
Unzip filename to a temporary directory, set to the cwd.
The unzipped target is cleaned up after.
"""
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
with ContextualZipFile(filename) as archive:
archive.extractall()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
yield
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
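# Note: archive_context() is a generator (it yields mid-function); in its
# upstream source (ez_setup.py) it is decorated with @contextlib.contextmanager
# so it can be used as below. The archive name here is purely illustrative.
#
#     with archive_context('setuptools-38.0.0.zip'):
#         # cwd is now the extracted top-level directory
#         subprocess.check_call([sys.executable, 'setup.py', 'install'])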
def build_mkdocs(self):
"""
Invokes MkDocs to build the static documentation and moves the folder
into the project root folder.
"""
# Setting the working directory
os.chdir(MKDOCS_DIR)
# Building the MkDocs project
pipe = subprocess.PIPE
mkdocs_process = subprocess.Popen(
["mkdocs", "build", "-q"], stdout=pipe, stderr=pipe)
std_op, std_err_op = mkdocs_process.communicate()
if std_err_op:
raise Error("Could not build MkDocs !\n%s" %
std_err_op)
def test_RTagsDaemonStartClean(self):
try:
os.chdir("clean")
except OSError:
print("Test Error: Couldn't cd into 'dirty' test directory.")
raise
self.assertFalse(self.cmake_build_info["build_dir"].is_dir())
self.plugin.setup_rtags_daemon()
try:
rtags_daemon_status = subprocess.check_output(
self.cmake_cmd_info["rtags_status"])
except subprocess.CalledProcessError as e:
print(e.output)
self.assertTrue(
len("*********************************\nfileids\n*********************************\n*********************************\nheadererrors\n*********************************\n*********************************\ninfo\n*********************************\nRunning a release build\nsocketFile: /Users/phillipbonhomme/.rdm\ndataDir: /Users/phillipbonhomme/.cache/rtags/\noptions: 0x14jobCount: 4\nrpVisitFileTimeout: 60000\nrpIndexDataMessageTimeout: 60000\nrpConnectTimeout: 0\nrpConnectTimeout: 0\ndefaultArguments: List<String>(-ferror-limit=50, -Wall, -fspell-checking, -Wno-unknown-warning-option\")\nincludePaths: List<Source::Include>(\")\ndefines: List<Source::Define>(-DRTAGS=\")\nignoredCompilers: Set<Path>(\")\n*********************************\njobs\n*********************************\n"
) <= len(str(rtags_daemon_status)))
def test_RTagsClientStartDirty(self):
try:
os.chdir("dirty")
except OSError:
print("Test Error: Couldn't cd into 'dirty' test directory.")
raise
self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
self.plugin.setup_rtags_daemon()
self.plugin.connect_rtags_client()
try:
rtags_client_status = subprocess.check_output(
self.cmake_cmd_info["rtags_file_status"] +
[str(src_info["cpp"])])
except subprocess.CalledProcessError as e:
print(e.output)
self.assertTrue(str(rtags_client_status).find("managed"))
try:
rtags_client_status = subprocess.check_output(
self.cmake_cmd_info["rtags_file_status"] +
[str(src_info["test_cpp"])])
except subprocess.CalledProcessError as e:
print(e.output)
self.assertTrue(str(rtags_client_status).find("managed"))
def test_RTagsClientSetFile(self):
try:
os.chdir("dirty")
except OSError:
print("Test Error: Couldn't cd into 'dirty' test directory.")
raise
self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
self.plugin.setup_rtags_daemon()
self.plugin.connect_rtags_client()
self.plugin.rtags_set_file([str(src_info["cpp"])])
try:
rtags_client_status = subprocess.check_output(
self.cmake_cmd_info["rtags_file_status"] +
[str(src_info["cpp"])])
except subprocess.CalledProcessError as e:
print(e.output)
self.assertTrue(str(rtags_client_status).find("managed"))
def test_RTagsClientUpdateBuffers(self):
try:
os.chdir("dirty")
except OSError:
print("Test Error: Couldn't cd into 'dirty' test directory.")
raise
self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
self.plugin.setup_rtags_daemon()
self.plugin.connect_rtags_client()
self.plugin.update_rtags_buffers(
[str(src_info["test_cpp"]),
str(src_info["cpp"])])
try:
rtags_client_status = subprocess.check_output(
self.cmake_cmd_info["rtags_buffers"])
except subprocess.CalledProcessError as e:
print(e.output)
filepath = os.getcwd() + str(src_info["test_cpp"])
self.assertTrue(str(rtags_client_status).find(filepath))
def test_RTagsDaemonSink(self):
try:
os.chdir("dirty")
except OSError:
print("Test Error: Couldn't cd into 'dirty' test directory.")
raise
self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())
self.plugin.setup_rtags_daemon()
self.plugin.connect_rtags_client()
self.plugin.update_rtags_buffers(
[str(src_info["test_cpp"]),
str(src_info["cpp"])])
try:
rtags_client_status = subprocess.check_output(
self.cmake_cmd_info["rtags_buffers"])
except subprocess.CalledProcessError as e:
print(e.output)
def initialise_worker(cni):
    status_set('maintenance', 'Initialising worker network')
    local_ip = get_my_ip()
    worker_subnet = retrieve('worker_subnet')
    central_ip = retrieve('central_ip')
    hostname = run_command('hostname').replace('\n', '')
    run_command('ovs-vsctl set Open_vSwitch . \
external_ids:k8s-api-server="%s:8080"' % (central_ip))
    run_command('ovn-k8s-overlay minion-init --cluster-ip-subnet="192.168.0.0/16" \
--minion-switch-subnet="%s" --node-name="%s"' % (worker_subnet, hostname))
    os.chdir('/tmp/')
    run_command('wget https://github.com/containernetworking/cni/releases/download/v0.5.2/cni-amd64-v0.5.2.tgz')
    run_command('sudo mkdir -p /opt/cni/bin')
    run_command('sudo mkdir -p /etc/cni/net.d')
    os.chdir('/opt/cni/bin/')
    run_command('sudo tar xvzf /tmp/cni-amd64-v0.5.2.tgz')
    status_set('active', 'Worker subnet : %s' % (worker_subnet))
    set_state('worker.initialised')
def send_cert(cni, mconfig):
    worker_hostname = run_command('hostname')
    mconfig.set_worker_id(worker_hostname)
    os.chdir('/etc/openvswitch')
    run_command('sudo ovs-pki req ovncontroller')
    req_file = open('ovncontroller-req.pem', 'r')
    cert = req_file.read()
    mconfig.send_worker_data({
        'cert_to_sign': cert,
        'worker_hostname': worker_hostname
    })
    status_set('maintenance', 'Waiting for certificate')
    set_state('worker.cert.sent')
def testDefaultIncludeOverrides(self):
"""Test that included files override settings of default.yaml"""
with TemporaryDirectory() as tmp:
os.chdir(tmp)
os.mkdir("recipes")
with open("default.yaml", "w") as f:
f.write("include:\n")
f.write(" - user\n")
f.write("environment:\n")
f.write(" FOO: BAR\n")
f.write(" BAR: BAZ\n")
with open("user.yaml", "w") as f:
f.write("environment:\n")
f.write(" FOO: BAZ\n")
recipeSet = RecipeSet()
recipeSet.parse()
assert recipeSet.defaultEnv() == { "FOO":"BAZ", "BAR":"BAZ" }
def testUserConfigOverrides(self):
"""Test that user configs override default.yaml w/ includes"""
with TemporaryDirectory() as tmp:
os.chdir(tmp)
os.mkdir("recipes")
with open("default.yaml", "w") as f:
f.write("include:\n")
f.write(" - included\n")
f.write("environment:\n")
f.write(" FOO: BAR\n")
with open("included.yaml", "w") as f:
f.write("environment:\n")
f.write(" FOO: BAZ\n")
with open("user.yaml", "w") as f:
f.write("environment:\n")
f.write(" FOO: USER\n")
recipeSet = RecipeSet()
recipeSet.setConfigFiles(["user"])
recipeSet.parse()
assert recipeSet.defaultEnv() == { "FOO":"USER"}