Registry v1 Pull Flow

| 分类 技术  | 标签 Docker-registry  Docker 

本文从源码角度描述Registry v1 Pull Flow……

如下是docker pull流程:

Pulling repository xxxx/library/centos
333333333333: Download complete 
111111111111: Download complete 
222222222222: Download complete 
Status: Downloaded newer image for xxxx/library/centos:latest

"GET /v1/_ping HTTP/1.1"
"GET /v1/repositories/library/centos/images HTTP/1.1"
"GET /v1/repositories/library/centos/tags HTTP/1.1"
"GET /v1/images/3333333333333333333333333333333333333333333333333333333333333333/ancestry HTTP/1.1"
"GET /v1/images/1111111111111111111111111111111111111111111111111111111111111111/json HTTP/1.1"
"GET /v1/images/1111111111111111111111111111111111111111111111111111111111111111/layer HTTP/1.1"
"GET /v1/images/2222222222222222222222222222222222222222222222222222222222222222/json HTTP/1.1"
"GET /v1/images/2222222222222222222222222222222222222222222222222222222222222222/layer HTTP/1.1"
"GET /v1/images/3333333333333333333333333333333333333333333333333333333333333333/json HTTP/1.1"
"GET /v1/images/3333333333333333333333333333333333333333333333333333333333333333/layer HTTP/1.1"

GET /v1/_ping

请求如下:

@app.route('/_ping')
@app.route('/v1/_ping')
def ping():
    headers = {
        'X-Docker-Registry-Standalone': 'mirror' if mirroring.is_mirror()
                                        else (cfg.standalone is True)
    }
    infos = {}
    if cfg.debug:
        # Versions
        versions = infos['versions'] = {}
        headers['X-Docker-Registry-Config'] = cfg.flavor

        for name, module in sys.modules.items():
            if name.startswith('_'):
                continue
            try:
                version = module.__version__
            except AttributeError:
                continue
            versions[name] = version
        versions['python'] = sys.version

        # Hosts infos
        infos['host'] = platform.uname()
        infos['launch'] = sys.argv

    return toolkit.response(infos, headers=headers)

GET /v1/repositories/library/centos/images

请求结果如下:

curl http://xxxx/v1/repositories/library/centos/images
[{"id": "3333333333333333333333333333333333333333333333333333333333333333"}, {"id": "2222222222222222222222222222222222222222222222222222222222222222"}, {"id": "1111111111111111111111111111111111111111111111111111111111111111"}]

对应代码:

@app.route('/v1/repositories/<path:repository>/images', methods=['GET'])
@toolkit.parse_repository_name
@toolkit.requires_auth
@mirroring.source_lookup(index_route=True)
def get_repository_images(namespace, repository):
    data = None
    try:
        path = store.index_images_path(namespace, repository)
        data = store.get_content(path)
    except exceptions.FileNotFoundError:
        return toolkit.api_error('images not found', 404)
    headers = generate_headers(namespace, repository, 'read')
    return toolkit.response(data, 200, headers, True)

@filter_args
    def index_images_path(self, namespace, repository):
        repository_path = self._repository_path(
            namespace=namespace, repository=repository)
        return '{0}/_index_images'.format(repository_path)

也即请求对应repositories/library/centos/_index_images文件,若为mirror,则执行如下lookup_source函数:

