def __init__(self, *args, **kwargs):
"""
Parameters
----------
%(ArrayList.parameters)s
main: Project
The main project this subproject belongs to (or None if this
project is the main project)
num: int
The number of the project
"""
self.main = kwargs.pop('main', None)
self._plot = ProjectPlotter(self)
self.num = kwargs.pop('num', 1)
self._ds_counter = count()
with self.block_signals:
super(Project, self).__init__(*args, **kwargs)
def get_id(self, desired=-1):
"""Get a valid ID."""
if desired == -1:
# Start with the lowest ID, and look upwards
desired = 1
if desired not in self:
# The desired ID is available!
self.add(desired)
return desired
# Check every ID in order to find a valid one
for poss_id in itertools.count(start=1):
if poss_id not in self:
self.add(poss_id)
return poss_id
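A minimal standalone sketch of the same ID-allocation idea (the container and its add() method above are assumed to behave like a set; this helper is not from the original project): itertools.count(start=1) scans upward until it hits the first unused ID.
import itertools

def next_free_id(used_ids):
    # Walk 1, 2, 3, ... and claim the first ID that is not taken yet.
    for poss_id in itertools.count(start=1):
        if poss_id not in used_ids:
            used_ids.add(poss_id)
            return poss_id

ids = {1, 2, 4}
print(next_free_id(ids))  # 3 -- the first gap
print(next_free_id(ids))  # 5 -- 3 was just claimed and 4 was already taken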
def __init__(self, i, s, count=1):
self.i = i
self.s = s
self.count = count
def add_string(self, string):
if string in self.strings:
self[string].count += 1
return self[string]
i = len(self.tokens)
s = string
t = Token(i, s)
self.i2t[i] = t
self.s2t[s] = t
self.tokens.add(t)
self.strings.add(s)
return t
def _run(self, group, queue):
LOG.debug("Asynchronous handler started processing %s", group)
for _ in itertools.count():
# NOTE(ivc): this is a mock-friendly replacement for 'while True'
# to allow more controlled environment for unit-tests (e.g. to
# avoid tests getting stuck in infinite loops)
try:
event = queue.get(timeout=self._grace_period)
except six_queue.Empty:
break
# FIXME(ivc): temporary workaround to skip stale events
# If K8s updates resource while the handler is processing it,
# when the handler finishes its work it can fail to update an
# annotation due to the 'resourceVersion' conflict. K8sClient
# was updated to allow *new* annotations to be set ignoring
# 'resourceVersion', but it leads to another problem as the
# Handler will receive old events (i.e. before annotation is set)
# and will start processing the event 'from scratch'.
# It has a negative effect on handlers' performance (VIFHandler
# creates ports only to later delete them and LBaaS handler also
# produces some excess requests to Neutron, although with lesser
# impact).
# Possible solutions (can be combined):
# - use K8s ThirdPartyResources to store data/annotations instead
# of native K8s resources (assuming Kuryr-K8s will own those
# resources and no one else would update them)
# - use the resulting 'resourceVersion' received from K8sClient's
# 'annotate' to provide feedback to Async to skip all events
# until that version
# - stick to the 'get-or-create' behaviour in handlers and
# also introduce cache for long operations
time.sleep(STALE_PERIOD)
while not queue.empty():
event = queue.get()
if queue.empty():
time.sleep(STALE_PERIOD)
self._handler(event)
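A rough illustration of the NOTE above (this is not the project's real test; the poll helper is made up): because the loop is driven by itertools.count() rather than 'while True', a unit test can swap in a finite counter to bound the number of iterations.
import itertools
from unittest import mock

def poll(fetch):
    calls = 0
    for _ in itertools.count():   # same mock-friendly loop shape as above
        fetch()
        calls += 1
    return calls                  # only reachable when the counter is finite

with mock.patch('itertools.count', return_value=iter(range(3))):
    assert poll(lambda: None) == 3   # the test bounded the loop to 3 passes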
def _done(self, thread, group):
LOG.debug("Asynchronous handler stopped processing %s", group)
queue = self._queues.pop(group)
if not queue.empty():
LOG.critical("Asynchronous handler terminated abnormally; "
"%(count)s events dropped for %(group)s",
{'count': queue.qsize(), 'group': group})
if not self._queues:
LOG.debug("Asynchronous handler is idle")
def __call__(self, event):
deadline = time.time() + self._timeout
for attempt in itertools.count(1):
try:
self._handler(event)
break
except self._exceptions:
with excutils.save_and_reraise_exception() as ex:
if self._sleep(deadline, attempt, ex.value):
ex.reraise = False
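A simplified, standalone sketch of the retry pattern above (call_with_retries and its parameters are illustrative, not the project's API): count(1) numbers the attempts, so the back-off and the deadline check need no separate counter variable.
import itertools
import time

def call_with_retries(func, timeout=30.0, backoff=1.0):
    deadline = time.time() + timeout
    for attempt in itertools.count(1):
        try:
            return func()
        except Exception:
            wait = backoff * attempt
            if time.time() + wait > deadline:
                raise            # no time left: let the last error propagate
            time.sleep(wait)     # linear back-off: 1s, 2s, 3s, ...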
def _compat_bit_length(i):
for res in itertools.count():
if i >> res == 0:
return res
def _count_righthand_zero_bits(number, bits):
"""Count the number of zero bits on the right hand side.
Args:
number: an integer.
bits: maximum number of bits to count.
Returns:
The number of zero bits on the right hand side of the number.
"""
if number == 0:
return bits
return min(bits, _compat_bit_length(~number & (number - 1)))
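Worked example for the helper above (values chosen only for illustration): 0b101000 has three trailing zero bits; ~number & (number - 1) isolates them as 0b000111, whose bit length is 3.
assert _count_righthand_zero_bits(0b101000, 32) == 3
assert _count_righthand_zero_bits(0, 32) == 32   # a zero value uses the cap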
def __iter__(self):
nums = itertools.count()
while True:
num = next(nums)
name = self.name.replace("_", f"_{num:04d}")
dwg = Drawing(name=name, *self.args, **self.kwargs)
dwg.num = num
sys.stdout.write(".")
sys.stdout.flush()
yield dwg
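A minimal standalone sketch of the idea above (the Drawing class and the surrounding object are simplified away; only the naming scheme is reproduced): count() drives an endless, lazily numbered sequence, so callers take a finite slice of it.
import itertools

def numbered_names(template="plot_"):
    for num in itertools.count():
        yield template.replace("_", f"_{num:04d}")

print(list(itertools.islice(numbered_names(), 3)))
# ['plot_0000', 'plot_0001', 'plot_0002']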
def namer():
for length in count(1):
for name in product(ascii_lowercase, repeat=length):
yield ''.join(name)
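Usage illustration for namer() (it relies on the snippet's own imports, i.e. count and product from itertools and ascii_lowercase from string): the names run 'a'..'z', then 'aa', 'ab', ... and never run out.
from itertools import islice

print(list(islice(namer(), 4)))        # ['a', 'b', 'c', 'd']
print(list(islice(namer(), 28))[-3:])  # ['z', 'aa', 'ab']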
Source: download_planet_python.py (project: python-station-backend, author: itielshwartz)
def download_posts(output_file, max_page_to_download=None):
with open(output_file, "w") as f: # open the output file
pages_to_download_itr = range(1, max_page_to_download + 1) if max_page_to_download else itertools.count(1)
for i in pages_to_download_itr:  # start iterating over the pages
url = BASE_URL.format(i)
logging.info("fetching %s", format(url))
page_data = download_with_retry(url)
if should_stop_page(page_data): # validate it's not the last page
return logging.info("Finished Downloading all data")
f.write(json.dumps(page_data) + "\n") # write page as jsonline
logging.info("finished %s", format(url))
Source: download_planet_python.py (project: python-station-backend, author: itielshwartz)
def download_with_retry(url):
for sleep_time in itertools.count():
page_data_raw = requests.get(url, verify=False) # get the page
page_data = page_data_raw.text
if page_data_raw.status_code == 200 or should_stop_page(page_data):
return page_data
sleep(sleep_time) # sleep in case bad response
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
assert group is None, 'group argument must be None for now'
count = _current_process._counter.next()
self._identity = _current_process._identity + (count,)
self._authkey = _current_process._authkey
self._daemonic = _current_process._daemonic
self._tempdir = _current_process._tempdir
self._parent_pid = os.getpid()
self._popen = None
self._target = target
self._args = tuple(args)
self._kwargs = dict(kwargs)
self._name = name or type(self).__name__ + '-' + \
':'.join(str(i) for i in self._identity)
def __init__(self):
self._identity = ()
self._daemonic = False
self._name = 'MainProcess'
self._parent_pid = None
self._popen = None
self._counter = itertools.count(1)
self._children = set()
self._authkey = AuthenticationString(bytes(os.urandom(32), 'latin-1'))
self._tempdir = None
def __init__(self):
self.transition_table = {}
self.fixers = []
self.id = next(BMNode.count)
self.content = ''
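A minimal standalone version of the pattern above (BMNode presumably defines count = itertools.count() as a class attribute, which is not shown in the snippet): a counter shared at class level hands every instance a unique, increasing id.
import itertools

class Node:
    _ids = itertools.count()          # shared by all instances

    def __init__(self):
        self.id = next(Node._ids)     # 0, 1, 2, ... in creation order

a, b = Node(), Node()
print(a.id, b.id)   # 0 1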
def start_tree(self, tree, filename):
"""Some fixers need to maintain tree-wide state.
This method is called once, at the start of tree fix-up.
tree - the root node of the tree to be processed.
filename - the name of the file the tree came from.
"""
self.used_names = tree.used_names
self.set_filename(filename)
self.numbers = itertools.count(1)
self.first_log = True
def nlargest(n, iterable, key=None):
"""Find the n largest elements in a dataset.
Equivalent to: sorted(iterable, key=key, reverse=True)[:n]
"""
# Short-cut for n==1 is to use max() when len(iterable)>0
if n == 1:
it = iter(iterable)
head = list(islice(it, 1))
if not head:
return []
if key is None:
return [max(chain(head, it))]
return [max(chain(head, it), key=key)]
# When n>=size, it's faster to use sorted()
try:
size = len(iterable)
except (TypeError, AttributeError):
pass
else:
if n >= size:
return sorted(iterable, key=key, reverse=True)[:n]
# When key is None, use simpler decoration
if key is None:
it = izip(iterable, count(0,-1)) # decorate
result = _nlargest(n, it)
return map(itemgetter(0), result) # undecorate
# General case, slowest method
in1, in2 = tee(iterable)
it = izip(imap(key, in1), count(0,-1), in2) # decorate
result = _nlargest(n, it)
return map(itemgetter(2), result) # undecorate
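Illustration of the decorate step above, in Python 3 spelling (the original uses izip/imap from Python 2): pairing each item with count(0, -1) gives earlier items a larger tie-breaker, so equal keys keep their input order, matching sorted(..., reverse=True), and the values themselves are never compared.
import heapq
from itertools import count

data = ['bb', 'aa', 'cc', 'ab']
decorated = zip(map(len, data), count(0, -1), data)     # (key, -index, value)
top = [item for _, _, item in heapq.nlargest(2, decorated)]
print(top)   # ['bb', 'aa'] -- all keys tie, so input order decides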