Skip to content

Commit 7a52620

Browse files
committed
[Feature][File] Add markdown parser #9714
1 parent c43d57d commit 7a52620

File tree

6 files changed

+569
-1
lines changed

6 files changed

+569
-1
lines changed

seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
<jaxen.version>2.0.0</jaxen.version>
4242
<easyexcel.version>4.0.3</easyexcel.version>
4343
<fastexcel-reader.version>0.18.4</fastexcel-reader.version>
44+
<flexmark-all.version>0.62.2</flexmark-all.version>
4445
</properties>
4546

4647
<dependencyManagement>
@@ -179,6 +180,11 @@
179180
<version>${easyexcel.version}</version>
180181
</dependency>
181182

183+
<dependency>
184+
<groupId>com.vladsch.flexmark</groupId>
185+
<artifactId>flexmark-all</artifactId>
186+
<version>${flexmark-all.version}</version>
187+
</dependency>
182188
</dependencies>
183189

184190
<build>

seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.CsvReadStrategy;
3535
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy;
3636
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
37+
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.MarkdownReadStrategy;
3738
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
3839
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
3940
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
@@ -167,7 +168,20 @@ public ReadStrategy getReadStrategy() {
167168
throw new UnsupportedOperationException(
168169
"File format 'maxwell_json' does not support reading.");
169170
}
170-
};
171+
},
172+
MARKDOWN("md", "markdown") {
173+
@Override
174+
public WriteStrategy getWriteStrategy(FileSinkConfig fileSinkConfig) {
175+
throw new UnsupportedOperationException(
176+
"File format 'markdown' does not support writing.");
177+
}
178+
179+
@Override
180+
public ReadStrategy getReadStrategy() {
181+
return new MarkdownReadStrategy();
182+
}
183+
},
184+
;
171185

