def startRPC(self, port, eventListenerPort):
    """Launch the RPC agent through ``launcher`` and return the result.

    The agent's log file is named after the running script, so e.g. for
    "runConsumerOWP.py" it will be "rpc-wpwithin-runConsumerOWP.log".

    :param port: value passed to the agent as ``-port``.
    :param eventListenerPort: passed as ``-callbackport`` only when it is
        greater than zero.
    :return: whatever ``launcher.launch`` returns.
    """
    logging.basicConfig(filename='worldpay-within-wrapper.log', level=logging.DEBUG)

    launch_config = launcher.Config(
        ["darwin", "win32", "windows", "linux"],
        ["x64", "ia32"],
    )
    agent_launcher = launcher.launcher()

    script_stem = os.path.basename(sys.argv[0]).rsplit(".", 1)[0]
    logfilename = "rpc-wpwithin-" + script_stem + ".log"

    args = [
        '-port', str(port),
        '-logfile', logfilename,
        '-loglevel', 'debug,warn,error,fatal,info',
    ]
    if eventListenerPort > 0:
        args += ['-callbackport', str(eventListenerPort)]

    # The debug line is the working directory immediately followed by the
    # space-separated argument list.
    working_dir = os.getcwd()
    logging.debug(str(working_dir) + "" + " ".join(args))
    return agent_launcher.launch(launch_config, working_dir + "", args)
# Example source code using Python's getcwd()
def compose(env):
    """Compose the configuration.

    Runs two commands through ``call``: an rsync of ``USER_FOLDER`` into the
    path returned by ``_get_utils(env)``, then ``config_compose.py`` with
    that path as its working directory, writing to ``FILE_NAME`` in the
    current working directory.
    """
    utils_path = _get_utils(env)

    sync_cmd = [
        "rsync",
        "-aczv",
        USER_FOLDER,
        os.path.join(utils_path, USER_FOLDER),
        "--delete-after",
    ]
    excluded = ("*.pyc", "README.md", "__init__.py", "__config__.py")
    sync_cmd.extend([_get_exclude(pattern) for pattern in excluded])
    call(sync_cmd, "rsync user definitions")

    # The output path is anchored to the directory we were started from,
    # because the compose step itself runs with a different working dir.
    output_file = os.path.join(os.getcwd(), FILE_NAME)
    compose_cmd = ["python2.7", "config_compose.py", "--output", output_file]
    call(compose_cmd, "compose configuration", working_dir=utils_path)
def test_RTagsClientUpdateBuffers(self):
    """Check that buffers pushed through the plugin show up in rtags.

    Changes into the 'dirty' test directory, verifies the build artefacts
    listed in ``self.cmake_build_info`` exist, starts and connects the
    plugin's rtags daemon/client, updates the buffers for two source files
    and finally asserts that the test source path appears in the output of
    the ``rtags_buffers`` command.
    """
    try:
        os.chdir("dirty")
    except OSError:
        print("Test Error: Couldn't cd into 'dirty' test directory.")
        raise

    self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
    self.assertTrue(self.cmake_build_info["comp_data_cmake"].is_file())

    self.plugin.setup_rtags_daemon()
    self.plugin.connect_rtags_client()
    self.plugin.update_rtags_buffers(
        [str(src_info["test_cpp"]),
         str(src_info["cpp"])])

    try:
        rtags_client_status = subprocess.check_output(
            self.cmake_cmd_info["rtags_buffers"])
    except subprocess.CalledProcessError as e:
        print(e.output)
        # Without command output there is nothing left to verify; re-raise
        # so the test fails on the real cause instead of on an unbound
        # ``rtags_client_status`` further down.
        raise

    # NOTE(review): plain concatenation assumes src_info["test_cpp"] starts
    # with a path separator -- confirm against the fixture definition.
    filepath = os.getcwd() + str(src_info["test_cpp"])
    # str.find() returns -1 (truthy) on a miss and 0 (falsy) on a match at
    # index 0, so it cannot serve as a boolean; test membership directly.
    self.assertIn(filepath, str(rtags_client_status))
