Skip to content

Commit 6c4adac

Browse files
authored
[mirror] Feature: Push parallelism support (#40)
[mirror] Feature: Push parallelism support Signed-off-by: Maxim Vasilenko <[email protected]> Co-authored-by: Maxim Vasilenko <[email protected]>
1 parent c2685c5 commit 6c4adac

File tree

15 files changed

+399
-230
lines changed

15 files changed

+399
-230
lines changed

internal/mirror/cmd/modules/pull/pull.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,19 +167,17 @@ func pullExternalModulesToLocalFS(
167167
},
168168
}
169169

170-
logger.InfoLn("Beginning to pull module contents")
170+
logger.InfoLn("Pulling module contents")
171171
err = layouts.PullImageSet(pullCtx, moduleLayout, moduleImageSet, layouts.WithTagToDigestMapper(tagsResolver.GetTagDigest))
172172
if err != nil {
173173
return fmt.Errorf("Pull images: %w", err)
174174
}
175-
logger.InfoLn("✅ Module contents pulled successfully")
176175

177-
logger.InfoLn("Beginning to pull releases for module")
176+
logger.InfoLn("Pulling module release data")
178177
err = layouts.PullImageSet(pullCtx, moduleReleasesLayout, releasesImageSet, layouts.WithTagToDigestMapper(tagsResolver.GetTagDigest))
179178
if err != nil {
180179
return fmt.Errorf("Pull images: %w", err)
181180
}
182-
logger.InfoLn("✅ Releases for module pulled successfully")
183181
}
184182

185183
return nil

internal/mirror/cmd/modules/push/push.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,28 @@ func pushModulesToRegistry(
138138
return fmt.Errorf("Module %s: Read OCI layout: %w", moduleName, err)
139139
}
140140

