1 import java.util.*; import java.io.*;import java.net.Socket;
 2 public class Linker implements MsgHandler {
 3         public int myId;
 4         Connector connector;
 5         MsgHandler app = null;// upper layer
 6         public boolean appFinished = false;
 7         public LinkedList<Integer> neighbors = new LinkedList<Integer>();
 8         public Properties prop = new Properties();
 9         public Linker(String args[]) throws Exception {
10                 String basename = args[0];
11                 myId = Integer.parseInt(args[1]);
12                 Topology.readNeighbors(myId, neighbors);
13                 prop.loadFromXML(new FileInputStream("LinkerProp.xml"));
14                 connector = new Connector();
15                 connector.Connect(basename, myId, neighbors);
16         }
17         public void init(MsgHandler app){
18                 this.app = app;
19                 for (int pid : neighbors)
20                         (new ListenerThread(pid, this)).start();
21         }
22         public synchronized void handleMsg(Msg m, int src, String tag) { }
23         public synchronized void executeMsg(Msg m) {
24                 handleMsg(m, m.src, m.tag);
25                 notifyAll();
26                 if (app != null) app.executeMsg(m);
27         }
28         public void sendMsg(int destId, Object ... objects) {
29                 int j = neighbors.indexOf(destId);
30                 try {
31                         LinkedList<Object> objectList = Util.getLinkedList(objects);
32                         ObjectOutputStream os = connector.dataOut[j];
33                         os.writeObject(Integer.valueOf(objectList.size()));
34                         for (Object object : objectList)
35                                 os.writeObject(object);
36                         os.flush();
37                 } catch (IOException e) {System.out.println(e);close();       }
38         }
39         public Msg receiveMsg(int fromId) {
40                 int i = neighbors.indexOf(fromId);
41                 try {
42 
43                         ObjectInputStream oi = connector.dataIn[i];
44                         int numItems = ((Integer) oi.readObject()).intValue();
45                         LinkedList<Object> recvdMessage = new LinkedList<Object>();
46                         for (int j = 0; j < numItems; j++)
47                                 recvdMessage.add(oi.readObject());
48                         String tag = (String) recvdMessage.removeFirst();
49                         return new Msg(fromId, myId, tag, recvdMessage);
50                 } catch (Exception e) {
51                         System.out.println(e);
52                         close();
53                         return null;
54                 }
55 
56         }
57         public synchronized int getMyId() { return myId; }
58         public Properties getProp() { return prop;}
59         //public int getNumProc() { return N; }
60         public LinkedList<Integer> getNeighbors() { return neighbors; }
61         public void close() { appFinished = true; connector.closeSockets(); }
62         public void turnPassive() {}
63 }