def install(context, from_str, to_str=""):
    """Register files matched by the glob *from_str* for installation under *to_str*.

    Both endpoints are validated so a package can neither read from outside
    its source tree nor write outside its install tree.
    """
    dest = Path(to_str)
    # Reject destinations that would escape the package's install root.
    if not is_subdir(context.pkgins / dest, context.pkgins):
        raise Exception("Package tried to copy to directory outside its own")
    pattern = Path(from_str).normpath()
    if pattern.isabs():
        raise Exception("From glob is not allowed to be absolute")
    pattern = context.source_lookup.translate(pattern)
    for match in Path().glob(pattern):
        # Every glob hit must still live inside the package source tree.
        if not is_subdir(match, context.pkgsrc):
            raise Exception("Package tried to copy from directory outside its own")
        name = match.name
        rel_from = match.relpath(context.pkgsrc)
        context.add_package_file((rel_from, dest / name))
        if match.isdir():
            # Register every entry below a matched directory as well,
            # preserving its layout relative to the match.
            for entry in match.walk():
                rel_entry = entry.relpath(match)
                context.add_package_file((entry.relpath(context.pkgsrc), dest / name / rel_entry))
Python: example source code using the Path() class
def test_normal_csv(self, tmpdir, file_path, format_name):
    """The loader should detect the csv format and parse it into TableData."""
    table_name = pv.replace_symbol(file_path, "")
    csv_path = Path(
        six.text_type(tmpdir.join(table_name + Path(file_path).ext)))
    csv_path.parent.makedirs_p()
    with open(csv_path, "w") as fp:
        fp.write('''"attr_a","attr_b","attr_c"
1,4,"a"
2,2.1,"bb"
3,120.9,"ccc"''')
    expected_tables = [
        TableData(
            table_name,
            ["attr_a", "attr_b", "attr_c"],
            [
                [1, 4, "a"],
                [2, "2.1", "bb"],
                [3, "120.9", "ccc"],
            ])
    ]
    loader = ptr.TableFileLoader(csv_path, format_name=format_name)
    assert loader.format_name == "csv"
    for actual, expected in zip(loader.load(), expected_tables):
        print(ptw.dump_tabledata(expected))
        print(ptw.dump_tabledata(actual))
        assert actual == expected
def test_normal_json(self, tmpdir, file_path, format_name):
    """The loader should detect the json format and parse records into TableData."""
    json_path = Path(str(tmpdir.join(file_path)))
    json_path.parent.makedirs_p()
    with open(json_path, "w") as fp:
        fp.write('''[
{"attr_a": 1},
{"attr_b": 2.1, "attr_c": "bb"}
]''')
    expected_tables = [
        TableData(
            "validdata",
            ["attr_a", "attr_b", "attr_c"],
            [
                {'attr_a': 1},
                {'attr_b': 2.1, 'attr_c': 'bb'},
            ]),
    ]
    loader = ptr.TableFileLoader(json_path, format_name=format_name)
    assert loader.format_name == "json"
    for actual, expected in zip(loader.load(), expected_tables):
        assert actual == expected
def test_normal_excel(self, tmpdir):
    """Write two sheets with the xlsx writer, load them back, and check the round-trip."""
    file_path = '/tmp/valid/test/data/validdata.xlsx'
    workbook_path = Path(str(tmpdir.join(file_path)))
    workbook_path.parent.makedirs_p()
    expected_tables = [
        TableData(
            table_name='testsheet1',
            header_list=['a1', 'b1', 'c1'],
            record_list=[
                ['aa1', 'ab1', 'ac1'],
                [1.0, 1.1, 'a'],
                [2.0, 2.2, 'bb'],
                [3.0, 3.3, 'cc"dd"'],
            ]),
        TableData(
            table_name='testsheet3',
            header_list=['a3', 'b3', 'c3'],
            record_list=[
                ['aa3', 'ab3', 'ac3'],
                [4.0, 1.1, 'a'],
                [5.0, '', 'bb'],
                [6.0, 3.3, ''],
            ]),
    ]
    writer = ptw.ExcelXlsxTableWriter()
    writer.open(workbook_path)
    for table in expected_tables:
        writer.from_tabledata(table)
        writer.write_table()
    writer.close()
    loader = ptr.TableFileLoader(workbook_path)
    assert loader.format_name == "excel"
    for loaded in loader.load():
        print(ptw.dump_tabledata(loaded))
        assert loaded in expected_tables