def source_lookup(cache=False, stream=False, index_route=False,
                  merge_results=False):
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            mirroring_cfg = cfg.mirroring
            resp = f(*args, **kwargs)
            if not is_mirror():
                return resp
            source = mirroring_cfg.source
            if index_route and mirroring_cfg.source_index:
                source = mirroring_cfg.source_index
            logger.debug('Source provided, registry acts as mirror')
            if resp.status_code != 404 and not merge_results:
                logger.debug('Status code is not 404, no source '
                             'lookup required')
                return resp
            source_resp = lookup_source(
                flask.request.path, stream=stream, source=source
            )
            if not source_resp:
                return resp

            store = storage.load()

            headers = _response_headers(source_resp.headers)
            if index_route and 'x-docker-endpoints' in headers:
                headers['x-docker-endpoints'] = toolkit.get_endpoints()

            if not stream:
                logger.debug('JSON data found on source, writing response')
                resp_data = source_resp.content
                if merge_results:
                    mjson = json.loads(resp_data)
                    pjson = json.loads(resp.data)
                    for mr in mjson["results"]:
                        replaced = False
                        for pi, pr in enumerate(pjson["results"]):
                            if pr["name"] == mr["name"]:
                                pjson["results"][pi] = mr
                                replaced = True
                        if not replaced:
                            pjson["results"].extend([mr])
                    pjson['num_results'] = len(pjson["results"])
                    resp_data = json.dumps(pjson)
                if cache:
                    store_mirrored_data(
                        resp_data, flask.request.url_rule.rule, kwargs,
                        store
                    )
                return toolkit.response(
                    data=resp_data,
                    headers=headers,
                    raw=True
                )
            logger.debug('Layer data found on source, preparing to '
                         'stream response...')
            layer_path = store.image_layer_path(kwargs['image_id'])
            return _handle_mirrored_layer(source_resp, layer_path, store,
                                          headers)

        return wrapper
    return decorator

