def post_tweet(data):
try:
post = Post.objects.get(title=data['title'])
tags = await get_tags(post=post)
media = await get_media(post=post)
sentiment = post.sentiment or "N/A"
if not (tags is None):
status = "{0} [{1}]: {2}{3}/ {4}".format(post.title[:80], sentiment, settings.DOMAIN, post.slug, tags)
else:
status = "{0} [{1}]: {2}{3}/".format(post.title[:80], sentiment, settings.DOMAIN, post.slug)
if not (media is None):
api.PostUpdate(status=status, media=media)
else:
api.PostUpdate(status=status, media=None)
print(colored.green("Sent tweet {}.".format(status)))
except Exception as e:
print(colored.red("At Twitter post: {0}".format(e)))
python类DOMAIN的实例源码
def setup_ca():
print('creating dummy CA')
if not CertificateAuthority.objects.exists():
ec = EthicsCommission.objects.get(uuid=settings.ETHICS_COMMISSION_UUID)
subject = '/CN={}/O={}'.format(settings.DOMAIN, ec.name[:64])
openssl.setup(subject)
if not os.path.exists(settings.ECS_CA_ROOT):
os.mkdir(settings.ECS_CA_ROOT)
ca_cert_path = os.path.join(settings.ECS_CA_ROOT, 'ca.cert.pem')
with open(ca_cert_path, 'w') as f:
ca = CertificateAuthority.objects.get()
f.write(ca.cert)
openssl.gen_crl()
def active_user(request, token): # ??????
try:
username = token_confirm.confirm_validate_token(token)
except:
username = token_confirm.remove_validate_token(token)
users = User.objects.filter(username=username)
for user in users:
user.delete()
return render(request, 'message.html',
{'message': u'????????????????<a href=\"' + unicode(settings.DOMAIN) + u'/signup\">??</a>'})
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
return render(request, 'message.html', {'message': u"????????????????????"})
user.is_active = True
user.save()
message = u'????????<a href=\"' + unicode(settings.ROOT_URLCONF) + u'/login\">??</a>??'
return render(request, 'message.html', {'message':message})
def send_activation_email(user, key):
activation_link = '{}://{}{}'.format(
settings.PROTOCOL,
settings.DOMAIN,
reverse('account:activate', kwargs={'activation_key': key})
)
context = {
'activation_link': activation_link,
'name': user.first_name
}
send_mail(
subject_template_name='emails/activate/verify_email_subject.txt',
email_template_name='emails/activate/verify_email.txt',
to_email=user.email,
context=context,
html_email_template_name='emails/activate/verify_email.html'
)
def _prepare_debuglog_message(message, caller_level=2):
# Por si acaso se le mete una cadena de tipo str,
# este módulo es capaz de detectar eso y convertirla a UTF8
if type(message) == str:
message = unicode(message, "UTF-8")
# Hora
now = timezone.now()
# Contexto desde el que se ha llamado
curframe = inspect.currentframe()
# Objeto frame que llamó a dlprint
calframes = inspect.getouterframes(curframe, caller_level)
caller_frame = calframes[2][0]
caller_name = calframes[2][3]
# Ruta del archivo que llamó a dlprint
filename_path = caller_frame.f_code.co_filename
filename = filename_path.split("/")[-1]
# Obtención del mensaje
return u"DjangoVirtualPOS: {0} {1} \"{2}\" at {3}:{5} in {6} ({4}:{5})\n".format(
now.strftime("%Y-%m-%d %H:%M:%S %Z"), settings.DOMAIN, message,
filename, filename_path, caller_frame.f_lineno, caller_name
)
# Prints the debug message
def get_media(post):
if post.image:
media = "{0}{1}".format(settings.DOMAIN, post.image)
else:
media = None
return media
def handle(self, *args, **options):
posts = Post.objects.all()
for post in posts:
try:
ref = "{0}{1}/ ".format(settings.DOMAIN, post.slug)
dict = {'referer': ref}
dict.update(HEADERS)
obj = requests.get(post.url, headers=dict)
if obj.status_code == 200:
print((colored.green("OK for: {0}".format(post.url))))
except Exception as e:
print((colored.red("[ERROR]: {0}".format(e))))
self.stdout.write(self.style.SUCCESS('Successfully done parsing jobs'))
def get_ttachment(post, data):
attachment = {
'name': data['title'],
'link': '{0}{1}/'.format(settings.DOMAIN, post.slug)
}
sentiment = post.sentiment or "N/A"
summary = post.summary or ""
attachment['description'] = strip_tags(summary) + " " + sentiment
if post.image:
attachment['picture'] = '{0}{1}'.format(settings.DOMAIN, post.image)
return attachment
def setUpClass(cls):
super().setUpClass()
cls.tmpdir = tempfile.mkdtemp()
TEST_SMTPD_CONFIG= {
'listen_addr': ('127.0.0.1', 8024),
'domain': 'ecs',
}
cls.OLD_SMTPD_CONFIG = settings.SMTPD_CONFIG
cls.OLD_DOMAIN = settings.DOMAIN
settings.SMTPD_CONFIG = TEST_SMTPD_CONFIG
settings.DOMAIN = "ecs"
cls.mail_receiver = EcsMailReceiver()
def tearDownClass(cls):
shutil.rmtree(cls.tmpdir)
settings.SMTPD_CONFIG = cls.OLD_SMTPD_CONFIG
settings.DOMAIN = cls.OLD_DOMAIN
super().tearDownClass()
def return_address(self):
return 'ecs-{}@{}'.format(self.uuid.hex,
settings.DOMAIN)
def get_image(self, field):
if field:
url_prefix = 'http://%s/media/' % DOMAIN
return url_prefix + field.name
else:
return ''
def get_url_by_name(self, name, args=(), query_string={}, openid=False):
try:
url = reverse(name, args=args)
if query_string:
string = ''
for key in query_string:
string += key+'='+str(query_string[key]) + '&'
url = url + '?' + string.rstrip('&')
except:
url = ''
url = 'http://'+settings.DOMAIN+url
if openid:
return get_openid_api_url(self, url) #????openid
else:
return get_weixin_site_url(self, url)
def get_site_url(self):
'''??????'''
url = 'http://%s/appsite/%s/home/' % (DOMAIN, self.token)
return get_weixin_site_url(self, url)
def get_kefu_url(self):
'''??????'''
url = 'http://%s/appsite/%s/kefu/' % (DOMAIN, self.token)
return get_openid_api_url(self, url)
def get_vote_url(self):
'''??'''
url = reverse('appsite:vote', args=(self.token,))
url = 'http://%s%s' % (DOMAIN, url)
return get_openid_api_url(self, url)
def get_image_url(self):
if self.picurl:
return self.picurl
elif self.image:
url_prefix = 'http://%s/media/' % DOMAIN
return url_prefix + self.image.name
else:
return ''
def get_url(self):
appitem = self.get_appitem()
if appitem:
if not self.content and self.url:
return self.url
if self.tag == 'article':
url = 'http://%s/appsite/%s/article-detail/%s/' % (DOMAIN, appitem.token, self.id)
return get_weixin_site_url(appitem, url)
else:
return self.url
else:
return ''
def get_url(self):
appitem = self.appitem_set.first()
if appitem:
if self.tag == 'navigation':
url = 'http://%s/appsite/%s/navigation-list/%s/' % (DOMAIN, appitem.token, self.id)
return get_weixin_site_url(appitem, url)
if not self.is_single:
url = 'http://%s/appsite/%s/article-list/%s/' % (DOMAIN, appitem.token, self.id)
else:
articles = self.article_set.all()
article_count = articles.count()
albums = self.album_set.all()
album_count = albums.count()
if article_count == 1 and album_count == 0:
article = articles[0]
url = article.get_url()
elif article_count == 0 and album_count == 1:
album = albums[0]
url = album.get_url()
else:
url = 'http://%s/appsite/%s/article-list/%s/' % (DOMAIN, appitem.token, self.id)
return get_weixin_site_url(appitem, url)
else:
return ''
def get_image_url(self):
if self.image:
url_prefix = 'http://%s/media/' % DOMAIN
return url_prefix + self.image.name
else:
url_prefix = 'http://%s/' %DOMAIN
return url_prefix + 'static/yimi_admin/images/category_default.jpg'
def get_image_url(self):
if self.image:
url_prefix = 'http://%s/media/' % DOMAIN
return url_prefix + self.image.name
else:
return '/'
def get_image_url(self):
if self.image:
url_prefix = 'http://%s/media/' % DOMAIN
return url_prefix + self.image.name
else:
return '/'
def get_transfer_url(self):
url = 'http://%s/appsite/%s/navigation-detail/%s/transfer/' % (DOMAIN, self.appitem.token, self.id)
return get_openid_api_url(self.appitem, url)
def get_driving_url(self):
url = 'http://%s/appsite/%s/navigation-detail/%s/driving/' % (DOMAIN, self.appitem.token, self.id)
return get_openid_api_url(self.appitem, url)
def get_image_url(self):
if self.image:
url_prefix = 'http://%s/media/' % DOMAIN
return url_prefix + self.image.name
else:
return '/'
def get_image_url(self):
if self.image:
url_prefix = 'http://%s/media/' % DOMAIN
return url_prefix + self.image.name
else:
return '/'
def send_invitation(invitation):
existing = email_exists(invitation.email)
if existing:
invitation_link = '{}://{}{}?next={}'.format(
settings.PROTOCOL,
settings.DOMAIN,
reverse('social:begin', args=['gluu']),
reverse('account:accept-invite', kwargs={'activation_key': invitation.activation_key})
)
else:
invitation_link = '{}://{}{}'.format(
settings.PROTOCOL,
settings.DOMAIN,
reverse('account:register-invite', kwargs={'activation_key': invitation.activation_key})
)
context = {
'invited_by': invitation.invited_by.get_full_name(),
'account': invitation.invited_by.account.get_name(),
'invitation_link': invitation_link,
'existing': existing
}
send_mail(
subject_template_name='emails/invite/invite_billing_admin_subject.txt',
email_template_name='emails/invite/invite_billing_admin.txt',
to_email=invitation.email,
context=context,
html_email_template_name='emails/invite/invite_billing_admin.html'
)
def send_premium_invitation(invitation):
existing = email_exists(invitation.email)
if existing:
invitation_link = '{}://{}{}?next={}'.format(
settings.PROTOCOL,
settings.DOMAIN,
reverse('social:begin', args=['gluu']),
reverse('account:register-premium-auth', kwargs={'activation_key': invitation.activation_key}),
)
else:
invitation_link = '{}://{}{}'.format(
settings.PROTOCOL,
settings.DOMAIN,
reverse('account:register-premium', kwargs={'activation_key': invitation.activation_key}),
)
context = {
'invitation_link': invitation_link,
'existing': existing
}
send_mail(
subject_template_name='emails/invite/invite_premium_subject.txt',
email_template_name='emails/invite/invite_premium.txt',
to_email=invitation.email,
context=context,
html_email_template_name='emails/invite/invite_premium.html'
)
def send_summary_email(payment, record, last_4=None):
billing_management_link = '{}://{}{}'.format(
settings.PROTOCOL,
settings.DOMAIN,
reverse('payment:view-cards')
)
connection = get_connection(
username=settings.EMAIL_HOST_USER_BILLING,
password=settings.EMAIL_HOST_PASSWORD_BILLING
)
billing_date = '7th {} {}'.format(
month_name[datetime.date.today().month], datetime.date.today().year)
context = {
'billing_date': billing_date,
'account_name': payment.account.get_name(),
'billing_management_link': billing_management_link,
'last_4': last_4,
'payment': payment,
'record': record
}
recipients = payment.account.billing_admins.values_list('email', flat=True)
to_emails = [c for c in recipients]
send_mail(
subject_template_name='emails/billing/monthly_summary_subject.txt',
email_template_name='emails/billing/monthly_summary.txt',
to_email=to_emails,
bcc=[settings.BILLING_BCC, ],
context=context,
from_email=settings.BILLING_EMAIL,
html_email_template_name='emails/billing/monthly_summary.html',
connection=connection
)
def parse_desc(self):
ctx = {}
for download in self.download_set.all():
ctx['DL_{}'.format(download.slug)] = reverse(
'wui_download_file',
kwargs={'download_id': download.pk}
)
vm = self.vm
if vm:
if not vm.provider or vm.ip_addr == UNKNOWN_HOST:
return None
if vm.ip_addr == LOCALHOST:
ctx['HOST'] = settings.DOMAIN
else:
ctx['HOST'] = vm.ip_addr
try:
provider = ALLOWED_PROVIDERS[vm.provider]
except KeyError:
logger.warning(
"Illegal provider '%s' used in VM %s",
vm.provider, vm.slug
)
return None
for port in vm.port_set.all():
if provider.port_forwarding:
ctx['PORT_{}'.format(port.guest_port)] = port.host_port
else:
ctx['PORT_{}'.format(port.guest_port)] = port.guest_port
return self.desc.format_map(defaultdict(str, **ctx))