def do_clone(self, service=None, repo_path=None):
    """Clone the configured repository into a local directory.

    Parameters:
        service - service object to start from; when falsy it is obtained
                  through ``self.get_service(lookup_repository=False)``
        repo_path - destination directory; defaults to ``self.path`` joined
                    with ``self.target_repo`` (or ``self.repo_name``)

    Returns:
        0 once the clone succeeded.

    Raises:
        FileExistsError - the destination exists and is not empty
        ResourceNotFoundError - initialising or cloning failed; the
                                destination directory is removed first
    """
    service = service or self.get_service(lookup_repository=False)
    repo_path = repo_path or os.path.join(self.path, self.target_repo or self.repo_name)
    if os.path.exists(repo_path) and os.listdir(repo_path):
        raise FileExistsError('Cannot clone repository, '
                              'a folder named {} already exists and '
                              'is not an empty directory!'.format(repo_path))
    try:
        repository = Repo.init(repo_path)
        # The service is re-resolved against the freshly initialised repository.
        service = RepositoryService.get_service(repository, self.target)
        service.clone(self.namespace, self.repo_name, self.branch)
        log.info('Successfully cloned `{}` into `{}`!'.format(
            service.format_path(self.repo_slug),
            repo_path)
        )
        return 0
    except Exception as err:
        # Do not leave a half-initialised repository behind.
        if os.path.exists(repo_path):
            shutil.rmtree(repo_path)
        # The third exception argument is used as the message when present.
        # Fall back to str(err) so errors with fewer arguments do not raise
        # a second exception here and hide the original failure.
        detail = err.args[2] if len(err.args) > 2 else str(err)
        if isinstance(detail, bytes):
            detail = detail.decode('utf-8', errors='replace')
        elif not isinstance(detail, str):
            detail = str(detail)
        raise ResourceNotFoundError(detail) from err
# Python: example source code for init() (python类init()的实例源码)
def test_git_release_without_remote(self):
    """Invoke ``release 2.0.0`` in a fresh repository and check the outcome."""
    version_file = 'VERSION'
    with self.runner.isolated_filesystem():
        # Seed a repository whose single commit holds a 1.0.0 VERSION file.
        with open(version_file, 'w') as handle:
            handle.write('1.0.0\n')
        repo = Repo.init()
        repo.index.add([version_file])
        repo.index.commit('Initial commit')

        outcome = self.runner.invoke(release, ['2.0.0'])
        self.assertIsNone(outcome.exception)
        self.assertEqual(outcome.exit_code, 0)

        # The file is bumped, committed and tagged, leaving a clean tree.
        with open(version_file) as handle:
            self.assertEqual(handle.read(), '2.0.0\n')
        master_commit = repo.refs.master.commit
        self.assertEqual(master_commit.message, 'Release 2.0.0')
        self.assertEqual(repo.tags['2.0.0'].commit, repo.refs.master.commit)
        self.assertFalse(repo.is_dirty())
def test_get_projects(self, tmpdir):
    """Check that ``get_projects`` returns a list for a committed repo."""
    repo_dir = tmpdir.mkdir('test')
    path = str(repo_dir)
    repo = Repo.init(path)
    # Two YAML files, each with *_git_repo, *_git_install_branch and
    # *_git_project_group keys.
    fixtures = (
        ('test1.yml', u"""---
tempest_git_repo: https://git.openstack.org/openstack/tempest
tempest_git_install_branch: 1493c7f0ba49bfccb9ff8516b10a65d949d7462e
tempest_git_project_group: utility_all
"""),
        ('test2.yml', u"""---
novncproxy_git_repo: https://github.com/kanaka/novnc
novncproxy_git_install_branch: da82b3426c27bf1a79f671c5825d68ab8c0c5d9f
novncproxy_git_project_group: nova_console
"""),
    )
    for name, content in fixtures:
        (repo_dir / name).write_text(content, encoding='utf-8')
    repo.index.add([name for name, _ in fixtures])
    repo.index.commit("Test")
    projects = osa_differ.get_projects(path, 'HEAD')
    assert isinstance(projects, list)
