Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ public class CommonConfig {
private long pipeReceiverLoginPeriodicVerificationIntervalMs = 300000;
private double pipeReceiverActualToEstimatedMemoryRatio = 3;

private int pipeReceiverReqDecompressedMaxLengthInBytes = 1073741824; // 1GB

private int pipeMaxAllowedHistoricalTsFilePerDataRegion = Integer.MAX_VALUE; // Deprecated
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = Integer.MAX_VALUE; // Deprecated
private int pipeMaxAllowedPinnedMemTableCount = Integer.MAX_VALUE; // per data region
Expand Down Expand Up @@ -1465,6 +1467,22 @@ public double getPipeReceiverActualToEstimatedMemoryRatio() {
return pipeReceiverActualToEstimatedMemoryRatio;
}

public void setPipeReceiverReqDecompressedMaxLengthInBytes(
int pipeReceiverReqDecompressedMaxLengthInBytes) {
if (this.pipeReceiverReqDecompressedMaxLengthInBytes
== pipeReceiverReqDecompressedMaxLengthInBytes) {
return;
}
this.pipeReceiverReqDecompressedMaxLengthInBytes = pipeReceiverReqDecompressedMaxLengthInBytes;
logger.info(
"pipeReceiverReqDecompressedMaxLengthInBytes is set to {}.",
pipeReceiverReqDecompressedMaxLengthInBytes);
}

public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return pipeReceiverReqDecompressedMaxLengthInBytes;
}

public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
return pipeMaxAllowedHistoricalTsFilePerDataRegion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ public double getPipeReceiverActualToEstimatedMemoryRatio() {
return COMMON_CONFIG.getPipeReceiverActualToEstimatedMemoryRatio();
}

public int getPipeReceiverReqDecompressedMaxLengthInBytes() {
return COMMON_CONFIG.getPipeReceiverReqDecompressedMaxLengthInBytes();
}

/////////////////////////////// Hybrid Mode ///////////////////////////////

public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
Expand Down Expand Up @@ -613,6 +617,9 @@ public void printAllConfigs() {
LOGGER.info(
"PipeReceiverActualToEstimatedMemoryRatio: {}",
getPipeReceiverActualToEstimatedMemoryRatio());
LOGGER.info(
"PipeReceiverReqDecompressedMaxLengthInBytes: {}",
getPipeReceiverReqDecompressedMaxLengthInBytes());

LOGGER.info(
"PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,11 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
properties.getProperty(
"pipe_receiver_actual_to_estimated_memory_ratio",
Double.toString(config.getPipeReceiverActualToEstimatedMemoryRatio()))));
config.setPipeReceiverReqDecompressedMaxLengthInBytes(
Integer.parseInt(
properties.getProperty(
"pipe_receiver_req_decompressed_max_length_in_bytes",
String.valueOf(config.getPipeReceiverReqDecompressedMaxLengthInBytes()))));

config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.commons.pipe.connector.payload.thrift.request;

import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressor;
import org.apache.iotdb.commons.pipe.connector.compressor.PipeCompressorFactory;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
Expand Down Expand Up @@ -91,6 +92,7 @@ public static TPipeTransferReq fromTPipeTransferReq(final TPipeTransferReq trans
compressors.add(
PipeCompressorFactory.getCompressor(ReadWriteIOUtils.readByte(compressedBuffer)));
uncompressedLengths.add(ReadWriteIOUtils.readInt(compressedBuffer));
checkDecompressedLength(uncompressedLengths.get(i));
}

byte[] body = new byte[compressedBuffer.remaining()];
Expand All @@ -110,6 +112,19 @@ public static TPipeTransferReq fromTPipeTransferReq(final TPipeTransferReq trans
return decompressedReq;
}

/** This method is used to prevent decompression bomb attacks. */
private static void checkDecompressedLength(final int decompressedLength)
throws IllegalArgumentException {
final int maxDecompressedLength =
PipeConfig.getInstance().getPipeReceiverReqDecompressedMaxLengthInBytes();
if (decompressedLength < 0 || decompressedLength > maxDecompressedLength) {
throw new IllegalArgumentException(
String.format(
"Decompressed length should be between 0 and %d, but got %d.",
maxDecompressedLength, decompressedLength));
}
}

/**
* For air-gap connectors. Generate the bytes of a compressed req from the bytes of original req.
*/
Expand Down
Loading