Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: correctly measure chained pipeline stats #33912

Merged
merged 9 commits into from
Sep 27, 2018

Conversation

jakelandis
Copy link
Contributor

@jakelandis jakelandis commented Sep 20, 2018

Prior to this change when a pipeline processor called another
pipeline, only the stats for the first processor were recorded.
The stats for the subsequent pipelines were ignored. This change
properly accounts for pipelines irregardless if they are the first
or subsequently called pipelines.

This change moves the state of the stats from the IngestService
to the pipeline itself. Cluster updates are safe since the pipelines
map is atomically swapped, and if a cluster update happens
while iterating over stats (now read directly from the pipeline)
a slightly stale view of stats may be shown.


With the following setup:

PUT _ingest/pipeline/mypipeline1
{
  "processors": [
    {
      "pipeline": {
        "pipeline": "mypipeline2"
      }
    }
  ]
}

PUT _ingest/pipeline/mypipeline2
{
  "processors": [
    {
      "pipeline": {
        "pipeline": "mypipeline3"
      }
    }
  ]
}

PUT _ingest/pipeline/mypipeline3
{
  "processors": [
    {
      "rename": {
        "field": "a",
        "target_field": "a2"
      }
    }
  ]
}


POST test/_doc?pipeline=mypipeline1
{
  "a" : "b"
}

DELETE _ingest/pipeline/xpack_monitoring_2
DELETE _ingest/pipeline/xpack_monitoring_6
GET _nodes/stats/ingest?filter_path=nodes.*.ingest

Before this change:

{
  "nodes": {
    "HvZVZ2o0SYqv4pMM8mX1cA": {
      "ingest": {
        "total": {
          "count": 1,
          "time_in_millis": 4,
          "current": 0,
          "failed": 0
        },
        "pipelines": {
          "mypipeline3": {
            "count": 0,
            "time_in_millis": 0,
            "current": 0,
            "failed": 0
          },
          "mypipeline2": {
            "count": 0,
            "time_in_millis": 0,
            "current": 0,
            "failed": 0
          },
          "mypipeline1": {
            "count": 1,
            "time_in_millis": 4,
            "current": 0,
            "failed": 0
          }
        }
      }
    }
  }
}

^^ Notice that pipeline2/3 were called but the stats do not reflect that they are.
After this change:

{
  "nodes": {
    "qdfWAkVhRQSKu8Au3s32fA": {
      "ingest": {
        "total": {
          "count": 1,
          "time_in_millis": 3,
          "current": 0,
          "failed": 0
        },
        "pipelines": {
          "mypipeline1": {
            "count": 1,
            "time_in_millis": 1,
            "current": 0,
            "failed": 0
          },
          "mypipeline3": {
            "count": 1,
            "time_in_millis": 1,
            "current": 0,
            "failed": 0
          },
          "mypipeline2": {
            "count": 1,
            "time_in_millis": 1,
            "current": 0,
            "failed": 0
          }
        }
      }
    }
  }
}

Note, before and after this change the time_in_millis includes the children pipelines. It is difficult to emulate outside of unit testing ... but if pipeline3 takes 3ms, and pipeline2 takes 2 milliseconds, and pipeline1 takes 1 ms, (assuming pipeline1->pipelin2->pipeline3) then pipeline1 = 6ms (1 + 2 + 3) , pipeline 2 = 5ms (2 + 3), and pipeline 3ms.


Also apologizes for the noise around the imports, but if understand correctly the updated versions is the preferred order. (and it is kinda painful to manage them manually or continually change my IDE settings)

Prior to this change when a pipeline processor called another
pipeline, only the stats for the first processor were recorded.
The stats for the subsequent pipelines were ignored. This change
properly accounts for pipelines irregardless if they are the first
or subsequently called pipelines.

This change moves the state of the stats from the IngestService
to the pipeline itself. Cluster updates are safe since the pipelines
map is now a ConcurrentHashMap, and if a cluster update happens
while iterating over stats (now read directly from the pipeline)
a slightly stale view of stats may shown.
@jakelandis jakelandis added :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v7.0.0 v6.5.0 labels Sep 20, 2018
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-core-infra

@jakelandis
Copy link
Contributor Author

Jenkins test this please.

@@ -77,10 +76,9 @@
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
// processor factories rely on other node services. Custom metadata is statically registered when classes
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
private volatile Map<String, Pipeline> pipelines = new HashMap<>();
private volatile Map<String, Pipeline> pipelines = new ConcurrentHashMap<>();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the whole reason we have a volatile and just use a normal non-concurrent HashMap is that we're only updating this field's reference but never update a specific instance of the map => I don't think you need to move to CHM here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah.. you are right updates are rebuilt local to a method then atomically swapped to an unmodified map. => no need for CHM

will put back to normal HM and update commit message.

for (Map.Entry<String, StatsHolder> entry : statsHolderPerPipeline.entrySet()) {
statsPerPipeline.put(entry.getKey(), entry.getValue().createStats());
}
Map<String, IngestStats.Stats> statsPerPipeline =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: I think even in 2018 JDKs the old version with the manual iteration is faster. I don't think it matters much here, but probably not worth the noise to change to the functional iteration here.

Copy link
Contributor Author

@jakelandis jakelandis Sep 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variable name is the same, but the object and how it iterates is different, hence the update. (e.g. more then noise)

