def user_credentials_prompt():
""" user credentials prompt """
# user message
usr_msg = "Please type in your router credentials."
print(usr_msg)
# user credential prompt
user = input('User: ')
user_pw = getpass.getpass('User Password: ')
enable_pw = getpass.getpass('Enable Password: ')
# console formatting
print('')
return user, user_pw, enable_pw
python类getpass()的实例源码
def get_user_api_token(logger):
"""
Generate iAuditor API Token
:param logger: the logger
:return: API Token if authenticated else None
"""
username = input("iAuditor username: ")
password = getpass()
generate_token_url = "https://api.safetyculture.io/auth"
payload = "username=" + username + "&password=" + password + "&grant_type=password"
headers = {
'content-type': "application/x-www-form-urlencoded",
'cache-control': "no-cache",
}
response = requests.request("POST", generate_token_url, data=payload, headers=headers)
if response.status_code == requests.codes.ok:
return response.json()['access_token']
else:
logger.error('An error occurred calling ' + generate_token_url + ': ' + str(response.json()))
return None
def repo_creds(username, encrypted_password, has_pass):
"""Get a representation of the container repository credentials"""
from azure.servicefabric.models import RegistryCredential
from getpass import getpass
# Wonky since we allow empty string as an encrypted passphrase
if not any([username, encrypted_password is not None, has_pass]):
return None
if (encrypted_password is not None) and (not username):
raise CLIError('Missing container repository username')
if has_pass and (not username):
raise CLIError('Missing container repository username')
if encrypted_password is not None:
return RegistryCredential(registry_user_name=username,
registry_password=encrypted_password,
password_encrypted=True)
elif has_pass:
passphrase = getpass(prompt='Container repository password: ')
return RegistryCredential(registry_user_name=username,
registry_password=passphrase,
password_encrypted=False)
return RegistryCredential(registry_user_name=username)
def __init__(self, okta_profile, verbose):
home_dir = os.path.expanduser('~')
okta_config = home_dir + '/.okta-aws'
parser = RawConfigParser()
parser.read(okta_config)
profile = okta_profile
if parser.has_option(profile, 'base-url'):
self.base_url = "https://%s" % parser.get(profile, 'base-url')
else:
print("No base-url set in ~/.okta-aws")
if parser.has_option(profile, 'username'):
self.username = parser.get(profile, 'username')
if verbose:
print("Authenticating as: %s" % self.username)
else:
self.username = raw_input('Enter username: ')
if parser.has_option(profile, 'password'):
self.password = parser.get(profile, 'password')
else:
self.password = getpass('Enter password: ')
self.verbose = verbose
def bootstrap(branch='master'):
env.sudo_password = getpass('Initial value for env.sudo_password: ')
env.domain_name = prompt('Enter your domain name:', default='meetup_facebook_bot')
create_permanent_folder()
create_log_folder()
install_postgres()
database_url = setup_postgres(username=env.user, database_name=env.user)
renew_ini_file(database_url)
install_python()
fetch_sources_from_repo(branch, PROJECT_FOLDER)
reinstall_venv()
install_modules()
install_nginx()
configure_letsencrypt_if_necessary()
add_nginx_reload_crontab_job()
configure_nginx_if_necessary()
setup_ufw()
start_systemctl_service(UWSGI_SERVICE_NAME)
start_systemctl_service('nginx')
run_setup_scripts()
status()
def keygen(args):
"""Sub-command to generate keys"""
if not args.cert:
args.cert = "data/{}.crt".format(args.signer_id.replace("@", "_at_"))
for f in [args.cert, args.private_key]: check_writable(f)
from pyseeder.crypto import keygen
if args.no_encryption:
priv_key_password = None
else:
from getpass import getpass
priv_key_password = getpass("Set private key password: ").encode("utf-8")
keygen(args.cert, args.private_key, args.signer_id, priv_key_password)
def prompt_pass(name, default=None):
"""
Grabs hidden (password) input from command line.
:param name: prompt text
:param default: default value if no input provided.
"""
prompt = name + (default and ' [%s]' % default or '')
prompt += name.endswith('?') and ' ' or ': '
while True:
rv = getpass.getpass(prompt)
if rv:
return rv
if default is not None:
return default
def popPeek(server, user, port=110):
try:
P = poplib.POP3(server, port)
P.user(user)
P.pass_(getpass.getpass())
except:
print "Failed to connect to server."
sys.exit(1)
deleted = 0
try:
l = P.list()
msgcount = len(l[1])
for i in range(msgcount):
msg = i+1
top = P.top(msg, 0)
for line in top[1]:
print line
input = raw_input("D to delete, any other key to leave message on server: ")
if input=="D":
P.dele(msg)
deleted += 1
P.quit()
print "%d messages deleted. %d messages left on server" % (deleted, msgcount-deleted)
except:
P.rset()
P.quit()
deleted = 0
print "\n%d messages deleted. %d messages left on server" % (deleted, msgcount-deleted)
def login(args):
"""
Logs in the specified user, prompting for password
if necessary.
"""
if not args.password:
args.password = getpass.getpass("Password: ")
data = {
"email": args.user,
"password": args.password}
if args.token:
data["token"] = args.token
response = sync_request(
args, "POST", "sse_cfg/user",
data=data)
if response:
return response.json()
raise CliException(
"Failed to login {User}".format(
User=args.user))
def register(args):
"""
Registers the specified user, prompting twice for
password if necessary.
"""
if not args.password:
args.password = getpass.getpass("Password: ")
check = getpass.getpass("Repeat: ")
if args.password != check:
raise CliException(
"Passwords do not match")
response = sync_request(
args, "POST", "sse_cfg/register",
data={
"email": args.user,
"password": args.password})
if response:
return response.json()
raise CliException(
"Failed to register {User}".format(
User=args.user))
def _test_resource_paths(my):
path = my.server.get_resource_path('admin')
# not a very accurate test
my.assertEquals(True, 'etc/admin.tacticrc' in path)
paths = my.server.create_resource_paths()
sys_login = getpass.getuser()
dir = my.server.get_home_dir()
is_dir_writeable = os.access(dir, os.W_OK) and os.path.isdir(dir)
if dir and is_dir_writeable:
dir = "%s/.tactic/etc" % dir
else:
if os.name == 'nt':
dir = 'C:/sthpw/etc'
else:
dir = '/tmp/sthpw/etc'
compared = '%s/%s.tacticrc' %(dir, sys_login) in paths
my.assertEquals(True, compared)
# since we use admin to get resource path , my.login should also be admin
my.assertEquals('admin', my.server.get_login())
def read_private():
global global_password
if global_password is None:
setpassword(getpass.getpass("Please enter the password to decrypt your keystore: "))
if os.path.exists('private.json'):
with open('private.json','r') as f:
toread = json.load(f)
key = crypto.kdf(global_password,toread['salt'])
try:
plain = crypto.decrypt(toread['priv'],key)
except ValueError:
raise Exception("Invalid password for keystore")
return json.loads(plain),toread['salt']
else:
#file doesn't exist, just invent a salt
return {'revoked_keys':[]},base64.b64encode(crypto.generate_random_key())
def init_mtls(config):
logger.info("Setting up mTLS...")
tls_dir = config["ca_dir"]
if tls_dir[0]!='/':
tls_dir = os.path.abspath('%s/%s'%(common.WORK_DIR,tls_dir))
# We need to securely pull in the ca password
my_key_pw = getpass.getpass("Please enter the password to decrypt your keystore: ")
ca_util.setpassword(my_key_pw)
# Create HIL Server Connect certs (if not already present)
if not os.path.exists("%s/%s-cert.crt"%(tls_dir,config["ip"])):
logger.info("Generating new Node Monitor TLS Certs in %s for connecting"%tls_dir)
ca_util.cmd_mkcert(tls_dir,config["ip"])
ca_path = "%s/cacert.crt"%(tls_dir)
my_cert = "%s/%s-cert.crt"%(tls_dir,config["ip"])
my_priv_key = "%s/%s-private.pem"%(tls_dir,config["ip"])
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_verify_locations(cafile=ca_path)
context.load_cert_chain(certfile=my_cert,keyfile=my_priv_key,password=my_key_pw)
context.verify_mode = ssl.CERT_REQUIRED
return context
def validate_password():
passwd = getpass.getpass()
print "Again"
passwd2 = getpass.getpass()
if (passwd != passwd2):
print "Password mismatch"
return validate_password()
"""Check against cracklib to avoid simple passwords"""
try:
cracklib.VeryFascistCheck (passwd)
except ValueError as msg:
print msg
return validate_password()
return passwd
def raw(prompt, *args, **kwargs):
"""Calls input to allow user to input an arbitrary string. User can go
back by entering the `go_back` string. Works in both Python 2 and 3.
"""
go_back = kwargs.get('go_back', '<')
type_ = kwargs.get('type', str)
with stdout_redirected(sys.stderr):
while True:
try:
if kwargs.get('secret', False):
answer = getpass.getpass(prompt)
elif sys.version_info < (3, 0):
answer = raw_input(prompt)
else:
answer = input(prompt)
if answer == go_back:
raise QuestionnaireGoBack
return type_(answer)
except ValueError:
eprint('\n`{}` is not a valid `{}`\n'.format(answer, type_))
def changePassPhrase(options):
if not options['filename']:
filename = os.path.expanduser('~/.ssh/id_rsa')
options['filename'] = raw_input('Enter file in which the key is (%s): ' % filename)
try:
key = keys.getPrivateKeyObject(options['filename'])
except keys.BadKeyError, e:
if e.args[0] != 'encrypted key with no passphrase':
raise
else:
if not options['pass']:
options['pass'] = getpass.getpass('Enter old passphrase: ')
key = keys.getPrivateKeyObject(options['filename'], passphrase = options['pass'])
if not options['newpass']:
while 1:
p1 = getpass.getpass('Enter new passphrase (empty for no passphrase): ')
p2 = getpass.getpass('Enter same passphrase again: ')
if p1 == p2:
break
print 'Passphrases do not match. Try again.'
options['newpass'] = p1
open(options['filename'], 'w').write(
keys.makePrivateKeyString(key, passphrase=options['newpass']))
print 'Your identification has been saved with the new passphrase.'
def getGenericAnswers(self, name, instruction, prompts):
responses = []
try:
oldout, oldin = sys.stdout, sys.stdin
sys.stdin = sys.stdout = open('/dev/tty','r+')
if name:
print name
if instruction:
print instruction
for prompt, echo in prompts:
if echo:
responses.append(raw_input(prompt))
else:
responses.append(getpass.getpass(prompt))
finally:
sys.stdout,sys.stdin=oldout,oldin
return defer.succeed(responses)
def defConv(items):
resp = []
for i in range(len(items)):
message, kind = items[i]
if kind == 1: # password
p = getpass.getpass(message)
resp.append((p, 0))
elif kind == 2: # text
p = raw_input(message)
resp.append((p, 0))
elif kind in (3,4):
print message
resp.append(("", 0))
else:
return defer.fail('foo')
d = defer.succeed(resp)
return d
def getCredentials():
'''prompt user for username and password'''
colourPrint('bold',
('You may wish to store your credentials and server ' +
'preferences in this file by opening it in a text editor ' +
'and filling in the username, password, and server ' +
'fields.\nIf you choose not to do this, you will be ' +
'prompted for this information on each run of this script.'))
colourPrint('yellow',
'\nPlease enter your username for SmoothStreamsTV:')
username = input('')
colourPrint('green',
'\nThank you, ' + username + '.\n')
colourPrint('yellow',
'\nPlease enter your password for SmoothStreamsTV:')
password = getpass('')
return username, password
# end getCredentials()
def ask(message, ofile=sys.stderr, ifile=sys.stdin, style=Fore.MAGENTA,
noecho=False, accept_empty=True):
"""
Print a question on *ofile* and wait for an answer on *ifile* using
:py:meth:`io.TextIOBase.readline`.
*style* may be ``None`` for non-colored output (this does not override the
behavior setup by :py:func:`enable_colors`).
"""
if noecho and ifile != sys.stdin:
raise ValueError("noecho option implies input from stdin")
while True:
with ScopedColoredStream(ofile, style, flush_on_exit=True) as stream:
stream.write(message)
if noecho:
ans = getpass.getpass(prompt="", stream=ofile)
else:
ans = ifile.readline().rstrip("\n\r")
if not accept_empty and not ans.strip():
continue
return ans
def save_new_credentials(self):
username = None
password = None
print("\nPlease input your NordVPN credentials:")
while not username and not password:
username = input("Username/Email: ")
password = getpass.getpass("Password: ")
if not self.config.has_section(self.SECTION_TITLE):
self.config.add_section(self.SECTION_TITLE)
self.config.set(self.SECTION_TITLE, 'username', username)
self.config.set(self.SECTION_TITLE, 'password', password)
self.save()
self.logger.info("New credentials saved successfully!")
def adduser():
"""add user"""
from getpass import getpass
username = raw_input("\_username: ")
email = raw_input("\_email: ")
role_id = raw_input("\_[1:moderator 2:admin 3:user]: ")
password = getpass("\_password: ")
u = User(
email = email,
username = username,
password = password,
role_id = role_id
)
db.session.add(u)
db.session.commit()
print "<user %s add in database>" % username
def deploy(self, cwd=False):
assert not self.is_deployed, 'This contract already exists on the chain.'
assert self.sol, 'No solidity code loaded into this object'
response = database.insert_contract(self.name,
self.abi,
self.bytecode,
self.gas_estimates,
self.method_identifiers,
cwd)
okay = web3.personal.Personal(self.web3)
options = 'Unlock: \n' + '\n'.join([' '.join([str(i),':',x]) for i, x in enumerate(okay.listAccounts)]) + '\n'
self.defaultAccount = okay.listAccounts[int(input(options))]
result = okay.unlockAccount(self.defaultAccount, getpass.getpass('\nPassword:'), 5000)
if result:
self.address = self.web3.eth.sendTransaction(transaction={'data' : '0x' + self.bytecode, 'from': self.defaultAccount, 'gaslimit': 30000})
self.instance = self.web3.eth.contract(self.address)
else:
raise Exepction('unable to unlock account')
#update the deployed and address to the db and an instance for pulling and interacting with the contract again
return update_contract(json.dumps(self.address), self.method_identifiers, self.name)
def zoomeye_api_test():
zoomeye = ZoomEye()
zoomeye.username = raw_input('ZoomEye Username: ')
zoomeye.password = getpass.getpass(prompt='ZoomEye Password: ')
zoomeye.login()
print(zoomeye.resources_info())
data = zoomeye.dork_search('solr')
show_site_ip(data)
data = zoomeye.dork_search('country:cn')
show_site_ip(data)
data = zoomeye.dork_search('solr country:cn')
show_site_ip(data)
data = zoomeye.dork_search('solr country:cn', resource='host')
show_ip_port(data)
def siteSign(self, address, privatekey=None, inner_path="content.json", publish=False):
from Site import Site
logging.info("Signing site: %s..." % address)
site = Site(address, allow_create=False)
if not privatekey: # If no privatekey definied
from User import UserManager
user = UserManager.user_manager.get()
if user:
site_data = user.getSiteData(address)
privatekey = site_data.get("privatekey")
else:
privatekey = None
if not privatekey:
# Not found in users.json, ask from console
import getpass
privatekey = getpass.getpass("Private key (input hidden):")
succ = site.content_manager.sign(inner_path=inner_path, privatekey=privatekey, update_changed_files=True)
if succ and publish:
self.sitePublish(address, inner_path=inner_path)
def _configure_github(github):
"""Determine and return the GitHub configuration.
Args:
github (dict): The current GitHub configuration.
Returns:
dict: The new GitHub configuration.
"""
answer = copy(github)
logger.info('Since you intend to publish to GitHub, you need to '
'supply credentials.')
logger.info('Create an access token at: '
'https://github.com/settings/tokens')
logger.info('It needs the "repo" scope and nothing else.')
while not answer.get('username'):
answer['username'] = six.moves.input('GitHub username: ')
while not answer.get('token'):
answer['token'] = getpass.getpass('GitHub token (input is hidden): ')
return answer
def interactively_acquire_token(self) -> str:
"""
Walks the user through executing a login into the Dualis-System to get the Token and saves it.
@return: The Token for Dualis.
"""
print('[The following Input is not saved, it is only used temporarily to generate a login token.]')
token = None
while token is None:
dualis_username = input('Username for Dualis: ')
dualis_password = getpass('Password for Dualis [no output]: ')
try:
token = login_helper.obtain_login_token(dualis_username, dualis_password)
except RequestRejectedError as error:
print('Login Failed! (%s) Please try again.' % (error))
except (ValueError, RuntimeError) as error:
print('Error while communicating with the Dualis System! (%s) Please try again.' % (error))
self.config_helper.set_property('token', token)
return token
def get_password(max_password_prompts=3):
"""Read password from TTY."""
verify = strutils.bool_from_string(env("OS_VERIFY_PASSWORD"))
pw = None
if hasattr(sys.stdin, "isatty") and sys.stdin.isatty():
# Check for Ctrl-D
try:
for __ in moves.range(max_password_prompts):
pw1 = getpass.getpass("OS Password: ")
if verify:
pw2 = getpass.getpass("Please verify: ")
else:
pw2 = pw1
if pw1 == pw2 and pw1:
pw = pw1
break
except EOFError:
pass
return pw
def request_secret(secret_key, message, hidden=True):
"""
Request a secret from the user. Save the secrets to disk afterwards.
If there is already a secret for the key, return it.
:param secret_key: the key the input should be stored under in the secrets
:param message: the message to show to the user
:param hidden: hide the input (recommended for passwords and such)
:return the secret
"""
if secret_key in _secrets:
return _secrets[secret_key]
if hidden:
secret = getpass(message).strip()
else:
secret = input(message).strip()
_secrets[secret_key] = secret
save_secrets()
return secret
def rawInput(message, isPass=False):
if isPass:
data = getpass.getpass(message)
else:
data = raw_input(message)
return tools.stdinEncode(data)