JGRoups源码

源代码在线查看: uniform.java.txt

软件大小: 8273 K
上传用户: pangyi
关键词: JGRoups 源码
下载地址: 免注册下载 普通下载 VIP

相关代码

				// $Id: UNIFORM.java.txt,v 1.3 2005/05/30 14:31:05 belaban Exp $								package org.jgroups.protocols;								import java.io.Serializable;				import java.util.Hashtable;				import java.util.Properties;				import java.util.Vector;																class UniformHeader implements Serializable {				    public static final int SAVE        = 0;				    public static final int SAVE_OK     = 1;				    public static final int DELIVER     = 2;				    public static final int DELIVER_OK  = 3;				    public static final int SEEN        = 4;				    public static final int SEEN_OK     = 5;				    public static final int SEEN_NOTOK  = 6;				    public static final int GC          = 7;				    public static final int GC_OK       = 8;								    public int      type=-1;				    public long     id=-1;				    public boolean  handle=true;												    String type2Str(int t) {					switch(t) {					case SAVE:        return "SAVE";					case SAVE_OK:     return "SAVE_OK";					case DELIVER:     return "DELIVER";					case DELIVER_OK:  return "DELIVER_OK";					case SEEN:        return "SEEN";					case SEEN_OK:     return "SEEN_OK";					case GC:          return "GC";					case GC_OK:       return "GC_OK";					default:          return "";					}				    }								    				    public UniformHeader() {handle=false;}				    								    public UniformHeader(int type) {					this.type=type;					id=System.currentTimeMillis();					handle=true;				    }												    public UniformHeader(int type, long id) {					this.type=type;					this.id=id;					handle=true;				    }												    public String toString() {return "[UNIFORM: type=" + type2Str(type) + ", id=" + id + "]";}				}																								/**				   The algorithms implements dynamically-uniform failure-atomic group multicast,				   that is, a message is delivered by all members if it is delivered by at least 1				   non-faulty member even if the sender crashes after sending. If the sender crashes, it				   will eventually be removed from the group membership: the FLUSH protocol preceding the				   view change causes all pending multicasts to be flushed out of the system, thereby				   re-sending pending multicasts to members that haven't received them yet.				   The protocol makes use of GroupRequest (which itself uses 				   RequestCorrelator) to send a request to all members and receive responses from				   all non-faulty members.				 */								public class UNIFORM extends Protocol implements RequestHandler, Transport {				    Vector              members=null;				    boolean             trace=false;				    RequestCorrelator   corr=new RequestCorrelator(getName(), this, this);				    Hashtable           pending=new Hashtable();    // key = sender, val = Hashtable (msg-id, msg)				    Hashtable           delivered=new Hashtable();  // key = sender, val = Hashtable (msg-id, msg)												    public String  getName() {return "UNIFORM";}												    public boolean setProperties(Properties props) {super.setProperties(props);					String     str;									this.props=props;					str=props.getProperty("trace");					if(str != null) {					    trace=new Boolean(str).booleanValue();					    props.remove("trace");					}					if(props.size() > 0) {					    log.error("UNIFORM.setProperties(): the following properties are not recognized: " + props);					    					    return false;					}					return true;				    }												    /** Just remove if you don't need to reset any state */				    public void reset() {}																				    public void up(Event evt) {					Message   msg;					boolean   rc;					Object    obj;									if(evt.getType() == Event.START) {					    corr.start();					    passUp(evt);					    return;					}					corr.receive(evt);	    				    }																				    public void down(Event evt) {					Message       msg;					GroupRequest  save_req, deliver_req, seen_req, gc_req;					AndCommand    and_comm;					Message       save_msg, deliver_msg, seen_msg, gc_msg;					Vector        mbrs=null;					long          id=0;									switch(evt.getType()) {					case Event.STOP:					    corr.Stop();					    passDown(evt);					    break;									case Event.TMP_VIEW:					case Event.VIEW_CHANGE:					    Vector tmp;					    if((tmp=(Vector)((View)evt.getArg()).getMembers()) != null)						members=tmp;					    passDown(evt);					    break;									case Event.MSG:					    msg=(Message)evt.getArg();													    if(msg.getDest() != null) { // unicast msg						passDown(evt);						return;					    }									    id=System.currentTimeMillis();					    mbrs=(Vector)members.clone();																	    /*					      1. Create 4 commands (SaveCommand, OkCommand, SeenCommand and GcCommand).					         Each has the same unique ID, and each is tagged with its type 						 (e.g. SAVE_REQ, OK_REQ etc). ID and type are contained in a UniformHeader						 attached to the message (with each command).					      2. Create an AndCommand and add the 4 commands.					      3. Add the AndCommand to a list of currently running commands and execute it.					      4. When a FLUSH request is received, wait until all commands are done.					     */									    save_msg=msg;					    save_msg.addHeader(new UniformHeader(UniformHeader.SAVE, id));					    save_req=new GroupRequest(save_msg, corr, mbrs, GroupRequest.GET_ALL);									    deliver_msg=new Message(null, null, null);					    deliver_msg.addHeader(new UniformHeader(UniformHeader.DELIVER, id));					    deliver_req=new GroupRequest(deliver_msg, corr, mbrs, GroupRequest.GET_ALL);									    seen_msg=new Message(null, null, null);					    seen_msg.addHeader(new UniformHeader(UniformHeader.SEEN, id));					    seen_req=new GroupRequest(seen_msg, corr, mbrs, GroupRequest.GET_ALL);									    gc_msg=new Message(null, null, null);					    gc_msg.addHeader(new UniformHeader(UniformHeader.GC, id));					    gc_req=new GroupRequest(gc_msg, corr, mbrs, GroupRequest.GET_ALL);									    and_comm=new AndCommand();					    and_comm.add(save_req); 					    and_comm.add(deliver_req);					    and_comm.add(seen_req);					    and_comm.add(gc_req);									    boolean rc=and_comm.execute();					    System.out.println("UNIFORM: rc from Execute is " + rc);													    break;									default:					    passDown(evt);  // Pass on to the layer below us					}								    }																				    /**				       				       1. Remove UniformHeader from message and get ID and type				       2. If type == SAVE: add message to save-table (key=sender + ID) and send response (SAVE_OK)				          If type == OK:   add message to ok-table, remove from save-table. Deliver message (pass up					  the stack) and send response (OK_OK).					  If type == SEEN: find message in ok-table. If found, send SEEN_OK response, else NOT_SEEN.					  If type == GC: delete message from ok-table.				       				     */				    public Object handle(Message msg) {					UniformHeader  hdr;					Object         obj=msg.peekHeader();					Message        m=null;									if(obj != null && obj instanceof UniformHeader) {					    hdr=(UniformHeader)msg.removeHeader();									    switch(hdr.type) {					    case UniformHeader.SAVE:										System.out.println("==> save in pending: " + msg.getSrc() + ":" + hdr.id);										saveInPending(hdr.id, msg);						return new Integer(UniformHeader.SAVE_OK);					    case UniformHeader.DELIVER:										System.out.println("==> move to delivered: " + msg.getSrc() + ":" + hdr.id);										m=moveFromPendingToDelivered(msg.getSrc(), hdr.id);						if(m != null)						    passUp(new Event(Event.MSG, m));						return new Integer(UniformHeader.DELIVER_OK);					    case UniformHeader.SEEN:												System.out.print("==> find in delivered: " + msg.getSrc() + ":" + hdr.id);										if(findInDelivered(msg.getSrc(), hdr.id)) {						    System.out.println(" SEEN_OK");						    return new Integer(UniformHeader.SEEN_OK);						}										System.out.println(" SEEN_NOTOK");						return new Integer(UniformHeader.SEEN_NOTOK);									    case UniformHeader.GC:										System.out.println("==> remove from delivered: " + msg.getSrc() + ":" + hdr.id);										removeFromDelivered(msg.getSrc(), hdr.id);						return new Integer(UniformHeader.GC_OK);					    default:						log.error("UNIFORM.handle(): UniformHeader.type " + hdr.type + " not known");						break;					    }					}									return null;				    }																    /* --------------------------- Transport interface ---------------------------------- */				    public void    send(Message msg) throws Exception {passDown(new Event(Event.MSG, msg));}				    public Object  receive(long timeout) throws Exception {return null;}				    /* ------------------------ End of Transport interface ------------------------------ */																    void saveInPending(long msg_id, Message msg) {					Object     sender=msg.getSrc();					Long       key=new Long(msg_id);					Hashtable  val=(Hashtable)pending.get(sender); // look for sender as key									if(val == null) {					    val=new Hashtable();					    pending.put(sender, val);					}					if(!val.containsKey(key))					    val.put(key, msg);				    }																    Message moveFromPendingToDelivered(Object sender, long msg_id) {					Message    msg=null;					Hashtable  val_pending, val_delivered;					Long       key=new Long(msg_id);									val_pending=(Hashtable)pending.get(sender);					if(val_pending == null) {					    log.error("UNIFORM.moveFromPendingToDelivered(): value for " +							       sender + " not found !");					    return null;					}					msg=(Message)val_pending.get(key);					if(msg == null) {					    log.error("UNIFORM.moveFromPendingToDelivered(): value for " + sender + ":" +							       key + " not found !");					    return null;					}										val_delivered=(Hashtable)delivered.get(sender);					if(val_delivered == null) {					    val_delivered=new Hashtable();					    delivered.put(sender, val_delivered);					}					if(!val_delivered.containsKey(key))					    val_delivered.put(key, msg);					val_pending.remove(key);               // remove from pending table					if(val_pending.size() == 0)					    pending.remove(sender);									return msg;				    }																				    boolean findInDelivered(Object sender, long msg_id) {					Hashtable val=(Hashtable)delivered.get(sender);					if(val == null)					    return false;					return val.containsKey(new Long(msg_id));				    }																    void removeFromDelivered(Object sender, long msg_id) {					Hashtable val=(Hashtable)delivered.get(sender);					if(val == null)					    return;					val.remove(new Long(msg_id));					if(val.size() == 0)					    delivered.remove(sender);				    }				}							

相关资源