From 7b036386a06725a00d6aa90c60bb17d833a8e369 Mon Sep 17 00:00:00 2001 From: Jason Poovey Date: Tue, 10 Nov 2015 08:49:42 -0500 Subject: [PATCH] Implementation of betweenness subset JSON RPC Call --- lib/stinger_alg/inc/betweenness.h | 3 +- lib/stinger_alg/src/betweenness.c | 303 ++++++++---------- .../tools/json_rpc_server/CMakeLists.txt | 1 + .../json_rpc_server/inc/json_rpc_server.h | 5 + .../src/betweenness_subgraph.cpp | 87 +++++ .../json_rpc_server/src/json_rpc_server.cpp | 1 + .../betweenness_test/betweenness_test.cpp | 59 ++++ 7 files changed, 282 insertions(+), 177 deletions(-) create mode 100644 src/clients/tools/json_rpc_server/src/betweenness_subgraph.cpp diff --git a/lib/stinger_alg/inc/betweenness.h b/lib/stinger_alg/inc/betweenness.h index 5313cb3f..023ad6cf 100644 --- a/lib/stinger_alg/inc/betweenness.h +++ b/lib/stinger_alg/inc/betweenness.h @@ -8,7 +8,8 @@ #include "stinger_net/stinger_alg.h" #include "stinger_utils/timer.h" -void single_bc_search(stinger_t * S, int64_t nv, int64_t source, double * bc, int64_t * found_count); +void single_bc_search(stinger_t * S, int64_t nv, int64_t source, double * bc, int64_t * found_count, uint8_t * vertex_set); void sample_search(stinger_t * S, int64_t nv, int64_t nsamples, double * bc, int64_t * found_count); +void sample_search_subgraph(stinger_t * S, int64_t nv, uint8_t * vertex_set, int64_t nsamples, double * bc, int64_t * found_count); #endif diff --git a/lib/stinger_alg/src/betweenness.c b/lib/stinger_alg/src/betweenness.c index 705dde4e..87678373 100644 --- a/lib/stinger_alg/src/betweenness.c +++ b/lib/stinger_alg/src/betweenness.c @@ -12,219 +12,170 @@ #define PHASE_END -1 void -single_bc_search(stinger_t * S, int64_t nv, int64_t source, double * bc, int64_t * found_count) +single_bc_search(stinger_t * S, int64_t nv, int64_t source, double * bc, int64_t * found_count, uint8_t * vertex_set) { + #define USING_SET (vertex_set != NULL) - int64_t * paths = (int64_t * )xcalloc(nv * 3, sizeof(int64_t)); - int64_t * bfs_stack = paths + nv; - double * partial = (double *)xcalloc(nv, sizeof(double)); + int64_t * paths = (int64_t * )xcalloc(nv * 3, sizeof(int64_t)); + int64_t * bfs_stack = paths + nv; + double * partial = (double *)xcalloc(nv, sizeof(double)); - int64_t * d = (int64_t *)xmalloc(nv * sizeof(int64_t)); + int64_t * d = (int64_t *)xmalloc(nv * sizeof(int64_t)); - if (paths == NULL || partial == NULL || d == NULL) { - LOG_E("Could not allocate memory for BC search"); - return; - } - - for(int64_t i = 0; i < nv; i ++) d[i] = -1; + if (paths == NULL || partial == NULL || d == NULL) { + LOG_E("Could not allocate memory for BC search"); + return; + } - int64_t stack_top = 2; - bfs_stack[0] = source; - bfs_stack[1] = PHASE_END; - d[source] = 0; - paths[source] = 1; - int64_t count = 1; + for(int64_t i = 0; i < nv; i ++) d[i] = -1; - int64_t index = -1; + int64_t stack_top = 2; + bfs_stack[0] = source; + bfs_stack[1] = PHASE_END; + d[source] = 0; + paths[source] = 1; + int64_t count = 1; - //LOG_D("Directed"); + int64_t index = -1; - while (count > 0) { - count = 0; - index++; + //LOG_D("Directed"); - while (bfs_stack[index] != PHASE_END) { - int64_t v = bfs_stack[index]; - int64_t d_next = d[v] + 1; + while (count > 0) { + count = 0; + index++; - //LOG_D_A("(%ld) TOS=%ld",source,v); + while (bfs_stack[index] != PHASE_END) { + int64_t v = bfs_stack[index]; + int64_t d_next = d[v] + 1; - STINGER_FORALL_EDGES_OF_VTX_BEGIN(S, v) { - //LOG_D_A("(%ld) Begin - d[%ld]=%ld paths[%ld]=%ld",source,STINGER_EDGE_DEST,d[STINGER_EDGE_DEST],STINGER_EDGE_DEST,paths[STINGER_EDGE_DEST]); - if(d[STINGER_EDGE_DEST] < 0) { - d[STINGER_EDGE_DEST] = d_next; - paths[STINGER_EDGE_DEST] = paths[v]; - count = count + 1; - bfs_stack[stack_top++] = STINGER_EDGE_DEST; + //LOG_D_A("(%ld) TOS=%ld",source,v); - stinger_int64_fetch_add(found_count + STINGER_EDGE_DEST, 1); - } - else if(d[STINGER_EDGE_DEST] == d_next) { - paths[STINGER_EDGE_DEST] += paths[v]; - } - //LOG_D_A("(%ld) End - d[%ld]=%ld paths[%ld]=%ld",source,STINGER_EDGE_DEST,d[STINGER_EDGE_DEST],STINGER_EDGE_DEST,paths[STINGER_EDGE_DEST]); + STINGER_FORALL_EDGES_OF_VTX_BEGIN(S, v) { + if (!USING_SET || vertex_set[STINGER_EDGE_DEST]) { + //LOG_D_A("(%ld) Begin - d[%ld]=%ld paths[%ld]=%ld",source,STINGER_EDGE_DEST,d[STINGER_EDGE_DEST],STINGER_EDGE_DEST,paths[STINGER_EDGE_DEST]); + if(d[STINGER_EDGE_DEST] < 0) { + d[STINGER_EDGE_DEST] = d_next; + paths[STINGER_EDGE_DEST] = paths[v]; + count = count + 1; + bfs_stack[stack_top++] = STINGER_EDGE_DEST; - } STINGER_FORALL_EDGES_OF_VTX_END(); - - index++; + stinger_int64_fetch_add(found_count + STINGER_EDGE_DEST, 1); + } + else if(d[STINGER_EDGE_DEST] == d_next) { + paths[STINGER_EDGE_DEST] += paths[v]; + } + //LOG_D_A("(%ld) End - d[%ld]=%ld paths[%ld]=%ld",source,STINGER_EDGE_DEST,d[STINGER_EDGE_DEST],STINGER_EDGE_DEST,paths[STINGER_EDGE_DEST]); } + } STINGER_FORALL_EDGES_OF_VTX_END(); - bfs_stack[stack_top++] = PHASE_END; + index++; } - stack_top--; - while (stack_top > 0) { - while (bfs_stack[stack_top] != PHASE_END) { - int64_t w = bfs_stack[stack_top]; - double dsw = 0; - int64_t sw = paths[w]; - STINGER_FORALL_EDGES_OF_VTX_BEGIN(S, w) { - if(d[w] == (d[STINGER_EDGE_DEST] - 1)) { - dsw += frac(sw,paths[STINGER_EDGE_DEST]) * (1.0 + partial[STINGER_EDGE_DEST]); - } - } STINGER_FORALL_EDGES_OF_VTX_END(); - partial[w] = dsw; - bc[w] += dsw; - stack_top--; - //LOG_D_A("(%ld) Sum Partials for %ld -- partial[%ld]=%lf bc[%ld]=%lf",source,w,w,partial[w],w,bc[w]); + bfs_stack[stack_top++] = PHASE_END; + } + stack_top--; + + while (stack_top > 0) { + while (bfs_stack[stack_top] != PHASE_END) { + int64_t w = bfs_stack[stack_top]; + double dsw = 0; + int64_t sw = paths[w]; + STINGER_FORALL_EDGES_OF_VTX_BEGIN(S, w) { + if (!USING_SET || vertex_set[STINGER_EDGE_DEST]) { + if(d[w] == (d[STINGER_EDGE_DEST] - 1)) { + dsw += frac(sw,paths[STINGER_EDGE_DEST]) * (1.0 + partial[STINGER_EDGE_DEST]); + } } - stack_top--; + } STINGER_FORALL_EDGES_OF_VTX_END(); + partial[w] = dsw; + bc[w] += dsw; + stack_top--; + //LOG_D_A("(%ld) Sum Partials for %ld -- partial[%ld]=%lf bc[%ld]=%lf",source,w,w,partial[w],w,bc[w]); } + stack_top--; + } - xfree(d); - xfree(partial); - xfree(paths); + xfree(d); + xfree(partial); + xfree(paths); + #undef USING_SET } -#if 0 -/** -* @brief Perform a single source of the BC calculation per Brandes. -* -* Note that this follows the approach suggested by Green and Bader in which -* parent lists are not maintained, but instead the neighbors are searched -* to rediscover parents. -* -* Note also that found count will not include the source and that increments -* to this array are handled atomically. -* -* Operations to the bc array are NOT ATOMIC -* -* @param S The STINGER graph -* @param nv The maximum vertex ID in the graph plus one. -* @param source The vertex from where this search will start. -* @param bc The array into which the partial BCs will be added. -* @param found_count An array to track how many times a vertex is found for normalization. -*/ void -single_bc_search(stinger_t * S, int64_t nv, int64_t source, double * bc, int64_t * found_count) +sample_search(stinger_t * S, int64_t nv, int64_t nsamples, double * bc, int64_t * found_count) { - int64_t * paths = (int64_t * )xcalloc(nv * 2, sizeof(int64_t)); - int64_t * q = paths + nv; - double * partial = (double *)xcalloc(nv, sizeof(double)); - - int64_t * d = (int64_t *)xmalloc(nv * sizeof(int64_t)); - for(int64_t i = 0; i < nv; i ++) d[i] = -1; - - int64_t q_front = 1; - int64_t q_rear = 0; - q[0] = source; - d[source] = 0; - paths[source] = 1; - - //fprintf(stderr,"Undirected\n"); - - while(q_rear != q_front) { - int64_t v = q[q_rear++]; - int64_t d_next = d[v] + 1; - - //fprintf(stderr,"(%ld) v=%ld d_next=%ld\n",source,v,d_next); + sample_search_subgraph(S, nv, NULL, nsamples, bc, found_count); +} - STINGER_FORALL_EDGES_OF_VTX_BEGIN(S, v) { - //fprintf(stderr,"(%ld) Begin - d[%ld]=%ld paths[%ld]=%ld\n",source,STINGER_EDGE_DEST,d[STINGER_EDGE_DEST],STINGER_EDGE_DEST,paths[STINGER_EDGE_DEST]); - if(d[STINGER_EDGE_DEST] < 0) { - d[STINGER_EDGE_DEST] = d_next; - paths[STINGER_EDGE_DEST] = paths[v]; - q[q_front++] = STINGER_EDGE_DEST; +void +sample_search_subgraph(stinger_t * S, int64_t nv, uint8_t * vertex_set, int64_t nsamples, double * bc, int64_t * found_count) +{ + LOG_V_A(" > Beginning with %ld vertices and %ld samples\n", (long)nv, (long)nsamples); - stinger_int64_fetch_add(found_count + STINGER_EDGE_DEST, 1); - } + #define USING_SET (vertex_set != NULL) - else if(d[STINGER_EDGE_DEST] == d_next) { - paths[STINGER_EDGE_DEST] += paths[v]; - } - //fprintf(stderr,"(%ld) End - d[%ld]=%ld paths[%ld]=%ld\n",source,STINGER_EDGE_DEST,d[STINGER_EDGE_DEST],STINGER_EDGE_DEST,paths[STINGER_EDGE_DEST]); + int64_t set_nv = nv; - } STINGER_FORALL_EDGES_OF_VTX_END(); + if (USING_SET) { + set_nv = 0; + OMP("omp parallel for reduction(+:set_nv)") + for(int64_t v = 0; v < nv; v++) { + if (vertex_set[v]) { + set_nv++; + } } - - /* don't process source */ - while(q_front > 1) { - int64_t w = q[--q_front]; - - /* don't maintain parents, do search instead */ - STINGER_FORALL_EDGES_OF_VTX_BEGIN(S, w) { - if(d[STINGER_EDGE_DEST] == (d[w] - 1)) { - partial[STINGER_EDGE_DEST] += frac(paths[STINGER_EDGE_DEST],paths[w]) * (1 + partial[w]); - } - } STINGER_FORALL_EDGES_OF_VTX_END(); - bc[w] += partial[w]; - //fprintf(stderr,"(%ld) Sum Partials for %ld -- partial[%ld]=%lf bc[%ld]=%lf\n",source,w,w,partial[w],w,bc[w]); + } + + int64_t * found = xcalloc(nv, sizeof(int64_t)); + OMP("omp parallel") + { + OMP("omp for") + for(int64_t v = 0; v < nv; v++) { + found_count[v] = 0; + bc[v] = 0; } - xfree(d); - xfree(partial); - xfree(paths); -} -#endif + double * partials = (double *)xcalloc(nv, sizeof(double)); -void -sample_search(stinger_t * S, int64_t nv, int64_t nsamples, double * bc, int64_t * found_count) -{ - LOG_V_A(" > Beginning with %ld vertices and %ld samples\n", (long)nv, (long)nsamples); + if(set_nv < nsamples) { + int64_t min = nv; - int64_t * found = xcalloc(nv, sizeof(int64_t)); - OMP("omp parallel") - { - OMP("omp for") - for(int64_t v = 0; v < nv; v++) { - found_count[v] = 0; - bc[v] = 0; + OMP("omp for") + for(int64_t s = 0; s < min; s++) { + if (!USING_SET || vertex_set[s]) { + single_bc_search(S, nv, s, partials, found_count, vertex_set); } - - double * partials = (double *)xcalloc(nv, sizeof(double)); - - if(nv < nsamples) { - int64_t min = nv; - - OMP("omp for") - for(int64_t s = 0; s < min; s++) { - single_bc_search(S, nv, s, partials, found_count); - } - } else { - OMP("omp for") - for(int64_t s = 0; s < nsamples; s++) { - int64_t v = 0; - - while(1) { - v = rand() % nv; - if(found[v] == 0 && (0 == stinger_int64_fetch_add(found + v, 1))) { - break; - } - } - - single_bc_search(S, nv, v, partials, found_count); - } + } + } else { + OMP("omp for") + for(int64_t s = 0; s < nsamples; s++) { + int64_t v = 0; + + while(1) { + v = rand() % nv; + if (USING_SET && !vertex_set[v]) { + continue; + } + if(found[v] == 0 && (0 == stinger_int64_fetch_add(found + v, 1))) { + break; + } } - OMP("omp critical") - { - for(int64_t v = 0; v < nv; v++) { - bc[v] += partials[v]; - } - } + single_bc_search(S, nv, v, partials, found_count, vertex_set); + } + } - free(partials); + OMP("omp critical") + { + for(int64_t v = 0; v < nv; v++) { + bc[v] += partials[v]; + } } - free(found); + free(partials); + } + + free(found); + + #undef USING_SET } diff --git a/src/clients/tools/json_rpc_server/CMakeLists.txt b/src/clients/tools/json_rpc_server/CMakeLists.txt index 3078d257..39a35d8e 100644 --- a/src/clients/tools/json_rpc_server/CMakeLists.txt +++ b/src/clients/tools/json_rpc_server/CMakeLists.txt @@ -12,6 +12,7 @@ set(sources src/alg_data_array.cpp src/array_to_json_monolithic.cpp src/array_to_json_reduction.cpp + src/betweenness_subgraph.cpp src/bfs_edges.cpp src/breadth_first_search.cpp src/egonet.cpp diff --git a/src/clients/tools/json_rpc_server/inc/json_rpc_server.h b/src/clients/tools/json_rpc_server/inc/json_rpc_server.h index c48a1ba2..e5c424e5 100644 --- a/src/clients/tools/json_rpc_server/inc/json_rpc_server.h +++ b/src/clients/tools/json_rpc_server/inc/json_rpc_server.h @@ -187,6 +187,11 @@ struct JSON_RPC_pagerank_subgraph: JSON_RPCFunction { virtual int64_t operator()(rapidjson::Value * params, rapidjson::Value & result, rapidjson::MemoryPoolAllocator & allocator); }; +struct JSON_RPC_betweenness_subgraph: JSON_RPCFunction { + JSON_RPC_betweenness_subgraph(JSON_RPCServerState * state) : JSON_RPCFunction(state) { } + virtual int64_t operator()(rapidjson::Value * params, rapidjson::Value & result, rapidjson::MemoryPoolAllocator & allocator); +}; + /* Helper Functions */ int diff --git a/src/clients/tools/json_rpc_server/src/betweenness_subgraph.cpp b/src/clients/tools/json_rpc_server/src/betweenness_subgraph.cpp new file mode 100644 index 00000000..47beeda6 --- /dev/null +++ b/src/clients/tools/json_rpc_server/src/betweenness_subgraph.cpp @@ -0,0 +1,87 @@ +#include +#include + +//#define LOG_AT_W /* warning only */ +#include "stinger_core/stinger_error.h" + +#include "stinger_core/xmalloc.h" +#include "rapidjson/document.h" +#include "json_rpc_server.h" +#include "json_rpc.h" +extern "C" { + #include "stinger_alg/betweenness.h" +} + +using namespace gt::stinger; + + +int64_t +JSON_RPC_betweenness_subgraph::operator()(rapidjson::Value * params, rapidjson::Value & result, rapidjson::MemoryPoolAllocator & allocator) +{ + params_array_t vertices; + bool strings; + + rpc_params_t p[] = { + {"vertices", TYPE_ARRAY, &vertices, false, 0}, + {"strings", TYPE_BOOL, &strings, true, 0}, + {NULL, TYPE_NONE, NULL, false, 0} + }; + + if (!contains_params(p, params)) { + return json_rpc_error(-32602, result, allocator); + } + + stinger_t * S = server_state->get_stinger(); + if (!S) { + LOG_E ("STINGER pointer is invalid"); + return json_rpc_error(-32603, result, allocator); + } + + uint8_t * vtx_subset = (uint8_t *)xcalloc(stinger_max_nv(S)+1,sizeof(uint8_t)); + double * bc = (double *)xcalloc(stinger_max_active_vertex(S)+1,sizeof(double)); + int64_t * found_count = (int64_t *)xcalloc(stinger_max_active_vertex(S)+1,sizeof(int64_t)); + + for (int64_t i = 0; i < vertices.len; i++) { + vtx_subset[vertices.arr[i]] = 1; + } + + // Run BC + sample_search_subgraph(S, stinger_max_active_vertex(S)+1, vtx_subset, 512, bc, found_count); + + xfree(found_count); + xfree(vtx_subset); + + // CREATE the JSON + rapidjson::Value vertex_id_json(rapidjson::kArrayType); + rapidjson::Value value_json(rapidjson::kArrayType); + rapidjson::Value bc_json (rapidjson::kObjectType); + rapidjson::Value vtx_str (rapidjson::kArrayType); + rapidjson::Value vtx_phys; + + for (uint64_t i = 0; i < vertices.len; i++) { + uint64_t vtx = vertices.arr[i]; + vertex_id_json.PushBack(vtx,allocator); + value_json.PushBack(bc[vtx],allocator); + if (strings) { + char * physID; + uint64_t len; + if(-1 == stinger_mapping_physid_direct(S, vtx, &physID, &len)) { + physID = (char *) ""; + len = 0; + } + vtx_phys.SetString(physID, len, allocator); + vtx_str.PushBack(vtx_phys, allocator); + } + } + + bc_json.AddMember("vertex_id", vertex_id_json, allocator); + bc_json.AddMember("value", value_json, allocator); + if (strings) + bc_json.AddMember("vertex_str", vtx_str, allocator); + + result.AddMember("bc",bc_json,allocator); + + xfree(bc); + return 0; +} + diff --git a/src/clients/tools/json_rpc_server/src/json_rpc_server.cpp b/src/clients/tools/json_rpc_server/src/json_rpc_server.cpp index 24f53262..5e5043d0 100644 --- a/src/clients/tools/json_rpc_server/src/json_rpc_server.cpp +++ b/src/clients/tools/json_rpc_server/src/json_rpc_server.cpp @@ -168,6 +168,7 @@ main (int argc, char ** argv) server_state.add_rpc_function("get_connected_component", new JSON_RPC_get_connected_component(&server_state)); server_state.add_rpc_function("bfs_edges", new JSON_RPC_bfs_edges(&server_state)); server_state.add_rpc_function("pagerank_subgraph", new JSON_RPC_pagerank_subgraph(&server_state)); + server_state.add_rpc_function("betweenness_subgraph", new JSON_RPC_betweenness_subgraph(&server_state)); server_state.add_rpc_session("subgraph", new JSON_RPC_community_subgraph(0, &server_state)); server_state.add_rpc_session("vertex_event_notifier", new JSON_RPC_vertex_event_notifier(0, &server_state)); diff --git a/src/tests/betweenness_test/betweenness_test.cpp b/src/tests/betweenness_test/betweenness_test.cpp index e0caae66..df66aed6 100644 --- a/src/tests/betweenness_test/betweenness_test.cpp +++ b/src/tests/betweenness_test/betweenness_test.cpp @@ -59,6 +59,9 @@ TEST_F(BetweennessTest, DirectedGraph) { for (int64_t v = 0; v < nv; v++) { EXPECT_DOUBLE_EQ(expected_bc[v],bc[v]) << "v = " << v; } + + xfree(bc); + xfree(times_found); } TEST_F(BetweennessTest, DirectedGraphOverSample) { @@ -96,6 +99,9 @@ TEST_F(BetweennessTest, DirectedGraphOverSample) { for (int64_t v = 0; v < nv; v++) { EXPECT_DOUBLE_EQ(expected_bc[v],bc[v]) << "v = " << v; } + + xfree(bc); + xfree(times_found); } TEST_F(BetweennessTest, UndirectedGraph) { @@ -133,8 +139,61 @@ TEST_F(BetweennessTest, UndirectedGraph) { for (int64_t v = 0; v < nv; v++) { EXPECT_NEAR(expected_bc[v],bc[v],0.00001) << "v = " << v; } + + xfree(bc); + xfree(times_found); } +TEST_F(BetweennessTest, Subgraph) { + stinger_insert_edge(S, 0, 0, 1, 1, 1); + stinger_insert_edge(S, 0, 1, 2, 1, 1); + stinger_insert_edge(S, 0, 1, 3, 1, 1); + stinger_insert_edge(S, 0, 1, 4, 1, 1); + stinger_insert_edge(S, 0, 2, 8, 1, 1); + stinger_insert_edge(S, 0, 3, 5, 1, 1); + stinger_insert_edge(S, 0, 3, 6, 1, 1); + stinger_insert_edge(S, 0, 4, 5, 1, 1); + stinger_insert_edge(S, 0, 5, 6, 1, 1); + stinger_insert_edge(S, 0, 5, 7, 1, 1); + stinger_insert_edge(S, 0, 7, 8, 1, 1); + + int64_t nv = stinger_max_active_vertex(S)+1; + + double * bc = (double *)xcalloc(nv, sizeof(double)); + int64_t * times_found = (int64_t *)xcalloc(nv, sizeof(int64_t)); + uint8_t * vtx_set = (uint8_t *)xcalloc(nv,sizeof(uint8_t)); + + for (int64_t i = 0; i < nv; i++) { + if (i != 3) { + vtx_set[i] = 1; + } + } + + sample_search_subgraph(S, nv, vtx_set, 9, bc, times_found); + + xfree(vtx_set); + + double expected_bc[9] = { + 0.0, + 6.0, + 2.0, + 0.0, + 6.0, + 7.0, + 0.0, + 2.0, + 0.0 + }; + + for (int64_t v = 0; v < nv; v++) { + EXPECT_DOUBLE_EQ(expected_bc[v],bc[v]) << "v = " << v; + } + + xfree(bc); + xfree(times_found); +} + + int main (int argc, char *argv[]) {