Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 31 additions & 17 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void afterAddBinlog(TBinlog binlog) {
}
}

private void addBinlog(TBinlog binlog) {
private void addBinlog(TBinlog binlog, Object raw) {
if (!Config.enable_feature_binlog) {
return;
}
Expand All @@ -116,11 +116,11 @@ private void addBinlog(TBinlog binlog) {
lock.writeLock().unlock();
}

dbBinlog.addBinlog(binlog);
dbBinlog.addBinlog(binlog, raw);
}

private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long timestamp, TBinlogType type,
String data, boolean removeEnableCache) {
String data, boolean removeEnableCache, Object raw) {
if (!Config.enable_feature_binlog) {
return;
}
Expand Down Expand Up @@ -152,7 +152,7 @@ private void addBinlog(long dbId, List<Long> tableIds, long commitSeq, long time
}

if (anyEnable) {
addBinlog(binlog);
addBinlog(binlog, raw);
}

afterAddBinlog(binlog);
Expand All @@ -166,7 +166,7 @@ public void addUpsertRecord(UpsertRecord upsertRecord) {
TBinlogType type = TBinlogType.UPSERT;
String data = upsertRecord.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, upsertRecord);
}

public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) {
Expand All @@ -178,7 +178,7 @@ public void addAddPartitionRecord(AddPartitionRecord addPartitionRecord) {
TBinlogType type = TBinlogType.ADD_PARTITION;
String data = addPartitionRecord.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, addPartitionRecord);
}

public void addCreateTableRecord(CreateTableRecord createTableRecord) {
Expand All @@ -190,7 +190,7 @@ public void addCreateTableRecord(CreateTableRecord createTableRecord) {
TBinlogType type = TBinlogType.CREATE_TABLE;
String data = createTableRecord.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, createTableRecord);
}

public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) {
Expand All @@ -201,7 +201,7 @@ public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long com
TBinlogType type = TBinlogType.DROP_PARTITION;
String data = dropPartitionInfo.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, dropPartitionInfo);
}

public void addDropTableRecord(DropTableRecord record) {
Expand All @@ -213,7 +213,7 @@ public void addDropTableRecord(DropTableRecord record) {
TBinlogType type = TBinlogType.DROP_TABLE;
String data = record.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, record);
}

public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
Expand All @@ -225,7 +225,7 @@ public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
AlterJobRecord alterJobRecord = new AlterJobRecord(alterJob);
String data = alterJobRecord.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, alterJob);
}

public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) {
Expand All @@ -236,7 +236,7 @@ public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long
TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

public void addAlterDatabaseProperty(AlterDatabasePropertyInfo info, long commitSeq) {
Expand All @@ -247,7 +247,7 @@ public void addAlterDatabaseProperty(AlterDatabasePropertyInfo info, long commit
TBinlogType type = TBinlogType.ALTER_DATABASE_PROPERTY;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true, info);
}

public void addModifyTableProperty(ModifyTablePropertyOperationLog info, long commitSeq) {
Expand All @@ -258,7 +258,7 @@ public void addModifyTableProperty(ModifyTablePropertyOperationLog info, long co
TBinlogType type = TBinlogType.MODIFY_TABLE_PROPERTY;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true, info);
}

// add Barrier log
Expand All @@ -279,7 +279,7 @@ public void addBarrierLog(BarrierLog barrierLog, long commitSeq) {
TBinlogType type = TBinlogType.BARRIER;
String data = barrierLog.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, barrierLog);
}

// add Modify partitions
Expand All @@ -291,7 +291,7 @@ public void addModifyPartitions(BatchModifyPartitionsInfo info, long commitSeq)
TBinlogType type = TBinlogType.MODIFY_PARTITIONS;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

// add Replace partition
Expand All @@ -303,7 +303,7 @@ public void addReplacePartitions(ReplacePartitionOperationLog info, long commitS
TBinlogType type = TBinlogType.REPLACE_PARTITIONS;
String data = info.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

// add Truncate Table
Expand All @@ -316,7 +316,7 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) {
TruncateTableRecord record = new TruncateTableRecord(info);
String data = record.toJson();

addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
}

// get binlog by dbId, return first binlog.version > version
Expand Down Expand Up @@ -355,6 +355,20 @@ public Pair<TStatus, Long> getBinlogLag(long dbId, long tableId, long prevCommit
}
}

