def get_bot_define_response(self, original_content: str) -> str:
    """Build the bot's reply to a request to define a single word.

    The request must be exactly one word made only of ASCII letters;
    anything else yields one of the handler's canned error messages.
    For a valid word the definition API is queried and each returned
    definition is rendered as its own bullet.

    :param original_content: Raw text sent by the user.
    :return: The formatted definitions, or an error message.
    """
    split_content = original_content.split(' ')
    # More than one token means a phrase, which is rejected.
    if len(split_content) > 1:
        return DefineHandler.PHRASE_ERROR_MESSAGE

    to_define = split_content[0].strip()
    to_define_lower = to_define.lower()

    # Any character outside a-z (digits, punctuation, accents) is rejected.
    if set(to_define_lower) - set(string.ascii_lowercase):
        return self.SYMBOLS_PRESENT_ERROR_MESSAGE

    # No word was entered.
    if not to_define_lower:
        return self.EMPTY_WORD_REQUEST_ERROR_MESSAGE

    response = '**{}**:\n'.format(to_define)
    try:
        # Use OwlBot API to fetch definition. The timeout keeps an
        # unresponsive server from blocking the bot indefinitely; a timeout
        # is reported to the user like any other request failure.
        api_result = requests.get(
            self.DEFINITION_API_URL.format(to_define_lower),
            timeout=10,
        )
        definitions = api_result.json()
        if not definitions:
            # Could not fetch definitions for the given word.
            response += self.REQUEST_ERROR_MESSAGE
        else:
            # Show definitions line by line.
            for d in definitions:
                example = d['example'] if d['example'] else '*No example available.*'
                # NOTE(review): the 'defenition' key is spelled as the code
                # has always read it; presumably it mirrors the API payload,
                # so confirm against the API before "correcting" it.
                response += '\n' + '* (**{}**) {}\n {}'.format(d['type'], d['defenition'], html2text.html2text(example))
    except Exception:
        # Best effort: any network/parsing failure becomes a user-facing
        # error message, with the traceback kept in the log.
        response += self.REQUEST_ERROR_MESSAGE
        logging.exception("Failed to fetch a definition for %r", to_define_lower)
    return response
python类html2text()的实例源码
def review_notification(email_url, user_email, context, mail, copy_to_staffs=False, copy_to_gatekeeper=False):  # pylint: disable=too-many-arguments
    """Compose the review message from a flat page and email it.

    Nothing happens unless claimant notifications are enabled in ``config``
    and ``email_url`` is given. The flat page at ``email_url`` is rendered as
    a template with ``context`` (which is updated in place), sent as a
    plain-text email with an HTML alternative, and the plain text is stored
    on ``mail.justification`` before ``mail`` is saved.

    NOTE(review): SOURCE lost its indentation; ``mail.save()`` is assumed to
    belong to the notification branch - confirm.
    """
    if not config.CLAIMANT_EMAIL_NOTIFICATION or email_url is None:
        return

    # Generate message
    page = FlatPage.objects.get(url=email_url)
    page_template = Template(page.content)
    context.update({
        "notes": mail.justification,
        "protocol": "https",
        "site": Site.objects.get(id=SITE_ID),
        "FELLOWS_MANAGEMENT_EMAIL": config.FELLOWS_MANAGEMENT_EMAIL,
    })
    html = page_template.render(Context(context))
    plain_text = html2text(html)
    mail.justification = plain_text

    # Email to claimant, optionally copied to the gatekeeper and staff
    message = EmailMultiAlternatives(
        page.title,
        plain_text,
        mail.sender.email,
        user_email,
        cc=[config.WEBSITE_GATEKEEPER_EMAIL] if copy_to_gatekeeper else None,
        bcc=ast.literal_eval(config.STAFFS_EMAIL) if copy_to_staffs else None,
        reply_to=[config.FELLOWS_MANAGEMENT_EMAIL]
    )
    message.attach_alternative(html, "text/html")
    message.send(fail_silently=False)

    # Every email is archived in the database
    mail.save()
