def _del_timeout(self, fd):
    """Remove the pending timeout entry registered for ``fd``.

    Does nothing when ``fd._timeout_id`` is falsy.  Otherwise
    ``self._timeouts`` (searched with ``bisect_left``, so it is assumed to be
    kept sorted) is scanned for the ``(fd._timeout_id, fd)`` tuple.  When the
    entry is found it is deleted and ``fd._timeout_id`` is reset to ``None``.
    A warning is logged if an entry with a different timeout id is reached
    first.

    The lock is released in a ``finally`` clause so that an exception raised
    while searching (for example while comparing entries) or while
    interrupting cannot leave ``self._lock`` held.
    """
    if not fd._timeout_id:
        return

    self._lock.acquire()
    try:
        target = (fd._timeout_id, fd)
        i = bisect_left(self._timeouts, target)
        while i < len(self._timeouts):
            if self._timeouts[i] == target:
                del self._timeouts[i]
                fd._timeout_id = None
                break
            if fd._timeout_id != self._timeouts[i][0]:
                # Every entry sharing this timeout id has been passed.
                logger.warning('fd %s with %s is not found',
                               fd._fileno, fd._timeout_id)
                break
            i += 1
        # NOTE(review): presumably wakes a blocked poll so it notices the
        # changed timeout list -- confirm against interrupt().
        if self._polling:
            self.interrupt()
    finally:
        self._lock.release()
python类bisect_left()的实例源码
def _del_timeout(self, fd):
    """Remove the pending timeout entry registered for ``fd``.

    Does nothing when ``fd._timeout_id`` is falsy.  Otherwise
    ``self._timeouts`` (searched with ``bisect_left``, so it is assumed to be
    kept sorted) is scanned for the ``(fd._timeout_id, fd)`` tuple.  When the
    entry is found it is deleted and ``fd._timeout_id`` is reset to ``None``.
    A warning is logged if an entry with a different timeout id is reached
    first.

    The lock is released in a ``finally`` clause so that an exception raised
    while searching (for example while comparing entries) or while
    interrupting cannot leave ``self._lock`` held.
    """
    if not fd._timeout_id:
        return

    self._lock.acquire()
    try:
        target = (fd._timeout_id, fd)
        i = bisect_left(self._timeouts, target)
        while i < len(self._timeouts):
            if self._timeouts[i] == target:
                del self._timeouts[i]
                fd._timeout_id = None
                break
            if fd._timeout_id != self._timeouts[i][0]:
                # Every entry sharing this timeout id has been passed.
                logger.warning('fd %s with %s is not found',
                               fd._fileno, fd._timeout_id)
                break
            i += 1
        # NOTE(review): presumably wakes a blocked poll so it notices the
        # changed timeout list -- confirm against interrupt().
        if self._polling:
            self.interrupt()
    finally:
        self._lock.release()
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def intranges_contain(int_, ranges):
    """Return True when `int_` lies inside one of the encoded ranges in `ranges`."""
    probe = _encode_range(int_, 0)
    idx = bisect.bisect_left(ranges, probe)

    # The range just before the insertion point may cover int_
    # (start <= int_ < end).
    if idx > 0:
        lo, hi = _decode_range(ranges[idx - 1])
        if lo <= int_ < hi:
            return True

    # Otherwise the range at the insertion point may begin exactly at int_.
    if idx < len(ranges):
        lo, _ = _decode_range(ranges[idx])
        return lo == int_

    return False
def add_region(self, reg):
    """Add one region to the set, merging it with regions it overlaps.

    ``reg`` is any two-item sequence ``(start, end)``; it is wrapped in a
    ``Region`` so both plain tuples and named tuples are accepted.
    ``self.starts`` and ``self.ends`` are rebuilt as new lists.
    """
    new = Region(reg[0], reg[1])

    # Locate the slots where the new region's endpoints would be inserted.
    first = bisect.bisect_left(self.starts, new.start)
    last = bisect.bisect_left(self.ends, new.end)

    # Absorb the preceding region when it reaches past the new start.
    if first > 0 and self.ends[first - 1] > new.start:
        first -= 1
        new = Region(self.starts[first], new.end)

    # Absorb the following region when it begins before the new end.
    if last < len(self.starts) and self.starts[last] < new.end:
        new = Region(new.start, self.ends[last])
        last += 1

    # One expression covers both cases: with first == last it is a pure
    # insertion, otherwise entries first..last-1 are replaced by the merge.
    self.starts = self.starts[:first] + [new.start] + self.starts[last:]
    self.ends = self.ends[:first] + [new.end] + self.ends[last:]
