python类init()的实例源码

repo.py 文件源码 项目:git-repo 作者: guyzmo 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def do_clone(self, service=None, repo_path=None):
        """Initialise a git repository at the target path and clone into it.

        Parameters:
            service - optional service object; when falsy, one is looked up with
                      ``self.get_service(lookup_repository=False)``. Note that it
                      is replaced inside the ``try`` block by the service bound
                      to the freshly initialised repository.
            repo_path - destination directory; defaults to ``self.path`` joined
                        with ``self.target_repo`` (or ``self.repo_name``).

        Returns 0 on success.

        Raises:
            FileExistsError - the destination exists and is not empty.
            ResourceNotFoundError - anything failed while initialising or
                cloning; the destination directory is removed first.
        """
        service = service or self.get_service(lookup_repository=False)
        repo_path = repo_path or os.path.join(self.path, self.target_repo or self.repo_name)
        if os.path.exists(repo_path) and os.listdir(repo_path):
            raise FileExistsError('Cannot clone repository, '
                                  'a folder named {} already exists and '
                                  'is not an empty directory!'.format(repo_path))
        try:
            repository = Repo.init(repo_path)
            service = RepositoryService.get_service(repository, self.target)
            service.clone(self.namespace, self.repo_name, self.branch)
            log.info('Successfully cloned `{}` into `{}`!'.format(
                service.format_path(self.repo_slug),
                repo_path)
            )
            return 0
        except Exception as err:
            # Do not leave a half-initialised repository behind.
            if os.path.exists(repo_path):
                shutil.rmtree(repo_path)
            # The original code assumed the third exception argument is a bytes
            # payload. Other failures carry fewer arguments or a str, which made
            # this handler itself blow up and hide the real cause.
            detail = err.args[2] if len(err.args) > 2 else None
            if isinstance(detail, bytes):
                detail = detail.decode('utf-8', errors='replace')
            elif not isinstance(detail, str):
                detail = str(err)
            raise ResourceNotFoundError(detail) from err
test_release.py 文件源码 项目:maintain 作者: kylef 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_git_release_without_remote(self):
        """Release 2.0.0 in a repository without a remote and check commit, tag and cleanliness."""
        with self.runner.isolated_filesystem():
            # Seed the working directory with a version file at 1.0.0.
            with open('VERSION', 'w') as version_file:
                version_file.write('1.0.0\n')

            repository = Repo.init()
            repository.index.add(['VERSION'])
            repository.index.commit('Initial commit')

            outcome = self.runner.invoke(release, ['2.0.0'])
            self.assertIsNone(outcome.exception)
            self.assertEqual(outcome.exit_code, 0)

            # The release must have bumped the version file.
            with open('VERSION') as version_file:
                self.assertEqual(version_file.read(), '2.0.0\n')

            # ... committed the bump on master and tagged that same commit.
            release_commit = repository.refs.master.commit
            self.assertEqual(release_commit.message, 'Release 2.0.0')
            self.assertEqual(repository.tags['2.0.0'].commit, release_commit)
            self.assertFalse(repository.is_dirty())
test_osa_differ.py 文件源码 项目:osa_differ 作者: major 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_get_projects(self, tmpdir):
        """Commit two project definition files and check get_projects returns a list."""
        repo_dir = tmpdir.mkdir('test')
        repo_path = str(repo_dir)
        git_repo = Repo.init(repo_path)

        tempest_yaml = repo_dir / 'test1.yml'
        tempest_yaml.write_text(u"""---
tempest_git_repo: https://git.openstack.org/openstack/tempest
tempest_git_install_branch: 1493c7f0ba49bfccb9ff8516b10a65d949d7462e
tempest_git_project_group: utility_all
""", encoding='utf-8')

        novnc_yaml = repo_dir / 'test2.yml'
        novnc_yaml.write_text(u"""---
novncproxy_git_repo: https://github.com/kanaka/novnc
novncproxy_git_install_branch: da82b3426c27bf1a79f671c5825d68ab8c0c5d9f
novncproxy_git_project_group: nova_console
""", encoding='utf-8')

        git_repo.index.add(['test1.yml', 'test2.yml'])
        git_repo.index.commit("Test")

        found = osa_differ.get_projects(repo_path, 'HEAD')
        assert isinstance(found, list)
