Easy to Learn Java: Programming Articles, Examples and Tips

Start with Java in a few days with Java Lessons or Lectures

Home

Code Examples

Java Tools

More Java Tools!

Java Forum

All Java Tips

Books

Submit News
Search the site here...
Search...
 
Search the JavaFAQ.nu
1000 Java Tips ebook

1000 Java Tips - Click here for the high resolution copy!1000 Java Tips - Click here for the high resolution copy!

Java Screensaver, take it here

Free "1000 Java Tips" eBook is here! It is huge collection of big and small Java programming articles and tips. Please take your copy here.

Take your copy of free "Java Technology Screensaver"!.

Read New NIO Tutorial with working examples: the server

JavaFAQ Home » Networking Go to all tips in Networking


Bookmark and Share

The Rox Java NIO Tutorial, part I

Published here by courteous permission from James Greenfield.

Contents

  1. Introduction
  2. Credits
  3. General principles
  4. The server
  5. The client (in part II)
  6. NIO and SSL on 1.4 (in part II)
  7. The code (in part II)
  8. About the author (in part II)

Introduction

This tutorial is intended to collect together my own experiences using the Java NIO libraries and the dozens of hints, tips, suggestions and caveats that litter the Internet. When I wrote Rox all of the useful information existed as just that: hints, tips, suggestions and caveats on a handful of forums. This tutorial actually only covers using NIO for asynchronous networking (non-blocking sockets), and not the NIO libraries in all their glory. When I use the term NIO in this tutorial I'm taking liberties and only talking about the non-blocking IO part of the API.

If you've spent any time looking at the JavaDoc documentation for the NIO libraries then you know they're not the most transparent docs floating around. And if you've spent any time trying to write code based on the NIO libraries and you use more than one platform you've probably run into something that works on one platform but locks up on another. This is particularly true if you started on Windows and moved over to Linux (using Sun's implementation).

This tutorial takes you from nothing to a working client-server implementation, and will hopefully help you avoid all of the pitfalls waiting to trap the unwary. I've turned it around and start with a server implementation, since that's the most common use-case for the NIO libraries.

Comments, criticisms, suggestions and, most important, corrections are welcome. The examples here have been developed and tested on Sun's NIO implementation using version 1.4.2 and 1.5.0 of Sun's Hotspot JVM on Windows and Linux. Your mileage may vary but if you do run into funnies please let me know and I'll incorporate them into this page.

Drop me a line at nio@flat502.com.

Credits

Credit where credit is due. This tutorial pulls together a lot of ideas, none of which I can claim original ownership. Sources vary widely and I haven't managed to keep track of all of them, but an incomplete list of some of the larger contributors would include:

Source source code in this tutorial was generated from Java source using Java2Html.

General principles

A few general principles inform the approach I've taken. There may be other approaches (I've certainly seen a few other suggestions) but I know this one works, and I know it performs. Sometimes it's worth spending time finding the best possible approach. Sometimes it's enough to find an approach that works.

These general ideas apply on both the client and server side. In fact, given that the only difference between the client and the server is whether or not a connection is initiated or accepted, the bulk of the logic for using NIO for a comms implementation can be factored out and shared.

So, without further ado:

Use a single selecting thread

Although NIO selectors are threadsafe their key sets are not. The upshot of this is that if you try to build a solution that depends on multiple threads accessing your selector you very quickly end up in one of two situations:

  1. Plagued by deadlocks and race conditions as you build up an increasingly fragile house of cards (or rather house of locks) to avoid stepping on your own toes while accessing the selector and its key sets.
  2. Effectively single-threading access to the selector and its key sets in an attempt to avoid, well, stepping on your own toes.
The upshot of this is that if you want to build a solution based on NIO then trust me and stick to a single selecting thread. Offload events as you see fit but stick to one thread on the selector.

I tend to handle all I/O within the selecting thread too. This means queuing writes and having the selecting thread perform the actual I/O, and having the selecting thread read off ready channels and offload the read data to a worker thread. In general I've found this scales well enough so I've not yet had a need to have other threads perform the I/O on the side.

Modify the selector from the selecting thread only

If you look closely at the NIO documentation you'll come across the occasional mention of naive implementations blocking where more efficient implementations might not, usually in the context of altering the state of the selector from another thread. If you plan on writing code against the NIO libraries that must run on multiple platforms you have to assume the worst. This isn't just hypothetical either, a little experimentation should be enough to convince you that Sun's Linux implementation is "naive". If you plan on targeting one platform only feel free to ignore this advice but I'd recommend against it. The thing about code is that it oftens ends up in the oddest of places.