def test_get_roles(self, tmpdir):
    """Verify that we can get OSA role information."""
    repo_dir = tmpdir.mkdir('test')
    path = str(repo_dir)
    repo = Repo.init(path)
    requirements = repo_dir / 'ansible-role-requirements.yml'
    # The scm/src/version keys must be indented under the "- name" entry so
    # the document is a list with one mapping; at column 0 they are not
    # valid YAML after a sequence item.
    requirements.write_text(u"""
- name: apt_package_pinning
  scm: git
  src: https://github.com/openstack/openstack-ansible-apt_package_pinning
  version: master
""", encoding='utf-8')
    repo.index.add(['ansible-role-requirements.yml'])
    repo.index.commit("Test")
    roles = osa_differ.get_roles(path,
                                 'HEAD',
                                 'ansible-role-requirements.yml')
    assert isinstance(roles, list)
    assert roles[0][0] == 'apt_package_pinning'
def test_commit_range_valid(self, tmpdir):
    """Check that an old-to-new commit range validates as truthy."""
    repo_dir = tmpdir.mkdir('test')
    path = str(repo_dir)
    repo = Repo.init(path)
    sample = repo_dir / 'test.txt'
    # Two commits so that HEAD~1 exists.
    for content, message in ((u'Testing1', 'Testing 1'),
                             (u'Testing2', 'Testing 2')):
        sample.write_text(content, encoding='utf-8')
        repo.index.add(['test.txt'])
        repo.index.commit(message)
    outcome = osa_differ.validate_commit_range(path, 'HEAD~1', 'HEAD')
    assert outcome
def test_commit_range_flipped(self, tmpdir):
    """Check that a new-to-old commit range is reported as 'flip'."""
    repo_dir = tmpdir.mkdir('test')
    path = str(repo_dir)
    repo = Repo.init(path)
    sample = repo_dir / 'test.txt'
    # Two commits so that HEAD~1 exists.
    for content, message in ((u'Testing1', 'Testing 1'),
                             (u'Testing2', 'Testing 2')):
        sample.write_text(content, encoding='utf-8')
        repo.index.add(['test.txt'])
        repo.index.commit(message)
    outcome = osa_differ.validate_commit_range(path, 'HEAD', 'HEAD~1')
    assert outcome == 'flip'
def test_make_osa_report(self, tmpdir):
    """Check the OSA header report built for a two-commit repository."""
    repo_dir = tmpdir.mkdir('test')
    path = str(repo_dir)
    repo = Repo.init(path)
    sample = repo_dir / 'test.txt'
    # Two commits so that HEAD~1 exists.
    for content, message in ((u'Testing1', 'Testing 1'),
                             (u'Testing2', 'Testing 2')):
        sample.write_text(content, encoding='utf-8')
        repo.index.add(['test.txt'])
        repo.index.commit(message)
    args = osa_differ.create_parser().parse_args(['HEAD~1', 'HEAD'])
    report = osa_differ.make_osa_report(path, 'HEAD~1', "HEAD", args)
    assert "HEAD~1" in report
    assert "OpenStack-Ansible Diff Generator" in report
def test_make_report_old_pin_missing(self, tmpdir):
    """Check that the report is empty when there are no old pins."""
    repo_dir = tmpdir.mkdir('test')
    path = str(repo_dir)
    repo = Repo.init(path)
    sample = repo_dir / 'test.txt'
    # Two commits in the "test" repository referenced by the new pin.
    for content, message in ((u'Testing1', 'Testing 1'),
                             (u'Testing2', 'Testing 2')):
        sample.write_text(content, encoding='utf-8')
        repo.index.add(['test.txt'])
        repo.index.commit(message)
    old_pins = []
    new_pins = [("test", "http://example.com", "HEAD")]
    report = osa_differ.make_report(str(tmpdir), old_pins, new_pins)
    assert report == ''
