def list(self, pattern):
    """Yield a ``Repo`` for each repository of an owner whose name matches.

    ``pattern`` is matched with ``self._EXTRACT_PATTERN``: its ``org`` group
    names the owner and its ``repo`` group is an ``fnmatch`` glob that
    defaults to ``*`` when absent. The owner is looked up as an organization
    first; if that raises ``github.GithubException`` it is looked up as a
    user instead.

    This is a generator, so nothing (including the errors below) happens
    until it is iterated.

    Raises:
        URIException: ``pattern`` does not match ``self._EXTRACT_PATTERN``.
        RuntimeError: a matching repository is reached while
            ``self._clone_protocol`` is neither ``ssh`` nor ``https``.
    """
    parsed = self._EXTRACT_PATTERN.match(pattern)
    if not parsed:
        raise URIException(f"Unable to match {pattern},"
                           " please use 'organization[/repo_pattern]'")
    owner = parsed.group("org")
    name_glob = parsed.group("repo") or "*"
    try:
        candidates = self._gh.get_organization(owner).get_repos()
    except github.GithubException:
        # Not an organization (or not reachable as one): try a user account.
        candidates = self._gh.get_user(owner).get_repos()
    for candidate in candidates:
        if fnmatch.fnmatch(candidate.name, name_glob):
            if self._clone_protocol == self.CloneProtocol.ssh:
                clone_url = candidate.ssh_url
            elif self._clone_protocol == self.CloneProtocol.https:
                clone_url = candidate.clone_url
            else:
                raise RuntimeError(f"Invalid protocol selected: {self._clone_protocol}")
            yield Repo(name=candidate.name, url=clone_url)
# Example source code for the Python class GithubException()
def run_add_system(name, token, org, system, prompt):
    """
    Adds a new system to the repo.

    The system is created as a label (name stripped of surrounding
    whitespace, color ``SYSTEM_LABEL_COLOR``) on the repository returned by
    ``get_repo``. When ``prompt`` is truthy and the user confirms,
    ``run_update`` is invoked afterwards.

    A ``GithubException`` with status 422 from the label creation is reported
    as "already exists" and swallowed; any other status is re-raised.
    """
    repo = get_repo(token=token, org=org, name=name)
    try:
        # Keep only the label creation inside the try: a 422 raised later by
        # run_update must not be misreported as "system already exists".
        repo.create_label(name=system.strip(), color=SYSTEM_LABEL_COLOR)
    except GithubException as e:
        if e.status != 422:
            raise
        click.secho(
            "Unable to add new system {}, it already exists.".format(system), fg="yellow")
        return
    click.secho("Successfully added new system {}".format(system), fg="green")
    if prompt and click.confirm("Run update to re-generate the page?"):
        run_update(name=name, token=token, org=org)
def test_issue_12(self):
    """Check the text ``gpr._format_github_exception`` builds for a 422 error.

    The exception carries a validation payload with one error entry and a
    documentation URL; both must show up in the formatted message.
    """
    payload = {
        u'documentation_url':
            u'https://developer.github.com/v3/pulls/#create-a-pull-request',
        u'message':
            u'Validation Failed',
        u'errors': [{
            u'message': u'No commits between issues-221 and issues-221',
            u'code': u'custom',
            u'resource': u'PullRequest',
        }],
    }
    error = github.GithubException(422, payload)
    expected = (
        "Unable to create pull request: Validation Failed (422)\n"
        "No commits between issues-221 and issues-221\n"
        "Check "
        "https://developer.github.com/v3/pulls/#create-a-pull-request "
        "for more information."
    )
    self.assertEqual(
        expected,
        gpr._format_github_exception("create pull request", error))
