High-Performance IO with Java NIO

by Brian Pontarelli



Listing One



/**
 * Thread that fetches a fixed URL with blocking stream IO and stores the
 * response text so another thread can pick it up through
 * {@link #getResult()}.
 */
public class IOThread extends Thread {

    /** Text read from the remote host; stays null until run() stores a value. */
    private String result;

    /**
     * Returns the text stored by {@link #run()}.
     *
     * @return the response text, or null if run() has not stored a result
     *         (not finished yet, or the read failed)
     */
    public synchronized String getResult() {
        return result;
    }

    /**
     * Connects to the remote host, reads the whole response and publishes it.
     * An IOException is reported on System.err and leaves the result unset.
     */
    public void run() {
        try {
            // Connect to the remote host and read in the data
            URL url = new URL("http://example.orbitz.com");
            URLConnection connection = url.openConnection();

            String localResult = doRead(connection);

            // Publish under the same monitor that getResult() uses
            synchronized (this) {
                result = localResult;
            }
        } catch (IOException ioe) {
            System.err.println(ioe.toString());
        }
    }

    /**
     * Reads everything available from the connection's input stream.
     *
     * @param connection the connection to read from
     * @return the complete text that was read
     * @throws IOException if opening or reading the stream fails
     */
    private String doRead(URLConnection connection) throws IOException {
        StringBuffer buf = new StringBuffer();
        InputStream is = connection.getInputStream();
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(is));
            char[] cbuf = new char[4096];
            int n;
            while ((n = br.read(cbuf)) >= 0) {
                if (n > 0) {
                    buf.append(cbuf, 0, n);
                }
            }
        } finally {
            // Always release the stream, even when the read fails. A failure
            // to close is only reported so it cannot discard data already read.
            try {
                is.close();
            } catch (IOException closeFailure) {
                System.err.println("Unable to close stream: " + closeFailure);
            }
        }
        return buf.toString();
    }
}





Listing Two



/**
 * Blocking-IO client that starts a fresh IOThread for each call.
 */
public class OldIO {

    /**
     * Starts an IOThread, waits up to 15 seconds for it to finish and returns
     * whatever result the thread has stored by then.
     *
     * @return the value of IOThread.getResult() after the wait, which is null
     *         when the thread has not stored a result
     */
    public String execute() {
        IOThread thread = new IOThread();
        thread.start();

        try {
            thread.join(15000); // Wait for 15 seconds
        } catch (InterruptedException ie) {
            // Stop waiting, but keep the interrupt visible to the caller
            // instead of silently discarding it.
            Thread.currentThread().interrupt();
        }

        return thread.getResult();
    }
}





Listing Three



/**
 * Blocking-IO client that borrows its IOThread from a shared pool instead of
 * creating a new thread for every call.
 */
public class PoolOldIO {

    /** Pool shared by every PoolOldIO instance. */
    private static ThreadPool pool = new ThreadPool();

    /**
     * Checks a thread out of the pool, runs it with a 15000 ms argument
     * (presumably the wait limit -- confirm against IOThread.execute) and
     * returns the thread's result. The thread is always checked back in.
     *
     * @return the value of getResult() on the borrowed thread
     */
    public String execute() {
        IOThread thread = pool.checkOut();
        String result;
        try {
            try {
                thread.execute(15000); // Wait for 15 seconds
            } catch (InterruptedException ie) {
                // Okay to be interrupted; keep the interrupt status set so
                // the caller can still observe it.
                Thread.currentThread().interrupt();
            }
            result = thread.getResult();
        } finally {
            // Must always check-in the thread, even if execute() or
            // getResult() throws.
            pool.checkIn(thread);
        }
        return result;
    }
}





Listing Four



/**
 * Worker thread that owns a single Selector. The rest of the class body is
 * elided in this listing (the "..." below).
 */
public class NIOWorker extends Thread {

    /** Selector opened once in the constructor. */
    private Selector selector;

    /**
     * Creates the worker and opens its selector.
     *
     * @throws IOException if Selector.open() fails
     */
    public NIOWorker () throws IOException {

        selector = Selector.open();

    }

    ...

}





Listing Five



/**
 * Plain holder for one unit of work. It carries the URL to fetch, the
 * outgoing request bytes and the resulting text.
 */
public static class Work {

    /** Input: the URL to fetch. */
    public URL in;

    /** Request bytes that are written to the channel. */
    public ByteBuffer httpRequest;

    /** Output: the parsed response; left null unless the work succeeds. */
    public String out;
}





Listing Six



