Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions include/graph_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class GraphConfiguration {
// How many OMP threads each graph worker uses
size_t _group_size = 1;

// Option to create more sketches than for standard connected components
// Ex factor of 1.5, 1.5 times the sketches
// factor of 1, normal quantity of sketches
double _adtl_skts_factor = 1;

// Configuration for the guttering system
GutteringConfiguration _gutter_conf;

Expand All @@ -49,6 +54,8 @@ class GraphConfiguration {

GraphConfiguration& group_size(size_t group_size);

GraphConfiguration& adtl_skts_factor(double factor);

GutteringConfiguration& gutter_conf();

friend std::ostream& operator<< (std::ostream &out, const GraphConfiguration &conf);
Expand Down
7 changes: 4 additions & 3 deletions include/supernode.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,10 @@ class Supernode {

~Supernode();

static inline void configure(uint64_t n, vec_t sketch_fail_factor=default_fail_factor) {
Sketch::configure(n*n, sketch_fail_factor);
max_sketches = log2(n)/(log2(3)-1);
static inline void configure(uint64_t n, vec_t sketch_fail_factor = default_fail_factor,
double skt_factor = 1) {
Sketch::configure(n * n, sketch_fail_factor);
max_sketches = log2(n) / (log2(3) - 1) * skt_factor;
bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof();
serialized_size = max_sketches * Sketch::serialized_size();
}
Expand Down
12 changes: 9 additions & 3 deletions src/graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Graph::Graph(node_id_t num_nodes, GraphConfiguration config, int num_inserters)
#ifdef VERIFY_SAMPLES_F
std::cout << "Verifying samples..." << std::endl;
#endif
Supernode::configure(num_nodes);
Supernode::configure(num_nodes, Supernode::default_fail_factor, config._adtl_skts_factor);
representatives = new std::set<node_id_t>();
supernodes = new Supernode*[num_nodes];
parent = new std::remove_reference<decltype(*parent)>::type[num_nodes];
Expand Down Expand Up @@ -59,11 +59,13 @@ Graph::Graph(const std::string& input_file, GraphConfiguration config, int num_i
if (open_graph) throw MultipleGraphsException();

vec_t sketch_fail_factor;
double adtl_skts_factor;
auto binary_in = std::fstream(input_file, std::ios::in | std::ios::binary);
binary_in.read((char*)&seed, sizeof(seed));
binary_in.read((char*)&num_nodes, sizeof(num_nodes));
binary_in.read((char*)&sketch_fail_factor, sizeof(sketch_fail_factor));
Supernode::configure(num_nodes, sketch_fail_factor);
binary_in.read((char*)&adtl_skts_factor, sizeof(adtl_skts_factor));
Supernode::configure(num_nodes, sketch_fail_factor, adtl_skts_factor);

#ifdef VERIFY_SAMPLES_F
std::cout << "Verifying samples..." << std::endl;
Expand Down Expand Up @@ -213,7 +215,7 @@ inline std::vector<std::vector<node_id_t>> Graph::supernodes_to_merge(
return to_merge;
}

inline void Graph::merge_supernodes(Supernode** copy_supernodes, std::vector<node_id_t> &new_reps,
void Graph::merge_supernodes(Supernode** copy_supernodes, std::vector<node_id_t> &new_reps,
std::vector<std::vector<node_id_t>> &to_merge, bool make_copy) {
bool except = false;
std::exception_ptr err;
Expand Down Expand Up @@ -280,6 +282,7 @@ std::vector<std::set<node_id_t>> Graph::boruvka_emulation(bool make_copy) {
parent[i] = i;
spanning_forest[i].clear();
}
size_t round_num = 1;
try {
do {
modified = false;
Expand All @@ -297,6 +300,7 @@ std::vector<std::set<node_id_t>> Graph::boruvka_emulation(bool make_copy) {
if (!first_round && fail_round_2) throw OutOfQueriesException();
#endif
first_round = false;
++round_num;
} while (modified);
} catch (...) {
cleanup_copy();
Expand All @@ -307,6 +311,7 @@ std::vector<std::set<node_id_t>> Graph::boruvka_emulation(bool make_copy) {
delete[] query;
dsu_valid = true;

std::cout << "Query complete in " << round_num << " rounds." << std::endl;
auto retval = cc_from_dsu();
cc_alg_end = std::chrono::steady_clock::now();
return retval;
Expand Down Expand Up @@ -482,6 +487,7 @@ void Graph::write_binary(const std::string& filename) {
binary_out.write((char*)&seed, sizeof(seed));
binary_out.write((char*)&num_nodes, sizeof(num_nodes));
binary_out.write((char*)&fail_factor, sizeof(fail_factor));
binary_out.write((char*)&config._adtl_skts_factor, sizeof(config._adtl_skts_factor));
for (node_id_t i = 0; i < num_nodes; ++i) {
supernodes[i]->write_binary(binary_out);
}
Expand Down
19 changes: 17 additions & 2 deletions src/graph_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ GraphConfiguration& GraphConfiguration::backup_in_mem(bool backup_in_mem) {
GraphConfiguration& GraphConfiguration::num_groups(size_t num_groups) {
_num_groups = num_groups;
if (_num_groups < 1) {
std::cout << "num_groups="<< _num_groups << " is out of bounds. "
std::cout << "num_groups="<< _num_groups << " is out of bounds. [1, infty)"
<< "Defaulting to 1." << std::endl;
_num_groups = 1;
}
Expand All @@ -30,13 +30,28 @@ GraphConfiguration& GraphConfiguration::num_groups(size_t num_groups) {
GraphConfiguration& GraphConfiguration::group_size(size_t group_size) {
_group_size = group_size;
if (_group_size < 1) {
std::cout << "group_size="<< _group_size << " is out of bounds. "
std::cout << "group_size="<< _group_size << " is out of bounds. [1, infty)"
<< "Defaulting to 1." << std::endl;
_group_size = 1;
}
return *this;
}

GraphConfiguration& GraphConfiguration::adtl_skts_factor(double factor) {
_adtl_skts_factor = factor;
if (_adtl_skts_factor <= 1) {
std::cout << "adtl_skts_factor=" << _adtl_skts_factor << " is out of bounds. [1, infty)"
<< "Defaulting to 1." << std::endl;
_adtl_skts_factor = 1;
}
if (_adtl_skts_factor > 1) {
std::cerr << "WARNING: Your graph configuration specifies using a factor " << _adtl_skts_factor
<< " more memory than normal." << std::endl;
std::cerr << " Is this intentional? If not, set adtl_skts_factor to one" << std::endl;
}
return *this;
}

GutteringConfiguration& GraphConfiguration::gutter_conf() {
return _gutter_conf;
}
Expand Down
10 changes: 9 additions & 1 deletion tools/process_stream.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
#include <graph.h>
#include <binary_graph_stream.h>
#include <thread>
#include <sys/resource.h> // for rusage

static bool shutdown = false;

static double get_max_mem_used() {
struct rusage data;
getrusage(RUSAGE_SELF, &data);
return (double) data.ru_maxrss / 1024.0;
}

/*
* Function which is run in a seperate thread and will query
* the graph for the number of updates it has processed
Expand All @@ -12,7 +19,7 @@ static bool shutdown = false;
* @param start_time the time that we started stream ingestion
*/
void track_insertions(uint64_t total, Graph *g, std::chrono::steady_clock::time_point start_time) {
total = total * 2; // we insert 2 edge updates per edge
total = total * 2; // we insert 2 edge updates per edge

printf("Insertions\n");
printf("Progress: | 0%%\r"); fflush(stdout);
Expand Down Expand Up @@ -117,4 +124,5 @@ int main(int argc, char **argv) {
std::cout << " Flush Gutters(sec): " << flush_time.count() << std::endl;
std::cout << " Boruvka's Algorithm(sec): " << cc_alg_time.count() << std::endl;
std::cout << "Connected Components: " << CC_num << std::endl;
std::cout << "Maximum Memory Usage(MiB): " << get_max_mem_used() << std::endl;
}