def get_repository():
    """Return the GitHub repo named in settings, creating it if it is missing.

    The owner is the organization ``app_settings.GITHUB_ORG`` when that
    setting is truthy, otherwise the authenticated user. If any
    ``GithubException`` other than the handled "unknown repository" lookup
    failure occurs, it is logged and ``None`` is returned.
    """
    try:
        client = Github(**get_github_credentials())
        org_name = app_settings.GITHUB_ORG
        owner = client.get_organization(org_name) if org_name else client.get_user()
        try:
            return owner.get_repo(app_settings.GITHUB_REPO)
        except UnknownObjectException:
            # The repository does not exist yet: create it under the owner.
            logging.info("Creating repository {}".format(
                app_settings.GITHUB_REPO
            ))
            return owner.create_repo(app_settings.GITHUB_REPO)
    except GithubException:
        logging.exception("Unable to configure Github connection.")
    return None
def commit_script(path, script):
    """Commit script string to GitHub repo.

    If ``path`` can be looked up in the repository returned by
    ``get_repository()``, the file is updated in place using its current
    blob sha; if the lookup raises ``GithubException`` the file is created
    instead and the initial commit is logged.

    Raises:
        GithubException: propagated from ``update_file`` or ``create_file``.
    """
    repository = get_repository()
    try:
        # Only the lookup decides between "update" and "create". A failure of
        # the update itself must surface instead of being retried as a create
        # and logged as an initial commit.
        current_sha = repository.get_contents(path).sha
    except GithubException:
        repository.create_file(
            path=path,
            message='Template initial commit',
            content=script
        )
        logging.info("Initial commit of new file {}".format(path))
    else:
        repository.update_file(
            path=path,
            message='Template update',
            content=script,
            sha=current_sha
        )
def testInvalidInput(self):
    """Creating a key from invalid key material must raise a 422 error.

    The raised ``GithubException`` has to carry the expected validation
    payload, and the test fails if no exception is raised at all.
    """
    expected_payload = {
        "errors": [
            {
                "code": "custom",
                "field": "key",
                "message": "key is invalid. It must begin with 'ssh-rsa' or 'ssh-dss'. Check that you're copying the public half of the key",
                "resource": "PublicKey"
            }
        ],
        "message": "Validation Failed"
    }
    got_exception = False
    try:
        self.g.get_user().create_key("Bad key", "xxx")
    except github.GithubException as exception:
        got_exception = True
        self.assertEqual(exception.status, 422)
        self.assertEqual(exception.data, expected_payload)
    self.assertTrue(got_exception)
def handle_exceptions():
    """Return a decorator mapping handler exceptions to HTTP responses.

    The decorated coroutine is awaited with its positional arguments. Its
    result is returned unchanged unless it raises one of:

    * ``KeyError`` -> status 400 with the missing key in the body,
    * ``TriggearTimeoutError`` -> status 504 with the error text,
    * ``GithubException`` -> the exception's own status, with its ``data``
      rendered as the response reason.
    """
    def wrapper(func):
        @functools.wraps(func)
        async def wrapped(*args: Tuple[Any]) -> Response:
            try:
                result = await func(*args)
            except KeyError as absent_key:
                logging.warning(f"Error in {func}: {absent_key} key is missing")
                body = f'Error: {absent_key} is missing in request'
                return aiohttp.web.Response(text=body, status=400)
            except TriggearTimeoutError as timeout:
                logging.exception('Timeout error raised')
                detail = str(timeout)
                return aiohttp.web.Response(
                    text=detail,
                    reason=f'Timeout when accessing resources: {detail}',
                    status=504)
            except GithubException as gh_error:
                logging.exception('Github client raised exception')
                return aiohttp.web.Response(
                    reason=str(gh_error.data),
                    status=gh_error.status)
            else:
                return result
        return wrapped
    return wrapper
async def test__when_github_entity_is_not_found__status_should_return_404_response_with_github_explanation(self):
    """``handle_status`` must answer 404 when GitHub cannot find the commit.

    The repository mock raises ``GithubException(404, data)`` from
    ``get_commit``; the response is expected to carry status 404 and the
    stringified exception data as its reason.

    Declared ``async`` because the body awaits the controller; ``await``
    inside a plain ``def`` is a syntax error.
    NOTE(review): presumably the suite runs coroutine tests through an
    async-aware runner (e.g. a pytest asyncio marker) - confirm.
    """
    parameters = {'repository': 'repo', 'sha': 'null', 'state': 'State', 'description': 'Description', 'context': 'Context', 'url': 'pr_url'}
    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}}, spec=aiohttp.web_request.Request, strict=True)
    github_client = mock(spec=github.Github, strict=True)
    github_repository = mock(spec=github.Repository, strict=True)
    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(parameters))
    when(StatusRequestData).is_valid_status_data(parameters).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    github_exception_data = {'message': 'Not Found', 'documentation_url': 'https://developer.github.com/v3/'}
    when(github_repository).get_commit('null').thenRaise(github.GithubException(404, github_exception_data))

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_status(request)

    # then
    assert response.status == 404
    assert response.reason == str(github_exception_data)