async def update_check(self):
    """Poll the GAF Steam group RSS feed and announce new posts to guilds.

    Runs until the bot is closed, re-checking the feed every 60 seconds.
    Posts newer than the stored ``pub_dates.gaf`` value are sent, oldest
    first, to every guild whose ``feeds.gaf`` config is enabled.
    """
    await self.bot.wait_until_ready()
    self.bot.logger.debug("Started GAF Steam Announcement RSS Update Check Loop")
    while not self.bot.is_closed():
        response, _, _ = await net.get_url("http://steamcommunity.com/groups/TheNeverEndingGAF/rss/")
        xml = await response.read()
        root = etree.fromstring(xml)
        last_pub = dateparser.parse(self.bot.config["pub_dates"]["gaf"])
        # The publication date is read from the fourth child of each <item>.
        new_posts = [
            element for element in root.xpath("//item")
            if dateparser.parse(element[3].text) > last_pub
        ]
        # Iterate over new posts in reverse feed order, so index 0 is
        # handled last.
        for i, p in reversed(list(enumerate(new_posts))):
            # Update date if it's the newest post. Should be the last
            # element iterated through.
            if i == 0:
                self.bot.config["pub_dates"]["gaf"] = p[3].text
                await self.bot.update_config()
                self.bot.logger.debug("Updated GAF pub date")

            # Convert the description once per post, then truncate the
            # converted text; cutting the raw HTML could split a tag.
            content = html2text.html2text(p.find("description").text)
            if len(content) > 1900:
                content = content[:1900] + ". . ."
            author = p.find("author").text

            # Post to guilds
            for guild in self.bot.guilds:
                guild_config = await self.bot.get_guild_config(guild.id)
                if not guild_config["feeds"]["gaf"]["enabled"]:
                    continue
                channel = discord.utils.get(guild.channels, id=guild_config["feeds"]["gaf"]["channel"])
                if channel is None:
                    # The configured channel no longer exists; skip this
                    # guild instead of crashing the whole loop.
                    self.bot.logger.debug(f"GAF feed channel not found in guild {guild}")
                    continue
                async with channel.typing():
                    embed = discord.Embed(
                        title="{}".format(p.find("title").text),
                        colour=discord.Colour.gold(),
                        url="{}".format(p.find("link").text),
                        timestamp=dateparser.parse(p[3].text),
                        description=content
                    )
                    embed.set_thumbnail(url="http://www.neverendinggaf.com/graphics/logos/gaf-logo.jpg")
                    embed.set_footer(text="Author - {}".format(author))
                    if "@everyone" in content:
                        message_content = "**New Announcement** - Content Below @everyone"
                    else:
                        message_content = "**New Announcement** - Content Below"
                    message_content += "\n*Author* : {}".format(author)
                    await channel.send(content=message_content, embed=embed)
                self.bot.logger.debug(f"Sent new GAF Steam Announcement to guild {guild} channel {channel}")
        await asyncio.sleep(60)
def new_notification(staff_url, email_url, user_email, context, mail):
    """Send the "new item" emails to staff and/or the claimant.

    Each notification is enabled independently through ``config``. For every
    enabled one, ``context`` is updated in place, the matching flat page is
    rendered as a template, and the result is sent as plain text with an
    HTML version. After the claimant email is sent, its plain text is stored
    on ``mail.justification`` and ``mail`` is saved.

    NOTE(review): SOURCE lost its indentation; the final
    ``mail.justification``/``mail.save()`` lines are assumed to belong to the
    claimant branch - confirm.
    """
    def _render(flatpage_url):
        """Refresh the context and render the flat page at ``flatpage_url``."""
        context.update({
            "protocol": "https",
            "site": Site.objects.get(id=SITE_ID),
            "FELLOWS_MANAGEMENT_EMAIL": config.FELLOWS_MANAGEMENT_EMAIL,
        })
        page = FlatPage.objects.get(url=flatpage_url)
        rendered = Template(page.content).render(Context(context))
        return page, rendered, html2text(rendered)

    if config.STAFF_EMAIL_NOTIFICATION:
        # Email to staff
        page, html, plain_text = _render(staff_url)
        mail_staffs(
            page.title,
            plain_text,
            html_message=html,
            fail_silently=False
        )

    if config.CLAIMANT_EMAIL_NOTIFICATION:
        # Email to claimant
        page, html, plain_text = _render(email_url)
        message = EmailMultiAlternatives(
            page.title,
            plain_text,
            DEFAULT_FROM_EMAIL,
            user_email,
            reply_to=[config.FELLOWS_MANAGEMENT_EMAIL]
        )
        message.attach_alternative(html, "text/html")
        message.send(fail_silently=False)
        mail.justification = plain_text
        mail.save()
