def git_clone_all(config_as_dict):
progress = GitProgress()
for repo_url, branch, app_name, app_props in repo_and_branch_and_app_name_and_app_props_iterator(config_as_dict):
repo_full_path = get_repo_full_path_for_repo_url(repo_url, config_as_dict)
if os.path.exists(repo_full_path):
print("Already cloned %s from %s" % (app_name, repo_url))
repo = git.Repo(repo_full_path)
if repo.active_branch.name != branch:
print("Your local checkout is in a different branch (%s) from the branch you want to deploy (%s). URL: %s" %
(repo.active_branch.name, branch, repo_url))
repo.git.checkout(branch)
origin = repo.remotes.origin
#origin.fetch(branch)
if config_as_dict["gitpull"]:
origin.pull(branch)
else:
os.makedirs(repo_full_path)
try:
repo = git.Repo.clone_from(repo_url, repo_full_path, branch=branch, progress=progress)
print("Cloned: %s" % repo)
except git.GitCommandError as e:
print(e)
return False
python类Repo()的实例源码
def _initialize_repo(repo_name, repo_dir, config, progress=None):
"""
Clones repository and configures it to use sparse checkout.
Extraneous folders will get removed later using git read-tree
"""
util.logger.info('Repo {} doesn\'t exist. Cloning...'.format(repo_name))
# Clone repo
repo = git.Repo.clone_from(
config['GITHUB_ORG'] + repo_name,
repo_dir,
progress,
branch=config['REPO_BRANCH'],
)
# Use sparse checkout
config = repo.config_writer()
config.set_value('core', 'sparsecheckout', True)
config.release()
util.logger.info('Repo {} initialized'.format(repo_name))
def _find_git_info(self):
"""
Return information about the state of the Git repository tox is being
run from.
:return: dict with keys 'dirty' (bool), 'sha' (str), 'tag' (str or None)
:rtype: dict
"""
res = {}
logger.debug('Checking git status...')
repo = Repo(path=self._gitdir, search_parent_directories=False)
res['sha'] = repo.head.commit.hexsha
res['dirty'] = repo.is_dirty(untracked_files=True)
res['tag'] = None
for tag in repo.tags:
# each is a git.Tag object
if tag.commit.hexsha == res['sha']:
res['tag'] = tag.name
logger.debug('Git info: %s', res)
return res
def _clone_project(project_name):
gh = Github()
(gh_repo, forked) = gh.get_repo(project_name, fork=True)
if not gh_repo:
click.echo('error: no repository named %s was found on '
'your user and configured organizations' %
click.style(project_name, bold=True))
sys.exit(1)
if forked:
click.echo('We found a repository named %s and forked it to your '
'user, it now accessible at the following url:\n'
'%s' % (forked.full_name, gh_repo.html_url))
dest_path = os.path.join(projects_path(), project_name)
click.echo('We will clone %s in %s\n' % (gh_repo.ssh_url, dest_path))
vgit('clone', gh_repo.ssh_url, dest_path)
# add our upstream remote
if gh_repo.parent:
repo = Repo(dest_path)
repo.create_remote('upstream', gh_repo.parent.ssh_url)
def all_versions(filename):
"""
Find, open and parse all tagged versions of a json file, including the current version
:param filename: The filename to find
:return: a dictionary of all the versions, in the form
{
'current': {...},
'1.0': {...},
'1.1': {...}
}
"""
repo = git.Repo()
versions = {
'current': get_json(filename)
}
for tag in repo.tags:
version_dict = repo.git.show('%s:%s' % (tag.name, filename))
versions[tag.name.strip('v')] = json.loads(version_dict)
return versions
def match_commit(self, src_dir):
"""Check that the current commit matches the recorded commit for this experiment.
Raises an error if commits don't match, or if there is dirty state.
Args:
src_dir (str): path to the Git repository
"""
if self.metadata['dirty_repo']:
raise EnvironmentError('Working directory was dirty when commit was recorded.')
repo = Repo(src_dir)
if repo.is_dirty():
raise EnvironmentError('Current working directory is dirty.')
current_commit = repo.head.object.hexsha.encode('utf-8')
exp_commit = self.metadata['commit']
if current_commit != exp_commit:
raise EnvironmentError("Commits don't match.\nCurrent: {}\nRecorded: {}".format(current_commit, exp_commit))
def match_commit(self, src_dir):
"""Check that the current commit matches the recorded commit for this experiment.
Raises an error if commits don't match, or if there is dirty state.
Args:
src_dir (str): path to the Git repository
"""
if self.metadata['dirty_repo']:
raise EnvironmentError('Working directory was dirty when commit was recorded.')
repo = Repo(src_dir)
if repo.is_dirty():
raise EnvironmentError('Current working directory is dirty.')
current_commit = repo.head.object.hexsha.encode('utf-8')
exp_commit = self.metadata['commit']
if current_commit != exp_commit:
raise EnvironmentError("Commits don't match.\nCurrent: {}\nRecorded: {}".format(current_commit, exp_commit))
def git_push():
""" Git Push handler """
app = get_app()
if not have_git:
session.flash = GIT_MISSING
redirect(URL('site'))
form = Form([Field('changelog', requires=IS_NOT_EMPTY())])
#form.element('input[type=submit]')['_value'] = T('Push')
#form.add_button(T('Cancel'), URL('site'))
if form.accepted:
try:
repo = git.Repo(os.path.join(apath(r=request), app))
index = repo.index
index.add([apath(r=request) + app + '/*'])
new_commit = index.commit(form.vars.changelog)
origin = repo.remotes.origin
origin.push()
session.flash = T(
"Git repo updated with latest application changes.")
redirect(URL('site'))
except git.UnmergedEntriesError:
session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
redirect(URL('site'))
return dict(app=app, form=form)
def load(self):
"""Load the library."""
if not git:
raise EnvironmentError(MISSING_GIT_ERROR)
if os.path.exists(self.path):
if not config.CACHE_DISABLE:
return
shutil.rmtree(self.path, ignore_errors=True)
with files.remove_on_exception(self.path):
url = self.GIT_URL.format(**vars(self))
repo = git.Repo.clone_from(
url=url, to_path=self.path, b=self.branch)
if self.commit:
repo.head.reset(self.commit, index=True, working_tree=True)
def active_git_branch(path):
try:
repo = git.Repo(path)
except git.exc.NoSuchPathError as error:
return None
if not repo.working_tree_dir:
return None
rval = repo.active_branch
if not rval:
return None
rval = str(rval)
return rval
def refresh_installed_packages(self):
"""Fetch latest git information for installed packages.
This retrieves information about outdated packages, but does
not actually upgrade their installations.
Raises:
IOError: if the package manifest file can't be written
"""
for ipkg in self.installed_packages():
clonepath = os.path.join(self.package_clonedir, ipkg.package.name)
clone = git.Repo(clonepath)
LOG.debug('fetch package %s', ipkg.package.qualified_name())
try:
clone.remote().fetch()
except git.exc.GitCommandError as error:
LOG.warn('failed to fetch package %s: %s',
ipkg.package.qualified_name(), error)
ipkg.status.is_outdated = _is_clone_outdated(
clone, ipkg.status.current_version, ipkg.status.tracking_method)
self._write_manifest()
def test_errors_when_remote_has_changes(self):
with git_bare_repo() as bare_repo:
with git_repo() as repo:
touch('README.md')
repo.index.add(['README.md'])
repo.index.commit('Initial commit')
repo.create_remote('origin', url=bare_repo)
repo.remotes.origin.push(repo.refs.master)
with temp_directory() as path:
clone = Repo(bare_repo).clone(path)
touch('CHANGELOG.md')
clone.index.add(['README.md'])
clone.index.commit('Second commit')
clone.remotes.origin.push(clone.refs.master)
with self.assertRaises(Exception):
GitReleaser()
def __init__(self, config=None):
self.repo = Repo()
self.commit_format = (config or {}).get('commit_format', 'Release {version}')
self.tag_format = (config or {}).get('tag_format', '{version}')
if self.repo.head.ref != self.repo.heads.master:
# TODO: Support releasing from stable/hotfix branches
raise Exception('You need to be on the `master` branch in order to do a release.')
if self.repo.is_dirty():
raise Exception('Git repository has unstaged changes.')
if len(self.repo.untracked_files) > 0:
raise Exception('Git repository has untracked files.')
if self.has_origin():
self.repo.remotes.origin.fetch()
if self.repo.remotes.origin.refs.master.commit != self.repo.head.ref.commit:
raise Exception('Master has unsynced changes.')
def git_push():
""" Git Push handler """
app = get_app()
if not have_git:
session.flash = GIT_MISSING
redirect(URL('site'))
form = SQLFORM.factory(Field('changelog', requires=IS_NOT_EMPTY()))
form.element('input[type=submit]')['_value'] = T('Push')
form.add_button(T('Cancel'), URL('site'))
form.process()
if form.accepted:
try:
repo = git.Repo(os.path.join(apath(r=request), app))
index = repo.index
index.add([apath(r=request) + app + '/*'])
new_commit = index.commit(form.vars.changelog)
origin = repo.remotes.origin
origin.push()
session.flash = T(
"Git repo updated with latest application changes.")
redirect(URL('site'))
except git.UnmergedEntriesError:
session.flash = T("Push failed, there are unmerged entries in the cache. Resolve merge issues manually and try again.")
redirect(URL('site'))
return dict(app=app, form=form)
def git_list_create_tag(working_dir, git_list_file, tag_name, delete_tag_already_exists):
git_list = []
with open(git_list_file,'r') as f:
for row in f:
row = row.strip()
if row == "" or row.startswith('#'):
continue
git_list.append(row)
for repo_url in git_list:
code_dir = "%s/%s" % (working_dir, get_repo_name(repo_url))
git_repo = None
if os.path.exists(code_dir):
# re-use current code folder
git_repo = git.Repo(code_dir)
else:
git_repo = git.Repo.clone_from(repo_url, code_dir)
git_create_tag(git_repo, tag_name, delete_tag_already_exists)
return True
def get_route_info(root_path):
routing_path = os.path.join(root_path, "Uber", "uber", "routing.py")
with open(routing_path) as f:
file_contents = f.read()
tree = ast.parse(file_contents)
visitor = RouteVisitor()
visitor.visit(tree)
all_routes = visitor.routable_funcs
# Get the commit that added each route and tack it onto the details
route_linenos = sorted(set(x[0] for x in all_routes.values()))
repo = git.Repo(root_path)
blame_by_line = get_line_blames(repo, routing_path, route_linenos)
for name, details in all_routes.iteritems():
all_routes[name] = details + (blame_by_line[details[0]],)
return all_routes
def install(self, plugin):
"Installs a GitHub plugin"
print("Installing...", plugin.name)
import git
# Clone the GitHub repository via the URL.
# git.Git().clone(str(plugin.baseurl), install_dir)
# Checks if the plugin installation path already exists.
if not self.isInstalled(plugin):
"""Clone the GitHub repository via Plugin URL to install_dir and
with depth=1 (shallow clone).
"""
git.Repo.clone_from(plugin.baseurl, self.install_dir, depth=1)
print("Done!")
return True
else:
print("Plugin already installed!")
return False
def test_add_plugin_from_git_exception(plugin_manager_fixture, mocker):
plugin_manager = plugin_manager_fixture()
mock_git = mocker.patch("infrared.core.services.plugins.git")
mock_git.Repo.clone_from.side_effect = git.exc.GitCommandError(
"some_git_cmd", 1)
mock_git.exc.GitCommandError = git.exc.GitCommandError
mock_tempfile = mocker.patch("infrared.core.services.plugins.tempfile")
mock_shutil = mocker.patch("infrared.core.services.plugins.shutil")
mock_os = mocker.patch("infrared.core.services.plugins.os")
mock_os.path.exists.return_value = False
# add_plugin call
with pytest.raises(IRFailedToAddPlugin):
plugin_manager.add_plugin(
"https://sample_github.null/plugin_repo.git")
mock_shutil.rmtree.assert_called_with(mock_tempfile.mkdtemp.return_value)
papi_create_datastore_new.py 文件源码
项目:ccu_and_eccu_publish
作者: gaofubin
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def createGitRepository():
# Create the git repository, or change into it if it exists
datastore = 'datastore'
if not os.path.isdir(datastore):
os.makedirs(datastore)
git.Repo.init(datastore)
repo = git.Repo(datastore)
os.chdir(datastore)
return repo