def dump(cls, obj, file_obj):
    """Serialize object ``obj`` to open pickle file.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: Python object
    :param file_obj: file handle (opened in binary mode)
    :type file_obj: ``file`` object

    """
    # HIGHEST_PROTOCOL is what protocol=-1 selected; the named constant
    # makes the intent explicit.
    return pickle.dump(obj, file_obj, protocol=pickle.HIGHEST_PROTOCOL)
# Set up default manager and register built-in serializers
# Example implementations of ``dump()``-style serializers (aggregated samples)
def save(self):
    """Save settings to JSON file specified in ``self._filepath``

    If you're using this class via :attr:`Workflow.settings`, which
    you probably are, ``self._filepath`` will be ``settings.json``
    in your workflow's data directory (see :attr:`~Workflow.datadir`).
    """
    # Saves may be suppressed (e.g. during batch updates).
    if self._nosave:
        return
    # Copy our own mapping contents into a plain dict for serialization.
    data = {}
    data.update(self)
    # Lock + atomic write so concurrent workflow runs never see a
    # partially-written settings file.
    with LockFile(self._filepath):
        with atomic_writer(self._filepath, 'wb') as file_obj:
            # NOTE(review): ``encoding`` is a Python 2-only json.dump
            # argument (raises TypeError on Python 3) -- confirm target
            # interpreter before porting.
            json.dump(data, file_obj, sort_keys=True, indent=2,
                      encoding='utf-8')
# dict methods
def register(self, name, serializer):
    """Register ``serializer`` object under ``name``.

    Raises :class:`AttributeError` if ``serializer`` is invalid.

    .. note::

        ``name`` will be used as the file extension of the saved files.

    :param name: Name to register ``serializer`` under
    :type name: ``unicode`` or ``str``
    :param serializer: object with ``load()`` and ``dump()``
        methods

    """
    # Basic validation: getattr raises AttributeError if either
    # required method is missing.
    getattr(serializer, 'load')
    getattr(serializer, 'dump')
    self._serializers[name] = serializer
def dump(cls, obj, file_obj):
    """Serialize object ``obj`` to open pickle file.

    .. versionadded:: 1.8

    :param obj: Python object to serialize
    :type obj: Python object
    :param file_obj: file handle (opened in binary mode)
    :type file_obj: ``file`` object

    """
    # HIGHEST_PROTOCOL is what protocol=-1 selected; the named constant
    # makes the intent explicit.
    return pickle.dump(obj, file_obj, protocol=pickle.HIGHEST_PROTOCOL)
# Set up default manager and register built-in serializers
def write(self, path=None, fileobj=None, legacy=False, skip_unknown=True):
    """Write this metadata out, to *path* or an already-open *fileobj*.

    Exactly one of *path* and *fileobj* must be supplied.  With
    ``legacy=True`` the legacy metadata format is emitted; otherwise the
    data is dumped as sorted, ASCII-only, 2-space-indented JSON.
    """
    if [path, fileobj].count(None) != 1:
        raise ValueError('Exactly one of path and fileobj is needed')
    self.validate()
    if legacy:
        # Reuse the stored legacy form when we have one; convert otherwise.
        legacy_md = self._legacy if self._legacy else self._to_legacy()
        if path:
            legacy_md.write(path, skip_unknown=skip_unknown)
        else:
            legacy_md.write_file(fileobj, skip_unknown=skip_unknown)
        return
    d = self._from_legacy() if self._legacy else self._data
    if fileobj:
        json.dump(d, fileobj, ensure_ascii=True, indent=2, sort_keys=True)
    else:
        with codecs.open(path, 'w', 'utf-8') as f:
            json.dump(d, f, ensure_ascii=True, indent=2, sort_keys=True)
def save(self, pypi_version, current_time):
    """Persist the latest PyPI version and check timestamp to disk.

    This is a best-effort cache: it silently does nothing when the state
    directory is not owned by the current user.
    """
    # Check to make sure that we own the directory
    if not check_path_owner(os.path.dirname(self.statefile_path)):
        return

    # Now that we've ensured the directory is owned by this user, we'll go
    # ahead and make sure that all our directories are created.
    ensure_dir(os.path.dirname(self.statefile_path))

    # Attempt to write out our version check file
    with lockfile.LockFile(self.statefile_path):
        state = {}
        if os.path.exists(self.statefile_path):
            try:
                with open(self.statefile_path) as statefile:
                    state = json.load(statefile)
            except (IOError, ValueError):
                # A corrupt or unreadable state file must not abort the
                # save -- start over from an empty state instead.
                state = {}

        # Keyed by sys.prefix so each (virtual) environment gets its
        # own entry.
        state[sys.prefix] = {
            "last_check": current_time.strftime(SELFCHECK_DATE_FMT),
            "pypi_version": pypi_version,
        }

        with open(self.statefile_path, "w") as statefile:
            json.dump(state, statefile, sort_keys=True,
                      separators=(",", ":"))
