def parse_requirement_line(line, filename, line_number, session, finder):
    """Turn one requirements-file line into an InstallRequirement instance.

    :param line: One line from a requirements.txt file.
    :param filename: Path to a requirements.txt file.
    :param line_number: The integer line number of the current line.
    :param session: Instance of pip.download.PipSession.
    :param finder: Instance of pip.download.PackageFinder.
    :return: The first item produced by ``req_file.process_line`` for the
        line, or ``None`` when the line is empty or nothing is produced.
    """
    if not line:
        return None

    # Materialise the whole result so process_line runs to completion
    # before the first item is picked.
    parsed = list(
        req_file.process_line(
            line, filename, line_number, session=session, finder=finder
        )
    )
    if parsed:
        return parsed[0]
    return None
# Example source code using the Python class Path()
def create_potree_page(work_dir, server_url, tablename, column):
    '''Create an html demo page with potree viewer

    Downloads and unpacks the potree bundle into ``work_dir`` when no
    ``potree`` entry exists there, then writes ``potree-<name>.html`` (where
    ``<name>`` is the last dot-separated part of ``tablename``) rendered from
    the ``potree_page`` template.
    '''
    # get potree build
    potree_dir = os.path.join(work_dir, 'potree')
    archive_path = os.path.join(work_dir, 'potree.zip')
    if not os.path.exists(potree_dir):
        download('Getting potree code', 'http://3d.oslandia.com/potree.zip', archive_path)
        # unzipping content
        with ZipFile(archive_path) as bundle:
            bundle.extractall(path=work_dir)

    short_name = tablename.split('.')[-1]
    page_path = os.path.join(work_dir, 'potree-{}.html'.format(short_name))
    page_uri = 'file://{}'.format(str(Path(page_path).absolute()))
    pending('Creating a potree demo page : ' + page_uri)

    resource = '{}.{}'.format(tablename, column)
    # The template receives the server address without the http:// scheme.
    server_url = server_url.replace('http://', '')
    with io.open(page_path, 'wb') as html:
        rendered = potree_page.format(resource=resource, server_url=server_url)
        html.write(rendered.encode())
    ok()
def create_cesium_page(work_dir, tablename, column):
    '''Create an html demo page with cesium viewer

    Downloads and unpacks the cesium bundle into ``work_dir`` when no
    ``cesium`` entry exists there, then writes ``cesium-<name>.html`` (where
    ``<name>`` is the last dot-separated part of ``tablename``) rendered from
    the ``cesium_page`` template.
    '''
    cesium_dir = os.path.join(work_dir, 'cesium')
    archive_path = os.path.join(work_dir, 'cesium.zip')
    if not os.path.exists(cesium_dir):
        download('Getting cesium code', 'http://3d.oslandia.com/cesium.zip', archive_path)
        # unzipping content
        with ZipFile(archive_path) as bundle:
            bundle.extractall(path=work_dir)

    short_name = tablename.split('.')[-1]
    page_path = os.path.join(work_dir, 'cesium-{}.html'.format(short_name))
    page_uri = 'file://{}'.format(str(Path(page_path).absolute()))
    pending('Creating a cesium demo page : ' + page_uri)

    resource = '{}.{}'.format(tablename, column)
    with io.open(page_path, 'wb') as html:
        rendered = cesium_page.format(resource=resource)
        html.write(rendered.encode())
    ok()
def up(ctx, targets, run_id):
    """
    Provisions nodes from the given target(s) in the given PinFile.
    pinfile: Path to pinfile (Default: workspace path)
    targets: Provision ONLY the listed target(s). If omitted, ALL targets in
    the appropriate PinFile will be provisioned.
    """
    pinfile_path = _get_pinfile_path()
    try:
        outcome = lpcli.lp_up(pinfile_path, targets, run_id=run_id)
        return_code, results = outcome
        _handle_results(ctx, results, return_code)
    except LinchpinError as err:
        # Report the failure through the context, then exit non-zero.
        ctx.log_state(err)
        sys.exit(1)
