def get_named_set(lang_codes, feature_set):
if feature_set == 'id':
return get_id_set(lang_codes)
if feature_set not in FEATURE_SETS:
print("ERROR: Invalid feature set " + feature_set, file=sys.stderr)
sys.exit()
filename, source, prefix = FEATURE_SETS[feature_set]
feature_database = np.load(filename)
lang_codes = [ get_language_code(l, feature_database) for l in lang_codes ]
lang_indices = [ get_language_index(l, feature_database) for l in lang_codes ]
feature_names = get_feature_names(prefix, feature_database)
feature_indices = [ get_feature_index(f, feature_database) for f in feature_names ]
source_index = get_source_index(source, feature_database)
feature_values = feature_database["data"][lang_indices,:,:][:,feature_indices,:][:,:,source_index]
feature_values = feature_values.squeeze(axis=2)
return feature_names, feature_values
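A minimal sketch of the chained fancy-indexing used in get_named_set, with an invented 3x4x2 array standing in for feature_database["data"]; it assumes get_source_index yields a one-element index list so that squeeze(axis=2) has a length-1 axis to drop.
import numpy as np

# Invented (languages, features, sources) cube standing in for feature_database["data"].
data = np.arange(24).reshape(3, 4, 2)
lang_indices = [0, 2]       # hypothetical rows from get_language_index
feature_indices = [1, 3]    # hypothetical columns from get_feature_index
source_index = [1]          # one source kept as a list so the third axis survives

values = data[lang_indices, :, :][:, feature_indices, :][:, :, source_index]
values = values.squeeze(axis=2)   # -> shape (2, 2): languages x features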
def directory_has_smart_contract(location):
# returns bool if there is a tsol contract in said directory
# probably makes more sense to put this inside of the tsol package
code_path = glob.glob(os.path.join(location, '*.tsol'))
example = glob.glob(os.path.join(location, '*.json'))
assert len(code_path) > 0 and len(example) > 0, 'Could not find *.tsol and *.json files in provided directory.'
# pop off the first file name and turn the code into a file object
code = open(code_path[0])
# turn the example into a dict
with open(example[0]) as e:
example = json.load(e)
try:
tsol.compile(code, example)
except Exception as e:
print(e)
return False
return True
def load_previous(self, path=None):
"""Load previous copy of config from disk.
In normal usage you don't need to call this method directly - it
is called automatically at object initialization.
:param path:
File path from which to load the previous config. If `None`,
config is loaded from the default location. If `path` is
specified, subsequent `save()` calls will write to the same
path.
"""
self.path = path or self.path
with open(self.path) as f:
self._prev_dict = json.load(f)
for k, v in copy.deepcopy(self._prev_dict).items():
if k not in self:
self[k] = v
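A tiny stand-alone illustration of the backfill loop above, using invented keys: values already present in the live config win, and anything only in the previous JSON copy is carried over.
previous = {'port': 8080, 'debug': False}   # what json.load(f) might return
config = {'port': 9090}                     # current in-memory values take precedence
for k, v in previous.items():
    if k not in config:
        config[k] = v
# config -> {'port': 9090, 'debug': False}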
def info_to_db(db, name):
cur = db.cursor()
with open(name) as data_file:
data = json.load(data_file)
subject = data['subject']
code = data['code']
title = data['title']
credit = data['credit']
desc = data['desc']
geneds = data['geneds']
if geneds is None:
geneds = ''
else:
geneds = ','.join(geneds)
url = data['url']
cmd = 'INSERT INTO `CourseExplorer` (`Id`, `Subject`, `Code`, `Title`, `Credit`, `Description`, `GenEd`, ' \
'`Url`) VALUES(NULL, %s, %s, %s, %s, %s, %s, %s) '
val = (subject, code, title, credit, desc, geneds, url)
cur.execute(cmd, val)
# if subject == 'PHYS':
# print cmd % val
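A hedged usage sketch for info_to_db; the pymysql driver, connection parameters, and file name below are assumptions, and only the %s-placeholder execute style is taken from the code above.
import pymysql   # assumed DB-API driver; any module using the %s paramstyle would do

db = pymysql.connect(host='localhost', user='user', password='secret', database='courses')
try:
    info_to_db(db, 'PHYS_211.json')   # hypothetical course JSON produced by the scraper
    db.commit()                       # cur.execute() above does not commit on its own
finally:
    db.close()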
def _set_asset_paths(self, app):
"""
Read in the manifest json file which acts as a manifest for assets.
This allows us to get the asset path as well as hashed names.
:param app: aiohttp application
:return: None
"""
webpack_stats = app.settings.WEBPACK_MANIFEST_PATH
try:
with open(webpack_stats, 'r') as stats_json:
stats = json.load(stats_json)
if app.settings.WEBPACK_ASSETS_URL:
self.assets_url = app.settings.WEBPACK_ASSETS_URL
else:
self.assets_url = stats['publicPath']
self.assets = stats['assets']
except IOError:
raise RuntimeError(
"'WEBPACK_MANIFEST_PATH' is required to be set and "
"it must point to a valid json file.")
def _get_external_data(url):
result = {}
try:
# urlopen might fail if it runs into redirections,
# because of Python issue #13696. Fixed in locators
# using a custom redirect handler.
resp = urlopen(url)
headers = resp.info()
ct = headers.get('Content-Type')
if not ct.startswith('application/json'):
logger.debug('Unexpected response for JSON request: %s', ct)
else:
reader = codecs.getreader('utf-8')(resp)
#data = reader.read().decode('utf-8')
#result = json.loads(data)
result = json.load(reader)
except Exception as e:
logger.exception('Failed to get external data for %s: %s', url, e)
return result
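A stand-alone sketch of the streaming-decode pattern above: wrap the byte response with codecs.getreader so json.load can read it as text. The URL is a placeholder and the Python 3 import spelling is assumed.
import codecs
import json
from urllib.request import urlopen   # Python 3 spelling assumed here

resp = urlopen('https://example.com/metadata.json')   # placeholder URL
reader = codecs.getreader('utf-8')(resp)   # byte stream -> incremental text reader
payload = json.load(reader)                # no intermediate decoded str needed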
def save(self, pypi_version, current_time):
# Check to make sure that we own the directory
if not check_path_owner(os.path.dirname(self.statefile_path)):
return
# Now that we've ensured the directory is owned by this user, we'll go
# ahead and make sure that all our directories are created.
ensure_dir(os.path.dirname(self.statefile_path))
# Attempt to write out our version check file
with lockfile.LockFile(self.statefile_path):
if os.path.exists(self.statefile_path):
with open(self.statefile_path) as statefile:
state = json.load(statefile)
else:
state = {}
state[sys.prefix] = {
"last_check": current_time.strftime(SELFCHECK_DATE_FMT),
"pypi_version": pypi_version,
}
with open(self.statefile_path, "w") as statefile:
json.dump(state, statefile, sort_keys=True,
separators=(",", ":"))
def load(self):
# XXX JSON is not a great database
for path in load_config_paths('wheel'):
conf = os.path.join(native(path), self.CONFIG_NAME)
if os.path.exists(conf):
with open(conf, 'r') as infile:
self.data = json.load(infile)
for x in ('signers', 'verifiers'):
if x not in self.data:
self.data[x] = []
if 'schema' not in self.data:
self.data['schema'] = self.SCHEMA
elif self.data['schema'] != self.SCHEMA:
raise ValueError(
"Bad wheel.json version {0}, expected {1}".format(
self.data['schema'], self.SCHEMA))
break
return self
def __load_layout(self, config):
var = config.get_value('engine/replace-with-kanji-python', 'layout')
if var is None or var.get_type_string() != 's':
path = os.path.join(os.getenv('IBUS_REPLACE_WITH_KANJI_LOCATION'), 'layouts')
path = os.path.join(path, 'roomazi.json')
if var:
config.unset('engine/replace-with-kanji-python', 'layout')
else:
path = var.get_string()
logger.info("layout: %s", path)
layout = roomazi.layout # Use 'roomazi' as default
try:
with open(path) as f:
layout = json.load(f)
except ValueError as error:
logger.error("JSON error: %s", error)
except OSError as error:
logger.error("Error: %s", error)
except:
logger.error("Unexpected error: %s %s", sys.exc_info()[0], sys.exc_info()[1])
self.__to_kana = self.__handle_roomazi_layout
if 'Type' in layout:
if layout['Type'] == 'Kana':
self.__to_kana = self.__handle_kana_layout
return layout
def _get_cache(ttl, cache_path):
'''
If url contains valid cache, returns it, else returns empty list.
'''
# Check if we have a valid cached version.
try:
cached_time = os.path.getmtime(cache_path)
except OSError:
return []
if current_time() - cached_time < ttl:
log.debug('%s is less than ttl', cache_path)
try:
with open(cache_path) as json_file:
loaded_json = json.load(json_file)
return loaded_json
except IOError:
return []
except ValueError:
log.error('%s was not json formatted', cache_path)
return []
else:
log.debug('%s was older than ttl', cache_path)
return []
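A hedged sketch of how a caller might pair _get_cache with a refresh path; fetch and the write-back are placeholders, not part of the original module.
import json

def _refresh_cache(ttl, cache_path, fetch):
    cached = _get_cache(ttl, cache_path)
    if cached:                           # [] means missing, stale, or unreadable
        return cached
    data = fetch()                       # placeholder for the real data source
    with open(cache_path, 'w') as f:
        json.dump(data, f)               # rewriting resets the mtime the ttl check uses
    return data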
def test_update_text(self):
with open(os.path.join(self.FIXTURES_DIR, 'eyes_in_the_skies.json')) as f:
final_data = json.load(f)
original_text = final_data['RTR']['cards'][0]['originalText']
final_text = final_data['RTR']['cards'][0]['text']
# Copy the data and munge it into its original state.
original_data = copy.deepcopy(final_data)
original_data['RTR']['cards'][0]['text'] = original_text
# Import the original data.
parse_data(original_data, ['RTR'])
eyes_in_the_skies = Card.objects.first()
self.assertEqual(eyes_in_the_skies.text, original_text)
# Import the final, updated data.
parse_data(final_data, ['RTR'])
eyes_in_the_skies.refresh_from_db()
self.assertEqual(eyes_in_the_skies.text, final_text)
def test_update_types(self):
with open(os.path.join(self.FIXTURES_DIR, 'jackal_pup.json')) as f:
final_data = json.load(f)
# Copy the data and munge the types.
original_data = copy.deepcopy(final_data)
original_subtype = 'Hound'
original_data['TMP']['cards'][0]['subtypes'] = [original_subtype]
# Import the original data.
parse_data(original_data, ['TMP'])
jackal_pup = Card.objects.first()
self.assertEqual(jackal_pup.subtypes.count(), 1)
self.assertEqual(jackal_pup.subtypes.first().name, original_subtype)
# Import the final, updated data.
parse_data(final_data, ['TMP'])
jackal_pup.refresh_from_db()
self.assertEqual(jackal_pup.subtypes.count(), 1)
self.assertEqual(jackal_pup.subtypes.first().name, 'Jackal')
# The Hound subtype has been deleted.
self.assertFalse(CardSubtype.objects.filter(name=original_subtype).exists())
def test_update_loyalty(self):
"""
Simulates the upgrade process from version 0.2 to version 0.4.
"""
with open(os.path.join(self.FIXTURES_DIR, 'vraska_the_unseen.json')) as f:
final_data = json.load(f)
# Copy the data and munge it to remove the loyalty.
original_data = copy.deepcopy(final_data)
del original_data['RTR']['cards'][0]['loyalty']
# Import the original data.
parse_data(original_data, ['RTR'])
vraska = Card.objects.first()
self.assertIsNone(vraska.loyalty)
# Import the final, updated data.
parse_data(final_data, ['RTR'])
vraska.refresh_from_db()
self.assertEqual(vraska.loyalty, 5)
def main():
if len(sys.argv) < 2:
sys.stderr.write("USAGE: %s measurement\n" % sys.argv[0])
sys.exit(1)
path = sys.argv[1]
with open(os.path.join(path, "metadata.json")) as f:
metadata = json.load(f)
start = date(metadata["start"][:-1])
end = date(metadata["end"][:-1])
print('open measurement "%s" from "%s" to "%s"' % (metadata["name"], start, end))
for service in metadata["services"]:
print('open service "%s"' % service["name"])
with open(os.path.join(path, service["filename"])) as csvfile:
r = csv.DictReader(csvfile, dialect=csv.excel_tab)
for row in r:
print(row["time"])
def configure(self):
# load config values
try:
with open(self.configfile) as configfile_contents:
self.config = json.load(configfile_contents)
except:
self.config = {}
try:
self.agent_services = self.api_session.get(self.api_endpoint + '/agent/services?stale').json()
except:
print_exc()
exit(135)
self.managed_service = self.agent_services[self.service]
if self.managed_service['Tags'] is None:
self.managed_service['Tags'] = []
if self.role_source == "facter":
self.get_facter_state(self.DEFAULT_FACTERFILE)
else:
print("!! unsupported PG role source !!")
exit(140)
def prompt_pick_backup(message):
"""Prompts the user to pick an existing database, and returns the
selected choice database ID and its metadata"""
# First load all the saved databases (splitting extension and path)
saved_db = [path.splitext(path.split(f)[1])[0] for f in glob('backups/*.tlo')]
# Then prompt the user
print('Available backups databases:')
for i, db_id in enumerate(saved_db):
metadata = get_metadata(db_id)
print('{}. {}, ID: {}'.format(i + 1,
metadata.get('peer_name', '???'),
db_id))
db_id = saved_db[get_integer(message, 1, len(saved_db)) - 1]
return db_id, get_metadata(db_id)
def read_store(self):
"""
Get the store from file.
"""
with open(self._path,'r',encoding='ascii') as fr:
outer_data = json.load(fr)
try:
validate(outer_data,OUTER_SCHEMA)
except ValidationError:
raise SSError("Invalid outer schema")
enc_bytes = hex_str_to_bytes(outer_data["enc_blob"])
try:
inner_data = self._box.decrypt(enc_bytes)
except nacl.exceptions.CryptoError:
raise SSCryptoError("Wrong password")
return json.loads(inner_data.decode('utf-8'))
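For orientation, the outer envelope that read_store expects is roughly the shape below; the enc_blob key comes from the code above, the value is an invented placeholder, and OUTER_SCHEMA would pin down the exact fields.
example_outer = {
    "enc_blob": "<hex-encoded SecretBox ciphertext that decrypts to UTF-8 JSON>"
}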
def __init__(self, path='config.json'):
CONFIG_PATH = os.path.join(sys.path[0], path)
dict.__init__(self)
self.path = CONFIG_PATH
try:
self.load()
except:
print "Creando fichero de configuracion"
self['hs_threshold1'] = 100
self['hs_threshold2'] = 200
self['hs_apertura'] = 1
self['hs_blur'] = 3
self['hs_minLineLength'] = 100
self['hs_maxLineGap'] = 10
self['hs_kernel_dilate'] = 3
self['hs_kernel_erode'] = 3
self['hs_param1'] = 3
self['hs_param2'] = 3
self['hs_param3'] = 3
self['hs_dilate_iteracciones'] = 1
self.save()
def test_match_fetching(self):
"""Check if the server handle correctly a normal request."""
# Forward sample data fetched from the Riot API directly as result
this_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.sep.join([this_dir, "samples", self.MATCH_DATA])) as f:
self.match_data = json.load(f)
# Setup mocks
self.service.cache_manager.find_match.return_value = None
self.service.riot_api_handler.get_match.return_value = self.match_data
response = self.stub.Match(service_pb2.MatchRequest(id=4242,
region=constants_pb2.EUW))
# Check the conversion of the sample is matching the response
converter = JSONConverter(None)
expected = converter.json_match_to_match_pb(self.match_data)
self.assertEqual(response, expected)
self.assertTrue(self.service.cache_manager.save_match.called)
def test_match_from_cache(self):
"""Check if the server returns a match from the cache."""
# Forward sample data fetched from the Riot API directly as result
this_dir = os.path.dirname(os.path.abspath(__file__))
with open(os.sep.join([this_dir, "samples", self.MATCH_DATA])) as f:
self.match_data = json.load(f)
# Setup mocks
self.service.cache_manager.find_match.return_value = self.match_data
response = self.stub.Match(service_pb2.MatchRequest(id=4242,
region=constants_pb2.EUW))
# Check the conversion of the sample is matching the response
self.assertTrue(self.service.cache_manager.find_match.called)
self.assertFalse(self.service.riot_api_handler.get_match.called)
self.assertFalse(self.service.cache_manager.save_match.called)
def load(self):
# Prepare + load directory.
super().load()
# Load the files and parse JSON.
parsed_settings = dict()
try:
for file_name in self.files:
file_path = os.path.join(self.directory, file_name)
with open(file_path, 'r') as file_handle:
parsed_settings.update(json.load(file_handle))
except json.JSONDecodeError as e:
raise ImproperlyConfigured(
'Your settings file(s) contain invalid JSON syntax! Please fix and restart!, {}'.format(str(e))
)
# Loop and set in local settings (+ uppercase keys).
for key, value in parsed_settings.items():
self.settings[key.upper()] = value
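A minimal stand-alone illustration of the merge-and-uppercase behaviour above, with invented inline JSON in place of the real settings files.
import json

parsed_settings = {}
for blob in ('{"debug": true}', '{"db_name": "example"}'):   # invented file contents
    parsed_settings.update(json.loads(blob))

settings = {key.upper(): value for key, value in parsed_settings.items()}
# -> {'DEBUG': True, 'DB_NAME': 'example'}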
def load_endpoints(self):
self.choose_endpoint.endpoints.clear()
for name in listdir(str(BASE_PATH / 'endpoints')):
if name.endswith('.json'):
item = QListWidgetItem(name.split('.json')[0], self.choose_endpoint.endpoints)
item.setFlags(item.flags() & ~Qt.ItemIsEnabled)
pb_msg_to_endpoints = defaultdict(list)
with open(str(BASE_PATH / 'endpoints' / name)) as fd:
for endpoint in load(fd, object_pairs_hook=OrderedDict):
pb_msg_to_endpoints[endpoint['request']['proto_msg'].split('.')[-1]].append(endpoint)
for pb_msg, endpoints in pb_msg_to_endpoints.items():
item = QListWidgetItem(' ' * 4 + pb_msg, self.choose_endpoint.endpoints)
item.setFlags(item.flags() & ~Qt.ItemIsEnabled)
for endpoint in endpoints:
path_and_qs = '/' + endpoint['request']['url'].split('/', 3).pop()
item = QListWidgetItem(' ' * 8 + path_and_qs, self.choose_endpoint.endpoints)
item.setData(Qt.UserRole, endpoint)
self.set_view(self.choose_endpoint)
def load_barcode_dist(filename, barcode_whitelist, gem_group, proportions=True):
""" Load barcode count distribution from a json file """
# Input barcode whitelist must be an ordered type;
# safeguard against it going out of sync with the distribution file
assert barcode_whitelist is None or isinstance(barcode_whitelist, list)
if not os.path.isfile(filename):
return None
with open(filename, 'r') as f:
values = json.load(f)
start = (gem_group-1)*len(barcode_whitelist)
end = gem_group*len(barcode_whitelist)
barcode_counts = {bc: value for bc, value in zip(barcode_whitelist, values[start:end])}
if proportions:
total_barcode_counts = sum(barcode_counts.values())
barcode_dist = {bc: tk_stats.robust_divide(float(value), float(total_barcode_counts))
for bc, value in barcode_counts.iteritems()}
return barcode_dist
else:
return barcode_counts
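The slicing above implies the JSON file holds one flat list of counts, with gem group g occupying values[(g-1)*N : g*N] for a whitelist of length N; a small check with invented numbers:
barcode_whitelist = ['AAAA', 'CCCC', 'GGGG']   # N = 3, invented barcodes
values = [5, 0, 2,    # gem group 1
          7, 1, 0]    # gem group 2
gem_group = 2
start = (gem_group - 1) * len(barcode_whitelist)
end = gem_group * len(barcode_whitelist)
counts = dict(zip(barcode_whitelist, values[start:end]))
# -> {'AAAA': 7, 'CCCC': 1, 'GGGG': 0}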
def split(args):
assert len(args.read1s) == len(args.read2s)
chunks = []
# Determine the number of buckets required to achieve
# the given chunk size.
chunks_per_gem_group = {}
with open(args.reads_summary) as f:
reads_summary = json.load(f)
for gg in args.gem_groups:
readpairs = reads_summary['%d_total_reads_per_gem_group' % gg]
chunks_per_gem_group[str(gg)] = max(2,
int(math.ceil(float(readpairs) / \
args.readpairs_per_chunk)))
for fastq1, fastq2 in itertools.izip(args.read1s, args.read2s):
chunks.append({
'read1s_chunk': fastq1,
'read2s_chunk': fastq2,
'chunks_per_gem_group': chunks_per_gem_group,
})
return {'chunks': chunks}
def main(args, outs):
# Write read_chunk for consumption by Rust
with open("chunk_args.json", "w") as f:
json.dump(args.read_chunk, f)
output_path = martian.make_path("")
prefix = "fastq_chunk"
chunk_reads_args = ['chunk_reads', '--reads-per-fastq', str(args.reads_per_file), output_path, prefix, "--martian-args", "chunk_args.json"]
print "running chunk reads: [%s]" % str(chunk_reads_args)
subprocess.check_call(chunk_reads_args)
with open(os.path.join(output_path, "read_chunks.json")) as f:
chunk_results = json.load(f)
outs.out_chunks = []
# Write out a new chunk entry for each resulting chunk
for chunk in chunk_results:
print args.read_chunk
chunk_copy = args.read_chunk.copy()
print chunk_copy
chunk_copy['read_chunks'] = chunk
outs.out_chunks.append(chunk_copy)
def load_model(file_path):
with open(file_path, "r") as f:
decision_trees, sparse_features, total_feature_count = load(f)
dts = {}
for key in decision_trees.keys():
dts[key] = []
for dt in decision_trees[key]:
d = DecisionTree([])
for k in dt['model'].keys():
setattr(d, k, dt['model'][k])
dt['model'] = d
dts[key].append(dt)
return ClassificationEngine(
decision_trees=dts,
sparse_features=sparse_features,
total_feature_count=total_feature_count
)