def language_selector(context):
""" displays a language selector dropdown in the admin, based on Django "LANGUAGES" context.
requires:
* USE_I18N = True / settings.py
* LANGUAGES specified / settings.py (otherwise all Django locales will be displayed)
* "set_language" url configured (see https://docs.djangoproject.com/en/dev/topics/i18n/translation/#the-set-language-redirect-view)
"""
output = ""
i18 = getattr(settings, 'USE_I18N', False)
if i18:
template = "admin/language_selector.html"
context['i18n_is_set'] = True
try:
output = render_to_string(template, context)
except:
pass
return output
python类py()的实例源码
def import_class(val):
"""Attempt to import a class from a string representation.
Pattern borrowed from Django REST Framework.
See rest_framework/settings.py#L170-L182
"""
try:
parts = val.split('.')
module_path, class_name = '.'.join(parts[:-1]), parts[-1]
module = import_module(module_path)
return getattr(module, class_name)
except (ImportError, AttributeError) as e:
msg = "Could not import auth/permission class '{}'. {}: {}.".format(
val,
e.__class__.__name__,
e)
raise ImportError(msg)
def __init__(self, *args, **kwargs):
super(SendgridBackend, self).__init__(*args, **kwargs)
if "api_key" in kwargs:
self.sg = sendgrid.SendGridAPIClient(api_key=kwargs["api_key"])
elif hasattr(settings, "SENDGRID_API_KEY") and settings.SENDGRID_API_KEY:
self.sg = sendgrid.SendGridAPIClient(api_key=settings.SENDGRID_API_KEY)
else:
raise ImproperlyConfigured("settings.py must contain a value for SENDGRID_API_KEY. " +
"You may also pass a value to the api_key argument (optional).")
sandbox_mode_in_debug = True
if hasattr(settings, "SENDGRID_SANDBOX_MODE_IN_DEBUG"):
sandbox_mode_in_debug = settings.SENDGRID_SANDBOX_MODE_IN_DEBUG
self.sandbox_mode = settings.DEBUG and sandbox_mode_in_debug
if self.sandbox_mode:
warnings.warn("Sendgrid email backend is in sandbox mode! Emails will not be delivered.")
track_email = True
if hasattr(settings, "SENDGRID_TRACK_EMAIL_OPENS"):
track_email = settings.SENDGRID_TRACK_EMAIL_OPENS
self.track_email = track_email
def get(key, default=None):
if default is None and key not in settings.LOGPIPE:
raise ImproperlyConfigured('Please ensure LOGPIPE["%s"] is defined in your settings.py file.' % key)
return settings.LOGPIPE.get(key, default)
def get_bootstrap_setting(name, default=None):
"""
Read a setting
"""
# Start with a copy of default settings
BOOTSTRAP4 = BOOTSTRAP4_DEFAULTS.copy()
# Override with user settings from settings.py
BOOTSTRAP4.update(getattr(settings, 'BOOTSTRAP4', {}))
return BOOTSTRAP4.get(name, default)
def get_commit_mode():
"""
Disabling AUTOCOMMIT causes enqueued jobs to be stored in a temporary queue.
Jobs in this queue are only enqueued after the request is completed and are
discarded if the request causes an exception (similar to db transactions).
To disable autocommit, put this in settings.py:
RQ = {
'AUTOCOMMIT': False,
}
"""
RQ = getattr(settings, 'RQ', {})
return RQ.get('AUTOCOMMIT', True)
def test_get_queue_url_with_db_default(self):
"""
Test that get_queue use the right parameters for queues using URL for
connection, where no DB given and URL does not contain the db number
(redis-py defaults to 0, should not break).
"""
config = QUEUES['url_default_db']
queue = get_queue('url_default_db')
connection_kwargs = queue.connection.connection_pool.connection_kwargs
self.assertEqual(queue.name, 'url_default_db')
self.assertEqual(connection_kwargs['host'], 'host')
self.assertEqual(connection_kwargs['port'], 1234)
self.assertEqual(connection_kwargs['db'], 0)
self.assertEqual(connection_kwargs['password'], 'password')
def test_job_decorator(self):
# Ensure that decorator passes in the right queue from settings.py
queue_name = 'test3'
config = QUEUES[queue_name]
@job(queue_name)
def test():
pass
result = test.delay()
queue = get_queue(queue_name)
self.assertEqual(result.origin, queue_name)
result.delete()
def test_job_decorator_default(self):
# Ensure that decorator passes in the right queue from settings.py
@job
def test():
pass
result = test.delay()
self.assertEqual(result.origin, 'default')
result.delete()
def pytest_configure(config):
# using test version of settings.py
os.environ['DJANGO_SETTINGS_MODULE'] = "estimators.tests.settings"
django.setup()
# set MEDIA_ROOT to temp_dir before starting tests
from django.conf import settings
settings.MEDIA_ROOT = temp_dir.name
def __init__(self, *args, **kwargs):
super(StripeCharge, self).__init__(*args, **kwargs)
# bring in stripe, and get the api key from settings.py
import stripe
stripe.api_key = getattr(settings,'STRIPE_PRIVATE_KEY','')
self.stripe = stripe
# store the stripe charge id for this sale
def metadata(request, config_loader_path=None, valid_for=None):
"""Returns an XML with the SAML 2.0 metadata for this
SP as configured in the settings.py file.
"""
conf = get_config(config_loader_path, request)
metadata = entity_descriptor(conf)
return HttpResponse(content=text_type(metadata).encode('utf-8'),
content_type="text/xml; charset=utf8")
def make_groups(apps, schema_editor):
# Create the default set of user groups and
# assign the correct permissions.
# Permissions aren't created until after all migrations
# are run so lets create them now!
app_configs = apps.get_app_configs()
for app in app_configs:
app.models_module = True
create_permissions(app, verbosity=0)
app.models_module = None
Group = apps.get_model('auth', 'Group')
Permission = apps.get_model('auth', 'Permission')
for group in settings.DEFAULT_GROUPS:
g,created = Group.objects.get_or_create(name=group)
if group == 'admin':
# Assign all available permissions to the admin group
p = list(Permission.objects.all())
g.permissions.add(*p)
elif group == 'staff':
for perm in settings.DEFAULT_STAFF_PERMISSIONS:
p = Permission.objects.get(name=perm)
g.permissions.add(p)
elif group == 'user':
# Default permissions for users are store in
# the settings.py file
for perm in settings.DEFAULT_USER_PERMISSIONS:
p = Permission.objects.get(name=perm)
g.permissions.add(p)
def make_groups(apps, schema_editor):
# Create the default set of user groups and
# assign the correct permissions.
# Permissions aren't created until after all migrations
# are run so lets create them now!
app_configs = apps.get_app_configs()
for app in app_configs:
app.models_module = True
create_permissions(app, verbosity=0)
app.models_module = None
Group = apps.get_model('auth', 'Group')
Permission = apps.get_model('auth', 'Permission')
for group in settings.DEFAULT_GROUPS:
g,created = Group.objects.get_or_create(name=group)
if group == 'admin':
# Assign all available permissions to the admin group
p = list(Permission.objects.all())
g.permissions.add(*p)
elif group == 'staff':
for perm in settings.DEFAULT_STAFF_PERMISSIONS:
p = Permission.objects.get(name=perm)
g.permissions.add(p)
elif group == 'user':
# Default permissions for users are store in
# the settings.py file
for perm in settings.DEFAULT_USER_PERMISSIONS:
p = Permission.objects.get(name=perm)
g.permissions.add(p)
def recommend_trade(self, nn_price, last_sample, fee_amount=get_fee_amount()):
fee_amount = fee_amount * 2 # fee x 2 since we'd need to clear both buy and sell fees to be profitable
fee_amount = fee_amount * settings.FEE_MANAGEMENT_STRATEGY # see desc in settings.py
anticipated_percent_increase = (nn_price - last_sample) / last_sample
if abs(anticipated_percent_increase) < fee_amount:
should_trade = 'HOLD'
elif anticipated_percent_increase > fee_amount:
should_trade = 'BUY'
elif anticipated_percent_increase < fee_amount:
should_trade = 'SELL'
return should_trade
def recommend_trade(self, nn_price, last_sample, fee_amount=get_fee_amount()):
fee_amount = fee_amount * 2 # fee x 2 since we'd need to clear both buy and sell fees to be profitable
fee_amount = fee_amount * settings.FEE_MANAGEMENT_STRATEGY # see desc in settings.py
anticipated_percent_increase = (nn_price - last_sample) / last_sample
if abs(anticipated_percent_increase) < fee_amount:
should_trade = 'HOLD'
elif anticipated_percent_increase > fee_amount:
should_trade = 'BUY'
elif anticipated_percent_increase < fee_amount:
should_trade = 'SELL'
return should_trade
def import_setting(context, setting):
"""
Injects a setting value from predata/settings.py if it is in
ALLOWABLE_VALUES_FROM_SETTINGS.
"""
if setting in ALLOWABLE_VALUES_FROM_SETTINGS:
context[setting] = getattr(settings, setting, "")
return ""
def metadata(request, config_loader_path=None, valid_for=None):
"""Returns an XML with the SAML 2.0 metadata for this
SP as configured in the settings.py file.
"""
conf = get_config(config_loader_path, request)
metadata = entity_descriptor(conf)
return HttpResponse(content=text_type(metadata).encode('utf-8'),
content_type="text/xml; charset=utf8")
def clean_username(self, username):
"""
Performs any cleaning on the "username" prior to using it to get or
create the user object. Returns the cleaned username.
For more info, reference clean_username function in
django/auth/backends.py
"""
username = username.replace('@' + settings.KRB5_REALM, '')
username_tuple = username.split('/')
if len(username_tuple) > 1:
username = username_tuple[1]
return len(username) > 30 and username[:30] or username
def get_key():
key = settings.KEY
if key is None:
raise Exception('Key not specified in settings.py file')
else:
return key
def handle(self, *args, **options):
from django.conf import settings
if settings.DJANGO_TELEGRAMBOT.get('MODE', 'WEBHOOK') == 'WEBHOOK':
self.stderr.write("Webhook mode active in settings.py, change in POLLING if you want use polling update")
return
updater = self.get_updater(username=options.get('username'), token=options.get('token'))
if not updater:
self.stderr.write("Bot not found")
return
# Enable Logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger("telegrambot")
logger.setLevel(logging.INFO)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))
logger.addHandler(console)
bots_list = settings.DJANGO_TELEGRAMBOT.get('BOTS', [])
b = None
for bot_set in bots_list:
if bot_set.get('TOKEN', None) == updater.bot.token:
b = bot_set
break
if not b:
self.stderr.write("Cannot find bot settings")
return
allowed_updates = b.get('ALLOWED_UPDATES', None)
timeout = b.get('TIMEOUT', 10)
poll_interval = b.get('POLL_INTERVAL', 0.0)
clean = b.get('POLL_CLEAN', False)
bootstrap_retries = b.get('POLL_BOOTSTRAP_RETRIES', 0)
read_latency = b.get('POLL_READ_LATENCY', 2.)
self.stdout.write("Run polling...")
updater.start_polling(poll_interval=poll_interval,
timeout=timeout,
clean=clean,
bootstrap_retries=bootstrap_retries,
read_latency=read_latency,
allowed_updates=allowed_updates)
self.stdout.write("the bot is started and runs until we press Ctrl-C on the command line.")
updater.idle()