async def test__when_github_entity_is_not_found__comment_should_return_404_response_with_github_explanation(self):
    """``handle_comment`` must answer 404 when GitHub cannot find the commit.

    The repository mock raises ``GithubException(404, data)`` from
    ``get_commit``; the response is expected to carry status 404 and the
    stringified exception data as its reason.

    Declared ``async`` because the body awaits the controller; ``await``
    inside a plain ``def`` is a syntax error.
    NOTE(review): presumably the suite runs coroutine tests through an
    async-aware runner (e.g. a pytest asyncio marker) - confirm.
    """
    parameters = {'repository': 'repo', 'sha': 'null', 'body': 'Comment body', 'jobName': 'job'}
    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}}, spec=aiohttp.web_request.Request, strict=True)
    github_client = mock(spec=github.Github, strict=True)
    github_repository = mock(spec=github.Repository, strict=True)
    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(parameters))
    when(CommentRequestData).is_valid_comment_data(parameters).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    github_exception_data = {'message': 'Not Found', 'documentation_url': 'https://developer.github.com/v3/'}
    when(github_repository).get_commit('null').thenRaise(github.GithubException(404, github_exception_data))

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_comment(request)

    # then
    assert response.status == 404
    assert response.reason == str(github_exception_data)
def testInvalidInput(self):
    """Creating a key from invalid key material must raise a 422 error.

    The raised ``GithubException`` has to carry the expected validation
    payload, and the test fails if no exception is raised at all.
    """
    raised = False
    try:
        self.g.get_user().create_key("Bad key", "xxx")
    # ``except X as name`` parses on Python 2.6+ and Python 3; the comma
    # form ``except X, name`` is a syntax error on Python 3.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 422)
        self.assertEqual(
            exception.data,
            {
                "errors": [
                    {
                        "code": "custom",
                        "field": "key",
                        "message": "key is invalid. It must begin with 'ssh-rsa' or 'ssh-dss'. Check that you're copying the public half of the key",
                        "resource": "PublicKey"
                    }
                ],
                "message": "Validation Failed"
            }
        )
    self.assertTrue(raised)
def testInvalidInput(self):
    """Creating a key from invalid key material must raise a 422 error.

    The raised ``GithubException`` has to carry the expected validation
    payload, and the test fails if no exception is raised at all.
    """
    raised = False
    try:
        self.g.get_user().create_key("Bad key", "xxx")
    # ``except X as name`` parses on Python 2.6+ and Python 3; the comma
    # form ``except X, name`` is a syntax error on Python 3.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 422)
        self.assertEqual(
            exception.data,
            {
                "errors": [
                    {
                        "code": "custom",
                        "field": "key",
                        "message": "key is invalid. It must begin with 'ssh-rsa' or 'ssh-dss'. Check that you're copying the public half of the key",
                        "resource": "PublicKey"
                    }
                ],
                "message": "Validation Failed"
            }
        )
    self.assertTrue(raised)