def test_repo_clone(self, tmpdir, monkeypatch):
    """Check ``repo_clone`` with the actual git clone patched out."""
    repo_dir = tmpdir.mkdir('test')
    path = str(repo_dir)
    repo = Repo.init(path)
    sample = repo_dir / 'test.txt'
    sample.write_text(u'Testing', encoding='utf-8')
    repo.index.add(['test.txt'])
    repo.index.commit('Testing')

    def fake_clone(x, y):
        # Stand-in for Repo.clone: hands back the prepared repository path.
        return path

    def fake_wait(*args, **kwargs):
        # Stand-in for AutoInterrupt.wait: always reports success.
        return True

    monkeypatch.setattr("git.repo.base.Repo.clone", fake_clone)
    monkeypatch.setattr("git.cmd.Git.AutoInterrupt.wait", fake_wait)
    cloned = osa_differ.repo_clone(path, "http://example.com")
    assert cloned.active_branch.name == 'master'
    assert not cloned.is_dirty()
def test_repo_clone_update(self, tmpdir):
    """Check ``update_repo`` when the destination path does not exist yet."""
    source_dir = tmpdir.mkdir('test')
    path = str(source_dir)
    repo = Repo.init(path)
    sample = source_dir / 'test.txt'
    sample.write_text(u'Testing', encoding='utf-8')
    repo.index.add(['test.txt'])
    repo.index.commit('Testing')
    # A path below a second, empty directory that has not been created.
    other_dir = tmpdir.mkdir("test2")
    path_clonefrom = "{0}/testrepodoesntexist".format(str(other_dir))
    updated = osa_differ.update_repo(path_clonefrom, path)
    assert updated.active_branch.name == 'master'
    assert not updated.is_dirty()
def test_repo_update_with_fetch(self, tmpdir, monkeypatch):
    """Check ``repo_pull`` with ``fetch=True`` and git calls patched out."""
    repo_dir = tmpdir.mkdir('test')
    path = str(repo_dir)
    repo = Repo.init(path)
    sample = repo_dir / 'test.txt'
    sample.write_text(u'Testing', encoding='utf-8')
    repo.index.add(['test.txt'])
    repo.index.commit('Testing')
    repo.create_remote('origin', url='http://example.com')

    # Every git command invocation is replaced by a stub returning True.
    monkeypatch.setattr(
        "git.cmd.Git._call_process", lambda *args, **kwargs: True)
    pulled = osa_differ.repo_pull(path, "http://example.com", fetch=True)
    # Restore real git before inspecting the repository state.
    monkeypatch.undo()
    assert pulled.active_branch.name == 'master'
    assert not pulled.is_dirty()
def init_plain_repo(create, repo_path):
    """
    Set up a plain repository, creating it first when requested

    Parameters:
        create - if true, make a new directory and git repo,
                 else, reuse an existing directory
        repo_path - directory the repository lives in

    Returns a Result: Err with a message on failure, otherwise the
    outcome of init_new (new repo) or init_pomu (existing repo).
    """
    if not repo_path:
        return Result.Err('repository path required')

    # Existing repository: it only has to be a directory.
    if not create:
        if not path.isdir(repo_path):
            return Result.Err('directory not found')
        return init_pomu(repo_path)

    # New repository: refuse to overwrite, then create and initialise.
    if path.isdir(repo_path):
        return Result.Err('this repository already exists')
    try:
        makedirs(repo_path)
    except PermissionError:
        return Result.Err('you do not have enough permissions to create the git repository')
    Repo.init(repo_path)
    try:
        return Result.Ok(init_new(repo_path).unwrap())
    except ResultException as e:
        # Roll back the directory created above.
        rmtree(repo_path)
        return Result.Err(str(e))
def init_portage_repo(create, repo, repo_dir):
    """
    Set up a portage repository, creating it first when requested

    Parameters:
        create - if true, make a new portage repo with git,
                 else, reuse an existing one
        repo - name of the repository
        repo_dir - parent directory for a newly created repository

    Returns a Result: Err with a message on failure, otherwise the
    outcome of init_new (new repo) or init_pomu (existing repo).
    """
    if not repo:
        return Result.Err('repository name required')
    rsets = portage.db[portage.root]['vartree'].settings.repositories
    known = repo in rsets.prepos_order

    # Existing repository: it must already be registered with portage.
    if not create:
        if not known:
            return Result.Err('repository not found')
        return init_pomu(rsets.prepos[repo], repo)

    if known:
        return Result.Err('a repository with such name already exists!')
    repo_path = path.join(repo_dir, repo)
    try:
        makedirs(repo_path)
    except PermissionError:
        return Result.Err('you do not have enough permissions to create the git repository')
    # Append a section for the new repository to pomu.conf.
    try:
        with open(path.join(portage.root, 'etc/portage/repos.conf', 'pomu.conf'), 'a') as conf:
            conf.writelines([
                '[' + repo + ']' + '\n',
                'location = ' + repo_path + '\n',
            ])
    except PermissionError:
        rmtree(repo_path)
        return Result.Err('you do not have enough permissions to setup a portage repo')
    Repo.init(repo_path)
    try:
        return Result.Ok(init_new(repo_path, repo).unwrap())
    except ResultException as e:
        # Roll back the directory created above.
        rmtree(repo_path)
        return Result.Err(str(e))