test_osa_differ.py 文件源码 项目:osa_differ 作者: major 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_get_roles(self, tmpdir):
        """Commit a role requirements file and check get_roles reports its first role."""
        repo_dir = tmpdir.mkdir('test')
        repo_path = str(repo_dir)
        git_repo = Repo.init(repo_path)

        requirements = repo_dir / 'ansible-role-requirements.yml'
        requirements.write_text(u"""
- name: apt_package_pinning
  scm: git
  src: https://github.com/openstack/openstack-ansible-apt_package_pinning
  version: master
""", encoding='utf-8')
        git_repo.index.add(['ansible-role-requirements.yml'])
        git_repo.index.commit("Test")

        found = osa_differ.get_roles(repo_path, 'HEAD', 'ansible-role-requirements.yml')
        assert isinstance(found, list)
        assert found[0][0] == 'apt_package_pinning'
test_osa_differ.py 文件源码 项目:osa_differ 作者: major 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_commit_range_valid(self, tmpdir):
        """Check that an older-to-newer commit range is reported as valid."""
        repo_dir = tmpdir.mkdir('test')
        repo_path = str(repo_dir)
        git_repo = Repo.init(repo_path)
        tracked = repo_dir / 'test.txt'
        # Two successive commits so that HEAD~1 exists.
        for content, message in ((u'Testing1', 'Testing 1'),
                                 (u'Testing2', 'Testing 2')):
            tracked.write_text(content, encoding='utf-8')
            git_repo.index.add(['test.txt'])
            git_repo.index.commit(message)

        verdict = osa_differ.validate_commit_range(repo_path, 'HEAD~1', 'HEAD')

        assert verdict
test_osa_differ.py 文件源码 项目:osa_differ 作者: major 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_commit_range_flipped(self, tmpdir):
        """Check that a newer-to-older commit range is reported as 'flip'."""
        repo_dir = tmpdir.mkdir('test')
        repo_path = str(repo_dir)
        git_repo = Repo.init(repo_path)
        tracked = repo_dir / 'test.txt'
        # Two successive commits so that HEAD~1 exists.
        for content, message in ((u'Testing1', 'Testing 1'),
                                 (u'Testing2', 'Testing 2')):
            tracked.write_text(content, encoding='utf-8')
            git_repo.index.add(['test.txt'])
            git_repo.index.commit(message)

        verdict = osa_differ.validate_commit_range(repo_path, 'HEAD', 'HEAD~1')

        assert verdict == 'flip'
test_osa_differ.py 文件源码 项目:osa_differ 作者: major 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_make_osa_report(self, tmpdir):
        """Check that the OSA header report mentions the old ref and the generator title."""
        repo_dir = tmpdir.mkdir('test')
        repo_path = str(repo_dir)
        git_repo = Repo.init(repo_path)
        tracked = repo_dir / 'test.txt'
        # Two successive commits so that HEAD~1 exists.
        for content, message in ((u'Testing1', 'Testing 1'),
                                 (u'Testing2', 'Testing 2')):
            tracked.write_text(content, encoding='utf-8')
            git_repo.index.add(['test.txt'])
            git_repo.index.commit(message)

        cli_parser = osa_differ.create_parser()
        cli_args = cli_parser.parse_args(['HEAD~1', 'HEAD'])
        header = osa_differ.make_osa_report(repo_path, 'HEAD~1', "HEAD", cli_args)
        assert "HEAD~1" in header
        assert "OpenStack-Ansible Diff Generator" in header
test_osa_differ.py 文件源码 项目:osa_differ 作者: major 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_make_report_old_pin_missing(self, tmpdir):
        """Check that make_report yields an empty report when no old pins are given."""
        repo_dir = tmpdir.mkdir('test')
        repo_path = str(repo_dir)
        git_repo = Repo.init(repo_path)
        tracked = repo_dir / 'test.txt'
        for content, message in ((u'Testing1', 'Testing 1'),
                                 (u'Testing2', 'Testing 2')):
            tracked.write_text(content, encoding='utf-8')
            git_repo.index.add(['test.txt'])
            git_repo.index.commit(message)

        # A new pin with no matching entry in the (empty) old pin list.
        pins_after = [("test", "http://example.com", "HEAD")]
        pins_before = []

        rendered = osa_differ.make_report(str(tmpdir), pins_before, pins_after)

        assert rendered == ''