def _malloc(self, size):
    """Return a block ``(arena, start, stop)`` at least ``size`` long.

    The block may be considerably longer than requested.  The shortest
    recorded free length that fits is reused; when none fits, a new arena
    is created and returned whole.
    """
    idx = bisect.bisect_left(self._lengths, size)

    if idx == len(self._lengths):
        # No free block is long enough: create a fresh arena and double the
        # size used for the next one.
        length = self._roundup(max(self._size, size), mmap.PAGESIZE)
        self._size *= 2
        info('allocating a new mmap of length %d', length)
        arena = Arena(length)
        self._arenas.append(arena)
        return (arena, 0, length)

    length = self._lengths[idx]
    candidates = self._len_to_seq[length]
    block = candidates.pop()
    if not candidates:
        # That was the last free block of this length; forget the length.
        del self._len_to_seq[length], self._lengths[idx]

    arena, start, stop = block
    del self._start_to_block[(arena, start)]
    del self._stop_to_block[(arena, stop)]
    return block
def intranges_contain(int_, ranges):
    """Return True when `int_` lies inside one of the encoded ranges in `ranges`."""
    probe = _encode_range(int_, 0)
    idx = bisect.bisect_left(ranges, probe)

    # The range just before the insertion point may cover int_
    # (start <= int_ < end).
    if idx > 0:
        lo, hi = _decode_range(ranges[idx - 1])
        if lo <= int_ < hi:
            return True

    # Otherwise the range at the insertion point may begin exactly at int_.
    if idx < len(ranges):
        lo, _ = _decode_range(ranges[idx])
        return lo == int_

    return False
def binary_search(haystack, needle, returnIndex=False, onlyURI=False):
    """Look up ``needle`` among the data entries of ``haystack`` by bisection.

    ``haystack`` is passed to ``objectifyCDXJData`` together with
    ``onlyURI``; the resulting ``'data'`` list is searched with
    ``bisect_left`` (so it is assumed to be sorted) and the length of the
    ``'metadata'`` list is used as the offset of the data entries inside
    ``haystack``.

    Returns ``None`` when ``needle`` is not present.  Otherwise returns the
    matching position within ``haystack`` when ``returnIndex`` is true
    (useful for adjacent line searching), or the matching ``haystack``
    entry itself when it is false.
    """
    cdxjObj = objectifyCDXJData(haystack, onlyURI)
    surtURIsAndDatetimes = cdxjObj['data']
    metaLineCount = len(cdxjObj['metadata'])

    # The whole list is searched; the former lBound/uBound locals were fixed
    # at 0 and len(data), which are bisect_left's defaults.
    pos = bisect_left(surtURIsAndDatetimes, needle)
    if pos == len(surtURIsAndDatetimes) or surtURIsAndDatetimes[pos] != needle:
        return None

    if returnIndex:
        return pos + metaLineCount
    return haystack[pos + metaLineCount]
def union(self, *others):
    """Return a new sortedset with this set's items plus those of ``others``.

    Arguments of the same class are merged through their ``_items`` lists
    using bisection; any other iterable is added item by item.
    """
    merged = sortedset()
    merged._items = list(self._items)

    for other in others:
        if not isinstance(other, self.__class__):
            for item in other:
                merged.add(item)
            continue

        # Each search resumes where the previous one ended, which relies on
        # other._items being in ascending order.
        items = merged._items
        pos = 0
        for item in other._items:
            pos = bisect_left(items, item, pos)
            if pos >= len(items):
                items.append(item)
            elif item != items[pos]:
                items.insert(pos, item)

    return merged
def _diff(self, other):
    """Return a new sortedset of the items in this set that are not in ``other``.

    When ``other`` is of the same class its ``_items`` list is bisected;
    otherwise membership is tested with ``in``.
    """
    remaining = sortedset()

    if not isinstance(other, self.__class__):
        for item in self._items:
            if item not in other:
                remaining.add(item)
        return remaining

    # Each search resumes where the previous one ended, which relies on
    # self._items being in ascending order.
    theirs = other._items
    pos = 0
    for item in self._items:
        pos = bisect_left(theirs, item, pos)
        if pos >= len(theirs) or item != theirs[pos]:
            remaining._items.append(item)
    return remaining
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def intranges_contain(int_, ranges):
    """Return True when `int_` lies inside one of the encoded ranges in `ranges`."""
    probe = _encode_range(int_, 0)
    idx = bisect.bisect_left(ranges, probe)

    # The range just before the insertion point may cover int_
    # (start <= int_ < end).
    if idx > 0:
        lo, hi = _decode_range(ranges[idx - 1])
        if lo <= int_ < hi:
            return True

    # Otherwise the range at the insertion point may begin exactly at int_.
    if idx < len(ranges):
        lo, _ = _decode_range(ranges[idx])
        return lo == int_

    return False
