def test_create_oauth2_token(self):
    """Create an OAuth2 application for an admin user and issue an access token for it."""
    superuser = mixer.blend('auth.User', is_staff=True, is_superuser=True)
    application = Application.objects.create(
        name='SuperAPI OAUTH2 APP',
        user=superuser,
        client_type=Application.CLIENT_PUBLIC,
        authorization_grant_type=Application.GRANT_PASSWORD,
    )
    assert Application.objects.count() == 1, "Should be equal"

    # Token value: 16 random characters, a '---' separator, then the username.
    token_prefix = get_random_string(length=16)
    token_fields = {
        'user': superuser,
        'scope': 'read write',
        # The token expires five minutes after creation.
        'expires': timezone.now() + timedelta(minutes=5),
        'token': f'{token_prefix}---{superuser.username}',
        'application': application,
    }
    admin_token = AccessToken.objects.create(**token_fields)
    assert admin_token is not None, "??? ???"
# Example source code for Python's get_random_string()
def test_post_pet(client, api_urls, with_tag):
    """POST a pet (optionally with a tag) and check it is echoed back and retrievable."""
    body = {'name': get_random_string()}
    if with_tag:
        body['tag'] = get_random_string()

    post_response = client.post(
        '/api/pets',
        json.dumps(body),
        content_type='application/json',
    )
    created = get_data_from_response(post_response)

    assert created['name'] == body['name']
    assert created['id']
    if with_tag:
        assert created['tag'] == body['tag']

    # The new pet must be the only entry in the list endpoint ...
    listing = get_data_from_response(client.get('/api/pets'))
    assert listing == [created]
    # ... and must be returned unchanged by its detail endpoint.
    detail_url = '/api/pets/{}'.format(created['id'])
    assert get_data_from_response(client.get(detail_url)) == created
def _simulate_preorder(self, client, product):
    """
    Create a paid preorder for ``product`` and redeem it via the transactions API.

    Returns the ``id`` field of the JSON response after asserting that the
    response reports ``success``.
    """
    redeem_secret = get_random_string(32)
    preorder = Preorder.objects.create(order_code=get_random_string(12), is_paid=True)
    preorder.positions.create(secret=redeem_secret, product=product)

    position = {
        "product": product.pk,
        "price": "0.00",
        "secret": redeem_secret,
        "type": "redeem",
        "_title": product.name,
    }
    request_body = json.dumps({"positions": [position]})
    response = client.post('/api/transactions/', request_body, content_type="application/json")

    result = json.loads(response.content.decode('utf-8'))
    assert result['success']
    return result['id']
def send_activation_email(request, user):
    """
    Store a fresh activation code for ``user`` and email it to ``user.email``.

    A 20-character random code is saved on a new ``Activation`` record, then
    the ``email/activation_profile.html`` template is rendered with the site
    domain and the code and sent as an HTML alternative next to a
    tag-stripped text body.
    """
    mail_subject = _('Profile Activation')
    sender = settings.DEFAULT_FROM_EMAIL
    site_domain = get_current_site(request).domain
    activation_code = get_random_string(20)
    template_context = {'domain': site_domain, 'code': activation_code}

    # Persist the code before the email goes out.
    activation = Activation()
    activation.code = activation_code
    activation.user = user
    activation.save()

    html_body = render_to_string(
        'email/activation_profile.html',
        context=template_context,
        request=request,
    )
    plain_body = strip_tags(html_body)

    message = EmailMultiAlternatives(mail_subject, plain_body, sender, [user.email])
    message.attach_alternative(html_body, 'text/html')
    message.send()
