def kernel():
    """Provision a throwaway conda environment that hosts an ipykernel.

    The environment is created apart from the test environment (the one
    where jupyter console is installed). Keeping the two separate lets a
    test tell whether starting the kernel activated the environment, or
    whether it was already active when the test suite started.

    Yields:
        A ``Kernel`` built from the unique kernel name, the path of its
        ``kernel.json`` spec file and the path of the conda environment.
        The environment directory is deleted once the generator resumes.
    """
    # One random identifier names both the kernel spec and the environment.
    kernel_name = str(uuid4())
    env_dir = '{}/kernel-env-{name}'.format(gettempdir(), name=kernel_name)

    # Create the environment, activate it and register its kernel spec in
    # one bash invocation.
    setup_template = (
        '/bin/bash -c "conda create -y -p {env_path} ipykernel && '
        'source activate {env_path} && '
        'python -m ipykernel install --user '
        '--name {name}"'
    )
    pexpect.run(setup_template.format(env_path=env_dir, name=kernel_name))

    # Ask jupyter for its user data directory in a separate command so the
    # output is trivial to parse.
    data_dir_output = pexpect.run('jupyter --data-dir')
    data_dir = data_dir_output.decode('utf-8').strip()

    # The kernel spec lives at <data dir>/kernels/<name>/kernel.json.
    spec_file = os.path.join(data_dir, 'kernels', kernel_name, 'kernel.json')
    yield Kernel(kernel_name, spec_file, env_dir)
    shutil.rmtree(env_dir)
# Example source code for Python's run()
def GetGcloud(args, project=None, service=None):
    """Build the argument list for a gcloud invocation.

    Nothing is executed here; the command is only assembled.

    Args:
      args: the sub-command and its arguments, as an iterable of strings.
      project: optional project; when truthy, ``--project <project>`` is
        placed ahead of ``args``.
      service: optional gcloud service (for example ``compute``); when
        truthy it is placed right after ``gcloud``. No service is added
        when it is omitted.

    Returns:
      A new list: ``gcloud``, the optional service, the optional project
      flag and then ``args``.
    """
    service_part = [service] if service else []
    project_part = ["--project", project] if project else []
    gcloud_command = ["gcloud"] + service_part + project_part
    gcloud_command += args
    return gcloud_command
# TODO(sohamcodes): pexpect.spawn can be changed to subprocess call
# However, timeout for subprocess is available only on python3
# So, we can implement it later.
def RunCommand(args, timeout=None, logfile=None):
    """Run a command under pexpect and fail loudly on a bad exit status.

    The command is spawned with ``pexpect.spawn`` and read until EOF.

    Args:
      args: command with arguments as a sequence; ``args[0]`` is the
        executable and the remaining items are its arguments.
      timeout: timeout handed to ``pexpect.spawn``.
      logfile: an opened file stream that receives the command's output.

    Raises:
      RuntimeError: the command's exit status is not zero. The message
        carries the captured output and the exit status.
      pexpect.TIMEOUT: EOF was not reached within ``timeout``.

    Returns:
      0 when the command finished with a zero exit status.
    """
    child = pexpect.spawn(args[0], args=args[1:], timeout=timeout,
                          logfile=logfile)
    try:
        child.expect(pexpect.EOF)
    finally:
        # Close the child even when expect() raises (e.g. on TIMEOUT) so
        # the spawned process and its pty are not leaked.
        child.close()
    if child.exitstatus:
        # print() with a single argument behaves the same on Python 2 and 3.
        print(args)
        raise RuntimeError(("Error: {}\nProblem running command. "
                            "Exit status: {}").format(child.before,
                                                      child.exitstatus))
    return 0
def get_docker_volume_list(*expected_volumes):
    '''Get the output from "docker volume ls" as a list of dicts.

    Each output line whose first two whitespace-separated fields can be
    read yields a dict with the keys ``driver`` and ``name``.

    ``expected_volumes`` is accepted but currently not used for filtering.
    '''
    volume_listing_re = re.compile(
        r'(?P<driver>\S+)\s+'
        r'(?P<name>\S+)'
    )
    docker_volumes_response = pexpect.run('docker volume ls')
    # pexpect.run returns bytes on Python 3; splitting bytes on a str
    # separator would raise TypeError, so normalise to text first.
    if not isinstance(docker_volumes_response, str):
        docker_volumes_response = docker_volumes_response.decode('utf-8')
    matches = (
        volume_listing_re.match(line)
        for line in docker_volumes_response.split('\n')
    )
    return [match.groupdict() for match in matches if match]
# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def getoutput(self, cmd):
    """Execute ``cmd`` through the shell and return everything it printed.

    Parameters
    ----------
    cmd : str
        Command line handed to the shell ``self.sh`` through its ``-c``
        option.

    Returns
    -------
    output : str
        The subprocess's stdout and stderr combined, in the order the
        subprocess wrote them, with CRLF line endings converted to LF.
        Nothing is returned (``None``) when the call is interrupted with
        a KeyboardInterrupt.
    """
    shell_args = ['-c', cmd]
    try:
        captured = pexpect.run(self.sh, args=shell_args)
        return captured.replace('\r\n', '\n')
    except KeyboardInterrupt:
        # Echo the interrupt the way a terminal would, then fall through.
        print('^C', file=sys.stderr, end='')
def getoutput_pexpect(self, cmd):
    """Execute ``cmd`` through the shell and return everything it printed.

    Parameters
    ----------
    cmd : str
        Command line handed to the shell ``self.sh`` through its ``-c``
        option.

    Returns
    -------
    output : str
        The subprocess's stdout and stderr combined, in the order the
        subprocess wrote them, with CRLF line endings converted to LF.
        Nothing is returned (``None``) when the call is interrupted with
        a KeyboardInterrupt.
    """
    shell_args = ['-c', cmd]
    try:
        captured = pexpect.run(self.sh, args=shell_args)
        return captured.replace('\r\n', '\n')
    except KeyboardInterrupt:
        # Echo the interrupt the way a terminal would, then fall through.
        print('^C', file=sys.stderr, end='')
def get_architecture(firmware_id):
    """Gets the architecture of the given firmware image.

    Runs ``GETARCH_COMMAND`` (formatted with ``FIRMADYNE_PATH``,
    ``OUTPUT_DIR`` and ``firmware_id``), answering the
    "Password for user <USER>:" prompt with ``PASSWORD``. The architecture
    is the text after the first ':' on the first line of the output.

    Returns:
        The architecture string, or "" when the first output line holds
        no ':' separated value.
    """
    print(bcolors.OKBLUE + "[-] Getting the firmware architecture..." + bcolors.ENDC)
    command = GETARCH_COMMAND.format(FIRMADYNE_PATH, OUTPUT_DIR, firmware_id)
    print(bcolors.ITALIC + command + bcolors.ENDC)
    output = pexpect.run(command, events={'Password for user {}:'.format(USER): PASSWORD + '\n'})
    # pexpect.run returns bytes on Python 3; convert to text so the str
    # based parsing below works instead of always failing.
    if not isinstance(output, str):
        output = output.decode('utf-8', 'replace')

    # extract the architecture info from the output
    arch = ""
    try:
        arch = output.split('\n')[0].split(':')[1]
    except IndexError:
        # Only a missing ':' field is expected here; anything else (for
        # example KeyboardInterrupt) must not be swallowed.
        print(bcolors.FAIL + "[!] The firmware architecture couldn't be determined..." + bcolors.ENDC)
        print(bcolors.ITALIC + "[!] Please try manually with the file command and provide the correct architecture type with the --arch parameter..." + bcolors.ENDC)
    else:
        print(bcolors.OKGREEN + "[+] The architecture of your firmware image is:" + arch + bcolors.ENDC)
    return arch