def union(self, other, find=bisect_left):
    """Return a new Set containing every element found in either operand.

    Both element sequences are walked in step; ``find`` (``bisect_left`` by
    default) locates how far a run of smaller elements extends so the whole
    run can be copied at once.
    """
    mine = self.data
    theirs = Set._getotherdata(other)
    merged = Set([])
    out = merged.data

    i = j = 0
    while i < len(mine) and j < len(theirs):
        a, b = mine[i], theirs[j]
        if a == b:
            out.append(a)
            i += 1
            j += 1
        elif a > b:
            # Copy the run of other-side elements smaller than a.
            cut = find(theirs, a, j)
            out.extend(theirs[j:cut])
            j = cut
        else:
            # Copy the run of own elements smaller than b.
            cut = find(mine, b, i)
            out.extend(mine[i:cut])
            i = cut

    # One side is exhausted; whatever remains on either side is appended.
    out.extend(mine[i:])
    out.extend(theirs[j:])
    return merged
def intersection(self, other, find=bisect_left):
    """Return a new Set containing the elements present in both operands.

    Both element sequences are walked in step; ``find`` (``bisect_left`` by
    default) is used to jump over runs of elements that cannot match.
    """
    mine = self.data
    theirs = Set._getotherdata(other)
    common = Set([])
    out = common.data

    i = j = 0
    while i < len(mine) and j < len(theirs):
        a, b = mine[i], theirs[j]
        if a == b:
            out.append(a)
            i += 1
            j += 1
        elif a > b:
            # Skip other-side elements smaller than a.
            j = find(theirs, a, j)
        else:
            # Skip own elements smaller than b.
            i = find(mine, b, i)
    return common
def difference(self, other, find=bisect_left):
    """Return a new Set of the elements of this set that are absent from ``other``.

    Both element sequences are walked in step; ``find`` (``bisect_left`` by
    default) locates runs of own elements smaller than the other side's
    current element, which are kept wholesale.
    """
    mine = self.data
    theirs = Set._getotherdata(other)
    kept = Set([])
    out = kept.data

    i = j = 0
    while i < len(mine) and j < len(theirs):
        a, b = mine[i], theirs[j]
        if a == b:
            # Present on both sides: drop it.
            i += 1
            j += 1
        elif a > b:
            j = find(theirs, a, j)
        else:
            cut = find(mine, b, i)
            out.extend(mine[i:cut])
            i = cut

    # Anything left on this side has no counterpart to remove it.
    out.extend(mine[i:])
    return kept
def __getitem__(self, key):
    """Look up the value stored for a single key or for a whole interval.

    A key exposing ``start`` and ``stop`` attributes (such as a slice) is
    treated as an interval: its value is returned only when both ends fall
    into the same stored interval, otherwise ``KeyError`` is raised.  Any
    other key is looked up as a single point and raises ``KeyError`` when
    the stored value is ``self.nothing``.
    """
    bounds = self._bounds
    values = self._values

    if not (hasattr(key, 'start') and hasattr(key, 'stop')):
        # Point lookup.
        found = values[bisect_right(bounds, key)]
        if found is self.nothing:
            raise KeyError(key)
        return found

    # Interval lookup: an open end (None) extends to that side's extreme.
    start, stop = key.start, key.stop
    if start is None:
        lo = 0
    else:
        lo = bisect_right(bounds, start)
    if stop is None:
        hi = len(bounds)
    else:
        hi = bisect_left(bounds, stop)

    if lo != hi:
        raise KeyError(key)
    return values[lo]
def submapping(self, start, stop):
    """Return a new mapping of the same type restricted to ``start``..``stop``.

    The bounds lying within the requested window and their values are
    copied into a fresh instance; everything before ``start`` and from
    ``stop`` onwards is then set to ``self.nothing``.  Passing ``None`` for
    either argument leaves that side open.
    """
    bounds = self._bounds
    values = self._values
    nothing = self.nothing

    if start is None:
        lo = 0
    else:
        lo = bisect_right(bounds, start)
    if stop is None:
        hi = len(bounds)
    else:
        hi = bisect_left(bounds, stop)

    sub = type(self)()
    sub._bounds = blist(bounds[lo:hi])
    # There is one more value than there are bounds in the copied window.
    sub._values = blist(values[lo:hi + 1])

    if start is not None:
        sub[:start] = nothing
    if stop is not None:
        sub[stop:] = nothing
    return sub