def sync(self):
    """Write dict to disk.

    No-op for read-only stores.  Data is written to a ``.tmp`` sibling
    first and then moved into place, so readers never observe a
    half-written file.
    """
    if self.flag == 'r':
        return
    filename = self.filename
    tempname = filename + '.tmp'
    fileobj = open(tempname, 'wb' if self.format == 'pickle' else 'w')
    try:
        try:
            self.dump(fileobj)
        finally:
            # Close before any removal: deleting an open file fails on
            # Windows (the original removed first, then closed).
            fileobj.close()
    except Exception:
        os.remove(tempname)
        raise
    shutil.move(tempname, self.filename)    # atomic commit
    if self.mode is not None:
        os.chmod(self.filename, self.mode)
def save_all_recommend_item():
    """Fetch the full recommendation list and write it to two JSON files.

    ``testItemData.json`` uses the default ASCII-escaped encoding, while
    ``testItemDataEncoded.json`` keeps non-ASCII characters verbatim.
    The list is also echoed to stdout for inspection.
    """
    recommender = Reco(json_data, show_external_data = False)
    items = recommender.get_all_list()
    print(json.dumps(items, indent = 4, sort_keys=True, ensure_ascii=False))
    with open('testItemData.json', 'w') as outfile:
        json.dump(items, outfile, indent = 4)
    with open('testItemDataEncoded.json', 'w') as outfile:
        json.dump(items, outfile, indent = 4, ensure_ascii=False)
# NOTE(review): original non-ASCII comment lost to encoding corruption
def save_all_recommend_item():
    """Dump every recommended item to disk in two encodings.

    Writes ``testItemData.json`` (ASCII-escaped) and
    ``testItemDataEncoded.json`` (raw Unicode), printing the list first.
    """
    reco_engine = Reco(json_data, show_external_data = False)
    full_list = reco_engine.get_all_list()
    print(json.dumps(full_list, indent = 4, sort_keys=True, ensure_ascii=False))
    with open('testItemData.json', 'w') as outfile:
        json.dump(full_list, outfile, indent = 4)
    with open('testItemDataEncoded.json', 'w') as outfile:
        json.dump(full_list, outfile, indent = 4, ensure_ascii=False)
# NOTE(review): original non-ASCII comment lost to encoding corruption
async def twitter_add(self, ctx, handle : str):
    '''
    Add a Twitter handle to a text channel
    A delay of up to 2 min. is possible due to Twitter rate limits
    '''
    # Fix: body uses ``await``, so this must be ``async def`` -- a plain
    # ``def`` is a SyntaxError here.
    if handle in self.feeds_info["channels"].get(ctx.message.channel.id, {}).get("handles", []):
        await self.bot.embed_reply(":no_entry: This text channel is already following that Twitter handle")
        return
    message, embed = await self.bot.embed_reply(":hourglass: Please wait")
    try:
        await self.stream_listener.add_feed(ctx.message.channel, handle)
    except tweepy.error.TweepError as e:
        embed.description = ":no_entry: Error: {}".format(e)
        await self.bot.edit_message(message, embed = embed)
        return
    # Record the handle under this channel, creating the entry on first use.
    if ctx.message.channel.id in self.feeds_info["channels"]:
        self.feeds_info["channels"][ctx.message.channel.id]["handles"].append(handle)
    else:
        self.feeds_info["channels"][ctx.message.channel.id] = {"name" : ctx.message.channel.name, "handles" : [handle]}
    # Persist the updated feed configuration.
    with open("data/twitter_feeds.json", 'w') as feeds_file:
        json.dump(self.feeds_info, feeds_file, indent = 4)
    embed.description = "Added the Twitter handle, [`{0}`](https://twitter.com/{0}), to this text channel".format(handle)
    await self.bot.edit_message(message, embed = embed)