def __git_init(self):
    """Clone ``self._git_repo`` into the repository directory when it is
    set; otherwise initialise an empty repository there."""
    if not self._git_repo:
        return Repo.init(self._repository_directory)
    return Git().clone(self._git_repo, self._repository_directory)
def test_ambiguous_arg_iteration(self, rw_dir):
    """Iterate commits when a tracked file is also named 'master'."""
    repo_dir = osp.join(rw_dir, 'test_ambiguous_arg')
    rw_repo = Repo.init(repo_dir)
    ambiguous_file = osp.join(rw_repo.working_tree_dir, 'master')
    touch(ambiguous_file)
    rw_repo.index.add([ambiguous_file])
    rw_repo.index.commit('initial commit')
    # should fail unless bug is fixed
    commits = rw_repo.iter_commits(rw_repo.head.ref)
    list(commits)
def test_add_file_and_commit(self, rw_dir):
    """Create a repository, add one empty file and commit it."""
    import git

    repo_dir = osp.join(rw_dir, 'my-new-repo')
    file_name = osp.join(repo_dir, 'new-file')

    r = git.Repo.init(repo_dir)
    # Opening in binary write mode and closing straight away leaves an
    # empty file on disk.
    with open(file_name, 'wb'):
        pass
    r.index.add([file_name])
    r.index.commit("initial commit")
    # ![test_add_file_and_commit]
def init(self):  # pragma: no cover
    """Remove ``GIT_WORK_TREE`` from the process environment.

    ``pop`` with a default is used so that nothing is raised when the
    variable is absent, for example when only ``GIT_DIR`` is set.
    ``GIT_DIR`` itself is left untouched.
    """
    os.environ.pop('GIT_WORK_TREE', None)
def do_gist_clone(self):
    """Clone the gist named by ``self.gist_ref`` below ``self.path``.

    The target directory is named after the last ``/``-separated component
    of the gist reference. Returns 0.
    """
    service = self.get_service(lookup_repository=False)
    gist_name = self.gist_ref.rsplit('/', 1)[-1]
    repo_path = os.path.join(self.path, gist_name)
    service.repository = Repo.init(repo_path)
    service.gist_clone(self.gist_ref)
    message = 'Successfully cloned `{}` into `{}`!'.format(self.gist_ref, repo_path)
    log.info(message)
    return 0
def setup_git_popen(self):
    """Create a scratch repository and install the Popen mockup."""
    # repository mockup (in a temporary place)
    self.repository = Repo.init(self.tempdir.name)
    # setup git command mockup
    self.Popen = MockPopen()

    def popen_without_session(*a, **k):
        # Drop the start_new_session keyword before delegating to the mock.
        k.pop('start_new_session', None)
        return self.Popen.Popen(*a, **k)

    def wait_on_mock(*a, **k):
        return self.Popen.wait()

    def enter_context(instance):
        return instance

    def exit_context(instance, *a, **k):
        return None

    popen_mock = self.Popen.mock
    popen_mock.Popen.side_effect = popen_without_session
    popen_mock.Popen_instance.stdin = None
    popen_mock.Popen_instance.wait = wait_on_mock
    # Make the mocked process usable in a ``with`` statement.
    popen_mock.Popen_instance.__enter__ = enter_context
    popen_mock.Popen_instance.__exit__ = exit_context
