def save_as_header(self, response, **kwargs):
    """
    Set the ``Content-Disposition`` header on ``response``.

    * if save_as is False the header will not be added
    * if save_as is a filename, it will be used in the header
    * if save_as is True or None the filename will be determined from the
      file path (``kwargs['file_obj'].path``)
    """
    save_as = kwargs.get('save_as')
    if save_as is False:
        return
    file_obj = kwargs.get('file_obj')
    derive_from_path = save_as is True or save_as is None
    filename = os.path.basename(file_obj.path) if derive_from_path else save_as
    disposition = 'attachment; filename=%s' % filename
    response['Content-Disposition'] = smart_str(disposition)
# Example source code using Python's smart_str()
def get_html(data):
    """
    Render ``feed.tmpl`` with ``data`` and return the result via ``smart_str``.

    Each post in ``data`` that has a ``'message'`` key is updated in place:
    the message is passed through ``fixnewlines`` (only when it is exactly a
    ``str``) and, the first time the post is seen (no ``'flag'`` key yet),
    through ``enable_links``. Double quotes are then replaced with single
    quotes and ``'short_message'`` / ``'read_more'`` are derived from the
    message. The processed ``data`` is also written to ``docs/feed.json``.
    """
    # Context managers make sure both files are closed (and the JSON is
    # flushed) even if reading, dumping or rendering fails.
    with open('feed.tmpl', 'r') as template_file:
        template_raw = template_file.read()
    for post in data:
        if 'message' in post:
            if (type(post['message']) is str):
                post['message'] = fixnewlines(post['message'])
            if 'flag' not in post:
                post['message'] = enable_links(post['message'])
                # Mark the post so links are not processed a second time.
                post['flag'] = 1
            post['message'] = post['message'].replace("\"", "'")
            post['short_message'] = truncate(post['message'], 150)
            post['read_more'] = truncate_length(post['message'], 150)
    with open('docs/feed.json', 'w') as feed_file:
        json.dump(data, feed_file)
    template = Template(template_raw)
    html = template.render(data=data)
    # smart_str helps in unicode rendering
    return smart_str(html)
def _default_text(self):
if not self.use_default:
note = ""
else:
if self.default == "":
note = _('Default value: ""')
elif self.choices:
work = []
for x in self.choices:
try:
if x[0] == self.default or x[0] in self.default:
work.append('%s' % str(smart_str(x[1])))
except TypeError:
continue
note = gettext('Default value: ') + ", ".join(work)
else:
note = _("Default value: %s") % str(self.default)
return note
def add_images(self, product_ids, images):
    """
    Upload images for each product, then call ``make_first_image_default``.

    ``images[i]`` is the collection of images for ``product_ids[i]``. Every
    image is POSTed as multipart data to the shop's
    ``/api/images/products/<id>`` URL. If ``images`` has more entries than
    ``product_ids``, an error e-mail is queued and uploading stops.
    """
    domain = self.shop.domain
    headers = {
        'Content-Type': MULTIPART_CONTENT,
        'Authorization': self.shop.authheader,
    }
    for position, product_images in enumerate(images):
        # TODO: why sometimes index does not exist?
        if position >= len(product_ids):
            error_context = {
                'domain': domain,
                'message': str(product_ids),
                'data_list': '',
                'full_data': '',
            }
            send_email.delay('mail/error', error_context, "Add images error")
            break
        product_id = product_ids[position]
        for image in product_images:
            payload = encode_multipart(BOUNDARY, {'image': image})
            url = smart_str(
                "http://%s/api/images/products/%s" % (domain, product_id))
            request = urllib2.Request(
                url, headers=headers, data=smart_str(payload))
            urllib2.urlopen(request)
    self.make_first_image_default(product_ids, headers)
