def add_sharing_banner(page, response):
    """Insert the sharing banner right after the opening ``<body>`` tag.

    Nothing happens when the ``WAGTAILSHARING_BANNER`` setting is falsy or
    when the response content contains no ``<body>`` tag.  Otherwise
    ``response.content`` is replaced in place; the function returns ``None``.

    :param page: not used by this function; kept so callers need not change.
    :param response: response object; it is rendered first when it exposes a
        callable ``render`` attribute.
    """
    if not getattr(settings, 'WAGTAILSHARING_BANNER', True):
        return

    if hasattr(response, 'render') and callable(response.render):
        response.render()

    html = force_text(response.content)

    # DOTALL lets the tag's attributes span several lines, and ``\b`` keeps
    # the pattern from matching other tags that merely start with "body".
    body = re.search(r'<body\b.*?>', html, re.IGNORECASE | re.DOTALL)
    if not body:
        return

    endpos = body.end()
    banner_template = loader.get_template('wagtailsharing/banner.html')
    banner_html = force_text(banner_template.render())

    response.content = html[:endpos] + banner_html + html[endpos:]
# Example source code for Python's settings() class
def __init__(self, testcases_to_run, nr_of_tests, all_should_pass=True,
             print_bad=True, runner_options=None):
    """Collect the given test case classes into a suite and build a runner.

    The Django runner class is looked up via ``get_runner(settings)`` and
    instantiated with ``runner_options`` (an empty dict when omitted).  The
    resulting test runner writes to an in-memory ``six.StringIO`` stream.
    """
    self.nr_of_tests = nr_of_tests
    self.all_should_pass = all_should_pass
    self.print_bad = print_bad

    options = runner_options or {}
    runner = get_runner(settings)(**options)

    self.suite = runner.test_suite()
    for case_cls in testcases_to_run:
        self.suite.addTests(
            runner.test_loader.loadTestsFromTestCase(case_cls))

    self.test_runner = runner.test_runner(
        resultclass=runner.get_resultclass(),
        stream=six.StringIO(),
    )
def create_notification(**kwargs):
    """Create a notification from signal kwargs and fan it out.

    ``signal`` and ``sender`` are required keys and are stripped before the
    ``Notification`` is built; ``silent`` is optional and is stripped too.
    Every adapter listed in ``NOTIFICATION_ADAPTERS`` receives the untouched
    ``kwargs``.
    """
    # Work on a copy so the adapters below still see the original kwargs.
    fields = dict(kwargs)
    fields.pop('signal')
    fields.pop('sender')
    silent = fields.pop('silent', False)

    # A silent notification is instantiated but never saved.
    notification = (
        Notification(**fields) if silent
        else Notification.objects.create(**fields)
    )

    # Hand the event to every configured custom adapter.
    adapter_paths = getattr(settings, 'NOTIFICATION_ADAPTERS', [])
    for dotted_path in adapter_paths:
        import_attr(dotted_path)(**kwargs).notify()

    if getattr(settings, 'NOTIFICATIONS_WEBSOCKET', False):
        send_to_queue(notification)
def post_stats(request, response, data):
    """POST the payload built from ``data`` to the configured URL.

    Returns immediately when ``CAVALRY_ELASTICSEARCH_URL_TEMPLATE`` is unset
    or empty.  The template is formatted with the payload plus a ``ymd`` key
    holding the current UTC date.  Any failure is logged as a warning and
    never raised.
    """
    url_template = getattr(settings, 'CAVALRY_ELASTICSEARCH_URL_TEMPLATE', None)
    if not url_template:
        return

    payload = build_payload(data, request, response)
    today = datetime.utcnow().strftime('%Y-%m-%d')
    es_url = url_template.format_map(dict(payload, ymd=today))

    body = force_bytes(json.dumps(payload, cls=PayloadJSONEncoder))
    headers = {'Content-Type': 'application/json'}
    try:
        resp = sess.post(es_url, data=body, headers=headers, timeout=0.5)
        status = resp.status_code
        if status != 201:
            log.warning('Unable to post data to %s (error %s): %s', es_url, status, resp.text)
    except Exception as e:
        log.warning('Unable to post data to %s: %s', es_url, e)
