From 0ec1769a00cebc809c39a2f4dcceab8f8bd3983e Mon Sep 17 00:00:00 2001
From: Vikas Balani <123360241+vbalani002@users.noreply.github.com>
Date: Tue, 19 Dec 2023 19:06:03 +0530
Subject: [PATCH] RCCA-16236: AVRO buffer loss fix on IOException (#678)

* Reset offsets after hitting IO exceptions in AvroRecordWriter + UT

* MINOR: initiate recovery after avro io exception

* update reset logic and remove tempfiles

* iterate over keyset instead of values

---------

Co-authored-by: Snehashis Pal
---
 .../connect/hdfs/TopicPartitionWriter.java    | 62 +++++++++++----
 .../connect/hdfs/avro/AvroIOException.java    | 26 +++++++
 .../hdfs/avro/AvroRecordWriterProvider.java   |  8 +-
 .../connect/hdfs/avro/DataWriterAvroTest.java | 78 +++++++++++++++++++
 4 files changed, 156 insertions(+), 18 deletions(-)
 create mode 100644 src/main/java/io/confluent/connect/hdfs/avro/AvroIOException.java

diff --git a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
index 4ca500774..422db079e 100644
--- a/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
+++ b/src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -15,6 +15,7 @@
 
 package io.confluent.connect.hdfs;
 
+import io.confluent.connect.hdfs.avro.AvroIOException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.common.TopicPartition;
@@ -246,6 +247,26 @@ public TopicPartitionWriter(
     updateRotationTimers(null);
   }
 
+  private void resetBuffers() {
+    buffer.clear();
+    writers.clear();
+    appended.clear();
+    startOffsets.clear();
+    endOffsets.clear();
+    recordCounter = 0;
+    currentSchema = null;
+  }
+
+  private void safeDeleteTempFiles() {
+    for (String encodedPartition : tempFiles.keySet()) {
+      try {
+        deleteTempFile(encodedPartition);
+      } catch (ConnectException e) {
+        log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e);
+      }
+    }
+  }
+
   @SuppressWarnings("fallthrough")
   public boolean recover() {
     try {
@@ -260,6 +281,7 @@ public boolean recover() {
           nextState();
         case WAL_APPLIED:
           log.debug("Start recovery state: Reset Offsets for topic partition {}", tp);
+          safeDeleteTempFiles();
           resetOffsets();
           nextState();
         case OFFSET_RESET:
@@ -279,8 +301,9 @@ public boolean recover() {
             tp
         );
       }
-    } catch (ConnectException e) {
+    } catch (AvroIOException | ConnectException e) {
       log.error("Recovery failed at state {}", state, e);
+      failureTime = time.milliseconds();
       setRetryTimeout(timeoutMs);
       return false;
     }
@@ -316,6 +339,13 @@ private void updateRotationTimers(SinkRecord currentRecord) {
     }
   }
 
+  private void resetAndSetRecovery() {
+    context.offset(tp, offset);
+    resetBuffers();
+    state = State.RECOVERY_STARTED;
+    recovered = false;
+  }
+
   @SuppressWarnings("fallthrough")
   public void write() {
     long now = time.milliseconds();
@@ -405,10 +435,15 @@ public void write() {
          }
        } catch (SchemaProjectorException | IllegalWorkerStateException | HiveMetaStoreException e) {
          throw new RuntimeException(e);
-        } catch (ConnectException e) {
+        } catch (AvroIOException | ConnectException e) {
          log.error("Exception on topic partition {}: ", tp, e);
          failureTime = time.milliseconds();
          setRetryTimeout(timeoutMs);
+          if (e instanceof AvroIOException) {
+            log.error("Encountered AVRO IO exception, resetting this topic partition {} "
+                + "to offset {}", tp, offset);
+            resetAndSetRecovery();
+          }
          break;
        }
      }
@@ -445,10 +480,15 @@ public void write() {
        default:
          log.error("{} is not a valid state to empty batch for topic partition {}.", state, tp);
      }
-    } catch (ConnectException e) {
+    } catch (AvroIOException | ConnectException e) {
      log.error("Exception on topic partition {}: ", tp, e);
      failureTime = time.milliseconds();
      setRetryTimeout(timeoutMs);
+      if (e instanceof AvroIOException) {
+        log.error("Encountered AVRO IO exception, resetting this topic partition {} "
+            + "to offset {}", tp, offset);
+        resetAndSetRecovery();
+      }
      return;
    }
 
