def gl_init(self):
self.gl_vertex_shader_factory = functools.lru_cache(maxsize=None)(functools.partial(gl.Shader,GL_VERTEX_SHADER))
self.gl_fragment_shader_factory = functools.lru_cache(maxsize=None)(functools.partial(gl.Shader,GL_FRAGMENT_SHADER))
self.gl_program_factory = functools.lru_cache(maxsize=None)(GLProgram)
self.gl_texture_factory = functools.lru_cache(maxsize=None)(gx.texture.GLTexture)
array_table = {gx.VA_PTNMTXIDX:GLMatrixIndexArray()}
array_table.update((attribute,array.gl_convert()) for attribute,array in self.array_table.items())
for shape in self.shapes:
shape.gl_init(array_table)
for material in self.materials:
material.gl_init()
for texture in self.textures:
texture.gl_init(self.gl_texture_factory)
self.gl_joints = [copy.copy(joint) for joint in self.joints]
self.gl_joint_matrices = numpy.empty((len(self.joints),3,4),numpy.float32)
self.gl_matrix_table = gl.TextureBuffer(GL_DYNAMIC_DRAW,GL_RGBA32F,(len(self.matrix_descriptors),3,4),numpy.float32)
self.gl_update_matrix_table()
self.gl_draw_objects = list(self.gl_generate_draw_objects(self.scene_graph))
self.gl_draw_objects.sort(key=lambda draw_object: draw_object.material.unknown0)
python类lru_cache()的实例源码
def indexed_cache(func):
func = functools.lru_cache()(func)
@functools.wraps(func)
@utils.catch(IndexError, return_value=lex.generics.index_error)
def inner(inp, *, index, **kwargs):
results = func(**kwargs)
if isinstance(results, list):
tools.save_results(inp, range(len(results)), results.__getitem__)
return results[index - 1 if index else 0]
else:
return results
return inner
###############################################################################
def reuse(func=None, *, cache=lru_cache()):
"""Cache and reuse a generator function across multiple calls."""
# Allow this decorator to work with or without being called
if func is None:
return partial(reuse, cache=cache)
# Either initialize an empty history and start a new generator, or
# retrieve an existing history and the already-started generator
# that produced it
@cache
def resume(*args, **kwargs):
return [], func(*args, **kwargs)
@wraps(func)
def reuser(*args, **kwargs):
history, gen = resume(*args, **kwargs)
yield from history
record = history.append # Avoid inner-loop name lookup
for x in gen:
record(x)
yield x
return reuser
def __init__(self,
bot_user,
osu_client,
model_cache_dir,
model_cache_size,
token_secret,
upload_url):
super().__init__({bot_user})
self.bot_user = bot_user
self.osu_client = osu_client
self.model_cache_dir = pathlib.Path(model_cache_dir)
self.token_secret = Fernet(token_secret)
self.upload_url = upload_url
self.get_model = lru_cache(model_cache_size)(self._get_model)
self._user_stats = ExpiringCache()
self._candidates = LockedIterator(self._gen_candidates())
def stop_containers(self, instances):
"""
Stops all the specified containers in parallel, still respecting links
"""
current_formation = self.introspector.introspect()
# Inner function that we can pass to dependency_sort
@functools.lru_cache(maxsize=512)
def get_incoming_links(instance):
result = set()
for potential_linker in current_formation:
links_to = potential_linker.links.values()
if instance in links_to:
result.add(potential_linker)
return result
# Resolve container list to include descendency
instances = dependency_sort(instances, get_incoming_links)
# Parallel-stop things
self.parallel_execute(
instances,
lambda instance, done: all((linker in done) for linker in get_incoming_links(instance)),
executor=self.stop_container,
)
def test_need_for_rlock(self):
# This will deadlock on an LRU cache that uses a regular lock
@functools.lru_cache(maxsize=10)
def test_func(x):
'Used to demonstrate a reentrant lru_cache call within a single thread'
return x
class DoubleEq:
'Demonstrate a reentrant lru_cache call within a single thread'
def __init__(self, x):
self.x = x
def __hash__(self):
return self.x
def __eq__(self, other):
if self.x == 2:
test_func(DoubleEq(1))
return self.x == other.x
test_func(DoubleEq(1)) # Load the cache
test_func(DoubleEq(2)) # Load the cache
self.assertEqual(test_func(DoubleEq(2)), # Trigger a re-entrant __eq__ call
DoubleEq(2)) # Verify the correct return value
def __init__(self,
path,
*,
cache=DEFAULT_CACHE_SIZE,
download_url=DEFAULT_DOWNLOAD_URL):
self.path = path = pathlib.Path(path)
self._read_beatmap = lru_cache(cache)(self._raw_read_beatmap)
self._db = db = sqlite3.connect(str(path / '.slider.db'))
with db:
db.execute(
"""\
CREATE TABLE IF NOT EXISTS beatmaps (
md5 BLOB PRIMARY KEY,
id INT,
path TEXT UNIQUE NOT NULL
)
""",
)
self._download_url = download_url
def locking_lru_cache(maxsize=128, typed=False): # can't implement ignored_keywords because we use python's lru_cache...
"An lru cache with a lock, to prevent concurrent invocations and allow reusing from cache"
def deco(func):
caching_func = lru_cache(maxsize, typed)(func)
func._main_lock = RLock()
func._keyed_locks = defaultdict(RLock)
@wraps(func)
def inner(*args, **kwargs):
key = _make_key(args, kwargs, typed=typed)
with func._main_lock:
key_lock = func._keyed_locks[key]
with key_lock:
return caching_func(*args, **kwargs)
@wraps(caching_func.cache_clear)
def clear():
with func._main_lock:
return caching_func.cache_clear()
inner.cache_clear = clear
return inner
return deco
def testSetRowFactorySize(self):
try:
from functools import lru_cache
except ImportError: # Python < 3.2
lru_cache = None
queries = ['select 1 as a, 2 as b, 3 as c', 'select 123 as abc']
query = self.c.query
for maxsize in (None, 0, 1, 2, 3, 10, 1024):
pg.set_row_factory_size(maxsize)
for i in range(3):
for q in queries:
r = query(q).namedresult()[0]
if q.endswith('abc'):
self.assertEqual(r, (123,))
self.assertEqual(r._fields, ('abc',))
else:
self.assertEqual(r, (1, 2, 3))
self.assertEqual(r._fields, ('a', 'b', 'c'))
if lru_cache:
info = pg._row_factory.cache_info()
self.assertEqual(info.maxsize, maxsize)
self.assertEqual(info.hits + info.misses, 6)
self.assertEqual(info.hits,
0 if maxsize is not None and maxsize < 2 else 4)
def get_block(self, block_list):
block_ = QA_fetch_stock_block_adv()
_data = []
try:
for item in block_list:
_data.extend(block_.get_block(item).code)
return np.unique(_data).tolist()
except Exception as e:
raise e
#@lru_cache()
def QA_backtest_sell_available(self, __code):
try:
return self.account.sell_available[__code]
except:
return 0
# @lru_cache()
def QA_backtest_get_block(self, block_list):
block_ = QA_fetch_stock_block_adv()
_data = []
try:
for item in block_list:
_data.extend(block_.get_block(item).code)
return np.unique(_data).tolist()
except Exception as e:
raise e
#@lru_cache()
def QA_backtest_sell_available(self, __code):
try:
return self.account.sell_available[__code]
except:
return 0
# @lru_cache()
def clear_path_caches():
"""Clear the caches of all path-related methods in this module that use an lru_cache."""
create_environment.cache_clear()
which.cache_clear()
find_python.cache_clear()
get_python_paths.cache_clear()
find_executable.cache_clear()
def _get_indexword(model):
@functools.lru_cache(maxsize=50)
def indexword(word):
try:
return model.voc.index(word)
except ValueError:
return None
return indexword
def lru_cache(*args, **kwargs):
def decorator(func):
return func
return decorator
def callable(obj):
return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)
# --- stdlib additions
# py 3.2 functools.lru_cache
# Taken from: http://code.activestate.com/recipes/578078
# Credit: Raymond Hettinger
def callable(obj):
return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)
# --- stdlib additions
# py 3.2 functools.lru_cache
# Taken from: http://code.activestate.com/recipes/578078
# Credit: Raymond Hettinger
def __init__(self, codes, separator='@@'):
self.encode = functools.lru_cache(maxsize=65536)(self.encode)
self.bpe_codes = [tuple(item.split()) for item in codes]
# some hacking to deal with duplicates (only consider first instance)
self.bpe_codes = dict([(code,i) for (i,code) in reversed(list(enumerate(self.bpe_codes)))])
self.separator = separator
def lru_cache(maxsize=128, key_fn=None):
"""Decorator that adds an LRU cache of size maxsize to the decorated function.
maxsize is the number of different keys cache can accomodate.
key_fn is the function that builds key from args. The default key function
creates a tuple out of args and kwargs. If you use the default, there is no reason
not to use functools.lru_cache directly.
Possible use cases:
- Your cache key is very large, so you don't want to keep the whole key in memory.
- The function takes some arguments that don't affect the result.
"""
def decorator(fn):
cache = LRUCache(maxsize)
argspec = inspect2.getfullargspec(fn)
arg_names = argspec.args[1:] + argspec.kwonlyargs # remove self
kwargs_defaults = get_kwargs_defaults(argspec)
cache_key = key_fn
if cache_key is None:
def cache_key(args, kwargs):
return get_args_tuple(args, kwargs, arg_names, kwargs_defaults)
@functools.wraps(fn)
def wrapper(*args, **kwargs):
key = cache_key(args, kwargs)
try:
return cache[key]
except KeyError:
value = fn(*args, **kwargs)
cache[key] = value
return value
return wrapper
return decorator
def isinstance(val, types):
if types is int:
types = (int, long)
elif type(types) is tuple and int in types:
types += (long,)
return _builtin_isinstance(val, types)
# functools.lru_cache is Python 3.2+ only.
# /@functools.lru_cache()/d
# int().to_bytes is Python 3.2+ only.
# s/\(\w+\)\.to_bytes(/_int_to_bytes(\1, /
def callable(obj):
return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)
# --- stdlib additions
# py 3.2 functools.lru_cache
# Taken from: http://code.activestate.com/recipes/578078
# Credit: Raymond Hettinger
def callable(obj):
return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)
# --- stdlib additions
# py 3.2 functools.lru_cache
# Taken from: http://code.activestate.com/recipes/578078
# Credit: Raymond Hettinger
def test_lru_with_maxsize_none(self):
@functools.lru_cache(maxsize=None)
def fib(n):
if n < 2:
return n
return fib(n-1) + fib(n-2)
self.assertEqual([fib(n) for n in range(16)],
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610])
self.assertEqual(fib.cache_info(),
functools._CacheInfo(hits=28, misses=16, maxsize=None, currsize=16))
fib.cache_clear()
self.assertEqual(fib.cache_info(),
functools._CacheInfo(hits=0, misses=0, maxsize=None, currsize=0))
def lru_cache(maxsize=128, typed=False):
"""Decorator to wrap a function with a memoizing callable that saves
up to `maxsize` results based on a Least Recently Used (LRU)
algorithm.
"""
return _cache(LRUCache(maxsize), typed)
def callable(obj):
return any("__call__" in klass.__dict__ for klass in type(obj).__mro__)
# --- stdlib additions
# py 3.2 functools.lru_cache
# Taken from: http://code.activestate.com/recipes/578078
# Credit: Raymond Hettinger
def simple_cache(func):
"""
Save results for the :meth:'path.using_module' classmethod.
When Python 3.2 is available, use functools.lru_cache instead.
"""
saved_results = {}
def wrapper(cls, module):
if module in saved_results:
return saved_results[module]
saved_results[module] = func(cls, module)
return saved_results[module]
return wrapper
def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
"Return true if numbers a and b are close to each other."
return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
# ______________________________________________________________________________
# Misc Functions
# TODO: Use functools.lru_cache memoization decorator
def isclose(a, b, rel_tol=1e-09, abs_tol=0.0):
"Return true if numbers a and b are close to each other."
return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
# ______________________________________________________________________________
# Misc Functions
# TODO: Use functools.lru_cache memoization decorator
def subsequence_kernel_primed_lru_wrapped(s, t):
@lru_cache(maxsize=None)
def subsequence_kernel_primed(s_counter, jtot, l, i): # where (i = 1, … , n-1)
"""
In order to deal with non-contiguous substrings, it is necessary to
introduce a decay factor ? ? (0, 1) that can be used to weight the presence of a certain feature in a text
:param s: string 1
:param t: string 2
:param l: lambda represents the weight?
:param i: length of subsequence
:return:
"""
if i == 0:
return 1
elif min(s_counter, jtot) < i: #
return 0
else:
s_counter_minus_one = s_counter - 1
x = s[s_counter_minus_one] # last character. sx means the hole string, when they write only s they mean exclude last char
the_sum = 0
i_minus_one = i -1
for j in range(jtot):
if x == t[j]:
the_sum += subsequence_kernel_primed(s_counter_minus_one, j, l, i_minus_one) * l ** (jtot - j + 2)
res = l * subsequence_kernel_primed(s_counter_minus_one, jtot, l, i) + the_sum
return res
return subsequence_kernel_primed