@@ -39,6 +39,7 @@ import (
39
39
"github.com/cloudevents/sdk-go/v2/types"
40
40
"github.com/kelseyhightower/envconfig"
41
41
"go.opencensus.io/trace"
42
+ "go.uber.org/atomic"
42
43
"go.uber.org/zap"
43
44
"k8s.io/apimachinery/pkg/util/wait"
44
45
"knative.dev/pkg/logging"
@@ -116,6 +117,10 @@ type generator struct {
116
117
eventQueue []conformanceevent.Event
117
118
}
118
119
120
+ var (
121
+ verifyConnectionCounter = atomic .NewUint64 (0 )
122
+ )
123
+
119
124
func Start (ctx context.Context , logs * eventshub.EventLogs , clientOpts ... eventshub.ClientOption ) error {
120
125
var env generator
121
126
if err := envconfig .Process ("" , & env ); err != nil {
@@ -143,27 +148,9 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
143
148
logging .FromContext (ctx ).Info ("awake, continuing" )
144
149
}
145
150
146
- httpClient := nethttp .DefaultClient
147
-
148
- if env .EnforceTLS {
149
- caCertPool , err := x509 .SystemCertPool ()
150
- if err != nil {
151
- return fmt .Errorf ("failed to create cert pool %s: %w" , env .Sink , err )
152
- }
153
- caCertPool .AppendCertsFromPEM ([]byte (env .CACerts ))
154
-
155
- transport := nethttp .DefaultTransport .(* nethttp.Transport ).Clone ()
156
- transport .TLSClientConfig = & tls.Config {
157
- RootCAs : caCertPool ,
158
- MinVersion : tls .VersionTLS12 ,
159
- VerifyConnection : func (state tls.ConnectionState ) error {
160
- if err := logs .Vent (env .peerCertificatesReceived (state )); err != nil {
161
- return err
162
- }
163
- return nil
164
- },
165
- }
166
- httpClient = & nethttp.Client {Transport : transport }
151
+ httpClient , _ , err := createClient (ctx , env , logs )
152
+ if err != nil {
153
+ return err
167
154
}
168
155
169
156
if env .ProbeSink {
@@ -185,12 +172,6 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
185
172
}
186
173
}
187
174
188
- for _ , opt := range clientOpts {
189
- if err := opt (httpClient ); err != nil {
190
- return fmt .Errorf ("unable to apply option: %w" , err )
191
- }
192
- }
193
-
194
175
switch env .EventEncoding {
195
176
case "binary" :
196
177
ctx = cloudevents .WithEncodingBinary (ctx )
@@ -203,6 +184,19 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
203
184
ticker := time .NewTicker (period )
204
185
for {
205
186
187
+ // when enforcing TLS we want to create multiple transports to force multiple TLS handshakes
188
+ // on each request sent so that VerifyConnection is called multiple times.
189
+ httpClient , _ , err = createClient (ctx , env , logs )
190
+ if err != nil {
191
+ return err
192
+ }
193
+
194
+ for _ , opt := range clientOpts {
195
+ if err := opt (httpClient ); err != nil {
196
+ return fmt .Errorf ("unable to apply option: %w" , err )
197
+ }
198
+ }
199
+
206
200
ctx , span := trace .StartSpan (ctx , "eventshub-sender" )
207
201
208
202
req , event , err := env .next (ctx )
@@ -251,13 +245,46 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
251
245
}
252
246
}
253
247
254
- func (g * generator ) peerCertificatesReceived (state tls.ConnectionState ) eventshub.EventInfo {
248
+ func createClient (ctx context.Context , env generator , logs * eventshub.EventLogs ) (* nethttp.Client , * nethttp.Transport , error ) {
249
+ if env .EnforceTLS {
250
+ caCertPool , err := x509 .SystemCertPool ()
251
+ if err != nil {
252
+ return nil , nil , fmt .Errorf ("failed to create cert pool %s: %w" , env .Sink , err )
253
+ }
254
+ caCertPool .AppendCertsFromPEM ([]byte (env .CACerts ))
255
+
256
+ transport := nethttp .DefaultTransport .(* nethttp.Transport ).Clone ()
257
+
258
+ // Force multiple TLS handshakes
259
+ transport .DisableKeepAlives = true
260
+ transport .IdleConnTimeout = 500 * time .Millisecond
261
+
262
+ transport .TLSClientConfig = & tls.Config {
263
+ RootCAs : caCertPool ,
264
+ MinVersion : tls .VersionTLS12 ,
265
+ VerifyConnection : func (state tls.ConnectionState ) error {
266
+ logging .FromContext (ctx ).Infow ("VerifyConnection" )
267
+
268
+ if err := logs .Vent (env .peerCertificatesReceived (verifyConnectionCounter .Inc (), state )); err != nil {
269
+ return err
270
+ }
271
+ return nil
272
+ },
273
+ }
274
+ return & nethttp.Client {Transport : transport }, transport , nil
275
+ }
276
+
277
+ return nethttp .DefaultClient , nethttp .DefaultTransport .(* nethttp.Transport ), nil
278
+ }
279
+
280
+ func (g * generator ) peerCertificatesReceived (counter uint64 , state tls.ConnectionState ) eventshub.EventInfo {
255
281
return eventshub.EventInfo {
256
282
Kind : eventshub .PeerCertificatesReceived ,
257
283
Connection : eventshub .TLSConnectionStateToConnection (& state ),
258
284
Origin : g .SenderName ,
259
285
Observer : g .SenderName ,
260
286
Time : time .Now (),
287
+ Sequence : counter ,
261
288
}
262
289
}
263
290
0 commit comments