def send_share_notification(share_id):
    """Send a simple "your content has been shared" email notification.

    The mail is addressed to the user behind the author of the shared
    content. Nothing is sent when ``settings.DEBUG`` is on. If no share with
    the given id exists whose shared content is local, a warning is logged
    and the function returns.

    :param share_id: ID of the ``Content`` object that is the share.
    """
    if settings.DEBUG:
        return
    try:
        content = Content.objects.get(id=share_id, content_type=ContentType.SHARE, share_of__local=True)
    except Content.DoesNotExist:
        logger.warning("No share content found with id %s", share_id)
        return
    shared = content.share_of
    content_url = "%s%s" % (settings.SOCIALHOME_URL, shared.get_absolute_url())
    # Translate the template first and interpolate afterwards. Interpolating
    # inside the _() call makes the lookup key unique per content, so it can
    # never match an entry in the translation catalogue.
    subject = _("New share of: %s") % shared.short_text_inline
    context = get_common_context()
    context.update({
        "subject": subject,
        "actor_name": content.author.name_or_handle,
        "actor_url": "%s%s" % (settings.SOCIALHOME_URL, content.author.get_absolute_url()),
        "content_url": content_url,
        "name": shared.author.name_or_handle,
    })
    send_mail(
        "%s%s" % (settings.EMAIL_SUBJECT_PREFIX, subject),
        render_to_string("notifications/share.txt", context=context),
        settings.DEFAULT_FROM_EMAIL,
        [shared.author.user.email],
        fail_silently=False,
        html_message=render_to_string("notifications/share.html", context=context),
    )
# Example source code for Python class DEBUG
def render_pdf(self, *args, **kwargs):
    """
    Render the view's HTML to a PDF with pdfkit and return the result.

    All positional and keyword arguments are forwarded to ``render_html``.
    The wkhtmltopdf binary can be overridden through the
    ``WKHTMLTOPDF_BIN`` environment variable.

    :rtype: bytes
    """
    html = self.render_html(*args, **kwargs)
    options = self.get_pdfkit_options()
    # JavaScript debugging is opt-in via ?debug and only honoured in DEBUG.
    if 'debug' in self.request.GET and settings.DEBUG:
        options['debug-javascript'] = 1

    binary_path = os.environ.get('WKHTMLTOPDF_BIN')
    if binary_path:
        pdfkit_kwargs = {'configuration': pdfkit.configuration(wkhtmltopdf=binary_path)}
    else:
        pdfkit_kwargs = {}
    return pdfkit.from_string(html, False, options, **pdfkit_kwargs)
def send_content(content_id):
    """Send a local Content object out through the federation layer.

    Only public content of type CONTENT is delivered. A warning is logged
    when the content cannot be found or no federable entity can be built.
    Nothing is sent while ``settings.DEBUG`` is on.
    """
    try:
        content = Content.objects.get(
            id=content_id,
            visibility=Visibility.PUBLIC,
            content_type=ContentType.CONTENT,
            local=True,
        )
    except Content.DoesNotExist:
        logger.warning("No local content found with id %s", content_id)
        return

    entity = make_federable_content(content)
    if not entity:
        logger.warning("send_content - No entity for %s", content)
        return
    if settings.DEBUG:
        # Don't send in development mode
        return

    # The relay always comes first, followed by the author's remote followers.
    recipients = [(settings.SOCIALHOME_RELAY_DOMAIN, "diaspora")]
    recipients.extend(_get_remote_followers(content.author))
    handle_send(entity, content.author, recipients)
def send_share(content_id):
    """Send a share of a Content object out through the federation layer.

    Only public, local shares are delivered. A warning is logged when the
    share cannot be found or no federable entity can be built. Nothing is
    sent while ``settings.DEBUG`` is on.
    """
    try:
        content = Content.objects.get(
            id=content_id,
            visibility=Visibility.PUBLIC,
            content_type=ContentType.SHARE,
            local=True,
        )
    except Content.DoesNotExist:
        logger.warning("No local share found with id %s", content_id)
        return

    entity = make_federable_content(content)
    if not entity:
        logger.warning("send_share - No entity for %s", content)
        return
    if settings.DEBUG:
        # Don't send in development mode
        return

    recipients = _get_remote_followers(content.author)
    if not content.share_of.local:
        # Send to original author
        original_author = (content.share_of.author.handle, None)
        recipients.append(original_author)
    handle_send(entity, content.author, recipients)