def archive_context(filename):
    """Extract ``filename`` into a temporary directory and work inside it.

    Generator that yields exactly once, with the current working directory
    set to the first entry found in the extraction directory. When the
    generator is resumed or closed, the original working directory is
    restored and the temporary directory is removed.
    (Presumably wrapped by a context-manager decorator that is not visible
    here -- confirm at the definition site.)
    """
    extraction_root = tempfile.mkdtemp()
    log.warn('Extracting in %s', extraction_root)
    previous_cwd = os.getcwd()
    try:
        os.chdir(extraction_root)
        zip_class = get_zip_class()
        with zip_class(filename) as archive:
            archive.extractall()

        # Step into the first entry the archive produced.
        first_entry = os.listdir(extraction_root)[0]
        work_dir = os.path.join(extraction_root, first_entry)
        os.chdir(work_dir)
        log.warn('Now working in %s', work_dir)
        yield
    finally:
        # Leave the temp tree before deleting it.
        os.chdir(previous_cwd)
        shutil.rmtree(extraction_root)
def load_dynamic_config(configurations, config_dir=None):
    """Load and parse dynamic config.

    Imports the module named ``config`` after putting ``config_dir`` at the
    front of ``sys.path``, then copies every item of its ``CONFIG`` mapping
    into ``configurations`` (mutated in place).

    :param configurations: object with a dict-style ``update`` method that
        receives the imported settings.
    :param config_dir: directory expected to contain ``config.py``.
        Defaults to the working directory *at call time*; a default of
        ``getcwd()`` in the signature would be frozen at definition time.
    :return: None. If no ``config`` module can be imported,
        ``configurations`` is left untouched.
    """
    if config_dir is None:
        config_dir = getcwd()

    # Create full path of config
    config_file = '{path}/config.py'.format(path=config_dir)
    # Insert config path so we can import it
    sys.path.insert(0, path.dirname(path.abspath(config_file)))
    try:
        config_module = __import__('config')
        for key, value in config_module.CONFIG.items():
            LOG.debug('Importing %s with key %s', value, key)
            # Update configparser object
            configurations.update({key: value})
    except ImportError:
        # Rebinding the local name here would never reach the caller, so a
        # missing config simply leaves ``configurations`` as it was.
        LOG.debug('No importable config module found in %s', config_dir)
def archive_context(filename):
    """Unpack ``filename`` with ``ContextualZipFile`` and work inside it.

    Generator that yields once while the current working directory is the
    first entry of a fresh temporary extraction directory. Afterwards the
    previous working directory is restored and the temporary directory is
    deleted, even if the caller's block raised.
    (Presumably used through a context-manager decorator that is not
    visible here -- confirm at the definition site.)
    """
    scratch_dir = tempfile.mkdtemp()
    log.warn('Extracting in %s', scratch_dir)
    original_dir = os.getcwd()
    try:
        os.chdir(scratch_dir)
        with ContextualZipFile(filename) as zipped:
            zipped.extractall()

        # Move into the first directory entry that extraction created.
        entries = os.listdir(scratch_dir)
        unpacked_dir = os.path.join(scratch_dir, entries[0])
        os.chdir(unpacked_dir)
        log.warn('Now working in %s', unpacked_dir)
        yield
    finally:
        os.chdir(original_dir)
        shutil.rmtree(scratch_dir)
def pquery(command, stdin=None, **kwargs):
    """Run ``command`` and return its captured standard output.

    :param command: argument list; ``command[0]`` is the executable.
    :param stdin: data handed to ``Popen.communicate``.
    :param kwargs: forwarded unchanged to ``subprocess.Popen``.
    :return: the process's stdout as returned by ``communicate``.
    :raises ProcessException: when the process exits with a non-zero status.
    :raises OSError: when the process cannot be started for a reason other
        than the executable being missing.
    """
    if very_verbose:
        info('Query "'+' '.join(command)+'" in '+getcwd())

    try:
        proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
    except OSError as e:
        # Use the ``errno`` attribute: indexing the exception (``e[0]``) only
        # works on Python 2 and raises TypeError on Python 3.
        if e.errno == errno.ENOENT:
            # NOTE(review): error() is assumed not to return (otherwise
            # ``proc`` is unbound below) -- confirm against its definition.
            error(
                "Could not execute \"%s\".\n"
                "Please verify that it's installed and accessible from your current path by executing \"%s\".\n" % (command[0], command[0]), e.errno)
        else:
            # Bare raise keeps the original traceback.
            raise

    stdout, _ = proc.communicate(stdin)

    if very_verbose:
        log(str(stdout).strip()+"\n")

    if proc.returncode != 0:
        raise ProcessException(proc.returncode, command[0], ' '.join(command), getcwd())

    return stdout
