00001 /**********************************************************************************************/ 00002 /*************************** TOTAL ORDER CHANNEL CLASS *************************/ 00003 /**************************** Jean FANCHON. LAAS-CNRS*************************/ 00004 /**********************************************************************************************/ 00005 00006 00007 #include "no_prop_channel.h" 00008 00009 /********************************************************************/ 00010 /********************************************************************/ 00011 00012 struct member { 00013 00014 int ident; 00015 00016 int nodeid; 00017 00018 int port; 00019 00020 }; 00021 00022 class group_admin { 00023 00024 /********************************************************************/ 00025 00026 public: 00027 group_admin() ; 00028 00029 int init_members_list(char * file) ; 00030 00031 int add_member( int , int , int ) ; 00032 00033 int del_member( int ) ; 00034 00035 int host_number(int ) ; 00036 00037 int port_number(int ); 00038 00039 int groupsize; 00040 00041 struct member members_list[MAX_MEMBER]; 00042 00043 #define MEMBSZ sizeof(struct member) 00044 00045 } ; 00046 /********************************************************************/ 00047 /********************************************************************/ 00048 00049 class total_order_channel { // : public channel, channeluser { 00050 00051 /********************************************************************/ 00052 /*interface*/ 00053 00054 public: 00055 total_order_channel(int , char *, char *); 00056 00057 int deliver ( char *, int); // from network 00058 00059 int broadcast ( char *, int ); // from application 00060 00061 void * phase_code(void *); //Phase thread code 00062 00063 00064 private: 00065 00066 no_prop_channel * net; // the no-prop channel UDP 00067 00068 pthread_t phase_thread; //Phase thread structure 00069 00070 /********************************************************************/ 00071 /*local state*/ 00072 00073 int status ; /* initialisation, running or ending */ 00074 00075 int curphase; 00076 00077 int grsize; 00078 00079 int total_sent; 00080 00081 #define START 0 // waiting for channel information 00082 #define PRERUN 1 // initialising 00083 #define RUN 2 // running 00084 /* #define SYNC 3 resynchronising */ 00085 #define ERROR 4 // unexpected (!!) situation 00086 #define STOP 5 // waiting for group close confirmation 00087 #define END 6 // after group close confirmation 00088 00089 00090 /********************************************************************/ 00091 /******************* Group data *******************/ 00092 00093 group_admin *my_group; 00094 00095 char ipadd[15]; 00096 00097 int my_ident; //local member ident 00098 00099 /********************************************************************/ 00100 /******************* messages data *******************/ 00101 00102 typedef struct { 00103 00104 char *data; 00105 int size; 00106 00107 } msgtype ; 00108 00109 msgtype *current , *previous; 00110 00111 /********************************************************************/ 00112 /******************* message header structure*******************/ 00113 00114 typedef struct { 00115 00116 int sender; //ident of the sender 00117 00118 int num_phase; //phase number 00119 00120 int type; // 00121 00122 int size; // 00123 00124 } header; 00125 00126 #define HEADSZ sizeof(header) 00127 00128 /********** Definition of message types ************************/ 00129 00130 #define NEW 1 // new member demand, (one destination) 00131 #define INIT 2 // channel parameters (ident, grsize etc ..) 00132 #define MESS 3 // application message 00133 #define CMD 4 // JOIN, QUIT 00134 #define ACK 5 // empty message 00135 #define NACK 6 // multicasted to late senders 00136 #define FAULT 7 // list of faulty idents 00137 00138 /********************************************************************/ 00139 /******************* INIT message structure*******************/ 00140 00141 typedef struct { 00142 00143 int ident; 00144 00145 int groupsize; 00146 00147 int phase; 00148 00149 } initheader; 00150 00151 #define INHEADSZ sizeof(initheader) 00152 00153 /******************* init message *******************/ 00154 00155 msgtype init_message; 00156 union { 00157 struct { 00158 header head; 00159 initheader ihead; 00160 } head; 00161 char buf[400]; 00162 00163 } init_buf ; 00164 00165 /********************************************************************/ 00166 /******************* CMD message structure ***********************/ 00167 00168 typedef struct { 00169 00170 int type ; // JOIN or QUIT 00171 00172 int ident; 00173 00174 int nodeid; 00175 00176 int port; 00177 00178 } cmdheader; 00179 00180 #define CMHEADSZ sizeof(cmdheader) 00181 00182 /******************* cmd (join/quit) message *******************/ 00183 00184 msgtype cmd_message; 00185 union { 00186 struct { 00187 header head; 00188 cmdheader chead; 00189 } head; 00190 char buf[100]; 00191 } cmd_buf ; 00192 00193 #define JOIN 1 00194 #define QUIT 2 00195 00196 /********************************************************************/ 00197 /******************* ack message *******************/ 00198 00199 header empty_message_header; 00200 00201 /********************************************************************/ 00202 /******************* fault message *******************/ 00203 00204 msgtype fault_message; 00205 union { 00206 header head; 00207 char buf[400]; 00208 00209 } fault_buf; 00210 00211 00212 /********************************************************************/ 00213 /******************* send buffer *******************/ 00214 00215 #define TAILLE_MAX 100 00216 00217 msgtype send_buff[TAILLE_MAX]; 00218 00219 int delt, // next to delete ( unused ) 00220 next, // next to send 00221 last, // input index ( last to send +1) 00222 total; 00223 00224 00225 /********************************************************************/ 00226 /************** current phase data *********************/ 00227 00228 int n_received; // number of senders of the received messages at curphase 00229 00230 int n_to_send; // number of unsent buffered messages 00231 00232 int cmd_to_send; // a join (or quit) message has to be sent 00233 00234 int ack_flag; // a message has to be acknowledged at curphase 00235 00236 int new_flag; // a new member is joining 00237 00238 int fault_count; // number of missing messages/members 00239 00240 int init_flag; // in charge of a join process 00241 00242 /*************** timeout and iteration values ( dynamic and static) *****************/ 00243 00244 int max_iter; 00245 int timeout ; 00246 00247 #define TIMEOUT1 1 00248 #define TIMEOUT2 2 00249 #define MAX_ITER 2 00250 #define NEW_MAX_ITER 2 * MAX_ITER 00251 00252 /********************************************************************/ 00253 /******** per-phase structure with input message buffers *************/ 00254 00255 struct phase_buffer { 00256 00257 int phase; 00258 00259 int sendflag; // type of message sent at curphase 00260 00261 int grsize; // number of members 00262 00263 int mess_numb; // number of senders for the received messages 00264 00265 int mess_flag ; // a MESS has been received at this phase 00266 00267 int fault_flag; // a FAULT has been received at this phase 00268 00269 int cmd_flag; // a CMD has been received at this phase 00270 00271 int status[MAX_MEMBER]; 00272 00273 //see values below 00274 00275 int fault_cnt[MAX_MEMBER]; 00276 00277 // count of FAULT messages for that member ( previous phase ) 00278 00279 msgtype message[MAX_MEMBER]; 00280 00281 // the messages received at this phase; 00282 00283 } get_net_buffer[3]; 00284 00285 struct phase_buffer *prev_get, *cur_get, *next_get; 00286 00287 /********************************************************************/ 00288 /********* (per-phase) status of members *********/ 00289 00290 #define UNDEF -1 00291 #define WAITING 1 00292 #define FAULTY 2 00293 #define NEWMEM 4 00294 #define GOT 32 00295 #define GOTMESS GOT + MESS 00296 #define GOTCMD GOT + CMD 00297 #define GOTACK GOT + ACK 00298 #define GOTFAULT GOT + FAULT 00299 00300 /********************************************************************/ 00301 /********************************************************************/ 00302 /*****************initialisation method *****************/ 00303 00304 int init_from_file(char *) ; 00305 00306 int start_net_init(char *) ; 00307 00308 int init_from_net(char * data, int size) ; 00309 00310 void init_data(); 00311 00312 /*****************per-phase methods *****************/ 00313 00314 int timeout_handler( int ); 00315 00316 void fault_handler(); 00317 00318 void phase_update() ; 00319 00320 void phase_actions(); 00321 00322 void send_buffer_advance(); 00323 00324 void * get_first_to_send(); 00325 00326 /****** initiate a JOIN process for a new member ***********/ 00327 00328 int start_join(char *data, int size ); 00329 00330 /***** terminate a JOIN process ( send data to new member ) ***********/ 00331 00332 int new_member_init( int ident ); 00333 00334 /********************************************************************/ 00335 /****************Synchronisation objects ****************/ 00336 00337 pthread_cond_t all_received; //waiting for the reception from all members 00338 00339 pthread_mutex_t mp ; 00340 00341 /*pthread_mutex_t net_mutex; */ //to access UDP 00342 00343 pthread_mutex_t phase_mutex; //to start a new phase 00344 00345 pthread_mutex_t send_mutex; //mutual exclusion for send buffer 00346 00347 pthread_mutex_t recv_mutex; //mutual exclusion for recv buffer 00348 00349 timestruc_t to; // time struct for timeouts 00350 00351 00352 }; 00353 00354 00355 00356 00357 00358 00359 00360 00361 00362 00363 00364 00365