This Kafka Connect connector watches a directory and reads data as new files are written to the input directory. Each record in an input file is converted based on the user-supplied schema.
The CSVRecordProcessor supports reading CSV or TSV files. It converts the CSV data on the fly to the strongly typed Kafka Connect data types, and it currently supports all of the schema types and logical types available in Kafka 0.10.x. If you couple this with the Avro converter and Confluent Schema Registry, you can process CSV files into strongly typed Avro data in real time.
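For example, pointing the Connect worker at the Confluent Avro converter looks roughly like the following worker settings (a sketch only; the Schema Registry URL is a placeholder for your own deployment):

# Hypothetical worker converter settings: assumes the Confluent Avro converter is
# installed and a Schema Registry is reachable at the placeholder URL below.
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081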
This connector requires you to specify the schema of the files to be read. This is controlled by the key.schema and value.schema configuration settings. Each setting is a schema in JSON form, stored as a string. The field schemas are used to parse the data in each row, which allows the user to strongly define the type of each field. A SchemaGenerator utility is included for generating a schema from an existing data file; the commands below run it against a sample CSV file and a sample JSON file.
mvn clean package
export CLASSPATH="$(find target/kafka-connect-target/usr/share/java -type f -name '*.jar' | tr '\n' ':')"
kafka-run-class com.github.jcustenborder.kafka.connect.spooldir.SchemaGenerator -t csv -f src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/csv/FieldsMatch.data -c config/CSVExample.properties -i id
mvn clean package
export CLASSPATH="$(find target/kafka-connect-target/usr/share/java -type f -name '*.jar' | tr '\n' ':')"
kafka-run-class com.github.jcustenborder.kafka.connect.spooldir.SchemaGenerator -t json -f src/test/resources/com/github/jcustenborder/kafka/connect/spooldir/json/FieldsMatch.data -c config/JsonExample.properties -i id
The following is an example key schema. It defines a single required id field:
{
"name" : "com.example.users.UserKey",
"type" : "STRUCT",
"isOptional" : false,
"fieldSchemas" : {
"id" : {
"type" : "INT64",
"isOptional" : false
}
}
}
The following is an example value schema for the user records:
{
"name" : "com.example.users.User",
"type" : "STRUCT",
"isOptional" : false,
"fieldSchemas" : {
"id" : {
"type" : "INT64",
"isOptional" : false
},
"first_name" : {
"type" : "STRING",
"isOptional" : true
},
"last_name" : {
"type" : "STRING",
"isOptional" : true
},
"email" : {
"type" : "STRING",
"isOptional" : true
},
"gender" : {
"type" : "STRING",
"isOptional" : true
},
"ip_address" : {
"type" : "STRING",
"isOptional" : true
},
"last_login" : {
"name" : "org.apache.kafka.connect.data.Timestamp",
"type" : "INT64",
"version" : 1,
"isOptional" : true
},
"account_balance" : {
"name" : "org.apache.kafka.connect.data.Decimal",
"type" : "BYTES",
"version" : 1,
"parameters" : {
"scale" : "2"
},
"isOptional" : true
},
"country" : {
"type" : "STRING",
"isOptional" : true
},
"favorite_color" : {
"type" : "STRING",
"isOptional" : true
}
}
}
Logical types are supported as well. Their schemas include the logical type's name and version, plus any parameters. Timestamp:
{
"name" : "org.apache.kafka.connect.data.Timestamp",
"type" : "INT64",
"version" : 1,
"isOptional" : true
}
Decimal:
{
"name" : "org.apache.kafka.connect.data.Decimal",
"type" : "BYTES",
"version" : 1,
"parameters" : {
"scale" : "2"
},
"isOptional" : true
}
Date:
{
"name" : "org.apache.kafka.connect.data.Date",
"type" : "INT32",
"version" : 1,
"isOptional" : true
}
Time:
{
"name" : "org.apache.kafka.connect.data.Time",
"type" : "INT32",
"version" : 1,
"isOptional" : true
}
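The schema is supplied to the connector as a string. As a sketch, the example key schema above would appear in a connector properties file on a single line:

key.schema={"name":"com.example.users.UserKey","type":"STRUCT","isOptional":false,"fieldSchemas":{"id":{"type":"INT64","isOptional":false}}}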
The SpoolDirCsvSourceConnector monitors the directory specified in input.path for files and reads them as CSV, converting each record to the strongly typed equivalent specified in key.schema and value.schema.
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
# Set these required values
finished.path=
input.file.pattern=
error.path=
topic=
input.path=
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
error.path | The directory to place files in which have error(s). This directory must exist and be writable by the user running Kafka Connect. | string | | | high |
finished.path | The directory to place files that have been successfully processed. This directory must exist and be writable by the user running Kafka Connect. | string | | | high |
input.file.pattern | Regular expression to check input file names against. This expression must match the entire filename. The equivalent of Matcher.matches(). | string | | | high |
input.path | The directory to read files that will be processed. This directory must exist and be writable by the user running Kafka Connect. | string | | | high |
topic | The Kafka topic to write the data to. | string | | | high |
halt.on.error | Should the task halt when it encounters an error or continue to the next file. | boolean | true | | high |
key.schema | The schema for the key written to Kafka. | string | "" | | high |
value.schema | The schema for the value written to Kafka. | string | "" | | high |
csv.first.row.as.header | Flag to indicate if the first row of data contains the header of the file. If true, the position of the columns is determined by the first row of the CSV and matched to the fields of the schema supplied in value.schema. If set to true, the number of columns must be greater than or equal to the number of fields in the schema. | boolean | false | | medium |
schema.generation.enabled | Flag to determine if schemas should be dynamically generated. If set to true, key.schema and value.schema can be omitted, but schema.generation.key.name and schema.generation.value.name must be set. | boolean | false | | medium |
schema.generation.key.fields | The field(s) to use to build a key schema. This is only used during schema generation. | list | [] | | medium |
schema.generation.key.name | The name of the generated key schema. | string | com.github.jcustenborder.kafka.connect.model.Key | | medium |
schema.generation.value.name | The name of the generated value schema. | string | com.github.jcustenborder.kafka.connect.model.Value | | medium |
timestamp.field | The field in the value schema that will contain the parsed timestamp for the record. This field cannot be marked as optional and must be a Timestamp. | string | "" | | medium |
timestamp.mode | Determines how the connector will set the timestamp for the ConnectRecord. If set to FIELD, the timestamp is read from a field in the value; this field cannot be optional, must be a Timestamp, and is specified in timestamp.field. If set to FILE_TIME, the last modified time of the file is used. If set to PROCESS_TIME, the time the record is read is used. | string | PROCESS_TIME | ValidEnum{enum=TimestampMode, allowed=[FIELD, FILE_TIME, PROCESS_TIME]} | medium |
batch.size | The number of records that should be returned with each batch. | int | 1000 | | low |
csv.case.sensitive.field.names | Flag to determine if the field names in the header row should be treated as case sensitive. | boolean | false | | low |
csv.escape.char | Escape character. | int | 92 | | low |
csv.file.charset | Character set to read the file with. | string | UTF-8 | Big5,Big5-HKSCS,CESU-8,EUC-JP,EUC-KR,GB18030,GB2312,GBK,IBM-Thai,IBM00858,IBM01140,IBM01141,IBM01142,IBM01143,IBM01144,IBM01145,IBM01146,IBM01147,IBM01148,IBM01149,IBM037,IBM1026,IBM1047,IBM273,IBM277,IBM278,IBM280,IBM284,IBM285,IBM290,IBM297,IBM420,IBM424,IBM437,IBM500,IBM775,IBM850,IBM852,IBM855,IBM857,IBM860,IBM861,IBM862,IBM863,IBM864,IBM865,IBM866,IBM868,IBM869,IBM870,IBM871,IBM918,ISO-2022-CN,ISO-2022-JP,ISO-2022-JP-2,ISO-2022-KR,ISO-8859-1,ISO-8859-13,ISO-8859-15,ISO-8859-2,ISO-8859-3,ISO-8859-4,ISO-8859-5,ISO-8859-6,ISO-8859-7,ISO-8859-8,ISO-8859-9,JIS_X0201,JIS_X0212-1990,KOI8-R,KOI8-U,Shift_JIS,TIS-620,US-ASCII,UTF-16,UTF-16BE,UTF-16LE,UTF-32,UTF-32BE,UTF-32LE,UTF-8,windows-1250,windows-1251,windows-1252,windows-1253,windows-1254,windows-1255,windows-1256,windows-1257,windows-1258,windows-31j,x-Big5-HKSCS-2001,x-Big5-Solaris,x-COMPOUND_TEXT,x-euc-jp-linux,x-EUC-TW,x-eucJP-Open,x-IBM1006,x-IBM1025,x-IBM1046,x-IBM1097,x-IBM1098,x-IBM1112,x-IBM1122,x-IBM1123,x-IBM1124,x-IBM1364,x-IBM1381,x-IBM1383,x-IBM300,x-IBM33722,x-IBM737,x-IBM833,x-IBM834,x-IBM856,x-IBM874,x-IBM875,x-IBM921,x-IBM922,x-IBM930,x-IBM933,x-IBM935,x-IBM937,x-IBM939,x-IBM942,x-IBM942C,x-IBM943,x-IBM943C,x-IBM948,x-IBM949,x-IBM949C,x-IBM950,x-IBM964,x-IBM970,x-ISCII91,x-ISO-2022-CN-CNS,x-ISO-2022-CN-GB,x-iso-8859-11,x-JIS0208,x-JISAutoDetect,x-Johab,x-MacArabic,x-MacCentralEurope,x-MacCroatian,x-MacCyrillic,x-MacDingbat,x-MacGreek,x-MacHebrew,x-MacIceland,x-MacRoman,x-MacRomania,x-MacSymbol,x-MacThai,x-MacTurkish,x-MacUkraine,x-MS932_0213,x-MS950-HKSCS,x-MS950-HKSCS-XP,x-mswin-936,x-PCK,x-SJIS_0213,x-UTF-16LE-BOM,X-UTF-32BE-BOM,X-UTF-32LE-BOM,x-windows-50220,x-windows-50221,x-windows-874,x-windows-949,x-windows-950,x-windows-iso2022jp | low |
csv.ignore.leading.whitespace | Sets the ignore leading whitespace setting - if true, white space in front of a quote in a field is ignored. | boolean | true | | low |
csv.ignore.quotations | Sets the ignore quotations mode - if true, quotations are ignored. | boolean | false | | low |
csv.keep.carriage.return | Flag to determine if the carriage return at the end of the line should be maintained. | boolean | false | | low |
csv.null.field.indicator | Indicator to determine how the CSV reader decides whether a field is null. Valid values are EMPTY_SEPARATORS, EMPTY_QUOTES, BOTH, NEITHER. For more information see http://opencsv.sourceforge.net/apidocs/com/opencsv/enums/CSVReaderNullFieldIndicator.html. | string | NEITHER | ValidEnum{enum=CSVReaderNullFieldIndicator, allowed=[EMPTY_SEPARATORS, EMPTY_QUOTES, BOTH, NEITHER]} | low |
csv.quote.char | The character that is used to quote a field. This typically happens when the csv.separator.char character is within the data. | int | 34 | | low |
csv.separator.char | The character that separates each field. Typically in a CSV this is a , character. A TSV would use \t. | int | 44 | | low |
csv.skip.lines | Number of lines to skip in the beginning of the file. | int | 0 | | low |
csv.strict.quotes | Sets the strict quotes setting - if true, characters outside the quotes are ignored. | boolean | false | | low |
csv.verify.reader | Flag to determine if the reader should be verified. | boolean | true | | low |
empty.poll.wait.ms | The amount of time to wait if a poll returns an empty list of records. | long | 1000 | [1,...,9223372036854775807] | low |
file.minimum.age.ms | The amount of time in milliseconds after the file was last written to before the file can be processed. | long | 0 | [0,...,9223372036854775807] | low |
parser.timestamp.date.formats | The date formats that are expected in the file. This is a list of strings that will be used to parse the date fields in order. The most accurate date format should be first in the list. See the Java documentation for more info: https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html | list | [yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd' 'HH:mm:ss] | | low |
parser.timestamp.timezone | The timezone that all of the dates will be parsed with. | string | UTC | | low |
processing.file.extension | Before a file is processed, it is renamed to indicate that it is currently being processed. This setting is appended to the end of the file. | string | .PROCESSING | ValidPattern{pattern=^.*\..+$} | low |
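For illustration, here is a hypothetical configuration that reads headered CSV files and lets the connector generate the schemas, keying records on the id column (paths, pattern, and topic are placeholders):

name=csv-spooldir-source
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
# Placeholder paths - these directories must exist and be writable.
input.path=/data/input
finished.path=/data/finished
error.path=/data/error
input.file.pattern=^users.*\.csv$
topic=users
# Use the header row for column positions and generate the schemas.
csv.first.row.as.header=true
schema.generation.enabled=true
schema.generation.key.fields=id
schema.generation.key.name=com.example.users.UserKey
schema.generation.value.name=com.example.users.User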
The SpoolDirJsonSourceConnector streams JSON files from a directory while converting the data based on the schema supplied in the configuration.
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector
# Set these required values
finished.path=
input.file.pattern=
topic=
error.path=
input.path=
Name | Description | Type | Default | Valid Values | Importance |
---|---|---|---|---|---|
error.path | The directory to place files in which have error(s). This directory must exist and be writable by the user running Kafka Connect. | string | | | high |
finished.path | The directory to place files that have been successfully processed. This directory must exist and be writable by the user running Kafka Connect. | string | | | high |
input.file.pattern | Regular expression to check input file names against. This expression must match the entire filename. The equivalent of Matcher.matches(). | string | | | high |
input.path | The directory to read files that will be processed. This directory must exist and be writable by the user running Kafka Connect. | string | | | high |
topic | The Kafka topic to write the data to. | string | | | high |
halt.on.error | Should the task halt when it encounters an error or continue to the next file. | boolean | true | | high |
key.schema | The schema for the key written to Kafka. | string | "" | | high |
value.schema | The schema for the value written to Kafka. | string | "" | | high |
schema.generation.enabled | Flag to determine if schemas should be dynamically generated. If set to true, key.schema and value.schema can be omitted, but schema.generation.key.name and schema.generation.value.name must be set. | boolean | false | | medium |
schema.generation.key.fields | The field(s) to use to build a key schema. This is only used during schema generation. | list | [] | | medium |
schema.generation.key.name | The name of the generated key schema. | string | com.github.jcustenborder.kafka.connect.model.Key | | medium |
schema.generation.value.name | The name of the generated value schema. | string | com.github.jcustenborder.kafka.connect.model.Value | | medium |
timestamp.field | The field in the value schema that will contain the parsed timestamp for the record. This field cannot be marked as optional and must be a Timestamp. | string | "" | | medium |
timestamp.mode | Determines how the connector will set the timestamp for the ConnectRecord. If set to FIELD, the timestamp is read from a field in the value; this field cannot be optional, must be a Timestamp, and is specified in timestamp.field. If set to FILE_TIME, the last modified time of the file is used. If set to PROCESS_TIME, the time the record is read is used. | string | PROCESS_TIME | ValidEnum{enum=TimestampMode, allowed=[FIELD, FILE_TIME, PROCESS_TIME]} | medium |
batch.size | The number of records that should be returned with each batch. | int | 1000 | | low |
empty.poll.wait.ms | The amount of time to wait if a poll returns an empty list of records. | long | 1000 | [1,...,9223372036854775807] | low |
file.minimum.age.ms | The amount of time in milliseconds after the file was last written to before the file can be processed. | long | 0 | [0,...,9223372036854775807] | low |
parser.timestamp.date.formats | The date formats that are expected in the file. This is a list of strings that will be used to parse the date fields in order. The most accurate date format should be first in the list. See the Java documentation for more info: https://docs.oracle.com/javase/6/docs/api/java/text/SimpleDateFormat.html | list | [yyyy-MM-dd'T'HH:mm:ss, yyyy-MM-dd' 'HH:mm:ss] | | low |
parser.timestamp.timezone | The timezone that all of the dates will be parsed with. | string | UTC | | low |
processing.file.extension | Before a file is processed, it is renamed to indicate that it is currently being processed. This setting is appended to the end of the file. | string | .PROCESSING | ValidPattern{pattern=^.*\..+$} | low |
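A hypothetical JSON source configuration follows the same pattern (paths, pattern, and topic are placeholders):

name=json-spooldir-source
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector
# Placeholder paths - these directories must exist and be writable.
input.path=/data/input
finished.path=/data/finished
error.path=/data/error
input.file.pattern=^users.*\.json$
topic=users
# Generate the schemas, keyed on the id field.
schema.generation.enabled=true
schema.generation.key.fields=id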
git clone [email protected]:jcustenborder/kafka-connect-spooldir.git
cd kafka-connect-spooldir
mvn clean package
./bin/debug.sh
export SUSPEND='y'
./bin/debug.sh