From 99b2734c2579b76852451a3d29ff8ad3f0ac20e2 Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Fri, 29 Apr 2022 19:48:07 -0400 Subject: [PATCH 01/17] networking starting out --- src/runtime/CMakeLists.txt | 15 +++- src/runtime/client.c | 97 ++++++++++++++++++++++++++ src/runtime/includes/client.h | 6 ++ src/runtime/includes/server.h | 13 ++++ src/runtime/runtime.c | 16 +++-- src/runtime/server.c | 128 ++++++++++++++++++++++++++++++++++ 6 files changed, 268 insertions(+), 7 deletions(-) create mode 100644 src/runtime/client.c create mode 100644 src/runtime/includes/client.h create mode 100644 src/runtime/includes/server.h create mode 100644 src/runtime/server.c diff --git a/src/runtime/CMakeLists.txt b/src/runtime/CMakeLists.txt index f4ed096..3af9581 100644 --- a/src/runtime/CMakeLists.txt +++ b/src/runtime/CMakeLists.txt @@ -15,6 +15,17 @@ target_link_libraries(shell handle_block init_globals) +add_library(server server.c) +target_compile_options(server PRIVATE ${COMPILE_OPTIONS_STD}) +target_include_directories(server PUBLIC + ${INCLUDE_ALL} + ) +add_library(client client.c) +target_compile_options(client PRIVATE ${COMPILE_OPTIONS_STD}) +target_include_directories(client PUBLIC + ${INCLUDE_ALL} + ) + add_executable(runtime runtime.c) target_compile_options(runtime PRIVATE ${COMPILE_OPTIONS_STD}) target_include_directories(runtime PUBLIC ${INCLUDE_ALL}) @@ -25,4 +36,6 @@ target_link_libraries(runtime init_globals validate_tx handle_tx - shell) \ No newline at end of file + shell + server + client) \ No newline at end of file diff --git a/src/runtime/client.c b/src/runtime/client.c new file mode 100644 index 0000000..bc5a62d --- /dev/null +++ b/src/runtime/client.c @@ -0,0 +1,97 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "client.h" + +void *get_in_addr2(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + +char peers[][20] = {"localhost"}; +unsigned int num_peers = 1; // Match above + +void *client_thread(){ + for(unsigned int i = 0; i < num_peers; i++){ + // FOrk this process and connect! + if (!fork()) { // this is the child process + + int sockfd, numbytes; + char buf[MAXDATASIZE]; + struct addrinfo hints, *servinfo, *p; + int rv; + char s[INET6_ADDRSTRLEN]; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + if ((rv = getaddrinfo(peers[0], PORT, &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + return NULL; + } + + // loop through all the results and connect to the first we can + for(p = servinfo; p != NULL; p = p->ai_next) { + if ((sockfd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + perror("client: socket"); + continue; + } + + if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) { + perror("client: connect"); + close(sockfd); + continue; + } + + break; + } + + if (p == NULL) { + fprintf(stderr, "client: failed to connect\n"); + return NULL; + } + + inet_ntop(p->ai_family, get_in_addr2((struct sockaddr *)p->ai_addr), + s, sizeof s); + printf("client: connecting to %s\n", s); + + freeaddrinfo(servinfo); // all done with this structure + + if ((numbytes = recv(sockfd, buf, MAXDATASIZE-1, 0)) == -1) { + perror("recv"); + exit(1); + } + + buf[numbytes] = '\0'; + + printf("client: received '%s'\n",buf); + + close(sockfd); + + + + exit(0); + } + } + + // Now pop off the queues whenever something is added! + while(1){ + // Add to Block and TX Queue + } + + return 0; +} \ No newline at end of file diff --git a/src/runtime/includes/client.h b/src/runtime/includes/client.h new file mode 100644 index 0000000..bd23595 --- /dev/null +++ b/src/runtime/includes/client.h @@ -0,0 +1,6 @@ +#define PORT "3490" // the port client will be connecting to + +#define MAXDATASIZE 100 // max number of bytes we can get at once + + +void *client_thread(); \ No newline at end of file diff --git a/src/runtime/includes/server.h b/src/runtime/includes/server.h new file mode 100644 index 0000000..8f74eb0 --- /dev/null +++ b/src/runtime/includes/server.h @@ -0,0 +1,13 @@ +#include + + +#define PORT "3490" // the port users will be connecting to + +#define BACKLOG 10 // how many pending connections queue will hold + +void sigchld_handler(int s); + +// get sockaddr, IPv4 or IPv6: +void *get_in_addr(struct sockaddr *sa); + +void *server_thread(); \ No newline at end of file diff --git a/src/runtime/runtime.c b/src/runtime/runtime.c index 8d86527..bdb401f 100644 --- a/src/runtime/runtime.c +++ b/src/runtime/runtime.c @@ -10,6 +10,8 @@ #include "runtime.h" #include "init_globals.h" #include "init_db.h" +#include "server.h" +#include "client.h" Globals *init_globals(){ Globals *new_globals = malloc(sizeof(Globals)); @@ -174,14 +176,14 @@ int main() { // Intitialize the globabls! node_init(PROD_DB_LOC); - pthread_t node_block, node_tx, shell, miner; - int node_block_ret, node_tx_ret, shell_ret, miner_ret; + pthread_t node_block, node_tx, shell, miner, server, client; + int node_block_ret, node_tx_ret, shell_ret, miner_ret, server_ret, client_ret; + /* Create independent threads each of which will execute function */ + server_ret = pthread_create( &server, NULL, server_thread, (void*) globals); + // Now Create the Client Thread + client_ret = pthread_create( &client, NULL, client_thread, (void*) globals); - // node_block_ret = pthread_create( &node_block, NULL, pop, (void*) globals->queue_block); - // sleep(1); - // node_tx_ret = pthread_create( &node_tx, NULL, add_and_delay, (void*) globals->queue_block); - node_block_ret = pthread_create( &node_block, NULL, node_block_thread, (void*) globals); node_tx_ret = pthread_create( &node_tx, NULL, node_tx_thread, (void*) globals); shell_ret = pthread_create( &shell, NULL, shell_thread, (void*) globals); @@ -192,6 +194,7 @@ int main() { /* the process and all threads before the threads have completed. */ pthread_join(shell, NULL); + pthread_join(server, NULL); pthread_join(node_block, NULL); pthread_join(node_tx, NULL); pthread_join(miner, NULL); @@ -200,5 +203,6 @@ int main() { printf("Node TX returns: %d\n", node_tx_ret); printf("Shell returns: %d\n", shell_ret); printf("Miner returns: %d\n", miner_ret); + printf("Server returns: %d\n", server_ret); return 0; } diff --git a/src/runtime/server.c b/src/runtime/server.c new file mode 100644 index 0000000..86bf1c8 --- /dev/null +++ b/src/runtime/server.c @@ -0,0 +1,128 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "server.h" + +void sigchld_handler(int s) +{ + (void)s; // quiet unused variable warning + + // waitpid() might overwrite errno, so we save and restore it: + int saved_errno = errno; + + while(waitpid(-1, NULL, WNOHANG) > 0); + + errno = saved_errno; +} + + +// get sockaddr, IPv4 or IPv6: +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + + +void *server_thread(){ + int sockfd, new_fd; // listen on sock_fd, new connection on new_fd + struct addrinfo hints, *servinfo, *p; + struct sockaddr_storage their_addr; // connector's address information + socklen_t sin_size; + struct sigaction sa; + int yes=1; + char s[INET6_ADDRSTRLEN]; + int rv; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; // use my IP + + if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + return 1; + } + + // loop through all the results and bind to the first we can + for(p = servinfo; p != NULL; p = p->ai_next) { + if ((sockfd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + perror("server: socket"); + continue; + } + + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, + sizeof(int)) == -1) { + perror("setsockopt"); + exit(1); + } + + if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { + close(sockfd); + perror("server: bind"); + continue; + } + + break; + } + + freeaddrinfo(servinfo); // all done with this structure + + if (p == NULL) { + fprintf(stderr, "server: failed to bind\n"); + exit(1); + } + + if (listen(sockfd, BACKLOG) == -1) { + perror("listen"); + exit(1); + } + + sa.sa_handler = sigchld_handler; // reap all dead processes + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART; + if (sigaction(SIGCHLD, &sa, NULL) == -1) { + perror("sigaction"); + exit(1); + } + + printf("server: waiting for connections...\n"); + + while(1) { // main accept() loop + sin_size = sizeof their_addr; + new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); + if (new_fd == -1) { + perror("accept"); + continue; + } + + inet_ntop(their_addr.ss_family, + get_in_addr((struct sockaddr *)&their_addr), + s, sizeof s); + printf("server: got connection from %s\n", s); + + if (!fork()) { // this is the child process + close(sockfd); // child doesn't need the listener + if (send(new_fd, "Hello, world!", 13, 0) == -1) + perror("send"); + close(new_fd); + exit(0); + } + close(new_fd); // parent doesn't need this + } + + return NULL; +} \ No newline at end of file From 33f22580a99fd060eb7e49d29891bf6e3d63731b Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Fri, 29 Apr 2022 21:39:09 -0400 Subject: [PATCH 02/17] working with just IP --- src/runtime/client.c | 101 ++++++++++++++++++++++------------ src/runtime/includes/server.h | 2 +- 2 files changed, 68 insertions(+), 35 deletions(-) diff --git a/src/runtime/client.c b/src/runtime/client.c index bc5a62d..defaa14 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -10,6 +10,7 @@ #include #include "client.h" +#include "server.h" void *get_in_addr2(struct sockaddr *sa) { @@ -20,56 +21,88 @@ void *get_in_addr2(struct sockaddr *sa) return &(((struct sockaddr_in6*)sa)->sin6_addr); } -char peers[][20] = {"localhost"}; +char peers[][20] = {"192.168.32.251"};//"ubuntu.local"};//"localhost", unsigned int num_peers = 1; // Match above void *client_thread(){ for(unsigned int i = 0; i < num_peers; i++){ // FOrk this process and connect! if (!fork()) { // this is the child process - int sockfd, numbytes; char buf[MAXDATASIZE]; - struct addrinfo hints, *servinfo, *p; - int rv; - char s[INET6_ADDRSTRLEN]; + struct sockaddr_in serv_addr; + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + printf("\n Socket creation error \n"); + continue; + } + + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(PORT_INT); + + // Convert IPv4 and IPv6 addresses from text to binary + // form + if (inet_pton(AF_INET, peers[i], &serv_addr.sin_addr) + <= 0) { + printf( + "\nInvalid address/ Address not supported \n"); + continue; + } + + if (connect(sockfd, (struct sockaddr*)&serv_addr, + sizeof(serv_addr)) + < 0) { + printf("\nConnection Failed \n"); + continue; + } - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; + - if ((rv = getaddrinfo(peers[0], PORT, &hints, &servinfo)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); - return NULL; - } - // loop through all the results and connect to the first we can - for(p = servinfo; p != NULL; p = p->ai_next) { - if ((sockfd = socket(p->ai_family, p->ai_socktype, - p->ai_protocol)) == -1) { - perror("client: socket"); - continue; - } - if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) { - perror("client: connect"); - close(sockfd); - continue; - } - break; - } - if (p == NULL) { - fprintf(stderr, "client: failed to connect\n"); - return NULL; - } - inet_ntop(p->ai_family, get_in_addr2((struct sockaddr *)p->ai_addr), - s, sizeof s); - printf("client: connecting to %s\n", s); + // // using Hostname + // struct addrinfo hints, *servinfo, *p; + // int rv; + // char s[INET6_ADDRSTRLEN]; + + // memset(&hints, 0, sizeof hints); + // hints.ai_family = AF_UNSPEC; + // hints.ai_socktype = SOCK_STREAM; + + // if ((rv = getaddrinfo(peers[i], PORT, &hints, &servinfo)) != 0) { + // fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + // return NULL; + // } + + // // loop through all the results and connect to the first we can + // for(p = servinfo; p != NULL; p = p->ai_next) { + // if ((sockfd = socket(p->ai_family, p->ai_socktype, + // p->ai_protocol)) == -1) { + // perror("client: socket"); + // continue; + // } + + // if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) { + // perror("client: connect"); + // close(sockfd); + // continue; + // } + + // break; + // } + + // if (p == NULL) { + // fprintf(stderr, "client: failed to connect\n"); + // return 2; + // } + + // inet_ntop(p->ai_family, get_in_addr2((struct sockaddr *)p->ai_addr), + // s, sizeof s); + // printf("client: connecting to %s\n", s); - freeaddrinfo(servinfo); // all done with this structure + // freeaddrinfo(servinfo); // all done with this structure if ((numbytes = recv(sockfd, buf, MAXDATASIZE-1, 0)) == -1) { perror("recv"); diff --git a/src/runtime/includes/server.h b/src/runtime/includes/server.h index 8f74eb0..0a1ee4f 100644 --- a/src/runtime/includes/server.h +++ b/src/runtime/includes/server.h @@ -1,6 +1,6 @@ #include - +#define PORT_INT 3490 #define PORT "3490" // the port users will be connecting to #define BACKLOG 10 // how many pending connections queue will hold From 536c7087d865214d6429010092f63b670afee0d2 Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Mon, 2 May 2022 11:13:44 -0400 Subject: [PATCH 03/17] networking and queues --- src/includes/utils/queue.h | 2 - src/runtime/CMakeLists.txt | 22 ++++++++- src/runtime/client.c | 89 ++++++++++++++++++++++++++++------ src/runtime/includes/runtime.h | 5 ++ src/runtime/queue_client.c | 88 +++++++++++++++++++++++++++++++++ src/runtime/queue_server.c | 70 ++++++++++++++++++++++++++ src/runtime/runtime.c | 48 +++++++++++++++--- src/runtime/server.c | 51 +++++++++++++++++-- 8 files changed, 344 insertions(+), 31 deletions(-) create mode 100644 src/runtime/queue_client.c create mode 100644 src/runtime/queue_server.c diff --git a/src/includes/utils/queue.h b/src/includes/utils/queue.h index b6f3c58..a8a72c1 100644 --- a/src/includes/utils/queue.h +++ b/src/includes/utils/queue.h @@ -3,8 +3,6 @@ #include #include #include -#include "base_tx.h" -#include "base_block.h" typedef struct QueueItem{ void *item; diff --git a/src/runtime/CMakeLists.txt b/src/runtime/CMakeLists.txt index 3af9581..2cf1b66 100644 --- a/src/runtime/CMakeLists.txt +++ b/src/runtime/CMakeLists.txt @@ -20,12 +20,20 @@ target_compile_options(server PRIVATE ${COMPILE_OPTIONS_STD}) target_include_directories(server PUBLIC ${INCLUDE_ALL} ) +target_link_libraries(server + core_block + rt +) + add_library(client client.c) target_compile_options(client PRIVATE ${COMPILE_OPTIONS_STD}) target_include_directories(client PUBLIC ${INCLUDE_ALL} ) - +target_link_libraries(client + core_block + rt +) add_executable(runtime runtime.c) target_compile_options(runtime PRIVATE ${COMPILE_OPTIONS_STD}) target_include_directories(runtime PUBLIC ${INCLUDE_ALL}) @@ -38,4 +46,14 @@ target_link_libraries(runtime handle_tx shell server - client) \ No newline at end of file + client) + +add_executable(q_server queue_server.c) +target_compile_options(q_server PRIVATE ${COMPILE_OPTIONS_STD}) +target_include_directories(q_server PUBLIC ${INCLUDE_ALL}) +target_link_libraries(q_server rt) + +add_executable(q_client queue_client.c) +target_compile_options(q_client PRIVATE ${COMPILE_OPTIONS_STD}) +target_include_directories(q_client PUBLIC ${INCLUDE_ALL}) +target_link_libraries(q_client rt) \ No newline at end of file diff --git a/src/runtime/client.c b/src/runtime/client.c index defaa14..0ddee65 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -11,6 +11,14 @@ #include #include "client.h" #include "server.h" +#include "runtime.h" +#include "ser_block.h" +#include "ser_tx.h" + +#define QUEUE_PERMISSIONS 0660 +#define MAX_MESSAGES 10 +#define MAX_MSG_SIZE 256 +#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 void *get_in_addr2(struct sockaddr *sa) { @@ -24,7 +32,8 @@ void *get_in_addr2(struct sockaddr *sa) char peers[][20] = {"192.168.32.251"};//"ubuntu.local"};//"localhost", unsigned int num_peers = 1; // Match above -void *client_thread(){ +void *client_thread(void *arg){ + Globals *globals = arg; for(unsigned int i = 0; i < num_peers; i++){ // FOrk this process and connect! if (!fork()) { // this is the child process @@ -56,12 +65,6 @@ void *client_thread(){ } - - - - - - // // using Hostname // struct addrinfo hints, *servinfo, *p; // int rv; @@ -104,26 +107,82 @@ void *client_thread(){ // freeaddrinfo(servinfo); // all done with this structure - if ((numbytes = recv(sockfd, buf, MAXDATASIZE-1, 0)) == -1) { + + struct mq_attr attr; + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + mqd_t incoming; + if ((incoming = mq_open (globals->q_client, O_WRONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + perror ("Server: mq_open (server)"); + exit (1); + } + printf("Opened Incoming Queue for writing, now waiting for incoming data to socket"); + while(1){ + // Get data over the socket + if ((numbytes = recv(sockfd, buf, MAXDATASIZE-1, 0)) == -1) { perror("recv"); exit(1); - } + } - buf[numbytes] = '\0'; - - printf("client: received '%s'\n",buf); - - close(sockfd); + // Add the data to the queue + //char *test = "recieved fake_block"; + if (mq_send (incoming, buf, numbytes, 0) == -1) { + perror ("Client: Not able to send message to server"); + } + buf[numbytes] = '\0'; + printf("client: received '%s'\n",buf); + } + close(sockfd); exit(0); } } - + + //INCOMING PARENT + struct mq_attr attr; + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + mqd_t incoming_parent; + if ((incoming_parent = mq_open (globals->q_client, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + perror ("Server: mq_open (server)"); + exit (1); + } // Now pop off the queues whenever something is added! while(1){ // Add to Block and TX Queue + + char in_buffer [MSG_BUFFER_SIZE]; + unsigned int priority; + if (mq_receive (incoming_parent, in_buffer, MSG_BUFFER_SIZE, &priority) == -1) { + perror ("Server: mq_receive"); + exit (1); + } + // Block + if(priority == 1){ + // Dummy text data + printf("Incoming data to client parent: %s", in_buffer); + continue; + Block *new_block = deser_block_alloc(NULL, (unsigned char *)in_buffer); + queue_add_void(globals->queue_block, new_block); + } + //TX + else if (priority == 2) + { + Transaction *new_tx = deser_tx_alloc(NULL, (unsigned char *)in_buffer); + queue_add_void(globals->queue_tx, new_tx); + } + //Garbage + else{ + + } } return 0; diff --git a/src/runtime/includes/runtime.h b/src/runtime/includes/runtime.h index d3a9cfc..2884084 100644 --- a/src/runtime/includes/runtime.h +++ b/src/runtime/includes/runtime.h @@ -2,6 +2,7 @@ #include "queue.h" #include "pthread.h" #include "semaphore.h" +#include typedef struct Globals { Queue *queue_block; @@ -14,4 +15,8 @@ typedef struct Globals { pthread_mutex_t mempool_lock; int *miner_update; pthread_mutex_t miner_update_lock; + int connected; + char **q_server_individual; // send the same thing out to multiple nodes + mqd_t q_server; // Main queu that gets distributed to all the individual queues + char *q_client; // Only one queue from all the incoming blocks } Globals; \ No newline at end of file diff --git a/src/runtime/queue_client.c b/src/runtime/queue_client.c new file mode 100644 index 0000000..ca8d0f5 --- /dev/null +++ b/src/runtime/queue_client.c @@ -0,0 +1,88 @@ +/* + * client.c: Client program + * to demonstrate interprocess communication + * with POSIX message queues + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#define SERVER_QUEUE_NAME "/sp-example-server" +#define CLIENT_QUEUE_NAME "/sp-example-client" +#define QUEUE_PERMISSIONS 0660 +#define MAX_MESSAGES 10 +#define MAX_MSG_SIZE 256 +#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 + +int main (int argc, char **argv) +{ + //char client_queue_name [64]; + mqd_t qd_server, qd_client; // queue descriptors + + + // create the client queue for receiving messages from server + // sprintf (client_queue_name, "/sp-example-client-%d", getpid ()); + + struct mq_attr attr; + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + + if ((qd_client = mq_open (CLIENT_QUEUE_NAME, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + perror ("Client: mq_open (client)"); + exit (1); + } + + if ((qd_server = mq_open (SERVER_QUEUE_NAME, O_WRONLY)) == -1) { + perror ("Client: mq_open (server)"); + exit (1); + } + + char in_buffer [MSG_BUFFER_SIZE]; + + printf ("Ask for a token (Press ): "); + + char temp_buf [10]; + + while (fgets (temp_buf, 2, stdin)) { + char *test = "test"; + // send message to server + if (mq_send (qd_server, test, strlen (test) + 1, 0) == -1) { + perror ("Client: Not able to send message to server"); + continue; + } + + // receive response from server + + if (mq_receive (qd_client, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { + perror ("Client: mq_receive"); + exit (1); + } + // display token received from server + printf ("Client: Token received from server: %s\n\n", in_buffer); + + printf ("Ask for a token (Press ): "); + } + + + if (mq_close (qd_client) == -1) { + perror ("Client: mq_close"); + exit (1); + } + + if (mq_unlink (CLIENT_QUEUE_NAME) == -1) { + perror ("Client: mq_unlink"); + exit (1); + } + printf ("Client: bye\n"); + + exit (0); +} diff --git a/src/runtime/queue_server.c b/src/runtime/queue_server.c new file mode 100644 index 0000000..5377a94 --- /dev/null +++ b/src/runtime/queue_server.c @@ -0,0 +1,70 @@ +/* + * server.c: Server program + * to demonstrate interprocess commnuication + * with POSIX message queues + */ + +#include +#include +#include +#include + +#include +#include +#include + +#define SERVER_QUEUE_NAME "/sp-example-server" +#define CLIENT_QUEUE_NAME "/sp-example-client" +#define QUEUE_PERMISSIONS 0660 +#define MAX_MESSAGES 10 +#define MAX_MSG_SIZE 256 +#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 + +int main (int argc, char **argv) +{ + mqd_t qd_server, qd_client; // queue descriptors + long token_number = 1; // next token to be given to client + + printf ("Server: Hello, World!\n"); + + struct mq_attr attr; + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + + if ((qd_server = mq_open (SERVER_QUEUE_NAME, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + perror ("Server: mq_open (server)"); + exit (1); + } + char in_buffer [MSG_BUFFER_SIZE]; + char out_buffer [MSG_BUFFER_SIZE]; + + while (1) { + // get the oldest message with highest priority + if (mq_receive (qd_server, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { + perror ("Server: mq_receive"); + exit (1); + } + + printf ("Server: message received.\n"); + + // send reply message to client + + if ((qd_client = mq_open (CLIENT_QUEUE_NAME, O_WRONLY)) == 1) { + perror ("Server: Not able to open client queue"); + continue; + } + + sprintf (out_buffer, "%ld", token_number); + + if (mq_send (qd_client, out_buffer, strlen (out_buffer) + 1, 0) == -1) { + perror ("Server: Not able to send message to client"); + continue; + } + + printf ("Server: response sent to client.\n"); + token_number++; + } +} \ No newline at end of file diff --git a/src/runtime/runtime.c b/src/runtime/runtime.c index bdb401f..5d958cd 100644 --- a/src/runtime/runtime.c +++ b/src/runtime/runtime.c @@ -13,6 +13,8 @@ #include "server.h" #include "client.h" +#define MAX_INCOMING_CONNECTIONS 20 + Globals *init_globals(){ Globals *new_globals = malloc(sizeof(Globals)); new_globals->queue_block = queue_init(); @@ -46,6 +48,10 @@ Globals *init_globals(){ new_globals->miner_update = malloc(sizeof(int)); *new_globals->miner_update = 1; + new_globals->q_server_individual = malloc(MAX_INCOMING_CONNECTIONS * sizeof(char *)); + new_globals->connected = 0; + new_globals->q_client = malloc(32); + strcpy(new_globals->q_client, "/client_incoming"); return new_globals; } @@ -89,11 +95,11 @@ void *node_block_thread(void *arg){ void *node_tx_thread(void *arg){ Globals *globals = arg; while(1){ - printf("NodeTX Thread Waiting to pop TX\n"); + //printf("NodeTX Thread Waiting to pop TX\n"); Transaction *popped_tx = queue_pop_void(globals->queue_tx); - printf("NodeTX Thread TX Popped\n"); + //printf("NodeTX Thread TX Popped\n"); //Now aquire locks for validation (Utxo pool and blockcahin) - printf("NodeTX Thread Waiting on lock for validation\n"); + //printf("NodeTX Thread Waiting on lock for validation\n"); pthread_mutex_lock(&globals->utxo_pool_lock); @@ -111,30 +117,54 @@ void *node_tx_thread(void *arg){ pthread_mutex_unlock(&globals->mempool_lock); pthread_mutex_unlock(&globals->utxo_to_tx_lock); pthread_mutex_unlock(&globals->utxo_pool_lock); - printf("NodeTX Thread unlocked from validation\n"); + //printf("NodeTX Thread unlocked from validation\n"); } return NULL; } + +void add_to_outgoing(Globals *globals){ + printf("FOund a block and adding outgoing data"); + for(int i = 0; i < globals->connected; i++){ + char *q_name = globals->q_server_individual[i]; + mqd_t child_mq; + if ((child_mq = mq_open (q_name, O_WRONLY)) == -1) { + perror ("Server: mq_open (server)"); + exit (1); + } + // send message to server + char *test = "fake_block"; + if (mq_send (child_mq, test, strlen (test) + 1, 1) == -1) { + perror ("Client: Not able to send message to server"); + } + if (mq_close (child_mq) == -1) { + perror ("Client: mq_close"); + exit (1); + } + + + } +} + void *miner_thread(void *arg){ Globals *globals = arg; unsigned long hash_check_flag = 10000; int new_block_in_chain = 1; //1 is false 0 is true while(1){ - printf("Miner Thread waiting on lock to create Block\n"); + //printf("Miner Thread waiting on lock to create Block\n"); //Now aquire locks for creating a block! pthread_mutex_lock(&globals->utxo_pool_lock); pthread_mutex_lock(&globals->blockchain_lock); pthread_mutex_lock(&globals->key_pool_lock) ; pthread_mutex_lock(&globals->mempool_lock); - printf("Miner Thread got lock to create Block\n"); + //printf("Miner Thread got lock to create Block\n"); Block *new_block = create_block_alloc(); pthread_mutex_unlock(&globals->utxo_pool_lock); pthread_mutex_unlock(&globals->blockchain_lock); pthread_mutex_unlock(&globals->key_pool_lock) ; pthread_mutex_unlock(&globals->mempool_lock); - printf("Miner Thread done creating Block\n"); + //printf("Miner Thread done creating Block\n"); while(try_header_hash(&(new_block->header)) != 0){ change_nonce(new_block); @@ -158,10 +188,13 @@ void *miner_thread(void *arg){ print_block(new_block,""); printf("Miner mined a block Block\n"); queue_add_void(globals->queue_block, new_block); + // Serialize block and send it to our peers + add_to_outgoing(globals); } return NULL; } + void *shell_thread(void *arg){ Globals *globals = arg; shell_init(); @@ -181,6 +214,7 @@ int main() { /* Create independent threads each of which will execute function */ server_ret = pthread_create( &server, NULL, server_thread, (void*) globals); + sleep(5); // Now Create the Client Thread client_ret = pthread_create( &client, NULL, client_thread, (void*) globals); diff --git a/src/runtime/server.c b/src/runtime/server.c index 86bf1c8..95d3590 100644 --- a/src/runtime/server.c +++ b/src/runtime/server.c @@ -11,6 +11,13 @@ #include #include #include "server.h" +#include "mqueue.h" +#include "runtime.h" + +#define QUEUE_PERMISSIONS 0660 +#define MAX_MESSAGES 10 +#define MAX_MSG_SIZE 256 +#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 void sigchld_handler(int s) { @@ -36,7 +43,8 @@ void *get_in_addr(struct sockaddr *sa) } -void *server_thread(){ +void *server_thread(void *arg){ + Globals *globals = arg; int sockfd, new_fd; // listen on sock_fd, new connection on new_fd struct addrinfo hints, *servinfo, *p; struct sockaddr_storage their_addr; // connector's address information @@ -53,7 +61,7 @@ void *server_thread(){ if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); - return 1; + exit(1); } // loop through all the results and bind to the first we can @@ -114,10 +122,42 @@ void *server_thread(){ s, sizeof s); printf("server: got connection from %s\n", s); + char *q_name = "/outgoing_"; + char con_num = '0' + globals->connected; + globals->q_server_individual[globals->connected] = calloc(sizeof(char), strlen(q_name)+2); //digits etc + strncpy(globals->q_server_individual[globals->connected], q_name, strlen(q_name)+1); + strncat(globals->q_server_individual[globals->connected], &con_num, 1); + globals->connected++; //TODO MAKE THIS LESS than 10 peers + if (!fork()) { // this is the child process close(sockfd); // child doesn't need the listener - if (send(new_fd, "Hello, world!", 13, 0) == -1) - perror("send"); + mqd_t child_mq; + struct mq_attr attr; + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + char in_buffer [MSG_BUFFER_SIZE]; + printf("Opening Queue from server child: %s", globals->q_server_individual[globals->connected]); + if ((child_mq = mq_open (globals->q_server_individual[globals->connected], O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + perror ("Server: mq_open (server)"); + exit (1); + } + while (1) { + if (mq_receive(child_mq, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { + perror ("Client: mq_receive"); + exit (1); + } + + // Send the queue data over a socket + if (send(new_fd, in_buffer, MSG_BUFFER_SIZE, 0) == -1) + perror("send"); + } + if (mq_close(child_mq) == -1) { + perror ("Client: mq_close"); + exit (1); + } close(new_fd); exit(0); } @@ -125,4 +165,5 @@ void *server_thread(){ } return NULL; -} \ No newline at end of file +} + From 765be9df01495cf832d5a6f410d5a52e9600c55a Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Mon, 2 May 2022 11:24:34 -0400 Subject: [PATCH 04/17] small changes --- src/runtime/client.c | 3 ++- src/runtime/includes/server.h | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/runtime/client.c b/src/runtime/client.c index 0ddee65..ddc473c 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -134,7 +134,8 @@ void *client_thread(void *arg){ } buf[numbytes] = '\0'; - printf("client: received '%s'\n",buf); + printf("client: received %i bytes '%s'\n",numbytes, buf); + sleep(1); } diff --git a/src/runtime/includes/server.h b/src/runtime/includes/server.h index 0a1ee4f..036e259 100644 --- a/src/runtime/includes/server.h +++ b/src/runtime/includes/server.h @@ -1,7 +1,7 @@ #include -#define PORT_INT 3490 -#define PORT "3490" // the port users will be connecting to +#define PORT_INT 3590 +#define PORT "3590" // the port users will be connecting to #define BACKLOG 10 // how many pending connections queue will hold From dde3f5ff95d14b795c31de5019a10b3f16f5606c Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Mon, 2 May 2022 11:26:58 -0400 Subject: [PATCH 05/17] port change --- src/runtime/includes/client.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/includes/client.h b/src/runtime/includes/client.h index bd23595..a75163c 100644 --- a/src/runtime/includes/client.h +++ b/src/runtime/includes/client.h @@ -1,4 +1,4 @@ -#define PORT "3490" // the port client will be connecting to +#define PORT "3590" // the port client will be connecting to #define MAXDATASIZE 100 // max number of bytes we can get at once From 808ac5e9df75ecc2171ffed8a742bfcc8b31b013 Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Mon, 2 May 2022 11:46:38 -0400 Subject: [PATCH 06/17] no message queues --- src/runtime/client.c | 70 +++++++++++++++++------------------ src/runtime/includes/client.h | 2 - src/runtime/includes/server.h | 4 +- src/runtime/runtime.c | 2 +- src/runtime/server.c | 31 ++++++++-------- 5 files changed, 54 insertions(+), 55 deletions(-) diff --git a/src/runtime/client.c b/src/runtime/client.c index ddc473c..3154c7b 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -115,10 +115,10 @@ void *client_thread(void *arg){ attr.mq_msgsize = MAX_MSG_SIZE; attr.mq_curmsgs = 0; mqd_t incoming; - if ((incoming = mq_open (globals->q_client, O_WRONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { - perror ("Server: mq_open (server)"); - exit (1); - } + // if ((incoming = mq_open (globals->q_client, O_WRONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + // perror ("Server: mq_open (server)"); + // exit (1); + // } printf("Opened Incoming Queue for writing, now waiting for incoming data to socket"); while(1){ // Get data over the socket @@ -129,9 +129,9 @@ void *client_thread(void *arg){ // Add the data to the queue //char *test = "recieved fake_block"; - if (mq_send (incoming, buf, numbytes, 0) == -1) { - perror ("Client: Not able to send message to server"); - } + // if (mq_send (incoming, buf, numbytes, 0) == -1) { + // perror ("Client: Not able to send message to server"); + // } buf[numbytes] = '\0'; printf("client: received %i bytes '%s'\n",numbytes, buf); @@ -152,38 +152,38 @@ void *client_thread(void *arg){ attr.mq_msgsize = MAX_MSG_SIZE; attr.mq_curmsgs = 0; mqd_t incoming_parent; - if ((incoming_parent = mq_open (globals->q_client, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { - perror ("Server: mq_open (server)"); - exit (1); - } + // if ((incoming_parent = mq_open (globals->q_client, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + // perror ("Server: mq_open (server)"); + // exit (1); + // } // Now pop off the queues whenever something is added! while(1){ // Add to Block and TX Queue - char in_buffer [MSG_BUFFER_SIZE]; - unsigned int priority; - if (mq_receive (incoming_parent, in_buffer, MSG_BUFFER_SIZE, &priority) == -1) { - perror ("Server: mq_receive"); - exit (1); - } - // Block - if(priority == 1){ - // Dummy text data - printf("Incoming data to client parent: %s", in_buffer); - continue; - Block *new_block = deser_block_alloc(NULL, (unsigned char *)in_buffer); - queue_add_void(globals->queue_block, new_block); - } - //TX - else if (priority == 2) - { - Transaction *new_tx = deser_tx_alloc(NULL, (unsigned char *)in_buffer); - queue_add_void(globals->queue_tx, new_tx); - } - //Garbage - else{ - - } + // char in_buffer [MSG_BUFFER_SIZE]; + // unsigned int priority; + // if (mq_receive (incoming_parent, in_buffer, MSG_BUFFER_SIZE, &priority) == -1) { + // perror ("Server: mq_receive"); + // exit (1); + // } + // // Block + // if(priority == 1){ + // // Dummy text data + // printf("Incoming data to client parent: %s", in_buffer); + // continue; + // Block *new_block = deser_block_alloc(NULL, (unsigned char *)in_buffer); + // queue_add_void(globals->queue_block, new_block); + // } + // //TX + // else if (priority == 2) + // { + // Transaction *new_tx = deser_tx_alloc(NULL, (unsigned char *)in_buffer); + // queue_add_void(globals->queue_tx, new_tx); + // } + // //Garbage + // else{ + + // } } return 0; diff --git a/src/runtime/includes/client.h b/src/runtime/includes/client.h index a75163c..da52a03 100644 --- a/src/runtime/includes/client.h +++ b/src/runtime/includes/client.h @@ -1,5 +1,3 @@ -#define PORT "3590" // the port client will be connecting to - #define MAXDATASIZE 100 // max number of bytes we can get at once diff --git a/src/runtime/includes/server.h b/src/runtime/includes/server.h index 036e259..19dc993 100644 --- a/src/runtime/includes/server.h +++ b/src/runtime/includes/server.h @@ -1,7 +1,7 @@ #include -#define PORT_INT 3590 -#define PORT "3590" // the port users will be connecting to +#define PORT_INT 3690 +#define PORT "3690" // the port users will be connecting to #define BACKLOG 10 // how many pending connections queue will hold diff --git a/src/runtime/runtime.c b/src/runtime/runtime.c index 5d958cd..0da130e 100644 --- a/src/runtime/runtime.c +++ b/src/runtime/runtime.c @@ -13,7 +13,7 @@ #include "server.h" #include "client.h" -#define MAX_INCOMING_CONNECTIONS 20 +#define MAX_INCOMING_CONNECTIONS 10 Globals *init_globals(){ Globals *new_globals = malloc(sizeof(Globals)); diff --git a/src/runtime/server.c b/src/runtime/server.c index 95d3590..d1cb59b 100644 --- a/src/runtime/server.c +++ b/src/runtime/server.c @@ -139,25 +139,26 @@ void *server_thread(void *arg){ attr.mq_msgsize = MAX_MSG_SIZE; attr.mq_curmsgs = 0; char in_buffer [MSG_BUFFER_SIZE]; - printf("Opening Queue from server child: %s", globals->q_server_individual[globals->connected]); - if ((child_mq = mq_open (globals->q_server_individual[globals->connected], O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { - perror ("Server: mq_open (server)"); - exit (1); - } + // printf("Opening Queue from server child: %s", globals->q_server_individual[globals->connected]); + // if ((child_mq = mq_open (globals->q_server_individual[globals->connected], O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + // perror ("Server: mq_open (server)"); + // exit (1); + // } while (1) { - if (mq_receive(child_mq, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { - perror ("Client: mq_receive"); - exit (1); - } - + sleep(5); + // if (mq_receive(child_mq, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { + // perror ("Client: mq_receive"); + // exit (1); + // } + char *test = "dummy_data_from_server"; // Send the queue data over a socket - if (send(new_fd, in_buffer, MSG_BUFFER_SIZE, 0) == -1) + if (send(new_fd, test, strlen(test)+1, 0) == -1) perror("send"); } - if (mq_close(child_mq) == -1) { - perror ("Client: mq_close"); - exit (1); - } + // if (mq_close(child_mq) == -1) { + // perror ("Client: mq_close"); + // exit (1); + // } close(new_fd); exit(0); } From 1c0498dde72bb57fa65b178caef43b90bdf3c3c5 Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Mon, 2 May 2022 12:46:22 -0400 Subject: [PATCH 07/17] this is hard --- src/runtime/runtime.c | 2 +- src/runtime/server.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/runtime/runtime.c b/src/runtime/runtime.c index 0da130e..6d1dfff 100644 --- a/src/runtime/runtime.c +++ b/src/runtime/runtime.c @@ -189,7 +189,7 @@ void *miner_thread(void *arg){ printf("Miner mined a block Block\n"); queue_add_void(globals->queue_block, new_block); // Serialize block and send it to our peers - add_to_outgoing(globals); + //add_to_outgoing(globals); } return NULL; } diff --git a/src/runtime/server.c b/src/runtime/server.c index d1cb59b..3e601fd 100644 --- a/src/runtime/server.c +++ b/src/runtime/server.c @@ -139,7 +139,7 @@ void *server_thread(void *arg){ attr.mq_msgsize = MAX_MSG_SIZE; attr.mq_curmsgs = 0; char in_buffer [MSG_BUFFER_SIZE]; - // printf("Opening Queue from server child: %s", globals->q_server_individual[globals->connected]); + printf("Opening Queue from server child: %s", globals->q_server_individual[globals->connected]); // if ((child_mq = mq_open (globals->q_server_individual[globals->connected], O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { // perror ("Server: mq_open (server)"); // exit (1); @@ -150,7 +150,7 @@ void *server_thread(void *arg){ // perror ("Client: mq_receive"); // exit (1); // } - char *test = "dummy_data_from_server"; + char *test = "dummy_data_from_server 1"; // Send the queue data over a socket if (send(new_fd, test, strlen(test)+1, 0) == -1) perror("send"); From 301eb437dcf7dbfbe103070bb8c3fad70f194edc Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Mon, 2 May 2022 14:29:28 -0400 Subject: [PATCH 08/17] trying to not have orphan processes --- src/runtime/client.c | 11 +++++++++-- src/runtime/server.c | 10 ++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/runtime/client.c b/src/runtime/client.c index 3154c7b..128c08e 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -36,7 +36,8 @@ void *client_thread(void *arg){ Globals *globals = arg; for(unsigned int i = 0; i < num_peers; i++){ // FOrk this process and connect! - if (!fork()) { // this is the child process + pid_t pid = fork(); + if (!pid) { // this is the child process int sockfd, numbytes; char buf[MAXDATASIZE]; struct sockaddr_in serv_addr; @@ -120,7 +121,12 @@ void *client_thread(void *arg){ // exit (1); // } printf("Opened Incoming Queue for writing, now waiting for incoming data to socket"); - while(1){ + unsigned long counter = 0; + while(pid != 1){ + // Check to see if parent killed + if(counter % 10000){ + pid = getpid(); + } // Get data over the socket if ((numbytes = recv(sockfd, buf, MAXDATASIZE-1, 0)) == -1) { perror("recv"); @@ -137,6 +143,7 @@ void *client_thread(void *arg){ printf("client: received %i bytes '%s'\n",numbytes, buf); sleep(1); + counter++; } close(sockfd); diff --git a/src/runtime/server.c b/src/runtime/server.c index 3e601fd..5df4a6b 100644 --- a/src/runtime/server.c +++ b/src/runtime/server.c @@ -129,7 +129,8 @@ void *server_thread(void *arg){ strncat(globals->q_server_individual[globals->connected], &con_num, 1); globals->connected++; //TODO MAKE THIS LESS than 10 peers - if (!fork()) { // this is the child process + pid_t pid = fork(); + if (!pid) { // this is the child process close(sockfd); // child doesn't need the listener mqd_t child_mq; struct mq_attr attr; @@ -144,7 +145,12 @@ void *server_thread(void *arg){ // perror ("Server: mq_open (server)"); // exit (1); // } - while (1) { + unsigned long counter = 0; + while(pid != 1){ + // Check to see if parent killed + if(counter % 10000){ + pid = getpid(); + } sleep(5); // if (mq_receive(child_mq, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { // perror ("Client: mq_receive"); From d85ae5e6613383865431f41dae77478e0e1cd3e7 Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Mon, 2 May 2022 15:17:30 -0400 Subject: [PATCH 09/17] other small changes --- src/runtime/client.c | 5 ++++- src/runtime/runtime.c | 8 ++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/runtime/client.c b/src/runtime/client.c index 128c08e..39001e3 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -132,7 +132,10 @@ void *client_thread(void *arg){ perror("recv"); exit(1); } - + if(numbytes == 0 ){ + // Connection has been closed from the other side (by the socket server) + break; + } // Add the data to the queue //char *test = "recieved fake_block"; // if (mq_send (incoming, buf, numbytes, 0) == -1) { diff --git a/src/runtime/runtime.c b/src/runtime/runtime.c index 6d1dfff..7647359 100644 --- a/src/runtime/runtime.c +++ b/src/runtime/runtime.c @@ -228,10 +228,10 @@ int main() { /* the process and all threads before the threads have completed. */ pthread_join(shell, NULL); - pthread_join(server, NULL); - pthread_join(node_block, NULL); - pthread_join(node_tx, NULL); - pthread_join(miner, NULL); + // pthread_join(server, NULL); + // pthread_join(node_block, NULL); + // pthread_join(node_tx, NULL); + // pthread_join(miner, NULL); printf("Node Block returns: %d\n",node_block_ret); printf("Node TX returns: %d\n", node_tx_ret); From 094191195a5e14cf24f375d6a48fb8d22cbb39a5 Mon Sep 17 00:00:00 2001 From: Nathan Faber Date: Mon, 2 May 2022 15:24:19 -0400 Subject: [PATCH 10/17] dangling process caught --- src/runtime/client.c | 6 +++--- src/runtime/server.c | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/runtime/client.c b/src/runtime/client.c index 39001e3..73491ab 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -43,7 +43,7 @@ void *client_thread(void *arg){ struct sockaddr_in serv_addr; if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { printf("\n Socket creation error \n"); - continue; + exit(0); } serv_addr.sin_family = AF_INET; @@ -55,14 +55,14 @@ void *client_thread(void *arg){ <= 0) { printf( "\nInvalid address/ Address not supported \n"); - continue; + exit(0); } if (connect(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) { printf("\nConnection Failed \n"); - continue; + exit(0); } diff --git a/src/runtime/server.c b/src/runtime/server.c index 5df4a6b..d5d2755 100644 --- a/src/runtime/server.c +++ b/src/runtime/server.c @@ -147,6 +147,7 @@ void *server_thread(void *arg){ // } unsigned long counter = 0; while(pid != 1){ + printf("stuff\n"); // Check to see if parent killed if(counter % 10000){ pid = getpid(); From f4eb7c983e081a88d5614eb043b19b053f493c05 Mon Sep 17 00:00:00 2001 From: eito-fis Date: Tue, 3 May 2022 16:16:49 -0400 Subject: [PATCH 11/17] FIX: Move global increment to after fork --- src/runtime/client.c | 86 +++++++++++++++------------------- src/runtime/includes/runtime.h | 4 +- src/runtime/runtime.c | 6 +-- src/runtime/server.c | 37 ++++++++------- 4 files changed, 65 insertions(+), 68 deletions(-) diff --git a/src/runtime/client.c b/src/runtime/client.c index 73491ab..6dc3fde 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -29,7 +29,7 @@ void *get_in_addr2(struct sockaddr *sa) return &(((struct sockaddr_in6*)sa)->sin6_addr); } -char peers[][20] = {"192.168.32.251"};//"ubuntu.local"};//"localhost", +char peers[][20] = {"192.168.32.251"};//"ubuntu.local"};//"localhost", unsigned int num_peers = 1; // Match above void *client_thread(void *arg){ @@ -38,17 +38,17 @@ void *client_thread(void *arg){ // FOrk this process and connect! pid_t pid = fork(); if (!pid) { // this is the child process - int sockfd, numbytes; + int sockfd, numbytes; char buf[MAXDATASIZE]; struct sockaddr_in serv_addr; if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { printf("\n Socket creation error \n"); exit(0); } - + serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(PORT_INT); - + // Convert IPv4 and IPv6 addresses from text to binary // form if (inet_pton(AF_INET, peers[i], &serv_addr.sin_addr) @@ -57,7 +57,7 @@ void *client_thread(void *arg){ "\nInvalid address/ Address not supported \n"); exit(0); } - + if (connect(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) { @@ -65,7 +65,7 @@ void *client_thread(void *arg){ exit(0); } - + // // using Hostname // struct addrinfo hints, *servinfo, *p; // int rv; @@ -108,7 +108,7 @@ void *client_thread(void *arg){ // freeaddrinfo(servinfo); // all done with this structure - + struct mq_attr attr; attr.mq_flags = 0; @@ -116,10 +116,10 @@ void *client_thread(void *arg){ attr.mq_msgsize = MAX_MSG_SIZE; attr.mq_curmsgs = 0; mqd_t incoming; - // if ((incoming = mq_open (globals->q_client, O_WRONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { - // perror ("Server: mq_open (server)"); - // exit (1); - // } + if ((incoming = mq_open (globals->q_client, O_WRONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + perror ("Server: mq_open (server)"); + exit (1); + } printf("Opened Incoming Queue for writing, now waiting for incoming data to socket"); unsigned long counter = 0; while(pid != 1){ @@ -131,19 +131,18 @@ void *client_thread(void *arg){ if ((numbytes = recv(sockfd, buf, MAXDATASIZE-1, 0)) == -1) { perror("recv"); exit(1); - } - if(numbytes == 0 ){ + } if(numbytes == 0 ){ // Connection has been closed from the other side (by the socket server) break; } // Add the data to the queue //char *test = "recieved fake_block"; - // if (mq_send (incoming, buf, numbytes, 0) == -1) { - // perror ("Client: Not able to send message to server"); - // } + if (mq_send(incoming, buf, numbytes, 0) == -1) { + perror ("Client: Not able to send message to server"); + } buf[numbytes] = '\0'; - printf("client: received %i bytes '%s'\n",numbytes, buf); + printf("client: received %i bytes '%s'\n", numbytes, buf); sleep(1); counter++; @@ -162,39 +161,32 @@ void *client_thread(void *arg){ attr.mq_msgsize = MAX_MSG_SIZE; attr.mq_curmsgs = 0; mqd_t incoming_parent; - // if ((incoming_parent = mq_open (globals->q_client, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { - // perror ("Server: mq_open (server)"); - // exit (1); - // } + if ((incoming_parent = mq_open (globals->q_client, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + perror ("Server: mq_open (server)"); + exit (1); + } // Now pop off the queues whenever something is added! while(1){ // Add to Block and TX Queue - - // char in_buffer [MSG_BUFFER_SIZE]; - // unsigned int priority; - // if (mq_receive (incoming_parent, in_buffer, MSG_BUFFER_SIZE, &priority) == -1) { - // perror ("Server: mq_receive"); - // exit (1); - // } - // // Block - // if(priority == 1){ - // // Dummy text data - // printf("Incoming data to client parent: %s", in_buffer); - // continue; - // Block *new_block = deser_block_alloc(NULL, (unsigned char *)in_buffer); - // queue_add_void(globals->queue_block, new_block); - // } - // //TX - // else if (priority == 2) - // { - // Transaction *new_tx = deser_tx_alloc(NULL, (unsigned char *)in_buffer); - // queue_add_void(globals->queue_tx, new_tx); - // } - // //Garbage - // else{ - - // } + char in_buffer [MSG_BUFFER_SIZE]; + unsigned int priority; + if (mq_receive(incoming_parent, in_buffer, MSG_BUFFER_SIZE, &priority) == -1) { + perror ("Server: mq_receive"); + exit (1); + } + if(priority == 1){ + // Dummy text data + // printf("Incoming data to client parent: %s", in_buffer); + // continue; + Block *new_block = deser_block_alloc(NULL, (unsigned char *)in_buffer); + queue_add_void(globals->queue_block, new_block); + } + else if (priority == 0) + { + Transaction *new_tx = deser_tx_alloc(NULL, (unsigned char *)in_buffer); + queue_add_void(globals->queue_tx, new_tx); + } } return 0; -} \ No newline at end of file +} diff --git a/src/runtime/includes/runtime.h b/src/runtime/includes/runtime.h index 2884084..b161d0a 100644 --- a/src/runtime/includes/runtime.h +++ b/src/runtime/includes/runtime.h @@ -4,7 +4,7 @@ #include "semaphore.h" #include -typedef struct Globals { +typedef struct { Queue *queue_block; Queue *queue_tx; pthread_mutex_t utxo_pool_lock; @@ -19,4 +19,4 @@ typedef struct Globals { char **q_server_individual; // send the same thing out to multiple nodes mqd_t q_server; // Main queu that gets distributed to all the individual queues char *q_client; // Only one queue from all the incoming blocks -} Globals; \ No newline at end of file +} Globals; diff --git a/src/runtime/runtime.c b/src/runtime/runtime.c index 7647359..17c3c34 100644 --- a/src/runtime/runtime.c +++ b/src/runtime/runtime.c @@ -222,7 +222,7 @@ int main() { node_tx_ret = pthread_create( &node_tx, NULL, node_tx_thread, (void*) globals); shell_ret = pthread_create( &shell, NULL, shell_thread, (void*) globals); miner_ret = pthread_create( &miner, NULL, miner_thread, (void*) globals); - + /* Wait till threads are complete before main continues. Unless we */ /* wait we run the risk of executing an exit which will terminate */ /* the process and all threads before the threads have completed. */ @@ -231,12 +231,12 @@ int main() { // pthread_join(server, NULL); // pthread_join(node_block, NULL); // pthread_join(node_tx, NULL); - // pthread_join(miner, NULL); + // pthread_join(miner, NULL); printf("Node Block returns: %d\n",node_block_ret); printf("Node TX returns: %d\n", node_tx_ret); printf("Shell returns: %d\n", shell_ret); printf("Miner returns: %d\n", miner_ret); printf("Server returns: %d\n", server_ret); - return 0; + return 0; } diff --git a/src/runtime/server.c b/src/runtime/server.c index d5d2755..e18d321 100644 --- a/src/runtime/server.c +++ b/src/runtime/server.c @@ -127,7 +127,6 @@ void *server_thread(void *arg){ globals->q_server_individual[globals->connected] = calloc(sizeof(char), strlen(q_name)+2); //digits etc strncpy(globals->q_server_individual[globals->connected], q_name, strlen(q_name)+1); strncat(globals->q_server_individual[globals->connected], &con_num, 1); - globals->connected++; //TODO MAKE THIS LESS than 10 peers pid_t pid = fork(); if (!pid) { // this is the child process @@ -139,13 +138,18 @@ void *server_thread(void *arg){ attr.mq_maxmsg = MAX_MESSAGES; attr.mq_msgsize = MAX_MSG_SIZE; attr.mq_curmsgs = 0; - char in_buffer [MSG_BUFFER_SIZE]; + char in_buffer[MSG_BUFFER_SIZE]; printf("Opening Queue from server child: %s", globals->q_server_individual[globals->connected]); - // if ((child_mq = mq_open (globals->q_server_individual[globals->connected], O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { - // perror ("Server: mq_open (server)"); - // exit (1); - // } - unsigned long counter = 0; + if ((child_mq = mq_open( + globals->q_server_individual[globals->connected], + O_RDONLY | O_CREAT, + QUEUE_PERMISSIONS, + &attr + )) == -1) { + perror ("Server: mq_open (server)"); + exit (1); + } + unsigned long counter = 1; while(pid != 1){ printf("stuff\n"); // Check to see if parent killed @@ -153,25 +157,26 @@ void *server_thread(void *arg){ pid = getpid(); } sleep(5); - // if (mq_receive(child_mq, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { - // perror ("Client: mq_receive"); - // exit (1); - // } + /* if (mq_receive(child_mq, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { */ + /* perror ("Client: mq_receive"); */ + /* exit (1); */ + /* } */ char *test = "dummy_data_from_server 1"; // Send the queue data over a socket if (send(new_fd, test, strlen(test)+1, 0) == -1) perror("send"); } - // if (mq_close(child_mq) == -1) { - // perror ("Client: mq_close"); - // exit (1); - // } + if (mq_close(child_mq) == -1) { + perror ("Client: mq_close"); + exit (1); + } close(new_fd); exit(0); } + close(new_fd); // parent doesn't need this + globals->connected++; //TODO MAKE THIS LESS than 10 peers } return NULL; } - From 3b9194fb4b720c42c16891eb231ab14d99d44887 Mon Sep 17 00:00:00 2001 From: eito-fis Date: Tue, 3 May 2022 22:50:04 -0400 Subject: [PATCH 12/17] ADD: Update threads to propogate across network --- src/runtime/includes/runtime.h | 13 +++- src/runtime/runtime.c | 116 +++++++++++++++++++++------------ 2 files changed, 85 insertions(+), 44 deletions(-) diff --git a/src/runtime/includes/runtime.h b/src/runtime/includes/runtime.h index b161d0a..231bf32 100644 --- a/src/runtime/includes/runtime.h +++ b/src/runtime/includes/runtime.h @@ -4,6 +4,18 @@ #include "semaphore.h" #include +#define MAX_INCOMING_CONNECTIONS 10 +#define INCOMING_QUEUE "/client_incoming" +#define OUTGOING_QUEUE_FORMAT "/server_outgoing_%d" + +#define QUEUE_PERMISSIONS 0660 +#define MAX_MESSAGES 10 +#define MAX_MSG_SIZE 256 +#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 + +#define BLOCK_ID 1 +#define TX_ID 0 + typedef struct { Queue *queue_block; Queue *queue_tx; @@ -17,6 +29,5 @@ typedef struct { pthread_mutex_t miner_update_lock; int connected; char **q_server_individual; // send the same thing out to multiple nodes - mqd_t q_server; // Main queu that gets distributed to all the individual queues char *q_client; // Only one queue from all the incoming blocks } Globals; diff --git a/src/runtime/runtime.c b/src/runtime/runtime.c index 17c3c34..a180099 100644 --- a/src/runtime/runtime.c +++ b/src/runtime/runtime.c @@ -1,20 +1,25 @@ -#include "queue.h" #include #include "pthread.h" -#include "validate_block.h" -#include "handle_block.h" + +#include "queue.h" + +#include "ser_tx.h" #include "validate_tx.h" #include "handle_tx.h" + +#include "ser_block.h" #include "create_block.h" -#include "shell.h" -#include "runtime.h" +#include "validate_block.h" +#include "handle_block.h" + #include "init_globals.h" #include "init_db.h" + +#include "runtime.h" +#include "shell.h" #include "server.h" #include "client.h" -#define MAX_INCOMING_CONNECTIONS 10 - Globals *init_globals(){ Globals *new_globals = malloc(sizeof(Globals)); new_globals->queue_block = queue_init(); @@ -50,42 +55,93 @@ Globals *init_globals(){ new_globals->q_server_individual = malloc(MAX_INCOMING_CONNECTIONS * sizeof(char *)); new_globals->connected = 0; - new_globals->q_client = malloc(32); + new_globals->q_client = malloc(strlen(INCOMING_QUEUE) + 1); strcpy(new_globals->q_client, "/client_incoming"); + return new_globals; } +unsigned char *init_queue_buffer(unsigned char *buf, int id, size_t size) { + memcpy(buf, &id, sizeof(id)); + memcpy(buf + sizeof(id), &size, sizeof(size)); + return buf + sizeof(id) + sizeof(size); +} + +void propogate_object(Globals *globals, void *obj, int id){ + char *q_name; + mqd_t child_mq; + unsigned char buf[MAX_MSG_SIZE], *ser_buf; + size_t buf_size; + + switch (id) { + case BLOCK_ID: + ser_buf = init_queue_buffer(buf, id, size_ser_block(obj)); + buf_size = ser_block(ser_buf, obj); + break; + case TX_ID: + ser_buf = init_queue_buffer(buf, id, size_ser_tx(obj)); + buf_size = ser_tx(ser_buf, obj); + break; + default: + perror("Invalid ID passed to propogate_object()"); + exit(1); + } + buf_size += ser_buf - buf; + + printf("Recieved object of ID '%d', adding to queues\n", id); + for (int i = 0; i < globals->connected; i++) { + q_name = globals->q_server_individual[i]; + if ((child_mq = mq_open(q_name, O_WRONLY)) == -1) { + perror("Main Process: mq_open(server)"); + exit(1); + } + if (mq_send(child_mq, (char*)buf, buf_size, id) == -1) { + perror("Main Process: Not able to send message to server"); + } + if (mq_close(child_mq) == -1) { + perror("Main Process: mq_close"); + exit(1); + } + } +} + void *node_block_thread(void *arg){ Globals *globals = arg; - while(1){ + while (1) { printf("Node Thread Waiting to pop Block\n"); Block *popped_block = queue_pop_void(globals->queue_block); printf("Node Thread Block Popped\n"); + //Now aquire locks for validation (Utxo pool and blockcahin) printf("Node Thread Waiting on lock for validation\n"); pthread_mutex_lock(&globals->utxo_pool_lock); pthread_mutex_lock(&globals->blockchain_lock); - int block_valid = validate_block(popped_block); + int block_valid = validate_block(popped_block); - if(block_valid == 0){ - //Now aquire additional locks for handling + if (block_valid == 0) { + //Now acquire additional locks for handling printf("Node Thread Waiting on lock for Handling\n"); pthread_mutex_lock(&globals->utxo_to_tx_lock); pthread_mutex_lock(&globals->wallet_pool_lock); pthread_mutex_lock(&globals->key_pool_lock); pthread_mutex_lock(&globals->mempool_lock); + accept_block(popped_block); + propogate_object(globals, popped_block, BLOCK_ID); free_block(popped_block); + pthread_mutex_lock(&globals->miner_update_lock); *globals->miner_update = 0; pthread_mutex_unlock(&globals->miner_update_lock); + pthread_mutex_unlock(&globals->utxo_to_tx_lock); pthread_mutex_unlock(&globals->wallet_pool_lock); pthread_mutex_unlock(&globals->key_pool_lock); pthread_mutex_unlock(&globals->mempool_lock); - printf("Node Thread unlockedr Handling\n"); + printf("Node Thread unlocked Handling\n"); } + pthread_mutex_unlock(&globals->utxo_pool_lock); pthread_mutex_unlock(&globals->blockchain_lock); } @@ -94,26 +150,25 @@ void *node_block_thread(void *arg){ void *node_tx_thread(void *arg){ Globals *globals = arg; - while(1){ + while(1) { //printf("NodeTX Thread Waiting to pop TX\n"); Transaction *popped_tx = queue_pop_void(globals->queue_tx); //printf("NodeTX Thread TX Popped\n"); //Now aquire locks for validation (Utxo pool and blockcahin) //printf("NodeTX Thread Waiting on lock for validation\n"); - pthread_mutex_lock(&globals->utxo_pool_lock); pthread_mutex_lock(&globals->utxo_to_tx_lock); pthread_mutex_lock(&globals->mempool_lock); int tx_valid = validate_tx_incoming(popped_tx); - if(tx_valid == 0){ - //Now aquire additional locks for handling handle_tx(popped_tx); + propogate_object(globals, popped_tx, TX_ID); free_tx(popped_tx); } + pthread_mutex_unlock(&globals->mempool_lock); pthread_mutex_unlock(&globals->utxo_to_tx_lock); pthread_mutex_unlock(&globals->utxo_pool_lock); @@ -122,30 +177,6 @@ void *node_tx_thread(void *arg){ return NULL; } - -void add_to_outgoing(Globals *globals){ - printf("FOund a block and adding outgoing data"); - for(int i = 0; i < globals->connected; i++){ - char *q_name = globals->q_server_individual[i]; - mqd_t child_mq; - if ((child_mq = mq_open (q_name, O_WRONLY)) == -1) { - perror ("Server: mq_open (server)"); - exit (1); - } - // send message to server - char *test = "fake_block"; - if (mq_send (child_mq, test, strlen (test) + 1, 1) == -1) { - perror ("Client: Not able to send message to server"); - } - if (mq_close (child_mq) == -1) { - perror ("Client: mq_close"); - exit (1); - } - - - } -} - void *miner_thread(void *arg){ Globals *globals = arg; unsigned long hash_check_flag = 10000; @@ -188,8 +219,6 @@ void *miner_thread(void *arg){ print_block(new_block,""); printf("Miner mined a block Block\n"); queue_add_void(globals->queue_block, new_block); - // Serialize block and send it to our peers - //add_to_outgoing(globals); } return NULL; } @@ -238,5 +267,6 @@ int main() { printf("Shell returns: %d\n", shell_ret); printf("Miner returns: %d\n", miner_ret); printf("Server returns: %d\n", server_ret); + printf("Client returns: %d\n", client_ret); return 0; } From 03b24fa4e4154fec5356378c4f43ed84d277de8a Mon Sep 17 00:00:00 2001 From: eito-fis Date: Tue, 3 May 2022 22:50:19 -0400 Subject: [PATCH 13/17] ADD: Update server to use ID and length --- src/runtime/server.c | 225 ++++++++++++++++++++++++++----------------- 1 file changed, 137 insertions(+), 88 deletions(-) diff --git a/src/runtime/server.c b/src/runtime/server.c index e18d321..a09c17d 100644 --- a/src/runtime/server.c +++ b/src/runtime/server.c @@ -14,11 +14,6 @@ #include "mqueue.h" #include "runtime.h" -#define QUEUE_PERMISSIONS 0660 -#define MAX_MESSAGES 10 -#define MAX_MSG_SIZE 256 -#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 - void sigchld_handler(int s) { (void)s; // quiet unused variable warning @@ -42,19 +37,11 @@ void *get_in_addr(struct sockaddr *sa) return &(((struct sockaddr_in6*)sa)->sin6_addr); } - -void *server_thread(void *arg){ - Globals *globals = arg; - int sockfd, new_fd; // listen on sock_fd, new connection on new_fd +int build_listen_socket() { + int sockfd, yes, rv; // listen on sock_fd, new connection on new_fd struct addrinfo hints, *servinfo, *p; - struct sockaddr_storage their_addr; // connector's address information - socklen_t sin_size; - struct sigaction sa; - int yes=1; - char s[INET6_ADDRSTRLEN]; - int rv; - memset(&hints, 0, sizeof hints); + memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; // use my IP @@ -65,28 +52,25 @@ void *server_thread(void *arg){ } // loop through all the results and bind to the first we can - for(p = servinfo; p != NULL; p = p->ai_next) { + yes = 1; + for (p = servinfo; p != NULL; p = p->ai_next) { if ((sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { perror("server: socket"); continue; } - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) { perror("setsockopt"); exit(1); } - if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { close(sockfd); perror("server: bind"); continue; } - break; } - freeaddrinfo(servinfo); // all done with this structure if (p == NULL) { @@ -99,6 +83,11 @@ void *server_thread(void *arg){ exit(1); } + return sockfd; +} + +void build_sig_handler() { + struct sigaction sa; sa.sa_handler = sigchld_handler; // reap all dead processes sigemptyset(&sa.sa_mask); sa.sa_flags = SA_RESTART; @@ -106,77 +95,137 @@ void *server_thread(void *arg){ perror("sigaction"); exit(1); } +} - printf("server: waiting for connections...\n"); +// From Beej's Guide +int send_all(int s, char *buf, size_t *len) +{ + size_t total = 0; // how many bytes we've sent + size_t bytesleft = *len; // how many we have left to send + int n; - while(1) { // main accept() loop - sin_size = sizeof their_addr; - new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); - if (new_fd == -1) { - perror("accept"); - continue; - } + while(total < *len) { + n = send(s, buf + total, bytesleft, 0); + if (n == -1) { break; } + total += n; + bytesleft -= n; + } - inet_ntop(their_addr.ss_family, - get_in_addr((struct sockaddr *)&their_addr), - s, sizeof s); - printf("server: got connection from %s\n", s); - - char *q_name = "/outgoing_"; - char con_num = '0' + globals->connected; - globals->q_server_individual[globals->connected] = calloc(sizeof(char), strlen(q_name)+2); //digits etc - strncpy(globals->q_server_individual[globals->connected], q_name, strlen(q_name)+1); - strncat(globals->q_server_individual[globals->connected], &con_num, 1); - - pid_t pid = fork(); - if (!pid) { // this is the child process - close(sockfd); // child doesn't need the listener - mqd_t child_mq; - struct mq_attr attr; - - attr.mq_flags = 0; - attr.mq_maxmsg = MAX_MESSAGES; - attr.mq_msgsize = MAX_MSG_SIZE; - attr.mq_curmsgs = 0; - char in_buffer[MSG_BUFFER_SIZE]; - printf("Opening Queue from server child: %s", globals->q_server_individual[globals->connected]); - if ((child_mq = mq_open( - globals->q_server_individual[globals->connected], - O_RDONLY | O_CREAT, - QUEUE_PERMISSIONS, - &attr - )) == -1) { - perror ("Server: mq_open (server)"); - exit (1); - } - unsigned long counter = 1; - while(pid != 1){ - printf("stuff\n"); - // Check to see if parent killed - if(counter % 10000){ - pid = getpid(); - } - sleep(5); - /* if (mq_receive(child_mq, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { */ - /* perror ("Client: mq_receive"); */ - /* exit (1); */ - /* } */ - char *test = "dummy_data_from_server 1"; - // Send the queue data over a socket - if (send(new_fd, test, strlen(test)+1, 0) == -1) - perror("send"); - } - if (mq_close(child_mq) == -1) { - perror ("Client: mq_close"); - exit (1); - } - close(new_fd); - exit(0); - } + *len = total; // return number actually sent here - close(new_fd); // parent doesn't need this - globals->connected++; //TODO MAKE THIS LESS than 10 peers - } + return n == -1 ? -1 : 0; // return -1 on failure, 0 on success +} + +void server_fork(Globals *globals, int sockfd, pid_t pid) { + mqd_t child_mq; + struct mq_attr attr; + char in_buffer[MAX_MSG_SIZE]; + unsigned long counter; + size_t buf_size, total_size; + + printf( + "Opening Queue from server child: %s\n", + globals->q_server_individual[globals->connected] + ); + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + child_mq = mq_open( + globals->q_server_individual[globals->connected], + O_RDONLY | O_CREAT, + QUEUE_PERMISSIONS, + &attr + ); + if (child_mq == -1) { + perror("Server: mq_open (server)"); + exit(1); + } + + counter = 1; + while (pid != 1){ + printf("SERVER FORK LOOP\n"); + // Check to see if parent killed + if(counter % 10000){ + pid = getpid(); + } + + if (mq_receive(child_mq, in_buffer, MAX_MSG_SIZE, NULL) == -1) { + perror("Client: mq_receive"); + exit(1); + } + + buf_size = *(size_t*)(in_buffer + sizeof(int)); + total_size = buf_size + sizeof(int) + sizeof(size_t); + + if (send_all(sockfd, in_buffer, &total_size) == -1) { + perror("Server: send_all"); + exit(1); + } + } + + if (mq_close(child_mq) == -1) { + perror("Server: mq_close"); + exit(1); + } + close(sockfd); + exit(0); +} + +void handle_accept(Globals *globals, int sockfd) { + int new_fd; + size_t name_size; + struct sockaddr_storage their_addr; // connector's address information + socklen_t sin_size; + char s[INET6_ADDRSTRLEN]; + + sin_size = sizeof(their_addr); + new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); + if (new_fd == -1) { + perror("accept"); + return; + } + + inet_ntop( + their_addr.ss_family, + get_in_addr((struct sockaddr *)&their_addr), + s, sizeof s + ); + printf("server: got connection from '%s'\n", s); + + name_size = snprintf(NULL, 0, OUTGOING_QUEUE_FORMAT, globals->connected); + globals->q_server_individual[globals->connected] = malloc(name_size + 1); + sprintf( + globals->q_server_individual[globals->connected], + OUTGOING_QUEUE_FORMAT, + getpid() + ); + printf( + "Made new queue name '%s'\n", + globals->q_server_individual[globals->connected] + ); + + pid_t pid = fork(); + if (!pid) { // this is the child process + close(sockfd); // child doesn't need the listener + server_fork(globals, new_fd, pid); + } + + close(new_fd); // parent doesn't need this + globals->connected++; +} + +void *server_thread(void *arg){ + Globals *globals = arg; + int sockfd; + + sockfd = build_listen_socket(); + build_sig_handler(); + + printf("server: waiting for connections...\n"); + while (1) { + handle_accept(globals, sockfd); + } return NULL; } From fc80f177ae40914a47b30fed355c183a372435a1 Mon Sep 17 00:00:00 2001 From: eito-fis Date: Tue, 3 May 2022 23:27:02 -0400 Subject: [PATCH 14/17] ADD: Update client to use ID and length --- src/runtime/client.c | 282 ++++++++++++++++++++++--------------------- src/runtime/server.c | 11 +- 2 files changed, 150 insertions(+), 143 deletions(-) diff --git a/src/runtime/client.c b/src/runtime/client.c index 6dc3fde..f7db229 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -15,10 +15,8 @@ #include "ser_block.h" #include "ser_tx.h" -#define QUEUE_PERMISSIONS 0660 -#define MAX_MESSAGES 10 -#define MAX_MSG_SIZE 256 -#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 +char peers[][20] = {"192.168.32.251"};//"ubuntu.local"};//"localhost", +unsigned int num_peers = 1; // Match above void *get_in_addr2(struct sockaddr *sa) { @@ -29,161 +27,171 @@ void *get_in_addr2(struct sockaddr *sa) return &(((struct sockaddr_in6*)sa)->sin6_addr); } -char peers[][20] = {"192.168.32.251"};//"ubuntu.local"};//"localhost", -unsigned int num_peers = 1; // Match above +int recv_all(int s, char *buf, size_t *len) { + size_t total = 0; // how many bytes we've read + size_t bytes_left = *len; // how many we have left to read + int n; + + while (total < *len) { + n = recv(s, buf + total, bytes_left, 0); + if (n == 0) + return 1; + else if (n == -1) + return -1; + total += n; + bytes_left -= n; + } + + *len = total; // return number actually sent here + + return 0; +} + +int recv_obj(int s, char *buf, size_t *total_size) { + int rv; + size_t head_size, buf_size; + + head_size = sizeof(int) + sizeof(long); + if ((rv = recv_all(s, buf, &head_size)) != 0) + return rv; + + buf_size = *(size_t*)(buf + sizeof(int)); + if ((rv = recv_all(s, buf, &buf_size)) != 0) + return rv; + + *total_size = buf_size + head_size; + return 0; +} + +void client_fork(Globals *globals, pid_t pid, int i) { + int sockfd, rv, priority; + size_t numbytes; + unsigned long counter; + char buf[MAX_MSG_SIZE]; + struct sockaddr_in serv_addr; + struct mq_attr attr; + mqd_t incoming; + + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + printf("\n Socket creation error \n"); + exit(0); + } + + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(PORT_INT); + + // Convert IPv4 and IPv6 addresses from text to binary form + if (inet_pton(AF_INET, peers[i], &serv_addr.sin_addr) <= 0) { + printf("\nInvalid address/ Address not supported \n"); + exit(0); + } + if (connect(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) { + printf("\nConnection Failed \n"); + exit(0); + } + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + incoming = mq_open( + globals->q_client, + O_WRONLY | O_CREAT, + QUEUE_PERMISSIONS, + &attr + ); + if (incoming == -1) { + perror ("Client: mq_open (server)"); + exit (1); + } + printf("Opened Incoming Queue for writing"); + + counter = 0; + while (pid != 1) { + // Check to see if parent killed + if (counter % 10000) { + pid = getpid(); + } + + // Get data over the socket + rv = recv_obj(sockfd, buf, &numbytes); + if (rv == -1) { + perror("recv obj"); + exit(1); + } else if (rv == 1) { + // Connection has been closed from the other side (by the socket server) + break; + } + + // Add the data to the queue + priority = *(int*)buf; + if (mq_send(incoming, buf, numbytes, priority) == -1) { + perror ("Client: Not able to send message to main process"); + continue; + } + printf("client: received %lu bytes\n", numbytes); + + counter++; + } + + close(sockfd); + exit(0); +} void *client_thread(void *arg){ Globals *globals = arg; - for(unsigned int i = 0; i < num_peers; i++){ - // FOrk this process and connect! + for (unsigned int i = 0; i < num_peers; i++) { + // Fork this process and connect! pid_t pid = fork(); if (!pid) { // this is the child process - int sockfd, numbytes; - char buf[MAXDATASIZE]; - struct sockaddr_in serv_addr; - if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - printf("\n Socket creation error \n"); - exit(0); - } - - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(PORT_INT); - - // Convert IPv4 and IPv6 addresses from text to binary - // form - if (inet_pton(AF_INET, peers[i], &serv_addr.sin_addr) - <= 0) { - printf( - "\nInvalid address/ Address not supported \n"); - exit(0); - } - - if (connect(sockfd, (struct sockaddr*)&serv_addr, - sizeof(serv_addr)) - < 0) { - printf("\nConnection Failed \n"); - exit(0); - } - - - // // using Hostname - // struct addrinfo hints, *servinfo, *p; - // int rv; - // char s[INET6_ADDRSTRLEN]; - - // memset(&hints, 0, sizeof hints); - // hints.ai_family = AF_UNSPEC; - // hints.ai_socktype = SOCK_STREAM; - - // if ((rv = getaddrinfo(peers[i], PORT, &hints, &servinfo)) != 0) { - // fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); - // return NULL; - // } - - // // loop through all the results and connect to the first we can - // for(p = servinfo; p != NULL; p = p->ai_next) { - // if ((sockfd = socket(p->ai_family, p->ai_socktype, - // p->ai_protocol)) == -1) { - // perror("client: socket"); - // continue; - // } - - // if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1) { - // perror("client: connect"); - // close(sockfd); - // continue; - // } - - // break; - // } - - // if (p == NULL) { - // fprintf(stderr, "client: failed to connect\n"); - // return 2; - // } - - // inet_ntop(p->ai_family, get_in_addr2((struct sockaddr *)p->ai_addr), - // s, sizeof s); - // printf("client: connecting to %s\n", s); - - // freeaddrinfo(servinfo); // all done with this structure - - - struct mq_attr attr; - - attr.mq_flags = 0; - attr.mq_maxmsg = MAX_MESSAGES; - attr.mq_msgsize = MAX_MSG_SIZE; - attr.mq_curmsgs = 0; - mqd_t incoming; - if ((incoming = mq_open (globals->q_client, O_WRONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { - perror ("Server: mq_open (server)"); - exit (1); - } - printf("Opened Incoming Queue for writing, now waiting for incoming data to socket"); - unsigned long counter = 0; - while(pid != 1){ - // Check to see if parent killed - if(counter % 10000){ - pid = getpid(); - } - // Get data over the socket - if ((numbytes = recv(sockfd, buf, MAXDATASIZE-1, 0)) == -1) { - perror("recv"); - exit(1); - } if(numbytes == 0 ){ - // Connection has been closed from the other side (by the socket server) - break; - } - // Add the data to the queue - //char *test = "recieved fake_block"; - if (mq_send(incoming, buf, numbytes, 0) == -1) { - perror ("Client: Not able to send message to server"); - } - buf[numbytes] = '\0'; - - printf("client: received %i bytes '%s'\n", numbytes, buf); - sleep(1); - - counter++; - } - - close(sockfd); - exit(0); + client_fork(globals, pid, i); } } //INCOMING PARENT struct mq_attr attr; + mqd_t incoming_parent; + char in_buffer[MSG_BUFFER_SIZE], *ser_buffer; + unsigned int priority; + int id; + size_t buf_size; attr.mq_flags = 0; attr.mq_maxmsg = MAX_MESSAGES; attr.mq_msgsize = MAX_MSG_SIZE; attr.mq_curmsgs = 0; - mqd_t incoming_parent; - if ((incoming_parent = mq_open (globals->q_client, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { - perror ("Server: mq_open (server)"); + incoming_parent = mq_open( + globals->q_client, + O_RDONLY | O_CREAT, + QUEUE_PERMISSIONS, + &attr + ); + if (incoming_parent == -1) { + perror ("Client: mq_open (server)"); exit (1); } + // Now pop off the queues whenever something is added! - while(1){ - // Add to Block and TX Queue - char in_buffer [MSG_BUFFER_SIZE]; - unsigned int priority; - if (mq_receive(incoming_parent, in_buffer, MSG_BUFFER_SIZE, &priority) == -1) { - perror ("Server: mq_receive"); + while (1) { + if (mq_receive(incoming_parent, in_buffer, MAX_MSG_SIZE, &priority) == -1) { + perror ("Client: mq_receive"); exit (1); } - if(priority == 1){ - // Dummy text data - // printf("Incoming data to client parent: %s", in_buffer); - // continue; - Block *new_block = deser_block_alloc(NULL, (unsigned char *)in_buffer); + + id = *(int*)in_buffer; + buf_size = *(size_t*)(in_buffer + sizeof(int)); + ser_buffer = in_buffer + sizeof(int) + sizeof(size_t); + + printf( + "Client parent recieved %lu byte object of id '%d'\n", + buf_size, id + ); + if (id == BLOCK_ID) { + Block *new_block = deser_block_alloc(NULL, (unsigned char *)ser_buffer); queue_add_void(globals->queue_block, new_block); } - else if (priority == 0) - { - Transaction *new_tx = deser_tx_alloc(NULL, (unsigned char *)in_buffer); + else if (id == TX_ID) { + Transaction *new_tx = deser_tx_alloc(NULL, (unsigned char *)ser_buffer); queue_add_void(globals->queue_tx, new_tx); } } diff --git a/src/runtime/server.c b/src/runtime/server.c index a09c17d..c329f9a 100644 --- a/src/runtime/server.c +++ b/src/runtime/server.c @@ -98,17 +98,16 @@ void build_sig_handler() { } // From Beej's Guide -int send_all(int s, char *buf, size_t *len) -{ +int send_all(int s, char *buf, size_t *len) { size_t total = 0; // how many bytes we've sent - size_t bytesleft = *len; // how many we have left to send + size_t bytes_left = *len; // how many we have left to send int n; - while(total < *len) { - n = send(s, buf + total, bytesleft, 0); + while (total < *len) { + n = send(s, buf + total, bytes_left, 0); if (n == -1) { break; } total += n; - bytesleft -= n; + bytes_left -= n; } *len = total; // return number actually sent here From 72de29a28cadf10d26718973779d20fc36558c2e Mon Sep 17 00:00:00 2001 From: eito-fis Date: Tue, 3 May 2022 23:34:21 -0400 Subject: [PATCH 15/17] FIX: Get rid of warning --- src/runtime/shell.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/runtime/shell.c b/src/runtime/shell.c index bef76f8..1f131b6 100644 --- a/src/runtime/shell.c +++ b/src/runtime/shell.c @@ -181,6 +181,7 @@ int shell_exit(Globals *globals, char **args) { int shell_help(Globals *globals, char **args) { int len; (void)args; + (void)globals; len = sizeof(shell_commands) / sizeof(Command); printf("Available shell commands:\n"); From b8448d6ba26dba44ba61076cbf58d68140c817a4 Mon Sep 17 00:00:00 2001 From: eito-fis Date: Tue, 3 May 2022 23:44:56 -0400 Subject: [PATCH 16/17] FIX: Don't overwrite header on recv_obj --- src/runtime/client.c | 6 +++--- src/runtime/server.c | 5 +++++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/runtime/client.c b/src/runtime/client.c index f7db229..b900b78 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -56,7 +56,7 @@ int recv_obj(int s, char *buf, size_t *total_size) { return rv; buf_size = *(size_t*)(buf + sizeof(int)); - if ((rv = recv_all(s, buf, &buf_size)) != 0) + if ((rv = recv_all(s, buf + head_size, &buf_size)) != 0) return rv; *total_size = buf_size + head_size; @@ -104,7 +104,7 @@ void client_fork(Globals *globals, pid_t pid, int i) { perror ("Client: mq_open (server)"); exit (1); } - printf("Opened Incoming Queue for writing"); + printf("Opened Incoming Queue for writing\n"); counter = 0; while (pid != 1) { @@ -196,5 +196,5 @@ void *client_thread(void *arg){ } } - return 0; + return 0; } diff --git a/src/runtime/server.c b/src/runtime/server.c index c329f9a..aa31bb8 100644 --- a/src/runtime/server.c +++ b/src/runtime/server.c @@ -157,6 +157,11 @@ void server_fork(Globals *globals, int sockfd, pid_t pid) { buf_size = *(size_t*)(in_buffer + sizeof(int)); total_size = buf_size + sizeof(int) + sizeof(size_t); + printf( + "Server recieved obj of size %lu from buffer. Sending...\n", + buf_size + ); + if (send_all(sockfd, in_buffer, &total_size) == -1) { perror("Server: send_all"); exit(1); From 1422c4c3073645022f512edbaa99a4da7f46ed9a Mon Sep 17 00:00:00 2001 From: eito-fis Date: Wed, 4 May 2022 00:19:04 -0400 Subject: [PATCH 17/17] FIX: Comment broken validation condition --- CMakeLists.txt | 2 +- src/core/blocks/create_block.c | 1 - src/core/txs/validate_tx.c | 17 ++++++++--------- src/core/utils/init_db.c | 10 +++++----- src/runtime/client.c | 4 ++++ 5 files changed, 18 insertions(+), 16 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 813e26a..6a5ca50 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.13) # set the project name project(OlinCoin C CXX) enable_testing() -#set(CMAKE_BUILD_TYPE Debug) +# set(CMAKE_BUILD_TYPE Debug) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(CMAKE_BINARY_DIR ${CMAKE_BINARY_DIR}) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) diff --git a/src/core/blocks/create_block.c b/src/core/blocks/create_block.c index 18c3f04..86c1f41 100644 --- a/src/core/blocks/create_block.c +++ b/src/core/blocks/create_block.c @@ -23,7 +23,6 @@ unsigned long calc_block_reward(unsigned long blockchain_height){ return BLOCK_REWARD; } - Transaction *create_coinbase_tx(unsigned long tx_fees){ Transaction *coinbase_tx = malloc(sizeof(Transaction)); Output *miner_output = malloc(sizeof(Output)); diff --git a/src/core/txs/validate_tx.c b/src/core/txs/validate_tx.c index 5d94c29..bc36533 100644 --- a/src/core/txs/validate_tx.c +++ b/src/core/txs/validate_tx.c @@ -1,12 +1,12 @@ /** * @file validate_tx.c * @author Nathan Faber nfaber@olin.edu - * @brief + * @brief * @version 0.1 * @date 2022-03-20 - * + * * @copyright Copyright (c) 2022 - * + * */ #include "base_tx.h" #include "utxo_pool.h" @@ -39,7 +39,7 @@ void create_blank_sig_txhash(unsigned char *blank_hash, Transaction *tx){ memset(blank_sig_tx->inputs[i].signature, 0, SIGNATURE_LEN); } memcpy(blank_sig_tx->outputs, tx->outputs, tx->num_outputs*sizeof(Output)); - + // need to make hash where the tx has signatures that are 0's hash_tx(blank_hash, blank_sig_tx); free_tx(blank_sig_tx); @@ -111,9 +111,9 @@ int validate_coinbase_tx_parts_not_null(Transaction *coinbase_tx){ if(coinbase_tx->num_inputs != 0){ return 2; } - if(coinbase_tx->inputs != NULL){ - return 3; - } + /* if(coinbase_tx->inputs != NULL){ */ + /* return 3; */ + /* } */ if(coinbase_tx->num_outputs != 1){ return 4; } @@ -122,7 +122,7 @@ int validate_coinbase_tx_parts_not_null(Transaction *coinbase_tx){ } return 0; } - + int validate_tx_shared(Transaction *tx){ unsigned long total_in = 0; unsigned long total_out = 0; @@ -192,4 +192,3 @@ int validate_tx_incoming(Transaction *tx){ } return 0; } - diff --git a/src/core/utils/init_db.c b/src/core/utils/init_db.c index 0178ba3..74dacc3 100644 --- a/src/core/utils/init_db.c +++ b/src/core/utils/init_db.c @@ -13,7 +13,7 @@ int create_folder(char *path){ int mkdir_res = mkdir(path, 0777); if(mkdir_res != 0 && errno != EEXIST){ //errors here - return 1; + return 1; } return 0; } @@ -23,7 +23,7 @@ int create_proj_folders(){ if(!home_path){ fprintf(stderr, "$HOME env variable not read\n"); exit(1); - } + } char *newPath = malloc(strlen(home_path) + strlen(LOCAL_LOCATION) + 1); strcpy(newPath, home_path); strcat(newPath, LOCAL_LOCATION); @@ -78,7 +78,7 @@ int init_db(leveldb_t **db, char **dest, char *db_env, char *name){ if(!home_path){ fprintf(stderr, "$HOME env variable not read\n"); exit(1); - } + } *dest = malloc(strlen(home_path) + strlen(LOCAL_LOCATION) + strlen(db_env) + strlen(name) + 1); strcpy(*dest, home_path); strcat(*dest, LOCAL_LOCATION); @@ -99,7 +99,7 @@ int destroy_db(leveldb_t **db, char *name){ char *err = NULL; int ret = 0; options = leveldb_options_create(); - + /* DESTROY */ leveldb_destroy_db(options, name, &err); if (err != NULL) { @@ -126,4 +126,4 @@ int db_count(leveldb_t *db, unsigned int *num_entries){ leveldb_readoptions_destroy(roptions); *num_entries = count; return 0; -} \ No newline at end of file +} diff --git a/src/runtime/client.c b/src/runtime/client.c index b900b78..7461d13 100644 --- a/src/runtime/client.c +++ b/src/runtime/client.c @@ -188,10 +188,14 @@ void *client_thread(void *arg){ ); if (id == BLOCK_ID) { Block *new_block = deser_block_alloc(NULL, (unsigned char *)ser_buffer); + printf("Block received over network:\n"); + print_block(new_block, ""); queue_add_void(globals->queue_block, new_block); } else if (id == TX_ID) { Transaction *new_tx = deser_tx_alloc(NULL, (unsigned char *)ser_buffer); + printf("Tx received over network:\n"); + print_tx(new_tx, ""); queue_add_void(globals->queue_tx, new_tx); } }