def __init__(self, dictionary_file):
    with open(dictionary_file, 'r') as f:
        self.word2i = json.load(f)['word2i']

    self.wpt = TweetTokenizer(preserve_case=False)

    if "<stop_dialogue>" not in self.word2i:
        self.word2i["<stop_dialogue>"] = len(self.word2i)

    self.i2word = {}
    for (k, v) in self.word2i.items():
        self.i2word[v] = k

    # Retrieve key values
    self.no_words = len(self.word2i)
    self.start_token = self.word2i["<start>"]
    self.stop_token = self.word2i["?"]
    self.stop_dialogue = self.word2i["<stop_dialogue>"]
    self.padding_token = self.word2i["<padding>"]
    self.yes_token = self.word2i["<yes>"]
    self.no_token = self.word2i["<no>"]
    self.non_applicable_token = self.word2i["<n/a>"]

    self.answers = [self.yes_token, self.no_token, self.non_applicable_token]
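The constructor above expects a JSON file with a top-level "word2i" mapping. A minimal, purely illustrative dictionary file (containing only the special symbols the code looks up plus one regular word) could be written like this:

import json

# Hypothetical minimal dictionary file matching the keys the loader expects.
vocab = {"word2i": {"<padding>": 0, "<start>": 1, "?": 2, "<stop_dialogue>": 3,
                    "<yes>": 4, "<no>": 5, "<n/a>": 6, "hello": 7}}
with open("dict.json", "w") as f:
    json.dump(vocab, f)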
Python example source code for the TweetTokenizer() class
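Before the project-specific snippets, here is a minimal, self-contained sketch of the tokenizer itself; the constructor flags shown (preserve_case, reduce_len, strip_handles) are the ones the examples below rely on, and the sample sentence is adapted from the NLTK documentation.

from nltk.tokenize import TweetTokenizer

# Fold case, cap repeated characters at three, and drop @-handles.
tknzr = TweetTokenizer(preserve_case=False, reduce_len=True, strip_handles=True)
print(tknzr.tokenize("@remy: This is waaaaayyyy too much for you!!!!!!"))
# roughly: [':', 'this', 'is', 'waaayyy', 'too', 'much', 'for', 'you', '!', '!', '!']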
def __init__(self, root, fileids=None,
             word_tokenizer=TweetTokenizer(),
             encoding='utf8'):
    """
    :param root: The root directory for this corpus.
    :param fileids: A list or regexp specifying the fileids in this corpus.
    :param word_tokenizer: Tokenizer for breaking the text of Tweets into
        smaller units, including but not limited to words.
    """
    CorpusReader.__init__(self, root, fileids, encoding)

    # Check that all user-created corpus files are non-empty.
    for path in self.abspaths(self._fileids):
        if isinstance(path, ZipFilePathPointer):
            pass
        elif os.path.getsize(path) == 0:
            raise ValueError("File {} is empty".format(path))

    self._word_tokenizer = word_tokenizer
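This constructor matches NLTK's TwitterCorpusReader (nltk/corpus/reader/twitter.py). A hedged usage sketch, assuming a directory of line-delimited JSON tweet files at a hypothetical path:

from nltk.corpus.reader.twitter import TwitterCorpusReader
from nltk.tokenize import TweetTokenizer

# Hypothetical corpus location; each *.json file holds one tweet per line.
reader = TwitterCorpusReader('/path/to/tweets', r'.*\.json',
                             word_tokenizer=TweetTokenizer(preserve_case=False))
for tokens in reader.tokenized()[:3]:
    print(tokens)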
def tokenize_texts(texts, words):
    results = []
    # Create the tokenizer once rather than per text.
    tokenizer = TweetTokenizer(preserve_case=False, reduce_len=True)
    for text in texts:
        t = text.lower().strip()
        t = t.replace('\n', ' ').replace('\t', ' ')
        t = t.replace("'s", " 's ")
        t = t.replace("'ll", " 'll ")
        t = t.replace('-', ' - ')
        t = t.replace('.', ' . ')
        res = tokenizer.tokenize(t)
        ids = []
        for w in res:
            w_id = words.get(w)
            if w_id is None:
                # log.warning("Unknown word found: %s", w)
                w_id = 0
            ids.append(w_id)
        results.append(ids)
    return results
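A minimal call sketch for the function above, assuming `words` maps tokens to integer ids and id 0 doubles as the out-of-vocabulary bucket (as the fallback in the loop implies):

# Hypothetical vocabulary; any token not listed here maps to 0.
words = {"hello": 1, "world": 2, ".": 3}
print(tokenize_texts(["Hello   world."], words))
# -> [[1, 2, 3]]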
def read_data(file=file_path):
    col_names = ['System-Id', 'Message', 'drug-offset-start', 'drug-offset-end', 'sideEffect-offset-start',
                 'sideEffect-offset-end', 'WM1', 'WM2', 'relType']
    data_frame = pd.read_csv(file, skipinitialspace=True, usecols=col_names)
    mssg_frame = data_frame['Message'].drop_duplicates()
    tokenizer = TweetTokenizer()
    string = []
    for mssg in mssg_frame:
        tokens = tokenizer.tokenize(mssg)
        for token in tokens:
            if is_word(token):
                string.append(token.lower())
    if not os.path.isfile("words.txt"):
        with open("words.txt", "w") as text_file:
            print(string, file=text_file)
    return data_frame
# TODO use space splitter and then strip the word
# TODO change regex to [a-z0-9].+
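The `is_word` helper used above is not part of the excerpt. A plausible stand-in, consistent with the TODO note about a `[a-z0-9].+` regex, is sketched below purely as an assumption:

import re

# Hypothetical stand-in for the missing is_word() helper: keep tokens that
# start with a letter or digit and have at least two characters.
def is_word(token):
    return re.match(r'[a-z0-9].+', token.lower()) is not None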
def preprocess(tweet):
    preprocessed = copy.copy(tweet)
    preprocessed = preprocessed.lower()
    # remove some emoticons the TweetTokenizer does not know
    preprocessed = remove_emoticons(preprocessed)
    # split contractions like "he's" -> "he s",
    # by using the imported contractions dictionary
    preprocessed = split_contractions(preprocessed)
    # split compounds like "next-level" -> "next level"
    preprocessed = split_compounds(preprocessed)
    # remove links
    preprocessed = remove_links(preprocessed)
    # remove all special characters and return tokenized text
    preprocessed = remove_special_characters(preprocessed)
    preprocessed = remove_empty_sentences(preprocessed)
    return preprocessed
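None of the helper functions called above are included in the excerpt. Two minimal sketches follow, written purely as assumptions about what they might do:

import re

# Hypothetical link remover: drop http(s) and www URLs.
def remove_links(text):
    return re.sub(r'(?:https?://|www\.)\S+', ' ', text)

# Hypothetical compound splitter: "next-level" -> "next level".
def split_compounds(text):
    return re.sub(r'(\w)-(\w)', r'\1 \2', text)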
def rank_by_inverted_words(raw_query, filehashes=None):
    from nltk.tokenize import TweetTokenizer
    tokenizer = TweetTokenizer()
    keywords = tokenizer.tokenize(raw_query)
    kv_paperwords = lambda filehash: KeyValueStore('paperwords:' + filehash)
    if not filehashes:  # retrieve all from db. complexity warning.
        scopes = KeyValueStore.scopes('paper:*')
        filehashes = [scope[len('paper:'):] for scope in scopes]
    score_by_filehash = {}
    for filehash in filehashes:
        word_dict = kv_paperwords(filehash)
        score = 0.
        for word in keywords:
            score += word_dict.get(word, default=0.)
        score_by_filehash[filehash] = score
    print(score_by_filehash)
    return sorted(score_by_filehash, key=lambda k: score_by_filehash[k], reverse=True)
def predict(input_string):
    mask = lambda w, v: 1 if w not in v else v[w]
    tknzr = TweetTokenizer(reduce_len=True, preserve_case=False)
    words = tknzr.tokenize(input_string)
    vec = [[mask(w, pd.vocab) for w in words]]
    vec = np.array(vec, dtype="int32")
    vec = pad_sequences(vec, maxlen=pd.max_sequence)
    predictions = model.predict(vec)
    sarcasm = round(predictions[0][1], 2) * 100
    return (words, sarcasm)
##################################################################
def twitter_tokenizer(x):
    return TweetTokenizer(strip_handles=True).tokenize(x)
def get_tweet_tags(tweet):
    """ Break up a tweet into individual word parts """
    tknzr = TweetTokenizer()
    tokens = tknzr.tokenize(tweet)
    # replace handles with real names
    for n, tok in enumerate(tokens):
        if tok.startswith('@'):
            handle = tok.strip("@")
            if handle in user.students:
                # If we have a database entry for the mentioned user, we can
                # easily substitute a full name.
                usr = user.NPUser(handle)
                tokens[n] = usr.fullname
            else:
                # If there is no database entry, we use the user's alias. While
                # this is the full name in many cases, it is often not reliable.
                usr = api.get_user(handle)
                tokens[n] = usr.name
    tagged = nltk.pos_tag(tokens)
    # In nltk, if a teacher's name is written with a period after an
    # abbreviated prefix, it is awkwardly broken up into 3 tags.
    for n, tag in enumerate(tagged):
        # If there is the weird period after the prefix,
        if tag[1] == '.':
            # and it is in fact splitting up a person's name,
            if tagged[n - 1][1] == 'NNP' and tagged[n + 1][1] == 'NNP':
                if tagged[n - 1][0] in ['Mr', 'Ms', 'Mrs', 'Mx']:
                    # combine it into the actual name,
                    tagged[n - 1] = ('{}. {}'.format(tagged[n - 1][0],
                                                     tagged[n + 1][0]), 'NNP')
                    # and then remove the extra tags.
                    del tagged[n + 1]
                    del tagged[n]
    return tagged
def preprocess_tweets(docs, stopwords, min_df=3, min_term_length=2, ngram_range=(1, 1), apply_tfidf=True, apply_norm=True):
    """
    Preprocess a list of text documents stored as strings, where the documents
    have already been tokenized and are separated by whitespace.
    """
    from nltk.tokenize import TweetTokenizer
    from sklearn.feature_extraction.text import TfidfVectorizer

    tweet_tokenizer = TweetTokenizer(preserve_case=False, strip_handles=True, reduce_len=True)

    def custom_tokenizer(s):
        # need to manually replace quotes
        s = s.replace("'", " ").replace('"', ' ')
        tokens = []
        for x in tweet_tokenizer.tokenize(s):
            if len(x) >= min_term_length:
                if x[0] == "#" or x[0].isalpha():
                    tokens.append(x)
        return tokens

    # Build the Vector Space Model, apply TF-IDF and normalize rows to unit length, all in one call
    if apply_norm:
        norm_function = "l2"
    else:
        norm_function = None
    tfidf = TfidfVectorizer(stop_words=stopwords, lowercase=True, strip_accents="unicode",
                            tokenizer=custom_tokenizer, use_idf=apply_tfidf, norm=norm_function,
                            min_df=min_df, ngram_range=ngram_range)
    X = tfidf.fit_transform(docs)

    # store the vocabulary map
    terms = []
    v = tfidf.vocabulary_
    for i in range(len(v)):
        terms.append("")
    for term in v.keys():
        terms[v[term]] = term
    return (X, terms)
# --------------------------------------------------------------
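A hedged usage sketch for preprocess_tweets with a toy document list and stop-word list (min_df is lowered to 1 so the tiny corpus produces a non-empty vocabulary):

docs = [
    "@alice loving the new #python release http://example.com",
    "#python tooling keeps getting better and better",
]
X, terms = preprocess_tweets(docs, stopwords=["the", "and"], min_df=1)
print(X.shape)    # (2, number_of_terms) sparse TF-IDF matrix
print(terms[:5])  # first few vocabulary terms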
def test_tweet_tokenizer(self):
    """
    Test TweetTokenizer using words with special and accented characters.
    """
    tokenizer = TweetTokenizer(strip_handles=True, reduce_len=True)
    s9 = "@myke: Let's test these words: resumé España München français"
    tokens = tokenizer.tokenize(s9)
    expected = [':', "Let's", 'test', 'these', 'words', ':', 'resumé',
                'España', 'München', 'français']
    self.assertEqual(tokens, expected)
def tweet_tokenize(self, tweet):
    # http://www.nltk.org/api/nltk.tokenize.html
    tknzr = TweetTokenizer()
    tokens = tknzr.tokenize(tweet)
    return tokens
def tokenize(tweet):
    tknzr = TweetTokenizer(strip_handles=True, reduce_len=True, preserve_case=False)
    return tknzr.tokenize(tweet)

# Read cleaned training tweets file into pandas and randomize it
def __init__(self):
    self.tokenizers = {
        'en': TweetTokenizer(),
        'de': WordPunctTokenizer(),
        'it': WordPunctTokenizer(),
        'fr': WordPunctTokenizer(),
        'default': WordPunctTokenizer()
    }
    self.tokenizer = TweetTokenizer()
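Only the constructor is shown; a tokenize method that uses the per-language table with the 'default' fallback would presumably look something like the sketch below (an assumption, not the project's actual code):

def tokenize(self, text, lang='default'):
    # Fall back to the default tokenizer for languages without a dedicated entry.
    tokenizer = self.tokenizers.get(lang, self.tokenizers['default'])
    return tokenizer.tokenize(text)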
def load_tweetkeywords():
    """
    Check which keywords are used in each tweet, and load the association
    table linking tweets and keywords.
    """
    # TweetKeyword.query.delete()
    tweets = Tweet.query.all()
    keyword_query = Keyword.query.all()
    keywords = [word.keyword for word in keyword_query]
    tknzr = TweetTokenizer()
    for tweet in tweets:
        tokenized_tweets = tknzr.tokenize(tweet.text)
        for token in tokenized_tweets:
            if token in keywords:
                tweet_id = Tweet.query.filter(Tweet.tweet_id == tweet.tweet_id).one()
                keyword_id = Keyword.query.filter(Keyword.keyword == token).one()
                tweet_keyword = TweetKeyword(keyword_id=keyword_id.keyword_id, tweet_id=tweet_id.tweet_id)
                print("Added to TweetKeyword table: {}".format(tweet_keyword.keyword_id))
                db.session.add(tweet_keyword)
    db.session.commit()
################################################################################
def load_data_and_labels_sam():
    # load
    with open("./input/2780_freshmen_tweets.csv", "r", newline="") as f:
        rdr = csv.reader(f)
        dataset = list(rdr)[1:]  # remove header
    # filter out tweets with unknown sentiment
    dataset = [entry for entry in dataset if entry[4] != '0']
    # generate x
    tk = TweetTokenizer(reduce_len=True)
    x_text = [entry[3] for entry in dataset]
    x_text = [clean_str(tweet) for tweet in x_text]
    x_text = [tk.tokenize(tweet) for tweet in x_text]
    # generate y
    y = [entry[4] for entry in dataset]
    for idx, label in enumerate(y):
        if label == '1':    # positive
            y[idx] = [1, 0, 0]
        elif label == '2':  # neutral
            y[idx] = [0, 1, 0]
        elif label == '3':  # negative
            y[idx] = [0, 0, 1]
        else:
            print('wrong label in sam: ' + label)
    return [x_text, y]
def load_data_and_labels_gameforum():
    # load
    with open("./input/gameforum-1000.csv", "r", newline="") as f:
        rdr = csv.reader(f)
        dataset = list(rdr)[1:]  # remove header
    dataset = [entry for entry in dataset if (entry[1] == '1' or entry[1] == '2' or entry[1] == '3')]
    # generate x
    tk = TweetTokenizer(reduce_len=True)
    x_text = [entry[0] for entry in dataset]
    x_text = [clean_str(post) for post in x_text]
    x_text = [tk.tokenize(post) for post in x_text]
    # generate y
    y = [entry[1] for entry in dataset]
    for idx, label in enumerate(y):
        if label == '1':    # positive
            y[idx] = [1, 0, 0]
        elif label == '2':  # neutral
            y[idx] = [0, 1, 0]
        elif label == '3':  # negative
            y[idx] = [0, 0, 1]
        else:
            print('wrong label in gameforum: ' + label)
    return [x_text, y]
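Both loaders above call a `clean_str` helper that is not defined in the excerpt. A common cleanup of this kind, included here only as an assumption, looks roughly like this:

import re

# Hypothetical clean_str: strip characters outside a small whitelist and
# collapse runs of whitespace before tokenization.
def clean_str(text):
    text = re.sub(r"[^A-Za-z0-9(),!?'`]", " ", text)
    text = re.sub(r"\s{2,}", " ", text)
    return text.strip().lower()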
def __init__(self, input_text, state_size=2, chain=None):
    self.tokenizer = TweetTokenizer(reduce_len=True)
    self.tag_sep = "@::@"
    # Circumvent some limitations of markovify by allowing one to create a
    # POSifiedText from a markovify.Text instance.
    if isinstance(input_text, markovify.Text):
        m = input_text
        self.input_text = m.input_text
        self.rejoined_text = m.rejoined_text
        self.chain = m.chain
    else:
        super().__init__(input_text, state_size, chain)
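markovify's POSifiedText pattern normally also overrides word_split and word_join so the chain keys on (tag, word) pairs. Given the tag_sep above, those overrides would plausibly look like the sketch below (an assumption, not the original project's code; it presumes `import nltk` at module level):

def word_split(self, sentence):
    # Attach each token's POS tag so the Markov chain keys on (tag, word) pairs.
    return [self.tag_sep.join((tag, word))
            for word, tag in nltk.pos_tag(self.tokenizer.tokenize(sentence))]

def word_join(self, words):
    # Drop the tags again when reassembling generated text.
    return " ".join(word.split(self.tag_sep)[1] for word in words)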
def load_model(config, model):
    """
    Load a complete model and censor from a path to the model.
    :param config:
    :param model:
    :return:
    """
    # Load model
    model = Classifier.load(model)
    censor = CensorModel(config)

    # Tokenizer
    tokenizer = TweetTokenizer()

    # Joint features: bag of words, 2-grams and 3-grams
    bow = features.BagOfGrams()
    bow.add(features.BagOfWords())
    bow.add(features.BagOf2Grams())
    bow.add(features.BagOf3Grams())

    return tokenizer, bow, model, censor
# end load_model

# end Classifier
def tokenize(tweets, sentiment):
    # NLTK has a tokenizer built specifically for short messaging data.
    # Here we use some of its features to:
    #   - turn all words to lowercase,
    #   - reduce the length of repeated characters ('hiiiiiiiii' and 'hiiiii' both become 'hiii' with three repeats of the 'i'),
    #   - and get rid of any handles that might exist in the message.
    tokenizer = TweetTokenizer(preserve_case=False, reduce_len=True, strip_handles=True)

    tokenizedTweets = []
    cleanedSentiment = []
    asciiIssues = 0
    for rowIdx, tweet in enumerate(tweets):
        try:
            tokenizedWords = tokenizer.tokenize(tweet)
            tokenizedTweets.append(tokenizedWords)
            cleanedSentiment.append(sentiment[rowIdx])
        except:
            # There are some weird ascii encoding issues present in a small part of our dataset.
            # They represent < 1% of our dataset, so for the MVP we ignore them to focus on the
            # 99% use case. These issues do not exist in the test data set, so it is safe to
            # ignore these rows.
            asciiIssues += 1
    return tokenizedTweets, cleanedSentiment

# Some algorithms do not train well on ordered data. This function shuffles our data so we don't
# have one big block of positive documents followed by another large block of negative documents.
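The shuffle function that the comment above introduces is not included in the excerpt. A minimal stand-in that keeps tweets and labels aligned might look like this (an assumption, not the original code):

import random

def shuffle_together(tokenized_tweets, sentiment, seed=42):
    # Shuffle both lists with the same permutation so labels stay aligned
    # (assumes non-empty, equal-length inputs).
    paired = list(zip(tokenized_tweets, sentiment))
    random.Random(seed).shuffle(paired)
    tweets_shuffled, sentiment_shuffled = zip(*paired)
    return list(tweets_shuffled), list(sentiment_shuffled)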
from nltk.tokenize import TweetTokenizer, sent_tokenize

def tokenize(text, tokenizer=TweetTokenizer()):
    return [tokenizer.tokenize(sentence) for sentence in sent_tokenize(text)]
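A quick call sketch for the sentence-then-word tokenizer above (sent_tokenize requires NLTK's punkt model to be downloaded):

print(tokenize("Good news! The #python release is out. So cooool :-)"))
# roughly: [['Good', 'news', '!'], ['The', '#python', 'release', 'is', 'out', '.'],
#           ['So', 'cooool', ':-)']]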