def job_root(instance, filename=''):
    """
    Build the path of `filename` directly inside the root folder of a job.

    Parameters
    ----------
    instance : AJob
        The job whose root folder is used (resolved through `root_job`).
    filename : str
        Name of the file; the default empty string yields the root
        folder itself, with a trailing separator added by `os.path.join`.

    Returns
    -------
    str
        The job root folder joined with `filename`.
    """
    base_dir = root_job(instance)
    return os.path.join(base_dir, filename)
python类unique()的实例源码
def job_data(instance, filename=''):
    """
    Build the path of `filename` inside the data subfolder of a job.

    Parameters
    ----------
    instance : AJob or ADataFile
        Either the job itself, or a data file whose `job` attribute is the
        owning job. `upload_to_data` is always read from `instance` itself.
    filename : str
        Name of the file to place in the data subfolder.

    Returns
    -------
    str
        The job root folder joined with the user-defined sub-path computed
        by `_user_path`, or with ``data/<filename>`` when `_user_path`
        returns a falsy value.
    """
    # A data file does not own a root folder: use the one of its job.
    if isinstance(instance, ADataFile):
        base_dir = root_job(instance.job)
    else:
        base_dir = root_job(instance)

    sub_path = _user_path(instance.upload_to_data, filename)
    if not sub_path:
        sub_path = os.path.join('data', filename)
    return os.path.join(base_dir, sub_path)
def job_results(instance, filename=''):
    """
    Build the path of `filename` inside the results subfolder of a job.

    Parameters
    ----------
    instance : AJob
        The job whose root folder and `upload_to_results` setting are used.
    filename : str
        Name of the file to place in the results subfolder.

    Returns
    -------
    str
        The job root folder joined with the user-defined sub-path computed
        by `_user_path`, or with ``results/<filename>`` when `_user_path`
        returns a falsy value.
    """
    sub_path = _user_path(instance.upload_to_results, filename)
    if not sub_path:
        sub_path = os.path.join('results', filename)
    base_dir = root_job(instance)
    return os.path.join(base_dir, sub_path)
def ensure_index(cls):
    """Create the three indexes used on the collection of this model.

    * ``index_execution_id``: unique compound index on ``execution_id`` and
      ``task_type``, both ascending.
    * ``index_time``: compound ascending index on every ``time.*`` stamp.
    * ``index_task_ttl``: index on ``TTL_FIELDNAME`` created with
      ``expireAfterSeconds=0``.

    NOTE(review): the call signature looks like PyMongo's
    ``Collection.create_index`` -- confirm against ``cls.collection()``.
    """
    collection = cls.collection()
    ascending = generic.SORT_ASC
    time_fields = (
        "time.created",
        "time.started",
        "time.completed",
        "time.cancelled",
        "time.failed",
    )

    # At most one document per (execution_id, task_type) pair.
    collection.create_index(
        [("execution_id", ascending), ("task_type", ascending)],
        name="index_execution_id",
        unique=True
    )
    collection.create_index(
        [(field, ascending) for field in time_fields],
        name="index_time"
    )
    collection.create_index(
        TTL_FIELDNAME,
        expireAfterSeconds=0,
        name="index_task_ttl"
    )
git-nautilus-icons.py 文件源码
项目:git_nautilus_icons
作者: chrisjbillington
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def get_statuses_by_dir(path, file_statuses):
    """Group file statuses by the entry directly below `path` they belong to.

    Only names starting with ``path + '/'`` are considered. Each such name
    is cut at the first ``'/'`` following that prefix, so every file nested
    below ``path/sub`` contributes its status to the key ``path/sub``. A name
    with no further ``'/'`` (an entry located directly in `path`) is keyed
    by its own full name.

    :param path: directory path, without a trailing slash
    :param file_statuses: mapping of file name to status
    :returns: a ``defaultdict(set)`` mapping each entry directly below
        `path` to the set of unique statuses found under it
    """
    statuses_by_dir = defaultdict(set)
    prefix = path + '/'
    len_prefix = len(prefix)
    for name, status in file_statuses.items():
        if not name.startswith(prefix):
            continue
        end = name.find('/', len_prefix)
        # str.find returns -1 when there is no further separator; slicing
        # with that value would silently drop the last character of the name
        # and could alias a sibling directory (e.g. 'a/bc' -> 'a/b').
        entry = name if end == -1 else name[:end]
        statuses_by_dir[entry].add(status)
    return statuses_by_dir
# import bprofile
# @bprofile.BProfile('test.png')
def func_11():
    """Print every member of a ``Months`` enum built with the functional API.

    One line is printed per member, in the form
    ``<name> => <member> , <value>``. With the functional API, values are
    assigned automatically starting at 1.
    """
    # The type name passed to Enum() is what shows up in each member's
    # str/repr, so it must match the variable name ('Moths' was a typo).
    Months = Enum('Months', ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
                             'Aug', 'Sep', 'Oct', 'Nov', 'Dec'))
    for name, member in Months.__members__.items():
        print(name, '=>', member, ',', member.value)
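# The `value` attribute is an int constant assigned to each member automatically, counting from 1 by default.
# For finer control over the enumeration, derive a custom class from Enum.
# The @unique decorator helps check that no two members share the same value.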
def create_linter(
    cls: t.Type['AssignmentLinter'],
    assignment_id: int,
    name: str,
    config: str,
) -> 'AssignmentLinter':
    """Create a new instance of this class for a given :class:`Assignment`
    with a given :py:class:`.linters.Linter`.

    A random UUID that is not yet used by another ``AssignmentLinter`` is
    picked as id, and one :class:`LinterInstance` is appended to ``tests``
    for every latest submission of the assignment.

    :param assignment_id: The id of the assignment
    :param name: Name of the linter
    :param config: Value stored in the ``config`` attribute of the new
        instance
    :returns: The created AssignmentLinter
    """
    # Draw random ids until one is free.
    linter_id = str(uuid.uuid4())
    while db.session.query(
        AssignmentLinter.query.filter(cls.id == linter_id).exists()
    ).scalar():  # pragma: no cover
        linter_id = str(uuid.uuid4())

    linter = cls(id=linter_id, assignment_id=assignment_id, name=name)
    linter.config = config
    linter.tests = []

    assignment = Assignment.query.get(assignment_id)
    for submission in assignment.get_all_latest_submissions():
        linter.tests.append(LinterInstance(submission, linter))

    return linter
def __init__(self, work: Work, tester: AssignmentLinter) -> None:
    """Create a linter run for `work` that belongs to `tester`.

    The instance gets a random UUID that is not yet used by another
    ``LinterInstance``.

    :param work: The submission stored in the ``work`` attribute
    :param tester: The linter stored in the ``tester`` attribute
    """
    # Draw random ids until one is free.
    instance_id = str(uuid.uuid4())
    while db.session.query(
        LinterInstance.query.filter(LinterInstance.id == instance_id).exists()
    ).scalar():  # pragma: no cover
        instance_id = str(uuid.uuid4())

    self.id = instance_id
    self.work = work
    self.tester = tester
def __init__(self, key, delta):
    """Set up a digest frequency.

    :param key: unique identifier, used in database
    :param delta: timedelta covered by the frequency (e.g., 7 days), or None
    """
    self.key, self.delta = key, delta
def __init__(self, name, type_, class_):
    """Store the name, type and class of an entry.

    :param name: entry name; its lower-cased form is kept as ``key``
    :param type_: stored unchanged as ``type``
    :param class_: raw class value; the bits selected by ``_CLASS_MASK``
        are kept as ``class_`` and the ``_CLASS_UNIQUE`` bit is exposed as
        the boolean ``unique``
    """
    self.key = name.lower()
    self.name = name
    self.type = type_
    # Split the raw value into the class proper and the "unique" flag.
    self.class_ = class_ & _CLASS_MASK
    self.unique = bool(class_ & _CLASS_UNIQUE)
def to_string(self, hdr, other):
    """Build the string form ``hdr[type,class(-unique),name(,other)]``.

    :param hdr: text placed before the opening bracket
    :param other: extra information appended after the name, or None to
        omit it
    :returns: the assembled description
    """
    type_text = self.get_type(self.type)
    class_text = self.get_class_(self.class_)
    pieces = ["%s[%s,%s" % (hdr, type_text, class_text)]
    pieces.append("-unique," if self.unique else ",")
    pieces.append(self.name)
    pieces.append("]" if other is None else ",%s]" % other)
    return "".join(pieces)
def write_record(self, record, now):
    """Write a record (answer, authoritative answer, additional) to the
    packet.

    :param record: the record to serialise
    :param now: 0 to write the record's full ``ttl``; any other value is
        passed to ``record.get_remaining_ttl`` to get the TTL to write
    :returns: 0 when the record was written; 1 when the packet was already
        finished or when the record did not fit and was rolled back
    """
    if self.state == self.State.finished:
        return 1

    # Remember where we started so the record can be rolled back.
    saved_length = len(self.data)
    saved_size = self.size

    self.write_name(record.name)
    self.write_short(record.type)
    class_field = (
        record.class_ | _CLASS_UNIQUE
        if record.unique and self.multicast
        else record.class_
    )
    self.write_short(class_field)
    ttl = record.ttl if now == 0 else record.get_remaining_ttl(now)
    self.write_int(ttl)

    payload_start = len(self.data)
    # Account for the length short that is inserted in front of the payload
    # once the payload has been written.
    self.size += 2
    record.write(self)
    self.size -= 2
    payload_length = sum(len(chunk) for chunk in self.data[payload_start:])
    # This is the short the size was adjusted for.
    self.insert_short(payload_start, payload_length)

    if self.size <= _MAX_MSG_ABSOLUTE:
        return 0

    # Too big: drop everything this call added and close the packet.
    while len(self.data) > saved_length:
        self.data.pop()
    self.size = saved_size
    self.state = self.State.finished
    return 1
def register_service(self, info, ttl=_DNS_TTL, allow_name_change=False):
    """Register service information and announce it on the network.

    The service is first validated with ``check_service`` (which may
    rename it when `allow_name_change` is true), then recorded in
    ``self.services`` and ``self.servicetypes``. Finally three
    announcements are sent, ``_REGISTER_TIME`` apart.

    :param info: the service description to register
    :param ttl: TTL put on every announced record (defaults to ``_DNS_TTL``)
    :param allow_name_change: whether the service name may be changed to
        make it unique on the network
    """
    self.check_service(info, allow_name_change)
    self.services[info.name.lower()] = info
    self.servicetypes[info.type] = self.servicetypes.get(info.type, 0) + 1

    now = current_time_millis()
    next_time = now
    sent = 0
    while sent < 3:
        if now < next_time:
            # Too early for the next announcement: wait, then re-check.
            self.wait(next_time - now)
            now = current_time_millis()
            continue

        out = DNSOutgoing(_FLAGS_QR_RESPONSE | _FLAGS_AA)
        pointer = DNSPointer(info.type, _TYPE_PTR, _CLASS_IN, ttl, info.name)
        out.add_answer_at_time(pointer, 0)
        service = DNSService(
            info.name, _TYPE_SRV, _CLASS_IN, ttl,
            info.priority, info.weight, info.port, info.server)
        out.add_answer_at_time(service, 0)
        text = DNSText(info.name, _TYPE_TXT, _CLASS_IN, ttl, info.text)
        out.add_answer_at_time(text, 0)
        if info.address:
            address = DNSAddress(
                info.server, _TYPE_A, _CLASS_IN, ttl, info.address)
            out.add_answer_at_time(address, 0)
        self.send(out)

        sent += 1
        next_time += _REGISTER_TIME
def const_key(obj):
    """Return a key under which two constants compare equal only when they
    must not be merged.

    Python implementation of the C function _PyCode_ConstantKey() of
    Python 3.6. The key always starts with ``(type(obj), obj)``; a third
    item is added when equality alone is not enough (negative zeros,
    containers, arbitrary objects).
    """
    # Exact type comparison rather than isinstance(): instances of subtypes
    # must not be merged with instances of their base type.
    kind = type(obj)

    def is_negative_zero(value):
        return value == 0.0 and math.copysign(1.0, value) < 0.0

    if obj is None or obj is Ellipsis:
        return (kind, obj)
    if kind in {int, bool, bytes, str, types.CodeType}:
        return (kind, obj)

    if kind == float:
        # -0.0 == 0.0, so tag the negative zero to keep the two apart.
        if is_negative_zero(obj):
            return (kind, obj, None)
        return (kind, obj)

    if kind == complex:
        # All four complex zeros must be distinguished: the True, False and
        # None singletons tag the sign combination of the two parts.
        tags = {
            (True, True): True,
            (False, True): False,
            (True, False): None,
        }
        signs = (is_negative_zero(obj.real), is_negative_zero(obj.imag))
        if signs in tags:
            return (kind, obj, tags[signs])
        return (kind, obj)

    if kind == tuple:
        return (kind, obj, tuple(const_key(item) for item in obj))
    if kind == frozenset:
        return (kind, obj, frozenset(const_key(item) for item in obj))

    # Any other type: the object identity makes every object distinct.
    return (kind, obj, id(obj))
def check_service(self, info, allow_name_change):
    """Check the network for a unique service name, modifying the
    ServiceInfo passed in if it is not unique.

    Three probes are sent, ``_CHECK_TIME`` apart. Whenever the cache holds
    an entry for the current name, the name gets a numeric suffix and the
    probing starts over.

    :param info: the service description; ``info.name`` may be rewritten
    :param allow_name_change: whether a conflicting name may be replaced
    :raises BadTypeInNameException: when ``info.type`` does not end with
        the service type extracted from ``info.name``
    :raises NonUniqueNameException: when the name is already taken and
        `allow_name_change` is false
    """
    # This is kind of funky because of the subtype based tests;
    # subtypes need to become a first class citizen.
    type_suffix = service_type_name(info.name)
    if not info.type.endswith(type_suffix):
        raise BadTypeInNameException

    # Instance part of the name: everything before '.<service type>'.
    base_name = info.name[:-(len(type_suffix) + 1)]
    suffix_number = 2

    now = current_time_millis()
    next_time = now
    probes_sent = 0
    while probes_sent < 3:
        # Resolve name conflicts before (re)starting the probes.
        while self.cache.current_entry_with_name_and_alias(
                info.type, info.name):
            if not allow_name_change:
                raise NonUniqueNameException

            # Try the next numbered name and probe again from scratch.
            info.name = '%s-%s.%s' % (base_name, suffix_number, info.type)
            suffix_number += 1
            service_type_name(info.name)
            next_time = now
            probes_sent = 0

        if now < next_time:
            # Too early for the next probe: wait, then re-check everything.
            self.wait(next_time - now)
            now = current_time_millis()
            continue

        out = DNSOutgoing(_FLAGS_QR_QUERY | _FLAGS_AA)
        self.debug = out
        question = DNSQuestion(info.type, _TYPE_PTR, _CLASS_IN)
        out.add_question(question)
        pointer = DNSPointer(
            info.type, _TYPE_PTR, _CLASS_IN, _DNS_TTL, info.name)
        out.add_authorative_answer(pointer)
        self.send(out)

        probes_sent += 1
        next_time += _CHECK_TIME