Skip to content

Commit fda6bf6

Browse files
龙三 (gaoyunhaii)
龙三
authored and committed
[FLINK-25072][streaming] Introduce description on Transformation and StreamGraph
This closes apache#17924.
1 parent ac957ae commit fda6bf6

File tree

9 files changed

+190
-1
lines changed

9 files changed

+190
-1
lines changed

flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java

+12
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ public static int getNewNodeId() {
119119

120120
protected String name;
121121

122+
protected String description;
123+
122124
protected TypeInformation<T> outputType;
123125
// This is used to handle MissingTypeInfo. As long as the outputType has not been queried
124126
// it can still be changed using setOutputType(). Afterwards an exception is thrown when
@@ -203,6 +205,16 @@ public String getName() {
203205
return name;
204206
}
205207

208+
/** Changes the description of this {@code Transformation}. */
209+
public void setDescription(String description) {
210+
this.description = Preconditions.checkNotNull(description);
211+
}
212+
213+
/** Returns the description of this {@code Transformation}. */
214+
public String getDescription() {
215+
return description;
216+
}
217+
206218
/** Returns the parallelism of this {@code Transformation}. */
207219
public int getParallelism() {
208220
return parallelism;

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java

+18
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,24 @@ public DataStreamSink<T> setParallelism(int parallelism) {
146146
return this;
147147
}
148148

149+
/**
150+
* Sets the description for this sink.
151+
*
152+
* <p>Description is used in json plan and web ui, but not in logging and metrics where only
153+
* name is available. Description is expected to provide detailed information about the sink,
154+
* while name is expected to be more simple, providing summary information only, so that we can
155+
* have more user-friendly logging messages and metric tags without losing useful messages for
156+
* debugging.
157+
*
158+
* @param description The description for this sink.
159+
* @return The sink with new description.
160+
*/
161+
@PublicEvolving
162+
public DataStreamSink<T> setDescription(String description) {
163+
transformation.setDescription(description);
164+
return this;
165+
}
166+
149167
// ---------------------------------------------------------------------------
150168
// Fine-grained resource profiles are an incomplete work-in-progress feature
151169
// The setters are hence private at this point.

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

+18
Original file line numberDiff line numberDiff line change
@@ -419,4 +419,22 @@ public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
419419
new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
420420
return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
421421
}
422+
423+
/**
424+
* Sets the description for this operation.
425+
*
426+
* <p>Description is used in json plan and web ui, but not in logging and metrics where only
427+
* name is available. Description is expected to provide detailed information about the sink,
428+
* while name is expected to be more simple, providing summary information only, so that we can
429+
* have more user-friendly logging messages and metric tags without losing useful messages for
430+
* debugging.
431+
*
432+
* @param description The description for this operation.
433+
* @return The operation with new description.
434+
*/
435+
@PublicEvolving
436+
public SingleOutputStreamOperator<T> setDescription(String description) {
437+
transformation.setDescription(description);
438+
return this;
439+
}
422440
}

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ private void decorateNode(Integer vertexID, ObjectNode node) {
182182
node.put(PACT, "Operator");
183183
}
184184

185-
node.put(CONTENTS, vertex.getOperatorName());
185+
node.put(CONTENTS, vertex.getOperatorDescription());
186186

187187
node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
188188
}

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java

+3
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ private void configure(final T transformation, final Context context) {
124124
streamNode.setManagedMemoryUseCaseWeights(
125125
transformation.getManagedMemoryOperatorScopeUseCaseWeights(),
126126
transformation.getManagedMemorySlotScopeUseCases());
127+
if (null != transformation.getDescription()) {
128+
streamNode.setOperatorDescription(transformation.getDescription());
129+
}
127130
}
128131
}
129132
}

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java

