python类Repo()的实例源码

core.py 文件源码 项目:ndeploy 作者: sglebs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def git_clone_all(config_as_dict):
    """Clone, or refresh, every repository listed in the deployment config.

    For each (repo_url, branch, app_name, app_props) tuple produced by
    ``repo_and_branch_and_app_name_and_app_props_iterator``:

    * if the local checkout path already exists, it is opened, switched to
      ``branch`` when a different branch is active, and pulled from
      ``origin`` when ``config_as_dict["gitpull"]`` is truthy;
    * otherwise the directory is created and the repository is cloned
      into it.

    Args:
        config_as_dict: deployment configuration; the ``"gitpull"`` key is
            read here and the whole mapping is passed on to the helpers.

    Returns:
        ``False`` as soon as a clone fails with ``git.GitCommandError``;
        otherwise ``None`` once every repository has been processed.
    """
    import shutil  # local import: only needed on the clone-failure path

    progress = GitProgress()
    for repo_url, branch, app_name, app_props in repo_and_branch_and_app_name_and_app_props_iterator(config_as_dict):
        repo_full_path = get_repo_full_path_for_repo_url(repo_url, config_as_dict)
        if os.path.exists(repo_full_path):
            print("Already cloned %s from %s" % (app_name, repo_url))
            repo = git.Repo(repo_full_path)
            if repo.active_branch.name != branch:
                print("Your local checkout is in a different branch (%s) from the branch you want to deploy (%s). URL: %s" %
                      (repo.active_branch.name, branch, repo_url))
                repo.git.checkout(branch)
            origin = repo.remotes.origin
            if config_as_dict["gitpull"]:
                origin.pull(branch)
        else:
            os.makedirs(repo_full_path)
            try:
                repo = git.Repo.clone_from(repo_url, repo_full_path, branch=branch, progress=progress)
                print("Cloned: %s" % repo)
            except git.GitCommandError as e:
                print(e)
                # Remove the directory created above. If it were left
                # behind, the next run would take the "Already cloned"
                # branch and fail to open it as a repository.
                shutil.rmtree(repo_full_path, ignore_errors=True)
                return False
pull_from_github.py 文件源码 项目:interact 作者: data-8 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _initialize_repo(repo_name, repo_dir, config, progress=None):
    """
    Clones repository and configures it to use sparse checkout.
    Extraneous folders will get removed later using git read-tree

    :param repo_name: repository name, appended to ``config['GITHUB_ORG']``
        to form the clone URL
    :param repo_dir: local directory to clone into
    :param config: mapping providing ``'GITHUB_ORG'`` and ``'REPO_BRANCH'``
    :param progress: optional progress handler passed to ``clone_from``
    """
    util.logger.info('Repo {} doesn\'t exist. Cloning...'.format(repo_name))
    # Clone repo
    repo = git.Repo.clone_from(
        config['GITHUB_ORG'] + repo_name,
        repo_dir,
        progress,
        branch=config['REPO_BRANCH'],
    )

    # Use sparse checkout. The writer gets its own name so the ``config``
    # argument is not shadowed, and it is released even when set_value fails.
    writer = repo.config_writer()
    try:
        writer.set_value('core', 'sparsecheckout', True)
    finally:
        writer.release()

    util.logger.info('Repo {} initialized'.format(repo_name))
docker_build.py 文件源码 项目:biweeklybudget 作者: jantman 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _find_git_info(self):
    """
    Collect the HEAD commit, cleanliness and tag of the repository located
    at ``self._gitdir`` (parent directories are not searched).

    :return: dict with keys 'sha' (hex SHA of the HEAD commit), 'dirty'
      (bool, untracked files count as dirty) and 'tag' (name of a tag
      pointing at the HEAD commit, or None)
    :rtype: dict
    """
    logger.debug('Checking git status...')
    repo = Repo(path=self._gitdir, search_parent_directories=False)
    head_sha = repo.head.commit.hexsha
    info = {
        'sha': head_sha,
        'dirty': repo.is_dirty(untracked_files=True),
        'tag': None,
    }
    # Every tag is inspected; when several point at HEAD the last one wins.
    names_at_head = [t.name for t in repo.tags if t.commit.hexsha == head_sha]
    if names_at_head:
        info['tag'] = names_at_head[-1]
    logger.debug('Git info: %s', info)
    return info
