def init_account(self):
"""Setup a new GitHub account."""
ghuser = self.api.me()
# Setup local access tokens to be used by the webhooks
hook_token = ProviderToken.create_personal(
'github-webhook',
self.user_id,
scopes=['webhooks:event'],
is_internal=True,
)
# Initial structure of extra data
self.account.extra_data = dict(
id=ghuser.id,
login=ghuser.login,
name=ghuser.name,
tokens=dict(
webhook=hook_token.id,
),
repos=dict(),
last_sync=iso_utcnow(),
)
db.session.add(self.account)
# Sync data from GitHub, but don't check repository hooks yet.
self.sync(hooks=False)
python类GitHub()的实例源码
def generate_token():
ui.info_1("Creating new GitHub token")
username = ui.ask_string("Please enter you GitHub username")
password = getpass.getpass("Password: ")
scopes = ['repo']
# Need a different note for each device, otherwise
# gh_api.authorize() will fail
note = "tsrc-" + str(uuid.uuid4())
note_url = "https://supertanker.github.io/tsrc"
def ask_2fa():
return ui.ask_string("2FA code: ")
authorization = github3.authorize(username, password, scopes,
note=note, note_url=note_url,
two_factor_callback=ask_2fa)
return authorization.token
def __init__(self, owner, repo, pr, token, url=None):
self.github = None
# TODO: support non-PR runs
try:
pr = int(pr)
except (ValueError, TypeError):
return
if not url or url == 'https://github.com':
self.github = github3.GitHub(token=token)
else:
self.github = github3.GitHubEnterprise(url, token=token)
self.owner = owner
self.repo = repo
self.pr = pr
self.pull_request = self.github.pull_request(owner, repo, pr)
self.commits = self.pr_commits(self.pull_request)
self.last_sha = self.commits[-1].sha
self.first_sha = self.commits[0].sha
self.parent_sha = git.parent_sha(self.first_sha)
self.diff = git.diff(self.parent_sha, self.last_sha)
def connect(self):
if self.fqdn != GITHUB_COM_FQDN:
# upgrade self.gh from a GitHub object to a GitHubEnterprise object
gh = github3.GitHubEnterprise(RepositoryService.build_url(self))
self.gh._session.base_url = gh._session.base_url
gh._session = self.gh._session
self.gh = gh
# propagate ssl certificate parameter
self.gh._session.verify = self.session_certificate or not self.session_insecure
try:
self.gh.login(token=self._privatekey)
self.username = self.gh.user().login
except github3.models.GitHubError as err:
if 401 == err.code:
if not self._privatekey:
raise ConnectionError('Could not connect to Github. '
'Please configure .gitconfig '
'with your github private key.') from err
else:
raise ConnectionError('Could not connect to Github. '
'Check your configuration and try again.') from err
def get_auth_token(cls, login, password, prompt=None):
import platform
if cls.fqdn != GITHUB_COM_FQDN:
gh = github3.GitHubEnterprise()
else:
gh = github3.GitHub()
gh.login(login, password, two_factor_callback=lambda: prompt('2FA code> '))
try:
auth = gh.authorize(login, password,
scopes=[ 'repo', 'delete_repo', 'gist' ],
note='git-repo2 token used on {}'.format(platform.node()),
note_url='https://github.com/guyzmo/git-repo')
return auth.token
except github3.models.GitHubError as err:
if len(err.args) > 0 and 422 == err.args[0].status_code:
raise ResourceExistsError("A token already exist for this machine on your github account.")
else:
raise err
def github_org(sources, env_vars):
cmd_gral_part = cmd_composer(env_vars, 'github')
git_repositories = {'repositories':[]}
gh = github3.GitHub()
for organization in sources['repositories']:
if gh.organization(organization) is not None:
gh_organization = github3.organization(organization)
for repo in gh_organization.iter_repos():
repo_url = 'https://github.com/' + organization + '/' + repo.name + '.git'
git_repositories['repositories'].append(repo_url)
git(git_repositories, env_vars)
for repo in gh_organization.iter_repos():
cmd_github_part = ' ' + organization + ' ' + repo.name + ' -t ' + sources['token']
cmd = cmd_gral_part + cmd_github_part
os.system(cmd)
else:
pass
def get_commits(self, sha=None, path=None, author=None, number=-1, etag=None, since=None, until=None):
"""
Return a list of all commits in a repository
:params:
Parameters:
sha (str) – (optional), sha or branch to start listing commits from
path (str) – (optional), commits containing this path will be listed
author (str) – (optional), GitHub login, real name, or email to filter commits by (using commit author)
number (int) – (optional), number of commits to return. Default: -1 returns all commits
etag (str) – (optional), ETag from a previous request to the same endpoint
since (datetime or string) – (optional), Only commits after this date will be returned. This can be a datetime or an ISO8601 formatted date string.
until (datetime or string) – (optional), Only commits before this date will be returned. This can be a datetime or an ISO8601 formatted date string.
:return: a list of Commit
"""
# TODO: Should investigate proper use of GitHubIterator to help ratelimiting: https://github3py.readthedocs.io/en/master/examples/iterators.html
list_commits = []
for c in self.repo.iter_commits(sha, path, author, number, etag, since, until):
list_commits.append(c)
logging.info('Retrieved ' + str(len(list_commits)) + ' commits from repository with Username: ' + self.user_name + " / Repo: " + self.repo_name + "...")
return list_commits
def feeds(self):
"""List GitHub's timeline resources in Atom format.
:returns: dictionary parsed to include URITemplates
"""
url = self._build_url('feeds')
json = self._json(self._get(url), 200)
del json['ETag']
del json['Last-Modified']
urls = [
'timeline_url', 'user_url', 'current_user_public_url',
'current_user_url', 'current_user_actor_url',
'current_user_organization_url',
]
for url in urls:
json[url] = URITemplate(json[url])
links = json.get('_links', {})
for d in links.values():
d['href'] = URITemplate(d['href'])
return json
def login(self, username=None, password=None, token=None,
two_factor_callback=None):
"""Logs the user into GitHub for protected API calls.
:param str username: login name
:param str password: password for the login
:param str token: OAuth token
:param func two_factor_callback: (optional), function you implement to
provide the Two Factor Authentication code to GitHub when necessary
"""
if username and password:
self._session.basic_auth(username, password)
elif token:
self._session.token_auth(token)
# The Session method handles None for free.
self._session.two_factor_auth_callback(two_factor_callback)
def rate_limit(self):
"""Returns a dictionary with information from /rate_limit.
The dictionary has two keys: ``resources`` and ``rate``. In
``resources`` you can access information about ``core`` or ``search``.
Note: the ``rate`` key will be deprecated before version 3 of the
GitHub API is finalized. Do not rely on that key. Instead, make your
code future-proof by using ``core`` in ``resources``, e.g.,
::
rates = g.rate_limit()
rates['resources']['core'] # => your normal ratelimit info
rates['resources']['search'] # => your search ratelimit info
.. versionadded:: 0.8
:returns: dict
"""
url = self._build_url('rate_limit')
return self._json(self._get(url), 200)
def github_repo_configure(ctx, with_maintainers_file=False):
"""Configure GitHub repositories."""
conf = ctx.obj['config']
gh = ctx.obj['client']
for repo in conf.repositories:
click.echo('Configuring {}'.format(repo.slug))
repoapi = RepositoryAPI(gh, conf=repo)
if repoapi.update_settings():
click.echo('Updated settings')
if repoapi.update_team():
click.echo('Updated maintainer team')
if repoapi.update_branch_protection():
click.echo('Updated branch protection')
if with_maintainers_file:
click.echo('Checking MAINTAINERS file')
if repoapi.update_maintainers_file():
click.echo('Updated MAINTAINERS file')
# TODO prevent merge commits
def protect(self, required_status_checks=None,
required_pull_request_reviews=None,
dismissal_restrictions=None,
restrictions=None, enforce_admins=None):
"""Enable branch protection (with all features)."""
data = {
'required_status_checks': required_status_checks,
'required_pull_request_reviews': required_pull_request_reviews,
'dismissal_restrictions': dismissal_restrictions,
'restrictions': restrictions,
'enforce_admins': enforce_admins,
}
url = self._build_url('protection', base_url=self._api)
return self._json(
self._put(url, data=dumps(data), headers=self.PREVIEW_HEADERS),
200)
#
# Wrapper classes for GitHub API.
#
def deconstruct_github_url(url):
"""
Helper function for deconstructing GitHub repository URL and returning its
owner and name
:param url: GitHub repository URL
:type url: str
:returns: repository owner and name
:rtype: tuple
"""
if not url.startswith(GITHUB_PREFIX):
raise ValueError(
"Passed URL is not a GitHub repository: {}".format(url)
)
owner, repo = remove_prefix(url, GITHUB_PREFIX).split('/')[:2]
return owner, repo
def _github_get_repository(self, conf: dict):
"""Create repository object according to configuration. """
try:
import github3
except ImportError:
raise common.InputError(self, "github3 module not found")
github = None
if conf.get("github_user") and conf.get("github_token"):
try:
github = github3.login(username=conf.get("github_user"),
token=conf.get("github_token"))
except Exception as err:
raise common.InputError(self, "Github auth error: " + str(err))
if not github:
github = github3.GitHub()
repository = github.repository(conf["owner"], conf["repository"])
return repository
def all_users(self, number=-1, etag=None, per_page=None, since=None):
"""Iterate over every user in the order they signed up for GitHub.
.. versionchanged:: 1.0.0
Inserted the ``since`` parameter after the ``number`` parameter.
:param int number: (optional), number of users to return. Default: -1,
returns all of them
:param int since: (optional), ID of the last user that you've seen.
:param str etag: (optional), ETag from a previous request to the same
endpoint
:param int per_page: (optional), number of users to list per request
:returns: generator of :class:`User <github3.users.User>`
"""
url = self._build_url('users')
return self._iter(int(number), url, users.User, etag=etag,
params={'per_page': per_page, 'since': since})
def login(self, username=None, password=None, token=None,
two_factor_callback=None):
"""Logs the user into GitHub for protected API calls.
:param str username: login name
:param str password: password for the login
:param str token: OAuth token
:param func two_factor_callback: (optional), function you implement to
provide the Two Factor Authentication code to GitHub when necessary
"""
if username and password:
self.session.basic_auth(username, password)
elif token:
self.session.token_auth(token)
# The Session method handles None for free.
self.session.two_factor_auth_callback(two_factor_callback)
def rate_limit(self):
"""Returns a dictionary with information from /rate_limit.
The dictionary has two keys: ``resources`` and ``rate``. In
``resources`` you can access information about ``core`` or ``search``.
Note: the ``rate`` key will be deprecated before version 3 of the
GitHub API is finalized. Do not rely on that key. Instead, make your
code future-proof by using ``core`` in ``resources``, e.g.,
::
rates = g.rate_limit()
rates['resources']['core'] # => your normal ratelimit info
rates['resources']['search'] # => your search ratelimit info
.. versionadded:: 0.8
:returns: dict
"""
url = self._build_url('rate_limit')
return self._json(self._get(url), 200)
def project_info(bot, msg):
"""Show GitHub project information."""
user, repo_name = msg.match.groups()
github = github3.GitHub()
repo = github.repository(user, repo_name)
if repo is not None:
msg.respond(
'\x0303{user}/{repo}\x03 | \x0308{stars} stars\x03 | \x0314{description}\x03'.format(
user=user,
repo=repo_name,
stars=repo.stargazers,
description=repo.description,
),
ping=False,
)
def issue_info(bot, msg):
"""Show GitHub project issue information."""
user, repo_name, issue_num = msg.match.groups()
github = github3.GitHub()
repo = github.repository(user, repo_name)
if repo is not None:
issue = repo.issue(int(issue_num))
if issue is not None:
msg.respond(
'\x0314Issue #{num}: {title}\x03 (\x0308{state}\x03, filed by \x0302{user}\x03)'.format(
num=issue_num,
title=issue.title,
state=issue.state,
user=issue.user.login,
),
ping=False,
)
def pr_info(bot, msg):
"""Show GitHub project pull request information."""
user, repo_name, pr_num = msg.match.groups()
github = github3.GitHub()
repo = github.repository(user, repo_name)
if repo is not None:
pr = repo.pull_request(int(pr_num))
if pr is not None:
msg.respond(
'\x0314PR #{num}: {title}\x03 (\x0308{state}\x03, submitted by \x0302{user}\x03)'.format(
num=pr_num,
title=pr.title,
state=pr.state,
user=pr.user.login,
),
ping=False,
)
def __init__(self, user_id=None):
"""Create a GitHub API object."""
self.user_id = user_id
def api(self):
"""Return an authenticated GitHub API."""
return github3.login(token=self.access_token)
def webhook_url(self):
"""Return the url to be used by a GitHub webhook."""
webhook_token = ProviderToken.query.filter_by(
id=self.account.extra_data['tokens']['webhook']
).first()
if webhook_token:
wh_url = current_app.config.get('GITHUB_WEBHOOK_RECEIVER_URL')
if wh_url:
return wh_url.format(token=webhook_token.access_token)
else:
raise RuntimeError('You must set GITHUB_WEBHOOK_RECEIVER_URL.')
def sync(self, hooks=True, async_hooks=True):
"""Synchronize user repositories.
:param bool hooks: True for syncing hooks.
:param bool async_hooks: True for sending of an asynchronous task to
sync hooks.
.. note::
Syncing happens from GitHub's direction only. This means that we
consider the information on GitHub as valid, and we overwrite our
own state based on this information.
"""
active_repos = {}
github_repos = {repo.id: repo for repo in self.api.repositories()
if repo.permissions['admin']}
for gh_repo_id, gh_repo in github_repos.items():
active_repos[gh_repo_id] = {
'id': gh_repo_id,
'full_name': gh_repo.full_name,
'description': gh_repo.description,
}
if hooks:
self._sync_hooks(list(active_repos.keys()),
asynchronous=async_hooks)
# Remove ownership from repositories that the user has no longer
# 'admin' permissions, or have been deleted.
Repository.query.filter(
Repository.user_id == self.user_id,
~Repository.github_id.in_(github_repos.keys())
).update(dict(user_id=None, hook=None), synchronize_session=False)
# Update repos and last sync
self.account.extra_data.update(dict(
repos=active_repos,
last_sync=iso_utcnow(),
))
self.account.extra_data.changed()
db.session.add(self.account)
def create_hook(self, repo_id, repo_name):
"""Create repository hook."""
config = dict(
url=self.webhook_url,
content_type='json',
secret=current_app.config['GITHUB_SHARED_SECRET'],
insecure_ssl='1' if current_app.config['GITHUB_INSECURE_SSL']
else '0',
)
ghrepo = self.api.repository_with_id(repo_id)
if ghrepo:
try:
hook = ghrepo.create_hook(
'web', # GitHub identifier for webhook service
config,
events=['release'],
)
except github3.GitHubError as e:
# Check if hook is already installed
hook_errors = (m for m in e.errors
if m['code'] == 'custom' and
m['resource'] == 'Hook')
if next(hook_errors, None):
hooks = (h for h in ghrepo.hooks()
if h.config.get('url', '') == config['url'])
hook = next(hooks, None)
if hook:
hook.edit(config=config, events=['release'])
finally:
if hook:
Repository.enable(user_id=self.user_id,
github_id=repo_id,
name=repo_name,
hook=hook.id)
return True
return False
def _dev_api(cls):
"""Get a developer instance for GitHub API access."""
gh = github3.GitHub()
gh.set_client_id(cls.remote.consumer_key, cls.remote.consumer_secret)
return gh
def author(self):
"""Extract the author's GitHub username from a release."""
return self.release.get('author', {}).get('login')
def files(self):
"""Extract files to download from GitHub payload."""
tag_name = self.release['tag_name']
repo_name = self.repository['full_name']
zipball_url = self.release['zipball_url']
filename = u'{name}-{tag}.zip'.format(name=repo_name, tag=tag_name)
response = self.gh.api.session.head(zipball_url)
assert response.status_code == 302, \
u'Could not retrieve archive from GitHub: {0}'.format(zipball_url)
yield filename, zipball_url
def publish(self):
"""Publish GitHub release as record."""
with db.session.begin_nested():
deposit = self.deposit_class.create(self.metadata)
deposit['_deposit']['created_by'] = self.event.user_id
deposit['_deposit']['owners'] = [self.event.user_id]
# Fetch the deposit files
for key, url in self.files:
deposit.files[key] = self.gh.api.session.get(
url, stream=True).raw
deposit.publish()
recid, record = deposit.fetch_published()
self.model.recordmetadata = record.model
def _get_files(owner, repo, sha, tokens):
"""Get repo file paths
"""
# TODO: use other tokens if first fails
github_api = GitHub(token=tokens[0])
repo_api = github_api.repository(owner, repo)
# First attempt - use GitHub Tree API
files = _get_files_tree_api(repo_api, sha)
if files is None:
# Tree is trancated - use GitHub Contents API
files = _get_files_contents_api(repo_api, sha)
log.debug(
'Remaining GitHub API calls: %s',
github_api.rate_limit()['rate']['remaining'])
return files