#!/usr/bin/python
# encoding: utf-8
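"""Back up and update local copies of GitHub-hosted repositories.

Clones (or fetches) the user's own and organization repositories and, with
--full-backup, also archives wikis, issues, pull requests, milestones,
teams, comments and forks as raw JSON, e.g.:

    ./update_repos <username> --full-backup -d <target-directory>
"""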
import sys
import os
import argparse
from collections import namedtuple
from subprocess import Popen, PIPE
from urllib2 import urlopen
import json
from urlparse import urlparse
WHITELIST = []
BLACKLIST = []
LISTINGS_PER_PAGE = 100
ACCESS_TOKEN_PARAM = '?access_token=%s'
LISTING_PAGE_PARAM = '&per_page=%d&page=%d'
GITHUB_API_HOST = 'https://api.github.com'
GIT_CLONE_CMD = 'git clone %s %s %s'
GIT_CLONE_API_URL = 'https://%s@github.com/%s'
GIT_SHA_CMD = 'git rev-parse --short %s'
GIT_FETCH_CMD = 'git fetch'
GIT_CHECK_REMOTE_CMD = 'git ls-remote'
USER_DETAILS_PATH = '/users/%s'
USER_ORG_DETAILS_PATH = '/user/orgs'
DEFAULT_TOKEN_FILE = os.path.expanduser('~/.config/ghtoken')
class Color:
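    """ANSI escape codes for coloured terminal output."""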
GREEN = "\033[1;32m"
BLUE = "\033[1;34m"
YELLOW = "\033[1;33m"
RED = "\033[1;31m"
END = "\033[0m"
def get_color_str(text, color):
return color + str(text) + Color.END
def system_exec(command, directory=None, show_output=True, ignore_error=False):
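    """Run a shell command in `directory` (the current directory by default)
    and return a (return_code, output, error) namedtuple; unless
    `ignore_error` is set, any failure terminates the script."""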
if not directory:
directory = os.getcwd()
try:
process = Popen(command, stdout=PIPE, stderr=PIPE, shell=True, cwd=directory)
(output, error) = process.communicate()
output = output.strip()
error = error.strip()
if show_output and len(output) > 0:
print output
sys.stdout.flush()
if process.returncode != 0 and not ignore_error:
raise Exception(error)
ReturnInfo = namedtuple('ReturnInfo', 'return_code output error')
return ReturnInfo(process.returncode, output, error)
except Exception as err:
print >>sys.stderr, Color.RED + "Could not execute", command
print >>sys.stderr, err, Color.END
print >>sys.stderr, "Terminating early"
exit(1)
class AttributeDict(dict):
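    """dict whose keys are also readable and writable as attributes; used as
    the object_hook when decoding GitHub API JSON."""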
def __getattr__(self, attr):
return self[attr]
def __setattr__(self, attr, value):
self[attr] = value
def read_api_uri(uri, config, page=1):
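    """Append the access token and paging parameters to `uri` and return the
    raw response body."""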
uri += ACCESS_TOKEN_PARAM % config.token + LISTING_PAGE_PARAM % (LISTINGS_PER_PAGE, page)
if config.debug:
print "Trying:", uri
return urlopen(uri).read()
def get_json(uri, config, obj_type=AttributeDict, page=1):
return json.loads(read_api_uri(uri, config, page), object_hook=obj_type)
class GitHubRepo(AttributeDict):
"""Top-level class managing all content of a GitHub repository"""
def __init__(self, *args, **kwargs):
AttributeDict.__init__(self, *args, **kwargs)
self.content = []
def configure(self, config):
self.config = config
# Initialize the content repos
self.content.append(CodeRepo(self, config))
if self.config.full_backup:
self.content.append(WikiRepo(self, config))
if self.has_issues:
self.content.append(IssuesRepo(self, config))
self.content.append(MilestonesRepo(self, config))
self.content.append(PullsRepo(self, config))
# Teams are only available from user's repos
if self.owner.login == self.config.username:
self.content.append(TeamsRepo(self, config))
self.content.append(CommentsRepo(self, config))
self.content.append(ForksRepo(self, config))
    def update(self):
        for repo in self.content:
            repo.update()
class CodeRepo(object):
"""GitHub code git repository"""
def __init__(self, gh_repo, config):
self.config = config
# Copy the details we need from the JSON
self.name = gh_repo.name
self.full_name = gh_repo.full_name
self.ssh_url = gh_repo.ssh_url
self.default_branch = gh_repo.default_branch
self.description = gh_repo.description or ""
self.target_directory = os.path.join(self.config.cwd, self.name)
if self.config.mirror:
self.target_directory += '.git'
def try_clone(self, ignore_error=False):
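        """Clone the repository unless `target_directory` already exists.
        Returns False if a checkout is already present, True otherwise."""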
print "Using", self.target_directory
if os.path.isdir(self.target_directory):
return False
print "Need to clone", self.name
if self.config.mirror:
clone_opts = '--mirror'
else:
clone_opts = '--recurse-submodules'
if self.config.ssh:
clone_url = self.ssh_url
else:
clone_url = GIT_CLONE_API_URL % (self.config.token,
self.full_name)
clone_cmd = GIT_CLONE_CMD % (clone_opts, clone_url, \
self.target_directory)
# Create leading directories if necessary
clone_dir = os.path.dirname(self.target_directory)
if not os.path.isdir(clone_dir):
os.makedirs(clone_dir)
# Let the caller decide if errors should be ignored.
return_code = system_exec(clone_cmd, ignore_error=ignore_error).return_code
if ignore_error and return_code != 0:
print "Repo for %s not initialized, skipping" % self.name
return True
print "Finished cloning"
return True
def get_sha_str(self, branch, directory):
if self.config.mirror:
sha_cmd = GIT_SHA_CMD % branch
else:
            sha_cmd = GIT_SHA_CMD % ('origin/' + branch)
sha = system_exec(sha_cmd, directory, False, True).output
return get_color_str(sha, Color.GREEN)
def print_start_sha(self, branch, directory):
print("- " + get_color_str(branch, Color.GREEN) + " @ " \
+ self.get_sha_str(branch, directory) + ' ..'),
    def update(self, fatal_remote_errors=True):
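        """Clone the repository if needed, otherwise fetch new commits and
        print the short SHA of the default branch before and after."""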
if self.try_clone(False):
return
# Check if the remote is still there before fetching
return_code = system_exec(GIT_CHECK_REMOTE_CMD, self.target_directory, show_output=False, ignore_error=True).return_code
if return_code != 0:
# Is it fatal?
if fatal_remote_errors:
error_message = get_color_str("ERROR: Repo for %s seems to have been deleted. Skipping update." % self.full_name, Color.RED)
print error_message
raise Exception(error_message)
else:
print get_color_str("WARNING: Repo for %s seems to have been deleted. Skipping update." % self.full_name, Color.GREEN)
return
self.print_start_sha(self.default_branch, self.target_directory)
system_exec(GIT_FETCH_CMD, self.target_directory, False)
print self.get_sha_str(self.default_branch, self.target_directory)
class WikiRepo(CodeRepo):
"""GitHub wiki git repository"""
def __init__(self, gh_repo, config):
CodeRepo.__init__(self, gh_repo, config)
# Put the wiki under the project.extras/wiki directory
self.target_directory = os.path.join(self.config.cwd,
self.name + '.extras',
'wiki')
if self.config.mirror:
self.target_directory += '.git'
# Adjust the name and hardcode master as the default branch
self.name += '.wiki'
self.full_name += '.wiki'
self.ssh_url = self.ssh_url.rpartition('.git')[0] \
+ '.wiki.git'
self.default_branch = 'master'
        if self.description is None:
self.description = gh_repo.name
self.description += ' - Wiki'
    def try_clone(self, ignore_error=True):
        # This is sad. GitHub will tell us a repo has a wiki, but if the
        # wiki hasn't actually been initialized, it won't exist. Reuse the
        # CodeRepo clone code, but allow it to fail.
        return CodeRepo.try_clone(self, True)
def update(self):
        super(WikiRepo, self).update(fatal_remote_errors=False)
class JsonRepo(object):
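    """Base class for repository content that is archived as raw JSON from
    the GitHub API under the project's `.extras` directory."""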
def __init__(self, gh_repo, api_url, config, content=None):
self.config = config
# Copy the details we need from the JSON
self.name = gh_repo.name
self.full_name = gh_repo.full_name
self.repo_url = gh_repo.url
self.description = gh_repo.description or ""
# Sanitize the content url to strip the {/number} type markers
self.content_url = api_url.split('{')[0]
# Put downloaded content in project.extras directory
self.content_directory = os.path.join(self.config.cwd,
self.name + '.extras')
# If the content type was supplied, adjust the names further
if content is not None:
self.name += '.%s' % content
self.full_name += '.%s' % content
self.description += ' - %s' % content.title()
self.content_directory = os.path.join(self.content_directory,
content)
def download_file(self, url, suffix='.json'):
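        """Fetch `url` from the API and write the response below the content
        directory, mirroring the URL path and appending `suffix`."""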
if url == self.content_url:
# Just use the content directory
target = self.content_directory
else:
# Strip off base content URL path to get target
base = os.path.relpath(urlparse(url).path,
urlparse(self.content_url).path)
target = os.path.join(self.content_directory, base)
# Add suffix if specified
if suffix:
target += suffix
target_directory = os.path.dirname(target)
if not os.path.isdir(target_directory):
os.makedirs(target_directory)
if self.config.debug:
print "Downloading %s" % target
data = read_api_uri(url, self.config)
with open(target, 'wb') as dl_file:
dl_file.write(data)
class IssuesRepo(JsonRepo):
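    """Archives each issue together with its comments and events."""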
def __init__(self, gh_repo, config):
JsonRepo.__init__(self, gh_repo, gh_repo.issues_url, config, 'issues')
def update(self):
print "Using", self.content_directory
for issue in get_json(self.content_url, self.config):
self.download_file(issue.url)
self.download_file(issue.comments_url)
self.download_file(issue.events_url)
class PullsRepo(JsonRepo):
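    """Archives each pull request together with its commits and comments."""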
def __init__(self, gh_repo, config):
JsonRepo.__init__(self, gh_repo, gh_repo.pulls_url, config, 'pulls')
def update(self):
print "Using", self.content_directory
for pull in get_json(self.content_url, self.config):
self.download_file(pull.url)
self.download_file(pull.commits_url)
self.download_file(pull.comments_url)
class MilestonesRepo(JsonRepo):
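    """Archives the repository's milestones."""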
def __init__(self, gh_repo, config):
JsonRepo.__init__(self, gh_repo, gh_repo.milestones_url, config,
'milestones')
def update(self):
print "Using", self.content_directory
for milestone in get_json(self.content_url, self.config):
self.download_file(milestone.url)
class TeamsRepo(JsonRepo):
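    """Archives the list of teams associated with the repository."""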
def __init__(self, gh_repo, config):
JsonRepo.__init__(self, gh_repo, gh_repo.teams_url, config, 'teams')
def update(self):
print "Using", self.content_directory + '.json'
self.download_file(self.content_url)
class CommentsRepo(JsonRepo):
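    """Archives the repository's comments."""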
def __init__(self, gh_repo, config):
JsonRepo.__init__(self, gh_repo, gh_repo.comments_url, config,
'comments')
def update(self):
print "Using", self.content_directory + '.json'
self.download_file(self.content_url)
class ForksRepo(JsonRepo):
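    """Archives the list of forks of the repository."""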
def __init__(self, gh_repo, config):
JsonRepo.__init__(self, gh_repo, gh_repo.forks_url, config, 'forks')
def update(self):
print "Using", self.content_directory + '.json'
self.download_file(self.content_url)
class RepoUpdater(object):
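    """Drives a backup run: resolves the configuration, discovers the user's
    own and organization repositories via the GitHub API and updates each
    one."""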
def __init__(self, args):
# Keep a copy of the arguments dictionary
self.args = args
# Use current directory if unspecified
self.args.cwd = args.directory if args.directory else os.getcwd()
# Read a token file if necessary
if self.args.token is None:
try:
with open(self.args.token_file, 'r') as tfile:
self.args.token = tfile.read().strip()
except IOError as err:
print >>sys.stderr, Color.RED + \
"Could not open token file %s: %s" \
% (self.args.token_file, err), \
Color.END
print >>sys.stderr, "Terminating early"
exit(1)
if self.args.debug:
print "Current configuration: "
for arg in self.args:
# We don't want this printed
if arg != 'token':
print arg, ":", get_color_str(args[arg], Color.GREEN)
def update(self):
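        """Create the target directory if needed, then gather and update the
        repository list."""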
if not os.path.isdir(self.args.cwd):
os.makedirs(self.args.cwd)
print "User:", get_color_str(self.args.username, Color.GREEN)
user_data = self.get_user_data()
repos, excluded_repos = self.get_repos(user_data.repos_url, GITHUB_API_HOST + USER_ORG_DETAILS_PATH)
repos = self.filter_repo_names(repos, excluded_repos)
for repo in repos:
print get_color_str('{:-^60}'.format(repo.name), Color.YELLOW)
repo.update()
def get_user_data(self):
return get_json(GITHUB_API_HOST + USER_DETAILS_PATH
% self.args.username, self.args)
def get_own_repos(self, repos_url):
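        """Return the user's own repositories (optionally excluding forks)
        together with the unfiltered count."""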
repos = self._get_paged_repos(repos_url, self.args, GitHubRepo)
        for repo in repos:
            repo.configure(self.args)
repo_count = len(repos)
if self.args.exclude_forks:
repos = [repo for repo in repos if not repo.fork]
return repos, repo_count
def get_org_repos(self, orgs_url):
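        """Return the repositories of every organization the user belongs to,
        honouring the org fork/visibility flags, together with the
        unfiltered count."""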
all_orgs_repos = []
repo_count = 0
orgs = get_json(orgs_url, self.args)
for org in orgs:
org_repos = self._get_paged_repos(org.repos_url, self.args, GitHubRepo)
            for repo in org_repos:
                repo.configure(self.args)
repo_count += len(org_repos)
if not self.args.include_public_org_repos:
org_repos = [repo for repo in org_repos if repo.private]
if not self.args.include_org_forks:
org_repos = [repo for repo in org_repos if not repo.fork]
all_orgs_repos += org_repos
return all_orgs_repos, repo_count
def _get_paged_repos(self, url, args, clazz):
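        """Fetch `url` page by page, LISTINGS_PER_PAGE entries at a time,
        until a short page marks the end of the listing."""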
repos_page = get_json(url, args, clazz)
repos = repos_page
if self.args.debug:
print "Retrieved %d entries from %s" % (len(repos), url )
page = 2
while len(repos_page) >= LISTINGS_PER_PAGE:
repos_page = get_json(url, args, clazz, page)
if self.args.debug:
print "Retrieved %d entries from %s" % (len(repos_page), url)
repos += repos_page
page += 1
return repos
def get_repos(self, repos_url, orgs_url):
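        """Collect own and organization repositories according to the
        exclusion flags and return them with the number of excluded repos."""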
repos = []
if self.args.debug:
print "Getting repo data from", get_color_str(repos_url, Color.GREEN)
own_repos, repo_count = self.get_own_repos(repos_url)
if not self.args.exclude_own:
repos += own_repos
org_repos, org_repo_count = self.get_org_repos(orgs_url)
if not self.args.exclude_orgs:
repos += org_repos
if self.args.debug:
print "Available repos:", get_color_str(str(len(repos)), Color.GREEN)
            for repo in repos:
                print " -", get_color_str(repo.name, Color.YELLOW)
                print " " * 5, repo.description
excluded_repos = repo_count + org_repo_count - len(repos)
return repos, excluded_repos
def filter_repo_names(self, repos, excluded_repos):
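        """Apply the WHITELIST/BLACKLIST name filters, print a summary of the
        remaining repositories and return them."""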
original_repos = len(repos)
if BLACKLIST:
            repos = [repo for repo in repos if repo.name not in BLACKLIST]
if WHITELIST:
repos = [repo for repo in repos if repo.name in WHITELIST]
filtered_repos = original_repos - len(repos)
ignored_repos_str = " (" + str(filtered_repos) + " filtered, " + str(excluded_repos) + " excluded)"
repo_count_str = str(original_repos - filtered_repos) + " / " + str(original_repos)
print "Fetching repos:", get_color_str(repo_count_str + ignored_repos_str, Color.GREEN)
        for repo in repos:
            print " -", get_color_str(repo.name, Color.YELLOW)
            print " " * 5, repo.description
return repos
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Retrieve and/or update local copies of GitHub-hosted repos')
parser.add_argument('username', \
help='GitHub username that will be used for cloning and fetching')
    parser.add_argument('token', \
                        nargs='?', \
                        help='GitHub auth token for that username. '
                             'You can create one at https://github.com/settings/applications')
parser.add_argument('--version', \
action='version', \
version='%(prog)s v0.7')
parser.add_argument('-d', '--directory', \
help='Target directory for cloning and fetching')
parser.add_argument('-t', '--token-file', \
default=DEFAULT_TOKEN_FILE, \
help='File containing the github token')
parser.add_argument('-s', '--ssh', \
help='Fetch repositories using ssh', \
action='store_true')
parser.add_argument('-m', '--mirror', \
help='Mirror bare repositories instead of making full checkouts', \
action='store_true')
parser.add_argument('-x', '--exclude-own', \
                        help='Exclude own repositories from the updates', \
action='store_true')
parser.add_argument('--exclude-forks', \
help='Exclude forked repositories from the updates', \
action='store_true')
parser.add_argument('--exclude-orgs', \
                        help='Exclude repos that are in user\'s orgs (your own repos are not affected)', \
action='store_true')
parser.add_argument('--include-org-forks', \
help='Include forked repos that are in user\'s orgs', \
action='store_true')
parser.add_argument('--include-public-org-repos', \
help='Include public repos that are in user\'s orgs', \
action='store_true')
parser.add_argument('--full-backup', \
help='Include all repository content', \
action='store_true')
parser.add_argument('--debug', \
help='Enable debugging output', \
action='store_true')
args = AttributeDict(vars(parser.parse_args()))
RepoUpdater(args).update()