Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions be/src/exec/decompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
// See:
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
while (remaining_input_size > 0) {
if (remaining_input_size < 4) {
*more_input_bytes = 4 - remaining_input_size;
break;
}
// Read uncompressed size
uint32_t uncompressed_block_len = Decompressor::_read_int32(src);
int64_t remaining_output_len = output_max_len - uncompressed_total_len;
Expand All @@ -566,12 +570,24 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
break;
}

if (uncompressed_block_len == 0) {
remaining_input_size -= sizeof(uint32_t);
break;
}

if (remaining_input_size <= 2 * sizeof(uint32_t)) {
            // The remaining input size should be larger than <uncompressed size><compressed size><compressed data>
// +1 means we need at least 1 bytes of compressed data.
*more_input_bytes = 2 * sizeof(uint32_t) + 1 - remaining_input_size;
break;
}

// Read compressed size
size_t tmp_src_size = remaining_input_size - sizeof(uint32_t);
size_t tmp_remaining_size = remaining_input_size - 2 * sizeof(uint32_t);
size_t compressed_len = _read_int32(src + sizeof(uint32_t));
if (compressed_len == 0 || compressed_len > tmp_src_size) {
if (compressed_len > tmp_remaining_size) {
// Need more input data
*more_input_bytes = compressed_len - tmp_src_size;
*more_input_bytes = compressed_len - tmp_remaining_size;
break;
}

Expand All @@ -590,8 +606,9 @@ Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
// Decompress
if (!snappy::RawUncompress(reinterpret_cast<const char*>(src), compressed_len,
reinterpret_cast<char*>(output))) {
return Status::InternalError("snappy block decompress failed. uncompressed_len: {}",
uncompressed_block_len);
return Status::InternalError(
"snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
uncompressed_block_len, compressed_len);
}

output += uncompressed_block_len;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,8 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool
std::stringstream ss;
ss << "decompress made no progress."
<< " input_read_bytes: " << input_read_bytes
<< " decompressed_len: " << decompressed_len;
<< " decompressed_len: " << decompressed_len
<< " input len: " << (_input_buf_limit - _input_buf_pos);
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ private PFetchTableSchemaRequest getFetchTableStructureRequest() throws TExcepti
// get first file, used to parse table schema
TBrokerFileStatus firstFile = null;
for (TBrokerFileStatus fileStatus : fileStatuses) {
if (fileStatus.isIsDir() || fileStatus.size == 0) {
if (isFileContentEmpty(fileStatus)) {
continue;
}
firstFile = fileStatus;
Expand Down Expand Up @@ -516,5 +516,43 @@ private PFetchTableSchemaRequest getFetchTableStructureRequest() throws TExcepti
return InternalService.PFetchTableSchemaRequest.newBuilder()
.setFileScanRange(ByteString.copyFrom(new TSerializer().serialize(fileScanRange))).build();
}

private boolean isFileContentEmpty(TBrokerFileStatus fileStatus) {
if (fileStatus.isIsDir() || fileStatus.size == 0) {
return true;
}
if (Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON) {
int magicNumberBytes = 0;
switch (compressionType) {
case GZ:
magicNumberBytes = 20;
break;
case LZO:
case LZOP:
magicNumberBytes = 42;
break;
case DEFLATE:
magicNumberBytes = 8;
break;
case SNAPPYBLOCK:
case LZ4BLOCK:
case LZ4FRAME:
magicNumberBytes = 4;
break;
case BZ2:
magicNumberBytes = 14;
break;
case UNKNOWN:
case PLAIN:
default:
break;
}
// fileStatus.size may be -1 in http_stream
if (fileStatus.size >= 0 && fileStatus.size <= magicNumberBytes) {
return true;
}
}
return false;
}
}

Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -148,3 +148,5 @@

-- !snappy_2 --

-- !snappy_empty --

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import org.junit.Assert
// specific language governing permissions and limitations
// under the License.

suite("test_local_tvf_compression", "p2,external,tvf,external_remote,external_remote_tvf") {
suite("test_local_tvf_compression", "p0,tvf") {
List<List<Object>> backends = sql """ show backends """
assertTrue(backends.size() > 0)
def be_id = backends[0][0]
def dataFilePath = context.config.dataPath + "/external_table_p2/tvf/compress"
def dataFilePath = context.config.dataPath + "/external_table_p0/tvf/compress"

def outFilePath="/compress"

Expand Down Expand Up @@ -136,6 +136,16 @@ suite("test_local_tvf_compression", "p2,external,tvf,external_remote,external_re
"compress_type" ="${compress_type}block") where c2="abcd" order by c3 limit 22 ;
"""

    // test empty snappy file
qt_snappy_empty """
select * from local(
"file_path" = "${outFilePath}/test_empty_snappy.${compress_type}",
"backend_id" = "${be_id}",
"format" = "csv",
"column_separator" = ",",
"compress_type" ="${compress_type}block");
"""

// test error case
test {
sql """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_s3_tvf_compression", "p2,external,tvf,external_remote,external_remote_tvf") {
suite("test_s3_tvf_compression", "p0") {

String ak = getS3AK()
String sk = getS3SK()
Expand Down