def _extract(self):
    """Compute the share of repository files that use the checked extensions.

    The git tree of the repository's default branch is fetched recursively;
    if that raises ``GithubException`` the tree is treated as empty. For
    every ``blob`` entry the lower-cased file extension is tallied by file
    count and by size. The two feature values are then set to the ratio of
    the tallies for ``self.extensions_to_check`` over the totals, or ``0``
    when the corresponding total is zero.
    """
    try:
        tree = self.api_repo.get_git_tree(self.api_repo.default_branch, recursive=True)
    except GithubException:
        tree = None
    entries = tree.tree if tree else []

    count_by_extension = defaultdict(int)
    size_by_extension = defaultdict(int)
    for entry in entries:
        if entry.type != 'blob':
            continue
        suffix = os.path.splitext(entry.path)[1].lower()
        count_by_extension[suffix] += 1
        size_by_extension[suffix] += entry.size

    total_count = sum(count_by_extension.values())
    total_size = sum(size_by_extension.values())
    # Totals are taken first: the lookups below may add zero-valued keys to
    # the defaultdicts, which does not affect any sum.
    relevant_count = sum(count_by_extension[wanted] for wanted in self.extensions_to_check)
    relevant_size = sum(size_by_extension[wanted] for wanted in self.extensions_to_check)

    if total_count:
        self._extension_to_count_feature.value = relevant_count / total_count
    else:
        self._extension_to_count_feature.value = 0
    if total_size:
        self._extension_to_size_feature.value = relevant_size / total_size
    else:
        self._extension_to_size_feature.value = 0
def fetch_readme(repo):
    """Return the README content of ``repo``, using an on-disk cache.

    The cache file lives in ``CACHE_PATH_READMES`` and is named after
    ``repo.id``. A cached file is returned as-is. Otherwise the README is
    fetched through ``repo.get_readme()``, stored in the cache and returned.
    When the fetch raises ``github.GithubException`` a warning is logged and
    an empty string is returned (nothing is cached in that case).
    """
    cache_file = os.path.join(CACHE_PATH_READMES, str(repo.id))

    # check if file is cached
    if os.path.isfile(cache_file):
        with open(cache_file, 'r') as cached:
            return cached.read()

    # create cache folder
    if not os.path.isdir(CACHE_PATH_READMES):
        os.mkdir(CACHE_PATH_READMES)

    try:
        readme = repo.get_readme()
    except github.GithubException:
        # Readme wasn't found
        logging.warning('no readme found for: ' + repo.full_name)
        return ''

    content = readme.content
    # Populate the cache: without this write the lookup above can never hit
    # for content fetched by this function.
    if content:
        with open(cache_file, 'w') as cached:
            cached.write(content)
    return content
def testInvalidInput(self):
    """Creating a key from invalid key material must raise a 422 error.

    The raised ``GithubException`` has to carry the expected validation
    payload, and the test fails if no exception is raised at all.
    """
    raised = False
    try:
        self.g.get_user().create_key("Bad key", "xxx")
    # ``except X as name`` parses on Python 2.6+ and Python 3; the comma
    # form ``except X, name`` is a syntax error on Python 3.
    except github.GithubException as exception:
        raised = True
        self.assertEqual(exception.status, 422)
        self.assertEqual(
            exception.data,
            {
                "errors": [
                    {
                        "code": "custom",
                        "field": "key",
                        "message": "key is invalid. It must begin with 'ssh-rsa' or 'ssh-dss'. Check that you're copying the public half of the key",
                        "resource": "PublicKey"
                    }
                ],
                "message": "Validation Failed"
            }
        )
    self.assertTrue(raised)
def get_repos(cls, api, testing=False, level=logging.INFO):
    """
    Get the set of repository URLs found by the code search

    Runs ``api.search_code(cls.GITHUB_QUERY)`` and collects the ``html_url``
    of every hit, logging each one. If iterating the results raises
    ``GithubException`` the function either stops with what it has
    (``testing`` truthy) or sleeps 60 seconds and iterates the same query
    object again. Only URLs matched by ``cls.HISTORY_FILE.search`` are
    returned. ``level`` is accepted but not used here.
    """
    collected = []
    seen = 0
    query = api.search_code(cls.GITHUB_QUERY)
    while True:
        try:
            for hit in query:
                seen += 1
                collected.append(hit.html_url)
                cls._log.info(
                    "repo nº{x} :\n{y}".format(x=seen, y=hit.html_url))
        except GithubException:
            if not testing:
                print("Hit rate limit, sleeping 60 seconds...")
                sleep(60)
                continue
        # Reached on success, or on failure while testing.
        break
    return {url for url in collected if cls.HISTORY_FILE.search(url)}
