Skip to content
Snippets Groups Projects
Select Git revision
  • 95eb6afe6dce2b9e1f6a81f114549d57f40c81a5
  • master default
  • host01
  • ns4
  • kodi
5 results

.env

Blame
  • update_repos 18.46 KiB
    #!/usr/bin/python
    # encoding: utf-8
    
    import sys
    import os
    import argparse
    from collections import namedtuple
    from subprocess import Popen, PIPE
    
    from urllib2 import urlopen
    import json
    from urlparse import urlparse
    
    WHITELIST=[]
    BLACKLIST=[]
    
    LISTINGS_PER_PAGE = 100
    ACCESS_TOKEN_PARAM = '?access_token=%s'
    LISTING_PAGE_PARAM = '&per_page=%d&page=%d'
    GITHUB_API_HOST = 'https://api.github.com'
    
    GIT_CLONE_CMD = 'git clone %s %s %s'
    GIT_CLONE_API_URL = 'https://%s@github.com/%s'
    GIT_SHA_CMD = 'git rev-parse --short %s'
    GIT_FETCH_CMD = 'git fetch'
    GIT_CHECK_REMOTE_CMD = 'git ls-remote'
    
    USER_DETAILS_PATH = '/users/%s'
    USER_ORG_DETAILS_PATH = '/user/orgs'
    
    DEFAULT_TOKEN_FILE = os.path.expanduser('~/.config/ghtoken')
    
    class Color:
        GREEN = "\033[1;32m"
        BLUE = "\033[1;34m"
        YELLOW = "\033[1;33m"
        RED = "\033[1;31m"
        END = "\033[0m"
    
    def get_color_str(text, color):
        return color + str(text) + Color.END
    
    def system_exec(command, directory=None, show_output=True, ignore_error=False):
        if not directory:
            directory = os.getcwd()
    
        try:
            process = Popen(command, stdout=PIPE, stderr=PIPE, shell=True, cwd=directory)
            (output, error) = process.communicate()
            output = output.strip()
            error = error.strip()
    
            if show_output and len(output) > 0:
                print output
                sys.stdout.flush()
    
            if process.returncode != 0 and not ignore_error:
                raise Exception(error)
    
            ReturnInfo = namedtuple('ReturnInfo', 'return_code output error')
            return ReturnInfo(process.returncode, output, error)
    
        except Exception as err:
            print >>sys.stderr, Color.RED + "Could not execute", command
            print >>sys.stderr, err, Color.END
            print >>sys.stderr, "Terminating early"
            exit(1)
    
    class AttributeDict(dict):
        def __getattr__(self, attr):
            return self[attr]
        def __setattr__(self, attr, value):
            self[attr] = value
    
    def read_api_uri(uri, config, page=1):
        uri += ACCESS_TOKEN_PARAM % config.token + LISTING_PAGE_PARAM % (LISTINGS_PER_PAGE, page)
    
        if config.debug:
            print "Trying:", uri
    
        return urlopen(uri).read()
    
    def get_json(uri, config, obj_type=AttributeDict, page=1):
        return json.loads(read_api_uri(uri, config, page), object_hook=obj_type)
    
    class GitHubRepo(AttributeDict):
        """Top-level class managing all content of a GitHub repository"""
        def __init__(self, *args, **kwargs):
            AttributeDict.__init__(self, *args, **kwargs)
            self.content = []
    
        def configure(self, config):
            self.config = config
    
            # Initialize the content repos
            self.content.append(CodeRepo(self, config))
            if self.config.full_backup:
                self.content.append(WikiRepo(self, config))
                if self.has_issues:
                    self.content.append(IssuesRepo(self, config))
                    self.content.append(MilestonesRepo(self, config))
                self.content.append(PullsRepo(self, config))
                # Teams are only available from user's repos
                if self.owner.login == self.config.username:
                    self.content.append(TeamsRepo(self, config))
                self.content.append(CommentsRepo(self, config))
                self.content.append(ForksRepo(self, config))
    
        def update(self):
            [repo.update() for repo in self.content]
    
    class CodeRepo(object):
        """GitHub code git repository"""
        def __init__(self, gh_repo, config):
            self.config = config
    
            # Copy the details we need from the JSON
            self.name = gh_repo.name
            self.full_name = gh_repo.full_name
            self.ssh_url = gh_repo.ssh_url
            self.default_branch = gh_repo.default_branch
            self.description = gh_repo.description or ""
    
            self.target_directory = os.path.join(self.config.cwd, self.name)
            if self.config.mirror:
                self.target_directory += '.git'
    
        def try_clone(self, ignore_error=False):
            print "Using", self.target_directory
            if os.path.isdir(self.target_directory):
                return False
    
            print "Need to clone", self.name
            if self.config.mirror:
                clone_opts = '--mirror'
            else:
                clone_opts = '--recurse-submodules'
            if self.config.ssh:
                clone_url = self.ssh_url
            else:
                clone_url = GIT_CLONE_API_URL % (self.config.token,
                                                 self.full_name)
            clone_cmd = GIT_CLONE_CMD % (clone_opts, clone_url, \
                                         self.target_directory)
    
            # Create leading directories if necessary
            clone_dir = os.path.dirname(self.target_directory)
            if not os.path.isdir(clone_dir):
                os.makedirs(clone_dir)
    
            # Let the caller decide if errors should be ignored.
            return_code = system_exec(clone_cmd, ignore_error=ignore_error).return_code
            if ignore_error and return_code != 0:
                print "Repo for %s not initialized, skipping" % self.name
                return True
    
            print "Finished cloning"
            return True
    
        def get_sha_str(self, branch, directory):
            if self.config.mirror:
                sha_cmd = GIT_SHA_CMD % branch
            else:
                sha_cmd = GIT_SHA_CMD % 'origin/' + branch
            sha = system_exec(sha_cmd, directory, False, True).output
            return get_color_str(sha, Color.GREEN)
    
        def print_start_sha(self, branch, directory):
            print("- " + get_color_str(branch, Color.GREEN) + " @ " \
                    + self.get_sha_str(branch, directory) + ' ..'),
    
        def update(self, fatal_remote_errors = True):
            if self.try_clone(False):
                return
    
            # Check if the remote is still there before fetching
            return_code = system_exec(GIT_CHECK_REMOTE_CMD, self.target_directory, show_output=False, ignore_error=True).return_code
            if return_code != 0:
                # Is it fatal?
                if fatal_remote_errors:
                    error_message = get_color_str("ERROR: Repo for %s seems to have been deleted. Skipping update." % self.full_name, Color.RED)
                    print error_message
                    raise Exception(error_message)
                else:
                    print get_color_str("WARNING: Repo for %s seems to have been deleted. Skipping update." % self.full_name, Color.GREEN)
                    return
    
            self.print_start_sha(self.default_branch, self.target_directory)
            system_exec(GIT_FETCH_CMD, self.target_directory, False)
            print self.get_sha_str(self.default_branch, self.target_directory)
    
    class WikiRepo(CodeRepo):
        """GitHub wiki git repository"""
        def __init__(self, gh_repo, config):
            CodeRepo.__init__(self, gh_repo, config)
    
            # Put the wiki under the project.extras/wiki directory
            self.target_directory = os.path.join(self.config.cwd,
                                                 self.name + '.extras',
                                                 'wiki')
            if self.config.mirror:
                self.target_directory += '.git'
    
            # Adjust the name and hardcode master as the default branch
            self.name += '.wiki'
            self.full_name += '.wiki'
            self.ssh_url = self.ssh_url.rpartition('.git')[0] \
                           + '.wiki.git'
            self.default_branch = 'master'
    
            if self.description == None:
                self.description = gh_repo.name
    
            self.description += ' - Wiki'
    
        def try_clone(self, ignore_error=True):
            # This is sad. Github will tell us a repo has a wiki, but if the
            # wiki hasn't actually been initialized, it won't exist. Reuse the
            # GitRepo clone code, but allow it to fail.
            return CodeRepo.try_clone(self, True)
    
        def update(self):
            super(WikiRepo, self).update(fatal_remote_errors = False)
    
    class JsonRepo(object):
        def __init__(self, gh_repo, api_url, config, content=None):
            self.config = config
    
            # Copy the details we need from the JSON
            self.name = gh_repo.name
            self.full_name = gh_repo.full_name
            self.repo_url = gh_repo.url
            self.description = gh_repo.description or ""
    
            # Sanitize the content url to strip the {/number} type markers
            self.content_url = api_url.split('{')[0]
    
            # Put downloaded content in project.extras directory
            self.content_directory = os.path.join(self.config.cwd,
                                                  self.name + '.extras')
    
            # If the content type was supplied, adjust the names further
            if content is not None:
                self.name += '.%s' % content
                self.full_name += '.%s' % content
                self.description += ' - %s' % content.title()
                self.content_directory = os.path.join(self.content_directory,
                                                      content)
    
        def download_file(self, url, suffix='.json'):
            if url == self.content_url:
                # Just use the content directory
                target = self.content_directory
            else:
                # Strip off base content URL path to get target
                base = os.path.relpath(urlparse(url).path,
                                       urlparse(self.content_url).path)
                target = os.path.join(self.content_directory, base)
    
            # Add suffix if specified
            if suffix:
                target += suffix
    
            target_directory = os.path.dirname(target)
            if not os.path.isdir(target_directory):
                os.makedirs(target_directory)
    
            if self.config.debug:
                print "Downloading %s" % target
            data = read_api_uri(url, self.config)
            with open(target, 'wb') as dl_file:
                dl_file.write(data)
    
    class IssuesRepo(JsonRepo):
        def __init__(self, gh_repo, config):
            JsonRepo.__init__(self, gh_repo, gh_repo.issues_url, config, 'issues')
    
        def update(self):
            print "Using", self.content_directory
            for issue in get_json(self.content_url, self.config):
                self.download_file(issue.url)
                self.download_file(issue.comments_url)
                self.download_file(issue.events_url)
    
    class PullsRepo(JsonRepo):
        def __init__(self, gh_repo, config):
            JsonRepo.__init__(self, gh_repo, gh_repo.pulls_url, config, 'pulls')
    
        def update(self):
            print "Using", self.content_directory
            for pull in get_json(self.content_url, self.config):
                self.download_file(pull.url)
                self.download_file(pull.commits_url)
                self.download_file(pull.comments_url)
    
    class MilestonesRepo(JsonRepo):
        def __init__(self, gh_repo, config):
            JsonRepo.__init__(self, gh_repo, gh_repo.milestones_url, config,
                              'milestones')
    
        def update(self):
            print "Using", self.content_directory
            for milestone in get_json(self.content_url, self.config):
                self.download_file(milestone.url)
    
    class TeamsRepo(JsonRepo):
        def __init__(self, gh_repo, config):
            JsonRepo.__init__(self, gh_repo, gh_repo.teams_url, config, 'teams')
    
        def update(self):
            print "Using", self.content_directory + '.json'
            self.download_file(self.content_url)
    
    class CommentsRepo(JsonRepo):
        def __init__(self, gh_repo, config):
            JsonRepo.__init__(self, gh_repo, gh_repo.comments_url, config,
                              'comments')
    
        def update(self):
            print "Using", self.content_directory + '.json'
            self.download_file(self.content_url)
    
    class ForksRepo(JsonRepo):
        def __init__(self, gh_repo, config):
            JsonRepo.__init__(self, gh_repo, gh_repo.forks_url, config, 'forks')
    
        def update(self):
            print "Using", self.content_directory + '.json'
            self.download_file(self.content_url)
    
    class RepoUpdater(object):
        def __init__(self, args):
            # Keep a copy of the arguments dictionary
            self.args = args
    
            # Use current directory if unspecified
            self.args.cwd = args.directory if args.directory else os.getcwd()
    
            # Read a token file if necessary
            if self.args.token is None:
                try:
                    with open(self.args.token_file, 'r') as tfile:
                        self.args.token = tfile.read().strip()
                except IOError as err:
                    print >>sys.stderr, Color.RED + \
                        "Could not open token file %s: %s" \
                        % (self.args.token_file, err), \
                        Color.END
                    print >>sys.stderr, "Terminating early"
                    exit(1)
    
            if self.args.debug:
                print "Current configuration: "
                for arg in self.args:
                    # We don't want this printed
                    if arg != 'token':
                        print arg, ":", get_color_str(args[arg], Color.GREEN)
    
        def update(self):
            if not os.path.isdir(self.args.cwd):
                os.makedirs(self.args.cwd)
    
            print "User:", get_color_str(self.args.username, Color.GREEN)
            user_data = self.get_user_data()
    
            repos, excluded_repos = self.get_repos(user_data.repos_url, GITHUB_API_HOST + USER_ORG_DETAILS_PATH)
            repos = self.filter_repo_names(repos, excluded_repos)
            for repo in repos:
                print get_color_str('{:-^60}'.format(repo.name), Color.YELLOW)
                repo.update()
    
        def get_user_data(self):
            return get_json(GITHUB_API_HOST + USER_DETAILS_PATH
                            % self.args.username, self.args)
    
        def get_own_repos(self, repos_url):
            repos = self._get_paged_repos(repos_url, self.args, GitHubRepo)
    
            [repo.configure(self.args) for repo in repos]
            repo_count = len(repos)
    
            if self.args.exclude_forks:
                repos = [repo for repo in repos if not repo.fork]
    
            return repos, repo_count
    
        def get_org_repos(self, orgs_url):
            all_orgs_repos = []
            repo_count = 0
    
            orgs = get_json(orgs_url, self.args)
            for org in orgs:
                org_repos = self._get_paged_repos(org.repos_url, self.args, GitHubRepo)
    
                [repo.configure(self.args) for repo in org_repos]
                repo_count += len(org_repos)
    
                if not self.args.include_public_org_repos:
                    org_repos = [repo for repo in org_repos if repo.private]
                if not self.args.include_org_forks:
                    org_repos = [repo for repo in org_repos if not repo.fork]
    
                all_orgs_repos += org_repos
    
            return all_orgs_repos, repo_count
    
        def _get_paged_repos(self, url, args, clazz):
            repos_page = get_json(url, args, clazz)
            repos = repos_page
    
            if self.args.debug:
                print "Retrieved %d entries from %s" % (len(repos), url )
    
            page = 2
            while len(repos_page) >= LISTINGS_PER_PAGE:
                repos_page = get_json(url, args, clazz, page)
    
                if self.args.debug:
                    print "Retrieved %d entries from %s" % (len(repos_page), url)
    
                repos += repos_page
                page += 1
    
            return repos
    
        def get_repos(self, repos_url, orgs_url):
            repos = []
            if self.args.debug:
                print "Getting repo data from", get_color_str(repos_url, Color.GREEN)
    
            own_repos, repo_count = self.get_own_repos(repos_url)
            if not self.args.exclude_own:
                repos += own_repos
    
            org_repos, org_repo_count = self.get_org_repos(orgs_url)
            if not self.args.exclude_orgs:
                repos += org_repos
    
            if self.args.debug:
                print "Available repos:", get_color_str(str(len(repos)), Color.GREEN)
                for repo in repos:
                    owner = repo.owner.login
                    print " -", get_color_str(repo.name, Color.YELLOW)
                    print " " * 5, repo.description
    
            excluded_repos = repo_count + org_repo_count - len(repos)
            return repos, excluded_repos
    
        def filter_repo_names(self, repos, excluded_repos):
            original_repos = len(repos)
    
            if BLACKLIST:
                repos = [repo for repo in repos if not repo.name in BLACKLIST]
    
            if WHITELIST:
                repos = [repo for repo in repos if repo.name in WHITELIST]
    
            filtered_repos = original_repos - len(repos)
    
            ignored_repos_str = " (" + str(filtered_repos) + " filtered, " + str(excluded_repos) + " excluded)"
            repo_count_str = str(original_repos - filtered_repos) + " / " + str(original_repos)
            print "Fetching repos:", get_color_str(repo_count_str + ignored_repos_str, Color.GREEN)
    
            for repo in repos:
                owner = repo.owner.login
                print " -", Color.YELLOW + repo.name, Color.END
                print " " * 5, repo.description
    
            return repos
    
    if __name__ == '__main__':
        parser = argparse.ArgumentParser(description='Retrieve and/or update local copies of GitHub-hosted repos')
    
        parser.add_argument('username', \
                help='GitHub username that will be used for cloning and fetching')
        parser.add_argument('token', \
                nargs='?', \
                help='GitHub auth token for that username. \
                You can create one at https://github.com/settings/applications')
        parser.add_argument('--version', \
                action='version', \
                version='%(prog)s v0.7')
    
        parser.add_argument('-d', '--directory', \
                help='Target directory for cloning and fetching')
    
        parser.add_argument('-t', '--token-file', \
                default=DEFAULT_TOKEN_FILE, \
                help='File containing the github token')
        parser.add_argument('-s', '--ssh', \
                help='Fetch repositories using ssh', \
                action='store_true')
        parser.add_argument('-m', '--mirror', \
                help='Mirror bare repositories instead of making full checkouts', \
                action='store_true')
        parser.add_argument('-x', '--exclude-own', \
                help='Exclude own repositories in the updates', \
                action='store_true')
        parser.add_argument('--exclude-forks', \
                help='Exclude forked repositories from the updates', \
                action='store_true')
        parser.add_argument('--exclude-orgs', \
                help='Exclude repos that are in user\'s orgs (this does not filter the ones you have)', \
                action='store_true')
        parser.add_argument('--include-org-forks', \
                help='Include forked repos that are in user\'s orgs', \
                action='store_true')
        parser.add_argument('--include-public-org-repos', \
                help='Include public repos that are in user\'s orgs', \
                action='store_true')
        parser.add_argument('--full-backup', \
                help='Include all repository content', \
                action='store_true')
    
        parser.add_argument('--debug', \
                help='Enable debugging output', \
                action='store_true')
    
        args = AttributeDict(vars(parser.parse_args()))
    
        RepoUpdater(args).update()