Skip to content

Commit

Permalink
Add a flag to enable incremental scan ranges, and fix unit tests
Browse files Browse the repository at this point in the history
-e
Signed-off-by: yanz <[email protected]>
  • Loading branch information
dirtysalt committed Oct 10, 2024
1 parent 181a4aa commit 9b57518
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,8 @@ public boolean hasMoreOutput() {
return remoteFileInfoSource.hasMoreOutput();
}
}

/**
 * Returns the number of partitions selected by this source.
 *
 * <p>Computed as the size of {@code partitionKeys}; the field is declared outside this
 * view — presumably it accumulates the distinct partition keys seen while producing
 * scan ranges (TODO confirm against the enclosing class).
 *
 * @return count of selected partitions
 */
public int selectedPartitionCount() {
return partitionKeys.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.MetastoreType;
import com.starrocks.connector.PredicateSearchKey;
import com.starrocks.connector.RemoteFileDesc;
import com.starrocks.connector.RemoteFileInfo;
import com.starrocks.connector.RemoteFileInfoSource;
import com.starrocks.connector.TableVersionRange;
Expand Down Expand Up @@ -119,7 +118,6 @@ public List<String> listPartitionNames(String databaseName, String tableName, Ta
@Override
public List<RemoteFileInfo> getRemoteFiles(Table table, GetRemoteFilesParams params) {
DeltaLakeTable deltaLakeTable = (DeltaLakeTable) table;
RemoteFileInfo remoteFileInfo = new RemoteFileInfo();
String dbName = deltaLakeTable.getDbName();
String tableName = deltaLakeTable.getTableName();
PredicateSearchKey key =
Expand All @@ -133,10 +131,7 @@ public List<RemoteFileInfo> getRemoteFiles(Table table, GetRemoteFilesParams par
dbName, tableName, params.getPredicate());
}

List<RemoteFileDesc> remoteFileDescs = Lists.newArrayList(
DeltaLakeRemoteFileDesc.createDeltaLakeRemoteFileDesc(scanTasks));
remoteFileInfo.setFiles(remoteFileDescs);
return Lists.newArrayList(remoteFileInfo);
return scanTasks.stream().map(x -> new DeltaRemoteFileInfo(x)).collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.starrocks.common.UserException;
import com.starrocks.connector.CatalogConnector;
import com.starrocks.connector.GetRemoteFilesParams;
import com.starrocks.connector.RemoteFileInfoDefaultSource;
import com.starrocks.connector.RemoteFileInfoSource;
import com.starrocks.connector.TableVersionRange;
import com.starrocks.connector.delta.DeltaConnectorScanRangeSource;
Expand All @@ -49,6 +50,7 @@ public class DeltaLakeScanNode extends ScanNode {
private final HDFSScanNodePredicates scanNodePredicates = new HDFSScanNodePredicates();
private CloudConfiguration cloudConfiguration = null;
private DeltaConnectorScanRangeSource scanRangeSource = null;
private int selectedPartitionCount = -1;

public DeltaLakeScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) {
super(id, desc, planNodeName);
Expand Down Expand Up @@ -98,7 +100,7 @@ public boolean hasMoreScanRanges() {
return scanRangeSource.hasMoreOutput();
}

public void setupScanRangeSource(ScalarOperator predicate, List<String> fieldNames)
public void setupScanRangeSource(ScalarOperator predicate, List<String> fieldNames, boolean enableIncrementalScanRanges)
throws UserException {
SnapshotImpl snapshot = (SnapshotImpl) deltaLakeTable.getDeltaSnapshot();
Engine engine = deltaLakeTable.getDeltaEngine();
Expand All @@ -107,8 +109,13 @@ public void setupScanRangeSource(ScalarOperator predicate, List<String> fieldNam
GetRemoteFilesParams params =
GetRemoteFilesParams.newBuilder().setTableVersionRange(TableVersionRange.withEnd(Optional.of(snapshotId)))
.setPredicate(predicate).setFieldNames(fieldNames).build();
RemoteFileInfoSource remoteFileInfoSource =
GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFilesAsync(deltaLakeTable, params);
RemoteFileInfoSource remoteFileInfoSource = null;
if (enableIncrementalScanRanges) {
remoteFileInfoSource = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFilesAsync(deltaLakeTable, params);
} else {
remoteFileInfoSource = new RemoteFileInfoDefaultSource(
GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFiles(deltaLakeTable, params));
}
scanRangeSource = new DeltaConnectorScanRangeSource(deltaLakeTable, remoteFileInfoSource);
}

Expand Down Expand Up @@ -158,8 +165,16 @@ protected String getNodeExplainString(String prefix, TExplainLevel detailLevel)
List<String> partitionNames = GlobalStateMgr.getCurrentState().getMetadataMgr().listPartitionNames(
deltaLakeTable.getCatalogName(), deltaLakeTable.getDbName(), deltaLakeTable.getTableName());

if (selectedPartitionCount == -1) {
// we have to consume all scan ranges to know how many partition been selected.
while (scanRangeSource.hasMoreOutput()) {
scanRangeSource.getOutputs(1000);
}
selectedPartitionCount = scanRangeSource.selectedPartitionCount();
}

output.append(prefix).append(
String.format("partitions=%s/%s", scanNodePredicates.getSelectedPartitionIds().size(),
String.format("partitions=%s/%s", selectedPartitionCount,
partitionNames.size() == 0 ? 1 : partitionNames.size()));
output.append("\n");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,8 @@ public PlanFragment visitPhysicalDeltaLakeScan(OptExpression optExpression, Exec
List<String> fieldNames = node.getColRefToColumnMetaMap().keySet().stream()
.map(ColumnRefOperator::getName)
.collect(Collectors.toList());
deltaLakeScanNode.setupScanRangeSource(node.getPredicate(), fieldNames);
deltaLakeScanNode.setupScanRangeSource(node.getPredicate(), fieldNames,
context.getConnectContext().getSessionVariable().isEnableConnectorIncrementalScanRanges());

HDFSScanNodePredicates scanNodePredicates = deltaLakeScanNode.getScanNodePredicates();
prepareCommonExpr(scanNodePredicates, node.getScanOperatorPredicates(), context);
Expand Down

0 comments on commit 9b57518

Please sign in to comment.