Easy to Learn Java: Programming Articles, Examples and Tips

Start with Java in a few days with Java Lessons or Lectures

Home

Code Examples

Java Tools

More Java Tools!

Java Forum

All Java Tips

Books

Submit News
Search the site here...
Search...
 

Nio Server, Nio Client, Echo Worker, Server Data Event Java code example - Click here to copy ->>>

   Can't find what you're looking for? Try our search:

Really working examples categorized by API, package, class. You can compile and run our examples right away! Not from source code for Java projects - only working examples! Copy, compile and run!
------------------

 

------------------------------- NioServer.java ------------------------------------------------

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

public class NioServer implements Runnable {
  // The host:port combination to listen on
  private InetAddress hostAddress;
  private int port;

  // The channel on which we'll accept connections
  private ServerSocketChannel serverChannel;

  // The selector we'll be monitoring
  private Selector selector;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  private EchoWorker worker;

  // A list of PendingChange instances
  private List pendingChanges = new LinkedList();

  // Maps a SocketChannel to a list of ByteBuffer instances
  private Map pendingData = new HashMap();

  public NioServer(InetAddress hostAddress, int port, EchoWorker workerthrows IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
    this.worker = worker;
  }

  public void send(SocketChannel socket, byte[] data) {
    synchronized (this.pendingChanges) {
      // Indicate we want the interest ops set changed
      this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));