fetch.py 文件源码 项目:AerisCloud 作者: AerisCloud 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _clone_project(project_name):
    """Clone *project_name* from GitHub into the local projects directory.

    The repository is looked up (and forked when needed) through
    ``Github.get_repo``. If nothing is found, an error is echoed and the
    process exits with status 1. When the GitHub repository has a parent,
    that parent is added to the clone as the ``upstream`` remote.
    """
    github = Github()

    gh_repo, forked = github.get_repo(project_name, fork=True)
    if not gh_repo:
        not_found = ('error: no repository named %s was found on '
                     'your user and configured organizations')
        click.echo(not_found % click.style(project_name, bold=True))
        sys.exit(1)

    if forked:
        fork_notice = ('We found a repository named %s and forked it to your '
                       'user, it now accessible at the following url:\n'
                       '%s')
        click.echo(fork_notice % (forked.full_name, gh_repo.html_url))

    dest_path = os.path.join(projects_path(), project_name)
    ssh_url = gh_repo.ssh_url
    click.echo('We will clone %s in %s\n' % (ssh_url, dest_path))

    vgit('clone', ssh_url, dest_path)

    # add our upstream remote
    parent = gh_repo.parent
    if parent:
        Repo(dest_path).create_remote('upstream', parent.ssh_url)
utils.py 文件源码 项目:vulnerability-rating-taxonomy 作者: bugcrowd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def all_versions(filename):
    """
    Find, open and parse all tagged versions of a json file, including the current version

    :param filename: The filename to find
    :return: a dictionary of all the versions, in the form
        {
            'current': {...},
            '1.0': {...},
            '1.1': {...}
        }
    """
    repo = git.Repo()
    versions = {
        'current': get_json(filename)
    }
    for tag in repo.tags:
        tag_name = tag.name
        # Drop a single leading 'v' only. str.strip('v') would also eat
        # trailing and repeated 'v' characters, e.g. '1.0-rev' -> '1.0-re'.
        version = tag_name[1:] if tag_name.startswith('v') else tag_name
        file_at_tag = repo.git.show('%s:%s' % (tag_name, filename))
        versions[version] = json.loads(file_at_tag)
    return versions
experiment.py 文件源码 项目:lang2program 作者: kelvinguu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def match_commit(self, src_dir):
    """Verify that the repository is at the commit recorded for this experiment.

    Args:
        src_dir (str): path to the Git repository

    Raises:
        EnvironmentError: if the recorded metadata is flagged dirty, if the
            repository is dirty now, or if HEAD differs from the recorded
            commit.
    """
    metadata = self.metadata
    if metadata['dirty_repo']:
        raise EnvironmentError('Working directory was dirty when commit was recorded.')

    repo = Repo(src_dir)
    if repo.is_dirty():
        raise EnvironmentError('Current working directory is dirty.')

    head_sha = repo.head.object.hexsha.encode('utf-8')
    recorded_sha = metadata['commit']
    if head_sha != recorded_sha:
        mismatch = "Commits don't match.\nCurrent: {}\nRecorded: {}".format(head_sha, recorded_sha)
        raise EnvironmentError(mismatch)
experiment.py 文件源码 项目:lang2program 作者: kelvinguu 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def match_commit(self, src_dir):
    """Verify that the repository is at the commit recorded for this experiment.

    Args:
        src_dir (str): path to the Git repository

    Raises:
        EnvironmentError: if the recorded metadata is flagged dirty, if the
            repository is dirty now, or if HEAD differs from the recorded
            commit.
    """
    metadata = self.metadata
    if metadata['dirty_repo']:
        raise EnvironmentError('Working directory was dirty when commit was recorded.')

    repo = Repo(src_dir)
    if repo.is_dirty():
        raise EnvironmentError('Current working directory is dirty.')

    head_sha = repo.head.object.hexsha.encode('utf-8')
    recorded_sha = metadata['commit']
    if head_sha != recorded_sha:
        mismatch = "Commits don't match.\nCurrent: {}\nRecorded: {}".format(head_sha, recorded_sha)
        raise EnvironmentError(mismatch)
