Skip to content

Commit e0050e1

Browse files
committed
[Feature][API] Add markdown parser #9714
1 parent ad5278c commit e0050e1

File tree

6 files changed

+552
-10
lines changed

6 files changed

+552
-10
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
<jaxen.version>2.0.0</jaxen.version>
4242
<easyexcel.version>4.0.3</easyexcel.version>
4343
<fastexcel-reader.version>0.18.4</fastexcel-reader.version>
44+
<flexmark-all.version>0.62.2</flexmark-all.version>
4445
</properties>
4546

4647
<dependencyManagement>
@@ -179,6 +180,11 @@
179180
<version>${easyexcel.version}</version>
180181
</dependency>
181182

183+
<dependency>
184+
<groupId>com.vladsch.flexmark</groupId>
185+
<artifactId>flexmark-all</artifactId>
186+
<version>${flexmark-all.version}</version>
187+
</dependency>
182188
</dependencies>
183189

184190
<build>

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,6 @@
3030
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy;
3131
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
3232
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.XmlWriteStrategy;
33-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;
34-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.CsvReadStrategy;
35-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy;
36-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
37-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
38-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
39-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
40-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.TextReadStrategy;
41-
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.XmlReadStrategy;
4233

4334
import java.io.Serializable;
4435
import java.util.Arrays;
@@ -167,7 +158,20 @@ public ReadStrategy getReadStrategy() {
167158
throw new UnsupportedOperationException(
168159
"File format 'maxwell_json' does not support reading.");
169160
}
170-
};
161+
},
162+
MARKDOWN("md", "markdown") {
    /**
     * Markdown is exposed as a source-only format, so asking for a writer always fails.
     *
     * @param fileSinkConfig sink configuration (not consulted)
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
        final String reason = "File format 'markdown' does not support writing.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Supplies the reader for Markdown files.
     *
     * @return a newly created {@code MarkdownReadStrategy}; a fresh instance is built per call
     */
    @Override
    public ReadStrategy getReadStrategy() {
        final ReadStrategy markdownReader = new MarkdownReadStrategy();
        return markdownReader;
    }
},
174+
;
171175

172176
private final String[] suffix;
173177

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
19+
20+
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;