+10
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class StreamNode {
6565
private final Set<ManagedMemoryUseCase> managedMemorySlotScopeUseCases = new HashSet<>();
6666
private long bufferTimeout;
6767
private final String operatorName;
68+
private String operatorDescription;
6869
private @Nullable String slotSharingGroup;
6970
private @Nullable String coLocationGroup;
7071
private KeySelector<?, ?>[] statePartitioners = new KeySelector[0];
@@ -113,6 +114,7 @@ public StreamNode(
113114
Class<? extends TaskInvokable> jobVertexClass) {
114115
this.id = id;
115116
this.operatorName = operatorName;
117+
this.operatorDescription = operatorName;
116118
this.operatorFactory = operatorFactory;
117119
this.jobVertexClass = jobVertexClass;
118120
this.slotSharingGroup = slotSharingGroup;
@@ -242,6 +244,14 @@ public String getOperatorName() {
242244
return operatorName;
243245
}
244246

247+
public String getOperatorDescription() {
248+
return operatorDescription;
249+
}
250+
251+
public void setOperatorDescription(String operatorDescription) {
252+
this.operatorDescription = operatorDescription;
253+
}
254+
245255
public void setSerializersIn(TypeSerializer<?>... typeSerializersIn) {
246256
checkArgument(typeSerializersIn.length > 0);
247257
this.typeSerializersIn = typeSerializersIn;

flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java

+78
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,84 @@ public void processElement(
11041104
});
11051105
}
11061106

1107+
/**
1108+
* Tests {@link SingleOutputStreamOperator#setDescription(String)} functionality.
1109+
*
1110+
* @throws Exception
1111+
*/
1112+
@Test
1113+
public void testUserDefinedDescription() {
1114+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1115+
1116+
DataStream<Long> dataStream1 =
1117+
env.generateSequence(0, 0)
1118+
.name("testSource1")
1119+
.setDescription("this is test source 1")
1120+
.map(
1121+
new MapFunction<Long, Long>() {
1122+
@Override
1123+
public Long map(Long value) throws Exception {
1124+
return null;
1125+
}
1126+
})
1127+
.name("testMap")
1128+
.setDescription("this is test map 1");
1129+
1130+
DataStream<Long> dataStream2 =
1131+
env.generateSequence(0, 0)
1132+
.name("testSource2")
1133+
.setDescription("this is test source 2")
1134+
.map(
1135+
new MapFunction<Long, Long>() {
1136+
@Override
1137+
public Long map(Long value) throws Exception {
1138+
return null;
1139+
}
1140+
})
1141+
.name("testMap")
1142+
.setDescription("this is test map 2");
1143+
1144+
dataStream1
1145+
.connect(dataStream2)
1146+
.flatMap(
1147+
new CoFlatMapFunction<Long, Long, Long>() {
1148+
1149+
@Override
1150+
public void flatMap1(Long value, Collector<Long> out)
1151+
throws Exception {}
1152+
1153+
@Override
1154+
public void flatMap2(Long value, Collector<Long> out)
1155+
throws Exception {}
1156+
})
1157+
.name("testCoFlatMap")
1158+
.setDescription("this is test co flat map")
1159+
.windowAll(GlobalWindows.create())
1160+
.trigger(PurgingTrigger.of(CountTrigger.of(10)))
1161+
.reduce(
1162+
new ReduceFunction<Long>() {
1163+
private static final long serialVersionUID = 1L;
1164+
1165+
@Override
1166+
public Long reduce(Long value1, Long value2) throws Exception {
1167+
return null;
1168+
}
1169+
})
1170+
.name("testWindowReduce")
1171+
.setDescription("this is test window reduce")
1172+
.print();
1173+
1174+
// test functionality through the operator names in the execution plan
1175+
String plan = env.getExecutionPlan();
1176+
1177+
assertTrue(plan.contains("this is test source 1"));
1178+
assertTrue(plan.contains("this is test source 2"));
1179+
assertTrue(plan.contains("this is test map 1"));
1180+
assertTrue(plan.contains("this is test map 2"));
1181+
assertTrue(plan.contains("this is test co flat map"));
1182+
assertTrue(plan.contains("this is test window reduce"));
1183+
}
1184+
11071185
private abstract static class CustomWmEmitter<T>
11081186
implements AssignerWithPunctuatedWatermarks<T> {
11091187

flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala

+18
Original file line numberDiff line numberDiff line change
@@ -1225,4 +1225,22 @@ class DataStream[T](stream: JavaStream[T]) {
12251225
operator: OneInputStreamOperator[T, R]): DataStream[R] = {
12261226
asScalaStream(stream.transform(operatorName, implicitly[TypeInformation[R]], operator))
12271227
}
1228+
1229+
/**
 * Sets the description of this data stream.
 *
 * <p>Description is used in json plan and web ui, but not in logging and metrics where only
 * name is available. Description is expected to provide detailed information about
 * this operation, while name is expected to be more simple, providing summary information only,
 * so that we can have more user-friendly logging messages and metric tags
 * without losing useful messages for debugging.
 *
 * @param description The description for this operation.
 * @return The operator with new description
 */
@PublicEvolving
def setDescription(description: String): DataStream[T] = stream match {
  case stream: SingleOutputStreamOperator[T] => asScalaStream(stream.setDescription(description))
  // throw terminates this branch, so no value follows it (the previous trailing
  // `this` after the throw was unreachable dead code and has been removed)
  case _ => throw new UnsupportedOperationException("Only supported for operators.")
}
12281246
}

flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala

+32
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,38 @@ class DataStreamTest extends AbstractTestBase {
9797
assert(plan contains "testWindowReduce")
9898
}
9999

100+
@Test
def testUserDefinedDescription(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val dataStream1 = env.generateSequence(0, 0)
    .setDescription("this is test source 1")
    .map(x => x)
    .setDescription("this is test map 1")
  val dataStream2 = env.generateSequence(0, 0)
    .setDescription("this is test source 2")
    .map(x => x)
    .setDescription("this is test map 2")

  val func: (((Long, Long), (Long, Long)) => (Long, Long)) =
    (x: (Long, Long), y: (Long, Long)) => (0L, 0L)
  dataStream1.connect(dataStream2)
    .flatMap({ (in, out: Collector[(Long, Long)]) => }, { (in, out: Collector[(Long, Long)]) => })
    .setDescription("this is test co flat map")
    .windowAll(GlobalWindows.create)
    .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](10)))
    .reduce(func)
    .setDescription("this is test window reduce")
    .print
  // verify the user-defined descriptions show up in the generated execution plan
  val plan = env.getExecutionPlan
  assertTrue(plan.contains("this is test source 1"))
  assertTrue(plan.contains("this is test source 2"))
  assertTrue(plan.contains("this is test map 1"))
  assertTrue(plan.contains("this is test map 2"))
  assertTrue(plan.contains("this is test co flat map"))
  assertTrue(plan.contains("this is test window reduce"))
}
131+
100132
/**
101133
* Tests that [[DataStream.keyBy]] and [[DataStream.partitionCustom]] result in
102134
* different and correct topologies. Does the some for the [[ConnectedStreams]].

0 commit comments

Comments
 (0)