async def first(self, query: str, values: Union[List, Dict],
                db_name: str = 'default') -> Optional["DictRow"]:
    """Run *query* and return the first matching row, or None.

    Fix: the method awaits a coroutine, so it must be declared
    ``async def`` — as plain ``def`` the ``await`` is a SyntaxError.

    Args:
        query: SQL statement with placeholders.
        values: positional list or named dict of parameters.
        db_name: key into ``self.dbs`` selecting the database.
    """
    return await self._first(query=query, values=values, db_name=db_name)
# Example source snippets using typing.List (aggregated from multiple files)
async def insert(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default', returning: bool = False):
    """Execute an insert statement on *db_name*'s master.

    Fix: declared ``async def`` — the body awaits a coroutine, which is a
    SyntaxError inside a plain ``def``.

    Args:
        query: SQL statement with placeholders.
        values: positional list or named dict of parameters.
        db_name: key into ``self.dbs`` selecting the database.
        returning: when True, return the first row produced by a
            ``RETURNING`` clause instead of the affected-row count.
    """
    return await self._execute(query=query, values=values, db_name=db_name,
                               returning=returning)
async def delete(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    """Execute a delete statement on *db_name*'s master; return the row count.

    Fix: declared ``async def`` — the body awaits a coroutine, which is a
    SyntaxError inside a plain ``def``.
    """
    return await self._execute(query=query, values=values, db_name=db_name)
async def _execute(self, query: str, values: Union[List, Dict], db_name: str = 'default',
                   returning: bool = False):
    """Run a write statement on the master pool of *db_name*.

    Fix: declared ``async def`` — ``async with`` / ``await`` are
    SyntaxErrors inside a plain ``def``.

    Args:
        query: SQL statement with placeholders.
        values: positional list or named dict of parameters.
        db_name: key into ``self.dbs`` selecting the database.
        returning: when True, fetch and return the first row (for
            ``RETURNING`` clauses); otherwise return ``cursor.rowcount``.

    Raises:
        RuntimeError: if the master pool for *db_name* is not initialized.
    """
    pool = self.dbs[db_name]['master']
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            if returning:
                return await cursor.fetchone()
            else:
                return cursor.rowcount
async def _select(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    """Run a read query on *db_name*, preferring the slave pool over master.

    Fix: declared ``async def`` — ``async with`` / ``await`` are
    SyntaxErrors inside a plain ``def``.

    Returns:
        All rows from ``cursor.fetchall()``.

    Raises:
        RuntimeError: if neither a slave nor a master pool is initialized.
    """
    dbs = self.dbs[db_name]
    # Reads fall back to master when no slave pool is configured.
    pool = dbs.get('slave') or dbs.get('master')
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            return await cursor.fetchall()
async def _first(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    """Run a read query on *db_name* and return only the first row.

    Fix: declared ``async def`` — ``async with`` / ``await`` are
    SyntaxErrors inside a plain ``def``.

    Raises:
        RuntimeError: if neither a slave nor a master pool is initialized.
    """
    dbs = self.dbs[db_name]
    # Reads fall back to master when no slave pool is configured.
    pool = dbs.get('slave') or dbs.get('master')
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            return await cursor.fetchone()
def posts_by_user(user: User, limit: Optional[int] = None) -> List[Post]:
    """Return at most *limit* posts authored by *user* (all when limit is None)."""
    selected = islice(user_posts[user], limit)
    return [post for post in selected]
def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
    """Merge the post streams of everyone *user* follows, capped at *limit*."""
    streams = [user_posts[friend] for friend in following[user]]
    combined = merge(*streams, reverse=True)
    return list(islice(combined, limit))
def get_followers(user: User) -> List[User]:
    """Return the users following *user*, in sorted order."""
    result = list(followers[user])
    result.sort()
    return result
def get_followed(user: User) -> List[User]:
    """Return the users that *user* follows, in sorted order."""
    result = list(following[user])
    result.sort()
    return result
def search(phrase: str, limit: Optional[int] = None) -> List[Post]:
    """Return up to *limit* posts whose text contains *phrase*."""
    # XXX linear scan over all posts — caching / pre-indexing would help
    matches = (post for post in posts if phrase in post.text)
    return list(islice(matches, limit))
def compute_centroids(groups: Iterable[Sequence[Point]]) -> List[Centroid]:
    'Compute the centroid of each group'
    centroids = []
    for group in groups:
        # Transpose the points so each row is one coordinate axis,
        # then take the mean along every axis.
        centroid = tuple(mean(axis) for axis in transpose(group))
        centroids.append(centroid)
    return centroids
def k_means(data: Iterable[Point], k: int = 2, iterations: int = 10) -> List[Point]:
    'Return k-centroids for the data'
    points = list(data)
    # Seed with k random points, then alternate assignment/update steps.
    centroids = sample(points, k)
    for _ in range(iterations):
        grouped = assign_data(centroids, points)
        centroids = compute_centroids(grouped.values())
    return centroids
def __init__(self,
             sentence: str,
             chunks: List[Chunk],
             surfaces: List[str]):
    """Hold one parsed sentence together with its chunks and surface tokens.

    Args:
        sentence: the raw sentence text.
        chunks: dependency chunks of the sentence.
        surfaces: surface form of each token.
    """
    self.sentence = sentence
    self.chunks = chunks
    self.surfaces = surfaces
    # NOTE(review): this replaces the bound method `depth` with its computed
    # result — after this line `self.depth` is a value, not a callable.
    # Presumably deliberate one-shot caching; confirm the class defines a
    # `depth()` method and that no caller invokes `obj.depth()` afterwards.
    self.depth = self.depth()
def __init__(self, text: str, delimiter: str, rnnlm_model_path: str):
    """Score *text* for fluency using an RNN language model.

    Splits *text* into sentences on *delimiter*, tokenizes them, queries
    the RNNLM for per-sentence log-probabilities, and stores the average
    of several acceptability measures on the instance.

    Args:
        text: the document to score.
        delimiter: sentence separator passed to ``split_text``.
        rnnlm_model_path: path to a trained RNNLM model file.

    Raises:
        FileNotFoundError: if *rnnlm_model_path* does not exist.
    """
    self.text = text
    self.sentences = split_text(text, delimiter)  # type: List[str]
    # lengths: per-sentence token counts; tss: tokenized sentences.
    lengths, self.tss = tokenize(self.sentences)
    if not os.path.isfile(rnnlm_model_path):
        raise FileNotFoundError(errno.ENOENT,
                                os.strerror(errno.ENOENT),
                                rnnlm_model_path)
    self.rnnlm_model_path = rnnlm_model_path
    # Unigram counts used as a normalization baseline for the RNNLM scores.
    self.word_freq, self.n_total_words = self._load_word_freq(threshold=1)
    # Raw per-sentence scores (None entries mark out-of-vocabulary sentences).
    log_prob_scores = \
        self._calc_log_prob_scores()
    unigram_scores = \
        self._calc_unigram_scores()
    # Length- and unigram-normalized variants of the log-probability scores.
    mean_lp_scores = \
        calc_mean_lp_scores(log_prob_scores, lengths)
    norm_lp_div_scores = \
        calc_norm_lp_div_scores(log_prob_scores, unigram_scores)
    norm_lp_sub_scores = \
        calc_norm_lp_sub_scores(log_prob_scores, unigram_scores)
    slor_scores = \
        calc_slor_scores(norm_lp_sub_scores, lengths)
    # Document-level results: the mean of each per-sentence score list.
    self.log_prob = average(log_prob_scores)
    self.mean_lp = average(mean_lp_scores)
    self.norm_lp_div = average(norm_lp_div_scores)
    self.norm_lp_sub = average(norm_lp_sub_scores)
    self.slor = average(slor_scores)
def _calc_log_prob_scores(self) -> List[Union[None, float]]:
    """Get log likelihood scores by calling RNNLM.

    Writes the tokenized sentences (one per line) to a temporary file,
    runs the external ``rnnlm`` binary over it, and parses one score per
    output line.  Out-of-vocabulary sentences yield ``None``.

    Fix: the temporary file is now managed by a ``with`` block, so it is
    deleted even if ``Popen`` or the write fails (the original only
    closed it on the success path).
    """
    with tempfile.NamedTemporaryFile(delete=True) as textfile:
        content = '\n'.join([''.join(ts) for ts in self.tss]) + '\n'
        textfile.write(str.encode(content))
        textfile.seek(0)
        command = ['rnnlm',
                   '-rnnlm',
                   self.rnnlm_model_path,
                   '-test',
                   textfile.name]
        process = Popen(command, stdout=PIPE, stderr=PIPE)
        output, err = process.communicate()
    lines = [line.strip() for line in output.decode('UTF-8').split('\n')
             if line.strip() != '']
    scores = []  # type: List[Union[None, float]]
    for line in lines:
        if line == const.OUT_OF_VOCABULARY:
            scores.append(None)
        else:
            try:
                scores.append(float(line))
            except ValueError:
                # NOTE(review): non-numeric lines are silently dropped,
                # which can misalign scores with sentences — kept as-is
                # to preserve behavior; confirm rnnlm's output format.
                pass
    return scores
def _calc_unigram_scores(self) -> List[float]:
unigram_scores = []
for ts in self.tss:
unigram_score = 0.0
for t in ts:
n = float(self.n_total_words)
x = float(self.word_freq.get(t, self.word_freq['<unk/>']))
unigram_score += math.log(x / n)
unigram_scores.append(unigram_score)
return unigram_scores
def average(xs: List[Union[None, float]]) -> float:
    """Calculate the arithmetic mean of the given values (possibly None)
    >>> '{:.2f}'.format(average([None, 1.0, 2.0]))
    '1.50'
    """
    present = [value for value in xs if value is not None]
    return numpy.mean(present)
def tokenize(sentences: List[str]) -> Tuple[List[int], List[str]]:
    """Tokenize each sentence and report its length.

    Returns:
        A pair ``(lengths, texts)`` where ``lengths[i]`` is the token
        count of ``sentences[i]`` and ``texts[i]`` is that sentence
        re-joined as a single space-separated string.

    NOTE(review): the annotation previously promised ``List[List[str]]``
    for the second element, but the implementation returns space-joined
    strings — the annotation is corrected to match the actual behavior
    (which downstream RNNLM input formatting appears to rely on).
    """
    tokenizer = Tokenizer()
    lengths = []
    texts = []
    for sentence in sentences:
        surfaces = [token.surface for token in tokenizer.tokenize(sentence)]
        lengths.append(len(surfaces))
        texts.append(' '.join(surfaces))
    return lengths, texts
def split_text(text: str, delimiter: str = '\n') -> List[str]:
    """Split *text* on *delimiter*, trimming whitespace and dropping empties."""
    stripped = (chunk.strip() for chunk in text.split(delimiter))
    return [piece for piece in stripped if piece]