CSV connector's task hangs in an infinite loop after encountering a record it can't process #189

Open
MaxSamsonov opened this issue Aug 24, 2021 · 1 comment


@MaxSamsonov

Hello!

We're testing SpoolDirCsvSourceConnector and ran into the following situation:

  1. Part of the file was processed (2000 rows).

  2. The following error appeared in the log:
    [2021-08-24 18:55:46,547] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
    java.io.IOException: Unterminated quoted field at end of CSV line. Beginning of lost text: [;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
    ]
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
    at com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceTask.process(SpoolDirCsvSourceTask.java:105)
    at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.read(AbstractSourceTask.java:254)
    at com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask.poll(AbstractSourceTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
    [2021-08-24 18:55:46,549] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
    [2021-08-24 18:55:46,550] INFO Removing processing flag /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv.PROCESSING (com.github.jcustenborder.kafka.connect.spooldir.InputFile:177)
    [2021-08-24 18:55:46,551] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
    [2021-08-24 18:55:46,552] INFO Moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:204)

  3. The file was successfully moved to the error folder.

  4. No new files are processed. The connector task appears to be stuck retrying the failing line of the already-moved file once every "empty.poll.wait.ms" interval (6000 ms in our configuration):

    [2021-08-24 19:29:54,234] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
    [2021-08-24 19:29:54,234] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
    [2021-08-24 19:29:54,236] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
    [2021-08-24 19:30:00,236] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
    [2021-08-24 19:30:00,237] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
    [2021-08-24 19:30:00,239] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
    [2021-08-24 19:30:06,240] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
    [2021-08-24 19:30:06,240] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
    [2021-08-24 19:30:06,242] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
    [2021-08-24 19:30:12,243] ERROR Exception encountered processing line 5692 of /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv. (com.github.jcustenborder.kafka.connect.spooldir.AbstractSourceTask:265)
    at com.opencsv.CSVReader.readNext(CSVReader.java:353)
    [2021-08-24 19:30:12,243] INFO Closing /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv (com.github.jcustenborder.kafka.connect.spooldir.InputFile:173)
    [2021-08-24 19:30:12,245] ERROR Error during processing, moving /connect_integrations/848_quality/DEV/SWAP/SWAP-2021_06_0001_0.csv to /connect_integrations/848_quality/DEV/SWAP-ERROR. (com.github.jcustenborder.kafka.connect.spooldir.AbstractCleanUpPolicy:90)
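The log in step 4 shows the task retrying line 5692 of a file that the cleanup policy has already moved to the error path, every 6 seconds. That pattern suggests the task still holds a reference to the failed file after it has been moved. The code below is a hypothetical sketch, not spooldir's `AbstractSourceTask`; `SpoolLoopSketch`, `poll()`, `readAll()`, and the in-memory queues are illustrative names. It shows the invariant a spooling poll loop needs: once a file fails and is moved, the current-file reference is cleared, so the next poll moves on to the next file instead of re-reading the old one.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of a spooling poll loop; not the connector's actual code.
public class SpoolLoopSketch {
    final Deque<String> inputQueue = new ArrayDeque<>(); // files waiting in the input directory
    final List<String> errorDir = new ArrayList<>();     // files moved after a failure
    final List<String> finishedDir = new ArrayList<>();  // files processed successfully
    String current;                                      // file currently open, or null

    // Simulated read: files named "bad*" fail, like the unterminated-quote IOException.
    void readAll(String file) throws IOException {
        if (file.startsWith("bad")) {
            throw new IOException("Unterminated quoted field in " + file);
        }
    }

    // One poll cycle: open a file if none is open, read it, then clean up.
    void poll() {
        if (current == null) {
            current = inputQueue.poll();
            if (current == null) {
                return; // nothing queued; a real task would wait, e.g. empty.poll.wait.ms
            }
        }
        try {
            readAll(current);
            finishedDir.add(current);
        } catch (IOException e) {
            errorDir.add(current); // "move" the file to the error directory
        } finally {
            // Key invariant: forget the file whether it succeeded or failed,
            // so a later poll cannot retry a file that has already been moved.
            current = null;
        }
    }

    public static void main(String[] args) {
        SpoolLoopSketch task = new SpoolLoopSketch();
        task.inputQueue.add("bad.csv");
        task.inputQueue.add("good.csv");
        for (int i = 0; i < 5; i++) {
            task.poll();
        }
        System.out.println("error=" + task.errorDir + " finished=" + task.finishedDir);
        // prints "error=[bad.csv] finished=[good.csv]"
    }
}
```

With the reset in `finally`, the bad file is moved once and the next file is still processed. If `current` were not reset on the failure path, every later `poll()` would re-enter the moved file and fail again, which resembles the repeating log lines above.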

Could you please help identify where the problem is?

Confluent Platform 6.1.0 (Kafka version 2.7.1)
Connector version: jcustenborder-kafka-connect-spooldir-2.0.62

Connector configuration:

{
  "name" : "csv-file-quality-test-orders-v0",
  "config" : {
    "connector.class" : "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "csv.first.row.as.header" : "true",
    "csv.separator.char" : "59",
    "empty.poll.wait.ms" : "6000",
    "error.path" : "/connect_integrations/848_quality/DEV/SWAP-ERROR",
    "finished.path" : "/connect_integrations/848_quality/DEV/SWAP-SUCCESS",
    "halt.on.error" : "false",
    "input.file.pattern" : ".*.csv",
    "input.path" : "/connect_integrations/848_quality/DEV/SWAP",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable" : "false",
    "name" : "csv-file-quality-test-orders-v0",
    "output.json.formatter" : "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
    "topic" : "csv_test_topic",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable" : "false",
    "value.schema" : "{\"name\": \"csv.Value\", \"type\": \"STRUCT\", \"isOptional\": true, \"fieldSchemas\": {\"test0\": {\"type\": \"STRING\", \"isOptional\": true}, \"test1\": {\"type\": \"STRING\", \"isOptional\": true}, \"test2\": {\"type\": \"STRING\", \"isOptional\": true}, \"test3\": {\"type\": \"STRING\", \"isOptional\": true}, \"test4\": {\"type\": \"STRING\", \"isOptional\": true}, \"test5\": {\"type\": \"STRING\", \"isOptional\": true}, \"test6\": {\"type\": \"STRING\", \"isOptional\": true}, \"test7\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended1\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended2\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended3\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended4\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended5\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended6\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended7\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended8\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended9\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended10\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended11\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended12\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended13\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended14\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended15\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended16\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended17\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended18\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended19\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended20\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended21\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended22\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended23\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended24\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended25\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended26\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended27\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended28\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended29\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended30\": {\"type\": \"STRING\", \"isOptional\": true}, \"extended31\": {\"type\": \"STRING\", \"isOptional\": true}}}"
  }
}
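`csv.separator.char` takes a decimal character code, so `59` is `;`. The error in step 2 ("Unterminated quoted field at end of CSV line", with lost text made of `;` runs) is consistent with a `"` that opens a quoted field which is never closed, so the parser keeps consuming the following lines as part of that field until the end of the file. A pre-flight check before dropping a file into `input.path` can point at the offending line. The class below is a hypothetical helper (`UnterminatedQuoteCheck` and `findUnterminatedQuote` are illustrative names, not part of spooldir or opencsv) and assumes RFC 4180-style quoting; opencsv's default parser also honors a backslash escape character, which this sketch ignores.

```java
import java.util.List;

// Hypothetical pre-flight check, not part of spooldir or opencsv.
// Assumes RFC 4180-style quoting: '"' opens/closes a field, "" is a literal quote,
// and a quoted field may continue onto the next physical line.
public class UnterminatedQuoteCheck {
    // "csv.separator.char": "59" in the connector config is the decimal code for ';'.
    static final char SEPARATOR = (char) 59;
    static final char QUOTE = '"';

    /** Returns the 1-based line where an unterminated quoted field starts, or -1 if none. */
    static int findUnterminatedQuote(List<String> lines) {
        boolean inQuotes = false;
        int openedAt = -1;
        for (int n = 0; n < lines.size(); n++) {
            String line = lines.get(n);
            // A new physical line starts a new field unless a quoted field is still open.
            boolean atFieldStart = !inQuotes;
            for (int i = 0; i < line.length(); i++) {
                char c = line.charAt(i);
                if (inQuotes) {
                    if (c == QUOTE && i + 1 < line.length() && line.charAt(i + 1) == QUOTE) {
                        i++; // "" inside a quoted field is an escaped literal quote
                    } else if (c == QUOTE) {
                        inQuotes = false; // closing quote
                        atFieldStart = false;
                    }
                } else if (c == QUOTE && atFieldStart) {
                    inQuotes = true; // opening quote at the start of a field
                    openedAt = n + 1;
                } else {
                    // Only a separator starts a new field; anything else is field data.
                    atFieldStart = (c == SEPARATOR);
                }
            }
        }
        return inQuotes ? openedAt : -1;
    }

    public static void main(String[] args) {
        List<String> rows = List.of(
                "id;name;comment",
                "1;alpha;\"ok\"",
                "2;beta;\"quote opened but never closed",
                "3;gamma;;;;");
        System.out.println(findUnterminatedQuote(rows)); // prints 3
    }
}
```

In practice the lines would come from something like `Files.readAllLines(path)`; a result other than -1 gives the line to inspect before the file reaches the connector.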

Thank you.

@cande1gut

@MaxSamsonov I had the same problem. What worked for me was switching the parser to RFC 4180 and changing the file naming convention to camel case.
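One plausible reason the RFC 4180 parser helps: opencsv's default `CSVParser` treats backslash (`\`) as an escape character, whereas `RFC4180Parser` does not. Under the default rules, a quoted field that ends in a backslash, such as `"C:\temp\"`, escapes its own closing quote and leaves the field unterminated. Whether this is what happens at line 5692 here is an assumption that depends on the data. The tokenizer below is a simplified, hypothetical sketch (`QuoteRulesDemo` and `split` are illustrative names, not opencsv's implementation) that splits one `;`-separated record under both rule sets so the difference is visible.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical tokenizer for illustration; not opencsv's implementation.
public class QuoteRulesDemo {
    static final char SEP = ';';   // csv.separator.char = 59
    static final char QUOTE = '"';

    /**
     * Splits one record into fields. With backslashEscapes = true, a backslash inside a
     * quoted field escapes a following quote or backslash (similar in spirit to opencsv's
     * default parser). With false, only a doubled quote is an escape (RFC 4180 style).
     * Throws IllegalStateException if a quoted field is left open.
     */
    static List<String> split(String line, boolean backslashEscapes) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            boolean hasNext = i + 1 < line.length();
            if (inQuotes) {
                if (backslashEscapes && c == '\\' && hasNext
                        && (line.charAt(i + 1) == QUOTE || line.charAt(i + 1) == '\\')) {
                    field.append(line.charAt(++i)); // escaped quote or backslash
                } else if (c == QUOTE && hasNext && line.charAt(i + 1) == QUOTE) {
                    field.append(QUOTE); // doubled quote is a literal quote
                    i++;
                } else if (c == QUOTE) {
                    inQuotes = false; // closing quote
                } else {
                    field.append(c);
                }
            } else if (c == QUOTE && field.length() == 0) {
                inQuotes = true; // opening quote at the start of a field
            } else if (c == SEP) {
                fields.add(field.toString());
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        if (inQuotes) {
            throw new IllegalStateException("Unterminated quoted field");
        }
        fields.add(field.toString());
        return fields;
    }

    public static void main(String[] args) {
        String record = "1;\"C:\\temp\\\";done"; // the CSV text 1;"C:\temp\";done
        System.out.println(split(record, false)); // prints [1, C:\temp\, done]
        try {
            split(record, true);
        } catch (IllegalStateException e) {
            // prints "backslash-escape rules: Unterminated quoted field"
            System.out.println("backslash-escape rules: " + e.getMessage());
        }
    }
}
```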