As a result, if you plan to hang onto your sanity don't modify the selector from any thread other than the selecting thread. This includes modifying the interest ops set for a selection key, registering new channels with the selector, and cancelling existing channels.

A number of these changes are naturally initiated by threads that aren't the selecting thread. Think about sending data. This pretty much always has to be initiated by a calling thread that isn't the selecting thread. Don't try to modify the selector (or a selection key) from the calling thread. Queue the operation where the selecting thread can get to it and call Selector.wakeup. This will wake the selecting thread up. All it needs to do is check if there are any pending changes, and if there are apply them and go back to selecting on the selector. There are variations on this but that's the general idea.

Set OP_WRITE only when you have data ready

A common mistake is to enable OP_WRITE on a selection key and leave it set. This results in the selecting thread spinning because 99% of the time a socket channel is ready for writing. In fact the only times it's not going to be ready for writing is during connection establishment or if the local OS socket buffer is full. The correct way to do this is to enable OP_WRITE only when you have data ready to be written on that socket channel. And don't forget to do it from within the selecting thread.

Alternate between OP_READ and OP_WRITE

If you try to mix OP_READ and OP_WRITE you'll quickly get yourself into trouble. The Sun Windows implementation has been seen to deadlock if you do this. Other implementations may fare better, but I prefer to play it safe and alternate between OP_READ and OP_WRITE, rather than trying to use them together.

With those out of the way, let's take a look at some actual code. The code presented here could do with a little cleaning up. I've simplified a few things in an attempt to stick to the core issue, using the NIO libraries, without getting bogged down in too many abstractions or design discussions.

The server

A starting point

We need a little infrastructure before we can start building a NIO server. First, we'll need a thread. This will be our selecting thread and most of our logic will live in the class that is home to it. And I'm going to have the selecting thread perform reads itself, so we'll need a ByteBuffer to read into. I'm going to use a non-direct buffer rather than a direct buffer. The performance difference is minor in this case and the code is slightly clearer. We're also going to need a selector and a server socket channel on which to accept connections. Throwing in a few other minor odds and ends we end up with something like this.

public class NioServer implements Runnable {
  // The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;
  
  // The selector we'll be monitoring
  private Selector selector;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  public NioServer(InetAddress hostAddress, int port) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }

  public static void main(String[] args) {
    try {
      new Thread(new NioServer(null, 9090)).start();
    catch (IOException e) {
      e.printStackTrace();
    }
  }
}

Creating the selector and server channel

The astute will have noticed the call to initSelector() in the constructor. Needless to say this method doesn't exist yet. So let's write it. It's job is to create and initialize a non-blocking server channel and a selector. It must and then register the server channel with that selector.

  private Selector initSelector() throws IOException {
    // Create a new selector
    Selector socketSelector = SelectorProvider.provider().openSelector();

    // Create a new non-blocking server socket channel
    this.serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    // Bind the server socket to the specified address and port
    InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
    serverChannel.socket().bind(isa);

    // Register the server socket channel, indicating an interest in 
    // accepting new connections
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

    return socketSelector;
  }

Accepting connections

