00001 /**********************************************************************************************/ 00002 /*************************** TOTAL ORDER CHANNEL CLASS *************************/ 00003 /**************************** Jean FANCHON. LAAS-CNRS *************************/ 00004 /**********************************************************************************************/ 00005 00006 #include "total_order_channel.h" 00007 00008 extern void receive(char *, int); 00009 00010 /********************************************************************/ 00011 /********************************************************************/ 00012 00013 void * start_phase( void * channel) { 00014 //to start phase code 00015 00016 void * t; 00017 00018 ((total_order_channel *) channel)->phase_code(t); 00019 } 00020 00021 00022 /********************************************************************/ 00023 /********************************************************************/ 00024 /* total_order_channel definitions */ 00025 00026 00027 total_order_channel::total_order_channel( int flag, char *file, char *ipnum) { 00028 00029 cout << "start total order channel" << endl; 00030 00031 /********************************************************************/ 00032 /* initialise local data */ 00033 00034 char ipadd[20]; 00035 00036 init_data(); 00037 00038 if (flag == 1) strcpy(ipadd,ipnum); 00039 00040 else my_ident = atoi(ipnum); 00041 00042 init_data(); 00043 00044 status = START; 00045 00046 /********************************************************************/ 00047 /* Case of a group creator: initialise group and membership */ 00048 00049 if (flag == 0 ) { 00050 if (init_from_file(file) ) { 00051 00052 cout << " erreur opening " << file << endl; 00053 00054 exit(0); 00055 } 00056 00057 status = RUN; 00058 } 00059 00060 00061 /********************************************************************/ 00062 /* initialise synchro objects */ 00063 00064 if( int ret = pthread_cond_init(&all_received, NULL) ) { 00065 00066 cout << " pthread_cond_init error " << "ret = " << ret << endl; 00067 } 00068 00069 pthread_mutex_init(&mp,NULL); 00070 00071 /*pthread_mutex_init(&net_mutex,NULL);*/ 00072 00073 pthread_mutex_init(&phase_mutex, NULL); 00074 00075 pthread_mutex_init(&send_mutex,NULL); 00076 00077 pthread_mutex_init(&recv_mutex, NULL); 00078 00079 pthread_mutex_unlock(&mp); 00080 00081 /*pthread_mutex_unlock(&net_mutex);*/ 00082 00083 pthread_mutex_lock(&phase_mutex); 00084 00085 pthread_mutex_unlock(&send_mutex); 00086 00087 pthread_mutex_unlock(&recv_mutex); 00088 00089 /********************************************************************/ 00090 /* initialise underlying layer */ 00091 00092 if (flag == 1) /* following members */ 00093 net = new no_prop_channel(this, ipadd, my_group); 00094 00095 else if (flag == 0) /* first members */ 00096 net = new no_prop_channel(this, my_ident , file, my_group); 00097 00098 /********************************************************************/ 00099 /* for a newcomer, join group and initialise membership */ 00100 00101 if (flag == 1) { 00102 00103 start_net_init(file); 00104 00105 status = PRERUN ; 00106 } 00107 00108 /********************************************************************/ 00109 /* start phase thread */ 00110 00111 max_iter = MAX_ITER; 00112 00113 timeout = TIMEOUT2; 00114 00115 int ret = pthread_create(&phase_thread, 0,start_phase, (void *)this); 00116 00117 if (ret) { 00118 perror(" total_order()"); printf(" erreur th create %d\n",ret); 00119 if (ret == ENOMEM) printf(" ENOMEM \n" ); 00120 if (ret== EINVAL) printf(" EINVAL \n" ); 00121 if (ret == EPERM) printf(" EPERM \n" ); 00122 exit(0); 00123 } 00124 00125 00126 } 00127 00128 /********************************************************************/ 00129 /* new comer, join group by asking any member */ 00130 /* for the moment find member adress in file */ 00131 /* receive data and initialise membership */ 00132 00133 int total_order_channel::start_net_init( char * file) { 00134 00135 00136 header *head= &cmd_buf.head.head; 00137 int * pt; 00138 00139 head->sender = -1; 00140 00141 head->num_phase = -1; 00142 00143 head->type = NEW; 00144 00145 head->size = 2 * sizeof(int) ; 00146 00147 pt = (int *) &cmd_buf.buf[HEADSZ]; 00148 00149 *pt = net->get_mynode(); 00150 00151 *(pt + 1) = net->get_myport(); 00152 00153 cout << "***** start_net_init, my_port = " << *(pt + 1) <<"****" << endl; 00154 00155 net->init_membership(file, cmd_message.data, head->size + HEADSZ ); 00156 00157 } 00158 00159 /********************************************************************/ 00160 /* Case of a group creator: initialise group from file */ 00161 00162 int total_order_channel::init_from_file( char * file) { 00163 00164 int ident, nb = 0; 00165 00166 if (my_group->init_members_list(file) ) { 00167 00168 cout << "pb init_members_list " << endl; 00169 exit(0); 00170 } 00171 00172 for(int i=0; i< MAX_MEMBER; i++) { 00173 00174 if (my_group->members_list[i].ident != UNDEF) { 00175 00176 ident = my_group->members_list[i].ident ; 00177 00178 cur_get->status[ident]= WAITING; 00179 00180 next_get->status[ident]= WAITING; 00181 00182 nb ++; 00183 } 00184 } 00185 //DEBUGcout << " total grsiz = " << nb << endl; 00186 00187 cur_get->grsize = nb; 00188 00189 next_get->grsize = nb; 00190 00191 cur_get->phase = 1 ; 00192 00193 next_get->phase = 2 ; 00194 00195 curphase = 1; 00196 00197 return(0) ; 00198 00199 } 00200 00201 /********************************************************************/ 00202 /*takes messages from network, unfolds header, act depending on type*/ 00203 /* if all awaited messages received, signals to start phase_code */ 00204 00205 int total_order_channel::deliver(char *data, int size) { 00206 00207 int ret; 00208 00209 int sender, mess_phase, type, mess_size ; 00210 00211 int ident, faulty; 00212 00213 header *head = (header *) data; 00214 00215 sender = head->sender; 00216 00217 type = head->type; 00218 00219 mess_phase = head->num_phase; 00220 00221 mess_size = head->size; 00222 00223 if ( mess_size != size - HEADSZ) 00224 00225 cout << " ** Problem of message size , got :"<< size - HEADSZ << " expected : " << mess_size<< endl; 00226 00227 char * datapt = data + HEADSZ; 00228 00229 //DEBUGcout <<"from " << sender <<" type " << type << " phase " << curphase <<" mess_phase " << mess_phase <<endl; 00230 00231 if ( status == RUN) { 00232 00233 if(type == MESS || type == CMD || type == ACK || type == FAULT ) { 00234 00235 /* cout<< " lock recv " << endl; */ 00236 00237 pthread_mutex_lock(&recv_mutex); 00238 00239 /* cout<< " got recv " << endl; */ 00240 00241 if(mess_phase == curphase) { 00242 00243 if (cur_get->sendflag == 0) { 00244 00245 pthread_mutex_unlock(&phase_mutex); 00246 } 00247 00248 if(cur_get->status[sender]==WAITING) { 00249 00250 //DEBUG cout << " first message: from " << sender << endl; 00251 00252 cur_get->mess_numb++; 00253 00254 cur_get->status[sender]= GOT + type; 00255 00256 if ( type == MESS || type == CMD ) { 00257 00258 cur_get->mess_flag = 1; 00259 00260 cur_get->message[sender].data= new char[mess_size]; 00261 00262 memcpy(cur_get->message[sender].data, datapt, mess_size); 00263 00264 cur_get->message[sender].size = mess_size; 00265 } 00266 else if (type == FAULT ) { 00267 00268 cur_get->fault_flag = 1; 00269 00270 faulty = mess_size/ sizeof(int); 00271 00272 //DEBUGcout << " got FAULT for phase " << mess_phase 00273 //DEBUG<< " for " << faulty << " faulty" << endl; 00274 00275 for(int i=1 ; i<= faulty ; i++, datapt += 4) { 00276 00277 ident = *(int *)datapt; 00278 00279 prev_get->fault_cnt[ident]++; 00280 00281 } 00282 } 00283 } 00284 pthread_mutex_unlock(&recv_mutex); 00285 00286 /********************************************************************/ 00287 /* all messages received, signal end of phase */ 00288 00289 if(cur_get->mess_numb == cur_get->grsize - 1 ) { 00290 00291 ret = pthread_cond_signal(&all_received); 00292 00293 if (ret) { 00294 00295 cout << " error signal all_received : " << ret 00296 << " at phase " << curphase << endl; 00297 00298 if (ret== EINVAL) cout << " EINVAL " << endl; 00299 } 00300 00301 } 00302 00303 } else if (mess_phase == curphase+1) { 00304 00305 if(next_get->status[sender]== WAITING) { 00306 00307 next_get->mess_numb++; 00308 00309 next_get->status[sender]= GOT + type; 00310 00311 next_get->message[sender].data= new char[mess_size]; 00312 00313 memcpy(next_get->message[sender].data, datapt, mess_size); 00314 00315 next_get->message[sender].size = mess_size; 00316 00317 if ( type == MESS || type == CMD ) { 00318 00319 next_get->mess_flag = 1; 00320 } 00321 else if (type == FAULT ) { 00322 00323 next_get->fault_flag = 1; 00324 00325 faulty = mess_size/sizeof(int); 00326 00327 cout << "FAULT in advance for phase " << mess_phase 00328 << " for " << faulty << " faulty" << endl; 00329 00330 for(int i=1 ; i<= faulty ; i++, datapt += 4) { 00331 00332 ident = *(int *)datapt; 00333 00334 if (ident == my_ident) { 00335 00336 cout << " SABORDAGE " << endl; 00337 00338 exit(1); 00339 } 00340 00341 // cur_get->fault_cnt[ident]++; 00342 } 00343 } 00344 } 00345 00346 pthread_mutex_unlock(&recv_mutex); 00347 00348 //DEBUGcout << " message in advance " << endl; 00349 } else { 00350 00351 pthread_mutex_unlock(&recv_mutex); 00352 00353 //DEBUGcout << " message hors phase, phase " << mess_phase << endl; 00354 } 00355 00356 } else if( type == NACK ) { 00357 00358 cout << " got NACK for phase " << mess_phase << endl; 00359 00360 /* cout<< " lock recv " << endl; */ 00361 00362 pthread_mutex_lock(&recv_mutex); 00363 00364 /* cout<< " got recv " << endl; */ 00365 00366 if (mess_phase == curphase) { 00367 00368 if ( cur_get->sendflag == 0 ) { 00369 00370 pthread_mutex_unlock(&recv_mutex); 00371 00372 pthread_mutex_unlock(&phase_mutex); 00373 } 00374 00375 else { 00376 00377 if( cur_get->sendflag == MESS || cur_get->sendflag == FAULT || cur_get->sendflag == CMD) { 00378 00379 //DEBUGcout << " current re-unicast , size = " << current->size << endl; 00380 00381 /*pthread_mutex_lock(&net_mutex);*/ 00382 00383 ret = net->unicast(sender, current->data, current->size + HEADSZ ); 00384 00385 /*pthread_mutex_unlock(&net_mutex);*/ 00386 00387 pthread_mutex_unlock(&recv_mutex); 00388 00389 } else { // it was an empty ACK message 00390 00391 pthread_mutex_unlock(&recv_mutex); 00392 00393 head = &empty_message_header; 00394 00395 head->sender = my_ident; 00396 00397 head->num_phase = curphase; 00398 00399 head->type = ACK; 00400 00401 head->size = 0; 00402 00403 //DEBUGcout << " curphase empty re-unicast " << endl; 00404 00405 /*pthread_mutex_lock(&net_mutex);*/ 00406 00407 ret= net->unicast( sender, (char *) head, HEADSZ ); 00408 00409 /*pthread_mutex_unlock(&net_mutex);*/ 00410 } 00411 00412 if(ret<0) { 00413 cout << " Resend unicast error, val =" << ret << endl; 00414 } 00415 } 00416 00417 } else if (mess_phase == curphase - 1) { 00418 00419 00420 // Sends to sender the complete information 00421 00422 // should increase the timeout for the sender 00423 00424 if ( prev_get->sendflag == MESS || prev_get->sendflag == FAULT || prev_get->sendflag == CMD) { 00425 00426 //DEBUGcout << " previous re-unicast , size = " << previous->size << endl; 00427 00428 /*pthread_mutex_lock(&net_mutex);*/ 00429 00430 ret = net->unicast(sender, previous->data, previous->size + HEADSZ); 00431 00432 /*pthread_mutex_unlock(&net_mutex);*/ 00433 00434 pthread_mutex_unlock(&recv_mutex); 00435 00436 } else { 00437 00438 pthread_mutex_unlock(&recv_mutex); 00439 00440 head = &empty_message_header; 00441 00442 head->sender = my_ident; 00443 00444 head->num_phase = curphase -1; 00445 00446 head->type = ACK; 00447 00448 head->size = 0; 00449 00450 //DEBUGcout << " previous empty re-unicast " << endl; 00451 00452 /*pthread_mutex_lock(&net_mutex);*/ 00453 00454 ret= net->unicast(sender, (char *) head, HEADSZ ); 00455 00456 /*pthread_mutex_unlock(&net_mutex);*/ 00457 } 00458 if(ret<0) { 00459 cout << " Resend unicast error , val =" << ret << endl; 00460 } 00461 00462 } else if (mess_phase == curphase + 1) { 00463 00464 pthread_mutex_unlock(&recv_mutex); 00465 00466 cout << "message NACK en avance " << endl; 00467 00468 } else { 00469 00470 pthread_mutex_unlock(&recv_mutex); 00471 00472 cout << "message NACK hors phase, phase " << mess_phase << endl; 00473 } 00474 } 00475 else if ( type == NEW ) { 00476 00477 start_join( datapt, mess_size ); 00478 00479 pthread_mutex_unlock(&phase_mutex); 00480 } 00481 } 00482 else if ( status == PRERUN && type == INIT) { 00483 00484 init_from_net( datapt, mess_size ); 00485 00486 status = RUN; 00487 00488 } 00489 } 00490 00491 /********************************************************************/ 00492 /* */ 00493 00494 int total_order_channel::broadcast(char * data, int size) { 00495 00496 if(status != RUN) { 00497 00498 cout<< "status broadcast " << status << endl; 00499 00500 return (-2); 00501 } 00502 00503 /* cout<< "lock send " << endl; */ 00504 00505 pthread_mutex_lock(&send_mutex); 00506 00507 /* cout<< "got send " << endl; */ 00508 00509 if( total != TAILLE_MAX - 1 ) { 00510 00511 send_buff[last].data = new char[size + HEADSZ ]; 00512 00513 memcpy(send_buff[last].data + HEADSZ , data, size); 00514 00515 send_buff[last].size = size; // the size WITHOUT the header 00516 00517 total++; 00518 00519 last = (last+1) % TAILLE_MAX; 00520 00521 } else { 00522 00523 cout << " broadcast buffer plein " << endl; 00524 00525 pthread_mutex_unlock(&send_mutex); 00526 00527 return(-1); 00528 } 00529 00530 n_to_send++; 00531 00532 pthread_mutex_unlock(&send_mutex); 00533 00534 pthread_mutex_unlock(&phase_mutex); 00535 00536 return (0); 00537 } 00538 00539 00540 /********************************************************************/ 00541 /* code executed at each phase */ 00542 00543 void * total_order_channel::phase_code(void *) { 00544 00545 int test, sz , iter ; 00546 00547 header *head ; 00548 00549 char *data; 00550 00551 cout << " start phase thread" << endl; 00552 00553 for (;;) { 00554 00555 pthread_mutex_lock(&phase_mutex); 00556 00557 cout << " ->> on phase " << curphase << endl; 00558 00559 do { 00560 00561 /********************************************************************/ 00562 /* There is a FAULT message to send */ 00563 00564 cout << "PHASE " << curphase << endl; 00565 00566 if ( fault_count) { 00567 00568 cur_get->sendflag = FAULT; 00569 00570 current = &fault_message; 00571 00572 //DEBUGcout << " fault broadcast at phase " << curphase << endl; 00573 00574 test = net->broadcast(fault_message.data, fault_message.size + HEADSZ); 00575 00576 if(test<0) { 00577 cout << "Fault broadcast error, test=" << test << endl; 00578 } 00579 fault_count = 0; 00580 00581 cur_get->status[my_ident]= GOTFAULT; 00582 } 00583 00584 /********************************************************************/ 00585 /* There is a CMD message to broadcast (JOIN / QUIT) */ 00586 00587 else if ( cmd_to_send ) { 00588 00589 cur_get->sendflag = CMD ; 00590 00591 current = &cmd_message ; 00592 00593 data = ¤t->data[HEADSZ]; 00594 00595 cmd_buf.head.head.num_phase = curphase; 00596 00597 //DEBUGcout << " cmd broadcast at phase " << curphase << endl; 00598 00599 test = net->broadcast(cmd_message.data, cmd_message.size + HEADSZ); 00600 00601 if(test<0) { 00602 cout << "Cmd broadcast error, test=" << test << endl; 00603 } 00604 00605 cur_get->status[my_ident]= GOTCMD; 00606 00607 cur_get->message[my_ident].data= new char[current->size]; 00608 00609 memcpy(cur_get->message[my_ident].data, data, current->size); 00610 00611 cur_get->message[my_ident].size = current->size; 00612 00613 cmd_to_send = 0; 00614 } 00615 00616 /********************************************************************/ 00617 /* There is a "standart" message to broadcast at this phase */ 00618 00619 else if ( n_to_send != 0 ) { 00620 00621 cur_get->sendflag = MESS; 00622 00623 total_sent++; 00624 00625 current = (msgtype *) get_first_to_send(); 00626 00627 data = ¤t->data[HEADSZ]; 00628 00629 head = (header *) current->data; 00630 00631 head->sender = my_ident; 00632 00633 head->num_phase = curphase; 00634 00635 head->type = MESS; 00636 00637 head->size = current->size; 00638 00639 //DEBUGcout << " std broadcast at phase " << curphase << endl; 00640 00641 /*pthread_mutex_lock(&net_mutex);*/ 00642 00643 test = net->broadcast( current->data, current->size + HEADSZ); 00644 00645 /*pthread_mutex_unlock(&net_mutex);*/ 00646 00647 if(test < 0){ 00648 cout << " std broadcast error, test = " << test << endl; 00649 } 00650 00651 cur_get->status[my_ident]= GOTMESS; 00652 00653 cur_get->message[my_ident].data= new char[current->size]; 00654 00655 memcpy(cur_get->message[my_ident].data, data, current->size); 00656 00657 cur_get->message[my_ident].size = current->size; 00658 00659 } else { 00660 00661 /********************************************************************/ 00662 /* no message to broadcast at this phase , send ACK */ 00663 00664 cur_get->sendflag = ACK; 00665 00666 current = NULL; 00667 00668 head = & empty_message_header; 00669 00670 head->sender = my_ident; 00671 00672 head->num_phase = curphase; 00673 00674 head->type = ACK; 00675 00676 head->size = 0; 00677 00678 //DEBUGcout << " empty broadcast at phase " << curphase << endl; 00679 00680 /*pthread_mutex_lock(&net_mutex);*/ 00681 00682 test = net->broadcast((char *) head, HEADSZ); 00683 00684 /*pthread_mutex_unlock(&net_mutex);*/ 00685 00686 if(test<0) { 00687 cout << " ack broadcast error, test=" << test << endl; 00688 } 00689 00690 cur_get->status[my_ident]= GOTACK; 00691 } 00692 00693 00694 /**************************************************************************/ 00695 /********************* WAITING end-of phase ********************/ 00696 /* if some message is not yet received , wait for condition, set by deliver() */ 00697 /* routine after receiving messages from all partners */ 00698 00699 if(cur_get->mess_numb != cur_get->grsize - 1 ) { 00700 00701 /* pthread_mutex_unlock(&recv_mutex); */ 00702 00703 if ( new_flag ) max_iter = NEW_MAX_ITER; 00704 00705 else max_iter = MAX_ITER ; 00706 00707 new_flag = 0; 00708 00709 to.tv_sec = time(NULL) + timeout; 00710 to.tv_nsec = 0; 00711 00712 pthread_mutex_lock(&mp); 00713 00714 int ret = pthread_cond_timedwait(&all_received, &mp, &to); 00715 00716 pthread_mutex_unlock(&mp); 00717 00718 for ( iter = 1 ; ret == ETIMEDOUT && iter <= max_iter ; iter ++) { 00719 00720 // timeout routine 00721 00722 if (timeout_handler(iter) != 0 ) { 00723 00724 /* cout << " lock recv " << endl;*/ 00725 00726 /* pthread_mutex_lock(&recv_mutex);*/ 00727 00728 /* cout << " got recv " << endl; */ 00729 00730 if(cur_get->mess_numb != cur_get->grsize - 1 ) { 00731 00732 /* pthread_mutex_unlock(&recv_mutex); */ 00733 00734 to.tv_sec = time(NULL) + timeout ; 00735 to.tv_nsec = 0; 00736 00737 pthread_mutex_lock(&mp); 00738 00739 ret = pthread_cond_timedwait(&all_received, &mp, &to); 00740 00741 pthread_mutex_unlock(&mp); 00742 } 00743 } 00744 00745 else { 00746 ret = 0; 00747 00748 break; 00749 } 00750 } 00751 00752 if (ret) { 00753 00754 if (ret == ETIMEDOUT) { 00755 00756 // some message was not received 00757 00758 fault_handler() ; 00759 } 00760 else if (ret== EINVAL) { 00761 cout << " phase erreur EINVAL cond wait iter " << iter << endl; 00762 exit(0); 00763 00764 } else { 00765 cout << " phase erreur cond wait at iter " << iter << endl; 00766 exit(0); 00767 } 00768 } 00769 } 00770 /* else pthread_mutex_unlock(&recv_mutex); */ 00771 00772 //DEBUGcout << " phase " << curphase << " emissions " << total_sent << endl ; 00773 00774 /**************************************************************************/ 00775 /********************* End-of phase ********************/ 00776 00777 /* take phase decision and actions */ 00778 00779 if (curphase != 1) phase_actions(); 00780 00781 pthread_mutex_trylock(&phase_mutex); 00782 00783 /*******************************************************************/ 00784 /* update send buffers */ 00785 00786 /* cout<< " lock send " << endl; */ 00787 00788 pthread_mutex_lock(&send_mutex); 00789 00790 /* cout<< " got send " << endl; */ 00791 00792 send_buffer_advance(); 00793 00794 pthread_mutex_unlock(&send_mutex); 00795 00796 /*******************************************************************/ 00797 /* Initialise next phase */ 00798 00799 //DEBUGcout << " reinit for next phase " << curphase + 1<< endl; 00800 00801 phase_update(); 00802 00803 //DEBUGcout << " to_send " << n_to_send << " received " << n_received << endl; 00804 00805 } while( (cmd_to_send + n_to_send + n_received + ack_flag + fault_count ) != 0); 00806 } 00807 } 00808 /********************************************************************/ 00809 /* routine for the iterated timeout */ 00810 00811 int total_order_channel::timeout_handler(int it ) { 00812 00813 int test; 00814 00815 //DEBUGcout << " TIMEOUT at iter " << it << endl; 00816 00817 union{ 00818 00819 header head; 00820 char buf[25]; 00821 00822 } nack_message; 00823 00824 int dest_list[MAX_MEMBER] ; 00825 00826 int dest ; 00827 00828 dest = (cur_get->grsize - cur_get->mess_numb) -1; //number of missing senders 00829 00830 if ( dest == 0 ) return( 0); 00831 00832 dest = 0; 00833 00834 //Fill nack message 00835 00836 header *head= &nack_message.head; 00837 00838 head->sender = my_ident; 00839 00840 head->num_phase = curphase; 00841 00842 head->type = NACK; 00843 00844 head->size = 0; 00845 00846 for(int id =0 ; id < MAX_MEMBER ; id++) { 00847 00848 if( id != my_ident && cur_get->status[id]== WAITING ) { 00849 00850 dest_list[dest]= id; 00851 00852 dest ++; 00853 00854 //DEBUGcout << " Nack for " << id << endl; 00855 } 00856 } 00857 if ( dest == 0 ) return( 0); 00858 00859 test= net->multicast(dest, dest_list , nack_message.buf, HEADSZ ); 00860 00861 if(test<0) { 00862 cout << "nack broadcast error test=" << test << endl; 00863 } 00864 return(1); 00865 00866 } 00867 00868 /********************************************************************/ 00869 /* last timeout routine */ 00870 00871 void total_order_channel::fault_handler() { 00872 00873 //DEBUGcout << " TIMEOUT FAULT " << endl; 00874 00875 header *head= &fault_buf.head; 00876 00877 char *data = &fault_buf.buf[HEADSZ]; 00878 00879 fault_count = 0; 00880 00881 for(int i=0 ; i< MAX_MEMBER ; i++) { 00882 00883 if(i != my_ident && cur_get->status[i]== WAITING ) { 00884 00885 *(int *) data = i; 00886 00887 data += sizeof(int); 00888 00889 fault_count ++; 00890 00891 cur_get->fault_cnt[i]++ ; 00892 00893 //DEBUGcout << " seems lost : " << i << endl; 00894 00895 } 00896 } 00897 00898 head->sender = my_ident; 00899 00900 head->num_phase = curphase + 1; 00901 00902 head->type = FAULT; 00903 00904 head->size = sizeof(int) * fault_count ; 00905 00906 fault_message.size = head->size ; 00907 } 00908 00909 /********************************************************************/ 00910 /*********************** phase_actions *********************/ 00911 /* decide on which members are correct and which */ 00912 /* messages of previous phase are delivered */ 00913 00914 void total_order_channel::phase_actions() { 00915 00916 int sz, i,k; 00917 00918 int host, port, ident; 00919 00920 cmdheader * cmdpt; 00921 00922 //DEBUGcout << " phase " << curphase << " phase action " << endl; 00923 00924 /* cout<< " lock recv " << endl; */ 00925 00926 pthread_mutex_lock(&recv_mutex); 00927 00928 /* cout<< " got recv " << endl; */ 00929 00930 /********************************************************************/ 00931 /***** suppress faulty members. This is the critical choice algorithm ****/ 00932 00933 for(int i=0 ; i < MAX_MEMBER ; i++) { 00934 00935 if (prev_get->fault_cnt[i] != 0 || cur_get->fault_cnt[i] != 0 ) { 00936 00937 cur_get->status[i] = FAULTY ; 00938 00939 if (next_get->status[i] != UNDEF ) { 00940 00941 next_get->status[i] = UNDEF ; 00942 00943 next_get->grsize--; 00944 00945 my_group->del_member(i); 00946 00947 cout << " XXXXXXXXXXX " << my_ident << " SUPPRESSION de " << i << endl; 00948 } 00949 00950 if ( prev_get->fault_cnt[i] != 0 ) 00951 00952 prev_get->status[i]= FAULTY; 00953 } 00954 00955 /********************************************************************/ 00956 /* deliver application messages */ 00957 00958 if( prev_get->status[i] == GOTMESS) { 00959 00960 sz = prev_get->message[i].size; 00961 00962 prev_get->message[i].data[sz] = '\0'; 00963 00964 cout << " XXXXXXXXXXX " << my_ident << " DELIVRE de " << i <<": "<< prev_get->message[i].data << endl; 00965 00966 //receive(prev_get->message[i].data, prev_get->message[i].size); 00967 00968 } 00969 } 00970 /********************************************************************/ 00971 /* treat join and quit command messages */ 00972 00973 for(i=0 ; i < MAX_MEMBER ; i++) { 00974 00975 if(prev_get->status[i] == GOTCMD ) { 00976 00977 cmdpt = (cmdheader *) prev_get->message[i].data; 00978 00979 if ( cmdpt->type == JOIN ) { 00980 00981 new_flag = 1; 00982 00983 host = cmdpt->nodeid; 00984 00985 port = cmdpt->port; 00986 00987 if ( next_get->grsize < MAX_MEMBER) { 00988 00989 for(k=1 ; k < MAX_MEMBER ; k++) { 00990 00991 if (next_get->status[k] == UNDEF ) { 00992 00993 ident = k; 00994 00995 next_get->status[k] = NEWMEM; 00996 00997 next_get->grsize++; 00998 00999 my_group->add_member(ident, host, port); 01000 01001 cout << " XXXXXXXXXXX " << my_ident << " AJOUT de " << ident <<endl ; 01002 01003 break; 01004 } 01005 } 01006 } else { 01007 } 01008 } 01009 else if ( cmdpt->type == QUIT ) { 01010 01011 ident = cmdpt->ident; 01012 01013 if(next_get->status[ident] != UNDEF ) { 01014 01015 next_get->status[ident] = UNDEF ; 01016 01017 next_get->grsize--; 01018 01019 my_group->del_member(ident); 01020 } 01021 } 01022 } 01023 } 01024 for(i=0 ; i < MAX_MEMBER ; i++) { 01025 01026 if(next_get->status[i] == NEWMEM) { 01027 01028 new_member_init(i); 01029 01030 next_get->status[i] = WAITING; 01031 } 01032 } 01033 pthread_mutex_unlock(&recv_mutex); 01034 } 01035 /********************************************************************/ 01036 /********************************************************************/ 01037 /* phase_update method */ 01038 01039 void total_order_channel::phase_update() { 01040 01041 01042 /* cout<< " lock recv " << endl; */ 01043 01044 pthread_mutex_lock(&recv_mutex); 01045 01046 /* cout<< " got recv " << endl; */ 01047 01048 curphase++; 01049 01050 previous = current; 01051 01052 current = NULL; 01053 01054 ack_flag = cur_get->mess_flag + cur_get->fault_flag + cur_get->cmd_flag; 01055 01056 /********************************************************************/ 01057 /* phase buffers permutation */ 01058 01059 struct phase_buffer *tmp ; 01060 01061 tmp = prev_get; 01062 01063 prev_get = cur_get; 01064 01065 cur_get = next_get; 01066 01067 next_get= tmp; 01068 01069 /* prev_get = &get_net_buffer[(curphase +2) % 3]; 01070 01071 cur_get= &get_net_buffer[curphase % 3]; 01072 01073 next_get =&get_net_buffer[(curphase +1) % 3]; */ 01074 01075 /********************************************************************/ 01076 /* next buffer initialisation */ 01077 01078 01079 n_received = cur_get->mess_numb; 01080 01081 next_get->phase = curphase + 1; 01082 01083 next_get->grsize = cur_get->grsize; 01084 01085 next_get->sendflag = 0 ; 01086 01087 next_get->mess_numb = 0; 01088 01089 next_get->mess_flag = 0 ; 01090 01091 next_get->fault_flag = 0 ; 01092 01093 next_get->cmd_flag = 0 ; 01094 01095 for(int i=0 ; i< MAX_MEMBER; i++) { 01096 01097 //cout<< " ident " << i << " status " << cur_get->status[i] << endl; 01098 01099 if( cur_get->status[i] == UNDEF ) next_get->status[i] = UNDEF; 01100 01101 else { 01102 next_get->status[i] = WAITING; 01103 01104 //cout << "MEMBER ident : "<< i << endl; 01105 } 01106 01107 next_get-> fault_cnt[i] = 0; 01108 01109 if (next_get->message[i].data != NULL) { 01110 01111 delete(next_get->message[i].data ); 01112 01113 next_get->message[i].data = NULL; 01114 01115 } 01116 01117 next_get->message[i].size = 0; 01118 } 01119 01120 pthread_mutex_unlock(&recv_mutex); 01121 01122 } 01123 01124 /********************************************************************/ 01125 /***** initialisations ***********/ 01126 01127 void total_order_channel::init_data() { 01128 01129 delt=0 ; last = 0; next=0; total = 0 ; 01130 int i,j; 01131 01132 for ( i = 0; i< TAILLE_MAX ; i++) { 01133 01134 send_buff[i].data = NULL; 01135 01136 send_buff[i].size = 0; 01137 } 01138 01139 prev_get = &get_net_buffer[0] ; 01140 01141 cur_get = &get_net_buffer[1]; 01142 01143 next_get = &get_net_buffer[2]; 01144 01145 for (i = 0; i<3 ;i++ ) { 01146 01147 get_net_buffer[i].grsize =0; 01148 01149 get_net_buffer[i].sendflag = 0; 01150 01151 get_net_buffer[i].mess_numb = 0; 01152 01153 get_net_buffer[i].mess_flag = 0; 01154 01155 get_net_buffer[i].fault_flag = 0; 01156 01157 get_net_buffer[i].cmd_flag = 0; 01158 01159 for(j =0 ; j < MAX_MEMBER; j++) { 01160 01161 get_net_buffer[i].status[j] = UNDEF; 01162 01163 get_net_buffer[i].fault_cnt[j] = 0; 01164 01165 get_net_buffer[i].message[j].data = NULL; 01166 01167 get_net_buffer[i].message[j].size = 0; 01168 } 01169 } 01170 total_sent = 0; 01171 01172 n_to_send = 0; 01173 01174 n_received = 0; 01175 01176 new_flag = 0; 01177 01178 fault_count = 0; 01179 01180 fault_message.data = fault_buf.buf; 01181 01182 cmd_to_send = 0; 01183 01184 cmd_message.data = cmd_buf.buf ; 01185 01186 init_flag = 0; 01187 01188 init_message.data = init_buf.buf; 01189 01190 current = NULL; 01191 01192 my_group = new group_admin(); 01193 01194 } 01195 01196 /********************************************************************/ 01197 /***** send buffer routines ***********/ 01198 01199 void * total_order_channel::get_first_to_send() { 01200 01201 /* cout<< " lock send " << endl; */ 01202 01203 pthread_mutex_lock(&send_mutex); 01204 01205 /* cout<< " got send " << endl; */ 01206 01207 void * pt = (void *)&send_buff[next]; 01208 01209 next = (next + 1 ) % TAILLE_MAX; 01210 01211 n_to_send-- ; 01212 01213 pthread_mutex_unlock(&send_mutex); 01214 01215 return(pt); 01216 } 01217 01218 01219 void total_order_channel::send_buffer_advance() { 01220 01221 int ind; 01222 01223 if (total != 0) { 01224 01225 if (prev_get->sendflag == MESS) { 01226 01227 if ( cur_get->sendflag == MESS ) ind = next - 2 ; 01228 01229 else ind = next - 1; 01230 } 01231 01232 else { 01233 01234 return ; 01235 } 01236 01237 if (ind < 0 ) ind = ind + TAILLE_MAX ; 01238 01239 if (send_buff[ind].size != 0 ) { 01240 01241 delete(send_buff[ind].data) ; 01242 01243 send_buff[ind].data = NULL; 01244 01245 send_buff[ind].size = 0 ; 01246 } 01247 01248 total--; 01249 01250 //DEBUGcout << " total " << total << " next " << next << " last " << last << " ind " << ind << endl; 01251 } 01252 } 01253 01254 /********************************************************************/ 01255 /***** group membership routines ***********/ 01256 01257 /****** initiate a JOIN process for a new member ***********/ 01258 01259 int total_order_channel::start_join( char *data, int size ) { 01260 01261 01262 header *head = &cmd_buf.head.head; 01263 01264 cmdheader *cmdpt= &cmd_buf.head.chead; 01265 01266 if (cmd_to_send == 0 && init_flag == 0) { 01267 01268 head->sender = my_ident; 01269 01270 head->num_phase = -1; 01271 01272 head->type = CMD; 01273 01274 head->size = CMHEADSZ ; 01275 01276 cmdpt->type = JOIN ; 01277 01278 cmdpt->ident = 0 ; 01279 01280 cmdpt->nodeid = * (int *)data; 01281 01282 cmdpt->port = * (int *)(data + sizeof(int)); 01283 01284 cmd_message.size = head->size ; 01285 01286 cmd_to_send = 1; 01287 01288 init_flag = 1; 01289 } 01290 01291 else { 01292 01293 cout << " start_join aborted : *** " << my_ident << " *** " << endl; 01294 } 01295 01296 } 01297 01298 /********************************************************************/ 01299 /***** terminate a JOIN process ( send data to new member ) ***********/ 01300 01301 int total_order_channel::new_member_init( int ident ) { 01302 01303 int size , ret; 01304 01305 char *pt ; 01306 01307 header *head= &init_buf.head.head; 01308 01309 initheader *inhead = &init_buf.head.ihead; 01310 01311 struct member *memb ; 01312 01313 //DEBUGcout << " new_member_init : *** " << my_ident << " *** " << endl; 01314 01315 //DEBUGcout << " grsize: " << next_get->grsize << " , curphase: " << curphase << endl; 01316 01317 size = INHEADSZ + (next_get->grsize * MEMBSZ) ; 01318 01319 init_message.size = size + HEADSZ; 01320 01321 head->sender = my_ident; 01322 01323 head->num_phase = curphase + 1; 01324 01325 head->type = INIT ; 01326 01327 head->size = size; 01328 01329 inhead->ident = ident ; 01330 01331 inhead->groupsize = next_get->grsize ; 01332 01333 inhead->phase = curphase + 1; 01334 01335 pt = &init_buf.buf[HEADSZ + INHEADSZ]; 01336 01337 memb = (struct member *) pt; 01338 01339 int nb = 0; 01340 01341 for(int i=0 ; i< MAX_MEMBER; i++) { 01342 01343 if( my_group->members_list[i].ident != UNDEF ) { 01344 01345 memb->ident = my_group->members_list[i].ident; 01346 01347 memb->nodeid = my_group->members_list[i].nodeid ; 01348 01349 memb->port = my_group->members_list[i].port ; 01350 01351 memb++; 01352 01353 nb++; 01354 } 01355 } 01356 01357 if ( nb != next_get->grsize) { cout << " error groupsize " << next_get->grsize << " != " << nb ;} 01358 01359 /*pthread_mutex_lock(&net_mutex);*/ 01360 01361 ret= net->unicast(ident, init_message.data, init_message.size ); 01362 01363 /*pthread_mutex_unlock(&net_mutex);*/ 01364 01365 if(ret<0) { 01366 01367 cout << " Init unicast error, val =" << ret << endl; 01368 } 01369 01370 init_flag = 0; 01371 } 01372 01373 01374 /********************************************************************/ 01375 /***** member initialisation , data send by new_member_init() ***********/ 01376 01377 int total_order_channel::init_from_net( char * data, int size) { 01378 01379 int ident; 01380 01381 struct member *memb ; 01382 01383 initheader * inhead ; 01384 01385 inhead = (initheader * ) data; 01386 01387 my_ident = inhead->ident ; 01388 01389 net->my_ident = my_ident ; 01390 01391 grsize = inhead->groupsize ; 01392 01393 curphase = inhead->phase; 01394 01395 //DEBUGcout << " Init from net member : *** " << my_ident << " *** " << endl; 01396 01397 //DEBUGcout << " grsize: " << grsize << " , curphase: " << curphase << endl; 01398 01399 prev_get = &get_net_buffer[(curphase +2) % 3]; 01400 01401 cur_get= &get_net_buffer[curphase % 3]; 01402 01403 next_get =&get_net_buffer[(curphase +1) % 3]; 01404 01405 memb = (member *) &data[INHEADSZ]; 01406 01407 for ( int i = 1; i<= grsize ;i ++) { 01408 01409 ident = memb->ident ; 01410 01411 my_group->add_member(memb->ident, memb->nodeid, memb->port ); 01412 01413 cur_get->status[ident]= WAITING; 01414 01415 next_get->status[ident]= WAITING; 01416 01417 memb++; 01418 01419 } 01420 01421 cur_get->grsize = grsize; 01422 01423 next_get->grsize = grsize; 01424 01425 cur_get->phase = curphase ; 01426 01427 next_get->phase = curphase + 1 ; 01428 01429 01430 } 01431 01432 01433 01434