In [ ]:
import requests
from requests.auth import HTTPBasicAuth
# Basic-auth credentials passed as ``auth=`` on every GitHub API request made
# in this notebook. The two strings are placeholders to be replaced with a real
# username and personal access token before running any cell.
AUTH = HTTPBasicAuth('USERNAME_HERE', 'PERSONAL_ACCESS_TOKEN_HERE')
from utils import PROJECT_NAMES
def hooks_url(project):
    """Return the GitHub API hooks endpoint for a repository in the menpo org.

    ``project`` is substituted into the repository slot of the URL.
    """
    template = 'https://api.github.com/repos/menpo/{}/hooks'
    return template.format(project)
def get_hooks(project):
    """Fetch the hooks configured on a project's GitHub repository.

    Sends an authenticated GET to ``hooks_url(project)`` and returns the
    decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    response = requests.get(hooks_url(project), auth=AUTH)
    # Fail loudly on an error status. Without this check the body of an error
    # response would be decoded and handed back as if it were the hook
    # listing, and callers iterating over it would fail in confusing ways.
    response.raise_for_status()
    return response.json()
def is_jenkins_hook(hook):
    """Tell whether a hook's configured URL mentions ``jenkins.menpo.org``.

    A hook lacking a ``config`` entry, or whose config has no ``url`` entry,
    is reported as not being a Jenkins hook.
    """
    try:
        target = hook['config']['url']
    except KeyError:
        return False
    return 'jenkins.menpo.org' in target
def get_jenkins_hooks(project):
    """Return a list of the project's hooks that ``is_jenkins_hook`` accepts."""
    return [hook for hook in get_hooks(project) if is_jenkins_hook(hook)]
def delete_hook(hook):
    """Delete a hook by sending an authenticated DELETE to ``hook['url']``.

    Returns the response when the server replies 204; any other status
    raises ``ValueError`` carrying the response as its second argument.
    """
    result = requests.delete(hook['url'], auth=AUTH)
    if result.status_code == 204:
        return result
    raise ValueError('Failed to delete hook', result)
def create_hook(project, hook_spec):
    """Create a hook on a project by POSTing ``hook_spec`` as JSON.

    Returns the response when the server replies 201; any other status
    raises ``ValueError`` carrying the response as its second argument.
    """
    endpoint = hooks_url(project)
    result = requests.post(endpoint, json=hook_spec, auth=AUTH)
    if result.status_code == 201:
        return result
    raise ValueError('Failed to create hook', result)
# Hook spec subscribing to "push" and "repository" events and pointing at the
# github-webhook endpoint on jenkins.menpo.org.
DEFAULT_NON_PR_WEBHOOK = {
    "name": "web",
    "active": True,
    "events": ["push", "repository"],
    "config": {"url": "https://jenkins.menpo.org/github-webhook/"},
}
# Hook spec subscribing to "issue_comment" and "pull_request" events and
# pointing at the ghprbhook endpoint on jenkins.menpo.org.
DEFAULT_PR_WEBHOOK = {
    "name": "web",
    "active": True,
    "events": ["issue_comment", "pull_request"],
    "config": {"url": "https://jenkins.menpo.org/ghprbhook/"},
}
def create_jenkins_hooks(project):
    """Create both default Jenkins hooks on a project.

    The non-PR hook is created first, then the PR hook; a failure on the
    first propagates before the second is attempted.
    """
    for spec in (DEFAULT_NON_PR_WEBHOOK, DEFAULT_PR_WEBHOOK):
        create_hook(project, spec)
In [ ]:
def _prune_project_hooks(name):
    """Report and delete every Jenkins hook found on a single project."""
    found = get_jenkins_hooks(name)
    print(' {} jenkins hooks found'.format(len(found)))
    for hook in found:
        print(' - Deleting {}'.format(hook['id']))
        delete_hook(hook)


def remove_all_jenkins_hooks(projects):
    """Delete the Jenkins hooks of every project in ``projects``.

    Progress is printed as it goes. Any exception raised while handling one
    project is printed and the loop moves on to the next project.
    """
    print('Pruning jenkins hooks...')
    for name in projects:
        print('Cleaning {}...'.format(name))
        try:
            _prune_project_hooks(name)
        except Exception as err:
            print("Error: couldn't clean Jenkins hooks for {}".format(name))
            print(err)
def create_all_jenkins_hooks(projects):
    """Create the default Jenkins hooks on every project in ``projects``.

    Progress is printed as it goes. Any exception raised while handling one
    project is printed and the loop moves on to the next project.
    """
    print('Create all jenkins hooks...')
    for name in projects:
        print('Creating {}...'.format(name))
        try:
            create_jenkins_hooks(name)
        except Exception as err:
            failure = "Error: couldn't create Jenkins hooks for {}".format(name)
            print(failure)
            print(err)
def check_all_jenkins_hooks(projects):
    """Report, per project, whether it has exactly two Jenkins hooks.

    A project with two Jenkins hooks is printed as fine; otherwise the number
    found is printed. Any exception raised while checking one project is
    printed and the loop moves on to the next project.
    """
    print('Checking jenkins hooks...')
    for name in projects:
        try:
            count = len(get_jenkins_hooks(name))
            if count == 2:
                print('{} is fine'.format(name))
            else:
                print('{} has {} jenkins hooks'.format(name, count))
        except Exception as err:
            print("Error: couldn't check Jenkins hooks for {}".format(name))
            print(err)
In [ ]:
# Destructive: deletes every Jenkins hook found on each project in
# PROJECT_NAMES. Per-project failures are printed rather than raised.
remove_all_jenkins_hooks(PROJECT_NAMES)
In [ ]:
# Creates the two default Jenkins hooks on each project in PROJECT_NAMES.
# Per-project failures are printed rather than raised.
create_all_jenkins_hooks(PROJECT_NAMES)
In [ ]:
# Read-only sanity check: prints, for each project in PROJECT_NAMES, whether
# it has exactly two Jenkins hooks.
check_all_jenkins_hooks(PROJECT_NAMES)