async def twitter_remove(self, ctx, handle : str):
    '''
    Remove a Twitter handle from a text channel
    A delay of up to 2 min. is possible due to Twitter rate limits
    '''
    # Fix: body uses ``await``, so this must be ``async def`` -- a plain
    # ``def`` is a SyntaxError here.
    try:
        # list.remove raises ValueError when the channel wasn't
        # following this handle.
        self.feeds_info["channels"].get(ctx.message.channel.id, {}).get("handles", []).remove(handle)
    except ValueError:
        await self.bot.embed_reply(":no_entry: This text channel isn't following that Twitter handle")
    else:
        # Persist first, then detach the live stream feed.
        with open("data/twitter_feeds.json", 'w') as feeds_file:
            json.dump(self.feeds_info, feeds_file, indent = 4)
        message, embed = await self.bot.embed_reply(":hourglass: Please wait")
        await self.stream_listener.remove_feed(ctx.message.channel, handle)
        embed.description = "Removed the Twitter handle, [`{0}`](https://twitter.com/{0}), from this text channel.".format(handle)
        await self.bot.edit_message(message, embed = embed)
async def generate_erps_dict(self):
    """Scrape the RPS-101 outcome table and cache it as JSON.

    Builds ``{object: {other_object: outcome_verb}}`` from the page text
    and writes it to ``data/erps_dict.json``.  Fix: body uses ``async
    with``/``await``, so this must be ``async def``.
    """
    async with clients.aiohttp_session.get("http://www.umop.com/rps101/alloutcomes.htm") as resp:
        data = await resp.text()
    raw_text = BeautifulSoup(data).text
    # Collapse blank lines; normalize naming; drop the trailing line.
    raw_text = re.sub("\n+", '\n', raw_text).strip()
    raw_text = raw_text.lower().replace("video game", "game")
    raw_text = raw_text.split('\n')[:-1]
    objects = {}
    # Renamed local from ``object`` to avoid shadowing the builtin.
    obj = raw_text[0].split()[-1]
    obj_info = {}
    for line in raw_text[1:]:
        if line[0].isdigit():
            # A numbered line starts the next object's section.
            objects[obj] = obj_info
            obj = line.split()[-1]
            obj_info = {}
        else:
            # "<verb phrase> <object>" -> info[object] = verb phrase
            obj_info[line.split()[-1]] = ' '.join(line.split()[:-1])
    objects[obj] = obj_info
    with open("data/erps_dict.json", 'w') as erps_file:
        json.dump(objects, erps_file, indent = 4)
async def tag(self, ctx, tag : str = ""):
    '''Tags/notes that you can trigger later'''
    # Fix: body uses ``await``, so this must be ``async def`` -- a plain
    # ``def`` is a SyntaxError here.
    if not tag:
        await self.bot.embed_reply("Add a tag with `{0}tag add [tag] [content]`\nUse `{0}tag [tag]` to trigger the tag you added\n`{0}tag edit [tag] [content]` to edit it and `{0}tag delete [tag]` to delete it".format(ctx.prefix))
        return
    # Personal tags take precedence over global ones.
    if tag in self.tags_data.get(ctx.message.author.id, {}).get("tags", []):
        await self.bot.reply(self.tags_data[ctx.message.author.id]["tags"][tag])
    elif tag in self.tags_data["global"]:
        await self.bot.reply(self.tags_data["global"][tag]["response"])
        # Track usage and persist the counter immediately.
        self.tags_data["global"][tag]["usage_counter"] += 1
        with open("data/tags.json", 'w') as tags_file:
            json.dump(self.tags_data, tags_file, indent = 4)
    else:
        # Suggest near-miss names from both personal and global pools.
        close_matches = difflib.get_close_matches(tag, list(self.tags_data.get(ctx.message.author.id, {}).get("tags", {}).keys()) + list(self.tags_data["global"].keys()))
        close_matches = "\nDid you mean:\n{}".format('\n'.join(close_matches)) if close_matches else ""
        await self.bot.embed_reply("Tag not found{}".format(close_matches))