def run(self, event, context):
    """Validate the ``.hooks.yml`` of a pull request head and report a status.

    ``event['body']`` is parsed as the JSON webhook payload. The file is
    fetched at the pull request's head sha, parsed as YAML and validated
    against ``hooks.schema.yml``. The outcome is reported through
    ``send_status``:

    * file cannot be fetched -> ``success`` (".hooks.yml not present"),
    * YAML cannot be decoded -> ``failure``,
    * schema validation errors -> ``failure`` (each error is logged),
    * otherwise -> ``success``.
    """
    gh_hook = json.loads(event['body'])
    repo = gh_hook['repository']['full_name']
    sha = gh_hook['pull_request']['head']['sha']

    try:
        hooks_yml = get_github().get_repo(repo, lazy=True).get_file_contents('.hooks.yml', ref=sha)
        logger.info("Fetched .hooks.yml from repo {}".format(repo))
    except github.GithubException:
        logger.error("Missig .hooks.yml on repo {}".format(repo))
        send_status(event, context, gh_hook, self.configname, 'success', ".hooks.yml not present in branch")
        return

    try:
        hook_config = yaml.safe_load(hooks_yml.decoded_content)
        logger.info("Basic yml validation passed")
    except Exception as e:
        # Exceptions have no ``message`` attribute on Python 3, so reading
        # it here would raise AttributeError inside the handler; ``str(e)``
        # works on both Python 2 and 3.
        logger.error("Failed to decode hook yaml: " + str(e))
        send_status(event, context, gh_hook, self.configname, 'failure', "Could not decode branch .hooks.yml")
        return

    logger.info("Advanced schema validation")
    c = Core(source_data=hook_config,
             schema_files=[os.path.join(os.path.dirname(__file__), "..", "hooks.schema.yml")])
    c.validate(raise_exception=False)
    vc = len(c.validation_errors)
    if vc > 0:
        for err in c.validation_errors:
            logger.error(" - {}".format(err))
        send_status(event, context, gh_hook, self.configname, 'failure', ".hooks.yml has {} validation errors; see log".format(vc))
        return

    send_status(event, context, gh_hook, self.configname, 'success', ".hooks.yml present and valid")
def monitor_inventory_metrics(synonym_mappings):
    """Scrape recently pushed repositories and record inventory observations.

    For every repository returned by the search
    ``org:soundcloud pushed:><timestamp about three months ago>`` the owner
    is resolved with ``get_owner``, the scrape time is stored in the global
    ``REPO_SCRAPE_TIMES``, the pull requests are passed to
    ``observe_inventory`` and the manifest search results to
    ``observe_features``. If the manifest search raises ``GithubException``
    the error is logged and ``[None]`` is passed instead.

    Finally every entry of ``REPO_SCRAPE_TIMES`` older than one hour is
    removed and reported to ``observe_inventory`` with an empty pull list.
    """
    global REPO_SCRAPE_TIMES

    minus_three_months = (datetime.now() - timedelta(3 * 365 / 12)).isoformat().split('.')[0]

    def git():
        # A fresh client (and access token lookup) for every request.
        return Github(get_access_token())

    for repo in git().search_repositories('org:soundcloud pushed:>' + minus_three_months):
        owner = get_owner(synonym_mappings, repo)

        REPO_SCRAPE_TIMES[(owner, repo.name)] = time.time()

        pulls = list(repo.get_pulls())
        observe_inventory(owner, repo.name, pulls)

        manifests = [None]
        try:
            manifests = list(git().search_code(
                'repo:soundcloud/%s language:json filename:*manifest*.json' % repo.name))
        except GithubException:
            logger.error('Could not search repo %s!' % repo.name)
        observe_features(owner, repo.name, manifests)

    # zero-out deleted repos
    # ``items()`` exists on Python 2 and 3 (``iteritems()`` is Python 2
    # only). The keys are copied into a list first because entries are
    # deleted from the dict in the loop below.
    cutoff = time.time() - 60 * 60
    dead_repos = [key for key, last_time in REPO_SCRAPE_TIMES.items()
                  if last_time < cutoff]
    for owner, repo_name in dead_repos:
        del REPO_SCRAPE_TIMES[(owner, repo_name)]
        observe_inventory(owner, repo_name, [])