default.py 文件源码 项目:web3py 作者: web2py 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def git_push():
    """Git push handler.

    Shows a one-field changelog form. Once the form is accepted, the
    application's files are staged, committed with the changelog as the
    message and pushed to ``origin``; the outcome is flashed and the user is
    redirected to 'site'.
    """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    form = Form([Field('changelog', requires=IS_NOT_EMPTY())])
    #form.element('input[type=submit]')['_value'] = T('Push')
    #form.add_button(T('Cancel'), URL('site'))

    if not form.accepted:
        return dict(app=app, form=form)

    try:
        repo = git.Repo(os.path.join(apath(r=request), app))
        staging = repo.index
        staging.add([apath(r=request) + app + '/*'])
        staging.commit(form.vars.changelog)
        repo.remotes.origin.push()
        session.flash = T(
            "Git repo updated with latest application changes.")
        redirect(URL('site'))
    except git.UnmergedEntriesError:
        session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
        redirect(URL('site'))
    return dict(app=app, form=form)
library.py 文件源码 项目:loady 作者: timedata-org 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def load(self):
    """Load the library.

    Clones ``self.GIT_URL`` (formatted with this object's attributes) into
    ``self.path`` on branch ``self.branch`` and, when ``self.commit`` is
    set, hard-resets the checkout to it. An existing ``self.path`` is
    reused as-is unless ``config.CACHE_DISABLE`` is set, in which case it
    is deleted and cloned again.

    Raises:
        EnvironmentError: if the ``git`` module is unavailable.
    """
    if not git:
        raise EnvironmentError(MISSING_GIT_ERROR)

    already_present = os.path.exists(self.path)
    if already_present and not config.CACHE_DISABLE:
        return
    if already_present:
        shutil.rmtree(self.path, ignore_errors=True)

    with files.remove_on_exception(self.path):
        clone_url = self.GIT_URL.format(**vars(self))
        cloned = git.Repo.clone_from(
            url=clone_url, to_path=self.path, b=self.branch)
        if self.commit:
            cloned.head.reset(self.commit, index=True, working_tree=True)
bro-pkg.py 文件源码 项目:package-manager 作者: bro 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def active_git_branch(path):
    """Return the name of the branch checked out at *path*, or None.

    Args:
        path: filesystem path handed to ``git.Repo``.

    Returns:
        The active branch name as a string, or ``None`` when the path does
        not exist, the repository has no working tree, or HEAD is detached.
    """
    try:
        repo = git.Repo(path)
    except git.exc.NoSuchPathError:
        return None

    if not repo.working_tree_dir:
        return None

    try:
        branch = repo.active_branch
    except TypeError:
        # GitPython raises TypeError when HEAD is detached. There is no
        # active branch then, so report None like the other cases.
        return None

    if not branch:
        return None

    return str(branch)
manager.py 文件源码 项目:package-manager 作者: bro 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def refresh_installed_packages(self):
    """Fetch latest git information for installed packages.

    Updates each installed package's ``status.is_outdated`` flag after
    fetching its clone's remote, without upgrading any installation. A
    failed fetch is logged as a warning and the outdated check still runs.
    The manifest is written once all packages have been processed.

    Raises:
        IOError: if the package manifest file can't be written
    """
    for installed in self.installed_packages():
        repo = git.Repo(
            os.path.join(self.package_clonedir, installed.package.name))
        qualified = installed.package.qualified_name()
        LOG.debug('fetch package %s', qualified)

        try:
            repo.remote().fetch()
        except git.exc.GitCommandError as error:
            LOG.warn('failed to fetch package %s: %s', qualified, error)

        status = installed.status
        status.is_outdated = _is_clone_outdated(
            repo, status.current_version, status.tracking_method)

    self._write_manifest()