test_osa_differ.py 文件源码 项目:osa_differ 作者: major 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_repo_clone(self, tmpdir, monkeypatch):
        """Check repo_clone against a local repository with git's clone and wait patched out."""
        repo_dir = tmpdir.mkdir('test')
        repo_path = str(repo_dir)
        git_repo = Repo.init(repo_path)
        tracked = repo_dir / 'test.txt'
        tracked.write_text(u'Testing', encoding='utf-8')
        git_repo.index.add(['test.txt'])
        git_repo.index.commit('Testing')

        # Replace the real clone and process wait with canned answers.
        monkeypatch.setattr("git.repo.base.Repo.clone",
                            lambda x, y: repo_path)
        monkeypatch.setattr("git.cmd.Git.AutoInterrupt.wait",
                            lambda *args, **kwargs: True)
        cloned = osa_differ.repo_clone(repo_path, "http://example.com")

        assert cloned.active_branch.name == 'master'
        assert not cloned.is_dirty()
test_osa_differ.py 文件源码 项目:osa_differ 作者: major 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_repo_clone_update(self, tmpdir):
        """Check update_repo when the first path argument does not exist yet."""
        source_dir = tmpdir.mkdir('test')
        source_path = str(source_dir)
        git_repo = Repo.init(source_path)
        tracked = source_dir / 'test.txt'
        tracked.write_text(u'Testing', encoding='utf-8')
        git_repo.index.add(['test.txt'])
        git_repo.index.commit('Testing')

        # A path below a second temp directory that has not been created.
        other_dir = tmpdir.mkdir("test2")
        missing_path = "{0}/testrepodoesntexist".format(str(other_dir))
        updated = osa_differ.update_repo(missing_path, source_path)

        assert updated.active_branch.name == 'master'
        assert not updated.is_dirty()
test_osa_differ.py 文件源码 项目:osa_differ 作者: major 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_repo_update_with_fetch(self, tmpdir, monkeypatch):
        """Check repo_pull with fetch=True while git process calls are stubbed out."""
        repo_dir = tmpdir.mkdir('test')
        repo_path = str(repo_dir)
        git_repo = Repo.init(repo_path)
        tracked = repo_dir / 'test.txt'
        tracked.write_text(u'Testing', encoding='utf-8')
        git_repo.index.add(['test.txt'])
        git_repo.index.commit('Testing')
        git_repo.create_remote('origin', url='http://example.com')

        def always_true(*args, **kwargs):
            return True

        monkeypatch.setattr("git.cmd.Git._call_process", always_true)
        pulled = osa_differ.repo_pull(repo_path, "http://example.com", fetch=True)
        # Restore the real git calls before inspecting the repository.
        monkeypatch.undo()
        assert pulled.active_branch.name == 'master'
        assert not pulled.is_dirty()
init.py 文件源码 项目:pomu 作者: Hummer12007 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def init_plain_repo(create, repo_path):
    """
    Set up a plain repository, either freshly created or pre-existing.

    Parameters:
        create - when true, make the directory and a new git repo in it;
                 otherwise reuse the directory already at repo_path
        repo_path - filesystem location of the repository

    Returns a Result: Err with a message on failure, otherwise whatever
    init_new (wrapped in Ok) or init_pomu produces.
    """
    if not repo_path:
        return Result.Err('repository path required')

    # Reuse branch: the directory has to be there already.
    if not create:
        if path.isdir(repo_path):
            return init_pomu(repo_path)
        return Result.Err('directory not found')

    # Create branch: refuse to overwrite an existing directory.
    if path.isdir(repo_path):
        return Result.Err('this repository already exists')
    try:
        makedirs(repo_path)
    except PermissionError:
        return Result.Err('you do not have enough permissions to create the git repository')
    Repo.init(repo_path)
    try:
        return Result.Ok(init_new(repo_path).unwrap())
    except ResultException as failure:
        # Roll back the directory that was just created.
        rmtree(repo_path)
        return Result.Err(str(failure))
