latest_by_offset does not support DECIMAL #8368

Closed
krisajenkins opened this issue Nov 16, 2021 · 4 comments
Labels: bug, help wanted, streaming-engine (Tickets owned by the ksqlDB Streaming Team)

@krisajenkins (Contributor)

Describe the bug
There is no latest_by_offset implementation for the DECIMAL type. This is a problem if, for example, you want the latest price of something.

To Reproduce

CREATE OR REPLACE STREAM stocks (
  stock_id VARCHAR KEY,
  description VARCHAR,
  price DECIMAL(10,2)
) WITH (
  KAFKA_TOPIC = 'stocks',
  VALUE_FORMAT = 'avro',
  PARTITIONS = 3
);

INSERT INTO stocks ( stock_id, description, price ) VALUES ( 'CFLT', 'Confluent Inc.', 80.02 );
INSERT INTO stocks ( stock_id, description, price ) VALUES ( 'CFLT', 'Confluent Inc.', 81.02 );
INSERT INTO stocks ( stock_id, description, price ) VALUES ( 'NASDAQ', 'NASDAQ 100', 4003.24 );
INSERT INTO stocks ( stock_id, description, price ) VALUES ( 'CFLT', 'Confluent Inc.', 92.02 );
INSERT INTO stocks ( stock_id, description, price ) VALUES ( 'CFLT', 'Confluent Inc.', 83.02 );
INSERT INTO stocks ( stock_id, description, price ) VALUES ( 'NASDAQ', 'NASDAQ 100', 4028.19 );
INSERT INTO stocks ( stock_id, description, price ) VALUES ( 'CFLT', 'Confluent Inc.', 81.02 );

SET 'auto.offset.reset' = 'earliest';
SET 'ksql.query.pull.table.scan.enabled' = 'true';

CREATE OR REPLACE TABLE current_stock_prices AS
  SELECT
    stock_id,
    latest_by_offset( price ) AS price
  FROM stocks
  GROUP BY stock_id;

SELECT *
FROM current_stock_prices;

Expected behavior
I was expecting a table of two prices, CFLT=81.02 and NASDAQ=4028.19.

Actual behaviour


 Message        
----------------
 Stream created 
----------------
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
Successfully changed local property 'ksql.query.pull.table.scan.enabled' to 'true'. Use the UNSET command to revert your change.
Could not determine output schema for query due to error: Function 'LATEST_BY_OFFSET' does not accept parameters (DECIMAL(10, 2)).
Valid alternatives are:
LATEST_BY_OFFSET(INT val, INT latestN, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(BIGINT val, INT latestN)
LATEST_BY_OFFSET(BOOLEAN val)
LATEST_BY_OFFSET(INT val, INT latestN)
LATEST_BY_OFFSET(BIGINT val, INT latestN, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(BOOLEAN val, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(DOUBLE val)
LATEST_BY_OFFSET(DOUBLE val, INT latestN)
LATEST_BY_OFFSET(VARCHAR val, INT latestN, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(INT val)
LATEST_BY_OFFSET(BIGINT val)
LATEST_BY_OFFSET(BOOLEAN val, INT latestN)
LATEST_BY_OFFSET(VARCHAR val)
LATEST_BY_OFFSET(VARCHAR val, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(BIGINT val, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(BOOLEAN val, INT latestN, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(INT val, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(DOUBLE val, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(DOUBLE val, INT latestN, BOOLEAN ignoreNulls)
LATEST_BY_OFFSET(VARCHAR val, INT latestN)
For detailed information on a function run: DESCRIBE FUNCTION <Function-Name>;
Statement: CREATE OR REPLACE TABLE CURRENT_STOCK_PRICES WITH (KAFKA_TOPIC='CURRENT_STOCK_PRICES', PARTITIONS=3, REPLICAS=1) AS SELECT
  STOCKS.STOCK_ID STOCK_ID,
  LATEST_BY_OFFSET(STOCKS.PRICE) PRICE
FROM STOCKS STOCKS
GROUP BY STOCKS.STOCK_ID
EMIT CHANGES;
Exception while preparing statement: CURRENT_STOCK_PRICES does not exist.
Statement: SELECT *
FROM current_stock_prices;
Caused by: CURRENT_STOCK_PRICES does not exist.

Additional context
This is running against ksql 7.0.0.

@krisajenkins (Contributor, Author) commented Nov 16, 2021

As a workaround, you can use CAST(price AS DOUBLE), but this loses precision, so it's not perfect.
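A minimal sketch of that workaround, applied to the table from the original report. The only change is casting `price` to DOUBLE inside the aggregate, so that the `LATEST_BY_OFFSET(DOUBLE val)` overload listed in the error message matches. The resulting `price` column is DOUBLE rather than DECIMAL(10,2), so the precision caveat still applies.

```sql
-- Workaround sketch: cast the DECIMAL column to DOUBLE so that the
-- existing LATEST_BY_OFFSET(DOUBLE val) overload can be used.
-- Note: the output PRICE column is DOUBLE, not DECIMAL(10,2).
CREATE OR REPLACE TABLE current_stock_prices AS
  SELECT
    stock_id,
    latest_by_offset( CAST(price AS DOUBLE) ) AS price
  FROM stocks
  GROUP BY stock_id;
```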

agavra added the streaming-engine label (Tickets owned by the ksqlDB Streaming Team) Nov 19, 2021
@dttouchdata commented Jan 29, 2022

Any updates or workarounds?

@krisajenkins (Contributor, Author)

@dttouchdata, one workaround is to declare a source table over the same topic, so that ksqlDB keeps only the latest row for each key. I did this:

CREATE SOURCE TABLE latest_price (
  stock_id VARCHAR PRIMARY KEY,
  description VARCHAR,
  price DECIMAL(10,2)
) WITH (
  KAFKA_TOPIC = 'stocks',
  VALUE_FORMAT = 'avro'
);

SELECT * FROM latest_price;

...and that gave me the results I was expecting. Whether this works for you depends on your use case, but it's another option to explore.

jnh5y self-assigned this Feb 9, 2022
jnh5y added a commit that referenced this issue Mar 15, 2022
…8878)

* feat: Generalize the UDAFs earliest_by_offset and latest_by_offset

Addresses: #5437 and #8368
@jnh5y (Member) commented Mar 15, 2022

#8878 should close this out!

jnh5y closed this as completed Mar 15, 2022