def to_text(html, rehtml=False):
    """Convert ``html`` to plain text with links, images and emphasis dropped.

    The result is stripped of surrounding whitespace. With ``rehtml`` set,
    newlines are turned into ``<br/>`` tags and backslashes are removed.

    NOTE(review): SOURCE lost its indentation; the backslash removal is
    assumed to apply only when ``rehtml`` is true - confirm.
    """
    converter = HTML2Text()
    options = (
        ("wrap_links", False),
        ("skip_internal_links", True),
        ("inline_links", True),
        ("ignore_anchors", True),
        ("ignore_images", True),
        ("ignore_emphasis", True),
        ("ignore_links", True),
    )
    for option, value in options:
        setattr(converter, option, value)

    plain = converter.handle(html).strip(' \t\n\r')
    if not rehtml:
        return plain
    return plain.replace('\n', '<br/>').replace('\\', '')
def get_queryset(self, krs, nip, regon, google, no_regon, no_nip):
    """Build the list of lookup queries from identifiers and Google keywords.

    Each ``google`` keyword is searched for REGON and/or NIP numbers (unless
    disabled by ``no_regon`` / ``no_nip``) and the hits are added to the
    identifiers passed in. As a side effect, ``self.processor`` and
    ``self.session`` are (re)created.

    :return: A list of single-key dicts (``{'krs': ...}``, ``{'nip': ...}``,
        ``{'regon': ...}``), one per distinct identifier.
    """
    # Work on copies: extending the lists below must not modify the lists
    # owned by the caller.
    regon = list(regon or [])
    nip = list(nip or [])

    self.processor = html2text.HTML2Text()
    self.processor.ignore_emphasis = True
    self.processor.bypass_tables = True
    self.processor.ignore_links = True
    self.session = requests.Session()

    for keyword in tqdm(google or []):
        if not no_regon:
            result = self.search_google("{} REGON".format(keyword), REGON_PATTERN)
            print("For '{}' found {}".format(keyword, result))
            regon.extend(result)
        if not no_nip:
            result = self.search_google("{} NIP".format(keyword), NIP_PATTERN)
            print("For '{}' found {}".format(keyword, result))
            # Keep only hits that are exactly 10 characters long once the
            # dashes are removed.
            stripped = (x.replace('-', '') for x in result)
            nip.extend(x for x in stripped if len(x) == 10)

    queries = [{'krs': v} for v in set(krs)] if krs else []
    queries += [{'nip': v} for v in set(nip)]
    queries += [{'regon': v} for v in set(regon)]
    return queries
def set_data(self, resp):
    """Store the body of ``resp`` on ``self.data`` according to its content type.

    * blank body -> ``None``
    * ``text/html`` or no content type -> body converted to text (links dropped)
    * any other ``text/*`` -> stripped body
    * anything else -> parsed JSON; a dict is merged into an existing dict,
      any other value replaces ``self.data``

    :raises AssertionError: if the status code does not start with "2"
        (raised after ``self.data`` has been set).
    """
    if not resp.text.strip():
        self.data = None
    else:
        ct = resp.headers.get('content-type')
        if ct is None or ct.startswith('text/html'):
            h = html2text.HTML2Text()
            h.ignore_links = True
            self.data = h.handle(resp.text)
        elif ct.startswith('text/'):
            # ct cannot be None here; that case is handled above.
            self.data = resp.text.strip()
        else:
            data = resp.json()
            # Merge only when there is a dict to merge into: self.data may be
            # None or a string left by an earlier response.
            if isinstance(data, dict) and isinstance(self.data, dict):
                self.data.update(data)
            else:
                self.data = data

    if not str(resp.status_code).startswith("2"):
        raise AssertionError("%s %s\n%s\n%s" % (resp.status_code, resp.reason, "-" * (20 + len(resp.reason)), self))
def __init__(self):
    """Prepare the email storage and the HTML-to-text converter.

    Ensures the directory named by ``settings.GENERAL_CONFIG['email_storage_dir']``
    exists, opens the SQLite database inside it, creates the ``emails`` table
    when missing, and builds an ``HTML2Text`` parser with line wrapping
    disabled.

    :raises MailerServiceException: if the storage directory cannot be read
        from the settings or created.
    """
    try:
        storage_dir = settings.GENERAL_CONFIG['email_storage_dir']
        if not os.path.exists(storage_dir):
            os.makedirs(storage_dir)
    except Exception as ex:
        raise MailerServiceException(ex)

    db_path = storage_dir + '/' + CERBERUS_EMAIL_DB
    self._db_conn = sqlite3.connect(db_path)
    self._db_conn.cursor().execute('''CREATE TABLE IF NOT EXISTS emails
(publicid text, sender text, recipient text, subject text, body text, category text, timestamp int)''')
    self._db_conn.commit()

    parser = html2text.HTML2Text()
    parser.body_width = 0
    self._html_parser = parser
