JGroups源码

源代码在线查看: fd_rand.java.txt

软件大小: 8273 K
上传用户: pangyi
关键词: JGroups 源码
下载地址: 免注册下载 普通下载 VIP

相关代码

				// $Id: FD_RAND.java.txt,v 1.3 2005/05/30 14:31:05 belaban Exp $

package org.jgroups.protocols;

import java.util.Properties;
import java.util.Vector;


/**
 * Header that FD_RAND attaches to the messages it sends or forwards.
 * The {@link #type} field tells the receiving FD_RAND layer how to treat the
 * message; {@link #suspected_mbr} carries a member address for the
 * HEARTBEAT_ACK (address of the responder) and SUSPECT (address of the
 * suspected member) types.
 */
class FdRandHeader extends Header {
    static final int HEARTBEAT     = 0;
    static final int HEARTBEAT_ACK = 1;
    static final int SUSPECT       = 2;
    static final int REGULAR       = 3;

    int     type=HEARTBEAT;
    Object  suspected_mbr=null;

    FdRandHeader(int type) {
        this.type=type;
    }

    /** Returns a short human-readable description of the header type. */
    public String toString() {
        switch(type) {
        case HEARTBEAT:
            return "[FD_RAND: heartbeat]";
        case HEARTBEAT_ACK:
            return "[FD_RAND: heartbeat ack]";
        case SUSPECT:
            return "[FD_RAND: suspect]";
        case REGULAR:
            return "[FD_RAND: regular message]";
        default:
            return "[FD_RAND: unknown type (" + type + ")]";
        }
    }
}


/**
 * Failure detection based on a simple heartbeat protocol. Regularly polls randomly
 * selected members for liveness and passes a SUSPECT event up the stack when a member is
 * not reachable. The algorithm works as follows: the membership is known. Each
 * FD_RAND instance periodically sends an 'are-you-alive' message to a randomly selected
 * member other than itself. When no response has been received after n milliseconds and m
 * tries, the corresponding member is suspected (and eventually excluded if faulty).
 * FD_RAND starts when it detects (in a view change notification) that there are at least
 * 2 members in the group. It stops running when the membership drops below 2.
 */
public class FD_RAND extends Protocol implements Runnable {
    boolean      trace=false;
    Address      ping_dest=null;        // member currently being pinged by the pinger thread
    Address      local_addr=null;
    Thread       pinger=null;           // null means "pinger should stop"
    long         timeout=1000;          // 1 second between heartbeats
    boolean      ack_received=false;    // set by up() under ack_mutex when ping_dest answers
    Vector       members=null;
    int          num_tries=0;
    final int    max_tries=2;           // 3 tries before suspecting
    Object       ack_mutex=new Object();


    public String getName() {
        return "FD_RAND";
    }


