JGRoups源码
源代码在线查看: fd_rand.java.txt
// $Id: FD_RAND.java.txt,v 1.3 2005/05/30 14:31:05 belaban Exp $ package org.jgroups.protocols; import java.util.Properties; import java.util.Vector; class FdRandHeader extends Header { static final int HEARTBEAT = 0; static final int HEARTBEAT_ACK = 1; static final int SUSPECT = 2; static final int REGULAR = 3; int type=HEARTBEAT; Object suspected_mbr=null; FdRandHeader(int type) {this.type=type;} public String toString() { switch(type) { case HEARTBEAT: return "[FD_RAND: heartbeat]"; case HEARTBEAT_ACK: return "[FD_RAND: heartbeat ack]"; case SUSPECT: return "[FD_RAND: suspect]"; case REGULAR: return "[FD_RAND: regular message]"; default: return "[FD_RAND: unknown type (" + type + ")]"; } } } /** Failure detection based on simple heartbeat protocol. Regularly polls randomly selected members for liveness. Passes SUSPECT message up the stack when a member is not reachable. The simple algorithms works as follows: the membership is known. Each HB protocol periodically sends a 'are-you-alive' message to a randomly selected member, except itself. When a response hasn't been received for n milliseconds and m tries, the corresponding member is suspected (and eventually excluded if faulty). FD_RAND starts when it detects (in a view change notification) that there are at least 2 members in the group. It stops running when the membership drops below 2. */ public class FD_RAND extends Protocol implements Runnable { boolean trace=false; Address ping_dest=null; Address local_addr=null; Thread pinger=null; long timeout=1000; // 1 second between heartbeats boolean ack_received=false; Vector members=null; int num_tries=0; final int max_tries=2; // 3 tries before suspecting Object ack_mutex=new Object(); public String getName() {return "FD_RAND";} public boolean setProperties(Properties props) {super.setProperties(props); String str; this.props=props; str=props.getProperty("trace"); if(str != null) { trace=new Boolean(str).booleanValue(); props.remove("trace"); } str=props.getProperty("timeout"); if(str != null) { timeout=new Long(str).longValue(); props.remove("timeout"); } str=props.getProperty("num_tries"); if(str != null) { num_tries=new Integer(str).intValue(); props.remove("num_tries"); if(num_tries < 1) { log.error("FD_RAND.setProperties(): propertiy 'num_tries' must be at least 1 ! " + "Setting it to 1"); num_tries=1; } } if(props.size() > 0) { log.error("FD_RAND.setProperties(): the following properties are not recognized: " + props); return false; } return true; } Address getPingDest() { Address retval=null; int r, size; if(members == null || members.size() < 2 || local_addr == null) return null; size=members.size(); while(members.size() > 1) { r=((int)(Math.random() * (size+1))) % size; retval=(Address)members.elementAt(r); if(local_addr.equals(retval)) continue; else break; } return retval; } public void up(Event evt) { Message msg; FdRandHeader hdr=null; switch(evt.getType()) { case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; case Event.MSG: msg=(Message)evt.getArg(); try { hdr=(FdRandHeader)msg.removeHeader(); } catch(Exception e) { log.error("FD_RAND.up(): " + e); } switch(hdr.type) { case FdRandHeader.HEARTBEAT: // heartbeat request; send heartbeat ack Message hb_ack=new Message(msg.getSrc(), null, null); FdRandHeader tmp_hdr=new FdRandHeader(FdRandHeader.HEARTBEAT_ACK); tmp_hdr.suspected_mbr=local_addr; hb_ack.addHeader(tmp_hdr); passDown(new Event(Event.MSG, hb_ack)); return; // don't pass up ! case FdRandHeader.HEARTBEAT_ACK: // heartbeat ack Object suspect=hdr.suspected_mbr; if(ping_dest != null && ping_dest.equals(suspect)) { synchronized(ack_mutex) { ack_received=true; ack_mutex.notify(); } } return; case FdRandHeader.SUSPECT: if(hdr.suspected_mbr != null) { System.out.println("FD_RAND: SUSPECT(" + hdr.suspected_mbr + ")"); passUp(new Event(Event.SUSPECT, hdr.suspected_mbr)); } return; default: break; } } passUp(evt); // pass up to the layer above us } public void down(Event evt) { Message msg; switch(evt.getType()) { case Event.STOP: stop(); passDown(evt); break; case Event.VIEW_CHANGE: synchronized(this) { stop(); View v=(View)evt.getArg(); members=v != null ? v.getMembers() : null; passDown(evt); start(); } break; case Event.MSG: msg=(Message)evt.getArg(); msg.addHeader(new FdRandHeader(FdRandHeader.REGULAR)); // regular message passDown(evt); break; default: passDown(evt); break; } } /** Loop while more than 1 member available. Choose a member randomly (not myself !) and send a heartbeat. Wait for ack. If ack not received withing timeout, mcast SUSPECT message. */ public void run() { Message suspect_msg, hb_req; FdRandHeader hdr; while(members.size() > 1 && pinger != null) { ack_received=false; num_tries=0; ping_dest=getPingDest(); while(!ack_received && num_tries hb_req=new Message(ping_dest, null, null); hb_req.addHeader(new FdRandHeader(FdRandHeader.HEARTBEAT)); // send heartbeat request passDown(new Event(Event.MSG, hb_req)); synchronized(ack_mutex) { // wait for heartbeat ack try {ack_mutex.wait(timeout);} catch(Exception e) {} } if(pinger == null) return; if(ack_received) { Util.sleep(timeout); break; } else { if(num_tries >= max_tries) { System.out.println("FD_RAND(" + local_addr + "): received no heartbeat ack from " + ping_dest + ", suspecting it"); hdr=new FdRandHeader(FdRandHeader.SUSPECT); hdr.suspected_mbr=ping_dest; suspect_msg=new Message(null, null, null); // mcast SUSPECT to all members suspect_msg.addHeader(hdr); passDown(new Event(Event.MSG, suspect_msg)); break; } else { num_tries++; Util.sleep(timeout); } } } } } void start() { if(pinger == null) { pinger=new Thread(this, "FD_RAND.PingerThread"); pinger.start(); } } void stop() { Thread tmp=null; num_tries=0; ack_received=false; if(pinger != null && pinger.isAlive()) { tmp=pinger; pinger=null; tmp.interrupt(); try {tmp.join(timeout);} catch(Exception ex) {} } pinger=null; } }