def send_activation_change_email(request, user, new_email):
    """
    Store an email-change activation code for ``user`` and send it by email.

    A 20-character random code is saved on a new ``Activation`` record together
    with ``new_email``; the ``email/change_email.html`` template is rendered
    with the site domain and the code and sent as an HTML alternative next to
    a tag-stripped text body.
    """
    mail_subject = _('Change email')
    sender = settings.DEFAULT_FROM_EMAIL
    site_domain = get_current_site(request).domain
    activation_code = get_random_string(20)
    template_context = {'domain': site_domain, 'code': activation_code}

    # Persist the code and the requested address before the email goes out.
    activation = Activation()
    activation.code = activation_code
    activation.user = user
    activation.email = new_email
    activation.save()

    html_body = render_to_string(
        'email/change_email.html',
        context=template_context,
        request=request,
    )
    plain_body = strip_tags(html_body)

    # NOTE(review): the message is addressed to user.email, not new_email --
    # confirm this is the intended recipient.
    message = EmailMultiAlternatives(mail_subject, plain_body, sender, [user.email])
    message.attach_alternative(html_body, 'text/html')
    message.send()
def readable_random_token(alphanumeric=False, add_spaces=False, short_token=False, long_token=False):
    """
    Build a random token out of 4-character groups joined by dashes.

    The token has 4 groups by default, 2 groups when ``short_token`` is set and
    8 groups when ``long_token`` is set (``long_token`` takes precedence when
    both are given). Groups consist of the digits 0-9 by default; with
    ``alphanumeric=True`` they are drawn from upper-case consonants and the
    digits 2-9 instead. With ``add_spaces`` the groups are joined by " - "
    rather than "-", which is intended to stop mobile phones from rendering
    sequences such as "-3" as an emoticon.
    """
    if long_token:
        group_count = 8
    elif short_token:
        group_count = 2
    else:
        group_count = 4

    alphabet = "BCDFGHJLKMNPQRSTVWXYZ23456789" if alphanumeric else "1234567890"
    separator = " - " if add_spaces else "-"

    groups = []
    for _index in range(group_count):
        groups.append(get_random_string(length=4, allowed_chars=alphabet))
    return separator.join(groups)
def setUp(self):
    """Create and save certificate, department, user, arbitration and penalty fixtures."""
    def random_date(start, end):
        # Random date between the two bounds, driven by a fresh random.random() value.
        return rand_gen.randomDate(start, end, random.random())

    certificate = self.certificate = Certificate()
    certificate.exam_complete_date = random_date("2008-1-1", "2009-1-1")
    certificate.date_certificate_mju = random_date("2009-1-1", "2010-1-1")
    certificate.date_certificate = random_date("2009-1-1", "2010-1-1")
    certificate.info_quality = random_date("2009-1-1", "2010-1-1")
    certificate.save()

    department = self.department = Department()
    department.location = get_random_string(allowed_chars="dprmt")
    department.save()

    user = self.user = User(username="usr")
    user.save()

    arbitration = self.arbitre = Arbitration()
    arbitration.dismissal_date = random_date("2009-1-1", "2010-1-1")
    arbitration.certificate = certificate
    arbitration.dep = department
    arbitration.user = user
    arbitration.save()

    penalty = self.DisciplinalPenalty = DisciplinalPenalty()
    penalty.issue_date = random_date("2010-1-1", "2011-1-1")
    penalty.Arbitration = arbitration
    penalty.protocol_number = get_random_string(allowed_chars="prtclnbr")
    penalty.save()
def setUp(self):
    """Create and save certificate, user, department and arbitration fixtures."""
    def random_date(start, end):
        # Random date between the two bounds, driven by a fresh random.random() value.
        return rand_gen.randomDate(start, end, random.random())

    certificate = self.certificate = Certificate()
    certificate.exam_complete_date = random_date("2008-1-1", "2009-1-1")
    certificate.date_certificate_mju = random_date("2009-1-1", "2010-1-1")
    certificate.date_certificate = random_date("2009-1-1", "2010-1-1")
    certificate.info_quality = random_date("2009-1-1", "2010-1-1")
    certificate.save()

    user = self.user = User(username="usr")
    user.save()

    department = self.department = Department()
    department.location = get_random_string(allowed_chars="dprmt")
    department.save()

    arbitration = self.arbitre = Arbitration()
    arbitration.dismissal_date = random_date("2009-1-1", "2010-1-1")
    arbitration.certificate = certificate
    arbitration.dep = department
    arbitration.user = user
    arbitration.save()