init.py 文件源码 项目:pomu 作者: Hummer12007 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def init_portage_repo(create, repo, repo_dir):
    """
    Set up a portage repository, either freshly created or pre-existing.

    Parameters:
        create - when true, create a new git-backed portage repo and append
                 it to pomu.conf; otherwise reuse one portage already knows
        repo - name of the repository
        repo_dir - parent directory for a newly created repository

    Returns a Result: Err with a message on failure, otherwise whatever
    init_new (wrapped in Ok) or init_pomu produces.
    """
    if not repo:
        return Result.Err('repository name required')
    rsets = portage.db[portage.root]['vartree'].settings.repositories

    # Reuse branch: the name must already be known to portage.
    if not create:
        if repo in rsets.prepos_order:
            return init_pomu(rsets.prepos[repo], repo)
        return Result.Err('repository not found')

    # Create branch: the name must still be free.
    if repo in rsets.prepos_order:
        return Result.Err('a repository with such name already exists!')
    repo_path = path.join(repo_dir, repo)
    try:
        makedirs(repo_path)
    except PermissionError:
        return Result.Err('you do not have enough permissions to create the git repository')

    conf_path = path.join(portage.root, 'etc/portage/repos.conf', 'pomu.conf')
    try:
        with open(conf_path, 'a') as conf:
            conf.write('[' + repo + ']' + '\n')
            conf.write('location = ' + repo_path + '\n')
    except PermissionError:
        rmtree(repo_path)
        return Result.Err('you do not have enough permissions to setup a portage repo')

    Repo.init(repo_path)
    try:
        return Result.Ok(init_new(repo_path, repo).unwrap())
    except ResultException as failure:
        # Roll back the directory; the pomu.conf entry is left as written.
        rmtree(repo_path)
        return Result.Err(str(failure))
__init__.py 文件源码 项目:pentagon 作者: reactiveops 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __git_init(self):
        """ Clone the configured git repo, or initialize a new one, in the repository directory """
        if not self._git_repo:
            # Nothing to clone from: start an empty repository.
            return Repo.init(self._repository_directory)
        return Git().clone(self._git_repo, self._repository_directory)
test_commit.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_ambiguous_arg_iteration(self, rw_dir):
        """Iterate commits in a repo whose tracked file is also named 'master'."""
        repo_location = osp.join(rw_dir, 'test_ambiguous_arg')
        rw_repo = Repo.init(repo_location)
        # A file named like the branch makes the 'master' argument ambiguous.
        clashing_file = osp.join(rw_repo.working_tree_dir, 'master')
        touch(clashing_file)
        rw_repo.index.add([clashing_file])
        rw_repo.index.commit('initial commit')
        head_ref = rw_repo.head.ref
        list(rw_repo.iter_commits(head_ref))  # should fail unless bug is fixed
test_docs.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_add_file_and_commit(self, rw_dir):
        """Create a repository, add one empty file and commit it."""
        import git

        repo_dir = osp.join(rw_dir, 'my-new-repo')
        file_name = osp.join(repo_dir, 'new-file')

        r = git.Repo.init(repo_dir)
        # Opening for writing and closing right away leaves an empty file.
        with open(file_name, 'wb'):
            pass
        r.index.add([file_name])
        r.index.commit("initial commit")

        # ![test_add_file_and_commit]
repo.py 文件源码 项目:git-repo 作者: guyzmo 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def init(self):  # pragma: no cover
        """Remove ``GIT_WORK_TREE`` from the process environment if it is set.

        The original code deleted the variable whenever ``GIT_WORK_TREE`` *or*
        ``GIT_DIR`` was present, which raised ``KeyError`` when only
        ``GIT_DIR`` was set. Popping with a default removes it when present and
        is a no-op otherwise.

        NOTE(review): ``GIT_DIR`` is left untouched, as before -- confirm
        whether it was meant to be cleared as well.
        """
        os.environ.pop('GIT_WORK_TREE', None)