def send_content_retraction(content, author_id):
    """Send a retraction for the given content through the federation layer.

    Only public, local content is retracted; anything else is ignored. The
    author is looked up from ``author_id``. Nothing is sent while
    ``settings.DEBUG`` is on.
    """
    if not (content.visibility == Visibility.PUBLIC and content.local):
        return

    author = Profile.objects.get(id=author_id)
    entity = make_federable_retraction(content, author)
    if not entity:
        logger.warning("send_content_retraction - No retraction entity for %s", content)
        return
    if settings.DEBUG:
        # Don't send in development mode
        return

    # The relay always comes first, followed by the author's remote followers.
    recipients = [(settings.SOCIALHOME_RELAY_DOMAIN, "diaspora")]
    recipients.extend(_get_remote_followers(author))
    handle_send(entity, author, recipients)
def forward_entity(entity, target_content_id):
    """Forward an entity that relates to a local target content.

    Examples are remote replies or remote shares on local content. Both the
    target and the content matching ``entity.guid`` must be public; otherwise
    a warning is logged and nothing is sent. Nothing is sent while
    ``settings.DEBUG`` is on.
    """
    try:
        target = Content.objects.get(id=target_content_id, visibility=Visibility.PUBLIC, local=True)
    except Content.DoesNotExist:
        logger.warning("forward_entity - No public local content found with id %s", target_content_id)
        return
    try:
        forwarded = Content.objects.get(guid=entity.guid, visibility=Visibility.PUBLIC)
    except Content.DoesNotExist:
        logger.warning("forward_entity - No content found with guid %s", entity.guid)
        return
    if settings.DEBUG:
        # Don't send in development mode
        return

    # The sender of the entity is excluded from both recipient lookups.
    sender_handle = entity.handle
    recipients = _get_remote_participants_for_content(target, exclude=sender_handle)
    recipients.extend(_get_remote_followers(target.author, exclude=sender_handle))
    handle_send(entity, forwarded.author, recipients, parent_user=target.author)
def handle(self, *args, **kwargs):
    """Find the newest state shapefile under ``path`` and run ``state_import``.

    ``kwargs['path']`` is searched for a ``tl_<year>_us_state`` directory,
    trying the years 2016 down to 2011. The first directory found supplies
    the ``.shp`` file and the year passed to ``state_import``. If none
    exists, a message is printed and the process exits with status 1.
    """
    # sys.exit() replaces the interactive-shell helper exit(), which the
    # site module may not install and which reported success on failure.
    import sys

    path = kwargs['path']
    # With DEBUG on this will DIE.
    # NOTE(review): presumably because of DEBUG-mode overhead during a large
    # import -- confirm.
    settings.DEBUG = False

    # figure out which path we want to use (newest year first).
    tiger_file = ""
    year = None
    for candidate_year in ("2016", "2015", "2014", "2013", "2012", "2011"):
        directory = 'tl_%s_us_state' % candidate_year
        if os.path.exists(os.path.join(path, directory)):
            print('Found %s files.' % candidate_year)
            tiger_file = os.path.join(path, directory, directory + ".shp")
            year = candidate_year
            break

    if not tiger_file:
        print('Could not find files.')
        sys.exit(1)

    print("Start States: %s" % datetime.datetime.now())
    state_import(tiger_file, year)
    print("End States: %s" % datetime.datetime.now())
def handle(self, *args, **kwargs):
    """Find the newest county shapefile under ``path`` and run ``county_import``.

    ``kwargs['path']`` is searched for a ``tl_<year>_us_county`` directory,
    trying the years 2016 down to 2011. The first directory found supplies
    the ``.shp`` file and the year passed to ``county_import``. If none
    exists, a message is printed and the process exits with status 1.
    """
    # sys.exit() replaces the interactive-shell helper exit(), which the
    # site module may not install and which reported success on failure.
    import sys

    path = kwargs['path']
    # With DEBUG on this will DIE.
    # NOTE(review): presumably because of DEBUG-mode overhead during a large
    # import -- confirm.
    settings.DEBUG = False

    # figure out which path we want to use (newest year first).
    tiger_file = ""
    year = None
    for candidate_year in ("2016", "2015", "2014", "2013", "2012", "2011"):
        directory = 'tl_%s_us_county' % candidate_year
        if os.path.exists(os.path.join(path, directory)):
            print('Found %s files.' % candidate_year)
            tiger_file = os.path.join(path, directory, directory + ".shp")
            year = candidate_year
            break

    if not tiger_file:
        print('Could not find files.')
        sys.exit(1)

    print("Start Counties: %s" % datetime.datetime.now())
    county_import(tiger_file, year)
    print("End Counties: %s" % datetime.datetime.now())
def private_document(request, document_key):
    """
    Serve the private file attached to the document with the given key.

    The original author marked this as temporary code to be improved.

    NOTE(review): no permission check is performed here. The original
    contained a disabled ``permissions.has_read_permission`` check that
    raised PermissionDenied in DEBUG and Http404 otherwise -- confirm
    whether it needs to be restored.
    """
    media_root = settings.PRIVATE_MEDIA_ROOT
    # Look up the document and build the location of its file on disk.
    document_record = documents.objects.get(pk=document_key)
    file_path = media_root + '/' + document_record.document.name
    return server.serve(request, path=file_path)
def get_saml_settings():
    """Build the SAML settings dict from ``settings.SAML_AUTH``.

    The result is a copy of ``settings.SAML_AUTH`` with these changes:

    * the ``mapping`` key is removed (if present);
    * ``strict`` defaults to ``not settings.DEBUG`` and ``debug`` to
      ``settings.DEBUG`` when they are not configured;
    * the ``sp`` section gets ``entityId``, ``assertionConsumerService`` and
      ``singleLogoutService`` URLs built from ``settings.SITES['front']``.

    ``settings.SAML_AUTH`` itself is left untouched.
    """
    base_url = '{scheme}://{domain}'.format(
        scheme=settings.SITES['front']['scheme'],
        domain=settings.SITES['front']['domain'],
    )
    debug = settings.DEBUG

    saml_settings = dict(settings.SAML_AUTH)
    saml_settings.setdefault('strict', not debug)
    saml_settings.setdefault('debug', debug)
    # pop() tolerates a configuration without a 'mapping' entry.
    saml_settings.pop('mapping', None)

    # dict() above is a shallow copy, so 'sp' must be copied before updating;
    # otherwise the nested dict inside settings.SAML_AUTH would be mutated.
    service_provider = dict(saml_settings.get('sp') or {})
    service_provider.update({
        'entityId': base_url + reverse('taiga_contrib_saml_auth:metadata'),
        'assertionConsumerService': {
            'url': base_url + reverse('taiga_contrib_saml_auth:login_complete'),
        },
        'singleLogoutService': {
            'url': base_url + reverse('taiga_contrib_saml_auth:logout_complete'),
        },
    })
    saml_settings['sp'] = service_provider
    return saml_settings