def form_valid(self, form):
    """
    Reset the password of the user whose email was posted and mail it to them.

    Returns a ``JsonResponse`` with ``error: False`` when a user with the
    posted email exists, and one with ``error: True`` otherwise.
    """
    matches = User.objects.filter(email=self.request.POST.get('email'))
    if not matches:
        return JsonResponse({
            "error": True, "message": "User With this email id doesn't exists!!!"})

    account = matches[0]
    new_password = get_random_string(6)
    # NOTE(review): the new password is sent in clear text inside the email body.
    body = ''.join([
        '<p>Your Password for the forum account is <strong>',
        new_password,
        '</strong></p><br/><p>Use this credentials to login into <a href="',
        settings.HOST_URL,
        '/forum/">forum</a></p>',
    ])
    # NOTE(review): the email is sent before the new password is stored.
    Memail(
        [account.email],
        settings.DEFAULT_FROM_EMAIL,
        "Password Reset",
        body,
        email_template_name=None,
        context=None,
    )
    account.set_password(new_password)
    account.save()
    return JsonResponse({
        "error": False, "response": "An Email is sent to the entered email id"})
def get_draft_url(url):
    """
    Return ``url`` with a draft-mode HMAC added to its querystring.

    A URL that already verifies as a draft URL is returned unchanged.
    """
    if verify_draft_url(url):
        # Already carries a valid draft HMAC.
        return url

    parsed = urlparse.urlparse(url)
    salt = get_random_string(5)
    # QueryDict needs a bytestring as its first argument.
    params = QueryDict(force_bytes(parsed.query), mutable=True)
    params['edit'] = '%s:%s' % (salt, get_draft_hmac(salt, parsed.path))

    # Swap in the new querystring and reassemble the URL.
    rebuilt = parsed._replace(query=params.urlencode(safe=':'))
    return urlparse.urlunparse(rebuilt)
def test_slack_notifier(project, settings):
    """Check that `gotify_send` posts the event details to the Slack webhook URL."""
    settings.GOTIFY_IMMEDIATE = False
    notifier = SlackNotifier.objects.create(
        webhook_url='http://example.com',
        message_header_suffix=get_random_string(),
    )
    notifier.projects.add(project)
    event = Event.objects.create_from_raven(
        project_id=project.id,
        body=json.loads(exc_payload),
    )

    with requests_mock.mock() as http_mock:
        http_mock.post('http://example.com', text='ok')
        call_command('gotify_send')

        assert http_mock.called
        first_request = http_mock.request_history[0]
        assert first_request.method == 'POST'

        text = json.loads(force_text(first_request.body))['text']
        for expected in (event.message, event.project.name, notifier.message_header_suffix):
            assert expected in text
def test_view_list_method(self, mocker):
    """The list endpoint returns whatever `cache.get_or_set` yields for the projects key."""
    expected = {get_random_string(): get_random_string()}
    get_projects_mock = mocker.patch('projects.views.utils.get_projects')
    get_or_set_mock = mocker.patch(
        'projects.views.cache.get_or_set',
        return_value=expected,
    )

    response = self.client.get(reverse('{0.base_name}:list'.format(self)))

    assert response.status_code == status.HTTP_200_OK
    assert response.data == expected
    # The view must consult the cache exactly once, without a timeout.
    get_or_set_mock.assert_called_once_with(
        key=utils.AVAILABLE_PROJECTS_KEY,
        default=get_projects_mock.return_value,
        timeout=None,
    )
