def set_branch(self, branch, force=False):
    """Bring the working copy onto ``branch``.

    When ``branch`` is ``'master'`` and it is not among ``self.repo.branches``,
    an initial commit is created and pushed first. In developer mode nothing
    further happens. Otherwise ``branch`` is checked out when it is not the
    active branch, and pulled when it already is.

    :param branch: name of the branch to select
    :param force: forwarded to ``git checkout`` as ``force``
    """
    try:
        master_missing = branch not in self.repo.branches and branch == 'master'
        if master_missing:
            # Make init commit
            self.repo.index.add(["*"])
            self.repo.index.commit("init")
            init_remote = self.repo.create_remote('master', self.repo.remotes.origin.url)
            init_remote.push(refspec='{}:{}'.format(self.repo.active_branch, 'master'))

        if self.is_developer_mode():
            return

        already_active = str(self.repo.active_branch) == branch
        if already_active:
            self.pull()
        else:
            self.repo.git.checkout(branch, force=force)
    except git.GitCommandError as exc:
        ColorPrint.exit_after_print_messages(message=exc.stderr)
# python类GitCommandError()的实例源码 -- "Example source code for the Python class GitCommandError()" (scraped page heading, not code)
def init_project(self):
    """Clone the configured repository, or pull it when it is already cloned.

    The working copy lives in the ``repo`` sub-directory of
    ``self._project_dir``. A directory that already contains ``.git`` is
    pulled from ``origin``; otherwise ``self._config['repo']`` is cloned into
    it. A failing clone is reported on stdout and otherwise ignored.

    :raises Exception: if ``self._config['repo']`` is empty.
    """
    project_dir_git = os.path.join(self._project_dir, 'repo')
    if not self._config['repo']:
        raise Exception('Empty repo configuration')
    create_dir(project_dir_git)
    git_path = os.path.join(project_dir_git, '.git')
    if os.path.exists(git_path):
        repo = Repo(project_dir_git)
        # Single-argument, parenthesised print produces the same output on
        # Python 2 and Python 3 (the statement form only parses on Python 2).
        print(colored('Pulling', 'green') + ' repo %s' % self._config['repo'])
        repo.remotes.origin.pull()
    else:
        try:
            print('cloning... %s' % self._config['repo'])
            Repo.clone_from(self._config['repo'], project_dir_git)
        except GitCommandError:
            # Deliberate best effort: report the failure and carry on.
            print('Repo cannot be found')
def git_pull():
    """ Git Pull handler

    Shows a confirmation dialog; once it is accepted, fetches and pulls from
    the ``origin`` remote of the application's repository. The outcome is
    stored in ``session.flash`` before redirecting to ``site``.
    """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if not dialog.accepted:
        # Nothing confirmed: honour an explicit cancel, else render the dialog.
        if 'cancel' in request.vars:
            redirect(URL('site'))
        return dict(app=app, dialog=dialog)

    try:
        app_repo = git.Repo(os.path.join(apath(r=request), app))
        remote = app_repo.remotes.origin
        remote.fetch()
        remote.pull()
        session.flash = T("Application updated via git pull")
        redirect(URL('site'))
    except git.CheckoutError:
        session.flash = T("Pull failed, certain files could not be checked out. Check logs for details.")
        redirect(URL('site'))
    except git.UnmergedEntriesError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    except git.GitCommandError:
        session.flash = T(
            "Pull failed, git exited abnormally. See logs for details.")
        redirect(URL('site'))
    except AssertionError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
def getCloneURL(self, name):
    """Return the clone URL for the repository called ``name``.

    When ``self.clone_url`` is set, the URL comes from
    ``self.getCloneURLFromPattern(name)``. Otherwise it is ``self.url`` joined
    with ``name``, and ``git ls-remote`` is run against it (with the
    environment from ``makeGitEnvironment(self)``) before it is returned.

    :param name: repository name, appended to ``self.url`` unchanged
    :raises RuntimeError: if ``git ls-remote`` fails; the message is git's
        stderr and the original ``GitCommandError`` is chained as the cause.
    """
    if self.clone_url is not None:
        return self.getCloneURLFromPattern(name)
    # Nothing stops 'name' from escaping the path specified by self.url, like '../../../foo'. I can't see a problem with allowing it other than that it's weird, and allowing normal subdirectory traversal could be useful, so not currently putting any restrictions on 'name'
    rtn = f"{self.url}/{name}"
    try:
        oldEnv = dict(os.environ)
        try:
            # Updated inside the try so the snapshot is restored even if
            # applying the git environment fails part-way.
            os.environ.update(makeGitEnvironment(self))
            git.Git().ls_remote(rtn)
        finally:
            os.environ.clear()
            os.environ.update(oldEnv)
    except git.GitCommandError as e:
        err = e.stderr
        # Try to strip off the formatting GitCommandError puts on stderr.
        # DOTALL lets the capture span newlines, so multi-line stderr is
        # unwrapped as well instead of being left in its formatted form.
        match = re.search(r"stderr: '(.*)'$", err, re.DOTALL)
        if match:
            err = match.group(1)
        raise RuntimeError(err) from e
    return rtn
