Skip to content
6 changes: 4 additions & 2 deletions src/common/types/common_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ template <typename T> using coro = boost::asio::traced_awaitable<T>;
inline coro<void> async_noop() { co_return; };

template <typename T> struct is_boost_awaitable : std::false_type {};

template <typename T>
struct is_boost_awaitable<boost::asio::awaitable<T>> : std::true_type {};

template <typename T>
constexpr bool is_boost_awaitable_v = is_boost_awaitable<T>::value;

template <typename T> struct is_coro : std::false_type {};
template <typename T> struct is_coro<coro<T>> : std::true_type {};
template <typename T> inline constexpr bool is_coro_v = is_coro<T>::value;

template <typename Awaitable>
requires is_boost_awaitable_v<std::decay_t<Awaitable>>
inline coro<void> async_wrap(Awaitable&& v) {
Expand Down
51 changes: 34 additions & 17 deletions src/entrypoint/commands/s3/delete_objects.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,15 @@ response get_response(const std::vector<std::string>& success,

} // namespace

coro<response> delete_objects::handle(request& req) {
metric<entrypoint_delete_objects_req>::increase(1);

co_await m_dir.bucket_exists(req.bucket());

coro<std::vector<delete_objects::delete_target>>
delete_objects::get_delete_object_keys(request& req) {
LOG_DEBUG() << req.peer() << ": delete_objects::handle(): content-length: "
<< req.content_length();

std::string buffer = co_await copy_to_buffer(req.body());

LOG_DEBUG() << req.peer() << ": delete_objects::handle(): request XML: "
<< buffer;
LOG_DEBUG() << req.peer()
<< ": delete_objects::handle(): request XML: " << buffer;

xml_parser xml_parser;
bool parsed = xml_parser.parse(buffer);
Expand All @@ -73,30 +70,50 @@ coro<response> delete_objects::handle(request& req) {
throw command_exception(status::bad_request, "MalformedXML",
"XML is invalid.");

auto bucket_id = req.bucket();
std::vector<std::string> success;
std::vector<fail> failure;
std::vector<delete_target> targets;
for (const auto& obj : object_nodes) {
auto key = obj.get().get_optional<std::string>("Key");
if (!key) {
const auto& pt = obj.get();
auto key = pt.get_optional<std::string>("Key");
if (!key.has_value()) {
throw command_exception(status::bad_request, "MalformedXML",
"XML is invalid.");
}
targets.emplace_back(*key, //
pt.get_optional<std::string>("VersionId"),
pt.get_optional<std::string>("ETag"),
pt.get_optional<std::string>("LastModifiedTime"),
pt.get_optional<long>("Size"));
}
co_return targets;
}

coro<response> delete_objects::handle(request& req) {
metric<entrypoint_delete_objects_req>::increase(1);

co_await m_dir.bucket_exists(req.bucket());

auto targets = co_await get_delete_object_keys(req);

auto bucket_id = req.bucket();
std::vector<std::string> success;
std::vector<fail> failure;
for (const auto& t : targets) {
auto& key = t.key;

try {
LOG_DEBUG() << req.peer() << ": delete_objects::handle(): deleting "
<< *key;
<< key;

std::optional<std::string> ver;
auto boostver = obj.get().get_optional<std::string>("VersionId");
auto boostver = t.version;
if (boostver) {
ver = *boostver;
}
co_await m_dir.delete_object(req.bucket(), *key, ver);
success.emplace_back(*key);
co_await m_dir.delete_object(req.bucket(), key, ver);
success.emplace_back(key);

} catch (const std::exception& e) {
failure.emplace_back(*key, e.what());
failure.emplace_back(key, e.what());
}
}

Expand Down
20 changes: 17 additions & 3 deletions src/entrypoint/commands/s3/delete_objects.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#pragma once

#include "entrypoint/directory.h"
#include "entrypoint/limits.h"
#include "storage/global/data_view.h"
#include <entrypoint/commands/command.h>
#include <entrypoint/directory.h>
#include <entrypoint/limits.h>

#include <storage/global/data_view.h>

#include <common/utils/xml_parser.h>

namespace uh::cluster {

Expand All @@ -17,6 +20,17 @@ class delete_objects : public command {

std::string action_id() const override;

struct delete_target {
std::string key;
boost::optional<std::string> version;
boost::optional<std::string> etag;
boost::optional<std::string> last_modified;
boost::optional<long> size;
};

static coro<std::vector<delete_target>>
get_delete_object_keys(ep::http::request& req);

private:
directory& m_dir;
static constexpr std::size_t MAXIMUM_DELETE_KEYS = 1000;
Expand Down
5 changes: 0 additions & 5 deletions src/proxy/cache/disk/disk_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@
#pragma once

#include <proxy/cache/disk/object.h>
#include <proxy/cache/disk/utils.h>

#include <common/crypto/hash.h>
#include <common/types/common_types.h>
#include <common/utils/strings.h>
#include <entrypoint/http/body.h>
#include <entrypoint/http/stream.h>
#include <storage/interfaces/data_view.h>

namespace uh::cluster::proxy::cache::disk {
Expand Down
171 changes: 0 additions & 171 deletions src/proxy/cache/disk/handler_example.cpp

This file was deleted.

24 changes: 16 additions & 8 deletions src/proxy/cache/disk/manager.h
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
#pragma once

#include <proxy/cache/disk/deletion_queue.h>
#include <proxy/cache/disk/disk_io.h>
#include <proxy/cache/disk/object.h>

#include <proxy/cache/lfu_cache.h>
#include <proxy/cache/lru_cache.h>

#include <common/coroutines/coro_util.h>
#include <entrypoint/http/body.h>
#include <entrypoint/http/stream.h>
#include <storage/global/data_view.h>

#include <memory>
#include <string>

#include <iostream>

namespace uh::cluster::proxy::cache::disk {

class manager {
Expand All @@ -27,8 +27,7 @@ class manager {
using stream = ep::http::stream;
using body = ep::http::body;

coro<void> put(object_metadata key, disk_sink& w) {
auto objh = w.get_object_handle();
coro<void> put(object_metadata key, object_handle objh) {
auto obj_size = objh.data_size();

auto total_size =
Expand Down Expand Up @@ -58,15 +57,24 @@ class manager {
if (p_prev) {
m_deletion_queue.push(std::move(p_prev));
}
std::cout << "Total size after put: " << m_current_size << std::endl;
LOG_INFO() << "Total size after put: " << m_current_size;
}

std::unique_ptr<disk_source> get(object_metadata key) {
std::shared_ptr<object_handle> get(object_metadata key) {
auto entry = m_cache->get(key);
if (!entry) {
return nullptr;
}
return std::make_unique<disk_source>(m_storage, std::move(entry));
return entry;
}

void remove(object_metadata key) {
auto entry = m_cache->remove(key);
if (entry) {
LOG_INFO() << "key: " << key.path << ", version: " << key.version
<< " removed from cache";
m_deletion_queue.push(std::move(entry));
}
}

static manager create(boost::asio::io_context& ioc, data_view& storage,
Expand Down
Loading