def test_server_list_pagination_more(self):
    """server_list returns one page and reports that more results exist."""
    page_size = getattr(settings, 'API_RESULT_PAGE_SIZE', 1)
    limit = page_size + 1
    servers = self.servers.list()

    novaclient = self.stub_novaclient()
    novaclient.servers = self.mox.CreateMockAnything()
    # Record the expected call: one item more than a page is requested.
    expected_opts = {'all_tenants': True,
                     'marker': None,
                     'limit': limit}
    novaclient.servers.list(True, expected_opts).AndReturn(servers[:limit])
    self.mox.ReplayAll()

    request_opts = {'marker': None,
                    'paginate': True}
    ret_val, has_more = api.nova.server_list(self.request,
                                             request_opts,
                                             all_tenants=True)

    for item in ret_val:
        self.assertIsInstance(item, api.nova.Server)
    self.assertEqual(page_size, len(ret_val))
    self.assertTrue(has_more)
def heatclient(request, password=None):
    """Return a Heat client bound to the request's orchestration endpoint.

    SSL behaviour is read from the ``OPENSTACK_SSL_NO_VERIFY`` and
    ``OPENSTACK_SSL_CACERT`` settings; the token and username come from
    ``request.user``.  ``format_parameters`` is attached to the client
    before it is returned.
    """
    insecure = getattr(settings, 'OPENSTACK_SSL_NO_VERIFY', False)
    cacert = getattr(settings, 'OPENSTACK_SSL_CACERT', None)
    endpoint = base.url_for(request, 'orchestration')

    user = request.user
    client = heat_client.Client(
        "1",
        endpoint,
        token=user.token.id,
        insecure=insecure,
        ca_file=cacert,
        username=user.username,
        password=password,
    )
    client.format_parameters = format_parameters
    return client
def get_active_version(self):
    """Return the entry of ``self.supported`` for the active API version.

    The version key is resolved once and cached in ``self._active``.  It is
    read from the ``self.SETTINGS_KEY`` setting for ``self.service_type`` and
    falls back to ``self.preferred`` when no value is configured.

    Raises ``exceptions.ConfigurationError`` when the key is a string or is
    not present in ``self.supported``.
    """
    if self._active is not None:
        return self.supported[self._active]

    configured = getattr(settings, self.SETTINGS_KEY, {})
    key = configured.get(self.service_type)
    if key is None:
        # TODO(gabriel): support API version discovery here; we'll leave
        # the setting in as a way of overriding the latest available
        # version.
        key = self.preferred

    # The lookup in ``supported`` is type-sensitive, so a string key is
    # rejected with an explicit message.
    if isinstance(key, six.string_types):
        raise exceptions.ConfigurationError(
            'The version "%s" specified for the %s service should be '
            'either an integer or a float, not a string.' %
            (key, self.service_type))

    # An unknown version gets a message listing the valid choices.
    if key not in self.supported:
        choices = ", ".join(map(str, six.iterkeys(self.supported)))
        raise exceptions.ConfigurationError(
            '%s is not a supported API version for the %s service, '
            ' choices are: %s' % (key, self.service_type, choices))

    self._active = key
    return self.supported[key]
def url_for(request, service_type, endpoint_type=None, region=None):
    """Return the catalog URL for ``service_type``.

    ``endpoint_type`` defaults to the ``OPENSTACK_ENDPOINT_TYPE`` setting
    (``'publicURL'`` when unset); ``SECONDARY_ENDPOINT_TYPE`` is tried when
    the first lookup yields nothing.  ``region`` defaults to
    ``request.user.services_region``.

    Raises ``exceptions.ServiceCatalogException`` when no URL is found.
    """
    if not endpoint_type:
        endpoint_type = getattr(settings,
                                'OPENSTACK_ENDPOINT_TYPE',
                                'publicURL')
    fallback_endpoint_type = getattr(settings, 'SECONDARY_ENDPOINT_TYPE', None)

    service = get_service_from_catalog(request.user.service_catalog,
                                       service_type)
    if service:
        region = region or request.user.services_region
        url = get_url_for_service(service, region, endpoint_type)
        if fallback_endpoint_type and not url:
            url = get_url_for_service(service, region,
                                      fallback_endpoint_type)
        if url:
            return url

    raise exceptions.ServiceCatalogException(service_type)