Right. At this point we have a server socket channel ready and waiting and we've indicated that we'd like to know when a new connection is available to be accepted. Now we need to actually accept it. Which brings us to our "select loop". This is where most of the action kicks off. In a nutshell our selecting thread sits in a loop waiting until one of the channels registered with the selector is in a state that matches the interest operations we've registered for it. In this case the operation we're waiting for on the server socket channel is an accept (indicated by OP_ACCEPT). So let's take a look at the first iteration (I couldn't resist) of our run() method.

  public void run() {
    while (true) {
      try {
        // Wait for an event one of the registered channels
        this.selector.select();

        // Iterate over the set of keys for which events are available
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
        while (selectedKeys.hasNext()) {
          SelectionKey key = (SelectionKey) selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          }
        }
      catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

This only takes us half way though. We still need to accept connections. Which means, you guessed it, an accept() method.

  private void accept(SelectionKey key) throws IOException {
    // For an accept to be pending the channel must be a server socket channel.
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    // Accept the connection and make it non-blocking
    SocketChannel socketChannel = serverSocketChannel.accept();
    Socket socket = socketChannel.socket();
    socketChannel.configureBlocking(false);

    // Register the new SocketChannel with our Selector, indicating
    // we'd like to be notified when there's data waiting to be read
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }

Reading data

Once we've accepted a connection it's only of any use if we can read (or write) data on it. If you look back at the accept() method you'll notice that the newly accepted socket channel was registered with our selector with OP_READ specified at the operation we're interested in. That means our selecting thread will be released from the call to select() when data becomes available on the socket channel. But what do we do with it? Well, the first thing we need is a small change to our run() method. We need to check if a socket channel is readable (and deal with it if it is).

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          else if (key.isReadable()) {
            this.read(key);
          }

Which means we need a read() method...

  private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();
    
    // Attempt to read off the channel
    int numRead;
    try {
      numRead = socketChannel.read(this.readBuffer);
    catch (IOException e) {
      // The remote forcibly closed the connection, cancel
      // the selection key and close the channel.
      key.cancel();
      socketChannel.close();
      return;
    }

    if (numRead == -1) {
      // Remote entity shut the socket down cleanly. Do the
      // same from our end and cancel the channel.
      key.channel().close();
      key.cancel();
      return;
    }

    // Hand the data off to our worker thread
    this.worker.processData(this, socketChannel, this.readBuffer.array(), numRead);
  }

Hang on, where did worker come from? This is the worker thread we hand data off to once we've read it. For our purposes all we'll do is echo that data back. We'll take a look at the code for the worker shortly. However, assuming the existence of an EchoWorker class, our infrastructure needs a little updating. We need an extra instance member, a change to our constructor and one or two extras in our main() method.

  private EchoWorker worker;

  public NioServer(InetAddress hostAddress, int port, EchoWorker worker) throws IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
    this.worker = worker;
  }

  public static void main(String[] args) {
    try {
      EchoWorker worker = new EchoWorker();
      new Thread(worker).start();
      new Thread(new NioServer(null, 9090, worker)).start();
    catch (IOException e) {
      e.printStackTrace();
    }
  }

Let's take a closer look at EchoWorker. Our echo worker implementation is implemented as a second thread. For the purposes of this example we're only going to have a single worker thread. This means we can hand events directly to it. We'll do this by calling a method on the instance. This method will stash the event in a local queue and notify the worker thread that there's work available. Needless to say, the worker thread itself will sit in a loop waiting for events to arrive and when they do it will process them. Events in this case are just data packets. All we're going to do is echo those packets back to the sender.

Normally, instead of calling a method on the worker directly, you'll want to use a form of blocking queue that you can share with as many workers as you like. Java 1.5's concurrency package introduces a BlockingQueue interface and some implementations, but it's not that hard to roll your own. The logic in the EchoWorker code below is a good start.

public class EchoWorker implements Runnable {
  private List queue = new LinkedList();
  
  public void processData(NioServer server, SocketChannel socket, byte[] data, int count) {
    byte[] dataCopy = new byte[count];
    System.arraycopy(data, 0, dataCopy, 0, count);
    synchronized(queue) {
      queue.add(new ServerDataEvent(server, socket, dataCopy));
      queue.notify();
    }
  }
  
  public void run() {
    ServerDataEvent dataEvent;
    
    while(true) {
      // Wait for data to become available
      synchronized(queue) {
        while(queue.isEmpty()) {
          try {
            queue.wait();
          catch (InterruptedException e) {
          }
        }
        dataEvent = (ServerDataEvent) queue.remove(0);
      }
      
      // Return to sender
      dataEvent.server.send(dataEvent.socket, dataEvent.data);
    }
  }
}

If you look at the last line of the run() method you'll see we're calling a new method on the server. Let's take a look at what we need to do to send data back to a client.

Writing data

Writing data to a socket channel is pretty straightforward. However, before we can do that we need to know that the channel is ready for more data. Which means setting the OP_WRITE interest op flag on that channel's selection key. Since the thread that wants to do this is not the selecting thread we need to queue this request somewhere and wake the selecting thread up. Putting all of this together we need a write() method and a few additional members in our server class.

  // A list of ChangeRequest instances
  private List ChangeRequests = new LinkedList();

  // Maps a SocketChannel to a list of ByteBuffer instances
  private Map pendingData = new HashMap();

  public void send(SocketChannel socket, byte[] data) {
    synchronized (this.ChangeRequests) {
      // Indicate we want the interest ops set changed
      this.ChangeRequests.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
      
      // And queue the data we want written
      synchronized (this.pendingData) {
        List queue = (List) this.pendingData.get(socket);
        if (queue == null) {
          queue = new ArrayList();
          this.pendingData.put(socket, queue);
        }
        queue.add(ByteBuffer.wrap(data));
      }
    }
    
    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
  }

We've introduced another class: ChangeRequest. There's no magic here, it just gives us an easy way to indicate a change that we want made on a particular socket channel. I've jumped ahead a little with this class, in anticipation of some of the other changes we'll ultimately want to queue.

public class ChangeRequest {
  public static final int REGISTER = 1;
  public static final int CHANGEOPS = 2;
  
  public SocketChannel socket;
  public int type;
  public int ops;
  
  public ChangeRequest(SocketChannel socket, int type, int ops) {
    this.socket = socket;
    this.type = type;
    this.ops = ops;
  }
}

Waking the selecting thread up is of no use until we modify our selecting thread's logic to do what needs to be done. So let's update our run() method.

  public void run() {
    while (true) {
      try {
        // Process any pending changes
        synchronized(this.ChangeRequests) {
          Iterator changes = this.ChangeRequests.iterator();
          while (changes.hasNext()) {
            ChangeRequest change = (ChangeRequest) changes.next();
            switch(change.type) {
            case ChangeRequest.CHANGEOPS:
              SelectionKey key = change.socket.keyFor(this.selector);
              key.interestOps(change.ops);
            }
          }
          this.ChangeRequests.clear();
        }
        
        // Wait for an event one of the registered channels
        this.selector.select();

        // Iterate over the set of keys for which events are available
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
        while (selectedKeys.hasNext()) {
          SelectionKey key = (SelectionKey) selectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          else if (key.isReadable()) {
            this.read(key);
          }
        }
      catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

Waking up the selector will usually result in an empty selection key set. So our logic above will skip right over the key set loop and jump into processing pending changes. In truth the order of events here probably doesn't matter all that much. This is just how I prefer to do it. If you take a look at the logic we've added for handling those pending events there are a few things to point out. One is that it's pretty simple. All we do is update the interest ops for the selection key on a given socket channel. The other is that at the end of the loop we clear out the list of pending changes. This is easy to miss, in which case the selecting thread will continually reapply "old" changes.

We're almost done on the writing front, we just need one more piece, the actual write. This means another change to our run() method to check for a selection key in a writeable state.

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          else if (key.isReadable()) {
            this.read(key);
          else if (key.isWritable()) {
            this.write(key);
          }

And of course, we'll need a write() method. This method just has to pull data off the appropriate queue and keep writing it until it either runs out or can't write anymore. I add ByteBuffers to the per-socket internal queue because we'll need a ByteBuffer for the socket write anyway and ByteBuffer's conveniently track how much data remains in the buffer. This last bit is important because a given write operation may end up only writing a part of the buffer (at which point we can stop writing to that socket because further writes will achieve nothing: the socket's local buffer is full). Using a ByteBuffer saves us having to either resize byte arrays in the queue or track an index into the byte array at the head of the queue.

But I've digressed so let's take a look at write().

  private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    synchronized (this.pendingData) {
      List queue = (List) this.pendingData.get(socketChannel);
      
      // Write until there's not more data ...
      while (!queue.isEmpty()) {
        ByteBuffer buf = (ByteBuffer) queue.get(0);
        socketChannel.write(buf);
        if (buf.remaining() > 0) {
          // ... or the socket's buffer fills up
          break;
        }
        queue.remove(0);
      }
      
      if (queue.isEmpty()) {
        // We wrote away all data, so we're no longer interested
        // in writing on this socket. Switch back to waiting for
        // data.
        key.interestOps(SelectionKey.OP_READ);
      }
    }
  }

The only point I want to make about the above code is this. You'll notice that it writes as much data as it can, stopping only when there's no more to write or the socket can't take any more. This is one approach. Another is to write only the first piece of data queued and then continue. The first approach is likely to result in better per-client service while the second is probably more fair in the face of multiple active connections. I suggest you play around a bit before choosing an approach.

We now have the bulk of a working NIO server implementation. In fact, the server you've written so far (if you've been following along diligently) will accept connections, read data off those connections, and echo back the read data to the client that sent it. It will also handle clients disconnecting by deregistering the socket channel from the selector. There's not a whole lot more to a server.

to be continued...

 


 Printer Friendly Page  Printer Friendly Page
 Send to a Friend  Send to a Friend

.. Bookmark and Share

Search here again if you need more info!
Custom Search



Home Code Examples Java Forum All Java Tips Books Submit News, Code... Search... Offshore Software Tech Doodling

RSS feed Java FAQ RSS feed Java FAQ News     

    RSS feed Java Forums RSS feed Java Forums

All logos and trademarks in this site are property of their respective owner. The comments are property of their posters, all the rest 1999-2006 by Java FAQs Daily Tips.

Interactive software released under GNU GPL, Code Credits, Privacy Policy