1 import java.util.*; import java.io.*;import java.net.Socket; 2 public class Linker implements MsgHandler { 3 public int myId; 4 Connector connector; 5 MsgHandler app = null;// upper layer 6 public boolean appFinished = false; 7 public LinkedList<Integer> neighbors = new LinkedList<Integer>(); 8 public Properties prop = new Properties(); 9 public Linker(String args[]) throws Exception { 10 String basename = args[0]; 11 myId = Integer.parseInt(args[1]); 12 Topology.readNeighbors(myId, neighbors); 13 prop.loadFromXML(new FileInputStream("LinkerProp.xml")); 14 connector = new Connector(); 15 connector.Connect(basename, myId, neighbors); 16 } 17 public void init(MsgHandler app){ 18 this.app = app; 19 for (int pid : neighbors) 20 (new ListenerThread(pid, this)).start(); 21 } 22 public synchronized void handleMsg(Msg m, int src, String tag) { } 23 public synchronized void executeMsg(Msg m) { 24 handleMsg(m, m.src, m.tag); 25 notifyAll(); 26 if (app != null) app.executeMsg(m); 27 } 28 public void sendMsg(int destId, Object ... objects) { 29 int j = neighbors.indexOf(destId); 30 try { 31 LinkedList<Object> objectList = Util.getLinkedList(objects); 32 ObjectOutputStream os = connector.dataOut[j]; 33 os.writeObject(Integer.valueOf(objectList.size())); 34 for (Object object : objectList) 35 os.writeObject(object); 36 os.flush(); 37 } catch (IOException e) {System.out.println(e);close(); } 38 } 39 public Msg receiveMsg(int fromId) { 40 int i = neighbors.indexOf(fromId); 41 try { 42 43 ObjectInputStream oi = connector.dataIn[i]; 44 int numItems = ((Integer) oi.readObject()).intValue(); 45 LinkedList<Object> recvdMessage = new LinkedList<Object>(); 46 for (int j = 0; j < numItems; j++) 47 recvdMessage.add(oi.readObject()); 48 String tag = (String) recvdMessage.removeFirst(); 49 return new Msg(fromId, myId, tag, recvdMessage); 50 } catch (Exception e) { 51 System.out.println(e); 52 close(); 53 return null; 54 } 55 56 } 57 public synchronized int getMyId() { return myId; } 58 public Properties getProp() { return prop;} 59 //public int getNumProc() { return N; } 60 public LinkedList<Integer> getNeighbors() { return neighbors; } 61 public void close() { appFinished = true; connector.closeSockets(); } 62 public void turnPassive() {} 63 }