repo.py 文件源码 项目:git-repo 作者: guyzmo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def do_gist_clone(self):
        """Initialise a repository named after the gist reference and clone the gist into it.

        Returns 0 once the clone has been logged.
        """
        service = self.get_service(lookup_repository=False)
        # The last path component of the gist reference names the directory.
        gist_name = self.gist_ref.rsplit('/', 1)[-1]
        repo_path = os.path.join(self.path, gist_name)
        service.repository = Repo.init(repo_path)
        service.gist_clone(self.gist_ref)
        message = 'Successfully cloned `{}` into `{}`!'.format(self.gist_ref, repo_path)
        log.info(message)
        return 0
helpers.py 文件源码 项目:git-repo 作者: guyzmo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setup_git_popen(self):
        """Create a scratch repository and install a mocked Popen for git commands."""
        # repository mockup (in a temporary place)
        self.repository = Repo.init(self.tempdir.name)
        # setup git command mockup
        self.Popen = MockPopen()

        def popen_without_session(*args, **kwargs):
            # Drop the keyword before delegating to the mocked Popen.
            kwargs.pop('start_new_session', None)
            return self.Popen.Popen(*args, **kwargs)

        popen_mock = self.Popen.mock
        popen_mock.Popen.side_effect = popen_without_session
        process = popen_mock.Popen_instance
        process.stdin = None
        process.wait = lambda *a, **k: self.Popen.wait()
        # Let the mocked process be used as a context manager.
        process.__enter__ = lambda self: self
        process.__exit__ = lambda self, *a, **k: None
helpers.py 文件源码 项目:git-repo 作者: guyzmo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main_add(self, repo, rc=0, args={}):
        """Run the ``add`` command against a fresh repository and return the service's add record.

        The repository directory is named after the last component of ``repo``,
        or 'fubar' when ``repo`` is falsy. Asserts that main() returns ``rc``.
        """
        create_repo = repo.split('/')[-1] if repo else 'fubar'
        os.mkdir(os.path.join(self.tempdir.name, create_repo))
        Repo.init(os.path.join(self.tempdir.name, create_repo))
        assert rc == main(self.setup_args({
            'add': True,
            '<user>/<repo>': repo,
            '--path': self.tempdir.name
        }, args)), "Non {} result for add".format(rc)
        return RepositoryService._current._did_add
helpers.py 文件源码 项目:git-repo 作者: guyzmo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main_delete(self, repo=None, rc=0, args={}):
        """Run the ``delete`` command and return the service's delete record.

        When ``repo`` is given, a repository named after its last component is
        created in the temp directory first. Asserts that main() returns ``rc``.
        """
        if repo:
            local_name = repo.split('/')[-1]
            repo_path = os.path.join(self.tempdir.name, local_name)
            os.mkdir(repo_path)
            Repo.init(repo_path)
        assert rc == main(self.setup_args({
            'delete': True,
            '<user>/<repo>': repo,
            '--path': self.tempdir.name,
        }, args)), "Non {} result for delete".format(rc)
        return RepositoryService._current._did_delete