def user_edit(request, response, user_id=None):
    """
    Create a new user or update an existing one through ``UserForm``.

    When ``user_id`` is given the matching ``User`` is loaded and edited;
    otherwise a new user is created and receives a random 6-character
    password, which is shown once in the success message.

    Returns a redirect to ``admin:-user-show`` after a successful POST. In all
    other cases the function's local variables are returned via ``locals()``,
    so the local names in this function are part of its output.
    """
    # NOTE(review): presumably enforces superuser access -- confirm in has_superuser.
    has_superuser(request)
    user = User.objects.get(pk=user_id) if user_id else None
    action = log.UPDATE if user else log.CREATE
    action_name = u'??' if user else u'??'
    if request.method == 'POST':
        form = UserForm(request.POST, instance=user)
        if form.is_valid():
            raw_password = None
            if not user:
                # New user: generate a password and store only its digest on the instance.
                raw_password = get_random_string(6)
                form.instance.password_digest = get_password_digest(raw_password)
            user = form.save()
            log(request, log.SUCCESS, u'%s???: %s' % (action_name, user), action,
                view_name='admin:-user-show', view_args=(user.pk,), form=form)
            # The raw password is only included (as a 'sticky' message) when one was generated.
            messages.success(request,
                             u'??%s???%s' % (action_name, (u'?????: %s' % raw_password) if raw_password else ''),
                             'sticky' if raw_password else '')
            return redirect('admin:-user-show', user.pk)
        # Invalid form: record the failure and fall through to return locals().
        log(request, log.ERROR, u'%s?????' % action_name, action, form=form)
    else:
        form = UserForm(instance=user)
    return locals()
def get_draft_url(url):
    """
    Add a draft-mode HMAC to the querystring of ``url`` and return the result.

    If ``url`` already verifies as a draft URL it is returned as is.
    """
    if verify_draft_url(url):
        # Nothing to add: the URL is already a valid draft URL.
        return url

    pieces = urlparse.urlparse(url)
    salt = get_random_string(5)
    signature = get_draft_hmac(salt, pieces.path)
    # QueryDict expects a bytestring as its first argument.
    querystring = QueryDict(force_bytes(pieces.query), mutable=True)
    querystring['edit'] = '%s:%s' % (salt, signature)

    # Index 4 of the parse result is the querystring; replace only that part.
    encoded = querystring.urlencode(safe=':')
    return urlparse.urlunparse(tuple(pieces[:4]) + (encoded,) + tuple(pieces[5:]))
def policy_row_factory(**kwargs):
    """
    Build a dict of random data in policy-row format.

    Any keyword arguments override (or extend) the generated entries.
    """
    row = {}
    row['policyNumber'] = str(random.randint(100, 999))  # nosec
    row['policyTitle'] = get_random_string(32)
    row['uriPolicyId'] = 'http://example.com/' + get_random_string(10)
    row['ombPolicyId'] = get_random_string(32)
    row['policyType'] = random.choice([policy_type.value for policy_type in PolicyTypes])  # nosec

    # Days are capped at 28 so that every month/day combination is valid.
    year = random.randint(1950, 2016)  # nosec
    month = random.randint(1, 12)  # nosec
    day = random.randint(1, 28)  # nosec
    row['policyIssuanceYear'] = date(year, month, day).strftime('%m/%d/%Y')

    row['policySunset'] = 'NA'
    row['issuingBody'] = get_random_string(10)

    row.update(kwargs)
    return row
def send_user_confirmation(self, request):
    """
    Generate a confirmation key and email it to the associated user.

    Called after the user completes the initial registration form. A random
    20-digit key is stored on this object and saved, then an email is sent
    whose templates receive the current site and the key, so the user can
    follow a link to set a password and confirm their identity.
    """
    # Generate and persist a random, digits-only confirmation key.
    self.confirmation_key = get_random_string(length=20, allowed_chars='0123456789')
    self.save()

    context = {
        "site": get_current_site(request),
        "conf_key": self.confirmation_key,
    }

    subject = render_to_string("registration/confirmation_email_subject.txt", context=context)
    # Email subjects must not contain newlines (Django rejects multi-line
    # headers), and template files commonly end with one.
    subject = ''.join(subject.splitlines())

    # Render the body once; it is used for both the plain and the HTML part.
    # NOTE(review): the plain-text part therefore still contains HTML markup.
    body = render_to_string("registration/confirmation_email.html", context=context)

    self.user.email_user(subject, body, html_message=body)