def make_first_image_default(self, product_ids, headers):
    """
    Push an update for the given products to the shop's products API.

    Does nothing when this API's ``__version__`` equals
    ``PrestashopAPI15.__version__``. Otherwise, for every product id the
    product XML is fetched and its ``out_of_stock``, ``price``, ``quantity``
    and ``images/image/id`` texts are collected; the collected data is
    rendered with ``xml_templates/update_products.xml`` and sent with an
    HTTP PUT to ``http://<domain>/api/products``.

    ``headers`` is the dict of HTTP headers to send; it is not modified.
    """
    if self.__version__ == PrestashopAPI15.__version__:
        return
    # Work on a private copy so the caller's dict is left untouched, and do
    # not fail when the caller never set a Content-Type.
    headers = dict(headers)
    headers.pop('Content-Type', None)
    domain = self.shop.domain
    data_list = []
    context = {'prestashop_url': domain, 'products': data_list}
    for product_id in product_ids:
        prod_dict = {'id': product_id}
        data = get_xml_data(
            "http://%s/api/products/%s" % (domain, product_id), headers)
        prod_dict['out_of_stock'] = data.findtext('.//out_of_stock')
        prod_dict['price'] = data.findtext('.//price')
        prod_dict['quantity'] = data.findtext('.//quantity')
        prod_dict['image_id'] = data.findtext('.//images/image/id')
        data_list.append(prod_dict)
    headers['Content-Type'] = 'application/x-www-form-urlencoded'
    xml = render_to_string('xml_templates/update_products.xml', context)
    req = urllib2.Request(smart_str("http://%s/api/products" % (domain,)),
                          headers=headers)
    # urllib2 only issues GET/POST by itself; force the PUT verb.
    req.get_method = lambda: 'PUT'
    urllib2.urlopen(req, data=xml.encode('utf-8'))
def clean_body(self):
    """
    Perform Akismet validation of the message.

    The check only runs when ``body`` is in ``cleaned_data`` and
    ``settings.AKISMET_API_KEY`` is set, and only if Akismet accepts the
    key. Raises ``forms.ValidationError`` when Akismet flags the body;
    otherwise returns ``cleaned_data['body']``.
    """
    if 'body' not in self.cleaned_data or not getattr(settings, 'AKISMET_API_KEY', ''):
        return self.cleaned_data['body']

    from akismet import Akismet
    from django.utils.encoding import smart_str

    site_url = 'http://%s/' % Site.objects.get_current().domain
    akismet_api = Akismet(key=settings.AKISMET_API_KEY, blog_url=site_url)
    if not akismet_api.verify_key():
        return self.cleaned_data['body']

    meta = self.request.META
    akismet_data = {
        'comment_type': 'comment',
        'referer': meta.get('HTTP_REFERER', ''),
        'user_ip': meta.get('REMOTE_ADDR', ''),
        'user_agent': meta.get('HTTP_USER_AGENT', ''),
    }
    body = smart_str(self.cleaned_data['body'])
    is_spam = akismet_api.comment_check(body, data=akismet_data, build_data=True)
    if is_spam:
        raise forms.ValidationError(_("Akismet thinks this message is spam"))
    return self.cleaned_data['body']
def save_as_header(self, response, **kwargs):
    """
    Set the ``Content-Disposition`` header on ``response``.

    * if save_as is False the header will not be added
    * if save_as is a filename, it will be used in the header
    * if save_as is True or None the filename will be determined from the
      file path (``kwargs['file_obj'].path``)
    """
    save_as = kwargs.get('save_as')
    if save_as is False:
        return
    file_obj = kwargs.get('file_obj')
    derive_from_path = save_as is True or save_as is None
    filename = os.path.basename(file_obj.path) if derive_from_path else save_as
    disposition = 'attachment; filename=%s' % filename
    response['Content-Disposition'] = smart_str(disposition)
def get_hexdigest(algorithm, salt, raw_password):
    """
    Returns a string of the hexdigest of the given plaintext password and salt
    using the given algorithm ('md5', 'sha1', 'sha256' or 'crypt').

    Raises ``ValueError`` for an unknown algorithm, or when 'crypt' is
    requested but the ``crypt`` module cannot be imported.
    """
    raw_password, salt = smart_str(raw_password), smart_str(salt)
    if algorithm == 'crypt':
        try:
            import crypt
        except ImportError:
            raise ValueError('"crypt" password algorithm not supported in this environment')
        return crypt.crypt(raw_password, salt)

    if algorithm == 'md5':
        hasher = hashlib.md5
    elif algorithm == 'sha1':
        hasher = hashlib.sha1
    elif algorithm == 'sha256':
        hasher = hashlib.sha256
    else:
        raise ValueError("Got unknown password algorithm type in password.")

    payload = salt + raw_password
    # hashlib only accepts bytes. Where smart_str yields text (Python 3),
    # encode it; byte strings are hashed unchanged.
    if not isinstance(payload, bytes):
        payload = payload.encode('utf-8')
    return hasher(payload).hexdigest()