helpers.py 文件源码 项目:git-repo 作者: guyzmo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def action_clone(self, namespace, repository):
        """Exercise ``service.clone`` with git subprocess calls mocked out.

        Registers the sequence of git commands the clone is expected to run,
        then, under a recorded cassette, connects the service, clones
        ``namespace``/``repository`` and creates the 'all' and service-named
        remotes on the resulting repository.
        """
        # hijack subprocess call
        with self.mockup_git(namespace, repository):
            local_slug = self.service.format_path(namespace=namespace, repository=repository, rw=True)
            # Each entry presumably reads (command, stdout, stderr, return code)
            # -- TODO confirm against set_mock_popen_commands.
            self.set_mock_popen_commands([
                ('git init', b'Initialized empty Git repository in /tmp/bar/.git/', b'', 0),
                ('git remote add all {}'.format(local_slug), b'', b'', 0),
                ('git remote add {} {}'.format(self.service.name, local_slug), b'', b'', 0),
                ('git version', b'git version 2.8.0', b'', 0),
                # Canned progress output for the pull, joined and encoded to bytes.
                ('git pull --progress -v {} master'.format(self.service.name), b'', '\n'.join([
                    'POST git-upload-pack (140 bytes)',
                    'remote: Counting objects: 8318, done.',
                    'remote: Compressing objects: 100% (3/3), done.',
                    'remote: Total 8318 (delta 0), reused 0 (delta 0), pack-reused 8315',
                    'Receiving objects: 100% (8318/8318), 3.59 MiB | 974.00 KiB/s, done.',
                    'Resolving deltas: 100% (5126/5126), done.',
                    'From {}:{}/{}'.format(self.service.fqdn, namespace, repository),
                    ' * branch            master     -> FETCH_HEAD',
                    ' * [new branch]      master     -> {}/master'.format(self.service.name)]).encode('utf-8'),
                0)
            ])
            with self.recorder.use_cassette(self._make_cassette_name()):
                self.service.connect()
                self.service.clone(namespace, repository)
                # Create the remotes on the repository explicitly.
                self.service.repository.create_remote('all', url=local_slug)
                self.service.repository.create_remote(self.service.name, url=local_slug)
helpers.py 文件源码 项目:git-repo 作者: guyzmo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def action_add(self, namespace, repository, alone=False, name=None,
            tracking='master', auto_slug=False, remotes={}):
        """Exercise ``service.add`` and assert the expected remotes and tracking.

        Pre-existing ``remotes`` (name -> url) are created on the repository
        before the add. Which remote assertions run depends on ``alone`` and
        ``name``; tracking assertions only run when ``tracking`` is truthy.
        """
        with self.recorder.use_cassette(self._make_cassette_name()):
            # init git in the repository's destination
            self.repository.init()
            for remote_name, remote_url in remotes.items():
                self.repository.create_remote(remote_name, remote_url)
            self.service.connect()
            self.service.add(user=namespace, repo=repository, alone=alone, name=name, tracking=tracking, auto_slug=auto_slug)

            # Remote assertions: identical whether or not tracking is set.
            if alone:
                self.assert_added_remote(name if name else self.service.name)
            elif name:
                self.assert_added_remote(name)
                self.assert_added_remote('all')
            else:
                self.assert_added_remote_defaults()

            if not tracking:
                return

            # Tracking assertions: arguments vary with (alone, name).
            if alone and name:
                self.assert_tracking_remote(name, tracking)
            elif alone:
                self.assert_tracking_remote(branch_name=tracking)
            elif name:
                self.assert_tracking_remote(name)
            else:
                self.assert_tracking_remote()
__init__.py 文件源码 项目:urban-journey 作者: urbanjourney 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init(path=None):
        """Create an empty uj project, and a git repository if possible.

        Files from the empty-project template are copied into ``path`` (the
        current directory when omitted), skipping any that already exist.
        Freshly copied ``.ujml`` files get their ``{version}`` placeholder
        filled in. If git is available and the target is not yet a repository,
        one is initialised.
        """
        # Initialize current directory if no arguments were given.
        target_directory = path or "./"

        # Walk through empty source directory and copy any non existing files.
        for (dir_path, _dir_names, file_names) in os.walk(empty_project_dir):
            # Get relative path to source root.
            rel_path = os.path.relpath(dir_path, empty_project_dir)
            # Get path to current target directory
            target_path = os.path.normpath(os.path.join(target_directory, rel_path))
            # Create target directory if necessary.
            if not os.path.isdir(target_path):
                os.mkdir(target_path)
            # Create files if necessary.
            for file_name in file_names:
                target_file = os.path.join(target_path, file_name)
                if os.path.exists(target_file):
                    # Never touch a file that is already there. Re-running
                    # str.format on it would collapse "{{"/"}}" and raise on
                    # any other braces the user has added.
                    continue
                copyfile(os.path.join(dir_path, file_name), target_file)
                # If it just copied a ujml file, fill in the version number.
                if file_name.endswith(".ujml"):
                    with open(target_file, "r") as f:
                        content = f.read()
                    with open(target_file, "w") as f:
                        f.write(content.format(version=uj_version))

        # If possible make sure it's a git repository.
        if git_available:
            try:
                Repo(target_directory)
            except InvalidGitRepositoryError:
                Repo.init(target_directory)
