/*
   A very simple TCP server that implements a trivial protocol.

   This version of the server parallelizes the serial server by using
   a pool of worker pthreads. The main thread accepts connections and
   pushes each connected file descriptor onto a small bounded stack;
   every worker thread pops descriptors from that stack and serves one
   client at a time.
*/
#include <stdio.h>       // for fdopen(), fclose(), fprintf(), fflush(), fgetc()
#include <stdlib.h>      // for NULL, strtol(), and exit()
#include <string.h>      // for memset() and strerror()
#include <errno.h>       // for errno and EINTR
#include <signal.h>      // for signal() and SIGPIPE
#include <sys/types.h>
#include <unistd.h>      // for close() and dup()
#include <sys/socket.h>  // for socket(), bind(), listen(), accept(), setsockopt()
#include <netinet/in.h>  // for sockaddr_in, htons(), htonl()
#include <arpa/inet.h>   // for inet_ntoa()
#include <netdb.h>       // for gethostbyaddr(), hostent, hstrerror(), and h_errno
#include <sys/wait.h>    // for waitpid() and WNOHANG
#include <pthread.h>
#include <semaphore.h>

// prototypes for two error handling functions (defined below)
void unix_error(const char *msg);
void dns_error(const char *msg);

#define LISTENQUEUE 8  // second argument to listen()

// thread prototype
void *worker_thread(void *vargp);

#define NUMBER_OF_THREADS 3
#define STACK_SIZE 4

typedef struct  // data structure for holding client file descriptors
{
   int top_of_stack;        // index of the next free slot in stack[]
   pthread_mutex_t mutex;   // protects top_of_stack and stack[]
   sem_t sem_empty;         // counts empty slots
   sem_t sem_full;          // counts full slots
   int stack[STACK_SIZE];
} Stack_t;


// Report a failed pthread_*() call and terminate. The pthread functions
// return the error number instead of setting errno, so the code is
// passed in explicitly.
static void pthread_error(const char *msg, int error_number)
{
   fprintf(stderr, "%s: %s (error = %d)\n",
           msg, strerror(error_number), error_number);
   exit(1);
}


// Parse a decimal TCP port number.
// Returns the port (1..65535), or -1 if the text is not a valid port.
static int parse_port(const char *text)
{
   char *end = NULL;
   long value;

   errno = 0;
   value = strtol(text, &end, 10);
   if (errno != 0 || end == text || *end != '\0')
      return -1;
   if (value < 1 || value > 65535)
      return -1;
   return (int)value;
}


// Wait on a semaphore, restarting the wait if a signal interrupts it.
// Any other failure terminates the program.
static void sem_wait_retry(sem_t *sem)
{
   while (sem_wait(sem) < 0)
   {
      if (errno != EINTR)
         unix_error("sem_wait() error");
   }
}


// Put the stack into its initial state: no descriptors stored, all
// STACK_SIZE slots empty.
static void stack_init(Stack_t *s)
{
   int rc;

   s->top_of_stack = 0;
   if ((rc = pthread_mutex_init(&(s->mutex), NULL)) != 0)
      pthread_error("pthread_mutex_init() error", rc);
   if (sem_init(&(s->sem_empty), 0, STACK_SIZE) < 0)
      unix_error("sem_init() error");
   if (sem_init(&(s->sem_full), 0, 0) < 0)
      unix_error("sem_init() error");
}


// Producer side: store one descriptor, blocking while the stack is full.
static void stack_push(Stack_t *s, int fd)
{
   int rc;

   sem_wait_retry(&(s->sem_empty));
   if ((rc = pthread_mutex_lock(&(s->mutex))) != 0)
      pthread_error("pthread_mutex_lock() error", rc);
   s->stack[s->top_of_stack] = fd;
   s->top_of_stack++;
   if ((rc = pthread_mutex_unlock(&(s->mutex))) != 0)
      pthread_error("pthread_mutex_unlock() error", rc);
   if (sem_post(&(s->sem_full)) < 0)
      unix_error("sem_post() error");
}