// Returns the ids of the dropped partitions recorded for the given db, as
// reported by that db's DBBinlog. The lookup in dbBinlogMap is done while
// holding the manager's read lock. If no DBBinlog is registered for dbId,
// a new empty list is returned.
public List<Long> getDroppedPartitions(long dbId) {
    lock.readLock().lock();
    try {
        DBBinlog target = dbBinlogMap.get(dbId);
        if (target != null) {
            return target.getDroppedPartitions();
        }
        // unknown db: nothing has been recorded for it
        return Lists.newArrayList();
    } finally {
        lock.readLock().unlock();
    }
}

public List<BinlogTombstone> gc() {
LOG.info("begin gc binlog");

Expand Down
47 changes: 46 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
Expand All @@ -40,6 +41,7 @@
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public class DBBinlog {
private static final Logger LOG = LogManager.getLogger(BinlogManager.class);
Expand All @@ -58,6 +60,9 @@ public class DBBinlog {
// need UpsertRecord to add timestamps for gc
private List<Pair<Long, Long>> timestamps;

// The dropped partitions, stored as (partition id, commit seq) pairs
private List<Pair<Long, Long>> droppedPartitions;

private List<TBinlog> tableDummyBinlogs;

private BinlogConfigCache binlogConfigCache;
Expand All @@ -73,6 +78,7 @@ public DBBinlog(BinlogConfigCache binlogConfigCache, TBinlog binlog) {
tableDummyBinlogs = Lists.newArrayList();
tableBinlogMap = Maps.newHashMap();
timestamps = Lists.newArrayList();
droppedPartitions = Lists.newArrayList();

TBinlog dummy;
if (binlog.getType() == TBinlogType.DUMMY) {
Expand Down Expand Up @@ -110,6 +116,13 @@ public void recoverBinlog(TBinlog binlog, boolean dbBinlogEnable) {
allBinlogs.add(binlog);
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);

if (binlog.getType() == TBinlogType.DROP_PARTITION) {
DropPartitionInfo info = DropPartitionInfo.fromJson(binlog.data);
if (info != null && info.getPartitionId() > 0) {
droppedPartitions.add(Pair.of(info.getPartitionId(), binlog.getCommitSeq()));
}
}

if (tableIds == null) {
return;
}
Expand Down Expand Up @@ -139,7 +152,7 @@ private TableBinlog getTableBinlog(TBinlog binlog, long tableId, boolean dbBinlo

// Guarded by BinlogManager: addBinlog is only called when at least one of
// the db/tables has binlog enabled.
public void addBinlog(TBinlog binlog) {
public void addBinlog(TBinlog binlog, Object raw) {
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
List<Long> tableIds = binlog.getTableIds();

Expand Down Expand Up @@ -170,6 +183,13 @@ public void addBinlog(TBinlog binlog) {
break;
}

if (binlog.getType() == TBinlogType.DROP_PARTITION && raw instanceof DropPartitionInfo) {
long partitionId = ((DropPartitionInfo) raw).getPartitionId();
if (partitionId > 0) {
droppedPartitions.add(Pair.of(partitionId, binlog.getCommitSeq()));
}
}

for (long tableId : tableIds) {
TableBinlog tableBinlog = getTableBinlog(binlog, tableId, dbBinlogEnable);
if (tableBinlog != null) {
Expand Down Expand Up @@ -205,6 +225,18 @@ public Pair<TStatus, TBinlog> getBinlog(long tableId, long prevCommitSeq) {
}
}

// Returns the ids of the dropped partitions tracked by this db binlog.
// Each entry of droppedPartitions is a pair whose first component is the
// partition id; the ids are copied, in list order, into a new list while
// the read lock is held.
public List<Long> getDroppedPartitions() {
    lock.readLock().lock();
    try {
        List<Long> partitionIds = droppedPartitions.stream()
                .map(entry -> entry.first)
                .collect(Collectors.toList());
        return partitionIds;
    } finally {
        lock.readLock().unlock();
    }
}

public Pair<TStatus, Long> getBinlogLag(long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
lock.readLock().lock();
Expand Down Expand Up @@ -293,6 +325,7 @@ private BinlogTombstone dbBinlogDisableGc() {
return tombstone;
}

// remove expired binlogs and dropped partitions, used in disable db binlog gc.
private void removeExpiredMetaData(long largestExpiredCommitSeq) {
lock.writeLock().lock();
try {
Expand Down Expand Up @@ -321,6 +354,7 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) {
}
}

gcDroppedPartitions(largestExpiredCommitSeq);
if (lastCommitSeq != -1) {
dummy.setCommitSeq(lastCommitSeq);
}
Expand All @@ -331,6 +365,8 @@ private void removeExpiredMetaData(long largestExpiredCommitSeq) {
}
}

// Get last expired binlog, and gc expired binlogs/timestamps/dropped
// partitions, used in enable db binlog gc.
private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
TBinlog lastExpiredBinlog = null;

Expand All @@ -355,6 +391,8 @@ private TBinlog getLastExpiredBinlog(BinlogComparator checker) {
while (timeIter.hasNext() && timeIter.next().first <= lastExpiredBinlog.getCommitSeq()) {
timeIter.remove();
}

gcDroppedPartitions(lastExpiredBinlog.getCommitSeq());
}

return lastExpiredBinlog;
Expand Down Expand Up @@ -464,6 +502,13 @@ public void dbBinlogDisableReplayGc(BinlogTombstone tombstone) {
}
}

// Drops the leading entries of droppedPartitions whose second component
// (the commit seq stored next to the partition id) is strictly less than
// commitSeq. Scanning stops at the first entry that is not less than
// commitSeq, so any later entries are kept even if they are smaller.
// This method takes no lock itself.
private void gcDroppedPartitions(long commitSeq) {
    Iterator<Pair<Long, Long>> cursor = droppedPartitions.iterator();
    while (cursor.hasNext()) {
        Pair<Long, Long> dropped = cursor.next();
        if (dropped.second >= commitSeq) {
            // first non-expired entry: stop, leaving it and the rest in place
            break;
        }
        cursor.remove();
    }
}

// not thread-safe, do this without lock
public void getAllBinlogs(List<TBinlog> binlogs) {
binlogs.addAll(tableDummyBinlogs);
Expand Down
23 changes: 11 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,29 +84,28 @@ public long getTableId() {
public void recoverBinlog(TBinlog binlog) {
TBinlog dummy = getDummyBinlog();
if (binlog.getCommitSeq() > dummy.getCommitSeq()) {
binlogs.add(binlog);
++binlog.table_ref;
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getTimestamp() > 0) {
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
}
addBinlogWithoutCheck(binlog);
}
}

public void addBinlog(TBinlog binlog) {
lock.writeLock().lock();
try {
binlogs.add(binlog);
++binlog.table_ref;
binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);
if (binlog.getTimestamp() > 0) {
timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
}
addBinlogWithoutCheck(binlog);
} finally {
lock.writeLock().unlock();
}
}

// Records the binlog in this table binlog without any locking or commit-seq
// check of its own:
//  - adds it to binlogs and bumps its table_ref counter,
//  - adds its approximate memory usage to binlogSize,
//  - if its timestamp is positive, remembers the (commit seq, timestamp) pair.
private void addBinlogWithoutCheck(TBinlog binlog) {
    binlogs.add(binlog);
    binlog.table_ref += 1;
    binlogSize += BinlogUtils.getApproximateMemoryUsage(binlog);

    if (binlog.getTimestamp() <= 0) {
        // no usable timestamp, nothing to track
        return;
    }
    timestamps.add(Pair.of(binlog.getCommitSeq(), binlog.getTimestamp()));
}

public Pair<TStatus, TBinlog> getBinlog(long prevCommitSeq) {
lock.readLock().lock();
try {
Expand Down
5 changes: 5 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -5606,6 +5606,11 @@ public static TGetMetaResult getMeta(Database db, List<Table> tables) throws Met
getTableMeta(olapTable, dbMeta);
}

if (Config.enable_feature_binlog) {
BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
dbMeta.setDroppedPartitions(binlogManager.getDroppedPartitions(db.getId()));
}

result.setDbMeta(dbMeta);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1760,11 +1760,11 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause
}

// drop
Partition partition = null;
long recycleTime = 0;
if (isTempPartition) {
olapTable.dropTempPartition(partitionName, true);
} else {
Partition partition = null;
if (!clause.isForceDrop()) {
partition = olapTable.getPartition(partitionName);
if (partition != null) {
Expand All @@ -1785,8 +1785,9 @@ public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause
}

// log
DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition,
clause.isForceDrop(), recycleTime);
long partitionId = partition == null ? -1L : partition.getId();
DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionId, partitionName,
isTempPartition, clause.isForceDrop(), recycleTime);
Env.getCurrentEnv().getEditLog().logDropPartition(info);

LOG.info("succeed in dropping partition[{}], table : [{}-{}], is temp : {}, is force : {}",
Expand Down
Loading