import java.util.*; import java.net.*; import java.io.*; public class CausalLinker extends Linker { int M[][]; LinkedList deliveryQ = new LinkedList(); // deliverable messages LinkedList pendingQ = new LinkedList(); // messages with matrix public CausalLinker(String basename, int id, int numProc) throws Exception { super(basename, id, numProc); M = new int[N][N]; Matrix.setZero(M); } public synchronized void sendMsg(int destId, String tag, String msg){ M[myId][destId]++; super.sendMsg(destId, "matrix", Matrix.write(M)); super.sendMsg(destId, tag, msg); } public synchronized void multicast(IntLinkedList destIds, String tag, String msg) { for (int i=0; i<destIds.size(); i++) M[myId][destIds.getEntry(i)]++; for (int i=0; i<destIds.size(); i++) { int destId = destIds.getEntry(i); super.sendMsg(destId, "matrix", Matrix.write(M)); super.sendMsg(destId, tag, msg); } } boolean okayToRecv(int W[][], int srcId) { if (W[srcId][myId] > M[srcId][myId]+1) return false; for (int k = 0; k < N; k++) if ((k!=srcId) && (W[k][myId] > M[k][myId])) return false; return true; } synchronized void checkPendingQ() { ListIterator iter = pendingQ.listIterator(0); while (iter.hasNext()) { CausalMessage cm = (CausalMessage) iter.next(); if (okayToRecv(cm.getMatrix(), cm.getMessage().getSrcId())){ iter.remove(); deliveryQ.add(cm); } } } // polls the channel given by fromId to add to the pendingQ public Msg receiveMsg(int fromId) throws IOException { checkPendingQ(); while (deliveryQ.isEmpty()) { Msg matrix = super.receiveMsg(fromId);// matrix int [][]W = new int[N][N]; Matrix.read(matrix.getMessage(), W); Msg m1 = super.receiveMsg(fromId);//app message pendingQ.add(new CausalMessage(m1, N, W)); checkPendingQ(); } CausalMessage cm = (CausalMessage) deliveryQ.removeFirst(); Matrix.setMax(M, cm.getMatrix()); return cm.getMessage(); } }