def media(self):
    """Return a ``forms.Media`` object with the admin JavaScript files.

    Minified files are used unless ``settings.DEBUG`` is on. The file list
    depends on the installed Django version (before 1.9 versus 1.9 and later).
    """
    # taken from django.contrib.admin.options ModelAdmin
    # django.VERSION is a tuple, so the comparison needs no version-string
    # parsing. StrictVersion rejects strings such as "1.10rc1", and distutils
    # is gone from Python 3.12.
    import django

    extra = '' if settings.DEBUG else '.min'
    if django.VERSION[:2] < (1, 9):
        js = [
            'core.js',
            'admin/RelatedObjectLookups.js',
            'jquery%s.js' % extra,
            'jquery.init.js',
        ]
    else:
        js = [
            'core.js',
            'vendor/jquery/jquery%s.js' % extra,
            'jquery.init.js',
            'admin/RelatedObjectLookups.js',
            'actions%s.js' % extra,
            'urlify.js',
            'prepopulate%s.js' % extra,
            'vendor/xregexp/xregexp%s.js' % extra,
        ]
    return forms.Media(js=[static('admin/js/%s' % url) for url in js])
def user_register(message):
    """Create an inactive user and send out the activation instructions.

    Does nothing when ``settings.DEBUG`` is on. Otherwise a user is created
    from ``message['register_user_name']`` and
    ``message['register_user_email']`` with a generated password, which is
    stored in ``message['register_password']``.

    If ``message`` contains ``slack_user_id``, a contact is saved for the
    user, an activation link is posted to that Slack user and the link is
    stored in ``message['registration_link']``. Otherwise
    ``register_email(message)`` is called.

    :param message: dict describing the registration; it is updated in place.
    """
    from django.utils.http import urlencode

    if settings.DEBUG:
        return

    username = message['register_user_name']
    email = message['register_user_email']
    logger.debug('user_register task start - email: %s', email)

    user = User.objects.create_user(username, email, is_active=False)
    message['register_password'] = passwd_generator(size=25)
    user.set_password(message['register_password'])
    user.save()

    if 'slack_user_id' in message:
        slack_uid = message['slack_user_id']
        user.contact = Contact(email=email, slack_uid=slack_uid)
        user.contact.save()
        # Escape the query values: characters such as '+', '&' or '#' in the
        # username or key would otherwise corrupt the activation link.
        query = urlencode([('username', username), ('key', message['register_password'])])
        registration_link = "%s%s?%s" % (
            settings.REGISTRATION_URL_PREFIX, reverse_lazy('register_activate'), query)
        SLACK_MESSAGE = "Hello %s! we've detected you are using our team's slack. please take a minute to activate you account in the following <%s|LINK>.\n (please use same email address you used to sign-up with Slack)" % (username, registration_link)
        logger.debug('user_register sending slack activation message to slack_uid %s', slack_uid)
        slack.chat.post_message(
            slack_uid, SLACK_MESSAGE, as_user=False,
            username=settings.SLACK_BOT_NAME, icon_url=settings.SLACK_BOT_ICON)
        message['registration_link'] = registration_link
    else:
        register_email(message)
    logger.debug('user_register task end - email: %s', email)
def _send_mail_or_error_page(subject, content, address, request):
    """Send a mail to ``address`` and return the matching HTTP response.

    On success, redirects to ``registration_request_successful``. If the SMTP
    server refuses the recipient, the ``registration/email_error.html`` page
    is rendered instead. Unless the refusal says "User unknown", the team
    address is also notified about the failure.

    :param subject: subject of the mail.
    :param content: body of the mail.
    :param address: recipient address.
    :param request: current request, used to render the error page.
    """
    try:
        send_mail(subject, content, None, [address])
        if settings.DEBUG:
            print(u"VALIDATION MAIL to {0}\nSubject: {1}\n{2}".format(
                address, subject, content))
    except SMTPRecipientsRefused as e:
        # dict views cannot be indexed on Python 3; next(iter(...)) works on
        # both major versions.
        wrong_email, (error_code, error_msg) = next(iter(e.recipients.items()))
        # smtplib reports the server reply as bytes on Python 3; decode it so
        # the substring test below cannot raise TypeError.
        if isinstance(error_msg, bytes):
            error_msg = error_msg.decode('utf-8', 'replace')
        unknown = 'User unknown' in error_msg
        if not unknown:
            error_email_content = u'{0}: {1}'.format(e.__class__.__name__,
                                                     repr(e.recipients))
            send_mail(
                # Translate the template first, then insert the address, so
                # the catalogue lookup can match.
                _('Registration: Sending mail failed: {}').format(address),
                error_email_content,
                None,
                [settings.TEAM_EMAIL])
        return TemplateResponse(request, 'registration/email_error.html', {
            'unknown': unknown,
            'error_code': error_code,
            'error_msg': error_msg,
            'recipient': wrong_email
        })
    return redirect('registration_request_successful', address)
