
KSQL 5.4.0-beta1 - SUM - BigDecimal has mismatching scale value for given Decimal schema #3875

Closed
robinroos opened this issue Nov 16, 2019 · 13 comments

@robinroos

Using 5.4.0-beta1, KSQL fails when SUM is run over a DECIMAL type.

I happen to know that the amount fields in the Avro record I am publishing to Kafka are defined, in Avro IDL, as decimal(9,4), so they have scale = 4.

#1 COUNT works - the row appears when I stream 1 record

ksql> select book, pair, settledate, COUNT(baseamount), COUNT(quotedamount) from pair_position_change group by book, pair, settledate;
+------------+------------+------------+------------+------------+
|BOOK        |PAIR        |SETTLEDATE  |KSQL_COL_3  |KSQL_COL_4  |
+------------+------------+------------+------------+------------+
|UKFXSALES   |EURUSD      |18213       |1           |1           |
^CQuery terminated

#2 SUM fails - the exception appears when I stream 1 record

ksql> select book, pair, settledate, SUM(baseamount), SUM(quotedamount) from pair_position_change group by book, pair, settledate;
+------------+------------+------------+------------+------------+
|BOOK        |PAIR        |SETTLEDATE  |KSQL_COL_3  |KSQL_COL_4  |
+------------+------------+------------+------------+------------+
Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000009, topic=_confluent-ksql-default_transient_2784932728943465989_1573916925762-Aggregate-groupby-repartition, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-default_transient_2784932728943465989_1573916925762-Aggregate-aggregate-changelog
Caused by: org.apache.kafka.connect.errors.DataException: BigDecimal has mismatching scale value for given Decimal schema
	at org.apache.kafka.connect.data.Decimal.fromLogical(Decimal.java:68)
	at io.confluent.connect.avro.AvroData$5.convert(AvroData.java:265)
	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:420)
	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:607)
	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:366)
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
	at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
	at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
	at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
	at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:215)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
	at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:166)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:486)
	at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:116)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:345)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:886)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:792)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:761)

Caused by: Error serializing message to topic: _confluent-ksql-default_transient_2784932728943465989_1573916925762-Aggregate-aggregate-changelog
Caused by: BigDecimal has mismatching scale value for given Decimal schema
^CQuery terminated
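
For reference, the DataException at the top of that "Caused by" chain comes from Connect's scale check: org.apache.kafka.connect.data.Decimal.fromLogical rejects any BigDecimal whose scale does not exactly match the scale recorded in the Decimal schema. A minimal sketch of that check using the real Connect API (the literal values are just illustrative):

import java.math.BigDecimal;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;

// requires org.apache.kafka:connect-api on the classpath
public class ScaleCheckDemo {
    public static void main(String[] args) {
        Schema scale4 = Decimal.schema(4);  // Connect Decimal schema with scale = 4

        // A value whose scale matches the schema serializes fine.
        Decimal.fromLogical(scale4, new BigDecimal("30001.1110"));  // scale 4 -> OK

        // Any other scale throws the exact DataException seen above:
        // "BigDecimal has mismatching scale value for given Decimal schema"
        Decimal.fromLogical(scale4, new BigDecimal("30001.111"));   // scale 3 -> DataException
    }
}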

#3 Describe the underlying Stream

ksql> describe pair_position_change;

Name                 : PAIR_POSITION_CHANGE
 Field        | Type                                                                   
---------------------------------------------------------------------------------------
 ROWTIME      | BIGINT           (system)                                              
 ROWKEY       | VARCHAR(STRING)  (system)                                              
 LEG          | VARCHAR(STRING)                                                        
 TRADETYPE    | VARCHAR(STRING)                                                        
 TRADEREF     | VARCHAR(STRING)                                                        
 BOOK         | VARCHAR(STRING)                                                        
 PAIR         | VARCHAR(STRING)                                                        
 SETTLEDATE   | INTEGER                                                                
 BASEAMOUNT   | DECIMAL                                                                
 QUOTEDAMOUNT | DECIMAL                                                                
 POSITIONKEY  | STRUCT<BOOK VARCHAR(STRING), PAIR VARCHAR(STRING), SETTLEDATE INTEGER> 
---------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> 

#4 This query is actually quite far removed from the original topic. Here is the full KSQL "topology". My final statement, CREATE TABLE, was not working, so I tested it with a SELECT and saw the error reported here.

create stream trades with (kafka_topic='trade', value_format = 'AVRO');

create stream near_pair_position_change as select tradetype, traderef, book, pair, settledate, baseamount, quotedamount, positionkey from trades where TradeType = 'FXSPOT' or TradeType = 'FXFWD' or TradeType='FXSWAP' partition by PositionKey;

create stream far_pair_position_change as select tradetype, traderef, book, pair, farsettledate as settledate, farbaseamount as baseamount, farquotedamount as quotedamount, farpositionkey as positionkey from trades where TradeType='FXSWAP' partition by PositionKey;

create stream pair_position_change as select 'NEAR' as Leg, * from near_pair_position_change;

insert into pair_position_change select 'FAR' as Leg, * from far_pair_position_change;

create table fx_position_by_book_pair_settledate as select book, pair, settledate, SUM(baseamount) as baseamount, SUM(quotedamount) as quotedamount from pair_position_change group by book, pair, settledate;
 

#5 Describe the top-level stream trades, which streams the topic trade

ksql> describe trades;

Name                 : TRADES
 Field            | Type                                                                   
-------------------------------------------------------------------------------------------
 ROWTIME          | BIGINT           (system)                                              
 ROWKEY           | VARCHAR(STRING)  (system)                                              
 TRADEREF         | VARCHAR(STRING)                                                        
 BOOK             | VARCHAR(STRING)                                                        
 TRADETYPE        | VARCHAR(STRING)                                                        
 BUYSELL          | VARCHAR(STRING)                                                        
 COUNTERPARTY     | VARCHAR(STRING)                                                        
 PAIR             | VARCHAR(STRING)                                                        
 TRADEDATETIME    | BIGINT                                                                 
 TRADEDATE        | INTEGER                                                                
 SETTLEDATE       | INTEGER                                                                
 BASEAMOUNT       | DECIMAL                                                                
 QUOTEDAMOUNT     | DECIMAL                                                                
 SPOTPRICE        | DECIMAL                                                                
 TRADERPOINTS     | DECIMAL                                                                
 SALESPOINTS      | DECIMAL                                                                
 CUSTOMERPRICE    | DECIMAL                                                                
 POSITIONKEY      | STRUCT<BOOK VARCHAR(STRING), PAIR VARCHAR(STRING), SETTLEDATE INTEGER> 
 FARSETTLEDATE    | INTEGER                                                                
 FARBASEAMOUNT    | DECIMAL                                                                
 FARQUOTEDAMOUNT  | DECIMAL                                                                
 FARSPOTPRICE     | DECIMAL                                                                
 FARTRADERPOINTS  | DECIMAL                                                                
 FARSALESPOINTS   | DECIMAL                                                                
 FARCUSTOMERPRICE | DECIMAL                                                                
 FARPOSITIONKEY   | STRUCT<BOOK VARCHAR(STRING), PAIR VARCHAR(STRING), SETTLEDATE INTEGER> 
-------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> 

#6 Version info, to confirm I am running 5.4.0-beta1 CLI and Server

CLI v5.4.0-beta1, Server v5.4.0-beta1 located at http://0.0.0.0:8088

robinroos added the bug label on Nov 16, 2019
@robinroos (Author)

Attaching Key and Value Schemas for Topic trade from Control Center.

schema-trade-key-v1-_T5E1pJ0TsKYKxknN1yuFA_json.txt
schema-trade-value-v1-_T5E1pJ0TsKYKxknN1yuFA_json.txt

@robinroos (Author)

#7 This works if I CAST AS INTEGER within SUM, though of course that approach breaks the logic of the application.

ksql> select book, pair, settledate, SUM(CAST(baseamount as integer)), SUM(CAST(quotedamount as integer)) from pair_position_change group by book, pair, settledate;
+------------+------------+------------+------------+------------+
|BOOK        |PAIR        |SETTLEDATE  |KSQL_COL_3  |KSQL_COL_4  |
+------------+------------+------------+------------+------------+
|UKFXSALES   |EURUSD      |18213       |1000000     |-1234500    |
|UKFXSALES   |EURUSD      |18213       |2000000     |-2469000    |
|UKFXSALES   |EURUSD      |18213       |3000000     |-3703500    |

@robinroos (Author)

#8 Explicit CAST AS DOUBLE also works, so I guess I have my work-around:

ksql> select book, pair, settledate, SUM(CAST(baseamount as double)), SUM(CAST(quotedamount as double)) from pair_position_change group by book, pair, settledate;
+------------+------------+------------+------------+------------+
|BOOK        |PAIR        |SETTLEDATE  |KSQL_COL_3  |KSQL_COL_4  |
+------------+------------+------------+------------+------------+
|UKFXSALES   |EURUSD      |18213       |1000000.0   |-1234500.0  |
|UKFXSALES   |EURUSD      |18213       |2000000.0   |-2469000.0  |
|UKFXSALES   |EURUSD      |18213       |3000000.0   |-3703500.0  |

The inability to aggregate decimal types without an explicit cast is likely to catch out other users, especially those new to KSQL.

@agavra (Contributor)

agavra commented Nov 18, 2019

Thanks for reporting this @robinroos! This is definitely a problem that we need to address, but I'm still trying to figure out what the "right" behavior for such a sum mechanism would be. As it stands, it returns the same type as the input (e.g. if you sum DECIMAL(2,1), your return type is DECIMAL(2,1)). This is obviously pretty limited because, for example, 9.9 + 1.0 cannot fit in DECIMAL(2,1) and you'd get the serialization error that you saw. We could automatically cast to double, but that would risk losing precision (something that is likely unacceptable). We could choose some really large scale/precision and just hope that it fits. None of these are really satisfying solutions, but I'm open to ideas!
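
To make the overflow concrete, here is the DECIMAL(2,1) example in plain java.math.BigDecimal terms (illustrative only, not KSQL's actual aggregation code):

import java.math.BigDecimal;
import java.math.MathContext;

public class OverflowDemo {
    public static void main(String[] args) {
        BigDecimal a = new BigDecimal("9.9");   // precision 2, scale 1 - fits DECIMAL(2,1)
        BigDecimal b = new BigDecimal("1.0");   // precision 2, scale 1 - fits DECIMAL(2,1)

        BigDecimal exact = a.add(b);
        System.out.println(exact + " / precision=" + exact.precision()); // 10.9 / precision=3

        // Forcing the result back into 2 significant digits changes the scale:
        BigDecimal rounded = exact.round(new MathContext(2));
        System.out.println(rounded + " / scale=" + rounded.scale());     // 11 / scale=0
    }
}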

@robinroos (Author)

The work-around, CAST(... AS DOUBLE), has been shown to fail downstream when attempting to use Avro IDL-generated message classes to interact with topics.

See: #3999 .

@robinroos (Author)

@agavra, your 18-Nov comment implied the issue was numeric-overflow-related.

I do not believe this is the case. Rather, I suspect that the scale of the input is not being maintained through both the value of the aggregate and the defined schema of the resulting topic. I notice that there is a narrowing of Decimal to (2,1) within io.confluent.ksql.function.AggregateFunctionFactory. Could that be contributing to the problem described here?

@agavra (Contributor)

agavra commented Dec 2, 2019

@robinroos - the narrowing in AggregateFunctionFactory is only used to describe functions; it is never used in the path of function application, so that should not be the cause. I'll take a look at the scale mismatch when I have some time.

apurvam added this to the 0.7.0 milestone on Dec 10, 2019
@robinroos (Author)

As input to the triage of this issue:

A stream of Avro messages containing a Decimal(9,4) field, aggregated through SELECT SUM(...), results in a message format which cannot be represented by any Avro schema and which cannot be decoded in Java.

@agavra (Contributor)

agavra commented Dec 17, 2019

I was able to reproduce this locally, and I am now pretty confident in my original assessment that it has to do with overflow. In the example below, the SUM behaves fine until I add a value that causes the total to no longer fit within DECIMAL(9,4).

ksql> CREATE STREAM books (name VARCHAR, cost DECIMAL(9, 4)) WITH (kafka_topic='books', value_format='AVRO', partitions=1);

 Message
----------------
 Stream created
----------------

ksql> INSERT INTO books (name, cost) VALUES ('lotr', '10000.5555');
ksql> SELECT * FROM books EMIT CHANGES LIMIT 1;
+----------------------+----------------------+----------------------+----------------------+
|ROWTIME               |ROWKEY                |NAME                  |COST                  |
+----------------------+----------------------+----------------------+----------------------+
|1576601141304         |null                  |lotr                  |10000.5555            |

ksql> INSERT INTO books (name, cost) VALUES ('lotr', '20000.5555');
ksql> SELECT SUM(cost) FROM books GROUP BY name EMIT CHANGES LIMIT 1;
+----------------------------------------------------------------------------------------------+
|KSQL_COL_0                                                                                    |
+----------------------------------------------------------------------------------------------+
|30001.1110                                                                                    |
Limit Reached
Query terminated

ksql> INSERT INTO books (name, cost) VALUES ('lotr', '99999.5555');
ksql> SELECT SUM(cost) FROM books GROUP BY name EMIT CHANGES;
+----------------------------------------------------------------------------------------------+
|KSQL_COL_0                                                                                    |
+----------------------------------------------------------------------------------------------+
|30001.1110                                                                                    |
Exception caught in process. taskId=1_0, processor=Aggregate-GroupBy-repartition-source, topic=_confluent-ksql-default_transient_4060992337144310523_1576601273150-Aggregate-GroupBy-repartition, partition=0, offset=2, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-default_transient_4060992337144310523_1576601273150-Aggregate-Aggregate-Materialize-changelog
Caused by: org.apache.kafka.connect.errors.DataException: BigDecimal has mismatching scale value for given Decimal schema
	at org.apache.kafka.connect.data.Decimal.fromLogical(Decimal.java:68)
	at io.confluent.connect.avro.AvroData$5.convert(AvroData.java:264)
	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:419)
	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:606)
	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
	at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
	at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
	at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
	at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:215)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
	at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
	at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:782)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
	at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:782)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:385)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:782)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:385)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:537)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:795)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:701)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:674)

Caused by: Error serializing message to topic: _confluent-ksql-default_transient_4060992337144310523_1576601273150-Aggregate-Aggregate-Materialize-changelog
Caused by: BigDecimal has mismatching scale value for given Decimal schema
Query terminated
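
One plausible mechanism for the scale mismatch, stated as an assumption rather than a reading of the KSQL source: if the running sum is computed under a MathContext capped at the declared precision of 9, the overflowing addition silently drops a fractional digit, and the resulting BigDecimal no longer matches the schema's scale of 4:

import java.math.BigDecimal;
import java.math.MathContext;

public class MismatchDemo {
    public static void main(String[] args) {
        BigDecimal running = new BigDecimal("30001.1110"); // precision 9, scale 4
        BigDecimal next    = new BigDecimal("99999.5555"); // precision 9, scale 4

        // Exact sum is 130000.6665: precision 10, too wide for DECIMAL(9,4).
        BigDecimal capped = running.add(next, new MathContext(9));
        System.out.println(capped);         // 130000.667
        System.out.println(capped.scale()); // 3 - no longer matches the schema scale of 4
    }
}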

@robinroos (Author)

Then perhaps I have misunderstood DECIMAL(9,4). I presumed that meant 9 digits before the decimal point and 4 after, but your test case implies a semantic of 9 digits in total, of which 4 come after the decimal point.

Given that semantic, I could easily have been overflowing the maximum bound for the type.

@agavra (Contributor)

agavra commented Dec 17, 2019

Ah yes! That's the meaning of precision and scale - I found that pretty confusing as well. The best explanation I found is in the Microsoft SQL Server docs: https://docs.microsoft.com/en-us/sql/t-sql/data-types/decimal-and-numeric-transact-sql?view=sql-server-ver15

[scale] The number of decimal digits that are stored to the right of the decimal point. This number is subtracted from p[recision] to determine the maximum number of digits to the left of the decimal point. Scale must be a value from 0 through p[recision], and can only be specified if precision is specified. The default scale is 0 and so 0 <= s <= p. Maximum storage sizes vary, based on the precision.
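
In other words, DECIMAL(9,4) leaves only 9 - 4 = 5 digits to the left of the decimal point. A quick BigDecimal check of the bounds:

import java.math.BigDecimal;

public class BoundsDemo {
    public static void main(String[] args) {
        BigDecimal max = new BigDecimal("99999.9999");     // largest DECIMAL(9,4) value
        System.out.println(max.precision() + "," + max.scale());      // 9,4 - fits

        BigDecimal tooBig = new BigDecimal("100000.0000"); // one step past the bound
        System.out.println(tooBig.precision() + "," + tooBig.scale()); // 10,4 - overflows DECIMAL(9,4)
    }
}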

@robinroos (Author)

Ok, thanks for bearing with me here. I will retest with Decimal(14,4).

@robinroos (Author)

This problem arose due to a misunderstanding, on my part, of the relationship between precision and scale, which resulted in numeric overflow.

Furthermore, the issue of maintaining decimal scale and precision through arithmetic can be solved by CAST(... AS DECIMAL(precision, scale)).
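
Under the same assumed MathContext model sketched above, widening via CAST(... AS DECIMAL(14,4)) gives the sum enough precision headroom that its scale is preserved, which is why the cast resolves the serialization error:

import java.math.BigDecimal;
import java.math.MathContext;

public class WidenedSumDemo {
    public static void main(String[] args) {
        // DECIMAL(14,4): five more integer digits of headroom than DECIMAL(9,4)
        BigDecimal sum = new BigDecimal("30001.1110")
                .add(new BigDecimal("99999.5555"), new MathContext(14));

        System.out.println(sum);         // 130000.6665 - exact, no rounding needed
        System.out.println(sum.scale()); // 4 - matches the schema, serializes cleanly
    }
}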