def _get_filename_tablename_mapping(self):
    """Return the ``(tnt.FILENAME, name)`` substitution pair.

    *name* is the source file's base name (``namebase``, i.e. without
    extension) when the source is a non-empty file path, else "".
    """
    filename = ""
    is_file_source = (
        self.source_type == SourceType.FILE
        and typepy.is_not_null_string(self.source))
    if is_file_source:
        filename = path.Path(self.source).namebase
    return (tnt.FILENAME, filename)
def __init__(self, fname, reinitialize=False):
    """Wrap an output file at *fname*.

    If the file already exists and *reinitialize* is true, it is deleted so
    a fresh file can be written; otherwise the existing file is kept.
    """
    self.fname = Path(fname)
    self.reinitialize = reinitialize
    if self.fname.exists():
        if self.reinitialize:
            # logging.warn() is a deprecated alias for warning(); also use
            # lazy %-style args instead of eager str.format().
            logging.warning('%s exists, deleting', self.fname)
            self.fname.remove()
rhythmbox_playlists_writer_tests.py — file source
Project: migrate-itunes-to-rhythmbox
Author: phauer
Project source
File source
Views: 19
Favorites: 0
Likes: 0
Comments: 0
def setUp(self):
    """Create the output folder used by the playlist-writer tests, if missing."""
    output_root = Path(settings.TESTOUTPUT_FOLDER)
    self.target_folder = output_root.joinpath("PlaylistTest")
    if not self.target_folder.exists():
        self.target_folder.makedirs()
rhythmbox_playlists_writer_tests.py — file source
Project: migrate-itunes-to-rhythmbox
Author: phauer
Project source
File source
Views: 22
Favorites: 0
Likes: 0
Comments: 0
def write_playlist_and_compare(self, itunes_library_input: Path, expected_playlist_xml: Path):
    """Convert the given iTunes library to a Rhythmbox playlist file and
    compare the produced XML byte-for-byte with the expected file.

    :param itunes_library_input: path to the iTunes library XML fixture
    :param expected_playlist_xml: path to the reference playlist XML
    """
    target_path = self.target_folder.joinpath(expected_playlist_xml.name)
    itunes_library_path = str(itunes_library_input)
    playlists = itunes_library_reader.read_playlists(itunes_library_path)
    rhythmbox_playlists_writer.write(playlists=playlists,
                                     target_path=target_path,
                                     source_library_root="D:/Music/",
                                     target_library_root="/home/pha/Music/")
    with target_path.open(mode="r", encoding="UTF-8") as target_path_opened, \
            expected_playlist_xml.open("r") as expected_playlist_xml_opened:
        actual_content = target_path_opened.read()
        # Bug fix: the original rebound the *expected_playlist_xml* parameter
        # to the file's content, so the failure message printed an open-file
        # repr and the entire expected XML instead of the two paths.
        expected_content = expected_playlist_xml_opened.read()
        self.assertEqual(actual_content, expected_content,
                         "{} and {} are different!".format(target_path, expected_playlist_xml))
rhythmbox_count_rating_integrator_tests.py — file source
Project: migrate-itunes-to-rhythmbox
Author: phauer
Project source
File source
Views: 20
Favorites: 0
Likes: 0
Comments: 0
def setUp(self):
    """Create the output folder used by the counter-integration tests, if missing."""
    output_root = Path(settings.TESTOUTPUT_FOLDER)
    self.target_folder = output_root.joinpath("CounterIntegrationTest")
    if not self.target_folder.exists():
        self.target_folder.makedirs()
