Java AIO

0

感觉上NIO开发要比AIO简单一点。

两者区别:http://bbym010.iteye.com/blog/2100868
代码参考:http://yunhaifeiwu.iteye.com/blog/1714664

上面文章的代码已经非常好了,注释也写得非常好,这里补充一些需要注意的地方:

解码

// Switch the buffer from being filled to being drained before decoding the received bytes.
attachment.flip();
CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
decoder.onMalformedInput(CodingErrorAction.IGNORE); // Note: drop malformed input instead of throwing CharacterCodingException
content = decoder.decode(attachment).toString();
// Move any bytes that were not consumed to the front so the next read appends after them.
attachment.compact();

连接

// Asynchronous connect: the channel itself is passed as the attachment that ConnectHandler receives.
socket.connect(new InetSocketAddress("localhost", 8888), socket, new ConnectHandler());

这里注意要显式指定主机地址(例如 localhost 或具体的 IP),不然连接打不开。

完整代码

Server

package com.demo.aio;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * Minimal asynchronous (AIO) echo-style server.
 *
 * <p>Listens on {@code localhost:8888}. For every chunk of text received from a client it
 * prints the text and answers with the same text prefixed by a fixed label. Both directions
 * use UTF-8.
 *
 * <p>Per connection, at most one read or one write is outstanding at any time: the next read
 * is only issued after the reply to the previous message has been written completely. This
 * avoids {@code WritePendingException} from overlapping writes on the same channel.
 */
public class Server {