def get_context_data(self, **kwargs):
    """Extend the parent context with permission, quota and launch flags.

    Router creation is only allowed when ``enable_router`` in the
    ``OPENSTACK_NEUTRON_NETWORK`` setting is truthy (default ``True``) and
    the permission check passes.
    """
    context = super(NetworkTopologyView, self).get_context_data(**kwargs)
    network_config = getattr(settings, 'OPENSTACK_NEUTRON_NETWORK', {})

    context.update({
        'launch_instance_allowed': self._has_permission(
            (("compute", "compute:create"),)),
        'instance_quota_exceeded': self._quota_exceeded('instances'),
        'create_network_allowed': self._has_permission(
            (("network", "create_network"),)),
        'network_quota_exceeded': self._quota_exceeded('networks'),
        'create_router_allowed': (
            network_config.get('enable_router', True) and
            self._has_permission((("network", "create_router"),))),
        'router_quota_exceeded': self._quota_exceeded('routers'),
        'console_type': getattr(settings, 'CONSOLE_TYPE', 'AUTO'),
        'show_ng_launch': getattr(
            settings, 'LAUNCH_INSTANCE_NG_ENABLED', True),
        'show_legacy_launch': getattr(
            settings, 'LAUNCH_INSTANCE_LEGACY_ENABLED', False),
    })
    return context
def get_subnetpool_choices(self, request):
    """Return ``(value, pool)`` choices for a subnet pool selector.

    The list starts with an empty placeholder, followed by the default IPv6
    and IPv4 pools when their labels are set in the
    ``OPENSTACK_NEUTRON_NETWORK`` setting (both with an empty value), and
    finally every pool returned by ``api.neutron.subnetpool_list``.
    """
    choices = [('', _('Select a pool'))]
    neutron_settings = getattr(settings, 'OPENSTACK_NEUTRON_NETWORK', {})

    default_pools = (
        (6, 'default_ipv6_subnet_pool_label'),
        (4, 'default_ipv4_subnet_pool_label'),
    )
    for ip_version, label_key in default_pools:
        label = neutron_settings.get(label_key, None)
        if label:
            pool = api.neutron.SubnetPool({'ip_version': ip_version,
                                           'name': label})
            choices.append(('', pool))

    choices.extend(
        (pool.id, pool) for pool in api.neutron.subnetpool_list(request))
    return choices
def test_settings(self):
    """The gateway reads ``debug`` and ``timezone_offset`` from settings."""
    from django import conf

    module = sys.modules[self.mod_name]
    module.DEBUG = True
    module.AMF_TIME_OFFSET = 1000

    # Swap in settings built from the test module; restored in ``finally``.
    original = conf.settings
    conf.settings = conf.Settings(self.mod_name)

    # NOTE(review): ``django`` here is the module-level name, presumably the
    # gateway module rather than the Django package - confirm.
    gateway = django.DjangoGateway()
    try:
        self.assertTrue(gateway.debug)
        self.assertEqual(gateway.timezone_offset, 1000)
    finally:
        conf.settings = original
def save(self, *args, **kwargs):
    """Fill in ``date_taken`` when missing, then save the image.

    The date is parsed from the EXIF ``DateTimeOriginal`` value (expected as
    ``YYYY:MM:DD HH:MM:SS``) and made timezone-aware when ``USE_TZ`` is
    enabled.  Any failure while reading or parsing it is ignored and the
    current time is used instead.
    """
    if self.date_taken is None:
        try:
            raw = self.exif.get('DateTimeOriginal', None)
            if raw is not None:
                date_part, time_part = raw.split(" ")
                year, month, day = map(int, date_part.split(':'))
                hour, minute, second = map(int, time_part.split(':'))
                taken = datetime(year, month, day, hour, minute, second)
                if getattr(settings, "USE_TZ", False):
                    taken = make_aware(taken, get_current_timezone())
                self.date_taken = taken
        except Exception:
            # Best effort only: fall through to the default below.
            pass

    if self.date_taken is None:
        self.date_taken = now()

    super(Image, self).save(*args, **kwargs)
def swappable_first_key(self, item):
    """
    Sort key that moves potentially swappable models to the front of lists
    of created models (only real way to solve #22783).

    ``item`` is an ``(app_label, model_name)`` pair.  It is returned
    unchanged unless the model is swappable, derives from ``AbstractUser``
    or ``AbstractBaseUser``, or matches ``settings.AUTH_USER_MODEL``; in
    those cases both parts are prefixed with ``"___"``.
    """
    try:
        model = self.new_apps.get_model(item[0], item[1])
        parent_names = [parent.__name__ for parent in model.__bases__]
        dotted = "%s.%s" % (item[0], item[1])
        is_candidate = (
            model._meta.swappable or
            "AbstractUser" in parent_names or
            "AbstractBaseUser" in parent_names or
            settings.AUTH_USER_MODEL.lower() == dotted.lower()
        )
        if is_candidate:
            return ("___" + item[0], "___" + item[1])
    except LookupError:
        pass
    return item