rhythmbox_count_rating_integrator_tests.py — file source
Project: migrate-itunes-to-rhythmbox
Author: phauer
Project source
File source
Views: 20
Favorites: 0
Likes: 0
Comments: 0
def set_values_and_compare(self, rhythmdb_without_cout_rating: Path,
                           itunes_library_path: Path,
                           expected_rhythmboxdb: Path,
                           output_file_name: str,
                           assert_something_was_changed: bool,
                           itunes_library_root: str="D:/Music/",
                           rhythmbox_library_root: str="/home/pha/Music/") -> IntegrationLog:
    """Integrate iTunes play counts/ratings into a copy of a rhythmdb file
    and compare the result against an expected database file.

    :param rhythmdb_without_cout_rating: pristine rhythmdb fixture, copied before modification
    :param itunes_library_path: iTunes library XML to read songs from
    :param expected_rhythmboxdb: reference database to compare the result with
    :param output_file_name: file name for the working copy inside the target folder
    :param assert_something_was_changed: expected value of log.something_was_changed()
    :param itunes_library_root: library root prefix as it appears in the iTunes XML
    :param rhythmbox_library_root: library root prefix as used by Rhythmbox
    :return: the IntegrationLog returned by the integrator
    """
    # Work on a copy so the input fixture stays untouched.
    target_rhythmdb = self.target_folder.joinpath(output_file_name)
    rhythmdb_without_cout_rating.copy(target_rhythmdb)
    itunes_library = str(itunes_library_path)
    songs = itunes_library_reader.read_songs(itunes_library)
    log = rhythmbox_count_rating_integrator.set_values(itunes_songs=songs,
                                                       target_rhythmdb=target_rhythmdb,
                                                       itunes_library_root=itunes_library_root,
                                                       rhythmbox_library_root=rhythmbox_library_root)
    print("Expect something has changed: {}".format(assert_something_was_changed))
    if assert_something_was_changed:
        self.assertTrue(log.something_was_changed(), "No song entries was changed! But they should be!")
    else:
        self.assertFalse(log.something_was_changed(), "A song entries was changed! But they shouldn't be!")
    print("Compare content of {} (actual) with {} (expected)".format(target_rhythmdb, expected_rhythmboxdb))
    with expected_rhythmboxdb.open(mode="r", encoding="UTF-8") as expected_rhythmboxdb_opened, target_rhythmdb.open(
            "r") as target_rhythmdb_opened:
        actual_playlist_xml = target_rhythmdb_opened.read()
        expected_playlist_xml = expected_rhythmboxdb_opened.read()
        # comparing xml is a pain. simple string comparision doesn't work due to different tag order and formatting (newline after each tag or not).
        # so let's sort each character in both xml strings. this leads to rubbish. but if the sorted rubbish is equal, the origin is xml is very likely to be equal.
        actual_playlist_xml_normalized = sort_and_clean(actual_playlist_xml)
        expected_playlist_xml_normalized = sort_and_clean(expected_playlist_xml)
        self.assertEqual(actual_playlist_xml_normalized, expected_playlist_xml_normalized,
                         "Normalized content of {} and {} are different!".format(expected_rhythmboxdb, target_rhythmdb))
    return log
rhythmbox_count_rating_integrator.py — file source
Project: migrate-itunes-to-rhythmbox
Author: phauer
Project source
File source
Views: 24
Favorites: 0
Likes: 0
Comments: 0
def set_values(itunes_songs: Dict[int, Song], target_rhythmdb: Path, itunes_library_root: str, rhythmbox_library_root: str) -> IntegrationLog:
    """Merge the statistics of *itunes_songs* into the rhythmdb XML at
    *target_rhythmdb*, rewriting the file only if anything changed.

    :return: the IntegrationLog describing what was integrated
    """
    statistics = create_itunes_statistic_dict(itunes_songs, itunes_library_root)
    db_root = lxml.etree.parse(target_rhythmdb).getroot()
    log = integrate_statistics_into_rhythmdb(db_root, statistics, rhythmbox_library_root)
    # Avoid touching the file when no song entry was modified.
    if log.something_was_changed():
        common.write_to_file(db_root, target_rhythmdb, add_standalone_to_xml_declaration=True)
    return log
def __init__(self, _path):
    """Store the absolute form of *_path* and eagerly load its properties."""
    absolute = Path(_path).abspath()
    self.__path = str(absolute)
    self.__props = None
    self.__read_props()
def abspath(self):
    # Delegate to the base path.Path implementation via an explicit
    # unbound call, bypassing any overrides on this subclass.
    return path.Path.abspath(self)
def exists(self):
    # Delegate to the base path.Path implementation (explicit unbound call).
    return path.Path.exists(self)
def get_size(self):
    # Expose path.Path.getsize under a different method name;
    # returns the file size in bytes.
    return path.Path.getsize(self)
def remove(self):
    # Delegate file deletion to the base path.Path implementation.
    return path.Path.remove(self)
def write_text(self,
               text,
               encoding=None,
               errors='strict',
               linesep=os.linesep,
               append=False):
    # Thin passthrough to path.Path.write_text with identical defaults;
    # kept as an explicit unbound call so subclass overrides are skipped.
    return path.Path.write_text(self, text, encoding, errors, linesep, append)
