def register(self):
    """Subscribe this action's handlers on the ftrack event hub.

    Two subscriptions are created, both restricted to events whose source
    username equals ``getpass.getuser()``:

    * discover events are routed to ``self.discover``;
    * launch events whose action identifier equals ``self.identifier`` are
      routed to ``self.launch``.
    """
    discover_filter = (
        'topic=ftrack.action.discover and source.user.username={0}'
    ).format(getpass.getuser())
    ftrack.EVENT_HUB.subscribe(discover_filter, self.discover)

    launch_filter = (
        'topic=ftrack.action.launch and source.user.username={0} '
        'and data.actionIdentifier={1}'
    ).format(getpass.getuser(), self.identifier)
    ftrack.EVENT_HUB.subscribe(launch_filter, self.launch)
python类getuser()的实例源码
action_asset_delete.py 文件源码
项目:ftrack-kredenc-hooks
作者: kredencstudio
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def get_system_username():
    """
    Try to determine the current system user's username.

    :returns: The username as a unicode string, or an empty string if the
        username could not be determined (or, on Python 2, could not be
        decoded with ``DEFAULT_LOCALE_ENCODING``).
    """
    try:
        username = getpass.getuser()
    except (ImportError, KeyError, OSError):
        # KeyError (older Pythons) or OSError (Python 3.13+) is raised by
        # getuser() when the uid has no corresponding entry in the
        # /etc/passwd file (a very restricted chroot environment, for
        # example). Either way the username is simply unknown.
        return ''
    if not six.PY2:
        # On Python 3 getuser() already returns text.
        return username
    try:
        return username.decode(DEFAULT_LOCALE_ENCODING)
    except UnicodeDecodeError:
        # UnicodeDecodeError - preventive treatment for non-latin Windows.
        return ''
LocalEngine.py 文件源码
项目:firecloud_developer_toolkit
作者: broadinstitute
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def __init__(self):
    """Initialise empty per-job bookkeeping and probe the local host.

    Records the current OS user (falling back to ``"unknown"`` when it
    cannot be determined), derives ``_firehose_mode`` from the username
    prefix, and stores the core count and memory size reported by
    ``self._get_host_resources()``, printing a one-line summary.
    """
    # Bookkeeping dictionaries; presumably keyed by job id, per their names.
    self._pipelineName_by_jobId = {}
    self._cmdStr_by_jobId = {}
    self._resources_by_jobId = {}
    self._job_outdir_by_jobId = {}
    self._localStatus_by_jobId = {}
    self._running_processes_by_jobId = {}
    self._filehandles_by_jobId = {}

    try:
        self._user = getpass.getuser()
    except (KeyError, OSError):
        # getuser() raises KeyError on older Pythons and OSError on
        # Python 3.13+ when the user cannot be resolved.
        self._user = "unknown"
    self._firehose_mode = self._user.startswith('cgaadm')

    cpu_count, mem_gb = self._get_host_resources()
    self._numcores = cpu_count
    self._totmem = mem_gb
    print('Local host has ' + str(cpu_count) + ' cores and ' + str(mem_gb) + 'GB.')
def get_system_username():
    """
    Try to determine the current system user's username.

    :returns: The username as a unicode string, or an empty string if the
        username could not be determined (or, on Python 2, could not be
        decoded with ``DEFAULT_LOCALE_ENCODING``).
    """
    try:
        username = getpass.getuser()
    except (ImportError, KeyError, OSError):
        # KeyError (older Pythons) or OSError (Python 3.13+) is raised by
        # getuser() when the uid has no corresponding entry in the
        # /etc/passwd file (a very restricted chroot environment, for
        # example). Either way the username is simply unknown.
        return ''
    if not six.PY2:
        # On Python 3 getuser() already returns text.
        return username
    try:
        return username.decode(DEFAULT_LOCALE_ENCODING)
    except UnicodeDecodeError:
        # UnicodeDecodeError - preventive treatment for non-latin Windows.
        return ''