def destroy(ctx, targets, run_id):
    """
    Destroys nodes from the given target(s) in the given PinFile.
    pinfile: Path to pinfile (Default: workspace path)
    targets: Destroy ONLY the listed target(s). If omitted, ALL targets in
    the appropriate PinFile will be destroyed.
    """
    pinfile_path = _get_pinfile_path()
    try:
        outcome = lpcli.lp_destroy(pinfile_path, targets, run_id=run_id)
        return_code, results = outcome
        _handle_results(ctx, results, return_code)
    except LinchpinError as err:
        # Report the failure through the context, then exit non-zero.
        ctx.log_state(err)
        sys.exit(1)
def get_click_options(self):
    """Yield the single, repeatable click option for this handler.

    The option is named ``--<cli_name>``, accepts existing readable files
    (not directories) and is tagged ``[optional]`` when ``self.default`` is
    ``None`` and ``[required]`` otherwise.
    """
    import click
    import q2cli
    import q2cli.core

    option_name = '--' + self.cli_name
    file_type = click.Path(
        exists=True, file_okay=True, dir_okay=False, readable=True)
    multi_type = q2cli.core.MultipleType(file_type)
    help_text = ('Metadata file or artifact viewable as metadata. This '
                 'option may be supplied multiple times to merge metadata.')
    requirement = '[optional]' if self.default is None else '[required]'

    option = q2cli.Option(
        [option_name], type=multi_type, help=help_text, multiple=True)
    yield self._add_description(option, requirement)
def main(ctx,  # type: click.Context
         directory,  # type: click.Path
         quiet,  # type: bool
         extension,  # type: str
         source_directory,  # type: str
         build_directory,  # type: str
         ):
    # type: (...) -> None
    """reqwire: micromanages your requirements."""
    requirements_dir = pathlib.Path(str(directory))
    console.verbose = not quiet
    # Shared state stored on the click context.
    ctx.obj = dict(
        build_dir=build_directory,
        directory=requirements_dir,
        extension=extension,
        source_dir=source_directory,
    )
def config_examples(dest, user_dir):
    """ Copy the example workflows to a directory.
    \b
    DEST: Path to which the examples should be copied.
    """
    source_dir = Path(lightflow.__file__).parents[1] / 'examples'
    if not source_dir.exists():
        click.echo('The examples source path does not exist')
        return

    target_dir = Path(dest).resolve()
    if not user_dir:
        # Without user_dir the examples go into an 'examples' subdirectory.
        target_dir = target_dir / 'examples'

    if not target_dir.exists():
        target_dir.mkdir()
    elif not click.confirm('Directory already exists. Overwrite existing files?',
                           default=True, abort=True):
        return

    for example in source_dir.glob('*.py'):
        shutil.copy(str(example), str(target_dir / example.name))
    click.echo('Copied examples to {}'.format(str(target_dir)))
def prepare(sets):
    """Prepare sticker sets to be uploaded by this tool.
    Reads any given directories, process any file Telegram won't accept.
    Generates and overwrites existing .ssd file.
    """
    # TODO: do first-run configurations to setting paths to executables of pngquant and/or waifu2x
    from pytgasu.prepare import SetDefGenerator, PrepareImageFiles

    for raw_dir in sets:
        resolved_dir = Path(raw_dir).resolve()
        # Image preparation runs before the set definition is generated.
        PrepareImageFiles(set_dir=resolved_dir)
        SetDefGenerator(set_dir=resolved_dir)

    print(NOTICE_GO_EDIT_DEFS)
def logout():
    """Logout from Telegram."""
    session_file = Path(PATH_TGSESSION_FILE)
    if not session_file.exists():
        # Return early because TelegramClient creates a session first,
        # but only when you can log_out() the session file gets deleted.
        # If it's not there, don't create it and waste time talking to Telegram.
        print(ERROR_NOT_LOGGEDIN)
        return

    from telethon import TelegramClient
    from pytgasu.upload import CustomisedSession

    tc = TelegramClient(
        session=CustomisedSession.try_load_or_create_new(),
        api_id=TG_API_ID,
        api_hash=TG_API_HASH)
    tc.connect()
    # I guess even if user is not authorised, invoking LogOutRequest does not cause problems
    tc.log_out()

    # Path.unlink() only removes files and symlinks, so calling it on the
    # session file's parent directory always raised. Remove a leftover
    # session file explicitly, then drop the directory with rmdir().
    if session_file.exists():
        session_file.unlink()
    try:
        session_file.parent.rmdir()
    except OSError:
        # Best effort: the directory is not empty, is the current directory,
        # or is already gone. None of these should fail the logout.
        pass