def exec2():
    """Set up the ``zach`` account, then switch to it inside ``child``.

    Relies on two names that are not defined in this function: ``child``
    (driven through ``expect``/``sendline``) and ``run`` (called with an
    ``events`` mapping). Nothing is returned.

    NOTE(review): the block nesting below was reconstructed; confirm which
    statements belong to the ``if`` body.
    """
    # Wait until '#' shows up in the child's output.
    child.expect('#')
    # NOTE(review): subprocess.getstatusoutput returns an (exitcode, output)
    # tuple, so comparing it with 0 is always true and this branch always
    # runs. The shell text also contains "2&1" where "2>&1" was presumably
    # meant, and it queries "root" although the account created below is
    # "zach" -- confirm the intended check.
    if subprocess.getstatusoutput('id root >> /dev/null 2&1 && echo $?') != 0:
        # NOTE(review): hard-coded credential.
        __newpasswd = 'edong&1310'
        subprocess.getstatusoutput('useradd zach')
        # NOTE(review): the response carries no trailing newline, so the
        # prompt is presumably never submitted -- confirm.
        run('passwd zach',events={'(?i)password:':__newpasswd})
        # TODO: make the run() call above equivalent to the disabled
        # expect/sendline sequence kept in the string below (the string is
        # left verbatim, hence unindented).
        '''
child.expect('password:')
child.sendline()
child.expect('password:')
child.sendline(__newpasswd)
'''
    child.expect('#')
    child.sendline('su - zach')
    # NOTE(review): '$' is a regular-expression anchor, not a literal
    # dollar sign; presumably r'\$' was intended to wait for the prompt.
    child.expect('$')
    # NOTE(review): 'whomai' looks like a typo for 'whoami'.
    child.sendline('whomai')
def getoutput(self, cmd):
    """Execute ``cmd`` through the shell and return everything it printed.

    Parameters
    ----------
    cmd : str
        Command line handed to the shell ``self.sh`` through its ``-c``
        option.

    Returns
    -------
    output : str
        The subprocess's stdout and stderr combined, in the order the
        subprocess wrote them, with CRLF line endings converted to LF.
        Nothing is returned (``None``) when the call is interrupted with
        a KeyboardInterrupt.
    """
    shell_args = ['-c', cmd]
    try:
        captured = pexpect.run(self.sh, args=shell_args)
        return captured.replace('\r\n', '\n')
    except KeyboardInterrupt:
        # Echo the interrupt the way a terminal would, then fall through.
        print('^C', file=sys.stderr, end='')
def getoutput_pexpect(self, cmd):
    """Execute ``cmd`` through the shell and return everything it printed.

    Parameters
    ----------
    cmd : str
        Command line handed to the shell ``self.sh`` through its ``-c``
        option.

    Returns
    -------
    output : str
        The subprocess's stdout and stderr combined, in the order the
        subprocess wrote them, with CRLF line endings converted to LF.
        Nothing is returned (``None``) when the call is interrupted with
        a KeyboardInterrupt.
    """
    shell_args = ['-c', cmd]
    try:
        captured = pexpect.run(self.sh, args=shell_args)
        return captured.replace('\r\n', '\n')
    except KeyboardInterrupt:
        # Echo the interrupt the way a terminal would, then fall through.
        print('^C', file=sys.stderr, end='')
def getoutput(self, cmd):
    """Execute ``cmd`` through the shell and return everything it printed.

    Parameters
    ----------
    cmd : str
        Command line handed to the shell ``self.sh`` through its ``-c``
        option.

    Returns
    -------
    output : str
        The subprocess's stdout and stderr combined, in the order the
        subprocess wrote them, with CRLF line endings converted to LF.
        Nothing is returned (``None``) when the call is interrupted with
        a KeyboardInterrupt.
    """
    shell_args = ['-c', cmd]
    try:
        captured = pexpect.run(self.sh, args=shell_args)
        return captured.replace('\r\n', '\n')
    except KeyboardInterrupt:
        # Echo the interrupt the way a terminal would, then fall through.
        print('^C', file=sys.stderr, end='')
def getoutput_pexpect(self, cmd):
    """Execute ``cmd`` through the shell and return everything it printed.

    Parameters
    ----------
    cmd : str
        Command line handed to the shell ``self.sh`` through its ``-c``
        option.

    Returns
    -------
    output : str
        The subprocess's stdout and stderr combined, in the order the
        subprocess wrote them, with CRLF line endings converted to LF.
        Nothing is returned (``None``) when the call is interrupted with
        a KeyboardInterrupt.
    """
    shell_args = ['-c', cmd]
    try:
        captured = pexpect.run(self.sh, args=shell_args)
        return captured.replace('\r\n', '\n')
    except KeyboardInterrupt:
        # Echo the interrupt the way a terminal would, then fall through.
        print('^C', file=sys.stderr, end='')
