Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 5 additions & 29 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,35 +20,11 @@ jobs:
steps:
- uses: actions/checkout@v2

- uses: actions/cache@v2
id: InstalledDependencyCache
with:
path: ${{runner.workspace}}/install
key: InstalledDependencyCache

- name: Install xxHash
if: steps.InstalledDependencyCache.outputs.cache-hit != 'true'
shell: bash
working-directory: ${{runner.workspace}}
run: |
git clone -b v0.8.0 https://github.com/Cyan4973/xxHash.git
cd xxHash
mkdir build
cd build
cmake -DCMAKE_INSTALL_PREFIX=${{runner.workspace}}/install ../cmake_unofficial
cmake --build . --target install

- name: Install GTest
if: steps.InstalledDependencyCache.outputs.cache-hit != 'true'
shell: bash
working-directory: ${{runner.workspace}}
run: |
git clone -b release-1.10.0 https://github.com/google/googletest.git
cd googletest
mkdir build
cd build
cmake -DCMAKE_INSTALL_PREFIX=${{runner.workspace}}/install ..
cmake --build . --target install
# - uses: actions/cache@v2
# id: InstalledDependencyCache
# with:
# path: ${{runner.workspace}}/install
# key: InstalledDependencyCache

- name: Create Build Environment
# Some projects don't allow in-source building, so create a separate build directory
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ int main() {
auto CC = g.connected_components(); // Extract the connected components in the graph defined by the stream
}
```
A more detailed example can be found in `tools/process_stream.cpp`.

### Binary Stream Format
GraphZeppelin uses a binary stream format for efficient file parsing. The format of these files is as follows.
Expand All @@ -53,9 +54,9 @@ Other file formats can be used by writing a simple file parser that passes graph
If receiving edge updates over the network it is equally straightforward to define a stream format that will receive, parse, and provide those updates to the graph `update()` function.

## Configuration
GraphZeppelin has a few parameters set via a configuration file. These include the number of CPU threads to use, and which datastructure to buffer updates in. The file `example_streaming.conf` gives an example of one such configuration and provides explanations of the various parameters.
GraphZeppelin has a number of parameters. These can be defined with the `GraphConfiguration` object. Key parameters include the number of graph workers and the guttering system to use for buffering updates.

To define your own configuration, copy `example_streaming.conf` into the `build` directory as `streaming.conf`. If using GraphZeppelin as an external library the process for defining the configuration is the same. Once you make changes to the configuration, you should see them reflected in the configuration displayed at the beginning of the program.
See `include/graph_configuration.h` for more details.

## Debugging
You can enable the symbol table and turn off compiler optimizations for debugging with tools like `gdb` or `valgrind` by performing the following steps
Expand Down
10 changes: 4 additions & 6 deletions include/binary_graph_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ class BinaryGraphStream_MT {
*
* IMPORTANT: When perfoming queries it is the responsibility of the user to ensure that
* all threads have finished processing their current work. This is indicated by all threads
* pulling data from the stream returning NXT_QUERY.
* If all threads do not return NXT_QUERY then not all updates have been applied to the graph.
* pulling data from the stream returning BREAKPOINT.
* If all threads do not return BREAKPOINT then not all updates have been applied to the graph.
* Threads must not call get_edge while the query is in progress otherwise edge cases can occur.
* This is true for both on_demand and registered queries.
*/
Expand Down Expand Up @@ -148,7 +148,7 @@ class BinaryGraphStream_MT {
uint64_t end_of_file; // the index of the end of the file
std::atomic<uint64_t> stream_off; // where do threads read from in the stream
std::atomic<uint64_t> query_index; // what is the index of the next query in bytes
std::atomic<bool> query_block; // If true block read_data calls and have thr return NXT_QUERY
std::atomic<bool> query_block; // If true block read_data calls and have thr return BREAKPOINT
const uint32_t edge_size = sizeof(uint8_t) + 2 * sizeof(uint32_t); // size of binary encoded edge
const size_t header_size = sizeof(node_id_t) + sizeof(edge_id_t); // size of num_nodes + num_upds

Expand Down Expand Up @@ -198,9 +198,7 @@ class MT_StreamReader {
// if we have read all the data in the buffer than refill it
if (buf - start_buf >= data_in_buf) {
if ((data_in_buf = stream.read_data(start_buf)) == 0) {
if (stream.query_index <= stream.stream_off)
return {{-1, -1}, NXT_QUERY}; // return that a query should be processed now
return {{-1, -1}, END_OF_FILE}; // return that the stream is over
return {{0, 0}, BREAKPOINT}; // return that a break point has been reached
}
buf = start_buf; // point buf back to beginning of data buffer
}
Expand Down
101 changes: 75 additions & 26 deletions include/dsu.h
Original file line number Diff line number Diff line change
@@ -1,43 +1,92 @@
#pragma once
#include <vector>
#include <atomic>

template<class T>
struct DSUMergeRet {
bool merged; // true if a merge actually occured
T root; // new root
T child; // new child of root
};

template <class T>
class DisjointSetUnion {
std::vector<T> parent;
std::vector<T> size;
public:
DisjointSetUnion();
DisjointSetUnion(T n);
DisjointSetUnion(T n) : parent(n), size(n, 1) {
for (T i = 0; i < n; i++) {
parent[i] = i;
}
}

inline T find_root(T u) {
while(parent[parent[u]] != u) {
parent[u] = parent[parent[u]];
u = parent[u];
}
return u;
}

void link(T i, T j);
T find_set(T i);
void union_set(T i, T j);
inline DSUMergeRet<T> merge(T u, T v) {
T a = find_root(u);
T b = find_root(v);
if (a == b) return {false, 0, 0};

if (size[a] < size[b]) std::swap(a,b);
parent[b] = a;
size[a] += size[b];
return {true, a, b};
}

inline void reset() {
for (T i = 0; i < parent.size(); i++) {
parent[i] = i;
size[i] = 1;
}
}
};

// Disjoint set union that uses atomics to be thread safe
// thus is a little slower for single threaded use cases
template <class T>
DisjointSetUnion<T>::DisjointSetUnion() {}
class MT_DisjoinSetUnion {
private:
std::vector<std::atomic<T>> parent;
std::vector<T> size;
public:
MT_DisjoinSetUnion(T n) : parent(n), size(n, 1) {
for (T i = 0; i < n; i++)
parent[i] = i;
}

template <class T>
DisjointSetUnion<T>::DisjointSetUnion(T n) : parent(n), size(n, 1) {
for (T i = 0; i < n; i++) {
parent[i] = i;
inline T find_root(T u) {
while (parent[parent[u]] != u) {
parent[u] = parent[parent[u]];
u = parent[u];
}
return u;
}
}

template <class T>
void DisjointSetUnion<T>::link(T i, T j) {
if (size[i] < size[j]) std::swap(i,j);
parent[j] = i;
size[i] += size[j];
}
// use CAS in this function to allow for simultaneous merge calls
inline DSUMergeRet<T> merge(T u, T v) {
while ((u = find_root(u)) != (v = find_root(v))) {
if (size[u] < size[v])
std::swap(u, v);

template <class T>
T DisjointSetUnion<T>::find_set(T i) {
if (parent[i] == i) return i;
return parent[i] = find_set(parent[i]);
}
// if parent of b has not been modified by another thread -> replace with a
if (std::atomic_compare_exchange_weak(&parent[u], &v, u)) {
size[u] += size[v];
return {true, u, v};
}
}
return {false, 0, 0};
}

template <class T>
void DisjointSetUnion<T>::union_set(T i, T j) {
link(find_set(i), find_set(j));
}
inline void reset() {
for (T i = 0; i < parent.size(); i++) {
parent[i] = i;
size[i] = 1;
}
}
};
16 changes: 8 additions & 8 deletions include/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
// forward declarations
class GraphWorker;

typedef std::pair<Edge, UpdateType> GraphUpdate;

// Exceptions the Graph class may throw
class UpdateLockedException : public std::exception {
virtual const char* what() const throw() {
Expand Down Expand Up @@ -126,15 +124,15 @@ class Graph {

inline void update(GraphUpdate upd, int thr_id = 0) {
if (update_locked) throw UpdateLockedException();
Edge &edge = upd.first;
Edge &edge = upd.edge;

gts->insert(edge, thr_id);
std::swap(edge.first, edge.second);
gts->insert(edge, thr_id);
gts->insert({edge.src, edge.dst}, thr_id);
std::swap(edge.src, edge.dst);
gts->insert({edge.src, edge.dst}, thr_id);
#ifdef USE_EAGER_DSU
if (dsu_valid) {
auto src = std::min(edge.first, edge.second);
auto dst = std::max(edge.first, edge.second);
auto src = std::min(edge.src, edge.dst);
auto dst = std::max(edge.src, edge.dst);
std::lock_guard<std::mutex> sflock (spanning_forest_mtx[src]);
if (spanning_forest[src].find(dst) != spanning_forest[src].end()) {
dsu_valid = false;
Expand All @@ -152,6 +150,8 @@ class Graph {
}
}
}
#else
dsu_valid = false;
#endif // USE_EAGER_DSU
}

Expand Down
3 changes: 3 additions & 0 deletions include/l0_sampling/sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ class Sketch {

inline static size_t sketchSizeof()
{ return sizeof(Sketch) + num_elems * (sizeof(vec_t) + sizeof(vec_hash_t)) - sizeof(char); }

inline static size_t serialized_size()
{ return num_elems * (sizeof(vec_t) + sizeof(vec_hash_t)); }

inline static vec_t get_failure_factor()
{ return failure_factor; }
Expand Down
11 changes: 8 additions & 3 deletions include/supernode.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@

#include "l0_sampling/sketch.h"

typedef std::pair<node_id_t, node_id_t> Edge;

/**
* This interface implements the "supernode" so Boruvka can use it as a black
* box without needing to worry about implementing l_0.
*/
class Supernode {
// the size of a super-node in bytes including the all sketches off the end
static size_t bytes_size;
static size_t serialized_size; // the size of a supernode that has been serialized
int idx;
int num_sketches;
std::mutex node_mt;
Expand Down Expand Up @@ -82,13 +81,19 @@ class Supernode {

static inline void configure(uint64_t n, vec_t sketch_fail_factor=100) {
Sketch::configure(n*n, sketch_fail_factor);
bytes_size = sizeof(Supernode) + log2(n)/(log2(3)-1) * Sketch::sketchSizeof() - sizeof(char);
bytes_size = sizeof(Supernode) + size_t(log2(n)/(log2(3)-1)) * Sketch::sketchSizeof() - sizeof(char);
serialized_size = size_t(log2(n)/(log2(3)-1)) * Sketch::serialized_size();
}

static inline size_t get_size() {
return bytes_size;
}

// return the size of a supernode that has been serialized using write_binary()
static inline size_t get_serialized_size() {
return serialized_size;
}

inline size_t get_sketch_size() {
return sketch_size;
}
Expand Down
3 changes: 1 addition & 2 deletions include/test/file_graph_verifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
class FileGraphVerifier : public GraphVerifier {
std::vector<std::set<node_id_t>> kruskal_ref;
std::vector<std::set<node_id_t>> boruvka_cc;
std::vector<std::set<node_id_t>> det_graph;
DisjointSetUnion<node_id_t> sets;

public:
FileGraphVerifier(const std::string& input_file = "./cumul_sample.txt");
FileGraphVerifier(node_id_t n, const std::string& input_file);

void verify_edge(Edge edge);
void verify_cc(node_id_t node);
Expand Down
20 changes: 16 additions & 4 deletions include/test/graph_verifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
* generates.
*/
class GraphVerifier {
protected:
std::vector<std::vector<bool>> adj_matrix;

public:
/**
* Verifies an edge exists in the graph. Verifies that the edge is in the cut
Expand All @@ -32,19 +35,28 @@ class GraphVerifier {
*/
virtual void verify_soln(std::vector<std::set<node_id_t>> &retval) = 0;

std::vector<std::vector<bool>> extract_adj_matrix() {return adj_matrix;}

GraphVerifier() = default;
GraphVerifier(std::vector<std::vector<bool>> _adj) : adj_matrix(std::move(_adj)) {};

virtual ~GraphVerifier() {};
};

class BadEdgeException : public std::exception {
virtual const char* what() const throw() {
return "The edge is not in the cut of the sample! (standard)";
return "The edge is not in the cut of the sample!";
}
};


class NotCCException : public std::exception {
virtual const char* what() const throw() {
return "The supernode is not a connected component. It has edges in its "
"cut!";
return "The supernode is not a connected component. It has edges in its cut!";
}
};

class IncorrectCCException : public std::exception {
virtual const char* what() const throw() {
return "The connected components are incorrect!";
}
};
9 changes: 6 additions & 3 deletions include/test/mat_graph_verifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@
class MatGraphVerifier : public GraphVerifier {
std::vector<std::set<node_id_t>> kruskal_ref;
std::vector<std::set<node_id_t>> boruvka_cc;
std::vector<std::vector<bool>> adj_graph;
DisjointSetUnion<node_id_t> sets;

node_id_t n;
DisjointSetUnion<node_id_t> sets;

/**
* Runs Kruskal's (deterministic) CC algo.
Expand All @@ -25,11 +24,15 @@ class MatGraphVerifier : public GraphVerifier {
std::vector<std::set<node_id_t>> kruskal();
public:
MatGraphVerifier(node_id_t n);

// When we want to build a MatGraphVerifier without iterative edge_updates
MatGraphVerifier(node_id_t n, std::vector<std::vector<bool>> _adj)
: GraphVerifier(_adj), n(n), sets(n) { reset_cc_state(); };

void reset_cc_state(); // run this function before using as a verifier in CC
void edge_update(node_id_t src, node_id_t dst);

void verify_edge(Edge edge);
void verify_cc(node_id_t node);
void verify_soln(std::vector<std::set<node_id_t>> &retval);
};
};
Loading