def setupTableContextMenu( self, m, addMenu ):
    """Add the Git entries to the table context menu.

    The parent class entries are added first.  A 'Status' section (three
    diff entries and annotate) and a 'Git Actions' section (stage, unstage,
    revert, rename and delete) follow.

    m        - the menu being populated
    addMenu  - callable used to append one entry: menu, label, handler,
               enabler and an optional icon path
    """
    super().setupTableContextMenu( m, addMenu )

    act = self.ui_actions
    diff_icon = 'toolbar_images/diff.png'

    # --- status: diffs between HEAD, the staged state and the working copy
    m.addSection( T_('Status') )
    addMenu( m, T_('Diff HEAD vs. Working'),
             act.tableActionGitDiffHeadVsWorking, act.enablerGitDiffHeadVsWorking, diff_icon )
    addMenu( m, T_('Diff HEAD vs. Staged'),
             act.tableActionGitDiffHeadVsStaged, act.enablerGitDiffHeadVsStaged, diff_icon )
    addMenu( m, T_('Diff Staged vs. Working'),
             act.tableActionGitDiffStagedVsWorking, act.enablerGitDiffStagedVsWorking, diff_icon )

    m.addSeparator()
    addMenu( m, T_('Annotate'), act.tableActionGitAnnotate_Bg, act.enablerTableGitAnnotate )

    # --- actions that change the index or the working copy
    m.addSection( T_('Git Actions') )
    addMenu( m, T_('Stage'),
             act.tableActionGitStage_Bg, act.enablerGitFilesStage, 'toolbar_images/include.png' )
    addMenu( m, T_('Unstage'),
             act.tableActionGitUnstage_Bg, act.enablerGitFilesUnstage, 'toolbar_images/exclude.png' )
    addMenu( m, T_('Revert'),
             act.tableActionGitRevert_Bg, act.enablerGitFilesRevert, 'toolbar_images/revert.png' )

    m.addSeparator()
    table_view = self.main_window.table_view
    addMenu( m, T_('Rename…'), act.tableActionGitRename_Bg, table_view.enablerTableFilesExists )
    addMenu( m, T_('Delete…'), act.tableActionGitDelete_Bg, table_view.enablerTableFilesExists )
# Example source code for the Python class Git()
def clone(self):
    """Clone the repository if needed and re-root ``self.path`` inside it.

    The checkout directory name is taken from the last component of
    ``self.URL``.  Trailing slashes are ignored and only a trailing ``.git``
    suffix is removed, so a repository called ``my.repo`` keeps its full
    name.  The clone is performed in the current working directory and is
    skipped when an entry with that name already exists there.

    ``self.path`` is then updated:

    * ``None``  - resolution is delegated to ``self._find_entry_path()``;
    * otherwise - leading slashes are dropped (so the result can never be
      an absolute path) and the checkout directory is prefixed.  An empty
      string is accepted and yields the checkout directory itself.
    """
    name = self.URL.rstrip('/').split('/')[-1]
    if name.endswith('.git'):
        name = name[:-len('.git')]
    self.directory = name

    if self.directory not in os.listdir():
        git.Git().clone(self.URL)

    if self.path is None:
        self._find_entry_path()
    else:
        # lstrip also covers the empty string, which used to raise IndexError
        self.path = os.path.join(self.directory, self.path.lstrip('/'))
