Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/vec/exec/format/avro/avro_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ Status AvroJNIReader::init_fetch_table_reader(
if (type == TFileType::FILE_S3) {
required_param.insert(_params.properties.begin(), _params.properties.end());
}
required_param.insert(
std::make_pair("split_start_offset", std::to_string(_range.start_offset)));
required_param.insert(std::make_pair("split_size", std::to_string(_range.size)));
required_param.insert(std::make_pair("split_file_size", std::to_string(_range.file_size)));
required_param.insert(std::make_pair("uri", _range.path));
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
required_param, column_names);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,30 +44,37 @@ public static void invalidateFileCache(AvroFileCacheKey key) {
/**
 * Metadata kept for one Avro file: its schema string, the set of required
 * fields, and the split range a data reader should cover.
 *
 * NOTE(review): AvroJNIScanner.initDataReader fetches an instance through
 * AvroFileCache.getAvroFileMeta and then calls the split setters on it. If the
 * cache hands the same instance to scanners working on different splits, one
 * scanner could overwrite another's range — confirm against the cache code.
 */
public static class AvroFileMeta {
// Schema string supplied at construction; final, so never reassigned.
private final String schema;
// Assigned only through setRequiredFields; no initializer is visible here.
private Set<String> requiredFields;
// TODO split file
private String splitInfo;
// Start and size of the split, later passed to FileSplit by the data reader
// (presumably a byte offset and a byte length — TODO confirm). Both are boxed
// Long with no initializer, so they stay null until their setters are called.
private Long splitStartOffset;
private Long splitSize;

/**
 * Creates metadata that carries only the schema; no other field is assigned here.
 *
 * @param schema schema string, stored as-is
 */
AvroFileMeta(String schema) {
this.schema = schema;
}

/**
 * Creates metadata with a schema and a split descriptor string.
 *
 * @param schema schema string, stored as-is
 * @param splitInfo stored as-is; its expected format is not visible in this listing
 */
AvroFileMeta(String schema, String splitInfo) {
this.schema = schema;
this.splitInfo = splitInfo;
}

/**
 * @return the schema string given to the constructor
 */
public String getSchema() {
return schema;
}

/**
 * @return the split descriptor passed to the two-argument constructor, or null
 *         when the instance was created from the schema alone
 */
public String getSplitInfo() {
return splitInfo;
}

/**
 * Replaces the set of required field names.
 *
 * @param requiredFields the new set; the reference is stored directly, not copied
 */
public void setRequiredFields(Set<String> requiredFields) {
this.requiredFields = requiredFields;
}

/**
 * Records where the split to read begins.
 *
 * @param splitStartOffset start of the split (presumably a byte offset into the
 *                         file — TODO confirm); null is accepted and stored
 */
public void setSplitStartOffset(Long splitStartOffset) {
this.splitStartOffset = splitStartOffset;
}

/**
 * Records the size of the split to read.
 *
 * @param splitSize size of the split (presumably in bytes — TODO confirm);
 *                  null is accepted and stored
 */
public void setSplitSize(Long splitSize) {
this.splitSize = splitSize;
}

/**
 * @return the value last given to setSplitStartOffset; null if that setter has
 *         not been called, so callers that unbox the result must be prepared
 *         for a NullPointerException
 */
public Long getSplitStartOffset() {
return this.splitStartOffset;
}

/**
 * @return the value last given to setSplitSize; null if that setter has not
 *         been called, so callers that unbox the result must be prepared for a
 *         NullPointerException
 */
public Long getSplitSize() {
return this.splitSize;
}

/**
 * @return the set last given to setRequiredFields (the same reference, not a
 *         copy), or null if it was never set
 */
public Set<String> getRequiredFields() {
return requiredFields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class AvroJNIScanner extends JniScanner {
// Metadata for the file being scanned; assigned in initDataReader from
// AvroFileCache.getAvroFileMeta.
private AvroFileMeta avroFileMeta;
// Reusable key/value holders, created once in the constructor (presumably
// handed to the record reader on each read — TODO confirm).
private AvroWrapper<Pair<Integer, Long>> inputPair;
private NullWritable ignore;
// Split description parsed with Long.parseLong from the requiredParams entries
// named by AvroProperties.SPLIT_START_OFFSET, SPLIT_SIZE and SPLIT_FILE_SIZE.
// A missing entry makes parseLong throw NumberFormatException.
private Long splitStartOffset;
private Long splitSize;
// NOTE(review): parsed in the constructor but not read anywhere in the visible
// hunks — confirm whether it is used elsewhere.
private Long splitFileSize;

/**
* Call by JNI for get table data or get table schema
Expand All @@ -100,6 +103,9 @@ public AvroJNIScanner(int fetchSize, Map<String, String> requiredParams) {
this.fieldInspectors = new ObjectInspector[requiredFields.length];
this.inputPair = new AvroWrapper<>(null);
this.ignore = NullWritable.get();
this.splitStartOffset = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_START_OFFSET));
this.splitSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_SIZE));
this.splitFileSize = Long.parseLong(requiredParams.get(AvroProperties.SPLIT_FILE_SIZE));
}
}

Expand Down Expand Up @@ -171,6 +177,8 @@ private void initDataReader() {
avroFileCacheKey = new AvroFileCacheKey(fileType.name(), uri);
avroFileMeta = AvroFileCache.getAvroFileMeta(avroFileCacheKey);
avroFileMeta.setRequiredFields(requiredFieldSet);
avroFileMeta.setSplitStartOffset(splitStartOffset);
avroFileMeta.setSplitSize(splitSize);
initFieldInspector();
initTableInfo(requiredTypes, requiredFields, new ScanPredicate[0], fetchSize);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ public class AvroProperties {
// Property names with the "fs.s3a." prefix (presumably Hadoop S3A file-system
// configuration keys — TODO confirm where they are applied).
protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
protected static final String FS_S3A_REGION = "fs.s3a.region";
// Keys of the split description in the scanner's requiredParams map. The
// strings must match what the C++ AvroJNIReader inserts ("split_start_offset",
// "split_size", "split_file_size"); AvroJNIScanner parses each value as a long.
protected static final String SPLIT_START_OFFSET = "split_start_offset";
protected static final String SPLIT_SIZE = "split_size";
protected static final String SPLIT_FILE_SIZE = "split_file_size";

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.avro.mapred.AvroRecordReader;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
Expand Down Expand Up @@ -72,9 +71,7 @@ protected void openSchemaReader() throws IOException {
/**
 * Opens the Avro record reader for the split described by {@code avroFileMeta}.
 *
 * Creates a new JobConf, passes it together with the metadata to
 * projectionSchema, builds a FileSplit over {@code path}, and assigns a new
 * AvroRecordReader for that split to {@code dataReader}.
 *
 * @param avroFileMeta supplies the split start offset and split size
 * @throws IOException declared by the method; presumably raised by the
 *                     file-system lookup or the reader construction
 */
protected void openDataReader(AvroFileMeta avroFileMeta) throws IOException {
JobConf job = new JobConf();
projectionSchema(job, avroFileMeta);
// NOTE(review): this listing is a flattened diff, so fileSplit appears to be
// declared twice. The hunk header (9 old lines, 7 new) implies the next three
// lines are the removed whole-file split and the FileSplit line after them is
// their replacement — confirm against the actual file.
FileStatus fileStatus = fileSystem.getFileStatus(path);
// TODO split file
FileSplit fileSplit = new FileSplit(path, 0, fileStatus.getLen(), job);
// Split bounds come from the metadata. Both getters return boxed Long, so a
// value that was never set would presumably fail on unboxing here.
FileSplit fileSplit = new FileSplit(path, avroFileMeta.getSplitStartOffset(), avroFileMeta.getSplitSize(), job);
dataReader = new AvroRecordReader<>(job, fileSplit);
LOG.debug("success open avro data reader.");
}
Expand Down