def extract(url):
    """Download one result page, record its MeSH metadata and image.

    Results accumulate in the module-level ``final_data`` dict keyed by
    the global running counter ``img_no``; the image file is saved under
    ``path``.  All failures are treated as best-effort skips.
    """
    global img_no
    try:
        img_no += 1
        r = requests.get(url)
        tree = html.fromstring(r.text)
        # Whitespace between XPath steps is insignificant; the two-part
        # literal replaces the original backslash line continuation.
        div = tree.xpath('//table[@class="masterresultstable"]'
                         '//div[@class="meshtext-wrapper-left"]')
    except Exception:
        # Narrowed from a bare ``except:`` (which also swallowed
        # KeyboardInterrupt/SystemExit).
        div = []
    if not div:
        return
    div = div[0]
    typ = div.xpath('.//strong/text()')[0]
    items = div.xpath('.//li/text()')
    img = tree.xpath('//img[@id="theImage"]/@src')[0]
    final_data[img_no] = {}
    final_data[img_no]['type'] = typ
    final_data[img_no]['items'] = items
    final_data[img_no]['img'] = domain + img
    try:
        # NOTE(review): ``urllib.urlretrieve`` is the Python 2 spelling;
        # Python 3 would need ``urllib.request.urlretrieve``.
        urllib.urlretrieve(domain+img, path+str(img_no)+".png")
        # Checkpoint the collected metadata after each download.
        with open('data_new.json', 'w') as f:
            json.dump(final_data, f)
        output = "Downloading Images : {}".format(img_no)
        sys.stdout.write("\r\x1b[K" + output)
        sys.stdout.flush()
    except Exception:
        return
def save_cache(self, file):
    """Dump ``self.cache`` to *file* as pretty-printed, key-sorted JSON."""
    import json
    with open(file, 'w') as cache_fp:
        json.dump(self.cache, cache_fp, indent=4, sort_keys=True)
def yaml(self):
    """Return ``self.data`` serialized as a YAML document string."""
    serialized = yaml.dump(self.data)
    return serialized
def save(self):
    """Save this config to disk.

    If the charm is using the :mod:`Services Framework <services.base>`
    or :meth:'@hook <Hooks.hook>' decorator, this
    is called automatically at the end of successful hook execution.
    Otherwise, it should be called directly by user code.

    To disable automatic saves, set ``implicit_save=False`` on this
    instance.
    """
    # The instance itself is a mapping, so it serializes directly.
    with open(self.path, 'w') as config_file:
        json.dump(self, config_file)
def _save_ready_file(self):
if self._ready is None:
return
with open(self._ready_file, 'w') as fp:
json.dump(list(self._ready), fp)
def generate(location):
    """CLI wizard for creating a new contract from a template directory.

    Expects *location* to contain a ``*.tsol`` template and a ``*.json``
    example payload.  Prompts the user for per-field overrides, verifies
    the template compiles, then either generates a Solidity contract or
    exports the filled-in payload.  All file handles are now closed via
    ``with`` (the original leaked every ``open()``).
    """
    if not directory_has_smart_contract(location):
        print('Provided directory does not contain a *.tsol and *.json or does not compile.')
        return
    with open(glob.glob(os.path.join(location, '*.json'))[0]) as payload_file:
        example_payload = json.load(payload_file)
    print(example_payload)
    # Let the user override any payload field; empty input keeps the default.
    for k, v in example_payload.items():
        value = input(k + ':')
        if value != '':
            example_payload[k] = value
    print(example_payload)
    code_path = glob.glob(os.path.join(location, '*.tsol'))
    # Sanity-compile the template against the (possibly edited) payload.
    with open(code_path[0]) as template_file:
        tsol.compile(template_file, example_payload)
    print('Code compiles with new payload.')
    while True:
        selection = input('(G)enerate Solidity contract or (E)xport implementation:')
        if selection.lower() == 'g':
            output_name = input('Name your contract file without an extension:')
            with open(code_path[0]) as template_file:
                code = tsol.generate_code(template_file.read(), example_payload)
            with open(os.path.join(location, '{}.sol'.format(output_name)), 'w') as sol_file:
                sol_file.write(code)
            break
        if selection.lower() == 'e':
            output_name = input('Name your implementation file without an extension:')
            with open(os.path.join(location, '{}.json'.format(output_name)), 'w') as impl_file:
                json.dump(example_payload, impl_file)
            break
def send_feedback(self):
    """Print stored items to console/Alfred as JSON."""
    sys.stdout.write(json.dumps(self.obj))
    sys.stdout.flush()