#!/usr/bin/python # encoding: utf-8 import sys import os import argparse from collections import namedtuple from subprocess import Popen, PIPE from urllib2 import urlopen import json from urlparse import urlparse WHITELIST=[] BLACKLIST=[] ACCESS_TOKEN_PARAM = '?access_token=%s' LISTING_FIX_PARAM = '&per_page=150' GITHUB_API_HOST = 'https://api.github.com' GIT_CLONE_CMD = 'git clone %s %s %s' GIT_CLONE_API_URL = 'https://%s@github.com/%s' GIT_SHA_CMD = 'git rev-parse --short %s' GIT_FETCH_CMD = 'git fetch' GIT_CHECK_REMOTE_CMD = 'git ls-remote' USER_DETAILS_PATH = '/users/%s' DEFAULT_TOKEN_FILE = os.path.expanduser('~/.config/ghtoken') class Color: GREEN = "\033[1;32m" BLUE = "\033[1;34m" YELLOW = "\033[1;33m" RED = "\033[1;31m" END = "\033[0m" def get_color_str(text, color): return color + str(text) + Color.END def system_exec(command, directory=None, show_output=True, ignore_error=False): if not directory: directory = os.getcwd() try: process = Popen(command, stdout=PIPE, stderr=PIPE, shell=True, cwd=directory) (output, error) = process.communicate() output = output.strip() error = error.strip() if show_output and len(output) > 0: print output sys.stdout.flush() if process.returncode != 0 and not ignore_error: raise Exception(error) ReturnInfo = namedtuple('ReturnInfo', 'return_code output error') return ReturnInfo(process.returncode, output, error) except Exception as err: print >>sys.stderr, Color.RED + "Could not execute", command print >>sys.stderr, err, Color.END print >>sys.stderr, "Terminating early" exit(1) class AttributeDict(dict): def __getattr__(self, attr): return self[attr] def __setattr__(self, attr, value): self[attr] = value def read_api_uri(uri, config): uri += ACCESS_TOKEN_PARAM % config.token + LISTING_FIX_PARAM if config.debug: print "Trying:", uri return urlopen(uri).read() def get_json(uri, config, obj_type=AttributeDict): return json.loads(read_api_uri(uri, config), object_hook=obj_type) class GitHubRepo(AttributeDict): """Top-level class managing all content of a GitHub repository""" def __init__(self, *args, **kwargs): AttributeDict.__init__(self, *args, **kwargs) self.content = [] def configure(self, config): self.config = config # Initialize the content repos self.content.append(CodeRepo(self, config)) if self.config.full_backup: self.content.append(WikiRepo(self, config)) if self.has_issues: self.content.append(IssuesRepo(self, config)) self.content.append(MilestonesRepo(self, config)) self.content.append(PullsRepo(self, config)) # Teams are only available from user's repos if self.owner.login == self.config.username: self.content.append(TeamsRepo(self, config)) self.content.append(CommentsRepo(self, config)) self.content.append(ForksRepo(self, config)) def update(self): [repo.update() for repo in self.content] class CodeRepo(object): """GitHub code git repository""" def __init__(self, gh_repo, config): self.config = config # Copy the details we need from the JSON self.name = gh_repo.name self.full_name = gh_repo.full_name self.ssh_url = gh_repo.ssh_url self.default_branch = gh_repo.default_branch self.description = gh_repo.description self.target_directory = os.path.join(self.config.cwd, self.name) if self.config.mirror: self.target_directory += '.git' def try_clone(self, ignore_error=False): print "Using", self.target_directory if os.path.isdir(self.target_directory): return False print "Need to clone", self.name if self.config.mirror: clone_opts = '--mirror' else: clone_opts = '--recurse-submodules' if self.config.ssh: clone_url = self.ssh_url else: clone_url = GIT_CLONE_API_URL % (self.config.token, self.full_name) clone_cmd = GIT_CLONE_CMD % (clone_opts, clone_url, \ self.target_directory) # Create leading directories if necessary clone_dir = os.path.dirname(self.target_directory) if not os.path.isdir(clone_dir): os.makedirs(clone_dir) # Let the caller decide if errors should be ignored. return_code = system_exec(clone_cmd, ignore_error=ignore_error).return_code if ignore_error and return_code != 0: print "Repo for %s not initialized, skipping" % self.name return True print "Finished cloning" return True def get_sha_str(self, branch, directory): if self.config.mirror: sha_cmd = GIT_SHA_CMD % branch else: sha_cmd = GIT_SHA_CMD % 'origin/' + branch sha = system_exec(sha_cmd, directory, False, True).output return get_color_str(sha, Color.GREEN) def print_start_sha(self, branch, directory): print("- " + get_color_str(branch, Color.GREEN) + " @ " \ + self.get_sha_str(branch, directory) + ' ..'), def update(self, fatal_remote_errors = True): if self.try_clone(False): return # Check if the remote is still there before fetching return_code = system_exec(GIT_CHECK_REMOTE_CMD, self.target_directory, show_output=False, ignore_error=True).return_code if return_code != 0: # Is it fatal? if fatal_remote_errors: error_message = get_color_str("ERROR: Repo for %s seems to have been deleted. Skipping update." % self.full_name, Color.RED) print error_message raise Exception(error_message) else: print get_color_str("WARNING: Repo for %s seems to have been deleted. Skipping update." % self.full_name, Color.GREEN) return self.print_start_sha(self.default_branch, self.target_directory) system_exec(GIT_FETCH_CMD, self.target_directory, False) print self.get_sha_str(self.default_branch, self.target_directory) class WikiRepo(CodeRepo): """GitHub wiki git repository""" def __init__(self, gh_repo, config): CodeRepo.__init__(self, gh_repo, config) # Put the wiki under the project.extras/wiki directory self.target_directory = os.path.join(self.config.cwd, self.name + '.extras', 'wiki') if self.config.mirror: self.target_directory += '.git' # Adjust the name and hardcode master as the default branch self.name += '.wiki' self.full_name += '.wiki' self.ssh_url = self.ssh_url.rpartition('.git')[0] \ + '.wiki.git' self.default_branch = 'master' self.description += ' - Wiki' def try_clone(self, ignore_error=True): # This is sad. Github will tell us a repo has a wiki, but if the # wiki hasn't actually been initialized, it won't exist. Reuse the # GitRepo clone code, but allow it to fail. return CodeRepo.try_clone(self, True) def update(self): super(WikiRepo, self).update(fatal_remote_errors = False) class JsonRepo(object): def __init__(self, gh_repo, api_url, config, content=None): self.config = config # Copy the details we need from the JSON self.name = gh_repo.name self.full_name = gh_repo.full_name self.repo_url = gh_repo.url self.description = gh_repo.description # Sanitize the content url to strip the {/number} type markers self.content_url = api_url.split('{')[0] # Put downloaded content in project.extras directory self.content_directory = os.path.join(self.config.cwd, self.name + '.extras') # If the content type was supplied, adjust the names further if content is not None: self.name += '.%s' % content self.full_name += '.%s' % content self.description += ' - %s' % content.title() self.content_directory = os.path.join(self.content_directory, content) def download_file(self, url, suffix='.json'): if url == self.content_url: # Just use the content directory target = self.content_directory else: # Strip off base content URL path to get target base = os.path.relpath(urlparse(url).path, urlparse(self.content_url).path) target = os.path.join(self.content_directory, base) # Add suffix if specified if suffix: target += suffix target_directory = os.path.dirname(target) if not os.path.isdir(target_directory): os.makedirs(target_directory) if self.config.debug: print "Downloading %s" % target data = read_api_uri(url, self.config) with open(target, 'wb') as dl_file: dl_file.write(data) class IssuesRepo(JsonRepo): def __init__(self, gh_repo, config): JsonRepo.__init__(self, gh_repo, gh_repo.issues_url, config, 'issues') def update(self): print "Using", self.content_directory for issue in get_json(self.content_url, self.config): self.download_file(issue.url) self.download_file(issue.comments_url) self.download_file(issue.events_url) class PullsRepo(JsonRepo): def __init__(self, gh_repo, config): JsonRepo.__init__(self, gh_repo, gh_repo.pulls_url, config, 'pulls') def update(self): print "Using", self.content_directory for pull in get_json(self.content_url, self.config): self.download_file(pull.url) self.download_file(pull.commits_url) self.download_file(pull.comments_url) class MilestonesRepo(JsonRepo): def __init__(self, gh_repo, config): JsonRepo.__init__(self, gh_repo, gh_repo.milestones_url, config, 'milestones') def update(self): print "Using", self.content_directory for milestone in get_json(self.content_url, self.config): self.download_file(milestone.url) class TeamsRepo(JsonRepo): def __init__(self, gh_repo, config): JsonRepo.__init__(self, gh_repo, gh_repo.teams_url, config, 'teams') def update(self): print "Using", self.content_directory + '.json' self.download_file(self.content_url) class CommentsRepo(JsonRepo): def __init__(self, gh_repo, config): JsonRepo.__init__(self, gh_repo, gh_repo.comments_url, config, 'comments') def update(self): print "Using", self.content_directory + '.json' self.download_file(self.content_url) class ForksRepo(JsonRepo): def __init__(self, gh_repo, config): JsonRepo.__init__(self, gh_repo, gh_repo.forks_url, config, 'forks') def update(self): print "Using", self.content_directory + '.json' self.download_file(self.content_url) class RepoUpdater(object): def __init__(self, args): # Keep a copy of the arguments dictionary self.args = args # Use current directory if unspecified self.args.cwd = args.directory if args.directory else os.getcwd() # Read a token file if necessary if self.args.token is None: try: with open(self.args.token_file, 'r') as tfile: self.args.token = tfile.read().strip() except IOError as err: print >>sys.stderr, Color.RED + \ "Could not open token file %s: %s" \ % (self.args.token_file, err), \ Color.END print >>sys.stderr, "Terminating early" exit(1) if self.args.debug: print "Current configuration: " for arg in self.args: # We don't want this printed if arg != 'token': print arg, ":", get_color_str(args[arg], Color.GREEN) def update(self): if not os.path.isdir(self.args.cwd): os.makedirs(self.args.cwd) print "User:", get_color_str(self.args.username, Color.GREEN) user_data = self.get_user_data() repos, excluded_repos = self.get_repos(user_data.repos_url, user_data.organizations_url) repos = self.filter_repo_names(repos, excluded_repos) for repo in repos: print get_color_str('{:-^60}'.format(repo.name), Color.YELLOW) repo.update() def get_user_data(self): return get_json(GITHUB_API_HOST + USER_DETAILS_PATH % self.args.username, self.args) def get_own_repos(self, repos_url): repos = get_json(repos_url, self.args, GitHubRepo) [repo.configure(self.args) for repo in repos] repo_count = len(repos) if self.args.exclude_forks: repos = [repo for repo in repos if not repo.fork] return repos, repo_count def get_org_repos(self, orgs_url): all_orgs_repos = [] repo_count = 0 orgs = get_json(orgs_url, self.args) for org in orgs: org_repos = get_json(org.repos_url, self.args, GitHubRepo) [repo.configure(self.args) for repo in org_repos] repo_count += len(org_repos) if not self.args.include_public_org_repos: org_repos = [repo for repo in org_repos if repo.private] if not self.args.include_org_forks: org_repos = [repo for repo in org_repos if not repo.fork] all_orgs_repos += org_repos return all_orgs_repos, repo_count def get_repos(self, repos_url, orgs_url): repos = [] if self.args.debug: print "Getting repo data from", get_color_str(repos_url, Color.GREEN) own_repos, repo_count = self.get_own_repos(repos_url) if not self.args.exclude_own: repos += own_repos org_repos, org_repo_count = self.get_org_repos(orgs_url) if not self.args.exclude_orgs: repos += org_repos if self.args.debug: print "Available repos:", get_color_str(str(len(repos)), Color.GREEN) for repo in repos: owner = repo.owner.login print " -", get_color_str(repo.name, Color.YELLOW) print " " * 5, repo.description excluded_repos = repo_count + org_repo_count - len(repos) return repos, excluded_repos def filter_repo_names(self, repos, excluded_repos): original_repos = len(repos) if BLACKLIST: repos = [repo for repo in repos if not repo.name in BLACKLIST] if WHITELIST: repos = [repo for repo in repos if repo.name in WHITELIST] filtered_repos = original_repos - len(repos) ignored_repos_str = " (" + str(filtered_repos) + " filtered, " + str(excluded_repos) + " excluded)" repo_count_str = str(original_repos - filtered_repos) + " / " + str(original_repos) print "Fetching repos:", get_color_str(repo_count_str + ignored_repos_str, Color.GREEN) for repo in repos: owner = repo.owner.login print " -", Color.YELLOW + repo.name, Color.END print " " * 5, repo.description return repos if __name__ == '__main__': parser = argparse.ArgumentParser(description='Retrieve and/or update local copies of GitHub-hosted repos') parser.add_argument('username', \ help='GitHub username that will be used for cloning and fetching') parser.add_argument('token', \ nargs='?', \ help='GitHub auth token for that username. \ You can create one at https://github.com/settings/applications') parser.add_argument('--version', \ action='version', \ version='%(prog)s v0.7') parser.add_argument('-d', '--directory', \ help='Target directory for cloning and fetching') parser.add_argument('-t', '--token-file', \ default=DEFAULT_TOKEN_FILE, \ help='File containing the github token') parser.add_argument('-s', '--ssh', \ help='Fetch repositories using ssh', \ action='store_true') parser.add_argument('-m', '--mirror', \ help='Mirror bare repositories instead of making full checkouts', \ action='store_true') parser.add_argument('-x', '--exclude-own', \ help='Exclude own repositories in the updates', \ action='store_true') parser.add_argument('--exclude-forks', \ help='Exclude forked repositories from the updates', \ action='store_true') parser.add_argument('--exclude-orgs', \ help='Exclude repos that are in user\'s orgs (this does not filter the ones you have)', \ action='store_true') parser.add_argument('--include-org-forks', \ help='Include forked repos that are in user\'s orgs', \ action='store_true') parser.add_argument('--include-public-org-repos', \ help='Include public repos that are in user\'s orgs', \ action='store_true') parser.add_argument('--full-backup', \ help='Include all repository content', \ action='store_true') parser.add_argument('--debug', \ help='Enable debugging output', \ action='store_true') args = AttributeDict(vars(parser.parse_args())) RepoUpdater(args).update()