Building Dynamic Fail-Over Java Servers
by Chang Sau Sheong

Listing One
public class Server {
 ...
    public static void main(String[] args) throws IOException {
      // use the current cluster status
    Status status = Status.getInstance();
    Config config = Config.getInstance();
 ...
    status.setRunningServer(server_ip);
    status.setRunningPort(server_port);
        
    if (_isPrimary) {
      status.promote();
    }
      status.addServer(server_ip, server_port);
 ...
      // create server-side TCP socket to listen to port
    ServerSocket serverSocket = null;
    boolean listening = true;

    // create server Socket
    try {
      serverSocket = 
        new ServerSocket(status.getPort(status.getRunningServer()),
          _SOCKET_BUFFER, InetAddress.getByName(status.getRunningServer()));
    }
    catch (Exception e) {
      ...
      System.exit(-1);
    }       


Listing Two
// Create the synchronizer to synchronize between servers
    Synchronizer synchronizer = new Synchronizer();
    String appClassName = config.getAppClassName();
    Application app;
      try {
      app = (Application)Class.forName(appClassName).newInstance();   
    }
    ...
      // listen on sockets from incoming connections from clients   
      while (listening) {   
        synchronizer.sync();
        (new MultiServerThread(serverSocket.accept())).start();
        if (status.isPrimary()) {
          System.out.println("*** PRIMARY ***");
          app.activate();
        }
       }
// close server socket
    serverSocket.close();
 ...

Listing Three
public void run() {
    try {
      PrintWriter out = new PrintWriter(_socket.getOutputStream(), true);
    BufferedReader in = 
       new BufferedReader(new InputStreamReader(_socket.getInputStream()));
    String inputLine = new String();
    while ((inputLine = in.readLine()) != null) {
        StringTokenizer st = new StringTokenizer(inputLine, " ");
        String command = st.nextToken();
        Vector parameters = new Vector();
                
        while (st.hasMoreTokens()) {
          parameters.add(st.nextElement());
        }                               
 ...
        // request for information
        else if (command.equalsIgnoreCase("ask")) {
          if (!parameters.isEmpty()) {
          out.println(doAsk(parameters));
        }
          else {
            out.println("##Error: Try ask <parameter>");
        }               
      }             
        else {
          out.println("##Error: No such command.");
        } // end request for information
    } // end while loop


Listing Four
 ...
  /** Asking for information */
  private String doAsk(Vector parameters) {
    // get the ask command
    String command = (String)parameters.get(0);
    
    if (command.equalsIgnoreCase("primary")) {
      return status.getPrimaryServer();
    }
    else if (command.equalsIgnoreCase("isAlive")) {
      return "*";
    }
    else if (command.equalsIgnoreCase("info")) {
      return "elipva Xander Server version 1.0.";
    }
  ...

Listing Five
public class Synchronizer {
  ...
  public void sync() {
    try {
      double randNo = (new Random((Calendar.getInstance())
          .getTime().getTime())).nextDouble() * config.getWaitTimer();
      double s = (new Double((Double.toString(randNo))
          .substring(0,5))).doubleValue();
    // declare own existence
      (new DeclareSelfThread()).start(); 
        wait(s);
    // check for other servers in the cluster
      (new CheckServersThread()).start();
    wait(s);
    // if this is a primary, declare itself as primary
      if (status.isPrimary())  {
      (new DeclarePrimaryThread()).start();
      wait(s);
        }
    // check if a primary server exists
        (new CheckPrimaryThread()).start();
    wait(s);  
    // check servers in the cluster, if down, remove it
      (new PingThread()).start();
    wait(s);                    
    }
 ...

Listing Six
 ...
  public void run() {    
    Hashtable servers = status.getServers();
    if (servers != null) {  
       Writer out = null;
       Enumeration hosts = servers.keys();

       while (hosts.hasMoreElements()) {
         String host = (String)hosts.nextElement();
        try {
          _socket =  new Socket(host,((Integer)servers.get(host)).intValue());
          Synchronizer.serverCount.put(host, new Integer(0));
        }
        catch (IOException e) {
          int count = 0;
          try {
            count = ((Integer)Synchronizer.serverCount.get(host)).intValue();
          }
          catch (NullPointerException ne) {}
          count++;
          int maxRetries = config.getMaxServerRetry();
          if (count > config.getMaxServerRetry()) {
            status.removeServer(host);
            Synchronizer.serverCount.remove(host);
          }
          else {
            Synchronizer.serverCount.put(host, new Integer(count));
          }
        }
  ...

Listing Seven
 ...
    int portOut = config.getDiscoveryPort();
    byte ttl = (byte) 1; // 1 byte ttl for subnet only
    InetAddress iaOut = null;
    try {
      iaOut = InetAddress.getByName(config.getDiscoveryServer());
    }
    catch (Exception e) {
      System.err.println("### Error : error 
                       contacting multicast port. Shutting down server.");
      e.printStackTrace();
      System.exit(-1);
    }
    String clusterStatus = status.getServersAsString();
    System.out.println("status >> " + clusterStatus);
      
    byte [] dataOut = 
     (status.getRunningServer() + ":" +  status.getRunningPort()).getBytes();
    DatagramPacket dpOut = 
         new DatagramPacket(dataOut, dataOut.length, iaOut, portOut);
    try {
      MulticastSocket msOut = new MulticastSocket(portOut);
      msOut.joinGroup(iaOut);
      msOut.send(dpOut, ttl);
      msOut.leaveGroup(iaOut);
      msOut.close();
      }
    catch (SocketException e) {
      System.err.println(e);
      e.printStackTrace();
    }
    ...

Listing Eight
 ...
    MulticastSocket socket = null;
    try {
      socket = new MulticastSocket(config.getDiscoveryPort());
      InetAddress address=InetAddress.getByName(config.getDiscoveryServer());
      DatagramPacket packet;
      byte[] buf = new byte[256];
      packet = new DatagramPacket(buf, buf.length);
      socket.setSoTimeout(config.getTimeout());
      socket.joinGroup(address);
      socket.receive(packet);
      String data = new String(packet.getData());
  
      StringTokenizer st = new StringTokenizer(data, ":");
      String host = st.nextToken();
      int port = Integer.parseInt((st.nextToken()).trim());

      status.addServer(host, port);

      socket.leaveGroup(address);
      socket.close();    
    }
    catch (InterruptedIOException e) {
      System.out.println("### timed out.");    
      socket.close();  
    }
    ...

Listing Nine
 ...
try {
  msIn.receive(dpIn);
  
  String primary = (new String(dpIn.getData())).trim();
  if (status.isPrimary() &&
 !primary.equals(status.getRunningServer())) {
    System.out.println("### More than 1 primary found.Demoting this server.");
    status.demote();
  }
  Synchronizer.primaryCount = 0;
}
catch (InterruptedIOException e) {
  Synchronizer.primaryCount++;
  System.out.println("### Timing out the primary server : " + 
                                             Synchronizer.primaryCount);  
  if (Synchronizer.primaryCount > config.getMaxPrimaryRetry()) {
    status.promote();
  }
}       
    ...


Listing Ten
package xander;
/** <code>Application</code> is an interface that is used to 
 * allow Xander to call the application that it runs. The default  
 * implementation of an Application is the Scheduler.
 * last modified - 6 April 2001<br> version 1.0<br>
 * author Chang Sau Sheong
 */
public interface Application {
    public void activate();
}


Listing Eleven
public class Scheduler implements Application {
    int _PERIOD = 10; // period in seconds
    Date _start = new Date();
    public void activate() {
        Date now = new Date();
        long start_millis = _start.getTime();
        long now_millis = now.getTime();
        
        if (now_millis > (start_millis + (_PERIOD * 1000))) {
            _start = now;
            System.out.println("DING DONG!!!");     
        }
    }
}


Listing Twelve
<?xml version="1.0"?>
<xander>   
<!--  Configuration for the synchronizer -->
    <synchronizer>
        <discovery>
            <server>230.0.0.1</server>
            <port>4446</port>
        </discovery>    
        <primary>
            <server>225.0.0.2</server>
            <port>4000</port>
        </primary>
        <max_server_retry>15</max_server_retry>         
        <max_primary_retry>15</max_primary_retry>   
        <wait_timer>1.5</wait_timer>
        <timeout>5</timeout>
    </synchronizer>
   <!-- configuration for the application  -->
    <app>
        <name>Default Scheduler Application</name>
        <class_name>xander.Scheduler</class_name>
    </app>      
</xander>





6