141-
if err = layouts.PushLayoutToRepo(moduleLayout, moduleRegistryPath, authProvider, logger, insecure, skipVerifyTLS); err != nil {
141+
if err = layouts.PushLayoutToRepo(
142+
moduleLayout,
143+
moduleRegistryPath,
144+
authProvider,
145+
logger,
146+
contexts.DefaultParallelism,
147+
insecure,
148+
skipVerifyTLS,
149+
); err != nil {
142150
return fmt.Errorf("Push module to registry: %w", err)
143151
}
144152

145153
logger.InfoF("Pushing releases for module %s", moduleName)
146-
if err = layouts.PushLayoutToRepo(moduleReleasesLayout, moduleReleasesRegistryPath, authProvider, logger, insecure, skipVerifyTLS); err != nil {
154+
if err = layouts.PushLayoutToRepo(
155+
moduleReleasesLayout,
156+
moduleReleasesRegistryPath,
157+
authProvider,
158+
logger,
159+
contexts.DefaultParallelism,
160+
insecure,
161+
skipVerifyTLS,
162+
); err != nil {
147163
return fmt.Errorf("Push module to registry: %w", err)
148164
}
149165

@@ -163,7 +179,7 @@ func pushModulesToRegistry(
163179
return fmt.Errorf("Write module index tag: %w", err)
164180
}
165181

166-
logger.InfoF("Module %s pushed successfully", moduleName)
182+
logger.InfoF("Module %s pushed successfully", moduleName)
167183
}
168184

169185
return nil

internal/mirror/cmd/pull/pull.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,6 @@ func PullDeckhouseToLocalFS(
309309
if err != nil {
310310
return fmt.Errorf("get Deckhouse modules: %w", err)
311311
}
312-
logger.InfoLn("✅")
313312
}
314313

315314
logger.InfoF("Creating OCI Image Layouts")

internal/mirror/cmd/push/push.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,6 @@ func NewCommand() *cobra.Command {
6969
return pushCmd
7070
}
7171

72-
const (
73-
deckhouseRegistryHost = "registry.deckhouse.io"
74-
enterpriseEditionRepoPath = "/deckhouse/ee"
75-
76-
enterpriseEditionRepo = deckhouseRegistryHost + enterpriseEditionRepoPath
77-
)
78-
7972
var (
8073
TempDir = filepath.Join(os.TempDir(), "mirror")
8174

@@ -84,8 +77,6 @@ var (
8477
RegistryUsername string
8578
RegistryPassword string
8679

87-
SourceRegistryRepo = enterpriseEditionRepo
88-
8980
Insecure bool
9081
TLSSkipVerify bool
9182
ImagesBundlePath string
@@ -148,14 +139,18 @@ func buildPushContext() *contexts.PushContext {
148139

149140
mirrorCtx := &contexts.PushContext{
150141
BaseContext: contexts.BaseContext{
151-
Logger: logger,
152-
Insecure: Insecure,
153-
SkipTLSVerification: TLSSkipVerify,
154-
DeckhouseRegistryRepo: SourceRegistryRepo,
155-
RegistryHost: RegistryHost,
156-
RegistryPath: RegistryPath,
157-
BundlePath: ImagesBundlePath,
158-
UnpackedImagesPath: filepath.Join(TempDir, time.Now().Format("mirror_tmp_02-01-2006_15-04-05")),
142+
Logger: logger,
143+
Insecure: Insecure,
144+
SkipTLSVerification: TLSSkipVerify,
145+
RegistryHost: RegistryHost,
146+
RegistryPath: RegistryPath,
147+
BundlePath: ImagesBundlePath,
148+
UnpackedImagesPath: filepath.Join(TempDir, time.Now().Format("mirror_tmp_02-01-2006_15-04-05")),
149+
},
150+
151+
Parallelism: contexts.ParallelismConfig{
152+
Blobs: 4,
153+
Images: 1,
159154
},
160155
}
161156
return mirrorCtx

internal/mirror/cmd/vulndb/push/push.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ func push(_ *cobra.Command, _ []string) error {
110110
repo,
111111
pushContext.RegistryAuth,
112112
pushContext.Logger,
113+
contexts.DefaultParallelism,
113114
pushContext.Insecure,
114115
pushContext.SkipTLSVerification,
115116
)

pkg/libmirror/contexts/push.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,16 @@ package contexts
1919
// PushContext holds data related to pending mirroring-to-registry operation.
2020
type PushContext struct {
2121
BaseContext
22+
23+
Parallelism ParallelismConfig
24+
}
25+
26+
type ParallelismConfig struct {
27+
Blobs int
28+
Images int
29+
}
30+
31+
var DefaultParallelism = ParallelismConfig{
32+
Blobs: 4,
33+
Images: 1,
2234
}

pkg/libmirror/layouts/pull.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func PullInstallers(mirrorCtx *contexts.PullContext, layouts *ImageLayouts) erro
4444
); err != nil {
4545
return err
4646
}
47-
mirrorCtx.Logger.InfoLn("All required installers are pulled!")
47+
mirrorCtx.Logger.InfoLn("All required installers are pulled!")
4848
return nil
4949
}
5050

@@ -74,7 +74,7 @@ func PullDeckhouseReleaseChannels(mirrorCtx *contexts.PullContext, layouts *Imag
7474
); err != nil {
7575
return err
7676
}
77-
mirrorCtx.Logger.InfoLn("Deckhouse release channels are pulled!")
77+
mirrorCtx.Logger.InfoLn("Deckhouse release channels are pulled!")
7878
return nil
7979
}
8080

@@ -88,7 +88,7 @@ func PullDeckhouseImages(mirrorCtx *contexts.PullContext, layouts *ImageLayouts)
8888
); err != nil {
8989
return err
9090
}
91-
mirrorCtx.Logger.InfoLn("All required Deckhouse images are pulled!")
91+
mirrorCtx.Logger.InfoLn("All required Deckhouse images are pulled!")
9292
return nil
9393
}
9494

@@ -113,7 +113,7 @@ func PullModules(mirrorCtx *contexts.PullContext, layouts *ImageLayouts) error {
113113
return fmt.Errorf("pull %q module release information: %w", moduleName, err)
114114
}
115115
}
116-
mirrorCtx.Logger.InfoLn("Deckhouse modules pulled!")
116+
mirrorCtx.Logger.InfoLn("Deckhouse modules pulled!")
117117
return nil
118118
}
119119