def logBasicSettings():
    """Append a run header with the basic user inputs to the log file.

    Writes the tool name, the current user, the execution time and the
    ``inLoc`` / ``outLoc`` parameters to the file at ``textFilePath``
    (all three are read from the enclosing module). The file is opened in
    append mode and is closed when the ``with`` block exits, even if a
    write fails.
    """
    # record basic user inputs and settings to log file for future purposes
    import getpass
    import time

    with open(textFilePath, 'a+') as f:
        f.write("\n################################################################################################################\n")
        f.write("Executing \"Export SSURGO Shapefiles\" tool\n")
        f.write("User Name: " + getpass.getuser() + "\n")
        f.write("Date Executed: " + time.ctime() + "\n")
        f.write("User Parameters:\n")
        f.write("\tFile Geodatabase Feature Dataset: " + inLoc + "\n")
        f.write("\tExport Folder: " + outLoc + "\n")
        #f.write("\tArea of Interest: " + AOI + "\n")
## ===================================================================================
def create_config(project_dir, config_file):
    """Prompt for project metadata and write it to *config_file* as YAML.

    Each value is requested through ``get_input``; the project directory is
    the default for both identifier and name, and the current OS user is
    the default author. The answers are wrapped in a ``Configuration``
    (targeting the current platform) whose ``as_dict()`` is dumped in
    block style.
    """
    project_id = get_input('Project idenitifer', default=project_dir)
    project_name = get_input('Project name', default=project_dir)
    description = get_input('Project description', default='')
    author_name = get_input('Your name', default=getpass.getuser())
    love_version = get_input(
        'LOVE version to use', default=LATEST_LOVE_VERSION
    )

    settings = {
        'id': project_id,
        'name': project_name,
        'description': description,
        'author': author_name,
        'loveVersion': love_version,
        'targets': [get_current_platform()],
    }
    new_config = Configuration(settings)

    with open(config_file, 'w') as out:
        out.write(yaml.dump(new_config.as_dict(), default_flow_style=False))
def saveCurrent(self):
    """Store the current annotation in the sequence and persist it.

    The frame at ``self.curFrame`` gets ``gtorig`` replaced by
    ``self.curData`` and, when the frame has an ``extraData`` attribute,
    that is replaced by the current pb / visibility values. If all three
    annotation filenames are set, the whole sequence is saved through
    ``self.importer``. If a log file is open, one line is appended with the
    frame file name, a UTC timestamp, ``user@host`` and the seconds elapsed
    since ``self.start_time``.
    """
    frames = self._seq.data
    idx = self.curFrame

    frames[idx] = frames[idx]._replace(gtorig=self.curData)
    if hasattr(frames[idx], 'extraData'):
        extra = {
            'pb': {'pbp': self.curPbP, 'pb': self.curPb},
            'vis': self.curVis,
        }
        frames[idx] = frames[idx]._replace(extraData=extra)

    targets = (self.filename_joints, self.filename_pb, self.filename_vis)
    if all(target is not None for target in targets):
        self.importer.saveSequenceAnnotations(
            self._seq,
            {
                'joints': self.filename_joints,
                'vis': self.filename_vis,
                'pb': self.filename_pb,
            },
        )

    # save log file
    if self.file_log is None:
        return
    frame_name = self._seq.data[self.curFrame].fileName
    stamp = time.strftime("%d.%m.%Y %H:%M:%S +0000", time.gmtime())
    user = getpass.getuser()
    host = os.uname()[1]
    elapsed = time.time() - self.start_time
    self.file_log.write(
        '{}, {}, {}@{}, {}\n'.format(frame_name, stamp, user, host, elapsed)
    )
    self.file_log.flush()
def post(content, token=None, username=None, public=False, debug=False):
    '''
    Post a gist on GitHub.

    The gist holds a single file named ``message`` containing *content*
    (an empty string when *content* is None). Its description embeds a
    random SHA-1 hex digest, the local FQDN, the username (the current OS
    user when *username* is None) and the current UTC time.

    Returns ``(gist_id, digest)`` when the response contains an ``id``,
    otherwise ``(None, None)``.
    '''
    digest = hashlib.sha1(os.urandom(16)).hexdigest()
    if username is None:
        username = getuser()
    timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

    template = ('{hash} (twitter-message-bus); from {host} by {user} '
                'at {time} UTC.')
    description = template.format(host=getfqdn(), user=username,
                                  time=timestamp, hash=digest)

    body = '' if content is None else content
    payload = json.dumps({
        'files': {'message': {'content': body}},
        'public': public,
        'description': description,
    })

    response = github(http='post', uri='gists', token=token, payload=payload,
                      debug=debug)
    if 'id' in response:
        return (response['id'], digest)
    return (None, None)