def test_post_issue_no_jurisdiction(mf_api_client, random_service):
    """Posting issues without any Jurisdiction creates a single "default" Jurisdiction."""
    assert not Jurisdiction.objects.exists()

    # Post twice: the default Jurisdiction must be created once and then reused.
    # NOTE(review): loop extent reconstructed from the statement order -- confirm.
    for _attempt in (1, 2):
        post_data = {
            "service_code": random_service.service_code,
            "lat": 30,
            "long": 30,
            "description": get_random_string(),
        }
        issues = get_data_from_response(
            mf_api_client.post(ISSUE_LIST_ENDPOINT, post_data),
            201,
            schema=LIST_OF_ISSUES_SCHEMA,
        )
        issue = issues[0]
        request_id = issue['service_request_id']
        assert Issue.objects.filter(identifier=request_id).exists()
        assert Jurisdiction.objects.filter(identifier="default").exists()  # default Jurisdiction was created
        assert Jurisdiction.objects.count() == 1

    # The last posted issue must be retrievable through its detail endpoint.
    detail_url = reverse(
        'georeport/v2:issue-detail',
        kwargs={'identifier': issue['service_request_id']},
    )
    issues = get_data_from_response(mf_api_client.get(detail_url), schema=LIST_OF_ISSUES_SCHEMA)
    verify_issue(issues[0])
def test_get_issue_multi_jurisdiction_filters_correctly(mf_api_client, random_service):
    """Filtering the issue list by `jurisdiction_id` returns only that Jurisdiction's issues."""
    assert not Jurisdiction.objects.exists()  # Precondition check

    # Four jurisdictions named j0..j3, each with five issues.
    jurisdictions = []
    for index in range(4):
        label = "j%s" % index
        jurisdictions.append(Jurisdiction.objects.create(identifier=label, name=label))

    for jurisdiction in jurisdictions:
        for _count in range(5):
            Issue.objects.create(
                jurisdiction=jurisdiction,
                service=random_service,
                description=get_random_string(),
                address='Test Street 10',
            )

    for jurisdiction in jurisdictions:
        response = mf_api_client.get(
            ISSUE_LIST_ENDPOINT,
            {'jurisdiction_id': jurisdiction.identifier},
        )
        issues = get_data_from_response(response, schema=LIST_OF_ISSUES_SCHEMA)
        # The endpoint must return exactly the issues of the requested Jurisdiction.
        expected_count = Issue.objects.filter(jurisdiction=jurisdiction).count()
        assert len(issues) == expected_count
def test_default_moderation_status(mf_api_client, random_service, settings, status):
    """
    A freshly posted issue shows up in the list endpoint only when the
    default moderation status is 'public'.
    """
    settings.ISSUES_DEFAULT_MODERATION_STATUS = status

    post_data = {
        "lat": 15,
        "long": 15,
        "description": get_random_string(),
        "service_code": random_service.service_code,
    }
    post_response = mf_api_client.post(ISSUE_LIST_ENDPOINT, post_data)
    posted_issues = get_data_from_response(post_response, 201, schema=LIST_OF_ISSUES_SCHEMA)
    verify_issue(posted_issues[0])

    listed_issues = get_data_from_response(mf_api_client.get(ISSUE_LIST_ENDPOINT), 200)
    should_be_listed = (status == 'public')
    assert bool(listed_issues) == should_be_listed