def copy_plugin_contents(path, plugin):
    """Copy the resources of a plugin's top-level package into ``path/plugin``.

    ``path/plugin`` is created first. Every resource listed for the package
    except ``__pycache__`` is copied: directories as trees with symlinks
    preserved, other entries with ``copy2`` without following symlinks.
    """
    top_package = plugin.module_name.split(".", 1)[0]
    target_root = path.joinpath("plugin")
    target_root.mkdir()

    for entry in plugin.dist.resource_listdir(top_package):
        if entry != "__pycache__":
            resource_name = os.path.join(top_package, entry)
            located = plugin.dist.get_resource_filename(
                pkg_resources._manager, resource_name)
            source = pathlib.Path(located).absolute()
            target = target_root.joinpath(source.name)
            if source.is_dir():
                shutil.copytree(
                    source.as_posix(), target.as_posix(), symlinks=True)
            else:
                shutil.copy2(
                    source.as_posix(), target.as_posix(),
                    follow_symlinks=False)
def init():
    """Return top level command handler."""

    @click.command()
    @click.option(
        '--api',
        required=False,
        help='API url to use.',
        envvar='TREADMILL_RESTAPI',
    )
    @click.option(
        '-m', '--manifest',
        help='App manifest file (stream)',
        type=click.Path(exists=True, readable=True),
    )
    @click.option(
        '--delete',
        help='Delete the app.',
        is_flag=True,
        default=False,
    )
    @click.argument('appname', required=False)
    @cli.handle_exceptions(restclient.CLI_REST_EXCEPTIONS)
    def configure(api, manifest, delete, appname):
        """Configure a Treadmill app"""
        restapi = context.GLOBAL.admin_api(api)
        # No app name: list. With --delete: delete. Otherwise: configure.
        if not appname:
            return _list(restapi)
        if delete:
            return _delete(restapi, appname)
        return _configure(restapi, manifest, appname)

    return configure
def init():
    """Return top level command handler."""

    @click.command()
    @click.argument('inputfile', type=click.Path(exists=True))
    @click.argument('params', nargs=-1,
                    type=click.Path(exists=True, readable=True))
    def interpolate(inputfile, params):
        """Interpolate input file template."""
        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(os.path.dirname(inputfile)),
            keep_trailing_newline=True
        )

        # Later parameter files override keys from earlier ones.
        data = {}
        for param in params:
            with io.open(param, 'rb') as fd:
                # NOTE(security): yaml.load without an explicit safe Loader
                # can construct arbitrary Python objects; consider
                # yaml.safe_load if parameter files may be untrusted.
                loaded = yaml.load(stream=fd)
            # An empty YAML document loads as None, which dict.update
            # rejects with TypeError; treat it as "no parameters".
            data.update(loaded or {})

        cli.out(env.get_template(os.path.basename(inputfile)).render(data))

    return interpolate
def init():
    """Return top level command handler"""

    @click.command()
    @click.option(
        '--treadmill-root',
        type=click.Path(exists=True),
        envvar='TREADMILL_APPROOT',
        required=True,
        help='Treadmill root path.',
    )
    @click.option(
        '--upload-user',
        envvar='TREADMILL_ID',
        required=True,
        help='Upload postmortem statistics with this user.',
    )
    @click.option(
        '--upload-url',
        help='Upload postmortem statistics to this file url.',
    )
    def collect(treadmill_root, upload_user, upload_url):
        """Collect Treadmill node data"""
        # Export the upload user to the environment before running.
        os.environ['TREADMILL_ID'] = upload_user
        postmortem.run(treadmill_root, upload_url)

    return collect