def main():
    """
    Rate every event loaded from the file named by ``argv[1]``.

    Events are obtained from ``align_data``; only events with exactly four
    fields are processed. For each of those a running counter is printed
    and ``do_glicko`` is called. Afterwards the boys' and girls' ratings
    are sorted by value and written out, followed by the athlete entries.
    """
    events = align_data(argv[1])
    count = 0
    for event in events:
        if len(event) != 4:
            continue
        # Function-call form works on both Python 2 and Python 3.
        print(count)
        count += 1
        name = smart_str(event[1][0])
        date = event[0]
        gender = event[2]
        do_glicko(event[3], name, date, gender)
    sorted_boys = sorted(ratings_boys.items(), key=itemgetter(1))
    sorted_girls = sorted(ratings_girls.items(), key=itemgetter(1))
    write_rating(sorted_boys, "male")
    write_rating(sorted_girls, "female")
    write_ath(entries_girls)
    write_ath(entries_boys)
def render(self, context):
    """
    Build a query string from the initial query dict and the node's mods.

    Each ``(key, op, value)`` in ``self.mods`` is applied with
    ``_process_list`` to the current list of values for that key. The
    encoded result is prefixed with ``?`` when non-empty. It is stored in
    ``context[self.asvar]`` (returning ``''``) when ``asvar`` is set, and
    returned directly otherwise.
    """
    resolved_mods = []
    for key, operator, value in self.mods:
        resolved_mods.append(
            (smart_str(key, 'ascii'), operator, value.resolve(context)))

    initial = self.qdict.resolve(context) if self.qdict else None
    # Internally work only with QueryDict
    query = self._get_initial_query_dict(initial)
    # assert isinstance(query, QueryDict)
    for key, operator, value in resolved_mods:
        updated = self._process_list(query.getlist(key), operator, value)
        query.setlist(key, updated)

    qstring = query.urlencode()
    if qstring:
        qstring = '?' + qstring
    if not self.asvar:
        return qstring
    context[self.asvar] = qstring
    return ''
def render(self, context):
    """
    Resolve the node's arguments and build a URL with ``self.get_url``.

    Keyword names are converted with ``smart_str(..., 'ascii')``. The URL
    is stored in ``context[self.as_var]`` (returning ``''``) when
    ``as_var`` is set, and returned directly otherwise.
    """
    positional = [arg.resolve(context) for arg in self.args]
    named = {}
    for key, value in self.kwargs.items():
        named[smart_str(key, 'ascii')] = value.resolve(context)
    url = self.get_url(self.name, *positional, **named)
    if not self.as_var:
        return url
    context[self.as_var] = url
    return ''
def get_html(data):
    """
    Render ``feed.tmpl`` with ``data`` and return the result via ``smart_str``.

    Each post in ``data`` that has a ``'message'`` key is updated in place:
    the message is passed through ``fixnewlines`` (only when it is exactly a
    ``str``) and, the first time the post is seen (no ``'flag'`` key yet),
    through ``enable_links``. The processed ``data`` is also written to
    ``docs/feed.json``.
    """
    # Context managers make sure both files are closed (and the JSON is
    # flushed) even if reading, dumping or rendering fails.
    with open('feed.tmpl', 'r') as template_file:
        template_raw = template_file.read()
    for post in data:
        if 'message' in post:
            if (type(post['message']) is str):
                post['message'] = fixnewlines(post['message'])
            if 'flag' not in post:
                post['message'] = enable_links(post['message'])
                # Mark the post so links are not processed a second time.
                post['flag'] = 1
    with open('docs/feed.json', 'w') as feed_file:
        json.dump(data, feed_file)
    template = Template(template_raw)
    html = template.render(data=data)
    # smart_str helps in unicode rendering
    return smart_str(html)
def default_username_algo(email):
    """
    Derive a username from ``email``.

    Returns the URL-safe base64 encoding of the SHA-1 digest of the e-mail
    address, with trailing ``=`` padding removed (a byte string).
    """
    # store the username as a base64 encoded sha1 of the email address
    # this protects against data leakage because usernames are often
    # treated as public identifiers (so we can't use the email address).
    digest = hashlib.sha1(smart_bytes(email)).digest()
    encoded = base64.urlsafe_b64encode(digest)
    return encoded.rstrip(b'=')