172186
private final String[] suffix;
173187

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
19+
20+
import org.apache.seatunnel.api.source.Collector;
21+
import org.apache.seatunnel.api.table.type.BasicType;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
24+
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
25+
26+
import com.vladsch.flexmark.ast.BlockQuote;
27+
import com.vladsch.flexmark.ast.BulletList;
28+
import com.vladsch.flexmark.ast.Code;
29+
import com.vladsch.flexmark.ast.FencedCodeBlock;
30+
import com.vladsch.flexmark.ast.Heading;
31+
import com.vladsch.flexmark.ast.Image;
32+
import com.vladsch.flexmark.ast.Link;
33+
import com.vladsch.flexmark.ast.ListItem;
34+
import com.vladsch.flexmark.ast.OrderedList;
35+
import com.vladsch.flexmark.ast.Paragraph;
36+
import com.vladsch.flexmark.ast.ThematicBreak;
37+
import com.vladsch.flexmark.ext.tables.TableBlock;
38+
import com.vladsch.flexmark.ext.tables.TableCell;
39+
import com.vladsch.flexmark.ext.tables.TableRow;
40+
import com.vladsch.flexmark.parser.Parser;
41+
import com.vladsch.flexmark.util.ast.Node;
42+
import lombok.extern.slf4j.Slf4j;
43+
44+
import java.io.IOException;
45+
import java.nio.file.Files;
46+
import java.nio.file.Paths;
47+
import java.util.ArrayList;
48+
import java.util.HashMap;
49+
import java.util.IdentityHashMap;
50+
import java.util.List;
51+
import java.util.Map;
52+
53+
@Slf4j
54+
public class MarkdownReadStrategy extends AbstractReadStrategy {
55+
56+
private static final int DEFAULT_PAGE_NUMBER = 1;
57+
private static final int DEFAULT_POSITION = 1;
58+
59+
private static class NodeInfo {
60+
String elementId;
61+
String parentId;
62+
List<String> childIds = new ArrayList<>();
63+
int positionIndex;
64+
65+
NodeInfo(String elementId, String parentId, int positionIndex) {
66+
this.elementId = elementId;
67+
this.parentId = parentId;
68+
this.positionIndex = positionIndex;
69+
}
70+
}
71+
72+
@Override
73+
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
74+
throws IOException, FileConnectorException {
75+
String markdown = new String(Files.readAllBytes(Paths.get(path)));
76+
Parser parser = Parser.builder().build();
77+
Node document = parser.parse(markdown);
78+
79+
Map<Node, NodeInfo> nodeInfoMap = new IdentityHashMap<>();
80+
Map<String, Integer> typeCounters = new HashMap<>();
81+
List<SeaTunnelRow> rows = new ArrayList<>();
82+
83+
assignIdsAndCollectTree(document, null, nodeInfoMap, DEFAULT_POSITION, typeCounters);
84+
generateRows(document, rows, nodeInfoMap, DEFAULT_PAGE_NUMBER);
85+
86+
for (SeaTunnelRow row : rows) {
87+
output.collect(row);
88+
}
89+
}
90+
91+
private void assignIdsAndCollectTree(
92+
Node node,
93+
Node parent,
94+
Map<Node, NodeInfo> nodeInfoMap,
95+
int position,
96+
Map<String, Integer> typeCounters) {
97+
String elementType = node.getClass().getSimpleName();
98+
String elementId = null;
99+
100+
if (isEligibleForRow(node)) {
101+
int count = typeCounters.getOrDefault(elementType, 0) + 1;
102+
typeCounters.put(elementType, count);
103+
elementId = elementType + "_" + count;
104+
}
105+
106+
String parentId = parent == null ? null : nodeInfoMap.get(parent).elementId;
107+
NodeInfo nodeInfo = new NodeInfo(elementId, parentId, position);
108+
nodeInfoMap.put(node, nodeInfo);
109+
110+
int childPosition = 1;
111+
for (Node child = node.getFirstChild(); child != null; child = child.getNext()) {
112+
assignIdsAndCollectTree(child, node, nodeInfoMap, childPosition++, typeCounters);
113+
NodeInfo childInfo = nodeInfoMap.get(child);
114+
if (childInfo.elementId != null) {
115+
nodeInfo.childIds.add(childInfo.elementId);
116+
}
117+
}
118+
}
119+
120+
private void generateRows(
121+
Node node, List<SeaTunnelRow> rows, Map<Node, NodeInfo> nodeInfoMap, int pageNumber) {
122+
if (isEligibleForRow(node)) {
123+
NodeInfo nodeInfo = nodeInfoMap.get(node);
124+
String elementType = node.getClass().getSimpleName();
125+
Integer headingLevel = null;
126+
String text = extractValue(node);
127+
128+
if (node instanceof Heading) {
129+
headingLevel = ((Heading) node).getLevel();
130+
}
131+
132+
rows.add(
133+
new SeaTunnelRow(
134+
new Object[] {
135+
nodeInfo.elementId,
136+
elementType,
137+
headingLevel,
138+
text,
139+
pageNumber,
140+
nodeInfo.positionIndex,
141+
nodeInfo.parentId,
142+
nodeInfo.childIds.isEmpty()
143+
? null
144+
: String.join(",", nodeInfo.childIds)
145+
}));
146+
log.debug(
147+
"Added row: element_id={} type={} heading_level={} text={} parent_id={} child_ids={}",
148+
nodeInfo.elementId,
149+
elementType,
150+
headingLevel,
151+
text,
152+
nodeInfo.parentId,
153+
nodeInfo.childIds);
154+
}
155+
156+
for (Node child = node.getFirstChild(); child != null; child = child.getNext()) {
157+
generateRows(child, rows, nodeInfoMap, pageNumber);
158+
}
159+
}
160+
161+
private boolean isEligibleForRow(Node node) {
162+
if (node instanceof Paragraph) {
163+
Node parent = node.getParent();
164+
if (parent instanceof ListItem || parent instanceof BlockQuote) {
165+
return false;
166+
}
167+
}
168+
169+
return node instanceof Heading
170+
|| node instanceof Paragraph
171+
|| node instanceof ListItem
172+
|| node instanceof BulletList
173+
|| node instanceof OrderedList
174+
|| node instanceof BlockQuote
175+
|| node instanceof FencedCodeBlock
176+
|| node instanceof TableBlock;
177+
}
178+
179+
private String extractValue(Node node) {
180+
if (node instanceof ListItem) {
181+
return extractTextFromChildren(node);
182+
} else if (node instanceof Heading || node instanceof Paragraph) {
183+
return extractTextFromChildren(node);
184+
} else if (node instanceof BulletList) {
185+
return bulletListToString((BulletList) node);
186+
} else if (node instanceof OrderedList) {
187+
return orderedListToString((OrderedList) node);
188+
} else if (node instanceof Code) {
189+
return ((Code) node).getText().toString();
190+
} else if (node instanceof FencedCodeBlock) {
191+
return ((FencedCodeBlock) node).getContentChars().toString();
192+
} else if (node instanceof BlockQuote) {
193+
return extractTextFromChildren(node);
194+
} else if (node instanceof ThematicBreak) {
195+
return "---";
196+
} else if (node instanceof Link) {
197+
return ((Link) node).getUrl().toString();
198+
} else if (node instanceof Image) {
199+
return ((Image) node).getUrl().toString();
200+
} else if (node instanceof TableBlock) {
201+
return tableToString((TableBlock) node);
202+
}
203+
204+
return node.getChars().toString();
205+
}
206+
207+
private String extractTextFromChildren(Node node) {
208+
StringBuilder sb = new StringBuilder();
209+
for (Node child = node.getFirstChild(); child != null; child = child.getNext()) {
210+
sb.append(child.getChars());
211+
}
212+
213+
return sb.toString().trim();
214+
}
215+
216+
private String bulletListToString(BulletList list) {
217+
StringBuilder sb = new StringBuilder();
218+
for (Node item = list.getFirstChild(); item != null; item = item.getNext()) {
219+
if (item instanceof ListItem) {
220+
sb.append("- ").append(extractTextFromChildren(item)).append("\n");
221+
}
222+
}
223+
224+
return sb.toString();
225+
}
226+
227+
private String orderedListToString(OrderedList list) {
228+
StringBuilder sb = new StringBuilder();
229+
int num = 1;
230+
for (Node item = list.getFirstChild(); item != null; item = item.getNext()) {
231+
if (item instanceof ListItem) {
232+
sb.append(num++).append(". ").append(extractTextFromChildren(item)).append("\n");
233+
}
234+
}
235+
236+
return sb.toString();
237+
}
238+
239+
private String tableToString(TableBlock table) {
240+
StringBuilder sb = new StringBuilder();
241+
for (Node row = table.getFirstChild(); row != null; row = row.getNext()) {
242+
if (row instanceof TableRow) {
243+
for (Node cell = row.getFirstChild(); cell != null; cell = cell.getNext()) {
244+
if (cell instanceof TableCell) {
245+
sb.append(((TableCell) cell).getText().toString()).append(" | ");
246+
}
247+
}
248+
sb.append("\n");
249+
}
250+
}
251+
252+
return sb.toString();
253+
}
254+
255+
@Override
256+
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException {
257+
return new SeaTunnelRowType(
258+
new String[] {
259+
"element_id",
260+
"element_type",
261+
"heading_level",
262+
"text",
263+
"page_number",
264+
"position_index",
265+
"parent_id",
266+
"child_ids"
267+
},
268+
new org.apache.seatunnel.api.table.type.SeaTunnelDataType[] {
269+
BasicType.STRING_TYPE,
270+
BasicType.STRING_TYPE,
271+
BasicType.INT_TYPE,
272+
BasicType.STRING_TYPE,
273+
BasicType.INT_TYPE,
274+
BasicType.INT_TYPE,
275+
BasicType.STRING_TYPE,
276+
BasicType.STRING_TYPE
277+
});
278+
}
279+
}

0 commit comments

Comments
 (0)