pkg/libmirror/layouts/pull_test.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package layouts
1919
import (
2020
"log/slog"
2121
"net/http/httptest"
22-
"os"
2322
"strings"
2423
"testing"
2524

@@ -63,9 +62,9 @@ func TestPullTrivyVulnerabilityDatabaseImageSuccessSkipTLS(t *testing.T) {
6362
}
6463

6564
layouts := &ImageLayouts{
66-
TrivyDB: prepareEmptyOCILayout(t),
67-
TrivyBDU: prepareEmptyOCILayout(t),
68-
TrivyJavaDB: prepareEmptyOCILayout(t),
65+
TrivyDB: createEmptyOCILayout(t),
66+
TrivyBDU: createEmptyOCILayout(t),
67+
TrivyJavaDB: createEmptyOCILayout(t),
6968
}
7069

7170
err := PullTrivyVulnerabilityDatabasesImages(
@@ -116,9 +115,9 @@ func TestPullTrivyVulnerabilityDatabaseImageSuccessInsecure(t *testing.T) {
116115
}
117116

118117
layouts := &ImageLayouts{
119-
TrivyDB: prepareEmptyOCILayout(t),
120-
TrivyBDU: prepareEmptyOCILayout(t),
121-
TrivyJavaDB: prepareEmptyOCILayout(t),
118+
TrivyDB: createEmptyOCILayout(t),
119+
TrivyBDU: createEmptyOCILayout(t),
120+
TrivyJavaDB: createEmptyOCILayout(t),
122121
}
123122

124123
err := PullTrivyVulnerabilityDatabasesImages(
@@ -160,15 +159,10 @@ func layoutByIndex(t *testing.T, layouts *ImageLayouts, idx int) layout.Path {
160159
}
161160
}
162161

163-
func prepareEmptyOCILayout(t *testing.T) layout.Path {
162+
func createEmptyOCILayout(t *testing.T) layout.Path {
164163
t.Helper()
165-
p, err := os.MkdirTemp(os.TempDir(), "trivy_pull_test")
166-
require.NoError(t, err)
167-
t.Cleanup(func() {
168-
_ = os.RemoveAll(p)
169-
})
170164

171-
l, err := CreateEmptyImageLayoutAtPath(p)
165+
l, err := CreateEmptyImageLayoutAtPath(t.TempDir())
172166
require.NoError(t, err)
173167
return l
174168
}

pkg/libmirror/layouts/push.go

Lines changed: 94 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,40 @@ limitations under the License.
1717
package layouts
1818

1919
import (
20+
"errors"
2021
"fmt"
22+
"os"
23+
"time"
2124

2225
"github.com/google/go-containerregistry/pkg/authn"
2326
"github.com/google/go-containerregistry/pkg/name"
27+
v1 "github.com/google/go-containerregistry/pkg/v1"
2428
"github.com/google/go-containerregistry/pkg/v1/layout"
2529
"github.com/google/go-containerregistry/pkg/v1/remote"
30+
"github.com/samber/lo"
31+
"github.com/samber/lo/parallel"
2632

2733
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/contexts"
2834
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/auth"
35+
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/errorutil"
36+
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/retry"
37+
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/retry/task"
2938
)
3039

40+
var ErrEmptyLayout = errors.New("No images in layout")
41+
3142
func PushLayoutToRepo(
3243
imagesLayout layout.Path,
3344
registryRepo string,
3445
authProvider authn.Authenticator,
3546
logger contexts.Logger,
47+
parallelismConfig contexts.ParallelismConfig,
3648
insecure, skipVerifyTLS bool,
3749
) error {
3850
refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS)
51+
if parallelismConfig.Blobs != 0 {
52+
remoteOpts = append(remoteOpts, remote.WithJobs(parallelismConfig.Blobs))
53+
}
3954

4055
index, err := imagesLayout.ImageIndex()
4156
if err != nil {
@@ -46,27 +61,92 @@ func PushLayoutToRepo(
4661
return fmt.Errorf("Parse OCI Image Index Manifest: %w", err)
4762
}
4863

49-
pushCount := 1
50-
for _, imageDesc := range indexManifest.Manifests {
51-
tag := imageDesc.Annotations["io.deckhouse.image.short_tag"]
52-
imageRef := registryRepo + ":" + tag
64+
if len(indexManifest.Manifests) == 0 {
65+
return fmt.Errorf("%s: %w", registryRepo, ErrEmptyLayout)
66+
}
67+
68+
batches := lo.Chunk(indexManifest.Manifests, parallelismConfig.Images)
69+
batchesCount, imagesCount := 1, 1
70+
71+
for _, manifestSet := range batches {
72+
if parallelismConfig.Images == 1 {
73+
tag := manifestSet[0].Annotations["io.deckhouse.image.short_tag"]
74+
imageRef := registryRepo + ":" + tag
75+
logger.InfoF("[%d / %d] Pushing image %s", imagesCount, len(indexManifest.Manifests), imageRef)
76+
pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts)(manifestSet[0], 0)
77+
imagesCount += 1
78+
continue
79+
}
5380

54-
logger.InfoF("[%d / %d] Pushing image %s ", pushCount, len(indexManifest.Manifests), imageRef)
55-
img, err := index.Image(imageDesc.Digest)
81+
err = logger.Process(fmt.Sprintf("Pushing batch %d / %d", batchesCount, len(batches)), func() error {
82+
logger.InfoLn("Images in batch:")
83+
for _, manifest := range manifestSet {
84+
logger.InfoF("- %s", registryRepo+":"+manifest.Annotations["io.deckhouse.image.short_tag"])
85+
}
86+
87+
parallel.ForEach(manifestSet, pushImage(logger, registryRepo, index, imagesCount, refOpts, remoteOpts))
88+
89+
return nil
90+
})
5691
if err != nil {
57-
return fmt.Errorf("Read image: %w", err)
92+
return fmt.Errorf("Push batch of images: %w", err)
5893
}
94+
batchesCount += 1
95+
imagesCount += len(manifestSet)
96+
}
97+
98+
return nil
99+
}
59100

101+
func pushImage(
102+
logger contexts.Logger,
103+
registryRepo string,
104+
index v1.ImageIndex,
105+
imagesCount int,
106+
refOpts []name.Option,
107+
remoteOpts []remote.Option,
108+
) func(v1.Descriptor, int) {
109+
return func(manifest v1.Descriptor, _ int) {
110+
tag := manifest.Annotations["io.deckhouse.image.short_tag"]
111+
imageRef := registryRepo + ":" + tag
112+
img, err := index.Image(manifest.Digest)
113+
if err != nil {
114+
logger.WarnF("Read image: %v", err)
115+
os.Exit(1)
116+
}
60117
ref, err := name.ParseReference(imageRef, refOpts...)
61118
if err != nil {
62-
return fmt.Errorf("Parse image reference: %w", err)
119+
logger.WarnF("Parse image reference: %v", err)
120+
os.Exit(1)
63121
}
64-
if err = remote.Write(ref, img, remoteOpts...); err != nil {
65-
return fmt.Errorf("Write %s to registry: %w", ref.String(), err)
122+
123+
err = retry.RunTask(silentLogger{}, "", task.WithConstantRetries(19, 3*time.Second, func() error {
124+
if err = remote.Write(ref, img, remoteOpts...); err != nil {
125+
if errorutil.IsTrivyMediaTypeNotAllowedError(err) {
126+
logger.WarnLn(errorutil.CustomTrivyMediaTypesWarning)
127+
os.Exit(1)
128+
}
129+
return fmt.Errorf("Write %s to registry: %w", ref.String(), err)
130+
}
131+
return nil
132+
}))
133+
if err != nil {
134+
logger.WarnF("Push image: %v", err)
135+
os.Exit(1)
66136
}
67-
logger.InfoLn("✅")
68-
pushCount += 1
69-
}
70137

71-
return nil
138+
imagesCount += 1
139+
}
72140
}
141+
142+
type silentLogger struct{}
143+
144+
var _ contexts.Logger = silentLogger{}
145+
146+
func (silentLogger) DebugF(_ string, _ ...interface{}) {}
147+
func (silentLogger) DebugLn(_ ...interface{}) {}
148+
func (silentLogger) InfoF(_ string, _ ...interface{}) {}
149+
func (silentLogger) InfoLn(_ ...interface{}) {}
150+
func (silentLogger) WarnF(_ string, _ ...interface{}) {}
151+
func (silentLogger) WarnLn(_ ...interface{}) {}
152+
func (silentLogger) Process(_ string, _ func() error) error { return nil }

0 commit comments

Comments
 (0)