Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -218,6 +219,7 @@ protected void createRollupReplica() throws AlterCancelException {
try {
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
Map<Object, Object> objectPool = new HashMap<Object, Object>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use a concurrent map.

for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
Expand Down Expand Up @@ -261,7 +263,7 @@ protected void createRollupReplica() throws AlterCancelException {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
binlogConfig, objectPool);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@

import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -226,6 +227,7 @@ protected void createShadowIndexReplica() throws AlterCancelException {
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
Expand Down Expand Up @@ -275,7 +277,7 @@ protected void createShadowIndexReplica() throws AlterCancelException {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
binlogConfig, objectPool);

createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
.get(shadowTabletId), originSchemaHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -1070,6 +1071,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc
} finally {
localTbl.readUnlock();
}
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
List<Index> indexes = restoredIdx.getId() == localTbl.getBaseIndexId()
Expand Down Expand Up @@ -1103,7 +1105,7 @@ private void createReplicas(Database db, AgentBatchTask batchTask, OlapTable loc
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
binlogConfig);
binlogConfig, objectPool);
task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat());
task.setInRestoreMode(true);
batchTask.addTask(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1998,6 +1998,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa

short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
TStorageMedium realStorageMedium = null;
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
MaterializedIndex index = entry.getValue();
Expand Down Expand Up @@ -2046,7 +2047,7 @@ protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long pa
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(), binlogConfig);
tbl.storeRowColumn(), binlogConfig, objectPool);

task.setStorageFormat(tbl.getStorageFormat());
task.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
long backendReportVersion) {
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
Expand Down Expand Up @@ -881,7 +882,7 @@ private static void deleteFromMeta(ListMultimap<Long, Long> tabletDeleteFromMeta
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.getTimeSeriesCompactionLevelThreshold(),
olapTable.storeRowColumn(),
binlogConfig);
binlogConfig, objectPool);

createReplicaTask.setIsRecoverTask(true);
createReplicaTask.setInvertedIndexStorageFormat(olapTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -123,6 +124,8 @@ public class CreateReplicaTask extends AgentTask {
private BinlogConfig binlogConfig;
private List<Integer> clusterKeyIndexes;

private Map<Object, Object> objectPool;

public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
long replicaId, short shortKeyColumnCount, int schemaHash, long version,
KeysType keysType, TStorageType storageType,
Expand All @@ -144,7 +147,8 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
long timeSeriesCompactionEmptyRowsetsThreshold,
long timeSeriesCompactionLevelThreshold,
boolean storeRowColumn,
BinlogConfig binlogConfig) {
BinlogConfig binlogConfig,
Map<Object, Object> objectPool) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);

this.replicaId = replicaId;
Expand Down Expand Up @@ -188,6 +192,7 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
this.timeSeriesCompactionLevelThreshold = timeSeriesCompactionLevelThreshold;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
this.objectPool = objectPool;
}

public void setIsRecoverTask(boolean isRecoverTask) {
Expand Down Expand Up @@ -260,21 +265,32 @@ public TCreateTabletReq toThrift() {
int deleteSign = -1;
int sequenceCol = -1;
int versionCol = -1;
List<TColumn> tColumns = new ArrayList<TColumn>();
List<TColumn> tColumns = null;
Object tCols = objectPool.get(columns);
if (tCols != null) {
tColumns = (List<TColumn>) tCols;
} else {
tColumns = new ArrayList<>();
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
// when doing schema change, some modified column has a prefix in name.
// this prefix is only used in FE, not visible to BE, so we should remove this prefix.
if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
tColumn.setColumnName(
column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
}
tColumn.setVisible(column.isVisible());
tColumns.add(tColumn);
}
objectPool.put(columns, tColumns);
}
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
// when doing schema change, some modified column has a prefix in name.
// this prefix is only used in FE, not visible to BE, so we should remove this prefix.
if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
}
tColumn.setVisible(column.isVisible());
tColumns.add(tColumn);
if (column.isDeleteSignColumn()) {
deleteSign = i;
}
Expand All @@ -296,9 +312,15 @@ public TCreateTabletReq toThrift() {
}
}
if (CollectionUtils.isNotEmpty(indexes)) {
List<TOlapTableIndex> tIndexes = new ArrayList<>();
for (Index index : indexes) {
tIndexes.add(index.toThrift());
List<TOlapTableIndex> tIndexes = null;
Object value = objectPool.get(indexes);
if (value != null) {
tIndexes = (List<TOlapTableIndex>) value;
} else {
tIndexes = new ArrayList<>();
for (Index index : indexes) {
tIndexes.add(index.toThrift());
}
}
tSchema.setIndexes(tIndexes);
storageFormat = TStorageFormat.V2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ public void setUp() throws AnalysisException {
range2 = Range.closedOpen(pk2, pk3);

// create tasks

Map<Object, Object> objectPool = new HashMap<Object, Object>();
// create
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId,
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType,
TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null,
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, null);
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, null, objectPool);

// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false);
Expand Down