def init():
    """Top level command handler."""

    @click.command()
    @click.option(
        '--approot',
        type=click.Path(exists=True),
        envvar='TREADMILL_APPROOT',
        required=True,
    )
    @click.option('--runtime', envvar='TREADMILL_RUNTIME', required=True)
    @click.argument('container_dir', type=click.Path(exists=True))
    def finish(approot, runtime, container_dir):
        """Finish treadmill application on the node."""
        # Run with finish context as finish runs in cleanup.
        container_name = os.path.basename(container_dir)
        with lc.LogContext(_LOGGER, container_name,
                           lc.ContainerAdapter) as log:
            log.info('finish (approot %s)', approot)
            tm_env = appenv.AppEnvironment(approot)
            app_rt = app_runtime.get_runtime(runtime, tm_env, container_dir)
            app_rt.finish()

    return finish
def init():
    """Top level command handler."""

    @click.command()
    @click.option(
        '--approot',
        type=click.Path(exists=True),
        envvar='TREADMILL_APPROOT',
        required=True,
    )
    @click.option('--runtime', envvar='TREADMILL_RUNTIME', required=True)
    @click.argument('eventfile', type=click.Path(exists=True))
    def configure(approot, runtime, eventfile):
        """Configure local manifest and schedule app to run."""
        tm_env = appenv.AppEnvironment(root=approot)
        # app_cfg.configure returns the container directory, logged below.
        container_dir = app_cfg.configure(tm_env, eventfile, runtime)
        _LOGGER.info('Configured %r', container_dir)

    return configure
def clone(force):
    """Clone a copy of the TigerHost project to
    a private location and set the project path
    to the cloned location.
    """
    path = default_project_path()
    if os.path.exists(path):
        if not force:
            # abort=True makes click abort when the user declines.
            prompt = 'Path {} already exists. Continuing will remove this path.'.format(path)
            click.confirm(prompt, default=True, abort=True)
        # Directories are removed recursively, anything else as a single file.
        remove = shutil.rmtree if os.path.isdir(path) else os.remove
        remove(path)

    click.echo('Cloning to {}...'.format(path), nl=False)
    clone_project()
    click.secho('Done', fg='black', bg='green')
    save_project_path(path)
def modpack_file(path: Path) -> Generator[ModPack, None, None]:
    """Context manager for manipulation of existing mod-pack.

    Keyword arguments:
        path: Path to the existing ModPack file, which should be provided.

    Yields:
        ModPack loaded from path. If no exception occurs, the provided modpack
        is written (with changes) back to the file on context exit.
    """
    with path.open(mode='r', encoding='utf-8') as source:
        modpack = ModPack.load(source)

    # Control returns here only when the caller's block finished without
    # raising, so a failed block never reaches the write-back below.
    yield modpack

    with path.open(mode='w', encoding='utf-8') as target:
        modpack.dump(target)
def new(ctx, pack, path, gamever):
    """Create and initialize a new mod-pack."""
    # Check file system state
    pack_path = Path(pack)
    mods_path = Path(path)
    if not pack_path.parent.exists():
        msg = _('Mod-pack directory does not exists: {}').format(pack_path.parent)
        raise UserReport(msg)
    # Mods path existence is checked by click

    # Setup game for the mod-pack
    game = ctx['default_game']
    if gamever is not None:
        game.version = gamever

    # Build the mod-pack before opening the file: opening with mode 'w'
    # truncates an existing pack, so a failure in relative_to() or ModPack()
    # must happen before the file is touched.
    mp = ModPack(game, mods_path.relative_to(pack_path.parent))
    with pack_path.open(mode='w', encoding='utf-8') as stream:
        mp.dump(stream)
def install(ctx, pack, release, mod):
    """Install new MOD into a mod-pack."""
    with modpack_file(Path(pack)) as modpack:
        database = modpack.game.database
        target_mod = Mod.find(database.session(), mod)

        # Authorise the HTTP session with the stored token.
        http_session = requests.Session()
        with ctx['token_path'].open(encoding='utf-8') as token:
            http_session.auth = Authorization.load(token)

        minimum = Release[release.capitalize()]
        changes = modpack.install_changes(
            mod=target_mod,
            min_release=minimum,
            session=http_session,
        )
        modpack.apply(changes)