def main_add(self, repo, rc=0, args={}):
    """Run the ``add`` command against a freshly initialised repository.

    The repository directory is named after the last ``/`` component of
    ``repo``, or ``'fubar'`` when ``repo`` is falsy. Asserts that ``main``
    returns ``rc`` and returns the service's ``_did_add`` value.
    """
    create_repo = repo.split('/')[-1] if repo else 'fubar'
    repo_dir = os.path.join(self.tempdir.name, create_repo)
    os.mkdir(repo_dir)
    Repo.init(repo_dir)
    cli_args = self.setup_args({
        'add': True,
        '<user>/<repo>': repo,
        '--path': self.tempdir.name
    }, args)
    assert rc == main(cli_args), "Non {} result for add".format(rc)
    return RepositoryService._current._did_add
def main_delete(self, repo=None, rc=0, args={}):
    """Run the ``delete`` command, preparing a repository when one is named.

    Asserts that ``main`` returns ``rc`` and returns the service's
    ``_did_delete`` value.
    """
    if repo:
        # Directory named after the last ``/`` component of the slug.
        repo_path = os.path.join(self.tempdir.name, repo.split('/')[-1])
        os.mkdir(repo_path)
        Repo.init(repo_path)
    cli_args = self.setup_args({
        'delete': True,
        '<user>/<repo>': repo,
        '--path': self.tempdir.name,
    }, args)
    assert rc == main(cli_args), "Non {} result for delete".format(rc)
    return RepositoryService._current._did_delete
def action_clone(self, namespace, repository):
    """Run ``service.clone`` against mocked git commands and a cassette."""
    # hijack subprocess call
    with self.mockup_git(namespace, repository):
        local_slug = self.service.format_path(namespace=namespace, repository=repository, rw=True)
        # Canned output returned for the ``git pull`` command.
        pull_output = '\n'.join([
            'POST git-upload-pack (140 bytes)',
            'remote: Counting objects: 8318, done.',
            'remote: Compressing objects: 100% (3/3), done.',
            'remote: Total 8318 (delta 0), reused 0 (delta 0), pack-reused 8315',
            'Receiving objects: 100% (8318/8318), 3.59 MiB | 974.00 KiB/s, done.',
            'Resolving deltas: 100% (5126/5126), done.',
            'From {}:{}/{}'.format(self.service.fqdn, namespace, repository),
            ' * branch master -> FETCH_HEAD',
            ' * [new branch] master -> {}/master'.format(self.service.name),
        ]).encode('utf-8')
        # Each entry: (command line, first output, second output, exit status).
        expected_commands = [
            ('git init', b'Initialized empty Git repository in /tmp/bar/.git/', b'', 0),
            ('git remote add all {}'.format(local_slug), b'', b'', 0),
            ('git remote add {} {}'.format(self.service.name, local_slug), b'', b'', 0),
            ('git version', b'git version 2.8.0', b'', 0),
            ('git pull --progress -v {} master'.format(self.service.name), b'', pull_output, 0),
        ]
        self.set_mock_popen_commands(expected_commands)
        with self.recorder.use_cassette(self._make_cassette_name()):
            self.service.connect()
            self.service.clone(namespace, repository)
            for remote_name in ('all', self.service.name):
                self.service.repository.create_remote(remote_name, url=local_slug)
def action_add(self, namespace, repository, alone=False, name=None,
               tracking='master', auto_slug=False, remotes={}):
    """Run ``service.add`` under a cassette and assert the resulting remotes.

    The checks depend on ``alone`` and ``name``; the tracking-remote checks
    run only when ``tracking`` is truthy.
    """
    with self.recorder.use_cassette(self._make_cassette_name()):
        # init git in the repository's destination
        self.repository.init()
        for remote, url in remotes.items():
            self.repository.create_remote(remote, url)
        self.service.connect()
        self.service.add(user=namespace, repo=repository, alone=alone, name=name, tracking=tracking, auto_slug=auto_slug)
        #
        if alone:
            # A single remote: the custom name, or the service name.
            self.assert_added_remote(name if name else self.service.name)
            if tracking:
                if name:
                    self.assert_tracking_remote(name, tracking)
                else:
                    self.assert_tracking_remote(branch_name=tracking)
        elif name:
            # The custom remote plus the 'all' remote.
            self.assert_added_remote(name)
            self.assert_added_remote('all')
            if tracking:
                self.assert_tracking_remote(name)
        else:
            self.assert_added_remote_defaults()
            if tracking:
                self.assert_tracking_remote()
