def request_authorization(self):
"""Returns an URL the user has to follow to authorize this app."""
# Requests authorization
request_secret = secrets.token_hex()
payload = {
'client_id': self.__client_id,
'response_type': 'code',
'redirect_uri': 'https://localhost:4000/steps/2',
'scope': 'user-read-recently-played',
'state': request_secret
}
params = ("{}={}".format(param, value)
for param, value
in payload.items())
auth_url = 'https://accounts.spotify.com/authorize?{}'.format(
'&'.join(params))
return auth_url
python类token_hex()的实例源码
def addstatus(cmd, message, args):
if args:
status_text = ' '.join(args)
status_exists = await cmd.db[cmd.db.db_cfg.database].StatusFiles.find_one({'Text': status_text})
if not status_exists:
status_id = secrets.token_hex(5)
status_data = {
'Text': status_text,
'ID': status_id
}
await cmd.db[cmd.db.db_cfg.database].StatusFiles.insert_one(status_data)
response = discord.Embed(color=0x77B255, title=f'? Added status `{status_id}`.')
else:
response = discord.Embed(color=0xBE1931, title='? Status already exists.')
else:
response = discord.Embed(color=0xBE1931, title='? Nothing inputed.')
await message.channel.send(embed=response)
def hash_mdp(mdp, hashage):
"""Hache le mot-de-passe.
Ceci à l'aide de l'algorithme passé
en argument avec un salt permettant
un hachage plus efficace.
Args:
Mot-de-passe normale.
Type du hachage souhaité.
Returns:
Retourne le mot-de-passe haché en hexadécimale.
"""
# secrets génère un nombre aléatoire en héxadécimale
salt = secrets.token_hex(16)
contenu = salt + mdp
h = hashlib.new(hashage)
h.update(contenu.encode('utf-8'))
return h.hexdigest() + ':' + salt
def application(app_id=None):
"""Register an application client."""
form = ApplicationFrom()
if app_id:
client = Client.select().where(Client.id == app_id).first()
else:
client = Client.select().where(Client.user_id == current_user.id).first()
if client:
flash(
f"You aready have registered application '{client.name}' and issued API credentials.",
"info")
return redirect(url_for("api_credentials", app_id=client.id))
if form.validate_on_submit():
client = Client(org_id=current_user.organisation.id)
form.populate_obj(client)
client.client_id = secrets.token_hex(10)
client.client_secret = secrets.token_urlsafe(20)
client.save()
flash(f"Application '{client.name}' was successfully registered.", "success")
return redirect(url_for("api_credentials", app_id=client.id))
return render_template("application.html", form=form)
def addstatus(cmd, message, args):
if args:
status_text = ' '.join(args)
status_exists = cmd.db[cmd.db.db_cfg.database].StatusFiles.find_one({'Text': status_text})
if not status_exists:
status_id = secrets.token_hex(5)
status_data = {
'Text': status_text,
'ID': status_id
}
cmd.db[cmd.db.db_cfg.database].StatusFiles.insert_one(status_data)
response = discord.Embed(color=0x77B255, title=f'? Added status `{status_id}`.')
else:
response = discord.Embed(color=0xBE1931, title='? Status already exists.')
else:
response = discord.Embed(color=0xBE1931, title='? Nothing inputed.')
await message.channel.send(embed=response)
tools.py 文件源码
项目:analytics-platform-control-panel
作者: ministryofjustice
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def deploy_for(self, user):
"""
Deploy the given tool in the user namespace.
>>> rstudio = Tool('rstudio')
>>> rstudio.deploy_for(alice)
"""
username = user.username.lower()
auth_proxy_cookie_secret = secrets.token_hex(32)
tool_cookie_secret = secrets.token_hex(32)
self.helm.upgrade_release(
f'{username}-{self.name}',
f'mojanalytics/{self.name}',
'--namespace', user.k8s_namespace,
'--set', f'username={username}',
'--set', f'aws.iamRole={user.iam_role_name}',
'--set', f'toolsDomain={settings.TOOLS_DOMAIN}',
'--set', f'authProxy.cookieSecret={auth_proxy_cookie_secret}',
'--set', f'{self.name}.secureCookieKey={tool_cookie_secret}',
'--set', f'authProxy.auth0.domain={self.auth_client_domain}',
'--set', f'authProxy.auth0.clientId={self.auth_client_id}',
'--set', f'authProxy.auth0.clientSecret={self.auth_client_secret}',
)
def regenerate(self):
self.activated_at = None
self.codes.all().update(used_at=timezone.now())
self.save()
for n in range(10):
code = PaperCode(device=self, code=secrets.token_hex(4))
code.save()
def handle(self, **options):
options['secret_key'] = token_hex(60)
options['template'] = 'project_template'
base_dir, name = super(Command, self).handle(**options)
# Setup Alembic Migrations
alembic_dir = os.path.join(base_dir, name, 'migrations')
cfg = Config(os.path.join(alembic_dir, 'alembic.ini'))
command.init(config=cfg, directory=alembic_dir)
with open(os.path.join(alembic_dir, 'env.py'), 'r') as f:
env_py = f.read()
env_py = env_py.replace('target_metadata = None', METADATA_CODE)
env_py = env_py.replace('config = context.config', SQL_CONNECT_OVERRIDE)
with open(os.path.join(alembic_dir, 'env.py'), 'w') as f:
f.write(env_py)
def test_create_new_monetary_account(self):
"""
Tests the creation of a new monetary account. This account will be
deleted after test exited with code 0.
This test has no assertion as of its testing to see if the code runs
without errors
"""
create_map = {
MonetaryAccountBank.FIELD_CURRENCY: self._FIELD_CURRENCY,
MonetaryAccountBank.FIELD_DESCRIPTION:
self._MONETARY_ACCOUNT_PREFIX + token_hex()
}
monetary_account_id = MonetaryAccountBank.create(self._API_CONTEXT,
create_map,
self._USER_ID).value
update_map = {
MonetaryAccountBank.FIELD_STATUS: self._FIELD_STATUS,
MonetaryAccountBank.FIELD_SUB_STATUS: self._FIELD_SUB_STATUS,
MonetaryAccountBank.FIELD_REASON: self._FIELD_REASON,
MonetaryAccountBank.FIELD_REASON_DESCRIPTION:
self._FIELD_REASON_DESCRIPTION,
}
MonetaryAccountBank.update(self._API_CONTEXT, update_map, self._USER_ID,
monetary_account_id)
def generate_inventory_item(self):
token = secrets.token_hex(16)
data = {
'item_id': token,
'item_file_id': self.file_id
}
return data
def generate_inventory_item(self):
token = secrets.token_hex(16)
data = {
'item_id': token,
'quality': self.roll_quality(),
'item_file_id': self.file_id
}
return data
def update_id(db, interaction):
new_id = secrets.token_hex(4)
new_data = copy.deepcopy(interaction)
new_data.update({'ReactionID': new_id})
await db[db.db_cfg.database]['Interactions'].update_one(interaction, {'$set': new_data})
def generate_data(message, poll_args):
uid = message.author.id
if message.channel:
cid = message.channel.id
else:
cid = None
if message.guild:
sid = message.guild.id
else:
sid = None
poll_file_data = {
'id': secrets.token_hex(3),
'created': arrow.utcnow().float_timestamp,
'origin': {
'author': uid,
'channel': cid,
'server': sid
},
'poll': {
'question': poll_args[0],
'answers': poll_args[1:]
},
'votes': {},
'permissions': {
'channels': [],
'users': [],
'roles': []
},
'settings': {
'visible': False,
'expires': None,
'active': True
}
}
return poll_file_data
def make_item_id():
token = secrets.token_hex(16)
return token
def api_credentials(app_id=None):
"""Manage API credentials."""
if app_id:
client = Client.select().where(Client.id == app_id).first()
else:
client = Client.select().where(Client.user_id == current_user.id).first()
if not client:
return redirect(url_for("application"))
form = CredentialForm(obj=client)
print(form.reset.data)
print("***", request.form)
if form.validate_on_submit():
if form.revoke.data:
Token.delete().where(Token.client == client).execute()
elif form.reset.data:
form.client_id.data = client.client_id = secrets.token_hex(10)
form.client_secret.data = client.client_secret = secrets.token_urlsafe(20)
client.save()
elif form.update_app.data:
form.populate_obj(client)
client.save()
elif form.delete.data:
Token.delete().where(Token.client == client).execute()
client.delete().execute()
return redirect(url_for("application"))
return render_template("api_credentials.html", form=form)
def generate_token(cls):
return token_hex(32)
def generate_token(cls):
return token_hex(32)
def generate_inventory_item(self):
token = secrets.token_hex(16)
data = {
'item_id': token,
'item_file_id': self.file_id
}
return data
def generate_inventory_item(self):
token = secrets.token_hex(16)
data = {
'item_id': token,
'quality': self.roll_quality(),
'item_file_id': self.file_id
}
return data
def create_device(self):
device_id = secrets.token_hex(32)
self.devices[device_id] = []
return device_id
def token_hex(nbytes=32):
token = binascii.hexlify(token_bytes(nbytes))
return token.decode('ascii') if PY3 else token
def main():
parser = ArgumentParser(description='Generate a secret key.')
parser.add_argument('-l', '--length', type=int, default=32, help='Length of secret key in bytes.')
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument('-x', '--hex', action='store_true', help='Convert secret key to hexadecimal.')
group.add_argument('-a', '--alphanum', action='store_true', help='Generate alphanumeric keys only.')
A = parser.parse_args()
if A.alphanum:
alphabet = string.ascii_letters + string.digits
print(''.join(choice(alphabet) for i in range(A.length)))
elif A.hex: print(token_hex(A.length))
else: print(token_bytes(A.length))
#end if
#end def
def tokhex(length=10, urlsafe=False):
import secrets
if urlsafe == True:
return secrets.token_urlsafe(length)
else:
return secrets.token_hex(length)
# Show A Type Of Face
def addreact(cmd, message, args):
if args:
if len(args) >= 2:
reaction_name = args[0]
allowed_reactions = []
for command in cmd.bot.modules.commands:
if cmd.bot.modules.commands[command].category.lower() == 'interactions':
allowed_reactions.append(command)
if reaction_name.lower() in allowed_reactions:
reaction_url = '%20'.join(args[1:])
if reaction_url.startswith('http'):
if reaction_url.endswith('.gif'):
exist_check = await cmd.db[cmd.db.db_cfg.database]['Interactions'].find_one(
{'URL': reaction_url})
if not exist_check:
reaction_id = secrets.token_hex(4)
reaction_data = {
'Name': reaction_name.lower(),
'UserID': message.author.id,
'ServerID': message.guild.id,
'URL': reaction_url,
'ReactionID': reaction_id
}
await cmd.db[cmd.db.db_cfg.database]['Interactions'].insert_one(reaction_data)
lookup = {'Name': reaction_name.lower()}
inter_count = await cmd.db[cmd.db.db_cfg.database]['Interactions'].find(lookup).count()
title = f'? Added **{reaction_name.lower()}** number **{inter_count}**.'
response = discord.Embed(color=0x77B255, title=title)
if 'log_ch' in cmd.cfg:
log_ch_id = cmd.cfg['log_ch']
log_ch = discord.utils.find(lambda x: x.id == log_ch_id, cmd.bot.get_all_channels())
if log_ch:
author = f'{message.author.name}#{message.author.discriminator}'
data_desc = f'Author: {author}'
data_desc += f'\nAuthor ID: {message.author.id}'
data_desc += f'\nGuild: {message.guild.name}'
data_desc += f'\nGuild ID: {message.guild.id}'
data_desc += f'\nReaction URL: [Here]({reaction_url})'
data_desc += f'\nReaction ID: {reaction_id}'
log_resp_title = f'?? Added {reaction_name.lower()} number {inter_count}'
log_resp = discord.Embed(color=0x3B88C3)
log_resp.add_field(name=log_resp_title, value=data_desc)
log_resp.set_thumbnail(url=reaction_url)
await log_ch.send(embed=log_resp)
else:
response = discord.Embed(color=0xBE1931, title=f'? Reaction already exists.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Reaction URL must end with .gif.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Not a valid URL.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Unrecognized interaction name.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Not enough arguments.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Nothing inputted.')
await message.channel.send(embed=response)
def addreact(cmd, message, args):
if args:
if len(args) >= 2:
reaction_name = args[0]
allowed_reactions = []
for command in cmd.bot.modules.commands:
if cmd.bot.modules.commands[command].category.lower() == 'interactions':
allowed_reactions.append(command)
if reaction_name.lower() in allowed_reactions:
reaction_url = '%20'.join(args[1:])
if reaction_url.startswith('http'):
if reaction_url.endswith('.gif'):
exist_check = cmd.db[cmd.db.db_cfg.database]['Interactions'].find_one({'URL': reaction_url})
if not exist_check:
reaction_id = secrets.token_hex(4)
reaction_data = {
'Name': reaction_name.lower(),
'UserID': message.author.id,
'ServerID': message.guild.id,
'URL': reaction_url,
'ReactionID': reaction_id
}
cmd.db[cmd.db.db_cfg.database]['Interactions'].insert_one(reaction_data)
interactions = cmd.db[cmd.db.db_cfg.database]['Interactions'].find(
{'Name': reaction_name.lower()})
inter_count = len(list(interactions))
title = f'? Added **{reaction_name.lower()}** number **{inter_count}**.'
response = discord.Embed(color=0x77B255, title=title)
if 'log_ch' in cmd.cfg:
log_ch_id = cmd.cfg['log_ch']
log_ch = discord.utils.find(lambda x: x.id == log_ch_id, cmd.bot.get_all_channels())
if log_ch:
author = f'{message.author.name}#{message.author.discriminator}'
data_desc = f'Author: {author}'
data_desc += f'\nAuthor ID: {message.author.id}'
data_desc += f'\nGuild: {message.guild.name}'
data_desc += f'\nGuild ID: {message.guild.id}'
data_desc += f'\nReaction URL: [Here]({reaction_url})'
data_desc += f'\nReaction ID: {reaction_id}'
log_resp_title = f'?? Added {reaction_name.lower()} number {inter_count}'
log_resp = discord.Embed(color=0x3B88C3)
log_resp.add_field(name=log_resp_title, value=data_desc)
log_resp.set_thumbnail(url=reaction_url)
await log_ch.send(embed=log_resp)
else:
response = discord.Embed(color=0xBE1931, title=f'? Reaction already exists.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Reaction URL must end with .gif.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Not a valid URL.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Unrecognized interaction name.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Not enough arguments.')
else:
response = discord.Embed(color=0xBE1931, title=f'? Nothing inputted.')
await message.channel.send(embed=response)
def remindme(cmd, message, args):
if args:
time_req = args[0]
try:
in_seconds = convert_to_seconds(time_req)
upper_limit = 7776000
if in_seconds <= upper_limit:
rem_count = cmd.db[cmd.db.db_cfg.database].Reminders.find({'UserID': message.author.id}).count()
rem_limit = 15
if rem_count < rem_limit:
if len(args) > 1:
text_message = ' '.join(args[1:])
else:
text_message = 'No reminder message set.'
execution_stamp = arrow.utcnow().timestamp + in_seconds
timestamp = arrow.get(execution_stamp).datetime
if in_seconds < 60:
time_diff = f'In {in_seconds} seconds'
else:
time_diff = arrow.get(execution_stamp + 5).humanize(arrow.utcnow())
reminder_id = secrets.token_hex(2)
reminder_data = {
'ReminderID': reminder_id,
'UserID': message.author.id,
'CreationStamp': arrow.utcnow().timestamp,
'ExecutionStamp': execution_stamp,
'ChannelID': message.channel.id,
'ServerID': message.guild.id,
'TextMessage': text_message
}
cmd.db[cmd.db.db_cfg.database]['Reminders'].insert_one(reminder_data)
response = discord.Embed(color=0x66CC66, timestamp=timestamp)
response.description = text_message
response.set_author(name=f'Reminder {reminder_id} Created', icon_url=user_avatar(message.author))
response.set_footer(text=f'Executes: {time_diff.title()}')
else:
response = discord.Embed(color=0xBE1931, title='? You already have 15 reminders pending.')
else:
response = discord.Embed(color=0xBE1931, title='? Reminders have a limit of 90 days.')
except LookupError:
response = discord.Embed(color=0xBE1931, title='? Please use the format HH:MM:SS.')
except ValueError:
response = discord.Embed(color=0xBE1931, title='? Inputted value is invalid.')
else:
response = discord.Embed(color=0xBE1931, title='? No arguments inputted.')
await message.channel.send(embed=response)