// Consumer side: remove and return one descriptor, blocking while the
// stack is empty.
static int stack_pop(Stack_t *s)
{
   int rc;
   int fd;

   sem_wait_retry(&(s->sem_full));
   if ((rc = pthread_mutex_lock(&(s->mutex))) != 0)
      pthread_error("pthread_mutex_lock() error", rc);
   s->top_of_stack--;
   fd = s->stack[s->top_of_stack];
   if ((rc = pthread_mutex_unlock(&(s->mutex))) != 0)
      pthread_error("pthread_mutex_unlock() error", rc);
   if (sem_post(&(s->sem_empty)) < 0)
      unix_error("sem_post() error");
   return fd;
}


// Create the listening socket bound to the given port on any interface.
// Any failure terminates the program.
static int open_listening_socket(int port)
{
   int socket_fd;                 // listening socket file descriptor
   int optionvalue = 1;           // used in setsockopt()
   struct sockaddr_in localAddr;  // server's address structure

   // Create a reliable, stream socket using TCP.
   if ((socket_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
      unix_error("socket() error");

   // Eliminate "Address already in use" error from bind().
   if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR,
                  (const void *)&optionvalue, sizeof(optionvalue)) < 0)
      unix_error("setsockopt() error");

   // Construct a local address structure.
   memset(&localAddr, 0, sizeof(localAddr));       // zero out the address structure
   localAddr.sin_family = AF_INET;                 // Internet address family
   localAddr.sin_addr.s_addr = htonl(INADDR_ANY);  // any incoming interface
   localAddr.sin_port = htons((unsigned short)port);

   // Bind to the local address
   if (bind(socket_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0)
      unix_error("bind() error");

   // Make it a listening socket ready to accept connection requests
   if (listen(socket_fd, LISTENQUEUE) < 0)
      unix_error("listen() error");

   return socket_fd;
}


// Wait for a connection from a client and return the connected socket.
// The client's address is stored in *clientAddr. An accept() that is
// interrupted by a signal is restarted; any other failure is fatal.
static int accept_client(int socket_fd, struct sockaddr_in *clientAddr)
{
   for (;;)
   {
      socklen_t clientLength = sizeof(*clientAddr);  // needed by accept()
      int connected_fd = accept(socket_fd, (struct sockaddr *)clientAddr,
                                &clientLength);
      if (connected_fd >= 0)
         return connected_fd;

      if (errno != EINTR)
         unix_error("accept() error");
      fprintf(stderr, "Restarting accept because: %s (errno = %d)\n",
                       strerror(errno), errno);
   }
}


// Run the trivial protocol with one client: send a welcome line, read a
// single character, report it on stdout, and close the connection.
// Failures here affect only this client; they are logged and the
// function returns so that the calling worker can serve the next client.
static void serve_client(int connected_fd)
{
   FILE *connected_fpout;  // connected socket output stream
   FILE *connected_fpin;   // connected socket input stream
   int input_fd;
   int c;                  // int, not char, so that EOF can be detected

   // Convert the connected socket file descriptor to file pointers
   // (i.e., streams) and use C Standard Library I/O functions.
   // (But see the textbook CS:APP, pages 796-797.)
   // Each stream gets its own descriptor so that both streams can be
   // released with fclose() without closing a descriptor twice.
   if ((connected_fpout = fdopen(connected_fd, "w")) == NULL)
   {
      fprintf(stderr, "fdopen() error: %s (errno = %d)\n",
                       strerror(errno), errno);
      close(connected_fd);
      return;
   }
   if ((input_fd = dup(connected_fd)) < 0)
   {
      fprintf(stderr, "dup() error: %s (errno = %d)\n",
                       strerror(errno), errno);
      fclose(connected_fpout);
      return;
   }
   if ((connected_fpin = fdopen(input_fd, "r")) == NULL)
   {
      fprintf(stderr, "fdopen() error: %s (errno = %d)\n",
                       strerror(errno), errno);
      close(input_fd);
      fclose(connected_fpout);
      return;
   }

   // Send a welcome message to the client.
   fprintf(connected_fpout, "Welcome to the world's dumbest Internet server.\n");
   if (fflush(connected_fpout) == EOF)
   {
      fprintf(stderr, "fflush() error: %s (errno = %d)\n",
                       strerror(errno), errno);
   }

   // Now block the thread on I/O from the client.
   c = fgetc(connected_fpin);
   if (c == EOF)
      fprintf(stdout, "Client closed the connection without sending data.\n");
   else
      fprintf(stdout, "Got %c from client.\n", c);
   fflush(stdout);

   // Closing both streams frees them and closes the "connected" socket.
   if (fclose(connected_fpin) == EOF)
      fprintf(stderr, "fclose() error: %s (errno = %d)\n",
                       strerror(errno), errno);
   if (fclose(connected_fpout) == EOF)
      fprintf(stderr, "fclose() error: %s (errno = %d)\n",
                       strerror(errno), errno);
}


// Usage: server <port>
// Validates the port, opens the listening socket, starts the worker
// thread pool, and then acts as the "producer" of connected file
// descriptors for the workers. Never returns normally.
int main(int argc, char **argv)
{
   int i;
   int rc;
   int port;
   int socket_fd;     // listening socket file descriptor
   int connected_fd;  // connected socket file descriptor
   struct sockaddr_in clientAddr;  // client's address structure
   struct hostent *hp;             // used to get client's domain name
   const char *h_namep;
   char *h_addressp;
   pthread_t tid[NUMBER_OF_THREADS];
   static Stack_t stack_of_fd;     // shared with every worker thread

   // Check the command line before acquiring any resources.
   if (argc != 2)
   {
      fprintf(stderr, "usage: %s <port>\n", argv[0]);
      exit(1);
   }
   if ((port = parse_port(argv[1])) < 0)
   {
      fprintf(stderr, "invalid port number: %s\n", argv[1]);
      exit(1);
   }

   // A write to a socket whose client has gone away raises SIGPIPE,
   // whose default action terminates the process. Ignore it so that
   // the write fails with an error instead.
   if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
      unix_error("signal() error");

   socket_fd = open_listening_socket(port);

   // initialize the mutex and the counting semaphores
   stack_init(&stack_of_fd);

   // create the thread pool
   for (i = 0; i < NUMBER_OF_THREADS; i++)
   {
      // start a thread and pass it the address
      // of the Stack data structure
      if ((rc = pthread_create(tid + i, NULL, worker_thread, &stack_of_fd)) != 0)
         pthread_error("pthread_create() error", rc);
   }

   while (1)
   {
      // Wait for a connection from a client.
      connected_fd = accept_client(socket_fd, &clientAddr);

      // Determine the domain name and IP address of the client.
      // A failed reverse lookup is only logged; it must not stop the server.
      hp = gethostbyaddr((const char *)&clientAddr.sin_addr,
                         sizeof(clientAddr.sin_addr), AF_INET);
      if (hp == NULL)
      {
         fprintf(stderr, "gethostbyaddr() error: %s (h_errno = %d)\n",
                          hstrerror(h_errno), h_errno);
         h_namep = "unknown host";
      }
      else
      {
         h_namep = hp->h_name;
      }
      h_addressp = inet_ntoa(clientAddr.sin_addr);
      fprintf(stderr, "server connected to %s (%s)\n", h_namep, h_addressp);

      // put the connected file descriptor in the stack
      stack_push(&stack_of_fd, connected_fd);
   }

   // should never get here
   exit(0);
}//main


// Thread function that handles client connections.
// Each thread is a "consumer" of connected file descriptors.
// vargp is the address of the shared Stack_t.
void *worker_thread(void *vargp)
{
   Stack_t *stack_of_fd_p = (Stack_t *)vargp;

   while (1)
   {
      // get a file descriptor out of the stack
      int connected_fd = stack_pop(stack_of_fd_p);

      serve_client(connected_fd);
   }

   // should never get here
   return NULL;
}


/*
   Below are two error handling functions.
   They provide reasonably meaningful error messages.
   Both print the message and terminate the program with status 1.
*/
void unix_error(const char *msg)
{
   fprintf(stderr, "%s: %s (errno = %d)\n", msg, strerror(errno), errno);
   exit(1);
}

void dns_error(const char *msg)
{
   fprintf(stderr, "%s: %s (h_errno = %d)\n", msg, hstrerror(h_errno), h_errno);
   exit(1);
}