def contains(self, *args):
    """Report whether this union contains the single given argument.

    Exactly one positional argument is accepted: a ``Cell``, a ``CellId``,
    a ``Point`` or another instance of this class.  Cells and points are
    converted to a ``CellId`` first; another union is contained when every
    one of its cell ids is contained.

    Raises:
        NotImplementedError: for any other argument count or type.
    """
    if len(args) != 1:
        raise NotImplementedError()
    target = args[0]

    if isinstance(target, Cell):
        return self.contains(target.id())

    if isinstance(target, CellId):
        # Check the entry at the insertion point, then the one before it.
        index = bisect.bisect_left(self.__cell_ids, target)
        if (index < len(self.__cell_ids)
                and self.__cell_ids[index].range_min() <= target):
            return True
        return (index != 0 and
                self.__cell_ids[index - 1].range_max() >= target)

    if isinstance(target, Point):
        return self.contains(CellId.from_point(target))

    if isinstance(target, self.__class__):
        return all(self.contains(target.cell_id(i))
                   for i in range(target.num_cells()))

    raise NotImplementedError()
def intersects(self, *args):
    """Report whether this union intersects the single given argument.

    Exactly one positional argument is accepted: a ``CellId`` or a
    ``CellUnion``.  A union intersects when any one of its cell ids does.

    Raises:
        NotImplementedError: for any other argument count or type.
    """
    if len(args) != 1:
        raise NotImplementedError()
    target = args[0]

    if isinstance(target, CellId):
        # Check the entry at the insertion point, then the one before it.
        index = bisect.bisect_left(self.__cell_ids, target)
        if (index != len(self.__cell_ids) and
                self.__cell_ids[index].range_min() <= target.range_max()):
            return True
        return (
            index != 0 and
            self.__cell_ids[index - 1].range_max() >= target.range_min()
        )

    if isinstance(target, CellUnion):
        return any(self.intersects(cell_id)
                   for cell_id in target.__cell_ids)

    raise NotImplementedError()
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def intranges_contain(int_, ranges):
    """Return True when `int_` lies inside one of the `(start, end)` tuples in `ranges`."""
    probe = (int_, int_)
    idx = bisect.bisect_left(ranges, probe)

    # The range just before the insertion point may cover int_
    # (start <= int_ < end).
    if idx > 0:
        lo, hi = ranges[idx - 1]
        if lo <= int_ < hi:
            return True

    # Otherwise the range at the insertion point may begin exactly at int_.
    if idx < len(ranges):
        lo, _ = ranges[idx]
        return lo == int_

    return False
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def keys(self, prefix=None):
    """Return the set of stored keys that start with ``prefix``.

    With no prefix (``None`` or ``""``), or when no keys are stored, a set
    of every key is returned.  Otherwise ``self._keys`` is searched with
    ``bisect_left`` (so it is assumed to be a sorted list of strings) for
    the first candidate and consecutive matching keys are collected.

    The prefix and the index range of its matches are remembered in
    ``self._cachestr`` / ``self._cachepoints`` so that a later lookup for a
    prefix extending the cached one only searches inside that range.
    """
    if prefix is None or prefix == "" or not self._keys:
        return set(self._keys)

    if prefix.startswith(self._cachestr):
        lo, hi = self._cachepoints
        start = i = bisect_left(self._keys, prefix, lo, hi)
    else:
        start = i = bisect_left(self._keys, prefix)

    keys = set()
    if start == len(self._keys):
        return keys

    # Bound the scan: when the last stored key matches the prefix, the index
    # would otherwise run past the end of the list and raise IndexError.
    total = len(self._keys)
    while i < total and self._keys[i].startswith(prefix):
        keys.add(self._keys[i])
        i += 1

    self._cachestr = prefix
    self._cachepoints = (start, i)
    return keys
def intranges_contain(int_, ranges):
    """Return True when `int_` lies inside one of the encoded ranges in `ranges`."""
    probe = _encode_range(int_, 0)
    idx = bisect.bisect_left(ranges, probe)

    # The range just before the insertion point may cover int_
    # (start <= int_ < end).
    if idx > 0:
        lo, hi = _decode_range(ranges[idx - 1])
        if lo <= int_ < hi:
            return True

    # Otherwise the range at the insertion point may begin exactly at int_.
    if idx < len(ranges):
        lo, _ = _decode_range(ranges[idx])
        return lo == int_

    return False
def intranges_contain(int_, ranges):
    """Return True when `int_` lies inside one of the encoded ranges in `ranges`."""
    probe = _encode_range(int_, 0)
    idx = bisect.bisect_left(ranges, probe)

    # The range just before the insertion point may cover int_
    # (start <= int_ < end).
    if idx > 0:
        lo, hi = _decode_range(ranges[idx - 1])
        if lo <= int_ < hi:
            return True

    # Otherwise the range at the insertion point may begin exactly at int_.
    if idx < len(ranges):
        lo, _ = _decode_range(ranges[idx])
        return lo == int_

    return False