// $Id: UNIFORM.java.txt,v 1.3 2005/05/30 14:31:05 belaban Exp $ package org.jgroups.protocols; import java.io.Serializable; import java.util.Hashtable; import java.util.Properties; import java.util.Vector; class UniformHeader implements Serializable { public static final int SAVE = 0; public static final int SAVE_OK = 1; public static final int DELIVER = 2; public static final int DELIVER_OK = 3; public static final int SEEN = 4; public static final int SEEN_OK = 5; public static final int SEEN_NOTOK = 6; public static final int GC = 7; public static final int GC_OK = 8; public int type=-1; public long id=-1; public boolean handle=true; String type2Str(int t) { switch(t) { case SAVE: return "SAVE"; case SAVE_OK: return "SAVE_OK"; case DELIVER: return "DELIVER"; case DELIVER_OK: return "DELIVER_OK"; case SEEN: return "SEEN"; case SEEN_OK: return "SEEN_OK"; case GC: return "GC"; case GC_OK: return "GC_OK"; default: return ""; } } public UniformHeader() {handle=false;} public UniformHeader(int type) { this.type=type; id=System.currentTimeMillis(); handle=true; } public UniformHeader(int type, long id) { this.type=type; this.id=id; handle=true; } public String toString() {return "[UNIFORM: type=" + type2Str(type) + ", id=" + id + "]";} } /** The algorithms implements dynamically-uniform failure-atomic group multicast, that is, a message is delivered by all members if it is delivered by at least 1 non-faulty member even if the sender crashes after sending. If the sender crashes, it will eventually be removed from the group membership: the FLUSH protocol preceding the view change causes all pending multicasts to be flushed out of the system, thereby re-sending pending multicasts to members that haven't received them yet. The protocol makes use of GroupRequest (which itself uses RequestCorrelator) to send a request to all members and receive responses from all non-faulty members. */ public class UNIFORM extends Protocol implements RequestHandler, Transport { Vector members=null; boolean trace=false; RequestCorrelator corr=new RequestCorrelator(getName(), this, this); Hashtable pending=new Hashtable(); // key = sender, val = Hashtable (msg-id, msg) Hashtable delivered=new Hashtable(); // key = sender, val = Hashtable (msg-id, msg) public String getName() {return "UNIFORM";} public boolean setProperties(Properties props) {super.setProperties(props); String str; this.props=props; str=props.getProperty("trace"); if(str != null) { trace=new Boolean(str).booleanValue(); props.remove("trace"); } if(props.size() > 0) { log.error("UNIFORM.setProperties(): the following properties are not recognized: " + props); return false; } return true; } /** Just remove if you don't need to reset any state */ public void reset() {} public void up(Event evt) { Message msg; boolean rc; Object obj; if(evt.getType() == Event.START) { corr.start(); passUp(evt); return; } corr.receive(evt); } public void down(Event evt) { Message msg; GroupRequest save_req, deliver_req, seen_req, gc_req; AndCommand and_comm; Message save_msg, deliver_msg, seen_msg, gc_msg; Vector mbrs=null; long id=0; switch(evt.getType()) { case Event.STOP: corr.Stop(); passDown(evt); break; case Event.TMP_VIEW: case Event.VIEW_CHANGE: Vector tmp; if((tmp=(Vector)((View)evt.getArg()).getMembers()) != null) members=tmp; passDown(evt); break; case Event.MSG: msg=(Message)evt.getArg(); if(msg.getDest() != null) { // unicast msg passDown(evt); return; } id=System.currentTimeMillis(); mbrs=(Vector)members.clone(); /* 1. Create 4 commands (SaveCommand, OkCommand, SeenCommand and GcCommand). Each has the same unique ID, and each is tagged with its type (e.g. SAVE_REQ, OK_REQ etc). ID and type are contained in a UniformHeader attached to the message (with each command). 2. Create an AndCommand and add the 4 commands. 3. Add the AndCommand to a list of currently running commands and execute it. 4. When a FLUSH request is received, wait until all commands are done. */ save_msg=msg; save_msg.addHeader(new UniformHeader(UniformHeader.SAVE, id)); save_req=new GroupRequest(save_msg, corr, mbrs, GroupRequest.GET_ALL); deliver_msg=new Message(null, null, null); deliver_msg.addHeader(new UniformHeader(UniformHeader.DELIVER, id)); deliver_req=new GroupRequest(deliver_msg, corr, mbrs, GroupRequest.GET_ALL); seen_msg=new Message(null, null, null); seen_msg.addHeader(new UniformHeader(UniformHeader.SEEN, id)); seen_req=new GroupRequest(seen_msg, corr, mbrs, GroupRequest.GET_ALL); gc_msg=new Message(null, null, null); gc_msg.addHeader(new UniformHeader(UniformHeader.GC, id)); gc_req=new GroupRequest(gc_msg, corr, mbrs, GroupRequest.GET_ALL); and_comm=new AndCommand(); and_comm.add(save_req); and_comm.add(deliver_req); and_comm.add(seen_req); and_comm.add(gc_req); boolean rc=and_comm.execute(); System.out.println("UNIFORM: rc from Execute is " + rc); break; default: passDown(evt); // Pass on to the layer below us } } /** 1. Remove UniformHeader from message and get ID and type 2. If type == SAVE: add message to save-table (key=sender + ID) and send response (SAVE_OK) If type == OK: add message to ok-table, remove from save-table. Deliver message (pass up the stack) and send response (OK_OK). If type == SEEN: find message in ok-table. If found, send SEEN_OK response, else NOT_SEEN. If type == GC: delete message from ok-table. */ public Object handle(Message msg) { UniformHeader hdr; Object obj=msg.peekHeader(); Message m=null; if(obj != null && obj instanceof UniformHeader) { hdr=(UniformHeader)msg.removeHeader(); switch(hdr.type) { case UniformHeader.SAVE: System.out.println("==> save in pending: " + msg.getSrc() + ":" + hdr.id); saveInPending(hdr.id, msg); return new Integer(UniformHeader.SAVE_OK); case UniformHeader.DELIVER: System.out.println("==> move to delivered: " + msg.getSrc() + ":" + hdr.id); m=moveFromPendingToDelivered(msg.getSrc(), hdr.id); if(m != null) passUp(new Event(Event.MSG, m)); return new Integer(UniformHeader.DELIVER_OK); case UniformHeader.SEEN: System.out.print("==> find in delivered: " + msg.getSrc() + ":" + hdr.id); if(findInDelivered(msg.getSrc(), hdr.id)) { System.out.println(" SEEN_OK"); return new Integer(UniformHeader.SEEN_OK); } System.out.println(" SEEN_NOTOK"); return new Integer(UniformHeader.SEEN_NOTOK); case UniformHeader.GC: System.out.println("==> remove from delivered: " + msg.getSrc() + ":" + hdr.id); removeFromDelivered(msg.getSrc(), hdr.id); return new Integer(UniformHeader.GC_OK); default: log.error("UNIFORM.handle(): UniformHeader.type " + hdr.type + " not known"); break; } } return null; } /* --------------------------- Transport interface ---------------------------------- */ public void send(Message msg) throws Exception {passDown(new Event(Event.MSG, msg));} public Object receive(long timeout) throws Exception {return null;} /* ------------------------ End of Transport interface ------------------------------ */ void saveInPending(long msg_id, Message msg) { Object sender=msg.getSrc(); Long key=new Long(msg_id); Hashtable val=(Hashtable)pending.get(sender); // look for sender as key if(val == null) { val=new Hashtable(); pending.put(sender, val); } if(!val.containsKey(key)) val.put(key, msg); } Message moveFromPendingToDelivered(Object sender, long msg_id) { Message msg=null; Hashtable val_pending, val_delivered; Long key=new Long(msg_id); val_pending=(Hashtable)pending.get(sender); if(val_pending == null) { log.error("UNIFORM.moveFromPendingToDelivered(): value for " + sender + " not found !"); return null; } msg=(Message)val_pending.get(key); if(msg == null) { log.error("UNIFORM.moveFromPendingToDelivered(): value for " + sender + ":" + key + " not found !"); return null; } val_delivered=(Hashtable)delivered.get(sender); if(val_delivered == null) { val_delivered=new Hashtable(); delivered.put(sender, val_delivered); } if(!val_delivered.containsKey(key)) val_delivered.put(key, msg); val_pending.remove(key); // remove from pending table if(val_pending.size() == 0) pending.remove(sender); return msg; } boolean findInDelivered(Object sender, long msg_id) { Hashtable val=(Hashtable)delivered.get(sender); if(val == null) return false; return val.containsKey(new Long(msg_id)); } void removeFromDelivered(Object sender, long msg_id) { Hashtable val=(Hashtable)delivered.get(sender); if(val == null) return; val.remove(new Long(msg_id)); if(val.size() == 0) delivered.remove(sender); } }