# Patch stashy's AuthenticationException to print the server's message (mostly for issue #16, detecting a captcha check)
def git_clone_all(config_as_dict):
    """Clone every configured repository, or refresh the ones already on disk.

    For a path that already exists, the configured branch is checked out when
    the active branch differs, and ``origin`` is pulled when
    ``config_as_dict["gitpull"]`` is truthy. A missing path is created and
    cloned into.

    :param config_as_dict: configuration passed on to the repository helpers
    :return: ``False`` as soon as a clone fails; ``None`` otherwise
    """
    progress = GitProgress()
    entries = repo_and_branch_and_app_name_and_app_props_iterator(config_as_dict)
    for repo_url, branch, app_name, app_props in entries:
        repo_full_path = get_repo_full_path_for_repo_url(repo_url, config_as_dict)

        if not os.path.exists(repo_full_path):
            os.makedirs(repo_full_path)
            try:
                repo = git.Repo.clone_from(repo_url, repo_full_path, branch=branch, progress=progress)
                print("Cloned: %s" % repo)
            except git.GitCommandError as e:
                print(e)
                return False
            continue

        print("Already cloned %s from %s" % (app_name, repo_url))
        repo = git.Repo(repo_full_path)
        if repo.active_branch.name != branch:
            print("Your local checkout is in a different branch (%s) from the branch you want to deploy (%s). URL: %s" %
                  (repo.active_branch.name, branch, repo_url))
            repo.git.checkout(branch)
        origin = repo.remotes.origin
        #origin.fetch(branch)
        if config_as_dict["gitpull"]:
            origin.pull(branch)
def __init__(self, target_dir, url, branch, git_ssh_identity_file=None, force=False):
    """Open or clone the repository at ``target_dir`` and select ``branch``.

    A missing or empty ``target_dir`` is cloned from ``url``. An existing one
    is opened; if ``GitRepository.is_same_host`` rejects its ``origin`` URL
    against ``url``, the directory is removed and cloned again, except in
    developer mode, where an error message is printed instead.

    :param target_dir: working-copy directory
    :param url: clone URL; an error message is printed when it is ``None``
    :param branch: branch handed to ``set_branch``
    :param git_ssh_identity_file: defaults to ``~/.ssh/id_rsa``
    :param force: forwarded to ``set_branch``
    """
    super(GitRepository, self).__init__(target_dir)
    self.branch = branch
    try:
        if url is None:
            ColorPrint.exit_after_print_messages(message="GIT URL is empty")
        if git_ssh_identity_file is None:
            git_ssh_identity_file = os.path.expanduser("~/.ssh/id_rsa")
        # NOTE(review): the environment is set on a throwaway ``git.Git()``
        # instance; presumably it is meant to apply to the clone below --
        # confirm that it actually does.
        with git.Git().custom_environment(GIT_SSH=git_ssh_identity_file):
            if os.path.exists(target_dir) and os.listdir(target_dir):
                self.repo = git.Repo(target_dir)
                current_origin = self.repo.remotes.origin.url
                if not GitRepository.is_same_host(current_origin, url):
                    if self.is_developer_mode():
                        ColorPrint.exit_after_print_messages(
                            message="This directory exists with not matching git repository " + target_dir)
                    else:
                        shutil.rmtree(target_dir, onerror=FileUtils.remove_readonly)
                        self.repo = git.Repo.clone_from(url=url, to_path=target_dir)
            else:
                self.repo = git.Repo.clone_from(url=url, to_path=target_dir)
            self.set_branch(branch=branch, force=force)
    except git.GitCommandError as exc:
        ColorPrint.exit_after_print_messages(message=exc.stderr)