def process(self):
    """Dispatch the request to a handler method and store its status code.

    The handler is ``<method>_<action>`` when ``self.requires_action`` is
    set and ``<method>_process`` otherwise, where ``<method>`` is the
    lower-cased HTTP method. An AttributeError during dispatch is reported
    as "not_found" with HTTP 404. The exception is an action handler in
    DEBUG mode, where the error propagates.
    """
    def call_action_handler():
        return getattr(self, str(self.request.method.lower() + "_" + self.action))()

    def call_default_handler():
        return getattr(self, str(self.request.method.lower() + "_process"))()

    if self.requires_action:
        if settings.DEBUG:
            # Unguarded on purpose: errors surface while developing.
            self.setCode(call_action_handler())
            return
        invoke = call_action_handler
    else:
        invoke = call_default_handler

    try:
        self.setCode(invoke())
    except AttributeError:
        self.addError("not_found")
        self.setCode(status.HTTP_404_NOT_FOUND)
def test_project_get_list_with_filters(self, kc):
    """GET filters are passed on to ``tenant_list`` when DEBUG is patched on."""
    filters = {'name': 'Ni!'}
    request = self.mock_rest_request(GET=dict(filters))

    first_project = mock.Mock(**{'to_dict.return_value': {'name': 'Ni!'}})
    second_project = mock.Mock(**{'to_dict.return_value': {'name': 'Ni!'}})
    kc.tenant_list.return_value = ([first_project, second_project], False)

    with mock.patch.object(settings, 'DEBUG', True):
        response = keystone.Projects().get(request)

    self.assertStatusCode(response, 200)
    expected_payload = {
        "has_more": False,
        "items": [{"name": "Ni!"}, {"name": "Ni!"}],
    }
    self.assertEqual(response.json, expected_payload)
    kc.tenant_list.assert_called_once_with(
        request, paginate=False, marker=None, domain=None,
        user=None, admin=True, filters=filters)
def serve_protected_thumbnail(request, path):
    """
    Serve the thumbnail of a non-public file to a user who may read it.

    Raises Http404 when the thumbnail path cannot be mapped to a source
    file, when no non-public ``File`` exists for that source, or when
    serving the thumbnail fails for any reason. A user without read
    permission gets PermissionDenied in DEBUG mode and Http404 otherwise.
    """
    source_path = thumbnail_to_original_filename(path)
    if not source_path:
        raise Http404('File not found')

    try:
        file_obj = File.objects.get(file=source_path, is_public=False)
    except File.DoesNotExist:
        raise Http404('File not found')

    if not file_obj.has_read_permission(request):
        # Only reveal that the file exists while debugging.
        if settings.DEBUG:
            raise PermissionDenied
        raise Http404('File not found')

    try:
        thumbnail = ThumbnailFile(name=path, storage=file_obj.file.thumbnail_storage)
        response = thumbnail_server.serve(request, thumbnail, save_as=False)
    except Exception:
        raise Http404('File not found')
    return response
def serve(request, path, insecure=False, **kwargs):
    """
    Serve a static file located through the staticfiles finders.

    To use, put a URL pattern such as::

        from django.contrib.staticfiles import views

        url(r'^(?P<path>.*)$', views.serve)

    in your URLconf.

    Raises Http404 unless ``settings.DEBUG`` is on or ``insecure`` is true,
    and also when the finders cannot locate the file. A found file is
    handed to ``django.views.static.serve()``.
    """
    if not (settings.DEBUG or insecure):
        raise Http404

    lookup_path = posixpath.normpath(unquote(path)).lstrip('/')
    located = finders.find(lookup_path)
    if located:
        document_root, filename = os.path.split(located)
        return static.serve(request, filename, document_root=document_root, **kwargs)

    # Nothing found: tell directory requests apart from missing files.
    if path.endswith('/') or path == '':
        raise Http404("Directory indexes are not allowed here.")
    raise Http404("'%s' could not be found" % path)