/**
 * Submits a fetch of the given URL and blocks the calling thread until the
 * work is finished or the timeout elapses.
 *
 * @param url     the URL to fetch
 * @param timeout maximum time to wait, in milliseconds; passed unchanged to
 *                Object.wait(long) and to add()
 * @return the value of work.out after waiting, or null if the work could not
 *         be added
 */
public String doWork(URL url, long timeout) {
    Work work = new Work();
    work.in = url;
    String result = null;

    // The monitor is taken before the work is handed over. finished() must
    // acquire the same monitor to notify, so the notification cannot be
    // delivered before this thread is actually waiting.
    synchronized (work) {
        work.httpRequest = buildHTTPRequest(work);

        // If the work was added successfully, call wait on the work object
        // which forces the calling thread to wait (i.e. the execute thread)
        if (add(work, timeout)) {
            try {
                work.wait(timeout);
            } catch (InterruptedException e) {
                System.err.println("NIO operation interrupted");
                // Keep the interrupt visible to the caller rather than
                // consuming it here.
                Thread.currentThread().interrupt();
            }
            result = work.out;
        }
    }
    return result;
}

/**
 * Opens a non-blocking channel to the host of work.in and registers it with
 * the selector, attaching a new WorkState for the work.
 *
 * @param work    the work to register; work.in supplies host and port
 * @param timeout timeout in milliseconds stored in the attached WorkState
 * @return true if the channel was registered; false if the host could not be
 *         resolved or channel creation/registration failed
 */
protected boolean add(Work work, long timeout) {
    SocketChannel channel = null;
    try {
        URL url = work.in;

        // Honour an explicit port in the URL; getPort() returns -1 when the
        // URL has none, in which case the previous default of 80 is kept.
        int port = url.getPort();
        if (port == -1) {
            port = 80;
        }
        InetSocketAddress addr = new InetSocketAddress(url.getHost(), port);

        // connect() throws the unchecked UnresolvedAddressException for an
        // unresolved address, which would bypass the IOException handler
        // below and leak the channel. Fail before opening anything.
        if (addr.isUnresolved()) {
            System.err.println("Channel creation or registration failed: "
                + "unable to resolve host " + url.getHost());
            return false;
        }

        channel = SocketChannel.open();
        channel.configureBlocking(false);
        boolean connected = channel.connect(addr);

        WorkState state = new WorkState(System.currentTimeMillis(), timeout, work);

        // A non-blocking connect may complete immediately (e.g. a local
        // connection). The channel is then already connected, so go straight
        // to the write phase instead of waiting for a connect event.
        int ops = connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT;
        channel.register(selector, ops, state);
    } catch (IOException ioe) {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException ioe2) {
                System.err.println("Unable to close channel: " + ioe2);
            }
        }
        System.err.println("Channel creation or registration failed: " + ioe);
        return false;
    }
    return true;
}





Listing Seven



/**
 * Per-channel bookkeeping that is attached to a selection key: the Work being
 * served, when it started, how long it may take, the text received so far and
 * whether it completed successfully.
 */
public static class WorkState {

    /** Start time in milliseconds, as supplied to the constructor. */
    public final long start;

    /** Allowed duration in milliseconds. */
    public final long timeout;

    /** The work this state belongs to. */
    public final Work work;

    /** Accumulates the text received so far. */
    public final StringBuffer buffer = new StringBuffer();

    /** Starts out false; set by code outside this class. */
    public boolean success = false;

    /**
     * @param start   start time in milliseconds
     * @param timeout allowed duration in milliseconds
     * @param work    the work being tracked
     */
    public WorkState(long start, long timeout, Work work) {
        this.work = work;
        this.timeout = timeout;
        this.start = start;
    }

    /**
     * @return true once at least {@code timeout} milliseconds have passed
     *         since {@code start}
     */
    public boolean isTimedOut() {
        long elapsed = System.currentTimeMillis() - start;
        return elapsed >= timeout;
    }
}





Listing Eight



/**
 * Selector loop. Polls the selector without blocking, processes any keys
 * that are ready and otherwise yields the processor. The loop never exits;
 * a failed select is reported on System.err and polling continues.
 */
public void run() {
    while (true) {
        try {
            int num = selector.selectNow();
            if (num > 0) {
                processKeys();
            } else {
                // Nothing ready: give other threads a chance to run
                Thread.yield();
            }
        } catch (IOException ioe) {
            System.err.println("Unable to select: " + ioe.toString());
        }
        // No InterruptedException handler: nothing in the try body can throw
        // that checked exception, so catching it does not compile.
    }
}