def upgrade(ctx, pack, release, mod):
    """Upgrade MOD and its dependencies."""
    with modpack_file(Path(pack)) as modpack:
        database = modpack.game.database
        target_mod = Mod.find(database.session(), mod)

        # Authorise the HTTP session with the stored token.
        http_session = requests.Session()
        with ctx['token_path'].open(encoding='utf-8') as token:
            http_session.auth = Authorization.load(token)

        minimum = Release[release.capitalize()]
        changes = modpack.upgrade_changes(
            mod=target_mod,
            min_release=minimum,
            session=http_session,
        )
        # An empty change set is reported as AlreadyUpToDate.
        if not changes:
            raise AlreadyUpToDate(target_mod.name)
        modpack.apply(changes)
def load_config(func):
    """Decorator for add load config file option"""
    config_option = click.option(
        '--config',
        '-c',
        type=click.Path(exists=True),
        envvar='YUI_CONFIG_FILE_PATH',
    )

    @config_option
    @functools.wraps(func)
    def internal(*args, **kwargs):
        # Replace the raw path in kwargs['config'] with the loaded config.
        config_path = kwargs.pop('config')
        if config_path is None:
            click.echo('--config option is required.', err=True)
            raise SystemExit(1)
        kwargs['config'] = load(pathlib.Path(config_path))
        return func(*args, **kwargs)

    return internal
def cmd_status(self):
    """Print the active branch followed by one status row per entry.

    Each row shows three flags: ``W`` (linked) or ``*`` (in working),
    ``C`` (in cache) and ``D`` (in depot), then the first eight characters
    of the digest, the human-readable size and the relative path.
    """
    click.echo('On branch %s' % self.repo.active_branch)
    click.echo()
    for heading in (' Working', ' Cache', ' Depot', ' SHA-256 Size Path'):
        click.echo(heading)

    row_template = '[ {} {} {} ] {} {:>6} {}'
    for entry in self._entries():
        if self.depot:
            self.depot.get_status(entry)

        if entry.is_linked:
            working_flag = 'W'
        else:
            working_flag = '*' if entry.in_working else ' '
        cache_flag = 'C' if entry.in_cache else ' '
        depot_flag = 'D' if entry.in_depot else ' '

        click.echo(row_template.format(
            working_flag, cache_flag, depot_flag, entry.digest[:8],
            human_size(entry.size), entry.rel_path))
    click.echo()
def sync_source_option(func):
    """
    Attach the ``--src`` / ``-s`` option (exposed as ``sync_source``) to a command.

    :param func: Click CLI command to decorate
    :return: decorated CLI command
    """
    # Must exist, must not be a file; the path is resolved.
    source_type = Path(
        exists=True,
        file_okay=False,
        resolve_path=True,
    )
    add_option = option(
        '--src',
        '-s',
        'sync_source',
        type=source_type,
        help='Local directory from which to sync. Optional.',
    )
    return add_option(func)
def sync_destination_option(func):
    """
    Attach the ``--dest`` / ``-d`` option (exposed as ``sync_destination``) to a command.

    :param func: Click CLI command to decorate
    :return: decorated CLI command
    """
    # No existence check is requested; only plain files are rejected.
    destination_type = Path(
        file_okay=False,
    )
    add_option = option(
        '--dest',
        '-d',
        'sync_destination',
        type=destination_type,
        help='Remote directory to sync to. Optional.',
    )
    return add_option(func)
def make_directory_override_option(func):
    """
    Attach the ``--dest`` / ``-d`` option (exposed as ``make_destination``) to a command.

    :param func: Click CLI command to decorate
    :return: decorated CLI command
    """
    # No existence check is requested; only plain files are rejected.
    destination_type = Path(
        file_okay=False,
    )
    add_option = option(
        '--dest',
        '-d',
        'make_destination',
        type=destination_type,
        help='Remote directory to run make in. Optional.',
    )
    return add_option(func)
