1919
2020import org .apache .fluss .config .ConfigOptions ;
2121import org .apache .fluss .config .Configuration ;
22+ import org .apache .fluss .exception .InvalidOffsetException ;
2223import org .apache .fluss .exception .LogSegmentOffsetOverflowException ;
2324import org .apache .fluss .exception .LogStorageException ;
2425import org .apache .fluss .metadata .LogFormat ;
3132import java .io .File ;
3233import java .io .IOException ;
3334import java .nio .file .Files ;
35+ import java .util .ArrayList ;
3436import java .util .Arrays ;
3537import java .util .Comparator ;
38+ import java .util .Iterator ;
39+ import java .util .List ;
40+ import java .util .stream .Collectors ;
3641
3742/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
3843 * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -76,7 +81,7 @@ public LogLoader(
7681 *
7782 * @return the offsets of the Log successfully loaded from disk
7883 */
79- public LoadedLogOffsets load () throws IOException {
84+ public LoadedLogOffsets load () throws Exception {
8085 // load all the log and index files.
8186 logSegments .close ();
8287 logSegments .clear ();
@@ -117,6 +122,37 @@ public LoadedLogOffsets load() throws IOException {
117122 nextOffset , activeSegment .getBaseOffset (), activeSegment .getSizeInBytes ()));
118123 }
119124
125+ /**
126+ * Just recovers the given segment, without adding it to the provided segments.
127+ *
128+ * @param segment Segment to recover
129+ * @return The number of bytes truncated from the segment
130+ * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
131+ * offset overflow
132+ */
133+ private int recoverSegment (LogSegment segment ) throws Exception {
134+ WriterStateManager writerStateManager =
135+ new WriterStateManager (
136+ logSegments .getTableBucket (),
137+ logTabletDir ,
138+ this .writerStateManager .writerExpirationMs ());
139+ // TODO, Here, we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
140+ // that the current implementation of logStartOffset in Fluss is not yet fully refined, and
141+ // there may be cases where logStartOffset is not updated. As a result, logStartOffset is
142+ // not yet reliable. Once the issue with correctly updating logStartOffset is resolved in
143+ // issue https://github.com/apache/fluss/issues/744, we can use logStartOffset here.
144+ // Additionally, using 0 versus using logStartOffset does not affect correctness—they both
145+ // can restore the complete WriterState. The only difference is that using logStartOffset
146+ // can potentially skip over more segments.
147+ LogTablet .rebuildWriterState (
148+ writerStateManager , logSegments , 0 , segment .getBaseOffset (), false );
149+ int bytesTruncated = segment .recover ();
150+ // once we have recovered the segment's data, take a snapshot to ensure that we won't
151+ // need to reload the same segment again while recovering another segment.
152+ writerStateManager .takeSnapshot ();
153+ return bytesTruncated ;
154+ }
155+
120156 /**
121157 * Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
122158 * active segment, and returns the updated recovery point and next offset after recovery.
@@ -129,16 +165,103 @@ public LoadedLogOffsets load() throws IOException {
129165 * overflow
130166 */
131167 private Tuple2 <Long , Long > recoverLog () throws IOException {
132- // TODO truncate log to recover maybe unflush segments.
168+ if (!isCleanShutdown ) {
169+ List <LogSegment > unflushed =
170+ logSegments .values (recoveryPointCheckpoint , Long .MAX_VALUE );
171+ int numUnflushed = unflushed .size ();
172+ Iterator <LogSegment > unflushedIter = unflushed .iterator ();
173+ boolean truncated = false ;
174+ int numFlushed = 1 ;
175+
176+ while (unflushedIter .hasNext () && !truncated ) {
177+ LogSegment segment = unflushedIter .next ();
178+ LOG .info (
179+ "Recovering unflushed segment {}. {}/{} recovered for {}" ,
180+ segment .getBaseOffset (),
181+ numFlushed ,
182+ numUnflushed ,
183+ logSegments .getTableBucket ());
184+
185+ int truncatedBytes = -1 ;
186+ try {
187+ truncatedBytes = recoverSegment (segment );
188+ } catch (Exception e ) {
189+ if (e instanceof InvalidOffsetException ) {
190+ long startOffset = segment .getBaseOffset ();
191+ LOG .warn (
192+ "Found invalid offset during recovery. Deleting the corrupt segment "
193+ + "and creating an empty one with starting offset {}" ,
194+ startOffset );
195+ truncatedBytes = segment .truncateTo (startOffset );
196+ }
197+ }
198+
199+ if (truncatedBytes > 0 ) {
200+ // we had an invalid message, delete all remaining log
201+ LOG .warn (
202+ "Corruption found in segment {}, truncating to offset {}" ,
203+ segment .getBaseOffset (),
204+ segment .readNextOffset ());
205+ removeAndDeleteSegments (unflushedIter );
206+ truncated = true ;
207+ } else {
208+ numFlushed += 1 ;
209+ }
210+ }
211+ }
212+
133213 if (logSegments .isEmpty ()) {
214+ // TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
134215 logSegments .add (LogSegment .open (logTabletDir , 0L , conf , logFormat ));
135216 }
136217 long logEndOffset = logSegments .lastSegment ().get ().readNextOffset ();
137218 return Tuple2 .of (recoveryPointCheckpoint , logEndOffset );
138219 }
139220
221+ /**
222+ * This method deletes the given log segments and the associated writer snapshots.
223+ *
224+ * <p>This method does not need to convert IOException to {@link LogStorageException} because it
225+ * is either called before all logs are loaded or the immediate caller will catch and handle
226+ * IOException
227+ *
228+ * @param segmentsToDelete The log segments to schedule for deletion
229+ */
230+ private void removeAndDeleteSegments (Iterator <LogSegment > segmentsToDelete ) {
231+ if (segmentsToDelete .hasNext ()) {
232+ List <LogSegment > toDelete = new ArrayList <>();
233+ segmentsToDelete .forEachRemaining (toDelete ::add );
234+
235+ LOG .info (
236+ "Deleting segments as part of log recovery: {}" ,
237+ toDelete .stream ().map (LogSegment ::toString ).collect (Collectors .joining ("," )));
238+ toDelete .forEach (segment -> logSegments .remove (segment .getBaseOffset ()));
239+
240+ try {
241+ LocalLog .deleteSegmentFiles (
242+ toDelete , LocalLog .SegmentDeletionReason .LOG_TRUNCATION );
243+ } catch (IOException e ) {
244+ LOG .error (
245+ "Failed to delete truncated segments {} for bucket {}" ,
246+ toDelete ,
247+ logSegments .getTableBucket (),
248+ e );
249+ }
250+
251+ try {
252+ LogTablet .deleteWriterSnapshots (toDelete , writerStateManager );
253+ } catch (IOException e ) {
254+ LOG .error (
255+ "Failed to delete truncated writer snapshots {} for bucket {}" ,
256+ toDelete ,
257+ logSegments .getTableBucket (),
258+ e );
259+ }
260+ }
261+ }
262+
140263 /** Loads segments from disk into the provided segments. */
141- private void loadSegmentFiles () throws IOException {
264+ private void loadSegmentFiles () throws Exception {
142265 File [] sortedFiles = logTabletDir .listFiles ();
143266 if (sortedFiles != null ) {
144267 Arrays .sort (sortedFiles , Comparator .comparing (File ::getName ));
@@ -155,8 +278,25 @@ private void loadSegmentFiles() throws IOException {
155278 }
156279 } else if (LocalLog .isLogFile (file )) {
157280 long baseOffset = FlussPaths .offsetFromFile (file );
281+ boolean timeIndexFileNewlyCreated =
282+ !FlussPaths .timeIndexFile (logTabletDir , baseOffset ).exists ();
158283 LogSegment segment =
159284 LogSegment .open (logTabletDir , baseOffset , conf , true , 0 , logFormat );
285+
286+ try {
287+ segment .sanityCheck (timeIndexFileNewlyCreated );
288+ } catch (Exception e ) {
289+ if (e instanceof NoSuchFieldException ) {
290+ if (isCleanShutdown
291+ || segment .getBaseOffset () < recoveryPointCheckpoint ) {
292+ LOG .error (
293+ "Could not find offset index file corresponding to log file {}, "
294+ + "recovering segment and rebuilding index files..." ,
295+ segment .getFileLogRecords ().file ().getAbsoluteFile ());
296+ }
297+ recoverSegment (segment );
298+ }
299+ }
160300 logSegments .add (segment );
161301 }
162302 }
0 commit comments