def git_push():
""" Git Push handler """
app = get_app()
if not have_git:
session.flash = GIT_MISSING
redirect(URL('site'))
form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY()))
form.element('input[type=submit]')['_value'] = T('Push')
form.add_button(T('Cancel'), URL('site'))
form.process()
if form.accepted:
try:
repo = git.Repo(os.path.join(apath(r=request), app))
index = repo.index
index.add([apath(r=request) + app + '/*'])
new_commit = index.commit(form.vars.changelog)
origin = repo.remotes.origin
origin.push()
session.flash = T(
"Git repo updated with latest application changes.")
redirect(URL('site'))
except git.UnmergedEntriesError:
session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
redirect(URL('site'))
return dict(app=app, form=form)
python类Repo()的实例源码
def make_testing_branch():
repo = Repo('.')
gitobj = repo.git
# create 'test' branch if it doesn't exist so that it can be used for tests in this module
git_branch_string = gitobj.branch()
git_branch_list = git_branch_string.split("\n")
clean_branch_list = []
for branch in git_branch_list:
branch = branch.replace('*', '')
branch = branch.replace(' ', '')
clean_branch_list.append(branch)
if 'testing_branch' in clean_branch_list:
pass
else:
gitobj.branch('testing_branch')
# deletes the temporary new git branch (testing_branch) for testing
def delete_testing_branch():
repo = Repo('.')
gitobj = repo.git
# create 'test' branch if it doesn't exist so that it can be used for tests in this module
git_branch_string = gitobj.branch()
git_branch_list = git_branch_string.split("\n")
clean_branch_list = []
for branch in git_branch_list:
branch = branch.replace('*', '')
branch = branch.replace(' ', '')
clean_branch_list.append(branch)
if 'testing_branch' in clean_branch_list:
gitobj.branch('-d', 'testing_branch')
# ///////////////////////////////////////////////////////
#
# pytest capsys capture tests
# confirms capture of std output and std error streams
#
# ///////////////////////////////////////////////////////
def make_testing_branch():
repo = Repo('.')
gitobj = repo.git
# create 'test' branch if it doesn't exist so that it can be used for tests in this module
git_branch_string = gitobj.branch()
git_branch_list = git_branch_string.split("\n")
clean_branch_list = []
for branch in git_branch_list:
branch = branch.replace('*', '')
branch = branch.replace(' ', '')
clean_branch_list.append(branch)
if 'testing_branch' in clean_branch_list:
pass
else:
gitobj.branch('testing_branch')
# deletes the temporary new git branch (testing_branch) for testing
def delete_testing_branch():
repo = Repo('.')
gitobj = repo.git
# create 'test' branch if it doesn't exist so that it can be used for tests in this module
git_branch_string = gitobj.branch()
git_branch_list = git_branch_string.split("\n")
clean_branch_list = []
for branch in git_branch_list:
branch = branch.replace('*', '')
branch = branch.replace(' ', '')
clean_branch_list.append(branch)
if 'testing_branch' in clean_branch_list:
gitobj.branch('-d', 'testing_branch')
# ///////////////////////////////////////////////////////
#
# Diff class tests
#
# ///////////////////////////////////////////////////////
def clone(self, name, user):
"""
create a clone of self with a given name, owned by user
"""
self.expiration = None
# create the branch on the database
new_branch = Branch(name, self.project, self, user)
db.session.add(new_branch)
db.session.commit()
# clone repository in file system
branch_path = os.path.abspath(join(os.getcwd(), 'repos',
self.project.name,
name, 'source'))
self.get_repo().clone(branch_path, branch=self.name)
os.symlink(os.path.abspath(join('repos', self.project.name,
'_resources/low_resolution')),
join(branch_path, '_resources'))
branch_repo = git.Repo(branch_path)
branch_repo.git.checkout('HEAD', b=name)
config_repo(branch_repo, user.username, user.email)
# build the source
new_branch.build(timeout=60)
return new_branch
def git_log(repo, prev_head):
r = git.Repo(repo)
if r.head.object.hexsha == prev_head:
return []
git_commit_fields = ['id', 'author_name', 'author_email', 'date', 'message']
git_log_format = ['%H', '%an', '%ae', '%ad', '%s']
git_log_format = '%x1f'.join(git_log_format) + '%x1e'
p = Popen('cd %s && git log %s..HEAD --format="%s"' % (repo, prev_head, git_log_format), shell=True, stdout=PIPE)
(log, _) = p.communicate()
log = log.strip('\n\x1e').split("\x1e")
log = [row.strip().split("\x1f") for row in log]
log = [dict(zip(git_commit_fields, row)) for row in log]
return log
def process(self):
"""Initialises Mimiron using the configuration found in `config_path`."""
for i, repo in enumerate(self.data['terraformRepositories']):
repo['path'] = os.path.expanduser(repo['path'])
try:
git_repo = Repo(repo['path'])
if git_repo.bare:
raise _InvalidGitRepositoryError
repo['defaultEnvironment'] = repo.get('defaultEnvironment', None)
repo['tagEnvironment'] = repo.get('tagEnvironment', None)
repo['git'] = git_repo
repo['tfvars'] = self._read_tfvars(repo)
except _InvalidGitRepositoryError:
raise InvalidGitRepository(repo['path'])
except _NoSuchPathError:
raise DeploymentRepositoriesNotSpecified
def render(self, context):
try:
r = Repo(BASE_DIR)
version = r.rev_parse("HEAD").hexsha
context['git'] = {
"shortHexSha": version[0:7],
"hexSha": version,
"shortRemote": "https://github.com",
"remote": "https://github.com/rubienr/network-monitoring"
}
except Exception:
context['git'] = {
"shortHexSha": None,
"hexSha": None,
"shortRemote": None,
"remote": None,
}
return ""
def git_push():
""" Git Push handler """
app = get_app()
if not have_git:
session.flash = GIT_MISSING
redirect(URL('site'))
form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY()))
form.element('input[type=submit]')['_value'] = T('Push')
form.add_button(T('Cancel'), URL('site'))
form.process()
if form.accepted:
try:
repo = git.Repo(os.path.join(apath(r=request), app))
index = repo.index
index.add([apath(r=request) + app + '/*'])
new_commit = index.commit(form.vars.changelog)
origin = repo.remotes.origin
origin.push()
session.flash = T(
"Git repo updated with latest application changes.")
redirect(URL('site'))
except git.UnmergedEntriesError:
session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
redirect(URL('site'))
return dict(app=app, form=form)
def download_beard(beard_name, upgrade):
beard_details = find_record(beard_name)
git_ = git.Git("beard_cache")
print("Attempting to clone from {}...".format(beard_details['git_url']))
try:
repo_dir = join("beard_cache", beard_name)
os.makedirs(repo_dir)
repo = git.Repo()
repo.clone_from(beard_details['git_url'], repo_dir)
print("Done!")
except FileExistsError:
repo = git.Repo("beard_cache/{}".format(beard_name))
if upgrade:
print("Updating repo")
# Should be origin, since no-one should add another remote.
repo.remotes.origin.pull()
print("Done!")
else:
print("Repo already exists. Nothing to do.")
def test_rename(self, rwdir):
parent = git.Repo.init(osp.join(rwdir, 'parent'))
sm_name = 'mymodules/myname'
sm = parent.create_submodule(sm_name, sm_name, url=self._small_repo_url())
parent.index.commit("Added submodule")
assert sm.rename(sm_name) is sm and sm.name == sm_name
assert not sm.repo.is_dirty(index=True, working_tree=False, untracked_files=False)
new_path = 'renamed/myname'
assert sm.move(new_path).name == new_path
new_sm_name = "shortname"
assert sm.rename(new_sm_name) is sm
assert sm.repo.is_dirty(index=True, working_tree=False, untracked_files=False)
assert sm.exists()
sm_mod = sm.module()
if osp.isfile(osp.join(sm_mod.working_tree_dir, '.git')) == sm._need_gitfile_submodules(parent.git):
assert sm_mod.git_dir.endswith(join_path_native('.git', 'modules', new_sm_name))
# end
def _clone_repo(cls, repo, url, path, name, **kwargs):
""":return: Repo instance of newly cloned repository
:param repo: our parent repository
:param url: url to clone from
:param path: repository-relative path to the submodule checkout location
:param name: canonical of the submodule
:param kwrags: additinoal arguments given to git.clone"""
module_abspath = cls._module_abspath(repo, path, name)
module_checkout_path = module_abspath
if cls._need_gitfile_submodules(repo.git):
kwargs['separate_git_dir'] = module_abspath
module_abspath_dir = osp.dirname(module_abspath)
if not osp.isdir(module_abspath_dir):
os.makedirs(module_abspath_dir)
module_checkout_path = osp.join(repo.working_tree_dir, path)
# end
clone = git.Repo.clone_from(url, module_checkout_path, **kwargs)
if cls._need_gitfile_submodules(repo.git):
cls._write_git_file_and_module_config(module_checkout_path, module_abspath)
# end
return clone
def module(self):
""":return: Repo instance initialized from the repository at our submodule path
:raise InvalidGitRepositoryError: if a repository was not available. This could
also mean that it was not yet initialized"""
# late import to workaround circular dependencies
module_checkout_abspath = self.abspath
try:
repo = git.Repo(module_checkout_abspath)
if repo != self.repo:
return repo
# END handle repo uninitialized
except (InvalidGitRepositoryError, NoSuchPathError):
raise InvalidGitRepositoryError("No valid repository at %s" % module_checkout_abspath)
else:
raise InvalidGitRepositoryError("Repository at %r was not yet checked out" % module_checkout_abspath)
# END handle exceptions
def _get_org_and_name_from_remote(remote):
"""
Gets the org name and the repo name
for a github remote.
:param git.Remote remote:
:return: The owner and the repo name in that order
:rtype: unicode, unicode
"""
url = remote.config_reader.get('url')
if not url.endswith('.git'):
url = '{0}.git'.format(url)
git_info = parse(url)
_LOG.debug('Repo owner: "{0}"'.format(git_info.owner))
_LOG.debug('Repo name: "{0}"'.format(git_info.repo))
return git_info.owner, git_info.repo
def _create_pull_request_helper(repo, remote_repo, branch, base, auth=None):
"""
Creates a pull request to merge the branch into the base.
Ignores the error if there is already a pull request open
for the given branch->base
:param Repo repo:
:param Repository remote_repo:
:param unicode branch:
:param unicode base:
"""
title, message = construct_message(repo, base, branch)
try:
_LOG.info('Creating pull request')
return remote_repo.create_pull(title=title, head=branch, base=base,
body='Autogenerated: \n\n{0}'.format(message))
except GithubException as exc:
if 'errors' in exc.data and len(exc.data['errors']) == 1 and \
exc.data['errors'][0].get('message', '').startswith('A pull request already exists for'):
_LOG.warning('A pull request already exists for "{0}". Continuing.'.format(branch))
return _get_pull_request(remote_repo, branch, base)
else:
raise exc
def _get_relevant_commits(repo, commit_shas, branch):
"""
Gets all commits on the repo with the given shas
:param Repo repo:
:param list[unicode] commit_shas:
:param unicode branch:
:return: list[Commit]
"""
remaining_shas = set(commit_shas)
commits = list()
for commit in repo.iter_commits(branch):
if commit.hexsha in remaining_shas:
commits.append(commit)
remaining_shas.remove(commit.hexsha)
return commits
def _get_remote(repo, name):
"""
Gets the remote object raising a MissingRemoteException
when it does not exist
:param Repo repo:
:param unicode name: The remote name
:return: The remote object
:rtype: git.remote.Remote
:raises: MissingRemoteException
"""
try:
return repo.remotes[name]
except IndexError: # I have no idea why they raise an IndexError instead of KeyError
raise MissingRemoteException('The remote "{0}" does not exist. '
'Please select a different remote or'
' add it using "git remote add" command')
def push_repo(repo, branch, remote='origin', remote_branch=None):
"""
Pushes the repo up to the remote. It will set
the upstream to the remote branch.
:param Repo repo: The repo you wish to push
:param unicode branch: The branch being pushed
:param unicode remote: The remote to use (e.g. ``'origin'``
:param unicode remote_branch: The remote branch. Defaults
to the `branch` parameter
:return: None
:raises: PushFailedException
"""
remote_branch = remote_branch or branch
_checkout_branch(repo, branch)
_LOG.info("Pushing all commits to remote '{0}'".format(remote))
remote = _get_remote(repo, remote)
try:
remote.push(remote_branch, set_upstream=True)
except GitCommandError as e:
_LOG.error(str(e))
raise PushFailedException('Uh oh, it seems like something went'
' wrong with the push. "{0}"'.format(str(e)))
def check_multiple_branches(repository):
"""Check whether a git repository has more than one branch.
Parameters
----------
repository : string
Path to a git repository
Results
-------
boolean
True if the repository has more than 1 branch, False otherwise
Raises
------
git.InvalidGitRepositoryError if repository is a path to a directory
but not a git repository
git.NoSuchPathError if repository is not a path to a directory
"""
repo = git.Repo(repository)
branches = repo.branches
if len(branches) > 1:
return True
else:
return False
def __init__(self, root, testMode=False):
super(GitManager, self).__init__()
self.root = root
self.commitTable = {} # ??????????(submodule, last push binsha)???
self.ownerRepo = None
try:
self.problemHub = git.Repo(root)
self.readLastCommitBinsha()
except git.InvalidGitRepositoryError:
if not testMode:
self.problemHub = self.setup()
else:
pass
except FileNotFoundError:
self.commitTable = {}
# self.acrot = git.Actor(author, authorEmaill)
# def __str__(self):
def git_status(self):
repo = Repo('./')
o = repo.remotes.origin
o.fetch()
# Tags
tags = []
for t in repo.tags:
tags.append({"name": t.name, "commit": str(t.commit), "date": t.commit.committed_date,
"committer": t.commit.committer.name, "message": t.commit.message})
try:
branch_name = repo.active_branch.name
# test1
except:
branch_name = None
changes = []
commits_behind = repo.iter_commits('master..origin/master')
for c in list(commits_behind):
changes.append({"committer": c.committer.name, "message": c.message})
return json.dumps({"tags": tags, "headcommit": str(repo.head.commit), "branchname": branch_name,
"master": {"changes": changes}})
def pull_list_changed_files(git_path):
"""Pull new updates from remote origin."""
git_repo = git.Repo(git_path)
logging.info(" HEAD: " + str(git_repo.head.commit))
logging.info(" is detached: " + str(git_repo.head.is_detached))
logging.info(" is dirty: " + str(git_repo.is_dirty()))
if git_repo.head.is_detached:
raise Exception("Detached head")
if git_repo.is_dirty():
raise Exception("Dirty repository")
files = []
cdnjs_origin = git_repo.remotes.origin
fetch_info = cdnjs_origin.pull()
for single_fetch_info in fetch_info:
for diff in single_fetch_info.commit.diff(
single_fetch_info.old_commit):
logging.debug("Found diff: " + str(diff))
if not diff.a_blob is None:
if not diff.a_blob.path in files:
files.append(diff.a_blob.path)
return files
def git_pull():
""" Git Pull handler """
app = get_app()
if not have_git:
session.flash = GIT_MISSING
redirect(URL('site'))
dialog = FORM.confirm(T('Pull'),
{T('Cancel'): URL('site')})
if dialog.accepted:
try:
repo = git.Repo(os.path.join(apath(r=request), app))
origin = repo.remotes.origin
origin.fetch()
origin.pull()
session.flash = T("Application updated via git pull")
redirect(URL('site'))
except git.CheckoutError:
session.flash = T("Pull failed, certain files could not be checked out. Check logs for details.")
redirect(URL('site'))
except git.UnmergedEntriesError:
session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
redirect(URL('site'))
except git.GitCommandError:
session.flash = T(
"Pull failed, git exited abnormally. See logs for details.")
redirect(URL('site'))
except AssertionError:
session.flash = T("Pull is not possible because you have unmerged files. Fix them up in the work tree, and then try again.")
redirect(URL('site'))
elif 'cancel' in request.vars:
redirect(URL('site'))
return dict(app=app, dialog=dialog)
def git_push():
""" Git Push handler """
app = get_app()
if not have_git:
session.flash = GIT_MISSING
redirect(URL('site'))
form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY()))
form.element('input[type=submit]')['_value'] = T('Push')
form.add_button(T('Cancel'), URL('site'))
form.process()
if form.accepted:
try:
repo = git.Repo(os.path.join(apath(r=request), app))
index = repo.index
index.add([apath(r=request) + app + '/*'])
new_commit = index.commit(form.vars.changelog)
origin = repo.remotes.origin
origin.push()
session.flash = T(
"Git repo updated with latest application changes.")
redirect(URL('site'))
except git.UnmergedEntriesError:
session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
redirect(URL('site'))
return dict(app=app, form=form)
def deps(repo: Optional[RepoSpec], format: str, file: Optional[str], on_uncloned: str) -> Iterable[str]:
if file is not None:
file = Path(file)
if not file.exists():
raise ValueError(f"Dependency file does not exist: {file}")
t = Template(format)
for clone in iterDeps(repo, on_uncloned, file):
try:
hexsha = git.Repo(str(clone.path)).head.commit.hexsha
except:
hexsha = '0' * 40
try:
yield t.substitute(
H = hexsha,
h = hexsha[:7],
RS = clone.repospec.str(),
rs = clone.repospec.str(False, False),
p = str(clone.path),
)
except KeyError as e:
raise ValueError("Invalid format string specifier: %s" % e)
def git_helper(self):
self.addHost('daemon', 'host', 'http://localhost', 'user', 'pw', force = True)
r1 = git.Repo.init('repo1')
Path('repo1/deps.got').write_text("repo2\nrepo3\n")
r1.index.add(['deps.got'])
r1.index.commit('Commit')
with GotRun(['--here', 'host:repo1', 'repo1', '--force']):
pass
r2 = git.Repo.init('repo2')
r2.index.commit('Commit')
with GotRun(['--here', 'host:repo2', 'repo2', '--force']):
pass
r3 = git.Repo.init('repo3')
r3.index.commit('Commit')
with GotRun(['--here', 'host:repo3', 'repo3', '--force']):
pass
return r1, r2, r3
def get_remote_branch_info():
local_repo = Repo(path=settings.BASE_DIR)
# Fetch remote branches to ensure we are up to date
for remote in local_repo.remotes:
remote.fetch()
remote_repo = local_repo.remote()
local_branch = local_repo.active_branch.name
remote_branches = []
for this_branch in remote_repo.refs:
remote_branches.append(this_branch.remote_head)
return {'local_branch': local_branch, 'remote_branches': remote_branches}
def _initialize_repo(repo_url, repo_dir, branch_name, config, progress=None):
"""
Clones repository and configures it to use sparse checkout.
Extraneous folders will get removed later using git read-tree
"""
util.logger.info('Repo {} doesn\'t exist. Cloning...'.format(repo_url))
# Clone repo
repo = git.Repo.clone_from(
repo_url,
repo_dir,
progress,
branch=branch_name,
)
# Use sparse checkout
config = repo.config_writer()
config.set_value('core', 'sparsecheckout', True)
config.release()
util.logger.info('Repo {} initialized'.format(repo_url))
def remote_git_add(config_as_dict, url_template, host):
for repo_url, branch, app_name in repo_and_branch_and_app_name_iterator(config_as_dict):
repo_dir_name = dir_name_for_repo(repo_url)
repo_full_path = get_repo_full_path_for_repo_dir_name(repo_dir_name, config_as_dict)
repo = git.Repo(repo_full_path)
git_remote = url_template.format(host=host, app_name=app_name)
gitremote_repo_name = get_remote_repo_name(config_as_dict)
remote_names = [remote.name for remote in repo.remotes]
remote_urls = [remote.url for remote in repo.remotes]
if git_remote in remote_urls:
print("Already in remote: %s" % git_remote)
continue
if gitremote_repo_name in remote_names:
repo.delete_remote(gitremote_repo_name)
print("Adding remote '%s' for %s" % (gitremote_repo_name, git_remote))
repo.create_remote(get_remote_repo_name(config_as_dict), git_remote)