Skip to content

Commit

Permalink
[hotfix] Remove minor unused deprecated method/class
Browse files Browse the repository at this point in the history
  • Loading branch information
codenohup authored and reswqa committed Oct 9, 2024
1 parent c7b2b0b commit 3e34a7e
Show file tree
Hide file tree
Showing 12 changed files with 12 additions and 182 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,6 @@ public HiveSourceSplit(
checkNotNull(hiveTablePartition, "hiveTablePartition can not be null");
}

/**
* @deprecated You should use {@link #HiveSourceSplit(String, Path, long, long, long, long,
* String[], CheckpointedPosition, HiveTablePartition)}
*/
@Deprecated
public HiveSourceSplit(
String id,
Path filePath,
long offset,
long length,
String[] hostnames,
@Nullable CheckpointedPosition readerPosition,
HiveTablePartition hiveTablePartition) {
this(id, filePath, offset, length, 0, 0, hostnames, readerPosition, hiveTablePartition);
}

public HiveTablePartition getHiveTablePartition() {
return hiveTablePartition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,6 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut

private static final long serialVersionUID = 1L;

/**
* The constant to use for the parallelism, if the system should use the number of currently
* available slots.
*/
@Deprecated public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;

/**
* The flag value indicating use of the default parallelism. This value can be used to reset the
* parallelism back to the default state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -112,25 +110,6 @@ public static <T> CsvReaderFormat<T> forSchema(
return forSchema(JacksonMapperFactory::createCsvMapper, ignored -> schema, typeInformation);
}

/**
* @deprecated This method is limited to serializable {@link CsvMapper CsvMappers}, preventing
* the usage of certain Jackson modules (like the {@link
* org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
* Java 8 Date/Time Serializers}). Use {@link #forSchema(Supplier, Function,
* TypeInformation)} instead.
*/
@Deprecated
public static <T> CsvReaderFormat<T> forSchema(
CsvMapper mapper, CsvSchema schema, TypeInformation<T> typeInformation) {
return new CsvReaderFormat<>(
() -> mapper,
ignored -> schema,
typeInformation.getTypeClass(),
(value, context) -> value,
typeInformation,
false);
}

/**
* Builds a new {@code CsvReaderFormat} using a {@code CsvSchema} generator and {@code
* CsvMapper} factory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,6 @@ public class Constants {
public static final String LABEL_COMPONENT_JOB_MANAGER = "jobmanager";
public static final String LABEL_COMPONENT_TASK_MANAGER = "taskmanager";

/**
* This constant is deprecated since we do not use it for deletion currently. We still keep the
* constants here for backward compatibility.
*/
@Deprecated public static final String LABEL_CONFIGMAP_TYPE_KEY = "configmap-type";

/**
* This constant is deprecated since we do not use it for deletion currently. We still keep the
* constants here for backward compatibility.
*/
@Deprecated
public static final String LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY = "high-availability";

// Use fixed port in kubernetes, it needs to be exposed.
public static final int REST_PORT = 8081;
public static final int BLOB_SERVER_PORT = 6124;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class DefaultVertexParallelismInfo implements VertexParallelismInformatio
private int maxParallelism;
private final Function<Integer, Optional<String>> rescaleMaxValidator;

/**
* The constant to use for the parallelism, if the system should use the number of currently
* available slots.
*/
public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;

/**
* Create {@link VertexParallelismInformation} with max parallelism rescaling validation for a
* vertex.
Expand Down Expand Up @@ -61,7 +67,7 @@ public DefaultVertexParallelismInfo(
}

private static int normalizeAndCheckMaxParallelism(int maxParallelism) {
if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
if (maxParallelism == PARALLELISM_AUTO_MAX) {
maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
Expand Down Expand Up @@ -365,14 +364,11 @@ private static void checkAllExchangesAreSupported(final JobGraph jobGraph) {
String.format(
"At the moment, adaptive batch scheduler requires batch workloads "
+ "to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. "
+ "To do that, you need to configure '%s' to '%s' or '%s/%s'. "
+ "Note that for DataSet jobs which do not recognize the aforementioned shuffle mode, "
+ "the RuntimeExecutionMode needs to be %s to force BLOCKING shuffle",
+ "To do that, you need to configure '%s' to '%s' or '%s/%s'. ",
ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL,
BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE,
RuntimeExecutionMode.BATCH));
BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.flink.annotation.Experimental;

import java.util.OptionalLong;

/** The message send from {@code SinkWriter} to {@code Committer}. */
@Experimental
public interface CommittableMessage<CommT> {
Expand All @@ -34,18 +32,6 @@ public interface CommittableMessage<CommT> {
/** The subtask that created this committable. */
int getSubtaskId();

/**
* Returns the checkpoint id or empty if the message does not belong to a checkpoint. In that
* case, the committable was created at the end of input (e.g., in batch mode).
*
* @see #getCheckpointIdOrEOI()
*/
@Deprecated
default OptionalLong getCheckpointId() {
long checkpointIdOrEOI = getCheckpointIdOrEOI();
return checkpointIdOrEOI == EOI ? OptionalLong.empty() : OptionalLong.of(checkpointIdOrEOI);
}

/**
* Returns the checkpoint id or EOI if this message belong to the final checkpoint or the batch
* commit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void testSetParallelism() {
void setAutoMax() {
DefaultVertexParallelismInfo info =
new DefaultVertexParallelismInfo(
1, ExecutionConfig.PARALLELISM_AUTO_MAX, ALWAYS_VALID);
1, DefaultVertexParallelismInfo.PARALLELISM_AUTO_MAX, ALWAYS_VALID);

assertThat(info.getMaxParallelism())
.isEqualTo(KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM);
Expand All @@ -105,7 +105,8 @@ void canRescaleMaxOutOfBounds() {
void canRescaleMaxAuto() {
DefaultVertexParallelismInfo info = new DefaultVertexParallelismInfo(1, 1, ALWAYS_VALID);

assertThat(info.canRescaleMaxParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX)).isTrue();
assertThat(info.canRescaleMaxParallelism(DefaultVertexParallelismInfo.PARALLELISM_AUTO_MAX))
.isTrue();
}

@Test
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

package org.apache.flink.walkthrough.common.sink;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.walkthrough.common.entity.Alert;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A sink for outputting alerts. */
@PublicEvolving
@Deprecated
@SuppressWarnings("unused")
public class AlertSink implements SinkFunction<Alert> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.flink.yarn;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterRetrieveException;
Expand Down Expand Up @@ -914,23 +912,6 @@ private ApplicationReport startAppMaster(
userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
}

// only for per job mode
if (jobGraph != null) {
for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
jobGraph.getUserArtifacts().entrySet()) {
// only upload local files
if (!Utils.isRemotePath(entry.getValue().filePath)) {
Path localPath = new Path(entry.getValue().filePath);
Tuple2<Path, Long> remoteFileInfo =
fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
jobGraph.setUserArtifactRemotePath(
entry.getKey(), remoteFileInfo.f0.toString());
}
}

jobGraph.writeUserArtifactEntriesToConfiguration();
}

if (providedLibDirs == null || providedLibDirs.isEmpty()) {
addLibFoldersToShipFiles(systemShipFiles);
}
Expand Down

0 comments on commit 3e34a7e

Please sign in to comment.