def client_name(self, client_name=None):
    """
    Build a :class:`ClientNameRequest` for the given client name.

    Parameters
    ----------
    client_name : string, optional
        Name to put in the request; when None, the output of
        ``getpass.getuser()`` is used.

    Returns
    -------
    ClientNameRequest
    """
    name = getpass.getuser() if client_name is None else client_name
    return ClientNameRequest(name)
def __validate_configuration(self):
    """
    Ensure every essential configuration key exists, creating missing ones
    with their default values, and save the configuration if anything had
    to be added.
    """
    needs_save = False  # Set once any default had to be filled in

    if "Application" not in self.config:
        self.config["Application"] = {}
        needs_save = True

    # The cache directory default embeds the current user name, so it is
    # only computed when the key is actually missing.
    if "cacheDir" not in self.config['Application']:
        self.config["Application"]["cacheDir"] = "/tmp/easy-ebook-viewer-cache-" + getpass.getuser() + "/"
        needs_save = True

    simple_defaults = (
        ("javascript", "False"),
        ("caret", "False"),
        ("stylesheet", "Day"),
    )
    for key, default in simple_defaults:
        if key not in self.config['Application']:
            self.config["Application"][key] = default
            needs_save = True

    if needs_save:  # Something changed?
        self.save_configuration()
def kill_using_shell(logger, pid, signal=signal.SIGTERM):
    """Send *signal* to process *pid* with the ``kill`` command.

    ``sudo`` is prepended only when the process is owned by a different
    user than the one running this code. Returns True when the command
    succeeds, False when the process no longer exists or ``kill`` exits
    with a non-zero status; both failure cases are logged as warnings.
    """
    try:
        process = psutil.Process(pid)
        # Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case.
        needs_sudo = process.username() != getpass.getuser()
        kill_cmd = ["kill", "-{}".format(int(signal)), str(pid)]
        args = ["sudo"] + kill_cmd if needs_sudo else kill_cmd
        # PID may not exist and return a non-zero error code
        output = subprocess.check_output(args, close_fds=True)
        logger.error(output)
        logger.info("Killed process {} with signal {}".format(pid, signal))
        return True
    except psutil.NoSuchProcess:
        logger.warning("Process {} no longer exists".format(pid))
        return False
    except subprocess.CalledProcessError as err:
        logger.warning("Failed to kill process {} with signal {}. Output: {}"
                       .format(pid, signal, err.output))
        return False
def __init__(self, task, execution_date, state=None):
    """Create an instance for *task* at *execution_date*.

    Identifiers, queue, pool and priority weight are copied from *task*;
    the retry budget comes from ``task.retries``. The current OS user is
    recorded as ``unixname``. ``state`` is only assigned when a truthy
    value is passed. ``init_on_load()`` is called before the logger is
    attached.
    """
    # Identity of the task this instance belongs to.
    self.dag_id = task.dag_id
    self.task_id = task.task_id
    self.execution_date = execution_date
    self.task = task

    # Scheduling attributes taken from the task definition.
    self.queue = task.queue
    self.pool = task.pool
    self.priority_weight = task.priority_weight_total

    # Retry accounting.
    self.try_number = 0
    self.max_tries = self.task.retries

    # Who runs it.
    self.unixname = getpass.getuser()
    self.run_as_user = task.run_as_user

    if state:
        self.state = state
    self.hostname = ''

    self.init_on_load()
    self._log = logging.getLogger("airflow.task")
def get_system_username():
    """
    Try to determine the current system user's username.

    :returns: The username as a unicode string, or an empty string if the
        username could not be determined (or, on Python 2, could not be
        decoded with ``DEFAULT_LOCALE_ENCODING``).
    """
    try:
        username = getpass.getuser()
    except (ImportError, KeyError, OSError):
        # KeyError (older Pythons) or OSError (Python 3.13+) is raised by
        # getuser() when the uid has no corresponding entry in the
        # /etc/passwd file (a very restricted chroot environment, for
        # example). Either way the username is simply unknown.
        return ''
    if not six.PY2:
        # On Python 3 getuser() already returns text.
        return username
    try:
        return username.decode(DEFAULT_LOCALE_ENCODING)
    except UnicodeDecodeError:
        # UnicodeDecodeError - preventive treatment for non-latin Windows.
        return ''