def getoutput(self, cmd):
    """Execute ``cmd`` through the shell and return everything it printed.

    Parameters
    ----------
    cmd : str
        Command line handed to the shell ``self.sh`` through its ``-c``
        option.

    Returns
    -------
    output : str
        The subprocess's stdout and stderr combined, in the order the
        subprocess wrote them, with CRLF line endings converted to LF.
        Nothing is returned (``None``) when the call is interrupted with
        a KeyboardInterrupt.
    """
    shell_args = ['-c', cmd]
    try:
        captured = pexpect.run(self.sh, args=shell_args)
        return captured.replace('\r\n', '\n')
    except KeyboardInterrupt:
        # Echo the interrupt the way a terminal would, then fall through.
        print('^C', file=sys.stderr, end='')
def getoutput_pexpect(self, cmd):
    """Execute ``cmd`` through the shell and return everything it printed.

    Parameters
    ----------
    cmd : str
        Command line handed to the shell ``self.sh`` through its ``-c``
        option.

    Returns
    -------
    output : str
        The subprocess's stdout and stderr combined, in the order the
        subprocess wrote them, with CRLF line endings converted to LF.
        Nothing is returned (``None``) when the call is interrupted with
        a KeyboardInterrupt.
    """
    shell_args = ['-c', cmd]
    try:
        captured = pexpect.run(self.sh, args=shell_args)
        return captured.replace('\r\n', '\n')
    except KeyboardInterrupt:
        # Echo the interrupt the way a terminal would, then fall through.
        print('^C', file=sys.stderr, end='')
def _start(self, startup_args=None):
    """Kill the current user's running xflux processes and spawn a new one.

    Args:
        startup_args: argument list for ``xflux``. When falsy, it is built
            with ``self._create_startup_arg_list`` from
            ``self._current_color`` and ``self.init_kwargs``.

    Raises:
        FileNotFoundError: pexpect raised ``ExceptionPexpect`` while
            running the helper commands or spawning ``xflux``.
    """
    if not startup_args:
        startup_args = self._create_startup_arg_list(self._current_color,
                                                     **self.init_kwargs)
    try:
        # pexpect.run returns bytes on Python 3. Interpolating bytes with
        # %s would embed "b'...'" in the command line, and bytes never
        # compare equal to "", so normalise both results to text.
        user = pexpect.run('whoami')
        if not isinstance(user, str):
            user = user.decode('utf-8')
        previous_instances = pexpect.run(
            'pgrep -d, -u %s xflux' % user.strip())
        if not isinstance(previous_instances, str):
            previous_instances = previous_instances.decode('utf-8')
        previous_instances = previous_instances.strip()
        if previous_instances != "":
            # pgrep -d, prints the matching pids separated by commas.
            for process in previous_instances.split(","):
                pexpect.run('kill -9 %s' % process)
        self._xflux = pexpect.spawn("xflux", startup_args)
    except pexpect.ExceptionPexpect:
        raise FileNotFoundError(
            "\nError: Please install xflux in the PATH \n")
def run(step, cmd, expect, bail, timeout=10):
    """Run ``cmd`` for a step and check that its output mentions ``expect``.

    Args:
        step: step identifier passed to ``print_local_step``.
        cmd: command line executed with ``pexpect.run``.
        expect: substring that must occur in the output once the ``ansi``
            pattern has been substituted away.
        bail: value passed to ``bail_out`` when ``expect`` is missing.
        timeout: timeout handed to ``pexpect.run``.

    Returns:
        The UTF-8 decoded output with ``ansi`` matches removed.
    """
    print_local_step(step)
    raw_output = pexpect.run(cmd, timeout=timeout)
    decoded = raw_output.decode('utf-8')
    # The unfiltered text goes to the log; the check uses the cleaned text.
    log(decoded)
    cleaned = ansi.sub('', decoded)
    found = expect in cleaned
    if not found:
        bail_out(bail, cleaned)
    return cleaned
def test_original_spec(kernel):
    """``which conda`` must resolve inside the test suite environment.

    This mostly exercises the involved logic of the test fixture, but it
    also establishes a good baseline for the other tests.
    """
    which_output = pexpect.run('which conda')
    conda_location = which_output.decode('utf-8').strip()
    environment_prefix = sys.prefix
    assert conda_location.startswith(environment_prefix)
def _stop_tunnel(cmd):
    """Run ``cmd`` with ``pexpect.run``; the captured output is discarded."""
    pexpect.run(cmd)
