Java NIO Code Example
Main concepts in Java NIO
Buffer and ByteBuffer
A Buffer object is a container for a fixed amount of data.
Flipping
Flipping is a very important operation on buffer.
The flip() method flips a buffer from a fill state to a drain state ready for elements to be read/written out.
Clear
Once a buffer has been filled and drained, it can be reused. The clear( ) method resets a buffer to an empty state.
Buffers are reusable and can be long-lived.
Channels
A Channel is a conduit that transports data efficiently between byte buffers and the entity on the other end of the channel.
Channels operate only on byte buffers, because operating systems perform low-level I/O in terms of bytes.
Channels can operate in blocking or non-blocking modes.
The main channel classes are FileChannel, SocketChannel, ServerSocketChannel, and DatagramChannel.
Selectors
The Selector object performs readiness selection of channels registered with it and manages selection keys. It enables multiplexed I/O. The task to check readiness is delegated to the operating system.
Selector
The Selector class manages information about a set of registered channels and their readiness states. Using a Selector, you can select the set of keys whose corresponding channels are ready for I/O operations.
SelectableChannel
SelectableChannel objects can be registered with Selector objects, along with an indication of which operations on that channel are of interest for that selector.
A channel must first be placed in non-blocking mode before it can be registered with a selector.
SelectionKey
A SelectionKey encapsulates the registration relationship between a specific channel and a specific selector.
Features
1. Using the new NIO classes, one or a few threads can manage hundreds or even thousands of active socket connections with little or no performance loss.
2. Uses the operating system's native I/O services and system calls.
3. Supports non-blocking operations (connect, read and write).
Usage Pattern
Use one selector for all selectable channels and delegate the servicing of ready channels to other threads.
Code Example
The following code demonstrates how to use NIO and follows the pattern described above: the client sends a message to the server, and when the server receives the message, it echoes it back to the client.
Resources
package nio.example;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * Echo server. A single selector thread accepts connections and detects
 * readable channels; each readable channel is handed to a worker running on a
 * fixed-size thread pool, which reads the message and echoes it back.
 */
public class NIoServer extends Thread {
    private InetAddress hostAddress;
    private int port;
    private Executor exec;
    private ServerSocketChannel serverChannel;
    private Selector selector;

    public static int PORT_NUMBER = 1234;
    private static final int NTHREADS = 5;
    private static final long TIME_OUT = 5000;

    /**
     * Starts the server on the local host and waits for the server thread.
     *
     * @param argv optional first element overrides the default listen port
     */
    public static void main(String[] argv) throws Exception {
        int port = PORT_NUMBER;
        if (argv.length > 0) { // override default listen port
            port = Integer.parseInt(argv[0]);
        }
        NIoServer server = new NIoServer(InetAddress.getLocalHost(), port);
        server.start();
        server.join();
    }

    /**
     * Opens a non-blocking server socket channel bound to the given address
     * and registers it with a new selector for accept events.
     *
     * @param hostAddress address to bind to
     * @param port        port to listen on
     * @throws IOException if the channel or selector cannot be opened or bound
     */
    public NIoServer(InetAddress hostAddress, int port) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.exec = Executors.newFixedThreadPool(NTHREADS);

        // Create a new non-blocking server socket channel
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        // Bind the server socket to the specified address and port
        InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
        serverChannel.socket().bind(isa);

        // Create a new selector and register the server socket channel,
        // indicating an interest in accepting new connections
        selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Selector loop: waits for ready channels, accepts new connections and
     * dispatches readable channels to the worker pool. Never returns.
     */
    @Override
    public void run() {
        while (true) {
            try {
                // This may block for up to TIME_OUT ms; upon return the
                // selected set contains the keys of the ready channels.
                int n = selector.select(TIME_OUT);
                if (n == 0) {
                    continue; // nothing to do
                }

                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Remove first, so the key leaves the selected set even
                    // when it is invalid or handling it throws.
                    it.remove();

                    if (!key.isValid()) {
                        continue;
                    }

                    // a new connection is coming in
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel channel = server.accept();
                        registerChannel(selector, channel, SelectionKey.OP_READ);
                    }

                    // is there data to read on this channel?
                    if (key.isReadable()) {
                        processData(key);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers the given channel with the given selector for the given
     * operations of interest. A null channel is ignored.
     */
    protected void registerChannel(Selector selector, SelectableChannel channel, int ops)
            throws Exception {
        if (channel == null) {
            return; // could happen: non-blocking accept() found no connection
        }
        // set the new channel non-blocking, then register it with the selector
        channel.configureBlocking(false);
        channel.register(selector, ops);
    }

    /**
     * Hands a readable key to a pool worker. Read interest is dropped first so
     * that the selector does not report the same channel again (and start a
     * second worker on it) while the first worker is still servicing it; the
     * worker restores the interest when it has finished.
     */
    protected void processData(SelectionKey key) throws Exception {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        exec.execute(new WorkerThread(key));
    }

    /** One-shot task that reads a message from a channel and echoes it back. */
    private static class WorkerThread implements Runnable {
        private final ByteBuffer buffer;
        private final SelectionKey key;

        public WorkerThread(SelectionKey key) {
            this.key = key;
            this.buffer = ByteBuffer.allocate(8192);
        }

        /** Services the channel once; on failure the channel is closed. */
        public void run() {
            if (key == null) {
                return; // just in case
            }
            try {
                process(key);
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    key.channel().close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
                key.selector().wakeup();
            }
        }

        /**
         * Drains the data currently available on the channel, echoes it back
         * and then re-enables read interest on the key.
         */
        void process(SelectionKey key) throws Exception {
            SocketChannel channel = (SocketChannel) key.channel();
            int count;
            int totalRead = 0;

            buffer.clear(); // make buffer empty
            // loop while data is available; the channel is non-blocking
            while ((count = channel.read(buffer)) > 0) {
                totalRead += count;
            }

            if (count < 0) {
                // close channel on EOF, which also invalidates the key
                channel.close();
                return;
            }

            if (totalRead > 0) {
                String receivedStr = new String(buffer.array(), 0, totalRead);
                System.err.println("Server read " + receivedStr + " at " + new Date());

                // simulate long time operation
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                // after draining the data, send the message back
                echo(channel, receivedStr);
            }

            // The selector thread removed OP_READ before dispatching this key;
            // restore it, then wake the selector so that a select() already in
            // progress picks up the changed interest set.
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            key.selector().wakeup();
        }

        /** Writes the given text back to the channel. */
        private void echo(SocketChannel channel, String echo) throws Exception {
            // Wrap a fresh array: the re-encoded text is not guaranteed to fit
            // into the fixed-size read buffer.
            ByteBuffer out = ByteBuffer.wrap(echo.getBytes());
            // A non-blocking write may accept only part of the data.
            while (out.hasRemaining()) {
                channel.write(out);
            }
            System.err.println("Server send " + echo + " at " + new Date());
        }
    }
}

package nio.example;

import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;

/**
 * After connected, write a hello message to server, whenever read a message
 * from server, write back a hello message.
 */
public class NIoClient implements Runnable {
    private InetAddress hostAddress;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;

    public static int PORT_NUMBER = 1234;
    private static final long TIME_OUT = 5000;
    private static final String SAY_HELLO = "Ni hao!";

    private ByteBuffer readBuffer = ByteBuffer.allocate(8192);

    /**
     * Remembers the server address and opens the selector.
     *
     * @throws IOException if the selector cannot be opened
     */
    public NIoClient(InetAddress hostAddress, int port) throws IOException {
        this.hostAddress = hostAddress;
        this.port = port;
        this.selector = Selector.open();
    }

    /**
     * Connects to the server (blocking), then switches the channel to
     * non-blocking mode and registers it with the selector for reads.
     *
     * @return the connected channel
     * @throws IOException if connecting or registering fails
     */
    public SocketChannel createSocketChannel() throws IOException {
        socketChannel = SocketChannel.open(new InetSocketAddress(hostAddress, port));
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        return socketChannel;
    }

    /**
     * Connects, sends the first hello and then alternates between reading the
     * server's reply and sending the next hello until an I/O error or EOF.
     */
    public void run() {
        try {
            createSocketChannel();
            sayHello(socketChannel);
        } catch (IOException e1) {
            e1.printStackTrace();
            closeAll(); // do not leak the channel or selector on a failed start
            throw new RuntimeException(e1);
        }

        try {
            while (true) {
                // Wait for an event on one of the registered channels
                int n = this.selector.select(TIME_OUT);
                if (n == 0) {
                    continue; // nothing to do
                }

                // Iterate over the set of keys for which events are available
                Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();

                    if (!key.isValid()) {
                        continue;
                    }

                    if (key.isReadable()) {
                        this.processData(key);
                    } else if (key.isWritable()) {
                        sayHello((SocketChannel) key.channel());
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeAll();
        }
        System.err.println("client exits");
    }

    /** Closes the socket channel (if any) and the selector, logging failures. */
    private void closeAll() {
        try {
            if (socketChannel != null) {
                socketChannel.close();
            }
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        try {
            selector.close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }

    /** Sends the hello message and switches the key back to read interest. */
    private void sayHello(SocketChannel channel) throws IOException {
        this.readBuffer.clear();
        readBuffer.put(SAY_HELLO.getBytes());
        // flip() switches the buffer from a fill state to a drain state,
        // ready for its elements to be written out.
        readBuffer.flip();
        // A non-blocking write may accept only part of the data.
        while (readBuffer.hasRemaining()) {
            channel.write(readBuffer);
        }
        System.err.println("Client send " + SAY_HELLO + " at " + new Date());

        // now I am ready to read data.
        channel.keyFor(selector).interestOps(SelectionKey.OP_READ);
    }

    /**
     * Reads the data currently available on the key's channel, prints it and
     * switches the key to write interest.
     *
     * @throws EOFException if the server closed the connection
     * @throws IOException  on any other read failure
     */
    private void processData(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        // Clear out our read buffer so it's ready for new data
        this.readBuffer.clear();

        // Attempt to read off the channel
        int numRead;
        int totalRead = 0;
        while ((numRead = socketChannel.read(this.readBuffer)) > 0) {
            totalRead += numRead;
        }

        if (numRead == -1) {
            key.cancel();
            throw new EOFException();
        }

        // Handle the response
        String received = new String(readBuffer.array(), 0, totalRead);
        System.err.println("Client read " + received + " at " + new Date());

        // simulate long time operation
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // now I am ready to write data.
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /** Runs one client against the local host on the default port. */
    public static void main(String[] args) {
        try {
            NIoClient client = new NIoClient(InetAddress.getLocalHost(), PORT_NUMBER);
            Thread t = new Thread(client);
            t.start();
            t.join();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Program output:
NIoClient:
Client read Ni hao! at Wed May 11 20:51:02 CST 2011
Client send Ni hao! at Wed May 11 20:51:12 CST 2011
Client read Ni hao! at Wed May 11 20:51:22 CST 2011
Client send Ni hao! at Wed May 11 20:51:32 CST 2011
NIoServer:
Server send Ni hao! at Wed May 11 20:51:02 CST 2011
Server read Ni hao! at Wed May 11 20:51:12 CST 2011
Server send Ni hao! at Wed May 11 20:51:22 CST 2011
Server read Ni hao! at Wed May 11 20:51:32 CST 2011
FileChannel Example:
package org.codeexample.nio;
/**
 * Small FileChannel-based helpers for copying, reading and writing files.
 */
public class FileChannelExamle {

    /** Size in bytes of the intermediate buffer used by copy and read. */
    public static final int BUFFER_SIZE = 4096;

    /**
     * Copies the content of one file to another through a byte buffer.
     * The output file is created or truncated. (FileChannel.transferTo or
     * transferFrom would be a one-call alternative to the manual loop.)
     *
     * @param inFile  file to read from
     * @param outFile file to write to
     * @return the number of bytes copied
     * @throws IOException if either file cannot be opened, read or written
     */
    public static long copy(File inFile, File outFile) throws IOException {
        FileChannel inChannel = null;
        FileChannel outChannel = null;
        try {
            inChannel = new FileInputStream(inFile).getChannel();
            outChannel = new FileOutputStream(outFile).getChannel();

            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            long totalSize = 0;
            int size;
            while ((size = inChannel.read(buffer)) != -1) {
                totalSize += size;
                buffer.flip(); // switch from filling to draining
                // a single write() is not required to consume the whole buffer
                while (buffer.hasRemaining()) {
                    outChannel.write(buffer);
                }
                buffer.clear();
            }
            return totalSize;
        } finally {
            closeQuietly(inChannel);
            closeQuietly(outChannel);
        }
    }

    /**
     * Reads a whole file and decodes it with the platform default charset.
     *
     * @param inFile file to read
     * @return the file content as text
     * @throws IOException if the file cannot be opened or read
     */
    public static String read(File inFile) throws IOException {
        FileChannel inChannel = null;
        try {
            inChannel = new FileInputStream(inFile).getChannel();

            // Collect the raw bytes first and decode once at the end: decoding
            // each chunk separately corrupts a multi-byte character whose
            // bytes straddle a chunk boundary.
            java.io.ByteArrayOutputStream collected =
                    new java.io.ByteArrayOutputStream(BUFFER_SIZE);
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            int size;
            while ((size = inChannel.read(buffer)) != -1) {
                // the buffer is cleared before every read, so the new bytes
                // occupy [0, size) of the backing array
                collected.write(buffer.array(), 0, size);
                buffer.clear();
            }
            // toString() decodes with the platform default charset
            return collected.toString();
        } finally {
            closeQuietly(inChannel);
        }
    }

    /**
     * Writes text to a file in the given encoding, replacing any previous
     * content.
     *
     * @param text     text to write
     * @param encoding charset name used to encode the text
     * @param outFile  file to create or overwrite
     * @throws IOException if the encoding is unsupported (in which case the
     *                     file is left untouched) or the file cannot be written
     */
    public static void write(String text, String encoding, File outFile)
            throws IOException {
        // Encode before opening the file: opening truncates it, so a bad
        // encoding name must be detected first.
        ByteBuffer data = ByteBuffer.wrap(text.getBytes(encoding));

        FileChannel outChannel = null;
        try {
            outChannel = new FileOutputStream(outFile).getChannel();
            // a single write() is not required to consume the whole buffer
            while (data.hasRemaining()) {
                outChannel.write(data);
            }
        } finally {
            closeQuietly(outChannel);
        }
    }

    /** Closes the channel if it was opened; a close failure is only logged. */
    private static void closeQuietly(FileChannel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Other NIO code examples