def clone(self, url=None, path=None):
    """Clone ``url`` into ``path``; on a git failure print the error details.

    The ``GitCommandError`` is printed, followed by its stdout and stderr
    when they are non-empty. The error is not re-raised.
    """
    try:
        git.Repo.clone_from(url, path)
    except git.GitCommandError as exception:
        print(exception)
        captured_out = exception.stdout
        if captured_out:
            print('!! stdout was:')
            print(captured_out)
        captured_err = exception.stderr
        if captured_err:
            print('!! stderr was:')
            print(captured_err)
def git_pull():
    """ Git Pull handler

    Shows a confirmation dialog; once it is accepted, fetches and pulls from
    the ``origin`` remote of the application's repository. The outcome is
    stored in ``session.flash`` before redirecting to ``site``.
    """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if not dialog.accepted:
        # Nothing confirmed: honour an explicit cancel, else render the dialog.
        if 'cancel' in request.vars:
            redirect(URL('site'))
        return dict(app=app, dialog=dialog)

    try:
        app_repo = git.Repo(os.path.join(apath(r=request), app))
        remote = app_repo.remotes.origin
        remote.fetch()
        remote.pull()
        session.flash = T("Application updated via git pull")
        redirect(URL('site'))
    except git.CheckoutError:
        session.flash = T("Pull failed, certain files could not be checked out. Check logs for details.")
        redirect(URL('site'))
    except git.UnmergedEntriesError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    except git.GitCommandError:
        session.flash = T(
            "Pull failed, git exited abnormally. See logs for details.")
        redirect(URL('site'))
    except AssertionError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
def git_version(version):
    """
    Return a version suffix identifying the state of the underlying git repo.

    The suffix indicates whether the head of the current git-backed working
    directory is tied to a release tag or not: ``.release:{version}+{sha}``
    for the former and ``.dev0+{sha}`` for the latter, where ``sha`` is the
    hex sha of the head commit. A ``.dirty`` suffix is appended to the dev
    form when uncommitted changes are present.

    :param version: the version the exact-match tag is expected to equal
    :return: the suffix described above; ``''`` when gitpython or the
        repository cannot be loaded; ``'no_git_version'`` when the repo
        object is falsy
    :raises AssertionError: if HEAD is exactly on a tag that differs from
        ``version``
    """
    repo = None
    try:
        import git
        repo = git.Repo('.git')
    except ImportError:
        logger.warning('gitpython not found: Cannot compute the git version.')
        return ''
    except Exception:
        logger.warning('Git repo not found: Cannot compute the git version.')
        return ''
    if repo:
        sha = repo.head.commit.hexsha
        if repo.is_dirty():
            return '.dev0+{sha}.dirty'.format(sha=sha)
        # commit is clean
        # is it release of `version` ?
        try:
            tag = repo.git.describe(
                match='[0-9]*', exact_match=True,
                tags=True, dirty=True)
            # Explicit raise rather than ``assert`` so the check is not
            # stripped when Python runs with -O; same exception and payload.
            if tag != version:
                raise AssertionError((tag, version))
            return '.release:{version}+{sha}'.format(version=version,
                                                     sha=sha)
        except git.GitCommandError:
            # ``describe --exact-match`` failed: HEAD is not on a release tag.
            return '.dev0+{sha}'.format(sha=sha)
    else:
        return 'no_git_version'
def get():
    """Return ``git describe`` output for the repository holding ``vj4``.

    The repository is opened two directory levels above ``vj4.__file__``.
    Returns ``'unknown'`` (after logging the error) when that is not a git
    repository or the git command fails.
    """
    try:
        repo_root = path.dirname(path.dirname(vj4.__file__))
        return git.Repo(repo_root).git.describe(always=True, tags=True, dirty=True)
    except (git.InvalidGitRepositoryError, git.GitCommandError) as e:
        _logger.error('Failed to get repository: %s', repr(e))
        return 'unknown'
def test_clone_repo_failed(app, pipeline):
    """A failing clone is reported as a git error and the spec is never parsed."""
    clone_error = git.GitCommandError('git clone', 1)
    with mock.patch.object(pipeline, 'clone') as fake_clone, \
            mock.patch.object(pipeline, '_report_git_error') as fake_report, \
            mock.patch.object(pipeline, 'parse_spec') as fake_parse_spec, \
            mock.patch.object(PullRequest, 'comment') as fake_pr_comment, \
            mock.patch.object(Changesets, 'comment') as fake_cs_comment:
        fake_clone.side_effect = clone_error
        for silenced in (fake_report, fake_pr_comment, fake_cs_comment):
            silenced.return_value = None

        pipeline.start()

        assert fake_report.called
        fake_parse_spec.assert_not_called()