def test_post_issue_api_key(mf_api_client, random_service, api_key_mode, pass_api_key):
    """Posting an issue records the application matching the API key, or fails with 400."""
    expected_app = Application.autodetermine()
    expected_status = 201

    if api_key_mode == 'actual-apps':
        # Create app1..app5; the last one created is the one whose key is used.
        for number in range(1, 6):
            expected_app = Application.objects.create(identifier='app%d' % number)
        # NOTE(review): nesting of this check under 'actual-apps' is reconstructed -- confirm.
        if not pass_api_key:
            expected_status = 400

    input_data = {
        'description': get_random_string(),
        'service_code': random_service.service_code,
        'address': 'hello',
        'api_key': (expected_app.key if pass_api_key else ''),
    }
    issues = get_data_from_response(
        mf_api_client.post(ISSUE_LIST_ENDPOINT, input_data),
        status_code=expected_status,
        schema=LIST_OF_ISSUES_SCHEMA,
    )

    if expected_status < 400:
        issue = verify_issue(issues[0])
        assert issue.application == expected_app
def test_create_fake_data_then_send_get_request_via_user_viewset(self):
    """Create 50 users, then GET the user list with an admin bearer token and expect 200."""
    superuser = mixer.blend('auth.User', is_staff=True, is_superuser=True)
    application = Application.objects.create(
        name='SuperAPI OAUTH2 APP',
        user=superuser,
        client_type=Application.CLIENT_PUBLIC,
        authorization_grant_type=Application.GRANT_PASSWORD,
    )
    assert Application.objects.count() == 1, "Should be equal"

    # Token value: 16 random characters, a '---' separator, then the username.
    token_prefix = get_random_string(length=16)
    token_fields = {
        'user': superuser,
        'scope': 'read write',
        # The token expires five minutes after creation.
        'expires': timezone.now() + timedelta(minutes=5),
        'token': f'{token_prefix}---{superuser.username}',
        'application': application,
    }
    admin_token = AccessToken.objects.create(**token_fields)

    for _count in range(50):
        mixer.blend('auth.User', is_active=True)

    list_url = reverse('user-list')
    # Authenticate the test client with the admin's bearer token.
    self.client.credentials(
        HTTP_AUTHORIZATION=f'Bearer {admin_token.token}'
    )
    response = self.client.get(list_url, format='json')
    assert response.status_code == status.HTTP_200_OK
def get_token(self, access_user, app):
    """
    Create a 'read write' access token for ``access_user`` on ``app``.

    The token value is 1024 random characters, '---', and the username; it
    expires five minutes after creation. Returns the token string.
    """
    token_prefix = get_random_string(length=1024)
    token_fields = {
        'user': access_user,
        'scope': 'read write',
        'expires': timezone.now() + timedelta(minutes=5),
        'token': f'{token_prefix}---{access_user.username}',
        'application': app,
    }
    return AccessToken.objects.create(**token_fields).token
def create_access_key(self, kwarg):
    """
    Create an access key for an existing user.

    Raises the error built by ``client_error`` (code '404') when the user is
    not in ``self.users``; otherwise stores a new 40-character key under a new
    20-character key id and returns ``access_key_response`` for it.
    """
    # NOTE(review): `username` is a name resolved outside this function
    # (presumably the lookup key into kwarg) -- confirm.
    user_name = kwarg[username]
    if user_name not in self.users:
        raise client_error('CreateAccessKey',
                           '404',
                           'User %s not found' % user_name)

    key_id = get_random_string(length=20)
    secret = get_random_string(length=40)
    self.users[user_name].access_keys[key_id] = secret
    return access_key_response(user_name, secret, key_id)
def create_group(self, kwarg):
    """
    Create a group unless one with the same name already exists.

    Raises the error built by ``client_error`` (code '409') when the group is
    already in ``self.groups``; otherwise records a new 10-character group id
    and returns ``group_response`` for it.
    """
    # NOTE(review): `group_name` is a name resolved outside this function
    # (presumably the lookup key into kwarg) -- confirm.
    name = kwarg[group_name]
    if name in self.groups:
        raise client_error('CreateGroup',
                           '409',
                           'Group %s exists' % name)

    new_group_id = get_random_string(length=10)
    self.groups[name] = new_group_id
    return group_response(name, new_group_id)