def testAuthorizationHeaderWithLogin(self):
    """Issue a request with login/password credentials, ignoring API errors.

    Only the request itself matters here; a ``GithubException`` from it is
    deliberately swallowed.
    """
    # See special case in Framework.fixAuthorizationHeader
    client = github.Github("fake_login", "fake_password")
    try:
        client.get_user().name
    except github.GithubException:
        pass
def testAuthorizationHeaderWithToken(self):
    """Issue a request with a token credential, ignoring API errors.

    Only the request itself matters here; a ``GithubException`` from it is
    deliberately swallowed.
    """
    # See special case in Framework.fixAuthorizationHeader
    client = github.Github("ZmFrZV9sb2dpbjpmYWtlX3Bhc3N3b3Jk")
    try:
        client.get_user().name
    except github.GithubException:
        pass
def testNonJsonDataReturnedByGithub(self):
    """A non-JSON 503 body must surface as ``GithubException`` data.

    The exception is expected to have status 503 and to wrap the raw HTML
    body under the ``"data"`` key.
    """
    # Replay data was forged according to https://github.com/jacquev6/PyGithub/pull/182
    expected_payload = {
        "data": "<html><body><h1>503 Service Unavailable</h1>No server is available to handle this request.</body></html>",
    }
    got_exception = False
    try:
        self.g.get_user("jacquev6")
    except github.GithubException as exception:
        got_exception = True
        self.assertEqual(exception.status, 503)
        self.assertEqual(exception.data, expected_payload)
    self.assertTrue(got_exception)
def testUnknownObject(self):
    """Fetching an unknown repository must raise a 404 ``GithubException``.

    Status, data and the string form of the exception are checked; the
    expected string differs between Python 2 and Python 3 reprs.
    """
    got_exception = False
    try:
        self.g.get_user().get_repo("Xxx")
    except github.GithubException as exception:
        got_exception = True
        self.assertEqual(exception.status, 404)
        self.assertEqual(exception.data, {"message": "Not Found"})
        if atLeastPython26 and atMostPython2:
            expected_text = "404 {u'message': u'Not Found'}"
        else:
            expected_text = "404 {'message': 'Not Found'}"  # pragma no cover (Covered with Python 3)
        self.assertEqual(str(exception), expected_text)
    self.assertTrue(got_exception)
def testBadAuthentication(self):
    """Bad credentials must raise a 401 ``GithubException``.

    Status, data and the string form of the exception are checked; the
    expected string differs between Python 2 and Python 3 reprs.
    """
    got_exception = False
    try:
        github.Github("BadUser", "BadPassword").get_user().login
    except github.GithubException as exception:
        got_exception = True
        self.assertEqual(exception.status, 401)
        self.assertEqual(exception.data, {"message": "Bad credentials"})
        if atLeastPython26 and atMostPython2:
            expected_text = "401 {u'message': u'Bad credentials'}"
        else:
            expected_text = "401 {'message': 'Bad credentials'}"  # pragma no cover (Covered with Python 3)
        self.assertEqual(str(exception), expected_text)
    self.assertTrue(got_exception)
def testExceptionPickling(self):
    """A ``GithubException`` must survive a pickle round trip without error."""
    original = github.GithubException('foo', 'bar')
    serialized = pickle.dumps(original)
    pickle.loads(serialized)
def testGetAuthorizationsFailsWhenAutenticatedThroughOAuth(self):
    """Listing authorizations with an OAuth-token client must raise a 404."""
    client = github.Github(self.oauth_token)
    got_exception = False
    try:
        # list() forces the lazy result to actually issue the request.
        list(client.get_user().get_authorizations())
    except github.GithubException as exception:
        got_exception = True
        self.assertEqual(exception.status, 404)
    self.assertTrue(got_exception)
