import java.util.LinkedList;
public class AlphaSynch extends Process implements Synchronizer {
int pulse = -1;
int acksNeeded = 0;
IntLinkedList unsafe = new IntLinkedList();
LinkedList nextPulseMsgs = new LinkedList();//msgs for next pulse
boolean meSafe;
MsgHandler prog;
public AlphaSynch(Linker initComm) {
super(initComm);
}
public synchronized void initialize(MsgHandler initProg) {
prog = initProg;
startPulse();
notifyAll();
}
void startPulse(){
unsafe.addAll(comm.neighbors);
meSafe = false;
pulse ++;
Util.println("**** new pulse ****:" + pulse);
}
public synchronized void handleMsg(Msg m, int src, String tag) {
while (pulse < 0) myWait();
if (tag.equals("synchAck")) {
acksNeeded--;
if (acksNeeded == 0) notifyAll();
} else if (tag.equals("safe")) {
while (!unsafe.contains(src)) myWait();
unsafe.removeObject(src);
if (unsafe.isEmpty()) notifyAll();
} else { // application message
sendMsg(src, "synchAck", 0);
while (!unsafe.contains(src)) myWait();
if (meSafe) nextPulseMsgs.add(m);
else prog.handleMsg(m, src, tag);
}
}
public synchronized void sendMessage(int destId, String tag, int msg) {
acksNeeded++;
sendMsg(destId, tag, msg);
}
public synchronized void nextPulse() {
while (acksNeeded != 0) myWait();
meSafe = true;
sendToNeighbors("safe", 0);
while (!unsafe.isEmpty()) myWait();
startPulse();
while (!nextPulseMsgs.isEmpty()) {//act on msgs received earlier
Msg m = (Msg) nextPulseMsgs.removeFirst();
prog.handleMsg(m, m.getSrcId(), m.getTag());
}
notifyAll();
}
}