def download_beard(beard_name, upgrade):
    """Clone a beard into ``beard_cache/<beard_name>``, or update an existing one.

    :param beard_name: name passed to ``find_record`` to obtain the record
        holding the ``git_url`` to clone from.
    :param upgrade: when the cache directory already exists, pull from
        ``origin`` if true; otherwise leave the checkout untouched.

    The clone uses the ``Repo.clone_from`` classmethod directly, so it no
    longer requires the current working directory to be a git repository.
    If the clone fails, the freshly created cache directory is removed
    again so that a later call does not mistake it for an existing checkout.
    """
    import shutil

    beard_details = find_record(beard_name)
    repo_dir = join("beard_cache", beard_name)
    print("Attempting to clone from {}...".format(beard_details['git_url']))

    try:
        os.makedirs(repo_dir)
    except FileExistsError:
        repo = git.Repo(repo_dir)
        if upgrade:
            print("Updating repo")
            # Should be origin, since no-one should add another remote.
            repo.remotes.origin.pull()
            print("Done!")
        else:
            print("Repo already exists. Nothing to do.")
        return

    try:
        git.Repo.clone_from(beard_details['git_url'], repo_dir)
    except Exception:
        # Do not leave an empty/partial directory behind.
        shutil.rmtree(repo_dir, ignore_errors=True)
        raise
    print("Done!")
def git_clone(repo_url, ref='master'):
    '''
    Clone a git repository into a fresh temporary directory and check out
    the requested reference.

    :params repo_url - URL of git repo to clone
    :params ref - branch, commit or reference in the repo to clone

    Returns a path to the cloned repo

    Raises source_exceptions.GitLocationException when the URL is empty or
    when cloning, fetching or checking out fails.  On failure the temporary
    directory is removed, and the original error is attached as the cause.

    Note: sets GIT_TERMINAL_PROMPT=0 in os.environ for the whole process.
    '''
    import shutil

    if not repo_url:
        raise source_exceptions.GitLocationException(repo_url)

    os.environ['GIT_TERMINAL_PROMPT'] = '0'
    _tmp_dir = tempfile.mkdtemp(prefix='armada')

    try:
        repo = Repo.clone_from(repo_url, _tmp_dir)
        repo.remotes.origin.fetch(ref)
        g = Git(repo.working_dir)
        g.checkout('FETCH_HEAD')
    except Exception as exc:
        # The caller never receives the path, so nobody else can clean it up.
        shutil.rmtree(_tmp_dir, ignore_errors=True)
        raise source_exceptions.GitLocationException(repo_url) from exc

    return _tmp_dir
def getCloneURL(self, name):
    """Return the clone URL for repository ``name``.

    When ``self.clone_url`` is set the result comes from
    ``self.getCloneURLFromPattern``.  Otherwise the URL is
    ``<self.url>/<name>`` and is probed with ``git ls-remote`` under the
    environment produced by ``makeGitEnvironment(self)``; ``os.environ`` is
    restored afterwards.

    Raises RuntimeError carrying git's stderr text when the probe fails.
    """
    if self.clone_url is not None:
        return self.getCloneURLFromPattern(name)

    # Nothing stops 'name' from escaping the path specified by self.url, like '../../../foo'. I can't see a problem with allowing it other than that it's weird, and allowing normal subdirectory traversal could be useful, so not currently putting any restrictions on 'name'
    candidate = f"{self.url}/{name}"

    try:
        saved_env = dict(os.environ)
        os.environ.update(makeGitEnvironment(self))
        try:
            git.Git().ls_remote(candidate)
        finally:
            # Put the process environment back exactly as it was.
            os.environ.clear()
            os.environ.update(saved_env)
    except git.GitCommandError as e:
        message = e.stderr
        # Try to strip off the formatting GitCommandError puts on stderr
        unwrapped = re.search("stderr: '(.*)'$", message)
        raise RuntimeError(unwrapped.group(1) if unwrapped else message)

    return candidate
# Patch stashy's AuthenticationException to print the server's message (mostly for issue #16, detecting a captcha check)
def __init__(self, git_uri, window):
    """Create the widget for the repository at ``git_uri``.

    Stores the parent ``window``, wraps the URI in a ``Git`` helper, starts
    a ``WatchDog`` on the helper's directory whose "refresh" signal calls
    ``self._refresh``, loads the location UI resource, wires its signal
    handlers and finally builds the widgets.
    """
    self._window = window
    self._git = Git(git_uri)

    watchdog = WatchDog(self._git.dir)
    self._watchdog = watchdog
    watchdog.connect("refresh", self._refresh)

    builder = Gtk.Builder()
    self._builder = builder
    builder.add_from_resource('/com/nautilus/git/ui/location.ui')

    # Map UI signal names onto the methods that handle them.
    handlers = {
        "open_remote_clicked": self._open_remote_browser,
        "compare_commits_clicked": self._compare_commits,
        "popover_clicked": self._trigger_popover,
        "branch_clicked": self._update_branch,
    }
    builder.connect_signals(handlers)

    self._build_widgets()