def _set_username(self):
    """Copy user.username to self.username, replacing it if validation fails."""
    self.username = self.user.username
    if self._validate_username():
        return
    # The username is invalid, so fall back to a random 20-character one.
    # Lendable must be available for checkout by user to hit this code.
    self.username = get_random_string(length=20)
def _set_username(self):
    """Set self.username to the unidecode'd user.username, replacing it if unusable."""
    self.username = unidecode(self.user.username)

    # A replacement is needed when the username is invalid or when an IAM
    # user with that name already exists. Lendable must be available for
    # checkout by user to hit this code.
    needs_replacement = not self._validate_username()
    if not needs_replacement:
        needs_replacement = self.amazon_account_utils.iam_user_exists(self.username)

    if needs_replacement:
        self.username = get_random_string(length=20)
def inject_html(request, response, data, summary_data):
    """
    Insert a summary bar and a console script just before the last ``</body>``.

    Does nothing when the response content contains no ``</body>`` tag.
    Otherwise ``response.content`` is rewritten in place and an existing
    ``content-length`` header is updated to the new length.
    """
    insert_at = response.content.rfind(b'</body>')
    if insert_at == -1:
        return

    # "key=value" pairs in sorted order; floats are rounded to 3 decimals.
    pairs = []
    for key, value in sorted(summary_data.items()):
        shown = round(value, 3) if isinstance(value, float) else value
        pairs.append('%s=%s' % (key, shown))
    summary = ' | '.join(pairs)

    # A random namespace is used as the element id and substituted into STYLE.
    namespace = 'cv_%s' % get_random_string()
    markup = """<style>{style}</style><div id="{ns}">{content}</div>""".format(
        ns=namespace,
        content=summary,
        style=STYLE.replace('#cv', '#' + namespace),
    )

    want_stacks = bool(request.GET.get('_cavalry_stacks'))
    script_lines = generate_console_script(data, with_stacks=want_stacks)
    markup += "<script>{script}</script>".format(script='\n'.join(script_lines))

    original = response.content
    response.content = original[:insert_at] + markup.encode('utf-8') + original[insert_at:]
    if 'content-length' in response:
        response['content-length'] = len(response.content)
def generate_origin_id():
    """Return a new random 32-character origin id."""
    origin_id = get_random_string(32)
    return origin_id
def get_available_name(self, name, max_length=None):
    """
    Return a filename that does not yet exist on the target storage and,
    when ``max_length`` is given, does not exceed it.

    While the name is taken or too long, an underscore plus 7 random
    characters is inserted before the extension; the root of the filename is
    shortened as needed to fit ``max_length``. Raises
    ``SuspiciousFileOperation`` when the root would have to be cut away entirely.
    """
    directory, base_name = os.path.split(name)
    stem, extension = os.path.splitext(base_name)

    def candidate():
        # `extension` includes the dot; `stem` is read at call time.
        return os.path.join(directory, "%s_%s%s" % (stem, get_random_string(7), extension))

    while self.exists(name) or (max_length and len(name) > max_length):
        name = candidate()
        if max_length is not None:
            # Shorten the stem by however many characters the name overshoots.
            overflow = len(name) - max_length
            if overflow > 0:
                stem = stem[:-overflow]
                # Nothing of the stem is left: no name can fit.
                if not stem:
                    raise SuspiciousFileOperation(
                        'Storage can not find an available filename for "%s". '
                        'Please make sure that the corresponding file field '
                        'allows sufficient "max_length".' % name
                    )
                name = candidate()
    return name
def make_random_password(self, length=10,
                         allowed_chars='abcdefghjkmnpqrstuvwxyz'
                                       'ABCDEFGHJKLMNPQRSTUVWXYZ'
                                       '23456789'):
    """
    Return a random password of ``length`` characters drawn from ``allowed_chars``.

    The default alphabet leaves out easily confused characters: lower-case
    "i", "l" and "o", upper-case "I" and "O", and the digits "0" and "1".
    """
    password = get_random_string(length, allowed_chars)
    return password