Skip to content

Conversation

yzeng1618
Copy link
Contributor

@yzeng1618 yzeng1618 commented Jul 15, 2025

#9513

Purpose of this pull request

This pull request fixes the Flink 1.20 compatibility issue in the SeaTunnel Flink translation layer. The current implementation uses reflection to access internal StreamingRuntimeContext from SourceReaderContext, which is fragile and may break with Flink version updates.

Does this PR introduce any user-facing change?

No. This is an internal implementation fix that maintains the same public API behavior. Users will continue to use the same SeaTunnel connector APIs without any changes. The metrics functionality remains identical from the user perspective.

How was this patch tested?

Manual Testing:
Built SeaTunnel with Flink 1.20 dependencies
Ran sample connector jobs to verify metrics collection
Confirmed no reflection-related errors in logs
Validated metrics appear correctly in Flink Web UI

Check list

@github-actions github-actions bot added core SeaTunnel core module flink api labels Jul 15, 2025
@Hisoka-X
Copy link
Member

cc @TyrantLucifer

@Hisoka-X Hisoka-X changed the title [improve][start] Seatunnel version support 1.20.1 [Feature][Flink] Add flink version 1.20.1 support Jul 15, 2025
@TyrantLucifer
Copy link
Member

Good pr

@nielifeng nielifeng requested a review from Copilot July 17, 2025 09:46
Copilot

This comment was marked as outdated.

@yzeng1618 yzeng1618 requested a review from TyrantLucifer July 19, 2025 10:57
@yzeng1618 yzeng1618 requested a review from Carl-Zhou-CN July 27, 2025 09:50
Copy link
Member

@Hisoka-X Hisoka-X left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@github-actions github-actions bot added the e2e label Aug 3, 2025
@yzeng1618
Copy link
Contributor Author

We need new flink 1.20.1 test container to verify it. https://github.com/apache/seatunnel/tree/dev/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink

The test container for Flink 1.20 has been added.

@yzeng1618 yzeng1618 requested a review from Copilot September 8, 2025 01:29
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for Flink 1.20.1 to SeaTunnel's translation layer by creating new Flink 1.20-specific implementations that use Flink's official sink2 API instead of the fragile reflection-based approach. The changes ensure compatibility while maintaining the same public API for users.

  • Creates Flink 1.20 translation layer with modern sink2 API integration
  • Adds comprehensive Flink 1.20 starter components and build configuration
  • Updates E2E testing infrastructure to include Flink 1.20 support

Reviewed Changes

Copilot reviewed 49 out of 51 changed files in this pull request and generated 3 comments.

File Description
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-20/ Complete Flink 1.20 translation layer implementation using sink2 API
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/ Flink 1.20 starter implementation with refactored architecture
seatunnel-e2e/ E2E test infrastructure updates for Flink 1.20 compatibility
seatunnel-dist/ Distribution assembly configuration for Flink 1.20 components

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Copy link
Contributor Author

@yzeng1618 yzeng1618 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been replied to now.

import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlinkExecution implements TaskExecution {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because there is additional logic for the createFlink20JobMetricsSummary method in FlinkExecution

@yzeng1618
Copy link
Contributor Author

Carl-Zhou-CN
Carl-Zhou-CN previously approved these changes Sep 24, 2025
Copy link
Member

@Carl-Zhou-CN Carl-Zhou-CN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@yzeng1618
Copy link
Contributor Author

LGTM

6243a6bccb385914b46c9ab4d9e9eabe Because this commit is merged into dev, we need to resolve conflicts and then re-run the CI process. Please approve it again.

}

if (metricGroup == null) {
Counter noOpCounter = new NoOpCounter();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need NoOpCounter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need NoOpCounter?

When metricGroup == null (e.g., in scenarios such as certain test cases, initialization phases, or when the context is not in the Flink Runtime environment), returning a NoOp (No Operation) instance prevents NullPointerException and ensures the business logic remains operational.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's werid. Why other flink version does not need this?

@yzeng1618 yzeng1618 requested a review from Hisoka-X September 28, 2025 06:08
}

@Override
protected FlinkJobMetricsSummary createJobMetricsSummary(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After remove logs, I found this method same as method in common module. We shoud do some refactor.

return accumulator;
}

public void sync() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's for?

Comment on lines +32 to +36
private final org.apache.flink.metrics.Counter flinkCounter;
private final LongCounter accumulator;
private final RuntimeContext runtimeContext;
private final java.util.concurrent.atomic.AtomicLong localCount =
new java.util.concurrent.atomic.AtomicLong(0L);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need two fields to store value?

}

private String getStandardAccumulatorName(String originalName) {
if (originalName.contains("SinkWriteCount")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as #9576 (comment)

}

if (metricGroup == null) {
Counter noOpCounter = new NoOpCounter();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's werid. Why other flink version does not need this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants