import java.util.LinkedList; public class AlphaSynch extends Process implements Synchronizer { int pulse = -1; int acksNeeded = 0; IntLinkedList unsafe = new IntLinkedList(); LinkedList nextPulseMsgs = new LinkedList();//msgs for next pulse boolean meSafe; MsgHandler prog; public AlphaSynch(Linker initComm) { super(initComm); } public synchronized void initialize(MsgHandler initProg) { prog = initProg; startPulse(); notifyAll(); } void startPulse(){ unsafe.addAll(comm.neighbors); meSafe = false; pulse ++; Util.println("**** new pulse ****:" + pulse); } public synchronized void handleMsg(Msg m, int src, String tag) { while (pulse < 0) myWait(); if (tag.equals("synchAck")) { acksNeeded--; if (acksNeeded == 0) notifyAll(); } else if (tag.equals("safe")) { while (!unsafe.contains(src)) myWait(); unsafe.removeObject(src); if (unsafe.isEmpty()) notifyAll(); } else { // application message sendMsg(src, "synchAck", 0); while (!unsafe.contains(src)) myWait(); if (meSafe) nextPulseMsgs.add(m); else prog.handleMsg(m, src, tag); } } public synchronized void sendMessage(int destId, String tag, int msg) { acksNeeded++; sendMsg(destId, tag, msg); } public synchronized void nextPulse() { while (acksNeeded != 0) myWait(); meSafe = true; sendToNeighbors("safe", 0); while (!unsafe.isEmpty()) myWait(); startPulse(); while (!nextPulseMsgs.isEmpty()) {//act on msgs received earlier Msg m = (Msg) nextPulseMsgs.removeFirst(); prog.handleMsg(m, m.getSrcId(), m.getTag()); } notifyAll(); } }