def start(self):
    '''Start Pipeline

    Clones, parses the spec, builds and saves artifacts; then lints unless
    the build exit code is 137, and deploys when the exit code is 0. Git
    errors are reported, BitBucket API errors are logged and sent to sentry,
    an invalid specification is reported, and other ``BadwolfException``
    errors are ignored. ``self.clean()`` runs in every case.
    '''
    logger.info('Pipeline started for repository %s', self.context.repository)
    try:
        self.clone()
        self.parse_spec()
        exit_code = self.build()
        succeeded = exit_code == 0
        self.save_artifacts(succeeded)
        # 137 means build cancelled
        cancelled = exit_code == 137
        if not cancelled:
            self.lint()
        if succeeded:
            self.deploy()
    except git.GitCommandError as git_err:
        logger.exception('Git command error')
        self._report_git_error(git_err)
    except BitbucketAPIError:
        logger.exception('Error calling BitBucket API')
        sentry.captureException()
    except InvalidSpecification as err:
        self._report_error(':umbrella: Invalid badwolf configuration: ' + str(err))
    except BadwolfException:
        pass
    finally:
        self.clean()
def get_conflicted_files(repo_path):
    """Return the output of ``git diff --name-only --diff-filter=U``.

    :param repo_path: directory the git command runs in
    :return: the command output, or ``None`` (after logging) if git fails
    """
    try:
        return git.Git(repo_path).diff('--name-only', '--diff-filter=U')
    except git.GitCommandError:
        logger.exception('Error get conflicted files by git diff command')
    return None
def git_pull():
    """ Git Pull handler

    Shows a confirmation dialog; once it is accepted, fetches and pulls from
    the ``origin`` remote of the application's repository. The outcome is
    stored in ``session.flash`` before redirecting to ``site``.
    """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if not dialog.accepted:
        # Nothing confirmed: honour an explicit cancel, else render the dialog.
        if 'cancel' in request.vars:
            redirect(URL('site'))
        return dict(app=app, dialog=dialog)

    try:
        app_repo = git.Repo(os.path.join(apath(r=request), app))
        remote = app_repo.remotes.origin
        remote.fetch()
        remote.pull()
        session.flash = T("Application updated via git pull")
        redirect(URL('site'))
    except git.CheckoutError:
        session.flash = T("Pull failed, certain files could not be checked out. Check logs for details.")
        redirect(URL('site'))
    except git.UnmergedEntriesError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    except git.GitCommandError:
        session.flash = T(
            "Pull failed, git exited abnormally. See logs for details.")
        redirect(URL('site'))
    except AssertionError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
def git_version(version):
    """
    Return a version suffix identifying the state of the underlying git repo.

    The suffix indicates whether the head of the current git-backed working
    directory is tied to a release tag or not: ``.release:{version}+{sha}``
    for the former and ``.dev0+{sha}`` for the latter, where ``sha`` is the
    hex sha of the head commit. A ``.dirty`` suffix is appended to the dev
    form when uncommitted changes are present.

    :param version: the version the exact-match tag is expected to equal
    :return: the suffix described above; ``''`` when gitpython or the
        repository cannot be loaded; ``'no_git_version'`` when the repo
        object is falsy
    :raises AssertionError: if HEAD is exactly on a tag that differs from
        ``version``
    """
    repo = None
    try:
        import git
        repo = git.Repo('.git')
    except ImportError:
        logger.warning('gitpython not found: Cannot compute the git version.')
        return ''
    except Exception:
        logger.warning('Git repo not found: Cannot compute the git version.')
        return ''
    if repo:
        sha = repo.head.commit.hexsha
        if repo.is_dirty():
            return '.dev0+{sha}.dirty'.format(sha=sha)
        # commit is clean
        # is it release of `version` ?
        try:
            tag = repo.git.describe(
                match='[0-9]*', exact_match=True,
                tags=True, dirty=True)
            # Explicit raise rather than ``assert`` so the check is not
            # stripped when Python runs with -O; same exception and payload.
            if tag != version:
                raise AssertionError((tag, version))
            return '.release:{version}+{sha}'.format(version=version,
                                                     sha=sha)
        except git.GitCommandError:
            # ``describe --exact-match`` failed: HEAD is not on a release tag.
            return '.dev0+{sha}'.format(sha=sha)
    else:
        return 'no_git_version'
