Skip to content

Commit 35b8571

Browse files
committed
[mirror] Feature: Push parallelism support
Signed-off-by: Maxim Vasilenko <[email protected]>
1 parent 0da2052 commit 35b8571

File tree

9 files changed

+264
-93
lines changed

9 files changed

+264
-93
lines changed

internal/mirror/cmd/modules/push/push.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,28 @@ func pushModulesToRegistry(
138138
return fmt.Errorf("Module %s: Read OCI layout: %w", moduleName, err)
139139
}
140140

141-
if err = layouts.PushLayoutToRepo(moduleLayout, moduleRegistryPath, authProvider, logger, insecure, skipVerifyTLS); err != nil {
141+
if err = layouts.PushLayoutToRepo(
142+
moduleLayout,
143+
moduleRegistryPath,
144+
authProvider,
145+
logger,
146+
contexts.DefaultParallelism,
147+
insecure,
148+
skipVerifyTLS,
149+
); err != nil {
142150
return fmt.Errorf("Push module to registry: %w", err)
143151
}
144152

145153
logger.InfoF("Pushing releases for module %s", moduleName)
146-
if err = layouts.PushLayoutToRepo(moduleReleasesLayout, moduleReleasesRegistryPath, authProvider, logger, insecure, skipVerifyTLS); err != nil {
154+
if err = layouts.PushLayoutToRepo(
155+
moduleReleasesLayout,
156+
moduleReleasesRegistryPath,
157+
authProvider,
158+
logger,
159+
contexts.DefaultParallelism,
160+
insecure,
161+
skipVerifyTLS,
162+
); err != nil {
147163
return fmt.Errorf("Push module to registry: %w", err)
148164
}
149165

internal/mirror/cmd/vulndb/push/push.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func push(_ *cobra.Command, _ []string) error {
110110
repo,
111111
pushContext.RegistryAuth,
112112
pushContext.Logger,
113+
contexts.DefaultParallelism,
113114
pushContext.Insecure,
114115
pushContext.SkipTLSVerification,
115116
)

pkg/libmirror/contexts/push.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,16 @@ package contexts
1919
// PushContext holds data related to pending mirroring-to-registry operation.
2020
type PushContext struct {
2121
BaseContext
22+
23+
Parallelism ParallelismConfig
24+
}
25+
26+
type ParallelismConfig struct {
27+
Blobs int
28+
Images int
29+
}
30+
31+
var DefaultParallelism = ParallelismConfig{
32+
Blobs: 4,
33+
Images: 5,
2234
}

pkg/libmirror/layouts/pull_test.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package layouts
1919
import (
2020
"log/slog"
2121
"net/http/httptest"
22-
"os"
2322
"strings"
2423
"testing"
2524

@@ -63,9 +62,9 @@ func TestPullTrivyVulnerabilityDatabaseImageSuccessSkipTLS(t *testing.T) {
6362
}
6463

6564
layouts := &ImageLayouts{
66-
TrivyDB: prepareEmptyOCILayout(t),
67-
TrivyBDU: prepareEmptyOCILayout(t),
68-
TrivyJavaDB: prepareEmptyOCILayout(t),
65+
TrivyDB: createEmptyOCILayout(t),
66+
TrivyBDU: createEmptyOCILayout(t),
67+
TrivyJavaDB: createEmptyOCILayout(t),
6968
}
7069

7170
err := PullTrivyVulnerabilityDatabasesImages(
@@ -116,9 +115,9 @@ func TestPullTrivyVulnerabilityDatabaseImageSuccessInsecure(t *testing.T) {
116115
}
117116

118117
layouts := &ImageLayouts{
119-
TrivyDB: prepareEmptyOCILayout(t),
120-
TrivyBDU: prepareEmptyOCILayout(t),
121-
TrivyJavaDB: prepareEmptyOCILayout(t),
118+
TrivyDB: createEmptyOCILayout(t),
119+
TrivyBDU: createEmptyOCILayout(t),
120+
TrivyJavaDB: createEmptyOCILayout(t),
122121
}
123122

124123
err := PullTrivyVulnerabilityDatabasesImages(
@@ -160,15 +159,10 @@ func layoutByIndex(t *testing.T, layouts *ImageLayouts, idx int) layout.Path {
160159
}
161160
}
162161

163-
func prepareEmptyOCILayout(t *testing.T) layout.Path {
162+
func createEmptyOCILayout(t *testing.T) layout.Path {
164163
t.Helper()
165-
p, err := os.MkdirTemp(os.TempDir(), "trivy_pull_test")
166-
require.NoError(t, err)
167-
t.Cleanup(func() {
168-
_ = os.RemoveAll(p)
169-
})
170164

171-
l, err := CreateEmptyImageLayoutAtPath(p)
165+
l, err := CreateEmptyImageLayoutAtPath(t.TempDir())
172166
require.NoError(t, err)
173167
return l
174168
}

pkg/libmirror/layouts/push.go

Lines changed: 74 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ import (
2424

2525
"github.com/google/go-containerregistry/pkg/authn"
2626
"github.com/google/go-containerregistry/pkg/name"
27+
v1 "github.com/google/go-containerregistry/pkg/v1"
2728
"github.com/google/go-containerregistry/pkg/v1/layout"
2829
"github.com/google/go-containerregistry/pkg/v1/remote"
30+
"github.com/samber/lo"
31+
"github.com/samber/lo/parallel"
2932

3033
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/contexts"
3134
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/auth"
@@ -41,9 +44,13 @@ func PushLayoutToRepo(
4144
registryRepo string,
4245
authProvider authn.Authenticator,
4346
logger contexts.Logger,
47+
parallelismConfig contexts.ParallelismConfig,
4448
insecure, skipVerifyTLS bool,
4549
) error {
4650
refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS)
51+
if parallelismConfig.Blobs != 0 {
52+
remoteOpts = append(remoteOpts, remote.WithJobs(parallelismConfig.Blobs))
53+
}
4754

4855
index, err := imagesLayout.ImageIndex()
4956
if err != nil {
@@ -58,40 +65,83 @@ func PushLayoutToRepo(
5865
return fmt.Errorf("%s: %w", registryRepo, ErrEmptyLayout)
5966
}
6067

61-
pushCount := 1
62-
for _, imageDesc := range indexManifest.Manifests {
63-
tag := imageDesc.Annotations["io.deckhouse.image.short_tag"]
64-
imageRef := registryRepo + ":" + tag
65-
66-
img, err := index.Image(imageDesc.Digest)
68+
batches := lo.Chunk(indexManifest.Manifests, parallelismConfig.Images)
69+
batchesCount, imagesCount := 1, 1
70+
71+
for _, manifestSet := range batches {
72+
err = logger.Process(fmt.Sprintf("Pushing batch %d / %d", batchesCount, len(batches)), func() error {
73+
logger.InfoLn("Images in batch:")
74+
for _, manifest := range manifestSet {
75+
tag := manifest.Annotations["io.deckhouse.image.short_tag"]
76+
imageRef := registryRepo + ":" + tag
77+
logger.InfoF("- %s", imageRef)
78+
}
79+
80+
parallel.ForEach(
81+
manifestSet,
82+
pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts),
83+
)
84+
85+
return nil
86+
})
6787
if err != nil {
68-
return fmt.Errorf("Read image: %w", err)
88+
return fmt.Errorf("Push batch of images: %w", err)
6989
}
90+
batchesCount += 1
91+
}
7092

93+
return nil
94+
}
95+
96+
func pushImage(
97+
logger contexts.Logger,
98+
registryRepo string,
99+
index v1.ImageIndex,
100+
imagesCount int,
101+
refOpts []name.Option,
102+
remoteOpts []remote.Option,
103+
) func(v1.Descriptor, int) {
104+
return func(manifest v1.Descriptor, _ int) {
105+
tag := manifest.Annotations["io.deckhouse.image.short_tag"]
106+
imageRef := registryRepo + ":" + tag
107+
img, err := index.Image(manifest.Digest)
108+
if err != nil {
109+
logger.WarnF("Read image: %v", err)
110+
os.Exit(1)
111+
}
71112
ref, err := name.ParseReference(imageRef, refOpts...)
72113
if err != nil {
73-
return fmt.Errorf("Parse image reference: %w", err)
114+
logger.WarnF("Parse image reference: %v", err)
115+
os.Exit(1)
74116
}
75117

76-
err = retry.RunTask(
77-
logger,
78-
fmt.Sprintf("[%d / %d] Pushing image %s ", pushCount, len(indexManifest.Manifests), imageRef),
79-
task.WithConstantRetries(19, 3*time.Second, func() error {
80-
if err = remote.Write(ref, img, remoteOpts...); err != nil {
81-
if errorutil.IsTrivyMediaTypeNotAllowedError(err) {
82-
logger.WarnLn(errorutil.CustomTrivyMediaTypesWarning)
83-
os.Exit(1)
84-
}
85-
return fmt.Errorf("Write %s to registry: %w", ref.String(), err)
118+
err = retry.RunTask(silentLogger{}, "", task.WithConstantRetries(19, 3*time.Second, func() error {
119+
if err = remote.Write(ref, img, remoteOpts...); err != nil {
120+
if errorutil.IsTrivyMediaTypeNotAllowedError(err) {
121+
logger.WarnLn(errorutil.CustomTrivyMediaTypesWarning)
122+
os.Exit(1)
86123
}
87-
return nil
88-
}))
124+
return fmt.Errorf("Write %s to registry: %w", ref.String(), err)
125+
}
126+
return nil
127+
}))
89128
if err != nil {
90-
return fmt.Errorf("Push image: %w", err)
129+
logger.WarnF("Push image: %v", err)
130+
os.Exit(1)
91131
}
92132

93-
pushCount += 1
133+
imagesCount += 1
94134
}
95-
96-
return nil
97135
}
136+
137+
type silentLogger struct{}
138+
139+
var _ contexts.Logger = silentLogger{}
140+
141+
func (silentLogger) DebugF(_ string, _ ...interface{}) {}
142+
func (silentLogger) DebugLn(_ ...interface{}) {}
143+
func (silentLogger) InfoF(_ string, _ ...interface{}) {}
144+
func (silentLogger) InfoLn(_ ...interface{}) {}
145+
func (silentLogger) WarnF(_ string, _ ...interface{}) {}
146+
func (silentLogger) WarnLn(_ ...interface{}) {}
147+
func (silentLogger) Process(_ string, _ func() error) error { return nil }

pkg/libmirror/layouts/push_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package layouts
2+
3+
import (
4+
"log/slog"
5+
"math/rand/v2"
6+
"testing"
7+
8+
"github.com/google/go-containerregistry/pkg/authn"
9+
"github.com/google/go-containerregistry/pkg/name"
10+
v1 "github.com/google/go-containerregistry/pkg/v1"
11+
"github.com/google/go-containerregistry/pkg/v1/layout"
12+
"github.com/google/go-containerregistry/pkg/v1/random"
13+
"github.com/google/go-containerregistry/pkg/v1/remote"
14+
"github.com/stretchr/testify/require"
15+
16+
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/contexts"
17+
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log"
18+
19+
mirrorTestUtils "github.com/deckhouse/deckhouse-cli/testing/util/mirror"
20+
)
21+
22+
func TestPushLayoutToRepo(t *testing.T) {
23+
s := require.New(t)
24+
25+
const totalImages, layersPerImage = 10, 3
26+
imagesLayout := createEmptyOCILayout(t)
27+
host, repoPath, blobHandler := mirrorTestUtils.SetupEmptyRegistryRepo(false)
28+
generatedDigests := make([]v1.Hash, 0)
29+
30+
platformOpt := layout.WithPlatform(v1.Platform{OS: "linux", Architecture: "amd64"})
31+
for range [totalImages]struct{}{} {
32+
img, err := random.Image(rand.Int64N(513), layersPerImage)
33+
s.NoError(err)
34+
digest, err := img.Digest()
35+
s.NoError(err)
36+
err = imagesLayout.AppendImage(img, platformOpt, layout.WithAnnotations(map[string]string{
37+
"org.opencontainers.image.ref.name": host + repoPath + "@" + digest.String(),
38+
"io.deckhouse.image.short_tag": digest.Hex,
39+
}))
40+
s.NoError(err)
41+
generatedDigests = append(generatedDigests, digest)
42+
}
43+
44+
err := PushLayoutToRepo(
45+
imagesLayout,
46+
host+repoPath, // Images repo
47+
authn.Anonymous,
48+
log.NewSLogger(slog.LevelDebug),
49+
contexts.ParallelismConfig{
50+
Blobs: 4,
51+
Images: 5,
52+
},
53+
true, // Use plain insecure HTTP
54+
false, // TLS verification irrelevant to HTTP requests
55+
)
56+
57+
s.NoError(err, "Push should not fail")
58+
59+
expectedPushedBlobsCount := totalImages * (layersPerImage + 1) // +1 blob is for manifest of each image
60+
s.Len(blobHandler.ListBlobs(), expectedPushedBlobsCount, "Number of pushed blobs should match the expected one")
61+
62+
for _, generatedDigest := range generatedDigests {
63+
ref, err := name.ParseReference(host + repoPath + ":" + generatedDigest.Hex)
64+
s.NoError(err, "Should be able to parse generated image reference")
65+
66+
desc, err := remote.Head(ref)
67+
s.NoError(err, "Should be able to fetch image descriptor")
68+
s.Equal(generatedDigest, desc.Digest, "Digest from registry should match with the generated one")
69+
}
70+
}
71+
72+
func TestPushEmptyLayoutToRepo(t *testing.T) {
73+
s := require.New(t)
74+
host, repoPath, blobHandler := mirrorTestUtils.SetupEmptyRegistryRepo(false)
75+
76+
emptyLayout := createEmptyOCILayout(t)
77+
err := PushLayoutToRepo(
78+
emptyLayout,
79+
host+repoPath,
80+
authn.Anonymous,
81+
log.NewSLogger(slog.LevelDebug),
82+
contexts.DefaultParallelism,
83+
true, // Use plain insecure HTTP
84+
false, // TLS verification irrelevant to HTTP requests
85+
)
86+
s.ErrorIs(err, ErrEmptyLayout, "Push should fail with error about layout with no images")
87+
s.Len(blobHandler.ListBlobs(), 0, "No blobs should be pushed to registry")
88+
}

pkg/libmirror/operations/push.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func PushDeckhouseToRegistry(mirrorCtx *contexts.PushContext) error {
3232
ociLayout, repo,
3333
mirrorCtx.RegistryAuth,
3434
mirrorCtx.Logger,
35+
mirrorCtx.Parallelism,
3536
mirrorCtx.Insecure,
3637
mirrorCtx.SkipTLSVerification,
3738
)
@@ -97,7 +98,6 @@ func findLayoutsToPush(mirrorCtx *contexts.PushContext) (map[string]layout.Path,
9798
bundlePaths := [][]string{
9899
{""}, // Root contains main deckhouse repo
99100
{"install"},
100-
{"install-standalone"},
101101
{"release-channel"},
102102
{"security", "trivy-db"},
103103
{"security", "trivy-bdu"},

0 commit comments

Comments
 (0)