def ls(path):
    """List directory contents

    Prints one ``ls -l``-like line per entry returned by
    ``Client().list_dir(path)``: a fixed permission string (directory or
    regular file), the current OS user, the size, the modification date and
    the entry name. Dates in the current year show the time of day; other
    dates show the year. On a non-ok status the status description is
    printed instead.
    """
    listing = Client().list_dir(path)
    status = listing['status']
    if status == Status.ok:
        for name, info in listing['items'].items():
            is_dir = info['type'] == NodeType.directory
            mode = "drwxr-xr-x" if is_dir else "-rw-r--r--"

            modified = datetime.datetime.fromtimestamp(info['date'])
            if modified.year == datetime.datetime.today().year:
                date_format = '%b %d %H:%M'
            else:
                date_format = '%b %d %Y'

            line = '%.11s %.10s %6sB %.15s %s' % (
                mode,
                getpass.getuser(),
                info['size'],
                modified.strftime(date_format),
                name,
            )
            print(line)
    else:
        print(Status.description(status))
def _get_build_prefix():
    """ Returns a safe build_prefix

    The prefix is ``pip-build-<user>`` inside the system temp directory.
    On Windows the path is returned without further checks. Elsewhere the
    directory is created; if creation fails, the existing path must be
    owned by the current uid, otherwise an ``InstallationError`` is raised.
    """
    build_dir = os.path.join(
        tempfile.gettempdir(), 'pip-build-%s' % getpass.getuser()
    )
    if sys.platform == 'win32':
        # on windows(tested on 7) temp dirs are isolated
        return build_dir

    try:
        os.mkdir(build_dir)
    except OSError:
        # Creation failed (typically the path already exists): look up its
        # owner. O_NOFOLLOW makes the open fail on a symlink, which leaves
        # the owner unknown and therefore rejected below.
        try:
            fd = os.open(build_dir, os.O_RDONLY | os.O_NOFOLLOW)
            owner_uid = os.fstat(fd).st_uid
            os.close(fd)
        except OSError:
            owner_uid = None

        if owner_uid != os.getuid():
            msg = ("The temporary folder for building (%s) is not owned by your user!"
                   % build_dir)
            print(msg)
            print("pip will not work until the temporary folder is "
                  "either deleted or owned by your user account.")
            raise pip.exceptions.InstallationError(msg)
    return build_dir
def __init__( self, host = '127.0.0.1', port = 3128, verbose = True ):
    """Configure the proxy endpoint; must be run as root.

    :param host: Proxy host; a single trailing ``/`` is dropped when
        building ``self.http``.
    :param port: Proxy port. Any falsy value (``None``, ``0``, ``''``)
        selects the default 3128; any other value is converted with
        ``int()`` and stored as an integer.
    :param verbose: Stored as ``self.verbose``.
    :raises UserWarning: If the current user is not ``root``.
    :raises ValueError: If *port* is truthy but not convertible to int.
    """
    if getuser() != 'root':
        raise UserWarning( 'Must be run as root' )
    self.verbose = verbose
    # Apply the default before converting, and keep the converted value:
    # previously int( port ) was immediately overwritten by the raw port.
    self.port = int( port ) if port else 3128
    self.host = host
    host_text = str( host )
    if host_text.endswith( '/' ):
        host_text = host_text[ :-1 ]
    self.http = host_text + ':' + str( self.port )
    # NOTE(review): super( self.__class__, self ) recurses forever if this
    # class is ever subclassed; the enclosing class name is not visible
    # here, so the call is left unchanged.
    super( self.__class__, self ).__init__()
def get_sudoer( self ):
    """Return the value of ``SUDO_USER`` from the environment, or the
    current user from ``getuser()`` when that variable is not set."""
    try:
        return os.environ[ 'SUDO_USER' ]
    except KeyError:
        return getuser()
def finalize_options(self):
    """Finalize upload options, filling in username and password.

    After the base ``upload`` finalization, a falsy username falls back to
    the current OS user. A falsy password is looked up in the keyring and,
    if that yields nothing either, prompted for.
    """
    orig.upload.finalize_options(self)

    username = self.username
    if not username:
        username = getpass.getuser()
    self.username = username

    # Attempt to obtain password. Stop at the first source that yields a
    # truthy value.
    password = self.password
    if not password:
        password = self._load_password_from_keyring()
    if not password:
        password = self._prompt_for_password()
    self.password = password
def __init__(self, cmd_str, uuid):
    """Record a command string with its uuid, the current OS user and the
    local time at which this object was created."""
    # Caller-supplied values.
    self.cmd, self.uuid = cmd_str, uuid
    # Captured from the environment at construction time.
    self.user = getpass.getuser()
    self.timestamp = datetime.datetime.now()