Skip to content

Commit

Permalink
Wip: trying to get the request through stream_content_artifact
Browse files Browse the repository at this point in the history
  • Loading branch information
pedro-psb committed Oct 16, 2024
1 parent 8c1347a commit 77794a6
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
9 changes: 6 additions & 3 deletions pulpcore/app/models/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import logging

import django
from asgiref.sync import sync_to_async
from asyncio_throttle import Throttler
from django.conf import settings
from django.contrib.postgres.fields import HStoreField
Expand Down Expand Up @@ -1094,15 +1095,17 @@ def __enter__(self):
repository.initialize_new_version(self)
return self

def __exit__(self, exc_type, exc_value, traceback):
# TODO: revert this
# only here so I can get the test right with less friction
async def __exit__(self, exc_type, exc_value, traceback):
"""
Finalize and save the RepositoryVersion if no errors are raised, delete it if not
"""
if exc_value:
self.delete()
await sync_to_async(self.delete)()
else:
try:
repository = self.repository.cast()
repository = self.repository.acast()
repository.finalize_new_version(self)
no_change = not self.added() and not self.removed()
if no_change:
Expand Down
4 changes: 3 additions & 1 deletion pulpcore/content/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,9 @@ async def _match_and_stream(self, path, request):
request, StreamResponse(headers=headers), ca
)

assert repo_version and not publication and not distro.SERVE_FROM_PUBLICATION
if repo_version and not publication and not distro.SERVE_FROM_PUBLICATION:
# breakpoint()
if rel_path == "" or rel_path[-1] == "/":
index_path = "{}index.html".format(rel_path)

Expand Down Expand Up @@ -686,6 +688,7 @@ async def _match_and_stream(self, path, request):

# If we haven't found a match yet, try to use pull-through caching with remote
if distro.remote:
assert False
remote = await distro.remote.acast()
if url := remote.get_remote_artifact_url(rel_path, request=request):
if (
Expand All @@ -704,7 +707,6 @@ async def _match_and_stream(self, path, request):
request, StreamResponse(headers=headers), ca
)
else:
breakpoint()
# Try to stream the RemoteArtifact and potentially save it as a new Content unit
save_artifact = remote.get_remote_artifact_content_type(rel_path) is not None
ca = ContentArtifact(relative_path=rel_path)
Expand Down
26 changes: 19 additions & 7 deletions pulpcore/tests/unit/content/test_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
Distribution,
Remote,
RemoteArtifact,
Repository,
RepositoryContent,
)


Expand Down Expand Up @@ -359,24 +361,30 @@ def test_mock_server(server_a, server_b):
async def test_server_client_setup(monkeypatch, server_a, server_b):
async def delete_all_objects(model):
await sync_to_async(model.objects.all().delete)()
for m in (Remote, Distribution, ContentArtifact, Content, RemoteArtifact, Artifact):
for m in (Remote, Distribution, ContentArtifact, Content, RemoteArtifact, Artifact, Repository):
await delete_all_objects(m)

expected_aaa_digest = hashlib.sha256(b"aaa").hexdigest()
# broken remote server setup
monkeypatch.setattr(Remote, "get_remote_artifact_content_type", Mock(return_value=Content))
monkeypatch.setattr(Content, "init_from_artifact_and_relative_path", Mock(return_value=Content()))
# Add content to repository
content = await Content.objects.acreate()
ca = await ContentArtifact.objects.acreate(content=content)
repo_a = await Repository.objects.acreate(name="repo_a")
async def setup_repo():
with await sync_to_async(repo_a.new_version)() as new_version:
await sync_to_async(new_version.add_content)(content)
await setup_repo()

ca = await ContentArtifact.objects.acreate(content=content)
# Setup Remote/CA/RA relationships
# * pulp expects aaa for ra_b, but server_b updated and we didnt sync, so should fail
remote_a = await Remote.objects.acreate(name="server_a", url=server_a)
ra_a = await RemoteArtifact.objects.acreate(content_artifact=ca, remote=remote_a, sha256=expected_aaa_digest)

# pulp expects aaa for ra_b, but server_b updated and we didnt sync, so should fail
remote_b = await Remote.objects.acreate(name="server_b", url=server_b)
ra_b = await RemoteArtifact.objects.acreate(content_artifact=ca, remote=remote_b, sha256=expected_aaa_digest)

dist = await Distribution.objects.acreate(name="mydist", base_path="mydist", remote=remote_b)
dist = await Distribution.objects.acreate(name="mydist", base_path="mydist", remote=remote_b, repository=repo_a)

resources = [remote_a, remote_b, content, ca, ra_b, ra_a, dist]

Expand All @@ -399,13 +407,17 @@ async def assert_can_get_blob():
assert "aaa" in text

try:
await assert_content_in("/pulp/content/", content="Index of /pulp/content/")
await assert_content_in("/pulp/content/mydist/", content="blob")
# await assert_content_in("/pulp/content/", content="Index of /pulp/content/")
# await assert_content_in("/pulp/content/mydist/", content="blob")
# TODO: the content handler is going through the pull-through caching and calling
# _stream_remote_artifact(), but I need to make it call _stream_content_artifact(),
# so I can test the RA selection/retry.
# assert await ContentArtifact.objects.filter(
# content__in=repo_a.latest_version().content, relative_path="blob"
# ).aexists()
await assert_can_get_blob()
finally:
breakpoint()
await client.close()
for item in resources:
await item.adelete()
Expand Down

0 comments on commit 77794a6

Please sign in to comment.