How to create a flow for counter type metric continuous aggregation #4851
Unanswered
dozer47528 asked this question in Q&A
To remove a column from a table, you can use a flow purely as a filter, since no aggregation is needed at this stage. To find all increases in the last minute and sum them in an ad-hoc query, you would use PromQL. So this is a two-stage solution. First, use a flow to filter out the `service_ip` column:
CREATE TABLE requests (
service_name STRING,
service_ip STRING,
val INT,
ts TIMESTAMP TIME INDEX
);
CREATE TABLE requests_without_ip (
service_name STRING,
val INT,
ts TIMESTAMP TIME INDEX,
update_at TIMESTAMP -- column auto-generated by the flow engine; it will be optional in the next release,
-- at which point you can remove it with `ALTER TABLE requests_without_ip DROP COLUMN update_at`
);
CREATE FLOW requests_long_term
SINK TO requests_without_ip
AS
SELECT
service_name,
val,
ts
FROM requests;
INSERT INTO requests VALUES
("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"),
("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"),
-- note: ts is the time index, so rows sharing the same primary key and time index overwrite each other; the 10.0.0.2 timestamps are offset by one second from here on to avoid that
("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31");
admin flush_flow('requests_long_term'); -- flush the flow to make sure the data is in the table
SELECT * FROM requests_without_ip;
Then you can use PromQL to find the increase in the last minute:
That should give you the sum of the increase in the last minute for |
I have a metric like this:
Data Example:
I query this metric with the following PromQL:
sum(increase(requests{service_name='svc1'}[1m]))
I would like to create a flow that removes the `service_ip` column. For long-term storage, I don't need the `service_ip` column.
Which aggregation function should I use when creating the flow?
If I use `sum` as the aggregation function, the result would be:
For this data, how can I get the correct result? If I still use `increase` like this:
sum(increase(requests_without_ip{service_name='svc1'}[1m]))
the result will be wrong.
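To make the concern concrete, here is a rough Python sketch of the arithmetic, using the sample rows from the INSERT statement in this thread. It is only an illustration under stated assumptions: `simple_increase` is a hypothetical helper that treats any drop in value as a counter reset to zero and does no window extrapolation, so it is not Prometheus's actual `increase()`, and it runs over the whole sample range rather than a `[1m]` window.

```python
from datetime import datetime

def parse(ts: str) -> datetime:
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")

def simple_increase(samples):
    """Simplified counter increase over (ts, value) samples.

    Assumption: a drop in value is a counter reset to zero, so the new
    value is counted in full. No extrapolation is performed.
    """
    ordered = sorted(samples)
    total = 0
    for (_, prev), (_, cur) in zip(ordered, ordered[1:]):
        total += (cur - prev) if cur >= prev else cur
    return total

# Sample rows copied from the INSERT statement in this thread.
rows = [
    ("svc1", "10.0.0.1", 100, "2024-10-18 19:00:00"),
    ("svc1", "10.0.0.2", 100, "2024-10-18 19:00:00"),
    ("svc1", "10.0.0.1", 200, "2024-10-18 19:00:30"),
    ("svc1", "10.0.0.2", 200, "2024-10-18 19:00:30"),
    ("svc1", "10.0.0.1", 300, "2024-10-18 19:01:00"),
    ("svc1", "10.0.0.2", 100, "2024-10-18 19:01:01"),
    ("svc1", "10.0.0.1", 400, "2024-10-18 19:01:30"),
    ("svc1", "10.0.0.2", 200, "2024-10-18 19:01:31"),
]

# Case 1: one series per (service_name, service_ip); sum the per-series increases.
per_ip = {}
for name, ip, val, ts in rows:
    per_ip.setdefault((name, ip), []).append((parse(ts), val))
sum_of_increases = sum(simple_increase(s) for s in per_ip.values())

# Case 2: service_ip dropped; rows with the same (service_name, ts) overwrite
# each other, and the two counters interleave into a single series.
merged = {}
for name, _ip, val, ts in rows:
    merged[(name, parse(ts))] = val
merged_increase = simple_increase([(t, v) for (_, t), v in merged.items()])

print(sum_of_increases, merged_increase)
```

Under these assumptions the per-IP sum is 600, while the merged series yields 800: once the two counters are interleaved, every switch from the higher counter to the lower one looks like a reset. The exact numbers from a real PromQL query will differ because of windowing and extrapolation.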