def find_lib_dir(filename=None):
    import sys
    from pathlib import Path
    if filename is None:
        filename = Cython.python_library_file()
    candidates = [Path(sys.exec_prefix, 'libs/'), Path('/lib'), Path('/usr/lib')]
    for path in candidates:
        if Path(path, filename).exists():
            return str(path)
    return None
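# A minimal usage sketch for find_lib_dir; the file name below is
# hypothetical (it would look like 'python39.lib' on Windows or
# 'libpython3.9.so' on Linux). The function returns the containing
# directory as a string, or None if no candidate directory holds the file.
lib_dir = find_lib_dir('libpython3.9.so')
if lib_dir is not None:
    print('Python library found in:', lib_dir)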
# TODO: Cython project
# TODO: Embed support for Cython project
# TODO: Somehow package a whole set of modules with a runner inside?
def download(self, local_dir_=None, url_=None):
    '''
    Args:
        local_dir_: where to save the downloaded files
        url_: where to download the dataset from; if None, the default
            'http://yann.lecun.com/exdb/mnist/' is used
    '''
    # TODO check whether file exists
    if url_ is None:
        url_ = 'http://yann.lecun.com/exdb/mnist/'
    if local_dir_ is None:
        local_dir = self.DEFAULT_DIR
    else:
        local_dir = Path(local_dir_)
    local_dir.mkdir(parents=True, exist_ok=True)
    in_filename = '%(subset)s-%(type_s)s-idx%(ndim)s-ubyte.gz'
    for subset, (type_s, ndim) in product(
            ('train', 't10k'), zip(('images', 'labels'), (3, 1))):
        filename = in_filename % locals()
        urllib.request.urlretrieve(url_ + filename, str(local_dir / filename))
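# A standalone illustration of the filename pattern built above (a sketch,
# not part of the original class): product() pairs each subset with both
# (type_s, ndim) tuples, yielding the four standard MNIST archive names.
from itertools import product

pattern = '%(subset)s-%(type_s)s-idx%(ndim)s-ubyte.gz'
for subset, (type_s, ndim) in product(('train', 't10k'),
                                      zip(('images', 'labels'), (3, 1))):
    print(pattern % {'subset': subset, 'type_s': type_s, 'ndim': ndim})
# -> train-images-idx3-ubyte.gz   train-labels-idx1-ubyte.gz
#    t10k-images-idx3-ubyte.gz    t10k-labels-idx1-ubyte.gz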
def get_template_language(self, file_):
    """
    Return the template language.
    Every template file name must end with a language code, and the
    code must be a valid ISO 639-1 code:
    https://en.wikipedia.org/wiki/List_of_ISO_639-1_codes
    valid examples:
        account_created_pt.html
        payment_created_en.txt
    """
    stem = Path(file_).stem
    language_code = stem.split('_')[-1]
    if len(language_code) != 2:
        # TODO naive and temporary implementation; also check that the two
        # characters correspond to one of the available languages
        raise Exception('Template file `%s` must end in ISO_639-1 language code.' % file_)
    return language_code.lower()
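# A minimal standalone sketch of the suffix extraction above (hypothetical
# helper, not the original method): the language code is whatever follows
# the last underscore in the file stem.
from pathlib import Path

def _language_of(file_):
    stem = Path(file_).stem  # 'account_created_pt.html' -> 'account_created_pt'
    return stem.split('_')[-1].lower()

assert _language_of('account_created_pt.html') == 'pt'
assert _language_of('payment_created_en.txt') == 'en'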
async def add_local_charm_dir(self, charm_dir, series):
    """Upload a local charm to the model.

    This will automatically generate an archive from the charm dir.

    :param charm_dir: Path to the charm directory
    :param series: Charm series
    """
    fh = tempfile.NamedTemporaryFile()
    CharmArchiveGenerator(charm_dir).make_archive(fh.name)
    with fh:
        func = partial(
            self.add_local_charm, fh, series, os.stat(fh.name).st_size)
        charm_url = await self._connector.loop.run_in_executor(None, func)
    log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
    return charm_url
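# A hedged usage sketch, assuming a connected juju Model instance exposes the
# coroutine above (the model name, charm path, and series are illustrative):
#
#   charm_url = await model.add_local_charm_dir('./my-charm', 'focal')
#   log.debug('charm available as %s', charm_url)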
def sync_tools(
        self, all_=False, destination=None, dry_run=False, public=False,
        source=None, stream=None, version=None):
    """Copy Juju tools into this model.

    :param bool all_: Copy all versions, not just the latest
    :param str destination: Path to local destination directory
    :param bool dry_run: Don't do the actual copy
    :param bool public: Tools are for a public cloud, so generate mirrors
        information
    :param str source: Path to local source directory
    :param str stream: Simplestreams stream for which to sync metadata
    :param str version: Copy a specific major.minor version
    """
    raise NotImplementedError()
def build_package(builder_image, package_type, version, out_dir, dependencies):
    """
    Build a deb or RPM package using a fpm-within-docker Docker image.

    :param str package_type: "rpm" or "deb".
    :param str version: The package version.
    :param Path out_dir: Directory where the package will be output.
    :param list dependencies: Package names the resulting package should
        depend on.
    """
    run([
        "docker", "run", "--rm", "-e", "PACKAGE_VERSION=" + version,
        "-e", "PACKAGE_TYPE=" + package_type,
        "-v", "{}:/build-inside:rw".format(THIS_DIRECTORY),
        "-v", "{}:/source:rw".format(THIS_DIRECTORY.parent),
        "-v", str(out_dir) + ":/out", "-w", "/build-inside", builder_image,
        "/build-inside/build-package.sh", *dependencies
    ], check=True)
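# For reference, the container invocation above boils down to a shell command
# of this shape (image name, version, paths, and dependency are illustrative):
#
#   docker run --rm -e PACKAGE_VERSION=1.2.3 -e PACKAGE_TYPE=deb \
#       -v "$PWD/packaging:/build-inside:rw" -v "$PWD:/source:rw" \
#       -v "$PWD/out:/out" -w /build-inside fpm-within-docker \
#       /build-inside/build-package.sh libssl-dev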
def prompt_extractor(self, item):
    extractor = extractors[item.data(Qt.UserRole)]
    inputs = []
    if not assert_installed(self.view, **extractor.get('depends', {})):
        return
    if not extractor.get('pick_url', False):
        files, mime = QFileDialog.getOpenFileNames()
        for path in files:
            inputs.append((path, Path(path).stem))
    else:
        text, good = QInputDialog.getText(self.view, ' ', 'Input a URL:')
        if text:
            url = urlparse(text)
            inputs.append((url.geturl(), url.netloc))
    if inputs:
        wait = QProgressDialog('Extracting .proto structures...', None, 0, 0)
        wait.setWindowTitle(' ')
        self.set_view(wait)
        self.worker = Worker(inputs, extractor)
        self.worker.progress.connect(self.extraction_progress)
        self.worker.finished.connect(self.extraction_done)
        self.worker.start()
def CreateFile(
        self, directory_path: str, file_name: str, filename_suffix: str):
    """Creates an empty file.

    Args:
        directory_path (str): the path to the directory the file should be
            created in.
        file_name (str): the name of the new file.
        filename_suffix (str): the suffix of the new file.

    Returns:
        str: the path of the created file.
    """
    file_path = self.CreateFilePath(directory_path, file_name,
                                    filename_suffix)
    if not os.path.exists(directory_path):
        self._CreateFolder(directory_path)
    pathlib.Path(file_path).touch()
    return file_path
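# A standalone sketch of the behavior above (hypothetical helper; the real
# method delegates path construction to self.CreateFilePath, so the join
# with a dot here is an assumption):
import os
import pathlib

def _create_file(directory_path, file_name, filename_suffix):
    file_path = os.path.join(
        directory_path, '{0:s}.{1:s}'.format(file_name, filename_suffix))
    os.makedirs(directory_path, exist_ok=True)  # ensure the directory exists
    pathlib.Path(file_path).touch()             # create the empty file
    return file_path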
def testPluginNameIfExisting(self):
    """test method after getting the plugin name from the user if the plugin
    name already exists"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_info='the_plugin',
            prompt_error='the_plugin')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            plugin_exists=True, change_bool_after_every_call_plugin_exists=True,
            valid_name=True)
        controller = sqlite_controller.SQLiteController(output_handler,
                                                        plugin_helper)
        actualName = 'the_plugin'
        controller._path = 'somepath'
        actual = controller.PluginName(None, None, actualName)
        expected = 'Plugin exists. Choose new Name'
        actual_prompt = self._ReadFromFile(path)
        self.assertEqual(expected, actual_prompt)
        self.assertEqual(actualName, actual)
def testCreateSQLQueryModelWithUserInputWithError(self):
    """test method _CreateSQLQueryModelWithUserInput with an error"""
    error_message = "Some Error..."
    fake_execution = fake_sqlite_query_execution.SQLQueryExecution(
        sql_query_data.SQLQueryData(has_error=True,
                                    error_message=error_message))
    sql_query = 'SELECT createdDate FROM Users ORDER BY createdDate'
    name = 'Contact'
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_info=name)
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=True)
        controller = sqlite_controller.SQLiteController(output_handler,
                                                        plugin_helper)
        actual = controller._CreateSQLQueryModelWithUserInput(sql_query, False,
                                                              fake_execution)
        self.assertIsNone(actual)
def testSourcePathIfNotExisting(self):
    """test method after getting the source path from the user"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='the source path')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            folder_exists=False, change_bool_after_every_call_folder_exists=True)
        controller = sqlite_controller.SQLiteController(output_handler,
                                                        plugin_helper)
        actualPath = 'testpath'
        source_path = controller.SourcePath(None, None, actualPath)
        expected = 'Folder does not exists. Enter correct one'
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(source_path, 'the source path')
def testTestPathIfExisting(self):
    """test method after getting the test file path from the user"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler())
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            file_exists=True)
        controller = sqlite_controller.SQLiteController(output_handler,
                                                        plugin_helper)
        actualPath = os.path.join(path_helper.TestDatabasePath(),
                                  'twitter_ios.db')
        valid_path = controller.TestPath(None, None, actualPath)
        actual_output = self._ReadFromFile(path)
        self.assertEqual(actualPath, controller._testfile)
        self.assertEqual('', actual_output)
        self.assertEqual(valid_path, actualPath)
def testTestPathIfNotExisting(self):
    """test method after getting the test file path from the user"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        wrongPath = os.path.join(tmpdir, 'testpath')
        validPath = os.path.join(tmpdir, 'testpathvalid')
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error=validPath)
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            file_exists=False, change_bool_after_every_call_file_exists=True)
        controller = sqlite_controller.SQLiteController(output_handler,
                                                        plugin_helper)
        actual_path = controller.TestPath(None, None, wrongPath)
        expected = 'File does not exists. Choose another.'
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(validPath, actual_path)
        # close the connection so the temp file can be deleted before the
        # program cycle is finished
        controller._query_execution._connection.close()
def testValidateRowNameIfNotOk(self):
    """test the validate row name method if not ok"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='TheValidRowName')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_row_name=False,
            change_bool_after_every_call_valid_row_name=True)
        controller = sqlite_controller.SQLiteController(output_handler,
                                                        plugin_helper)
        valid = controller._ValidateRowName("theWrongName")
        expected = ('Row name is not in a valid format. Choose new Name ['
                    'RowName...]')
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(valid, 'TheValidRowName')
def testValidateTimestampStringIfNotOk(self):
    """test the validate timestamp string method if not ok"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='this,that,bla')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_comma_separated_string=False,
            change_bool_after_every_call_valid_comma_separated_string=True)
        controller = sqlite_controller.SQLiteController(output_handler,
                                                        plugin_helper)
        valid = controller._ValidateTimestampString("this, that,bla")
        expected = (
            'Timestamps are not in valid format. Reenter them correctly [name,'
            'name...]')
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(valid, 'this,that,bla')
def testValidateColumnStringIfNotOk(self):
    """test the validate column string method if not ok"""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'testfile')
        pathlib.Path(path).touch()
        output_handler = output_handler_file.OutputHandlerFile(
            path, file_handler.FileHandler(), prompt_error='this,that,bla')
        plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
            valid_comma_separated_string=False,
            change_bool_after_every_call_valid_comma_separated_string=True)
        controller = sqlite_controller.SQLiteController(output_handler,
                                                        plugin_helper)
        valid = controller._ValidateColumnString("this, that,bla")
        expected = (
            'Column names are not in valid format. Reenter them correctly [name,'
            'name...]')
        actual = self._ReadFromFile(path)
        self.assertEqual(expected, actual)
        self.assertEqual(valid, 'this,that,bla')
def testGenerateIfNotConfirmed(self):
    """test the generate method if not confirmed"""
    template_path = path_helper.TemplatePath()
    with self.assertRaises(SystemExit):
        with tempfile.TemporaryDirectory() as tmpdir:
            file = os.path.join(tmpdir, 'testfile')
            pathlib.Path(file).touch()
            output_handler = output_handler_file.OutputHandlerFile(
                file, file_handler.FileHandler(), confirm=False)
            plugin_helper = fake_sqlite_plugin_helper.FakeSQLitePluginHelper(
                valid_name=False,
                change_bool_after_every_call_valid_name=True)
            controller = sqlite_controller.SQLiteController(output_handler,
                                                            plugin_helper)
            controller.Generate('not used', 'not used')
    self.assertFalse(template_path)
def writerow(self, row):
    """
    :param row: sequence of values to write as one CSV row
    :return: the row as CSV text
    """
    self._bytes_written += self._out_writer.writerow(row)
    row_txt = self._buffer.getvalue()
    self._out_csv.write(row_txt)
    self._reset_buffer()
    self._out_csv.flush()
    if self._bytes_written > self.max_bytes:
        self._out_csv.close()
        # capture the finished file's name before rotating to a new writer,
        # so the closed file (not the fresh one) gets compressed
        out_name = str(Path(self._out_csv.name).absolute())
        self._make_csv_writer()
        subprocess.Popen(['7z', 'a', '-t7z', '-m0=lzma', '-mx=9', '-mfb=64',
                          '-md=16m', out_name + '.7z', out_name])
    return row_txt
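# Note on the 7z invocation above: 'a' adds to an archive, '-t7z' selects the
# 7z container, '-m0=lzma -mx=9' picks LZMA at maximum compression, and
# '-mfb=64 -md=16m' tune the match length and dictionary size. Popen returns
# immediately, so compression runs in the background. The call assumes a 7z
# binary on PATH; shutil.which('7z') could guard against its absence.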
def _make_writer(self):
    """Open a fresh timestamped JSON log file and compress any leftovers.

    :return: None
    """
    self._buffer = StringIO()
    self._bytes_written = 0
    now = datetime.now()
    self.fname = self.log_folder + '/' + now.strftime(
        '%Y%m%d_%H%M%S_{}.json'.format(self.make_random(6)))
    self.fname = str(pathlib.Path(self.fname))
    self._out_fh = open(self.fname, 'w')
    self.write_pid()
    logging.warning("Writing to {} ({} bytes)".format(self._out_fh.name, self.max_bytes))
    # compress any old files still lying around
    for fname in glob(self.log_folder + "/*.json"):
        if fname != self.fname:
            self._compress(fname)
def test_RTagsDaemonStartClean(self):
    try:
        os.chdir("clean")
    except OSError:
        print("Test Error: Couldn't cd into 'clean' test directory.")
        raise
    self.assertFalse(self.cmake_build_info["build_dir"].is_dir())
    self.plugin.setup_rtags_daemon()
    try:
        rtags_daemon_status = subprocess.check_output(
            self.cmake_cmd_info["rtags_status"])
    except subprocess.CalledProcessError as e:
        print(e.output)
    self.assertTrue(
        len("*********************************\nfileids\n*********************************\n*********************************\nheadererrors\n*********************************\n*********************************\ninfo\n*********************************\nRunning a release build\nsocketFile: /Users/phillipbonhomme/.rdm\ndataDir: /Users/phillipbonhomme/.cache/rtags/\noptions: 0x14jobCount: 4\nrpVisitFileTimeout: 60000\nrpIndexDataMessageTimeout: 60000\nrpConnectTimeout: 0\nrpConnectTimeout: 0\ndefaultArguments: List<String>(-ferror-limit=50, -Wall, -fspell-checking, -Wno-unknown-warning-option\")\nincludePaths: List<Source::Include>(\")\ndefines: List<Source::Define>(-DRTAGS=\")\nignoredCompilers: Set<Path>(\")\n*********************************\njobs\n*********************************\n"
            ) <= len(str(rtags_daemon_status)))
def test_RTagsDaemonStartDirty(self):
    try:
        os.chdir("dirty")
    except OSError:
        print("Test Error: Couldn't cd into 'dirty' test directory.")
        raise
    self.assertTrue(self.cmake_build_info["build_dir"].is_dir())
    self.plugin.setup_rtags_daemon()
    try:
        rtags_daemon_status = subprocess.check_output(
            self.cmake_cmd_info["rtags_status"])
    except subprocess.CalledProcessError as e:
        print(e.output)
    self.assertTrue(
        len("*********************************\nfileids\n*********************************\n*********************************\nheadererrors\n*********************************\n*********************************\ninfo\n*********************************\nRunning a release build\nsocketFile: /Users/phillipbonhomme/.rdm\ndataDir: /Users/phillipbonhomme/.cache/rtags/\noptions: 0x14jobCount: 4\nrpVisitFileTimeout: 60000\nrpIndexDataMessageTimeout: 60000\nrpConnectTimeout: 0\nrpConnectTimeout: 0\ndefaultArguments: List<String>(-ferror-limit=50, -Wall, -fspell-checking, -Wno-unknown-warning-option\")\nincludePaths: List<Source::Include>(\")\ndefines: List<Source::Define>(-DRTAGS=\")\nignoredCompilers: Set<Path>(\")\n*********************************\njobs\n*********************************\n"
            ) <= len(str(rtags_daemon_status)))
def __init__(self, cfg_yaml=None, secret_cfg_yaml=None,
             create_vcs_client=True,
             load_cfg=True, load_secret_cfg=True,
             default_data_directory=None,
             create_default_data_directory=True):
    self.data = OrderedDict()
    self.cfg_yaml = cfg_yaml
    self.secret_cfg_yaml = secret_cfg_yaml
    if load_cfg and cfg_yaml and Path(cfg_yaml).exists():
        self.cfg = self.load_cfg(cfg_yaml)
    else:
        self.cfg = {}
    if (load_secret_cfg and secret_cfg_yaml
            and Path(secret_cfg_yaml).exists()):
        self.secret_cfg = self.load_cfg(secret_cfg_yaml)
    else:
        self.secret_cfg = {}
    self._ensure_cfg_structure()
    if create_vcs_client:
        self._create_vcs_client()
    if default_data_directory:
        self.set_default_data_directory(
            default_data_directory, create=create_default_data_directory)
def get_node_id(self, ent=None, ent_attrs: dict = None):
    if ent and ent.kindname() == 'file':
        node_id = str(Path(ent.longname()).relative_to(self.root_path))
    elif ent:
        node_id = ent.uniquename()
    elif ent_attrs and ent_attrs['kindname'] == 'file':
        try:
            node_id = str(Path(ent_attrs['longname']).relative_to(
                self.root_path))
        except ValueError:
            node_id = ent_attrs['longname']
    elif ent_attrs:
        node_id = ent_attrs['uniquename']
    else:
        node_id = None
    return node_id
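# A standalone sketch of the relative_to fallback above (hypothetical helper;
# PurePosixPath is used so the example is platform-independent, whereas the
# original uses Path): relative_to raises ValueError for files outside
# root_path, in which case the absolute longname is kept as the node id.
from pathlib import PurePosixPath

def _file_node_id(longname, root_path):
    try:
        return str(PurePosixPath(longname).relative_to(root_path))
    except ValueError:
        return longname

assert _file_node_id('/repo/src/a.py', '/repo') == 'src/a.py'
assert _file_node_id('/elsewhere/b.py', '/repo') == '/elsewhere/b.py'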
def load_vulnerability_database():
    # Currently downloaded from
    # https://security-tracker.debian.org/tracker/data/json
    # Should instead download only if not found in an optional location,
    # or redownload if found but out of date
    url = "https://security-tracker.debian.org/tracker/data/json"
    db = Path('debian.json')
    if not db.exists():
        # progress bar for the download
        r = requests.get(url, stream=True)
        with open(db.name, 'wb') as data_file:
            total_length = 1024 * 20722
            for chunk in progress.bar(r.iter_content(chunk_size=1024),
                                      label="Downloading Debian data",
                                      expected_size=(total_length / 1024) + 1):
                if chunk:
                    data_file.write(chunk)
                    data_file.flush()
    with open(db.name, 'r') as data_file:
        return json.load(data_file)
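# A hedged usage sketch: the tracker JSON maps source package names to CVE
# records, so a lookup might look like this (the package name is illustrative):
#
#   db = load_vulnerability_database()
#   for cve_id in db.get('openssl', {}):
#       print(cve_id)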
def timeseriesdata_constructor_new_file(temp_dir):
    """Tests the TimeSeriesData class constructor when the file does not
    exist. Tests that a new file is created, that all expected data sets
    are present, and that the flag indicating the file has been filled
    with data is set to False.
    """
    tsd = TimeSeriesData(temp_dir + "/new_ananke.h5")
    tsd_file = Path(temp_dir + "/new_ananke.h5")
    # Check that the file was really created
    assert tsd_file.is_file()
    # Check that the data sets have been created
    assert set(tsd.h5_table.keys()) == {"genes", "timeseries", "samples"}
    assert set(tsd.h5_table["timeseries"].keys()) == {"data", "indices",
                                                      "indptr"}
    assert set(tsd.h5_table["genes"].keys()) == {"sequences", "sequenceids",
                                                 "clusters", "taxonomy",
                                                 "sequenceclusters"}
    assert set(tsd.h5_table["samples"].keys()) == {"names", "time",
                                                   "metadata", "mask"}
    # Check that the new file is flagged as unfilled
    assert tsd.filled_data == False
def get_config():
    """
    Load the ``doctr`` configuration from the ``.travis.yml`` file, if both
    the file and the key are present.
    """
    p = Path('.travis.yml')
    if not p.exists():
        return {}
    with p.open() as f:
        travis_config = yaml.safe_load(f.read())
    config = travis_config.get('doctr', {})
    if not isinstance(config, dict):
        raise ValueError('config is not a dict: {}'.format(config))
    return config
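# For illustration, a .travis.yml carrying a ``doctr`` section of this shape
# (the keys inside ``doctr`` depend on the project; these are hypothetical):
#
#   doctr:
#     deploy-repo: myorg/myorg.github.io
#
# would make get_config() return {'deploy-repo': 'myorg/myorg.github.io'}.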
def clarin_corpora_sorted_by_size(base_directory: Path) -> List[GermanClarinCorpus]:
    return [
        sc1(base_directory),
        pd2(base_directory),
        ziptel(base_directory),
        sc10(base_directory),
        GermanClarinCorpus("all.HEMPEL.4.cmdi.11610.1490680796", base_directory),
        GermanClarinCorpus("all.PD1.3.cmdi.16312.1490681066", base_directory),
        GermanClarinCorpus("all.VM1.3.cmdi.1508.1490625070", base_directory,
                           id_filter_regex=vm1_id_german_filter_regex,
                           training_test_split=TrainingTestSplit.training_only),
        GermanClarinCorpus("all.RVG-J.1.cmdi.18181.1490681704", base_directory),
        GermanClarinCorpus("all.ALC.4.cmdi.16602.1490632862", base_directory,
                           training_test_split=TrainingTestSplit.randomly_grouped_by(
                               lambda e: e.id[:3])),
        GermanClarinCorpus("all.VM2.3.cmdi.4260.1490625316", base_directory,
                           id_filter_regex=vm2_id_german_filter_regex,
                           training_test_split=TrainingTestSplit.training_only)
    ]
def __init__(self, base_directory: Path):
    super().__init__(
        corpus_name="german-speechdata-package-v2",
        base_directory=base_directory,
        base_source_url_or_directory="http://www.repository.voxforge1.org/downloads/de/",
        tar_gz_extension=".tar.gz",
        subdirectory_depth=1,
        umlaut_decoder=UmlautDecoder.none,
        training_test_split=TrainingTestSplit.by_directory(),
        tags_to_ignore=[],
        # exclude these 7 audio files: the first 2 are corrupt, the last 5 are empty
        id_filter_regex=re.compile("(?!^2014-03-24-13-39-24_Kinect-RAW)"
                                   "(?!^2014-03-27-11-50-33_Kinect-RAW)"
                                   "(?!^2014-03-18-15-34-19_Realtek)"
                                   "(?!^2014-06-17-13-46-27_Kinect-RAW)"
                                   "(?!^2014-06-17-13-46-27_Realtek)"
                                   "(?!^2014-06-17-13-46-27_Samson)"
                                   "(?!^2014-06-17-13-46-27_Yamaha)"
                                   "(^.*$)"))
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]:
    xml_ending = ".xml"
    microphone_endings = [
        "_Yamaha",
        "_Kinect-Beam",
        "_Kinect-RAW",
        "_Realtek",
        "_Samson",
        "_Microsoft-Kinect-Raw"
    ]
    xml_files = [file for file in files if file.name.endswith(xml_ending) if
                 self.id_filter_regex.match(name_without_extension(file))]
    return OrderedDict(
        (name_without_extension(file) + microphone_ending,
         self._extract_label_from_xml(file))
        for file in xml_files
        for microphone_ending in microphone_endings
        if (Path(file.parent) / (name_without_extension(file) + microphone_ending + ".wav")).exists())
def train(self,
          labeled_spectrogram_batches: Iterable[List[LabeledSpectrogram]],
          preview_labeled_spectrogram_batch: List[LabeledSpectrogram],
          tensor_board_log_directory: Path,
          net_directory: Path,
          batches_per_epoch: int):
    print_preview_batch = lambda: log(self.test_and_predict_batch(preview_labeled_spectrogram_batch))
    print_preview_batch()
    self.loss_net.fit_generator(
        self._loss_inputs_generator(labeled_spectrogram_batches),
        epochs=100000000,
        steps_per_epoch=batches_per_epoch,
        callbacks=self.create_callbacks(
            callback=print_preview_batch,
            tensor_board_log_directory=tensor_board_log_directory,
            net_directory=net_directory),
        initial_epoch=self.load_epoch if (self.load_epoch is not None) else 0)