def get_full_path_with_slash(self, request):
    """
    Build the request's full path with a trailing slash appended.

    In DEBUG mode a RuntimeError is raised for POST, PUT and PATCH requests,
    because the request data would be lost by a redirect to the slash URL.
    """
    new_path = request.get_full_path(force_append_slash=True)
    loses_request_data = settings.DEBUG and request.method in ('POST', 'PUT', 'PATCH')
    if not loses_request_data:
        return new_path

    details = {
        'method': request.method,
        'url': request.get_host() + new_path,
    }
    raise RuntimeError(
        "You called this URL via %(method)s, but the URL doesn't end "
        "in a slash and you have APPEND_SLASH set. Django can't "
        "redirect to the slash URL while maintaining %(method)s data. "
        "Change your form to point to %(url)s (note the trailing "
        "slash), or set APPEND_SLASH=False in your Django settings." % details
    )
def process_response(self, request, response):
    """
    Mail the site managers about 404 responses that look like broken links.

    Nothing is sent in DEBUG mode or for requests that
    ``is_ignorable_request`` rejects. The response is returned unchanged.
    """
    if response.status_code != 404 or settings.DEBUG:
        return response

    domain = request.get_host()
    path = request.get_full_path()
    referer = force_text(request.META.get('HTTP_REFERER', ''), errors='replace')
    if self.is_ignorable_request(request, path, domain, referer):
        return response

    ua = force_text(request.META.get('HTTP_USER_AGENT', '<none>'), errors='replace')
    ip = request.META.get('REMOTE_ADDR', '<none>')
    link_kind = 'INTERNAL ' if self.is_internal_request(domain, referer) else ''
    subject = "Broken %slink on %s" % (link_kind, domain)
    body = (
        "Referrer: %s\nRequested URL: %s\nUser agent: %s\n"
        "IP address: %s\n" % (referer, path, ua, ip)
    )
    mail_managers(subject, body, fail_silently=True)
    return response
def render(self, context):
    """Render the hidden CSRF input, or nothing when no token is available.

    An empty result is returned for the sentinel value 'NOTPROVIDED'. When
    the context has no token at all, an empty string is returned and, in
    DEBUG mode, a warning is issued.
    """
    csrf_token = context.get('csrf_token')
    if not csrf_token:
        # It's very probable that the token is missing because of
        # misconfiguration, so we raise a warning
        if settings.DEBUG:
            warnings.warn(
                "A {% csrf_token %} was used in a template, but the context "
                "did not provide the value. This is usually caused by not "
                "using RequestContext."
            )
        return ''

    if csrf_token == 'NOTPROVIDED':
        return format_html("")
    return format_html("<input type='hidden' name='csrfmiddlewaretoken' value='{}' />", csrf_token)
def static(prefix, view=serve, **kwargs):
    """
    Build the URL pattern list for serving files in debug mode.

    Typical use::

        from django.conf import settings
        from django.conf.urls.static import static

        urlpatterns = [
            # ... the rest of your URLconf goes here ...
        ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

    An empty list is returned outside debug mode or for a prefix containing
    '://'. An empty prefix in debug mode raises ImproperlyConfigured.
    """
    # No-op if not in debug mode or an non-local prefix
    if not settings.DEBUG or (prefix and '://' in prefix):
        return []
    if not prefix:
        raise ImproperlyConfigured("Empty static prefix not permitted")

    pattern = r'^%s(?P<path>.*)$' % re.escape(prefix.lstrip('/'))
    return [url(pattern, view, kwargs=kwargs)]