def __str__(self):
    """Return ``self.variant`` converted with ``smart_str``."""
    variant = self.variant
    return smart_str(variant)
def download(request, submission_id):
    """
    Return a response that makes the web server send a submission file.

    The body is empty; the file is handed off through the ``X-Sendfile``
    and ``X-Accel-Redirect`` headers. The download name is
    ``<task slug>_<username>_<submission id>``.
    """
    submission = TaskSubmission.objects.get(pk=submission_id)
    name_parts = (submission.task.slug,
                  submission.user.username,
                  submission_id)
    file_name = '{0!s}_{1!s}_{2!s}'.format(*name_parts)
    path = smart_str(submission.get_submission_path())

    response = HttpResponse(content_type='application/force-download')
    disposition = 'attachment; filename={0!s}'.format(smart_str(file_name))
    response['Content-Disposition'] = disposition
    response['X-Sendfile'] = path
    response['X-Accel-Redirect'] = get_submission_uri_from_path(path)
    return response
def init_request(self, *args, **kwargs):
    """
    Look for a GET parameter whose name starts with ``RELATE_PREFIX``.

    For the first such parameter a ``RelateObject`` is built from the
    admin view, the parameter name without the prefix, and its value, and
    stored on ``self.relate_obj``. Returns the truthiness of
    ``self.relate_obj``.
    """
    self.relate_obj = None
    for param, value in self.request.GET.items():
        key = smart_str(param)
        if not key.startswith(RELATE_PREFIX):
            continue
        lookup = key[len(RELATE_PREFIX):]
        self.relate_obj = RelateObject(self.admin_view, lookup, value)
        break
    return bool(self.relate_obj)
def _get_akismet_data(self, blog_url, comment, content_object, request):
    """
    Assemble the parameter dict for an Akismet comment check.

    Combines comment fields, request metadata, an optional ``user_role``
    (for superusers) and an optional ``blog_lang`` taken from
    ``_get_article_language(content_object)``.
    """
    # Field documentation:
    # http://akismet.com/development/api/#comment-check
    comment_info = {
        'permalink': urljoin(blog_url, content_object.get_absolute_url()),
        # comment, trackback, pingback, see
        # http://blog.akismet.com/2012/06/19/pro-tip-tell-us-your-comment_type/
        'comment_type': 'comment',
        'comment_author': getattr(comment, 'name', ''),
        'comment_author_email': getattr(comment, 'email', ''),
        'comment_author_url': getattr(comment, 'url', ''),
        'comment_content': smart_str(comment.comment),
        'comment_date': comment.submit_date,
    }
    meta = request.META
    request_info = {
        'referrer': meta.get('HTTP_REFERER', ''),
        'user_agent': meta.get('HTTP_USER_AGENT', ''),
        'user_ip': comment.ip_address,
    }

    data = {}
    data.update(comment_info)
    data.update(request_info)

    if comment.user_id and comment.user.is_superuser:
        data['user_role'] = 'administrator'  # always passes test

    # If the language is known, provide it.
    language = _get_article_language(content_object)
    if language:
        data['blog_lang'] = language
    return data
def _get_SIZE_filename(self, size):
    """
    Return the cache file path for the photo size called ``size``.

    The size is looked up in ``PhotoSizeCache().sizes`` and the file name
    for it is joined onto ``self.cache_path()``.
    """
    photosize = PhotoSizeCache().sizes.get(size)
    cache_dir = self.cache_path()
    sized_name = self._get_filename_for_size(photosize.name)
    return smart_str(os.path.join(cache_dir, sized_name))
def init_request(self, *args, **kwargs):
    """
    Look for a GET parameter whose name starts with ``RELATE_PREFIX``.

    For the first such parameter a ``RelateObject`` is built from the
    admin view, the parameter name without the prefix, and its value, and
    stored on ``self.relate_obj``. Returns the truthiness of
    ``self.relate_obj``.
    """
    self.relate_obj = None
    for param, value in self.request.GET.items():
        key = smart_str(param)
        if not key.startswith(RELATE_PREFIX):
            continue
        lookup = key[len(RELATE_PREFIX):]
        self.relate_obj = RelateObject(self.admin_view, lookup, value)
        break
    return bool(self.relate_obj)
