def runCmd(cmd,cmd_timeout=300):
''' run command without showing console window on windows - return stdout and stderr as strings '''
startupinfo = None
output = ""
output_err = ""
debug_log("runCmd: {}".format(cmd))
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
try:
proc = subprocess.Popen(cmd,bufsize=-1,startupinfo=startupinfo,stdout=subprocess.PIPE,stderr=subprocess.PIPE,stdin=None,shell=False,universal_newlines=False)
except SubprocessError as e:
proc = None
debug_log("exception in runCmd: {}".format(e),logging.ERROR)
if proc is not None:
try:
outputb, output_errb = proc.communicate()
output = outputb.decode('utf-8','replace')
output_err = output_errb.decode('utf-8','replace')
except subprocess.TimeoutExpired(timeout=cmd_timeout):
proc.kill()
debug_log("runCmd: Process killed due to timeout",logging.WARNING)
else:
debug_log("runCmd: Proc was none",logging.WARNING)
return output,output_err
python类STARTUPINFO的实例源码
def run_spotty(self, arguments=None):
'''On supported platforms we include spotty binary'''
try:
args = [
self.__spotty_binary,
"-u", self.username,
"-p", self.password,
"--disable-discovery" # for now, disable dns discovery
]
if arguments:
args += arguments
if not "-n" in args:
args += ["-n", self.playername]
# if self.__cache_path:
# args += ["-c", self.__cache_path, "--enable-audio-cache"]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(args, startupinfo=startupinfo, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, bufsize=0)
except Exception as exc:
log_exception(__name__, exc)
return None
def run_command(self):
args = ['sfdx', 'force:apex:test:run', '-r', 'human',
'-l', 'RunSpecifiedTests', '-n', self.class_name]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if err is None or err == '':
printErr = out
printer.write('\n' + str(printErr, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:apex:test:run', '-r', 'human']
if not self.test_org is None and len(self.test_org) > 0:
args.push('-u')
args.push(self.input)
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if err is None or err == '':
printErr = out
printer.write('\n' + str(printErr, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:source:push']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if not err is None and not err == '':
printErr = out
else:
printer.write('\nError pushing source')
printer.write('\n' + str(printErr, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:source:pull']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if not err is None and not err == '':
printErr = out
else:
printer.write('\nError pulling source')
printer.write('\n' + str(printErr, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:org:create', '-f',
self.def_file, '-a', 'ScratchOrg', '-s']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nScratch org created')
else:
printer.write('\nError creating scratch org')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:auth:web:login', '-d', '-s', '-a', 'DevHub']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nDevHub authorized')
else:
printer.write('\nError authorizing Dev Hub:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:visualforce:component:create',
'-n', self.page_name,'-l', self.page_label, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nVisaulforce Component created')
file = os.path.join(self.class_dir, self.page_name + '.component')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Visualforce Component:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:visualforce:page:create',
'-n', self.page_name,'-l', self.page_label, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nVisaulforce page created')
file = os.path.join(self.class_dir, self.page_name + '.page')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Visualforce page:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:component:create',
'-n', self.cmp_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning Component created')
file = os.path.join(self.class_dir, self.cmp_name, self.cmp_name + '.cmp')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning Component:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:test:create',
'-n', self.event_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning Test created')
file = os.path.join(self.class_dir, self.event_name + '.resource')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning Test:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:interface:create',
'-n', self.event_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning Interface created')
file = os.path.join(self.class_dir, self.event_name, self.event_name + '.intf')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning Interface:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:event:create',
'-n', self.event_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning Event created')
file = os.path.join(self.class_dir, self.event_name, self.event_name + '.evt')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning Event:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:lightning:app:create',
'-n', self.app_name, '-d', self.class_dir]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nLightning App created')
file = os.path.join(self.class_dir, self.app_name, self.app_name + '.app')
sublime.active_window().open_file(file)
else:
printer.write('\nError creating Lightning App:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
dx_folder = util.dxProjectFolder()
args = ['sfdx', 'force:project:upgrade', '-f']
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, startupinfo=startupinfo, cwd=dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nProject upgraded')
else:
printer.write('\nError upgrading project:')
printer.write('\n' + str(err, 'utf-8'))
def run_command(self):
args = ['sfdx', 'force:project:create', '-n', self.project_name,
'-t', self.template, '-d', self.project_path]
if self.namespace is not None and self.namespace != '':
args.push('-s')
args.push(self.namespace)
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, startupinfo=startupinfo)
p.wait()
out,err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nProject created')
else:
printer.write('\nError creating project:')
printer.write('\n' + str(out, 'UTF-8'))
def run_command(self):
args = ['sfdx', 'force:apex:execute', '-f', self.file_path]
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
startupinfo=startupinfo, cwd=self.dx_folder)
p.wait()
out, err = p.communicate()
r = p.returncode
if p.returncode == 0:
printer.write('\nFinished running apex')
printer.write('\n' + str(out, 'utf-8'))
else:
printErr = err
if err is None or err == '':
printErr = out
printer.write('\nError running apex')
printer.write('\n' + str(printErr, 'utf-8'))
def prepare_subprocess_args(self):
popen_args = {
'args': self.get_php_cmd(),
'env': self.get_env(),
'stdin': subprocess.PIPE,
'stdout': subprocess.PIPE,
'stderr': subprocess.PIPE,
}
# Prevent cmd.exe window popup on Windows.
if is_windows():
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
popen_args['startupinfo'] = startupinfo
return popen_args
def startBbcomm():
CREATE_NEW_CONSOLE = 0x00000010
try:
info = subprocess.STARTUPINFO()
info.dwFlags = 1
info.wShowWindow = 0
subprocess.Popen(["c:/blp/api/bbcomm.exe"],
creationflags=CREATE_NEW_CONSOLE,
startupinfo=info)
except FileNotFoundError:
try:
info = subprocess.STARTUPINFO()
info.dwFlags = 1
info.wShowWindow = 0
subprocess.Popen(["c:/blp/dapi/bbcomm.exe"],
creationflags=CREATE_NEW_CONSOLE,
startupinfo=info)
except FileNotFoundError:
pass
def __init__(self, commands, context, cwd):
startupinfo = None
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self.__proc = subprocess.Popen(commands,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
startupinfo=startupinfo,
cwd=cwd)
self.__eof = False
self.__context = context
self.__queue_out = Queue()
self.__thread = Thread(target=self.enqueue_output)
self.__thread.start()
def start_server(server_binary_args, working_dir, env):
debug("starting " + str(server_binary_args))
si = None
if os.name == "nt":
si = subprocess.STARTUPINFO() # type: ignore
si.dwFlags |= subprocess.SW_HIDE | subprocess.STARTF_USESHOWWINDOW # type: ignore
try:
process = subprocess.Popen(
server_binary_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=working_dir,
env=env,
startupinfo=si)
return Client(process, working_dir)
except Exception as err:
sublime.status_message("Failed to start LSP server {}".format(str(server_binary_args)))
exception_log("Failed to start server", err)
def get_output(cmd):
if int(sublime.version()) < 3000:
if sublime.platform() != "windows":
# Handle Linux and OS X in Python 2.
run = '"' + '" "'.join(cmd) + '"'
return commands.getoutput(run)
else:
# Handle Windows in Python 2.
# Prevent console window from showing.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(cmd, \
stdout=subprocess.PIPE, \
startupinfo=startupinfo).communicate()[0]
else:
# Handle all OS in Python 3.
run = '"' + '" "'.join(cmd) + '"'
return subprocess.check_output(run, stderr=subprocess.STDOUT, shell=True, env=os.environ)
def spawn(args, **kwargs):
"""Spawn a subprocess and return it back
"""
if 'cwd' not in kwargs:
kwargs['cwd'] = os.path.dirname(os.path.abspath(__file__))
kwargs['bufsize'] = -1
if os.name == 'nt':
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
kwargs['startupinfo'] = startupinfo
try:
return subprocess.Popen(args, **kwargs)
except Exception as error:
msg = (
'Your operating system denied the spawn of {0} process: {1}'
).format(args[0], error)
logging.error(msg)
raise RuntimeError(msg)
def start(self):
WorkerClient.stop_worker = False
node_path = global_vars.get_node_path()
if os.name == "nt":
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.SW_HIDE | subprocess.STARTF_USESHOWWINDOW
self.server_proc = subprocess.Popen(
[node_path, self.script_path], stdin=subprocess.PIPE, stdout=subprocess.PIPE, startupinfo=si
)
else:
self.server_proc = subprocess.Popen(
[node_path, self.script_path], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
# start reader thread
if self.server_proc and (not self.server_proc.poll()):
log.debug("worker proc " + str(self.server_proc))
log.debug("starting worker thread")
workerThread = threading.Thread(target=WorkerClient.__reader, args=(
self.server_proc.stdout, self.msgq, self.eventq, self.asyncReq, self.server_proc, self.event_handlers))
workerThread.daemon = True
workerThread.start()
def subprocess_args(include_stdout=True):
if hasattr(subprocess, 'STARTUPINFO'):
si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
env = os.environ
else:
si = None
env = None
if include_stdout:
ret = {'stdout:': subprocess.PIPE}
else:
ret = {}
ret.update({'stdin': subprocess.PIPE,
'stderr': subprocess.PIPE,
'startupinfo': si,
'env': env })
return ret
#Function to get the Process ID
def get_output(cmd):
if int(sublime.version()) < 3000:
if sublime.platform() != "windows":
# Handle Linux and OS X in Python 2.
run = '"' + '" "'.join(cmd) + '"'
return commands.getoutput(run)
else:
# Handle Windows in Python 2.
# Prevent console window from showing.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(cmd, \
stdout=subprocess.PIPE, \
startupinfo=startupinfo).communicate()[0]
else:
# Handle all OS in Python 3.
run = '"' + '" "'.join(cmd) + '"'
return subprocess.check_output(run, stderr=subprocess.STDOUT, shell=True, env=os.environ)
def get_output(cmd):
if int(sublime.version()) < 3000:
if sublime.platform() != "windows":
# Handle Linux and OS X in Python 2.
run = '"' + '" "'.join(cmd) + '"'
return commands.getoutput(run)
else:
# Handle Windows in Python 2.
# Prevent console window from showing.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return subprocess.Popen(cmd, \
stdout=subprocess.PIPE, \
startupinfo=startupinfo).communicate()[0]
else:
# Handle all OS in Python 3.
run = '"' + '" "'.join(cmd) + '"'
try:
return subprocess.check_output(run, stderr=subprocess.STDOUT, shell=True, env=os.environ)
except Exception as exception:
print(exception.output)
def __init__(self, suproc_command, stdin_queue, stdout_queue, parent):
threading.Thread.__init__(self)
self.setDaemon(False) # we want it to survive parent's death so it can detect innactivity and terminate subproccess
self.setName('pjon_piper_thd')
self._subproc_command = suproc_command
self._birthtime = None
self._stopped = False
self._start_failed = False
self._pipe = None
self._stdout_queue = stdout_queue
self._stdin_queue = stdin_queue
self._parent = parent
if sys.platform == 'win32':
self._startupinfo = subprocess.STARTUPINFO()
self._startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
self._startupinfo.wShowWindow = subprocess.SW_HIDE
self.log = logging.getLogger(self.name)
self.log.handlers = []
self.log.addHandler(logging.NullHandler())
#self.log.propagate = False
self.log.setLevel(logging.INFO)
def get_startup_info():
# Hide the child process window.
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
return startupinfo