def install(self, plugin):
    """Install a GitHub plugin by cloning it.

    Returns True after cloning, or False when ``self.isInstalled(plugin)``
    reports that the plugin is already present.
    """
    print("Installing...", plugin.name)
    import git

    if self.isInstalled(plugin):
        print("Plugin already installed!")
        return False

    # Shallow clone (depth=1) of the plugin URL into the install directory.
    git.Repo.clone_from(plugin.baseurl, self.install_dir, depth=1)
    print("Done!")
    return True
def main(argv=None):
    """Run githeat against the git repository in the current directory.

    Command-line arguments are parsed by ``_cmdline`` from ``argv``.
    Returns 0 both on success and when the current directory cannot be
    opened as a git repository (a hint is printed in the latter case).
    """
    options = _cmdline(argv)
    logger.start(options.logging_level)
    logger.info("executing githeat")

    try:
        repo_cmd = Git(os.getcwd())
    except (InvalidGitRepositoryError, GitCommandError, GitCommandNotFound):
        print("Are you sure you're in an initialized git directory?")
        return 0

    Githeat(repo_cmd, **vars(options)).run()
    logger.info("successful completion")
    return 0
# Make it executable.
def install_git_(url=None):
    """Clone ``url`` into the ``modules/`` directory.

    :param url: git URL of the module to install.
    :return: the string ``"your url is None"`` when no URL is given,
        otherwise a dict with a ``message`` key describing success or
        failure of the clone.

    The URL is validated before changing directory, and the previous
    working directory is always restored, so the process is never left
    inside ``modules/``.
    """
    if url is None:
        return "your url is None"

    previous_dir = os.getcwd()
    os.chdir('modules/')
    try:
        git.Git().clone(url)
        message = {'message':'your modul had installed'}
    except Exception:
        # Best effort: any clone failure is reported to the caller as text.
        message = {"message":"install failed"}
    finally:
        os.chdir(previous_dir)
    return message
def __init__(self, target_dir, url, branch, git_ssh_identity_file=None, force=False):
    """Open or clone the repository at ``target_dir`` and select ``branch``.

    * A missing or empty ``target_dir`` is cloned from ``url``.
    * An existing checkout is reused when its ``origin`` URL is on the same
      host as ``url`` (per ``GitRepository.is_same_host``).  Otherwise it is
      an error in developer mode, and outside developer mode the directory
      is deleted and cloned again.

    ``git_ssh_identity_file`` defaults to ``~/.ssh/id_rsa``.  A
    ``GitCommandError`` is reported through
    ``ColorPrint.exit_after_print_messages``.
    """
    super(GitRepository, self).__init__(target_dir)
    self.branch = branch

    try:
        if url is None:
            ColorPrint.exit_after_print_messages(message="GIT URL is empty")

        identity = git_ssh_identity_file
        if identity is None:
            identity = os.path.expanduser("~/.ssh/id_rsa")

        # NOTE(review): the environment is set on a throwaway Git() object;
        # confirm that it actually reaches the clone performed below.
        with git.Git().custom_environment(GIT_SSH=identity):
            if not os.path.exists(target_dir) or not os.listdir(target_dir):
                self.repo = git.Repo.clone_from(url=url, to_path=target_dir)
            else:
                self.repo = git.Repo(target_dir)
                current_origin = self.repo.remotes.origin.url
                if not GitRepository.is_same_host(current_origin, url):
                    if self.is_developer_mode():
                        ColorPrint.exit_after_print_messages(
                            message="This directory exists with not matching git repository " + target_dir)
                    else:
                        # Replace the foreign checkout with a fresh clone.
                        shutil.rmtree(target_dir, onerror=FileUtils.remove_readonly)
                        self.repo = git.Repo.clone_from(url=url, to_path=target_dir)
            self.set_branch(branch=branch, force=force)
    except git.GitCommandError as exc:
        ColorPrint.exit_after_print_messages(message=exc.stderr)