peer_score.py 文件源码 项目:loopchain 作者: theloopkr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def init_package_in_develop(self):
        """Reset the package directory to a fresh local git repository.

        Removes any existing ``.git`` and ``deploy`` entries under the package
        path, initialises a new repository there, stages ``*.py`` and
        ``*.json`` files and commits them.
        """
        import shutil

        # Delete .git and deploy if they exist. This used to shell out to
        # "rm -rf <path>", which breaks on paths containing spaces or shell
        # metacharacters. Removal stays best-effort, as it was with rm -rf.
        for stale_name in ('.git', 'deploy'):
            stale_path = osp.join(self.__package_path, stale_name)
            if osp.isdir(stale_path) and not osp.islink(stale_path):
                shutil.rmtree(stale_path, ignore_errors=True)
            elif osp.lexists(stale_path):
                try:
                    os.remove(stale_path)
                except OSError:
                    pass

        # init git for make a local repository
        repo = Repo.init(str(self.__package_path))
        repo.git.add('*.py')
        repo.git.add('*.json')
        repo.index.commit('add new files')
git.py 文件源码 项目:django-gitversions 作者: michaelkuty 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def init(self, path=None):
        '''Initialize the repo object, or init/clone a new Git repo.

        - If ``self.path`` is an existing directory, a repository is
          initialised at ``path`` (or ``self.path`` when ``path`` is omitted).
        - Otherwise, if ``self._url`` is set, ``self.path`` is created and the
          remote's ``master`` branch is cloned into it.
        - Otherwise a new repository is initialised at ``path`` or
          ``self.path``. The original code assigned nothing in this case, so
          the final ``return self._repo`` raised AttributeError.

        Returns the repository object stored on ``self._repo``.
        '''

        if os.path.isdir(self.path):
            self._repo = Repo.init(path or self.path)
        elif self._url:
            # clone the remote when a url is configured
            mkdir_p(self.path)
            self._repo = Repo.clone_from(self._url,
                                         self.path,
                                         branch='master')
        else:
            # No directory and nothing to clone from: start a fresh repo.
            self._repo = Repo.init(path or self.path)

        return self._repo
git.py 文件源码 项目:django-gitversions 作者: michaelkuty 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def repo(self):
        '''Return the cached repo object, opening or initialising it on first use.

        On first access this tries ``Repo(self.path)``; if that fails,
        ``self.init()`` is called, which is expected to set ``self._repo``.
        '''
        if not hasattr(self, '_repo'):
            try:
                self._repo = Repo(self.path)
            except Exception:
                # A bare "except:" also swallowed KeyboardInterrupt and
                # SystemExit; only ordinary failures should trigger the
                # fallback to init().
                self.init()
        return self._repo
setup.py 文件源码 项目:dupin 作者: guardian 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _initialise_history_repo(repo_path):
    """Initialise a git repository at repo_path with a committed README.md."""
    history_repo = Repo.init(repo_path)
    readme_path = os.path.join(repo_path, "README.md")
    with open(readme_path, "w") as handle:
        print(_README(), file=handle)
    history_repo.index.add(["README.md"])
    history_repo.index.commit("Initial commit")
utils.py 文件源码 项目:maintain 作者: kylef 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Enter the parent context, initialise a bare repository and return the parent's value.

        No path is passed to ``Repo.init``, so GitPython's default location is used.
        """
        entered = super(git_bare_repo, self).__enter__()
        Repo.init(bare=True)
        return entered
utils.py 文件源码 项目:maintain 作者: kylef 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Enter the parent context, then initialise and return a repository.

        No path is passed to ``Repo.init``, so GitPython's default location is used.
        """
        super(git_repo, self).__enter__()
        repository = Repo.init()
        return repository


问题


面经


文章

微信
公众号

扫码关注公众号