def test_zipfile_timestamp():
    # An environment variable can be used to influence the timestamp on
    # ZipInfo objects inside the zip. See issue #143. temporary_directory()
    # is a local helper used instead of tempfile.TemporaryDirectory, which
    # does not exist under Python 2.
with temporary_directory() as tempdir:
for filename in ('one', 'two', 'three'):
path = os.path.join(tempdir, filename)
with codecs.open(path, 'w', encoding='utf-8') as fp:
fp.write(filename + '\n')
zip_base_name = os.path.join(tempdir, 'dummy')
        # The earliest date representable in a ZipInfo, 1980-01-01
with environ('SOURCE_DATE_EPOCH', '315576060'):
zip_filename = wheel.archive.make_wheelfile_inner(
zip_base_name, tempdir)
with readable_zipfile(zip_filename) as zf:
for info in zf.infolist():
assert info.date_time[:3] == (1980, 1, 1)
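# A small standalone sketch (separate from the wheel tests) showing what the
# SOURCE_DATE_EPOCH value used above corresponds to: a moment on 1980-01-01 UTC,
# which is why the assertion checks that date.
import datetime

print(datetime.datetime.utcfromtimestamp(315576060))  # -> 1980-01-01 12:01:00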
def read_text(filename,rel_hash):
id_counter = 0
nodes = {}
f = codecs.open(filename, "r", "utf-8")
#Add some default relations if none have been supplied (at least 1 rst and 1 multinuc)
if len(rel_hash) < 2:
rel_hash["elaboration_r"] = "rst"
rel_hash["joint_m"] = "multinuc"
rels = collections.OrderedDict(sorted(rel_hash.items()))
for line in f:
id_counter += 1
        nodes[str(id_counter)] = NODE(str(id_counter), id_counter, id_counter, "0", 0, "edu",
                                      line.strip(), list(rels.keys())[0], list(rels.values())[0])
    f.close()
    return nodes
def shared_locations(self):
"""
A dictionary of shared locations whose keys are in the set 'prefix',
'purelib', 'platlib', 'scripts', 'headers', 'data' and 'namespace'.
The corresponding value is the absolute path of that category for
this distribution, and takes into account any paths selected by the
user at installation time (e.g. via command-line arguments). In the
case of the 'namespace' key, this would be a list of absolute paths
for the roots of namespace packages in this distribution.
The first time this property is accessed, the relevant information is
read from the SHARED file in the .dist-info directory.
"""
result = {}
shared_path = os.path.join(self.path, 'SHARED')
if os.path.isfile(shared_path):
with codecs.open(shared_path, 'r', encoding='utf-8') as f:
lines = f.read().splitlines()
for line in lines:
key, value = line.split('=', 1)
if key == 'namespace':
result.setdefault(key, []).append(value)
else:
result[key] = value
return result
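# A minimal standalone sketch (not part of distlib) of the SHARED file format
# parsed above: plain "key=value" lines, where only the "namespace" key may
# repeat and is collected into a list. The paths are hypothetical.
SAMPLE_SHARED = "\n".join([
    "prefix=/usr/local",
    "purelib=/usr/local/lib/python3.8/site-packages",
    "scripts=/usr/local/bin",
    "namespace=/usr/local/lib/python3.8/site-packages/pkg_a",
    "namespace=/usr/local/lib/python3.8/site-packages/pkg_b",
])

parsed = {}
for line in SAMPLE_SHARED.splitlines():
    key, value = line.split('=', 1)
    if key == 'namespace':
        parsed.setdefault(key, []).append(value)
    else:
        parsed[key] = value
# parsed['namespace'] is a two-element list; every other key maps to a single path.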
def compute_dt_dist(docs, labels, tags, model, max_len, batch_size, pad_id, idxvocab, output_file):
#generate batches
num_batches = int(math.ceil(float(len(docs)) / batch_size))
dt_dist = []
t = []
combined = []
docid = 0
for i in xrange(num_batches):
x, _, _, t, s = get_batch_doc(docs, labels, tags, i, max_len, cf.tag_len, batch_size, pad_id)
attention, mean_topic = sess.run([model.attention, model.mean_topic], {model.doc: x, model.tag: t})
dt_dist.extend(attention[:s])
if debug:
for si in xrange(s):
d = x[si]
print "\n\nDoc", docid, "=", " ".join([idxvocab[item] for item in d if (item != pad_id)])
sorted_dist = matutils.argsort(attention[si], reverse=True)
for ti in sorted_dist:
print "Topic", ti, "=", attention[si][ti]
docid += 1
np.save(open(output_file, "w"), dt_dist)
def gen_sent_on_topic(idxvocab, vocabxid, start_symbol, end_symbol, cf):
output = codecs.open(args.gen_sent_on_topic, "w", "utf-8")
topics, entropy = tm.get_topics(sess, topn=topn)
with tf.variable_scope("model", reuse=True, initializer=initializer):
mgen = LM(is_training=False, vocab_size=len(idxvocab), batch_size=1, num_steps=1, config=cf, \
reuse_conv_variables=True)
for t in range(cf.topic_number):
output.write("\n" + "="*100 + "\n")
output.write("Topic " + str(t) + ":\n")
output.write(" ".join([ idxvocab[item] for item in topics[t] ]) + "\n\n")
output.write("\nSentence generation (greedy; argmax):" + "\n")
s = mgen.generate_on_topic(sess, t, vocabxid[start_symbol], 0, cf.lm_sent_len+10, vocabxid[end_symbol])
output.write("[0] " + " ".join([ idxvocab[item] for item in s ]) + "\n")
for temp in gen_temps:
output.write("\nSentence generation (random; temperature = " + str(temp) + "):\n")
for i in xrange(gen_num):
s = mgen.generate_on_topic(sess, t, vocabxid[start_symbol], temp, cf.lm_sent_len+10, \
vocabxid[end_symbol])
output.write("[" + str(i) + "] " + " ".join([ idxvocab[item] for item in s ]) + "\n")
def write_predictions(self, inputs):
'''
Outputs predictions in a file named <model_name_prefix>.predictions.
'''
predictions = numpy.argmax(self.model.predict(inputs), axis=1)
test_output_file = open("%s.predictions" % self.model_name_prefix, "w")
for input_indices, prediction in zip(inputs, predictions):
# The predictions are indices of words in padded sentences. We need to readjust them.
padding_length = 0
for index in input_indices:
if numpy.all(index == 0):
padding_length += 1
else:
break
prediction = prediction - padding_length + 1 # +1 because the indices start at 1.
print >>test_output_file, prediction
def process_train_data(self, input_file, onto_aware):
print >>sys.stderr, "Reading training data"
label_ind = []
tagged_sentences = []
for line in open(input_file):
lnstrp = line.strip()
label, tagged_sentence = lnstrp.split("\t")
if label not in self.label_map:
self.label_map[label] = len(self.label_map)
label_ind.append(self.label_map[label])
tagged_sentences.append(tagged_sentence)
# Shuffling so that when Keras does validation split, it is not always at the end.
sentences_and_labels = zip(tagged_sentences, label_ind)
random.shuffle(sentences_and_labels)
tagged_sentences, label_ind = zip(*sentences_and_labels)
print >>sys.stderr, "Indexing training data"
train_inputs = self.data_processor.prepare_paired_input(tagged_sentences, onto_aware=onto_aware,
for_test=False, remove_singletons=True)
train_labels = self.data_processor.make_one_hot(label_ind)
return train_inputs, train_labels
def process_test_data(self, input_file, onto_aware, is_labeled=True):
if not self.model:
raise RuntimeError, "Model not trained yet!"
print >>sys.stderr, "Reading test data"
label_ind = []
tagged_sentences = []
for line in open(input_file):
lnstrp = line.strip()
if is_labeled:
label, tagged_sentence = lnstrp.split("\t")
if label not in self.label_map:
self.label_map[label] = len(self.label_map)
label_ind.append(self.label_map[label])
else:
tagged_sentence = lnstrp
tagged_sentences.append(tagged_sentence)
print >>sys.stderr, "Indexing test data"
# Infer max sentence length if the model is trained
input_shape = self.model.get_input_shape_at(0)[0] # take the shape of the first of two inputs at 0.
sentlenlimit = input_shape[1] # (num_sentences, num_words, num_senses, num_hyps)
test_inputs = self.data_processor.prepare_paired_input(tagged_sentences, onto_aware=onto_aware,
sentlenlimit=sentlenlimit, for_test=True)
test_labels = self.data_processor.make_one_hot(label_ind)
return test_inputs, test_labels
def test_zipfile_attributes():
# With the change from ZipFile.write() to .writestr(), we need to manually
# set member attributes.
with temporary_directory() as tempdir:
files = (('foo', 0o644), ('bar', 0o755))
for filename, mode in files:
path = os.path.join(tempdir, filename)
with codecs.open(path, 'w', encoding='utf-8') as fp:
fp.write(filename + '\n')
os.chmod(path, mode)
zip_base_name = os.path.join(tempdir, 'dummy')
zip_filename = wheel.archive.make_wheelfile_inner(
zip_base_name, tempdir)
with readable_zipfile(zip_filename) as zf:
for filename, mode in files:
info = zf.getinfo(os.path.join(tempdir, filename))
assert info.external_attr == (mode | 0o100000) << 16
assert info.compress_type == zipfile.ZIP_DEFLATED
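# A short aside on the external_attr assertion above: a zip member stores Unix
# mode bits in the high 16 bits of external_attr, and 0o100000 is stat.S_IFREG
# (a regular file), so a 0o644 file is encoded as shown here.
import stat

mode = 0o644
external_attr = (mode | stat.S_IFREG) << 16   # S_IFREG == 0o100000
assert external_attr == 0o100644 << 16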
def copy_header(path):
encoding = _get_encoding(path)
try:
file = codecs.open(path, "r", encoding)
except:
pass
else:
for row in file:
if not row or row[0] != ';':
break
row = row.strip(" \n")
if row == ';; okuri-ari entries.':
break
print(row)
file.close()
def load(self, filename):
"""
?????????
????? ? ?? ??
TrieNode??:
{
?: [(??_1???_1), (??_2???_2)..],
...
}
?? key = ? , value = [(??_1???_1), (??_2???_2)..]
"""
with codecs.open(filename, 'r', 'utf-8') as f:
for line in f.readlines():
items = line.strip().split()
if len(items) == 3:
self.setdefault(items[0], []
).append((int(items[1]), int(items[2])))
return True
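# An illustrative (hypothetical) input for load() above: each line carries three
# whitespace-separated fields, a key followed by two integers, and repeated keys
# accumulate into the same list.
import codecs
import os
import tempfile

sample_path = os.path.join(tempfile.mkdtemp(), "dict.txt")
with codecs.open(sample_path, "w", "utf-8") as sample_file:
    sample_file.write(u"foo 10 1\nfoo 3 2\nbar 7 1\n")
# load(sample_path) on a dict-like instance would then contain
#     {u"foo": [(10, 1), (3, 2)], u"bar": [(7, 1)]}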
def process_file(self, filename, out_filename=None):
"""
?????????????????
"""
results = {'words': [], 'tags': []}
with codecs.open(filename, 'r', 'utf-8') as input_file:
for line in input_file:
print('PROCESS LINE:{}'.format(line))
result = self.process(line.strip())
print(self.format_result(result))
results['words'].extend(result['words'])
results['tags'].extend(result['tags'])
if out_filename is None:
return results
else:
with codecs.open(out_filename, 'w', 'utf-8') as output_file:
output_file.write(self.format_result(results))
output_file.write('\n')
def get_sqls(self):
"""This function extracts sqls from the java files with mybatis sqls.
Returns:
A list of :class:`SQL`. For example:
[SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')]
"""
sqls = []
for root, dirs, files in os.walk(self.dir):
for file in files:
if not file.endswith('.java'):
continue
with codecs.open(os.path.join(root, file), 'r', encoding=self.encoding) as f:
sqls.extend(MybatisInlineSqlExtractor.get_selects_from_text(MybatisInlineSqlExtractor.remove_comment(f.read())))
return sqls
def get_sqls(self):
"""This function extracts sqls from mysql general log file.
Returns:
A list of :class:`SQL`. For example:
[SQL('', u'select a.id, b.name from db.ac a join db.bc b on a.id=b.id or a.id=b.iid where a.cnt > 10')]
"""
general_log = open(self.log_path)
log = GeneralQueryLog(general_log)
session_db_map = {}
for entry in log:
if entry['command'] == 'Connect':
            m = re.search(r'\s+on\s(?P<name>\w+)', entry['argument'])
if m:
session_db_map[entry['session_id']] = m.groupdict()['name'].strip()
elif entry['command'] == 'Init DB':
session_db_map[entry['session_id']] = entry['argument'].strip()
elif entry['command'] == 'Query':
sql = entry['argument']
if sql.strip()[:6].lower() == 'select':
yield SQL(session_db_map.get(entry['session_id'], ''), sql)
def __init__(self, filename, mode='a', encoding=None, delay=0):
"""
Open the specified file and use it as the stream for logging.
"""
#keep the absolute path, otherwise derived classes which use this
#may come a cropper when the current directory changes
if codecs is None:
encoding = None
self.baseFilename = os.path.abspath(filename)
self.mode = mode
self.encoding = encoding
if delay:
#We don't open the stream, but we still need to call the
#Handler constructor to set level, formatter, lock etc.
Handler.__init__(self)
self.stream = None
else:
StreamHandler.__init__(self, self._open())
def test_save_svgz_filename():
import gzip
qr = segno.make_qr('test')
f = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False)
f.close()
qr.save(f.name)
f = open(f.name, mode='rb')
expected = b'\x1f\x8b\x08' # gzip magic number
val = f.read(len(expected))
f.close()
f = gzip.open(f.name)
try:
content = f.read(6)
finally:
f.close()
os.unlink(f.name)
assert expected == val
assert b'<?xml ' == content
def save(self):
statCache = open(self.statCacheFilePath, 'w')
self.cache['version'] = self.cacheVersion
self.cache['date'] = self.startDate
    if 'players' not in self.cache:
self.cache['players'] = {}
self.cache['players'][self.playerName] = {
'battles': [] if self.fastCache else self.battles,
'account': self.account,
'accountTanks': self.accountTanks,
'session': self.session,
'impact': self.impact,
'tanks': self.tanks
}
if self.fastCache:
statCache.write(json.dumps(self.cache))
else:
statCache.write(json.dumps(self.cache, sort_keys=True, indent=4, separators=(',', ': ')))
statCache.close()
def _readTxt(fname):
'''Returns array of words and word embedding matrix
'''
words, vectors = [], []
hook = codecs.open(fname, 'r', 'utf-8')
# get summary info about vectors file
(numWords, dim) = (int(s.strip()) for s in hook.readline().split())
for line in hook:
chunks = line.split()
word, vector = chunks[0].strip(), np.array([float(n) for n in chunks[1:]])
words.append(word)
vectors.append(vector)
hook.close()
assert len(words) == numWords
for v in vectors: assert len(v) == dim
return (words, vectors)
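# A hedged sketch of the plain-text vector format _readTxt() expects: a header
# line "<numWords> <dim>" followed by one "<word> <v_1> ... <v_dim>" line per
# word. The helper name and sample values are illustrative only.
import codecs

def _write_sample_vectors(path):
    with codecs.open(path, "w", "utf-8") as hook:
        hook.write(u"2 3\n")               # 2 words, dimensionality 3
        hook.write(u"king 0.1 0.2 0.3\n")
        hook.write(u"queen 0.4 0.5 0.6\n")
# _readTxt(path) would then return (["king", "queen"],
#                                   [array([0.1, 0.2, 0.3]), array([0.4, 0.5, 0.6])])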
def read(analogy_file, setting, strings_only=False):
multi_b = setting == settings.ALL_INFO
multi_d = setting in [settings.ALL_INFO, settings.MULTI_ANSWER]
analogies = {}
with codecs.open(analogy_file, 'r', 'utf-8') as stream:
cur_relation, cur_analogies = None, []
for line in stream:
# relation separators
if line[0] == '#':
if cur_relation:
analogies[cur_relation] = cur_analogies
cur_relation = line[2:].strip()
cur_analogies = []
# everything else is an analogy
else:
analogy = _parseLine(line, multi_b, multi_d, strings_only)
cur_analogies.append(analogy)
analogies[cur_relation] = cur_analogies
return analogies
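# A hedged sketch of the analogy-file layout read() walks over: lines starting
# with "# " name a relation, and every following line (until the next "# ")
# is one analogy handed to _parseLine. How _parseLine splits a line depends on
# the chosen setting, so the per-line format below is only illustrative.
SAMPLE_ANALOGY_FILE = u"""# capital-of
athens greece london england
paris france berlin germany
# plural
apple apples car cars
"""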
def readme(path='README.rst'):
"""Try to read README.rst or return empty string if failed.
:param str path: Path to README file.
:return: File contents.
:rtype: str
"""
path = os.path.realpath(os.path.join(os.path.dirname(__file__), path))
handle = None
url_prefix = 'https://raw.githubusercontent.com/Robpol86/{name}/v{version}/'.format(name=NAME, version=VERSION)
try:
handle = codecs.open(path, encoding='utf-8')
return handle.read(131072).replace('.. image:: docs', '.. image:: {0}docs'.format(url_prefix))
except IOError:
return ''
finally:
getattr(handle, 'close', lambda: None)()
def _download(args):
url, folderName, index = args
session = setupSession()
try:
        # the timeout is a tuned parameter; 5 seconds suits a connection of
        # roughly 10 Mb/s
        image = session.get(url, timeout=5)
imageName = str(index)
with open(os.path.join(folderName, imageName),'wb') as fout:
fout.write(image.content)
fileExtension = imghdr.what(os.path.join(folderName, imageName))
if fileExtension is None:
os.remove(os.path.join(folderName, imageName))
else:
newName = imageName + '.' + str(fileExtension)
os.rename(os.path.join(folderName, imageName), os.path.join(folderName, newName))
except Exception as e:
print ("failed to download one pages with url of " + str(url))
# basic function to get id list
def __init__(self):
self.file = codecs.open('article.json', 'w', encoding="utf-8")
def __init__(self):
self.file = open('articleexport.json', 'wb')
self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
self.exporter.start_exporting()
def handle(self, *args, **options):
schema = getattr(settings, 'SWAGGER_SCHEMA', None)
module = getattr(settings, 'SWAGGER_MODULE', None)
if not schema:
raise ImproperlyConfigured('You have to provide SWAGGER_SCHEMA setting pointing to desired schema')
if not module:
raise ImproperlyConfigured('You have to specify desired controller module name in SWAGGER_MODULE setting')
router = SwaggerRouter()
print('Inspecting available controllers...')
router.update(True)
router.process()
print()
print('Following classes and methods are going to be generated:')
enum = router.get_enum()
for name in enum:
print("{} : {}".format(name, [x['method'] for x in enum[name]['methods']]))
    if options['generate']:
template = Template()
filename = module.split('.')[-1] + '.py'
structure = [{ 'name' : name, 'data' : data } for name, data in six.iteritems(enum)]
print('Generating handlers ({})...'.format(filename))
with codecs.open(filename, 'w', 'utf-8') as f:
f.write(template.render(template_name = 'view.jinja', names = structure))
print('Done.')
else:
print()
print('Use --generate option to create them')
def load_constraints(self, constraints_filepath):
"""
    This method reads a collection of constraints from the specified file, and returns a set with
all constraints for which both of their constituent words are in the specified vocabulary.
"""
    constraints_filepath = constraints_filepath.strip()
constraints = set()
with codecs.open(constraints_filepath, "r", "utf-8") as f:
for line in f:
word_pair = line.split()
if word_pair[0] in self.vocabulary and word_pair[1] in self.vocabulary and word_pair[0] != word_pair[1]:
constraints |= {(self.vocab_index[word_pair[0]], self.vocab_index[word_pair[1]])}
return constraints
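# A runnable sketch (with a hypothetical vocabulary) of the constraint format
# load_constraints() reads: one whitespace-separated word pair per line; pairs
# outside the vocabulary, and self-pairs, are dropped.
sample_lines = [u"east west", u"happy cheerful", u"happy happy"]
vocabulary = {u"east", u"west", u"happy", u"cheerful"}
vocab_index = {word: idx for idx, word in enumerate(sorted(vocabulary))}

constraints = set()
for line in sample_lines:
    word_pair = line.split()
    if word_pair[0] in vocabulary and word_pair[1] in vocabulary and word_pair[0] != word_pair[1]:
        constraints |= {(vocab_index[word_pair[0]], vocab_index[word_pair[1]])}
# constraints now holds two index pairs; the self-pair "happy happy" was skipped.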
def load_word_vectors(file_destination):
"""
This method loads the word vectors from the supplied file destination.
It loads the dictionary of word vectors and prints its size and the vector dimensionality.
"""
print "Loading pretrained word vectors from", file_destination
word_dictionary = {}
try:
f = codecs.open(file_destination, 'r', 'utf-8')
for line in f:
line = line.split(" ", 1)
key = unicode(line[0].lower())
word_dictionary[key] = numpy.fromstring(line[1], dtype="float32", sep=" ")
except:
print "Word vectors could not be loaded from:", file_destination
return {}
print len(word_dictionary), "vectors loaded from", file_destination
return word_dictionary
def print_word_vectors(word_vectors, write_path):
"""
This function prints the collection of word vectors to file, in a plain textual format.
"""
f_write = codecs.open(write_path, 'w', 'utf-8')
for key in word_vectors:
print >>f_write, key, " ".join(map(unicode, numpy.round(word_vectors[key], decimals=6)))
print "Printed", len(word_vectors), "word vectors to:", write_path