def __setitem__(self, key, value):
"""If key is in the dictionary, sets its value to value;
otherwise adds the key to the dictionary with the given value
>>> d = SortedDict(dict(s=1, a=2, n=3, i=4, t=5, y=6))
>>> d["t"] = -17
>>> d["z"] = 43
>>> d["@"] = -11
>>> d["m"] = 22
>>> d["r"] = 5
>>> d.keys()
['@', 'a', 'i', 'm', 'n', 'r', 's', 't', 'y', 'z']
"""
if key not in self.__dict:
bisect.insort_left(self.__keys, key)
self.__dict[key] = value
python类insort_left()的实例源码
def __setitem__(self, key, value):
"""If key is in the dictionary, sets its value to value;
otherwise adds the key to the dictionary with the given value
>>> d = OrderedDict(dict(s=1, a=2, n=3, i=4, t=5, y=6))
>>> d["t"] = -17
>>> d["z"] = 43
>>> d["@"] = -11
>>> d["m"] = 22
>>> d["r"] = 5
>>> d.keys()
['@', 'a', 'i', 'm', 'n', 'r', 's', 't', 'y', 'z']
"""
if key not in self.__dict:
bisect.insort_left(self.__keys, key)
self.__dict[key] = value
def maxSlidingWindow(self, nums, k):
"""
:type nums: List[int]
:type k: int
:rtype: List[int]
"""
import bisect
if len(nums) == 0: return [] # Special cases
if k == 1: return nums
ans = []
window = []
for i in xrange(len(nums)):
bisect.insort_left(window, nums[i])
if i == k-1:
ans.append(window[-1]) # First element
if i >= k:
window.remove(nums[i-k]) # Remove the oldest element
ans.append(window[-1])
assert len(ans) == len(nums)-k+1
return ans
def groupAnagrams(self, strs):
"""
:type strs: List[str]
:rtype: List[List[str]]
"""
import bisect
def getStrSig(s):
"""
get the signature of str s
"""
primes = [2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97,101]
ord_a = ord('a')
hashval = 1
for c in s: hashval *= primes[ ord(c) - ord_a ]
return hashval
worddict = {}
for s in strs:
sig = getStrSig(s) # Here use: sig = ''.join(sorted(s)) still beats 93.9%
if sig not in worddict:
worddict[sig] = [s]
else:
bisect.insort_left(worddict[sig], s) # insert the anagram with correct order
return worddict.values()
def add_bom_level(doc, qty, bom):
# Get the BOM doc
#frappe.msgprint("Getting BOM for" + bom)
bom_doc = frappe.get_doc("BOM",bom)
#Add the operations from this BOM to the list
new_operation = get_bom_operation(bom, qty)
new_operation.idx = len(doc.operations)+1
doc.operations.append(new_operation)
#Go through each item in the BOM to decide if it is a Purchased or Manufactured item
for myItem in bom_doc.items:
item_quantity = qty * myItem.qty
item_doc = frappe.get_doc("Item",myItem.item_code)
if myItem.bom_no and item_doc.default_material_request_type == "Manufacture":
# This is a manufactured item and we should add it to the BOM Operations then scan it's children
add_bom_level(doc,item_quantity, myItem.bom_no)
else:
# Consider it a purchased item, and just set the basics (Item Code, Qty, Supplier, Currency, Price List)
#frappe.msgprint("Getting BOM for" + bom)
new_purchased = get_purchase_item(myItem.item_code, item_quantity)
new_purchased.idx = len(doc.items)+1
bisect.insort_left(doc.items,new_purchased)
def update(self, x, sa, is_mask):
# ??????
self.last_sa = sa
if len(self.q_series) < self.period + 1:
self.q_series.append(x)
else:
self.q_series = self.q_series[1:] + self.q_series[:1]
self.q_series[-1] = x
self._std = np.std(self.q_series[1:], axis = 0)
if not is_mask:
bisect.insort_left(self.anormal_heap, self.last_sa)
def _add_item(self, item):
item._active = True
self._items[item._id] = item
if item._expiration_datetime:
bisect.insort_left(self._expires_list, (item._expiration_datetime, item._id))
self._schedule_expiration_check()
self._storage_change()
def setdefault(self, key, value):
"""If key is in the dictionary, returns its value;
otherwise adds the key with the given value which is also
returned
>>> d = SortedDict(dict(s=1, a=2, n=3, i=4, t=5, y=6))
>>> d.setdefault("n", 99)
3
>>> d.values()
[2, 4, 3, 1, 5, 6]
>>> d.setdefault("r", -20)
-20
>>> d.items()[2:]
[('n', 3), ('r', -20), ('s', 1), ('t', 5), ('y', 6)]
>>> d.setdefault("@", -11)
-11
>>> d.setdefault("z", 99)
99
>>> d.setdefault("m", 50)
50
>>> d.keys()
['@', 'a', 'i', 'm', 'n', 'r', 's', 't', 'y', 'z']
"""
if key not in self.__dict:
bisect.insort_left(self.__keys, key)
return self.__dict.setdefault(key, value)
def setdefault(self, key, value):
"""If key is in the dictionary, returns its value;
otherwise adds the key with the given value which is also
returned
>>> d = OrderedDict(dict(s=1, a=2, n=3, i=4, t=5, y=6))
>>> d.setdefault("n", 99)
3
>>> d.values()
[2, 4, 3, 1, 5, 6]
>>> d.setdefault("r", -20)
-20
>>> d.items()[2:]
[('n', 3), ('r', -20), ('s', 1), ('t', 5), ('y', 6)]
>>> d.setdefault("@", -11)
-11
>>> d.setdefault("z", 99)
99
>>> d.setdefault("m", 50)
50
>>> d.keys()
['@', 'a', 'i', 'm', 'n', 'r', 's', 't', 'y', 'z']
"""
if key not in self.__dict:
bisect.insort_left(self.__keys, key)
return self.__dict.setdefault(key, value)
def add(self, movie):
"""Adds the given movie to the list if it isn't already
present. Returns True if added; otherwise returns False."""
if id(movie) in self.__movieFromId:
return False
key = self.key(movie.title, movie.year)
bisect.insort_left(self.__movies, [key, movie])
self.__movieFromId[id(movie)] = movie
self.__dirty = True
return True
def place(self):
print 'Game Index:',self.root.gameindex
# --------DO NOT DELETE!!!-------
if self.iteration < 10:
a = np.random.rand(10, 10)
b = np.random.rand(10)
c = np.random.rand(10, 3)
d = np.random.rand(3)
self.net = neural.Net(a, b, c, d, 1)
# else:
# if self.file:
# self.file.close()
# bisect.insort_left(self.model_list, Model('files/data%s.txt' % self.iteration))
# # self.model_list.append(Model('files/data%s.txt' % self.iteration))
# if self.iteration%20 == 0:
# for i in self.model_list:
# print i.size
# for i in range(10):
# os.remove(self.model_list[0].file)
# del self.model_list[0]
self.iteration += 1
Player.place(self)
def _insort_call(self, call):
# We want to insert the call in the appropriate time slot. A simple
# bisect.insort_left() is not sufficient as the comparison of two
# methods is not defined in Python 3.
times = [c[0] for c in self._calls]
index = bisect.bisect_left(times, call[0])
self._calls.insert(index, call)
def _update_tag_positions(self, rule):
"""
Update _tag_positions to reflect the changes to tags that are
made by *rule*.
"""
# Update the tag index.
for pos in self._positions_by_rule[rule]:
# Delete the old tag.
old_tag_positions = self._tag_positions[rule.original_tag]
old_index = bisect.bisect_left(old_tag_positions, pos)
del old_tag_positions[old_index]
# Insert the new tag.
new_tag_positions = self._tag_positions[rule.replacement_tag]
bisect.insort_left(new_tag_positions, pos)
brill_trainer.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def _update_tag_positions(self, rule):
"""
Update _tag_positions to reflect the changes to tags that are
made by *rule*.
"""
# Update the tag index.
for pos in self._positions_by_rule[rule]:
# Delete the old tag.
old_tag_positions = self._tag_positions[rule.original_tag]
old_index = bisect.bisect_left(old_tag_positions, pos)
del old_tag_positions[old_index]
# Insert the new tag.
new_tag_positions = self._tag_positions[rule.replacement_tag]
bisect.insort_left(new_tag_positions, pos)
def split(self, nsegments=2, length=None, k=1, s=0):
"""Split the branch into segments"""
if length is None:
sublength = self.length / nsegments # the target length of the segments
else:
sublength = float(length)
# get the data that requires interpolation
x, y, z, r = self['x'], self['y'], self['z'], self['radius']
# the cumulative length
t = numpy.r_[0, numpy.cumsum(((x[:-1] - x[1:]) ** 2 + (y[:-1] - y[1:]) ** 2) ** .5)]
# create interpolation coefficients
import scipy.interpolate
splinecoeffs, u = scipy.interpolate.splprep([x, y, z, r], u=t, k=k, s=s)
# create new parametrization array. It is supposed to consist of the old lengths
# plus points that cut the length into the proper segment size
tnew = t.tolist()
# the indices where to split the resulting tnew
indices = [0]
import bisect
for n in range(1, int(self.length / sublength) + 1):
index = bisect.bisect_left(tnew, n * sublength)
indices.append(index)
bisect.insort_left(tnew, n * sublength)
# append the end index for the last segment if last segment size is larger than eps
if tnew[-1] - tnew[-2] > 0.01:
indices.append(len(tnew))
# interpolate the parametrization
xn, yn, zn, rn = scipy.interpolate.splev(tnew, splinecoeffs)
branchgen = lambda i0, i1: Branch(x=xn[i0:i1], y=yn[i0:i1], z=zn[i0:i1], r=rn[i0:i1])
return [branchgen(i0, i1 + 1) for i0, i1 in zip(indices[:-1], indices[1:])]
def _update_tag_positions(self, rule):
"""
Update _tag_positions to reflect the changes to tags that are
made by *rule*.
"""
# Update the tag index.
for pos in self._positions_by_rule[rule]:
# Delete the old tag.
old_tag_positions = self._tag_positions[rule.original_tag]
old_index = bisect.bisect_left(old_tag_positions, pos)
del old_tag_positions[old_index]
# Insert the new tag.
new_tag_positions = self._tag_positions[rule.replacement_tag]
bisect.insort_left(new_tag_positions, pos)
def push(self, id, freq):
if id == None:
return
self.list.append((id, freq))
bisect.insort_left(self.sortedList, MyElement(id, freq, len(self.list)))
def batch_list_insert(elem, sorted_elems):
""" Step 1/3 of batch insertion.
The first precomputing phase of a batch insert. Determines the list
of elements to ultimately insert into the MHT.
Arguments:
elem - the next element to insert in this phase
sorted_elems - the elements in the range to be inserted into
so far
"""
bisect.insort_left(sorted_elems, elem)
def _update_tag_positions(self, rule):
"""
Update _tag_positions to reflect the changes to tags that are
made by *rule*.
"""
# Update the tag index.
for pos in self._positions_by_rule[rule]:
# Delete the old tag.
old_tag_positions = self._tag_positions[rule.original_tag]
old_index = bisect.bisect_left(old_tag_positions, pos)
del old_tag_positions[old_index]
# Insert the new tag.
new_tag_positions = self._tag_positions[rule.replacement_tag]
bisect.insort_left(new_tag_positions, pos)
def _InsertTask(self, task):
"""Insert a task into the store, keeps lists sorted.
Args:
task: the new task.
"""
assert self._lock.locked()
eta = task.eta_usec()
name = task.task_name()
bisect.insort_left(self._sorted_by_eta, (eta, name, task))
if task.has_tag():
bisect.insort_left(self._sorted_by_tag, (task.tag(), eta, name, task))
bisect.insort_left(self._sorted_by_name, (name, task))
self.task_name_archive.add(name)
def _PostponeTaskInsertOnly(self, task, new_eta_usec):
assert self._lock.locked()
task.set_eta_usec(new_eta_usec)
name = task.task_name()
bisect.insort_left(self._sorted_by_eta, (new_eta_usec, name, task))
if task.has_tag():
tag = task.tag()
bisect.insort_left(self._sorted_by_tag, (tag, new_eta_usec, name, task))
def _InsertTask(self, task):
"""Insert a task into the dummy store, keeps lists sorted.
Args:
task: the new task.
"""
eta = task.eta_usec()
name = task.task_name()
bisect.insort_left(self._sorted_by_eta, (eta, name, task))
bisect.insort_left(self._sorted_by_name, (name, task))
def _update_tag_positions(self, rule):
"""
Update _tag_positions to reflect the changes to tags that are
made by *rule*.
"""
# Update the tag index.
for pos in self._positions_by_rule[rule]:
# Delete the old tag.
old_tag_positions = self._tag_positions[rule.original_tag]
old_index = bisect.bisect_left(old_tag_positions, pos)
del old_tag_positions[old_index]
# Insert the new tag.
new_tag_positions = self._tag_positions[rule.replacement_tag]
bisect.insort_left(new_tag_positions, pos)
def _update_tag_positions(self, rule):
"""
Update _tag_positions to reflect the changes to tags that are
made by *rule*.
"""
# Update the tag index.
for pos in self._positions_by_rule[rule]:
# Delete the old tag.
old_tag_positions = self._tag_positions[rule.original_tag]
old_index = bisect.bisect_left(old_tag_positions, pos)
del old_tag_positions[old_index]
# Insert the new tag.
new_tag_positions = self._tag_positions[rule.replacement_tag]
bisect.insort_left(new_tag_positions, pos)
def add(self, value):
index = self.indexer(value)
bisect.insort_left(self.indexes, index)
if index not in self.lookup:
self.lookup[index] = [value]
else:
self.lookup[index].append(value)
def _update_tag_positions(self, rule):
"""
Update _tag_positions to reflect the changes to tags that are
made by *rule*.
"""
# Update the tag index.
for pos in self._positions_by_rule[rule]:
# Delete the old tag.
old_tag_positions = self._tag_positions[rule.original_tag]
old_index = bisect.bisect_left(old_tag_positions, pos)
del old_tag_positions[old_index]
# Insert the new tag.
new_tag_positions = self._tag_positions[rule.replacement_tag]
bisect.insort_left(new_tag_positions, pos)
def insert(self):
"""
Insert the current B3 instance in the database.
Will also store the B3 instance reference in the QApplication.
"""
# store it in the database first so we get the id
cursor = B3App.Instance().storage.cursor()
cursor.execute("INSERT INTO b3 (config) VALUES (?)", (self.config_path,))
self.id = cursor.lastrowid
LOG.debug('stored new process in the database: @%s:%s', self.id, self.config_path)
cursor.close()
# store in the QApplication
if self not in B3App.Instance().processes:
bisect.insort_left(B3App.Instance().processes, self)
def _update_tag_positions(self, rule):
"""
Update _tag_positions to reflect the changes to tags that are
made by *rule*.
"""
# Update the tag index.
for pos in self._positions_by_rule[rule]:
# Delete the old tag.
old_tag_positions = self._tag_positions[rule.original_tag]
old_index = bisect.bisect_left(old_tag_positions, pos)
del old_tag_positions[old_index]
# Insert the new tag.
new_tag_positions = self._tag_positions[rule.replacement_tag]
bisect.insort_left(new_tag_positions, pos)
def chooseserver(self):
connection = self.connect('www.speedtest.net')
now = int(time() * 1000)
# really contribute to speedtest.net OS statistics
# maybe they won't block us again...
extra_headers = {
'Connection': 'Keep-Alive',
'User-Agent': self.USER_AGENTS['Linux']
}
connection.request(
'GET', '/speedtest-config.php?x=%d' % now, None, extra_headers)
response = connection.getresponse()
reply = response.read().decode('utf-8')
match = re.search(
r'<client ip="([^"]*)" lat="([^"]*)" lon="([^"]*)"', reply)
location = None
if match is None:
util.info('[SC] Failed to retrieve coordinates')
return None
location = match.groups()
util.info('[SC] Your IP: %s' % location[0])
util.info('[SC] Your latitude: %s' % location[1])
util.info('[SC] Your longitude: %s' % location[2])
connection.request(
'GET', '/speedtest-servers.php?x=%d' % now, None, extra_headers)
response = connection.getresponse()
reply = response.read().decode('utf-8')
server_list = re.findall(
r'<server url="([^"]*)" lat="([^"]*)" lon="([^"]*)"', reply)
my_lat = float(location[1])
my_lon = float(location[2])
sorted_server_list = []
for server in server_list:
s_lat = float(server[1])
s_lon = float(server[2])
distance = sqrt(pow(s_lat - my_lat, 2) + pow(s_lon - my_lon, 2))
bisect.insort_left(sorted_server_list, (distance, server[0]))
best_server = (999999, '')
for server in sorted_server_list[:10]:
util.debug('[SC] server: %s' % server[1])
match = re.search(
r'http://([^/]+)/speedtest/upload\.php', server[1])
if match is None:
continue
server_host = match.groups()[0]
latency = self.ping(server_host)
if latency < best_server[0]:
best_server = (latency, server_host)
if not best_server[1]:
raise Exception('Cannot find a test server')
util.debug('[SC] Best server: %s' % best_server[1])
return best_server[1]