Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Filter devices by pattern before reading device metadata from TsFile #12765

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
Expand Down Expand Up @@ -50,11 +51,13 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;

public class TsFileInsertionDataContainer implements AutoCloseable {

Expand All @@ -77,6 +80,7 @@ public class TsFileInsertionDataContainer implements AutoCloseable {

private boolean shouldParsePattern = false;

@TestOnly
public TsFileInsertionDataContainer(
final File tsFile, final PipePattern pattern, final long startTime, final long endTime)
throws IOException {
Expand Down Expand Up @@ -122,15 +126,20 @@ public TsFileInsertionDataContainer(
deviceIsAlignedMap = readDeviceIsAlignedMap();
memoryRequiredInBytes += PipeMemoryWeighUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);

measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
// Filter devices that may overlap with pattern first
// to avoid reading all time-series of all devices.
final Set<IDeviceID> devices = filterDevicesByPattern(deviceIsAlignedMap.keySet());

measurementDataTypeMap = readFilteredFullPathDataTypeMap(devices);
memoryRequiredInBytes += PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);

deviceMeasurementsMap = tsFileSequenceReader.getDeviceMeasurementsMap();
deviceMeasurementsMap = readFilteredDeviceMeasurementsMap(devices);
memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
}
allocatedMemoryBlock = PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);

// Filter again to get the final deviceMeasurementsMap that exactly matches the pattern.
deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator();

Expand Down Expand Up @@ -198,6 +207,67 @@ private Map<IDeviceID, Boolean> readDeviceIsAlignedMap() throws IOException {
return deviceIsAlignedResultMap;
}

private Set<IDeviceID> filterDevicesByPattern(final Set<IDeviceID> devices) {
if (Objects.isNull(pattern) || pattern.isRoot()) {
return devices;
}

final Set<IDeviceID> filteredDevices = new HashSet<>();
for (final IDeviceID device : devices) {
final String deviceId = ((PlainDeviceID) device).toStringID();
if (pattern.coversDevice(deviceId) || pattern.mayOverlapWithDevice(deviceId)) {
filteredDevices.add(device);
}
}
return filteredDevices;
}

/**
* This method is similar to {@link TsFileSequenceReader#getFullPathDataTypeMap()}, but only reads
* the given devices.
*/
private Map<String, TSDataType> readFilteredFullPathDataTypeMap(final Set<IDeviceID> devices)
throws IOException {
final Map<String, TSDataType> result = new HashMap<>();

for (IDeviceID device : devices) {
tsFileSequenceReader
.readDeviceMetadata(device)
.values()
.forEach(
timeseriesMetadata ->
result.put(
((PlainDeviceID) device).toStringID()
+ "."
+ timeseriesMetadata.getMeasurementId(),
timeseriesMetadata.getTsDataType()));
}

return result;
}

/**
* This method is similar to {@link TsFileSequenceReader#getDeviceMeasurementsMap()}, but only
* reads the given devices.
*/
private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
final Set<IDeviceID> devices) throws IOException {
final Map<IDeviceID, List<String>> result = new HashMap<>();

for (IDeviceID device : devices) {
tsFileSequenceReader
.readDeviceMetadata(device)
.values()
.forEach(
timeseriesMetadata ->
result
.computeIfAbsent(device, d -> new ArrayList<>())
.add(timeseriesMetadata.getMeasurementId()));
}

return result;
}

/**
* @return {@link TabletInsertionEvent} in a streaming way
*/
Expand Down
Loading