From 5b7e64008df3789afe4c5b20d296cfb74f24b7a6 Mon Sep 17 00:00:00 2001 From: Eric Hein Date: Fri, 15 Apr 2016 10:35:20 -0400 Subject: [PATCH] Added BFS implementations to stinger_alg. --- lib/stinger_alg/CMakeLists.txt | 2 + lib/stinger_alg/inc/bfs.h | 30 +++++++ lib/stinger_alg/src/bfs.c | 156 +++++++++++++++++++++++++++++++++ 3 files changed, 188 insertions(+) create mode 100644 lib/stinger_alg/inc/bfs.h create mode 100644 lib/stinger_alg/src/bfs.c diff --git a/lib/stinger_alg/CMakeLists.txt b/lib/stinger_alg/CMakeLists.txt index 1583a5b7..dd80e228 100644 --- a/lib/stinger_alg/CMakeLists.txt +++ b/lib/stinger_alg/CMakeLists.txt @@ -1,6 +1,7 @@ set(sources src/adamic_adar.c src/betweenness.c + src/bfs.c src/clustering.c src/kcore.c src/pagerank.c @@ -13,6 +14,7 @@ set(sources set(headers inc/adamic_adar.h inc/betweenness.h + inc/bfs.h inc/clustering.h inc/kcore.h inc/pagerank.h diff --git a/lib/stinger_alg/inc/bfs.h b/lib/stinger_alg/inc/bfs.h new file mode 100644 index 00000000..394b7a65 --- /dev/null +++ b/lib/stinger_alg/inc/bfs.h @@ -0,0 +1,30 @@ +#ifndef STINGER_BFS_H_ +#define STINGER_BFS_H_ + +#include "stinger_core/stinger.h" +#include "stinger_core/xmalloc.h" +#include "stinger_core/stinger_error.h" + +int64_t +parallel_breadth_first_search ( + struct stinger * S, + int64_t nv, + int64_t source, + int64_t * marks, + int64_t * queue, + int64_t * Qhead, + int64_t * level +); + +int64_t +direction_optimizing_parallel_breadth_first_search ( + struct stinger * S, + int64_t nv, + int64_t source, + int64_t * marks, + int64_t * queue, + int64_t * Qhead, + int64_t * level +); + +#endif diff --git a/lib/stinger_alg/src/bfs.c b/lib/stinger_alg/src/bfs.c new file mode 100644 index 00000000..23384cb3 --- /dev/null +++ b/lib/stinger_alg/src/bfs.c @@ -0,0 +1,156 @@ +#include "bfs.h" + +#include "stinger_core/x86_full_empty.h" + +int64_t +parallel_breadth_first_search (struct stinger * S, int64_t nv, + int64_t source, int64_t * marks, + int64_t * queue, int64_t * Qhead, int64_t * level) +{ + for (int64_t i = 0; i < nv; i++) { + level[i] = -1; + marks[i] = 0; + } + + int64_t nQ, Qnext, Qstart, Qend; + /* initialize */ + queue[0] = source; + level[source] = 0; + marks[source] = 1; + Qnext = 1; /* next open slot in the queue */ + nQ = 1; /* level we are currently processing */ + Qhead[0] = 0; /* beginning of the current frontier */ + Qhead[1] = 1; /* end of the current frontier */ + + Qstart = Qhead[nQ-1]; + Qend = Qhead[nQ]; + + while (Qstart != Qend) { + OMP ("omp parallel for") + for (int64_t j = Qstart; j < Qend; j++) { + STINGER_FORALL_OUT_EDGES_OF_VTX_BEGIN (S, queue[j]) { + int64_t d = level[STINGER_EDGE_DEST]; + if (d < 0) { + if (stinger_int64_fetch_add (&marks[STINGER_EDGE_DEST], 1) == 0) { + level[STINGER_EDGE_DEST] = nQ; + + int64_t mine = stinger_int64_fetch_add(&Qnext, 1); + queue[mine] = STINGER_EDGE_DEST; + } + } + } STINGER_FORALL_OUT_EDGES_OF_VTX_END(); + } + + Qstart = Qhead[nQ-1]; + Qend = Qnext; + Qhead[nQ++] = Qend; + } + + return nQ; +} + +int64_t +direction_optimizing_parallel_breadth_first_search (struct stinger * S, int64_t nv, + int64_t source, int64_t * marks, + int64_t * queue, int64_t * Qhead, int64_t * level) +{ + for (int64_t i = 0; i < nv; i++) { + level[i] = -1; + marks[i] = 0; + } + + int64_t nQ, Qnext, Qstart, Qend; + int64_t mf = 0; /* number of edges incident on frontier */ + int64_t nf = 0; /* number of vertices in frontier */ + int64_t mu = 0; /* number of edges incident on unexplored vertices */ + int64_t nf_last = 0; /* size of previous frontier */ + + /* initialize */ + queue[0] = source; + level[source] = 0; + marks[source] = 1; + Qnext = 1; /* next open slot in the queue */ + nQ = 1; /* level we are currently processing */ + Qhead[0] = 0; /* beginning of the current frontier */ + Qhead[1] = 1; /* end of the current frontier */ + + Qstart = Qhead[nQ-1]; + Qend = Qhead[nQ]; + + mf = 0; + nf = 0; + mu = stinger_total_edges(S); + //printf("mf: %ld, nf: %ld, mu: %ld, nf_last: %ld\n", mf, nf, mu, nf_last); + + int64_t top_down = 1; + + while (Qstart != Qend) { + if (top_down) { + /* forward (top down) traversal */ + OMP ("omp parallel for") + for (int64_t j = Qstart; j < Qend; j++) { + STINGER_FORALL_OUT_EDGES_OF_VTX_BEGIN (S, queue[j]) { + int64_t d = level[STINGER_EDGE_DEST]; + if (d < 0) { + if (stinger_int64_fetch_add (&marks[STINGER_EDGE_DEST], 1) == 0) { + level[STINGER_EDGE_DEST] = nQ; + + int64_t mine = stinger_int64_fetch_add(&Qnext, 1); + queue[mine] = STINGER_EDGE_DEST; + } + } + } STINGER_FORALL_OUT_EDGES_OF_VTX_END(); + } + } else { + /* reverse (bottom up) traversal */ + OMP ("omp parallel for") + for (int64_t i = 0; i < nv; i++) { + int64_t done = 0; + /* only process unvisited vertices */ + if (!marks[i]) { + STINGER_FORALL_OUT_EDGES_OF_VTX_BEGIN (S, i) { + /* neighbor has been visited */ + if (!done && level[STINGER_EDGE_DEST] == nQ-1) { + level[i] = nQ; + marks[i] = 1; + + /* if we don't queue here, we cannot restart the forward search */ + int64_t mine = stinger_int64_fetch_add(&Qnext, 1); + queue[mine] = i; + done = 1; + } + } STINGER_FORALL_OUT_EDGES_OF_VTX_END(); + } + } + } + + Qstart = Qhead[nQ-1]; + Qend = Qnext; + Qhead[nQ++] = Qend; + + /* calculate new transition parameters */ + nf_last = nf; + nf = Qend - Qstart; + mf = 0; + OMP ("omp parallel for reduction(+:mf) reduction(-:mu)") + for (int64_t j = Qstart; j < Qend; j++) { + int64_t deg_j = stinger_outdegree_get(S, queue[j]); + mf += deg_j; + mu -= deg_j; + } + //printf("mf: %ld, nf: %ld, mu: %ld, nf_last: %ld\n", mf, nf, mu, nf_last); + + /* evaluate the transition state machine */ + if (top_down) { + if (mf > (mu / 14) && (nf - nf_last > 0)) { + top_down = 0; + } + } else { + if (nf < (nv / 24) && (nf - nf_last < 0)) { + top_down = 1; + } + } + } /* end while */ + + return nQ; +}