1 import java.util.*; import java.io.*;import java.net.Socket;
2 public class Linker implements MsgHandler {
3 public int myId;
4 Connector connector;
5 MsgHandler app = null;// upper layer
6 public boolean appFinished = false;
7 public LinkedList<Integer> neighbors = new LinkedList<Integer>();
8 public Properties prop = new Properties();
9 public Linker(String args[]) throws Exception {
10 String basename = args[0];
11 myId = Integer.parseInt(args[1]);
12 Topology.readNeighbors(myId, neighbors);
13 prop.loadFromXML(new FileInputStream("LinkerProp.xml"));
14 connector = new Connector();
15 connector.Connect(basename, myId, neighbors);
16 }
17 public void init(MsgHandler app){
18 this.app = app;
19 for (int pid : neighbors)
20 (new ListenerThread(pid, this)).start();
21 }
22 public synchronized void handleMsg(Msg m, int src, String tag) { }
23 public synchronized void executeMsg(Msg m) {
24 handleMsg(m, m.src, m.tag);
25 notifyAll();
26 if (app != null) app.executeMsg(m);
27 }
28 public void sendMsg(int destId, Object ... objects) {
29 int j = neighbors.indexOf(destId);
30 try {
31 LinkedList<Object> objectList = Util.getLinkedList(objects);
32 ObjectOutputStream os = connector.dataOut[j];
33 os.writeObject(Integer.valueOf(objectList.size()));
34 for (Object object : objectList)
35 os.writeObject(object);
36 os.flush();
37 } catch (IOException e) {System.out.println(e);close(); }
38 }
39 public Msg receiveMsg(int fromId) {
40 int i = neighbors.indexOf(fromId);
41 try {
42
43 ObjectInputStream oi = connector.dataIn[i];
44 int numItems = ((Integer) oi.readObject()).intValue();
45 LinkedList<Object> recvdMessage = new LinkedList<Object>();
46 for (int j = 0; j < numItems; j++)
47 recvdMessage.add(oi.readObject());
48 String tag = (String) recvdMessage.removeFirst();
49 return new Msg(fromId, myId, tag, recvdMessage);
50 } catch (Exception e) {
51 System.out.println(e);
52 close();
53 return null;
54 }
55
56 }
57 public synchronized int getMyId() { return myId; }
58 public Properties getProp() { return prop;}
59 //public int getNumProc() { return N; }
60 public LinkedList<Integer> getNeighbors() { return neighbors; }
61 public void close() { appFinished = true; connector.closeSockets(); }
62 public void turnPassive() {}
63 }