def commit_changes(self, push=False):
    """Stage ``content/`` and commit it, optionally pushing to ``origin``.

    After a successful commit ``self.last_run`` is set to the current time
    and ``self.save()`` is called. A failing commit command is reported as
    'Nothing to commit'.

    :param push: when true, push to ``origin`` afterwards
    """
    working_copy = Repo(self.repo)
    git_cmd = working_copy.git
    git_cmd.add('content/')
    try:
        git_cmd.commit('-m', 'Commit added by PaperGit')
    except GitCommandError:
        print('Nothing to commit')
    else:
        self.last_run = time.time()
        self.save()

    if push:
        print("Pushing changes to remote")
        working_copy.git.push('origin')
def git_repo(self):
    """Yield a patched ``release.Repo`` mock holding three commits and two tags.

    Each later commit lists the previous one as its parent. ``PREVIOUS_TAG``
    points at the second commit, ``CURRENT_TAG`` and ``heads/master`` at the
    third. ``git.describe`` is faked: no revision gives ``CURRENT_TAG``,
    ``<CURRENT_TAG>...^`` gives ``PREVIOUS_TAG``, any other ``...^`` revision
    raises ``GitCommandError``, and anything else is a test error.
    """
    with mock.patch("release.Repo", spec=Repo, spec_set=True) as mock_repo:
        first = Commit(mock_repo, _h2b("111111"), message="First commit", parents=tuple())
        second = Commit(mock_repo, _h2b("222222"), message="Second commit", parents=("111111",))
        third = Commit(mock_repo, _h2b("333333"), message="Third commit", parents=("222222",))
        mock_repo.iter_commits.return_value = [first, second, third]

        known_revs = {
            "heads/master": third,
            PREVIOUS_TAG: TagObject(mock_repo, _h2b("aaaaaa"), object=second, tag=PREVIOUS_TAG),
            CURRENT_TAG: TagObject(mock_repo, _h2b("bbbbbb"), object=third, tag=CURRENT_TAG),
        }
        mock_repo.rev_parse.side_effect = lambda name: known_revs[name]
        mock_repo.git.rev_parse.side_effect = lambda name, **kwargs: name

        def fake_describe(rev=None, **kwargs):
            print("call to describe(%r, %r)" % (rev, kwargs))
            if rev is None:
                return CURRENT_TAG
            if not rev.endswith("^"):
                raise AssertionError("Test wants to describe something unexpected: rev=%r, kwargs=%r" % (rev, kwargs))
            if rev.startswith(CURRENT_TAG):
                return PREVIOUS_TAG
            raise GitCommandError("describe", "failed")

        mock_repo.git.describe.side_effect = fake_describe
        yield mock_repo
def current_tag(self):
    """Return the object that ``git describe --all`` resolves to, cached.

    A truthy ``self._current_tag`` is returned directly. Otherwise the
    described name is resolved with ``rev_parse`` and stored. Returns
    ``None`` when describing or resolving fails.
    """
    cached = self._current_tag
    if cached:
        return cached
    try:
        described_name = self.repo.git.describe(all=True)
        self._current_tag = self.repo.rev_parse(described_name)
    except (BadName, GitCommandError):
        return None
    return self._current_tag
def generate_changelog(self):
    """Use the git log to create a changelog with all changes since the previous tag

    The previous tag is found by describing the parent of the current tag;
    when that git command fails, ``THE_NULL_COMMIT`` is used instead.

    :return: list of ``(shortened sha, summary)`` tuples for every commit in
        the range that has at most one parent
    """
    try:
        parent_rev = "{}^".format(self.current_tag)
        previous_name = self.repo.git.describe(parent_rev, abbrev=0)
        previous_tag = self.repo.rev_parse(previous_name)
    except GitCommandError:
        previous_tag = THE_NULL_COMMIT

    current = self._resolve_tag(self.current_tag)
    previous = self._resolve_tag(previous_tag)
    commit_range = "{}..{}".format(previous, current)

    changelog = []
    for commit in self.repo.iter_commits(commit_range):
        if len(commit.parents) > 1:
            # Commits with several parents are left out of the changelog.
            continue
        changelog.append((self._shorten(commit.hexsha), commit.summary))
    return changelog
