Unexpected high system memory usage on single stream task #1586

Open · sbengo opened this issue Sep 26, 2017 · 13 comments

sbengo commented Sep 26, 2017

Hi,

I'm working with Kapacitor 1.3.1 (git: master 3b5512f) to create alerts from an InfluxDB subscription, and after working on the databases and several TICKscripts we noticed unusually high memory usage.

We have enabled only one task, predefined with a template and filled in with JSON, which retrieves data in stream mode to:

  • Calculate, every 1m, the mean over a 30m window of InputUtilization on every interface of every switch.
  • The total number of switch interfaces reaches 4024.
  • Some interfaces may not have the InputUtilization field, so errors can occasionally appear on the mean nodes.

Example case

After enabling the task and verifying that it works, we observed that system memory usage increased to an unexpected level and reached a constant level after about 1 hour of running.

Memory before: 1.907 GB
Memory after 1 hour: 2.985 GB
Total increase: 1.078 GB

[graph: memory increase]

  • Assuming it keeps the data for 30 min (the window .period()): 30 min × 4024 series × 1 point/min = 120,720 points, so each point appears to use 1.078 GiB / 120,720 points ≈ 9,588 bytes. We think this is too much memory and we cannot understand why Kapacitor requires this amount of memory for each point.

  • We don't understand why the system memory only reaches its maximum level after 1 hour, if the window only buffers points over a 30m period.

Questions

With the behaviour explained above:

  1. Does Kapacitor really need ~10 KB to store each point of data in stream mode?

  2. Assuming linear scaling and the same cardinality, would N tasks need N × 1 GB of memory?

  3. Could you help us understand Kapacitor's memory behaviour, please?

We attach the following TICKscript and node statistics so that you can check whether something is wrong in our TICKscript/node definitions:

TickScript + node info

// TICKSCRIPT:
// ================
var data = stream
    |from()
        .database('switch_metrics')
        .retentionPolicy('1y')
        .measurement('ifMIB_metrics')
        .groupBy([*])
        .where(lambda: TRUE)
    |window()
        .period(30m)
        .every(1m)
        .align()
    |mean('InputUtilization')
        .as('value')

// check_alert generates a 'mon_check' field that indicates whether a device has to be monitored
var check_alert = data
    |default()
        .field('mon_activo', TRUE)
        .field('mon_line', 'LBLE')
        .field('mon_exc', 0)
        .tag('poc_entorno', 'PRO')
    // Generate hour and day
    |eval(lambda: hour("time"), lambda: weekday("time"))
        .as('time_hour', 'time_day')
        .keep()
    |eval(lambda: if("mon_activo" == TRUE AND "mon_exc" >= 0 AND strContains("mon_line", ID_LINIA), 1, 0))
        .as('mon_check')
        .keep()
    |httpOut('mon_check')

var trigger = check_alert
    |alert()
        .crit(lambda: if("mon_check" == 1 AND "time_hour" >= TH_CRIT_MIN_HOUR AND "time_hour" <= TH_CRIT_MAX_HOUR AND strContains(WEEK_DAY_CRIT, string("time_day")), float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))), FALSE))
        .warn(lambda: if("mon_check" == 1 AND "time_hour" >= TH_WARN_MIN_HOUR AND "time_hour" <= TH_WARN_MAX_HOUR AND strContains(WEEK_DAY_WARN, string("time_day")), float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))), FALSE))
        .info(lambda: if("mon_check" == 1 AND "time_hour" >= TH_INFO_MIN_HOUR AND "time_hour" <= TH_INFO_MAX_HOUR AND strContains(WEEK_DAY_INFO, string("time_day")), float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))), FALSE))
        .stateChangesOnly()
        .id(ID)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .message(message)
        .details(details)
        .email()
        .to(['[email protected]','[email protected]'])
trigger
    |httpOut('output')

The following table summarizes node statistics, cardinality, and the pipeline edges.
Note: sometimes there is an error on the mean node because the field does not exist on the point.

| Node | exec_time | errs | cardinality | processed |
|---|---|---|---|---|
| stream0 | 0s | 0 | 0 | stream0 -> from1 |
| from1 | 35.568µs | 0 | 0 | from1 -> window2 |
| window2 | 16.249µs | 0 | 4024 | window2 -> mean3 |
| mean3 | 81.9µs | 91 | 0 | mean3 -> default4 |
| default4 | 43.656µs | 0 | 0 | default4 -> eval5 |
| eval5 | 343.215µs | 0 | 4024 | eval5 -> eval6 |
| eval6 | 134.317µs | 0 | 4024 | eval6 -> http_out7 |
| http_out7 | 24.021µs | 0 | 4024 | http_out7 -> alert8 |
| alert8 | 158.438µs | 0 | 4024 | alert8 -> http_out9 |
| http_out9 | 0s | 0 | 0 | |

sbengo commented Sep 27, 2017

Hi again,

@nathanielc, @desa, we ran the same test with Kapacitor 1.3.3 (git: master eacb373) and it shows the same behaviour explained in the comment above.

[graph: memory increase, Kapacitor 1.3.3]

Thanks,
Regards


desa commented Sep 27, 2017

@sbengo Thanks for the report. What does the data that you're writing look like? How many tags, fields, etc.? Also the length of each of the tag keys and values would be useful.

One thing that we can do to lower the amount of data buffered is to keep only the fields we need in the window:

var data = stream
    |from()
        .database('switch_metrics')
        .retentionPolicy('1y')
        .measurement('ifMIB_metrics')
        .groupBy([*])
        .where(lambda: TRUE)
    |eval()
        .keep('InputUtilization')
    |window()
        .period(30m)
        .every(1m)
        .align()
    |mean('InputUtilization')
        .as('value')

> Assuming it keeps the data for 30 min (the window .period()): 30 min × 4024 series × 1 point/min = 120,720 points, so each point appears to use 1.078 GiB / 120,720 points ≈ 9,588 bytes. We think this is too much memory and we cannot understand why Kapacitor requires this amount of memory for each point.

I noticed that in your window node, the last 30m of data is emitted down the pipeline every minute:

    |window()
        .period(30m)
        .every(1m)
        .align()

This means that every minute a 30m buffer of data is sent down the pipeline. Internally this creates two ~30m windows of the data every minute, so the math above is a bit off: roughly 1.078 GiB / (2 × 120,720 points) ≈ 4,794 bytes/point. This would also explain the leveling off at 1h. Nonetheless, even with that adjustment, the memory usage still looks excessive.

To summarize, I've got a couple of asks:

  1. What does your data look like? An example point would be useful.
  2. What happens if we write one point to each series? (e.g. enable the task, write one point to each series, and let it sit)


sbengo commented Sep 27, 2017

Thanks for the quick answer.

In order to track which device/interface is raising errors, we absolutely need to preserve the tags and show them in the alert.
We are using a generic template, so we have no plans to hardcode the tag set in the keep() function.

  • How can we preserve those tags?
  • Is there any way to keep them in memory as a variable map and later apply them to the "message/details" template variables passed to the alert node?

We will run the other proposed test in the following days.

To provide more info, our data looks like this:

Data overview from measurement

- Precision = 1m
- Number of tagKeys = 8
- Number of fieldKeys = 14
- Cardinality = 4024

As an example of one entry:

TagSet (length in number of characters)

| tagKey | tagKey length | ~ tagValue length |
|---|---|---|
| tag1 | 5 | 10 |
| tag2 | 6 | 3 |
| tag3 | 11 | 4 |
| tag4 | 9 | 3 |
| tag5 | 7 | 15 |
| tag6 | 8 | 21 |
| tag7 | 6 | 16 |
| tag8 | 6 | 4 |

FieldSet (length in number of characters)

| fieldKey | fieldKey length | type |
|---|---|---|
| InputUtilization | 16 | "float" |
| OutputUtilization | 17 | "float" |
| ifAdminStatus | 13 | "float" |
| ifHCInBroadcastPkts | 19 | "float" |
| ifHCInMulticastPkts | 19 | "float" |
| ifHCInOctets | 12 | "float" |
| ifHCInUcastPkts | 15 | "float" |
| ifHCOutBroadcastPkts | 20 | "float" |
| ifHCOutMulticastPkts | 20 | "float" |
| ifHCOutOctets | 13 | "float" |
| ifHCOutUcastPkts | 16 | "float" |
| ifInDiscards | 12 | "float" |
| ifOperStatus | 12 | "float" |
| inErrorRate | 11 | "float" |


desa commented Sep 27, 2017

> We are using a generic template, so we have no plans to hardcode the tag set in the keep() function.

Keep only applies to fields; all tags will be preserved. I'd try using keep together with the window; this way you'll buffer less data.

> Is there any way to keep them in memory as a variable map and later apply them to the "message/details" template variables passed to the alert node?

Not sure I follow what you're trying to do.


sbengo commented Oct 2, 2017

Hi @desa, first of all thanks for your reply.

We ran some tests applying your suggestion of keeping only the 'InputUtilization' field before the |window() node. Here are the results and questions:

// TICKSCRIPT:
// ================
var data = stream
    |from()
        .database('switch_metrics')
        .retentionPolicy('1y')
        .measurement('ifMIB_metrics')
        .groupBy([*])
        .where(lambda: TRUE)
    |eval()
        .keep('InputUtilization')
    |window()
        .period(period)
        .every(1m)
        .align()
    |mean('InputUtilization')
        .as('value')
... 

Test 1 - 2017/09/29:

| Var | Value |
|---|---|
| period | 30m |

| Mem before | Time to standby | Mem standby | Total increase |
|---|---|---|---|
| 1.984 GB | ~1 hour | 2.776 GB | 0.792 GB |

850,403,524.608 bytes / (30 min × 4025 series × 1 point/min) ≈ 7,042 bytes/point

Observations:

  • The new eval node seems to reduce system memory usage by 26.55% compared with the first comment of this issue.

[graph: Test 1]

Test 2 - 2017/09/29:

We activated a second task, evaluating another field (OutputUtilization) on the same measurement, after Test 1 and without restarting Kapacitor, with the following results:

| Var | Value |
|---|---|
| period | 30m |

| Mem before | Time to standby | Mem standby | Total increase |
|---|---|---|---|
| 2.800 GB | ~1 hour | 3.331 GB | 0.531 GB |

570,156,908.544 bytes / (30 min × 4025 series × 1 point/min) ≈ 4,721 bytes/point

Observations:

  • Enabling a second task that processes exactly the same amount of data from the same measurement seems to consume 33% less memory than Test 1.

[graph: Test 2]

Test 3 - 2017/10/02:

| Var | Value |
|---|---|
| period | 15m |

| Mem before | Time to standby | Mem standby | Total increase |
|---|---|---|---|
| 2.063 GB | ~30 min | 2.440 GB | 0.377 GB |

404,800,667.648 bytes / (15 min × 4025 series × 1 point/min) ≈ 6,704 bytes/point

Observations:

  • Setting the period to 15m seems related to the total elapsed time to reach maximum memory consumption: it takes half the time of Tests 1 and 2.
  • The memory used per point seems roughly similar to Test 1 with period = 30m.

[graph: Test 3]

Questions

  1. As the data arrives every 1m and the window node keeps 30 min of it, we don't see why it needs 2 × period to reach its maximum memory:

"This means that every minute we send a 30m buffer of data down the pipeline. Internally this creates two ~30m windows of the data every minute so the math here is a bit off. This would also explain the leveling off at 1h. Nonetheless, with the adjustments to the equation, things are still a bit excessive."

Isn't Kapacitor keeping data as in the following example? If that's right, memory should only increase for period + 1 minutes.

For example, with period = 30m:

| time | Datapoints on window node | Datapoints on next node |
|---|---|---|
| 0:00 | 1 | -- |
| 0:01 | 2 | 1 |
| 0:02 | 3 | 2 |
| ... | ... | ... |
| 0:29 | 30 | 29 |
| 0:30 | 30 | 30 |
| 0:31 | 30 | 30 |
| 0:32 | 30 | 30 |
| ... | ... | ... |

Do you agree?

  2. If there is one buffer per node (in our case two buffering nodes: window and mean), can we assume that the memory used by each node is exactly half of the total?

  3. As shown, we default some fields to check whether the alert must be fired. Which is better, keeping tags or keeping fields? Which uses less memory?


sbengo commented Oct 3, 2017

Hello again!

I'm updating the issue with another test:

Test 4 - 2017/10/03

In this test, we set up 5 tasks with the same cardinality (4025 series) on the same measurement, each selecting a different field from the same time series (all fields are sent in the same write action).

We modified the TICKscript: instead of using 3 different vars, we defined it by chaining all nodes from the stream node:

// TICKSCRIPT:
// ================
stream
    |from()
        .database('switch_metrics')
        .retentionPolicy('1y')
        .measurement('ifMIB_metrics')
        .groupBy([*])
        .where(lambda: TRUE)
    |eval()
        .keep('InputUtilization')
    |window()
        .period(period)
        .every(1m)
        .align()
    |mean('InputUtilization')
        .as('value')
    ...
    |default()
    ...
    |alert()...  

| Var | Value |
|---|---|
| period | 15m |

| Mem before | Time to standby | Mem standby | Total increase |
|---|---|---|---|
| 1.565 GB | ~15 min | 3.278 GB | 1.713 GB |

1,839,319,744.512 bytes / (15 min × 4025 series × 1 point/min × 5 tasks) ≈ 6,093 bytes/point

Observations:

  • Using a single chained pipeline instead of several vars seems to reduce the time elapsed until standby to match the period duration.
  • The memory used per point seems lower than in the other tests: 13% less than Test 1 and 9% less than Test 2.

[graph: Test 4]

Questions

  1. Is memory consumption proportional to the number of declared vars (that is, vars with data embedded)?

@toni-moreno commented:

Hi @desa, @nathanielc, I'm working with @sbengo on this analysis. We decided to repeat the tests, but with all other running tasks disabled, and measuring the process "heap in use" instead of system memory, as a more reliable way to measure memory consumption.

As in the previous tests, we compute the mean of 15 minutes of InputUtilization on approximately 4029 switch port interfaces and then compare it against thresholds. We also use a UDF (monInjector) to inject some fields (mon_activo, mon_linia, mon_exc) taken from an external (non-InfluxDB) database; these control whether to continue with threshold evaluation and which of 3 possible thresholds to select, and we also apply hour/weekday filtering.

Tests

Test 1: Original Template

//TICKSCRIPT:
//================
//var data = stream
stream
    |from()
        .database(INFLUX_BD)
        .retentionPolicy(INFLUX_RP)
        .measurement(INFLUX_MEAS)
        .groupBy(influx_agrup)
        .where(INFLUX_FILTER)
    |default()
        .field(FIELD,FIELD_DEFAULT)
    |eval()
        .keep(FIELD)
    |window()
        .period(INTERVALO_CHECK)
        .every(every)
        .align()
    |mean(FIELD)
        .as('value')
    @monInjector()
        .alertId(ID)
        .searchByTag(DEVICEID_TAG)
    |default()
            .field('mon_activo', TRUE)
            .field('mon_linea', 'LBLE')
            .field('mon_exc', 0)
    |eval(lambda:hour("time"), lambda:weekday("time"))
        .as('hora','dia')
        .keep()
    |eval(lambda: if("mon_activo" == TRUE AND "mon_exc" >= 0 AND strContains("mon_linea",ID_LINIA), 1, 0))
          .as('mon_check')
          .keep()
    |httpOut('mon_check')
    |alert()
        .crit(lambda: if("mon_check" == 1 AND "hora" >= TH_CRIT_MIN_HOUR AND "hora" <= TH_CRIT_MAX_HOUR AND strContains(DIA_SEMANA_CRIT,string("dia")), float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
        .warn(lambda: if("mon_check" == 1 AND "hora" >= TH_WARN_MIN_HOUR AND "hora" <= TH_WARN_MAX_HOUR AND strContains(DIA_SEMANA_WARN,string("dia")), float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
        .info(lambda: if("mon_check" == 1 AND "hora" >= TH_INFO_MIN_HOUR AND "hora" <= TH_INFO_MAX_HOUR AND strContains(DIA_SEMANA_INFO,string("dia")), float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
        .stateChangesOnly()
        .id(ID)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .message(message)
        .details(details)
        .post('http://output_webservice')
        .email()
        .to(envio_mail)
    |httpOut('output')

When enabling this task we see this consumption:

[graph: heap usage, Test 1]

| test | Init Heap | End Heap | Heap Usage | Cardinality | Points (15m) | bytes/point |
|---|---|---|---|---|---|---|
| 1 | 18 MB | 554 MB | 536 MB | 4029 | 60435 | 9299 |

Test 2 (window + mean -> movingAverage)

In this test we removed the window node and replaced the mean() node with movingAverage() over 15 points (our resolution is 1 point/minute). The result should be the same as in Test 1.

//TICKSCRIPT:
//================
//var data = stream
stream
    |from()
        .database(INFLUX_BD)
        .retentionPolicy(INFLUX_RP)
        .measurement(INFLUX_MEAS)
        .groupBy(influx_agrup)
        .where(INFLUX_FILTER)
    |default()
        .field(FIELD,FIELD_DEFAULT)
    |eval()
        .keep(FIELD)
    |movingAverage(FIELD,MOV_AVG_POINTS)
        .as('value')
    @monInjector()
        .alertId(ID)
        .searchByTag(DEVICEID_TAG)
    |default()
            .field('mon_activo', TRUE)
            .field('mon_linea', 'LBLE')
            .field('mon_exc', 0)
    |eval(lambda:hour("time"), lambda:weekday("time"))
        .as('hora','dia')
        .keep()
    |eval(lambda: if("mon_activo" == TRUE AND "mon_exc" >= 0 AND strContains("mon_linea",ID_LINIA), 1, 0))
          .as('mon_check')
          .keep()
    |httpOut('mon_check')
    |alert()
        .crit(lambda: if("mon_check" == 1 AND "hora" >= TH_CRIT_MIN_HOUR AND "hora" <= TH_CRIT_MAX_HOUR AND strContains(DIA_SEMANA_CRIT,string("dia")), float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
        .warn(lambda: if("mon_check" == 1 AND "hora" >= TH_WARN_MIN_HOUR AND "hora" <= TH_WARN_MAX_HOUR AND strContains(DIA_SEMANA_WARN,string("dia")), float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
        .info(lambda: if("mon_check" == 1 AND "hora" >= TH_INFO_MIN_HOUR AND "hora" <= TH_INFO_MAX_HOUR AND strContains(DIA_SEMANA_INFO,string("dia")), float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
        .stateChangesOnly()
        .id(ID)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .message(message)
        .details(details)
        .post('http://output_webservice')
        .email()
        .to(envio_mail)
    |httpOut('output')

When the task with this new TICKscript was enabled, this is the result:

[graph: heap usage, Test 2]

| test | Init Heap | End Heap | Heap Usage | Cardinality | Points (15m) | bytes/point |
|---|---|---|---|---|---|---|
| 2 | 15 MB | 199 MB | 184 MB | 4029 | 60435 | 3192 |

This test shows how memory increases sharply after 15 minutes, when the movingAverage node begins to send data to the following nodes.

Test 3 (no httpOut)

In this test we removed the unneeded httpOut() nodes:

//TICKSCRIPT:
//================
//var data = stream
stream
    |from()
        .database(INFLUX_BD)
        .retentionPolicy(INFLUX_RP)
        .measurement(INFLUX_MEAS)
        .groupBy(influx_agrup)
        .where(INFLUX_FILTER)
    |default()
        .field(FIELD,FIELD_DEFAULT)
    |eval()
        .keep(FIELD)
    |movingAverage(FIELD,MOV_AVG_POINTS)
        .as('value')
    @monInjector()
        .alertId(ID)
        .searchByTag(DEVICEID_TAG)
    |default()
            .field('mon_activo', TRUE)
            .field('mon_linea', 'LBLE')
            .field('mon_exc', 0)
    |eval(lambda:hour("time"), lambda:weekday("time"))
        .as('hora','dia')
        .keep()
    |eval(lambda: if("mon_activo" == TRUE AND "mon_exc" >= 0 AND strContains("mon_linea",ID_LINIA), 1, 0))
          .as('mon_check')
          .keep()
    |alert()
        .crit(lambda: if("mon_check" == 1 AND "hora" >= TH_CRIT_MIN_HOUR AND "hora" <= TH_CRIT_MAX_HOUR AND strContains(DIA_SEMANA_CRIT,string("dia")), float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
        .warn(lambda: if("mon_check" == 1 AND "hora" >= TH_WARN_MIN_HOUR AND "hora" <= TH_WARN_MAX_HOUR AND strContains(DIA_SEMANA_WARN,string("dia")), float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
        .info(lambda: if("mon_check" == 1 AND "hora" >= TH_INFO_MIN_HOUR AND "hora" <= TH_INFO_MAX_HOUR AND strContains(DIA_SEMANA_INFO,string("dia")), float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
        .stateChangesOnly()
        .id(ID)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .message(message)
        .details(details)
        .post('http://output_webservice')
        .email()
        .to(envio_mail)

When enabled, these are the results:

[graph: heap usage, Test 3]

| test | Init Heap | End Heap | Heap Usage | Cardinality | Points (15m) | bytes/point |
|---|---|---|---|---|---|---|
| 3 | 17 MB | 160 MB | 160 MB | 4029 | 60435 | 2481 |

With httpOut() removed, the heap peaks have disappeared.

Test 4 (9 to 5 nodes)

In this test we removed all unneeded nodes and moved the related logic (time checking and some other checks) into our UDF (monInjector).

With this change, we reduced the pipeline from 9 nodes to 5:

//TICKSCRIPT:
//================
//var data = stream
stream
    |from()
        .database(INFLUX_BD)
        .retentionPolicy(INFLUX_RP)
        .measurement(INFLUX_MEAS)
        .groupBy(influx_agrup)
        .where(INFLUX_FILTER)
    |default()
        .field(FIELD,FIELD_DEFAULT)
    |movingAverage(FIELD,MOV_AVG_POINTS)
        .as('value')
    @monInjector()
        .alertId(ID)
        .searchByTag(DEVICEID_TAG)
        .setLine(LIDE_ID)
        .critTime(DIA_SEMANA_CRIT, TH_CRIT_MIN_HOUR, TH_CRIT_MAX_HOUR)
        .warnTime(DIA_SEMANA_WARN, TH_WARN_MIN_HOUR, TH_WARN_MAX_HOUR)
        .infoTime(DIA_SEMANA_INFO, TH_INFO_MIN_HOUR, TH_INFO_MAX_HOUR)
    |alert()
        .crit(lambda: if("check_crit" , float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
        .warn(lambda: if("check_warn" , float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
        .info(lambda: if("check_info" , float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
        .stateChangesOnly()
        .id(ID)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .message(message)
        .details(details)
        .post('http://output_webservice')
        .email()
        .to(envio_mail)

When the task was enabled we saw this behaviour:

[graph: heap usage, Test 4]

| test | Init Heap | End Heap | Heap Usage | Cardinality | Points (15m) | bytes/point |
|---|---|---|---|---|---|---|
| 4 | 11 MB | 53 MB | 42 MB | 4029 | 60435 | 728 |

Conclusions

| test | Init Heap | End Heap | Heap Usage | Cardinality | Points (15m) | bytes/point | Test type |
|---|---|---|---|---|---|---|---|
| 1 | 18 MB | 554 MB | 536 MB | 4029 | 60435 | 9299 | window + mean(), 15 min |
| 2 | 15 MB | 199 MB | 184 MB | 4029 | 60435 | 3192 | movingAverage(15) |
| 3 | 17 MB | 160 MB | 160 MB | 4029 | 60435 | 2481 | no httpOut() |
| 4 | 11 MB | 53 MB | 42 MB | 4029 | 60435 | 728 | 9 to 5 nodes in pipeline |

We have reduced memory from the initial 9,299 bytes/point to 728 bytes/point by reducing the number of nodes (and also by moving logic into our external UDF).

It seems that the nodes after movingAverage (monInjector and alert) consume about as much as movingAverage itself, even though none of them stores old points in memory.

Anyway, we think 728 bytes/point is too much for a point whose tags/fields sum to about 200 bytes (even supposing each point is stored fully in memory although the tags always have the same values).

Remember we are working with this dataset.

TagSet (length in number of characters)

| tagKey | tagKey length | ~ tagValue length |
|---|---|---|
| tag1 | 5 | 10 |
| tag2 | 6 | 3 |
| tag3 | 11 | 4 |
| tag4 | 9 | 3 |
| tag5 | 7 | 15 |
| tag6 | 8 | 21 |
| tag7 | 6 | 16 |
| tag8 | 6 | 4 |

Original FieldSet (length in number of characters)

| fieldKey | fieldKey length | type |
|---|---|---|
| InputUtilization | 16 | "float" |

Injected FieldSet from monInjector UDF Node (length in number of characters)

| fieldKey | fieldKey length | type |
|---|---|---|
| check_crit | 10 | "boolean" |
| check_warn | 10 | "boolean" |
| check_info | 10 | "boolean" |
| mon_exc | 9 | "integer" |

Questions

We would like to understand how memory is handled in Kapacitor in order to optimise resource consumption across our whole dataset (many products, approx. 400 measurements, sometimes with cardinality near 50,000). We also sometimes need to compute, online, data averaged over several hours (1, 2, 3).

  • Does Kapacitor store repeated tags in memory even though the tags do not change?
  • Why is the memory consumption of the monInjector/alert nodes greater than or equal to that of movingAverage, if they do not store old points in memory?
  • Are there plans to implement recursive formulas in some nodes (like movingAverage) to avoid excessive data caching?
  • How can we reduce memory consumption on high-cardinality measurements?
  • Do you need any other tests with our dataset?

Thank you for your great work. We hope our tests help you improve it.

@toni-moreno commented:

Test 5 (replace the monInjector UDF with default())

As a last test to try to free memory, we completely replaced the UDF functionality with a default node that injects 4 default fields (as the UDF did before), using this TICKscript:

//TICKSCRIPT:
//================
//var data = stream
stream
    |from()
        .database(INFLUX_BD)
        .retentionPolicy(INFLUX_RP)
        .measurement(INFLUX_MEAS)
        .groupBy(influx_agrup)
        .where(INFLUX_FILTER)
    |default()
        .field(FIELD,FIELD_DEFAULT)
    |movingAverage(FIELD,MOV_AVG_POINTS)
        .as('value')
    |default()
        .field('check_crit',TRUE)
        .field('check_warn',TRUE)
        .field('check_info',TRUE)
        .field('mon_exc',0)
    |alert()
        .crit(lambda: if("check_crit" , float("value") > if("mon_exc" == 0, TH_CRIT_DEF, if("mon_exc" == 1, TH_CRIT_EX1, if("mon_exc" == 2, TH_CRIT_EX2, 0.0))),FALSE))
        .warn(lambda: if("check_warn" , float("value") > if("mon_exc" == 0, TH_WARN_DEF, if("mon_exc" == 1, TH_WARN_EX1, if("mon_exc" == 2, TH_WARN_EX2, 0.0))),FALSE))
        .info(lambda: if("check_info" , float("value") > if("mon_exc" == 0, TH_INFO_DEF, if("mon_exc" == 1, TH_INFO_EX1, if("mon_exc" == 2, TH_INFO_EX2, 0.0))),FALSE))
        .stateChangesOnly()
        .id(ID)
        .idTag(idTag)
        .levelTag(levelTag)
        .messageField(messageField)
        .durationField(durationField)
        .message(message)
        .details(details)
        .post('http://output_webservice')
        .email()
        .to(envio_mail)

When the task was enabled we saw this behaviour:

[graph: heap usage, Test 5]

| test | Init Heap | End Heap | Heap Usage | Cardinality | Points (15m) | bytes/point |
|---|---|---|---|---|---|---|
| 5 | 17 MB | 57 MB | 40 MB | 4029 | 60435 | 694 |

As we can see, this change does not affect memory consumption much.

We still have the same questions as before.


@nathanielc commented:

Hello @sbengo, @toni-moreno, this is a great analysis. Thanks!

I'll answer your questions below.

> Does Kapacitor store repeated tags in memory even though the tags do not change?

Yes, in most cases Kapacitor stores the data as it's written via the API, so if each point has repeated tags, they are duplicated. There are cases where we know that the tags will not be used and we can drop them. Your use of movingAverage is one of these cases: the moving average implementation only stores the float values for the number of points in the average. Only the common tags are stored, and they are stored only once per node. This is why movingAverage is more efficient than a window()|mean() operation.
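For illustration, here is a minimal Go sketch of the kind of fixed-size buffer described above (an illustrative sketch only, not Kapacitor's actual code): per group, only the float values and a running sum are kept, no matter how many tags each incoming point carries.

```go
package main

import "fmt"

// movingAvg is a sketch of a per-group moving average: it retains only a
// fixed-size ring of float64 values plus a running sum; point tags are not
// stored at all.
type movingAvg struct {
	buf  []float64
	next int     // index of the slot to overwrite next
	n    int     // number of values currently buffered
	sum  float64 // running sum of the buffered values
}

func newMovingAvg(size int) *movingAvg {
	return &movingAvg{buf: make([]float64, size)}
}

// push folds in one value and returns the average once the buffer is full.
func (m *movingAvg) push(v float64) (avg float64, ok bool) {
	if m.n == len(m.buf) {
		m.sum -= m.buf[m.next] // evict the oldest value
	} else {
		m.n++
	}
	m.buf[m.next] = v
	m.sum += v
	m.next = (m.next + 1) % len(m.buf)
	if m.n < len(m.buf) {
		return 0, false
	}
	return m.sum / float64(m.n), true
}

func main() {
	ma := newMovingAvg(3)
	for _, v := range []float64{1, 2, 3, 4, 5} {
		if avg, ok := ma.push(v); ok {
			fmt.Println(avg) // prints 2, 3, 4
		}
	}
}
```

So for a moving average of 15 points, the per-group state is on the order of 15 float64 values, independent of tag sizes.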

> Why is the memory consumption of the monInjector/alert nodes greater than or equal to that of movingAverage, if they do not store old points in memory?

My answer above addresses this as well.

> Are there plans to implement recursive formulas in some nodes (like movingAverage) to avoid excessive data caching?

Currently movingAverage stores as little data as possible (which equates to a slice of floats the size of the moving average). Do you have another algorithm in mind?

> How can we reduce memory consumption on high-cardinality measurements?

In general each node has to store metadata about each unique group, so, as you saw, reducing the number of nodes reduces overall memory usage. For that reason the eval node accepts multiple lambda expressions, not just one; this way you can do all your eval work in a single node and reduce the overhead. Or, as you have done, push the logic down into a UDF, although we would like Kapacitor to be efficient enough that this is not necessary.

> Do you need any other tests with our dataset?

Yes, it would be helpful to get a heap profile once the task is in its steady state. It can be retrieved with this request:

curl 'http://localhost:9092/kapacitor/v1/debug/pprof/heap?debug=1' > heap.pprof

In order for us to be able to read the profile data, we need the exact commit that you are running. If you are still on 1.3.3, great; if not, we will need to know.

You can share the profile here if you want, or feel free to send me the file directly. The file is a simple text file, so you can inspect it for any sensitive data first. My contact info can be found on my GitHub profile.

Thanks again for the detailed write up.


toni-moreno commented Oct 10, 2017

Hi @nathanielc

About the heap profile

I've uploaded the pprof file for version Kapacitor 1.3.3 (git: master eacb373)

kapacitor-heap.pprof-20171010.gz

About recursive formulas.

I've written a new aggregator for Telegraf (influxdata/telegraf#2167) in which we use recursive formulas for some basic statistics (count, sum, max, min, mean, variance, standard deviation). Such a node needs to apply a "reset" to its internal counters once a group of points has been processed. It loses output resolution, because you get one output point per group of input points, but for many of our alert rules it could be a good solution, saving memory.

What do you think about creating a special recursiveStats node to avoid window+mean and save memory?

Perhaps something like:

recursiveStatsNode()
      .every(TIME)
      .compute("max","min","mean",...)

What do you think?
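To make the proposal concrete, here is a minimal Go sketch of such a recursive reducer (hypothetical, not an existing Kapacitor node): every point is folded into a handful of registers, and the node emits and resets once N points have been consumed, so per-group memory stays constant regardless of N.

```go
package main

import "fmt"

// recursiveStats folds each incoming value into a few registers and emits
// once `size` points have been consumed, then resets; no points are buffered.
type recursiveStats struct {
	size  int
	count int
	sum   float64
	min   float64
	max   float64
}

// push consumes one value; after `size` values it returns the aggregates
// and resets the registers for the next group.
func (s *recursiveStats) push(v float64) (min, max, mean float64, ok bool) {
	if s.count == 0 || v < s.min {
		s.min = v
	}
	if s.count == 0 || v > s.max {
		s.max = v
	}
	s.sum += v
	s.count++
	if s.count < s.size {
		return 0, 0, 0, false
	}
	min, max, mean = s.min, s.max, s.sum/float64(s.count)
	s.count, s.sum = 0, 0 // reset for the next group of points
	return min, max, mean, true
}

func main() {
	rs := &recursiveStats{size: 3}
	for _, v := range []float64{1, 2, 3, 4, 5, 6} {
		if min, max, mean, ok := rs.push(v); ok {
			fmt.Println(min, max, mean) // prints "1 3 2", then "4 6 5"
		}
	}
}
```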

@nathanielc commented:

@toni-moreno Thanks for the pprof file. A lot of the in-use RAM seems to be related to Prometheus scraping, specifically AWS. Does that sound right? Have you configured Kapacitor to scrape AWS hosts?

On Recursive formulas

The functions in InfluxQL are nearly all implemented using this idea of consuming a single point at a time and reducing the result as they go. The problem is that the window node is currently a bit naive. It could be optimized so that, when configured with fixed non-overlapping windows, it immediately passes data along, keeping only metadata about the point times. This would take advantage of the reduce-style functions already implemented.

In your example case, though, you have overlapping windows, so the points that are part of the overlap must be cached so they can be sent in each new window.

@toni-moreno commented:

Hi @nathanielc, we don't have any Prometheus scraping configured. In fact, we had updated Kapacitor while keeping the old 1.1 config file (there is no [[scraper]] section in that file). After updating the config with all the new sections, like [[scraper]], [[azure]] and others, and restarting, I repeated the last test.

Test 6 (added [[scraper]] enabled = false to the config file)

[graph: heap usage, Test 6]

| test | Init Heap | End Heap | Heap Usage | Cardinality | Points (15m) | bytes/point |
|---|---|---|---|---|---|---|
| 6 | 8 MB | 48 MB | 40 MB | 4029 | 60435 | 694 |

There were no changes in memory consumption per point in this test; only the initial memory decreased by some 10 MB.

On Recursive formulas

Regarding recursive formulas, I did not understand your explanation (sorry, perhaps my poor English contributed to this).

In the recursive implementation we assume there are no overlapping windows. With this approach, to compute the mean of 15 points we only need a counter (counting 15 down to 0; once it reaches 0, the node emits the computed average to the next node and resets its values) and the last computed average.

In this example, for 60 input points the node emits 4 output points, and assuming 8 bytes per value we save 87% of the memory (from 120 bytes to 16 bytes):

| node | Input points | Output points | consumed memory |
|---|---|---|---|
| movingAverage(15) | 60 | 45 | 15 × 8 = 120 bytes |
| recursiveStats(15, mean) | 60 | 4 (one per 15 points) | 8 (last computed) + 8 (counter) = 16 bytes |

We lose resolution but save a lot of memory; in most cases this could be a great solution, saving memory whenever we aggregate more than 2 points. What do you think about this new approach?

@toni-moreno commented:

Hi @nathanielc, suppose we would like to help you code a new influxQL-like node, recursiveStats, as described above, for basic statistics that can be computed recursively.

These are the statistics we could compute (with one counter for the number of points, or a time controller):

  • count: only makes sense if configured with a duration time controller instead of a number of points
  • max: maximum value (only needs 1 extra register)
  • min: minimum value (only needs 1 extra register)
  • mean: average (only needs 1 extra register)
  • variance: only needs 2 extra registers (one for the mean and one for the variance)
  • stdev: standard deviation (only needs 2 extra registers, one for the mean and one for the variance)
  • spread: only needs 2 extra registers (one for max and one for min)

Would you like to add this as a new node in Kapacitor in upcoming releases?
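As a sketch of the register counts claimed in the list above: variance (and therefore stdev) can be maintained online with Welford's algorithm, using just a counter plus two registers (the running mean and an accumulated sum of squared deviations, M2). A minimal Go example, assuming sample variance:

```go
package main

import (
	"fmt"
	"math"
)

// welford tracks mean and variance online using only a counter and two
// registers (mean and m2), as in Welford's algorithm.
type welford struct {
	n    int
	mean float64
	m2   float64 // accumulated sum of squared deviations from the mean
}

func (w *welford) push(v float64) {
	w.n++
	d := v - w.mean
	w.mean += d / float64(w.n)
	w.m2 += d * (v - w.mean) // second factor uses the updated mean
}

func (w *welford) variance() float64 {
	if w.n < 2 {
		return 0
	}
	return w.m2 / float64(w.n-1) // sample variance
}

func main() {
	var w welford
	for _, v := range []float64{2, 4, 4, 4, 5, 5, 7, 9} {
		w.push(v)
	}
	fmt.Println(w.mean, w.variance(), math.Sqrt(w.variance()))
}
```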