def seturl(url):
    """Set the remote URL in ``.hg/hgrc`` of the current directory.

    If the file has a ``[paths]`` section whose first line is a
    ``name = value`` entry, that entry keeps its name and gets ``url`` as
    its new value. Otherwise a ``default = url`` entry is added, creating
    the ``[paths]`` section when it is missing. The result is written back
    to the file.

    :param url: the URL to store.
    """
    info("Setting url to \"%s\" in %s" % (url, getcwd()))
    hgrc = os.path.join('.hg', 'hgrc')
    tagpaths = '[paths]'
    remote = 'default'
    lines = []

    try:
        with open(hgrc) as f:
            lines = f.read().splitlines()
    except IOError:
        # No readable hgrc yet: start from an empty file.
        pass

    if tagpaths in lines:
        idx = lines.index(tagpaths)
        # '[paths]' may be the very last line, so guard the look-ahead.
        m = None
        if idx + 1 < len(lines):
            m = re.match(r'^([\w_]+)\s*=\s*(.*)$', lines[idx+1])
        if m:
            # Reuse the existing entry's name and drop its old value.
            remote = m.group(1)
            del lines[idx+1]
        # The entry belongs directly *after* the section header.
        lines.insert(idx+1, remote+' = '+url)
    else:
        lines.append(tagpaths)
        lines.append(remote+' = '+url)

    # Persist the edit; without this the function has no lasting effect.
    try:
        with open(hgrc, 'w') as f:
            f.write('\n'.join(lines) + '\n')
    except IOError:
        error("Unable to write hgrc file in \"%s\"" % os.path.join(getcwd(), hgrc), 1)
def unignore(dest):
    """Remove ``dest`` from ``.hg/hgignore`` if it is listed there.

    Also (re)sets ``Hg.ignore_file`` to that path. The file is rewritten
    only when an entry was actually removed; a write failure is reported
    through ``error``.
    """
    Hg.ignore_file = os.path.join('.hg', 'hgignore')

    try:
        with open(Hg.ignore_file) as f:
            entries = f.read().splitlines()
    except IOError:
        entries = []

    # Nothing to do when the path was never ignored.
    if dest not in entries:
        return

    entries.remove(dest)
    try:
        with open(Hg.ignore_file, 'w') as f:
            f.write('\n'.join(entries) + '\n')
    except IOError:
        error("Unable to write ignore file in \"%s\"" % os.path.join(getcwd(), Hg.ignore_file), 1)
# pylint: disable=no-self-argument, no-method-argument, no-member, no-self-use, unused-argument
def checkout(rev, clean=False):
    """Check out ``rev``, preferring a branch that points at it.

    Does nothing when ``rev`` is falsy. Otherwise each ref returned by
    ``Git.getbranches(rev)`` is examined; the first one that yields a
    usable branch name is checked out instead of the bare revision. If no
    branch is found, ``rev`` itself is checked out, with ``-f`` added when
    ``clean`` is true. ``-q`` is passed unless ``very_verbose`` is set.
    """
    if not rev:
        return

    info("Checkout \"%s\" in %s" % (rev, os.path.basename(getcwd())))
    branch = None
    # Re-associate with a local or remote branch (rev is the same).
    for ref in Git.getbranches(rev):
        remote_ref = re.match(r'^(.*?)\/(.*?)$', ref)
        if remote_ref and remote_ref.group(2) != "HEAD":
            # Matches origin/<branch> and isn't a HEAD ref. Okay only if a
            # local branch with that name doesn't exist (git will checkout
            # the origin/<branch> in that case).
            local_head = os.path.join('.git', 'refs', 'heads', remote_ref.group(2))
            if not os.path.exists(local_head):
                branch = remote_ref.group(2)
        elif ref != "HEAD":
            # Matches local branch and isn't HEAD ref.
            branch = ref

        if branch:
            info("Revision \"%s\" matches a branch \"%s\" reference. Re-associating with branch" % (rev, branch))
            popen([git_cmd, 'checkout', branch] + ([] if very_verbose else ['-q']))
            break
    else:
        # Loop finished without finding a branch: check out the revision.
        force_flag = ['-f'] if clean else []
        quiet_flag = [] if very_verbose else ['-q']
        popen([git_cmd, 'checkout', rev] + force_flag + quiet_flag)