def initialize():
    """Initialize the infusionsoft client from Django settings, once.

    ``INFUSIONSOFT_APP_NAME`` takes precedence over
    ``INFUSIONSOFT_API_URL``.  Raises ``ValueError`` when neither of them or
    ``INFUSIONSOFT_API_KEY`` is set.
    """
    import infusionsoft

    if infusionsoft.is_initialized:
        return

    api_key = getattr(settings, 'INFUSIONSOFT_API_KEY', None)
    app_name = getattr(settings, 'INFUSIONSOFT_APP_NAME', None)
    target = app_name or getattr(settings, 'INFUSIONSOFT_API_URL', None)

    if not (api_key and target):
        raise ValueError(
            'Please set INFUSIONSOFT_APP_NAME or INFUSIONSOFT_API_URL, '
            'and INFUSIONSOFT_API_KEY in your settings')

    options = getattr(settings, 'INFUSIONSOFT_CLIENT_OPTIONS', {})
    infusionsoft.initialize(target, api_key, **options)
def __init__(self, *args, **kwargs):
    """Check prerequisites, then set up the SpatiaLite helper objects.

    Raises ``ImproperlyConfigured`` when pysqlite is older than 2.5 or when
    the SpatiaLite library cannot be located.
    """
    # pysqlite 2.5+ is required before anything else is attempted.
    if Database.version_info < (2, 5, 0):
        raise ImproperlyConfigured('Only versions of pysqlite 2.5+ are '
                                   'compatible with SpatiaLite and GeoDjango.')

    # The `SPATIALITE_LIBRARY_PATH` setting wins when present; otherwise
    # the result of `find_library` on the system library path is used.
    system_lib = find_library('spatialite')
    self.spatialite_lib = getattr(settings, 'SPATIALITE_LIBRARY_PATH',
                                  system_lib)
    if not self.spatialite_lib:
        raise ImproperlyConfigured('Unable to locate the SpatiaLite library. '
                                   'Make sure it is in your library path, or set '
                                   'SPATIALITE_LIBRARY_PATH in your settings.'
                                   )

    super(DatabaseWrapper, self).__init__(*args, **kwargs)

    self.features = DatabaseFeatures(self)
    self.ops = SpatiaLiteOperations(self)
    self.client = SpatiaLiteClient(self)
    self.introspection = SpatiaLiteIntrospection(self)
def render(self, name, value, attrs=None):
    """Render the widget with the ``markedit/ui.html`` template.

    Editor options default to the ``MARKEDIT_DEFAULT_SETTINGS`` setting and
    are overridden by an ``options`` entry in ``attrs``, which is removed
    from the rendered attributes.
    """
    # Prepare values
    attrs = self.build_attrs(attrs, name=name)
    value = value or ''

    options = getattr(settings, 'MARKEDIT_DEFAULT_SETTINGS', {})
    if 'options' in attrs:
        options = self._eval_value(attrs['options'], {})
        attrs.pop('options')

    # Render widget to HTML
    template = loader.get_template('markedit/ui.html')
    context = Context({
        'attributes': self._render_attrs(attrs),
        'value': conditional_escape(force_unicode(value)),
        'id': attrs['id'],
        'options': options,
    })
    return template.render(context)
def add_arguments(self, parser):
    """Register the test command options on ``parser``.

    After the built-in options, the runner class returned by
    ``get_runner(settings, self.test_runner)`` may add its own arguments.
    """
    parser.add_argument(
        'args',
        metavar='test_label',
        nargs='*',
        help='Module paths to test; can be modulename, modulename.TestCase or modulename.TestCase.test_method',
    )
    parser.add_argument(
        '--noinput', '--no-input',
        action='store_false',
        dest='interactive',
        default=True,
        help='Tells Django to NOT prompt the user for input of any kind.',
    )
    parser.add_argument(
        '--failfast',
        action='store_true',
        dest='failfast',
        default=False,
        help='Tells Django to stop running the test suite after first failed test.',
    )
    parser.add_argument(
        '--testrunner',
        action='store',
        dest='testrunner',
        help='Tells Django to use specified test runner class instead of '
             'the one specified by the TEST_RUNNER setting.',
    )

    runner_cls = get_runner(settings, self.test_runner)
    if hasattr(runner_cls, 'add_arguments'):
        runner_cls.add_arguments(parser)