def git_pull():
    """ Git Pull handler

    Shows a confirmation dialog; once it is accepted, fetches and pulls from
    the ``origin`` remote of the application's repository. The outcome is
    stored in ``session.flash`` before redirecting to ``site``.
    """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if not dialog.accepted:
        # Nothing confirmed: honour an explicit cancel, else render the dialog.
        if 'cancel' in request.vars:
            redirect(URL('site'))
        return dict(app=app, dialog=dialog)

    try:
        app_repo = git.Repo(os.path.join(apath(r=request), app))
        remote = app_repo.remotes.origin
        remote.fetch()
        remote.pull()
        session.flash = T("Application updated via git pull")
        redirect(URL('site'))
    except git.CheckoutError:
        session.flash = T("Pull failed, certain files could not be checked out. Check logs for details.")
        redirect(URL('site'))
    except git.UnmergedEntriesError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    except git.GitCommandError:
        session.flash = T(
            "Pull failed, git exited abnormally. See logs for details.")
        redirect(URL('site'))
    except AssertionError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
def git_clone_all_repos(cat):
    """Perform a 'git clone' for each data repository that doesn't exist.

    For every repository folder: clone it when the directory is missing,
    delete its ``*.json`` files when ``cat.args.purge_outputs`` is set and
    it is an output folder, check that ``git status`` works, and log its
    current SHA.

    :param cat: object providing ``log``, ``PATHS`` and ``args``
    :raises git.GitCommandError: re-raised when ``git status`` fails
    """
    log = cat.log
    log.debug("gitter.git_clone_all_repos()")
    all_repos = cat.PATHS.get_all_repo_folders()
    out_repos = cat.PATHS.get_repo_output_folders()

    for repo in all_repos:
        log.info("Repo in: '{}'".format(repo))

        if not os.path.isdir(repo):
            log.debug("Cloning directory...")
            clone(repo, cat.log, depth=max(cat.args.clone_depth, 1))
        else:
            log.info("Directory exists.")

        if cat.args.purge_outputs and repo in out_repos:
            json_pattern = os.path.join(repo, '*.json')
            for fil in glob(json_pattern):
                os.remove(fil)

        try:
            git.cmd.Git(repo).status()
        except git.GitCommandError:
            log.error("Repository does not exist!")
            raise

        # Get the initial git SHA
        sha_beg = get_sha(repo)
        log.debug("Current SHA: '{}'".format(sha_beg))

    return
def git_pull():
    """ Git Pull handler

    Shows a confirmation dialog; once it is accepted, fetches and pulls from
    the ``origin`` remote of the application's repository. The outcome is
    stored in ``session.flash`` before redirecting to ``site``.
    """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if not dialog.accepted:
        # Nothing confirmed: honour an explicit cancel, else render the dialog.
        if 'cancel' in request.vars:
            redirect(URL('site'))
        return dict(app=app, dialog=dialog)

    try:
        app_repo = git.Repo(os.path.join(apath(r=request), app))
        remote = app_repo.remotes.origin
        remote.fetch()
        remote.pull()
        session.flash = T("Application updated via git pull")
        redirect(URL('site'))
    except git.CheckoutError:
        session.flash = T("Pull failed, certain files could not be checked out. Check logs for details.")
        redirect(URL('site'))
    except git.UnmergedEntriesError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    except git.GitCommandError:
        session.flash = T(
            "Pull failed, git exited abnormally. See logs for details.")
        redirect(URL('site'))
    except AssertionError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