test_git.py 文件源码 项目:maintain 作者: kylef 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_errors_when_remote_has_changes(self):
    """GitReleaser() must raise when origin has a commit the local repo lacks."""
    with git_bare_repo() as bare_repo, git_repo() as repo:
        # Seed the bare remote with an initial commit from the local repo.
        touch('README.md')
        repo.index.add(['README.md'])
        repo.index.commit('Initial commit')
        repo.create_remote('origin', url=bare_repo)
        repo.remotes.origin.push(repo.refs.master)

        # Push a second commit to the remote from a separate clone.
        with temp_directory() as clone_dir:
            second_clone = Repo(bare_repo).clone(clone_dir)
            touch('CHANGELOG.md')
            # NOTE(review): README.md is staged although CHANGELOG.md was
            # just touched - confirm whether that is intentional.
            second_clone.index.add(['README.md'])
            second_clone.index.commit('Second commit')
            second_clone.remotes.origin.push(second_clone.refs.master)

        with self.assertRaises(Exception):
            GitReleaser()
git_releaser.py 文件源码 项目:maintain 作者: kylef 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, config=None):
    """Open the current repository and check that it is ready for a release.

    Args:
        config: optional mapping; 'commit_format' and 'tag_format' override
            the defaults 'Release {version}' and '{version}'.

    Raises:
        Exception: if HEAD is not on master, the repository is dirty or has
            untracked files, or (when an origin exists) the freshly fetched
            origin/master is at a different commit than HEAD.
    """
    self.repo = Repo()

    options = config or {}
    self.commit_format = options.get('commit_format', 'Release {version}')
    self.tag_format = options.get('tag_format', '{version}')

    repo = self.repo
    if repo.head.ref != repo.heads.master:
        # TODO: Support releasing from stable/hotfix branches
        raise Exception('You need to be on the `master` branch in order to do a release.')

    if repo.is_dirty():
        raise Exception('Git repository has unstaged changes.')

    if repo.untracked_files:
        raise Exception('Git repository has untracked files.')

    if not self.has_origin():
        return

    origin = repo.remotes.origin
    origin.fetch()
    if origin.refs.master.commit != repo.head.ref.commit:
        raise Exception('Master has unsynced changes.')
default.py 文件源码 项目:slugiot-client 作者: slugiot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def git_push():
    """Git push handler.

    Shows a changelog form with 'Push' and 'Cancel' buttons. Once the form
    is accepted, the application's files are staged, committed with the
    changelog as the message and pushed to ``origin``; the outcome is
    flashed and the user is redirected to 'site'.
    """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY()))
    form.element('input[type=submit]')['_value'] = T('Push')
    form.add_button(T('Cancel'), URL('site'))
    form.process()

    if not form.accepted:
        return dict(app=app, form=form)

    try:
        repo = git.Repo(os.path.join(apath(r=request), app))
        staging = repo.index
        staging.add([apath(r=request) + app + '/*'])
        staging.commit(form.vars.changelog)
        repo.remotes.origin.push()
        session.flash = T(
            "Git repo updated with latest application changes.")
        redirect(URL('site'))
    except git.UnmergedEntriesError:
        session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
        redirect(URL('site'))
    return dict(app=app, form=form)
git_create_tag.py 文件源码 项目:chef_community_cookbooks 作者: DennyZhang 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def git_list_create_tag(working_dir, git_list_file, tag_name, delete_tag_already_exists):
    """Create *tag_name* in every repository listed in *git_list_file*.

    The list file holds one repository URL per line; blank lines and lines
    starting with '#' are ignored. Each repository is opened from
    ``working_dir/<repo name>`` when that path exists and cloned there
    otherwise, then handed to ``git_create_tag``.

    Returns:
        True once every listed repository has been processed.
    """
    with open(git_list_file, 'r') as listing:
        stripped_rows = (line.strip() for line in listing)
        repo_urls = [entry for entry in stripped_rows
                     if entry != "" and not entry.startswith('#')]

    for repo_url in repo_urls:
        code_dir = "%s/%s" % (working_dir, get_repo_name(repo_url))
        if os.path.exists(code_dir):
            # re-use current code folder
            git_repo = git.Repo(code_dir)
        else:
            git_repo = git.Repo.clone_from(repo_url, code_dir)
        git_create_tag(git_repo, tag_name, delete_tag_already_exists)
    return True
