diff --git a/CMakeLists.txt b/CMakeLists.txt index 813e26a..6a5ca50 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,7 +3,7 @@ cmake_minimum_required(VERSION 3.13) # set the project name project(OlinCoin C CXX) enable_testing() -#set(CMAKE_BUILD_TYPE Debug) +# set(CMAKE_BUILD_TYPE Debug) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(CMAKE_BINARY_DIR ${CMAKE_BINARY_DIR}) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}) diff --git a/src/core/blocks/create_block.c b/src/core/blocks/create_block.c index 18c3f04..86c1f41 100644 --- a/src/core/blocks/create_block.c +++ b/src/core/blocks/create_block.c @@ -23,7 +23,6 @@ unsigned long calc_block_reward(unsigned long blockchain_height){ return BLOCK_REWARD; } - Transaction *create_coinbase_tx(unsigned long tx_fees){ Transaction *coinbase_tx = malloc(sizeof(Transaction)); Output *miner_output = malloc(sizeof(Output)); diff --git a/src/core/txs/validate_tx.c b/src/core/txs/validate_tx.c index 5d94c29..bc36533 100644 --- a/src/core/txs/validate_tx.c +++ b/src/core/txs/validate_tx.c @@ -1,12 +1,12 @@ /** * @file validate_tx.c * @author Nathan Faber nfaber@olin.edu - * @brief + * @brief * @version 0.1 * @date 2022-03-20 - * + * * @copyright Copyright (c) 2022 - * + * */ #include "base_tx.h" #include "utxo_pool.h" @@ -39,7 +39,7 @@ void create_blank_sig_txhash(unsigned char *blank_hash, Transaction *tx){ memset(blank_sig_tx->inputs[i].signature, 0, SIGNATURE_LEN); } memcpy(blank_sig_tx->outputs, tx->outputs, tx->num_outputs*sizeof(Output)); - + // need to make hash where the tx has signatures that are 0's hash_tx(blank_hash, blank_sig_tx); free_tx(blank_sig_tx); @@ -111,9 +111,9 @@ int validate_coinbase_tx_parts_not_null(Transaction *coinbase_tx){ if(coinbase_tx->num_inputs != 0){ return 2; } - if(coinbase_tx->inputs != NULL){ - return 3; - } + /* if(coinbase_tx->inputs != NULL){ */ + /* return 3; */ + /* } */ if(coinbase_tx->num_outputs != 1){ return 4; } @@ -122,7 +122,7 @@ int validate_coinbase_tx_parts_not_null(Transaction *coinbase_tx){ } return 0; } - + int validate_tx_shared(Transaction *tx){ unsigned long total_in = 0; unsigned long total_out = 0; @@ -192,4 +192,3 @@ int validate_tx_incoming(Transaction *tx){ } return 0; } - diff --git a/src/core/utils/init_db.c b/src/core/utils/init_db.c index 0178ba3..74dacc3 100644 --- a/src/core/utils/init_db.c +++ b/src/core/utils/init_db.c @@ -13,7 +13,7 @@ int create_folder(char *path){ int mkdir_res = mkdir(path, 0777); if(mkdir_res != 0 && errno != EEXIST){ //errors here - return 1; + return 1; } return 0; } @@ -23,7 +23,7 @@ int create_proj_folders(){ if(!home_path){ fprintf(stderr, "$HOME env variable not read\n"); exit(1); - } + } char *newPath = malloc(strlen(home_path) + strlen(LOCAL_LOCATION) + 1); strcpy(newPath, home_path); strcat(newPath, LOCAL_LOCATION); @@ -78,7 +78,7 @@ int init_db(leveldb_t **db, char **dest, char *db_env, char *name){ if(!home_path){ fprintf(stderr, "$HOME env variable not read\n"); exit(1); - } + } *dest = malloc(strlen(home_path) + strlen(LOCAL_LOCATION) + strlen(db_env) + strlen(name) + 1); strcpy(*dest, home_path); strcat(*dest, LOCAL_LOCATION); @@ -99,7 +99,7 @@ int destroy_db(leveldb_t **db, char *name){ char *err = NULL; int ret = 0; options = leveldb_options_create(); - + /* DESTROY */ leveldb_destroy_db(options, name, &err); if (err != NULL) { @@ -126,4 +126,4 @@ int db_count(leveldb_t *db, unsigned int *num_entries){ leveldb_readoptions_destroy(roptions); *num_entries = count; return 0; -} \ No newline at end of file +} diff --git a/src/includes/utils/queue.h b/src/includes/utils/queue.h index b6f3c58..a8a72c1 100644 --- a/src/includes/utils/queue.h +++ b/src/includes/utils/queue.h @@ -3,8 +3,6 @@ #include #include #include -#include "base_tx.h" -#include "base_block.h" typedef struct QueueItem{ void *item; diff --git a/src/runtime/CMakeLists.txt b/src/runtime/CMakeLists.txt index f4ed096..2cf1b66 100644 --- a/src/runtime/CMakeLists.txt +++ b/src/runtime/CMakeLists.txt @@ -15,6 +15,25 @@ target_link_libraries(shell handle_block init_globals) +add_library(server server.c) +target_compile_options(server PRIVATE ${COMPILE_OPTIONS_STD}) +target_include_directories(server PUBLIC + ${INCLUDE_ALL} + ) +target_link_libraries(server + core_block + rt +) + +add_library(client client.c) +target_compile_options(client PRIVATE ${COMPILE_OPTIONS_STD}) +target_include_directories(client PUBLIC + ${INCLUDE_ALL} + ) +target_link_libraries(client + core_block + rt +) add_executable(runtime runtime.c) target_compile_options(runtime PRIVATE ${COMPILE_OPTIONS_STD}) target_include_directories(runtime PUBLIC ${INCLUDE_ALL}) @@ -25,4 +44,16 @@ target_link_libraries(runtime init_globals validate_tx handle_tx - shell) \ No newline at end of file + shell + server + client) + +add_executable(q_server queue_server.c) +target_compile_options(q_server PRIVATE ${COMPILE_OPTIONS_STD}) +target_include_directories(q_server PUBLIC ${INCLUDE_ALL}) +target_link_libraries(q_server rt) + +add_executable(q_client queue_client.c) +target_compile_options(q_client PRIVATE ${COMPILE_OPTIONS_STD}) +target_include_directories(q_client PUBLIC ${INCLUDE_ALL}) +target_link_libraries(q_client rt) \ No newline at end of file diff --git a/src/runtime/client.c b/src/runtime/client.c new file mode 100644 index 0000000..7461d13 --- /dev/null +++ b/src/runtime/client.c @@ -0,0 +1,204 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include "client.h" +#include "server.h" +#include "runtime.h" +#include "ser_block.h" +#include "ser_tx.h" + +char peers[][20] = {"192.168.32.251"};//"ubuntu.local"};//"localhost", +unsigned int num_peers = 1; // Match above + +void *get_in_addr2(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + +int recv_all(int s, char *buf, size_t *len) { + size_t total = 0; // how many bytes we've read + size_t bytes_left = *len; // how many we have left to read + int n; + + while (total < *len) { + n = recv(s, buf + total, bytes_left, 0); + if (n == 0) + return 1; + else if (n == -1) + return -1; + total += n; + bytes_left -= n; + } + + *len = total; // return number actually sent here + + return 0; +} + +int recv_obj(int s, char *buf, size_t *total_size) { + int rv; + size_t head_size, buf_size; + + head_size = sizeof(int) + sizeof(long); + if ((rv = recv_all(s, buf, &head_size)) != 0) + return rv; + + buf_size = *(size_t*)(buf + sizeof(int)); + if ((rv = recv_all(s, buf + head_size, &buf_size)) != 0) + return rv; + + *total_size = buf_size + head_size; + return 0; +} + +void client_fork(Globals *globals, pid_t pid, int i) { + int sockfd, rv, priority; + size_t numbytes; + unsigned long counter; + char buf[MAX_MSG_SIZE]; + struct sockaddr_in serv_addr; + struct mq_attr attr; + mqd_t incoming; + + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + printf("\n Socket creation error \n"); + exit(0); + } + + serv_addr.sin_family = AF_INET; + serv_addr.sin_port = htons(PORT_INT); + + // Convert IPv4 and IPv6 addresses from text to binary form + if (inet_pton(AF_INET, peers[i], &serv_addr.sin_addr) <= 0) { + printf("\nInvalid address/ Address not supported \n"); + exit(0); + } + if (connect(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) { + printf("\nConnection Failed \n"); + exit(0); + } + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + incoming = mq_open( + globals->q_client, + O_WRONLY | O_CREAT, + QUEUE_PERMISSIONS, + &attr + ); + if (incoming == -1) { + perror ("Client: mq_open (server)"); + exit (1); + } + printf("Opened Incoming Queue for writing\n"); + + counter = 0; + while (pid != 1) { + // Check to see if parent killed + if (counter % 10000) { + pid = getpid(); + } + + // Get data over the socket + rv = recv_obj(sockfd, buf, &numbytes); + if (rv == -1) { + perror("recv obj"); + exit(1); + } else if (rv == 1) { + // Connection has been closed from the other side (by the socket server) + break; + } + + // Add the data to the queue + priority = *(int*)buf; + if (mq_send(incoming, buf, numbytes, priority) == -1) { + perror ("Client: Not able to send message to main process"); + continue; + } + printf("client: received %lu bytes\n", numbytes); + + counter++; + } + + close(sockfd); + exit(0); +} + +void *client_thread(void *arg){ + Globals *globals = arg; + for (unsigned int i = 0; i < num_peers; i++) { + // Fork this process and connect! + pid_t pid = fork(); + if (!pid) { // this is the child process + client_fork(globals, pid, i); + } + } + + //INCOMING PARENT + struct mq_attr attr; + mqd_t incoming_parent; + char in_buffer[MSG_BUFFER_SIZE], *ser_buffer; + unsigned int priority; + int id; + size_t buf_size; + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + incoming_parent = mq_open( + globals->q_client, + O_RDONLY | O_CREAT, + QUEUE_PERMISSIONS, + &attr + ); + if (incoming_parent == -1) { + perror ("Client: mq_open (server)"); + exit (1); + } + + // Now pop off the queues whenever something is added! + while (1) { + if (mq_receive(incoming_parent, in_buffer, MAX_MSG_SIZE, &priority) == -1) { + perror ("Client: mq_receive"); + exit (1); + } + + id = *(int*)in_buffer; + buf_size = *(size_t*)(in_buffer + sizeof(int)); + ser_buffer = in_buffer + sizeof(int) + sizeof(size_t); + + printf( + "Client parent recieved %lu byte object of id '%d'\n", + buf_size, id + ); + if (id == BLOCK_ID) { + Block *new_block = deser_block_alloc(NULL, (unsigned char *)ser_buffer); + printf("Block received over network:\n"); + print_block(new_block, ""); + queue_add_void(globals->queue_block, new_block); + } + else if (id == TX_ID) { + Transaction *new_tx = deser_tx_alloc(NULL, (unsigned char *)ser_buffer); + printf("Tx received over network:\n"); + print_tx(new_tx, ""); + queue_add_void(globals->queue_tx, new_tx); + } + } + + return 0; +} diff --git a/src/runtime/includes/client.h b/src/runtime/includes/client.h new file mode 100644 index 0000000..da52a03 --- /dev/null +++ b/src/runtime/includes/client.h @@ -0,0 +1,4 @@ +#define MAXDATASIZE 100 // max number of bytes we can get at once + + +void *client_thread(); \ No newline at end of file diff --git a/src/runtime/includes/runtime.h b/src/runtime/includes/runtime.h index d3a9cfc..231bf32 100644 --- a/src/runtime/includes/runtime.h +++ b/src/runtime/includes/runtime.h @@ -2,8 +2,21 @@ #include "queue.h" #include "pthread.h" #include "semaphore.h" +#include -typedef struct Globals { +#define MAX_INCOMING_CONNECTIONS 10 +#define INCOMING_QUEUE "/client_incoming" +#define OUTGOING_QUEUE_FORMAT "/server_outgoing_%d" + +#define QUEUE_PERMISSIONS 0660 +#define MAX_MESSAGES 10 +#define MAX_MSG_SIZE 256 +#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 + +#define BLOCK_ID 1 +#define TX_ID 0 + +typedef struct { Queue *queue_block; Queue *queue_tx; pthread_mutex_t utxo_pool_lock; @@ -14,4 +27,7 @@ typedef struct Globals { pthread_mutex_t mempool_lock; int *miner_update; pthread_mutex_t miner_update_lock; -} Globals; \ No newline at end of file + int connected; + char **q_server_individual; // send the same thing out to multiple nodes + char *q_client; // Only one queue from all the incoming blocks +} Globals; diff --git a/src/runtime/includes/server.h b/src/runtime/includes/server.h new file mode 100644 index 0000000..19dc993 --- /dev/null +++ b/src/runtime/includes/server.h @@ -0,0 +1,13 @@ +#include + +#define PORT_INT 3690 +#define PORT "3690" // the port users will be connecting to + +#define BACKLOG 10 // how many pending connections queue will hold + +void sigchld_handler(int s); + +// get sockaddr, IPv4 or IPv6: +void *get_in_addr(struct sockaddr *sa); + +void *server_thread(); \ No newline at end of file diff --git a/src/runtime/queue_client.c b/src/runtime/queue_client.c new file mode 100644 index 0000000..ca8d0f5 --- /dev/null +++ b/src/runtime/queue_client.c @@ -0,0 +1,88 @@ +/* + * client.c: Client program + * to demonstrate interprocess communication + * with POSIX message queues + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#define SERVER_QUEUE_NAME "/sp-example-server" +#define CLIENT_QUEUE_NAME "/sp-example-client" +#define QUEUE_PERMISSIONS 0660 +#define MAX_MESSAGES 10 +#define MAX_MSG_SIZE 256 +#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 + +int main (int argc, char **argv) +{ + //char client_queue_name [64]; + mqd_t qd_server, qd_client; // queue descriptors + + + // create the client queue for receiving messages from server + // sprintf (client_queue_name, "/sp-example-client-%d", getpid ()); + + struct mq_attr attr; + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + + if ((qd_client = mq_open (CLIENT_QUEUE_NAME, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + perror ("Client: mq_open (client)"); + exit (1); + } + + if ((qd_server = mq_open (SERVER_QUEUE_NAME, O_WRONLY)) == -1) { + perror ("Client: mq_open (server)"); + exit (1); + } + + char in_buffer [MSG_BUFFER_SIZE]; + + printf ("Ask for a token (Press ): "); + + char temp_buf [10]; + + while (fgets (temp_buf, 2, stdin)) { + char *test = "test"; + // send message to server + if (mq_send (qd_server, test, strlen (test) + 1, 0) == -1) { + perror ("Client: Not able to send message to server"); + continue; + } + + // receive response from server + + if (mq_receive (qd_client, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { + perror ("Client: mq_receive"); + exit (1); + } + // display token received from server + printf ("Client: Token received from server: %s\n\n", in_buffer); + + printf ("Ask for a token (Press ): "); + } + + + if (mq_close (qd_client) == -1) { + perror ("Client: mq_close"); + exit (1); + } + + if (mq_unlink (CLIENT_QUEUE_NAME) == -1) { + perror ("Client: mq_unlink"); + exit (1); + } + printf ("Client: bye\n"); + + exit (0); +} diff --git a/src/runtime/queue_server.c b/src/runtime/queue_server.c new file mode 100644 index 0000000..5377a94 --- /dev/null +++ b/src/runtime/queue_server.c @@ -0,0 +1,70 @@ +/* + * server.c: Server program + * to demonstrate interprocess commnuication + * with POSIX message queues + */ + +#include +#include +#include +#include + +#include +#include +#include + +#define SERVER_QUEUE_NAME "/sp-example-server" +#define CLIENT_QUEUE_NAME "/sp-example-client" +#define QUEUE_PERMISSIONS 0660 +#define MAX_MESSAGES 10 +#define MAX_MSG_SIZE 256 +#define MSG_BUFFER_SIZE MAX_MSG_SIZE + 10 + +int main (int argc, char **argv) +{ + mqd_t qd_server, qd_client; // queue descriptors + long token_number = 1; // next token to be given to client + + printf ("Server: Hello, World!\n"); + + struct mq_attr attr; + + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + + if ((qd_server = mq_open (SERVER_QUEUE_NAME, O_RDONLY | O_CREAT, QUEUE_PERMISSIONS, &attr)) == -1) { + perror ("Server: mq_open (server)"); + exit (1); + } + char in_buffer [MSG_BUFFER_SIZE]; + char out_buffer [MSG_BUFFER_SIZE]; + + while (1) { + // get the oldest message with highest priority + if (mq_receive (qd_server, in_buffer, MSG_BUFFER_SIZE, NULL) == -1) { + perror ("Server: mq_receive"); + exit (1); + } + + printf ("Server: message received.\n"); + + // send reply message to client + + if ((qd_client = mq_open (CLIENT_QUEUE_NAME, O_WRONLY)) == 1) { + perror ("Server: Not able to open client queue"); + continue; + } + + sprintf (out_buffer, "%ld", token_number); + + if (mq_send (qd_client, out_buffer, strlen (out_buffer) + 1, 0) == -1) { + perror ("Server: Not able to send message to client"); + continue; + } + + printf ("Server: response sent to client.\n"); + token_number++; + } +} \ No newline at end of file diff --git a/src/runtime/runtime.c b/src/runtime/runtime.c index 8d86527..a180099 100644 --- a/src/runtime/runtime.c +++ b/src/runtime/runtime.c @@ -1,16 +1,25 @@ -#include "queue.h" #include #include "pthread.h" -#include "validate_block.h" -#include "handle_block.h" + +#include "queue.h" + +#include "ser_tx.h" #include "validate_tx.h" #include "handle_tx.h" + +#include "ser_block.h" #include "create_block.h" -#include "shell.h" -#include "runtime.h" +#include "validate_block.h" +#include "handle_block.h" + #include "init_globals.h" #include "init_db.h" +#include "runtime.h" +#include "shell.h" +#include "server.h" +#include "client.h" + Globals *init_globals(){ Globals *new_globals = malloc(sizeof(Globals)); new_globals->queue_block = queue_init(); @@ -44,40 +53,95 @@ Globals *init_globals(){ new_globals->miner_update = malloc(sizeof(int)); *new_globals->miner_update = 1; + new_globals->q_server_individual = malloc(MAX_INCOMING_CONNECTIONS * sizeof(char *)); + new_globals->connected = 0; + new_globals->q_client = malloc(strlen(INCOMING_QUEUE) + 1); + strcpy(new_globals->q_client, "/client_incoming"); + return new_globals; } +unsigned char *init_queue_buffer(unsigned char *buf, int id, size_t size) { + memcpy(buf, &id, sizeof(id)); + memcpy(buf + sizeof(id), &size, sizeof(size)); + return buf + sizeof(id) + sizeof(size); +} + +void propogate_object(Globals *globals, void *obj, int id){ + char *q_name; + mqd_t child_mq; + unsigned char buf[MAX_MSG_SIZE], *ser_buf; + size_t buf_size; + + switch (id) { + case BLOCK_ID: + ser_buf = init_queue_buffer(buf, id, size_ser_block(obj)); + buf_size = ser_block(ser_buf, obj); + break; + case TX_ID: + ser_buf = init_queue_buffer(buf, id, size_ser_tx(obj)); + buf_size = ser_tx(ser_buf, obj); + break; + default: + perror("Invalid ID passed to propogate_object()"); + exit(1); + } + buf_size += ser_buf - buf; + + printf("Recieved object of ID '%d', adding to queues\n", id); + for (int i = 0; i < globals->connected; i++) { + q_name = globals->q_server_individual[i]; + if ((child_mq = mq_open(q_name, O_WRONLY)) == -1) { + perror("Main Process: mq_open(server)"); + exit(1); + } + if (mq_send(child_mq, (char*)buf, buf_size, id) == -1) { + perror("Main Process: Not able to send message to server"); + } + if (mq_close(child_mq) == -1) { + perror("Main Process: mq_close"); + exit(1); + } + } +} + void *node_block_thread(void *arg){ Globals *globals = arg; - while(1){ + while (1) { printf("Node Thread Waiting to pop Block\n"); Block *popped_block = queue_pop_void(globals->queue_block); printf("Node Thread Block Popped\n"); + //Now aquire locks for validation (Utxo pool and blockcahin) printf("Node Thread Waiting on lock for validation\n"); pthread_mutex_lock(&globals->utxo_pool_lock); pthread_mutex_lock(&globals->blockchain_lock); - int block_valid = validate_block(popped_block); + int block_valid = validate_block(popped_block); - if(block_valid == 0){ - //Now aquire additional locks for handling + if (block_valid == 0) { + //Now acquire additional locks for handling printf("Node Thread Waiting on lock for Handling\n"); pthread_mutex_lock(&globals->utxo_to_tx_lock); pthread_mutex_lock(&globals->wallet_pool_lock); pthread_mutex_lock(&globals->key_pool_lock); pthread_mutex_lock(&globals->mempool_lock); + accept_block(popped_block); + propogate_object(globals, popped_block, BLOCK_ID); free_block(popped_block); + pthread_mutex_lock(&globals->miner_update_lock); *globals->miner_update = 0; pthread_mutex_unlock(&globals->miner_update_lock); + pthread_mutex_unlock(&globals->utxo_to_tx_lock); pthread_mutex_unlock(&globals->wallet_pool_lock); pthread_mutex_unlock(&globals->key_pool_lock); pthread_mutex_unlock(&globals->mempool_lock); - printf("Node Thread unlockedr Handling\n"); + printf("Node Thread unlocked Handling\n"); } + pthread_mutex_unlock(&globals->utxo_pool_lock); pthread_mutex_unlock(&globals->blockchain_lock); } @@ -86,13 +150,12 @@ void *node_block_thread(void *arg){ void *node_tx_thread(void *arg){ Globals *globals = arg; - while(1){ - printf("NodeTX Thread Waiting to pop TX\n"); + while(1) { + //printf("NodeTX Thread Waiting to pop TX\n"); Transaction *popped_tx = queue_pop_void(globals->queue_tx); - printf("NodeTX Thread TX Popped\n"); + //printf("NodeTX Thread TX Popped\n"); //Now aquire locks for validation (Utxo pool and blockcahin) - printf("NodeTX Thread Waiting on lock for validation\n"); - + //printf("NodeTX Thread Waiting on lock for validation\n"); pthread_mutex_lock(&globals->utxo_pool_lock); pthread_mutex_lock(&globals->utxo_to_tx_lock); @@ -100,16 +163,16 @@ void *node_tx_thread(void *arg){ int tx_valid = validate_tx_incoming(popped_tx); - if(tx_valid == 0){ - //Now aquire additional locks for handling handle_tx(popped_tx); + propogate_object(globals, popped_tx, TX_ID); free_tx(popped_tx); } + pthread_mutex_unlock(&globals->mempool_lock); pthread_mutex_unlock(&globals->utxo_to_tx_lock); pthread_mutex_unlock(&globals->utxo_pool_lock); - printf("NodeTX Thread unlocked from validation\n"); + //printf("NodeTX Thread unlocked from validation\n"); } return NULL; } @@ -119,20 +182,20 @@ void *miner_thread(void *arg){ unsigned long hash_check_flag = 10000; int new_block_in_chain = 1; //1 is false 0 is true while(1){ - printf("Miner Thread waiting on lock to create Block\n"); + //printf("Miner Thread waiting on lock to create Block\n"); //Now aquire locks for creating a block! pthread_mutex_lock(&globals->utxo_pool_lock); pthread_mutex_lock(&globals->blockchain_lock); pthread_mutex_lock(&globals->key_pool_lock) ; pthread_mutex_lock(&globals->mempool_lock); - printf("Miner Thread got lock to create Block\n"); + //printf("Miner Thread got lock to create Block\n"); Block *new_block = create_block_alloc(); pthread_mutex_unlock(&globals->utxo_pool_lock); pthread_mutex_unlock(&globals->blockchain_lock); pthread_mutex_unlock(&globals->key_pool_lock) ; pthread_mutex_unlock(&globals->mempool_lock); - printf("Miner Thread done creating Block\n"); + //printf("Miner Thread done creating Block\n"); while(try_header_hash(&(new_block->header)) != 0){ change_nonce(new_block); @@ -160,6 +223,7 @@ void *miner_thread(void *arg){ return NULL; } + void *shell_thread(void *arg){ Globals *globals = arg; shell_init(); @@ -174,31 +238,35 @@ int main() { // Intitialize the globabls! node_init(PROD_DB_LOC); - pthread_t node_block, node_tx, shell, miner; - int node_block_ret, node_tx_ret, shell_ret, miner_ret; + pthread_t node_block, node_tx, shell, miner, server, client; + int node_block_ret, node_tx_ret, shell_ret, miner_ret, server_ret, client_ret; + /* Create independent threads each of which will execute function */ + server_ret = pthread_create( &server, NULL, server_thread, (void*) globals); + sleep(5); + // Now Create the Client Thread + client_ret = pthread_create( &client, NULL, client_thread, (void*) globals); - // node_block_ret = pthread_create( &node_block, NULL, pop, (void*) globals->queue_block); - // sleep(1); - // node_tx_ret = pthread_create( &node_tx, NULL, add_and_delay, (void*) globals->queue_block); - node_block_ret = pthread_create( &node_block, NULL, node_block_thread, (void*) globals); node_tx_ret = pthread_create( &node_tx, NULL, node_tx_thread, (void*) globals); shell_ret = pthread_create( &shell, NULL, shell_thread, (void*) globals); miner_ret = pthread_create( &miner, NULL, miner_thread, (void*) globals); - + /* Wait till threads are complete before main continues. Unless we */ /* wait we run the risk of executing an exit which will terminate */ /* the process and all threads before the threads have completed. */ pthread_join(shell, NULL); - pthread_join(node_block, NULL); - pthread_join(node_tx, NULL); - pthread_join(miner, NULL); + // pthread_join(server, NULL); + // pthread_join(node_block, NULL); + // pthread_join(node_tx, NULL); + // pthread_join(miner, NULL); printf("Node Block returns: %d\n",node_block_ret); printf("Node TX returns: %d\n", node_tx_ret); printf("Shell returns: %d\n", shell_ret); printf("Miner returns: %d\n", miner_ret); - return 0; + printf("Server returns: %d\n", server_ret); + printf("Client returns: %d\n", client_ret); + return 0; } diff --git a/src/runtime/server.c b/src/runtime/server.c new file mode 100644 index 0000000..aa31bb8 --- /dev/null +++ b/src/runtime/server.c @@ -0,0 +1,235 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "server.h" +#include "mqueue.h" +#include "runtime.h" + +void sigchld_handler(int s) +{ + (void)s; // quiet unused variable warning + + // waitpid() might overwrite errno, so we save and restore it: + int saved_errno = errno; + + while(waitpid(-1, NULL, WNOHANG) > 0); + + errno = saved_errno; +} + + +// get sockaddr, IPv4 or IPv6: +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + +int build_listen_socket() { + int sockfd, yes, rv; // listen on sock_fd, new connection on new_fd + struct addrinfo hints, *servinfo, *p; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; // use my IP + + if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + exit(1); + } + + // loop through all the results and bind to the first we can + yes = 1; + for (p = servinfo; p != NULL; p = p->ai_next) { + if ((sockfd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + perror("server: socket"); + continue; + } + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, + sizeof(int)) == -1) { + perror("setsockopt"); + exit(1); + } + if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { + close(sockfd); + perror("server: bind"); + continue; + } + break; + } + freeaddrinfo(servinfo); // all done with this structure + + if (p == NULL) { + fprintf(stderr, "server: failed to bind\n"); + exit(1); + } + + if (listen(sockfd, BACKLOG) == -1) { + perror("listen"); + exit(1); + } + + return sockfd; +} + +void build_sig_handler() { + struct sigaction sa; + sa.sa_handler = sigchld_handler; // reap all dead processes + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_RESTART; + if (sigaction(SIGCHLD, &sa, NULL) == -1) { + perror("sigaction"); + exit(1); + } +} + +// From Beej's Guide +int send_all(int s, char *buf, size_t *len) { + size_t total = 0; // how many bytes we've sent + size_t bytes_left = *len; // how many we have left to send + int n; + + while (total < *len) { + n = send(s, buf + total, bytes_left, 0); + if (n == -1) { break; } + total += n; + bytes_left -= n; + } + + *len = total; // return number actually sent here + + return n == -1 ? -1 : 0; // return -1 on failure, 0 on success +} + +void server_fork(Globals *globals, int sockfd, pid_t pid) { + mqd_t child_mq; + struct mq_attr attr; + char in_buffer[MAX_MSG_SIZE]; + unsigned long counter; + size_t buf_size, total_size; + + printf( + "Opening Queue from server child: %s\n", + globals->q_server_individual[globals->connected] + ); + attr.mq_flags = 0; + attr.mq_maxmsg = MAX_MESSAGES; + attr.mq_msgsize = MAX_MSG_SIZE; + attr.mq_curmsgs = 0; + child_mq = mq_open( + globals->q_server_individual[globals->connected], + O_RDONLY | O_CREAT, + QUEUE_PERMISSIONS, + &attr + ); + if (child_mq == -1) { + perror("Server: mq_open (server)"); + exit(1); + } + + counter = 1; + while (pid != 1){ + printf("SERVER FORK LOOP\n"); + // Check to see if parent killed + if(counter % 10000){ + pid = getpid(); + } + + if (mq_receive(child_mq, in_buffer, MAX_MSG_SIZE, NULL) == -1) { + perror("Client: mq_receive"); + exit(1); + } + + buf_size = *(size_t*)(in_buffer + sizeof(int)); + total_size = buf_size + sizeof(int) + sizeof(size_t); + + printf( + "Server recieved obj of size %lu from buffer. Sending...\n", + buf_size + ); + + if (send_all(sockfd, in_buffer, &total_size) == -1) { + perror("Server: send_all"); + exit(1); + } + } + + if (mq_close(child_mq) == -1) { + perror("Server: mq_close"); + exit(1); + } + close(sockfd); + exit(0); +} + +void handle_accept(Globals *globals, int sockfd) { + int new_fd; + size_t name_size; + struct sockaddr_storage their_addr; // connector's address information + socklen_t sin_size; + char s[INET6_ADDRSTRLEN]; + + sin_size = sizeof(their_addr); + new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); + if (new_fd == -1) { + perror("accept"); + return; + } + + inet_ntop( + their_addr.ss_family, + get_in_addr((struct sockaddr *)&their_addr), + s, sizeof s + ); + printf("server: got connection from '%s'\n", s); + + name_size = snprintf(NULL, 0, OUTGOING_QUEUE_FORMAT, globals->connected); + globals->q_server_individual[globals->connected] = malloc(name_size + 1); + sprintf( + globals->q_server_individual[globals->connected], + OUTGOING_QUEUE_FORMAT, + getpid() + ); + printf( + "Made new queue name '%s'\n", + globals->q_server_individual[globals->connected] + ); + + pid_t pid = fork(); + if (!pid) { // this is the child process + close(sockfd); // child doesn't need the listener + server_fork(globals, new_fd, pid); + } + + close(new_fd); // parent doesn't need this + globals->connected++; +} + +void *server_thread(void *arg){ + Globals *globals = arg; + int sockfd; + + sockfd = build_listen_socket(); + build_sig_handler(); + + printf("server: waiting for connections...\n"); + while (1) { + handle_accept(globals, sockfd); + } + + return NULL; +} diff --git a/src/runtime/shell.c b/src/runtime/shell.c index bef76f8..1f131b6 100644 --- a/src/runtime/shell.c +++ b/src/runtime/shell.c @@ -181,6 +181,7 @@ int shell_exit(Globals *globals, char **args) { int shell_help(Globals *globals, char **args) { int len; (void)args; + (void)globals; len = sizeof(shell_commands) / sizeof(Command); printf("Available shell commands:\n");