Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions include/binary_graph_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ class BinaryGraphStream_MT {
*
* IMPORTANT: When perfoming queries it is the responsibility of the user to ensure that
* all threads have finished processing their current work. This is indicated by all threads
* pulling data from the stream returning NXT_QUERY.
* If all threads do not return NXT_QUERY then not all updates have been applied to the graph.
* pulling data from the stream returning BREAKPOINT.
* If all threads do not return BREAKPOINT then not all updates have been applied to the graph.
* Threads must not call get_edge while the query is in progress otherwise edge cases can occur.
* This is true for both on_demand and registered queries.
*/
Expand Down Expand Up @@ -148,7 +148,7 @@ class BinaryGraphStream_MT {
uint64_t end_of_file; // the index of the end of the file
std::atomic<uint64_t> stream_off; // where do threads read from in the stream
std::atomic<uint64_t> query_index; // what is the index of the next query in bytes
std::atomic<bool> query_block; // If true block read_data calls and have thr return NXT_QUERY
std::atomic<bool> query_block; // If true block read_data calls and have thr return BREAKPOINT
const uint32_t edge_size = sizeof(uint8_t) + 2 * sizeof(uint32_t); // size of binary encoded edge
const size_t header_size = sizeof(node_id_t) + sizeof(edge_id_t); // size of num_nodes + num_upds

Expand Down Expand Up @@ -198,9 +198,7 @@ class MT_StreamReader {
// if we have read all the data in the buffer than refill it
if (buf - start_buf >= data_in_buf) {
if ((data_in_buf = stream.read_data(start_buf)) == 0) {
if (stream.query_index <= stream.stream_off)
return {{-1, -1}, NXT_QUERY}; // return that a query should be processed now
return {{-1, -1}, END_OF_FILE}; // return that the stream is over
return {{0, 0}, BREAKPOINT}; // return that a break point has been reached
}
buf = start_buf; // point buf back to beginning of data buffer
}
Expand Down
14 changes: 6 additions & 8 deletions include/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
// forward declarations
class GraphWorker;

typedef std::pair<Edge, UpdateType> GraphUpdate;

// Exceptions the Graph class may throw
class UpdateLockedException : public std::exception {
virtual const char* what() const throw() {
Expand Down Expand Up @@ -126,15 +124,15 @@ class Graph {

inline void update(GraphUpdate upd, int thr_id = 0) {
if (update_locked) throw UpdateLockedException();
Edge &edge = upd.first;
Edge &edge = upd.edge;

gts->insert(edge, thr_id);
std::swap(edge.first, edge.second);
gts->insert(edge, thr_id);
gts->insert({edge.src, edge.dst}, thr_id);
std::swap(edge.src, edge.dst);
gts->insert({edge.src, edge.dst}, thr_id);
#ifdef USE_EAGER_DSU
if (dsu_valid) {
auto src = std::min(edge.first, edge.second);
auto dst = std::max(edge.first, edge.second);
auto src = std::min(edge.src, edge.dst);
auto dst = std::max(edge.src, edge.dst);
std::lock_guard<std::mutex> sflock (spanning_forest_mtx[src]);
if (spanning_forest[src].find(dst) != spanning_forest[src].end()) {
dsu_valid = false;
Expand Down
2 changes: 0 additions & 2 deletions include/supernode.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

#include "l0_sampling/sketch.h"

typedef std::pair<node_id_t, node_id_t> Edge;

/**
* This interface implements the "supernode" so Boruvka can use it as a black
* box without needing to worry about implementing l_0.
Expand Down
19 changes: 14 additions & 5 deletions include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,20 @@ static const auto& vec_hash = XXH32;
static const auto& col_hash = XXH64;

// Is a stream update an insertion or a deletion
// NXT_QUERY: special type to indicate getting more data would cross a query boundary
// END_OF_FILE: special type to indicate that there is no more data in the stream
// BREAKPOINT: special type that indicates that a break point has been reached
// a break point may be either the end of the stream or the index of a query
enum UpdateType {
INSERT = 0,
DELETE = 1,
NXT_QUERY = 2,
END_OF_FILE = 3
};
BREAKPOINT = 2
};

struct Edge {
node_id_t src = 0;
node_id_t dst = 0;
};

struct GraphUpdate {
Edge edge;
UpdateType type;
};
10 changes: 6 additions & 4 deletions include/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#include <string>
#include <tuple>

#include "types.h"

/**
* Cast a double to unsigned long long with epsilon adjustment.
* @param d the double to cast.
Expand All @@ -16,27 +18,27 @@ unsigned long long int double_to_ull(double d, double epsilon = 0.00000001);
* identical inputs.
* @return i + j*(j-1)/2
*/
uint64_t nondirectional_non_self_edge_pairing_fn(uint32_t i, uint32_t j);
edge_id_t nondirectional_non_self_edge_pairing_fn(node_id_t i, node_id_t j);

/**
* Inverts the nondirectional non-SE pairing function.
* @param idx
* @return the pair, with left and right ordered lexicographically.
*/
std::pair<uint32_t , uint32_t> inv_nondir_non_self_edge_pairing_fn(uint64_t idx);
Edge inv_nondir_non_self_edge_pairing_fn(edge_id_t idx);

/**
* Concatenates two node ids to form an edge ids
* @return (i << 32) & j
*/
uint64_t concat_pairing_fn(uint32_t i, uint32_t j);
edge_id_t concat_pairing_fn(node_id_t i, node_id_t j);

/**
* Inverts the concat pairing function.
* @param idx
* @return the pair, with left and right ordered lexicographically.
*/
std::pair<uint32_t , uint32_t> inv_concat_pairing_fn(uint64_t idx);
Edge inv_concat_pairing_fn(edge_id_t idx);

/**
* Configures the system using the configuration file streaming.conf
Expand Down
13 changes: 5 additions & 8 deletions src/graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ inline std::vector<std::vector<node_id_t>> Graph::supernodes_to_merge(std::pair<
}

// query dsu
node_id_t a = get_parent(edge.first);
node_id_t b = get_parent(edge.second);
node_id_t a = get_parent(edge.src);
node_id_t b = get_parent(edge.dst);
if (a == b) continue;

#ifdef VERIFY_SAMPLES_F
Expand All @@ -197,8 +197,8 @@ inline std::vector<std::vector<node_id_t>> Graph::supernodes_to_merge(std::pair<
modified = true;

// Update spanning forest
auto src = std::min(edge.first, edge.second);
auto dst = std::max(edge.first, edge.second);
auto src = std::min(edge.src, edge.dst);
auto dst = std::max(edge.src, edge.dst);
spanning_forest[src].insert(dst);
}

Expand Down Expand Up @@ -358,10 +358,10 @@ std::vector<std::set<node_id_t>> Graph::connected_components(bool cont) {
}
#endif
auto retval = cc_from_dsu();
cc_alg_end = std::chrono::steady_clock::now();
#ifdef VERIFY_SAMPLES_F
verifier->verify_soln(retval);
#endif
cc_alg_end = std::chrono::steady_clock::now();
return retval;
}

Expand Down Expand Up @@ -415,9 +415,6 @@ std::vector<std::set<node_id_t>> Graph::cc_from_dsu() {
std::vector<std::set<node_id_t>> retval;
retval.reserve(temp.size());
for (const auto& it : temp) retval.push_back(it.second);
#ifdef VERIFY_SAMPLES_F
verifier->verify_soln(retval);
#endif
return retval;
}

Expand Down
20 changes: 10 additions & 10 deletions src/util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
typedef uint32_t ul;
typedef uint64_t ull;

const ull ULLMAX = std::numeric_limits<ull>::max();
const uint8_t num_bits = sizeof(node_id_t) * 8;
constexpr ull ULLMAX = std::numeric_limits<ull>::max();
constexpr uint8_t num_bits = sizeof(node_id_t) * 8;

unsigned long long int double_to_ull(double d, double epsilon) {
return (unsigned long long) (d + epsilon);
}

uint64_t nondirectional_non_self_edge_pairing_fn(uint32_t i, uint32_t j) {
edge_id_t nondirectional_non_self_edge_pairing_fn(node_id_t i, node_id_t j) {
// swap i,j if necessary
if (i > j) {
std::swap(i,j);
Expand All @@ -30,27 +30,27 @@ uint64_t nondirectional_non_self_edge_pairing_fn(uint32_t i, uint32_t j) {
return i+j;
}

std::pair<uint32_t , uint32_t> inv_nondir_non_self_edge_pairing_fn(uint64_t idx) {
Edge inv_nondir_non_self_edge_pairing_fn(uint64_t idx) {
// we ignore possible overflow
ull eidx = 8ull*idx + 1ull;
eidx = sqrt(eidx)+1ull;
eidx/=2ull;
ull i,j = (ull) eidx;
if ((j & 1ull) == 0ull) i = idx-(j>>1ull)*(j-1ull);
else i = idx-j*((j-1ull)>>1ull);
return {i, j};
return {(node_id_t)i, (node_id_t)j};
}

ull concat_pairing_fn(ul i, ul j) {
edge_id_t concat_pairing_fn(node_id_t i, node_id_t j) {
// swap i,j if necessary
if (i > j) {
std::swap(i,j);
}
return ((ull)i << num_bits) | j;
return ((edge_id_t)i << num_bits) | j;
}

std::pair<ul, ul> inv_concat_pairing_fn(ull idx) {
ul j = idx & 0xFFFFFFFF;
ul i = idx >> num_bits;
Edge inv_concat_pairing_fn(ull idx) {
node_id_t j = idx & 0xFFFFFFFF;
node_id_t i = idx >> num_bits;
return {i, j};
}
29 changes: 17 additions & 12 deletions test/graph_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ TEST_P(GraphTest, TestCorrectnessOnSmallRandomGraphs) {
edge_id_t m;
in >> n >> m;
Graph g{n, config};
int type, a, b;
int type;
node_id_t a, b;
while (m--) {
in >> type >> a >> b;
if (type == INSERT) {
Expand All @@ -138,7 +139,8 @@ TEST_P(GraphTest, TestCorrectnessOnSmallSparseGraphs) {
edge_id_t m;
in >> n >> m;
Graph g{n, config};
int type, a, b;
int type;
node_id_t a, b;
while (m--) {
in >> type >> a >> b;
if (type == INSERT) {
Expand All @@ -161,7 +163,8 @@ TEST_P(GraphTest, TestCorrectnessOfReheating) {
edge_id_t m;
in >> n >> m;
Graph *g = new Graph (n, config);
int type, a, b;
int type;
node_id_t a, b;
printf("number of updates = %lu\n", m);
while (m--) {
in >> type >> a >> b;
Expand Down Expand Up @@ -205,7 +208,8 @@ TEST_P(GraphTest, MultipleWorkers) {
edge_id_t m;
in >> n >> m;
Graph g{n, config};
int type, a, b;
int type;
node_id_t a, b;
while (m--) {
in >> type >> a >> b;
if (type == INSERT) {
Expand Down Expand Up @@ -417,7 +421,7 @@ TEST(GraphTest, MultipleInsertThreads) {
}

TEST(GraphTest, MTStreamWithMultipleQueries) {
for(int i = 1; i <= 10; i++) {
for(int i = 1; i <= 3; i++) {
auto config = GraphConfiguration().gutter_sys(STANDALONE);

const std::string fname = __FILE__;
Expand All @@ -441,7 +445,8 @@ TEST(GraphTest, MTStreamWithMultipleQueries) {
std::mutex q_lock;

// prepare evenly spaced queries
int num_queries = 10;
std::atomic<int> num_queries;
num_queries = 10;
int upd_per_query = num_edges / num_queries;
int query_idx = upd_per_query;
ASSERT_TRUE(stream.register_query(query_idx)); // register first query
Expand All @@ -452,8 +457,8 @@ TEST(GraphTest, MTStreamWithMultipleQueries) {
GraphUpdate upd;
while(true) {
upd = reader.get_edge();
if (upd.second == END_OF_FILE) return;
else if (upd.second == NXT_QUERY) {
if (upd.type == BREAKPOINT && num_queries == 0) return;
else if (upd.type == BREAKPOINT) {
query_done = false;
if (thr_id > 0) {
// pause this thread and wait for query to be done
Expand All @@ -479,7 +484,7 @@ TEST(GraphTest, MTStreamWithMultipleQueries) {
// add updates to verifier and perform query
for (int j = 0; j < upd_per_query; j++) {
GraphUpdate upd = verify_stream.get_edge();
verify.edge_update(upd.first.first, upd.first.second);
verify.edge_update(upd.edge.src, upd.edge.dst);
}
verify.reset_cc_state();
g.set_verifier(std::make_unique<MatGraphVerifier>(verify));
Expand All @@ -491,15 +496,15 @@ TEST(GraphTest, MTStreamWithMultipleQueries) {
// prepare next query
query_idx += upd_per_query;
ASSERT_TRUE(stream.register_query(query_idx));
num_queries--;
}
num_queries--;
num_query_ready--;
query_done = true;
lk.unlock();
q_done_cond.notify_all();
}
}
else if (upd.second == INSERT || upd.second == DELETE)
else if (upd.type == INSERT || upd.type == DELETE)
g.update(upd, thr_id);
else
throw std::invalid_argument("Did not recognize edge code!");
Expand All @@ -518,7 +523,7 @@ TEST(GraphTest, MTStreamWithMultipleQueries) {
// process the rest of the stream into the MatGraphVerifier
for(size_t i = query_idx; i < num_edges; i++) {
GraphUpdate upd = verify_stream.get_edge();
verify.edge_update(upd.first.first, upd.first.second);
verify.edge_update(upd.edge.src, upd.edge.dst);
}

// perform final query
Expand Down
Loading