Skip to content

Commit

Permalink
[FLINK-35355][State] Internal async aggregating state and correspondi…
Browse files Browse the repository at this point in the history
…ng state descriptor

This closes #24810
  • Loading branch information
jectpro7 authored and Zakelly committed May 24, 2024
1 parent 3d40bd7 commit 467f94f
Show file tree
Hide file tree
Showing 10 changed files with 423 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.state.v2;

import org.apache.flink.annotation.Experimental;

/**
* {@link State} interface for aggregating state, based on an {@link
* org.apache.flink.api.common.functions.AggregateFunction}. Elements that are added to this type of
* state will be eagerly pre-aggregated using a given {@code AggregateFunction}.
*
* <p>The state holds internally always the accumulator type of the {@code AggregateFunction}. When
* accessing the result of the state, the function's {@link
* org.apache.flink.api.common.functions.AggregateFunction#getResult(Object)} method.
*
* <p>The state is accessed and modified by user functions, and checkpointed consistently by the
* system as part of the distributed snapshots.
*
* <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
* automatically supplied by the system, so the function always sees the value mapped to the key of
* the current element. That way, the system can handle stream and state partitioning consistently
* together.
*
* @param <IN> Type of the value added to the state.
* @param <OUT> Type of the value extracted from the state.
*/
@Experimental
public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.api.common.state.v2.AggregatingState;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.ReducingState;
Expand Down Expand Up @@ -105,5 +106,11 @@ public enum StateRequestType {
REDUCING_GET,

/** Add element into reducing state, {@link ReducingState#asyncAdd(Object)}. */
REDUCING_ADD
REDUCING_ADD,

/** Get value from aggregating state by {@link AggregatingState#asyncGet()}. */
AGGREGATING_GET,

/** Add element to aggregating state by {@link AggregatingState#asyncAdd(Object)}. */
AGGREGATING_ADD
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import javax.annotation.Nonnull;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A {@link StateDescriptor} for {@link org.apache.flink.api.common.state.v2.AggregatingState}.
*
* <p>The type internally stored in the state is the type of the {@code Accumulator} of the {@code
* AggregateFunction}.
*
* @param <IN> The type of the values that are added to the state.
* @param <ACC> The type of the accumulator (intermediate aggregation state).
* @param <OUT> The type of the values that are returned from the state.
*/
public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<ACC> {

private final AggregateFunction<IN, ACC, OUT> aggregateFunction;

/**
* Create a new state descriptor with the given name, function, and type.
*
* @param stateId The (unique) name for the state.
* @param aggregateFunction The {@code AggregateFunction} used to aggregate the state.
* @param typeInfo The type of the accumulator. The accumulator is stored in the state.
*/
public AggregatingStateDescriptor(
@Nonnull String stateId,
@Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction,
@Nonnull TypeInformation<ACC> typeInfo) {
super(stateId, typeInfo);
this.aggregateFunction = checkNotNull(aggregateFunction);
}

/**
* Create a new state descriptor with the given name, function, and type.
*
* @param stateId The (unique) name for the state.
* @param aggregateFunction The {@code AggregateFunction} used to aggregate the state.
* @param typeInfo The type of the accumulator. The accumulator is stored in the state.
* @param serializerConfig The serializer related config used to generate TypeSerializer.
*/
public AggregatingStateDescriptor(
@Nonnull String stateId,
@Nonnull AggregateFunction<IN, ACC, OUT> aggregateFunction,
@Nonnull TypeInformation<ACC> typeInfo,
SerializerConfig serializerConfig) {
super(stateId, typeInfo, serializerConfig);
this.aggregateFunction = checkNotNull(aggregateFunction);
}

/** Returns the Aggregate function for this state. */
public AggregateFunction<IN, ACC, OUT> getAggregateFunction() {
return aggregateFunction;
}

@Override
public Type getType() {
return Type.AGGREGATING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.state.v2.AggregatingState;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.ReducingState;
Expand Down Expand Up @@ -77,4 +78,15 @@ public <T> ReducingState<T> getReducingState(
throw new RuntimeException("Error while getting state", e);
}
}

@Override
public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
@Nonnull AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
Preconditions.checkNotNull(stateProperties, "The state properties must not be null");
try {
return asyncKeyedStateBackend.createState(stateProperties);
} catch (Exception e) {
throw new RuntimeException("Error while getting state", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.v2.AggregatingState;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;

/**
* The default implementation of {@link AggregatingState}, which delegates all async requests to
* {@link StateRequestHandler}.
*
* @param <K> The type of key the state is associated to.
* @param <IN> The type of the values that are added into the state.
* @param <ACC> TThe type of the accumulator (intermediate aggregation state).
* @param <OUT> The type of the values that are returned from the state.
*/
public class InternalAggregatingState<K, IN, ACC, OUT> extends InternalKeyedState<K, ACC>
implements AggregatingState<IN, OUT> {

protected final AggregateFunction<IN, ACC, OUT> aggregateFunction;

/**
* Creates a new InternalKeyedState with the given asyncExecutionController and stateDescriptor.
*
* @param stateRequestHandler The async request handler for handling all requests.
* @param stateDescriptor The properties of the state.
*/
public InternalAggregatingState(
StateRequestHandler stateRequestHandler,
AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor) {
super(stateRequestHandler, stateDescriptor);
this.aggregateFunction = stateDescriptor.getAggregateFunction();
}

@Override
public StateFuture<OUT> asyncGet() {
return handleRequest(StateRequestType.AGGREGATING_GET, null);
}

@Override
public StateFuture<Void> asyncAdd(IN value) {
return handleRequest(StateRequestType.AGGREGATING_ADD, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.v2;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.v2.AggregatingState;
import org.apache.flink.api.common.state.v2.ListState;
import org.apache.flink.api.common.state.v2.MapState;
import org.apache.flink.api.common.state.v2.ReducingState;
Expand Down Expand Up @@ -86,4 +87,19 @@ public interface KeyedStateStoreV2 {
* function (function is not part of a KeyedStream).
*/
<T> ReducingState<T> getReducingState(@Nonnull ReducingStateDescriptor<T> stateProperties);

/**
* Gets a handle to the system's key/value aggregating state. This state is only accessible if
* the function is executed on a KeyedStream.
*
* @param stateProperties The descriptor defining the properties of the stats.
* @param <IN> The type of the values that are added to the state.
* @param <ACC> The type of the accumulator (intermediate aggregation state).
* @param <OUT> The type of the values that are returned from the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
@Nonnull AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state.v2;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.CommonTestUtils;

import org.junit.jupiter.api.Test;

import java.io.Serializable;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link AggregatingStateDescriptor}. */
class AggregatingStateDescriptorTest implements Serializable {

@Test
void testHashCodeAndEquals() throws Exception {
final String name = "testName";
AggregateFunction<Integer, Integer, Integer> aggregator =
new AggregateFunction<Integer, Integer, Integer>() {
@Override
public Integer createAccumulator() {
return 0;
}

@Override
public Integer add(Integer value, Integer accumulator) {
return accumulator + value;
}

@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}

@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
};

AggregatingStateDescriptor<Integer, Integer, Integer> original =
new AggregatingStateDescriptor<>(name, aggregator, BasicTypeInfo.INT_TYPE_INFO);
AggregatingStateDescriptor<Integer, Integer, Integer> same =
new AggregatingStateDescriptor<>(name, aggregator, BasicTypeInfo.INT_TYPE_INFO);
AggregatingStateDescriptor<Integer, Integer, Integer> sameBySerializer =
new AggregatingStateDescriptor<>(name, aggregator, BasicTypeInfo.INT_TYPE_INFO);

// test that hashCode() works on state descriptors with initialized and uninitialized
// serializers
assertThat(same).hasSameHashCodeAs(original);
assertThat(sameBySerializer).hasSameHashCodeAs(original);

assertThat(same).isEqualTo(original);
assertThat(sameBySerializer).isEqualTo(original);

// equality with a clone
AggregatingStateDescriptor<Integer, Integer, Integer> clone =
CommonTestUtils.createCopySerializable(original);
assertThat(clone).isEqualTo(original);
}
}
Loading

0 comments on commit 467f94f

Please sign in to comment.