def mk_plaintext(self):
    """Return the plain-text rendering of ``self.mk_html()``.

    Images, tables and anchors are ignored, links are emitted as references
    rather than inline, and no line wrapping is applied.

    NOTE(review): only the converter setup sits inside the ``try``; the
    ``handle()`` call is outside it, so conversion errors are not wrapped in
    ``WrongHTML`` - confirm whether that is intended.
    """
    options = (
        ("ignore_images", True),
        ("inline_links", False),
        ("wrap_links", False),
        ("unicode_snob", True),  # keep accented characters as they are
        ("skip_internal_links", True),
        ("ignore_anchors", True),
        ("body_width", 0),
        ("use_automatic_links", True),
        ("ignore_tables", True),
    )
    try:
        converter = html2text.HTML2Text()
        for option, value in options:
            setattr(converter, option, value)
    except html.parser.HTMLParseError as e:
        raise WrongHTML(e)
    return converter.handle(self.mk_html())
def parse_item(self, response):
    """Extract one article from ``response`` and yield it as a ``JianshuItem``.

    The title and body come from the page markup, the counters and URL from
    the JSON held in the ``note`` script tag, and the body is converted to
    Markdown with reference-style links.
    """
    def first(query):
        """Return the first match of ``query``; IndexError when there is none."""
        return response.xpath(query).extract()[0]

    title = first('//h1[@class="title"]/text()')
    body = first('//div[@class="show-content"]')
    note_scripts = response.xpath('//script[@data-name="note"]/text()').extract()
    images = response.xpath('//div[@class="image-package"]/img/@src').extract()
    notes = json.loads(note_scripts[0].strip())

    # Convert the HTML body to Markdown.
    converter = html2text.HTML2Text()
    converter.ignore_links = False
    converter.inline_links = False
    markdown = converter.handle(body)

    item = JianshuItem()
    item["title"] = title
    # Re-join text that the conversion split after '-' or before '?'.
    item["content"] = markdown.replace('-\n', '-').replace('\n?', '?')
    for key in ("url", "slug", "views_count", "likes_count"):
        item[key] = notes[key]
    item["images"] = images
    yield item
def _load_entry(self, entry, fields, add_content):
    """Render one RSS ``entry`` as a block of text lines.

    The values of ``fields`` come first. With ``add_content`` set and content
    present, an empty line and the content follow, one stripped line at a
    time with a leading space; the content is first run through html2text
    when the ``html2text`` option is on and the module can be imported.
    """
    lines = list(_get_val_from_rss_entry(entry, fields))
    content = _get_content_from_rss_entry(entry) if add_content else None
    if content:
        if self._conf["html2text"]:
            try:
                import html2text as h2t
                content = h2t.HTML2Text(bodywidth=74).handle(content)
            except ImportError:
                # Fall back to the raw content when html2text is missing.
                self._ctx.log_error(
                    "RssInput: loading HTML2Text error "
                    "(module not found)")
        lines.append("")
        for line in content.strip().split("\n"):
            lines.append(" " + line.strip())
    self._ctx.log_debug(repr(lines))
    return "\n".join(lines).strip()
def printImportScreen(title, url, src, keyword):
    """Print a banner for a page, then up to 10 lines starting at ``keyword``.

    ``src`` is converted from HTML to text (links dropped). Printing starts
    at the first line containing ``keyword``; blank lines are skipped and do
    not count towards the 10-line limit.

    NOTE(review): this block relies on Python 2 string semantics (byte
    strings after ``encode``); each ``print`` call takes a single argument so
    the syntax is also valid on Python 3.
    """
    separator = '====================================='
    print(separator)
    title = (title.encode('utf-8').replace(' ', ''))[:50] + '...'
    print('Found Keyword in the page "%s"' % (title))
    print('URL: %s' % url)
    print(separator)

    h = html2text.HTML2Text()
    h.ignore_links = True
    foundKey = False
    listCount = 0
    for line in h.handle(src).encode('utf-8').split('\n'):
        if keyword in line:
            foundKey = True
        # split('\n') never yields '\n' itself, so blank lines have to be
        # detected as empty/whitespace-only strings.
        if not line.strip():
            continue
        if foundKey:
            listCount += 1
            if listCount > 10:
                break
            print(line)
