Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion python_tests/test_atomic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,7 +1403,19 @@ def test_atomic_deletion(db0_fixture):
with pytest.raises(Exception):
db0.fetch(dep_uuid)



def test_commit_skips_modified_wrapper_after_deferred_free(db0_no_autocommit):
child = db0.list([1])
parent = MemoTestClass(child)
db0.commit()

child.append(2)
parent.value = None

db0.commit()
db0.commit()


def test_atomic_deletion_issue_1(db0_fixture):
"""
This test was failing due to incorrect implementation of AtomicContext.exit() -
Expand Down
3 changes: 2 additions & 1 deletion src/dbzero/core/memory/SlabItem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ DB0_PACKED_END
// the capacity item as last retrieved from the backend (may need update)
CapacityItem m_cap_item;
bool m_is_dirty = false;
std::size_t m_dirty_depth = 0;

SlabItem(std::shared_ptr<SlabAllocator> slab, CapacityItem cap);
~SlabItem();
Expand Down Expand Up @@ -191,4 +192,4 @@ namespace std
ostream &operator<<(ostream &os, const db0::CapacityItem &item);
ostream &operator<<(ostream &os, const db0::SlabDef &item);

}
}
81 changes: 52 additions & 29 deletions src/dbzero/core/memory/SlabManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,24 @@ namespace db0
return slab;
}

void SlabManager::markDirty(std::shared_ptr<SlabItem> slab)
void SlabManager::markDirty(std::shared_ptr<SlabItem> slab, std::size_t depth)
{
// Atomic rollback tracking is independent from m_is_dirty: a slab can
// be saved while an atomic frame is active, but its depth marker must
// still keep it reachable from m_dirty_slabs for later rollback.
auto was_tracked_in_atomic = slab->m_dirty_depth > 0;
if (depth > 0 && slab->m_dirty_depth < depth) {
slab->m_dirty_depth = depth;
}
if (slab->m_is_dirty) {
return;
}
if (!m_atomic_stack.empty()) {
// The first dirty transition registers this slab with the active frame.
// Subsequent mutations are covered by the dirty flag and dirty-list entry.
m_atomic_stack.back().m_dirty_slabs.insert(m_slab_address_func(slab->m_cap_item.m_slab_id));
}
slab->m_is_dirty = true;
m_dirty_slabs.push_back(slab);
// A depth-tracked slab is already retained in m_dirty_slabs across
// atomic saveDirtySlabs() calls; avoid duplicate entries after re-dirty.
if (!was_tracked_in_atomic) {
m_dirty_slabs.push_back(slab);
}
}

void SlabManager::invalidateCachedSlab(std::uint64_t address)
Expand All @@ -264,6 +270,7 @@ namespace db0
auto slab_item = it->second.lock();
if (slab_item) {
slab_item->m_is_dirty = false;
slab_item->m_dirty_depth = 0;
resetActiveSlab(slab_item);
if (m_recycler_ptr) {
m_recycler_ptr->closeOne([&slab_item](const SlabItem &item) {
Expand Down Expand Up @@ -420,9 +427,12 @@ namespace db0
atomicFrame.m_volatile_slabs.end());
}

if (!atomicFrame.m_dirty_slabs.empty() && !m_atomic_stack.empty()) {
auto &parentDirtySlabs = m_atomic_stack.back().m_dirty_slabs;
parentDirtySlabs.insert(atomicFrame.m_dirty_slabs.begin(), atomicFrame.m_dirty_slabs.end());
auto child_depth = m_atomic_stack.size() + 1;
auto parent_depth = m_atomic_stack.size();
for (auto &slab_item : m_dirty_slabs) {
if (slab_item && slab_item->m_dirty_depth == child_depth) {
slab_item->m_dirty_depth = parent_depth;
}
}
}

Expand All @@ -435,7 +445,14 @@ namespace db0
// Rollback restores prefix data for the canceled frame. Only slab
// allocators mutated by that frame can now carry stale in-memory state,
// so evict just those slabs and let later reads reopen them lazily.
for (auto slab_addr : atomicFrame.m_dirty_slabs) {
auto depth = m_atomic_stack.size() + 1;
std::vector<std::uint64_t> rollback_slab_addrs;
for (auto &slab_item : m_dirty_slabs) {
if (slab_item && slab_item->m_dirty_depth >= depth) {
rollback_slab_addrs.push_back(m_slab_address_func(slab_item->m_cap_item.m_slab_id));
}
}
for (auto slab_addr : rollback_slab_addrs) {
invalidateCachedSlab(slab_addr);
}
for (auto slab_addr : atomicFrame.m_volatile_slabs) {
Expand All @@ -445,11 +462,11 @@ namespace db0
m_dirty_slabs.erase(
std::remove_if(m_dirty_slabs.begin(), m_dirty_slabs.end(),
[](const std::shared_ptr<SlabItem> &slab_item) {
return !slab_item || !slab_item->m_is_dirty;
return !slab_item || (!slab_item->m_is_dirty && slab_item->m_dirty_depth == 0);
}),
m_dirty_slabs.end());

if (!atomicFrame.m_dirty_slabs.empty() || !atomicFrame.m_volatile_slabs.empty()) {
if (!rollback_slab_addrs.empty() || !atomicFrame.m_volatile_slabs.empty()) {
m_next_slab_id = {};
}
}
Expand Down Expand Up @@ -488,7 +505,16 @@ namespace db0
for (auto &slab_item : m_dirty_slabs) {
saveItem(*slab_item);
}
m_dirty_slabs.clear();
if (m_atomic_stack.empty()) {
m_dirty_slabs.clear();
} else {
m_dirty_slabs.erase(
std::remove_if(m_dirty_slabs.begin(), m_dirty_slabs.end(),
[](const std::shared_ptr<SlabItem> &slab_item) {
return !slab_item || (!slab_item->m_is_dirty && slab_item->m_dirty_depth == 0);
}),
m_dirty_slabs.end());
}
}

std::shared_ptr<SlabItem> SlabManager::tryOpenSlab(Address address) const
Expand Down Expand Up @@ -552,6 +578,7 @@ namespace db0
auto addr = m_slab_address_func(slab_id);
// clear the dirty flag since it's being erased anyway
slab->m_is_dirty = false;
slab->m_dirty_depth = 0;
// unregister from cache
auto it = m_slabs.find(addr);
if (it != m_slabs.end()) {
Expand Down Expand Up @@ -624,16 +651,15 @@ namespace db0
}

if (!unique || ((*slab)->tryMakeAddressUnique(*addr, instance_id))) {
// Modified slabs are tracked per atomic frame so rollback can evict
// only stale allocator instances from that frame.
if (!slab->m_is_dirty) {
markDirty(slab);
}
// Atomic rollback evicts only slabs dirtied at or below
// the canceled depth.
markDirty(slab, m_atomic_stack.size());
return addr;
}

// unable to make the address unique, schedule for deferred free and try again
// NOTE: the allocation is lost
markDirty(slab, m_atomic_stack.size());
deferredFree(*addr);
}
if (size > ((*slab)->getMaxAllocSize())) {
Expand Down Expand Up @@ -691,9 +717,7 @@ namespace db0
auto slab = find(slab_id);
assert(slab);
(*slab)->free(address);
if (!slab->m_is_dirty) {
markDirty(slab);
}
markDirty(slab, m_atomic_stack.size());
if ((*slab)->empty()) {
// erase or mark as erased
erase(slab);
Expand All @@ -708,10 +732,6 @@ namespace db0

std::size_t SlabManager::getAllocSize(Address address, std::uint32_t slab_id) const
{
if (m_deferred_free_ops.find(address) != m_deferred_free_ops.end()) {
THROWF(db0::BadAddressException) << "Address " << address << " not found (pending deferred free)";
}

assert(m_slab_id_func(address) == slab_id);
return (*find(slab_id))->getAllocSize(address);
}
Expand Down Expand Up @@ -774,10 +794,13 @@ namespace db0
assert(m_atomic_stack.empty());
// perform the deferred free operations
if (!m_deferred_free_ops.empty()) {
for (auto addr : m_deferred_free_ops) {
const_cast<SlabManager&>(*this)._free(addr);
}
auto free_ops = std::move(m_deferred_free_ops);
m_deferred_free_ops.clear();
for (auto addr : free_ops) {
if (const_cast<SlabManager&>(*this).isAllocated(addr, nullptr)) {
const_cast<SlabManager&>(*this)._free(addr);
}
}
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/dbzero/core/memory/SlabManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ namespace db0
// Find existing slab by ID
std::shared_ptr<SlabItem> tryFind(std::uint32_t slab_id) const;
std::shared_ptr<SlabItem> find(std::uint32_t slab_id) const;
void markDirty(std::shared_ptr<SlabItem>);
void markDirty(std::shared_ptr<SlabItem>, std::size_t depth = 0);
void invalidateCachedSlab(std::uint64_t);

/**
Expand Down Expand Up @@ -167,9 +167,6 @@ namespace db0
{
// Slabs created in this atomic block; removed from cache if the block is rolled back.
std::vector<std::uint64_t> m_volatile_slabs;
// Existing slabs whose in-memory allocator state was changed in this block.
// Rollback evicts only these slabs so they are reopened lazily from the restored prefix.
std::unordered_set<std::uint64_t> m_dirty_slabs;
// Frees requested in this atomic block; promoted on commit, discarded on rollback.
std::vector<Address> m_deferred_free_ops;
};
Expand Down
Loading