执行流程(local filesystem):检查本地有无_index_images文件,若不存在则从source请求该文件(事实上就是从source请求该文件,因为cacheFalse

GET /v1/repositories/library/centos/tags

请求结果如下:

curl http://xxxx/v1/repositories/library/centos/tags

{"latest": "3333333333333333333333333333333333333333333333333333333333333333"}

对应代码:

@app.route('/v1/repositories/<path:repository>/tags', methods=['GET'])
@toolkit.parse_repository_name
@toolkit.requires_auth
@mirroring.source_lookup_tag
def _get_tags(namespace, repository):
    logger.debug("[get_tags] namespace={0}; repository={1}".format(namespace,
                 repository))
    try:
        data = get_tags(namespace=namespace, repository=repository)
    except exceptions.FileNotFoundError:
        return toolkit.api_error('Repository not found', 404)
    return toolkit.response(data)

def get_tags(namespace, repository):
    tag_path = store.tag_path(namespace, repository)
    greenlets = {}
    for fname in store.list_directory(tag_path):
        full_tag_name = fname.split('/').pop()
        if not full_tag_name.startswith('tag_'):
            continue
        tag_name = full_tag_name[4:]
        greenlets[tag_name] = gevent.spawn(
            store.get_content,
            store.tag_path(namespace, repository, tag_name),
        )
    gevent.joinall(greenlets.values())
    return dict((k, g.value) for (k, g) in greenlets.items())

@filter_args
def tag_path(self, namespace, repository, tagname=None):
    repository_path = self._repository_path(
        namespace=namespace, repository=repository)
    if not tagname:
        return repository_path
    return '{0}/tag_{1}'.format(repository_path, tagname)

# Useful if we want to change those locations later without rewriting
# the code which uses Storage
repositories = 'repositories'
images = 'images'

def _repository_path(self, namespace, repository):
    return '{0}/{1}/{2}'.format(
        self.repositories, namespace, repository)

执行流程:

1、列举出目录repositories/library/centos下的所有文件

2、请求每个以tag_开头的文件内容

3、将结果合并

若为mirror,则执行如下lookup_source_tag函数:

def source_lookup_tag(f):
    @functools.wraps(f)
    def wrapper(namespace, repository, *args, **kwargs):
        mirroring_cfg = cfg.mirroring
        resp = f(namespace, repository, *args, **kwargs)
        if not is_mirror():
            return resp
        source = mirroring_cfg.source
        tags_cache_ttl = mirroring_cfg.tags_cache_ttl

        if resp.status_code != 404:
            logger.debug('Status code is not 404, no source '
                         'lookup required')
            return resp

        if not cache.redis_conn:
            # No tags cache, just return
            logger.warning('mirroring: Tags cache is disabled, please set a '
                           'valid `cache\' directive in the config.')
            source_resp = lookup_source(
                flask.request.path, stream=False, source=source
            )
            if not source_resp:
                return resp

            headers = _response_headers(source_resp.headers)
            return toolkit.response(data=source_resp.content, headers=headers,
                                    raw=True)

        store = storage.load()
        request_path = flask.request.path

        if request_path.endswith('/tags'):
            # client GETs a list of tags
            tag_path = store.tag_path(namespace, repository)
        else:
            # client GETs a single tag
            tag_path = store.tag_path(namespace, repository, kwargs['tag'])

        try:
            data = cache.redis_conn.get('{0}:{1}'.format(
                cache.cache_prefix, tag_path
            ))
        except cache.redis.exceptions.ConnectionError as e:
            data = None
            logger.warning("Diff queue: Redis connection error: {0}".format(
                e
            ))

        if data is not None:
            return toolkit.response(data=data, raw=True)
        source_resp = lookup_source(
            flask.request.path, stream=False, source=source
        )
        if not source_resp:
            return resp
        data = source_resp.content
        headers = _response_headers(source_resp.headers)
        try:
            cache.redis_conn.setex('{0}:{1}'.format(
                cache.cache_prefix, tag_path
            ), tags_cache_ttl, data)
        except cache.redis.exceptions.ConnectionError as e:
            logger.warning("Diff queue: Redis connection error: {0}".format(
                e
            ))

        return toolkit.response(data=data, headers=headers,
                                raw=True)
    return wrapper

def list_directory(self, path=None):
    prefix = ''
    if path:
        prefix = '%s/' % path
    path = self._init_path(path)
    exists = False
    try:
        for d in os.listdir(path):
            exists = True
            yield prefix + d
    except Exception:
        pass
    if not exists:
        raise exceptions.FileNotFoundError('%s is not there' % path)

执行流程(local filesystem):检查本地有无repositories/library/centos目录,若不存在则向source发出该请求(事实上就是从source发出该请求,cache.redis_connNone

GET /v1/images/3333333333333333333333333333333333333333333333333333333333333333/ancestry

请求结果如下:

curl http://xxxx/v1/images/3333333333333333333333333333333333333333333333333333333333333333/ancestry
["3333333333333333333333333333333333333333333333333333333333333333", "2222222222222222222222222222222222222222222222222222222222222222", "1111111111111111111111111111111111111111111111111111111111111111"]

对应代码:

@app.route('/v1/images/<image_id>/ancestry', methods=['GET'])
@toolkit.requires_auth
@toolkit.valid_image_id
@require_completion
@set_cache_headers
@mirroring.source_lookup(cache=True, stream=False)
def get_image_ancestry(image_id, headers):
    ancestry_path = store.image_ancestry_path(image_id)
    try:
        # Note(dmp): unicode patch
        data = store.get_json(ancestry_path)
    except exceptions.FileNotFoundError:
        return toolkit.api_error('Image not found', 404)
    return toolkit.response(data, headers=headers)

@filter_args
def image_ancestry_path(self, image_id):
    return '{0}/{1}/ancestry'.format(self.images, image_id)

对应文件:images/3333333333333333333333333333333333333333333333333333333333333333/ancestry,若为mirror,则执行:

def source_lookup(cache=False, stream=False, index_route=False,
                  merge_results=False):
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            mirroring_cfg = cfg.mirroring
            resp = f(*args, **kwargs)
            if not is_mirror():
                return resp
            source = mirroring_cfg.source
            if index_route and mirroring_cfg.source_index:
                source = mirroring_cfg.source_index
            logger.debug('Source provided, registry acts as mirror')
            if resp.status_code != 404 and not merge_results:
                logger.debug('Status code is not 404, no source '
                             'lookup required')
                return resp
            source_resp = lookup_source(
                flask.request.path, stream=stream, source=source
            )
            if not source_resp:
                return resp

            store = storage.load()

            headers = _response_headers(source_resp.headers)
            if index_route and 'x-docker-endpoints' in headers:
                headers['x-docker-endpoints'] = toolkit.get_endpoints()

            if not stream:
                logger.debug('JSON data found on source, writing response')
                resp_data = source_resp.content
                if merge_results:
                    mjson = json.loads(resp_data)
                    pjson = json.loads(resp.data)
                    for mr in mjson["results"]:
                        replaced = False
                        for pi, pr in enumerate(pjson["results"]):
                            if pr["name"] == mr["name"]:
                                pjson["results"][pi] = mr
                                replaced = True
                        if not replaced:
                            pjson["results"].extend([mr])
                    pjson['num_results'] = len(pjson["results"])
                    resp_data = json.dumps(pjson)
                if cache:
                    store_mirrored_data(
                        resp_data, flask.request.url_rule.rule, kwargs,
                        store
                    )
                return toolkit.response(
                    data=resp_data,
                    headers=headers,
                    raw=True
                )
            logger.debug('Layer data found on source, preparing to '
                         'stream response...')
            layer_path = store.image_layer_path(kwargs['image_id'])
            return _handle_mirrored_layer(source_resp, layer_path, store,
                                          headers)

        return wrapper
    return decorator

执行流程(local filesystem):

1、检查本地是否存在images/3333333333333333333333333333333333333333333333333333333333333333/ancestry

2、若不存在,则向source发出请求获取文件内容

3、将获取的文件内容存储到本地文件系统

GET /v1/images/1111111111111111111111111111111111111111111111111111111111111111/json

请求结果如下:

@app.route('/v1/images/<image_id>/json', methods=['GET'])
@toolkit.requires_auth
@toolkit.valid_image_id
@require_completion
@set_cache_headers
@mirroring.source_lookup(cache=True, stream=False)
def get_image_json(image_id, headers):
    try:
        repository = toolkit.get_repository()
        if repository and store.is_private(*repository):
            if not toolkit.validate_parent_access(image_id):
                return toolkit.api_error('Image not found', 404)
        # If no auth token found, either standalone registry or privileged
        # access. In both cases, access is always "public".
        return _get_image_json(image_id, headers)
    except exceptions.FileNotFoundError:
        return toolkit.api_error('Image not found', 404)

def _get_image_json(image_id, headers=None):
    if headers is None:
        headers = {}
    data = store.get_content(store.image_json_path(image_id))
    try:
        size = store.get_size(store.image_layer_path(image_id))
        headers['X-Docker-Size'] = str(size)
    except exceptions.FileNotFoundError:
        pass
    try:
        csums = load_checksums(image_id)
        headers['X-Docker-Checksum-Payload'] = csums
    except exceptions.FileNotFoundError:
        pass
    return toolkit.response(data, headers=headers, raw=True)

@filter_args
def image_json_path(self, image_id):
    return '{0}/{1}/json'.format(self.images, image_id)

@filter_args
def image_layer_path(self, image_id):
    return '{0}/{1}/layer'.format(self.images, image_id)

def load_checksums(image_id):
    checksum_path = store.image_checksum_path(image_id)
    data = store.get_content(checksum_path)
    try:
        # Note(dmp): unicode patch NOT applied here
        return json.loads(data)
    except ValueError:
        # NOTE(sam): For backward compatibility only, existing data may not be
        # a valid json but a simple string.
        return [data]

@filter_args
def image_checksum_path(self, image_id):
    return '{0}/{1}/_checksum'.format(self.images, image_id)

执行流程:

1、获取images/1111111111111111111111111111111111111111111111111111111111111111/json文件内容

2、获取images/1111111111111111111111111111111111111111111111111111111111111111/layer文件大小,并添加到X-Docker-Size回应报头字段

3、获取images/1111111111111111111111111111111111111111111111111111111111111111/_checksum文件内容,并添加到X-Docker-Checksum-Payload回应报头字段

若为mirror,则执行:

def source_lookup(cache=False, stream=False, index_route=False,
                  merge_results=False):
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            mirroring_cfg = cfg.mirroring
            resp = f(*args, **kwargs)
            if not is_mirror():
                return resp
            source = mirroring_cfg.source
            if index_route and mirroring_cfg.source_index:
                source = mirroring_cfg.source_index
            logger.debug('Source provided, registry acts as mirror')
            if resp.status_code != 404 and not merge_results:
                logger.debug('Status code is not 404, no source '
                             'lookup required')
                return resp
            source_resp = lookup_source(
                flask.request.path, stream=stream, source=source
            )
            if not source_resp:
                return resp

            store = storage.load()

            headers = _response_headers(source_resp.headers)
            if index_route and 'x-docker-endpoints' in headers:
                headers['x-docker-endpoints'] = toolkit.get_endpoints()

            if not stream:
                logger.debug('JSON data found on source, writing response')
                resp_data = source_resp.content
                if merge_results:
                    mjson = json.loads(resp_data)
                    pjson = json.loads(resp.data)
                    for mr in mjson["results"]:
                        replaced = False
                        for pi, pr in enumerate(pjson["results"]):
                            if pr["name"] == mr["name"]:
                                pjson["results"][pi] = mr
                                replaced = True
                        if not replaced:
                            pjson["results"].extend([mr])
                    pjson['num_results'] = len(pjson["results"])
                    resp_data = json.dumps(pjson)
                if cache:
                    store_mirrored_data(
                        resp_data, flask.request.url_rule.rule, kwargs,
                        store
                    )
                return toolkit.response(
                    data=resp_data,
                    headers=headers,
                    raw=True
                )
            logger.debug('Layer data found on source, preparing to '
                         'stream response...')
            layer_path = store.image_layer_path(kwargs['image_id'])
            return _handle_mirrored_layer(source_resp, layer_path, store,
                                          headers)

        return wrapper
    return decorator

执行流程(local filesystem):

1、检查本地是否存在images/3333333333333333333333333333333333333333333333333333333333333333/json

2、若不存在,则向source发出请求获取文件内容

3、将获取的文件内容存储到本地文件系统

GET /v1/images/1111111111111111111111111111111111111111111111111111111111111111/layer

代码如下:

@app.route('/v1/images/<image_id>/layer', methods=['GET'])
@toolkit.requires_auth
@require_completion
@toolkit.valid_image_id
@set_cache_headers
@mirroring.source_lookup(cache=True, stream=True)
def get_image_layer(image_id, headers):
    try:
        bytes_range = None
        if store.supports_bytes_range:
            headers['Accept-Ranges'] = 'bytes'
            bytes_range = _parse_bytes_range()
        repository = toolkit.get_repository()
        if repository and store.is_private(*repository):
            if not toolkit.validate_parent_access(image_id):
                return toolkit.api_error('Image not found', 404)
        # If no auth token found, either standalone registry or privileged
        # access. In both cases, access is always "public".
        return _get_image_layer(image_id, headers, bytes_range)
    except exceptions.FileNotFoundError:
        return toolkit.api_error('Image not found', 404)

def _get_image_layer(image_id, headers=None, bytes_range=None):
    if headers is None:
        headers = {}

    headers['Content-Type'] = 'application/octet-stream'
    accel_uri_prefix = cfg.nginx_x_accel_redirect
    path = store.image_layer_path(image_id)
    if accel_uri_prefix:
        if store.scheme == 'file':
            accel_uri = '/'.join([accel_uri_prefix, path])
            headers['X-Accel-Redirect'] = accel_uri
            logger.debug('send accelerated {0} ({1})'.format(
                accel_uri, headers))
            return flask.Response('', headers=headers)
        else:
            logger.warn('nginx_x_accel_redirect config set,'
                        ' but storage is not LocalStorage')

    # If store allows us to just redirect the client let's do that, we'll
    # offload a lot of expensive I/O and get faster I/O
    if cfg.storage_redirect:
        try:
            content_redirect_url = store.content_redirect_url(path)
            if content_redirect_url:
                return flask.redirect(content_redirect_url, 302)
        except IOError as e:
            logger.debug(str(e))

    status = None
    layer_size = 0

    if not store.exists(path):
        raise exceptions.FileNotFoundError("Image layer absent from store")
    try:
        layer_size = store.get_size(path)
    except exceptions.FileNotFoundError:
        # XXX why would that fail given we know the layer exists?
        pass
    if bytes_range and bytes_range[1] == -1 and not layer_size == 0:
        bytes_range = (bytes_range[0], layer_size)

    if bytes_range:
        content_length = bytes_range[1] - bytes_range[0] + 1
        if not _valid_bytes_range(bytes_range):
            return flask.Response(status=416, headers=headers)
        status = 206
        content_range = (bytes_range[0], bytes_range[1], layer_size)
        headers['Content-Range'] = '{0}-{1}/{2}'.format(*content_range)
        headers['Content-Length'] = content_length
    elif layer_size > 0:
        headers['Content-Length'] = layer_size
    else:
        return flask.Response(status=416, headers=headers)
    return flask.Response(store.stream_read(path, bytes_range),
                          headers=headers, status=status)


@filter_args
def image_layer_path(self, image_id):
    return '{0}/{1}/layer'.format(self.images, image_id)

获取文件images/1111111111111111111111111111111111111111111111111111111111111111/layer内容

若为mirror,则执行:

def source_lookup(cache=False, stream=False, index_route=False,
                  merge_results=False):
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            mirroring_cfg = cfg.mirroring
            resp = f(*args, **kwargs)
            if not is_mirror():
                return resp
            source = mirroring_cfg.source
            if index_route and mirroring_cfg.source_index:
                source = mirroring_cfg.source_index
            logger.debug('Source provided, registry acts as mirror')
            if resp.status_code != 404 and not merge_results:
                logger.debug('Status code is not 404, no source '
                             'lookup required')
                return resp
            source_resp = lookup_source(
                flask.request.path, stream=stream, source=source
            )
            if not source_resp:
                return resp

            store = storage.load()

            headers = _response_headers(source_resp.headers)
            if index_route and 'x-docker-endpoints' in headers:
                headers['x-docker-endpoints'] = toolkit.get_endpoints()

            if not stream:
                logger.debug('JSON data found on source, writing response')
                resp_data = source_resp.content
                if merge_results:
                    mjson = json.loads(resp_data)
                    pjson = json.loads(resp.data)
                    for mr in mjson["results"]:
                        replaced = False
                        for pi, pr in enumerate(pjson["results"]):
                            if pr["name"] == mr["name"]:
                                pjson["results"][pi] = mr
                                replaced = True
                        if not replaced:
                            pjson["results"].extend([mr])
                    pjson['num_results'] = len(pjson["results"])
                    resp_data = json.dumps(pjson)
                if cache:
                    store_mirrored_data(
                        resp_data, flask.request.url_rule.rule, kwargs,
                        store
                    )
                return toolkit.response(
                    data=resp_data,
                    headers=headers,
                    raw=True
                )
            logger.debug('Layer data found on source, preparing to '
                         'stream response...')
            layer_path = store.image_layer_path(kwargs['image_id'])
            return _handle_mirrored_layer(source_resp, layer_path, store,
                                          headers)

        return wrapper
    return decorator

执行流程(local filesystem):

1、检查本地是否存在images/3333333333333333333333333333333333333333333333333333333333333333/layer

2、若不存在,则向source发出请求获取文件内容

3、将获取的文件内容存储到本地文件系统

补充

  • images/2222222222222222222222222222222222222222222222222222222222222222/json
  • images/2222222222222222222222222222222222222222222222222222222222222222/layer
  • images/3333333333333333333333333333333333333333333333333333333333333333/json
  • images/3333333333333333333333333333333333333333333333333333333333333333/layer

请求过程如:images/1111111111111111111111111111111111111111111111111111111111111111


上一篇     下一篇