def createProject( self, project ):
    """Return a GitProject for ``project``, or None when it cannot be created.

    None is returned (after logging an error) when the configured git
    executable is not found on the PATH or when the project path is not a
    valid git repository.
    """
    git_program = shutil.which( git.Git.GIT_PYTHON_GIT_EXECUTABLE )
    if git_program is None:
        self.app.log.error( '"git" command line tool not found' )
        return None

    try:
        return wb_git_project.GitProject( self.app, project, self )

    except git.exc.InvalidGitRepositoryError as e:
        for line in ('Failed to add Git repo %r' % (project.path,),
                     'Git error: %s' % (e,)):
            self.log.error( line )
        return None
#------------------------------------------------------------
def addProjectPreInitWizardHandler( self, name, url, wc_path ):
    """Log and show status before a repository is initialised in ``wc_path``.

    ``url`` is accepted but not used here.
    """
    heading = 'Initialise Git repository in %s' % (wc_path,)
    self.log.infoheader( heading )

    status = T_('Clone %(project)s') % {'project': name}
    self.setStatusAction( status )
# runs on the background thread
def addProjectPreCloneWizardHandler( self, name, url, wc_path ):
    """Log and show status before ``url`` is cloned into ``wc_path``."""
    heading = T_('Cloning Git repository %(url)s into %(path)s') % {'url': url, 'path': wc_path}
    self.log.infoheader( heading )

    status = T_('Clone %(project)s') % {'project': name}
    self.setStatusAction( status )
# runs on the background thread
def setTopWindow( self, top_window ):
    """Finish Git set-up once the top window is known.

    Records the table columns shown for Git (staged, status, name, date),
    applies the git program from the preferences when one is configured,
    logs which executable will be used, then starts the callback server
    and registers the credentials handler.
    """
    super().setTopWindow( top_window )

    model = self.table_view.table_model
    self.all_visible_table_columns = (model.col_staged, model.col_status, model.col_name, model.col_date)

    git_prefs = self.app.prefs.git
    if git_prefs.program is not None:
        # Override the executable GitPython launches.
        git.Git.GIT_PYTHON_GIT_EXECUTABLE = str(git_prefs.program)

    resolved = shutil.which( git.Git.GIT_PYTHON_GIT_EXECUTABLE )
    self.log.info( 'Git using program %s' % (resolved,) )

    wb_git_project.initCallbackServer( self.app )
    wb_git_project.setCallbackCredentialsHandler( self.getGitCredentials )
def about( self ):
    """Return the version lines shown in the About box.

    The result is a two-element list: the GitPython version and either the
    git command line version or a note that git was not found.

    The git version is built by joining every component of
    ``version_info`` with dots, so it does not assume that git reports
    exactly three numeric components.
    """
    if shutil.which( git.Git.GIT_PYTHON_GIT_EXECUTABLE ) is None:
        git_ver = '"git" command line tool not found'

    else:
        version_text = '.'.join( str(part) for part in git.Git().version_info )
        git_ver = 'git %s' % (version_text,)

    return ['GitPython %s' % (git.__version__,)
           ,git_ver]
def setupToolBarAtLeft( self, addToolBar, addTool ):
    """Create the 'git logo' toolbar and give it a single 'Git' tool.

    The toolbar is recorded in ``self.all_toolbars``; the tool triggers the
    main window's project settings action.
    """
    logo_bar = addToolBar( T_('git logo'), style='font-size: 20pt; width: 40px; color: #cc0000' )
    self.all_toolbars.append( logo_bar )

    addTool( logo_bar, 'Git', self.main_window.projectActionSettings )
