def __init__(self, name, priority, demand, affinity,
affinity_limits=None,
data_retention_timeout=0,
lease=0,
identity_group=None,
identity=None,
schedule_once=False):
self.global_order = _global_order()
self.allocation = None
self.server = None
self.name = name
self.affinity = Affinity(affinity, affinity_limits)
self.priority = priority
self.demand = np.array(demand, dtype=float)
self.data_retention_timeout = data_retention_timeout
self.lease = lease
self.identity_group = identity_group
self.identity = identity
self.identity_group_ref = None
self.schedule_once = schedule_once
self.evicted = False
self.placement_expiry = None
self.renew = False
# FIXME: What dictates order? heapq.merge in utilization_queue needs this
# comparison.
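# A minimal sketch of the comparison the FIXME above refers to: heapq.merge
# compares the queue tuples element by element and only falls back to comparing
# the app objects themselves when everything earlier ties. Assuming the
# tie-break is the global_order counter assigned in __init__ (an assumption,
# not confirmed by this snippet):
def __lt__(self, other):
    return self.global_order < other.global_order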
def utilization_queue(self, free_capacity, visitor=None):
"""Returns utilization queue including the sub-allocs.
All app queues from self and sub-allocs are merged in standard order,
and then utilization is recalculated based on total reserved capacity
of this alloc and sub-allocs combined.
    The function maintains the invariant that any app (in self or inside a
    sub-alloc) with utilization < 1 will remain at utilization < 1.
"""
total_reserved = self.total_reserved()
queues = [alloc.utilization_queue(free_capacity, visitor)
for alloc in self.sub_allocations.values()]
queues.append(self.priv_utilization_queue())
acc_demand = zero_capacity()
available = total_reserved + free_capacity + np.finfo(float).eps
# FIXME: heapq.merge has an overhead of comparison
for item in heapq.merge(*queues):
rank, _util, pending, order, app = item
acc_demand = acc_demand + app.demand
util = utilization(acc_demand, total_reserved, available)
if app.priority == 0:
util = _MAX_UTILIZATION
        # - Lower-rank allocations take precedence.
        # - For the same rank, lower utilization takes precedence.
        # - False < True, so for apps with the same utilization we prefer
        #   those that are already running (False == not pending).
        # - Finally, the global order breaks ties.
entry = (rank, util, pending, order, app)
if visitor:
visitor(self, entry)
yield entry
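# A standalone illustration (not part of the class above) of the precedence
# rules those comments describe: heapq.merge over per-allocation queues of
# (rank, util, pending, order, app) tuples yields lower ranks first, then lower
# utilization, then running (pending=False) before pending apps. The data here
# is made up for the example.
import heapq

queue_a = [(0, 0.2, False, 1, 'app-a'), (0, 0.9, True, 4, 'app-d')]
queue_b = [(0, 0.2, True, 2, 'app-b'), (1, 0.1, False, 3, 'app-c')]
for entry in heapq.merge(queue_a, queue_b):
    print(entry)
# (0, 0.2, False, 1, 'app-a')
# (0, 0.2, True, 2, 'app-b')
# (0, 0.9, True, 4, 'app-d')
# (1, 0.1, False, 3, 'app-c')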
def hamming_numbers():
    # Generates the Hamming (5-smooth) numbers in increasing order; relies on
    # tee and merge being imported at module level from itertools and heapq.
    last = 1
    yield last
    a, b, c = tee(hamming_numbers(), 3)
for n in merge((2*i for i in a), (3*i for i in b), (5*i for i in c)):
if n != last:
yield n
last = n
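# Usage sketch for the generator above, assuming the same module-level imports:
from heapq import merge
from itertools import islice, tee

print(list(islice(hamming_numbers(), 10)))  # [1, 2, 3, 4, 5, 6, 8, 9, 10, 12]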
def batch_sort(input_iterator, output_path, buffer_size=32000, output_class=None):
"""batch sort helper with temporary files, supports sorting large stuff"""
if not output_class:
output_class = input_iterator.__class__
chunks = []
try:
while True:
current_chunk = list(islice(input_iterator, buffer_size))
if not current_chunk:
break
current_chunk.sort()
fd, filepath = tempfile.mkstemp()
os.close(fd)
output_chunk = output_class(filepath)
chunks.append(output_chunk)
for elem in current_chunk:
output_chunk.write(elem.obj)
output_chunk.close()
output_file = output_class(output_path)
        # stream the sorted chunks into a single sorted output file
        for elem in heapq.merge(*chunks):
            output_file.write(elem.obj)
        output_file.close()
finally:
for chunk in chunks:
try:
chunk.close()
os.remove(chunk.name)
except Exception:
pass
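# The core pattern batch_sort relies on, shown standalone: sort fixed-size
# chunks, spill each to disk, then stream one fully sorted sequence with
# heapq.merge. A simplified sketch for plain strings (names are illustrative):
import heapq
import tempfile
from itertools import islice

def external_sort(items, buffer_size=1000):
    """Yield the given strings (no embedded newlines) in sorted order."""
    chunk_readers = []
    it = iter(items)
    while True:
        chunk = sorted(islice(it, buffer_size))
        if not chunk:
            break
        spill = tempfile.TemporaryFile(mode="w+")
        spill.write("\n".join(chunk) + "\n")
        spill.seek(0)
        chunk_readers.append(line.rstrip("\n") for line in spill)
    return heapq.merge(*chunk_readers)

# e.g. list(external_sort(["pear", "apple", "fig"], buffer_size=2))
# -> ['apple', 'fig', 'pear']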
# magic
def imerge(iterables):
return merge(*iterables)
def sort(items, maxsize=100000, tempdir=None, maxfiles=128):
"""Sorts the given items using an external merge sort.
:param tempdir: the path of a directory to use for temporary file
storage. The default is to use the system's temp directory.
:param maxsize: the maximum number of items to keep in memory at once.
:param maxfiles: maximum number of files to open at once.
"""
p = SortingPool(maxsize=maxsize, tempdir=tempdir)
for item in items:
p.add(item)
return p.items(maxfiles=maxfiles)
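# The maxfiles limit above suggests multi-pass merging: when there are more
# sorted runs than files that may be open at once, merge them in groups and
# repeat. A standalone sketch of that idea over in-memory runs (illustrative
# only, not the SortingPool implementation):
import heapq

def merge_runs(runs, maxfiles=4):
    runs = [list(r) for r in runs]
    while len(runs) > maxfiles:
        runs = [list(heapq.merge(*runs[i:i + maxfiles]))
                for i in range(0, len(runs), maxfiles)]
    return heapq.merge(*runs)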
# 313_super_ugly_number.py (project: Machine_Learning_Playground, author: yao23)
def nthSuperUglyNumber(self, n, primes):
"""
:type n: int
:type primes: List[int]
:rtype: int
"""
uglies = [1]
merged = heapq.merge(*map(lambda p: (u*p for u in uglies), primes))
uniqed = (u for u, _ in itertools.groupby(merged))
    # consume n-1 unique values with an explicit loop; a bare map() is lazy in
    # Python 3 and would never append anything
    for u in itertools.islice(uniqed, n - 1):
        uglies.append(u)
return uglies[-1]
# beats 85.23%
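# Quick check, assuming the method sits on a LeetCode-style Solution class with
# heapq and itertools imported at module level:
# Solution().nthSuperUglyNumber(12, [2, 7, 13, 19]) returns 32, matching the
# LeetCode 313 example.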
def getNewsFeed(self, userId):
"""
    Retrieve the 10 most recent tweet ids in the user's news feed. Each item
    in the news feed must be posted by users who the user followed or by the
    user herself. Tweets must be ordered from most recent to least recent.
:type userId: int
:rtype: List[int]
"""
tweets = heapq.merge(*(self.tweets[user]
for user in self.followees[userId] | {userId}))
return [t for _, t in itertools.islice(tweets, 10)]
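# For heapq.merge above to put the newest tweets first, each self.tweets[user]
# sequence must already be sorted newest-first, e.g. keyed by a negated,
# monotonically increasing timestamp. A hedged sketch of the companion
# postTweet under that assumption (it is not part of this snippet):
def postTweet(self, userId, tweetId):
    self.time += 1
    # the most negative key is the newest tweet, so it sorts (and merges) first
    self.tweets[userId].insert(0, (-self.time, tweetId))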
def merge_sort(unsorted, start=None, end=None):
if start is None:
start = 0
end = len(unsorted)
midpoint = (start + end) // 2
if end - start == 1:
yield unsorted[start]
elif end - start > 1:
yield from heapq.merge(
merge_sort(unsorted, start, midpoint),
merge_sort(unsorted, midpoint, end))
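# Usage check for the generator-based merge sort above (heapq must be imported):
import heapq

print(list(merge_sort([5, 3, 8, 1])))  # [1, 3, 5, 8]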
def dump_app(app_args, path, begin, now, container_image_pattern=""):
# the app_args argument is supplied in the format
# <app name>:<additional influxdb table 1>:<additional influxdb table 2>:...
app_args = app_args.split(":")
# get the tag (keys and values) and fields from the docker measurements and
# any additional tables
app = dump_column_names(app_args)
# build queries
queries = []
# always extract docker metrics (here referred to as 'system metrics')
for system in SYSTEM_METRICS:
pattern = CONTAINER_IMAGE_PATTERNS[app.name].format(container_image_pattern)
q = """select * from "docker_container_{}" where
container_name =~ /{}/
and container_image =~ /{}/
and time > '%s' and time < '%s'
""".format(system, app.name, pattern)
queries.append(scroll(q, begin, now))
if len(app_args) > 1:
for app_arg in app_args[1:]:
q = "select * from \"{}\" where time > '%s' and time < '%s'".format(app_arg)
print(q)
queries.append(scroll(q, begin, now, prefix = app.name))
path = os.path.join(path, app.filename)
with gzip.open(path, "wb") as f:
columns = app.fields + app.tags + ["time"]
writer = csv.DictWriter(f, fieldnames=columns, dialect=csv.excel_tab, extrasaction='ignore')
writer.writeheader()
for _, row in heapq.merge(*queries):
writer.writerow(row)
return app
# in general, all apps have docker metrics for them, so we retrieve the
# metrics stored in 'docker_container_*' tables by default. we save these
# metrics under an app name equal to the first string in a sequence of strings
# separated by a ':'. app-specific metrics for such app names are gathered from
# the tables specified in subsequent strings.
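# heapq.merge(*queries) in dump_app interleaves the query result streams
# chronologically, which only works if scroll() yields (time, row) pairs in
# time order. A minimal sketch of that assumed contract (scroll's real
# implementation is not part of this snippet):
def scroll_sketch(rows):
    # rows: an iterable of dicts with a sortable "time" field, already ordered
    for row in rows:
        yield row["time"], row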
def run_executables(execalls, cache=None, ccmode=CC_PROCESSES):
# type: (List[ExeCall], Optional[Cache], str) -> Iterator[ExeResult]
"""Run executables in parallel.
Some of the results for the execalls may be found in the cache
so we put these aside in cachedresults.
    Each result is yielded as soon as it is available.
"""
def c2exeresult(value):
# type: (bytes) -> ExeResult
returncode, stdout, stderr = unpack_exeresult(value)
return make_exeresult(returncode, stdout, stderr)
def exeresult2c(exeresult):
# type: (ExeResult) -> bytes
return pack_exeresult(exeresult.returncode, exeresult.stdout, exeresult.stderr)
    # Package the execalls for eventual multiprocessing
args_lists = [((ec.exe, ec.cmdargs), {'stdindata': ec.stdindata})
for ec in execalls] # type: List[CallArgs]
cachedresults = []
jobs = [] # type: List[CallArgs]
keys = [] # type: List[str]
jobindices = [] # type: List[int]
if cache is not None:
qkeys = [execall_hash(ec, cache) for ec in execalls]
qresults = cache.mget(qkeys)
for idx, (arg, key, cvalue) in enumerate(izip(args_lists, qkeys, qresults)):
if cvalue is not None:
cachedresults.append((idx, c2exeresult(cvalue)))
else:
keys.append(key)
jobs.append(arg)
jobindices.append(idx)
else:
jobs = args_lists
jobindices = list(range(len(jobs)))
jobiter = iter_parallel(call_popen, jobs, ccmode=ccmode)
def jobresultiter():
# type: () -> Iterator[Tuple[int, ExeResult]]
for idx, (jobidx, job, result) in enumerate(izip(jobindices, jobs, jobiter)):
if cache is not None:
cache.set(keys[idx], exeresult2c(result))
(executable, cmdargs), kwargs = job
log_popen(executable, cmdargs, kwargs['stdindata'], result)
yield jobidx, result
for idx, result in heapq.merge(iter(cachedresults), jobresultiter()):
yield result
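# The final heapq.merge above restores the original submission order: cached
# results and freshly computed results are both tagged with their original
# index, and merging two index-sorted streams yields one ordered sequence.
# Standalone illustration with made-up data:
import heapq

cached = [(0, 'cache hit'), (3, 'cache hit')]           # already index-sorted
computed = iter([(1, 'ran'), (2, 'ran'), (4, 'ran')])   # yielded in index order
print([idx for idx, _ in heapq.merge(cached, computed)])  # [0, 1, 2, 3, 4]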
def _sow(self, watch, pattern, since, handler, impl):
"""Publish state of the world."""
if since is None:
since = 0
def _publish(item):
when, path, content = item
try:
payload = impl.on_event(str(path), None, content)
if payload is not None:
payload['when'] = when
handler.write_message(payload)
except Exception as err: # pylint: disable=W0703
handler.send_error_msg(str(err))
db_connections = []
fs_records = self._get_fs_sow(watch, pattern, since)
sow = getattr(impl, 'sow', None)
sow_table = getattr(impl, 'sow_table', 'sow')
try:
records = []
if sow:
dbs = sorted(glob.glob(os.path.join(self.root, sow, '*')))
for db in dbs:
if os.path.basename(db).startswith('.'):
continue
conn, db_cursor = self._db_records(
db, sow_table, watch, pattern, since
)
records.append(db_cursor)
# FIXME: Figure out pylint use before assign
db_connections.append(conn) # pylint: disable=E0601
records.append(fs_records)
# Merge db and fs records, removing duplicates.
prev_path = None
for item in heapq.merge(*records):
_when, path, _content = item
if path == prev_path:
continue
prev_path = path
_publish(item)
finally:
for conn in db_connections:
if conn:
conn.close()
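# A standalone illustration of the merge-and-deduplicate pattern used in _sow:
# time-ordered record streams are interleaved by heapq.merge and only the first
# record seen for each path is published. The data below is made up.
import heapq

db_records = [(1, '/a', 'db'), (5, '/b', 'db')]
fs_records = [(3, '/a', 'fs'), (7, '/c', 'fs')]
prev_path = None
for when, path, content in heapq.merge(db_records, fs_records):
    if path == prev_path:
        continue
    prev_path = path
    print(when, path, content)
# 1 /a db
# 5 /b db
# 7 /c fs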