@@ -764,14 +804,14 @@ private void closeTempFile(String encodedPartition) {
   }
 
   private void closeTempFile() {
-    ConnectException connectException = null;
+    RuntimeException exception = null;
     for (String encodedPartition : tempFiles.keySet()) {
       // Close the file and propagate any errors
       try {
         closeTempFile(encodedPartition);
-      } catch (ConnectException e) {
+      } catch (RuntimeException e) {
         // still want to close all of the other data writers
-        connectException = e;
+        exception = e;
         log.error(
             "Failed to close temporary file for partition {}. The connector will attempt to"
                 + " rewrite the temporary file.",
@@ -780,16 +820,12 @@ private void closeTempFile() {
       }
     }
 
-    if (connectException != null) {
+    if (exception != null) {
       // at least one tmp file did not close properly therefore will try to recreate the tmp and
       // delete all buffered records + tmp files and start over because otherwise there will be
       // duplicates, since there is no way to reclaim the records in the tmp file.
       for (String encodedPartition : tempFiles.keySet()) {
-        try {
-          deleteTempFile(encodedPartition);
-        } catch (ConnectException e) {
-          log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e);
-        }
+        safeDeleteTempFiles();
         startOffsets.remove(encodedPartition);
         endOffsets.remove(encodedPartition);
         buffer.clear();
@@ -799,7 +835,7 @@
       context.offset(tp, offset);
       recordCounter = 0;
 
-      throw connectException;
+      throw exception;
     }
   }
 
diff --git a/src/main/java/io/confluent/connect/hdfs/avro/AvroIOException.java b/src/main/java/io/confluent/connect/hdfs/avro/AvroIOException.java
new file mode 100644
index 000000000..2a2956e6d
--- /dev/null
+++ b/src/main/java/io/confluent/connect/hdfs/avro/AvroIOException.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+
+package io.confluent.connect.hdfs.avro;
+
+import java.io.IOException;
+
+@SuppressWarnings("serial")
+public class AvroIOException extends RuntimeException {
+  public AvroIOException(IOException e) {
+    super(e);
+  }
+}
\ No newline at end of file
diff --git a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
index f64940d31..c3a22c620 100644
--- a/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
+++ b/src/main/java/io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.java
@@ -22,8 +22,6 @@
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +66,7 @@ public void write(SinkRecord record) {
             writer.setCodec(CodecFactory.fromString(conf.getAvroCodec()));
             writer.create(avroSchema, out);
           } catch (IOException e) {
-            throw new ConnectException(e);
+            throw new AvroIOException(e);
           }
         }
 
@@ -82,7 +80,7 @@ public void write(SinkRecord record) {
            writer.append(value);
          }
        } catch (IOException e) {
-          throw new DataException(e);
+          throw new AvroIOException(e);
        }
      }
 
@@ -91,7 +89,7 @@ public void close() {
        try {
          writer.close();
        } catch (IOException e) {
-          throw new DataException(e);
+          throw new AvroIOException(e);
        }
      }
 
diff --git a/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java b/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java
index 52c1b38d1..32472a208 100644
--- a/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java
+++ b/src/test/java/io/confluent/connect/hdfs/avro/DataWriterAvroTest.java
@@ -15,12 +15,14 @@
 
 package io.confluent.connect.hdfs.avro;
 
+import io.confluent.connect.hdfs.partitioner.DefaultPartitioner;
 import io.confluent.connect.hdfs.wal.FSWAL;
 import io.confluent.connect.hdfs.wal.WALFile.Writer;
 import io.confluent.connect.hdfs.wal.WALFileTest;
 import io.confluent.connect.hdfs.wal.WALFileTest.CorruptWriter;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -449,6 +451,82 @@ public void testFlushPartialFile() throws Exception {
     hdfsWriter.stop();
   }
 
+  @Test
+  public void testRecoveryAfterFailedFlush() throws Exception {
+    // Define constants
+    String FLUSH_SIZE_CONFIG = "60";
+    int FLUSH_SIZE = Integer.valueOf(FLUSH_SIZE_CONFIG);
+    int NUMBER_OF_RECORDS = FLUSH_SIZE*2;
+    long SLEEP_TIME = 10000L;
+
+    // Create connector configs
+    Map<String, String> props = createProps();
+    props.put(HdfsSinkConnectorConfig.FLUSH_SIZE_CONFIG, FLUSH_SIZE_CONFIG);
+    props.put(
+        PartitionerConfig.PARTITIONER_CLASS_CONFIG,
+        DefaultPartitioner.class.getName()
+    );
+    HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
+
+    // Initialize data writer
+    context.assignment().clear();
+    context.assignment().add(TOPIC_PARTITION);
+    Time time = TopicPartitionWriterTest.MockedWallclockTimestampExtractor.TIME;
+    DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData, time);
+    hdfsWriter.open(context.assignment());
+    partitioner = hdfsWriter.getPartitioner();
+
+    List<SinkRecord> sinkRecords = createSinkRecords(NUMBER_OF_RECORDS);
+
+    // Write initial batch of records
+    hdfsWriter.write(sinkRecords.subList(0, FLUSH_SIZE-2));
+
+    // Stop all datanodes
+    int NUM_DATANODES = 3;
+    ArrayList<MiniDFSCluster.DataNodeProperties> dnProps = new ArrayList<>();
+    for (int j = 0; j < NUM_DATANODES; j++) {
+      dnProps.add(cluster.stopDataNode(0));
+    }
+
+    // Write the remaining records; the flush fails with an AVRO IO exception, so the writer
+    // resets its buffers, rewinds the consumed offset and goes back into recovery
+    hdfsWriter.write(sinkRecords.subList(FLUSH_SIZE-2, NUMBER_OF_RECORDS));
+    time.sleep(SLEEP_TIME);
+    hdfsWriter.write(new ArrayList<>());
+
+    // Restart the datanodes and wait for them to come up
+    for (int j = 0; j < NUM_DATANODES; j++) {
+      cluster.restartDataNode(dnProps.get(j));
+    }
+    cluster.waitClusterUp();
+
+    // Re-deliver the records from the reset offset and force a commit
+    time.sleep(SLEEP_TIME);
+    hdfsWriter.write(sinkRecords);
+    hdfsWriter.write(new ArrayList<>());
+
+    // Assert that all records have been committed
+    Map<TopicPartition, Long> committedOffsets = hdfsWriter.getCommittedOffsets();
+    assertTrue(committedOffsets.containsKey(TOPIC_PARTITION));
+    long nextOffset = committedOffsets.get(TOPIC_PARTITION);
+    assertEquals(NUMBER_OF_RECORDS, nextOffset);
+
+    hdfsWriter.close();
+    hdfsWriter.stop();
+
+    // Assert that there are no zero data files
+    long[] validOffsets = {0, FLUSH_SIZE, FLUSH_SIZE*2};
+    verify(sinkRecords, validOffsets, Collections.singleton(TOPIC_PARTITION), false);
+  }
+
   @Test
   public void testAvroCompression() throws Exception {
     //set compression codec to Snappy