2626import org .apache .flink .api .connector .source .SourceReader ;
2727import org .apache .flink .api .connector .source .SourceReaderContext ;
2828import org .apache .flink .api .connector .source .SourceSplit ;
29+ import org .apache .flink .api .connector .source .util .ratelimit .RateLimiter ;
30+ import org .apache .flink .api .connector .source .util .ratelimit .RateLimiterStrategy ;
2931import org .apache .flink .configuration .Configuration ;
3032import org .apache .flink .connector .base .source .reader .fetcher .SplitFetcherManager ;
3133import org .apache .flink .connector .base .source .reader .splitreader .SplitReader ;
@@ -107,6 +109,15 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
107109
108110 @ Nullable protected final RecordEvaluator <T > eofRecordEvaluator ;
109111
112+ /** Indicating whether the SourceReader supports rate limiting or not. */
113+ private final boolean rateLimitingEnabled ;
114+
115+ /** The {@link RateLimiter} uses for rate limiting. */
116+ @ Nullable private final RateLimiter <SplitT > rateLimiter ;
117+
118+ /** Future that tracks the result of acquiring permission from {@link #rateLimiter}. */
119+ @ Nullable private CompletableFuture <Void > rateLimitPermissionFuture ;
120+
110121 /**
111122 * The primary constructor for the source reader.
112123 *
@@ -118,7 +129,7 @@ public SourceReaderBase(
118129 RecordEmitter <E , T , SplitStateT > recordEmitter ,
119130 Configuration config ,
120131 SourceReaderContext context ) {
121- this (splitFetcherManager , recordEmitter , null , config , context );
132+ this (splitFetcherManager , recordEmitter , null , config , context , null );
122133 }
123134
124135 public SourceReaderBase (
@@ -127,6 +138,16 @@ public SourceReaderBase(
127138 @ Nullable RecordEvaluator <T > eofRecordEvaluator ,
128139 Configuration config ,
129140 SourceReaderContext context ) {
141+ this (splitFetcherManager , recordEmitter , eofRecordEvaluator , config , context , null );
142+ }
143+
144+ public SourceReaderBase (
145+ SplitFetcherManager <E , SplitT > splitFetcherManager ,
146+ RecordEmitter <E , T , SplitStateT > recordEmitter ,
147+ @ Nullable RecordEvaluator <T > eofRecordEvaluator ,
148+ Configuration config ,
149+ SourceReaderContext context ,
150+ @ Nullable RateLimiterStrategy <SplitT > rateLimiterStrategy ) {
130151 this .elementsQueue = splitFetcherManager .getQueue ();
131152 this .splitFetcherManager = splitFetcherManager ;
132153 this .recordEmitter = recordEmitter ;
@@ -136,8 +157,17 @@ public SourceReaderBase(
136157 this .context = context ;
137158 this .noMoreSplitsAssignment = false ;
138159 this .eofRecordEvaluator = eofRecordEvaluator ;
139-
160+ this . rateLimitingEnabled = rateLimiterStrategy != null ;
140161 numRecordsInCounter = context .metricGroup ().getIOMetricGroup ().getNumRecordsInCounter ();
162+
163+ rateLimiter =
164+ rateLimitingEnabled
165+ ? rateLimiterStrategy .createRateLimiter (context .currentParallelism ())
166+ : null ;
167+ LOG .info (
168+ "Rate limiting of SourceReader is {}" ,
169+ rateLimitingEnabled ? "enabled" : "disabled" );
170+ rateLimitPermissionFuture = CompletableFuture .completedFuture (null );
141171 }
142172
143173 @ Override
@@ -153,16 +183,66 @@ public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
153183 return trace (finishedOrAvailableLater ());
154184 }
155185 }
186+ if (rateLimitingEnabled ) {
187+ return pollNextWithRateLimiting (recordsWithSplitId , output );
188+ } else {
189+ return pollNextWithoutRateLimiting (recordsWithSplitId , output );
190+ }
191+ }
192+
193+ private InputStatus pollNextWithoutRateLimiting (
194+ RecordsWithSplitIds <E > recordsWithSplitId , ReaderOutput <T > output ) throws Exception {
195+ // we need to loop here, in case we may have to go across splits
196+ while (true ) {
197+ // Process one record.
198+ final E record = recordsWithSplitId .nextRecordFromSplit ();
199+ if (record != null ) {
200+ // emit the record.
201+ numRecordsInCounter .inc (1 );
202+ recordEmitter .emitRecord (record , currentSplitOutput , currentSplitContext .state );
203+ LOG .trace ("Emitted record: {}" , record );
204+ // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
205+ // more is available. If nothing more is available, the next invocation will find
206+ // this out and return the correct status.
207+ // That means we emit the occasional 'false positive' for availability, but this
208+ // saves us doing checks for every record. Ultimately, this is cheaper.
209+ return trace (InputStatus .MORE_AVAILABLE );
210+ } else if (!moveToNextSplit (recordsWithSplitId , output )) {
211+ // The fetch is done and we just discovered that and have not emitted anything, yet.
212+ // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
213+ // rather than emitting nothing and waiting for the caller to call us again.
214+ return pollNext (output );
215+ }
216+ }
217+ }
156218
157- // we need to loop here, because we may have to go across splits
219+ private InputStatus pollNextWithRateLimiting (
220+ RecordsWithSplitIds <E > recordsWithSplitId , ReaderOutput <T > output ) throws Exception {
221+ // make sure we have a fetch we are working on, or move to the next
158222 while (true ) {
223+ // Check if the previous record count reached the limit of rateLimiter.
224+ if (!rateLimitPermissionFuture .isDone ()) {
225+ return trace (InputStatus .MORE_AVAILABLE );
226+ }
159227 // Process one record.
160228 final E record = recordsWithSplitId .nextRecordFromSplit ();
161229 if (record != null ) {
162230 // emit the record.
163231 numRecordsInCounter .inc (1 );
164232 recordEmitter .emitRecord (record , currentSplitOutput , currentSplitContext .state );
165233 LOG .trace ("Emitted record: {}" , record );
234+ RateLimitingSourceOutputWrapper <T > rateLimitingSourceOutputWrapper =
235+ (RateLimitingSourceOutputWrapper <T >) currentSplitOutput ;
236+ if (rateLimitingSourceOutputWrapper .getRecordsOfCurrentWindow () > 0 ) {
237+ // Acquire permit from rateLimiter.
238+ rateLimitPermissionFuture =
239+ rateLimiter
240+ .acquire (
241+ rateLimitingSourceOutputWrapper
242+ .getRecordsOfCurrentWindow ())
243+ .toCompletableFuture ();
244+ }
245+ rateLimitingSourceOutputWrapper .resetWindowRecordCount ();
166246
167247 // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
168248 // more is available. If nothing more is available, the next invocation will find
@@ -245,7 +325,14 @@ record -> {
245325 return true ;
246326 };
247327 }
248- currentSplitOutput = currentSplitContext .getOrCreateSplitOutput (output , eofRecordHandler );
328+ if (rateLimitingEnabled ) {
329+ rateLimiter .notifyAddingSplit (
330+ this .toSplitType (
331+ this .currentSplitContext .splitId , this .currentSplitContext .state ));
332+ }
333+ currentSplitOutput =
334+ currentSplitContext .getOrCreateSplitOutput (
335+ output , eofRecordHandler , rateLimitingEnabled );
249336 LOG .trace ("Emitting records from fetch for split {}" , nextSplitId );
250337 return true ;
251338 }
@@ -264,6 +351,16 @@ public List<SplitT> snapshotState(long checkpointId) {
264351 return splits ;
265352 }
266353
354+ @ Override
355+ public void notifyCheckpointComplete (long checkpointId ) throws Exception {
356+ splitStates .forEach (
357+ (id , context ) -> {
358+ if (rateLimitingEnabled ) {
359+ rateLimiter .notifyCheckpointComplete (checkpointId );
360+ }
361+ });
362+ }
363+
267364 @ Override
268365 public void addSplits (List <SplitT > splits ) {
269366 LOG .info ("Adding split(s) to reader: {}" , splits );
@@ -364,7 +461,9 @@ private SplitContext(String splitId, SplitStateT state) {
364461 }
365462
366463 SourceOutput <T > getOrCreateSplitOutput (
367- ReaderOutput <T > mainOutput , @ Nullable Function <T , Boolean > eofRecordHandler ) {
464+ ReaderOutput <T > mainOutput ,
465+ @ Nullable Function <T , Boolean > eofRecordHandler ,
466+ boolean rateLimitingEnabled ) {
368467 if (sourceOutput == null ) {
369468 // The split output should have been created when AddSplitsEvent was processed in
370469 // SourceOperator. Here we just use this method to get the previously created
@@ -373,6 +472,9 @@ SourceOutput<T> getOrCreateSplitOutput(
373472 if (eofRecordHandler != null ) {
374473 sourceOutput = new SourceOutputWrapper <>(sourceOutput , eofRecordHandler );
375474 }
475+ if (rateLimitingEnabled ) {
476+ sourceOutput = new RateLimitingSourceOutputWrapper <>(sourceOutput );
477+ }
376478 }
377479 return sourceOutput ;
378480 }
@@ -435,4 +537,73 @@ private boolean isEndOfStreamReached(T record) {
435537 return isStreamEnd ;
436538 }
437539 }
540+
541+ /**
542+ * A wrapper around {@link SourceOutput} that counts the number of records during the current
543+ * rate-limiting window.
544+ *
545+ * <p>This wrapper is used when rate limiting is enabled to track how many records have been
546+ * emitted since the last rate limit check, allowing the reader to properly apply backpressure
547+ * when the rate limit is exceeded.
548+ *
549+ * @param <T> The type of records being emitted
550+ */
551+ private static final class RateLimitingSourceOutputWrapper <T > implements SourceOutput <T > {
552+ /** The underlying source output to delegate to. */
553+ final SourceOutput <T > sourceOutput ;
554+
555+ /** Number of records handled during the current rate-limiting window. */
556+ private int recordsOfCurrentWindow ;
557+
558+ /**
559+ * Creates a new RecordCountingSourceOutputWrapper.
560+ *
561+ * @param sourceOutput The underlying source output to wrap
562+ */
563+ public RateLimitingSourceOutputWrapper (SourceOutput <T > sourceOutput ) {
564+ this .sourceOutput = sourceOutput ;
565+ this .recordsOfCurrentWindow = 0 ;
566+ }
567+
568+ @ Override
569+ public void emitWatermark (Watermark watermark ) {
570+ sourceOutput .emitWatermark (watermark );
571+ }
572+
573+ @ Override
574+ public void markIdle () {
575+ sourceOutput .markIdle ();
576+ }
577+
578+ @ Override
579+ public void markActive () {
580+ sourceOutput .markActive ();
581+ }
582+
583+ @ Override
584+ public void collect (T record ) {
585+ sourceOutput .collect (record );
586+ recordsOfCurrentWindow ++;
587+ }
588+
589+ @ Override
590+ public void collect (T record , long timestamp ) {
591+ sourceOutput .collect (record , timestamp );
592+ recordsOfCurrentWindow ++;
593+ }
594+
595+ /**
596+ * Gets the recordsOfCurrentWindow.
597+ *
598+ * @return the number of current window.
599+ */
600+ public int getRecordsOfCurrentWindow () {
601+ return recordsOfCurrentWindow ;
602+ }
603+
604+ /** Resets the recordsOfCurrentWindow to 0. */
605+ public void resetWindowRecordCount () {
606+ recordsOfCurrentWindow = 0 ;
607+ }
608+ }
438609}
0 commit comments