import com.vladsch.flexmark.ast.BlockQuote;
import com.vladsch.flexmark.ast.BulletList;
import com.vladsch.flexmark.ast.Code;
import com.vladsch.flexmark.ast.FencedCodeBlock;
import com.vladsch.flexmark.ast.Heading;
import com.vladsch.flexmark.ast.Image;
import com.vladsch.flexmark.ast.Link;
import com.vladsch.flexmark.ast.ListItem;
import com.vladsch.flexmark.ast.OrderedList;
import com.vladsch.flexmark.ast.Paragraph;
import com.vladsch.flexmark.ast.ThematicBreak;
import com.vladsch.flexmark.ext.tables.TableBlock;
import com.vladsch.flexmark.ext.tables.TableCell;
import com.vladsch.flexmark.ext.tables.TableRow;
import com.vladsch.flexmark.ext.tables.TableSeparator;
import com.vladsch.flexmark.ext.tables.TablesExtension;
import com.vladsch.flexmark.parser.Parser;
import com.vladsch.flexmark.util.ast.Node;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
36+
37+
@Slf4j
38+
public class MarkdownReadStrategy extends AbstractReadStrategy {
39+
40+
private static final int DEFAULT_PAGE_NUMBER = 1;
41+
private static final int DEFAULT_POSITION = 1;
42+
43+
private static class NodeInfo {
44+
String elementId;
45+
String parentId;
46+
List<String> childIds = new ArrayList<>();
47+
int positionIndex;
48+
49+
NodeInfo(String elementId, String parentId, int positionIndex) {
50+
this.elementId = elementId;
51+
this.parentId = parentId;
52+
this.positionIndex = positionIndex;
53+
}
54+
}
55+
56+
@Override
57+
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
58+
throws IOException, FileConnectorException {
59+
String markdown = new String(Files.readAllBytes(Paths.get(path)));
60+
Parser parser = Parser.builder().build();
61+
Node document = parser.parse(markdown);
62+
63+
Map<Node, NodeInfo> nodeInfoMap = new IdentityHashMap<>();
64+
Map<String, Integer> typeCounters = new HashMap<>();
65+
List<SeaTunnelRow> rows = new ArrayList<>();
66+
67+
assignIdsAndCollectTree(document, null, nodeInfoMap, DEFAULT_POSITION, typeCounters);
68+
generateRows(document, rows, nodeInfoMap, DEFAULT_PAGE_NUMBER);
69+
70+
for (SeaTunnelRow row : rows) {
71+
output.collect(row);
72+
}
73+
}
74+
75+
private void assignIdsAndCollectTree(
76+
Node node,
77+
Node parent,
78+
Map<Node, NodeInfo> nodeInfoMap,
79+
int position,
80+
Map<String, Integer> typeCounters) {
81+
String elementType = node.getClass().getSimpleName();
82+
String elementId = null;
83+
84+
if (isEligibleForRow(node)) {
85+
int count = typeCounters.getOrDefault(elementType, 0) + 1;
86+
typeCounters.put(elementType, count);
87+
elementId = elementType + "_" + count;
88+
}
89+
90+
String parentId = parent == null ? null : nodeInfoMap.get(parent).elementId;
91+
NodeInfo nodeInfo = new NodeInfo(elementId, parentId, position);
92+
nodeInfoMap.put(node, nodeInfo);
93+
94+
int childPosition = 1;
95+
for (Node child = node.getFirstChild(); child != null; child = child.getNext()) {
96+
assignIdsAndCollectTree(child, node, nodeInfoMap, childPosition++, typeCounters);
97+
NodeInfo childInfo = nodeInfoMap.get(child);
98+
if (childInfo.elementId != null) {
99+
nodeInfo.childIds.add(childInfo.elementId);
100+
}
101+
}
102+
}
103+
104+
private void generateRows(
105+
Node node, List<SeaTunnelRow> rows, Map<Node, NodeInfo> nodeInfoMap, int pageNumber) {
106+
if (isEligibleForRow(node)) {
107+
NodeInfo nodeInfo = nodeInfoMap.get(node);
108+
String elementType = node.getClass().getSimpleName();
109+
Integer headingLevel = null;
110+
String text = extractValue(node);
111+
112+
if (node instanceof Heading) {
113+
headingLevel = ((Heading) node).getLevel();
114+
}
115+
116+
rows.add(
117+
new SeaTunnelRow(
118+
new Object[] {
119+
nodeInfo.elementId,
120+
elementType,
121+
headingLevel,
122+
text,
123+
pageNumber,
124+
nodeInfo.positionIndex,
125+
nodeInfo.parentId,
126+
nodeInfo.childIds.isEmpty()
127+
? null
128+
: String.join(",", nodeInfo.childIds)
129+
}));
130+
log.debug(
131+
"Added row: element_id={} type={} heading_level={} text={} parent_id={} child_ids={}",
132+
nodeInfo.elementId,
133+
elementType,
134+
headingLevel,
135+
text,
136+
nodeInfo.parentId,
137+
nodeInfo.childIds);
138+
}
139+
140+
for (Node child = node.getFirstChild(); child != null; child = child.getNext()) {
141+
generateRows(child, rows, nodeInfoMap, pageNumber);
142+
}
143+
}
144+
145+
private boolean isEligibleForRow(Node node) {
146+
if (node instanceof Paragraph) {
147+
Node parent = node.getParent();
148+
if (parent instanceof ListItem || parent instanceof BlockQuote) {
149+
return false;
150+
}
151+
}
152+
153+
return node instanceof Heading
154+
|| node instanceof Paragraph
155+
|| node instanceof ListItem
156+
|| node instanceof BulletList
157+
|| node instanceof OrderedList
158+
|| node instanceof BlockQuote
159+
|| node instanceof FencedCodeBlock
160+
|| node instanceof TableBlock;
161+
}
162+
163+
private String extractValue(Node node) {
164+
if (node instanceof ListItem) {
165+
return extractTextFromChildren(node);
166+
} else if (node instanceof Heading || node instanceof Paragraph) {
167+
return extractTextFromChildren(node);
168+
} else if (node instanceof BulletList) {
169+
return bulletListToString((BulletList) node);
170+
} else if (node instanceof OrderedList) {
171+
return orderedListToString((OrderedList) node);
172+
} else if (node instanceof Code) {
173+
return ((Code) node).getText().toString();
174+
} else if (node instanceof FencedCodeBlock) {
175+
return ((FencedCodeBlock) node).getContentChars().toString();
176+
} else if (node instanceof BlockQuote) {
177+
return extractTextFromChildren(node);
178+
} else if (node instanceof ThematicBreak) {
179+
return "---";
180+
} else if (node instanceof Link) {
181+
return ((Link) node).getUrl().toString();
182+
} else if (node instanceof Image) {
183+
return ((Image) node).getUrl().toString();
184+
} else if (node instanceof TableBlock) {
185+
return tableToString((TableBlock) node);
186+
}
187+
188+
return node.getChars().toString();
189+
}
190+
191+
private String extractTextFromChildren(Node node) {
192+
StringBuilder sb = new StringBuilder();
193+
for (Node child = node.getFirstChild(); child != null; child = child.getNext()) {
194+
sb.append(child.getChars());
195+
}
196+
197+
return sb.toString().trim();
198+
}
199+
200+
private String bulletListToString(BulletList list) {
201+
StringBuilder sb = new StringBuilder();
202+
for (Node item = list.getFirstChild(); item != null; item = item.getNext()) {
203+
if (item instanceof ListItem) {
204+
sb.append("- ").append(extractTextFromChildren(item)).append("\n");
205+
}
206+
}
207+
208+
return sb.toString();
209+
}
210+
211+
private String orderedListToString(OrderedList list) {
212+
StringBuilder sb = new StringBuilder();
213+
int num = 1;
214+
for (Node item = list.getFirstChild(); item != null; item = item.getNext()) {
215+
if (item instanceof ListItem) {
216+
sb.append(num++).append(". ").append(extractTextFromChildren(item)).append("\n");
217+
}
218+
}
219+
220+
return sb.toString();
221+
}
222+
223+
private String tableToString(TableBlock table) {
224+
StringBuilder sb = new StringBuilder();
225+
for (Node row = table.getFirstChild(); row != null; row = row.getNext()) {
226+
if (row instanceof TableRow) {
227+
for (Node cell = row.getFirstChild(); cell != null; cell = cell.getNext()) {
228+
if (cell instanceof TableCell) {
229+
sb.append(((TableCell) cell).getText().toString()).append(" | ");
230+
}
231+
}
232+
sb.append("\n");
233+
}
234+
}
235+
236+
return sb.toString();
237+
}
238+
239+
@Override
240+
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException {
241+
return new SeaTunnelRowType(
242+
new String[] {
243+
"element_id",
244+
"element_type",
245+
"heading_level",
246+
"text",
247+
"page_number",
248+
"position_index",
249+
"parent_id",
250+
"child_ids"
251+
},
252+
new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
253+
BasicType.STRING_TYPE,
254+
BasicType.STRING_TYPE,
255+
BasicType.INT_TYPE,
256+
BasicType.STRING_TYPE,
257+
BasicType.INT_TYPE,
258+
BasicType.INT_TYPE,
259+
BasicType.STRING_TYPE,
260+
BasicType.STRING_TYPE
261+
});
262+
}
263+
}

0 commit comments

Comments
 (0)