      // And queue the data we want written
      synchronized (this.pendingData) {
        List queue = (Listthis.pendingData.get(socket);
        if (queue == null) {
          queue = new ArrayList();
          this.pendingData.put(socket, queue);
        }
        queue.add(ByteBuffer.wrap(data));
      }
    }

    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
  }

  public void run() {
    while (true) {
      try {
        // Process any pending changes
        synchronized (this.pendingChanges) {
          Iterator changes = this.pendingChanges.iterator();
          while (changes.hasNext()) {
            ChangeRequest change = (ChangeRequestchanges.next();
            switch (change.type) {
            case ChangeRequest.CHANGEOPS:
              SelectionKey key = change.socket.keyFor(this.selector);
              key.interestOps(change.ops);
            }
          }
          this.pendingChanges.clear();
        }

        // Wait for an event one of the registered channels
        this.selector.select();

        // Iterate over the set of keys for which events are available
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
        while (selectedKeys.hasNext()) {
          SelectionKey key = (SelectionKeyselectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          // Check what event is available and deal with it
          if (key.isAcceptable()) {
            this.accept(key);
          else if (key.isReadable()) {
            this.read(key);
          else if (key.isWritable()) {
            this.write(key);
          }
        }
      catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  private void accept(SelectionKey keythrows IOException {
    // For an accept to be pending the channel must be a server socket channel.
    ServerSocketChannel serverSocketChannel = (ServerSocketChannelkey.channel();

    // Accept the connection and make it non-blocking
    SocketChannel socketChannel = serverSocketChannel.accept();
    Socket socket = socketChannel.socket();
    socketChannel.configureBlocking(false);

    // Register the new SocketChannel with our Selector, indicating
    // we'd like to be notified when there's data waiting to be read
    socketChannel.register(this.selector, SelectionKey.OP_READ);
  }

  private void read(SelectionKey keythrows IOException {
    SocketChannel socketChannel = (SocketChannelkey.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
      numRead = socketChannel.read(this.readBuffer);
    catch (IOException e) {
      // The remote forcibly closed the connection, cancel
      // the selection key and close the channel.
      key.cancel();
      socketChannel.close();
      return;
    }

    if (numRead == -1) {
      // Remote entity shut the socket down cleanly. Do the
      // same from our end and cancel the channel.
      key.channel().close();
      key.cancel();
      return;
    }

    // Hand the data off to our worker thread
    this.worker.processData(this, socketChannel, this.readBuffer.array(), numRead);
  }

  private void write(SelectionKey keythrows IOException {
    SocketChannel socketChannel = (SocketChannelkey.channel();

    synchronized (this.pendingData) {
      List queue = (Listthis.pendingData.get(socketChannel);

      // Write until there's not more data ...
      while (!queue.isEmpty()) {
        ByteBuffer buf = (ByteBufferqueue.get(0);
        socketChannel.write(buf);
        if (buf.remaining() 0) {
          // ... or the socket's buffer fills up
          break;
        }
        queue.remove(0);
      }

      if (queue.isEmpty()) {
        // We wrote away all data, so we're no longer interested
        // in writing on this socket. Switch back to waiting for
        // data.
        key.interestOps(SelectionKey.OP_READ);
      }
    }
  }

  private Selector initSelector() throws IOException {
    // Create a new selector
    Selector socketSelector = SelectorProvider.provider().openSelector();

    // Create a new non-blocking server socket channel
    this.serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    // Bind the server socket to the specified address and port
    InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
    serverChannel.socket().bind(isa);

    // Register the server socket channel, indicating an interest in 
    // accepting new connections
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);

    return socketSelector;
  }

  public static void main(String[] args) {
    try {
      EchoWorker worker = new EchoWorker();
      new Thread(worker).start();
      new Thread(new NioServer(null, 9090, worker)).start();
    catch (IOException e) {
      e.printStackTrace();
    }
  }
}

------------------------ EchoWorker.java --------------------------------------------------------------


import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;

public class EchoWorker implements Runnable {
  private List queue = new LinkedList();
  
  public void processData(NioServer server, SocketChannel socket, byte[] data, int count) {
    byte[] dataCopy = new byte[count];
    System.arraycopy(data, 0, dataCopy, 0, count);
    synchronized(queue) {
      queue.add(new ServerDataEvent(server, socket, dataCopy));
      queue.notify();
    }
  }
  
  public void run() {
    ServerDataEvent dataEvent;
    
    while(true) {
      // Wait for data to become available
      synchronized(queue) {
        while(queue.isEmpty()) {
          try {
            queue.wait();
          catch (InterruptedException e) {
          }
        }
        dataEvent = (ServerDataEventqueue.remove(0);
      }
      
      // Return to sender
      dataEvent.server.send(dataEvent.socket, dataEvent.data);
    }
  }
}

------------------------------- ServerDataEvent.java ------------------------------------------------


import java.nio.channels.SocketChannel;

class ServerDataEvent {
  public NioServer server;
  public SocketChannel socket;
  public byte[] data;
  
  public ServerDataEvent(NioServer server, SocketChannel socket, byte[] data) {
    this.server = server;
    this.socket = socket;
    this.data = data;
  }
}

-------------------------------- ChangeRequest.java ---------------------------------------------

import java.nio.channels.SocketChannel;

public class ChangeRequest {
  public static final int REGISTER = 1;
  public static final int CHANGEOPS = 2;
  
  public SocketChannel socket;
  public int type;
  public int ops;
  
  public ChangeRequest(SocketChannel socket, int type, int ops) {
    this.socket = socket;
    this.type = type;
    this.ops = ops;
  }
}

------------------------------------- NioClient.java ---------------------------------------------

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;

public class NioClient implements Runnable {
  // The host:port combination to connect to
  private InetAddress hostAddress;
  private int port;

  // The selector we'll be monitoring
  private Selector selector;

  // The buffer into which we'll read data when it's available
  private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

  // A list of PendingChange instances
  private List pendingChanges = new LinkedList();

  // Maps a SocketChannel to a list of ByteBuffer instances
  private Map pendingData = new HashMap();
  
  // Maps a SocketChannel to a RspHandler
  private Map rspHandlers = Collections.synchronizedMap(new HashMap());
  
  public NioClient(InetAddress hostAddress, int portthrows IOException {
    this.hostAddress = hostAddress;
    this.port = port;
    this.selector = this.initSelector();
  }

  public void send(byte[] data, RspHandler handlerthrows IOException {
    // Start a new connection
    SocketChannel socket = this.initiateConnection();
    
    // Register the response handler
    this.rspHandlers.put(socket, handler);
    
    // And queue the data we want written
    synchronized (this.pendingData) {
      List queue = (Listthis.pendingData.get(socket);
      if (queue == null) {
        queue = new ArrayList();
        this.pendingData.put(socket, queue);
      }
      queue.add(ByteBuffer.wrap(data));
    }

    // Finally, wake up our selecting thread so it can make the required changes
    this.selector.wakeup();
  }

  public void run() {
    while (true) {
      try {
        // Process any pending changes
        synchronized (this.pendingChanges) {
          Iterator changes = this.pendingChanges.iterator();
          while (changes.hasNext()) {
            ChangeRequest change = (ChangeRequestchanges.next();
            switch (change.type) {
            case ChangeRequest.CHANGEOPS:
              SelectionKey key = change.socket.keyFor(this.selector);
              key.interestOps(change.ops);
              break;
            case ChangeRequest.REGISTER:
              change.socket.register(this.selector, change.ops);
              break;
            }
          }
          this.pendingChanges.clear();
        }

        // Wait for an event one of the registered channels
        this.selector.select();

        // Iterate over the set of keys for which events are available
        Iterator selectedKeys = this.selector.selectedKeys().iterator();
        while (selectedKeys.hasNext()) {
          SelectionKey key = (SelectionKeyselectedKeys.next();
          selectedKeys.remove();

          if (!key.isValid()) {
            continue;
          }

          // Check what event is available and deal with it
          if (key.isConnectable()) {
            this.finishConnection(key);
          else if (key.isReadable()) {
            this.read(key);
          else if (key.isWritable()) {
            this.write(key);
          }
        }
      catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  private void read(SelectionKey keythrows IOException {
    SocketChannel socketChannel = (SocketChannelkey.channel();

    // Clear out our read buffer so it's ready for new data
    this.readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
      numRead = socketChannel.read(this.readBuffer);
    catch (IOException e) {
      // The remote forcibly closed the connection, cancel
      // the selection key and close the channel.
      key.cancel();
      socketChannel.close();
      return;
    }

    if (numRead == -1) {
      // Remote entity shut the socket down cleanly. Do the
      // same from our end and cancel the channel.
      key.channel().close();
      key.cancel();
      return;
    }

    // Handle the response
    this.handleResponse(socketChannel, this.readBuffer.array(), numRead);
  }

  private void handleResponse(SocketChannel socketChannel, byte[] data, int numReadthrows IOException {
    // Make a correctly sized copy of the data before handing it
    // to the client
    byte[] rspData = new byte[numRead];
    System.arraycopy(data, 0, rspData, 0, numRead);
    
    // Look up the handler for this channel
    RspHandler handler = (RspHandlerthis.rspHandlers.get(socketChannel);
    
    // And pass the response to it
    if (handler.handleResponse(rspData)) {
      // The handler has seen enough, close the connection
      socketChannel.close();
      socketChannel.keyFor(this.selector).cancel();
    }
  }

  private void write(SelectionKey keythrows IOException {
    SocketChannel socketChannel = (SocketChannelkey.channel();

    synchronized (this.pendingData) {
      List queue = (Listthis.pendingData.get(socketChannel);

      // Write until there's not more data ...
      while (!queue.isEmpty()) {
        ByteBuffer buf = (ByteBufferqueue.get(0);
        socketChannel.write(buf);
        if (buf.remaining() 0) {
          // ... or the socket's buffer fills up
          break;
        }
        queue.remove(0);
      }

      if (queue.isEmpty()) {
        // We wrote away all data, so we're no longer interested
        // in writing on this socket. Switch back to waiting for
        // data.
        key.interestOps(SelectionKey.OP_READ);
      }
    }
  }

  private void finishConnection(SelectionKey keythrows IOException {
    SocketChannel socketChannel = (SocketChannelkey.channel();
  
    // Finish the connection. If the connection operation failed
    // this will raise an IOException.
    try {
      socketChannel.finishConnect();
    catch (IOException e) {
      // Cancel the channel's registration with our selector
      System.out.println(e);
      key.cancel();
      return;
    }
  
    // Register an interest in writing on this channel
    key.interestOps(SelectionKey.OP_WRITE);
  }

  private SocketChannel initiateConnection() throws IOException {
    // Create a non-blocking socket channel
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
  
    // Kick off connection establishment
    socketChannel.connect(new InetSocketAddress(this.hostAddress, this.port));
  
    // Queue a channel registration since the caller is not the 
    // selecting thread. As part of the registration we'll register
    // an interest in connection events. These are raised when a channel
    // is ready to complete connection establishment.
    synchronized(this.pendingChanges) {
      this.pendingChanges.add(new ChangeRequest(socketChannel, ChangeRequest.REGISTER, SelectionKey.OP_CONNECT));
    }
    
    return socketChannel;
  }

  private Selector initSelector() throws IOException {
    // Create a new selector
    return SelectorProvider.provider().openSelector();
  }

  public static void main(String[] args) {
    try {
      NioClient client = new NioClient(InetAddress.getByName("localhost"), 9090);
      Thread t = new Thread(client);
      t.setDaemon(true);
      t.start();
      RspHandler handler = new RspHandler();
      client.send("Hello World".getBytes(), handler);
      handler.waitForResponse();
    catch (Exception e) {
      e.printStackTrace();
    }
  }
}

-------------------------------- RspHandler.java --------------------------------------------------------

public class RspHandler {
  private byte[] rsp = null;
  
  public synchronized boolean handleResponse(byte[] rsp) {
    this.rsp = rsp;
    this.notify();
    return true;
  }
  
  public synchronized void waitForResponse() {
    while(this.rsp == null) {
      try {
        this.wait();
      catch (InterruptedException e) {
      }
    }
    
    System.out.println(new String(this.rsp));
  }
}



References.

The list of classes which were used on this page you can find below. The links to Java API contain official SUN documentation about all used classes.




[ Go Back ]



Home Code Examples Java Forum All Java Tips Books Submit News, Code... Search... Offshore Software Tech Doodling

RSS feed Java FAQ RSS feed Java FAQ News     

    RSS feed Java Forums RSS feed Java Forums

All logos and trademarks in this site are property of their respective owner. The comments are property of their posters, all the rest 1999-2006 by Java FAQs Daily Tips.

Interactive software released under GNU GPL, Code Credits, Privacy Policy