parsing.py 文件源码 项目:focuson 作者: uber 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_route_info(root_path):
    """Parse the routing module and attach blame info to every route.

    Reads ``Uber/uber/routing.py`` under *root_path*, collects routable
    functions with ``RouteVisitor``, and appends to each route's details
    tuple the blame entry for ``details[0]`` (presumably the route's line
    number - confirm against ``RouteVisitor``).

    Args:
        root_path: root of the Git checkout containing the routing module.

    Returns:
        The ``routable_funcs`` mapping with each value extended by one
        trailing blame element.
    """
    routing_path = os.path.join(root_path, "Uber", "uber", "routing.py")
    with open(routing_path) as f:
        file_contents = f.read()
    tree = ast.parse(file_contents)
    visitor = RouteVisitor()
    visitor.visit(tree)
    all_routes = visitor.routable_funcs

    # Get the commit that added each route and tack it onto the details
    route_linenos = sorted(set(x[0] for x in all_routes.values()))
    repo = git.Repo(root_path)
    blame_by_line = get_line_blames(repo, routing_path, route_linenos)
    # items() works on Python 2 and 3, unlike iteritems(). A snapshot is
    # taken because values are reassigned while looping.
    for name, details in list(all_routes.items()):
        all_routes[name] = details + (blame_by_line[details[0]],)
    return all_routes
pluginManager.py 文件源码 项目:PluginManager 作者: mandeeps708 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def install(self, plugin):
    """Install a GitHub plugin by shallow-cloning it into ``self.install_dir``.

    Returns:
        True after a successful clone, False when ``self.isInstalled``
        reports that the plugin is already present.
    """
    print("Installing...", plugin.name)
    import git

    # Nothing to do when the plugin installation path already exists.
    if self.isInstalled(plugin):
        print("Plugin already installed!")
        return False

    # Clone the GitHub repository via the plugin URL with depth=1
    # (shallow clone).
    git.Repo.clone_from(plugin.baseurl, self.install_dir, depth=1)
    print("Done!")
    return True
test_plugins.py 文件源码 项目:infrared 作者: redhat-openstack 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_add_plugin_from_git_exception(plugin_manager_fixture, mocker):
    """add_plugin must raise IRFailedToAddPlugin and remove its temp dir
    when cloning the plugin repository fails."""
    manager = plugin_manager_fixture()

    # Make the clone fail, keeping the real exception class on the mock so
    # the code under test can still catch it.
    git_mock = mocker.patch("infrared.core.services.plugins.git")
    clone_failure = git.exc.GitCommandError("some_git_cmd", 1)
    git_mock.Repo.clone_from.side_effect = clone_failure
    git_mock.exc.GitCommandError = git.exc.GitCommandError

    tempfile_mock = mocker.patch("infrared.core.services.plugins.tempfile")
    shutil_mock = mocker.patch("infrared.core.services.plugins.shutil")
    os_mock = mocker.patch("infrared.core.services.plugins.os")
    os_mock.path.exists.return_value = False

    # add_plugin call
    with pytest.raises(IRFailedToAddPlugin):
        manager.add_plugin(
            "https://sample_github.null/plugin_repo.git")

    expected_tmp_dir = tempfile_mock.mkdtemp.return_value
    shutil_mock.rmtree.assert_called_with(expected_tmp_dir)
papi_create_datastore_new.py 文件源码 项目:ccu_and_eccu_publish 作者: gaofubin 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def createGitRepository():
    """Open the 'datastore' git repository and change into its directory.

    When the 'datastore' directory does not exist yet, it is created and
    initialised as a git repository first.

    Returns:
        The ``git.Repo`` object for the datastore.
    """
    repo_dir = 'datastore'
    directory_missing = not os.path.isdir(repo_dir)
    if directory_missing:
        os.makedirs(repo_dir)
        git.Repo.init(repo_dir)
    datastore_repo = git.Repo(repo_dir)
    os.chdir(repo_dir)
    return datastore_repo


问题


面经


文章

微信
公众号

扫码关注公众号