def testRaiseErrorWithOutBranch(self):
    """Protecting a branch with an empty name must raise a 404 error.

    The exception data is expected to report "Branch not found".
    """
    expected_payload = {
        'documentation_url': 'https://developer.github.com/v3/repos/#get-branch',
        'message': 'Branch not found'
    }
    got_exception = False
    try:
        self.repo.protect_branch("", True, "everyone", ["test"])
    except github.GithubException as exception:
        got_exception = True
        self.assertEqual(exception.status, 404)
        self.assertEqual(exception.data, expected_payload)
    self.assertTrue(got_exception)
def testRaiseErrorWithBranchProtectionWithOutContext(self):
    """Protecting a branch without contexts must raise a 422 error.

    The exception data is expected to say that "contexts" wasn't supplied.
    """
    expected_payload = {
        'documentation_url': 'https://developer.github.com/v3',
        'message': 'Invalid request.\n\n"contexts" wasn\'t supplied.'
    }
    got_exception = False
    try:
        self.repo.protect_branch("master", True, "everyone")
    except github.GithubException as exception:
        got_exception = True
        self.assertEqual(exception.status, 422)
        self.assertEqual(exception.data, expected_payload)
    self.assertTrue(got_exception)
def testMergeWithConflict(self):
    """Merging conflicting branches must raise a 409 ``GithubException``.

    The exception data is expected to be ``{"message": "Merge conflict"}``.
    """
    got_exception = False
    try:
        # The merge result is never inspected; only the failure matters.
        self.repo.merge("branchForBase", "branchForHead")
    except github.GithubException as exception:
        got_exception = True
        self.assertEqual(exception.status, 409)
        self.assertEqual(exception.data, {"message": "Merge conflict"})
    self.assertTrue(got_exception)
def testBadSubscribePubSubHubbub(self):
    """Subscribing to a non-existing hub event must raise a 422 error.

    The exception data is expected to name the invalid event.
    """
    expected_payload = {"message": "Invalid event: \"non-existing-event\""}
    got_exception = False
    try:
        self.repo.subscribe_to_hub("non-existing-event", "http://requestb.in/1bc1sc61")
    except github.GithubException as exception:
        got_exception = True
        self.assertEqual(exception.status, 422)
        self.assertEqual(exception.data, expected_payload)
    self.assertTrue(got_exception)
async def set_pr_sync_label_with_retry(self, repo, pr_number):
    """Add the ``Labels.pr_sync`` label to a pull request, retrying on errors.

    Up to three attempts are made. After each attempt that raises
    ``github.GithubException`` the exception is logged and the coroutine
    sleeps one second before the next try.

    Declared ``async`` because the body awaits ``asyncio.sleep``; ``await``
    inside a plain ``def`` is a syntax error.

    Raises:
        TriggearTimeoutError: all three attempts failed.
    """
    retries = 3
    while retries:
        try:
            self.__gh_client.get_repo(repo).get_issue(pr_number).add_to_labels(Labels.pr_sync)
            return
        except github.GithubException as gh_exception:
            logging.exception(f'Exception when trying to set label on PR. Exception: {gh_exception}')
            retries -= 1
            await asyncio.sleep(1)
    raise TriggearTimeoutError(f'Failed to set label on PR #{pr_number} in repo {repo} after 3 retries')
def are_files_in_repo(self, files: List[str], hook: HookDetails) -> bool:
    """Tell whether every path in ``files`` can be fetched from the hook's repo.

    Each file is requested at ``hook.sha`` when that is truthy, otherwise at
    ``hook.branch``. The first ``github.GithubException`` is logged and
    makes the method return ``False``; if no request raises (including when
    ``files`` is empty) the result is ``True``.
    """
    try:
        for wanted in files:
            repository = self.__gh_client.get_repo(hook.repository)
            repository.get_file_contents(path=wanted, ref=hook.sha or hook.branch)
    except github.GithubException as gh_exc:
        logging.exception(f"Exception when looking for file {wanted} in repo {hook.repository} at ref {hook.sha}/{hook.branch} (Exc: {gh_exc})")
        return False
    else:
        return True