def joinpath(self, first, *others):
    """Join path components and wrap the result back into this Path type."""
    joined = super(Path, self).joinpath(first, *others)
    return Path(joined)
def create_temp_file(
        *,
        suffix: str = None,
        prefix: str = None,
        create_in_dir: str = None) -> Path:
    """Create an empty temporary file and return its Path.

    The OS-level handle returned by mkstemp is closed immediately; only the
    file path is kept. Falls back to the system temp dir when no directory
    is given.
    """
    target_dir = create_in_dir or tempfile.gettempdir()
    handle, file_name = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=target_dir)
    os.close(handle)
    return Path(file_name)
def create_temp_dir(
        *,
        suffix: str = None,
        prefix: str = None,
        create_in_dir: str = None) -> Path:
    """Create a fresh temporary directory and return its Path.

    Falls back to the system temp dir when no parent directory is given.
    """
    parent = create_in_dir or tempfile.gettempdir()
    new_dir = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=parent)
    return Path(new_dir)
def add(self, section, name, value):
    """Persist *value* under ``<database_dir>/<section>/<name>/data``."""
    # Directory that will hold the database file.
    dir_path = path.path()
    dir_path.fromstr(self.database_dir)
    dir_path.add(section)
    dir_path.add(name)
    # The database file itself is a "data" entry inside that directory.
    file_path = path.path()
    file_path.fromstr(dir_path.tostr())
    file_path.add("data")
    # Make the directories (idempotent), then write the value.
    os.makedirs(dir_path.tostr(), exist_ok=True)
    with open(file_path.tostr(), 'w') as handle:
        handle.write(value)
def init(docker_host=constants.DOCKER_HOST,
         ssh_key_path=constants.SSH_KEY,
         clean_image_docker_tag=constants.CLEAN_IMAGE_DOCKER_TAG,
         manager_image_docker_tag=constants.MANAGER_IMAGE_DOCKER_TAG,
         source_root=constants.SOURCE_ROOT,
         workdir=None,
         reset=False,
         debug_ip=None):
    """Validate the SSH key, persist the tool configuration, and initialize
    the working directory."""
    key = path(ssh_key_path).expanduser()
    # An SSH key is mandatory; fail early with a helpful hint.
    if not key.isfile():
        raise argh.CommandError(
            'You need to create a key (see man ssh-keygen) first'
        )
    configuration.save(
        docker_host=docker_host,
        ssh_key_path=key.abspath(),
        clean_image_docker_tag=clean_image_docker_tag,
        manager_image_docker_tag=manager_image_docker_tag,
        source_root=source_root,
        workdir=workdir,
        reset=reset,
        debug_ip=debug_ip)
    logger.info('Configuration is saved to {}. Feel free to change it to your '
                'liking.'.format(configuration.conf_path))
    work.init()
def _build_volumes():
    """Assemble docker volume mount strings, keyed (and de-duplicated) by
    container destination path."""
    # resources should be able to override env packages which is why
    # we use a dist based in the destination directory
    volumes = {}
    for env, packages in configuration.env_packages.items():
        for package in packages:
            host_path = '{}/{}/{}'.format(configuration.source_root,
                                          configuration.package_dir[package],
                                          package)
            container_path = '/opt/{}/env/lib/python2.7/site-packages/{}'.format(env,
                                                                                 package)
            # Env packages are always mounted read-only.
            volumes[container_path] = '{}:{}:ro'.format(host_path, container_path)
    for resource in configuration.resources:
        container_path = resource['dst']
        permissions = 'rw' if resource.get('write') else 'ro'
        host_path = resource['src']
        # Relative resource sources are resolved against the source root.
        if not path(host_path).isabs():
            host_path = '{}/{}'.format(configuration.source_root, host_path)
        volumes[container_path] = '{}:{}:{}'.format(host_path, container_path, permissions)
    return volumes.values()