def parse(self, response):
    """Yield one ``JobItem`` per hit in the JSON search ``response``.

    The description is converted from HTML to text. The posting date is the
    converted ``pub_date`` up to the first '+'; a failure while reading it is
    logged and the item is still yielded.
    """
    payload = json.loads(response.text)
    converter = html2text.HTML2Text()
    for hit in payload['hits']['hits']:
        item = JobItem()
        item['url'] = urljoin(
            "https://www.workingnomads.co/jobs/",
            hit['_source']['slug'])
        item['title'] = hit['_source']['title']
        item['site'] = 'WorkingNomads'
        description = converter.handle(hit['_source']['description'])
        item['text'] = description
        # 'tags' is not set anywhere in this block, so this presumably
        # appends nothing - verify against JobItem.
        item['text'] = [item['text'] + ' '.join(item.get('tags', []))]
        try:
            published = converter.handle(hit['_source']['pub_date'])
            item['date_posted'] = published.split('+')[0]
        except Exception as e:
            self.logger.error(e)
        yield item
def textwindow(url):
    """Open a Tk window showing the text rendering of the page at ``url``.

    Links and images are dropped during the HTML-to-text conversion. The
    window is titled with the URL and is destroyed when Escape is pressed.
    """
    converter = html2text.HTML2Text()
    converter.ignore_links = True
    converter.ignore_images = True
    page_text = converter.unescape(converter.handle(gethtml(url)))
    # convert65536 is defined elsewhere; presumably it makes the text safe
    # for the Tk widget - verify.
    display_text = convert65536(page_text)

    window = Tkinter.Toplevel()
    window.geometry("+200+100")
    window.title(url)
    window.bind("<Escape>", lambda _: window.destroy())

    scrollbar = Tkinter.Scrollbar(window)
    text_font = tkFont.Font(family="Arial", size=16)
    text_area = TextPlus(window, height=20, width=78, font=text_font, bg="lightgrey")
    scrollbar.pack(side=Tkinter.RIGHT, fill=Tkinter.Y)
    text_area.pack(side=Tkinter.LEFT, fill=Tkinter.Y)
    # Wire the scrollbar and the text widget to each other.
    scrollbar.config(command=text_area.yview)
    text_area.config(yscrollcommand=scrollbar.set)
    text_area.insert(Tkinter.END, display_text)
def replace_markdown(self, html):
    """Convert ``html`` to Markdown without line wrapping.

    Images that have no alt text are labelled "IMAGE".
    """
    converter = html2text.HTML2Text()
    converter.body_width = 0
    converter.default_image_alt = "IMAGE"
    markdown = converter.handle(html)
    return markdown
def dehtmlify(body):
    """
        Convert HTML content to text.

        CRLF line breaks are turned into ``<br/>`` before conversion, no line
        wrapping is applied, and runs of blank lines are collapsed afterwards.

        :param str body: The html content
        :rtype: str
        :return: The dehtmlified content
    """
    converter = html2text.HTML2Text()
    converter.body_width = 0
    with_breaks = body.replace('\r\n', '<br/>')
    text = converter.handle(with_breaks)
    return re.sub(r'^(\s*\n){2,}', '\n', text, flags=re.MULTILINE)
def get_dehtmlified(report_id):
    """Return the body of a report converted from HTML to text.

    CRLF line breaks become ``<br/>`` before conversion and runs of blank
    lines are collapsed afterwards.

    :return: ``{'dehtmlify': <text>}``
    :raises NotFound: if the report does not exist, or a ValueError occurs
        anywhere in the lookup or conversion.
    """
    try:
        report = Report.objects.get(id=report_id)
        converter = html2text.HTML2Text()
        converter.body_width = 0
        text = converter.handle(report.body.replace('\r\n', '<br/>'))
        collapsed = re.sub(r'^(\s*\n){2,}', '\n', text, flags=re.MULTILINE)
        return {'dehtmlify': collapsed}
    except (ObjectDoesNotExist, ValueError):
        raise NotFound('Report not found')
def _decode_quoted_printable(raw, charset=None):
    """Quoted-printable-decode ``raw`` and return the result as ``str``."""
    data = quopri.decodestring(raw)
    try:
        return data.decode(charset or "utf-8", errors="replace")
    except LookupError:
        # The mail declares a charset Python does not know; fall back.
        return data.decode("utf-8", errors="replace")