def update(rev=None, clean=False, clean_files=False, is_local=False):
    """Update the working copy to ``rev`` or to the tracked remote branch.

    :param rev: revision to check out; when falsy, the current branch is
        merged with its counterpart on the remote instead.
    :param clean: call ``Git.discard`` first and pass ``clean`` on to
        ``Git.checkout``.
    :param clean_files: forwarded to ``Git.discard``.
    :param is_local: skip ``Git.fetch`` when true.
    """
    if not is_local:
        Git.fetch()
    if clean:
        Git.discard(clean_files)

    if rev:
        Git.checkout(rev, clean)
        return

    remote = Git.getremote()
    branch = Git.getbranch()
    if remote and branch:
        try:
            Git.merge('%s/%s' % (remote, branch))
        except ProcessException:
            # A failed merge is deliberately ignored here.
            pass
        return

    # Explain which prerequisite for an update is missing.
    err = "Unable to update \"%s\" in \"%s\"." % (os.path.basename(getcwd()), getcwd())
    if not remote:
        info(err+" The local repository is not associated with a remote one.")
    if not branch:
        info(err+" Working set is not on a branch.")
def ignore(dest):
    """Append ``dest`` to ``Git.ignore_file`` unless it is already listed.

    Backslashes in ``dest`` are written as forward slashes. The parent
    directory of the ignore file is created when missing. A write failure
    is reported through ``error``.
    """
    try:
        with open(Git.ignore_file) as f:
            already_listed = dest in f.read().splitlines()
    except IOError:
        # An unreadable ignore file counts as "not listed".
        already_listed = False

    if already_listed:
        return

    try:
        parent_dir = os.path.dirname(Git.ignore_file)
        if not os.path.exists(parent_dir):
            os.mkdir(parent_dir)

        with open(Git.ignore_file, 'a') as f:
            f.write(dest.replace("\\", "/") + '\n')
    except IOError:
        error("Unable to write ignore file in \"%s\"" % os.path.join(getcwd(), Git.ignore_file), 1)
def fromrepo(cls, path=None):
    """Build an instance of ``cls`` for the repository at ``path``.

    :param path: repository location; when None it is looked up with
        ``Repo.findparent`` starting from the current directory, and
        ``error`` is called if nothing is found.
    :return: the new instance with ``path`` and ``name`` set, ``cache`` set
        when caching is enabled, and ``sync()`` already called.
    """
    repo = cls()

    if path is None:
        path = Repo.findparent(getcwd())
        if path is None:
            error(
                "Could not find mbed program in current path \"%s\".\n"
                "You can fix this by calling \"mbed new .\" or \"mbed config root .\" in the root of your program." % getcwd())

    repo.path = os.path.abspath(path)
    repo.name = os.path.basename(repo.path)

    cache_cfg = Global().get_cfg('CACHE', '')
    caching_enabled = (cache_repositories and cache_cfg
                       and cache_cfg not in ('none', 'off', 'disabled'))
    if caching_enabled:
        # 'on'/'enabled' select the default location; any other value is
        # taken as the cache location itself.
        loc = None if cache_cfg in ('on', 'enabled') else cache_cfg
        repo.cache = loc or os.path.join(tempfile.gettempdir(), 'mbed-repo-cache')

    repo.sync()

    if repo.scm is None:
        warning(
            "Program \"%s\" in \"%s\" does not use source control management.\n"
            "To fix this you should use \"mbed new .\" in the root of your program." % (repo.name, repo.path))

    return repo
