def load_build(self):
stack = self.stack
state = stack.pop()
inst = stack[-1]
setstate = getattr(inst, "__setstate__", None)
if setstate:
setstate(state)
return
slotstate = None
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
if state:
inst_dict = inst.__dict__
intern = sys.intern
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
python类intern()的实例源码
def test_intern(self):
global numruns
numruns += 1
self.assertRaises(TypeError, sys.intern)
s = "never interned before" + str(numruns)
self.assertTrue(sys.intern(s) is s)
s2 = s.swapcase().swapcase()
self.assertTrue(sys.intern(s2) is s)
# Subclasses of string can't be interned, because they
# provide too much opportunity for insane things to happen.
# We don't want them in the interned dict and if they aren't
# actually interned, we don't want to create the appearance
# that they are by allowing intern() to succeed.
class S(str):
def __hash__(self):
return 123
self.assertRaises(TypeError, sys.intern, S("abc"))
def doUnsubscribe(securities):
try:
_, sessionRestarted = openBloombergService(app.sessionForSubscriptions, "//blp/mktdata")
if sessionRestarted:
app.allSubscriptions = {}
subscriptionList = blpapi.SubscriptionList()
for security in securities:
correlationId = blpapi.CorrelationId(sys.intern(security))
if security in app.allSubscriptions:
del app.allSubscriptions[security]
subscriptionList.add(security, correlationId=correlationId)
app.sessionForSubscriptions.unsubscribe(subscriptionList)
except Exception as e:
handleBrokenSession(app, e)
traceback.print_exc()
return respond500(e)
response = Response(
json.dumps({ "message": "OK"}).encode(),
status=202,
mimetype='application/json')
response.headers['Access-Control-Allow-Origin'] = allowCORS(request.headers.get('Origin'))
return response
def silent_intern(x):
"""
Perform sys.intern() on the passed argument and return the result.
If the input is ineligible (e.g. a unicode string) the original argument is
returned and no exception is thrown.
"""
try:
return sys.intern(x)
except TypeError:
return x
# From Dinu C. Gherman,
# Python Cookbook, second edition, recipe 6.17, p. 277.
# Also:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/68205
# ASPN: Python Cookbook: Null Object Design Pattern
#TODO??? class Null(object):
def test_intern(self):
global numruns
numruns += 1
self.assertRaises(TypeError, sys.intern)
s = "never interned before" + str(numruns)
self.assertTrue(sys.intern(s) is s)
s2 = s.swapcase().swapcase()
self.assertTrue(sys.intern(s2) is s)
# Subclasses of string can't be interned, because they
# provide too much opportunity for insane things to happen.
# We don't want them in the interned dict and if they aren't
# actually interned, we don't want to create the appearance
# that they are by allowing intern() to succeed.
class S(str):
def __hash__(self):
return 123
self.assertRaises(TypeError, sys.intern, S("abc"))
def silent_intern(x):
"""
Perform sys.intern() on the passed argument and return the result.
If the input is ineligible (e.g. a unicode string) the original argument is
returned and no exception is thrown.
"""
try:
return sys.intern(x)
except TypeError:
return x
# From Dinu C. Gherman,
# Python Cookbook, second edition, recipe 6.17, p. 277.
# Also:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/68205
# ASPN: Python Cookbook: Null Object Design Pattern
#TODO??? class Null(object):
def load_build(self):
stack = self.stack
state = stack.pop()
inst = stack[-1]
setstate = getattr(inst, "__setstate__", None)
if setstate is not None:
setstate(state)
return
slotstate = None
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
if state:
inst_dict = inst.__dict__
intern = sys.intern
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
def test_intern(self):
global numruns
numruns += 1
self.assertRaises(TypeError, sys.intern)
s = "never interned before" + str(numruns)
self.assertTrue(sys.intern(s) is s)
s2 = s.swapcase().swapcase()
self.assertTrue(sys.intern(s2) is s)
# Subclasses of string can't be interned, because they
# provide too much opportunity for insane things to happen.
# We don't want them in the interned dict and if they aren't
# actually interned, we don't want to create the appearance
# that they are by allowing intern() to succeed.
class S(str):
def __hash__(self):
return 123
self.assertRaises(TypeError, sys.intern, S("abc"))
def load_build(self):
stack = self.stack
state = stack.pop()
inst = stack[-1]
setstate = getattr(inst, "__setstate__", None)
if setstate is not None:
setstate(state)
return
slotstate = None
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
if state:
inst_dict = inst.__dict__
intern = sys.intern
for k, v in state.items():
if type(k) is str:
inst_dict[intern(k)] = v
else:
inst_dict[k] = v
if slotstate:
for k, v in slotstate.items():
setattr(inst, k, v)
def test_intern(self):
global numruns
numruns += 1
self.assertRaises(TypeError, sys.intern)
s = "never interned before" + str(numruns)
self.assertTrue(sys.intern(s) is s)
s2 = s.swapcase().swapcase()
self.assertTrue(sys.intern(s2) is s)
# Subclasses of string can't be interned, because they
# provide too much opportunity for insane things to happen.
# We don't want them in the interned dict and if they aren't
# actually interned, we don't want to create the appearance
# that they are by allowing intern() to succeed.
class S(str):
def __hash__(self):
return 123
self.assertRaises(TypeError, sys.intern, S("abc"))
def load(_, fd):
''' Read from file object to enable reading from arbitrary data source and position.
returns global config
'''
title = None; section = dd() # this dict initializes any missing value with an empty list TODO make ordered default dict, but couldn't find any compatible solution
for line in xreadlines(fd):
line = line.strip() # just in case of incorrect formatting
if line.startswith('['): # new section detected: first store last section
if len(section) > 0: _.sections[title] = section # OLD: and title is not None
title = line[1:-1]; section = dd() # default-dictionary of (standard) type map
elif line != '':
try:
idx = line.index('=') # try to parse
key, value = line[:idx], line[idx+1:] # HINT: was .lower() and .rstrip(), but when reading/writing only via this script's API there's no need for that
if key in [TAG, FROM] and value != '':
section[intern(key)].append(intern(value)) # this dict allows several values per key
elif key in [IGNORE, SKIP] and key not in section:
section[intern(key)] = None # only keep key instead of default-appending empty string
elif title == "" and key in [SKIPD, IGNORED, GLOBAL] and value != '':
section[intern(key)].append(intern(value)) # global dir skip or ignore pattern, or global config setting
else: warn("Encountered illegal key <%s>. Skipping entry." % key)
except: warn("Key with no value for illegal key %s" % repr(line))
else: break # an empty line terminates file
if len(section) > 0: _.sections[title] = section # last store OLD: and title is not None
return { k.lower(): v if v.lower() not in ("true", "false") else v.lower().strip() == "true" for k, v in (wrapExc(lambda: kv.split("=")[:2], lambda: (kv, None)) for kv in _.sections.get("", {}).get(GLOBAL, [])) } # return global config map for convenience
def silent_intern(x):
"""
Perform sys.intern() on the passed argument and return the result.
If the input is ineligible (e.g. a unicode string) the original argument is
returned and no exception is thrown.
"""
try:
return sys.intern(x)
except TypeError:
return x
# From Dinu C. Gherman,
# Python Cookbook, second edition, recipe 6.17, p. 277.
# Also:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/68205
# ASPN: Python Cookbook: Null Object Design Pattern
#TODO??? class Null(object):
def post_message(user: User, text: str, timestamp: Optional[Timestamp]=None) -> None:
user = intern(user)
timestamp = timestamp or time()
post = Post(timestamp, user, text)
posts.appendleft(post)
user_posts[user].appendleft(post)
def follow(user: User, followed_user: User) -> None:
user, followed_user = intern(user), intern(followed_user)
following[user].add(followed_user)
followers[followed_user].add(user)
def set_user(user: User, displayname: str, email: str, password: str, bio: Optional[str]=None, photo: Optional[str]=None) -> None:
user = intern(user)
hashed_password = hash_password(password)
user_info[user] = UserInfo(displayname, email, hashed_password, bio, photo)
def __init__(self, opt, data_loader=None, cands=None, shared=None, **kwargs):
# self.data is a list of episodes
# each episode is a tuple of entries
# each entry is a tuple of values for the action/observation table
if shared:
self.image_loader = shared.get('image_loader', None)
self.data = shared.get('data', [])
self.cands = shared.get('cands', None)
else:
self.image_loader = ImageLoader(opt)
self.data = []
self._load(data_loader, opt['datafile'])
self.cands = None if cands == None else set(sys.intern(c) for c in cands)
self.addedCands = []
self.copied_cands = False
def __setitem__(self, key, value):
"""If key is in table, update it. Otherwise, extend the array to make
room. This uses additive resizing not multiplicative, since the number
of keys is not likely to change frequently during a run, so do not abuse
it.
Raises an error if you try to change the type of the value stored for
that key--if you need to do this, you must delete the key first.
"""
val_type = type(value)
if 'Tensor' in str(val_type):
self.tensors[key] = value
return
if val_type not in self.types:
raise TypeError('SharedTable does not support type ' + str(type(value)))
if val_type == str:
value = sys.intern(value)
if key in self.idx:
idx, typ = self.idx[key]
if typ != val_type:
raise TypeError(('Cannot change stored type for {key} from ' +
'{v1} to {v2}. You need to del the key first' +
' if you need to change value types.'
).format(key=key, v1=typ, v2=val_type))
self.arrays[typ][idx] = value
else:
raise KeyError('Cannot add more keys to the shared table as '
'they will not be synced across processes.')
def __init__(
self: 'Property',
name: Optional[str],
value: _Prop_Value,
):
"""Create a new property instance.
"""
if name is None:
self._folded_name = self.real_name = None # type: Optional[str]
else:
self.real_name = sys.intern(name) # type: Optional[str]
self._folded_name = sys.intern(name.casefold()) # type: Optional[str]
self.value = value # type: _Prop_Value
def name(self, new_name):
if new_name is None:
self._folded_name = self.real_name = None
else:
# Intern names to help reduce duplicates in memory.
self.real_name = sys.intern(new_name)
self._folded_name = sys.intern(new_name.casefold())
def parse_parts(self, parts):
if six.PY2:
parts = _py2_fsencode(parts)
parsed = []
sep = self.sep
altsep = self.altsep
drv = root = ''
it = reversed(parts)
for part in it:
if not part:
continue
if altsep:
part = part.replace(altsep, sep)
drv, root, rel = self.splitroot(part)
if sep in rel:
for x in reversed(rel.split(sep)):
if x and x != '.':
parsed.append(intern(x))
else:
if rel and rel != '.':
parsed.append(intern(rel))
if drv or root:
if not drv:
# If no drive is present, try to find one in the previous
# parts. This makes the result of parsing e.g.
# ("C:", "/", "a") reasonably intuitive.
for part in it:
if not part:
continue
if altsep:
part = part.replace(altsep, sep)
drv = self.splitroot(part)[0]
if drv:
break
break
if drv or root:
parsed.append(drv + root)
parsed.reverse()
return drv, root, parsed
def __init__(self, kind, kids=[]):
self.kind = intern(kind)
UserList.__init__(self, kids)
def parse_parts(self, parts):
if six.PY2:
parts = _py2_fsencode(parts)
parsed = []
sep = self.sep
altsep = self.altsep
drv = root = ''
it = reversed(parts)
for part in it:
if not part:
continue
if altsep:
part = part.replace(altsep, sep)
drv, root, rel = self.splitroot(part)
if sep in rel:
for x in reversed(rel.split(sep)):
if x and x != '.':
parsed.append(intern(x))
else:
if rel and rel != '.':
parsed.append(intern(rel))
if drv or root:
if not drv:
# If no drive is present, try to find one in the previous
# parts. This makes the result of parsing e.g.
# ("C:", "/", "a") reasonably intuitive.
for part in it:
if not part:
continue
if altsep:
part = part.replace(altsep, sep)
drv = self.splitroot(part)[0]
if drv:
break
break
if drv or root:
parsed.append(drv + root)
parsed.reverse()
return drv, root, parsed
def internSet(self, items):
new = []
for i in items:
new.append(sys.intern(i))
s = frozenset(new)
h = hash(s)
if h in self.setcache:
return self.setcache[h]
self.setcache[h] = s
return s
def test_prefix_preservation(self):
b = """x = intern( a )"""
a = """import sys\nx = sys.intern( a )"""
self.check(b, a)
b = """y = intern("b" # test
)"""
a = """import sys\ny = sys.intern("b" # test
)"""
self.check(b, a)
b = """z = intern(a+b+c.d, )"""
a = """import sys\nz = sys.intern(a+b+c.d, )"""
self.check(b, a)
def test(self):
b = """x = intern(a)"""
a = """import sys\nx = sys.intern(a)"""
self.check(b, a)
b = """z = intern(a+b+c.d,)"""
a = """import sys\nz = sys.intern(a+b+c.d,)"""
self.check(b, a)
b = """intern("y%s" % 5).replace("y", "")"""
a = """import sys\nsys.intern("y%s" % 5).replace("y", "")"""
self.check(b, a)
# These should not be refactored
def test_unchanged(self):
s = """intern(a=1)"""
self.unchanged(s)
s = """intern(f, g)"""
self.unchanged(s)
s = """intern(*h)"""
self.unchanged(s)
s = """intern(**i)"""
self.unchanged(s)
s = """intern()"""
self.unchanged(s)
def parse_parts(self, parts):
if _py2:
parts = _py2_fsencode(parts)
parsed = []
sep = self.sep
altsep = self.altsep
drv = root = ''
it = reversed(parts)
for part in it:
if not part:
continue
if altsep:
part = part.replace(altsep, sep)
drv, root, rel = self.splitroot(part)
if sep in rel:
for x in reversed(rel.split(sep)):
if x and x != '.':
parsed.append(intern(x))
else:
if rel and rel != '.':
parsed.append(intern(rel))
if drv or root:
if not drv:
# If no drive is present, try to find one in the previous
# parts. This makes the result of parsing e.g.
# ("C:", "/", "a") reasonably intuitive.
for part in it:
drv = self.splitroot(part)[0]
if drv:
break
break
if drv or root:
parsed.append(drv + root)
parsed.reverse()
return drv, root, parsed
def test_prefix_preservation(self):
b = """x = intern( a )"""
a = """import sys\nx = sys.intern( a )"""
self.check(b, a)
b = """y = intern("b" # test
)"""
a = """import sys\ny = sys.intern("b" # test
)"""
self.check(b, a)
b = """z = intern(a+b+c.d, )"""
a = """import sys\nz = sys.intern(a+b+c.d, )"""
self.check(b, a)
def test(self):
b = """x = intern(a)"""
a = """import sys\nx = sys.intern(a)"""
self.check(b, a)
b = """z = intern(a+b+c.d,)"""
a = """import sys\nz = sys.intern(a+b+c.d,)"""
self.check(b, a)
b = """intern("y%s" % 5).replace("y", "")"""
a = """import sys\nsys.intern("y%s" % 5).replace("y", "")"""
self.check(b, a)
# These should not be refactored
def test_unchanged(self):
s = """intern(a=1)"""
self.unchanged(s)
s = """intern(f, g)"""
self.unchanged(s)
s = """intern(*h)"""
self.unchanged(s)
s = """intern(**i)"""
self.unchanged(s)
s = """intern()"""
self.unchanged(s)