Main Page   Alphabetical List   Compound List   File List   Compound Members   File Members  

total_order_channel Class Reference

#include <total_order_channel.h>

List of all members.

Public Methods

 total_order_channel (int , char *, char *)
int deliver ( char *, int)
int broadcast ( char *, int )
void* phase_code (void *)

Public Attributes

header head
initheader ihead
cmdheader chead

Private Methods

int init_from_file (char *)
int start_net_init (char *)
int init_from_net (char * data, int size)
void init_data ()
int timeout_handler ( int )
void fault_handler ()
void phase_update ()
void phase_actions ()
void send_buffer_advance ()
void* get_first_to_send ()
int start_join (char *data, int size )
int new_member_init ( int ident )

Private Attributes

no_prop_channel * net
pthread_t phase_thread
int status
int curphase
int grsize
int total_sent
group_admin * my_group
char ipadd [15]
int my_ident
msgtype * current
msgtype * previous
msgtype init_message
union {
init_buf
msgtype cmd_message
union {
cmd_buf
header empty_message_header
msgtype fault_message
union {
fault_buf
msgtype send_buff [TAILLE_MAX]
int delt
int next
int last
int total
int n_received
int n_to_send
int cmd_to_send
int ack_flag
int new_flag
int fault_count
int init_flag
int max_iter
int timeout
struct total_order_channel::phase_buffer  get_net_buffer [3]
struct phase_buffer * prev_get
struct phase_buffer * cur_get
struct phase_buffer * next_get
pthread_cond_t all_received
pthread_mutex_t mp
pthread_mutex_t phase_mutex
pthread_mutex_t send_mutex
pthread_mutex_t recv_mutex
timestruc_t to
   struct {
   } head
char buf [400]
   struct {
   } head


Detailed Description

Definition at line 49 of file total_order_channel.h.

00049 {  // : public channel, channeluser { 
00050 
00051 /********************************************************************/  
00052 /*interface*/ 
00053 
00054 public: 
00055         total_order_channel(int , char *, char *); 
00056 
00057         int deliver ( char *, int);        // from network 
00058         
00059         int broadcast ( char *, int );  // from application
00060                 
00061         void * phase_code(void *);      //Phase thread code
00062 
00063 
00064 private:
00065         
00066         no_prop_channel * net;                  // the no-prop channel UDP
00067         
00068         pthread_t phase_thread;         //Phase thread structure        
00069         
00070 /********************************************************************/  
00071 /*local state*/ 
00072 
00073         int status ; /* initialisation, running or ending */
00074                 
00075         int curphase;
00076         
00077         int grsize;
00078         
00079         int total_sent;
00080         
00081 #define START   0       // waiting for channel information 
00082 #define PRERUN  1       // initialising 
00083 #define RUN     2       // running
00084 /* #define      SYNC    3        resynchronising */
00085 #define ERROR   4       // unexpected (!!) situation 
00086 #define STOP    5       // waiting for group close confirmation 
00087 #define END     6       // after group close confirmation  
00088 
00089 
00090 /********************************************************************/  
00091 /*******************   Group data *******************/  
00092 
00093         group_admin  *my_group; 
00094         
00095         char ipadd[15];
00096             
00097         int    my_ident;                //local member ident
00098                 
00099 /********************************************************************/  
00100 /******************* messages data *******************/
00101 
00102         typedef struct {
00103 
00104                 char *data;
00105                 int size;
00106         
00107         } msgtype ;
00108                 
00109         msgtype *current , *previous;   
00110 
00111 /********************************************************************/  
00112 /******************* message header structure*******************/ 
00113         
00114         typedef struct {
00115 
00116                 int sender;             //ident of the sender 
00117 
00118                 int num_phase;          //phase number
00119 
00120                 int type;               //
00121 
00122                 int size;               //
00123 
00124         } header; 
00125 
00126 #define  HEADSZ sizeof(header)
00127 
00128 /********** Definition of message types  ************************/
00129 
00130 #define NEW     1       // new member demand, (one destination)
00131 #define INIT    2       // channel parameters (ident, grsize etc ..)
00132 #define MESS    3       // application message 
00133 #define CMD     4       // JOIN, QUIT 
00134 #define ACK     5       // empty message
00135 #define NACK    6       // multicasted to late senders
00136 #define FAULT   7       // list of faulty idents 
00137 
00138 /********************************************************************/  
00139 /******************* INIT message structure*******************/ 
00140 
00141         typedef struct {
00142         
00143                 int ident;
00144                 
00145                 int groupsize;
00146                 
00147                 int phase; 
00148                 
00149         } initheader;
00150         
00151 #define INHEADSZ sizeof(initheader)
00152 
00153 /******************* init message *******************/ 
00154 
00155         msgtype init_message;
00156         union  {        
00157                 struct  {       
00158                         header head;
00159                         initheader ihead;
00160                 } head;
00161                 char buf[400];
00162                         
00163         } init_buf ;
00164 
00165 /********************************************************************/  
00166 /******************* CMD message structure ***********************/ 
00167 
00168         typedef struct {
00169         
00170                 int type ;  // JOIN or QUIT 
00171                 
00172                 int ident;
00173                 
00174                 int nodeid;
00175                 
00176                 int port;
00177                 
00178         } cmdheader;
00179         
00180 #define CMHEADSZ sizeof(cmdheader)
00181                 
00182 /******************* cmd (join/quit) message *******************/ 
00183 
00184         msgtype cmd_message;
00185         union {
00186                 struct {        
00187                         header head;
00188                         cmdheader chead;
00189                 } head;
00190                 char buf[100];                          
00191         } cmd_buf ;
00192         
00193 #define JOIN 1
00194 #define QUIT 2
00195 
00196 /********************************************************************/  
00197 /******************* ack message *******************/
00198 
00199         header empty_message_header;
00200 
00201 /********************************************************************/  
00202 /******************* fault message *******************/
00203 
00204         msgtype fault_message;                  
00205         union {
00206                         header head;
00207                         char buf[400];
00208                         
00209         } fault_buf; 
00210 
00211 
00212 /********************************************************************/
00213 /*******************  send buffer *******************/ 
00214 
00215 #define TAILLE_MAX 100     
00216 
00217         msgtype send_buff[TAILLE_MAX];  
00218         
00219         int delt,               // next to delete ( unused )
00220             next,               // next to send   
00221             last,               // input index ( last to send +1)
00222             total;
00223             
00224 
00225 /********************************************************************/  
00226 /************** current phase data *********************/
00227         
00228         int n_received; // number of senders of the received messages at curphase
00229         
00230         int n_to_send;  // number of unsent buffered messages 
00231         
00232         int cmd_to_send; // a join (or quit) message has to be sent 
00233         
00234         int ack_flag;   // a message has to be acknowledged at curphase
00235         
00236         int new_flag;   // a new member is joining
00237         
00238         int fault_count; // number of missing messages/members 
00239         
00240         int init_flag; // in charge of a join process
00241         
00242 /*************** timeout and iteration values ( dynamic and static) *****************/
00243 
00244         int max_iter;
00245         int timeout ; 
00246         
00247 #define TIMEOUT1        1
00248 #define TIMEOUT2        2
00249 #define MAX_ITER        2
00250 #define NEW_MAX_ITER    2 * MAX_ITER 
00251                 
00252 /********************************************************************/  
00253 /******** per-phase structure with input message buffers *************/ 
00254         
00255         struct phase_buffer {
00256         
00257                 int phase;
00258                 
00259                 int sendflag;   // type of message sent at curphase
00260         
00261                 int grsize;     // number of members
00262         
00263                 int mess_numb;  // number of senders for the received messages 
00264                 
00265                 int mess_flag ; // a MESS has been received at this phase
00266                 
00267                 int fault_flag; // a FAULT has been received at this phase
00268                 
00269                 int cmd_flag;   // a CMD has been received at this phase
00270                 
00271                 int status[MAX_MEMBER]; 
00272                         
00273                         //see values below
00274                         
00275                 int fault_cnt[MAX_MEMBER];
00276                 
00277                         // count of FAULT messages for that member ( previous phase )
00278                 
00279                 msgtype message[MAX_MEMBER];
00280 
00281                         // the messages received at this phase;
00282                 
00283         } get_net_buffer[3];
00284         
00285         struct phase_buffer  *prev_get, *cur_get, *next_get;    
00286         
00287 /********************************************************************/  
00288 /********* (per-phase) status of members *********/     
00289 
00290 #define UNDEF           -1      
00291 #define WAITING         1
00292 #define FAULTY          2
00293 #define NEWMEM          4
00294 #define GOT             32
00295 #define GOTMESS         GOT + MESS
00296 #define GOTCMD          GOT + CMD
00297 #define GOTACK          GOT + ACK
00298 #define GOTFAULT        GOT + FAULT
00299         
00300 /********************************************************************/  
00301 /********************************************************************/  
00302 /*****************initialisation method *****************/
00303 
00304         int init_from_file(char *) ;
00305         
00306         int start_net_init(char *) ;
00307                 
00308         int init_from_net(char * data, int size) ;      
00309 
00310         void init_data();
00311 
00312 /*****************per-phase methods *****************/
00313                 
00314         int timeout_handler( int );
00315         
00316         void fault_handler();
00317         
00318         void phase_update() ;
00319         
00320         void phase_actions();
00321 
00322         void send_buffer_advance();     
00323         
00324         void * get_first_to_send();
00325                 
00326 /******  initiate a JOIN process for a new member ***********/
00327         
00328         int start_join(char *data, int size ); 
00329         
00330 /***** terminate a JOIN process ( send data to new member ) ***********/
00331         
00332         int new_member_init( int ident );       
00333         
00334 /********************************************************************/  
00335 /****************Synchronisation objects ****************/      
00336 
00337         pthread_cond_t all_received;    //waiting for the reception from all members
00338         
00339         pthread_mutex_t mp ;
00340         
00341         /*pthread_mutex_t net_mutex; */         //to access UDP 
00342         
00343         pthread_mutex_t phase_mutex;    //to start a new phase
00344         
00345         pthread_mutex_t send_mutex;     //mutual exclusion for send buffer              
00346         
00347         pthread_mutex_t recv_mutex;     //mutual exclusion for recv buffer
00348                         
00349         timestruc_t to;                 // time struct for timeouts 
00350                 
00351   
00352 }

Constructor & Destructor Documentation

total_order_channel::total_order_channel (int flag, char * file, char * ipnum)

Definition at line 27 of file total_order_channel.cpp.

00027 {
00028 /* Builds the channel: initialises local state, loads the group from `file` (flag == 0) or calls start_net_init() (flag == 1), sets up the synchronisation objects and the underlying no_prop_channel, then starts the phase thread. */
00029                 cout << "start total order channel" << endl;
00030 /* flag: 0 = group creator, 1 = newcomer. file: passed to init_from_file() and the no_prop_channel constructor (flag == 0) or to start_net_init() (flag == 1). ipnum: address string when flag == 1, otherwise parsed with atoi() as the member ident. */
00031 /********************************************************************/  
00032 /* initialise local data */
00033 
00034                 char ipadd[20]; // NOTE(review): local buffer; it hides the class member ipadd[15] declared in total_order_channel.h
00035                 
00036                 init_data();
00037                 
00038                 if (flag == 1)  strcpy(ipadd,ipnum); // NOTE(review): unbounded copy into a 20-byte buffer — confirm ipnum is always shorter
00039                      
00040                 else my_ident = atoi(ipnum); // for any flag other than 1, ipnum is read as the numeric member ident
00041 
00042                 init_data(); // NOTE(review): second call to init_data(); verify it does not reset my_ident assigned just above
00043                 
00044                 status = START;
00045                 
00046 /********************************************************************/  
00047 /* Case of a group creator: initialise group and  membership */  
00048                 
00049                 if (flag == 0 ) {
00050                         if (init_from_file(file) ) { // a non-zero return from init_from_file() is treated as failure
00051                 
00052                                 cout << " erreur opening " << file << endl;
00053                         
00054                                 exit(0); // NOTE(review): terminates the process with exit status 0 on this error path
00055                         }
00056                         
00057                         status = RUN;
00058                 }  
00059  
00060                 
00061 /********************************************************************/  
00062 /* initialise synchro objects */                
00063                 
00064                 if( int ret = pthread_cond_init(&all_received, NULL) ) {
00065                 
00066                         cout << " pthread_cond_init error " << "ret = " << ret << endl; 
00067                 }
00068                 // return values of the pthread_mutex_init() calls below are not checked
00069                 pthread_mutex_init(&mp,NULL);
00070                 
00071                 /*pthread_mutex_init(&net_mutex,NULL);*/
00072                                          
00073                 pthread_mutex_init(&phase_mutex, NULL);
00074                 
00075                 pthread_mutex_init(&send_mutex,NULL);
00076                 
00077                 pthread_mutex_init(&recv_mutex, NULL);
00078                 // NOTE(review): mp, send_mutex and recv_mutex were just initialised and never locked; unlocking an unlocked default mutex is undefined under POSIX — confirm these calls are needed
00079                 pthread_mutex_unlock(&mp);
00080                 
00081                 /*pthread_mutex_unlock(&net_mutex);*/
00082                 
00083                 pthread_mutex_lock(&phase_mutex); // phase_mutex starts out locked; broadcast() and deliver() unlock it
00084                 
00085                 pthread_mutex_unlock(&send_mutex);
00086                 
00087                 pthread_mutex_unlock(&recv_mutex);
00088  
00089 /********************************************************************/  
00090 /* initialise underlying layer */       
00091 
00092                 if (flag == 1)  /* following members */
00093                         net = new no_prop_channel(this, ipadd, my_group); 
00094 
00095                 else if (flag == 0)  /* first members  */
00096                         net = new no_prop_channel(this, my_ident , file, my_group);     
00097                         // NOTE(review): for any other flag value, net is not assigned in this constructor
00098 /********************************************************************/  
00099 /* for a newcomer, join group and initialise membership */                      
00100                 
00101                 if (flag == 1) {
00102                 
00103                         start_net_init(file); // return value is ignored
00104  
00105                         status = PRERUN ;
00106                 }
00107                 
00108 /********************************************************************/  
00109 /* start phase thread */  
00110 
00111                 max_iter = MAX_ITER;
00112                 
00113                 timeout = TIMEOUT2;
00114                 // start_phase is not shown in this listing; it receives `this` as its thread argument
00115                 int ret = pthread_create(&phase_thread, 0,start_phase, (void *)this);
00116 
00117                 if (ret) { // NOTE(review): pthread_create() reports failure through its return value, so the perror() text below may not match `ret`
00118                         perror(" total_order()");       printf(" erreur th create  %d\n",ret);  
00119                         if (ret == ENOMEM) printf(" ENOMEM \n" );
00120                         if (ret== EINVAL) printf(" EINVAL \n" );
00121                         if (ret == EPERM) printf(" EPERM \n" );
00122                         exit(0); 
00123                 }
00124                 
00125                 
00126 }

Member Function Documentation

int total_order_channel::broadcast (char * data, int size)

Definition at line 494 of file total_order_channel.cpp.

00494 {
00495 /* Queues an application message: copies `size` bytes of `data` into a new send-buffer slot, leaving HEADSZ bytes of room in front. Returns 0 on success, -1 when the send buffer is reported full, -2 when status is not RUN. */
00496         if(status != RUN) { // status is read here before send_mutex is taken
00497                 
00498                 cout<< "status broadcast " << status << endl;
00499                 
00500                 return (-2);
00501         }
00502 
00503                 /* cout<< "lock send " << endl; */
00504                 
00505                 pthread_mutex_lock(&send_mutex);
00506                 
00507                 /* cout<< "got send " << endl; */
00508                 
00509                 if( total != TAILLE_MAX - 1 ) { // refuses new messages once total reaches TAILLE_MAX - 1
00510       
00511                         send_buff[last].data = new char[size + HEADSZ ]; // the first HEADSZ bytes are not written here; presumably the header is filled in before sending — confirm
00512                         
00513                         memcpy(send_buff[last].data + HEADSZ , data, size); 
00514       
00515                         send_buff[last].size = size;   // the size WITHOUT the header 
00516                         
00517                         total++;
00518   
00519                         last = (last+1) % TAILLE_MAX; // last wraps around: send_buff is used as a circular buffer
00520 
00521                 } else {
00522                         
00523                         cout << " broadcast buffer plein " << endl;
00524                         
00525                         pthread_mutex_unlock(&send_mutex);
00526                         
00527                         return(-1);
00528                 }
00529 
00530                 n_to_send++;
00531                 
00532                 pthread_mutex_unlock(&send_mutex);
00533                 // NOTE(review): phase_mutex is locked in the constructor and unlocked here, presumably to wake the phase thread; unlocking a default POSIX mutex the caller does not hold is undefined — confirm
00534                 pthread_mutex_unlock(&phase_mutex);
00535                 
00536                 return (0);
00537         }

int total_order_channel::deliver (char * data, int size)

Definition at line 205 of file total_order_channel.cpp.

00205 {
00206                 
00207                 int ret;
00208                 
00209                 int sender, mess_phase, type, mess_size ;
00210                 
00211                 int ident, faulty;      
00212                  
00213                 header *head = (header *) data;
00214                                 
00215                 sender = head->sender;
00216                 
00217                 type = head->type;
00218                 
00219                 mess_phase = head->num_phase;
00220                 
00221                 mess_size = head->size;
00222                 
00223                 if ( mess_size != size - HEADSZ) 
00224                 
00225                         cout << " ** Problem of message size , got :"<< size - HEADSZ << " expected : " << mess_size<< endl; 
00226                 
00227                 char * datapt = data + HEADSZ;
00228                 
00229                 //DEBUGcout <<"from " << sender <<" type " << type << " phase " << curphase <<" mess_phase " << mess_phase <<endl;
00230                                 
00231                 if ( status == RUN) {
00232                 
00233                 if(type == MESS || type == CMD || type == ACK || type == FAULT ) {
00234                 
00235                         /* cout<< "          lock recv " << endl; */
00236                 
00237                         pthread_mutex_lock(&recv_mutex);
00238                         
00239                         /* cout<< "          got recv " << endl; */
00240                         
00241                         if(mess_phase == curphase) {   
00242                         
00243                                 if (cur_get->sendflag == 0) { 
00244                                 
00245                                         pthread_mutex_unlock(&phase_mutex);  
00246                                 } 
00247                                 
00248                                 if(cur_get->status[sender]==WAITING) {
00249                                 
00250                                         //DEBUG cout << "  first message: from  " << sender << endl; 
00251                                         
00252                                         cur_get->mess_numb++;
00253                                         
00254                                         cur_get->status[sender]= GOT + type; 
00255                                         
00256                                         if ( type == MESS || type == CMD ) {
00257                                         
00258                                                 cur_get->mess_flag = 1;
00259                                 
00260                                                 cur_get->message[sender].data= new char[mess_size];
00261                                         
00262                                                 memcpy(cur_get->message[sender].data, datapt, mess_size);
00263                                         
00264                                                 cur_get->message[sender].size = mess_size;
00265                                         }
00266                                         else if (type == FAULT ) {
00267                                         
00268                                                 cur_get->fault_flag = 1;
00269                                                 
00270                                                 faulty = mess_size/ sizeof(int);
00271       
00272                                                 //DEBUGcout << "          got FAULT for phase " << mess_phase 
00273                                                                 //DEBUG<< " for " << faulty << " faulty" << endl;
00274                         
00275                                                 for(int i=1 ; i<= faulty ; i++, datapt += 4) {
00276                                 
00277                                                         ident = *(int *)datapt;
00278                                 
00279                                                         prev_get->fault_cnt[ident]++;
00280 
00281                                                 }                                               
00282                                         }
00283                                 }
00284                                 pthread_mutex_unlock(&recv_mutex);      
00285                                 
00286                 /********************************************************************/  
00287                 /* all messages received, signal end of phase */     
00288                                 
00289                                 if(cur_get->mess_numb == cur_get->grsize - 1 )  { 
00290                                 
00291                                         ret = pthread_cond_signal(&all_received);
00292                                         
00293                                         if (ret) {
00294                                         
00295                                                 cout << "          error signal all_received : " << ret 
00296                                                         << " at phase " << curphase << endl; 
00297                                         
00298                                                 if (ret== EINVAL) cout << "          EINVAL " << endl;
00299                                         }       
00300                                         
00301                                 }                                                               
00302 
00303                         } else if (mess_phase == curphase+1) {
00304                                                                                                 
00305                                 if(next_get->status[sender]== WAITING) {   
00306                                         
00307                                         next_get->mess_numb++;
00308                                         
00309                                         next_get->status[sender]= GOT + type;
00310                                         
00311                                         next_get->message[sender].data= new char[mess_size];
00312                                         
00313                                         memcpy(next_get->message[sender].data, datapt, mess_size);
00314                                         
00315                                         next_get->message[sender].size = mess_size;
00316                                         
00317                                         if ( type == MESS || type == CMD ) {
00318                                         
00319                                                 next_get->mess_flag = 1;
00320                                         }
00321                                         else if (type == FAULT ) {   
00322                                         
00323                                                 next_get->fault_flag = 1;
00324                                                 
00325                                                 faulty = mess_size/sizeof(int);
00326       
00327                                                 cout << "FAULT in advance for phase " << mess_phase 
00328                                                                 << " for " << faulty << " faulty" << endl;
00329                         
00330                                                 for(int i=1 ; i<= faulty ; i++, datapt += 4) {
00331                                 
00332                                                         ident = *(int *)datapt;
00333                                                         
00334                                                         if (ident == my_ident) {
00335                                                                 
00336                                                                 cout << " SABORDAGE " << endl;
00337                                                                 
00338                                                                 exit(1);
00339                                                         }
00340                                 
00341                                                         // cur_get->fault_cnt[ident]++;
00342                                                 }
00343                                         }
00344                                 }
00345                                 
00346                                 pthread_mutex_unlock(&recv_mutex);
00347                                 
00348                                 //DEBUGcout << "          message in advance " << endl;
00349                         } else {
00350                                 
00351                                 pthread_mutex_unlock(&recv_mutex);
00352                                 
00353                                 //DEBUGcout << "          message hors phase, phase " << mess_phase << endl;
00354                         }                       
00355  
00356                 } else if( type == NACK ) {
00357       
00358                         cout << "          got NACK for phase " << mess_phase << endl;
00359        
00360                                 /* cout<< "          lock recv " << endl; */
00361                 
00362                                 pthread_mutex_lock(&recv_mutex);
00363                         
00364                                 /* cout<< "          got recv " << endl; */
00365                                         
00366                                 if (mess_phase == curphase) {
00367                                         
00368                                         if ( cur_get->sendflag == 0 )  { 
00369                                                 
00370                                                 pthread_mutex_unlock(&recv_mutex);
00371                                                         
00372                                                 pthread_mutex_unlock(&phase_mutex);
00373                                         }
00374                                                 
00375                                         else {
00376 
00377                                                 if( cur_get->sendflag == MESS || cur_get->sendflag == FAULT || cur_get->sendflag == CMD) {
00378                                                 
00379                                                         //DEBUGcout << "          current re-unicast , size = " << current->size << endl;
00380                                                 
00381                                                         /*pthread_mutex_lock(&net_mutex);*/
00382                                                                 
00383                                                         ret = net->unicast(sender, current->data, current->size + HEADSZ );
00384                                                         
00385                                                         /*pthread_mutex_unlock(&net_mutex);*/
00386                                                         
00387                                                         pthread_mutex_unlock(&recv_mutex);
00388                                                         
00389                                                 } else {  // it was an empty ACK message 
00390                                                 
00391                                                         pthread_mutex_unlock(&recv_mutex);
00392                                                                 
00393                                                         head = &empty_message_header;
00394                                                         
00395                                                         head->sender = my_ident;
00396                                 
00397                                                         head->num_phase = curphase;
00398                                         
00399                                                         head->type = ACK;
00400                                                                 
00401                                                         head->size = 0; 
00402                                                         
00403                                                         //DEBUGcout << "          curphase empty re-unicast " << endl;
00404                                                                 
00405                                                         /*pthread_mutex_lock(&net_mutex);*/
00406                                                         
00407                                                         ret= net->unicast( sender, (char *) head, HEADSZ );
00408                                                                 
00409                                                         /*pthread_mutex_unlock(&net_mutex);*/
00410                                                 }       
00411                                                 
00412                                                 if(ret<0) {
00413                                                         cout << "       Resend unicast error, val =" << ret << endl;
00414                                                         }
00415                                         }
00416 
00417                                 } else if (mess_phase == curphase - 1) {
00418                                         
00419                                         
00420                                         // Sends to sender the complete information 
00421                                                 
00422                                         // should increase the timeout for the sender 
00423                                                                                         
00424                                         if ( prev_get->sendflag == MESS || prev_get->sendflag == FAULT || prev_get->sendflag == CMD) {
00425                                                 
00426                                                 //DEBUGcout << "          previous re-unicast , size = " << previous->size << endl;
00427                                                 
00428                                                 /*pthread_mutex_lock(&net_mutex);*/
00429                                                         
00430                                                 ret = net->unicast(sender, previous->data, previous->size + HEADSZ);
00431                                                         
00432                                                 /*pthread_mutex_unlock(&net_mutex);*/
00433                                                 
00434                                                 pthread_mutex_unlock(&recv_mutex);
00435                                                         
00436                                         } else {
00437                                                 
00438                                                 pthread_mutex_unlock(&recv_mutex);      
00439                                                         
00440                                                 head = &empty_message_header;
00441                                                         
00442                                                 head->sender = my_ident;
00443                                 
00444                                                 head->num_phase = curphase -1;
00445                                         
00446                                                 head->type = ACK;
00447                                                         
00448                                                 head->size = 0; 
00449                                                         
00450                                                 //DEBUGcout << "          previous empty re-unicast " << endl;
00451                                                         
00452                                                 /*pthread_mutex_lock(&net_mutex);*/
00453                                                         
00454                                                 ret= net->unicast(sender, (char *) head, HEADSZ );
00455                                                         
00456                                                 /*pthread_mutex_unlock(&net_mutex);*/
00457                                         }       
00458                                         if(ret<0) {
00459                                                         cout << "      Resend unicast error , val =" << ret << endl;
00460                                         }
00461                                                 
00462                                 } else if (mess_phase == curphase + 1) {
00463                                 
00464                                         pthread_mutex_unlock(&recv_mutex);
00465                                         
00466                                         cout << "message NACK en avance " << endl;
00467                                         
00468                                 } else {
00469                                         
00470                                         pthread_mutex_unlock(&recv_mutex);
00471                                                 
00472                                         cout << "message NACK hors phase, phase " << mess_phase << endl;
00473                                 }
00474                 } 
00475                 else if ( type == NEW ) {
00476                 
00477                         start_join( datapt, mess_size );
00478                         
00479                         pthread_mutex_unlock(&phase_mutex); 
00480                 }
00481                 }
00482                 else if ( status == PRERUN && type == INIT) {
00483                                 
00484                         init_from_net( datapt, mess_size );
00485                         
00486                         status = RUN;
00487                         
00488                 } 
00489 }

void total_order_channel::fault_handler () [private]

Definition at line 871 of file total_order_channel.cpp.

00871 {
00872         // Prepares the FAULT message held in fault_buf / fault_message.
00873         // The payload, placed HEADSZ bytes into fault_buf.buf, is the list
00874         // of member idents (other than my_ident) whose status in the
00875         // current phase buffer is still WAITING; each of them also gets
00876         // its fault_cnt incremented.  fault_count ends up holding the
00877         // number of idents written.  Nothing is sent from here.
00878 
00879         header *fault_head = &fault_buf.head;
00880         int *suspects = (int *) &fault_buf.buf[HEADSZ];
00881 
00882         fault_count = 0;
00883 
00884         for (int member = 0; member < MAX_MEMBER; ++member) {
00885 
00886                 if (member == my_ident || cur_get->status[member] != WAITING)
00887                         continue;
00888 
00889                 suspects[fault_count++] = member;
00890 
00891                 cur_get->fault_cnt[member]++;
00892         }
00893 
00894         // The header is stamped for the phase after the current one.
00895         fault_head->sender = my_ident;
00896         fault_head->num_phase = curphase + 1;
00897         fault_head->type = FAULT;
00898         fault_head->size = sizeof(int) * fault_count;
00899 
00900         fault_message.size = fault_head->size;
00901 }

void * total_order_channel::get_first_to_send () [private]

Definition at line 1199 of file total_order_channel.cpp.

01199 {
01200         // Takes the entry at index `next` of send_buff, advances `next`
01201         // (wrapping at TAILLE_MAX) and decrements n_to_send, all under
01202         // send_mutex.  Returns the address of that entry as void*.
01203         // n_to_send is not checked here.
01204 
01205         void *slot;
01206 
01207         pthread_mutex_lock(&send_mutex);
01208 
01209         slot = (void *) (send_buff + next);
01210 
01211         next = (next + 1) % TAILLE_MAX;
01212         --n_to_send;
01213 
01214         pthread_mutex_unlock(&send_mutex);
01215 
01216         return slot;
01217 }

void total_order_channel::init_data () [private]

Definition at line 1127 of file total_order_channel.cpp.

01127 {
01128         // Puts the channel's bookkeeping into its initial state: the send
01129         // buffer indices and entries, the three phase buffers and the
01130         // pointers that select among them, the counters and flags, and
01131         // the data pointers of the fault / cmd / init messages.
01132         // Also allocates my_group.
01133 
01134         int slot, member;
01135 
01136         delt = 0;
01137         last = 0;
01138         next = 0;
01139         total = 0;
01140 
01141         for (slot = 0; slot < TAILLE_MAX; ++slot) {
01142                 send_buff[slot].size = 0;
01143                 send_buff[slot].data = NULL;
01144         }
01145 
01146         // previous / current / next start on buffers 0 / 1 / 2.
01147         prev_get = get_net_buffer;
01148         cur_get = get_net_buffer + 1;
01149         next_get = get_net_buffer + 2;
01150 
01151         for (slot = 0; slot < 3; ++slot) {
01152 
01153                 struct phase_buffer *pb = get_net_buffer + slot;
01154 
01155                 pb->grsize = 0;
01156                 pb->sendflag = 0;
01157                 pb->mess_numb = 0;
01158                 pb->mess_flag = 0;
01159                 pb->fault_flag = 0;
01160                 pb->cmd_flag = 0;
01161 
01162                 for (member = 0; member < MAX_MEMBER; ++member) {
01163                         pb->status[member] = UNDEF;
01164                         pb->fault_cnt[member] = 0;
01165                         pb->message[member].data = NULL;
01166                         pb->message[member].size = 0;
01167                 }
01168         }
01169 
01170         total_sent = 0;
01171         n_to_send = 0;
01172         n_received = 0;
01173         new_flag = 0;
01174         fault_count = 0;
01175         cmd_to_send = 0;
01176         init_flag = 0;
01177 
01178         // The three fixed messages use the matching *_buf members as storage.
01179         fault_message.data = fault_buf.buf;
01180         cmd_message.data = cmd_buf.buf;
01181         init_message.data = init_buf.buf;
01182 
01183         current = NULL;
01184 
01185         my_group = new group_admin();
01186 }

int total_order_channel::init_from_file (char * file) [private]

Definition at line 162 of file total_order_channel.cpp.

00162 {
00163         // Loads the member list from `file` through my_group, marks every
00164         // listed member WAITING in the current and next phase buffers,
00165         // stores the member count as group size in both, and starts the
00166         // channel at phase 1 (next buffer: phase 2).
00167         // If init_members_list() returns non-zero the process exits;
00168         // otherwise the function returns 0.
00169 
00170         if (my_group->init_members_list(file)) {
00171                 cout << "pb init_members_list " << endl;
00172                 exit(0);
00173         }
00174 
00175         int count = 0;
00176 
00177         for (int slot = 0; slot < MAX_MEMBER; ++slot) {
00178 
00179                 // Entries whose ident is UNDEF are unused.
00180                 if (my_group->members_list[slot].ident == UNDEF)
00181                         continue;
00182 
00183                 const int id = my_group->members_list[slot].ident;
00184 
00185                 cur_get->status[id] = WAITING;
00186                 next_get->status[id] = WAITING;
00187 
00188                 ++count;
00189         }
00190 
00191         curphase = 1;
00192 
00193         cur_get->phase = 1;
00194         cur_get->grsize = count;
00195 
00196         next_get->phase = 2;
00197         next_get->grsize = count;
00198 
00199         return(0);
00200 }

int total_order_channel::init_from_net (char * data, int size) [private]

Definition at line 1377 of file total_order_channel.cpp.

01377 {
01378         // Initialises this member from the payload of an INIT message:
01379         // an initheader followed, INHEADSZ bytes in, by `groupsize`
01380         // member records (the layout new_member_init() writes).
01381         // Takes over ident, group size and phase from the initheader,
01382         // re-aims the three phase-buffer pointers from the phase number,
01383         // registers every listed member in my_group and marks it WAITING
01384         // for the current and next phase.
01385         //
01386         // data : start of the initheader.
01387         // size : not consulted by this function.
01388         // Returns 0.
01389 
01390         int ident;
01391 
01392         initheader * inhead = (initheader * ) data;
01393 
01394         struct member *memb = (member *) &data[INHEADSZ];
01395 
01396         my_ident =  inhead->ident ;
01397 
01398         net->my_ident = my_ident ;
01399 
01400         grsize =  inhead->groupsize ;
01401 
01402         curphase = inhead->phase;
01403 
01404         //DEBUGcout << " Init from net member : *** " << my_ident << " *** " << endl;
01405 
01406         //DEBUGcout << " grsize: " << grsize <<  " , curphase: " << curphase << endl;
01407 
01408         // Buffer selection is phase modulo 3: previous, current, next.
01409         prev_get = &get_net_buffer[(curphase +2) % 3];
01410 
01411         cur_get= &get_net_buffer[curphase % 3];
01412 
01413         next_get =&get_net_buffer[(curphase +1) % 3];
01414 
01415         for ( int i = 1; i<= grsize ; i++, memb++) {
01416 
01417                 ident = memb->ident ;
01418 
01419                 // The ident comes from the network and is used as an index
01420                 // into status[], which the rest of the class indexes with
01421                 // 0 .. MAX_MEMBER-1: skip any record outside that range.
01422                 if ( ident < 0 || ident >= MAX_MEMBER ) {
01423 
01424                         cout << " init_from_net : ident out of range " << ident << endl;
01425 
01426                         continue;
01427                 }
01428 
01429                 my_group->add_member(memb->ident, memb->nodeid, memb->port );
01430 
01431                 cur_get->status[ident]= WAITING;
01432 
01433                 next_get->status[ident]= WAITING;
01434         }
01435 
01436         cur_get->grsize = grsize;
01437 
01438         next_get->grsize = grsize;
01439 
01440         cur_get->phase = curphase ;
01441 
01442         next_get->phase = curphase + 1 ;
01443 
01444         // The function is declared int; flowing off the end without a
01445         // return value is undefined behaviour.
01446         return(0);
01447   }

int total_order_channel::new_member_init (int ident) [private]

Definition at line 1301 of file total_order_channel.cpp.

01301 {
01302 
01303         int size , ret;
01304         
01305         char *pt ;
01306         
01307         header *head= &init_buf.head.head;
01308                 
01309         initheader *inhead = &init_buf.head.ihead;
01310         
01311         struct member *memb ;
01312         
01313         //DEBUGcout << " new_member_init : *** " << my_ident << " *** " << endl;
01314                 
01315         //DEBUGcout << " grsize: " << next_get->grsize <<  " , curphase: " << curphase << endl;
01316         
01317         size = INHEADSZ + (next_get->grsize * MEMBSZ) ;
01318         
01319         init_message.size = size + HEADSZ;
01320                 
01321         head->sender = my_ident;
01322         
01323         head->num_phase = curphase + 1;
01324         
01325         head->type = INIT ;
01326         
01327         head->size = size;
01328 
01329         inhead->ident = ident ; 
01330                 
01331         inhead->groupsize = next_get->grsize ; 
01332         
01333         inhead->phase = curphase + 1;
01334         
01335         pt = &init_buf.buf[HEADSZ + INHEADSZ];
01336         
01337         memb = (struct member *) pt;
01338                 
01339         int nb = 0;
01340                 
01341         for(int i=0 ; i< MAX_MEMBER; i++) { 
01342                 
01343                 if( my_group->members_list[i].ident != UNDEF ) {
01344                 
01345                         memb->ident = my_group->members_list[i].ident;
01346                         
01347                         memb->nodeid  = my_group->members_list[i].nodeid ;
01348                         
01349                         memb->port = my_group->members_list[i].port ;
01350                         
01351                         memb++;
01352                         
01353                         nb++;
01354                 }
01355         }
01356                         
01357         if ( nb != next_get->grsize) {   cout << " error groupsize " << next_get->grsize << " != " << nb ;}
01358         
01359         /*pthread_mutex_lock(&net_mutex);*/     
01360         
01361         ret= net->unicast(ident, init_message.data, init_message.size );
01362                                                                 
01363         /*pthread_mutex_unlock(&net_mutex);*/
01364         
01365         if(ret<0) {
01366                 
01367                 cout << "      Init unicast error, val =" << ret << endl;
01368         }    
01369         
01370         init_flag = 0;    
01371 }

void total_order_channel::phase_actions () [private]

Definition at line 914 of file total_order_channel.cpp.

00914 {
00915         // End-of-phase processing, done entirely under recv_mutex.
00916         // For every member slot:
00917         //   - a non-zero fault count in the previous or current phase
00918         //     buffer marks the member FAULTY and removes it from the next
00919         //     phase buffer and from my_group;
00920         //   - an application message received in the previous phase
00921         //     (status GOTMESS) is printed.
00922         // Then JOIN / QUIT commands of the previous phase (status GOTCMD)
00923         // are applied to the next phase buffer and my_group, and every
00924         // member added that way (status NEWMEM) is sent its INIT message
00925         // through new_member_init() and switched to WAITING.
00926 
00927         int sz, len, i, k;
00928 
00929         int host, port, ident;
00930 
00931         cmdheader * cmdpt;
00932 
00933         char * text;
00934 
00935         //DEBUGcout << "     phase " << curphase << " phase action " << endl;
00936 
00937         pthread_mutex_lock(&recv_mutex);
00938 
00939         /********************************************************************/
00940         /***** suppress faulty members. This is the critical choice algorithm ****/
00941 
00942         for(i=0 ; i < MAX_MEMBER ; i++) {
00943 
00944                 if (prev_get->fault_cnt[i] != 0 || cur_get->fault_cnt[i] != 0 ) {
00945 
00946                         cur_get->status[i] = FAULTY ;
00947 
00948                         // Only members still present in the next phase are removed.
00949                         if (next_get->status[i] != UNDEF ) {
00950 
00951                                 next_get->status[i] = UNDEF ;
00952 
00953                                 next_get->grsize--;
00954 
00955                                 my_group->del_member(i);
00956 
00957                                 cout << "          XXXXXXXXXXX " << my_ident << " SUPPRESSION de " << i << endl; 
00958                         }
00959 
00960                         if ( prev_get->fault_cnt[i] != 0 )
00961 
00962                                 prev_get->status[i]= FAULTY;
00963                 }
00964 
00965                 /********************************************************************/
00966                 /* deliver application messages */
00967 
00968                 if( prev_get->status[i] == GOTMESS) {
00969 
00970                         sz = prev_get->message[i].size;
00971 
00972                         text = prev_get->message[i].data;
00973 
00974                         // No terminator is written at data[sz]: phase_code()
00975                         // allocates these buffers with exactly `size` bytes, so
00976                         // that index is out of bounds.  Instead, print up to the
00977                         // first NUL but never more than sz bytes.
00978                         len = 0;
00979 
00980                         while ( text != NULL && len < sz && text[len] != '\0' ) len++;
00981 
00982                         cout << "        XXXXXXXXXXX " << my_ident << " DELIVRE de " << i <<": ";
00983 
00984                         if ( len > 0 ) cout.write(text, len);
00985 
00986                         cout << endl;
00987 
00988                         //receive(prev_get->message[i].data, prev_get->message[i].size); 
00989                 }
00990         }
00991         /********************************************************************/
00992         /* treat join and quit command messages */
00993 
00994         for(i=0 ; i < MAX_MEMBER ; i++) {
00995 
00996                 if(prev_get->status[i] == GOTCMD ) {
00997 
00998                         cmdpt = (cmdheader *) prev_get->message[i].data;
00999 
01000                         if ( cmdpt->type == JOIN ) {
01001 
01002                                 new_flag = 1;
01003 
01004                                 host = cmdpt->nodeid;
01005 
01006                                 port = cmdpt->port;
01007 
01008                                 if ( next_get->grsize < MAX_MEMBER) {
01009 
01010                                         // First free slot wins; the search starts at 1,
01011                                         // so slot 0 is never handed out here.
01012                                         for(k=1 ; k < MAX_MEMBER ; k++) {
01013 
01014                                                 if (next_get->status[k] == UNDEF ) {
01015 
01016                                                         ident = k;
01017 
01018                                                         next_get->status[k] = NEWMEM;
01019 
01020                                                         next_get->grsize++;
01021 
01022                                                         my_group->add_member(ident, host, port);
01023 
01024                                                         cout << "        XXXXXXXXXXX " << my_ident << " AJOUT de " << ident <<endl ;
01025 
01026                                                         break;
01027                                                 }
01028                                         }
01029                                 } else {
01030                                         // Group already holds MAX_MEMBER members:
01031                                         // the JOIN request is not acted upon.
01032                                 }
01033                         }
01034                         else if ( cmdpt->type == QUIT ) {
01035 
01036                                 ident = cmdpt->ident;
01037 
01038                                 // The ident is taken from the message and used as an
01039                                 // index: ignore values outside 0 .. MAX_MEMBER-1.
01040                                 if( ident >= 0 && ident < MAX_MEMBER && next_get->status[ident] != UNDEF ) {
01041 
01042                                         next_get->status[ident] = UNDEF ; 
01043 
01044                                         next_get->grsize--;
01045 
01046                                         my_group->del_member(ident);
01047                                 }
01048                         }
01049                 }
01050         }
01051 
01052         // Send the INIT message to every member added above.
01053         for(i=0 ; i < MAX_MEMBER ; i++) {
01054 
01055                 if(next_get->status[i] == NEWMEM) {
01056 
01057                         new_member_init(i);  
01058 
01059                         next_get->status[i] = WAITING;
01060                 }
01061         }
01062        pthread_mutex_unlock(&recv_mutex); 
01063 }

void * total_order_channel::phase_code (void *)

Definition at line 543 of file total_order_channel.cpp.

00543 {
00544         
00545         int test, sz , iter ;
00546                 
00547         header *head ;
00548                 
00549         char *data;
00550         
00551         cout << "     start phase thread" << endl;
00552          
00553         for (;;) {
00554  
00555                 pthread_mutex_lock(&phase_mutex);
00556                 
00557                 cout << " ->> on phase " << curphase << endl;
00558                 
00559                 do {
00560 
00561 /********************************************************************/  
00562 /* There is a FAULT message to send  */
00563 
00564                         cout << "PHASE " << curphase << endl;
00565 
00566                         if ( fault_count) { 
00567                                 
00568                                 cur_get->sendflag = FAULT;
00569                         
00570                                 current = &fault_message;
00571                                 
00572                                 //DEBUGcout << "    fault broadcast at phase " << curphase << endl;
00573                         
00574                                 test = net->broadcast(fault_message.data, fault_message.size + HEADSZ);
00575  
00576                                 if(test<0) {
00577                                         cout << "Fault broadcast error, test=" << test << endl;
00578                                 }
00579                                 fault_count = 0;
00580                                 
00581                                 cur_get->status[my_ident]= GOTFAULT; 
00582                         }
00583 
00584 /********************************************************************/  
00585 /* There is a CMD message to broadcast  (JOIN / QUIT) */
00586 
00587                         else if ( cmd_to_send )  {
00588                         
00589                                 cur_get->sendflag = CMD ;
00590                                 
00591                                 current = &cmd_message ;
00592                                 
00593                                 data = &current->data[HEADSZ];
00594                                 
00595                                 cmd_buf.head.head.num_phase = curphase;
00596                                 
00597                                 //DEBUGcout << "    cmd broadcast at phase " << curphase << endl;
00598                                 
00599                                 test = net->broadcast(cmd_message.data, cmd_message.size + HEADSZ);
00600  
00601                                 if(test<0) {
00602                                         cout << "Cmd broadcast error, test=" << test << endl;
00603                                 }
00604                                 
00605                                 cur_get->status[my_ident]= GOTCMD; 
00606                                 
00607                                 cur_get->message[my_ident].data= new char[current->size];
00608                                         
00609                                 memcpy(cur_get->message[my_ident].data, data, current->size);
00610                                         
00611                                 cur_get->message[my_ident].size = current->size;
00612                                 
00613                                 cmd_to_send = 0;
00614                         }
00615                                                                                 
00616 /********************************************************************/  
00617 /*  There is a "standart" message to broadcast at this phase */
00618 
00619                         else if ( n_to_send != 0 ) {
00620                         
00621                                 cur_get->sendflag = MESS;
00622                                 
00623                                 total_sent++;
00624                                 
00625                                 current = (msgtype *) get_first_to_send(); 
00626                                 
00627                                 data = &current->data[HEADSZ];                                  
00628                                                         
00629                                 head = (header *) current->data;
00630                                         
00631                                 head->sender = my_ident;
00632                                 
00633                                 head->num_phase = curphase;
00634                                         
00635                                 head->type = MESS;
00636                                         
00637                                 head->size = current->size; 
00638                                                                                 
00639                                 //DEBUGcout << "     std broadcast at phase " << curphase << endl;
00640                                 
00641                                 /*pthread_mutex_lock(&net_mutex);*/
00642                                 
00643                                 test = net->broadcast( current->data, current->size + HEADSZ);
00644                                 
00645                                 /*pthread_mutex_unlock(&net_mutex);*/
00646                                         
00647                                 if(test < 0){
00648                                         cout << "  std  broadcast error, test = " << test << endl;
00649                                 }
00650                                 
00651                                 cur_get->status[my_ident]= GOTMESS; 
00652                                 
00653                                 cur_get->message[my_ident].data= new char[current->size];
00654                                         
00655                                 memcpy(cur_get->message[my_ident].data, data, current->size);
00656                                         
00657                                 cur_get->message[my_ident].size = current->size;
00658 
00659                         } else  {
00660 
00661 /********************************************************************/  
00662 /*  no message to broadcast at this phase , send ACK */
00663 
00664                                 cur_get->sendflag = ACK;
00665 
00666                                 current = NULL;
00667 
00668                                 head = & empty_message_header;
00669                                                                                                                         
00670                                 head->sender = my_ident;
00671                                 
00672                                 head->num_phase = curphase;
00673                                         
00674                                 head->type = ACK;
00675                                         
00676                                 head->size = 0; 
00677                                 
00678                                 //DEBUGcout << "     empty broadcast at phase " << curphase << endl;
00679                                 
00680                                 /*pthread_mutex_lock(&net_mutex);*/
00681 
00682                                 test = net->broadcast((char *) head, HEADSZ);
00683                                 
00684                                 /*pthread_mutex_unlock(&net_mutex);*/
00685                                         
00686                                 if(test<0) {
00687                                         cout << "  ack  broadcast error, test=" << test << endl;
00688                                 }
00689                                 
00690                                 cur_get->status[my_ident]= GOTACK;
00691                         }
00692                 
00693                 
00694 /**************************************************************************/
00695 /*********************  WAITING end-of phase ********************/      
00696 /* if some message is not yet received , wait for condition, set by deliver() */ 
00697 /* routine after receiving messages from all partners */     
00698 
00699                 if(cur_get->mess_numb != cur_get->grsize - 1 ) {
00700                                         
00701                         /* pthread_mutex_unlock(&recv_mutex); */
00702                         
00703                         if ( new_flag ) max_iter = NEW_MAX_ITER;
00704                         
00705                         else max_iter = MAX_ITER ; 
00706                         
00707                         new_flag = 0;
00708         
00709                         to.tv_sec = time(NULL) + timeout;
00710                         to.tv_nsec = 0;
00711                         
00712                         pthread_mutex_lock(&mp);
00713                         
00714                         int ret = pthread_cond_timedwait(&all_received, &mp, &to);
00715                         
00716                         pthread_mutex_unlock(&mp);
00717                         
00718                         for ( iter = 1 ; ret == ETIMEDOUT && iter <= max_iter ; iter ++)  {
00719                         
00720                                 // timeout routine 
00721 
00722                                 if (timeout_handler(iter) != 0 ) { 
00723                                         
00724                                         /* cout << "     lock recv " << endl;*/
00725                                         
00726                                         /* pthread_mutex_lock(&recv_mutex);*/
00727                                                 
00728                                         /* cout << "     got recv " << endl; */
00729                                         
00730                                         if(cur_get->mess_numb != cur_get->grsize - 1 ) {
00731                                         
00732                                         /* pthread_mutex_unlock(&recv_mutex); */
00733                                         
00734                                                 to.tv_sec = time(NULL) + timeout ;
00735                                                 to.tv_nsec = 0;
00736                                                 
00737                                                 pthread_mutex_lock(&mp);
00738                                                 
00739                                                 ret = pthread_cond_timedwait(&all_received, &mp, &to);
00740                                                 
00741                                                 pthread_mutex_unlock(&mp);
00742                                         }                                       
00743                                 }
00744                                 
00745                                 else { 
00746                                         ret = 0; 
00747                                 
00748                                         break;
00749                                 }                               
00750                         }
00751                                                 
00752                         if (ret) {
00753                         
00754                                 if (ret == ETIMEDOUT) {
00755                                 
00756                                         // some message was not received 
00757                                         
00758                                         fault_handler() ;   
00759                                 }
00760                                 else if (ret== EINVAL) {
00761                                         cout << "     phase erreur EINVAL cond wait iter " << iter << endl;
00762                                         exit(0);
00763                                         
00764                                 } else {                
00765                                         cout << "     phase erreur cond wait at iter " << iter << endl;
00766                                         exit(0);
00767                                 }       
00768                         }                        
00769                 }
00770                 /* else  pthread_mutex_unlock(&recv_mutex); */                          
00771                 
00772                 //DEBUGcout << " phase  " << curphase << " emissions " << total_sent << endl ;
00773 
00774 /**************************************************************************/
00775 /*********************  End-of phase ********************/      
00776 
00777 /* take phase decision and actions */
00778 
00779                         if (curphase != 1) phase_actions();
00780 
00781                         pthread_mutex_trylock(&phase_mutex);
00782                                                 
00783 /*******************************************************************/   
00784 /* update send buffers */
00785                         
00786                         /* cout<< "     lock send " << endl; */
00787 
00788                         pthread_mutex_lock(&send_mutex);
00789                         
00790                         /* cout<< "     got send " << endl; */
00791                         
00792                         send_buffer_advance();
00793                         
00794                         pthread_mutex_unlock(&send_mutex);
00795 
00796 /*******************************************************************/   
00797 /* Initialise next phase */     
00798 
00799                         //DEBUGcout << "     reinit for next phase " << curphase + 1<< endl;
00800                         
00801                         phase_update();
00802                         
00803                         //DEBUGcout << " to_send " << n_to_send  << " received " <<  n_received << endl;
00804                                                                 
00805                 } while( (cmd_to_send + n_to_send + n_received + ack_flag + fault_count ) != 0);
00806         }
00807 }

void total_order_channel::phase_update () [private]

Definition at line 1039 of file total_order_channel.cpp.

01039 {
01040         // Moves the channel to the next phase: increments curphase, rotates the three phase buffers
01041         // and resets the buffer that becomes next_get. Everything runs with recv_mutex held.
01042         /* cout<< "     lock recv " << endl; */
01043                                                 
01044         pthread_mutex_lock(&recv_mutex);
01045                         
01046         /* cout<< "     got recv " << endl; */
01047                         
01048         curphase++;
01049         // The message pointer of the finished phase is kept in previous; current starts empty.
01050         previous = current;
01051                         
01052         current = NULL;
01053         // Read before the rotation below: cur_get still designates the phase that just ended.
01054         ack_flag = cur_get->mess_flag + cur_get->fault_flag + cur_get->cmd_flag;
01055         
01056 /********************************************************************/  
01057 /* phase buffers permutation */
01058         // Rotation: old cur_get -> prev_get, old next_get -> cur_get, old prev_get -> next_get (recycled).
01059         struct phase_buffer *tmp ; 
01060                 
01061         tmp = prev_get;
01062         
01063         prev_get = cur_get;
01064         
01065         cur_get = next_get;
01066         
01067         next_get= tmp;
01068                                         
01069         /* prev_get = &get_net_buffer[(curphase +2) % 3];
01070                                 
01071         cur_get= &get_net_buffer[curphase % 3];
01072         
01073         next_get =&get_net_buffer[(curphase +1) % 3]; */
01074         
01075 /********************************************************************/  
01076 /* next buffer initialisation */        
01077 
01078         // n_received takes mess_numb of the buffer that has just become cur_get (the former next_get).
01079         n_received = cur_get->mess_numb;
01080         // curphase was already incremented above, so the recycled buffer is tagged with the phase after the new current one.
01081         next_get->phase = curphase + 1;
01082                 
01083         next_get->grsize = cur_get->grsize; 
01084         
01085         next_get->sendflag = 0 ;
01086         
01087         next_get->mess_numb = 0;
01088         
01089         next_get->mess_flag = 0 ;
01090         
01091         next_get->fault_flag = 0 ;
01092         
01093         next_get->cmd_flag = 0 ;
01094         // Per-member reset: UNDEF slots of cur_get stay UNDEF in next_get, every other slot becomes WAITING.
01095         for(int i=0 ; i< MAX_MEMBER; i++) {
01096         
01097                 //cout<< " ident " << i << " status " << cur_get->status[i] << endl; 
01098         
01099                 if( cur_get->status[i] == UNDEF ) next_get->status[i] = UNDEF;
01100                 
01101                 else { 
01102                         next_get->status[i] = WAITING;
01103                 
01104                         //cout << "MEMBER ident : "<< i << endl;
01105                 }
01106         
01107                 next_get-> fault_cnt[i] = 0;
01108                 // NOTE(review): scalar delete is used; if data is allocated with new[] this should be delete[] - confirm at the allocation site.
01109                 if (next_get->message[i].data != NULL) { 
01110                 
01111                         delete(next_get->message[i].data );
01112                         
01113                         next_get->message[i].data = NULL;
01114                         
01115                 }
01116                         
01117                 next_get->message[i].size = 0;
01118         }
01119         
01120         pthread_mutex_unlock(&recv_mutex); 
01121         
01122 }

void total_order_channel::send_buffer_advance () [private]

Definition at line 1219 of file total_order_channel.cpp.

01219 {
01220         // Frees one slot of send_buff and decrements total; does nothing when total is 0 or prev_get->sendflag is not MESS.
01221         int ind;
01222         // The caller shown in phase_code() wraps this call in send_mutex; no locking is done here.
01223         if (total != 0) {
01224         
01225                 if (prev_get->sendflag == MESS)  {
01226                         // Slot to release: two positions behind next if cur_get also has sendflag MESS, else one behind.
01227                         if ( cur_get->sendflag == MESS ) ind = next - 2 ;
01228                         
01229                         else ind = next - 1;
01230                 }
01231                 
01232                 else {
01233                          // prev_get did not carry a MESS: nothing to release, total is left unchanged.
01234                          return ; 
01235                 }
01236                 // Wrap a negative index back into [0, TAILLE_MAX).
01237                 if (ind < 0 ) ind = ind + TAILLE_MAX ;
01238         
01239                 if (send_buff[ind].size != 0 ) {
01240                         // NOTE(review): scalar delete; should be delete[] if data comes from new[] - confirm.
01241                         delete(send_buff[ind].data) ;
01242                          
01243                         send_buff[ind].data = NULL;
01244                         
01245                         send_buff[ind].size = 0 ;
01246                 }
01247                 // total is decremented even when the slot was already empty (size == 0).
01248                 total--;
01249                 
01250                 //DEBUGcout << " total " << total << " next " << next << " last " << last << " ind " << ind << endl;
01251         }
01252 }

int total_order_channel::start_join (char * data, int size) [private]

Definition at line 1259 of file total_order_channel.cpp.

01259 {
01260         // Prepares a JOIN command in cmd_buf from the (nodeid, port) pair read from data and marks it pending.
01261         // Returns 0 when the command was queued, -1 when refused (a command is already pending or init_flag is set).
01262         header *head = &cmd_buf.head.head;
01263         
01264         cmdheader *cmdpt= &cmd_buf.head.chead;
01265         int rc = -1;    // previously the function fell off its end without a value (undefined behaviour)
01266         if (cmd_to_send == 0 && init_flag == 0) {
01267                         
01268                 head->sender = my_ident;
01269 
01270                 head->num_phase = -1;
01271 
01272                 head->type = CMD;
01273 
01274                 head->size = CMHEADSZ ;
01275                         
01276                 cmdpt->type = JOIN ;
01277                 
01278                 cmdpt->ident = 0 ;
01279                 // data is read as two consecutive ints: node id first, then port.
01280                 cmdpt->nodeid = * (int *)data;
01281                 // NOTE(review): size is never checked; assumes data holds at least 2 * sizeof(int) bytes - confirm in the caller.
01282                 cmdpt->port = * (int *)(data + sizeof(int));
01283                 
01284                 cmd_message.size = head->size ;
01285         
01286                 cmd_to_send = 1;
01287                 rc = 0;
01288                 init_flag = 1;
01289         }
01290                 
01291         else {
01292         
01293                 cout << " start_join aborted : *** " << my_ident << " *** " << endl;
01294         }       
01295         return rc;
01296 }

int total_order_channel::start_net_init (char * file) [private]

Definition at line 133 of file total_order_channel.cpp.

00133 {
00134         // Builds a NEW message in cmd_buf carrying this node's id and port, then passes it to net->init_membership().
00135         // Always returns 0; the result of init_membership() is not inspected (its return type is not visible here).
00136         header *head= &cmd_buf.head.head;
00137         int * pt;
00138                         
00139                 head->sender = -1;
00140 
00141                 head->num_phase = -1;
00142 
00143                 head->type = NEW;
00144                 // Payload is two ints (node id, port) written right after the HEADSZ-byte header.
00145                 head->size = 2 * sizeof(int) ;
00146         
00147                 pt =  (int *) &cmd_buf.buf[HEADSZ];
00148 
00149                 *pt = net->get_mynode();
00150         
00151                 *(pt + 1) = net->get_myport();
00152                 
00153                 cout << "***** start_net_init, my_port = " << *(pt + 1) <<"****" << endl;
00154                 // NOTE(review): presumably cmd_message.data points at cmd_buf - confirm where cmd_message is initialised.
00155                 net->init_membership(file, cmd_message.data, head->size + HEADSZ ); 
00156         return 0;       // previously the function fell off its end without a value (undefined behaviour)
00157 }

int total_order_channel::timeout_handler (int it) [private]

Definition at line 811 of file total_order_channel.cpp.

00811 {
00812                 // Sends a NACK header to every member other than my_ident whose status in cur_get is still WAITING.
00813                 int test;
00814                 // Returns 0 when there is nobody to NACK, 1 after a multicast attempt (a negative multicast result is only logged).
00815                 //DEBUGcout << "     TIMEOUT  at iter " << it  << endl;
00816                 // The parameter it is only used by the disabled debug trace above.
00817                 union{ 
00818                 
00819                         header head;
00820                         char buf[25];
00821                         
00822                 } nack_message; 
00823                 // NOTE(review): HEADSZ bytes of nack_message.buf are sent below; assumes HEADSZ <= sizeof(nack_message) - confirm.
00824                 int dest_list[MAX_MEMBER] ;
00825                 
00826                 int dest ; 
00827 
00828                 dest = (cur_get->grsize - cur_get->mess_numb) -1;       //number of missing senders
00829                 
00830                 if ( dest == 0 ) return( 0);
00831                 // From here on dest is reused as the number of entries filled in dest_list.
00832                 dest = 0;
00833                 
00834                 //Fill nack message             
00835                 
00836                 header *head= &nack_message.head;
00837                                                 
00838                 head->sender = my_ident;
00839 
00840                 head->num_phase = curphase;
00841 
00842                 head->type = NACK;              
00843                 // Header-only message: no payload follows.
00844                 head->size = 0;
00845 
00846                 for(int id =0 ; id < MAX_MEMBER ; id++) {
00847 
00848                         if( id != my_ident && cur_get->status[id]== WAITING ) {
00849                         
00850                                 dest_list[dest]= id; 
00851                                 
00852                                 dest ++; 
00853                                 
00854                                 //DEBUGcout << " Nack for " << id <<  endl;
00855                         }
00856                 }
00857                 if ( dest == 0 ) return( 0);
00858                 
00859                 test= net->multicast(dest, dest_list , nack_message.buf, HEADSZ );
00860  
00861                 if(test<0) {
00862                         cout << "nack broadcast error test=" << test << endl;
00863                 } 
00864                 return(1);
00865 
00866 }

Member Data Documentation

int total_order_channel::ack_flag [private]

Definition at line 234 of file total_order_channel.h.

pthread_cond_t total_order_channel::all_received [private]

Definition at line 337 of file total_order_channel.h.

char total_order_channel::buf[400] [private]

Definition at line 161 of file total_order_channel.h.

cmdheader total_order_channel::chead

Definition at line 188 of file total_order_channel.h.

union { ... } cmd_buf [private]

msgtype total_order_channel::cmd_message [private]

Definition at line 184 of file total_order_channel.h.

int total_order_channel::cmd_to_send [private]

Definition at line 232 of file total_order_channel.h.

struct phase_buffer * total_order_channel::cur_get [private]

Definition at line 285 of file total_order_channel.h.

int total_order_channel::curphase [private]

Definition at line 75 of file total_order_channel.h.

msgtype * total_order_channel::current [private]

Definition at line 109 of file total_order_channel.h.

int total_order_channel::delt [private]

Definition at line 219 of file total_order_channel.h.

header total_order_channel::empty_message_header [private]

Definition at line 199 of file total_order_channel.h.

union { ... } fault_buf [private]

int total_order_channel::fault_count [private]

Definition at line 238 of file total_order_channel.h.

msgtype total_order_channel::fault_message [private]

Definition at line 204 of file total_order_channel.h.

struct total_order_channel::phase_buffer total_order_channel::get_net_buffer[3] [private]

int total_order_channel::grsize [private]

Definition at line 77 of file total_order_channel.h.

struct { ... } head [private]

struct { ... } head [private]

header total_order_channel::head

Definition at line 158 of file total_order_channel.h.

initheader total_order_channel::ihead

Definition at line 159 of file total_order_channel.h.

union { ... } init_buf [private]

int total_order_channel::init_flag [private]

Definition at line 240 of file total_order_channel.h.

msgtype total_order_channel::init_message [private]

Definition at line 155 of file total_order_channel.h.

char total_order_channel::ipadd[15] [private]

Definition at line 95 of file total_order_channel.h.

int total_order_channel::last [private]

Definition at line 221 of file total_order_channel.h.

int total_order_channel::max_iter [private]

Definition at line 244 of file total_order_channel.h.

pthread_mutex_t total_order_channel::mp [private]

Definition at line 339 of file total_order_channel.h.

group_admin * total_order_channel::my_group [private]

Definition at line 93 of file total_order_channel.h.

int total_order_channel::my_ident [private]

Definition at line 97 of file total_order_channel.h.

int total_order_channel::n_received [private]

Definition at line 228 of file total_order_channel.h.

int total_order_channel::n_to_send [private]

Definition at line 230 of file total_order_channel.h.

no_prop_channel * total_order_channel::net [private]

Definition at line 66 of file total_order_channel.h.

int total_order_channel::new_flag [private]

Definition at line 236 of file total_order_channel.h.

int total_order_channel::next [private]

Definition at line 220 of file total_order_channel.h.

struct phase_buffer * total_order_channel::next_get [private]

Definition at line 285 of file total_order_channel.h.

pthread_mutex_t total_order_channel::phase_mutex [private]

Definition at line 343 of file total_order_channel.h.

pthread_t total_order_channel::phase_thread [private]

Definition at line 68 of file total_order_channel.h.

struct phase_buffer * total_order_channel::prev_get [private]

Definition at line 285 of file total_order_channel.h.

msgtype * total_order_channel::previous [private]

Definition at line 109 of file total_order_channel.h.

pthread_mutex_t total_order_channel::recv_mutex [private]

Definition at line 347 of file total_order_channel.h.

msgtype total_order_channel::send_buff[TAILLE_MAX] [private]

Definition at line 217 of file total_order_channel.h.

pthread_mutex_t total_order_channel::send_mutex [private]

Definition at line 345 of file total_order_channel.h.

int total_order_channel::status [private]

Definition at line 73 of file total_order_channel.h.

int total_order_channel::timeout [private]

Definition at line 245 of file total_order_channel.h.

timestruc_t total_order_channel::to [private]

Definition at line 349 of file total_order_channel.h.

int total_order_channel::total [private]

Definition at line 222 of file total_order_channel.h.

int total_order_channel::total_sent [private]

Definition at line 79 of file total_order_channel.h.


The documentation for this class was generated from the following files:
total_order_channel.h
total_order_channel.cpp
Generated at Mon Mar 1 18:05:56 2004 for Groupware by doxygen 1.1.1 written by Dimitri van Heesch, © 1997-2000