C++ || Multi Process Server & Client Hash Table Using Thread Pools, Message Queues, & Signal Handlers
The following is another homework assignment which was presented in an Operating Systems Concepts class. The program is driven by command line arguments and implements a multithreaded hash table, using message queues to pass text from a client program to a server program and vice versa. It makes use of several interprocess communication function calls provided on Unix based systems.
REQUIRED KNOWLEDGE FOR THIS PROGRAM
How To Override The Default Signal Handler (CTRL-C)
How To Create And Use Pthreads For Interprocess Communication
How To Use Message Queues For Interprocess Communication
Multi Process Synchronization Producer Consumer Problem Using Pthreads
Sample Input Server Records File - Download Here
==== 1. OVERVIEW ====
Hash tables are widely used for efficient storage and retrieval of data. When using hash tables in multithreaded applications, you must ensure that concurrent accesses into the hash table are free from race conditions, deadlocks, and other problems that arise in program synchronization.
One solution to this problem is to prevent concurrent accesses into the hash table altogether; i.e. prior to accessing the hash table, a thread acquires a lock, and then releases the lock after the access is complete. Such an approach, although simple, is inefficient, because every access is serialized even when two threads touch unrelated records. The program demonstrated on this page implements an alternative solution, one which permits safe concurrent accesses into the hash table. In the approach implemented on this page, each hash location within the hash table is protected with a separate lock. Hence, multiple threads can access the hash table concurrently as long as they are accessing different hash locations. For greater efficiency, this program also makes use of a thread pool.
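To make the per-location locking idea concrete, here is a minimal sketch. It is not the server code from this page: the names `BucketLockedTable` and `SketchRecord` are made up for illustration, and error checking on the pthread calls is left out for brevity.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <list>
#include <string>
#include <vector>

// Hypothetical record type for this sketch only.
struct SketchRecord {
    int id;
    std::string name;
};

// A hash table in which every bucket carries its own mutex, so threads
// working on different buckets never wait on each other.
class BucketLockedTable {
    struct Bucket {
        std::list<SketchRecord> records;
        pthread_mutex_t mutex;
    };
    std::vector<Bucket> buckets_;

    // The hash key is simply id mod table size.
    Bucket& bucketFor(int id) {
        return buckets_[static_cast<std::size_t>(id) % buckets_.size()];
    }

    BucketLockedTable(const BucketLockedTable&) = delete;
    BucketLockedTable& operator=(const BucketLockedTable&) = delete;

public:
    explicit BucketLockedTable(std::size_t bucketCount) : buckets_(bucketCount) {
        assert(bucketCount > 0);
        for (std::size_t i = 0; i < buckets_.size(); ++i) {
            pthread_mutex_init(&buckets_[i].mutex, NULL);
        }
    }

    ~BucketLockedTable() {
        for (std::size_t i = 0; i < buckets_.size(); ++i) {
            pthread_mutex_destroy(&buckets_[i].mutex);
        }
    }

    // Locks only the bucket the record hashes to.
    void insert(const SketchRecord& rec) {
        Bucket& b = bucketFor(rec.id);
        pthread_mutex_lock(&b.mutex);
        b.records.push_back(rec);
        pthread_mutex_unlock(&b.mutex);
    }

    // Copies the matching record into `out` and returns true if it exists.
    bool find(int id, SketchRecord& out) {
        Bucket& b = bucketFor(id);
        bool found = false;
        pthread_mutex_lock(&b.mutex);
        for (std::list<SketchRecord>::const_iterator it = b.records.begin();
             it != b.records.end(); ++it) {
            if (it->id == id) {
                out = *it;
                found = true;
                break;
            }
        }
        pthread_mutex_unlock(&b.mutex);
        return found;
    }
};
```

With 100 buckets, a thread inserting id 243 and a thread searching for id 7944 lock buckets 43 and 44 respectively, so neither has to wait for the other.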
==== 2. TECHNICAL DETAILS ====
This program was implemented in two parts: a server program and a client program. The server maintains a hash table of records and a pool of threads. The client requests records from the server by sending record ids over a message queue. The server retrieves a request from the message queue and wakes up a thread in the thread pool. The awakened thread uses the id sent by the client to retrieve the corresponding record from the hash table, and sends the found record back to the client over the message queue.
The server also reads a file specified on the command line, which holds the initial user data to be inserted into the hash table. The incoming text file has the following format:
a unique numerical id 1 (an integer)
the first name 1 (a string)
the last name 1 (a string)
.
.
.
a unique numerical id N (an integer)
the first name N (a string)
the last name N (a string)
These three fields make up one single record for one individual. More than one record may be present in the incoming text file.
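As a rough illustration of how a file in this format can be read, the sketch below pulls id / first name / last name triples from any input stream. The names `ParsedRecord` and `readRecords` are hypothetical, and the sketch uses `std::string` fields, whereas the server on this page stores names in fixed-size character arrays.

```cpp
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical in-memory form of one record from the input file.
struct ParsedRecord {
    int id;
    std::string firstName;
    std::string lastName;
};

// Reads whitespace-separated (id, first name, last name) triples until the
// stream ends or a triple cannot be parsed.
std::vector<ParsedRecord> readRecords(std::istream& in) {
    std::vector<ParsedRecord> records;
    ParsedRecord rec;
    while (in >> rec.id >> rec.firstName >> rec.lastName) {
        records.push_back(rec);
    }
    return records;
}
```

Because the extraction operator skips any whitespace, the same loop works whether the three fields sit on separate lines or on one line.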
==== 3. SERVER ====
The server has the following structure and function:
Multi Threaded Process Server Flow Control
1. The server is invoked with two command line arguments specifying:
   • The name of the file containing the initial records.
   • The number of threads.
2. The server reads the specified file, then stores the records in the hash table. The hash table has the following structure and function:
   • Each cell contains two items: (1) a lock (mutex) protecting the cell; and (2) a linked list of records hashing to the cell.
   • The hash table supports methods for inserting and retrieving records.
   • The hash keys of records are computed as record id mod hash table size, where record id is the id associated with each record.
3. The server creates a message queue using the msgget() system call.
4. The server creates the specified number of threads.
5. Each thread joins a thread pool.
6. The parent thread then checks the message queue for new messages using the msgrcv() system call.
7. When a new message arrives, the parent thread retrieves the message, adds it to a list of received messages, and wakes up a thread from the thread pool.
8. The awakened thread removes a message from the list and checks the id contained in the message. The id indicates that the client who sent the message wants to know the record associated with that id.
9. The thread then checks whether a record with the given id exists in the table and, if so, retrieves it. The retrieved record is then sent to the client over the message queue. If the record does not exist, the server sends back a record with the id field set to -1.
10. The thread then removes the next message from the list and repeats the same process. If the list is empty, the thread goes back to the thread pool.
11. The server also simulates a scenario where the hash table is constantly being updated with new records. This is simulated with 5 separate threads that periodically (e.g. every 1 second) generate random records and insert them into the hash table.
12. When the user presses Ctrl-C, the server catches the SIGINT signal, removes the message queue using msgctl(), deallocates any other resources (e.g. mutexes, condition variables, etc.) and exits. Intercepting the signal requires a custom signal handler.
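Steps 5 through 10 describe a pool whose threads sleep until work arrives. One common way to build that is a shared list guarded by a mutex, plus a condition variable the workers wait on. The sketch below shows only that pattern, under several assumptions: `WorkQueue`, `poolWorker`, `submit`, `shutdownPool` and `runPool` are hypothetical names, the "work" is reduced to recording the id, and pthread return values are mostly unchecked.

```cpp
#include <pthread.h>
#include <cassert>
#include <cstddef>
#include <list>
#include <vector>

// Hypothetical shared state: pending record ids protected by one mutex,
// plus a condition variable that idle workers sleep on.
struct WorkQueue {
    std::list<int> pendingIds;
    std::vector<int> handledIds;  // stand-in for "search the table and reply"
    bool shuttingDown;
    pthread_mutex_t mutex;
    pthread_cond_t notEmpty;
};

// Body of every pool thread: sleep until an id is available, handle it, repeat.
extern "C" void* poolWorker(void* arg) {
    WorkQueue* q = static_cast<WorkQueue*>(arg);
    for (;;) {
        pthread_mutex_lock(&q->mutex);
        // Re-check the condition in a loop to cope with spurious wakeups.
        while (q->pendingIds.empty() && !q->shuttingDown) {
            pthread_cond_wait(&q->notEmpty, &q->mutex);
        }
        if (q->pendingIds.empty()) {  // shutting down and nothing left to do
            pthread_mutex_unlock(&q->mutex);
            return NULL;
        }
        int id = q->pendingIds.front();
        q->pendingIds.pop_front();
        q->handledIds.push_back(id);  // a real worker would unlock, then search
        pthread_mutex_unlock(&q->mutex);
    }
}

// Called by the parent thread when a request arrives (step 7).
void submit(WorkQueue& q, int id) {
    pthread_mutex_lock(&q.mutex);
    q.pendingIds.push_back(id);
    pthread_cond_signal(&q.notEmpty);  // wake one sleeping worker
    pthread_mutex_unlock(&q.mutex);
}

// Asks every worker to exit once the list has been drained.
void shutdownPool(WorkQueue& q) {
    pthread_mutex_lock(&q.mutex);
    q.shuttingDown = true;
    pthread_cond_broadcast(&q.notEmpty);
    pthread_mutex_unlock(&q.mutex);
}

// Runs `ids` through a pool of `threadCount` workers and returns how many
// ids were handled.
std::size_t runPool(std::size_t threadCount, const std::vector<int>& ids) {
    assert(threadCount > 0);  // a pool needs at least one worker
    WorkQueue q;
    q.shuttingDown = false;
    pthread_mutex_init(&q.mutex, NULL);
    pthread_cond_init(&q.notEmpty, NULL);

    std::vector<pthread_t> threads;
    for (std::size_t i = 0; i < threadCount; ++i) {
        pthread_t t;
        if (pthread_create(&t, NULL, poolWorker, &q) == 0) {
            threads.push_back(t);
        }
    }
    for (std::size_t i = 0; i < ids.size(); ++i) {
        submit(q, ids[i]);
    }
    shutdownPool(q);
    for (std::size_t i = 0; i < threads.size(); ++i) {
        pthread_join(threads[i], NULL);
    }

    std::size_t handled = q.handledIds.size();
    pthread_cond_destroy(&q.notEmpty);
    pthread_mutex_destroy(&q.mutex);
    return handled;
}
```

The design choice worth noting is that the threads are created once and reused. The parent thread only pays for a lock, a list insertion and a signal per request, instead of a thread creation per request.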
The server program is invoked with the following commandline structure:
./Server [FILE NAME] [NUMBER OF THREADS] (e.g. ./Server database.dat 10)
The server is implemented below.
// =============================================================================
// Author: K Perkins
// Date: Oct 5, 2013
// Taken From: http://programmingnotes.org/
// File: Server.cpp
// Description: Using a Hash Table, this program simulates a server and client
//   application. The server program maintains a hash table of records and
//   a pool of threads. The client requests records from the server by sending
//   record ID's over the message queue. The server retrieves a request from
//   the message queue and wakes up a thread in the thread pool. The awakened
//   thread then uses the ID to retrieve the corresponding record from the
//   hash table and sends the record to the client over the message queue.
//
//   This is the server program which creates a message queue using the msgget()
//   system call, checks the message queue for new messages using the msgrcv()
//   system call, and uses the msgsnd() system call to send messages back to
//   the client
// =============================================================================
#include <iostream>
#include <iomanip>
#include <vector>
#include <list>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <fstream>
#include <csignal>
#include <pthread.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>
using namespace std;

// Compile & Run
// g++ Server.cpp -o Server -lpthread
// ./Server <FILE NAME> <NUMBER OF THREADS>

// function prototypes
void SignalHandler(int arg);
void CheckPthread(int result, const char* what);
void InsertInitialDataIntoTable(ifstream& infile);
void* SearchRecords(void* args);
void SendRecord(int id, const char* firstName, const char* lastName);
void* GenerateNewRecord(void* args);

// items which are placed inside the hash table cell
struct Record
{
    int id;
    char firstName[50];
    char lastName[50];
};

// cell for the hash table
struct Cell
{
    list<Record> cell;
    pthread_mutex_t mutex;
};

// message queue structure
struct MsgQueue
{
    // the channel the connection will be on
    long messageType;
    int recordID;
    char recordName[100];
};

// constant variables
const int HASH_TABLE_SIZE = 100;
const int NUM_NEW_RECORDS_THREADS = 5;
const int CLIENT_TO_SERVER_CHANNEL = 2;
const int SERVER_TO_CLIENT_CHANNEL = 4;
const int MSG_Q_ID = 0664; // permission bits for the message queue

// global variables
int msqid = 0; // message queue ID
vector<Cell> hashTable(HASH_TABLE_SIZE);
list<int> recievedMsg;

int main(int argc, char* argv[])
{
    // declare variables
    srand(time(NULL));
    signal(SIGINT, SignalHandler); // override the CTRL-C terminal signal
    ifstream infile; // used to read in the file
    int searchThreadNum = 0; // index of the search thread slot in use
    int newRecThreadNum = 0; // index of the new record thread slot in use
    int numSearchThreads = 0; // how many threads search the table for records
    key_t key = 0; // key which identifies the message queue
    pthread_t* searchRecordThreads = NULL; // array of search records threads
    pthread_t* getNewRecordThreads = NULL; // array of new records threads
    MsgQueue msg; // buffer for messages taken off the queue

    // check if theres enough commandline args
    if(argc < 3)
    {
        cerr<<"\n** ERROR NOT ENOUGH ARGS!\n\n"
            <<"USAGE: "<<argv[0]<<" <FILE NAME> <NUMBER OF THREADS>\n\n";
        exit(1);
    }

    // open the input records file
    infile.open(argv[1]);

    // check to see if file exists
    if(infile.fail())
    {
        cerr<<"\n** ERROR: \""<<argv[1]<<"\" could not be found!\n";
        exit(1);
    }

    // insert data into the hash table from the source file
    InsertInitialDataIntoTable(infile);

    // close the file after we are done using it
    infile.close();

    // get the number of threads from the command line
    numSearchThreads = atoi(argv[2]);

    // the thread arrays below need at least one slot
    if(numSearchThreads < 1)
    {
        cerr<<"\n** ERROR: NUMBER OF THREADS must be at least 1\n";
        exit(1);
    }

    // initialize the search thread array with the number
    // specified from the commandline
    searchRecordThreads = new pthread_t[numSearchThreads];

    // initialize the new records thread array with the
    // specified number
    getNewRecordThreads = new pthread_t[NUM_NEW_RECORDS_THREADS];

    // get a key for our msg queue
    key = ftok("/bin/ls", 'O');

    // check if we got a valid key
    if(key == (key_t)-1)
    {
        perror("ftok error");
        exit(1);
    }

    // get a msg ID
    msqid = msgget(key, MSG_Q_ID | IPC_CREAT);

    // check if we got a valid ID
    if(msqid < 0)
    {
        perror("msgget error");
        exit(1);
    }

    cout<<"\n** SERVER ID #"<<msqid<<" SUCCESSFULLY ESTABLISHED\n"<<endl;

    // infinite loop to send/receive messages to/from client
    do{
        // receive record from client
        if(msgrcv(msqid, &msg, sizeof(msg) - sizeof(long), CLIENT_TO_SERVER_CHANNEL, 0) < 0)
        {
            perror("msgrcv error");
            exit(1);
        }

        // save the received record from the client into a linked list
        // this is used in the "SearchRecords" function
        recievedMsg.push_back(msg.recordID);

        // --- SEARCH RECORDS THREADS
        // keep track of how many threads are being used
        if(searchThreadNum > numSearchThreads-1)
        {
            searchThreadNum = 0;
        }

        // create threads & send them to work
        CheckPthread(pthread_create(&searchRecordThreads[searchThreadNum], NULL,
            &SearchRecords, NULL), "pthread_create");

        // wait for threads to finish
        pthread_join(searchRecordThreads[searchThreadNum], NULL);

        // --- GENERATE NEW RECORDS THREADS
        // keep track of how many threads are being used
        if(newRecThreadNum > NUM_NEW_RECORDS_THREADS-1)
        {
            newRecThreadNum = 0;
        }

        // create threads & send them to work
        CheckPthread(pthread_create(&getNewRecordThreads[newRecThreadNum], NULL,
            &GenerateNewRecord, NULL), "pthread_create");

        // wait for threads to finish
        pthread_join(getNewRecordThreads[newRecThreadNum], NULL);

        // --- INCREMENT NUMBER OF THREADS BEING USED
        ++searchThreadNum;
        ++newRecThreadNum;
    }while(true);

    return 0;
}// end of main

void CheckPthread(int result, const char* what)
{
    // pthread functions return an error number instead of setting errno,
    // so perror() cannot be used to report their failures
    if(result != 0)
    {
        cerr<<what<<" error: "<<strerror(result)<<"\n";
        exit(1);
    }
}// end of CheckPthread

void* GenerateNewRecord(void* args)
{
    const char* names[]={"Alva","Edda","Hiram","Lemuel","Della","Roseann","Sang",
        "Evelia","Claire","Marylou","Magda","Irvin","Reagan","Deb","Hillary",
        "Tuyetm","Cherilyn","Amina","Justin","Neville","Jessica","Demi",
        "Graham","Cinderella","Freddy","Vivan","Marjorie","Krystal","Liza",
        "Spencer","Jordon","Bernie","Geraldine","Kati","Jetta","Carmella",
        "Chery","Earlene","Gene","Lorri","Albertina","Ula","Karena","Johanna",
        "Alex","Tobias","Lashawna","Domitila","Chantel","Deneen","Nigel",
        "Lashanda","Donn","Theda","Many","Jeramy","Jodee","Tamra","Dessie",
        "Lawrence","Jaime","Basil","Roger","Cythia","Homer","Lilliam","Victoria",
        "Tod","Harley","Meghann","Jacquelyne","Arie","Rosemarie","Lyndon","Blanch",
        "Kenneth","Perkins","Kaleena"};
    Record temp;
    int record = rand()%10000;
    int nameLen = sizeof(names)/sizeof(names[0]);

    // build the new record, making sure both names are null terminated
    temp.id = record;
    strncpy(temp.firstName, names[rand()%nameLen], sizeof(temp.firstName) - 1);
    temp.firstName[sizeof(temp.firstName) - 1] = '\0';
    strncpy(temp.lastName, names[rand()%nameLen], sizeof(temp.lastName) - 1);
    temp.lastName[sizeof(temp.lastName) - 1] = '\0';

    // lock the current hash table cell
    CheckPthread(pthread_mutex_lock(&hashTable[record%HASH_TABLE_SIZE].mutex),
        "pthread_mutex_lock");

    // put the new data into the hash table
    hashTable[record%HASH_TABLE_SIZE].cell.push_back(temp);

    // unlock the current hash table cell
    CheckPthread(pthread_mutex_unlock(&hashTable[record%HASH_TABLE_SIZE].mutex),
        "pthread_mutex_unlock");

    return NULL;
}// end of GenerateNewRecord

void* SearchRecords(void* args)
{
    int record = 0;
    bool found = false;

    // get the record from the client to search the hash table
    if(!recievedMsg.empty())
    {
        record = recievedMsg.front();
        recievedMsg.pop_front();
    }

    // lock the current hash table cell before looking inside it
    CheckPthread(pthread_mutex_lock(&hashTable[record%HASH_TABLE_SIZE].mutex),
        "pthread_mutex_lock");

    // search hash table cell to see if given record is there
    // (an empty cell simply skips the loop)
    for(list<Record>::iterator iter = hashTable[record%HASH_TABLE_SIZE].cell.begin();
        iter != hashTable[record%HASH_TABLE_SIZE].cell.end(); ++iter)
    {
        // we found a match in the hash table
        if(iter->id == record)
        {
            // send the data fields of the found record
            // over the message queue
            SendRecord(iter->id, iter->firstName, iter->lastName);
            found = true;
            break;
        }
    }

    // unlock the current hash table cell
    CheckPthread(pthread_mutex_unlock(&hashTable[record%HASH_TABLE_SIZE].mutex),
        "pthread_mutex_unlock");

    // if we didnt find a record, tell that to the client
    if(!found)
    {
        SendRecord(-1, "Record Not Found!", "");
    }

    return NULL;
}// end of SearchRecords

void SendRecord(int id, const char* firstName, const char* lastName)
{
    MsgQueue msg; // the message to send

    // set the message queue channel
    msg.messageType = SERVER_TO_CLIENT_CHANNEL;

    // set the record id
    msg.recordID = id;

    // set the record name, always leaving room for the terminating null
    strncpy(msg.recordName, firstName, sizeof(msg.recordName) - 1);
    msg.recordName[sizeof(msg.recordName) - 1] = '\0';
    strncat(msg.recordName, " ", sizeof(msg.recordName) - strlen(msg.recordName) - 1);
    strncat(msg.recordName, lastName, sizeof(msg.recordName) - strlen(msg.recordName) - 1);

    // send record to client
    if(msgsnd(msqid, &msg, sizeof(msg) - sizeof(long), 0) < 0)
    {
        perror("msgsnd error");
        exit(1);
    }
}// end of SendRecord

void InsertInitialDataIntoTable(ifstream& infile)
{
    Record temp;

    // initialize the mutex of every cell exactly once, before any thread runs
    for(int x = 0; x < HASH_TABLE_SIZE; ++x)
    {
        pthread_mutex_init(&hashTable[x].mutex, NULL);
    }

    // setw() keeps long names from overflowing the character arrays
    while(infile>>temp.id
        >>setw(sizeof(temp.firstName))>>temp.firstName
        >>setw(sizeof(temp.lastName))>>temp.lastName)
    {
        hashTable[temp.id%HASH_TABLE_SIZE].cell.push_back(temp);
    }
}// end of InsertInitialDataIntoTable

void SignalHandler(int arg)
{
    cerr<<"\n\nCaught the CTRL-C\n"
        <<"Shutting down the server connection..\n";

    // deallocate the queue
    if(msgctl(msqid, IPC_RMID, NULL) < 0)
    {
        perror("msgctl error");
    }
    exit(0);
}// http://programmingnotes.org/
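The handler in the listing above prints a message, removes the queue and exits from inside the signal handler. A more conservative variation, sketched here as an assumption rather than as the approach used on this page, has the handler only set a flag and leaves the cleanup to the main loop. The names `g_stopRequested`, `onInterrupt` and `installInterruptHandler` are hypothetical.

```cpp
#include <signal.h>
#include <csignal>
#include <cstring>

// Set by the handler and polled by the main loop. A volatile sig_atomic_t
// is the object type the standard guarantees can be written safely from a
// signal handler.
volatile std::sig_atomic_t g_stopRequested = 0;

// The handler does nothing except record that CTRL-C was pressed.
extern "C" void onInterrupt(int) {
    g_stopRequested = 1;
}

// Installs the handler for SIGINT using sigaction().
// Returns true if the handler was installed.
bool installInterruptHandler() {
    struct sigaction sa;
    std::memset(&sa, 0, sizeof(sa));
    sa.sa_handler = onInterrupt;
    sigemptyset(&sa.sa_mask);
    sa.sa_flags = 0;
    return sigaction(SIGINT, &sa, NULL) == 0;
}
```

With this arrangement, a msgrcv() call that is blocked when CTRL-C arrives fails with errno set to EINTR. The main loop can then check `g_stopRequested`, call msgctl() with IPC_RMID, destroy its mutexes and return from main normally.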
The client program has a much simpler flow of control.
==== 4. CLIENT ====
The client has the following structure and function:
Multi Threaded Process Client Flow Control
1. The client program connects to the message queue previously established by the server.
2. If the connection is successful, then the client goes into an infinite loop where it:
   • Sends a randomly generated record id to the server via the message queue.
   • Waits for the server to reply with the requested record.
   • Displays the record received.
   • Repeats the process.
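The request and reply travel over the same queue and are told apart by message type: the client sends with one type and waits for another. The sketch below walks through one full exchange inside a single process, purely to show the calls involved. It assumes a private queue created with IPC_PRIVATE instead of the ftok() key used by the real programs, and the names `LookupMsg`, `sendLookup`, `receiveLookup` and `roundTrip` are hypothetical.

```cpp
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <cassert>
#include <cstddef>
#include <cstring>

// Same layout as the MsgQueue struct used by the server and client:
// the leading long is the message type, everything after it is payload.
struct LookupMsg {
    long messageType;
    int recordID;
    char recordName[100];
};

const long REQUEST_TYPE = 2;  // client -> server
const long REPLY_TYPE = 4;    // server -> client
const std::size_t PAYLOAD_SIZE = sizeof(LookupMsg) - sizeof(long);

// Places one message of the given type on the queue.
bool sendLookup(int qid, long type, int id, const char* name) {
    assert(std::strlen(name) < sizeof(LookupMsg().recordName));
    LookupMsg msg;
    std::memset(&msg, 0, sizeof(msg));
    msg.messageType = type;
    msg.recordID = id;
    std::strncpy(msg.recordName, name, sizeof(msg.recordName) - 1);
    return msgsnd(qid, &msg, PAYLOAD_SIZE, 0) == 0;
}

// Blocks until a message of the given type is available, then copies it out.
bool receiveLookup(int qid, long type, LookupMsg& out) {
    return msgrcv(qid, &out, PAYLOAD_SIZE, type, 0) >= 0;
}

// Plays both roles on a throwaway queue: the "client" asks for `id`,
// the "server" answers with `name`, and the reply ends up in `reply`.
bool roundTrip(int id, const char* name, LookupMsg& reply) {
    int qid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    if (qid < 0) {
        return false;
    }
    LookupMsg request;
    bool ok = sendLookup(qid, REQUEST_TYPE, id, "")                  // client asks
           && receiveLookup(qid, REQUEST_TYPE, request)              // server reads
           && sendLookup(qid, REPLY_TYPE, request.recordID, name)    // server answers
           && receiveLookup(qid, REPLY_TYPE, reply);                 // client reads
    msgctl(qid, IPC_RMID, NULL);  // always remove the queue afterwards
    return ok;
}
```

Passing a specific type as the fourth argument of msgrcv() is what keeps the two directions apart: the client never picks up its own request, because it only asks for messages of the reply type.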
The client program was designed to sleep for 1 second every time a new record is obtained from the server. This makes it easier for the user to see what is being displayed on the screen.
The client program is invoked with the following commandline structure:
./Client
The client is implemented below.
// =============================================================================
// Author: K Perkins
// Date: Oct 5, 2013
// Taken From: http://programmingnotes.org/
// File: Client.cpp
// Description: Using a Hash Table, this program simulates a server and client
//   application. The server program maintains a hash table of records and
//   a pool of threads. The client requests records from the server by sending
//   record ID's over the message queue. The server retrieves a request from
//   the message queue and wakes up a thread in the thread pool. The awakened
//   thread then uses the ID to retrieve the corresponding record from the
//   hash table and sends the record to the client over the message queue.
//
//   This is the client program which connects to the message queue that the
//   server previously establishes using the msgget() system call. It checks
//   the message queue for successfully found records using the msgrcv() system
//   call, and uses the msgsnd() system call to send new search records back to
//   the server, where it searches for new records
//
//   NOTE: This file is set to display records on a 1 second delay
// =============================================================================
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
using namespace std;

// Compile & Run
// g++ Client.cpp -o Client
// ./Client

// message queue structure
struct MsgQueue
{
    // the channel the connection will be on
    long messageType;
    int recordID;
    char recordName[100];
};

// constant variables
const int HOW_LONG_TO_SLEEP = 1;
const int CLIENT_TO_SERVER_CHANNEL = 2;
const int SERVER_TO_CLIENT_CHANNEL = 4;
const int MSG_Q_ID = 0664; // permission bits for the message queue

int main()
{
    // declare variables
    MsgQueue msg;
    srand(time(NULL));
    key_t key = 0;
    int msqid = 0; // message queue ID

    // get a key for the queue
    key = ftok("/bin/ls", 'O');

    // check if we got a valid key
    if(key == (key_t)-1)
    {
        perror("ftok error");
        exit(1);
    }

    // get the ID of the queue
    msqid = msgget(key, MSG_Q_ID);

    // check if we got a valid ID
    if(msqid < 0)
    {
        cerr<<"\n** ERROR - CONNECTION TO SERVER NOT FOUND\n\n";
        exit(1);
    }

    cerr<<"\n** CONNECTION TO SERVER ID #"<<msqid<<" SUCCESS\n"<<endl;

    // infinite loop to send/receive messages from server
    do{
        // connect to the server
        msg.messageType = CLIENT_TO_SERVER_CHANNEL;

        // generate a random search key
        msg.recordID = rand()%10000;

        // send record search to the server
        if(msgsnd(msqid, &msg, sizeof(msg) - sizeof(long), 0) < 0)
        {
            cerr<<"\n** SERVER CONNECTION CLOSED..\n\n";
            exit(1);
        }

        // receive a record from the server
        if(msgrcv(msqid, &msg, sizeof(msg) - sizeof(long), SERVER_TO_CLIENT_CHANNEL, 0) < 0)
        {
            cerr<<"\n** SERVER CONNECTION CLOSED..\n\n";
            exit(1);
        }

        // if we found a valid record, display it to the screen
        // (the server sets the id to -1 when no record was found)
        if(msg.recordID != -1)
        {
            cerr<< "ID = "<<msg.recordID<<" \tRecord = "<<msg.recordName<<endl;
            sleep(HOW_LONG_TO_SLEEP);
        }
    }while(true);

    return 0;
}// http://programmingnotes.org/
QUICK NOTES:
The highlighted lines are sections of interest to look out for.
The code is heavily commented, so no further insight is necessary. If you have any questions, feel free to leave a comment below.
See sample files named server.cpp and client.cpp which illustrate interprocess communications using message queues. See file condvar.cpp which illustrates the use of condition variables. Finally, see file signal.cpp which illustrates the overriding of default signal handlers.
The following is sample output:
(Note: remember to include the initial records input file!)
SERVER OUTPUT:
./Server INPUT_Records_programmingnotes_freeweq_com.txt 26
** SERVER ID #1015808 SUCCESSFULLY ESTABLISHED
^C
Caught the CTRL-C
Shutting down the server connection..
CLIENT OUTPUT:
./Client
** CONNECTION TO SERVER ID #1015808 SUCCESS
ID = 243 Record = Graham Basil
ID = 7943 Record = Tobias Arie
ID = 3607 Record = Claire Amina
ID = 849 Record = Jetta Victoria
ID = 126 Record = Jeramy Tod
ID = 7483 Record = Vivan Krystal
ID = 8036 Record = Lilliam Harley
ID = 1901 Record = Kati Basil
ID = 3524 Record = Kenneth Perkins
ID = 5256 Record = Jodee Albertina
ID = 7065 Record = Marylou Donn
ID = 3951 Record = Ula Domitila
ID = 395 Record = Jaime Lilliam
ID = 9234 Record = Nigel Gene
ID = 4148 Record = Carmella Evelia
ID = 9340 Record = Sang Cherilyn
ID = 3834 Record = Jessica Freddy
** SERVER CONNECTION CLOSED..