def handle_uncaught_exception(request, resolver, exc_info):
    """
    Turn an otherwise uncaught exception into an HTTP 500 response.

    The error is logged first. In DEBUG mode the technical 500 page is
    returned; otherwise the resolver's 500 handler builds the response.
    """
    if settings.DEBUG_PROPAGATE_EXCEPTIONS:
        # Bare raise: re-raises the exception that is currently being handled.
        raise

    log_extra = {'status_code': 500, 'request': request}
    logger.error('Internal Server Error: %s', request.path, exc_info=exc_info, extra=log_extra)

    if settings.DEBUG:
        return debug.technical_500_response(request, *exc_info)

    # If Http500 handler is not installed, reraise the last exception.
    if resolver.urlconf_module is None:
        six.reraise(*exc_info)

    # Return an HttpResponse that displays a friendly error message.
    error_view, view_kwargs = resolver.resolve_error_handler(500)
    return error_view(request, **view_kwargs)
def serve(request, path, insecure=False, **kwargs):
    """
    Serve a static file located through the staticfiles finders.

    To use, put a URL pattern such as::

        from django.contrib.staticfiles import views

        url(r'^(?P<path>.*)$', views.serve)

    in your URLconf.

    Raises Http404 unless ``settings.DEBUG`` is on or ``insecure`` is true,
    and also when the finders cannot locate the file. A found file is
    handed to ``django.views.static.serve()``.
    """
    if not (settings.DEBUG or insecure):
        raise Http404

    lookup_path = posixpath.normpath(unquote(path)).lstrip('/')
    located = finders.find(lookup_path)
    if located:
        document_root, filename = os.path.split(located)
        return static.serve(request, filename, document_root=document_root, **kwargs)

    # Nothing found: tell directory requests apart from missing files.
    if path.endswith('/') or path == '':
        raise Http404("Directory indexes are not allowed here.")
    raise Http404("'%s' could not be found" % path)
def render(self, render_type, context):
    """
    Render this notification's template for one content type.

    :param render_type: the content type to render; must be one of
        ``self.render_types``
    :param context: context data dictionary
    :return: the rendered content, or None when the template does not exist
        (in DEBUG mode the TemplateDoesNotExist error is re-raised instead)
    """
    assert render_type in self.render_types, 'Invalid Render Type'

    # Text templates use the .txt extension; other types use their own name.
    extension = 'txt' if render_type == 'text' else render_type
    template_path = 'herald/{}/{}.{}'.format(render_type, self.template_name, extension)
    try:
        return render_to_string(template_path, context)
    except TemplateDoesNotExist:
        if settings.DEBUG:
            raise
        return None
def get_sanic_application():
    """
    Set up Django and return a Sanic application that uses the Django handler.

    Static files are served by Sanic only when DEBUG is on and
    'django.contrib.staticfiles' is installed. Raises RuntimeError on
    Python versions older than 3.5.
    """
    if sys.version_info < (3, 5):
        raise RuntimeError("The SanicDjango Adaptor may only be used with python 3.5 and above.")

    django.setup()
    from django.conf import settings

    debug_enabled = getattr(settings, 'DEBUG', False)
    installed_apps = getattr(settings, 'INSTALLED_APPS', [])
    serve_static = debug_enabled and 'django.contrib.staticfiles' in installed_apps

    app = Sanic(__name__)
    if serve_static:
        app.static(
            getattr(settings, 'STATIC_URL', "/static/"),
            getattr(settings, 'STATIC_ROOT', "./static"),
        )
    # patch the app to use the django adaptor handler
    app.handle_request = SanicHandler(app)
    return app
