RCCA-16236: AVRO buffer loss fix on IOException (#678)
* Reset offsets after hitting IO exceptions in AvroRecordWriter + UT

* MINOR: initiate recovery after Avro IO exception

* Update reset logic and remove temp files

* Iterate over keySet instead of values

---------

Co-authored-by: Snehashis Pal <[email protected]>
vbalani002 and snehashisp authored Dec 19, 2023
1 parent abc23ee commit 0ec1769
Showing 4 changed files with 156 additions and 18 deletions.
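In short, the commit turns an Avro I/O failure during write or flush into a controlled reset: the new AvroIOException propagates out of the record writer, TopicPartitionWriter rewinds the consumer to the last committed offset, clears its in-memory buffers, and re-enters the recovery state machine, which deletes leftover temp files before writing resumes. A minimal, self-contained sketch of that pattern follows (hypothetical names such as RecoverySketch, seekTo, and append; the real writer tracks far more state and drives recovery through its WAL):

import java.io.IOException;
import java.util.List;

public class RecoverySketch {

  // Unchecked wrapper mirroring io.confluent.connect.hdfs.avro.AvroIOException.
  static class AvroIOException extends RuntimeException {
    AvroIOException(IOException cause) {
      super(cause);
    }
  }

  enum State { WRITE_STARTED, RECOVERY_STARTED }

  private State state = State.WRITE_STARTED;
  private long committedOffset = 0L;
  private int buffered = 0;

  // Stand-in for context.offset(tp, offset): ask Connect to re-deliver from this offset.
  private void seekTo(long offset) { }

  // Stand-in for resetBuffers(): drop buffered records, writers, start/end offsets, schema.
  private void resetBuffers() {
    buffered = 0;
  }

  // Stand-in for the Avro append/flush path, which rethrows IOException as AvroIOException.
  private void append(Object record) {
    buffered++;
  }

  public void writeBatch(List<?> records) {
    try {
      for (Object record : records) {
        append(record);
      }
    } catch (AvroIOException e) {
      seekTo(committedOffset);         // rewind so the dropped records are re-delivered
      resetBuffers();                  // nothing buffered survives, so nothing is written twice
      state = State.RECOVERY_STARTED;  // recover() deletes temp files and re-applies the WAL
    }
  }

  public static void main(String[] args) {
    new RecoverySketch().writeBatch(List.of("record-1", "record-2"));
  }
}

Without the rewind, records held only in the in-memory buffer and in a half-written temp file would disappear once the writer moved on, which is the buffer loss the commit title refers to.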
62 changes: 49 additions & 13 deletions src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -15,6 +15,7 @@

package io.confluent.connect.hdfs;

import io.confluent.connect.hdfs.avro.AvroIOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
@@ -246,6 +247,26 @@ public TopicPartitionWriter(
updateRotationTimers(null);
}

private void resetBuffers() {
buffer.clear();
writers.clear();
appended.clear();
startOffsets.clear();
endOffsets.clear();
recordCounter = 0;
currentSchema = null;
}

private void safeDeleteTempFiles() {
for (String encodedPartition : tempFiles.keySet()) {
try {
deleteTempFile(encodedPartition);
} catch (ConnectException e) {
log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e);
}
}
}

@SuppressWarnings("fallthrough")
public boolean recover() {
try {
@@ -260,6 +281,7 @@ public boolean recover() {
nextState();
case WAL_APPLIED:
log.debug("Start recovery state: Reset Offsets for topic partition {}", tp);
safeDeleteTempFiles();
resetOffsets();
nextState();
case OFFSET_RESET:
@@ -279,8 +301,9 @@ public boolean recover() {
tp
);
}
} catch (ConnectException e) {
} catch (AvroIOException | ConnectException e) {
log.error("Recovery failed at state {}", state, e);
failureTime = time.milliseconds();
setRetryTimeout(timeoutMs);
return false;
}
@@ -316,6 +339,13 @@ private void updateRotationTimers(SinkRecord currentRecord) {
}
}

private void resetAndSetRecovery() {
context.offset(tp, offset);
resetBuffers();
state = State.RECOVERY_STARTED;
recovered = false;
}

@SuppressWarnings("fallthrough")
public void write() {
long now = time.milliseconds();
@@ -405,10 +435,15 @@ public void write() {
}
} catch (SchemaProjectorException | IllegalWorkerStateException | HiveMetaStoreException e) {
throw new RuntimeException(e);
} catch (ConnectException e) {
} catch (AvroIOException | ConnectException e) {
log.error("Exception on topic partition {}: ", tp, e);
failureTime = time.milliseconds();
setRetryTimeout(timeoutMs);
if (e instanceof AvroIOException) {
log.error("Encountered AVRO IO exception, resetting this topic partition {} "
+ "to offset {}", tp, offset);
resetAndSetRecovery();
}
break;
}
}
@@ -445,10 +480,15 @@ public void write() {
default:
log.error("{} is not a valid state to empty batch for topic partition {}.", state, tp);
}
} catch (ConnectException e) {
} catch (AvroIOException | ConnectException e) {
log.error("Exception on topic partition {}: ", tp, e);
failureTime = time.milliseconds();
setRetryTimeout(timeoutMs);
if (e instanceof AvroIOException) {
log.error("Encountered AVRO IO exception, resetting this topic partition {} "
+ "to offset {}", tp, offset);
resetAndSetRecovery();
}
return;
}

@@ -764,14 +804,14 @@ private void closeTempFile(String encodedPartition) {
}

private void closeTempFile() {
ConnectException connectException = null;
RuntimeException exception = null;
for (String encodedPartition : tempFiles.keySet()) {
// Close the file and propagate any errors
try {
closeTempFile(encodedPartition);
} catch (ConnectException e) {
} catch (RuntimeException e) {
// still want to close all of the other data writers
connectException = e;
exception = e;
log.error(
"Failed to close temporary file for partition {}. The connector will attempt to"
+ " rewrite the temporary file.",
@@ -780,16 +820,12 @@ private void closeTempFile() {
}
}

if (connectException != null) {
if (exception != null) {
// at least one tmp file did not close properly therefore will try to recreate the tmp and
// delete all buffered records + tmp files and start over because otherwise there will be
// duplicates, since there is no way to reclaim the records in the tmp file.
for (String encodedPartition : tempFiles.keySet()) {
try {
deleteTempFile(encodedPartition);
} catch (ConnectException e) {
log.error("Failed to delete tmp file {}", tempFiles.get(encodedPartition), e);
}
safeDeleteTempFiles();
startOffsets.remove(encodedPartition);
endOffsets.remove(encodedPartition);
buffer.clear();
@@ -799,7 +835,7 @@ private void closeTempFile() {
context.offset(tp, offset);

recordCounter = 0;
throw connectException;
throw exception;
}
}

26 changes: 26 additions & 0 deletions src/main/java/io/confluent/connect/hdfs/avro/AvroIOException.java
@@ -0,0 +1,26 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/


package io.confluent.connect.hdfs.avro;

import java.io.IOException;

@SuppressWarnings("serial")
public class AvroIOException extends RuntimeException {
public AvroIOException(IOException e) {
super(e);
}
}
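A design note on the new class above: it extends RuntimeException rather than ConnectException or DataException, so it still propagates unchecked out of RecordWriter.write() and close(), but callers can now distinguish Avro I/O failures (which require a buffer reset and recovery) from other connector errors (which are only retried with backoff). A hedged fragment of that routing follows, using a hypothetical catchAndRoute helper that is not part of this codebase and assuming it sits in the same package as AvroIOException:

// Illustrative only: send Avro I/O failures to reset-and-recover, everything else to plain retry.
class AvroErrorRouting {
  static void catchAndRoute(Runnable flushAttempt) {
    try {
      flushAttempt.run();
    } catch (AvroIOException e) {
      // Buffer and temp-file state may be inconsistent: reset offsets and re-enter recovery.
    } catch (RuntimeException e) {
      // Other ConnectException-style failures: back off and retry without discarding the buffer.
    }
  }
}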
@@ -22,8 +22,6 @@
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +66,7 @@ public void write(SinkRecord record) {
writer.setCodec(CodecFactory.fromString(conf.getAvroCodec()));
writer.create(avroSchema, out);
} catch (IOException e) {
throw new ConnectException(e);
throw new AvroIOException(e);
}
}

@@ -82,7 +80,7 @@ public void write(SinkRecord record) {
writer.append(value);
}
} catch (IOException e) {
throw new DataException(e);
throw new AvroIOException(e);
}
}

@@ -91,7 +89,7 @@ public void close() {
try {
writer.close();
} catch (IOException e) {
throw new DataException(e);
throw new AvroIOException(e);
}
}

@@ -15,12 +15,14 @@

package io.confluent.connect.hdfs.avro;

import io.confluent.connect.hdfs.partitioner.DefaultPartitioner;
import io.confluent.connect.hdfs.wal.FSWAL;
import io.confluent.connect.hdfs.wal.WALFile.Writer;
import io.confluent.connect.hdfs.wal.WALFileTest;
import io.confluent.connect.hdfs.wal.WALFileTest.CorruptWriter;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -449,6 +451,82 @@ public void testFlushPartialFile() throws Exception {
hdfsWriter.stop();
}

@Test
public void testRecoveryAfterFailedFlush() throws Exception {
// Define constants
String FLUSH_SIZE_CONFIG = "60";
int FLUSH_SIZE = Integer.valueOf(FLUSH_SIZE_CONFIG);
int NUMBER_OF_RECORDS = FLUSH_SIZE*2;
long SLEEP_TIME = 10000L;

// Create connector configs
Map<String, String> props = createProps();
props.put(HdfsSinkConnectorConfig.FLUSH_SIZE_CONFIG, FLUSH_SIZE_CONFIG);
props.put(
PartitionerConfig.PARTITIONER_CLASS_CONFIG,
DefaultPartitioner.class.getName()
);
HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);

// Initialize data writer
context.assignment().clear();
context.assignment().add(TOPIC_PARTITION);
Time time = TopicPartitionWriterTest.MockedWallclockTimestampExtractor.TIME;
DataWriter hdfsWriter = new DataWriter(connectorConfig, context, avroData, time);
hdfsWriter.open(context.assignment());
partitioner = hdfsWriter.getPartitioner();

List<SinkRecord> sinkRecords = createSinkRecords(NUMBER_OF_RECORDS);

// Write initial batch of records
hdfsWriter.write(sinkRecords.subList(0, FLUSH_SIZE-2));

// Stop all datanodes
int NUM_DATANODES = 3;
ArrayList<MiniDFSCluster.DataNodeProperties> dnProps = new ArrayList<>();
for(int j=0; j<NUM_DATANODES; j++){
MiniDFSCluster.DataNodeProperties dnProp = cluster.stopDataNode(0);
dnProps.add(dnProp);
}

// Attempt to write a batch of records that will result in a flush
hdfsWriter.write(sinkRecords.subList(FLUSH_SIZE-2, FLUSH_SIZE+2));
time.sleep(SLEEP_TIME);
// Simulate connect framework's effort to re-supply the lost data after offset reset
hdfsWriter.write(sinkRecords.subList(0, FLUSH_SIZE+2));
time.sleep(SLEEP_TIME);
hdfsWriter.write(sinkRecords.subList(0, FLUSH_SIZE+4));
time.sleep(SLEEP_TIME);
hdfsWriter.write(new ArrayList<SinkRecord>());

// Restart the datanodes and wait for them to come up
for(int j=0; j<NUM_DATANODES; j++){
cluster.restartDataNode(dnProps.get(j));
}
cluster.waitActive();

// assert that the offset was reset back to zero
assertEquals(0, hdfsWriter.getBucketWriter(TOPIC_PARTITION).offset());

// Simulate write attempts during recovery
hdfsWriter.write(sinkRecords.subList(0, NUMBER_OF_RECORDS));
time.sleep(SLEEP_TIME);
hdfsWriter.write(new ArrayList<SinkRecord>());

// Assert that all records have been committed
Map<TopicPartition, Long> committedOffsets = hdfsWriter.getCommittedOffsets();
assertTrue(committedOffsets.containsKey(TOPIC_PARTITION));
long nextOffset = committedOffsets.get(TOPIC_PARTITION);
assertEquals(NUMBER_OF_RECORDS, nextOffset);

hdfsWriter.close();
hdfsWriter.stop();

// Assert that there are no zero data files
long[] validOffsets = {0, FLUSH_SIZE, FLUSH_SIZE*2};
verify(sinkRecords, validOffsets, Collections.singleton(TOPIC_PARTITION), false);
}

@Test
public void testAvroCompression() throws Exception {
//set compression codec to Snappy
