def __init__(self, additional_compose_file=None, additional_services=None):
# To resolve docker client server version mismatch issue.
os.environ["COMPOSE_API_VERSION"] = "auto"
dir_name = os.path.split(os.getcwd())[-1]
self.project = "{}{}".format(
re.sub(r'[^a-z0-9]', '', dir_name.lower()),
getpass.getuser()
)
self.additional_compose_file = additional_compose_file
self.services = ["zookeeper", "schematizer", "kafka"]
if additional_services is not None:
self.services.extend(additional_services)
# This variable is meant to capture the running/not-running state of
# the dependent testing containers when tests start running. The idea
# is, we'll only start and stop containers if they aren't already
# running. If they are running, we'll just use the ones that exist.
# It takes a while to start all the containers, so when running lots of
# tests, it's best to start them out-of-band and leave them up for the
# duration of the session.
self.containers_already_running = self._are_containers_already_running()
python类getuser()的实例源码
def safeInstall():
FACTORIOPATH = getFactorioPath()
try:
if not os.path.isdir("%s" % (FACTORIOPATH) ):
if os.access("%s/.." % (FACTORIOPATH), os.W_OK):
os.mkdir(FACTORIOPATH, 0o777)
else:
subprocess.call(['sudo', 'mkdir', '-p', FACTORIOPATH])
subprocess.call(['sudo', 'chown', getpass.getuser(), FACTORIOPATH])
os.mkdir(os.path.join(FACTORIOPATH, "saves"))
os.mkdir(os.path.join(FACTORIOPATH, "config"))
with open("%s/.bashrc" % (os.path.expanduser("~")), "r+") as bashrc:
lines = bashrc.read()
if lines.find("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n") == -1:
bashrc.write("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n")
print("You'll want to restart your shell for command autocompletion. Tab is your friend.")
updateFactorio()
except IOError as e:
print("Cannot make %s. Please check permissions. Error %s" % (FACTORIOPATH, e))
sys.exit(1)
def _test_resource_paths(my):
path = my.server.get_resource_path('admin')
# not a very accurate test
my.assertEquals(True, 'etc/admin.tacticrc' in path)
paths = my.server.create_resource_paths()
sys_login = getpass.getuser()
dir = my.server.get_home_dir()
is_dir_writeable = os.access(dir, os.W_OK) and os.path.isdir(dir)
if dir and is_dir_writeable:
dir = "%s/.tactic/etc" % dir
else:
if os.name == 'nt':
dir = 'C:/sthpw/etc'
else:
dir = '/tmp/sthpw/etc'
compared = '%s/%s.tacticrc' %(dir, sys_login) in paths
my.assertEquals(True, compared)
# since we use admin to get resource path , my.login should also be admin
my.assertEquals('admin', my.server.get_login())
def __init__(self, this_filename, generation=1):
# Constructor
self.plant_id = str(uuid.uuid4())
self.life_stages = (3600*24, (3600*24)*3, (3600*24)*10, (3600*24)*20, (3600*24)*30)
# self.life_stages = (5, 10, 15, 20, 25)
self.stage = 0
self.mutation = 0
self.species = random.randint(0,len(self.species_list)-1)
self.color = random.randint(0,len(self.color_list)-1)
self.rarity = self.rarity_check()
self.ticks = 0
self.age_formatted = "0"
self.generation = generation
self.dead = False
self.write_lock = False
self.owner = getpass.getuser()
self.file_name = this_filename
self.start_time = int(time.time())
self.last_time = int(time.time())
# must water plant first day
self.watered_timestamp = int(time.time())-(24*3600)-1
self.watered_24h = False
pa_start_django_webapp_with_virtualenv.py 文件源码
项目:helper_scripts
作者: pythonanywhere
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def main(domain, django_version, python_version, nuke):
if domain == 'your-username.pythonanywhere.com':
username = getpass.getuser().lower()
domain = f'{username}.pythonanywhere.com'
project = DjangoProject(domain)
project.sanity_checks(nuke=nuke)
project.create_virtualenv(python_version, django_version, nuke=nuke)
project.run_startproject(nuke=nuke)
project.find_django_files()
project.update_settings_file()
project.run_collectstatic()
project.create_webapp(nuke=nuke)
project.add_static_file_mappings()
project.update_wsgi_file()
project.webapp.reload()
print(snakesay(f'All done! Your site is now live at https://{domain}'))
def main(repo_url, domain, python_version, nuke):
if domain == 'your-username.pythonanywhere.com':
username = getpass.getuser().lower()
domain = f'{username}.pythonanywhere.com'
project = DjangoProject(domain)
project.sanity_checks(nuke=nuke)
project.create_webapp(nuke=nuke)
project.add_static_file_mappings()
project.create_virtualenv(python_version, nuke=nuke)
project.download_repo(repo_url, nuke=nuke),
project.find_django_files()
project.update_wsgi_file()
project.update_settings_file()
project.run_collectstatic()
project.run_migrate()
project.webapp.reload()
print(snakesay(f'All done! Your site is now live at https://{domain}'))
def sanity_checks(self, nuke):
print(snakesay('Running API sanity checks'))
token = os.environ.get('API_TOKEN')
if not token:
raise SanityException(dedent(
'''
Could not find your API token.
You may need to create it on the Accounts page?
You will also need to close this console and open a new one once you've done that.
'''
))
if nuke:
return
url = API_ENDPOINT.format(username=getpass.getuser()) + self.domain + '/'
response = call_api(url, 'get')
if response.status_code == 200:
raise SanityException(f'You already have a webapp for {self.domain}.\n\nUse the --nuke option if you want to replace it.')
def virtualenvs_folder():
actual_virtualenvs = Path(f'/home/{getuser()}/.virtualenvs')
old_virtualenvs = set(Path(actual_virtualenvs).iterdir())
tempdir = _get_temp_dir()
old_workon = os.environ.get('WORKON_HOME')
os.environ['WORKON_HOME'] = str(tempdir)
yield tempdir
if old_workon:
os.environ['WORKON_HOME'] = old_workon
shutil.rmtree(tempdir)
new_envs = set(actual_virtualenvs.iterdir()) - set(old_virtualenvs)
if new_envs:
raise Exception('virtualenvs path mocking failed somewehere: {}, {}'.format(
new_envs, tempdir
))
def get_system_username():
"""
Try to determine the current system user's username.
:returns: The username as a unicode string, or an empty string if the
username could not be determined.
"""
try:
result = getpass.getuser()
except (ImportError, KeyError):
# KeyError will be raised by os.getpwuid() (called by getuser())
# if there is no corresponding entry in the /etc/passwd file
# (a very restricted chroot environment, for example).
return ''
if six.PY2:
try:
result = result.decode(DEFAULT_LOCALE_ENCODING)
except UnicodeDecodeError:
# UnicodeDecodeError - preventive treatment for non-latin Windows.
return ''
return result
def get_system_username():
"""
Try to determine the current system user's username.
:returns: The username as a unicode string, or an empty string if the
username could not be determined.
"""
try:
result = getpass.getuser()
except (ImportError, KeyError):
# KeyError will be raised by os.getpwuid() (called by getuser())
# if there is no corresponding entry in the /etc/passwd file
# (a very restricted chroot environment, for example).
return ''
if six.PY2:
try:
result = result.decode(DEFAULT_LOCALE_ENCODING)
except UnicodeDecodeError:
# UnicodeDecodeError - preventive treatment for non-latin Windows.
return ''
return result
def doConnect(options):
# log.deferr = handleError # HACK
if '@' in options['host']:
options['user'], options['host'] = options['host'].split('@',1)
host = options['host']
if not options['user']:
options['user'] = getpass.getuser()
if not options['port']:
options['port'] = 22
else:
options['port'] = int(options['port'])
host = options['host']
port = options['port']
conn = SSHConnection()
conn.options = options
vhk = default.verifyHostKey
uao = default.SSHUserAuthClient(options['user'], options, conn)
connect.connect(host, port, options, vhk, uao).addErrback(_ebExit)
def doConnect():
# log.deferr = handleError # HACK
if '@' in options['host']:
options['user'], options['host'] = options['host'].split('@',1)
if not options.identitys:
options.identitys = ['~/.ssh/id_rsa', '~/.ssh/id_dsa']
host = options['host']
if not options['user']:
options['user'] = getpass.getuser()
if not options['port']:
options['port'] = 22
else:
options['port'] = int(options['port'])
host = options['host']
port = options['port']
vhk = default.verifyHostKey
uao = default.SSHUserAuthClient(options['user'], options, SSHConnection())
connect.connect(host, port, options, vhk, uao).addErrback(_ebExit)
def create_deployment_manifest(method):
"""Returns a dict describing the current deployable."""
service_info = get_service_info()
commit = get_commit()
version = get_git_version()
info = {
'deployable': service_info['name'],
'method': method,
'username': getpass.getuser(),
'datetime': datetime.utcnow().isoformat(),
'branch': get_branch(),
'commit': commit,
'commit_url': get_repo_url() + "/commit/" + commit,
'release': version['tag'] if version else 'untagged-branch'
}
return info
def post_message(message):
if not SLACKBOT_TOKEN:
print "Slackbot key not found, cannot notify slack of '%s'" % message
print "Set the environment variable DRIFT_SLACKBOT_KEY to remedy."
return
final_message = "{}: {}".format(getpass.getuser(), message)
try:
from slacker import Slacker
except ImportError:
print "Message '{}' not posted to slack".format(message)
print "Slack integration disabled. Enable slack with 'pip install slacker'"
try:
slack = Slacker(SLACKBOT_TOKEN)
slack.chat.post_message(NOTIFICATION_CHANNEL, final_message)
except Exception as e:
log.warning("Cannot post '%s' to Slack: %s", final_message, e)
def _build_fixture_file_name(self):
def commit_author_date(commit):
cmd_output = subprocess.check_output(['git', 'show', '-s', '--format=%ai', head_commit]).strip()
cmd_output = re.sub(r'\s*\+.*', '', cmd_output)
cmd_output = re.sub(r'\s+', 'T', cmd_output)
return cmd_output
head_commit = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']).strip()
format_args = {
'commit_date': commit_author_date(head_commit),
'commit': head_commit,
'run_date': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S'),
'user': getpass.getuser(),
}
return '%(commit_date)s+%(commit)s+%(run_date)s+%(user)s.zip' % format_args
action_thumbToParent.py 文件源码
项目:ftrack-kredenc-hooks
作者: kredencstudio
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
action_thumbToChildern.py 文件源码
项目:ftrack-kredenc-hooks
作者: kredencstudio
项目源码
文件源码
阅读 109
收藏 0
点赞 0
评论 0
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
action_component_open.py 文件源码
项目:ftrack-kredenc-hooks
作者: kredencstudio
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
note_on_multiple_entities_action.py 文件源码
项目:ftrack-kredenc-hooks
作者: kredencstudio
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
def register(self):
'''Register discover actions on logged in user.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
action_delete_unpublished.py 文件源码
项目:ftrack-kredenc-hooks
作者: kredencstudio
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
def register(self):
'''Register discover actions on logged in user.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
action_component_rename.py 文件源码
项目:ftrack-kredenc-hooks
作者: kredencstudio
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
find_and_replace_action.py 文件源码
项目:ftrack-kredenc-hooks
作者: kredencstudio
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)
action_version_breakdown.py 文件源码
项目:ftrack-kredenc-hooks
作者: kredencstudio
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def register(self):
'''Register action.'''
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.discover and source.user.username={0}'.format(
getpass.getuser()
),
self.discover
)
ftrack.EVENT_HUB.subscribe(
'topic=ftrack.action.launch and source.user.username={0} '
'and data.actionIdentifier={1}'.format(
getpass.getuser(), self.identifier
),
self.launch
)