new TestProcessor(ingestDocument -> {
ingestDocument.setFieldValue(key1, randomInt());
try {
Thread.sleep(100); //force the stat time to be non-zero
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it technically suffice to sleep for 2 here (or even 1, not sure about the potential rounding/accuracy issues with that and nanotime(), but 2 should be fine for getting a visible change in the metric?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, 100 is bit overkill. I will drop this down.

@@ -257,10 +255,19 @@ public IngestInfo info() {
@Override
public void applyClusterState(final ClusterChangedEvent event) {
ClusterState state = event.state();
int beforeHashCode = pipelines.hashCode();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: I think you can just keep a reference here to the pipelines field and then use straight instance inequality instead of hash codes below to check if the pipelines changed. That would be a little clearer since we specifically use the volatile pipelines in a way so that we only ever replace the pipelines instance as a whole.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed. now that i see the update is is an atomic swap I can clean this up by holding onto the original reference.

Copy link
Member

@original-brownbear original-brownbear left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just a few nits :)
If @rjernst is happy I'm happy

@jakelandis
Copy link
Contributor Author

@original-brownbear thanks for the review. changes added on 526ae4b

@jakelandis
Copy link
Contributor Author

@rjernst - ping

@jakelandis
Copy link
Contributor Author

Jenkins test this please.

Copy link
Member

@rjernst rjernst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a couple questions

new TestProcessor(ingestDocument -> {
ingestDocument.setFieldValue(key1, randomInt());
try {
Thread.sleep(2); //force the stat time to be non-zero
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you mock the clock instead? time can go backwards due to correction, thus this is not guaranteed to be non-zero. Mocking would also allow better control to test the behavior of adding/joining the metrics objects.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do.

class IngestMetric {

private final MeanMetric ingestMetric = new MeanMetric();
private final CounterMetric ingestCurrent = new CounterMetric();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is "current"? I see this existed before, but the purpose is unclear. Some java docs here would be good.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

current is briefly incremented while the ingest thing that is getting measured (total, pipeline, (soon) processor) is working on a document. It makes the most sense for total to see how many documents are currently being processed.

I will add some Javadoc.

}

void add(IngestMetric metrics){
ingestCount.inc(metrics.ingestCount.count());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this missing "current"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, but I see why it looks that.

add is used to carry forward metrics between cluster state changes (when the pipeline changes). When the cluster state changes for the pipelines, the pipeline instances are rebuilt, and now (with this change) the metrics get wiped out too. Add is used to add the old metrics to the metrics on the newly created pipeline instances (pre-populate/carry forward the old). Since the old metrics are a snapshot at time of cluster state change, we don't want to carry forward current since it will never be decremented. current may incorrectly show zero value during pipelines cluster state change until the existing bulk request is finished processing. This should be fairly rare, fairly short time period where current drops to zero.

I can change the name to carryForward, prePopulate ? other suggestions to make this more obvious ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hopefully the added javadoc sufficiently addresses the missing addition of current.

@jakelandis
Copy link
Contributor Author

@rjernst - changes implemented, can you take another look ?

Copy link
Member

@rjernst rjernst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM

*/
private final MeanMetric ingestTime = new MeanMetric();
/**
* The current count of things being measure. Should mostly like ever be 0 or 1.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wording is confusing here: most likely?

}

/**
* Add two sets of metrics together. *Important* does NOT add the current count intentionally,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move this important section down to its own paragraph, and please use proper javadoc formatting for emphasis and paragraph breaks.

@jakelandis
Copy link
Contributor Author

@rjernst thanks! I have addressed the minor javadoc comments and will merge on green.

@jakelandis jakelandis merged commit 73ee721 into elastic:master Sep 27, 2018
jakelandis added a commit to jakelandis/elasticsearch that referenced this pull request Oct 21, 2018
Prior to this change when a pipeline processor called another
pipeline, only the stats for the first processor were recorded.
The stats for the subsequent pipelines were ignored. This change
properly accounts for pipelines irregardless if they are the first
or subsequently called pipelines.

This change moves the state of the stats from the IngestService
to the pipeline itself. Cluster updates are safe since the pipelines
map is atomically swapped, and if a cluster update happens
while iterating over stats (now read directly from the pipeline)
a slightly stale view of stats may be shown.
jakelandis added a commit that referenced this pull request Oct 22, 2018
Prior to this change when a pipeline processor called another
pipeline, only the stats for the first processor were recorded.
The stats for the subsequent pipelines were ignored. This change
properly accounts for pipelines irregardless if they are the first
or subsequently called pipelines.

This change moves the state of the stats from the IngestService
to the pipeline itself. Cluster updates are safe since the pipelines
map is atomically swapped, and if a cluster update happens
while iterating over stats (now read directly from the pipeline)
a slightly stale view of stats may be shown.
@jakelandis
Copy link
Contributor Author

6.5 backport: 8e72a68

kcm pushed a commit that referenced this pull request Oct 30, 2018
Prior to this change when a pipeline processor called another
pipeline, only the stats for the first processor were recorded.
The stats for the subsequent pipelines were ignored. This change
properly accounts for pipelines irregardless if they are the first
or subsequently called pipelines.

This change moves the state of the stats from the IngestService
to the pipeline itself. Cluster updates are safe since the pipelines
map is atomically swapped, and if a cluster update happens
while iterating over stats (now read directly from the pipeline)
a slightly stale view of stats may be shown.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >enhancement v6.5.0 v7.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants