Skip to content
Snippets Groups Projects
Commit 7ac51848 authored by nimrod's avatar nimrod
Browse files

Registry backup script.

Meant to be run on a different machine. It can be used on the same
machine and then the resulting files can be copied over.
parent be50d792
No related branches found
No related tags found
No related merge requests found
Pipeline #2309 failed
#!/usr/bin/env python3
"""Backup a container image registry."""
import argparse
import pathlib
import sys
import docker
import requests
def get_images(registry):
"""Return a list of images from the registry."""
return requests.get(f"https://{registry}/v2/_catalog").json()[
"repositories"
]
def get_image_tags(registry, image):
"""Return a list of tags for an image in the registry."""
return requests.get(f"https://{registry}/v2/{image}/tags/list").json()[
"tags"
]
def backup(docker_client, registry, image, tag, dest):
"""Backup an image tag from the registry to a file in the destination
directory."""
full_name = f"{registry}/{image}:{tag}"
print(f"Backing up {full_name}.", file=sys.stderr)
try:
docker_client.images.get(full_name)
image_existed = True
except docker.errors.ImageNotFound:
image_existed = False
docker_image = docker_client.images.pull(full_name)
with open(dest / f"{tag}.tar", "wb") as tarball:
for chunk in docker_image.save():
tarball.write(chunk)
if not image_existed:
docker_client.images.remove(full_name)
def backup_registry(registry, dest):
"""Backup the images in the registry to the destination."""
docker_client = docker.from_env(timeout=90)
docker_client.ping()
for image in get_images(registry):
(dest / image).mkdir(exist_ok=True)
for tag in get_image_tags(registry, image):
backup(docker_client, registry, image, tag, dest / image)
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser(description=__doc__)
arg_parser.add_argument("registry", help="FQDN of the registry to backup.")
arg_parser.add_argument(
"destination", help="Location to store the images.", type=pathlib.Path
)
args = arg_parser.parse_args()
destination = args.destination.expanduser()
if not destination.exists() or not destination.is_dir():
arg_parser.error(
"Backup destination doesn't exists or isn't a directory."
)
try:
destination.touch()
except IsADirectoryError:
arg_parser.error("Can't write to the backup destination.")
try:
backup_registry(args.registry, destination)
except Exception as exception:
arg_parser.error(str(exception))
sys.exit()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment