def get_by_cluster_id(self, cluster_id):
instance = db().query(self.model).\
filter(self.model.env_id == cluster_id).first()
if instance is not None:
try:
instance.repo = Repo(os.path.join(const.REPOS_DIR,
instance.repo_name))
except exc.NoSuchPathError:
logger.debug("Repo folder does not exist. Cloning repo")
self._create_key_file(instance.repo_name, instance.user_key)
if instance.user_key:
os.environ['GIT_SSH'] = \
self._get_ssh_cmd(instance.repo_name)
repo_path = os.path.join(const.REPOS_DIR, instance.repo_name)
repo = Repo.clone_from(instance.git_url, repo_path)
instance.repo = repo
return instance
python类clone_from()的实例源码
def create(self, data):
if not os.path.exists(const.REPOS_DIR):
os.mkdir(const.REPOS_DIR)
repo_path = os.path.join(const.REPOS_DIR, data['repo_name'])
if os.path.exists(repo_path):
logger.debug('Repo directory exists. Removing...')
shutil.rmtree(repo_path)
user_key = data.get('user_key', '')
if user_key:
self._create_key_file(data['repo_name'], user_key)
os.environ['GIT_SSH'] = self._get_ssh_cmd(data['repo_name'])
repo = Repo.clone_from(data['git_url'], repo_path)
instance = super(GitRepo, self).create(data)
instance.repo = repo
return instance
def git_clone(repo_url, ref='master'):
'''
:params repo_url - URL of git repo to clone
:params ref - branch, commit or reference in the repo to clone
Returns a path to the cloned repo
'''
if repo_url == '':
raise source_exceptions.GitLocationException(repo_url)
os.environ['GIT_TERMINAL_PROMPT'] = '0'
_tmp_dir = tempfile.mkdtemp(prefix='armada')
try:
repo = Repo.clone_from(repo_url, _tmp_dir)
repo.remotes.origin.fetch(ref)
g = Git(repo.working_dir)
g.checkout('FETCH_HEAD')
except Exception:
raise source_exceptions.GitLocationException(repo_url)
return _tmp_dir
def init_project(self):
project_dir_git = os.path.join(self._project_dir, 'repo')
if not self._config['repo']:
raise Exception('Empty repo configuration')
create_dir(project_dir_git)
git_path = os.path.join(project_dir_git, '.git')
if os.path.exists(git_path):
repo = Repo(project_dir_git)
print colored('Pulling', 'green') + ' repo %s' % self._config['repo']
repo.remotes.origin.pull()
else:
try:
print 'cloning... %s' % self._config['repo']
Repo.clone_from(self._config['repo'], project_dir_git)
except GitCommandError as e:
print 'Repo cannot be found'
def do_clone(self):
print "[+] Cloning '{}'".format(self['name'])
try:
if self['private']:
Repo.clone_from(self['address'], self.path(), env=dict(GIT_SSH_COMMAND=self['ssh_cmd']))
else:
Repo.clone_from(self['address'], self.path())
dispatcher.update_modules(self)
self.update_value('status', 'active')
except Exception, e:
self['status'] = 'error'
self['error_msg'] = 'Could not clone repository, probably due to authentication issues.\n{}'.format(e)
self.save()
internals = Internals.get(name="updates")
internals.update_value("last_update", time())
def clone_student_repo(student, dest=None):
"""Clones a student repo into a temporary directory
Arguments:
student (Student)
Returns:
git.Repo: repo at the cloned path
"""
logger.info('Cloning repo: %s', student.github)
if dest is None:
dest = tempfile.mkdtemp()
repo_url = student.repo_url
logger.debug('%s -> %s', repo_url, dest)
GitRepo.clone_from(repo_url, dest)
return GitRepo(dest)
def run(self, args):
github = self.prompt(args, 'github')
email = self.prompt(args, 'email')
github_password = self.prompt(args, 'password')
org = self.prompt(args, 'org')
meta_name = self.prompt(args, 'meta')
if not email.endswith('@usc.edu'):
print('You must use your USC email address')
sys.exit(1)
# save the user information to the config file
# save into ~/.csci/cscirc by default
try:
os.mkdir(path.join(path.expanduser('~'), '.csci'))
except OSError as e:
if e.errno != errno.EEXIST:
raise
config_path = path.join(path.expanduser('~'), '.csci', 'cscirc')
config = Config(config_path)
config.github_login = github
config.email = email
config.github_org = org
config.meta_name = meta_name
config.github = github_password
logger.debug('Writing config to %s', config_path)
config.save()
logger.info('Cloning meta repo into %s', config.meta_path)
GitRepo.clone_from(config.meta_remote, config.meta_path)
logger.info('Cloned meta repo')
def main():
parser = argparse.ArgumentParser(description='Find secrets hidden in the depths of git.')
parser.add_argument('-o', '--output-file', dest="output_file", help="Filename to write output to (defaults to stdout)")
parser.add_argument('git_url', type=str, help='URL for secret searching')
args = parser.parse_args()
project_path = tempfile.mkdtemp()
Repo.clone_from(args.git_url, project_path)
repo = Repo(project_path)
if args.output_file is not None:
with open(args.output_file, "w") as f:
utf8_output = codecs.getwriter('utf8')
utf8_file = utf8_output(f)
find_strings(repo, output_file=utf8_file)
else:
find_strings(repo)
shutil.rmtree(project_path, onerror=del_rw)
def clone_repo(self, local_directory_path=None):
"""
Clone the Git repository to the given file path.
:param local_directory_path: The file path to clone the repository to. If None, then use the default
directory path.
:return: The Git repository.
"""
if local_directory_path is None:
local_directory_path = self.local_directory_path
return Repo.clone_from(self.git_url, local_directory_path)
# Protected Methods
# Private Methods
# Properties
def download_plugin(plugin_id, destination):
"""
Will download the specified plugin to the specified destination if it is
found in the plugin repository.
:param plugin_id: The plugin ID to download.
:type plugin_id: string
:param destination: The destination to download the plugin on disk.
:type destination: string
"""
downloadable_plugins = get_downloadable_plugins()
if plugin_id not in downloadable_plugins:
log.error('Could not find plugin in repository: %s' %plugin_id)
return
if os.path.exists(destination): shutil.rmtree(destination)
Repo.clone_from(downloadable_plugins[plugin_id]['url'], destination)
log.info('%s plugin downloaded' %plugin_id)
def clone_repo(self, repo_dir):
Repo.clone_from(self._github_wiki_url, repo_dir)
def install_git(url=None,directory=None,name_modul=None):
directory = 'modules/{}'.format(name_modul)
if url == None:
return "your url is None"
if name_modul == None:
return "please input your modul"
try:
Repo.clone_from(url,directory)
message = {'message':'your modul had installed'}
except Exception:
message = {"message":"install failed"}
return message
def update(self):
if self.need_update():
print("[\033[92mINFO\033[0m] Found an updated version! cloning...")
Repo.clone_from(self.url, self.tmp_dir)
os.chdir(self.tmp_dir)
print("[\033[92mINFO\033[0m] Installing...")
if self.install(self.new_version):
os.chdir("..")
shutil.rmtree(self.tmp_dir)
return True
else:
print("[\033[92mINFO\033[0m] You've got already the last version :)")
shutil.rmtree(self.tmp_dir)
return False
def clone_repository(user, repo, path):
try:
if user:
Repo.clone_from('https://github.com/%s/%s.git' % (user, repo), path)
else:
# repo is the gist hash then
Repo.clone_from('https://gist.github.com/%s.git' % repo, path)
except:
print 'Error cloning \'%s\'' % repo
def main():
"""lets start our task"""
# clone the repo
cleanup(LOCAL_WORK_COPY)
try:
r = Repo.clone_from(git_url, LOCAL_WORK_COPY)
except GitCommandError as git_error:
print(git_error)
exit(-1)
d = feedparser.parse(
'https://github.com/mattermost/mattermost-server/releases.atom')
release_version = d.entries[0].title[1:]
# lets read the dockerfile of the current master
dfp = DockerfileParser()
with open('./mattermost-openshift-workdir/Dockerfile') as f:
dfp.content = f.read()
if 'MATTERMOST_VERSION' in dfp.envs:
dockerfile_version = dfp.envs['MATTERMOST_VERSION']
# Lets check if we got a new release
if semver.compare(release_version, dockerfile_version) == 1:
print("Updating from %s to %s" % (dockerfile_version, release_version))
target_branch = 'bots-life/update-to-' + release_version
if not pr_in_progress(target_branch):
patch_and_push(dfp, r, target_branch, release_version)
cleanup(LOCAL_WORK_COPY)
create_pr_to_master(target_branch)
else:
print("There is an open PR for %s, aborting..." %
(target_branch))
else:
print("we are even with Mattermost %s, no need to update" %
release_version)
def install(repo):
sheet_dir = get_sheet_path(repo)
if os.path.exists(sheet_dir):
raise CheatExtException(
"%s had been installed at %s" % (repo, sheet_dir))
github_url = get_github_url(repo)
Repo.clone_from(github_url, sheet_dir, branch="master")
print("%s is installed successfully" % repo)
def prepare(self):
self._workspace = tempfile.mkdtemp()
self.log("cloning {0}...".format(self.workspace))
self._repo = Repo.clone_from(self._target, self.workspace)
def git_clone(self, name, source):
"""
Clone the plugin from git repository.
:param name: Plugin name.
:param source: Source url.
:returns: ``True`` if successful.
:rtype: bool
"""
if git_available:
target_dir = join(self.path, "plugins", name)
temp_dir = join(self.path, "plugins", "temp_" + name)
try:
# Clone repository to temporary folder
repo = Repo.clone_from(source, temp_dir)
self.print("cloned '{}' from '{}'".format(name, source))
except:
return False
# Check if valid uj project.
try:
UjProject(temp_dir, self)
except InvalidUjProjectError:
return False
# Delete old version of project if exiting
rm(target_dir)
# Move temp dir to target location and clean.
move(temp_dir, target_dir)
rm(temp_dir)
return True
else:
return False
def setUpClass(cls):
# Deploy path ? clone ??
if osp.exists(cls.__repository_path):
shutil.rmtree(cls.__repository_path, True)
cls.__repo = Repo.clone_from(cls.__repo_url, cls.__repository_path, branch='master')
def download_addon(name):
plugin = cbpi.cache["plugins"].get(name)
plugin["loading"] = True
if plugin is None:
return ('', 404)
try:
Repo.clone_from(plugin.get("repo_url"), "./modules/plugins/%s/" % (name))
cbpi.notify("Download successful", "Plugin %s downloaded successfully" % name)
finally:
plugin["loading"] = False
return ('', 204)
def init(self, path=None):
'''Inicialize repo object or init/clone new Git repo'''
if os.path.isdir(self.path):
self._repo = Repo.init(path or self.path)
else:
if self._url:
# clone remote is there url
mkdir_p(self.path)
self._repo = Repo.clone_from(self._url,
self.path,
branch='master')
return self._repo
def _clone_repo(path, git_url):
"""
Use git to clone locally the neuron in a temp folder
:return:
"""
# clone the repo
logger.debug("[ResourcesManager] GIT clone into folder: %s" % path)
Utils.print_info("Cloning repository...")
# if the folder already exist we remove it
if os.path.exists(path):
shutil.rmtree(path)
else:
os.makedirs(path)
Repo.clone_from(git_url, path)
def _checkout_if_not_present(repo_url, repos_root):
"""Returns the existing repository if it exists,
otherwises puts a fresh clone in the right place
"""
repo_name = os.path.basename(urlparse(repo_url).path)
local_repo_path = os.path.join(repos_root, repo_name)
if os.path.isdir(local_repo_path):
return Repo(local_repo_path)
else:
printerr("Cloning new repo {repo} into {location}".format(repo=repo_name, location=local_repo_path))
return Repo.clone_from(repo_url, local_repo_path)
def clone_repo(self):
"""Clone a repo and ensures it isn't bare."""
Repo.clone_from(self.repo_url, self.repo_path)
self.repo = Repo(self.repo_path)
assert not self.repo.bare
def create_local_repo(remote_git,dir,branch):
""" Uses the git.clone_from method to clone a remote git repository locally
Args:
remote_git: Url of the remote git repository
dir: Local directory where the contents of the
remote git will be downloaded
branch: Name of the branch on the remote git which will be used for cloning
Returns:
git.repo: Reference to the local git repository
Raises:
git.exc.InvalidGitRepositoryError: If remote git repository is bare
git.exc.GitCommandError: If remote git repository does not exist
"""
if(os.path.exists(dir)):
shutil.rmtree(dir)
try:
repo = Repo.clone_from(
url=remote_git,
to_path=dir,
branch=branch
)
if repo.bare:
raise git.exc.InvalidGitRepositoryError
else:
return repo
except git.exc.GitCommandError:
print "Please make sure you have the correct access rights and the repository exists"
def create_local_repo(remote_git,dir,branch):
""" Uses the git.clone_from method to clone a remote git repository locally
Args:
remote_git: Url of the remote git repository
dir: Local directory where the contents of the
remote git will be downloaded
branch: Name of the branch on the remote git which will be used for cloning
Returns:
git.repo: Reference to the local git repository
Raises:
git.exc.InvalidGitRepositoryError: If remote git repository is bare
git.exc.GitCommandError: If remote git repository does not exist
"""
if(os.path.exists(dir)):
shutil.rmtree(dir)
try:
repo = Repo.clone_from(
url=remote_git,
to_path=dir,
branch=branch
)
if repo.bare:
raise git.exc.InvalidGitRepositoryError
else:
return repo
except git.exc.GitCommandError:
print "Please make sure you have the correct access rights and the repository exists"
def create_local_repo(remote_git,dir,branch):
""" Uses the git.clone_from method to clone a remote git repository locally
Args:
remote_git: Url of the remote git repository
dir: Local directory where the contents of the
remote git will be downloaded
branch: Name of the branch on the remote git which will be used for cloning
Returns:
git.repo: Reference to the local git repository
Raises:
git.exc.InvalidGitRepositoryError: If remote git repository is bare
git.exc.GitCommandError: If remote git repository does not exist
"""
if(os.path.exists(dir)):
shutil.rmtree(dir)
try:
repo = Repo.clone_from(
url=remote_git,
to_path=dir,
branch=branch
)
if repo.bare:
raise git.exc.InvalidGitRepositoryError
else:
return repo
except git.exc.GitCommandError:
print "Please make sure you have the correct access rights and the repository exists"
def prepareCaseSourceList(self, gitRepoUrl, envFile=None):
"""
"""
if envFile:
ssh_cmd ='ssh -i '+ envFile
else:
ssh_cmd = 'ssh -i conf/id_rsa'
logger.debug("try to clone %s", gitRepoUrl)
date = datetime.datetime.now().strftime("%Y%m%d-"+ time.tzname[1] + "-%H%M%S.%f")
caseDir = "TestCase-" + date
Repo.clone_from(gitRepoUrl, caseDir, branch='master', env={'GIT_SSH_COMMAND': ssh_cmd})
caseDirFullPath = os.path.join(os.getcwd(), caseDir)
logger.info("case dir is %s", caseDirFullPath)
return caseDirFullPath
def _check_repository(self):
"""
Check if repository exist and clone it if not.
Then fill _repo instance attribute et make a pull.
:return:
"""
if not os.path.exists("%s/.git" % self._repository_path):
Repo.clone_from(self.REPOSITORY_ADDRESS, self._repository_path)
self._repo = Repo(self._repository_path)
self._pull()
def download_dataset(dataset_name, dataset_url):
local_git = local_dataset_path(dataset_name)
if not os.path.exists(local_git):
repo = Repo.clone_from(dataset_url, local_git)
print "Downloaded"
else:
print "Repo exists, pulling latest"
repo = Repo(local_git)
git = repo.git
git.checkout("master")
repo.remotes[0].pull()
return repo
# returns a new session