def lambda_handler(event, context):
    """Fetch a git repository and upload its top-level files to S3.

    Reads ``git_url`` and ``bucket_name`` from
    ``event["ResourceProperties"]``.  The repository is fetched into a
    freshly created ``tmp/git`` directory, every regular file directly
    inside it is uploaded under its own file name, and the outcome is
    reported through ``cfnresponse.send`` (SUCCESS once after all uploads,
    FAILED on any exception).

    Fixes relative to the previous version: the listing uses the
    ``DIR_NAME`` variable instead of the literal string 'DIR_NAME'; each
    file is opened by its full path and uploaded with its own content
    instead of an undefined, ever-growing buffer; sub-directories such as
    ``.git`` are skipped; the redundant extra clone into the current
    directory (which fails once its target exists) is gone; ``print`` is
    called as a function.
    """
    print("event.dump = " + json.dumps(event))
    responseData = {}

    try:
        properties = event["ResourceProperties"]
        REMOTE_URL = properties["git_url"]
        DIR_NAME = "tmp/git"

        # Always start from an empty checkout directory.
        if os.path.isdir(DIR_NAME):
            shutil.rmtree(DIR_NAME)
        os.makedirs(DIR_NAME)

        repo = git.Repo.init(DIR_NAME)
        origin = repo.create_remote('origin', REMOTE_URL)
        origin.fetch()
        origin.pull(origin.refs[0].remote_head)
        print("---- DONE ----")

        # Upload every regular file at the top level of the checkout.
        bucket = s3.Bucket(properties["bucket_name"])
        for filename in os.listdir(DIR_NAME):
            file_path = os.path.join(DIR_NAME, filename)
            if not os.path.isfile(file_path):
                continue
            with open(file_path, 'rb') as source:
                bucket.put_object(Key=filename, Body=source.read())

        cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData, ".zip pulled to S3 Bucket!")
    except Exception:
        cfnresponse.send(event, context, cfnresponse.FAILED, responseData, "Bucket Name and Key are all required.")
        print("ERROR")
def __git_init(self):
    """Create the git repository in the project infrastructure path.

    Clones ``self._git_repo`` into ``self._repository_directory`` when a
    source repository is configured; otherwise initialises an empty
    repository there.  Returns whatever the chosen call returns.
    """
    if not self._git_repo:
        return Repo.init(self._repository_directory)
    return Git().clone(self._git_repo, self._repository_directory)
def clone(self):
    """Clone the source repository and bring it to the state under test.

    A positive ``clone_depth`` gives a shallow clone of the source branch
    (all branches are kept when the context type is 'commit'); otherwise a
    full clone is made.  With a target set, the pull request is merged via
    ``self._merge_pull_request``.  Without one, ``self.commit_hash`` is
    checked out, unshallowing first if that commit is missing.  Finally,
    submodules are initialised when a ``.gitmodules`` file exists.
    """
    ctx = self.context
    destination = ctx.clone_path
    repo_full_name = ctx.source['repository']['full_name']

    if ctx.clone_depth > 0:
        # Use shallow clone to speed up
        options = {
            'depth': ctx.clone_depth,
            'branch': ctx.source['branch']['name'],
        }
        if ctx.type == 'commit':
            # ci retry on commit
            options['no_single_branch'] = True
        bitbucket.clone(repo_full_name, destination, **options)
    else:
        # Full clone for ci retry in single commit
        bitbucket.clone(repo_full_name, destination)

    gitcmd = git.Git(destination)
    if not ctx.target:
        # Push to branch or ci retry comment on some commit
        if not self.is_commit_exists(gitcmd, self.commit_hash):
            logger.info('Unshallowing a shallow cloned repository')
            logger.info('%s', gitcmd.fetch('--unshallow'))
        logger.info('Checkout commit %s', self.commit_hash)
        gitcmd.checkout(self.commit_hash)
    else:
        self._merge_pull_request(gitcmd)

    if os.path.exists(os.path.join(destination, '.gitmodules')):
        logger.info('%s', gitcmd.submodule('update', '--init', '--recursive'))