def dashboard(request):
    """Serve the frontend index page, with two special cases.

    A POST with a body is treated as a symbolication request. In DEBUG
    mode, a URL ending in ``settings.LOGIN_REDIRECT_URL`` is redirected to
    the same path on http://localhost:3000.
    """
    # Ideally people should...
    # `HTTP -X POST -d JSON http://hostname/symbolicate/`
    # But if they do it directly on the root it should still work,
    # for legacy reasons.
    if request.method == 'POST' and request.body:
        return symbolicate_json(request)

    login_redirect_url = settings.LOGIN_REDIRECT_URL
    lands_on_login_redirect = request.build_absolute_uri().endswith(login_redirect_url)
    if lands_on_login_redirect and settings.DEBUG:  # pragma: no cover
        return redirect('http://localhost:3000' + settings.LOGIN_REDIRECT_URL)

    return frontend_index_html(request)
def get_cache(cache=None):
    """Return the cache named ``cache``, falling back to the default cache.

    The default cache is returned when looking up ``cache`` raises
    InvalidCacheBackendError. Outside DEBUG mode, a cache listed in
    PERSISTENT_STORES must be configured with a Redis backend and
    ``TIMEOUT: None``; otherwise ImproperlyConfigured is raised.

    :param cache: The name of the requested cache.
    """
    def persistent_store_misconfigured():
        # Check for proper Redis persistent backends
        # FIXME: this logic needs to be a system sanity check
        if settings.DEBUG or cache not in PERSISTENT_STORES:
            return False
        if cache not in settings.CACHES:
            return True
        if 'RedisCache' not in settings.CACHES[cache]['BACKEND']:
            return True
        return settings.CACHES[cache].get('TIMEOUT', '') is not None

    try:
        if persistent_store_misconfigured():
            raise ImproperlyConfigured(
                'Pootle requires a Redis-backed caching backend for %r '
                'with `TIMEOUT: None`. Please review your settings.' % cache
            )
        return caches[cache]
    except InvalidCacheBackendError:
        return default_cache
def ajax_required(f):
    """Decorator that rejects non-AJAX requests with HTTP 400.

    The check is skipped when ``settings.DEBUG`` is on.

    Use it in your views:

        @ajax_required
        def my_view(request):
            ....

    Taken from:
    http://djangosnippets.org/snippets/771/
    """
    @wraps(f)
    def wrapper(request, *args, **kwargs):
        if settings.DEBUG or request.is_ajax():
            return f(request, *args, **kwargs)
        return HttpResponseBadRequest("This must be an AJAX request.")

    return wrapper
def handle_uncaught_exception(request, resolver, exc_info):
    """
    Turn an otherwise uncaught exception into an HTTP 500 response.

    The error is logged first. In DEBUG mode the technical 500 page is
    returned; otherwise the resolver's 500 handler builds the response.
    """
    if settings.DEBUG_PROPAGATE_EXCEPTIONS:
        # Bare raise: re-raises the exception that is currently being handled.
        raise

    log_extra = {'status_code': 500, 'request': request}
    logger.error('Internal Server Error: %s', request.path, exc_info=exc_info, extra=log_extra)

    if settings.DEBUG:
        return debug.technical_500_response(request, *exc_info)

    # Return an HttpResponse that displays a friendly error message.
    error_view, view_kwargs = resolver.resolve_error_handler(500)
    return error_view(request, **view_kwargs)
def serve(request, path, insecure=False, **kwargs):
    """
    Serve a static file located through the staticfiles finders.

    To use, put a URL pattern such as::

        from django.contrib.staticfiles import views

        url(r'^(?P<path>.*)$', views.serve)

    in your URLconf.

    Raises Http404 unless ``settings.DEBUG`` is on or ``insecure`` is true,
    and also when the finders cannot locate the file. A found file is
    handed to ``django.views.static.serve()``.
    """
    if not (settings.DEBUG or insecure):
        raise Http404

    lookup_path = posixpath.normpath(unquote(path)).lstrip('/')
    located = finders.find(lookup_path)
    if located:
        document_root, filename = os.path.split(located)
        return static.serve(request, filename, document_root=document_root, **kwargs)

    # Nothing found: tell directory requests apart from missing files.
    if path.endswith('/') or path == '':
        raise Http404("Directory indexes are not allowed here.")
    raise Http404("'%s' could not be found" % path)