Listing Nine



/**
 * Handles every key in the selector's selected-key set. Each key is removed
 * from the set and then advanced one step: timed-out work is finished,
 * a completed connect switches the key to writing, a completed write switches
 * it to reading, and a completed read finishes the work. An IOException
 * during any step is reported and also finishes the work.
 */
protected void processKeys() {
    Iterator ready = selector.selectedKeys().iterator();
    while (ready.hasNext()) {
        SelectionKey key = (SelectionKey) ready.next();
        // Take the key out of the selected set before handling it
        ready.remove();

        WorkState state = (WorkState) key.attachment();
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (state.isTimedOut()) {
                // Out of time: give up on this work
                finished(channel, key, state);
            } else if (key.isConnectable() && channel.finishConnect()) {
                // Connected: next, write the HTTP message to the remote server
                key.interestOps(SelectionKey.OP_WRITE);
            } else if (key.isWritable()) {
                // Once the request is fully written, wait for the HTTP response
                if (doWrite(channel, state)) {
                    key.interestOps(SelectionKey.OP_READ);
                }
            } else if (key.isReadable()) {
                // Once the response is fully read, call finished to complete
                // the work
                if (doRead(channel, state)) {
                    finished(channel, key, state);
                }
            }
        } catch (IOException ioe) {
            System.err.println("Failure during IO operation");
            finished(channel, key, state);
        }
    }
}





Listing Ten



/**
 * Writes as much of the pending HTTP request as the channel accepts.
 *
 * @param channel the channel to write to
 * @param state   state whose work.httpRequest holds the remaining bytes
 * @return true if this call wrote everything that was still remaining
 * @throws IOException if the write fails
 */
protected boolean doWrite(SocketChannel channel, WorkState state)
throws IOException {
    Work work = state.work;
    int pending = work.httpRequest.remaining();
    int written = channel.write(work.httpRequest);

    // Anything short of the full remainder means another write is needed
    if (written != pending) {
        return false;
    }
    return true;
}





Listing Eleven



/**
 * Reads one chunk from the channel, decodes it and appends the text to the
 * state's buffer.
 *
 * NOTE(review): buffer and decoder are declared outside this listing. Each
 * chunk is decoded on its own, so a multi-byte character split across two
 * reads may fail to decode -- confirm against the decoder's charset.
 *
 * @param channel the channel to read from
 * @param state   state that collects the decoded text
 * @return true once the channel reports end-of-stream, false otherwise
 * @throws IOException if reading or decoding fails
 */
protected boolean doRead(SocketChannel channel, WorkState state)
throws IOException {
    buffer.clear();
    decoder.reset();

    int count = channel.read(buffer);

    // End of stream: the whole response has been received
    if (count == -1) {
        state.success = true;
        return true;
    }

    if (count > 0) {
        buffer.flip();
        state.buffer.append(decoder.decode(buffer).toString());
    }
    return false;
}





Listing Twelve



/**
 * Completes a unit of work: cancels the key, closes the channel, stores the
 * parsed response when the work succeeded and notifies a thread waiting on
 * the Work object.
 *
 * @param channel the channel to close
 * @param key     the selection key to cancel
 * @param state   state holding the work, the collected text and the
 *                success flag
 */
protected void finished(SocketChannel channel, SelectionKey key, WorkState state) {
    key.cancel();
    try {
        channel.close();
    } catch (IOException closeFailure) {
        System.err.println("Failed to close socket: " + closeFailure.toString());
    } finally {
        // Runs even when the close fails, so the waiter is still notified
        synchronized (state.work) {
            // The HTTP response is parsed only for successful work
            if (state.success) {
                state.work.out = parseHTTPResponse(state.buffer.toString());
            }
            state.work.notify();
        }
    }
}





Listing Thirteen



/**
 * NIO-based client that hands its request to a shared NIOWorker.
 */
public class NewIO {

    /**
     * Fetches a fixed URL through the given worker, waiting up to 15 seconds.
     *
     * @param worker an already set up worker (assumed to be created and
     *               started by someone else)
     * @return the worker's result, or null if the URL could not be
     *         constructed
     */
    public String execute(NIOWorker worker) {
        try {
            // Construct the URL and pass it to the worker
            URL url = new URL("http://example.orbitz.com");
            return worker.doWork(url, 15000); // Wait 15 seconds
        } catch (IOException ioe) {
            System.err.println(ioe.toString());
        }
        // Reached only after a failure; without this return the method has a
        // path with no return value and does not compile.
        return null;
    }
}















5