def format_mail(loop, msg, to_text=True, ignore_tables=True):
    """Format the mail to markdown

    The first usable part wins: a ``text/html`` part (converted to markdown,
    only when ``to_text`` is true) or a ``text/plain`` part. When no body is
    found, an error is logged and the whole message is used instead. The
    body is always decoded to ``str`` so it is embedded as text rather than
    as a ``b'...'`` representation.

    Parameter
    ---------
    loop:
        Not used by this function.
    msg: email.message
    to_text: bool, optional
        Convert text/html mails to text/plain with markdown formatting
    ignore_tables: bool, optional
        Passed to the HTML converter's ``ignore_tables`` option

    Returns
    -------
    text: str
    """
    body = None
    for part in msg.walk():
        content_type = part.get_content_type()
        if to_text and content_type == "text/html":
            h = html2text.HTML2Text()
            h.ignore_tables = ignore_tables
            body = h.handle(_decode_quoted_printable(part.get_payload(), part.get_content_charset()))
            break
        elif content_type == "text/plain":
            body = _decode_quoted_printable(part.get_payload(), part.get_content_charset())
            break
    if not body:
        log.error("Could not find text body mail")
        body = _decode_quoted_printable(msg.as_string())
    text = f"### {msg['Subject']} \n {body}"
    return text
def edit_file(self, full_path, filename, to_zim=False):
    """Convert the HTML note at ``full_path`` to ASCII markdown and save it.

    The converted text is cut at the first null character and has its
    escaped dots restored; with ``to_zim`` set it is additionally passed
    through ``self.to_zim_syntax``. The result is written to the path
    returned by ``self._rename_file``. Conversion and write failures are
    reported through ``self._exception`` instead of being raised.

    NOTE(review): relies on the Python 2 ``unicode`` builtin.
    """
    text_maker = html2text.HTML2Text()
    with open(full_path, 'r') as source:
        html = source.read()

    content = ''
    if html:
        # The steps stay separate so that a failure part-way leaves
        # ``content`` at the last completed step.
        try:
            content = text_maker.handle(unicode(html, errors='ignore'))
            content = content.encode('ascii', 'ignore')
            content = content.split('\00')[0]  # remove null chars
            content = content.replace('\.', '.')  # remove escape chars
        except Exception as e:
            self._exception('convert content of note to markdown', full_path, e)

    if to_zim:
        content = self.to_zim_syntax(content)

    fn_path = self._rename_file(full_path, filename)
    with open(fn_path, 'w') as target:
        try:
            target.write(content.encode('ascii', 'ignore'))
        except Exception as e:
            self._exception('save note', fn_path, e)
def send_reset_password_email(user: models.User) -> None:
    """Email ``user`` a link for resetting their password.

    The HTML body is built from the configured ``EMAIL_TEMPLATE`` (blank
    lines become ``<br><br>``); the plain-text body is derived from it with
    reference-style, unwrapped links.

    :raises APIException: if sending the message fails for any reason.
    """
    token = user.get_reset_token()
    template = current_app.config['EMAIL_TEMPLATE'].replace('\n\n', '<br><br>')
    reset_url = (
        f'{psef.app.config["EXTERNAL_URL"]}/reset_'
        f'password/?user={user.id}&token={token}'
    )
    html_body = template.format(
        site_url=current_app.config["EXTERNAL_URL"],
        url=reset_url,
        user_id=user.id,
        token=token,
        # Escape user-controlled values before they enter the HTML body.
        user_name=html.escape(user.name),
        user_email=html.escape(user.email),
    )

    text_maker = html2text.HTML2Text(bodywidth=78)
    text_maker.inline_links = False
    text_maker.wrap_links = False
    text_body = text_maker.handle(html_body)

    message = Message(
        subject=f'Reset password on {psef.app.config["EXTERNAL_URL"]}',
        body=text_body,
        html=html_body,
        recipients=[user.email],
    )
    try:
        mail.send(message)
    except Exception:
        raise APIException(
            'Something went wrong sending the email, '
            'please contact your site admin',
            f'Sending email to {user.id} went wrong.',
            APICodes.UNKOWN_ERROR,
            500,
        )
def dom2text(dom, ignore_images=True, ignore_emphasis=True, ignore_tables=True):
    """Serialise an lxml ``dom`` node and convert it to unwrapped text.

    The three flags are passed to the html2text options of the same name.
    """
    from lxml import etree
    import html2text

    converter = html2text.HTML2Text()
    converter.body_width = 0
    for option, value in (
        ("ignore_images", ignore_images),
        ("ignore_emphasis", ignore_emphasis),
        ("ignore_tables", ignore_tables),
    ):
        setattr(converter, option, value)
    markup = etree.tostring(dom).decode()
    return converter.handle(markup)