def pathtype(cls, path=None):
    """Classify ``path`` as "directory", "program" or "library".

    Starting from ``path`` (default: the current directory), repeatedly
    asks ``Repo.findparent`` for a match and then continues from the
    directory above that match. The number of matches decides the result:
    none -> "directory", one -> "program", more -> "library".
    """
    path = os.path.abspath(path or getcwd())
    depth = 0

    while cd(path):
        previous = path
        path = Repo.findparent(path)
        if not path:
            break

        depth += 1
        path = os.path.split(path)[0]
        if previous == path:  # Reached root.
            break

    if depth == 0:
        return "directory"
    if depth == 1:
        return "program"
    return "library"
def __init__(self, path=None, print_warning=False):
    """Locate the program root at or above ``path``.

    Walks from ``path`` (default: the current directory) towards the
    filesystem root looking for a directory that contains ``Cfg.file``.

    Sets ``self.path`` (the directory found, or the starting directory),
    ``self.is_cwd`` (True when nothing was found), ``self.name`` and
    ``self.is_classic`` (whether 'mbed.bld' exists in ``self.path``).

    :param print_warning: emit a warning when no program root was found.
    """
    candidate = os.path.abspath(path or getcwd())
    self.path = candidate
    self.is_cwd = True

    while cd(candidate):
        if os.path.isfile(os.path.join(candidate, Cfg.file)):
            self.path = candidate
            self.is_cwd = False
            break

        parent = os.path.split(candidate)[0]
        if parent == candidate:  # Reached root.
            break
        candidate = parent

    self.name = os.path.basename(self.path)
    self.is_classic = os.path.isfile(os.path.join(self.path, 'mbed.bld'))

    # is_cwd flag indicates that current dir is assumed to be root, not root repo
    if self.is_cwd and print_warning:
        warning(
            "Could not find mbed program in current path \"%s\".\n"
            "You can fix this by calling \"mbed new .\" in the root of your program." % self.path)
def main():
    """Run every example script in ``../examples`` and print a summary.

    The parent of the current directory is put at the front of ``sys.path``.
    Files ending in ".py" whose names contain neither "init" nor "viz_exam"
    are passed to ``run_example``; a truthy result counts as a pass.
    """
    working_dir = os.getcwd()

    # Add examples to path.
    sys.path.insert(0, os.path.abspath(os.path.join(working_dir, os.pardir)))

    # Grab all example files.
    example_dir = os.path.join(working_dir, "..", "examples")

    def _is_example(name):
        return (os.path.isfile(os.path.join(example_dir, name))
                and "py" == name.split(".")[-1]
                and "init" not in name
                and "viz_exam" not in name)

    example_files = [name for name in os.listdir(example_dir) if _is_example(name)]

    banner = "="*32
    print("\n" + banner)
    print("== Running", len(example_files), "simple_rl tests ==")
    print(banner + "\n")

    total_passed = 0
    for test_number, ex in enumerate(example_files, 1):
        print("\t [Test", str(test_number) + "] ", ex + ": ")
        if run_example(os.path.join(example_dir, ex)):
            total_passed += 1
            print("\t\tPASS.")
        else:
            print("\t\tFAIL.")

    print("\nResults:", total_passed, "/", len(example_files), "passed.")
def create(ctx, maintype, subtype, app_name, factory, args):
    """Create an application from a template factory.

    Missing ``maintype``/``subtype`` fall back to the 'default_maintype'
    and 'default_subtype' settings (then to 'python' and 'app'). The
    factory module is looked up with ``get_factory``; when none is found an
    error is echoed and the process exits with status 1. Otherwise its
    ``AppFactory`` is set up for the current directory and run.
    """
    def setting(key, default):
        # Settings live under ctx.obj['SETTINGS']; looked up lazily.
        return ctx.obj['SETTINGS'].get(key, default)

    args = args or []
    maintype = maintype or setting('default_maintype', 'python')
    subtype = subtype or setting('default_subtype', 'app')
    click.echo('Type: {};\t Subtype: {};\t App name: {};'.format(
        maintype, subtype, app_name))

    current_dir = os.getcwd()
    additional_dirs = setting('templates', [])
    factory_module, path = get_factory(maintype, factory, additional_dirs)
    if not factory_module:
        click.echo('ERROR: factory not found:{}'.format(maintype), err=True)
        exit(1)

    app_factory = factory_module.AppFactory(path, args)
    app_factory.setup(subtype, app_name, current_dir)
    app_factory.set_context(setting('context', {}))
    app_factory.run()
    click.echo('Done!')
