Main Page   Alphabetical List   Compound List   File List   Compound Members   File Members  

total_order_channel.cpp

Go to the documentation of this file.
00001 /**********************************************************************************************/
00002 /*************************** TOTAL ORDER CHANNEL CLASS  *************************/
00003 /**************************** Jean FANCHON. LAAS-CNRS *************************/
00004 /**********************************************************************************************/
00005 
00006 #include "total_order_channel.h"
00007 
00008 extern void receive(char *, int);
00009 
00010 /********************************************************************/
00011 /********************************************************************/  
00012 
00013 void * start_phase( void * channel) {
00014         //to start phase code
00015         
00016                 void * t;
00017                 
00018                 ((total_order_channel *) channel)->phase_code(t);
00019 }
00020 
00021 
00022 /********************************************************************/
00023 /********************************************************************/  
00024 /* total_order_channel   definitions */
00025 
00026 
00027 total_order_channel::total_order_channel( int flag, char *file, char *ipnum) {
00028 
00029                 cout << "start total order channel" << endl;
00030 
00031 /********************************************************************/  
00032 /* initialise local data */
00033 
00034                 char ipadd[20];
00035                 
00036                 init_data();
00037                 
00038                 if (flag == 1)  strcpy(ipadd,ipnum);
00039                      
00040                 else my_ident = atoi(ipnum);
00041 
00042                 init_data();
00043                 
00044                 status = START;
00045                 
00046 /********************************************************************/  
00047 /* Case of a group creator: initialise group and  membership */  
00048                 
00049                 if (flag == 0 ) {
00050                         if (init_from_file(file) ) {
00051                 
00052                                 cout << " erreur opening " << file << endl;
00053                         
00054                                 exit(0);
00055                         }
00056                         
00057                         status = RUN;
00058                 }  
00059  
00060                 
00061 /********************************************************************/  
00062 /* initialise synchro objects */                
00063                 
00064                 if( int ret = pthread_cond_init(&all_received, NULL) ) {
00065                 
00066                         cout << " pthread_cond_init error " << "ret = " << ret << endl; 
00067                 }
00068                 
00069                 pthread_mutex_init(&mp,NULL);
00070                 
00071                 /*pthread_mutex_init(&net_mutex,NULL);*/
00072                                          
00073                 pthread_mutex_init(&phase_mutex, NULL);
00074                 
00075                 pthread_mutex_init(&send_mutex,NULL);
00076                 
00077                 pthread_mutex_init(&recv_mutex, NULL);
00078                 
00079                 pthread_mutex_unlock(&mp);
00080                 
00081                 /*pthread_mutex_unlock(&net_mutex);*/
00082                 
00083                 pthread_mutex_lock(&phase_mutex);
00084                 
00085                 pthread_mutex_unlock(&send_mutex);
00086                 
00087                 pthread_mutex_unlock(&recv_mutex);
00088  
00089 /********************************************************************/  
00090 /* initialise underlying layer */       
00091 
00092                 if (flag == 1)  /* following members */
00093                         net = new no_prop_channel(this, ipadd, my_group); 
00094 
00095                 else if (flag == 0)  /* first members  */
00096                         net = new no_prop_channel(this, my_ident , file, my_group);     
00097                         
00098 /********************************************************************/  
00099 /* for a newcomer, join group and initialise membership */                      
00100                 
00101                 if (flag == 1) {
00102                 
00103                         start_net_init(file); 
00104  
00105                         status = PRERUN ;
00106                 }
00107                 
00108 /********************************************************************/  
00109 /* start phase thread */  
00110 
00111                 max_iter = MAX_ITER;
00112                 
00113                 timeout = TIMEOUT2;
00114                 
00115                 int ret = pthread_create(&phase_thread, 0,start_phase, (void *)this);
00116 
00117                 if (ret) {
00118                         perror(" total_order()");       printf(" erreur th create  %d\n",ret);  
00119                         if (ret == ENOMEM) printf(" ENOMEM \n" );
00120                         if (ret== EINVAL) printf(" EINVAL \n" );
00121                         if (ret == EPERM) printf(" EPERM \n" );
00122                         exit(0); 
00123                 }
00124                 
00125                 
00126 }
00127 
00128 /********************************************************************/  
00129 /* new comer, join group by asking any member */
00130 /* for the moment find member adress in file */
00131 /*   receive data and initialise membership */                  
00132                 
00133 int total_order_channel::start_net_init( char * file) {
00134 
00135 
00136         header *head= &cmd_buf.head.head;
00137         int * pt;
00138                         
00139                 head->sender = -1;
00140 
00141                 head->num_phase = -1;
00142 
00143                 head->type = NEW;
00144 
00145                 head->size = 2 * sizeof(int) ;
00146         
00147                 pt =  (int *) &cmd_buf.buf[HEADSZ];
00148 
00149                 *pt = net->get_mynode();
00150         
00151                 *(pt + 1) = net->get_myport();
00152                 
00153                 cout << "***** start_net_init, my_port = " << *(pt + 1) <<"****" << endl;
00154                 
00155                 net->init_membership(file, cmd_message.data, head->size + HEADSZ ); 
00156         
00157 }
00158                 
00159 /********************************************************************/  
00160 /* Case of a group creator: initialise group from file  */  
00161 
00162 int total_order_channel::init_from_file( char * file) {
00163                         
00164                 int ident, nb = 0;              
00165                 
00166                 if (my_group->init_members_list(file) ) { 
00167                 
00168                         cout << "pb init_members_list " << endl;
00169                         exit(0);
00170                 }
00171                                 
00172                 for(int i=0; i< MAX_MEMBER; i++) {              
00173                                 
00174                         if (my_group->members_list[i].ident != UNDEF) {  
00175                         
00176                                 ident = my_group->members_list[i].ident ;
00177                 
00178                                 cur_get->status[ident]= WAITING;
00179                         
00180                                 next_get->status[ident]= WAITING;
00181 
00182                                 nb ++;
00183                         }
00184                 }
00185                 //DEBUGcout << " total grsiz = " << nb << endl;
00186                 
00187                 cur_get->grsize = nb;
00188                 
00189                 next_get->grsize = nb;
00190                                 
00191                 cur_get->phase = 1 ;
00192                 
00193                 next_get->phase = 2 ;
00194                 
00195                 curphase = 1;
00196                 
00197                 return(0) ;
00198 
00199     }   
00200 
00201 /********************************************************************/  
00202 /*takes messages from network, unfolds header,  act depending on type*/
00203 /* if all awaited messages received, signals to start phase_code */     
00204 
00205 int total_order_channel::deliver(char *data, int size) {
00206                 
00207                 int ret;
00208                 
00209                 int sender, mess_phase, type, mess_size ;
00210                 
00211                 int ident, faulty;      
00212                  
00213                 header *head = (header *) data;
00214                                 
00215                 sender = head->sender;
00216                 
00217                 type = head->type;
00218                 
00219                 mess_phase = head->num_phase;
00220                 
00221                 mess_size = head->size;
00222                 
00223                 if ( mess_size != size - HEADSZ) 
00224                 
00225                         cout << " ** Problem of message size , got :"<< size - HEADSZ << " expected : " << mess_size<< endl; 
00226                 
00227                 char * datapt = data + HEADSZ;
00228                 
00229                 //DEBUGcout <<"from " << sender <<" type " << type << " phase " << curphase <<" mess_phase " << mess_phase <<endl;
00230                                 
00231                 if ( status == RUN) {
00232                 
00233                 if(type == MESS || type == CMD || type == ACK || type == FAULT ) {
00234                 
00235                         /* cout<< "          lock recv " << endl; */
00236                 
00237                         pthread_mutex_lock(&recv_mutex);
00238                         
00239                         /* cout<< "          got recv " << endl; */
00240                         
00241                         if(mess_phase == curphase) {   
00242                         
00243                                 if (cur_get->sendflag == 0) { 
00244                                 
00245                                         pthread_mutex_unlock(&phase_mutex);  
00246                                 } 
00247                                 
00248                                 if(cur_get->status[sender]==WAITING) {
00249                                 
00250                                         //DEBUG cout << "  first message: from  " << sender << endl; 
00251                                         
00252                                         cur_get->mess_numb++;
00253                                         
00254                                         cur_get->status[sender]= GOT + type; 
00255                                         
00256                                         if ( type == MESS || type == CMD ) {
00257                                         
00258                                                 cur_get->mess_flag = 1;
00259                                 
00260                                                 cur_get->message[sender].data= new char[mess_size];
00261                                         
00262                                                 memcpy(cur_get->message[sender].data, datapt, mess_size);
00263                                         
00264                                                 cur_get->message[sender].size = mess_size;
00265                                         }
00266                                         else if (type == FAULT ) {
00267                                         
00268                                                 cur_get->fault_flag = 1;
00269                                                 
00270                                                 faulty = mess_size/ sizeof(int);
00271       
00272                                                 //DEBUGcout << "          got FAULT for phase " << mess_phase 
00273                                                                 //DEBUG<< " for " << faulty << " faulty" << endl;
00274                         
00275                                                 for(int i=1 ; i<= faulty ; i++, datapt += 4) {
00276                                 
00277                                                         ident = *(int *)datapt;
00278                                 
00279                                                         prev_get->fault_cnt[ident]++;
00280 
00281                                                 }                                               
00282                                         }
00283                                 }
00284                                 pthread_mutex_unlock(&recv_mutex);      
00285                                 
00286                 /********************************************************************/  
00287                 /* all messages received, signal end of phase */     
00288                                 
00289                                 if(cur_get->mess_numb == cur_get->grsize - 1 )  { 
00290                                 
00291                                         ret = pthread_cond_signal(&all_received);
00292                                         
00293                                         if (ret) {
00294                                         
00295                                                 cout << "          error signal all_received : " << ret 
00296                                                         << " at phase " << curphase << endl; 
00297                                         
00298                                                 if (ret== EINVAL) cout << "          EINVAL " << endl;
00299                                         }       
00300                                         
00301                                 }                                                               
00302 
00303                         } else if (mess_phase == curphase+1) {
00304                                                                                                 
00305                                 if(next_get->status[sender]== WAITING) {   
00306                                         
00307                                         next_get->mess_numb++;
00308                                         
00309                                         next_get->status[sender]= GOT + type;
00310                                         
00311                                         next_get->message[sender].data= new char[mess_size];
00312                                         
00313                                         memcpy(next_get->message[sender].data, datapt, mess_size);
00314                                         
00315                                         next_get->message[sender].size = mess_size;
00316                                         
00317                                         if ( type == MESS || type == CMD ) {
00318                                         
00319                                                 next_get->mess_flag = 1;
00320                                         }
00321                                         else if (type == FAULT ) {   
00322                                         
00323                                                 next_get->fault_flag = 1;
00324                                                 
00325                                                 faulty = mess_size/sizeof(int);
00326       
00327                                                 cout << "FAULT in advance for phase " << mess_phase 
00328                                                                 << " for " << faulty << " faulty" << endl;
00329                         
00330                                                 for(int i=1 ; i<= faulty ; i++, datapt += 4) {
00331                                 
00332                                                         ident = *(int *)datapt;
00333                                                         
00334                                                         if (ident == my_ident) {
00335                                                                 
00336                                                                 cout << " SABORDAGE " << endl;
00337                                                                 
00338                                                                 exit(1);
00339                                                         }
00340                                 
00341                                                         // cur_get->fault_cnt[ident]++;
00342                                                 }
00343                                         }
00344                                 }
00345                                 
00346                                 pthread_mutex_unlock(&recv_mutex);
00347                                 
00348                                 //DEBUGcout << "          message in advance " << endl;
00349                         } else {
00350                                 
00351                                 pthread_mutex_unlock(&recv_mutex);
00352                                 
00353                                 //DEBUGcout << "          message hors phase, phase " << mess_phase << endl;
00354                         }                       
00355  
00356                 } else if( type == NACK ) {
00357       
00358                         cout << "          got NACK for phase " << mess_phase << endl;
00359        
00360                                 /* cout<< "          lock recv " << endl; */
00361                 
00362                                 pthread_mutex_lock(&recv_mutex);
00363                         
00364                                 /* cout<< "          got recv " << endl; */
00365                                         
00366                                 if (mess_phase == curphase) {
00367                                         
00368                                         if ( cur_get->sendflag == 0 )  { 
00369                                                 
00370                                                 pthread_mutex_unlock(&recv_mutex);
00371                                                         
00372                                                 pthread_mutex_unlock(&phase_mutex);
00373                                         }
00374                                                 
00375                                         else {
00376 
00377                                                 if( cur_get->sendflag == MESS || cur_get->sendflag == FAULT || cur_get->sendflag == CMD) {
00378                                                 
00379                                                         //DEBUGcout << "          current re-unicast , size = " << current->size << endl;
00380                                                 
00381                                                         /*pthread_mutex_lock(&net_mutex);*/
00382                                                                 
00383                                                         ret = net->unicast(sender, current->data, current->size + HEADSZ );
00384                                                         
00385                                                         /*pthread_mutex_unlock(&net_mutex);*/
00386                                                         
00387                                                         pthread_mutex_unlock(&recv_mutex);
00388                                                         
00389                                                 } else {  // it was an empty ACK message 
00390                                                 
00391                                                         pthread_mutex_unlock(&recv_mutex);
00392                                                                 
00393                                                         head = &empty_message_header;
00394                                                         
00395                                                         head->sender = my_ident;
00396                                 
00397                                                         head->num_phase = curphase;
00398                                         
00399                                                         head->type = ACK;
00400                                                                 
00401                                                         head->size = 0; 
00402                                                         
00403                                                         //DEBUGcout << "          curphase empty re-unicast " << endl;
00404                                                                 
00405                                                         /*pthread_mutex_lock(&net_mutex);*/
00406                                                         
00407                                                         ret= net->unicast( sender, (char *) head, HEADSZ );
00408                                                                 
00409                                                         /*pthread_mutex_unlock(&net_mutex);*/
00410                                                 }       
00411                                                 
00412                                                 if(ret<0) {
00413                                                         cout << "       Resend unicast error, val =" << ret << endl;
00414                                                         }
00415                                         }
00416 
00417                                 } else if (mess_phase == curphase - 1) {
00418                                         
00419                                         
00420                                         // Sends to sender the complete information 
00421                                                 
00422                                         // should increase the timeout for the sender 
00423                                                                                         
00424                                         if ( prev_get->sendflag == MESS || prev_get->sendflag == FAULT || prev_get->sendflag == CMD) {
00425                                                 
00426                                                 //DEBUGcout << "          previous re-unicast , size = " << previous->size << endl;
00427                                                 
00428                                                 /*pthread_mutex_lock(&net_mutex);*/
00429                                                         
00430                                                 ret = net->unicast(sender, previous->data, previous->size + HEADSZ);
00431                                                         
00432                                                 /*pthread_mutex_unlock(&net_mutex);*/
00433                                                 
00434                                                 pthread_mutex_unlock(&recv_mutex);
00435                                                         
00436                                         } else {
00437                                                 
00438                                                 pthread_mutex_unlock(&recv_mutex);      
00439                                                         
00440                                                 head = &empty_message_header;
00441                                                         
00442                                                 head->sender = my_ident;
00443                                 
00444                                                 head->num_phase = curphase -1;
00445                                         
00446                                                 head->type = ACK;
00447                                                         
00448                                                 head->size = 0; 
00449                                                         
00450                                                 //DEBUGcout << "          previous empty re-unicast " << endl;
00451                                                         
00452                                                 /*pthread_mutex_lock(&net_mutex);*/
00453                                                         
00454                                                 ret= net->unicast(sender, (char *) head, HEADSZ );
00455                                                         
00456                                                 /*pthread_mutex_unlock(&net_mutex);*/
00457                                         }       
00458                                         if(ret<0) {
00459                                                         cout << "      Resend unicast error , val =" << ret << endl;
00460                                         }
00461                                                 
00462                                 } else if (mess_phase == curphase + 1) {
00463                                 
00464                                         pthread_mutex_unlock(&recv_mutex);
00465                                         
00466                                         cout << "message NACK en avance " << endl;
00467                                         
00468                                 } else {
00469                                         
00470                                         pthread_mutex_unlock(&recv_mutex);
00471                                                 
00472                                         cout << "message NACK hors phase, phase " << mess_phase << endl;
00473                                 }
00474                 } 
00475                 else if ( type == NEW ) {
00476                 
00477                         start_join( datapt, mess_size );
00478                         
00479                         pthread_mutex_unlock(&phase_mutex); 
00480                 }
00481                 }
00482                 else if ( status == PRERUN && type == INIT) {
00483                                 
00484                         init_from_net( datapt, mess_size );
00485                         
00486                         status = RUN;
00487                         
00488                 } 
00489 }
00490 
00491 /********************************************************************/  
00492 /*  */  
00493 
00494 int total_order_channel::broadcast(char * data, int size) {
00495 
00496         if(status != RUN) {
00497                 
00498                 cout<< "status broadcast " << status << endl;
00499                 
00500                 return (-2);
00501         }
00502 
00503                 /* cout<< "lock send " << endl; */
00504                 
00505                 pthread_mutex_lock(&send_mutex);
00506                 
00507                 /* cout<< "got send " << endl; */
00508                 
00509                 if( total != TAILLE_MAX - 1 ) {
00510       
00511                         send_buff[last].data = new char[size + HEADSZ ];
00512                         
00513                         memcpy(send_buff[last].data + HEADSZ , data, size); 
00514       
00515                         send_buff[last].size = size;   // the size WITHOUT the header 
00516                         
00517                         total++;
00518   
00519                         last = (last+1) % TAILLE_MAX;
00520 
00521                 } else {
00522                         
00523                         cout << " broadcast buffer plein " << endl;
00524                         
00525                         pthread_mutex_unlock(&send_mutex);
00526                         
00527                         return(-1);
00528                 }
00529 
00530                 n_to_send++;
00531                 
00532                 pthread_mutex_unlock(&send_mutex);
00533 
00534                 pthread_mutex_unlock(&phase_mutex);
00535                 
00536                 return (0);
00537         }
00538         
00539 
00540 /********************************************************************/  
00541 /* code executed at each phase */       
00542 
00543 void * total_order_channel::phase_code(void *) {
00544         
00545         int test, sz , iter ;
00546                 
00547         header *head ;
00548                 
00549         char *data;
00550         
00551         cout << "     start phase thread" << endl;
00552          
00553         for (;;) {
00554  
00555                 pthread_mutex_lock(&phase_mutex);
00556                 
00557                 cout << " ->> on phase " << curphase << endl;
00558                 
00559                 do {
00560 
00561 /********************************************************************/  
00562 /* There is a FAULT message to send  */
00563 
00564                         cout << "PHASE " << curphase << endl;
00565 
00566                         if ( fault_count) { 
00567                                 
00568                                 cur_get->sendflag = FAULT;
00569                         
00570                                 current = &fault_message;
00571                                 
00572                                 //DEBUGcout << "    fault broadcast at phase " << curphase << endl;
00573                         
00574                                 test = net->broadcast(fault_message.data, fault_message.size + HEADSZ);
00575  
00576                                 if(test<0) {
00577                                         cout << "Fault broadcast error, test=" << test << endl;
00578                                 }
00579                                 fault_count = 0;
00580                                 
00581                                 cur_get->status[my_ident]= GOTFAULT; 
00582                         }
00583 
00584 /********************************************************************/  
00585 /* There is a CMD message to broadcast  (JOIN / QUIT) */
00586 
00587                         else if ( cmd_to_send )  {
00588                         
00589                                 cur_get->sendflag = CMD ;
00590                                 
00591                                 current = &cmd_message ;
00592                                 
00593                                 data = &current->data[HEADSZ];
00594                                 
00595                                 cmd_buf.head.head.num_phase = curphase;
00596                                 
00597                                 //DEBUGcout << "    cmd broadcast at phase " << curphase << endl;
00598                                 
00599                                 test = net->broadcast(cmd_message.data, cmd_message.size + HEADSZ);
00600  
00601                                 if(test<0) {
00602                                         cout << "Cmd broadcast error, test=" << test << endl;
00603                                 }
00604                                 
00605                                 cur_get->status[my_ident]= GOTCMD; 
00606                                 
00607                                 cur_get->message[my_ident].data= new char[current->size];
00608                                         
00609                                 memcpy(cur_get->message[my_ident].data, data, current->size);
00610                                         
00611                                 cur_get->message[my_ident].size = current->size;
00612                                 
00613                                 cmd_to_send = 0;
00614                         }
00615                                                                                 
00616 /********************************************************************/  
00617 /*  There is a "standart" message to broadcast at this phase */
00618 
00619                         else if ( n_to_send != 0 ) {
00620                         
00621                                 cur_get->sendflag = MESS;
00622                                 
00623                                 total_sent++;
00624                                 
00625                                 current = (msgtype *) get_first_to_send(); 
00626                                 
00627                                 data = &current->data[HEADSZ];                                  
00628                                                         
00629                                 head = (header *) current->data;
00630                                         
00631                                 head->sender = my_ident;
00632                                 
00633                                 head->num_phase = curphase;
00634                                         
00635                                 head->type = MESS;
00636                                         
00637                                 head->size = current->size; 
00638                                                                                 
00639                                 //DEBUGcout << "     std broadcast at phase " << curphase << endl;
00640                                 
00641                                 /*pthread_mutex_lock(&net_mutex);*/
00642                                 
00643                                 test = net->broadcast( current->data, current->size + HEADSZ);
00644                                 
00645                                 /*pthread_mutex_unlock(&net_mutex);*/
00646                                         
00647                                 if(test < 0){
00648                                         cout << "  std  broadcast error, test = " << test << endl;
00649                                 }
00650                                 
00651                                 cur_get->status[my_ident]= GOTMESS; 
00652                                 
00653                                 cur_get->message[my_ident].data= new char[current->size];
00654                                         
00655                                 memcpy(cur_get->message[my_ident].data, data, current->size);
00656                                         
00657                                 cur_get->message[my_ident].size = current->size;
00658 
00659                         } else  {
00660 
00661 /********************************************************************/  
00662 /*  no message to broadcast at this phase , send ACK */
00663 
00664                                 cur_get->sendflag = ACK;
00665 
00666                                 current = NULL;
00667 
00668                                 head = & empty_message_header;
00669                                                                                                                         
00670                                 head->sender = my_ident;
00671                                 
00672                                 head->num_phase = curphase;
00673                                         
00674                                 head->type = ACK;
00675                                         
00676                                 head->size = 0; 
00677                                 
00678                                 //DEBUGcout << "     empty broadcast at phase " << curphase << endl;
00679                                 
00680                                 /*pthread_mutex_lock(&net_mutex);*/
00681 
00682                                 test = net->broadcast((char *) head, HEADSZ);
00683                                 
00684                                 /*pthread_mutex_unlock(&net_mutex);*/
00685                                         
00686                                 if(test<0) {
00687                                         cout << "  ack  broadcast error, test=" << test << endl;
00688                                 }
00689                                 
00690                                 cur_get->status[my_ident]= GOTACK;
00691                         }
00692                 
00693                 
00694 /**************************************************************************/
00695 /*********************  WAITING end-of phase ********************/      
00696 /* if some message is not yet received , wait for condition, set by deliver() */ 
00697 /* routine after receiving messages from all partners */     
00698 
00699                 if(cur_get->mess_numb != cur_get->grsize - 1 ) {
00700                                         
00701                         /* pthread_mutex_unlock(&recv_mutex); */
00702                         
00703                         if ( new_flag ) max_iter = NEW_MAX_ITER;
00704                         
00705                         else max_iter = MAX_ITER ; 
00706                         
00707                         new_flag = 0;
00708         
00709                         to.tv_sec = time(NULL) + timeout;
00710                         to.tv_nsec = 0;
00711                         
00712                         pthread_mutex_lock(&mp);
00713                         
00714                         int ret = pthread_cond_timedwait(&all_received, &mp, &to);
00715                         
00716                         pthread_mutex_unlock(&mp);
00717                         
00718                         for ( iter = 1 ; ret == ETIMEDOUT && iter <= max_iter ; iter ++)  {
00719                         
00720                                 // timeout routine 
00721 
00722                                 if (timeout_handler(iter) != 0 ) { 
00723                                         
00724                                         /* cout << "     lock recv " << endl;*/
00725                                         
00726                                         /* pthread_mutex_lock(&recv_mutex);*/
00727                                                 
00728                                         /* cout << "     got recv " << endl; */
00729                                         
00730                                         if(cur_get->mess_numb != cur_get->grsize - 1 ) {
00731                                         
00732                                         /* pthread_mutex_unlock(&recv_mutex); */
00733                                         
00734                                                 to.tv_sec = time(NULL) + timeout ;
00735                                                 to.tv_nsec = 0;
00736                                                 
00737                                                 pthread_mutex_lock(&mp);
00738                                                 
00739                                                 ret = pthread_cond_timedwait(&all_received, &mp, &to);
00740                                                 
00741                                                 pthread_mutex_unlock(&mp);
00742                                         }                                       
00743                                 }
00744                                 
00745                                 else { 
00746                                         ret = 0; 
00747                                 
00748                                         break;
00749                                 }                               
00750                         }
00751                                                 
00752                         if (ret) {
00753                         
00754                                 if (ret == ETIMEDOUT) {
00755                                 
00756                                         // some message was not received 
00757                                         
00758                                         fault_handler() ;   
00759                                 }
00760                                 else if (ret== EINVAL) {
00761                                         cout << "     phase erreur EINVAL cond wait iter " << iter << endl;
00762                                         exit(0);
00763                                         
00764                                 } else {                
00765                                         cout << "     phase erreur cond wait at iter " << iter << endl;
00766                                         exit(0);
00767                                 }       
00768                         }                        
00769                 }
00770                 /* else  pthread_mutex_unlock(&recv_mutex); */                          
00771                 
00772                 //DEBUGcout << " phase  " << curphase << " emissions " << total_sent << endl ;
00773 
00774 /**************************************************************************/
00775 /*********************  End-of phase ********************/      
00776 
00777 /* take phase decision and actions */
00778 
00779                         if (curphase != 1) phase_actions();
00780 
00781                         pthread_mutex_trylock(&phase_mutex);
00782                                                 
00783 /*******************************************************************/   
00784 /* update send buffers */
00785                         
00786                         /* cout<< "     lock send " << endl; */
00787 
00788                         pthread_mutex_lock(&send_mutex);
00789                         
00790                         /* cout<< "     got send " << endl; */
00791                         
00792                         send_buffer_advance();
00793                         
00794                         pthread_mutex_unlock(&send_mutex);
00795 
00796 /*******************************************************************/   
00797 /* Initialise next phase */     
00798 
00799                         //DEBUGcout << "     reinit for next phase " << curphase + 1<< endl;
00800                         
00801                         phase_update();
00802                         
00803                         //DEBUGcout << " to_send " << n_to_send  << " received " <<  n_received << endl;
00804                                                                 
00805                 } while( (cmd_to_send + n_to_send + n_received + ack_flag + fault_count ) != 0);
00806         }
00807 }
00808 /********************************************************************/  
00809 /* routine for the iterated timeout */  
00810 
00811 int total_order_channel::timeout_handler(int it ) {
00812 
00813                 int test;
00814 
00815                 //DEBUGcout << "     TIMEOUT  at iter " << it  << endl;
00816 
00817                 union{ 
00818                 
00819                         header head;
00820                         char buf[25];
00821                         
00822                 } nack_message; 
00823                 
00824                 int dest_list[MAX_MEMBER] ;
00825                 
00826                 int dest ; 
00827 
00828                 dest = (cur_get->grsize - cur_get->mess_numb) -1;       //number of missing senders
00829                 
00830                 if ( dest == 0 ) return( 0);
00831                 
00832                 dest = 0;
00833                 
00834                 //Fill nack message             
00835                 
00836                 header *head= &nack_message.head;
00837                                                 
00838                 head->sender = my_ident;
00839 
00840                 head->num_phase = curphase;
00841 
00842                 head->type = NACK;              
00843                 
00844                 head->size = 0;
00845 
00846                 for(int id =0 ; id < MAX_MEMBER ; id++) {
00847 
00848                         if( id != my_ident && cur_get->status[id]== WAITING ) {
00849                         
00850                                 dest_list[dest]= id; 
00851                                 
00852                                 dest ++; 
00853                                 
00854                                 //DEBUGcout << " Nack for " << id <<  endl;
00855                         }
00856                 }
00857                 if ( dest == 0 ) return( 0);
00858                 
00859                 test= net->multicast(dest, dest_list , nack_message.buf, HEADSZ );
00860  
00861                 if(test<0) {
00862                         cout << "nack broadcast error test=" << test << endl;
00863                 } 
00864                 return(1);
00865 
00866 }
00867 
00868 /********************************************************************/  
00869 /* last timeout routine */
00870 
00871 void total_order_channel::fault_handler() {
00872 
00873                 //DEBUGcout << "     TIMEOUT FAULT "  << endl;
00874                 
00875                 header *head= &fault_buf.head;
00876                 
00877                 char *data = &fault_buf.buf[HEADSZ];
00878                 
00879                 fault_count = 0;
00880                         
00881                 for(int i=0 ; i< MAX_MEMBER ; i++) {
00882 
00883                         if(i != my_ident && cur_get->status[i]== WAITING ) {
00884                         
00885                                 *(int *) data = i; 
00886                         
00887                                 data += sizeof(int);
00888                                 
00889                                 fault_count ++;
00890                                 
00891                                 cur_get->fault_cnt[i]++ ;
00892                                 
00893                                 //DEBUGcout << "     seems lost : " << i << endl;
00894                                 
00895                         }
00896                 }
00897                 
00898                 head->sender = my_ident;
00899 
00900                 head->num_phase = curphase + 1;
00901 
00902                 head->type = FAULT;
00903 
00904                 head->size = sizeof(int) * fault_count ;
00905                                 
00906                 fault_message.size = head->size ;
00907         }
00908         
00909 /********************************************************************/  
00910 /*********************** phase_actions *********************/
00911 /* decide on which members are correct and which */ 
00912 /* messages of previous phase are delivered */
00913 
00914 void total_order_channel::phase_actions() {
00915 
00916         int sz, i,k;
00917         
00918         int host, port, ident;
00919         
00920         cmdheader * cmdpt;
00921                         
00922         //DEBUGcout << "     phase " << curphase << " phase action " << endl;
00923         
00924         /* cout<< "     lock recv " << endl; */
00925                                                 
00926         pthread_mutex_lock(&recv_mutex);
00927                         
00928         /* cout<< "     got recv " << endl; */  
00929         
00930         /********************************************************************/  
00931         /***** suppress faulty members. This is the critical choice algorithm ****/     
00932         
00933         for(int i=0 ; i < MAX_MEMBER ; i++) {
00934         
00935                 if (prev_get->fault_cnt[i] != 0 || cur_get->fault_cnt[i] != 0 ) { 
00936 
00937                         cur_get->status[i] = FAULTY ;
00938                         
00939                         if (next_get->status[i] != UNDEF ) {
00940                                                 
00941                                 next_get->status[i] = UNDEF ;
00942                         
00943                                 next_get->grsize--;
00944                         
00945                                 my_group->del_member(i);
00946                         
00947                                 cout << "          XXXXXXXXXXX " << my_ident << " SUPPRESSION de " << i << endl; 
00948                         }
00949 
00950                         if ( prev_get->fault_cnt[i] != 0 ) 
00951                         
00952                                 prev_get->status[i]= FAULTY;
00953                 }
00954                 
00955                 /********************************************************************/  
00956                 /* deliver application messages */
00957                                                 
00958                 if( prev_get->status[i] == GOTMESS) {
00959                 
00960                         sz = prev_get->message[i].size;
00961                                                                 
00962                         prev_get->message[i].data[sz] = '\0';
00963                                 
00964                         cout << "        XXXXXXXXXXX " << my_ident << " DELIVRE de " << i <<": "<< prev_get->message[i].data << endl;
00965                                         
00966                         //receive(prev_get->message[i].data, prev_get->message[i].size); 
00967                         
00968                 }
00969         }
00970         /********************************************************************/  
00971         /* treat join and quit command messages */
00972         
00973         for(i=0 ; i < MAX_MEMBER ; i++) {
00974         
00975                 if(prev_get->status[i] == GOTCMD ) {
00976                 
00977                         cmdpt = (cmdheader *) prev_get->message[i].data;
00978                         
00979                         if ( cmdpt->type == JOIN ) {
00980                         
00981                                 new_flag = 1;
00982                         
00983                                 host = cmdpt->nodeid;
00984                                 
00985                                 port = cmdpt->port;
00986                                 
00987                                 if ( next_get->grsize < MAX_MEMBER) {
00988                         
00989                                         for(k=1 ; k < MAX_MEMBER ; k++) {
00990                                 
00991                                                 if (next_get->status[k] == UNDEF ) {
00992                                         
00993                                                         ident = k;
00994                                                 
00995                                                         next_get->status[k] = NEWMEM;
00996                                                         
00997                                                         next_get->grsize++;
00998                                                 
00999                                                         my_group->add_member(ident, host, port);
01000                                                         
01001                                                         cout << "        XXXXXXXXXXX " << my_ident << " AJOUT de " << ident <<endl ;
01002                                                 
01003                                                         break;
01004                                                 }
01005                                         }
01006                                 } else { 
01007                                         }
01008                         }
01009                         else if ( cmdpt->type == QUIT ) {
01010                         
01011                                 ident = cmdpt->ident;
01012                                 
01013                                 if(next_get->status[ident] != UNDEF ) {
01014                                 
01015                                         next_get->status[ident] = UNDEF ; 
01016                                         
01017                                         next_get->grsize--;
01018                                         
01019                                         my_group->del_member(ident);
01020                                 }
01021                         }                                       
01022                 }
01023         }
01024         for(i=0 ; i < MAX_MEMBER ; i++) {
01025         
01026                 if(next_get->status[i] == NEWMEM) {
01027                 
01028                         new_member_init(i);  
01029                         
01030                         next_get->status[i] = WAITING;
01031                 }
01032         }                               
01033        pthread_mutex_unlock(&recv_mutex); 
01034 }
01035  /********************************************************************/ 
01036 /********************************************************************/  
01037 /* phase_update method */
01038 
01039 void total_order_channel::phase_update() {
01040 
01041                         
01042         /* cout<< "     lock recv " << endl; */
01043                                                 
01044         pthread_mutex_lock(&recv_mutex);
01045                         
01046         /* cout<< "     got recv " << endl; */
01047                         
01048         curphase++;
01049         
01050         previous = current;
01051                         
01052         current = NULL;
01053                         
01054         ack_flag = cur_get->mess_flag + cur_get->fault_flag + cur_get->cmd_flag;
01055         
01056 /********************************************************************/  
01057 /* phase buffers permutation */
01058         
01059         struct phase_buffer *tmp ; 
01060                 
01061         tmp = prev_get;
01062         
01063         prev_get = cur_get;
01064         
01065         cur_get = next_get;
01066         
01067         next_get= tmp;
01068                                         
01069         /* prev_get = &get_net_buffer[(curphase +2) % 3];
01070                                 
01071         cur_get= &get_net_buffer[curphase % 3];
01072         
01073         next_get =&get_net_buffer[(curphase +1) % 3]; */
01074         
01075 /********************************************************************/  
01076 /* next buffer initialisation */        
01077 
01078 
01079         n_received = cur_get->mess_numb;
01080                 
01081         next_get->phase = curphase + 1;
01082                 
01083         next_get->grsize = cur_get->grsize; 
01084         
01085         next_get->sendflag = 0 ;
01086         
01087         next_get->mess_numb = 0;
01088         
01089         next_get->mess_flag = 0 ;
01090         
01091         next_get->fault_flag = 0 ;
01092         
01093         next_get->cmd_flag = 0 ;
01094                 
01095         for(int i=0 ; i< MAX_MEMBER; i++) {
01096         
01097                 //cout<< " ident " << i << " status " << cur_get->status[i] << endl; 
01098         
01099                 if( cur_get->status[i] == UNDEF ) next_get->status[i] = UNDEF;
01100                 
01101                 else { 
01102                         next_get->status[i] = WAITING;
01103                 
01104                         //cout << "MEMBER ident : "<< i << endl;
01105                 }
01106         
01107                 next_get-> fault_cnt[i] = 0;
01108                 
01109                 if (next_get->message[i].data != NULL) { 
01110                 
01111                         delete(next_get->message[i].data );
01112                         
01113                         next_get->message[i].data = NULL;
01114                         
01115                 }
01116                         
01117                 next_get->message[i].size = 0;
01118         }
01119         
01120         pthread_mutex_unlock(&recv_mutex); 
01121         
01122 }       
01123 
01124 /********************************************************************/
01125 /***** initialisations ***********/
01126 
01127 void total_order_channel::init_data() {
01128         
01129         delt=0 ; last = 0; next=0; total = 0 ; 
01130         int i,j; 
01131         
01132         for ( i = 0; i< TAILLE_MAX ; i++) {
01133                 
01134                 send_buff[i].data  = NULL;
01135                 
01136                 send_buff[i].size = 0;
01137         }       
01138         
01139         prev_get = &get_net_buffer[0] ;
01140         
01141         cur_get = &get_net_buffer[1];
01142         
01143         next_get = &get_net_buffer[2];
01144         
01145         for (i = 0; i<3 ;i++ ) {
01146         
01147                 get_net_buffer[i].grsize =0;
01148                 
01149                 get_net_buffer[i].sendflag = 0;
01150         
01151                 get_net_buffer[i].mess_numb = 0;
01152                 
01153                 get_net_buffer[i].mess_flag = 0;
01154                 
01155                 get_net_buffer[i].fault_flag = 0;
01156                 
01157                 get_net_buffer[i].cmd_flag = 0;
01158         
01159                 for(j =0 ; j < MAX_MEMBER; j++) {
01160         
01161                         get_net_buffer[i].status[j] = UNDEF;
01162                 
01163                         get_net_buffer[i].fault_cnt[j] = 0;
01164                 
01165                         get_net_buffer[i].message[j].data = NULL;
01166                 
01167                         get_net_buffer[i].message[j].size = 0;
01168                 }               
01169         }
01170         total_sent = 0;
01171                 
01172         n_to_send = 0;
01173 
01174         n_received = 0;
01175                 
01176         new_flag = 0;
01177                                 
01178         fault_count = 0;
01179                 
01180         fault_message.data = fault_buf.buf;
01181         
01182         cmd_to_send  = 0;
01183         
01184         cmd_message.data = cmd_buf.buf ;
01185         
01186         init_flag = 0;
01187         
01188         init_message.data = init_buf.buf;
01189 
01190         current = NULL;
01191         
01192         my_group = new group_admin();
01193         
01194 }
01195 
01196 /********************************************************************/
01197 /***** send buffer routines ***********/        
01198 
01199 void * total_order_channel::get_first_to_send() {
01200 
01201         /* cout<< "     lock send " << endl; */
01202 
01203         pthread_mutex_lock(&send_mutex);
01204                         
01205         /* cout<< "     got send " << endl; */
01206                         
01207         void * pt = (void *)&send_buff[next];
01208 
01209         next = (next + 1 ) % TAILLE_MAX;
01210         
01211         n_to_send-- ;
01212         
01213         pthread_mutex_unlock(&send_mutex);
01214         
01215         return(pt); 
01216 }
01217 
01218 
01219 void total_order_channel::send_buffer_advance() {
01220 
01221         int ind;
01222         
01223         if (total != 0) {
01224         
01225                 if (prev_get->sendflag == MESS)  {
01226                 
01227                         if ( cur_get->sendflag == MESS ) ind = next - 2 ;
01228                         
01229                         else ind = next - 1;
01230                 }
01231                 
01232                 else {
01233                 
01234                          return ; 
01235                 }
01236 
01237                 if (ind < 0 ) ind = ind + TAILLE_MAX ;
01238         
01239                 if (send_buff[ind].size != 0 ) {
01240                 
01241                         delete(send_buff[ind].data) ;
01242                          
01243                         send_buff[ind].data = NULL;
01244                         
01245                         send_buff[ind].size = 0 ;
01246                 }
01247                 
01248                 total--;
01249                 
01250                 //DEBUGcout << " total " << total << " next " << next << " last " << last << " ind " << ind << endl;
01251         }
01252 }
01253 
01254 /********************************************************************/
01255 /***** group membership routines  ***********/  
01256 
01257 /******  initiate a JOIN process for a new member ***********/
01258 
01259 int total_order_channel::start_join( char *data, int size ) {
01260 
01261 
01262         header *head = &cmd_buf.head.head;
01263         
01264         cmdheader *cmdpt= &cmd_buf.head.chead;
01265         
01266         if (cmd_to_send == 0 && init_flag == 0) {
01267                         
01268                 head->sender = my_ident;
01269 
01270                 head->num_phase = -1;
01271 
01272                 head->type = CMD;
01273 
01274                 head->size = CMHEADSZ ;
01275                         
01276                 cmdpt->type = JOIN ;
01277                 
01278                 cmdpt->ident = 0 ;
01279                         
01280                 cmdpt->nodeid = * (int *)data;
01281         
01282                 cmdpt->port = * (int *)(data + sizeof(int));
01283                 
01284                 cmd_message.size = head->size ;
01285         
01286                 cmd_to_send = 1;
01287                 
01288                 init_flag = 1;
01289         }
01290                 
01291         else {
01292         
01293                 cout << " start_join aborted : *** " << my_ident << " *** " << endl;
01294         }       
01295         
01296 }
01297 
01298 /********************************************************************/
01299 /***** terminate a JOIN process ( send data to new member ) ***********/        
01300  
01301 int total_order_channel::new_member_init( int ident ) {
01302 
01303         int size , ret;
01304         
01305         char *pt ;
01306         
01307         header *head= &init_buf.head.head;
01308                 
01309         initheader *inhead = &init_buf.head.ihead;
01310         
01311         struct member *memb ;
01312         
01313         //DEBUGcout << " new_member_init : *** " << my_ident << " *** " << endl;
01314                 
01315         //DEBUGcout << " grsize: " << next_get->grsize <<  " , curphase: " << curphase << endl;
01316         
01317         size = INHEADSZ + (next_get->grsize * MEMBSZ) ;
01318         
01319         init_message.size = size + HEADSZ;
01320                 
01321         head->sender = my_ident;
01322         
01323         head->num_phase = curphase + 1;
01324         
01325         head->type = INIT ;
01326         
01327         head->size = size;
01328 
01329         inhead->ident = ident ; 
01330                 
01331         inhead->groupsize = next_get->grsize ; 
01332         
01333         inhead->phase = curphase + 1;
01334         
01335         pt = &init_buf.buf[HEADSZ + INHEADSZ];
01336         
01337         memb = (struct member *) pt;
01338                 
01339         int nb = 0;
01340                 
01341         for(int i=0 ; i< MAX_MEMBER; i++) { 
01342                 
01343                 if( my_group->members_list[i].ident != UNDEF ) {
01344                 
01345                         memb->ident = my_group->members_list[i].ident;
01346                         
01347                         memb->nodeid  = my_group->members_list[i].nodeid ;
01348                         
01349                         memb->port = my_group->members_list[i].port ;
01350                         
01351                         memb++;
01352                         
01353                         nb++;
01354                 }
01355         }
01356                         
01357         if ( nb != next_get->grsize) {   cout << " error groupsize " << next_get->grsize << " != " << nb ;}
01358         
01359         /*pthread_mutex_lock(&net_mutex);*/     
01360         
01361         ret= net->unicast(ident, init_message.data, init_message.size );
01362                                                                 
01363         /*pthread_mutex_unlock(&net_mutex);*/
01364         
01365         if(ret<0) {
01366                 
01367                 cout << "      Init unicast error, val =" << ret << endl;
01368         }    
01369         
01370         init_flag = 0;    
01371 }               
01372                         
01373 
01374 /********************************************************************/
01375 /***** member initialisation , data send by new_member_init() ***********/                      
01376  
01377 int total_order_channel::init_from_net( char * data, int size) {
01378 
01379         int ident;
01380 
01381         struct member *memb ;
01382 
01383         initheader * inhead ;
01384         
01385         inhead = (initheader * ) data;
01386         
01387         my_ident =  inhead->ident ;
01388         
01389         net->my_ident = my_ident ; 
01390         
01391         grsize =  inhead->groupsize ; 
01392         
01393         curphase = inhead->phase;
01394         
01395         //DEBUGcout << " Init from net member : *** " << my_ident << " *** " << endl;
01396         
01397         //DEBUGcout << " grsize: " << grsize <<  " , curphase: " << curphase << endl;
01398         
01399         prev_get = &get_net_buffer[(curphase +2) % 3];
01400                                 
01401         cur_get= &get_net_buffer[curphase % 3];
01402         
01403         next_get =&get_net_buffer[(curphase +1) % 3];
01404         
01405         memb  = (member *) &data[INHEADSZ];
01406         
01407         for ( int i = 1; i<= grsize ;i ++) {
01408         
01409                 ident = memb->ident ;           
01410 
01411                 my_group->add_member(memb->ident, memb->nodeid, memb->port );
01412                 
01413                 cur_get->status[ident]= WAITING;
01414                 
01415                 next_get->status[ident]= WAITING;
01416                         
01417                 memb++;
01418                         
01419         }               
01420                 
01421         cur_get->grsize = grsize;
01422 
01423         next_get->grsize = grsize;
01424 
01425         cur_get->phase = curphase ;
01426 
01427         next_get->phase = curphase + 1 ;
01428 
01429         
01430   }
01431                         
01432                 
01433                 
01434                 

Generated at Mon Mar 1 18:05:55 2004 for Groupware by doxygen 1.1.1 written by Dimitri van Heesch, © 1997-2000