def reply_from_template(self, template_name, extra_context=None, html=False):
    """Reply to this message with the rendered ``template_name``.

    The template receives ``msg`` (this object) and ``settings``, plus
    anything in ``extra_context``. The reply goes to the original sender
    with a "Re: " subject. With ``html`` set, the rendered body is sent as
    the HTML alternative and a plain-text version is derived from it.

    :return: The value returned by the underlying send call, in both the
        HTML and the plain branch.
    """
    context = {
        'msg': self,
        'settings': settings,
    }
    if extra_context:
        context.update(extra_context)
    body = render_to_string(template_name, context)
    subject = 'Re: ' + self.subject
    to = '{} <{}>'.format(self.from_name, self.from_email) if self.from_name else self.from_email

    if html:
        h = html2text.HTML2Text(bodywidth=0)
        text_content = h.handle(body)
        msg = EmailMultiAlternatives(subject, text_content, settings.DEFAULT_FROM_EMAIL, [to])
        msg.attach_alternative(body, "text/html")
        # Return the result like the plain branch does; this branch used to
        # return None.
        return msg.send(fail_silently=False)

    return send_mail(
        subject,
        body,
        settings.DEFAULT_FROM_EMAIL,
        [to],
        fail_silently=False,
    )
def html2markdown(html):
    """Convert unicode ``html`` to Markdown, dropping images and links.

    Falsy input (``None``, empty string) is returned unchanged.
    """
    if html:
        converter = html2text.HTML2Text()
        converter.ignore_images = True
        converter.ignore_links = True
        return converter.handle(html)
    return html
def html2markdown(html, url, download_image, image_path):
    """Convert ``html`` to unwrapped Markdown.

    With ``download_image`` set, the HTML is first passed through
    ``download_html_image`` and converted without a base URL; otherwise
    ``url`` is given to the converter as the base URL.
    """
    if download_image:
        html = download_html_image(url, html, image_path)
        converter = HTML2Text(bodywidth=0)
    else:
        converter = HTML2Text(baseurl=url, bodywidth=0)
    return converter.handle(html)
def _filter(self, item: str, result: common.Result) -> ty.Iterable[str]:
    """Yield ``item`` converted from HTML to text.

    The line width comes from the ``width`` configuration entry.

    :raises common.FilterError: if the html2text module cannot be imported.
    """
    assert isinstance(item, str)
    try:
        import html2text as h2t
    except ImportError:
        raise common.FilterError(self, "module html2text not found")
    width = self._conf.get("width")
    converter = h2t.HTML2Text(bodywidth=width)
    yield converter.handle(item)
def main():
    """Speak the shared/clipboard text, or the page behind a URL found in it.

    When running as an extension, the shared text and URL are used;
    otherwise the clipboard text. If there is no URL, the first URL found in
    the text is used. A URL is fetched and converted to text before being
    spoken; without one the text itself is spoken.

    :return: ``True`` if the URL could not be reached, otherwise ``None``.
    """
    speech.stop()
    if not appex.is_running_extension():
        console.hud_alert('Reading clipboard')
        text = clipboard.get()
        url = None
    else:
        text = appex.get_text()
        url = appex.get_url()

    if url is None:
        try:
            url = [mgroups[0] for mgroups in GRUBER_URLINTEXT_PAT.findall(text)][0]
        except (IndexError, TypeError):
            # No URL in the text (IndexError) or no text at all (TypeError):
            # fall through and read the text itself.
            pass

    if url is not None:
        console.hud_alert('Reading: ' + url)
        try:
            r = requests.get(
                url=url,
                headers={"User-agent": "Mozilla/5.0{0:06}".format(random.randrange(999999))})
        except requests.ConnectionError:
            console.alert('Unable to connect to url.')
            return True
        # r.text is already decoded text; calling .decode() on it fails.
        text = html2text.html2text(r.text)
    else:
        console.hud_alert('Reading text: ' + str(text))

    if text:
        speech.say(text)
        console.alert('Done?', hide_cancel_button=True, button1='OK')
        speech.stop()
    else:
        console.hud_alert('No text found.')