def check_url_namespaces_unique(app_configs, **kwargs):
    """
    Return one ``urls.W005`` warning per URL namespace used more than once.

    An empty list is returned when ``ROOT_URLCONF`` is not set.
    """
    if not getattr(settings, 'ROOT_URLCONF', None):
        return []

    from django.urls import get_resolver
    namespace_counts = Counter(_load_all_namespaces(get_resolver()))
    duplicates = [
        name for name, seen in namespace_counts.items() if seen > 1
    ]
    return [
        Warning(
            "URL namespace '{}' isn't unique. You may not be able to reverse "
            "all URLs in this namespace".format(namespace),
            id="urls.W005",
        )
        for namespace in duplicates
    ]
def __init__(self, *args, **kwargs):
    """Check prerequisites before delegating to the parent initializer.

    Raises ``ImproperlyConfigured`` when pysqlite is older than 2.5 or when
    the SpatiaLite library cannot be located.
    """
    # pysqlite 2.5+ is required before anything else is attempted.
    if Database.version_info < (2, 5, 0):
        raise ImproperlyConfigured('Only versions of pysqlite 2.5+ are '
                                   'compatible with SpatiaLite and GeoDjango.')

    # The `SPATIALITE_LIBRARY_PATH` setting wins when present; otherwise
    # the result of `find_library` on the system library path is used.
    system_lib = find_library('spatialite')
    self.spatialite_lib = getattr(settings, 'SPATIALITE_LIBRARY_PATH',
                                  system_lib)
    if not self.spatialite_lib:
        raise ImproperlyConfigured('Unable to locate the SpatiaLite library. '
                                   'Make sure it is in your library path, or set '
                                   'SPATIALITE_LIBRARY_PATH in your settings.'
                                   )

    super(DatabaseWrapper, self).__init__(*args, **kwargs)
def get_dirs(self):
    """Return the template directories, narrowed to the active theme.

    Without an active theme the parent's directories are returned as-is.
    Otherwise each directory is joined with the theme name, preceded by
    ``WAGTAIL_THEME_PATH`` when that setting is truthy.
    """
    dirs = super(ThemeLoader, self).get_dirs()
    theme = get_theme()
    theme_path = getattr(settings, 'WAGTAIL_THEME_PATH', None)

    if not theme:
        return dirs

    # Path components appended to every directory of the DIRS option.
    suffix = (theme_path, theme) if theme_path else (theme,)
    return [os.path.join(base, *suffix) for base in dirs]
def __init__(self):
    """Read the XMPP settings and verify that login works.

    ``self.server_configured`` ends up ``True`` only when a JID and password
    are configured and both ``connect`` and ``auth`` succeed.  When
    ``ASYNC_XMPP_SERVER`` is set, the explicit ``(server, port)`` pair is
    used and SRV lookup is turned off.
    """
    super(NotificationMethod, self).__init__()
    self.messages = []
    self.jid = getattr(settings, 'ASYNC_XMPP_JID', None)
    self.password = getattr(settings, 'ASYNC_XMPP_PASSWORD', None)
    if self.jid is None or self.password is None:
        self.server_configured = False
        return

    self.server = getattr(settings, 'ASYNC_XMPP_SERVER', None)
    self.port = getattr(settings, 'ASYNC_XMPP_SERVER_PORT', 5222)
    self.connection_tuple = None
    self.use_srv = True
    self.jid = xmpp.JID(self.jid)
    if self.server is not None:
        self.connection_tuple, self.use_srv = (self.server, self.port), False

    self.client = Client(self.jid.getDomain())
    # ``auth`` is only attempted after a successful ``connect``.
    logged_in = (
        self.client.connect(server=self.connection_tuple,
                            use_srv=self.use_srv)
        and self.client.auth(self.jid.getNode(), self.password,
                             resource=self.jid.getResource())
    )
    if not logged_in:
        self.server_configured = False
        return

    self.client.disconnected()
    self.server_configured = True
def get_callback_function(setting_name, default=None):
    """Resolve the callable configured under ``setting_name``.

    Returns ``default`` when the setting is missing or falsy.  A string value
    is imported with ``import_string``.  Raises ``ImproperlyConfigured`` when
    the final value is not callable.
    """
    candidate = getattr(settings, setting_name, None)
    if not candidate:
        return default

    if not callable(candidate) and isinstance(candidate, six.string_types):
        candidate = import_string(candidate)

    if callable(candidate):
        return candidate

    raise ImproperlyConfigured(
        '{name} must be callable.'.format(name=setting_name)
    )
