def push(self, x):
    """Push a new element, keeping at most self._n items.

    Once the heap holds self._n elements, heappushpop() pushes x and
    evicts the smallest element in a single step, so the heap always
    retains the n largest values seen so far.
    """
    assert self._data is not None
    if len(self._data) < self._n:
        heapq.heappush(self._data, x)
    else:
        heapq.heappushpop(self._data, x)
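For context, here is a minimal self-contained sketch of the bounded-heap pattern this method implements (the TopN class name and the capacity of 3 are illustrative, not from the original source):

import heapq

class TopN:
    """Keep the n largest items seen so far in a min-heap (a sketch)."""

    def __init__(self, n):
        self._n = n
        self._data = []

    def push(self, x):
        if len(self._data) < self._n:
            heapq.heappush(self._data, x)
        else:
            # Push x and pop the smallest element in one O(log n) step.
            heapq.heappushpop(self._data, x)

top3 = TopN(3)
for value in [5, 1, 9, 3, 7]:
    top3.push(value)
print(sorted(top3._data))  # [5, 7, 9] -- the three largest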
def add_to_ranked_target_data(RankedDataHeap, maxSize, Data, weights,
                              keep='largest'):
    ''' Add promising documents from Data to RankedDataHeap, in place.

    The heap keeps at most maxSize entries, ranked by weight. With
    keep='largest' the highest-weight docs survive; otherwise the
    lowest-weight docs do (weights are negated so the min-heap
    semantics still apply).
    '''
docIDs = np.arange(Data.nDoc)
    # First, decide which docs are promising,
    # since we don't want to blow up memory costs by using *all* docs.
if len(RankedDataHeap) > 0:
cutoffThr = RankedDataHeap[0][0]
if keep == 'largest':
docIDs = np.argsort(-1 * weights)[:maxSize]
docIDs = docIDs[weights[docIDs] > cutoffThr]
else:
docIDs = np.argsort(weights)[:maxSize]
docIDs = docIDs[weights[docIDs] < cutoffThr]
if len(docIDs) < 1:
return
# For promising docs, convert to list-of-tuples format,
# and add to the heap
if keep == 'largest':
tList = Data.to_list_of_tuples(docIDs, w=weights)
else:
tList = Data.to_list_of_tuples(docIDs, w=-1 * weights)
    for unitTuple in tList:
try:
if len(RankedDataHeap) >= maxSize:
heapq.heappushpop(RankedDataHeap, unitTuple)
else:
heapq.heappush(RankedDataHeap, unitTuple)
        except ValueError:
            # Ignore heap comparison errors triggered by duplicate weights.
            pass
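The pruning step above relies on the min-heap invariant: RankedDataHeap[0] is always the smallest entry, so any candidate whose weight falls below that cutoff can be discarded before it ever touches the heap. A minimal self-contained sketch of the same pattern, assuming plain (weight, docID) tuples in place of the list-of-tuples format:

import heapq
import numpy as np

def add_ranked(heap, maxSize, weights):
    # Prune candidates against the heap's current minimum weight.
    docIDs = np.arange(len(weights))
    if heap:
        docIDs = docIDs[weights[docIDs] > heap[0][0]]
    for docID in docIDs:
        item = (float(weights[docID]), int(docID))
        if len(heap) >= maxSize:
            heapq.heappushpop(heap, item)
        else:
            heapq.heappush(heap, item)

heap = []
add_ranked(heap, 3, np.array([0.2, 0.9, 0.5, 0.1, 0.7]))
print(sorted(heap, reverse=True))  # [(0.9, 1), (0.7, 4), (0.5, 2)]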
def explain(self, userid, user_items, itemid, user_weights=None, N=10):
""" Provides explanations for why the item is liked by the user.
Parameters
----------
userid : int
The userid to explain recommendations for
user_items : csr_matrix
Sparse matrix containing the liked items for the user
itemid : int
The itemid to explain recommendations for
user_weights : ndarray, optional
Precomputed Cholesky decomposition of the weighted user liked items.
Useful for speeding up repeated calls to this function, this value
is returned
N : int, optional
The number of liked items to show the contribution for
Returns
-------
total_score : float
The total predicted score for this user/item pair
top_contributions : list
A list of the top N (itemid, score) contributions for this user/item pair
user_weights : ndarray
A factorized representation of the user. Passing this in to
future 'explain' calls will lead to noticeable speedups
"""
# user_weights = Cholesky decomposition of Wu^-1
# from section 5 of the paper CF for Implicit Feedback Datasets
user_items = user_items.tocsr()
if user_weights is None:
A, _ = user_linear_equation(self.item_factors, self.YtY,
user_items, userid,
self.regularization, self.factors)
user_weights = scipy.linalg.cho_factor(A)
seed_item = self.item_factors[itemid]
# weighted_item = y_i^t W_u
weighted_item = scipy.linalg.cho_solve(user_weights, seed_item)
total_score = 0.0
h = []
    # Accumulate each liked item's contribution to the predicted score;
    # the loop variable is renamed so it does not shadow the itemid parameter.
    for i, (liked_itemid, confidence) in enumerate(nonzeros(user_items, userid)):
        factor = self.item_factors[liked_itemid]
        # s_u^ij = (y_i^t W^u) y_j
        score = weighted_item.dot(factor) * confidence
        total_score += score
        contribution = (score, liked_itemid)
        if i < N:
            heapq.heappush(h, contribution)
        else:
            heapq.heappushpop(h, contribution)
items = (heapq.heappop(h) for i in range(len(h)))
top_contributions = list((i, s) for s, i in items)[::-1]
return total_score, top_contributions, user_weights
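A usage sketch, assuming this method lives on a trained implicit-feedback ALS model (the model object, the user_items matrix, and the ids 0, 42, and 7 below are all illustrative):

# total_score is the predicted preference; contributions lists the
# top-N (itemid, score) pairs behind it.
total_score, contributions, W = model.explain(0, user_items, itemid=42)
for liked_itemid, score in contributions:
    print(liked_itemid, score)

# Reusing the factorized user representation W makes repeated calls
# for the same user noticeably cheaper.
total_score2, contributions2, _ = model.explain(0, user_items, itemid=7,
                                                user_weights=W)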
def stats_reducer(self, key, values):
outputType = CST(key[0])
item = key[1]
crawl = MonthlyCrawl.to_name(key[2])
if outputType in (CST.size, CST.new_items,
CST.size_estimate, CST.size_robotstxt):
verbose_key = (outputType.name, CST(item).name, crawl)
if outputType in (CST.size, CST.size_robotstxt):
val = sum(values)
elif outputType == CST.new_items:
val = MultiCount.sum_values(values)
        elif outputType == CST.size_estimate:
            # already "reduced" in the count job; take the single value
            val = next(iter(values))
yield verbose_key, val
elif outputType == CST.histogram:
yield((outputType.name, CST(item).name, crawl,
CST(key[3]).name, key[4]), sum(values))
elif outputType in (CST.mimetype, CST.mimetype_detected, CST.scheme,
CST.surt_domain, CST.tld, CST.domain, CST.host,
CST.http_status, CST.robotstxt_status):
for counts in values:
page_count = MultiCount.get_count(0, counts)
url_count = MultiCount.get_count(1, counts)
if outputType in (CST.domain, CST.surt_domain, CST.tld):
host_count = MultiCount.get_count(2, counts)
if (self.options.min_domain_frequency <= 1 or
outputType not in (CST.host, CST.domain,
CST.surt_domain)):
self.counters[(CST.size.name, outputType.name, crawl)] += 1
self.counters[(CST.histogram.name, outputType.name,
crawl, CST.page.name, page_count)] += 1
self.counters[(CST.histogram.name, outputType.name,
crawl, CST.url.name, url_count)] += 1
if outputType in (CST.domain, CST.surt_domain, CST.tld):
self.counters[(CST.histogram.name, outputType.name,
crawl, CST.host.name, host_count)] += 1
if outputType == CST.tld:
domain_count = MultiCount.get_count(3, counts)
self.counters[(CST.histogram.name, outputType.name,
crawl, CST.domain.name, domain_count)] += 1
if outputType in (CST.domain, CST.host, CST.surt_domain):
outKey = (outputType.name, crawl)
outVal = (page_count, url_count, item)
if outputType in (CST.domain, CST.surt_domain):
outVal = (page_count, url_count, host_count, item)
                    # keep only the most frequent items in a bounded min-heap
if len(self.mostfrequent[outKey]) < self.options.max_hosts:
heapq.heappush(self.mostfrequent[outKey], outVal)
else:
heapq.heappushpop(self.mostfrequent[outKey], outVal)
else:
yield((outputType.name, item, crawl), counts)
    else:
        logging.error('Unhandled type {}'.format(outputType))
        raise ValueError('Unhandled output type: {}'.format(outputType))
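The mostfrequent bookkeeping above is the same bounded-heap idiom once more: one min-heap of at most max_hosts tuples per output key, ordered by page count, so heappushpop() evicts the least frequent entry. A minimal self-contained sketch, with illustrative names and data:

import heapq
from collections import defaultdict

max_hosts = 3  # stands in for self.options.max_hosts
mostfrequent = defaultdict(list)

def track(outKey, page_count, url_count, item):
    outVal = (page_count, url_count, item)
    if len(mostfrequent[outKey]) < max_hosts:
        heapq.heappush(mostfrequent[outKey], outVal)
    else:
        heapq.heappushpop(mostfrequent[outKey], outVal)

for host, pages, urls in [('a.com', 10, 12), ('b.com', 3, 3),
                          ('c.com', 7, 9), ('d.com', 1, 1)]:
    track(('host', 'CC-MAIN-2023-01'), pages, urls, host)

# The heap now holds the three hosts with the most pages.
print(sorted(mostfrequent[('host', 'CC-MAIN-2023-01')], reverse=True))
# [(10, 12, 'a.com'), (7, 9, 'c.com'), (3, 3, 'b.com')]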