// // File: // // Written by: David M. Stanhope // // Program to run a slave vserver, forwards all queries to the // master machine. Really written as a network migration tool, // since only database was one on the master vserver. This code // not needed anymore since now have mode to run duplicate // vservers, which share database updates. // // TODO: // #include #include #include #include #include #include #include #include #include #include "timestamp.h" static char server_name[] = "192.168.17.89"; static int server_port = 8009 ; #define MSG(x) { printf x ; fflush(stdout); } #define ERR(x) { printf x ; fflush(stdout); } #define MSG0(x) MSG(x) #define MSG1(x) // nothing #define MSG2(x) // nothing typedef unsigned long long U64; typedef int SOCKET; #define SOCKET_ERROR (-1) #define INVALID_SOCKET (-1) #define Socket_Close(fd) close(fd) #define Socket_Error() Show_Error() //#include "vars.inc" #define ONE_MINUTE (60) // seconds #define ONE_HOUR (60 * ONE_MINUTE) static int port_listen = 8009 ; static int interval_select = 30 ; // in seconds typedef struct _connection_ { struct _connection_ *next ; SOCKET fd ; u_short port ; struct in_addr sin_addr; char name[64]; } CONNECTION; static CONNECTION *connection_head = NULL; // --------------------------------------------------------------------------- static char * Show_Error(void) { static char buf[256]; sprintf(buf, "%d:%s", errno, strerror(errno)); return buf; } // --------------------------------------------------------------------------- char * scan_to(char **s, int t) { int c; char *cp, *bp; bp = *s; cp = bp; while(c = *cp) { if(c == t) { *cp++ = '\0'; *s = cp; return bp; } cp++; } if(t == '\0') { *s = cp; return bp; } ERR(("scan_to: end-of-string on (%s)\n", bp)) exit(-1); } static int update_check(char *src, char *dst, int len) { len--; // insure room for the '\0'; if(strlen(src) > len) { src[len] = '\0'; } if(strcmp(src, dst) == 0) return 0; // no change strcpy(dst, src); return 1; } // --------------------------------------------------------------------------- static void new_connection(SOCKET fd, struct sockaddr_in *pa) { CONNECTION *cp; if((cp = (CONNECTION *) malloc(sizeof(CONNECTION))) == NULL) { ERR(("new_conneciton: malloc failed!\n")) exit(-1); } cp->port = ntohs(pa->sin_port); memcpy(&(cp->sin_addr), &(pa->sin_addr), sizeof(struct in_addr)); sprintf(cp->name, "%s:%d:%d", inet_ntoa(cp->sin_addr), cp->port, fd); cp->fd = fd; cp->next = connection_head; connection_head = cp; MSG1(("%s Connect\n", cp->name)) } // --------------------------------------------------------------------------- #include "util.c" // --------------------------------------------------------------------------- void respond(CONNECTION *cp, char *msg) { write(cp->fd, msg, strlen(msg) + 1); } // --------------------------------------------------------------------------- void do_query(CONNECTION *cp, char *bp, int blen) { int fd, n, r; MSG0(("QUERY(%s)\n", bp)) // connect to 'vserver' if((fd = Connect_TCP(server_name, server_port)) < 0) { // error message already output exit(-1); } n = strlen(bp) + 1; // lenght + '\0' if((r = write(fd, bp, n)) != n) // send message to 'vserver' { ERR(("Write To Server Failed, sent(%d), returned(%d)\n", n, r)) close(fd); exit(-1); } // TODO: READ UNTIL TIMEOUT OR GET TRAILING '\0' n = read(fd, bp, blen); // get response from 'vserver' // TODO: ADD SOME VALIDATION close(fd); if(n < 0) { ERR(("Response Error(%s)\n", Show_Error())) exit(-1); } else if(n == 0) { ERR(("Response Empty\n")) exit(-1); } else if(bp[n - 1] != '\0') { ERR(("Response Bogus, No EOS\n")) exit(-1); } else { MSG0(("Response(%s)\n", bp)) } respond(cp, bp); } // --------------------------------------------------------------------------- main(int argc, char *argv[]) { int n, r; fd_set imask; struct timeval timer; struct sockaddr peer_addr; struct sockaddr_in server, *pa; SOCKET fd_accept, fd_peer, fd_max; CONNECTION *cp, *np; char ibuf[512]; MSG(("VSLAVE Started (%s)\n", timestamp())) #if 0 if(argc != 2) { MSG(("Master address not specified\n")) exit(-1); } #endif // ----------------------------------------------------------------------- // build socket to listen for connections on // ----------------------------------------------------------------------- if((fd_accept = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { ERR(("socket-accept: error(%s)\n", Socket_Error())) exit(-1); } #if 1 n = 1; setsockopt(fd_accept, SOL_SOCKET, SO_REUSEADDR, (char *) &n, sizeof(n)); n = 1; setsockopt(fd_accept, SOL_SOCKET, SO_REUSEPORT, (char *) &n, sizeof(n)); #endif memset((char *) &server, 0, sizeof(server)); server.sin_len = sizeof(struct sockaddr_in); server.sin_family = AF_INET ; server.sin_port = htons((u_short) port_listen); server.sin_addr.s_addr = htonl(INADDR_ANY); if(bind(fd_accept, (struct sockaddr *) &server, sizeof(server)) == SOCKET_ERROR) { ERR(("bind: error(%s)\n", Socket_Error())) exit(-1); } if(listen(fd_accept, 5) == SOCKET_ERROR) { ERR(("listen: error(%s)\n", Socket_Error())) exit(-1); } MSG(("Waiting for Connections (%s)\n", timestamp())) while(1) { fd_max = 0; FD_ZERO(&imask); FD_SET(fd_accept, &imask); if(fd_accept > fd_max) { fd_max = fd_accept; } for(cp = connection_head; cp; cp = cp->next) { FD_SET(cp->fd, &imask); if(cp->fd > fd_max) { fd_max = cp->fd; } } timer.tv_sec = interval_select; timer.tv_usec = 0; r = select(fd_max + 1, &imask, NULL, NULL, &timer); if(r < 0) { ERR(("select: r(%d) error(%s)\n", r, Socket_Error())) exit(-1); } if(r == 0) { continue; } if(FD_ISSET(fd_accept, &imask)) // any connections { n = sizeof(struct sockaddr); if((fd_peer = accept(fd_accept, &peer_addr, &n)) == INVALID_SOCKET) { ERR(("socket-accept: error(%s)\n", Socket_Error())) exit(-1); } new_connection(fd_peer, (struct sockaddr_in *) &peer_addr); } // as scan the list, also rebuild it, dropping any that closed for(cp = connection_head, connection_head = NULL; cp; cp = np) { np = cp->next; if(FD_ISSET(cp->fd, &imask)) // any data { if((n = read(cp->fd, ibuf, 512)) <= 0) { MSG1(("%s Close on (%d)\n", cp->name, n)) Socket_Close(cp->fd); free(cp); continue; // drop from list by not relinking it } // got something if((n < 1) || (ibuf[n - 1] != '\0')) { MSG(("%s Bogus Message, no EOS (%d)\n", cp->name, n)) Socket_Close(cp->fd); free(cp); continue; // drop from list by not relinking it } // add REMOTE_ADDRESS and REMOTE_PORT to message since // 'vserver' can't see client via getpeername() sprintf(ibuf + strlen(ibuf), "|peer_ip=%s|peer_port=%d", inet_ntoa(cp->sin_addr), cp->port); do_query(cp, ibuf, sizeof(ibuf)); } // keep this connection by relinking it to the list cp->next = connection_head; connection_head = cp; } } } // // The End! //