def RunGCloudService(args, project, service, logfile=None):
    """Run a gcloud ``service`` command.

    The command line is assembled with ``GetGcloud`` and executed with
    ``RunCommand``.

    Args:
      args: command with arguments as an array
      project: the project the remote host belongs to
      service: the gcloud service to run
      logfile: an opened file stream to write the log to

    Returns:
      Whatever ``RunCommand`` returns for the assembled command.
    """
    gcloud_command = GetGcloud(args, project=project, service=service)
    return RunCommand(gcloud_command, logfile=logfile)
def TryFunctionWithTimeout(func, error_handler, num_tries,
                           sleep_between_attempt_secs, *args, **kwargs):
    """Call ``func`` repeatedly until it succeeds or the tries run out.

    An attempt succeeds when ``func`` returns a falsy value. A truthy
    return value is printed and counts as a failure, as does raising
    ``error_handler``. After every failed attempt a message is printed and
    the function sleeps before the next attempt.

    Args:
      func: the function to try running without exception
      error_handler: the exception type (or tuple of types) that is caught
        so that the call can be retried.
      num_tries: number of tries to make before raising the final
        exception
      sleep_between_attempt_secs: number of seconds to sleep after each
        failed attempt
      *args: arguments to the function
      **kwargs: named arguments to the function

    Raises:
      BenchmarkError: When all tries are failed.
    """
    tries_left = num_tries
    while tries_left > 0:
        tries_left -= 1
        try:
            ret_val = func(*args, **kwargs)
            if not ret_val:
                return
            print(ret_val)
        except error_handler as e:
            print(e)
        # Format first, then print: calling .format() on the result of a
        # print() call fails on Python 3.
        print(("Problem running function, {}. Trying again after"
               " {}s. Total tries left: {}.").format(
                   func, sleep_between_attempt_secs, tries_left))
        time.sleep(sleep_between_attempt_secs)
    raise BenchmarkError("All tries failed.")
def tearDown(self):
    '''Test case common fixture teardown.

    Logs the docker constructs, then either removes every container,
    image and volume reported as new (when ``self.delete_artifacts`` is
    truthy) or records them under ``self.constructs['current']`` so they
    count as already existing from now on.
    '''
    logger.info("======= tearDown: %s", self.delete_artifacts)
    self.log_docker_constructs()
    new_containers = self.new_constructs('container')
    new_images = self.new_constructs('image')
    new_volumes = self.new_constructs('volume')

    if not self.delete_artifacts:
        # The new docker constructs are left behind, so fold them into
        # the "current" bookkeeping.
        self.constructs['current']['container'].extend(new_containers)
        self.constructs['current']['image'].extend(new_images)
        self.constructs['current']['volume'].extend(new_volumes)
        return

    # Every case should clean up its docker containers, images and
    # volumes. Containers should have been launched with the --rm flag,
    # so stopping them should also remove them.
    removal_plan = (
        (new_containers, 'id', 'docker stop {}'),
        (new_images, 'id', 'docker rmi {}'),
        (new_volumes, 'name', 'docker volume rm {}'),
    )
    for constructs, key, command_template in removal_plan:
        for construct in constructs:
            logger.info("REMOVING %s", construct[key])
            pexpect.run(command_template.format(construct[key]))
# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_docker_image_list(*expected_images):
    '''Get the output from "docker image ls" for specified image names.

    Each output line that matches the listing pattern yields a dict with
    the keys ``name``, ``tag``, ``id``, ``created`` and ``size``.

    ``expected_images`` are ``name`` or ``name:tag`` strings; a missing
    tag stands for ``latest``. When given, only images whose name/tag pair
    is among them are returned; otherwise every listed image is returned.
    '''
    image_listing_re = re.compile(
        r'(?P<name>[^\s]+)\s+'
        r'(?P<tag>[^\s]+)\s+'
        r'(?P<id>[0-9a-f]+)\s+'
        r'(?P<created>.+ago)\s+'
        r'(?P<size>[^\s]+)'
        r'\s*$'
    )
    docker_images_response = pexpect.run('docker image ls')
    # pexpect.run returns bytes on Python 3; splitting bytes on a str
    # separator would raise TypeError, so normalise to text first.
    if not isinstance(docker_images_response, str):
        docker_images_response = docker_images_response.decode('utf-8')

    # "name" -> [name, 'latest'], "name:tag" -> [name, tag]
    expected_image_nametag_pairs = [
        (x.split(':') + ['latest'])[0:2] for x in expected_images
    ] if expected_images else None

    image_list = []
    for line in docker_images_response.split('\n'):
        match = image_listing_re.match(line)
        if not match:
            continue
        fields = match.groupdict()
        if (
            not expected_images
            or [fields['name'], fields['tag']] in expected_image_nametag_pairs
        ):
            image_list.append(fields)
    return image_list
# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_docker_container_list(*expected_containers):
    '''Get the output from "docker ps -a" as a list of dicts.

    Each output line that matches the listing pattern yields a dict with
    the keys ``id``, ``image``, ``command``, ``created``, ``status``,
    ``ports`` and ``name``.

    ``expected_containers`` is accepted but currently not used for
    filtering.
    '''
    container_listing_re = re.compile(
        r'(?P<id>[0-9a-f]+)\s+'
        r'(?P<image>[^\s]+)\s+'
        r'(?P<command>"[^"]+")\s+'
        r'(?P<created>.+ago)\s+'
        r'(?P<status>(Created|Exited.*ago|Up \d+ \S+))\s+'
        r'(?P<ports>[^\s]+)?\s+'
        r'(?P<name>[a-z]+_[a-z]+)'
    )
    docker_containers_response = pexpect.run('docker ps -a')
    # pexpect.run returns bytes on Python 3; splitting bytes on a str
    # separator would raise TypeError, so normalise to text first.
    if not isinstance(docker_containers_response, str):
        docker_containers_response = docker_containers_response.decode('utf-8')
    matches = (
        container_listing_re.match(line)
        for line in docker_containers_response.split('\n')
    )
    return [match.groupdict() for match in matches if match]
# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def get_container_details(self, image):
    '''Run a docker container shell and retrieve several details.

    A bash shell is started in a container of ``image`` and a fixed list
    of commands is sent through ``interact``. The result of each command
    is stored under its detail name. When the config file is found, its
    contents are executed so that the names it defines become entries of
    the returned dict as well.
    '''
    # (detail name, command, result filter) for every detail extracted
    # from the container command line.
    detail_probes = (
        ('pwd', 'pwd', None),
        ('phantomjs_path', 'which phantomjs', None),
        ('phantomjs_perms', 'ls -l $(which phantomjs)', lambda x: x[1:10]),
        ('config_file', 'ls {}'.format(CONTAINER_CONFIG_PATH), None),
        ('config_contents', 'cat {}'.format(CONTAINER_CONFIG_PATH), None),
    )
    launch_command = "docker run --rm -it {} bash".format(image)
    logger.info('IMAGE: %s', image)
    logger.info('CONTAINER LAUNCH COMMAND: %s', launch_command)
    shell = pexpect.spawn(launch_command)

    details = {}
    for detail_name, probe_command, result_filter in detail_probes:
        details[detail_name] = interact(shell, probe_command, result_filter)

    # Exit the container.
    shell.sendcontrol('d')

    # "Expand" the config records if a config file was found.
    config_found = details['config_file'] == CONTAINER_CONFIG_PATH
    if config_found:
        try:
            # NOTE(review): this executes text read back from the
            # container; keep it restricted to trusted images.
            exec(details['config_contents'], details)
        except SyntaxError:
            pass

    # exec() adds '__builtins__' to the dict; that entry is noise.
    details.pop('__builtins__', None)
    return details
# - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def test_make_aardvark_sqlite_no_images_tag(self):
    '''Test "make aardvark-sqlite" without specifying the images tag.'''
    images = [
        'aardvark-base',
        'aardvark-data-init',
        'aardvark-collector',
        'aardvark-apiserver',
    ]
    # Details passed under '_common', plus the entry for 'aardvark-base'.
    common_details = {
        'pwd': '/usr/share/aardvark-data',
        'NUM_THREADS': 5,
        'PHANTOMJS': EXPECTED_PHANTOMJS_PATH,
        'ROLENAME': 'Aardvark',
        'SQLALCHEMY_DATABASE_URI': EXPECTED_SQLITE_DB_URI,
        'SQLALCHEMY_TRACK_MODIFICATIONS': EXPECTED_SQL_TRACK_MODS,
    }
    details = {
        '_common': common_details,
        'aardvark-base': {
            'pwd': '/etc/aardvark',
        },
    }
    artifacts = [
        'aardvark-base-docker-build',
        'aardvark-data-docker-build',
        'aardvark-data-docker-run',
        'aardvark-apiserver-docker-build',
        'aardvark-collector-docker-build',
    ]
    self.case_worker(
        target='aardvark-sqlite',
        expected_docker_images=images,
        expected_details=details,
        expected_artifacts=artifacts,
        set_images_tag=False
    )
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Define test suites.
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def __init__(self):
    """ Constructor.

    Lists the electrum addresses. When the output does not look like a
    list holding one address, a new wallet is created with
    ``self._create_wallet``. The electrum daemon is started afterwards.
    """
    output = pexpect.run('electrum listaddresses')
    # pexpect.run returns bytes on Python 3, which cannot be searched with
    # a str pattern; normalise to text first.
    if not isinstance(output, str):
        output = output.decode('utf-8')
    print(output)
    # The specific output of electrum when one address exists. The class
    # is spelled A-Za-z because the range A-z also covers the punctuation
    # characters that sit between 'Z' and 'a'.
    pattern = re.compile(r'\[\W*"[A-Za-z0-9]+"\W*\]')
    found = pattern.search(output)
    print(found)
    if found:
        # if a wallet exists, initialize that one
        print('using already existing wallet')
    else:
        self._create_wallet()
    subprocess.call(['electrum', 'daemon', 'start'])
def _create_wallet(self):
    """Generate a key pair, restore it into electrum and return the address.

    Sets ``self.address`` and ``self.privkey`` from the output of
    ``addrgen/addrgen.py``, then runs ``electrum restore`` with the
    private key.

    Returns:
        The address that ``electrum listaddresses`` reports afterwards.
        When no address is reported, the whole procedure is started again
        (the number of retries is not bounded).
    """
    print('did not find an existing wallet, creating a new one')
    # ensure the daemon is stopped, as this causes path errors (mostly
    # useful for development)
    pexpect.run('electrum daemon stop')
    # build a new wallet if no wallet yet exists
    walletpair = str(subprocess.check_output('python addrgen/addrgen.py', shell=True))
    walletpair = re.split(r'\W+', walletpair)
    self.address = walletpair[1]
    self.privkey = walletpair[2]
    print('created a wallet with address \''+self.address+'\' and privatekey \''+self.privkey+'\'')
    child = pexpect.spawn('electrum', ['restore', self.privkey])
    # respectively: use default password, use default fee (0.002), use
    # default gap limit and give seed
    self._answer_prompt(child, '')
    # check if the wallet was created successfully
    command = """electrum listaddresses"""
    output = pexpect.run(command)
    # pexpect.run returns bytes on Python 3, which cannot be searched with
    # a str pattern; normalise to text first.
    if not isinstance(output, str):
        output = output.decode('utf-8')
    # A-Za-z instead of A-z: the latter also matches the punctuation
    # characters that sit between 'Z' and 'a'.
    walletFinder = re.compile(r'\[\W*"([A-Za-z0-9]+)"\W*\]')
    result = walletFinder.search(output)
    # This horrible feedback loop is here due to a quirk of electrum.
    # Needs refactoring, but do not refactor without extensive testing
    # (i.e. multiple vps all from clean install), because electrum
    # behaviour right after startup tends to differ from server to server
    # (possibly something to do with specs).
    try:
        print(result.group(1))
        return result.group(1)
    except AttributeError:
        # result is None: electrum listed no address yet, so try again.
        return self._create_wallet()
# def __del__(self):
# '''
# clear up the electrum service
# '''
# subprocess.call(['electrum', 'daemon', 'stop'])
def _stop_tunnel(cmd):
    """Run ``cmd`` with ``pexpect.run``; the captured output is discarded."""
    pexpect.run(cmd)
def _stop_tunnel(cmd):
    """Run ``cmd`` with ``pexpect.run``; the captured output is discarded."""
    pexpect.run(cmd)
def _stop_tunnel(cmd):
    """Run ``cmd`` with ``pexpect.run``; the captured output is discarded."""
    pexpect.run(cmd)