Skip to content

Commit

Permalink
fix(filters): fix DelimitedRowFilter must convert fields based on sch…
Browse files Browse the repository at this point in the history
…ema (#87)

Resolves: #87
  • Loading branch information
fhussonnois committed Nov 11, 2020
1 parent 473cdda commit 668ce19
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.streamthoughts.kafka.connect.filepulse.config.DelimitedRowFilterConfig;
import io.streamthoughts.kafka.connect.filepulse.data.Schema;
import io.streamthoughts.kafka.connect.filepulse.data.StructSchema;
import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedField;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
Expand All @@ -39,7 +40,7 @@ public class DelimitedRowFilter extends AbstractRecordFilter<DelimitedRowFilter>

private static final String DEFAULT_SOURCE_FIELD = "message";

private static Schema DEFAULT_COLUMN_TYPE = Schema.string();
private static final Schema DEFAULT_COLUMN_TYPE = Schema.string();

private static final String AUTO_GENERATED_COLUMN_NAME_PREFIX = "column";

Expand Down Expand Up @@ -141,7 +142,8 @@ private TypedStruct buildStructForFields(final String[] fieldValues, final Struc
fieldValue = fieldValue.trim();
}
TypedField field = fields.get(i);
struct = struct.put(field.name(), fieldValue);
final Type type = field.type();
struct = struct.put(field.name(), type, type.convert(fieldValue));
}
return struct;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package io.streamthoughts.kafka.connect.filepulse.filter;

import io.streamthoughts.kafka.connect.filepulse.data.Type;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import org.junit.Assert;
Expand All @@ -40,7 +41,7 @@ public class DelimitedRowFileInputFilterTest {


private static final TypedStruct DEFAULT_STRUCT = TypedStruct.create()
.put("message", "value1;value2;value3")
.put("message", "value1;2;true")
.put("headers", Collections.singletonList("col1;col2;col3"));


Expand All @@ -59,8 +60,8 @@ public void shouldAutoGeneratedSchemaGivenNoSchemaField() {

final TypedStruct record = output.iterator().next();
Assert.assertEquals("value1", record.getString("column1"));
Assert.assertEquals("value2", record.getString("column2"));
Assert.assertEquals("value3", record.getString("column3"));
Assert.assertEquals("2", record.getString("column2"));
Assert.assertEquals("true", record.getString("column3"));
}

@Test
Expand All @@ -73,21 +74,24 @@ public void shouldExtractColumnNamesFromGivenField() {

final TypedStruct record = output.iterator().next();
Assert.assertEquals("value1", record.getString("col1"));
Assert.assertEquals("value2", record.getString("col2"));
Assert.assertEquals("value3", record.getString("col3"));
Assert.assertEquals("2", record.getString("col2"));
Assert.assertEquals("true", record.getString("col3"));
}

@Test
public void shouldUseConfiguredSchemaWithNoType() {
configs.put(READER_FIELD_COLUMNS_CONFIG, "c1:STRING;c2:STRING;c3:STRING");
public void shouldUseConfiguredSchema() {
configs.put(READER_FIELD_COLUMNS_CONFIG, "c1:STRING;c2:INTEGER;c3:BOOLEAN");
filter.configure(configs);
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
Assert.assertNotNull(output);
Assert.assertEquals(1, output.size());

final TypedStruct record = output.iterator().next();
Assert.assertEquals(Type.STRING, record.get("c1").type());
Assert.assertEquals(Type.INTEGER, record.get("c2").type());
Assert.assertEquals(Type.BOOLEAN, record.get("c3").type());
Assert.assertEquals("value1", record.getString("c1"));
Assert.assertEquals("value2", record.getString("c2"));
Assert.assertEquals("value3", record.getString("c3"));
Assert.assertEquals(2, record.getInt("c2").intValue());
Assert.assertTrue(record.getBoolean("c3"));
}
}

0 comments on commit 668ce19

Please sign in to comment.