def init_request(self, *args, **kwargs):
    """
    Look for a GET parameter whose name starts with ``RELATE_PREFIX``.

    For the first such parameter a ``RelateObject`` is built from the
    admin view, the parameter name without the prefix, and its value, and
    stored on ``self.relate_obj``. Returns the truthiness of
    ``self.relate_obj``.
    """
    self.relate_obj = None
    for param, value in self.request.GET.items():
        key = smart_str(param)
        if not key.startswith(RELATE_PREFIX):
            continue
        lookup = key[len(RELATE_PREFIX):]
        self.relate_obj = RelateObject(self.admin_view, lookup, value)
        break
    return bool(self.relate_obj)
def init_request(self, *args, **kwargs):
    """
    Look for a GET parameter whose name starts with ``RELATE_PREFIX``.

    For the first such parameter a ``RelateObject`` is built from the
    admin view, the parameter name without the prefix, and its value, and
    stored on ``self.relate_obj``. Returns the truthiness of
    ``self.relate_obj``.
    """
    self.relate_obj = None
    for param, value in self.request.GET.items():
        key = smart_str(param)
        if not key.startswith(RELATE_PREFIX):
            continue
        lookup = key[len(RELATE_PREFIX):]
        self.relate_obj = RelateObject(self.admin_view, lookup, value)
        break
    return bool(self.relate_obj)
def get_context_object_name(self, object_list):
    """Get the name of the item to be used in the context.

    An explicitly configured ``context_object_name`` wins. Otherwise, when
    ``object_list`` has a ``model`` attribute, the name is
    ``<lowercased model object name>_list``; if it has none, ``None`` is
    returned.

    See original in ``django.views.generic.list.MultipleObjectMixin``.
    """
    explicit_name = self.context_object_name
    if explicit_name:
        return explicit_name
    if not hasattr(object_list, 'model'):
        return None
    model_name = object_list.model._meta.object_name.lower()
    return smart_str('{0}_list'.format(model_name))
def upload_image(request, upload_path=None):
    """
    Store an uploaded image and answer with JSON containing its URL.

    Responds with ``HttpResponseForbidden`` when the form is invalid and
    with the text ``'Bad image format'`` when the content type is not an
    accepted PNG/JPEG type. The stored file name is the MD5 hex digest of
    the original base name plus the original extension.
    """
    form = ImageForm(request.POST, request.FILES)
    if not form.is_valid():
        return HttpResponseForbidden()

    image = form.cleaned_data['file']
    accepted_types = ('image/png', 'image/jpg', 'image/jpeg', 'image/pjpeg')
    if image.content_type not in accepted_types:
        return HttpResponse('Bad image format')

    stem, extension = os.path.splitext(image.name)
    digest = md5(smart_str(stem).encode('utf-8')).hexdigest()
    hashed_name = '{0}{1}'.format(digest, extension)
    target = os.path.join(upload_path or UPLOAD_PATH, hashed_name)
    stored_path = default_storage.save(target, image)
    payload = {'filelink': default_storage.url(stored_path)}
    return HttpResponse(json.dumps(payload))
def uploaded_images_json(request, upload_path=None):
    """
    List the images in the upload directory as JSON.

    The directory is ``MEDIA_ROOT/<upload_path>`` (``UPLOAD_PATH`` when
    ``upload_path`` is not given). Every entry that is not a directory and
    that ``imghdr`` recognises as an image is reported as a dict with its
    thumbnail URL (``thumb``) and its media URL (``image``). When the
    directory does not exist the response body is ``'{}'``.
    """
    upload_path = upload_path or UPLOAD_PATH
    path = os.path.join(settings.MEDIA_ROOT, upload_path)
    if not os.path.isdir(path):
        return HttpResponse('{}')

    results = []
    for image in os.listdir(path):
        # Join with a separator: plain concatenation only produced a valid
        # path when ``upload_path`` happened to end with a slash.
        image_path = os.path.join(path, smart_str(image))
        if not os.path.isdir(image_path) and imghdr.what(image_path):
            thumb = get_thumbnail(image_path, '100x74', crop='center')
            image_url = os.path.join(settings.MEDIA_URL, upload_path, image)
            results.append({'thumb': thumb.url, 'image': image_url})
    return HttpResponse(json.dumps(results))