def get_conflicted_files(repo_path):
    """Return the output of ``git diff --name-only --diff-filter=U``.

    The command runs in ``repo_path``.  Returns None, after logging the
    exception, when git reports an error.
    """
    runner = git.Git(repo_path)
    try:
        listing = runner.diff('--name-only', '--diff-filter=U')
    except git.GitCommandError:
        logger.exception('Error get conflicted files by git diff command')
        return None
    return listing
def clone_repository(self, full_name, path, **kwargs):
    """Clone the repository ``full_name`` into ``path``.

    The URL comes from ``self.get_git_url``; extra keyword arguments are
    forwarded to the ``git clone`` call.  Returns that call's result.
    """
    return git.Git().clone(self.get_git_url(full_name), path, **kwargs)
def clone_repository(self, full_name, path, **kwargs):
    """Run ``git clone`` for ``full_name`` with ``path`` as the destination.

    ``self.get_git_url`` supplies the remote URL and ``kwargs`` are passed
    through as clone options.  The command's result is returned.
    """
    remote = self.get_git_url(full_name)
    cloner = git.Git()
    return cloner.clone(remote, path, **kwargs)
def load_package_by_file(self):
    """Load the score package from a local zip archive.

    The archive is looked up in the repository path as
    ``<valid file name of the score package>.zip``.  When it exists it is
    extracted into the package path (created if necessary) and loading
    continues with ``self.load_package_by_local_repository()``.

    :return: the result of ``load_package_by_local_repository()``, or
        False when no archive file exists.
    """
    # The package name is passed through get_valid_filename before being
    # used as a file name.
    score_package_file = get_valid_filename(self.__score_package)
    logging.debug(self.__repository_path)
    package_file = osp.join(self.__repository_path, score_package_file+".zip")
    logging.debug('load_package_by_file '+str(package_file))

    if not osp.isfile(package_file):
        return False

    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(package_file, 'r') as package_zip:
        logging.debug("load local package file and package file exist")
        # Make sure the destination directory exists before extracting.
        os.makedirs(self.__package_path, exist_ok=True)
        package_zip.extractall(self.__package_path)

    # Continue loading from the extracted files.
    return self.load_package_by_local_repository()
def load_package_by_remote(self):
    """Clone the score package from the remote repository.

    The remote URL is ``<score base>:<score package>.git`` and the clone
    goes into the package path.  When the deploy key configured in
    ``conf.DEFAULT_SCORE_REPOSITORY_KEY`` exists, its group/other read
    permission is removed if set and git is told to use it through
    ``GIT_SSH_COMMAND``.  If the configured branch differs from the master
    branch, that branch is checked out after cloning.

    The SSH command is installed with ``update_environment`` so that it
    actually applies to the clone; ``custom_environment`` is a context
    manager and has no effect unless entered with ``with``.

    :return: True (errors propagate as exceptions)
    """
    # score package clone from remote repository
    repository_url = self.__score_base + ':' + self.__score_package + '.git'

    logging.debug("git Path :"+self.__package_path)
    # Named git_cmd so that the ``git`` module name is not shadowed.
    git_cmd = Git(os.getcwd())

    # check deploy key
    key_path = conf.DEFAULT_SCORE_REPOSITORY_KEY
    if os.path.exists(key_path):
        st = os.stat(key_path)
        # owner read only
        if st.st_mode & (stat.S_IRGRP | stat.S_IROTH):
            os.chmod(key_path, 0o600)
        # NOTE(review): host key checking is disabled here - confirm this is intended.
        ssh_cmd = 'ssh -o StrictHostKeyChecking=no -i '+key_path
        logging.debug("SSH KEY COMMAND : "+ssh_cmd)
        git_cmd.update_environment(GIT_SSH_COMMAND=ssh_cmd)

    logging.debug(f"load_package_by_remote repository_url({repository_url}) package_path({self.__package_path})")
    self.__package_repository = Repo._clone(git_cmd, repository_url, self.__package_path, GitCmdObjectDB, None)
    logging.debug(f"load_package_by_remote result({self.__package_repository})")

    if conf.DEFAULT_SCORE_BRANCH != conf.DEFAULT_SCORE_BRANCH_MASTER:
        self.__package_repository.git.checkout(conf.DEFAULT_SCORE_BRANCH)

    return True