def chdir(directory):
    """Change the current working directory to a different directory for a
    code block and return to the previous directory after the block exits.
    Useful to run commands from a specified directory.

    Generator that yields ``None`` exactly once while ``directory`` is the
    working directory.

    :param str directory: The directory path to change to for this context.
    """
    previous_dir = os.getcwd()
    try:
        os.chdir(directory)
        yield
    finally:
        # Always go back, even if the block (or the chdir itself) raised.
        os.chdir(previous_dir)
def _add_services(self, this_service, other_services):
    """Add services.

    Add services to the deployment where this_service is the local charm
    that we're testing and other_services are the other services that
    are being used in the local amulet tests.

    Every service is a dict with a 'name' and optional 'units' and
    'constraints'; a missing 'units' is set to 1 in the dict itself.
    Entries of other_services may carry a 'location'; without one the
    charm is 'cs:<series>/<name>' when ``self.series`` is set, else None.
    """
    if this_service['name'] != os.path.basename(os.getcwd()):
        s = this_service['name']
        msg = "The charm's root directory name needs to be {}".format(s)
        amulet.raise_status(amulet.FAIL, msg=msg)

    if 'units' not in this_service:
        this_service['units'] = 1

    self.d.add(this_service['name'], units=this_service['units'],
               constraints=this_service.get('constraints'))

    for svc in other_services:
        if 'location' in svc:
            branch_location = svc['location']
        elif self.series:
            # Must stay a plain string: a trailing comma after this
            # expression would wrap the URL in a one-element tuple.
            branch_location = 'cs:{}/{}'.format(self.series, svc['name'])
        else:
            branch_location = None

        if 'units' not in svc:
            svc['units'] = 1

        self.d.add(svc['name'], charm=branch_location, units=svc['units'],
                   constraints=svc.get('constraints'))
def show_io(input_dir, output_dir):
    """Show directory structure and inputs and outputs to scoring program.

    Writes, via ``swrite``/``write_list``, listings of the current, input
    and output directories at several glob depths, followed by the contents
    of the 'metadata' files found in the current and input directories.

    :param input_dir: path of the input directory.
    :param output_dir: path of the output directory.
    """
    swrite('\n=== DIRECTORIES ===\n\n')

    # Show this directory
    swrite("-- Current directory " + pwd() + ":\n")
    for pattern in ('.', './*', './*/*'):
        write_list(ls(pattern))
    swrite("\n")

    # List input and output directories
    swrite("-- Input directory " + input_dir + ":\n")
    for suffix in ('', '/*', '/*/*', '/*/*/*'):
        write_list(ls(input_dir + suffix))
    swrite("\n")
    swrite("-- Output directory " + output_dir + ":\n")
    for suffix in ('', '/*'):
        write_list(ls(output_dir + suffix))
    swrite("\n")

    # Write metadata
    swrite('\n=== METADATA ===\n\n')
    swrite("-- Current directory " + pwd() + ":\n")
    _write_metadata('metadata')
    swrite("-- Input directory " + input_dir + ":\n")
    _write_metadata(os.path.join(input_dir, 'metadata'), trailing_newline=True)


def _write_metadata(metadata_path, trailing_newline=False):
    """Write each "key: value" pair of a YAML metadata file, or "none"."""
    try:
        with open(metadata_path, 'r') as handle:
            # safe_load instead of yaml.load: a Loader-less yaml.load raises
            # TypeError on PyYAML >= 6 (which would always print "none"),
            # and metadata files should never need arbitrary object tags.
            metadata = yaml.safe_load(handle)
        for key, value in metadata.items():
            swrite(key + ': ')
            swrite(str(value) + '\n')
        if trailing_newline:
            swrite("\n")
    except Exception:
        # Best effort: a missing or unreadable metadata file is reported as
        # "none", but KeyboardInterrupt/SystemExit are no longer swallowed.
        swrite("none\n")