def extract_start(source_id, sources_config):
    """
    Start extraction for a pipeline specified by ``source_id`` defined in
    ``--sources-config``. ``--sources-config`` defaults to ``settings.SOURCES_CONFIG_FILE``.
    :param sources_config: Path to file containing pipeline definitions. Defaults to the value of ``settings.SOURCES_CONFIG_FILE``
    :param source_id: identifier used in ``--sources_config`` to describe pipeline
    """
    sources = load_sources_config(sources_config)

    # Find the requested source definition in the list of available sources
    source = None
    for candidate_source in sources:
        if candidate_source['id'] == source_id:
            source = candidate_source
            # Stop at the first match; the original `continue` here was a
            # no-op and kept scanning the rest of the list.
            break

    # Without a config we can't do anything, notify the user and exit
    if source is None:
        click.echo('Error: unable to find source with id "%s" in sources '
                   'config' % source_id)
        return

    setup_pipeline(source)
def run(model, collect, filename, directory, ignore_git, pytest_args):
    """
    Run the test suite and collect results.
    MODEL: Path to model file. Can also be supplied via the environment variable
    MEMOTE_MODEL or configured in 'setup.cfg' or 'memote.ini'.
    """
    if ignore_git:
        repo = None
    else:
        repo = callbacks.probe_git()

    # Work on a private list: the original appended to the caller's object
    # and failed on tuples, which support neither list concatenation nor
    # append.
    pytest_args = list(pytest_args or ())
    if not any(a.startswith("--tb") for a in pytest_args):
        pytest_args = ["--tb", "short"] + pytest_args
    if not any(a.startswith("-v") for a in pytest_args):
        pytest_args.append("-vv")

    if collect:
        # Inside a git repository with a target directory, the result file
        # is named after the active branch's commit hash.
        if repo is not None and directory is not None:
            filename = join(directory,
                            "{}.json".format(repo.active_branch.commit.hexsha))
        code = api.test_model(model, filename, pytest_args=pytest_args)
    else:
        code = api.test_model(model, pytest_args=pytest_args)
    sys.exit(code)
def index_documents(ctx, hocr_files, autocomplete_min_count):
    # Ingest every given hOCR file into the module-level repository, which
    # is created from ctx.obj['DB_PATH'] on first use. A file that fails to
    # ingest is logged and skipped so the remaining files are still handled.
    def show_fn(hocr_path):
        # Progress-bar label: the file name, or '' when no item is given.
        return '' if hocr_path is None else hocr_path.name

    global repository
    if repository is None:
        repository = DatabaseRepository(ctx.obj['DB_PATH'])

    paths = tuple(pathlib.Path(raw) for raw in hocr_files)
    with click.progressbar(paths, item_show_func=show_fn) as progress:
        for hocr_path in progress:
            try:
                repository.ingest_document(hocr_path, autocomplete_min_count)
            except Exception as err:
                logger.error("Could not ingest {}".format(hocr_path))
                logger.exception(err)
def get_requirements_and_latest(filename, force=False):
    """Parse a requirements file and get latest version for each requirement.

    Yields a tuple of (original line, InstallRequirement instance,
    spec_versions, latest_version). Lines that yield no usable requirement
    produce ``(original line, None, None, None)``.

    :param filename: Path to a requirements.txt file.
    :param force: Force getting latest version even for packages without
                  a version specified.
    """
    session = PipSession()
    finder = PackageFinder(
        session=session, find_links=[], index_urls=[PyPI.simple_url])

    _, content = get_file_content(filename, session=session)
    for line_number, raw_line, orig_line in yield_lines(content):
        cleaned = req_file.COMMENT_RE.sub('', raw_line).strip()
        req = parse_requirement_line(
            cleaned, filename, line_number, session, finder)

        usable = (
            req is not None
            and req.name is not None
            and not req_file.SCHEME_RE.match(req.name)
        )
        if not usable:
            yield (orig_line, None, None, None)
            continue

        spec_ver = current_version(req)
        # Requirements without a version are yielded only when forced.
        if spec_ver or force:
            latest_ver = latest_version(req, session, finder)
            yield (orig_line, req, spec_ver, latest_ver)