def render(self, name, value, attrs=None, renderer=None):
    """
    Render the textarea together with the script that turns it into a
    redactor editor.

    The upload-related URLs are added to ``self.imperavi_settings`` before
    the settings are serialised to JSON for the script. The value is
    escaped with ``conditional_escape`` and the markup is returned through
    ``mark_safe``.
    """
    value = value or ''
    final_attrs = self.build_attrs(attrs, extra_attrs={'name': name})
    field_id = final_attrs.get('id')

    upload_routes = (
        ('imageUpload', 'imperavi-upload-image'),
        ('imageGetJson', 'imperavi-get-json'),
        ('fileUpload', 'imperavi-upload-file'),
        ('linkFileUpload', 'imperavi-upload-link-file'),
    )
    upload_urls = {}
    for setting, route in upload_routes:
        upload_urls[setting] = reverse(
            route, kwargs={'upload_path': self.upload_path})
    self.imperavi_settings.update(upload_urls)
    imperavi_settings = json.dumps(self.imperavi_settings)

    markup = u"""
<div style="width: 800px;">
<textarea%(attrs)s>%(value)s</textarea>
</div>
<script>
$(document).ready(
function() {
$("#%(id)s").parent().siblings('label').css('float', 'none');
$("#%(id)s").height(300);
$("#%(id)s").redactor(%(imperavi_settings)s);
}
);
</script>
"""
    substitutions = {
        'attrs': flatatt(final_attrs),
        'value': conditional_escape(smart_str(value)),
        'id': field_id,
        'imperavi_settings': imperavi_settings,
    }
    return mark_safe(markup % substitutions)
def test_render_to_temporary_file(self):
    """Should render a template to a temporary file."""
    title = 'A test template.'
    template = loader.get_template('sample.html')
    temp_file = render_to_temporary_file(template, context={'title': title})
    # Close the file even when the assertion fails, so a failing test does
    # not leave the handle open.
    try:
        temp_file.seek(0)
        saved_content = smart_str(temp_file.read())
        # assertIn reports both operands on failure, unlike assertTrue.
        self.assertIn(title, saved_content)
    finally:
        temp_file.close()
def _render_file(self, template, context):
    """Helper method for testing rendered file deleted/persists tests.

    Returns a ``(content, filename)`` tuple for a new ``RenderedFile``
    built from ``template`` and ``context``.
    """
    rendered = RenderedFile(template=template, context=context)
    rendered.temporary_file.seek(0)
    content = smart_str(rendered.temporary_file.read())
    filename = rendered.filename
    return (content, filename)
def init_request(self, *args, **kwargs):
    """
    Look for a GET parameter whose name starts with ``RELATE_PREFIX``.

    For the first such parameter a ``RelateObject`` is built from the
    admin view, the parameter name without the prefix, and its value, and
    stored on ``self.relate_obj``. Returns the truthiness of
    ``self.relate_obj``.
    """
    self.relate_obj = None
    for param, value in self.request.GET.items():
        key = smart_str(param)
        if not key.startswith(RELATE_PREFIX):
            continue
        lookup = key[len(RELATE_PREFIX):]
        self.relate_obj = RelateObject(self.admin_view, lookup, value)
        break
    return bool(self.relate_obj)
def download_file(request, file_id=None):
    """
    Return a response that makes the web server send an uploaded file.

    The ``FileUpload`` with id ``file_id`` is looked up. The response has
    an empty ``Content-Type``, an attachment ``Content-Disposition`` with
    the stored file name, and an ``X-Sendfile`` header pointing at the
    file under ``MEDIA_ROOT``.
    """
    upload = FileUpload.objects.get(id=file_id)
    response = HttpResponse()
    response['Content-Type'] = ''
    disposition = "attachment; filename=" + upload.filename
    response['Content-Disposition'] = disposition
    sendfile_path = os.path.join(settings.MEDIA_ROOT, upload.filename)
    response['X-Sendfile'] = smart_str(sendfile_path)
    return response
def post(self, request, *args, **kwargs):
    """
    Create a card from the posted ``stripeToken``.

    Redirects to ``pinax_stripe_subscription_list`` on success. On a
    ``CardError`` the view is rendered again with the error text in the
    context under ``errors``.
    """
    token = request.POST.get("stripeToken")
    try:
        self.create_card(token)
        response = redirect("pinax_stripe_subscription_list")
    except CardError as exc:
        error_context = self.get_context_data(errors=smart_str(exc))
        response = self.render_to_response(error_context)
    return response