    /** Charset used both to decode client messages and to encode replies. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /**
     * Opens the listening channel on {@code localhost:8888} and starts accepting connections.
     * The call returns immediately; connections are served by completion handlers.
     *
     * @throws IOException if the channel group or the server channel cannot be opened or bound
     */
    public void server() throws IOException {
        ExecutorService executor = Executors.newFixedThreadPool(20);
        AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);
        AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(asyncChannelGroup).bind(new InetSocketAddress("localhost", 8888));
        channel.accept(channel, new AcceptHandler());
    }

    /** Closes a client channel, reporting (not hiding) a failure to do so. */
    private static void closeChannel(AsynchronousSocketChannel socket) {
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Accepts connections and starts the first read on each of them. */
    private static final class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {

        @Override
        public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
            // Re-arm the accept first so further clients are not kept waiting by this one.
            attachment.accept(attachment, this);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            result.read(buffer, buffer, new ReaderHandler(result));
        }

        @Override
        public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
            // Report the failure instead of silently dropping it; the accept is not re-armed here.
            exc.printStackTrace();
        }

    }

    /** Decodes what a client sent and triggers the reply. One instance serves one connection. */
    private static final class ReaderHandler implements CompletionHandler<Integer, ByteBuffer> {

        private final AsynchronousSocketChannel socket;

        // Malformed bytes become U+FFFD instead of aborting the whole message.
        private final CharsetDecoder decoder = UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);

        public ReaderHandler(AsynchronousSocketChannel socket) {
            this.socket = socket;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (result < 0) {
                // End of stream: the peer closed its side, so release the channel.
                System.out.println("断开");
                closeChannel(socket);
                return;
            }
            String content = decodeAvailable(attachment);
            if (content.isEmpty()) {
                // Only part of a multi-byte character has arrived so far; wait for the rest.
                socket.read(attachment, attachment, this);
                return;
            }
            System.out.println("收到客户端消息:" + content);
            ByteBuffer client = ByteBuffer.wrap(("服务器回复消息:" + content).getBytes(UTF_8));
            // The writer issues the next read once the reply has been sent completely.
            socket.write(client, client, new WriterHandler(socket, attachment, this));
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            closeChannel(socket);
        }

        /**
         * Decodes every complete character currently in {@code buffer} and leaves the buffer
         * ready to be filled again. Bytes of a trailing, still incomplete character stay in
         * the buffer so the next read can complete them.
         */
        private String decodeAvailable(ByteBuffer buffer) {
            buffer.flip();
            // UTF-8 never produces more chars than input bytes, so this capacity cannot overflow.
            // (Fully qualified because CharBuffer is not in this file's import list.)
            java.nio.CharBuffer chars = java.nio.CharBuffer.allocate(buffer.remaining());
            // endOfInput=false: an incomplete trailing sequence is left unread, not reported.
            decoder.decode(buffer, chars, false);
            buffer.compact();
            chars.flip();
            return chars.toString();
        }

    }

    /** Writes one reply completely, then hands the connection back to its reader. */
    private static final class WriterHandler implements CompletionHandler<Integer, ByteBuffer> {

        private final AsynchronousSocketChannel socket;
        private final ByteBuffer readBuffer;
        private final ReaderHandler reader;

        public WriterHandler(AsynchronousSocketChannel socket, ByteBuffer readBuffer, ReaderHandler reader) {
            this.socket = socket;
            this.readBuffer = readBuffer;
            this.reader = reader;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (attachment.hasRemaining()) {
                // Partial write: keep going until the whole reply is out.
                socket.write(attachment, attachment, this);
            } else {
                socket.read(readBuffer, readBuffer, reader);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            closeChannel(socket);
        }

    }

    /**
     * Starts the server and keeps the main thread parked so the process stays alive.
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        new Server().server();
        Thread.sleep(Integer.MAX_VALUE);
    }

}

Client

package com.demo.aio;
 
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * Minimal asynchronous (AIO) console client.
 *
 * <p>Connects to {@code localhost:8888}, sends a greeting, prints everything the server sends
 * back, and forwards each whitespace-separated token typed on standard input to the server.
 * Text is encoded and decoded as UTF-8.
 *
 * <p>Writes are serialized with a single permit: it becomes available once the connection is
 * established and the greeting has been written, and it is held for the duration of every
 * write. This prevents writing before the channel is connected and prevents two writes from
 * being outstanding at once.
 */
public class Client {

    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private AsynchronousSocketChannel socket = null;

    private AsynchronousChannelGroup group = null;

    // Starts with no permit: nothing may be written until the connect has completed.
    // (Fully qualified because Semaphore is not in this file's import list.)
    private final java.util.concurrent.Semaphore writePermit = new java.util.concurrent.Semaphore(0);

    /**
     * Opens the channel and starts connecting to {@code localhost:8888}. The call returns
     * immediately; the outcome of the connect is delivered to a completion handler.
     *
     * @throws IOException if the channel group or the channel cannot be opened or configured
     */
    public void client() throws IOException {
        ExecutorService executor = Executors.newFixedThreadPool(20);
        group = AsynchronousChannelGroup.withThreadPool(executor);
        socket = AsynchronousSocketChannel.open(group);
        socket.setOption(StandardSocketOptions.TCP_NODELAY, true);
        socket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        socket.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
        socket.connect(new InetSocketAddress("localhost", 8888), socket, new ConnectHandler()); // note: the host is given explicitly
    }

    /** Closes the connection and lets the channel group (and its thread pool) wind down. */
    private void shutdown() {
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        group.shutdown();
    }

    /** Sends the greeting and starts reading once the connection is established. */
    private class ConnectHandler implements CompletionHandler<Void, AsynchronousSocketChannel> {

        @Override
        public void completed(Void result, AsynchronousSocketChannel attachment) {
            ByteBuffer greeting = ByteBuffer.wrap("客户端开始连接".getBytes(UTF_8));
            // SenderHandler finishes partial writes and releases the write permit when done.
            attachment.write(greeting, greeting, new SenderHandler(attachment));
            ByteBuffer clientBuffer = ByteBuffer.allocate(1024);
            attachment.read(clientBuffer, clientBuffer, new ReaderHandler(attachment));
        }

        @Override
        public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
            exc.printStackTrace();
            shutdown();
            // Wake up send(), which then sees the closed channel and stops.
            writePermit.release();
        }

    }

    /** Prints what the server sends. */
    private class ReaderHandler implements CompletionHandler<Integer, ByteBuffer> {

        private final AsynchronousSocketChannel socket;

        // Malformed bytes become U+FFFD instead of aborting the whole message.
        // (Fully qualified because these types are not in this file's import list.)
        private final java.nio.charset.CharsetDecoder decoder = UTF_8.newDecoder()
                .onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE)
                .onUnmappableCharacter(java.nio.charset.CodingErrorAction.REPLACE);

        public ReaderHandler(AsynchronousSocketChannel socket) {
            this.socket = socket;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (result < 0) {
                // End of stream: the server closed the connection.
                System.out.println("断开");
                shutdown();
                return;
            }
            String content = decodeAvailable(attachment);
            if (!content.isEmpty()) {
                System.out.println(content);
            }
            socket.read(attachment, attachment, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            // A failure after the channel was closed is just the pending read being cancelled.
            if (socket.isOpen()) {
                exc.printStackTrace();
            }
            shutdown();
        }

        /**
         * Decodes every complete character currently in {@code buffer} and leaves the buffer
         * ready to be filled again. Bytes of a trailing, still incomplete character stay in
         * the buffer so the next read can complete them.
         */
        private String decodeAvailable(ByteBuffer buffer) {
            buffer.flip();
            // UTF-8 never produces more chars than input bytes, so this capacity cannot overflow.
            java.nio.CharBuffer chars = java.nio.CharBuffer.allocate(buffer.remaining());
            // endOfInput=false: an incomplete trailing sequence is left unread, not reported.
            decoder.decode(buffer, chars, false);
            buffer.compact();
            chars.flip();
            return chars.toString();
        }

    }

    /**
     * Reads whitespace-separated tokens from standard input and sends each one to the server.
     * Returns when standard input is exhausted or the connection has been closed. At end of
     * input the sending direction of the connection is shut down so the server can observe it.
     *
     * @throws UnsupportedEncodingException if the UTF-8 encoding is not available
     */
    public void send() throws UnsupportedEncodingException {
        // Not closed on purpose: closing the Scanner would also close System.in.
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String tmp = scanner.next();
            // Wait until the connection is up and the previous write has finished.
            writePermit.acquireUninterruptibly();
            if (!socket.isOpen()) {
                writePermit.release();
                return;
            }
            ByteBuffer buffer = ByteBuffer.wrap(tmp.getBytes("utf-8"));
            System.out.println("客户端发出信息:" + tmp);
            socket.write(buffer, buffer, new SenderHandler(socket));
        }
        // End of input: let the last write finish, then signal end-of-stream to the server.
        writePermit.acquireUninterruptibly();
        try {
            if (socket.isOpen()) {
                socket.shutdownOutput();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            writePermit.release();
        }
    }

    /** Completes one outgoing message and then releases the write permit. */
    private class SenderHandler implements CompletionHandler<Integer, ByteBuffer> {

        private final AsynchronousSocketChannel socket;

        public SenderHandler(AsynchronousSocketChannel socket) {
            this.socket = socket;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (attachment.hasRemaining()) {
                // Partial write: keep going until the whole message is out.
                socket.write(attachment, attachment, this);
            } else {
                writePermit.release();
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            if (socket.isOpen()) {
                exc.printStackTrace();
            }
            shutdown();
            // Wake up send(), which then sees the closed channel and stops.
            writePermit.release();
        }

    }

    /**
     * Connects to the server and forwards console input until it ends.
     */
    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.client();
        client.send();
    }

}