def add_layout_pin_center_segment(self, text, layer, start, end):
    """ Creates a path like pin with center-line convention """
    debug.check(start.x==end.x or start.y==end.y,"Cannot have a non-manhatten layout pin.")
    min_width = drc["minwidth_{}".format(layer)]
    # For a manhattan segment exactly one of these spans is zero.
    width = abs(start.x - end.x)
    height = abs(start.y - end.y)
    lower_left = vector(min(start.x, end.x), min(start.y, end.y))
    # Shift by half a minimum width in the zero dimension so the pin is
    # centered on the given line.
    if height == 0:
        lower_left -= vector(0, 0.5 * min_width)
    if width == 0:
        lower_left -= vector(0.5 * min_width, 0)
    # Clamp both spans to the minimum width so the pin is never 0 wide.
    width = max(width, min_width)
    height = max(height, min_width)
    return self.add_layout_pin(text, layer, lower_left, width, height)
def intersect(self, other):
    """Intersect self with *other* path.

    Returns a tuple of two lists holding the normpathparams of the
    intersection points on self and on other, respectively.
    """
    other = other.normpath()
    params_self = []
    params_other = []
    # Intersect every normsubpath of self with every normsubpath of other.
    for idx_a, nsp_a in enumerate(self.normsubpaths):
        for idx_b, nsp_b in enumerate(other.normsubpaths):
            for t_a, t_b in zip(*nsp_a.intersect(nsp_b)):
                params_self.append(normpathparam(self, idx_a, t_a))
                params_other.append(normpathparam(other, idx_b, t_b))
    return (params_self, params_other)
def _tangent(self, params, length_pt):
    """Return the tangent vectors of the path at *params*.

    The tangents are scaled to length_pt (in pts); entries are *invalid*
    where no tangent exists.
    """
    result = [None] * len(params)
    template = path.line_pt(0, 0, length_pt, 0).normpath()
    # _distributeparams groups the params by normsubpath; the inner loop
    # variable is renamed so it no longer shadows the *params* argument.
    for nsp_index, (indices, nsp_params) in self._distributeparams(params).items():
        trafos = self.normsubpaths[nsp_index].trafo(nsp_params)
        for index, trafo in zip(indices, trafos):
            result[index] = invalid if trafo is invalid else template.transformed(trafo)
    return result
def create_vias(self):
    """ Add a via and corner square at every corner of the path."""
    # Unit contact for the layer stack; its size drives the via offsets.
    self.c=contact(self.layer_stack, (1, 1))
    c_width = self.c.width
    c_height = self.c.height
    from itertools import tee,islice
    # Sliding window: nwise(g, n) yields consecutive n-tuples of g, so
    # threewise gives (previous, corner, next) position triples.
    nwise = lambda g,n=2: zip(*(islice(g,i,None) for i,g in enumerate(tee(g,n))))
    threewise=nwise(self.position_list,3)
    for (a, offset, c) in list(threewise):
        # add a exceptions to prevent a via when we don't change directions
        if a[0] == c[0]:
            continue
        if a[1] == c[1]:
            continue
        via_offset = [offset[0] + 0.5*c_height,
                      offset[1] - 0.5*c_width]
        self.obj.add_via(layers=self.layer_stack,
                         offset=via_offset,
                         rotate=90)
        corner_offset = [offset[0] - 0.5*(c_height + self.vert_layer_width),
                         offset[1] + 0.5*(c_width - self.horiz_layer_width)]
        # NOTE(review): corner_offset is computed but never used in this
        # view — the "corner square" draw call appears to be missing or
        # truncated; confirm against the full source file.
def standard_split(cls, root):
    """
    Use standard train/dev/test/other splits 2-21/22/23/24, respectively.
    """
    train, dev, test, other = [], [], [], []
    for entry in path(root).listdir():
        if not entry.isdir():
            continue
        section = int(entry.basename())
        # for some reason we drop sections < 2.
        if 2 <= section <= 21:
            train.append(entry)
        elif section == 22:
            dev.append(entry)
        elif section == 23:
            test.append(entry)
        elif section == 24:
            other.append(entry)
    train.sort()
    assert len(train) == 20 and len(test) == 1 and len(dev) == 1
    return cls(train, dev, test, other)
def removing(path):
    # Context-manager style generator: run the wrapped block, then remove
    # *path* unconditionally via rmtree — even if the block raised.
    # Presumably decorated with @contextlib.contextmanager outside this
    # view; confirm against the full source.
    try:
        yield
    finally:
        rmtree(path)
def ensure_not_exists(path):
    """Delete *path* if present: unlink a regular file, recursively remove
    a directory tree. A missing path is a silent no-op."""
    if not exists(path):
        return
    remover = os.unlink if isfile(path) else rmtree
    remover(path)