#include <total_order_channel.h>
Definition at line 49 of file total_order_channel.h.
00049 { // : public channel, channeluser { 00050 00051 /********************************************************************/ 00052 /*interface*/ 00053 00054 public: 00055 total_order_channel(int , char *, char *); 00056 00057 int deliver ( char *, int); // from network 00058 00059 int broadcast ( char *, int ); // from application 00060 00061 void * phase_code(void *); //Phase thread code 00062 00063 00064 private: 00065 00066 no_prop_channel * net; // the no-prop channel UDP 00067 00068 pthread_t phase_thread; //Phase thread structure 00069 00070 /********************************************************************/ 00071 /*local state*/ 00072 00073 int status ; /* initialisation, running or ending */ 00074 00075 int curphase; 00076 00077 int grsize; 00078 00079 int total_sent; 00080 00081 #define START 0 // waiting for channel information 00082 #define PRERUN 1 // initialising 00083 #define RUN 2 // running 00084 /* #define SYNC 3 resynchronising */ 00085 #define ERROR 4 // unexpected (!!) situation 00086 #define STOP 5 // waiting for group close confirmation 00087 #define END 6 // after group close confirmation 00088 00089 00090 /********************************************************************/ 00091 /******************* Group data *******************/ 00092 00093 group_admin *my_group; 00094 00095 char ipadd[15]; 00096 00097 int my_ident; //local member ident 00098 00099 /********************************************************************/ 00100 /******************* messages data *******************/ 00101 00102 typedef struct { 00103 00104 char *data; 00105 int size; 00106 00107 } msgtype ; 00108 00109 msgtype *current , *previous; 00110 00111 /********************************************************************/ 00112 /******************* message header structure*******************/ 00113 00114 typedef struct { 00115 00116 int sender; //ident of the sender 00117 00118 int num_phase; //phase number 00119 00120 int type; // 00121 00122 int size; // 00123 00124 } header; 00125 00126 #define HEADSZ sizeof(header) 00127 00128 /********** Definition of message types ************************/ 00129 00130 #define NEW 1 // new member demand, (one destination) 00131 #define INIT 2 // channel parameters (ident, grsize etc ..) 00132 #define MESS 3 // application message 00133 #define CMD 4 // JOIN, QUIT 00134 #define ACK 5 // empty message 00135 #define NACK 6 // multicasted to late senders 00136 #define FAULT 7 // list of faulty idents 00137 00138 /********************************************************************/ 00139 /******************* INIT message structure*******************/ 00140 00141 typedef struct { 00142 00143 int ident; 00144 00145 int groupsize; 00146 00147 int phase; 00148 00149 } initheader; 00150 00151 #define INHEADSZ sizeof(initheader) 00152 00153 /******************* init message *******************/ 00154 00155 msgtype init_message; 00156 union { 00157 struct { 00158 header head; 00159 initheader ihead; 00160 } head; 00161 char buf[400]; 00162 00163 } init_buf ; 00164 00165 /********************************************************************/ 00166 /******************* CMD message structure ***********************/ 00167 00168 typedef struct { 00169 00170 int type ; // JOIN or QUIT 00171 00172 int ident; 00173 00174 int nodeid; 00175 00176 int port; 00177 00178 } cmdheader; 00179 00180 #define CMHEADSZ sizeof(cmdheader) 00181 00182 /******************* cmd (join/quit) message *******************/ 00183 00184 msgtype cmd_message; 00185 union { 00186 struct { 00187 header head; 00188 cmdheader chead; 00189 } head; 00190 char buf[100]; 00191 } cmd_buf ; 00192 00193 #define JOIN 1 00194 #define QUIT 2 00195 00196 /********************************************************************/ 00197 /******************* ack message *******************/ 00198 00199 header empty_message_header; 00200 00201 /********************************************************************/ 00202 /******************* fault message *******************/ 00203 00204 msgtype fault_message; 00205 union { 00206 header head; 00207 char buf[400]; 00208 00209 } fault_buf; 00210 00211 00212 /********************************************************************/ 00213 /******************* send buffer *******************/ 00214 00215 #define TAILLE_MAX 100 00216 00217 msgtype send_buff[TAILLE_MAX]; 00218 00219 int delt, // next to delete ( unused ) 00220 next, // next to send 00221 last, // input index ( last to send +1) 00222 total; 00223 00224 00225 /********************************************************************/ 00226 /************** current phase data *********************/ 00227 00228 int n_received; // number of senders of the received messages at curphase 00229 00230 int n_to_send; // number of unsent buffered messages 00231 00232 int cmd_to_send; // a join (or quit) message has to be sent 00233 00234 int ack_flag; // a message has to be acknowledged at curphase 00235 00236 int new_flag; // a new member is joining 00237 00238 int fault_count; // number of missing messages/members 00239 00240 int init_flag; // in charge of a join process 00241 00242 /*************** timeout and iteration values ( dynamic and static) *****************/ 00243 00244 int max_iter; 00245 int timeout ; 00246 00247 #define TIMEOUT1 1 00248 #define TIMEOUT2 2 00249 #define MAX_ITER 2 00250 #define NEW_MAX_ITER 2 * MAX_ITER 00251 00252 /********************************************************************/ 00253 /******** per-phase structure with input message buffers *************/ 00254 00255 struct phase_buffer { 00256 00257 int phase; 00258 00259 int sendflag; // type of message sent at curphase 00260 00261 int grsize; // number of members 00262 00263 int mess_numb; // number of senders for the received messages 00264 00265 int mess_flag ; // a MESS has been received at this phase 00266 00267 int fault_flag; // a FAULT has been received at this phase 00268 00269 int cmd_flag; // a CMD has been received at this phase 00270 00271 int status[MAX_MEMBER]; 00272 00273 //see values below 00274 00275 int fault_cnt[MAX_MEMBER]; 00276 00277 // count of FAULT messages for that member ( previous phase ) 00278 00279 msgtype message[MAX_MEMBER]; 00280 00281 // the messages received at this phase; 00282 00283 } get_net_buffer[3]; 00284 00285 struct phase_buffer *prev_get, *cur_get, *next_get; 00286 00287 /********************************************************************/ 00288 /********* (per-phase) status of members *********/ 00289 00290 #define UNDEF -1 00291 #define WAITING 1 00292 #define FAULTY 2 00293 #define NEWMEM 4 00294 #define GOT 32 00295 #define GOTMESS GOT + MESS 00296 #define GOTCMD GOT + CMD 00297 #define GOTACK GOT + ACK 00298 #define GOTFAULT GOT + FAULT 00299 00300 /********************************************************************/ 00301 /********************************************************************/ 00302 /*****************initialisation method *****************/ 00303 00304 int init_from_file(char *) ; 00305 00306 int start_net_init(char *) ; 00307 00308 int init_from_net(char * data, int size) ; 00309 00310 void init_data(); 00311 00312 /*****************per-phase methods *****************/ 00313 00314 int timeout_handler( int ); 00315 00316 void fault_handler(); 00317 00318 void phase_update() ; 00319 00320 void phase_actions(); 00321 00322 void send_buffer_advance(); 00323 00324 void * get_first_to_send(); 00325 00326 /****** initiate a JOIN process for a new member ***********/ 00327 00328 int start_join(char *data, int size ); 00329 00330 /***** terminate a JOIN process ( send data to new member ) ***********/ 00331 00332 int new_member_init( int ident ); 00333 00334 /********************************************************************/ 00335 /****************Synchronisation objects ****************/ 00336 00337 pthread_cond_t all_received; //waiting for the reception from all members 00338 00339 pthread_mutex_t mp ; 00340 00341 /*pthread_mutex_t net_mutex; */ //to access UDP 00342 00343 pthread_mutex_t phase_mutex; //to start a new phase 00344 00345 pthread_mutex_t send_mutex; //mutual exclusion for send buffer 00346 00347 pthread_mutex_t recv_mutex; //mutual exclusion for recv buffer 00348 00349 timestruc_t to; // time struct for timeouts 00350 00351 00352 }
total_order_channel::total_order_channel (int flag, char * file, char * ipnum) |
Definition at line 27 of file total_order_channel.cpp.
00027 { 00028 00029 cout << "start total order channel" << endl; 00030 00031 /********************************************************************/ 00032 /* initialise local data */ 00033 00034 char ipadd[20]; 00035 00036 init_data(); 00037 00038 if (flag == 1) strcpy(ipadd,ipnum); 00039 00040 else my_ident = atoi(ipnum); 00041 00042 init_data(); 00043 00044 status = START; 00045 00046 /********************************************************************/ 00047 /* Case of a group creator: initialise group and membership */ 00048 00049 if (flag == 0 ) { 00050 if (init_from_file(file) ) { 00051 00052 cout << " erreur opening " << file << endl; 00053 00054 exit(0); 00055 } 00056 00057 status = RUN; 00058 } 00059 00060 00061 /********************************************************************/ 00062 /* initialise synchro objects */ 00063 00064 if( int ret = pthread_cond_init(&all_received, NULL) ) { 00065 00066 cout << " pthread_cond_init error " << "ret = " << ret << endl; 00067 } 00068 00069 pthread_mutex_init(&mp,NULL); 00070 00071 /*pthread_mutex_init(&net_mutex,NULL);*/ 00072 00073 pthread_mutex_init(&phase_mutex, NULL); 00074 00075 pthread_mutex_init(&send_mutex,NULL); 00076 00077 pthread_mutex_init(&recv_mutex, NULL); 00078 00079 pthread_mutex_unlock(&mp); 00080 00081 /*pthread_mutex_unlock(&net_mutex);*/ 00082 00083 pthread_mutex_lock(&phase_mutex); 00084 00085 pthread_mutex_unlock(&send_mutex); 00086 00087 pthread_mutex_unlock(&recv_mutex); 00088 00089 /********************************************************************/ 00090 /* initialise underlying layer */ 00091 00092 if (flag == 1) /* following members */ 00093 net = new no_prop_channel(this, ipadd, my_group); 00094 00095 else if (flag == 0) /* first members */ 00096 net = new no_prop_channel(this, my_ident , file, my_group); 00097 00098 /********************************************************************/ 00099 /* for a newcomer, join group and initialise membership */ 00100 00101 if (flag == 1) { 00102 00103 start_net_init(file); 00104 00105 status = PRERUN ; 00106 } 00107 00108 /********************************************************************/ 00109 /* start phase thread */ 00110 00111 max_iter = MAX_ITER; 00112 00113 timeout = TIMEOUT2; 00114 00115 int ret = pthread_create(&phase_thread, 0,start_phase, (void *)this); 00116 00117 if (ret) { 00118 perror(" total_order()"); printf(" erreur th create %d\n",ret); 00119 if (ret == ENOMEM) printf(" ENOMEM \n" ); 00120 if (ret== EINVAL) printf(" EINVAL \n" ); 00121 if (ret == EPERM) printf(" EPERM \n" ); 00122 exit(0); 00123 } 00124 00125 00126 }
int total_order_channel::broadcast (char * data, int size) |
Definition at line 494 of file total_order_channel.cpp.
00494 { 00495 00496 if(status != RUN) { 00497 00498 cout<< "status broadcast " << status << endl; 00499 00500 return (-2); 00501 } 00502 00503 /* cout<< "lock send " << endl; */ 00504 00505 pthread_mutex_lock(&send_mutex); 00506 00507 /* cout<< "got send " << endl; */ 00508 00509 if( total != TAILLE_MAX - 1 ) { 00510 00511 send_buff[last].data = new char[size + HEADSZ ]; 00512 00513 memcpy(send_buff[last].data + HEADSZ , data, size); 00514 00515 send_buff[last].size = size; // the size WITHOUT the header 00516 00517 total++; 00518 00519 last = (last+1) % TAILLE_MAX; 00520 00521 } else { 00522 00523 cout << " broadcast buffer plein " << endl; 00524 00525 pthread_mutex_unlock(&send_mutex); 00526 00527 return(-1); 00528 } 00529 00530 n_to_send++; 00531 00532 pthread_mutex_unlock(&send_mutex); 00533 00534 pthread_mutex_unlock(&phase_mutex); 00535 00536 return (0); 00537 }
int total_order_channel::deliver (char * data, int size) |
Definition at line 205 of file total_order_channel.cpp.
00205 { 00206 00207 int ret; 00208 00209 int sender, mess_phase, type, mess_size ; 00210 00211 int ident, faulty; 00212 00213 header *head = (header *) data; 00214 00215 sender = head->sender; 00216 00217 type = head->type; 00218 00219 mess_phase = head->num_phase; 00220 00221 mess_size = head->size; 00222 00223 if ( mess_size != size - HEADSZ) 00224 00225 cout << " ** Problem of message size , got :"<< size - HEADSZ << " expected : " << mess_size<< endl; 00226 00227 char * datapt = data + HEADSZ; 00228 00229 //DEBUGcout <<"from " << sender <<" type " << type << " phase " << curphase <<" mess_phase " << mess_phase <<endl; 00230 00231 if ( status == RUN) { 00232 00233 if(type == MESS || type == CMD || type == ACK || type == FAULT ) { 00234 00235 /* cout<< " lock recv " << endl; */ 00236 00237 pthread_mutex_lock(&recv_mutex); 00238 00239 /* cout<< " got recv " << endl; */ 00240 00241 if(mess_phase == curphase) { 00242 00243 if (cur_get->sendflag == 0) { 00244 00245 pthread_mutex_unlock(&phase_mutex); 00246 } 00247 00248 if(cur_get->status[sender]==WAITING) { 00249 00250 //DEBUG cout << " first message: from " << sender << endl; 00251 00252 cur_get->mess_numb++; 00253 00254 cur_get->status[sender]= GOT + type; 00255 00256 if ( type == MESS || type == CMD ) { 00257 00258 cur_get->mess_flag = 1; 00259 00260 cur_get->message[sender].data= new char[mess_size]; 00261 00262 memcpy(cur_get->message[sender].data, datapt, mess_size); 00263 00264 cur_get->message[sender].size = mess_size; 00265 } 00266 else if (type == FAULT ) { 00267 00268 cur_get->fault_flag = 1; 00269 00270 faulty = mess_size/ sizeof(int); 00271 00272 //DEBUGcout << " got FAULT for phase " << mess_phase 00273 //DEBUG<< " for " << faulty << " faulty" << endl; 00274 00275 for(int i=1 ; i<= faulty ; i++, datapt += 4) { 00276 00277 ident = *(int *)datapt; 00278 00279 prev_get->fault_cnt[ident]++; 00280 00281 } 00282 } 00283 } 00284 pthread_mutex_unlock(&recv_mutex); 00285 00286 /********************************************************************/ 00287 /* all messages received, signal end of phase */ 00288 00289 if(cur_get->mess_numb == cur_get->grsize - 1 ) { 00290 00291 ret = pthread_cond_signal(&all_received); 00292 00293 if (ret) { 00294 00295 cout << " error signal all_received : " << ret 00296 << " at phase " << curphase << endl; 00297 00298 if (ret== EINVAL) cout << " EINVAL " << endl; 00299 } 00300 00301 } 00302 00303 } else if (mess_phase == curphase+1) { 00304 00305 if(next_get->status[sender]== WAITING) { 00306 00307 next_get->mess_numb++; 00308 00309 next_get->status[sender]= GOT + type; 00310 00311 next_get->message[sender].data= new char[mess_size]; 00312 00313 memcpy(next_get->message[sender].data, datapt, mess_size); 00314 00315 next_get->message[sender].size = mess_size; 00316 00317 if ( type == MESS || type == CMD ) { 00318 00319 next_get->mess_flag = 1; 00320 } 00321 else if (type == FAULT ) { 00322 00323 next_get->fault_flag = 1; 00324 00325 faulty = mess_size/sizeof(int); 00326 00327 cout << "FAULT in advance for phase " << mess_phase 00328 << " for " << faulty << " faulty" << endl; 00329 00330 for(int i=1 ; i<= faulty ; i++, datapt += 4) { 00331 00332 ident = *(int *)datapt; 00333 00334 if (ident == my_ident) { 00335 00336 cout << " SABORDAGE " << endl; 00337 00338 exit(1); 00339 } 00340 00341 // cur_get->fault_cnt[ident]++; 00342 } 00343 } 00344 } 00345 00346 pthread_mutex_unlock(&recv_mutex); 00347 00348 //DEBUGcout << " message in advance " << endl; 00349 } else { 00350 00351 pthread_mutex_unlock(&recv_mutex); 00352 00353 //DEBUGcout << " message hors phase, phase " << mess_phase << endl; 00354 } 00355 00356 } else if( type == NACK ) { 00357 00358 cout << " got NACK for phase " << mess_phase << endl; 00359 00360 /* cout<< " lock recv " << endl; */ 00361 00362 pthread_mutex_lock(&recv_mutex); 00363 00364 /* cout<< " got recv " << endl; */ 00365 00366 if (mess_phase == curphase) { 00367 00368 if ( cur_get->sendflag == 0 ) { 00369 00370 pthread_mutex_unlock(&recv_mutex); 00371 00372 pthread_mutex_unlock(&phase_mutex); 00373 } 00374 00375 else { 00376 00377 if( cur_get->sendflag == MESS || cur_get->sendflag == FAULT || cur_get->sendflag == CMD) { 00378 00379 //DEBUGcout << " current re-unicast , size = " << current->size << endl; 00380 00381 /*pthread_mutex_lock(&net_mutex);*/ 00382 00383 ret = net->unicast(sender, current->data, current->size + HEADSZ ); 00384 00385 /*pthread_mutex_unlock(&net_mutex);*/ 00386 00387 pthread_mutex_unlock(&recv_mutex); 00388 00389 } else { // it was an empty ACK message 00390 00391 pthread_mutex_unlock(&recv_mutex); 00392 00393 head = &empty_message_header; 00394 00395 head->sender = my_ident; 00396 00397 head->num_phase = curphase; 00398 00399 head->type = ACK; 00400 00401 head->size = 0; 00402 00403 //DEBUGcout << " curphase empty re-unicast " << endl; 00404 00405 /*pthread_mutex_lock(&net_mutex);*/ 00406 00407 ret= net->unicast( sender, (char *) head, HEADSZ ); 00408 00409 /*pthread_mutex_unlock(&net_mutex);*/ 00410 } 00411 00412 if(ret<0) { 00413 cout << " Resend unicast error, val =" << ret << endl; 00414 } 00415 } 00416 00417 } else if (mess_phase == curphase - 1) { 00418 00419 00420 // Sends to sender the complete information 00421 00422 // should increase the timeout for the sender 00423 00424 if ( prev_get->sendflag == MESS || prev_get->sendflag == FAULT || prev_get->sendflag == CMD) { 00425 00426 //DEBUGcout << " previous re-unicast , size = " << previous->size << endl; 00427 00428 /*pthread_mutex_lock(&net_mutex);*/ 00429 00430 ret = net->unicast(sender, previous->data, previous->size + HEADSZ); 00431 00432 /*pthread_mutex_unlock(&net_mutex);*/ 00433 00434 pthread_mutex_unlock(&recv_mutex); 00435 00436 } else { 00437 00438 pthread_mutex_unlock(&recv_mutex); 00439 00440 head = &empty_message_header; 00441 00442 head->sender = my_ident; 00443 00444 head->num_phase = curphase -1; 00445 00446 head->type = ACK; 00447 00448 head->size = 0; 00449 00450 //DEBUGcout << " previous empty re-unicast " << endl; 00451 00452 /*pthread_mutex_lock(&net_mutex);*/ 00453 00454 ret= net->unicast(sender, (char *) head, HEADSZ ); 00455 00456 /*pthread_mutex_unlock(&net_mutex);*/ 00457 } 00458 if(ret<0) { 00459 cout << " Resend unicast error , val =" << ret << endl; 00460 } 00461 00462 } else if (mess_phase == curphase + 1) { 00463 00464 pthread_mutex_unlock(&recv_mutex); 00465 00466 cout << "message NACK en avance " << endl; 00467 00468 } else { 00469 00470 pthread_mutex_unlock(&recv_mutex); 00471 00472 cout << "message NACK hors phase, phase " << mess_phase << endl; 00473 } 00474 } 00475 else if ( type == NEW ) { 00476 00477 start_join( datapt, mess_size ); 00478 00479 pthread_mutex_unlock(&phase_mutex); 00480 } 00481 } 00482 else if ( status == PRERUN && type == INIT) { 00483 00484 init_from_net( datapt, mess_size ); 00485 00486 status = RUN; 00487 00488 } 00489 }
void total_order_channel::fault_handler () [private]
|
Definition at line 871 of file total_order_channel.cpp.
00871 { 00872 00873 //DEBUGcout << " TIMEOUT FAULT " << endl; 00874 00875 header *head= &fault_buf.head; 00876 00877 char *data = &fault_buf.buf[HEADSZ]; 00878 00879 fault_count = 0; 00880 00881 for(int i=0 ; i< MAX_MEMBER ; i++) { 00882 00883 if(i != my_ident && cur_get->status[i]== WAITING ) { 00884 00885 *(int *) data = i; 00886 00887 data += sizeof(int); 00888 00889 fault_count ++; 00890 00891 cur_get->fault_cnt[i]++ ; 00892 00893 //DEBUGcout << " seems lost : " << i << endl; 00894 00895 } 00896 } 00897 00898 head->sender = my_ident; 00899 00900 head->num_phase = curphase + 1; 00901 00902 head->type = FAULT; 00903 00904 head->size = sizeof(int) * fault_count ; 00905 00906 fault_message.size = head->size ; 00907 }
void * total_order_channel::get_first_to_send () [private]
|
Definition at line 1199 of file total_order_channel.cpp.
01199 { 01200 01201 /* cout<< " lock send " << endl; */ 01202 01203 pthread_mutex_lock(&send_mutex); 01204 01205 /* cout<< " got send " << endl; */ 01206 01207 void * pt = (void *)&send_buff[next]; 01208 01209 next = (next + 1 ) % TAILLE_MAX; 01210 01211 n_to_send-- ; 01212 01213 pthread_mutex_unlock(&send_mutex); 01214 01215 return(pt); 01216 }
void total_order_channel::init_data () [private]
|
Definition at line 1127 of file total_order_channel.cpp.
01127 { 01128 01129 delt=0 ; last = 0; next=0; total = 0 ; 01130 int i,j; 01131 01132 for ( i = 0; i< TAILLE_MAX ; i++) { 01133 01134 send_buff[i].data = NULL; 01135 01136 send_buff[i].size = 0; 01137 } 01138 01139 prev_get = &get_net_buffer[0] ; 01140 01141 cur_get = &get_net_buffer[1]; 01142 01143 next_get = &get_net_buffer[2]; 01144 01145 for (i = 0; i<3 ;i++ ) { 01146 01147 get_net_buffer[i].grsize =0; 01148 01149 get_net_buffer[i].sendflag = 0; 01150 01151 get_net_buffer[i].mess_numb = 0; 01152 01153 get_net_buffer[i].mess_flag = 0; 01154 01155 get_net_buffer[i].fault_flag = 0; 01156 01157 get_net_buffer[i].cmd_flag = 0; 01158 01159 for(j =0 ; j < MAX_MEMBER; j++) { 01160 01161 get_net_buffer[i].status[j] = UNDEF; 01162 01163 get_net_buffer[i].fault_cnt[j] = 0; 01164 01165 get_net_buffer[i].message[j].data = NULL; 01166 01167 get_net_buffer[i].message[j].size = 0; 01168 } 01169 } 01170 total_sent = 0; 01171 01172 n_to_send = 0; 01173 01174 n_received = 0; 01175 01176 new_flag = 0; 01177 01178 fault_count = 0; 01179 01180 fault_message.data = fault_buf.buf; 01181 01182 cmd_to_send = 0; 01183 01184 cmd_message.data = cmd_buf.buf ; 01185 01186 init_flag = 0; 01187 01188 init_message.data = init_buf.buf; 01189 01190 current = NULL; 01191 01192 my_group = new group_admin(); 01193 01194 }
int total_order_channel::init_from_file (char * file) [private]
|
Definition at line 162 of file total_order_channel.cpp.
00162 { 00163 00164 int ident, nb = 0; 00165 00166 if (my_group->init_members_list(file) ) { 00167 00168 cout << "pb init_members_list " << endl; 00169 exit(0); 00170 } 00171 00172 for(int i=0; i< MAX_MEMBER; i++) { 00173 00174 if (my_group->members_list[i].ident != UNDEF) { 00175 00176 ident = my_group->members_list[i].ident ; 00177 00178 cur_get->status[ident]= WAITING; 00179 00180 next_get->status[ident]= WAITING; 00181 00182 nb ++; 00183 } 00184 } 00185 //DEBUGcout << " total grsiz = " << nb << endl; 00186 00187 cur_get->grsize = nb; 00188 00189 next_get->grsize = nb; 00190 00191 cur_get->phase = 1 ; 00192 00193 next_get->phase = 2 ; 00194 00195 curphase = 1; 00196 00197 return(0) ; 00198 00199 }
int total_order_channel::init_from_net (char * data, int size) [private]
|
Definition at line 1377 of file total_order_channel.cpp.
01377 { 01378 01379 int ident; 01380 01381 struct member *memb ; 01382 01383 initheader * inhead ; 01384 01385 inhead = (initheader * ) data; 01386 01387 my_ident = inhead->ident ; 01388 01389 net->my_ident = my_ident ; 01390 01391 grsize = inhead->groupsize ; 01392 01393 curphase = inhead->phase; 01394 01395 //DEBUGcout << " Init from net member : *** " << my_ident << " *** " << endl; 01396 01397 //DEBUGcout << " grsize: " << grsize << " , curphase: " << curphase << endl; 01398 01399 prev_get = &get_net_buffer[(curphase +2) % 3]; 01400 01401 cur_get= &get_net_buffer[curphase % 3]; 01402 01403 next_get =&get_net_buffer[(curphase +1) % 3]; 01404 01405 memb = (member *) &data[INHEADSZ]; 01406 01407 for ( int i = 1; i<= grsize ;i ++) { 01408 01409 ident = memb->ident ; 01410 01411 my_group->add_member(memb->ident, memb->nodeid, memb->port ); 01412 01413 cur_get->status[ident]= WAITING; 01414 01415 next_get->status[ident]= WAITING; 01416 01417 memb++; 01418 01419 } 01420 01421 cur_get->grsize = grsize; 01422 01423 next_get->grsize = grsize; 01424 01425 cur_get->phase = curphase ; 01426 01427 next_get->phase = curphase + 1 ; 01428 01429 01430 }
int total_order_channel::new_member_init (int ident) [private]
|
Definition at line 1301 of file total_order_channel.cpp.
01301 { 01302 01303 int size , ret; 01304 01305 char *pt ; 01306 01307 header *head= &init_buf.head.head; 01308 01309 initheader *inhead = &init_buf.head.ihead; 01310 01311 struct member *memb ; 01312 01313 //DEBUGcout << " new_member_init : *** " << my_ident << " *** " << endl; 01314 01315 //DEBUGcout << " grsize: " << next_get->grsize << " , curphase: " << curphase << endl; 01316 01317 size = INHEADSZ + (next_get->grsize * MEMBSZ) ; 01318 01319 init_message.size = size + HEADSZ; 01320 01321 head->sender = my_ident; 01322 01323 head->num_phase = curphase + 1; 01324 01325 head->type = INIT ; 01326 01327 head->size = size; 01328 01329 inhead->ident = ident ; 01330 01331 inhead->groupsize = next_get->grsize ; 01332 01333 inhead->phase = curphase + 1; 01334 01335 pt = &init_buf.buf[HEADSZ + INHEADSZ]; 01336 01337 memb = (struct member *) pt; 01338 01339 int nb = 0; 01340 01341 for(int i=0 ; i< MAX_MEMBER; i++) { 01342 01343 if( my_group->members_list[i].ident != UNDEF ) { 01344 01345 memb->ident = my_group->members_list[i].ident; 01346 01347 memb->nodeid = my_group->members_list[i].nodeid ; 01348 01349 memb->port = my_group->members_list[i].port ; 01350 01351 memb++; 01352 01353 nb++; 01354 } 01355 } 01356 01357 if ( nb != next_get->grsize) { cout << " error groupsize " << next_get->grsize << " != " << nb ;} 01358 01359 /*pthread_mutex_lock(&net_mutex);*/ 01360 01361 ret= net->unicast(ident, init_message.data, init_message.size ); 01362 01363 /*pthread_mutex_unlock(&net_mutex);*/ 01364 01365 if(ret<0) { 01366 01367 cout << " Init unicast error, val =" << ret << endl; 01368 } 01369 01370 init_flag = 0; 01371 }
void total_order_channel::phase_actions () [private]
|
Definition at line 914 of file total_order_channel.cpp.
00914 { 00915 00916 int sz, i,k; 00917 00918 int host, port, ident; 00919 00920 cmdheader * cmdpt; 00921 00922 //DEBUGcout << " phase " << curphase << " phase action " << endl; 00923 00924 /* cout<< " lock recv " << endl; */ 00925 00926 pthread_mutex_lock(&recv_mutex); 00927 00928 /* cout<< " got recv " << endl; */ 00929 00930 /********************************************************************/ 00931 /***** suppress faulty members. This is the critical choice algorithm ****/ 00932 00933 for(int i=0 ; i < MAX_MEMBER ; i++) { 00934 00935 if (prev_get->fault_cnt[i] != 0 || cur_get->fault_cnt[i] != 0 ) { 00936 00937 cur_get->status[i] = FAULTY ; 00938 00939 if (next_get->status[i] != UNDEF ) { 00940 00941 next_get->status[i] = UNDEF ; 00942 00943 next_get->grsize--; 00944 00945 my_group->del_member(i); 00946 00947 cout << " XXXXXXXXXXX " << my_ident << " SUPPRESSION de " << i << endl; 00948 } 00949 00950 if ( prev_get->fault_cnt[i] != 0 ) 00951 00952 prev_get->status[i]= FAULTY; 00953 } 00954 00955 /********************************************************************/ 00956 /* deliver application messages */ 00957 00958 if( prev_get->status[i] == GOTMESS) { 00959 00960 sz = prev_get->message[i].size; 00961 00962 prev_get->message[i].data[sz] = '\0'; 00963 00964 cout << " XXXXXXXXXXX " << my_ident << " DELIVRE de " << i <<": "<< prev_get->message[i].data << endl; 00965 00966 //receive(prev_get->message[i].data, prev_get->message[i].size); 00967 00968 } 00969 } 00970 /********************************************************************/ 00971 /* treat join and quit command messages */ 00972 00973 for(i=0 ; i < MAX_MEMBER ; i++) { 00974 00975 if(prev_get->status[i] == GOTCMD ) { 00976 00977 cmdpt = (cmdheader *) prev_get->message[i].data; 00978 00979 if ( cmdpt->type == JOIN ) { 00980 00981 new_flag = 1; 00982 00983 host = cmdpt->nodeid; 00984 00985 port = cmdpt->port; 00986 00987 if ( next_get->grsize < MAX_MEMBER) { 00988 00989 for(k=1 ; k < MAX_MEMBER ; k++) { 00990 00991 if (next_get->status[k] == UNDEF ) { 00992 00993 ident = k; 00994 00995 next_get->status[k] = NEWMEM; 00996 00997 next_get->grsize++; 00998 00999 my_group->add_member(ident, host, port); 01000 01001 cout << " XXXXXXXXXXX " << my_ident << " AJOUT de " << ident <<endl ; 01002 01003 break; 01004 } 01005 } 01006 } else { 01007 } 01008 } 01009 else if ( cmdpt->type == QUIT ) { 01010 01011 ident = cmdpt->ident; 01012 01013 if(next_get->status[ident] != UNDEF ) { 01014 01015 next_get->status[ident] = UNDEF ; 01016 01017 next_get->grsize--; 01018 01019 my_group->del_member(ident); 01020 } 01021 } 01022 } 01023 } 01024 for(i=0 ; i < MAX_MEMBER ; i++) { 01025 01026 if(next_get->status[i] == NEWMEM) { 01027 01028 new_member_init(i); 01029 01030 next_get->status[i] = WAITING; 01031 } 01032 } 01033 pthread_mutex_unlock(&recv_mutex); 01034 }
void * total_order_channel::phase_code (void *) |
Definition at line 543 of file total_order_channel.cpp.
00543 { 00544 00545 int test, sz , iter ; 00546 00547 header *head ; 00548 00549 char *data; 00550 00551 cout << " start phase thread" << endl; 00552 00553 for (;;) { 00554 00555 pthread_mutex_lock(&phase_mutex); 00556 00557 cout << " ->> on phase " << curphase << endl; 00558 00559 do { 00560 00561 /********************************************************************/ 00562 /* There is a FAULT message to send */ 00563 00564 cout << "PHASE " << curphase << endl; 00565 00566 if ( fault_count) { 00567 00568 cur_get->sendflag = FAULT; 00569 00570 current = &fault_message; 00571 00572 //DEBUGcout << " fault broadcast at phase " << curphase << endl; 00573 00574 test = net->broadcast(fault_message.data, fault_message.size + HEADSZ); 00575 00576 if(test<0) { 00577 cout << "Fault broadcast error, test=" << test << endl; 00578 } 00579 fault_count = 0; 00580 00581 cur_get->status[my_ident]= GOTFAULT; 00582 } 00583 00584 /********************************************************************/ 00585 /* There is a CMD message to broadcast (JOIN / QUIT) */ 00586 00587 else if ( cmd_to_send ) { 00588 00589 cur_get->sendflag = CMD ; 00590 00591 current = &cmd_message ; 00592 00593 data = ¤t->data[HEADSZ]; 00594 00595 cmd_buf.head.head.num_phase = curphase; 00596 00597 //DEBUGcout << " cmd broadcast at phase " << curphase << endl; 00598 00599 test = net->broadcast(cmd_message.data, cmd_message.size + HEADSZ); 00600 00601 if(test<0) { 00602 cout << "Cmd broadcast error, test=" << test << endl; 00603 } 00604 00605 cur_get->status[my_ident]= GOTCMD; 00606 00607 cur_get->message[my_ident].data= new char[current->size]; 00608 00609 memcpy(cur_get->message[my_ident].data, data, current->size); 00610 00611 cur_get->message[my_ident].size = current->size; 00612 00613 cmd_to_send = 0; 00614 } 00615 00616 /********************************************************************/ 00617 /* There is a "standart" message to broadcast at this phase */ 00618 00619 else if ( n_to_send != 0 ) { 00620 00621 cur_get->sendflag = MESS; 00622 00623 total_sent++; 00624 00625 current = (msgtype *) get_first_to_send(); 00626 00627 data = ¤t->data[HEADSZ]; 00628 00629 head = (header *) current->data; 00630 00631 head->sender = my_ident; 00632 00633 head->num_phase = curphase; 00634 00635 head->type = MESS; 00636 00637 head->size = current->size; 00638 00639 //DEBUGcout << " std broadcast at phase " << curphase << endl; 00640 00641 /*pthread_mutex_lock(&net_mutex);*/ 00642 00643 test = net->broadcast( current->data, current->size + HEADSZ); 00644 00645 /*pthread_mutex_unlock(&net_mutex);*/ 00646 00647 if(test < 0){ 00648 cout << " std broadcast error, test = " << test << endl; 00649 } 00650 00651 cur_get->status[my_ident]= GOTMESS; 00652 00653 cur_get->message[my_ident].data= new char[current->size]; 00654 00655 memcpy(cur_get->message[my_ident].data, data, current->size); 00656 00657 cur_get->message[my_ident].size = current->size; 00658 00659 } else { 00660 00661 /********************************************************************/ 00662 /* no message to broadcast at this phase , send ACK */ 00663 00664 cur_get->sendflag = ACK; 00665 00666 current = NULL; 00667 00668 head = & empty_message_header; 00669 00670 head->sender = my_ident; 00671 00672 head->num_phase = curphase; 00673 00674 head->type = ACK; 00675 00676 head->size = 0; 00677 00678 //DEBUGcout << " empty broadcast at phase " << curphase << endl; 00679 00680 /*pthread_mutex_lock(&net_mutex);*/ 00681 00682 test = net->broadcast((char *) head, HEADSZ); 00683 00684 /*pthread_mutex_unlock(&net_mutex);*/ 00685 00686 if(test<0) { 00687 cout << " ack broadcast error, test=" << test << endl; 00688 } 00689 00690 cur_get->status[my_ident]= GOTACK; 00691 } 00692 00693 00694 /**************************************************************************/ 00695 /********************* WAITING end-of phase ********************/ 00696 /* if some message is not yet received , wait for condition, set by deliver() */ 00697 /* routine after receiving messages from all partners */ 00698 00699 if(cur_get->mess_numb != cur_get->grsize - 1 ) { 00700 00701 /* pthread_mutex_unlock(&recv_mutex); */ 00702 00703 if ( new_flag ) max_iter = NEW_MAX_ITER; 00704 00705 else max_iter = MAX_ITER ; 00706 00707 new_flag = 0; 00708 00709 to.tv_sec = time(NULL) + timeout; 00710 to.tv_nsec = 0; 00711 00712 pthread_mutex_lock(&mp); 00713 00714 int ret = pthread_cond_timedwait(&all_received, &mp, &to); 00715 00716 pthread_mutex_unlock(&mp); 00717 00718 for ( iter = 1 ; ret == ETIMEDOUT && iter <= max_iter ; iter ++) { 00719 00720 // timeout routine 00721 00722 if (timeout_handler(iter) != 0 ) { 00723 00724 /* cout << " lock recv " << endl;*/ 00725 00726 /* pthread_mutex_lock(&recv_mutex);*/ 00727 00728 /* cout << " got recv " << endl; */ 00729 00730 if(cur_get->mess_numb != cur_get->grsize - 1 ) { 00731 00732 /* pthread_mutex_unlock(&recv_mutex); */ 00733 00734 to.tv_sec = time(NULL) + timeout ; 00735 to.tv_nsec = 0; 00736 00737 pthread_mutex_lock(&mp); 00738 00739 ret = pthread_cond_timedwait(&all_received, &mp, &to); 00740 00741 pthread_mutex_unlock(&mp); 00742 } 00743 } 00744 00745 else { 00746 ret = 0; 00747 00748 break; 00749 } 00750 } 00751 00752 if (ret) { 00753 00754 if (ret == ETIMEDOUT) { 00755 00756 // some message was not received 00757 00758 fault_handler() ; 00759 } 00760 else if (ret== EINVAL) { 00761 cout << " phase erreur EINVAL cond wait iter " << iter << endl; 00762 exit(0); 00763 00764 } else { 00765 cout << " phase erreur cond wait at iter " << iter << endl; 00766 exit(0); 00767 } 00768 } 00769 } 00770 /* else pthread_mutex_unlock(&recv_mutex); */ 00771 00772 //DEBUGcout << " phase " << curphase << " emissions " << total_sent << endl ; 00773 00774 /**************************************************************************/ 00775 /********************* End-of phase ********************/ 00776 00777 /* take phase decision and actions */ 00778 00779 if (curphase != 1) phase_actions(); 00780 00781 pthread_mutex_trylock(&phase_mutex); 00782 00783 /*******************************************************************/ 00784 /* update send buffers */ 00785 00786 /* cout<< " lock send " << endl; */ 00787 00788 pthread_mutex_lock(&send_mutex); 00789 00790 /* cout<< " got send " << endl; */ 00791 00792 send_buffer_advance(); 00793 00794 pthread_mutex_unlock(&send_mutex); 00795 00796 /*******************************************************************/ 00797 /* Initialise next phase */ 00798 00799 //DEBUGcout << " reinit for next phase " << curphase + 1<< endl; 00800 00801 phase_update(); 00802 00803 //DEBUGcout << " to_send " << n_to_send << " received " << n_received << endl; 00804 00805 } while( (cmd_to_send + n_to_send + n_received + ack_flag + fault_count ) != 0); 00806 } 00807 }
void total_order_channel::phase_update () [private]
|
Definition at line 1039 of file total_order_channel.cpp.
01039 { 01040 01041 01042 /* cout<< " lock recv " << endl; */ 01043 01044 pthread_mutex_lock(&recv_mutex); 01045 01046 /* cout<< " got recv " << endl; */ 01047 01048 curphase++; 01049 01050 previous = current; 01051 01052 current = NULL; 01053 01054 ack_flag = cur_get->mess_flag + cur_get->fault_flag + cur_get->cmd_flag; 01055 01056 /********************************************************************/ 01057 /* phase buffers permutation */ 01058 01059 struct phase_buffer *tmp ; 01060 01061 tmp = prev_get; 01062 01063 prev_get = cur_get; 01064 01065 cur_get = next_get; 01066 01067 next_get= tmp; 01068 01069 /* prev_get = &get_net_buffer[(curphase +2) % 3]; 01070 01071 cur_get= &get_net_buffer[curphase % 3]; 01072 01073 next_get =&get_net_buffer[(curphase +1) % 3]; */ 01074 01075 /********************************************************************/ 01076 /* next buffer initialisation */ 01077 01078 01079 n_received = cur_get->mess_numb; 01080 01081 next_get->phase = curphase + 1; 01082 01083 next_get->grsize = cur_get->grsize; 01084 01085 next_get->sendflag = 0 ; 01086 01087 next_get->mess_numb = 0; 01088 01089 next_get->mess_flag = 0 ; 01090 01091 next_get->fault_flag = 0 ; 01092 01093 next_get->cmd_flag = 0 ; 01094 01095 for(int i=0 ; i< MAX_MEMBER; i++) { 01096 01097 //cout<< " ident " << i << " status " << cur_get->status[i] << endl; 01098 01099 if( cur_get->status[i] == UNDEF ) next_get->status[i] = UNDEF; 01100 01101 else { 01102 next_get->status[i] = WAITING; 01103 01104 //cout << "MEMBER ident : "<< i << endl; 01105 } 01106 01107 next_get-> fault_cnt[i] = 0; 01108 01109 if (next_get->message[i].data != NULL) { 01110 01111 delete(next_get->message[i].data ); 01112 01113 next_get->message[i].data = NULL; 01114 01115 } 01116 01117 next_get->message[i].size = 0; 01118 } 01119 01120 pthread_mutex_unlock(&recv_mutex); 01121 01122 }
void total_order_channel::send_buffer_advance () [private]
|
Definition at line 1219 of file total_order_channel.cpp.
01219 { 01220 01221 int ind; 01222 01223 if (total != 0) { 01224 01225 if (prev_get->sendflag == MESS) { 01226 01227 if ( cur_get->sendflag == MESS ) ind = next - 2 ; 01228 01229 else ind = next - 1; 01230 } 01231 01232 else { 01233 01234 return ; 01235 } 01236 01237 if (ind < 0 ) ind = ind + TAILLE_MAX ; 01238 01239 if (send_buff[ind].size != 0 ) { 01240 01241 delete(send_buff[ind].data) ; 01242 01243 send_buff[ind].data = NULL; 01244 01245 send_buff[ind].size = 0 ; 01246 } 01247 01248 total--; 01249 01250 //DEBUGcout << " total " << total << " next " << next << " last " << last << " ind " << ind << endl; 01251 } 01252 }
int total_order_channel::start_join (char * data, int size) [private]
|
Definition at line 1259 of file total_order_channel.cpp.
01259 { 01260 01261 01262 header *head = &cmd_buf.head.head; 01263 01264 cmdheader *cmdpt= &cmd_buf.head.chead; 01265 01266 if (cmd_to_send == 0 && init_flag == 0) { 01267 01268 head->sender = my_ident; 01269 01270 head->num_phase = -1; 01271 01272 head->type = CMD; 01273 01274 head->size = CMHEADSZ ; 01275 01276 cmdpt->type = JOIN ; 01277 01278 cmdpt->ident = 0 ; 01279 01280 cmdpt->nodeid = * (int *)data; 01281 01282 cmdpt->port = * (int *)(data + sizeof(int)); 01283 01284 cmd_message.size = head->size ; 01285 01286 cmd_to_send = 1; 01287 01288 init_flag = 1; 01289 } 01290 01291 else { 01292 01293 cout << " start_join aborted : *** " << my_ident << " *** " << endl; 01294 } 01295 01296 }
int total_order_channel::start_net_init (char * file) [private]
|
Definition at line 133 of file total_order_channel.cpp.
00133 { 00134 00135 00136 header *head= &cmd_buf.head.head; 00137 int * pt; 00138 00139 head->sender = -1; 00140 00141 head->num_phase = -1; 00142 00143 head->type = NEW; 00144 00145 head->size = 2 * sizeof(int) ; 00146 00147 pt = (int *) &cmd_buf.buf[HEADSZ]; 00148 00149 *pt = net->get_mynode(); 00150 00151 *(pt + 1) = net->get_myport(); 00152 00153 cout << "***** start_net_init, my_port = " << *(pt + 1) <<"****" << endl; 00154 00155 net->init_membership(file, cmd_message.data, head->size + HEADSZ ); 00156 00157 }
int total_order_channel::timeout_handler (int it) [private]
|
Definition at line 811 of file total_order_channel.cpp.
00811 { 00812 00813 int test; 00814 00815 //DEBUGcout << " TIMEOUT at iter " << it << endl; 00816 00817 union{ 00818 00819 header head; 00820 char buf[25]; 00821 00822 } nack_message; 00823 00824 int dest_list[MAX_MEMBER] ; 00825 00826 int dest ; 00827 00828 dest = (cur_get->grsize - cur_get->mess_numb) -1; //number of missing senders 00829 00830 if ( dest == 0 ) return( 0); 00831 00832 dest = 0; 00833 00834 //Fill nack message 00835 00836 header *head= &nack_message.head; 00837 00838 head->sender = my_ident; 00839 00840 head->num_phase = curphase; 00841 00842 head->type = NACK; 00843 00844 head->size = 0; 00845 00846 for(int id =0 ; id < MAX_MEMBER ; id++) { 00847 00848 if( id != my_ident && cur_get->status[id]== WAITING ) { 00849 00850 dest_list[dest]= id; 00851 00852 dest ++; 00853 00854 //DEBUGcout << " Nack for " << id << endl; 00855 } 00856 } 00857 if ( dest == 0 ) return( 0); 00858 00859 test= net->multicast(dest, dest_list , nack_message.buf, HEADSZ ); 00860 00861 if(test<0) { 00862 cout << "nack broadcast error test=" << test << endl; 00863 } 00864 return(1); 00865 00866 }
int total_order_channel::ack_flag [private]
|
Definition at line 234 of file total_order_channel.h.
pthread_cond_t total_order_channel::all_received [private]
|
Definition at line 337 of file total_order_channel.h.
char total_order_channel::buf[400] [private]
|
Definition at line 161 of file total_order_channel.h.
cmdheader total_order_channel::chead |
Definition at line 188 of file total_order_channel.h.
union { ... } cmd_buf [private]
|
msgtype total_order_channel::cmd_message [private]
|
Definition at line 184 of file total_order_channel.h.
int total_order_channel::cmd_to_send [private]
|
Definition at line 232 of file total_order_channel.h.
struct phase_buffer * total_order_channel::cur_get [private]
|
Definition at line 285 of file total_order_channel.h.
int total_order_channel::curphase [private]
|
Definition at line 75 of file total_order_channel.h.
msgtype * total_order_channel::current [private]
|
Definition at line 109 of file total_order_channel.h.
int total_order_channel::delt [private]
|
Definition at line 219 of file total_order_channel.h.
header total_order_channel::empty_message_header [private]
|
Definition at line 199 of file total_order_channel.h.
union { ... } fault_buf [private]
|
int total_order_channel::fault_count [private]
|
Definition at line 238 of file total_order_channel.h.
msgtype total_order_channel::fault_message [private]
|
Definition at line 204 of file total_order_channel.h.
struct total_order_channel::phase_buffer total_order_channel::get_net_buffer[3] [private]
|
int total_order_channel::grsize [private]
|
Definition at line 77 of file total_order_channel.h.
struct { ... } head [private]
|
struct { ... } head [private]
|
header total_order_channel::head |
Definition at line 158 of file total_order_channel.h.
initheader total_order_channel::ihead |
Definition at line 159 of file total_order_channel.h.
union { ... } init_buf [private]
|
int total_order_channel::init_flag [private]
|
Definition at line 240 of file total_order_channel.h.
msgtype total_order_channel::init_message [private]
|
Definition at line 155 of file total_order_channel.h.
char total_order_channel::ipadd[15] [private]
|
Definition at line 95 of file total_order_channel.h.
int total_order_channel::last [private]
|
Definition at line 221 of file total_order_channel.h.
int total_order_channel::max_iter [private]
|
Definition at line 244 of file total_order_channel.h.
pthread_mutex_t total_order_channel::mp [private]
|
Definition at line 339 of file total_order_channel.h.
group_admin * total_order_channel::my_group [private]
|
Definition at line 93 of file total_order_channel.h.
int total_order_channel::my_ident [private]
|
Definition at line 97 of file total_order_channel.h.
int total_order_channel::n_received [private]
|
Definition at line 228 of file total_order_channel.h.
int total_order_channel::n_to_send [private]
|
Definition at line 230 of file total_order_channel.h.
no_prop_channel * total_order_channel::net [private]
|
Definition at line 66 of file total_order_channel.h.
int total_order_channel::new_flag [private]
|
Definition at line 236 of file total_order_channel.h.
int total_order_channel::next [private]
|
Definition at line 220 of file total_order_channel.h.
struct phase_buffer * total_order_channel::next_get [private]
|
Definition at line 285 of file total_order_channel.h.
pthread_mutex_t total_order_channel::phase_mutex [private]
|
Definition at line 343 of file total_order_channel.h.
pthread_t total_order_channel::phase_thread [private]
|
Definition at line 68 of file total_order_channel.h.
struct phase_buffer * total_order_channel::prev_get [private]
|
Definition at line 285 of file total_order_channel.h.
msgtype * total_order_channel::previous [private]
|
Definition at line 109 of file total_order_channel.h.
pthread_mutex_t total_order_channel::recv_mutex [private]
|
Definition at line 347 of file total_order_channel.h.
msgtype total_order_channel::send_buff[TAILLE_MAX] [private]
|
Definition at line 217 of file total_order_channel.h.
pthread_mutex_t total_order_channel::send_mutex [private]
|
Definition at line 345 of file total_order_channel.h.
int total_order_channel::status [private]
|
Definition at line 73 of file total_order_channel.h.
int total_order_channel::timeout [private]
|
Definition at line 245 of file total_order_channel.h.
timestruc_t total_order_channel::to [private]
|
Definition at line 349 of file total_order_channel.h.
int total_order_channel::total [private]
|
Definition at line 222 of file total_order_channel.h.
int total_order_channel::total_sent [private]
|
Definition at line 79 of file total_order_channel.h.