def init(path=None):
    """Create an empty uj project in ``path`` (default: current directory).

    Files from the empty-project template are copied unless they already
    exist. When git is available and the target is not yet a repository,
    one is initialised.
    """
    # Fall back to the current directory when no path was given.
    target_directory = path or "./"
    # Mirror the template tree, never overwriting existing files.
    for dir_path, _dir_names, file_names in os.walk(empty_project_dir):
        # Location of this template directory relative to the template root.
        rel_path = os.path.relpath(dir_path, empty_project_dir)
        target_path = os.path.normpath(os.path.join(target_directory, rel_path))
        if not os.path.isdir(target_path):
            os.mkdir(target_path)
        for file_name in file_names:
            destination = os.path.join(target_path, file_name)
            if os.path.exists(destination):
                continue
            copyfile(os.path.join(dir_path, file_name), destination)
            if not file_name.endswith(".ujml"):
                continue
            # Freshly copied ujml files get the version number filled in.
            with open(destination, "r") as f:
                template = f.read()
            with open(destination, "w") as f:
                f.write(template.format(version=uj_version))
    # Make sure the target is a git repository, if git can be used.
    if git_available:
        try:
            Repo(target_directory)
        except InvalidGitRepositoryError:
            Repo.init(target_directory)
def init_package_in_develop(self):
    """Reset the package directory to a fresh local git repository.

    Any existing ``.git`` and ``deploy`` entries in the package path are
    removed, a new repository is initialised, and all ``*.py`` and
    ``*.json`` files are added and committed.
    """
    import shutil

    # Remove leftovers through the filesystem API rather than a shell
    # command string, so paths with spaces or shell metacharacters are
    # handled literally. Failures are ignored, as they were before.
    for leftover in ('.git', 'deploy'):
        target = osp.join(self.__package_path, leftover)
        if osp.islink(target) or osp.isfile(target):
            try:
                os.remove(target)
            except OSError:
                pass
        elif osp.isdir(target):
            shutil.rmtree(target, ignore_errors=True)
    # init git for make a local repository
    repo = Repo.init(str(self.__package_path))
    repo.git.add('*.py')
    repo.git.add('*.json')
    repo.index.commit('add new files')
def init(self, path=None):
    '''Initialise the repo object, or init/clone a new Git repo.

    When ``self.path`` is a directory, a repository is initialised at
    ``path`` (or ``self.path``). Otherwise, when a URL is set, its
    ``master`` branch is cloned into ``self.path``. Returns ``self._repo``.
    '''
    if os.path.isdir(self.path):
        location = path or self.path
        self._repo = Repo.init(location)
    elif self._url:
        # no local directory yet: clone from the remote URL
        mkdir_p(self.path)
        self._repo = Repo.clone_from(self._url, self.path, branch='master')
    return self._repo
def repo(self):
    '''Return the repo object, creating it on first use.

    An existing repository at ``self.path`` is opened; when that fails,
    ``self.init()`` is called instead.
    '''
    if not hasattr(self, '_repo'):
        try:
            self._repo = Repo(self.path)
        except Exception:
            # Only ordinary errors trigger the fallback; KeyboardInterrupt
            # and SystemExit are allowed to propagate.
            self.init()
    return self._repo
def _initialise_history_repo(repo_path):
    """Initialise a repository at ``repo_path`` with a committed README.md."""
    readme_name = "README.md"
    repo = Repo.init(repo_path)
    readme_path = os.path.join(repo_path, readme_name)
    with open(readme_path, "w") as readme:
        print(_README(), file=readme)
    repo.index.add([readme_name])
    repo.index.commit("Initial commit")
def __enter__(self):
    """Enter the parent context, initialise a bare repository with default
    arguments, and return the parent's value."""
    entered = super(git_bare_repo, self).__enter__()
    Repo.init(bare=True)
    return entered
def __enter__(self):
    """Enter the parent context, then initialise a repository with default
    arguments and return it."""
    super(git_repo, self).__enter__()
    repository = Repo.init()
    return repository