def main():
    """Copy the Markdown rendering of a web page to the clipboard.

    The URL comes from the share extension (or the first URL in the shared
    text), or from the first URL in the clipboard text. The user confirms or
    edits it, the page is fetched and converted to Markdown, and the user
    can then open Evernote with a new note built from the clipboard.

    :return: ``True`` if the URL prompt is cancelled, otherwise ``None``.
        Errors from fetching the page propagate to the caller.

    NOTE(review): SOURCE lost its indentation; the "http" check is assumed
    to apply to both the extension and the clipboard path - confirm.
    """
    def first_url(text):
        """Return the first URL found in ``text``, or '' when there is none."""
        matches = GRUBER_URLINTEXT_PAT.findall(text or '')
        return matches[0][0] if matches else ''

    if appex.is_running_extension():
        url = appex.get_url()
        if url is None:
            url = first_url(appex.get_text())
    else:
        url = first_url(clipboard.get().strip())

    if "http" not in url:
        url = "http://"
    try:
        url = console.input_alert("URL", "", url)
    except KeyboardInterrupt:
        # Raised when the user cancels the prompt.
        return True

    console.hud_alert('URL: %s' % url)
    # Request errors propagate unchanged; the old `raise(e.message)` raised a
    # plain string, which is itself an error.
    r = requests.get(
        url=url,
        headers={"User-agent": "Mozilla/5.0{0:06}".format(random.randrange(999999))}
    )
    # r.text is already decoded text; calling .decode() on it fails.
    rendered_content = html2text.html2text(r.text)
    clipboard.set(rendered_content)

    launch_e = console.alert('Markdown copied to clipboard. Launch Evernote?', button1='Yes', button2='No', hide_cancel_button=True)
    if launch_e == 1:
        _eurl = "evernote://x-callback-url/new-note?type=clipboard&title=DRAFT&text="
        app = UIApplication.sharedApplication()
        eurl = nsurl(_eurl)
        app.openURL_(eurl)
    appex.finish()
def main():
    """Save the printable version of a gamefaqs guide as text in ~/Documents.

    The URL comes from the share extension or the clipboard; when it is
    missing or does not match ``RE_URL`` the user is prompted for one
    (cancelling exits). The page at ``<url>?print=1`` is converted to text
    and written to a ``.txt`` file named after the path segment between the
    platform and ``/faqs``.
    """
    import io

    if appex.is_running_extension():
        url = appex.get_url()
    else:
        url = clipboard.get().strip()

    # `not url` covers a missing URL, which RE_URL.match() cannot handle.
    if not url or not RE_URL.match(url):
        try:
            url = console.input_alert("Enter gamefaqs URL", "", "https://www.gamefaqs.com/")
        except KeyboardInterrupt:
            sys.exit(0)

    newurl = "{0}?print=1".format(url)
    # e.g. http://www.gamefaqs.com/ps3/959558-fallout-new-vegas/faqs/61226
    if RE_URL.match(url):
        r = requests.get(
            url=newurl,
            headers={"User-agent": "Mozilla/5.0{0:06}".format(random.randrange(999999))}
        )
        # r.text is already decoded text; calling .decode() on it fails.
        rendered_content = html2text.html2text(r.text)
        filename = url.partition("gamefaqs.com/")[-1].partition("/")[-1].partition("/faqs")[0] + ".txt"
        filepath = os.path.join(os.path.expanduser("~/Documents"), filename)
        # Explicit encoding so non-ASCII guide text can always be written.
        with io.open(filepath, "w", encoding="utf-8") as fo:
            fo.write(rendered_content)
        console.hud_alert('Success! Saved {0}'.format(filename), "success")
def detect_language(html):
    """
    Detect the language of the text content of a page.

    ``html`` may be text or bytes. Bytes are decoded as UTF-8, falling back
    to Latin-1. The markup is converted to text before detection.
    """
    if isinstance(html, (bytes, bytearray)):
        try:
            html = html.decode("utf8")
        except UnicodeDecodeError:
            # Latin-1 maps every byte to a character, so this cannot fail.
            html = html.decode("latin1")
    h = html2text.HTML2Text()
    return langdetect.detect(h.handle(html))
def html2string(self, response):
    """Convert the HTML body of ``response`` to a string, dropping links.

    The body is decoded with the encoding reported by
    ``self.detect_encoding``; undecodable bytes are ignored.
    """
    converter = html2text.HTML2Text()
    converter.ignore_links = True
    charset = self.detect_encoding(response)
    markup = response.body.decode(charset, 'ignore')
    return converter.handle(markup)