    /**
     * Reads the properties "trace", "timeout" and "num_tries" and removes each one it
     * recognizes from <code>props</code>.
     *
     * @param props the protocol properties
     * @return <code>false</code> if unrecognized properties remain, <code>true</code> otherwise
     */
    public boolean setProperties(Properties props) {
        super.setProperties(props);
        String str;

        this.props=props;
        str=props.getProperty("trace");
        if(str != null) {
            trace=Boolean.valueOf(str).booleanValue();
            props.remove("trace");
        }

        str=props.getProperty("timeout");
        if(str != null) {
            timeout=Long.parseLong(str);
            props.remove("timeout");
        }

        // NOTE(review): run() resets num_tries to 0 for every ping target and compares it
        // against the constant max_tries, so the value configured here does not influence
        // the number of heartbeat attempts - confirm whether it was meant to set max_tries.
        str=props.getProperty("num_tries");
        if(str != null) {
            num_tries=Integer.parseInt(str);
            props.remove("num_tries");
            if(num_tries < 1) {
                log.error("FD_RAND.setProperties(): propertiy 'num_tries' must be at least 1 ! " +
                          "Setting it to 1");
                num_tries=1;
            }
        }

        if(props.size() > 0) {
            log.error("FD_RAND.setProperties(): the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }


    /**
     * Picks the next member to ping.
     *
     * @return a member chosen uniformly at random among all members that are not equal to
     *         <code>local_addr</code>, or <code>null</code> if the membership is unknown,
     *         has fewer than 2 entries, the local address is not yet set, or no member
     *         other than the local one exists
     */
    Address getPingDest() {
        Vector mbrs=members;            // work on one consistent reference
        if(mbrs == null || mbrs.size() < 2 || local_addr == null)
            return null;

        // Collect everybody except ourselves first, so that every candidate is equally
        // likely and the selection always terminates.
        Vector candidates=new Vector(mbrs.size());
        for(int i=0; i < mbrs.size(); i++) {
            Object mbr=mbrs.elementAt(i);
            if(mbr != null && !local_addr.equals(mbr))
                candidates.addElement(mbr);
        }
        if(candidates.isEmpty())
            return null;

        // Math.random() is in [0.0, 1.0), so the index is in [0, size-1]
        int r=(int)(Math.random() * candidates.size());
        return (Address)candidates.elementAt(r);
    }


    /**
     * Handles events travelling up the stack. Heartbeat requests are answered with a
     * heartbeat ack, heartbeat acks wake up the pinger thread, SUSPECT messages are turned
     * into SUSPECT events; none of these three are passed up as messages. Everything else
     * is passed up unchanged.
     */
    public void up(Event evt) {
        Message       msg;
        FdRandHeader  hdr=null;

        switch(evt.getType()) {

        case Event.SET_LOCAL_ADDRESS:
            local_addr=(Address)evt.getArg();
            break;

        case Event.MSG:
            msg=(Message)evt.getArg();

            try {
                hdr=(FdRandHeader)msg.removeHeader();
            }
            catch(Exception e) {
                log.error("FD_RAND.up(): " + e);
            }

            // No usable FD_RAND header (removal failed or returned null): nothing for us
            // to interpret, so just hand the message to the layer above.
            if(hdr == null)
                break;

            switch(hdr.type) {
            case FdRandHeader.HEARTBEAT:                         // heartbeat request; send heartbeat ack
                Message      hb_ack=new Message(msg.getSrc(), null, null);
                FdRandHeader tmp_hdr=new FdRandHeader(FdRandHeader.HEARTBEAT_ACK);
                tmp_hdr.suspected_mbr=local_addr;                // tells the pinger who answered
                hb_ack.addHeader(tmp_hdr);
                passDown(new Event(Event.MSG, hb_ack));
                return;                                          // don't pass up !
            case FdRandHeader.HEARTBEAT_ACK:                     // heartbeat ack
                Object suspect=hdr.suspected_mbr;
                // only an ack from the member we are currently pinging counts
                if(ping_dest != null && ping_dest.equals(suspect)) {
                    synchronized(ack_mutex) {
                        ack_received=true;
                        ack_mutex.notify();
                    }
                }
                return;
            case FdRandHeader.SUSPECT:
                if(hdr.suspected_mbr != null) {
                    System.out.println("FD_RAND: SUSPECT(" + hdr.suspected_mbr + ")");
                    passUp(new Event(Event.SUSPECT, hdr.suspected_mbr));
                }
                return;
            default:
                break;
            }
        }
        passUp(evt);                                        // pass up to the layer above us
    }


    /**
     * Handles events travelling down the stack. STOP stops the pinger; VIEW_CHANGE
     * restarts the pinger with the new membership; regular messages get a REGULAR
     * FD_RAND header. Every event is passed down.
     */
    public void down(Event evt) {
        Message msg;

        switch(evt.getType()) {
        case Event.STOP:
            stop();
            passDown(evt);
            break;

        case Event.VIEW_CHANGE:
            synchronized(this) {
                stop();
                View v=(View)evt.getArg();
                members=v != null ? v.getMembers() : null;
                passDown(evt);
                start();
            }
            break;

        case Event.MSG:
            msg=(Message)evt.getArg();
            msg.addHeader(new FdRandHeader(FdRandHeader.REGULAR));  // regular message
            passDown(evt);
            break;

        default:
            passDown(evt);
            break;
        }
    }


    /**
     * Pinger loop. Runs while more than 1 member is available and the pinger has not been
     * stopped. Chooses a member randomly (not myself !) and sends it a heartbeat, then waits
     * for the ack. If no ack is received within the timeout after max_tries+1 heartbeats,
     * a SUSPECT message is multicast to all members.
     */
    public void run() {
        Message       suspect_msg, hb_req;
        FdRandHeader  hdr;

        while(hasPeers() && pinger != null) {
            ack_received=false;
            num_tries=0;

            ping_dest=getPingDest();
            if(ping_dest == null) {
                // No eligible target yet (e.g. local address not set): sending to a null
                // destination would multicast the heartbeat, so wait and try again.
                Util.sleep(timeout);
                continue;
            }

            while(!ack_received && num_tries <= max_tries) {
                hb_req=new Message(ping_dest, null, null);
                hb_req.addHeader(new FdRandHeader(FdRandHeader.HEARTBEAT));  // send heartbeat request
                passDown(new Event(Event.MSG, hb_req));

                synchronized(ack_mutex) {                                    // wait for heartbeat ack
                    // the ack may already have arrived between passDown() and here
                    if(!ack_received) {
                        try {ack_mutex.wait(timeout);}
                        catch(Exception ignored) {
                            // stop() interrupts this thread to wake it up; the
                            // pinger == null check below handles that case
                        }
                    }
                }
                if(pinger == null) return;
                if(ack_received) {
                    Util.sleep(timeout);
                    break;
                }
                else {
                    if(num_tries >= max_tries) {
                        System.out.println("FD_RAND(" + local_addr + "): received no heartbeat ack from " +
                                           ping_dest + ", suspecting it");
                        hdr=new FdRandHeader(FdRandHeader.SUSPECT);
                        hdr.suspected_mbr=ping_dest;
                        suspect_msg=new Message(null, null, null);  // mcast SUSPECT to all members
                        suspect_msg.addHeader(hdr);
                        passDown(new Event(Event.MSG, suspect_msg));
                        break;
                    }
                    else {
                        num_tries++;
                        Util.sleep(timeout);
                    }
                }
            }
        }
    }


    /** Returns true if a membership is known and it contains more than one member. */
    private boolean hasPeers() {
        Vector mbrs=members;
        return mbrs != null && mbrs.size() > 1;
    }


    /** Starts the pinger thread unless one is already registered. */
    void start() {
        if(pinger == null) {
            pinger=new Thread(this, "FD_RAND.PingerThread");
            pinger.start();
        }
    }


    /**
     * Stops the pinger thread: clears the <code>pinger</code> reference (which run()
     * polls), interrupts the thread and waits up to <code>timeout</code> milliseconds
     * for it to terminate. Also resets the try counter and the ack flag.
     */
    void stop() {
        Thread tmp=null;
        num_tries=0;
        ack_received=false;
        if(pinger != null && pinger.isAlive()) {
            tmp=pinger;
            pinger=null;
            tmp.interrupt();
            try {tmp.join(timeout);}
            catch(Exception ignored) {
                // best effort: we do not insist on the thread having terminated
            }
        }
        pinger=null;
    }

}

相关资源