def checkout_tag(self,name):
    """Hard-reset the repository in './', fetch origin and check out ``name``.

    Sends a notification asking for a restart and returns an empty body
    with status 204.
    """
    working_copy = Repo('./')
    working_copy.git.reset('--hard')
    working_copy.remotes.origin.fetch()

    Git('./').checkout(name)

    cbpi.notify("Checkout successful", "Please restart the system")
    return ('', 204)
def get_add_date(git_path, filename):
    """Return the parsed date of the first commit that touched ``filename``.

    Runs ``git log --follow --format=%aD --reverse`` in ``git_path`` and
    parses the first output line.  Returns None (after a debug log entry)
    if anything goes wrong.
    """
    try:
        gitobj = git.Git(git_path)
        history = gitobj.log("--follow", "--format=%aD", "--reverse", filename)
        add_date_string = history.splitlines()[0]
        # Drop the command object straight away and force a collection.
        del gitobj
        gc.collect()
        logging.info(filename + " was added on " + add_date_string)
        return dateutil.parser.parse(add_date_string)
    except Exception as e:
        logging.debug("Exception during git log for " + filename + ":\n" + (str(e)))
    return None
def hackish_pull_list_changed_files(git_path):
    """Run ``git pull`` in ``git_path`` and return the files it mentions.

    The repository must be neither on a detached HEAD nor dirty; otherwise
    an Exception is raised before pulling.  File names are scraped from the
    ``<name> | ...`` lines of the pull output; names joined by ``=>`` are
    split into separate entries.  The result is a list of unique names.
    """
    repo = git.Repo(git_path)
    head = repo.head
    logging.info(" HEAD: " + str(head.commit))
    logging.info(" is detached: " + str(head.is_detached))
    logging.info(" is dirty: " + str(repo.is_dirty()))
    if repo.head.is_detached:
        raise Exception("Detached head")
    if repo.is_dirty():
        raise Exception("Dirty repository")
    del head
    del repo
    gc.collect()

    changed = set()
    puller = git.Git(git_path)
    output_lines = puller.pull().splitlines()
    del puller
    gc.collect()

    for output_line in output_lines:
        stat_entry = re.search(r'^ (.+) \| .*$', output_line)
        if stat_entry is None:
            continue
        for piece in stat_entry.group(1).split('=>'):
            changed.add(piece.strip())
    return list(changed)
def download_yara_rules_git():
    """Clone the Yara-Rules repository using a default ``git.Git()`` object."""
    rules_url = "https://github.com/Yara-Rules/rules"
    git.Git().clone(rules_url)
def git_clone_shallow(git_url, dst_path):
    """Shallow-clone ``git_url`` into ``dst_path`` and return its ``Repo``.

    The clone uses depth 1 with ``--no-single-branch``; tags are fetched
    afterwards.
    """
    cloner = git.Git()
    cloner.clone(git_url, dst_path, '--no-single-branch', depth=1)

    repo = git.Repo(dst_path)
    repo.git.fetch(tags=True)
    return repo
def __init__(self, git_uri):
    """Create the page for the repository at ``git_uri``.

    Wraps the URI in a ``Git`` helper, starts a ``WatchDog`` on the
    helper's directory whose "refresh" signal calls ``self._refresh``,
    loads the page UI resource and builds the widgets.
    """
    self._git = Git(git_uri)

    watchdog = WatchDog(self._git.dir)
    self._watchdog = watchdog
    watchdog.connect("refresh", self._refresh)

    builder = Gtk.Builder()
    self._builder = builder
    builder.add_from_resource('/com/nautilus/git/ui/page.ui')

    self._build_widgets()