def ready(self):
    """Build ``self.swagger`` from the swagger settings.

    Raises ``ImproperlyConfigured`` when ``SWAGGER_SCHEMA`` is unset or
    empty; ``SWAGGER_MODULE`` is optional here.
    """
    self.schema = getattr(settings, 'SWAGGER_SCHEMA', None)
    if not self.schema:
        raise ImproperlyConfigured('You have to provide SWAGGER_SCHEMA setting pointing to desired schema')

    self.module = getattr(settings, 'SWAGGER_MODULE', None)
    self.swagger = Swagger(self.schema, self.module)
def handle(self, *args, **options):
    """List the controllers to generate and optionally write them out.

    Requires both ``SWAGGER_SCHEMA`` and ``SWAGGER_MODULE`` settings and
    raises ``ImproperlyConfigured`` otherwise.  With the ``generate`` option
    the handlers are rendered from ``view.jinja`` into a file named after
    the last component of ``SWAGGER_MODULE``.
    """
    schema = getattr(settings, 'SWAGGER_SCHEMA', None)
    module = getattr(settings, 'SWAGGER_MODULE', None)

    if not schema:
        raise ImproperlyConfigured('You have to provide SWAGGER_SCHEMA setting pointing to desired schema')
    if not module:
        raise ImproperlyConfigured('You have to specify desired controller module name in SWAGGER_MODULE setting')

    router = SwaggerRouter()

    print('Inspecting available controllers...')

    router.update(True)
    router.process()

    print()
    print('Following classes and methods are going to be generated:')

    enum = router.get_enum()
    for name in enum:
        methods = [entry['method'] for entry in enum[name]['methods']]
        print("{} : {}".format(name, methods))

    if not options['generate']:
        print()
        print('Use --generate option to create them')
        return

    template = Template()
    filename = module.split('.')[-1] + '.py'
    structure = [
        {'name': name, 'data': data} for name, data in six.iteritems(enum)
    ]

    print('Generating handlers ({})...'.format(filename))

    with codecs.open(filename, 'w', 'utf-8') as out:
        out.write(template.render(template_name='view.jinja', names=structure))

    print('Done.')
def is_open_for_signup(self, request):
    """Return the ``ACCOUNT_ALLOW_REGISTRATION`` setting (default ``True``)."""
    allow_registration = getattr(settings, 'ACCOUNT_ALLOW_REGISTRATION', True)
    return allow_registration
def is_open_for_signup(self, request, sociallogin):
    """Return the ``ACCOUNT_ALLOW_REGISTRATION`` setting (default ``True``)."""
    allow_registration = getattr(settings, 'ACCOUNT_ALLOW_REGISTRATION', True)
    return allow_registration
def get_context_data(self, **kwargs):
    """Return the parent context with ``settings`` added under "settings"."""
    ctx = super(MochaView, self).get_context_data(**kwargs)
    ctx["settings"] = settings
    return ctx
def dotted_paths_for_init(self):
    """Return the ``self.settings_name`` setting, or ``self.defaults``
    when that setting is not defined."""
    name = self.settings_name
    return getattr(settings, name) if hasattr(settings, name) else self.defaults
def run(self, *a, **kw):
    """
    Run the tests with a djpt writer active around the parent ``run``.

    ``self.stopTestRun`` fires before the actual results are printed, so
    ``run()`` is overridden to print the report hint afterwards.  The hint
    is skipped when ``DJPT_PRINT_WORST_REPORT`` is falsy.
    """
    self.djpt_writer = Writer(get_datafile_path())
    self.djpt_writer.start()
    result = super(DjptTestRunnerMixin, self).run(*a, **kw)
    self.djpt_writer.end()

    show_hint = getattr(settings, 'DJPT_PRINT_WORST_REPORT', True)
    if show_hint:
        self.stream.write(
            'To see the Worst Performing Items report, '
            'run manage.py djpt_worst_report')
    return result
def _validate_data(self):
if not self.settings_based:
return
if self._data:
raise TypeError(
'Either provide data (kwargs) or settings_based, '
'not both.')
if self.is_anonymous():
raise TypeError(
'Can only be settings based when collector_id is provided.')