Take the 2-minute tour ×
Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

edit: This section has not thrown segmentation faults so lets focus on it please.

I am working on multithreading a data server application. I am new to multithreading in general. I wrote a version a few days ago where I tried to use the main to listen for sensor connection and client connections, a thread to handle sensor inputs into a database, and a thread to go through connected clients and send data that they have requested. I have run into segmentation faults which I can't seem to get rid of. It may run for several hours but will crash every time. Often it seems associated with a client disconnect but it also kicks sensors and clients off for no apparent reason.

I am trying to rework the concept from sharing a main database to having each client have a thread which copies the data from the main database then sends or processes on the copied buffer. I am hoping to see much less time holding the mutexes that way. Here is a function start for the client thread. This will be called by __beginThread with a pointer to a class being passed. I intend to create a "new" instance of the class after accepting the client login and just before calling the thread. The idea being the thread should run until the client is disconnected.

void serviceClient(void* ptrClient)
   {
CLIENT* myClient = (CLIENT*)ptrClient;
int       result;
DWORD     flags;
timeval   tv;
FD_SET    localReader;
time_f    lTime;
time_f    lTimer10;
HANDLE  Mutex[4];

INT32U  epochGLO;   //27 bits Tk
INT32U  epochGPS;   //30 bit TOW
INT32U  epochBDU;   //30 bit TOW
INT32U  epochGAL;

while( myClient->connected == true )  //read write to socket
{
    lTime = getTimePrecise();
    //check for new data first
    FD_ZERO(&localReader);
    FD_SET( myClient->socket, &localReader );


    result = select(0, &localReader, NULL, NULL, &tv);
    if( result > 0)
    {
        result = WSARecv(myClient->socket, &myClient->WSABuffer, 1, &myClient->RecvBytes, &flags, NULL, NULL);
        if(result != 0)
        {
            result = WSAGetLastError();
            if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
            {
                cout << "socket error, closing client connection with error  " << result << " userID " << myClient->user << endl;
                myClient->connected = false;
            }
        }
        else if(myClient->RecvBytes == 0)
        {
            cout << "   client closed connection, removing   " << myClient->user << endl;
            myClient->connected = false;
        }
        else  //data was received into myClient->buffer as char type, process data
        {
            if(ID_NMEA0183(myClient->buffer, 0, myClient->RecvBytes) == 1)
            {
                float lat, lng;
                if( decodeGGA(myClient->buffer, 0, myClient->RecvBytes, lat, lng) == true)
                {
                    myClient->lat = lat;
                    myClient->lng = lng;
                }
                //recalculate here
            }
            //process other input messages here
        }//end else some data was read in from client
    }//select greater than 0
    else if(result < 0)
    {
        cout << "select() function returned error  " << WSAGetLastError() << endl;
        myClient->connected = false;
        break;
    }
  //data send to client part

    //fill array of active base mutexes
    int mutexcount = 0;
    if(myClient->masterBase_index > -1 && datastream[myClient->masterBase_index].active == true)
    {
        Mutex[mutexcount] = mutex_datastream[myClient->masterBase_index];
        mutexcount++;
    }
    if(myClient->auxBase1_index > -1 && datastream[myClient->auxBase1_index].active == true)
    {
        Mutex[mutexcount] = mutex_datastream[myClient->auxBase1_index];
        mutexcount++;
    }
    if(myClient->auxBase2_index > -1 && datastream[myClient->auxBase2_index].active == true)
    {
        Mutex[mutexcount] = mutex_datastream[myClient->auxBase2_index];
        mutexcount++;
    }

    if(myClient->auxBase3_index > -1 && datastream[myClient->auxBase3_index].active == true)
    {
        Mutex[mutexcount] = mutex_datastream[myClient->auxBase3_index];
        mutexcount++;
    }
    if(mutexcount < 4)
    {
        for (int n=mutexcount; n<4; n++)
        {
            Mutex[mutexcount] = NULL;
        }
    }
    //check for new database info to send
    if( datastream[myClient->masterBase_index].epochGPS != epochGPS)
    {
        epochGPS = datastream[myClient->masterBase_index].epochGPS;

        if(myClient->requestedStream == 0 || myClient->requestedStream == 10 || myClient->requestedStream == StreamNumber_rtcm31MAXULTRA)
        {

          WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
            memcpy(myClient->buffer, datastream[myClient->masterBase_index].rtcm31_1004, datastream[myClient->masterBase_index].len1004);
            myClient->SendBytes = datastream[myClient->masterBase_index].len1004;
          ReleaseMutex(mutex_datastream[myClient->masterBase_index]);

            if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
            {
                result = WSAGetLastError();
                if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                {
                    cout << "send failed with error " << result << " for userID " << myClient->user << endl;
                    myClient->connected = false;
                    break;
                }
            }

            if(myClient->requestedStream == StreamNumber_rtcm31MAXULTRA)
            {
                INT8U outbuffer[BUFFERSIZE];
                bool aux1, aux2, aux3, MMI;
                if(myClient->auxBase1_index >= 0){aux1 = true;} else{aux1=false;}
                if(myClient->auxBase2_index >= 0){aux2 = true;} else{aux2=false;}
                if(myClient->auxBase3_index >= 0){aux3 = true;} else{aux3=false;}
                //second digit is aux station
                //first digit system number 1=gps, 2=glo, 3=bdu
                //number of master aux matches in each set match11 = master/aux1 gps , match12 = master/aux2 gps ect
                INT8U match11, match12, match13;
                fillNetDataTECbasedGPS(myClient, match11, match12, match13);
                //gps messages first
                if(aux1 == true)
                {
                    if(aux2 == true || aux3 == true){MMI = true;}else{MMI = false;}
                    //mutex 2 datastreams

                  WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
                  WaitForSingleObject(mutex_datastream[myClient->auxBase1_index], INFINITE);
                    myClient->SendBytes = build1017_MAX(myClient, outbuffer, 1, match11, MMI);
                  ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
                  ReleaseMutex(mutex_datastream[myClient->auxBase1_index]);

                    memcpy(myClient->buffer, outbuffer, myClient->SendBytes);
                    if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
                    {
                        result = WSAGetLastError();
                        if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                        {
                            cout << "send failed with error " << result << " for userID " << myClient->user << endl;
                            myClient->connected = false;
                            break;
                        }
                    }
                }
                if(aux2 == true)
                {
                    if(aux3 == true){MMI = true;}else{MMI = false;}
                //mutex 2 datastreams

                  WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
                  WaitForSingleObject(mutex_datastream[myClient->auxBase2_index], INFINITE);
                    myClient->SendBytes = build1017_MAX(myClient, outbuffer, 2, match12, MMI);
                  ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
                  ReleaseMutex(mutex_datastream[myClient->auxBase2_index]);

                    memcpy(myClient->buffer, outbuffer, myClient->SendBytes);
                    if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
                    {
                        result = WSAGetLastError();
                        if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                        {
                            cout << "send failed with error " << result << " for userID " << myClient->user << endl;
                            myClient->connected = false;
                            break;
                        }
                    }
                }
                if(aux3 == true)
                {
                    MMI = false;

                  WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
                  WaitForSingleObject(mutex_datastream[myClient->auxBase3_index], INFINITE);
                    myClient->SendBytes = build1017_MAX(myClient, outbuffer, 3, match13, MMI);
                  ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
                  ReleaseMutex(mutex_datastream[myClient->auxBase3_index]);

                    memcpy(myClient->buffer, outbuffer, myClient->SendBytes);
                    if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
                    {
                        result = WSAGetLastError();
                        if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                        {
                            cout << "send failed with error " << result << " for userID " << myClient->user << endl;
                            myClient->connected = false;
                            break;
                        }
                    }
                }//end network gps messages
            }
        }//end RTCMv3.1 gps
        if(myClient->requestedStream == 1 || myClient->requestedStream == StreamNumber_binrNEAR)
        {
            INT8U message69h[215];  //215 max size with 16 sat in a system
            INT8U message6Bh[11];
            myClient->SendBytes = 11;
          WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
            build6Bh(message6Bh, myClient->masterBase_index);
          ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
            memcpy(myClient->buffer, message6Bh, myClient->SendBytes);
            if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
            {
                result = WSAGetLastError();
                if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                {
                    cout << "send failed with error " << result << " for userID " << myClient->user << endl;
                    myClient->connected = false;
                    break;
                }
            }
          WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
            myClient->SendBytes = build69h(message69h, myClient->masterBase_index, 1); //gps
          ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
            memcpy(myClient->buffer, message69h, myClient->SendBytes);
            if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
            {
                result = WSAGetLastError();
                if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                {
                    cout << "send failed with error " << result << " for userID " << myClient->user << endl;
                    myClient->connected = false;
                    break;
                }
            }
        }//end BINR single
//GPS BINR max
        if(myClient->requestedStream == StreamNumber_binriMAX)
        {
            INT8U message69h[215];  //215 max size with 16 sat in a system
            INT8U message6Bh[11];
            myClient->SendBytes = 11;
          WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
            build6Bh(message6Bh, myClient->masterBase_index);
          ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
            memcpy(myClient->buffer, message6Bh, myClient->SendBytes);
            if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
            {
                result = WSAGetLastError();
                if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                {
                    cout << "send failed with error " << result << " for userID " << myClient->user << endl;
                    myClient->connected = false;
                    break;
                }
            }
          WaitForMultipleObjects(mutexcount, Mutex, true, INFINITE);
            myClient->SendBytes = calcNetGPS69h(myClient, message69h);
          for(int n=0; n<mutexcount; n++)
          {
              ReleaseMutex(mutex_datastream[n]);
          }
            memcpy(myClient->buffer, message69h, myClient->SendBytes);
            if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
            {
                result = WSAGetLastError();
                if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                {
                    cout << "send failed with error " << result << " for userID " << myClient->user << endl;
                    myClient->connected = false;
                    break;
                }
            }
        }//end BINR networked gps


    }//end new GPS
//**********************************************************************
   //10 second data update to client

        if(lTime - lTimer10 >= 10)       //check if 10 seconds have passed
    {
        INT8U buffer[BUFFERSIZE];
        //RTCM 3.1-3.2
        if(myClient->requestedStream == 0 || myClient->requestedStream == 10 || myClient->requestedStream == 12 ||
           myClient->requestedStream == 20 || myClient->requestedStream == 30 || myClient->requestedStream == 32 )
        {
            WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
            myClient->SendBytes = build1013(myClient, buffer);
            ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
            if(myClient->SendBytes > 0)
            {
                memcpy(myClient->buffer, buffer, myClient->SendBytes);
                if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
                {
                    result = WSAGetLastError();
                    if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                    {
                        cout << "send 1013 failed with error " << result << " for userID " << myClient->user << endl;
                        myClient->connected = false;
                        break;
                    }
                }
            }//end message 1013
            WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
            myClient->SendBytes = build1008(myClient, buffer);
            ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
            if(myClient->SendBytes > 0)
            {
                memcpy(myClient->buffer, buffer, myClient->SendBytes);
                if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
                {
                    result = WSAGetLastError();
                    if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                    {
                        cout << "send 1008 failed with error " << result << " for userID " << myClient->user << endl;
                        myClient->connected = false;
                        break;
                    }
                }
            }//end message 1008
            WaitForSingleObject(mutex_datastream[myClient->masterBase_index], INFINITE);
            myClient->SendBytes = datastream[myClient->masterBase_index].len1005;
            memcpy(myClient->buffer, datastream[myClient->masterBase_index].rtcm31_1005, myClient->SendBytes);
            ReleaseMutex(mutex_datastream[myClient->masterBase_index]);
            if(myClient->SendBytes > 0)
            {

                if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
                {
                    result = WSAGetLastError();
                    if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                    {
                        cout << "send 1005 failed with error " << result << " for userID " << myClient->user << endl;
                        myClient->connected = false;
                        break;
                    }
                }
            }//end message 1005
            //RTCM networked base location data
            if(myClient->requestedStream == 20) //3.1 MAC
            {
                if(myClient->auxBase1_index >= 0)
                {
                    WaitForSingleObject(mutex_datastream[myClient->auxBase1_index], INFINITE);
                    myClient->SendBytes = build1014(clientIndex, buffer, 1);
                    ReleaseMutex(mutex_datastream[myClient->auxBase1_index]);
                    if(myClient->SendBytes > 0)
                    {
                        memcpy(myClient->buffer, buffer, myClient->SendBytes);
                        if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
                        {
                            result = WSAGetLastError();
                            if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                            {
                                cout << "send aux1 1014 failed with error " << result << " for userID " << myClient->user << endl;
                                myClient->connected = false;
                                break;
                            }
                        }
                    }
                }//aux1 base
                if(myClient->auxBase2_index >= 0)
                {
                    WaitForSingleObject(mutex_datastream[myClient->auxBase2_index], INFINITE);
                    myClient->SendBytes = build1014(clientIndex, buffer, 2);
                    ReleaseMutex(mutex_datastream[myClient->auxBase2_index]);
                    if(myClient->SendBytes > 0)
                    {
                        memcpy(myClient->buffer, buffer, myClient->SendBytes);
                        if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
                        {
                            result = WSAGetLastError();
                            if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                            {
                                cout << "send aux2 1014 failed with error " << result << " for userID " << myClient->user << endl;
                                myClient->connected = false;
                                break;
                            }
                        }
                    }
                }//aux2 base
                if(myClient->auxBase3_index >= 0)
                {
                    WaitForSingleObject(mutex_datastream[myClient->auxBase3_index], INFINITE);
                    myClient->SendBytes = build1014(clientIndex, buffer, 3);
                    ReleaseMutex(mutex_datastream[myClient->auxBase3_index]);
                    if(myClient->SendBytes > 0)
                    {
                        memcpy(myClient->buffer, buffer, myClient->SendBytes);
                        if( WSASend(myClient->socket, &(myClient->WSABuffer), 1, &(myClient->SendBytes), 0, NULL, NULL) == SOCKET_ERROR)
                        {
                            result = WSAGetLastError();
                            if(result != WSA_IO_PENDING && result != WSAEWOULDBLOCK)
                            {
                                cout << "send aux3 1014 failed with error " << result << " for userID " << myClient->user << endl;
                                myClient->connected = false;
                                break;
                            }
                        }
                    }
                }//aux3 base
            }//end RTCM MAC
        }//end RTCM v3
        lTime10 = lTime;              //reset 10 second timer
    }//end 10 second update function

}//end while loop segment
      //do not need to clear if delete the class structure from memory
closesocket(myClient->socket);
delete myClient;                //free memory for class object pointed to
_endthread();
 }
//end serviceClient function
share|improve this question

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Browse other questions tagged or ask your own question.