Edit: This section has not thrown segmentation faults, so let's focus on it, please.
I am working on multithreading a data server application, and I am new to multithreading in general. A few days ago I wrote a version in which the main thread listens for sensor connections and client connections, one thread handles sensor input into a database, and another thread goes through the connected clients and sends the data they have requested. I have run into segmentation faults which I can't seem to get rid of. It may run for several hours, but it crashes every time. Often the crash seems associated with a client disconnect, but the server also kicks sensors and clients off for no apparent reason.
I am trying to rework the concept from sharing a main database to giving each client its own thread, which copies the data from the main database and then sends or processes the copied buffer. I am hoping to spend much less time holding the mutexes that way.
Here is the start function for the client thread. It will be called by _beginthread with a pointer to a class instance being passed. I intend to create a "new" instance of the class after accepting the client login and just before starting the thread. The idea is that the thread should run until the client is disconnected.
void serviceClient(void* ptrClient)
{
CLIENT* myClient = (CLIENT*)ptrClient;
int result;
DWORD flags;
timeval tv;
FD_SET localReader;
time_f lTime;
time_f lTimer10;
HANDLE Mutex[4];
INT32U epochGLO; //27 bits Tk
INT32U epochGPS; //30 bit TOW
INT32U epochBDU; //30 bit TOW
INT32U epochGAL;
while( myClient->connected == true ) //read write to socket
{
lTime = getTimePrecise();
//check for new data first
FD_ZERO(&localReader);
FD_SET( myClient->socket, &localReader );
result = select(0, &localReader, NULL, NULL, &tv);
if( result > 0)
{
result = WSARecv(myClient->socket, &myClient->WSABuffer, 1, &myClient->RecvBytes, &flags, NULL, NULL);
if(result != 0)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "socket error, closing client connection with error " << result << " userID " << myClient->user << endl;
myClient->connected = false;
}
}
else if(myClient->RecvBytes == 0)
{
cout << " client closed connection, removing " << myClient->user << endl;
myClient->connected = false;
}
else //data was received into myClient->buffer as char type, process data
{
if(ID_NMEA0183(myClient->buffer, 0, myClient->RecvBytes) == 1)
{
float lat, lng;
if( decodeGGA(myClient->buffer, 0, myClient->RecvBytes, lat, lng) == true)
{
myClient->lat = lat;
myClient->lng = lng;
}
//recalculate here
}
//process other input messages here
}//end else some data was read in from client
}//select greater than 0
else if(result < 0)
{
cout << "select() function returned error " << WSAGetLastError() << endl;
myClient->connected = false;
break;
}
//data send to client part
//fill array of active base mutexes
int mutexcount = 0;
if(myClient->masterBase_index > -1 && datastream[myClient->masterBase_index].active == true)
{
Mutex[mutexcount] = mutex_datastream[myClient->masterBase_index];
mutexcount++;
}
if(myClient->auxBase1_index > -1 && datastream[myClient->auxBase1_index].active == true)
{
Mutex[mutexcount] = mutex_datastream[myClient->auxBase1_index];
mutexcount++;
}
if(myClient->auxBase2_index > -1 && datastream[myClient->auxBase2_index].active == true)
{
Mutex[mutexcount] = mutex_datastream[myClient->auxBase2_index];
mutexcount++;
}
if(myClient->auxBase3_index > -1 && datastream[myClient->auxBase3_index].active == true)
{
Mutex[mutexcount] = mutex_datastream[myClient->auxBase3_index];
mutexcount++;
}
if(mutexcount < 4)
{
for (int n=mutexcount; n<4; n++)
{
Mutex[mutexcount] = NULL;
}
}
//check for new database info to send
if( datastream[myClient->masterBase_index].epochGPS != epochGPS)
{
epochGPS = datastream[myClient->masterBase_index].epochGPS;
if(myClient->requestedStream == 0 || myClient->requestedStream == 10 || myClient->requestedStream == StreamNumber_rtcm31MAXULTRA)
{
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
memcpy(myClient->buffer, datastream[myClient->masterBase_index].rtcm31_1004, datastream[myClient->masterBase_index].len1004);
myClient->SendBytes = datastream[myClient->masterBase_index].len1004;
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
if(myClient->requestedStream == StreamNumber_rtcm31MAXULTRA)
{
INT8U outbuffer[BUFFERSIZE];
bool aux1, aux2, aux3, MMI;
if(myClient->auxBase1_index >= 0){aux1 = true;} else{aux1=false;}
if(myClient->auxBase2_index >= 0){aux2 = true;} else{aux2=false;}
if(myClient->auxBase3_index >= 0){aux3 = true;} else{aux3=false;}
//second digit is aux station
//first digit system number 1=gps, 2=glo, 3=bdu
//number of master aux matches in each set match11 = master/aux1 gps , match12 = master/aux2 gps ect
INT8U match11, match12, match13;
fillNetDataTECbasedGPS(myClient, match11, match12, match13);
//gps messages first
if(aux1 == true)
{
if(aux2 == true || aux3 == true){MMI = true;}else{MMI = false;}
//mutex 2 datastreams
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
WaitForSingleObject(mutex_datastream[myClient->auxBase1_index], INFINITE);
myClient->SendBytes = build1017_MAX(myClient, outbuffer, 1, match11, MMI);
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
ReleaseMutex(mutex_datastream[myClient->auxBase1_index]);
memcpy(myClient->buffer, outbuffer, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}
if(aux2 == true)
{
if(aux3 == true){MMI = true;}else{MMI = false;}
//mutex 2 datastreams
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
WaitForSingleObject(mutex_datastream[myClient->auxBase2_index], INFINITE);
myClient->SendBytes = build1017_MAX(myClient, outbuffer, 2, match12, MMI);
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
ReleaseMutex(mutex_datastream[myClient->auxBase2_index]);
memcpy(myClient->buffer, outbuffer, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}
if(aux3 == true)
{
MMI = false;
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
WaitForSingleObject(mutex_datastream[myClient->auxBase3_index], INFINITE);
myClient->SendBytes = build1017_MAX(myClient, outbuffer, 3, match13, MMI);
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
ReleaseMutex(mutex_datastream[myClient->auxBase3_index]);
memcpy(myClient->buffer, outbuffer, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}//end network gps messages
}
}//end RTCMv3.1 gps
if(myClient->requestedStream == 1 || myClient->requestedStream == StreamNumber_binrNEAR)
{
INT8U message69h[215]; //215 max size with 16 sat in a system
INT8U message6Bh[11];
myClient->SendBytes = 11;
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
build6Bh(message6Bh, myClient->masterBase_index);
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
memcpy(myClient->buffer, message6Bh, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
myClient->SendBytes = build69h(message69h, myClient->masterBase_index, 1); //gps
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
memcpy(myClient->buffer, message69h, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}//end BINR single
//GPS BINR max
if(myClient->requestedStream == StreamNumber_binriMAX)
{
INT8U message69h[215]; //215 max size with 16 sat in a system
INT8U message6Bh[11];
myClient->SendBytes = 11;
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
build6Bh(message6Bh, myClient->masterBase_index);
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
memcpy(myClient->buffer, message6Bh, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
WaitForMultipleObjects(mutexcount, Mutex, true, INFINITE);
myClient->SendBytes = calcNetGPS69h(myClient, message69h);
for(int n=0; n<mutexcount; n++)
{
ReleaseMutex(mutex_datastream[n]);
}
memcpy(myClient->buffer, message69h, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}//end BINR networked gps
}//end new GPS
//**********************************************************************
//10 second data update to client
if(lTime - lTimer10 >= 10) //check if 10 seconds have passed
{
INT8U buffer[BUFFERSIZE];
//RTCM 3.1-3.2
if(myClient->requestedStream == 0 || myClient->requestedStream == 10 || myClient->requestedStream == 12 ||
myClient->requestedStream == 20 || myClient->requestedStream == 30 || myClient->requestedStream == 32 )
{
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
myClient->SendBytes = build1013(myClient, buffer);
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
if(myClient->SendBytes > 0)
{
memcpy(myClient->buffer, buffer, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send 1013 failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}//end message 1013
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
myClient->SendBytes = build1008(myClient, buffer);
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
if(myClient->SendBytes > 0)
{
memcpy(myClient->buffer, buffer, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send 1008 failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}//end message 1008
WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
myClient->SendBytes = datastream[myClient->masterBase_index].len1005;
memcpy(myClient->buffer, datastream[myClient->masterBase_index].rtcm31_1005, myClient->SendBytes);
ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
if(myClient->SendBytes > 0)
{
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send 1005 failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}//end message 1005
//RTCM networked base location data
if(myClient->requestedStream == 20) //3.1 MAC
{
if(myClient->auxBase1_index >= 0)
{
WaitForSingleObject(mutex_datastream[myClient->auxBase1_index], INFINITE);
myClient->SendBytes = build1014(clientIndex, buffer, 1);
ReleaseMutex(mutex_datastream[myClient->auxBase1_index]);
if(myClient->SendBytes > 0)
{
memcpy(myClient->buffer, buffer, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send aux1 1014 failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}
}//aux1 base
if(myClient->auxBase2_index >= 0)
{
WaitForSingleObject(mutex_datastream[myClient->auxBase2_index], INFINITE);
myClient->SendBytes = build1014(clientIndex, buffer, 2);
ReleaseMutex(mutex_datastream[myClient->auxBase2_index]);
if(myClient->SendBytes > 0)
{
memcpy(myClient->buffer, buffer, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send aux2 1014 failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}
}//aux2 base
if(myClient->auxBase3_index >= 0)
{
WaitForSingleObject(mutex_datastream[myClient->auxBase3_index], INFINITE);
myClient->SendBytes = build1014(clientIndex, buffer, 3);
ReleaseMutex(mutex_datastream[myClient->auxBase3_index]);
if(myClient->SendBytes > 0)
{
memcpy(myClient->buffer, buffer, myClient->SendBytes);
if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
{
result = WSAGetLastError();
if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
{
cout << "send aux3 1014 failed with error " << result << " for userID " << myClient->user << endl;
myClient->connected = false;
break;
}
}
}
}//aux3 base
}//end RTCM MAC
}//end RTCM v3
lTime10 = lTime; //reset 10 second timer
}//end 10 second update function
}//end while loop segment
//do not need to clear if delete the class structure from memory
closesocket(myClient->socket);
delete myClient; //free memory for class object pointed to
_endthread();
}
//end serviceClient function