def git_version(version):
    """
    Return a version suffix identifying the state of the underlying git repo.

    The suffix indicates whether the head of the current git-backed working
    directory is tied to a release tag or not: ``.release:{version}+{sha}``
    for the former and ``.dev0+{sha}`` for the latter, where ``sha`` is the
    hex sha of the head commit. A ``.dirty`` suffix is appended to the dev
    form when uncommitted changes are present.

    :param version: the version the exact-match tag is expected to equal
    :return: the suffix described above; ``''`` when gitpython or the
        repository cannot be loaded; ``'no_git_version'`` when the repo
        object is falsy
    :raises AssertionError: if HEAD is exactly on a tag that differs from
        ``version``
    """
    repo = None
    try:
        import git
        repo = git.Repo('.git')
    except ImportError:
        # ``Logger.warn`` is a deprecated alias; ``warning`` is the supported name.
        logger.warning('gitpython not found: Cannot compute the git version.')
        return ''
    except Exception:
        logger.warning('Git repo not found: Cannot compute the git version.')
        return ''
    if repo:
        sha = repo.head.commit.hexsha
        if repo.is_dirty():
            return '.dev0+{sha}.dirty'.format(sha=sha)
        # commit is clean
        # is it release of `version` ?
        try:
            tag = repo.git.describe(
                match='[0-9]*', exact_match=True,
                tags=True, dirty=True)
            # Explicit raise rather than ``assert`` so the check is not
            # stripped when Python runs with -O; same exception and payload.
            if tag != version:
                raise AssertionError((tag, version))
            return '.release:{version}+{sha}'.format(version=version,
                                                     sha=sha)
        except git.GitCommandError:
            # ``describe --exact-match`` failed: HEAD is not on a release tag.
            return '.dev0+{sha}'.format(sha=sha)
    else:
        return 'no_git_version'
def clone(self, raise_for_error=True):
    """Clone ``self.remote_url`` into ``self.local_path``.

    :param raise_for_error: when false, a git failure with status 128 is
        reported through the return value instead of being raised
    :return: ``True`` on success, ``False`` for a suppressed status-128 failure
    :raises git.GitCommandError: for every other git failure
    """
    try:
        git.Repo.clone_from(self.remote_url, self.local_path)
    except git.GitCommandError as e:
        if e.status != 128 or raise_for_error:
            raise
        return False
    return True
# Safe operations only, can be used without acquiring a lock first.
# Specialized subclasses with more operations defined are built by using one of the lock functions.
def get():
    """Return ``git describe`` output for the repository holding ``anubis``.

    The repository is opened two directory levels above ``anubis.__file__``.
    Returns ``'unknown'`` (after logging the error) when that is not a git
    repository or the git command fails.
    """
    try:
        repo_root = path.dirname(path.dirname(anubis.__file__))
        return git.Repo(repo_root).git.describe(always=True, dirty=True)
    except (git.InvalidGitRepositoryError, git.GitCommandError) as e:
        # Spelling fixed ("respository" -> "repository") so the message can
        # be found when searching logs.
        _logger.error('Failed to get repository: %s', repr(e))
        return 'unknown'
def git_pull():
    """ Git Pull handler

    Shows a confirmation dialog; once it is accepted, fetches and pulls from
    the ``origin`` remote of the application's repository. The outcome is
    stored in ``session.flash`` before redirecting to ``site``.
    """
    app = get_app()
    if not have_git:
        session.flash = GIT_MISSING
        redirect(URL('site'))
    dialog = FORM.confirm(T('Pull'),
                          {T('Cancel'): URL('site')})
    if not dialog.accepted:
        # Nothing confirmed: honour an explicit cancel, else render the dialog.
        if 'cancel' in request.vars:
            redirect(URL('site'))
        return dict(app=app, dialog=dialog)

    try:
        app_repo = git.Repo(os.path.join(apath(r=request), app))
        remote = app_repo.remotes.origin
        remote.fetch()
        remote.pull()
        session.flash = T("Application updated via git pull")
        redirect(URL('site'))
    except git.CheckoutError:
        session.flash = T("Pull failed, certain files could not be checked out. Check logs for details.")
        redirect(URL('site'))
    except git.UnmergedEntriesError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    except git.GitCommandError:
        session.flash = T(
            "Pull failed, git exited abnormally. See logs for details.")
        redirect(URL('site'))
    except AssertionError:
        session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
        redirect(URL('site'))
    return dict(app=app, dialog=dialog)
def test_clone_error(mocker, git_repo):
    """``repo clone`` exits non-zero with an exception when the clone fails."""
    clone_failure = git.GitCommandError('clone', 'Foo')
    mocker.patch('git.Repo.clone_from', side_effect=clone_failure)
    work_dir = git_repo.working_tree_dir
    with zazu.util.cd(work_dir):
        cli_args = ['repo', 'clone', 'http://foo/bar/baz.git']
        result = click.testing.CliRunner().invoke(zazu.cli.cli, cli_args)
        assert result.exit_code != 0
        assert result.exception
        git.Repo.clone_from.assert_called_once()