3
回答
健壮的、便捷的、异步的SocketChannel实现
终于搞明白,存储TCO原来是这样算的>>>   
Socket通信比较常见的问题有如下几种:
1、设置收发超时;
2、保证每一个bit都被正确地收发;
3、物理线路故障的保护;
4、始终能正常工作;
5、尽量少占系统资源;
n、……
而Socket编程有一个共性:尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是:通信。
为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。
本方案在正常的局域网连接中测试过几百万次,没有发现问题。但由于缺乏更恶劣环境下的测试,如果使用这些代码带来任何风险,请使用者自行评估并承担。
(TcpChannel代码为Brian Wellington所做,原名为TCPClient,经本人稍作改动)
// Copyright (c) 2005 Brian Wellington (bwelling@xbill.org)

package asynchronizedchannel;

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;

/**
 * Pairs one selectable channel with a private {@link Selector} so that
 * non-blocking connect/send/recv calls can be driven against a deadline.
 *
 * <p>The deadline ({@code endTime}) is an absolute wall-clock time in
 * milliseconds that is fixed at construction and shared by every operation
 * performed through this instance.
 */
final class TcpChannel
{
    /** Absolute deadline (milliseconds since the epoch) for all operations. */
    private final long endTime;
    /** Registration of the wrapped channel with this instance's selector. */
    private SelectionKey key;

    /**
     * Switches {@code channel} to non-blocking mode and registers it with a
     * newly opened selector.
     *
     * @param channel the channel to wrap
     * @param endTime absolute deadline in milliseconds
     * @param op      initial interest set used for the registration
     * @throws IOException if the selector cannot be opened or the channel
     *                     cannot be configured/registered; in that case both
     *                     the selector and the channel are closed
     */
    public TcpChannel(SelectableChannel channel, long endTime, int op) throws IOException
    {
        boolean done = false;
        Selector selector = null;
        this.endTime = endTime;
        try {
            selector = Selector.open();
            channel.configureBlocking(false);
            key = channel.register(selector, op);
            done = true;
        } finally {
            if (!done && selector != null) {
                selector.close();
            }
            if (!done) {
                channel.close();
            }
        }
    }

    /**
     * Waits on the key's selector until the key becomes ready or the deadline
     * passes.
     *
     * @param key     the key to wait for
     * @param endTime absolute deadline in milliseconds
     * @throws SocketTimeoutException if nothing became ready before the
     *                                deadline (including when the deadline
     *                                has already passed on entry)
     * @throws IOException            if the selector fails
     */
    static void blockUntil(SelectionKey key, long endTime) throws IOException
    {
        // A key that is still in the selected-key set keeps its old readiness
        // bits, and select() only counts keys whose ready set was updated.
        // Dropping the key first makes the next select report fresh readiness.
        key.selector().selectedKeys().remove(key);

        long timeout = endTime - System.currentTimeMillis();
        int nkeys = 0;
        if (timeout > 0) {
            nkeys = key.selector().select(timeout);
        } else if (timeout == 0) {
            nkeys = key.selector().selectNow();
        }
        if (nkeys == 0) {
            throw new SocketTimeoutException();
        }
    }

    /**
     * Closes the selector and the channel. Failures are printed, not thrown,
     * and a failure to close one resource does not prevent closing the other.
     */
    void cleanup()
    {
        try {
            key.selector().close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        try {
            key.channel().close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Binds the underlying socket to a local address.
     *
     * @param addr local address to bind to
     * @throws IOException if the bind fails
     */
    void bind(SocketAddress addr) throws IOException
    {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.socket().bind(addr);
    }

    /**
     * Connects the underlying socket channel to {@code addr}, waiting at most
     * until the deadline.
     *
     * @param addr remote address to connect to
     * @throws SocketTimeoutException if the connection is not established
     *                                before the deadline
     * @throws IOException            if the connection attempt fails
     */
    void connect(SocketAddress addr) throws IOException
    {
        SocketChannel channel = (SocketChannel) key.channel();

        // Start the non-blocking connect first; it may already be complete.
        if (channel.connect(addr)) {
            return;
        }

        // While connecting, wait only for OP_CONNECT; the previous interest
        // set (minus OP_CONNECT) is restored afterwards.
        final int savedOps = key.interestOps();
        key.interestOps(SelectionKey.OP_CONNECT);
        try {
            // finishConnect() returns false while the connect is still in
            // progress, so keep waiting on the selector until it succeeds,
            // throws, or blockUntil() reports the deadline.
            while (!channel.finishConnect()) {
                blockUntil(key, endTime);
            }
        } finally {
            if (key.isValid()) {
                key.interestOps(savedOps & ~SelectionKey.OP_CONNECT);
            }
        }
    }

    /**
     * Sends the remaining content of {@code buffer} before the deadline.
     *
     * @param buffer data to send
     * @throws IOException on I/O failure or timeout
     */
    void send(ByteBuffer buffer) throws IOException
    {
        Send.operate(key, buffer, endTime);
    }

    /**
     * Receives into {@code buffer} until it is full or the deadline passes.
     *
     * @param buffer destination for received data
     * @throws IOException on I/O failure, end of stream or timeout
     */
    void recv(ByteBuffer buffer) throws IOException
    {
        Recv.operate(key, buffer, endTime);
    }
}

/**
 * A single non-blocking transfer step (read or write) on a socket channel,
 * plus the shared loop that repeats that step until a buffer is exhausted.
 */
interface Operator
{
    /** Holder for the shared transfer loop used by all implementations. */
    class Operation
    {
        /**
         * Repeats {@code optr.io} until {@code buffer} has no remaining space
         * or data, waiting on the key's selector whenever the channel is not
         * ready.
         *
         * @param op      the {@link SelectionKey} operation bit to wait for
         * @param key     registration of the socket channel
         * @param buffer  buffer to drain or fill
         * @param endTime absolute deadline in milliseconds
         * @param optr    the transfer step to perform
         * @throws SocketTimeoutException if the deadline passes first
         * @throws EOFException           if the step reports end of stream
         * @throws IOException            on any other I/O failure
         */
        static void operate(final int op, final SelectionKey key, final ByteBuffer buffer, final long endTime, final Operator optr) throws IOException
        {
            final SocketChannel channel = (SocketChannel) key.channel();
            key.interestOps(op);
            try {
                // Channel reads and writes stop at the buffer's limit, so the
                // loop must be bounded by the limit rather than the capacity.
                while (buffer.hasRemaining()) {
                    if (System.currentTimeMillis() > endTime) {
                        throw new SocketTimeoutException();
                    }
                    if ((key.readyOps() & op) != 0) {
                        final int moved = optr.io(channel, buffer);
                        if (moved < 0) {
                            throw new EOFException();
                        }
                        if (moved > 0) {
                            continue;
                        }
                        // moved == 0: the ready bit was stale; fall through
                        // and wait instead of spinning on the channel.
                    }
                    // Drop the key from the selected set so the selector
                    // reports fresh readiness for it on the next select.
                    key.selector().selectedKeys().remove(key);
                    TcpChannel.blockUntil(key, endTime);
                }
            } finally {
                if (key.isValid()) {
                    key.interestOps(0);
                }
            }
        }
    }

    /**
     * Performs one non-blocking transfer between the channel and the buffer.
     *
     * @param channel the socket channel
     * @param buffer  the buffer to read into or write from
     * @return the value returned by the underlying channel call; a negative
     *         value is treated as end of stream by the transfer loop
     * @throws IOException on I/O failure
     */
    int io(SocketChannel channel, ByteBuffer buffer) throws IOException;
}
class Send implements Operator
{
    public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
    {
        return channel.write(buffer);
    }
    public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
    {
        Operation.operate(SelectionKey.OP_WRITE, key, buffer, endTime, operator);
    }
    public static final Send operator = new Send();
}

class Recv implements Operator
{
    public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
    {
        return channel.read(buffer);
    }
    
    public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
    {
        Operation.operate(SelectionKey.OP_READ, key, buffer, endTime, operator);
    }
    public static final Recv operator = new Recv();
}

使用演示见以下代码。
大致说明一下,Server端开5656侦听,Client端开若干线程测试Socket通信。每次发送240字节信息+16字节MD5校验。服务端收到信息之后做MD5检查,正确的,发送“.xxxx”表示认可,否则发送“?xxxx”表示故障。
正式应用中还可以设置重试次数(tryout),在收发失败时重新尝试n次。
Server端,代码演示:

package asynchronizedchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.MessageDigest;
import java.util.Iterator;

public class Server
{

    /**
     * Entry point of the server-side communication demo: opens a
     * non-blocking listening channel, registers it for accept events and
     * hands the selector to a {@link Daemon} thread.
     *
     * @param args unused
     * @throws IOException if the selector or the listening channel cannot be
     *                     set up
     */
    public static void main(String[] args) throws IOException
    {
        // Selector that will report incoming connection requests
        final Selector acceptSelector = Selector.open();

        // Listening channel, non-blocking, with a backlog of 5
        final ServerSocketChannel listener = ServerSocketChannel.open();
        listener.configureBlocking(false);
        final InetSocketAddress listenAddress = new InetSocketAddress("xx.xx.xx.xx", 5656);
        listener.socket().bind(listenAddress, 5);

        // Register the listening channel for accept events
        listener.register(acceptSelector, SelectionKey.OP_ACCEPT);

        final Thread acceptThread = new Thread(new Daemon(acceptSelector));
        acceptThread.start();
    }
}

/**
 * Accept loop: waits on the selector for connection requests and starts one
 * {@link ServerHandler} thread per accepted connection.
 */
class Daemon implements Runnable
{
    private final Selector selector;

    /**
     * @param selector selector on which the listening channel is registered
     */
    Daemon(Selector selector)
    {
        this.selector = selector;
    }

    /**
     * Runs until the selector is closed. Exceptions raised while handling
     * events are printed and the loop continues.
     */
    @Override
    public void run()
    {
        // Stop once the selector has been closed; otherwise select() would
        // fail on every iteration and the loop would spin printing traces.
        while (selector.isOpen()) {
            try {
                // Wait for an event
                selector.select();

                // Get list of selection keys with pending events
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();

                // Process each key
                while (it.hasNext()) {
                    // Get the selection key
                    SelectionKey selKey = it.next();

                    // Remove it from the list to indicate that it is being processed
                    it.remove();

                    // Check if it's a connection request; a cancelled key
                    // must be skipped because isAcceptable() would throw.
                    if (selKey.isValid() && selKey.isAcceptable()) {
                        // Get channel with connection request
                        ServerSocketChannel server = (ServerSocketChannel) selKey.channel();
                        // Accept the connection request.
                        // If serverSocketChannel is blocking, this method blocks.
                        // The returned channel is in blocking mode.
                        SocketChannel channel = server.accept();

                        // If serverSocketChannel is non-blocking, the channel may be null
                        if (channel != null) {
                            // Use the socket channel to communicate with the client
                            try {
                                new Thread(new ServerHandler(channel)).start();
                            } catch (Exception ex) {
                                // No handler owns the connection, so close it
                                // here rather than leaving it open.
                                channel.close();
                                throw ex;
                            }
                        } else {
                            System.out.println("---No Connection---");
                            // There were no pending connection requests; try again later.
                        }
                    }
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
}

/**
 * Per-connection worker of the demo server: repeatedly receives a 256-byte
 * message, checks the MD5 of its first 240 bytes against its last 16 bytes,
 * and answers with a 5-byte reply.
 */
class ServerHandler implements Runnable
{
    /** Timeout in milliseconds (30 seconds), added to the current time once in the constructor. */
    private static final long TIMEOUT_MILLIS = 30 * 1000;
    /** Size of one incoming message: payload followed by its checksum. */
    private static final int MESSAGE_LENGTH = 256;
    /** Number of leading message bytes covered by the checksum. */
    private static final int PAYLOAD_LENGTH = 240;
    /** Number of trailing message bytes holding the MD5 checksum. */
    private static final int CHECKSUM_LENGTH = 16;
    /** Size of the reply sent back for every message. */
    private static final int REPLY_LENGTH = 5;

    /** Messages handled by all handlers; guarded by the class lock. */
    private static int counter = 0;

    private final TcpChannel channel;
    private final MessageDigest digest;

    /**
     * @param channel the accepted connection to serve
     * @throws Exception if the channel cannot be wrapped or the MD5 digest
     *                   is unavailable
     */
    ServerHandler(SocketChannel channel) throws Exception
    {
        final long deadline = System.currentTimeMillis() + TIMEOUT_MILLIS;
        this.channel = new TcpChannel(channel, deadline, SelectionKey.OP_READ);
        this.digest = MessageDigest.getInstance("md5");
    }

    /**
     * Serves messages until an exception occurs, then releases the channel.
     * The shared counter is printed whenever its low 16 bits are all zero.
     */
    @Override
    public void run()
    {
        try {
            for (;;) {
                work();
                synchronized (ServerHandler.class) {
                    counter++;
                    if ((counter & 65535) == 0) {
                        System.out.println(counter);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            channel.cleanup();
        }
    }

    /** Simulates one unit of work: handles a single request/reply exchange. */
    private void work() throws IOException
    {
        final byte[] message = new byte[MESSAGE_LENGTH];
        final byte[] answer = new byte[REPLY_LENGTH];
        read(message, answer);
    }

    /**
     * Reads one message from the socket, verifies it and sends the verdict.
     *
     * @param cache buffer that receives the message
     * @param reply buffer sent back; its first byte is set to {@code '.'}
     *              when the checksum matches and {@code '?'} otherwise
     * @throws IOException if receiving or sending fails
     */
    private void read(byte[] cache, byte[] reply) throws IOException
    {
        channel.recv(ByteBuffer.wrap(cache));

        // MD5 over the payload part of the message
        digest.reset();
        digest.update(cache, 0, PAYLOAD_LENGTH);
        final byte[] expected = digest.digest();

        // Compare with the checksum carried in the trailing bytes
        final boolean intact =
                ExtArrays.partialEquals(expected, 0, cache, PAYLOAD_LENGTH, CHECKSUM_LENGTH);
        if (intact) {
            reply[0] = '.';
        } else {
            reply[0] = '?';
            System.out.println("MISMATCH!");
        }

        // Report the result of the check to the peer
        channel.send(ByteBuffer.wrap(reply));
    }
}

final class ExtArrays
{
    private ExtArrays()
    {
    }

    public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len)
    { // 字节数组的部分比较
        if (a == null || b == null) {
            return false;
        }
        if (offset_a + len > a.length || offset_b + len > b.length) {
            return false;
        }
        for (int i = offset_a, j = offset_b, k = len; k > 0; i++, j++, k--) {
            if (a[i] != b[j]) {
                return false;
            }
        }
        return true;
    }
}

Client端,代码演示:
package asynchronizedchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.DigestException;
import java.security.MessageDigest;
import java.util.Random;

public class Client
{
    private static int id = 0;
    /**
     * 客户端通信范例程序主函数
     * 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception
    {
        new Thread(new ClientHandler(id++)).start();
        new Thread(new ClientHandler(id++)).start();
        new Thread(new ClientHandler(id++)).start();
        new Thread(new ClientHandler(id++)).start();
        new Thread(new ClientHandler(id++)).start();
    }

}

/**
 * One client worker of the demo: connects to the server and repeatedly sends
 * a 256-byte message (240 random bytes followed by their 16-byte MD5), then
 * reads the 5-byte reply.
 */
class ClientHandler implements Runnable
{
    /** Timeout in milliseconds (30 seconds), added to the current time once in the constructor. */
    private static final long TIMEOUT_MILLIS = 30 * 1000;
    /** Size of one outgoing message: payload followed by its checksum. */
    private static final int MESSAGE_LENGTH = 256;
    /** Number of leading message bytes covered by the checksum. */
    private static final int PAYLOAD_LENGTH = 240;
    /** Number of trailing message bytes holding the MD5 checksum. */
    private static final int CHECKSUM_LENGTH = 16;
    /** Size of the reply expected for every message. */
    private static final int REPLY_LENGTH = 5;

    private final TcpChannel channel;

    private final int id;

    private final MessageDigest digest;
    private final Random random;

    /**
     * @param id identifier printed in this worker's progress output
     * @throws Exception if the socket channel cannot be opened or wrapped,
     *                   or the MD5 digest is unavailable
     */
    ClientHandler(int id) throws Exception
    {
        this.id = id;
        final SocketChannel socket = SocketChannel.open();
        final long deadline = System.currentTimeMillis() + TIMEOUT_MILLIS;
        this.channel = new TcpChannel(socket, deadline, SelectionKey.OP_WRITE);
        this.digest = MessageDigest.getInstance("md5");
        this.random = new Random();
    }

    /**
     * Connects, then exchanges messages until an exception occurs; progress
     * is printed whenever the low 14 bits of the round counter are all zero.
     * The channel is released on exit.
     */
    @Override
    public void run()
    {
        try {
            channel.connect(new InetSocketAddress("xx.xx.xx.xx", 5656));
            int rounds = 0;
            for (;;) {
                work();
                rounds++;
                if ((rounds & 16383) == 0) {
                    System.out.println(String.format("client(%1$d): %2$d", id, rounds));
                }
                Thread.yield();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            channel.cleanup();
        }
    }

    /** Performs a single request/reply exchange with fresh buffers. */
    private void work() throws IOException, DigestException
    {
        final byte[] message = new byte[MESSAGE_LENGTH];
        final byte[] answer = new byte[REPLY_LENGTH];
        write(message, answer);
    }

    /**
     * Fills {@code cache} with a random payload plus checksum, sends it and
     * reads the server's verdict into {@code reply}.
     *
     * @param cache buffer for the outgoing message
     * @param reply buffer for the incoming reply
     * @throws DigestException if the checksum cannot be written into {@code cache}
     * @throws IOException     if sending or receiving fails
     */
    private void write(byte[] cache, byte[] reply) throws DigestException, IOException
    {
        // Random fill; only the payload part is kept, the tail is overwritten below
        random.nextBytes(cache);

        // The MD5 of the payload occupies the trailing checksum bytes
        digest.reset();
        digest.update(cache, 0, PAYLOAD_LENGTH);
        digest.digest(cache, PAYLOAD_LENGTH, CHECKSUM_LENGTH);

        channel.send(ByteBuffer.wrap(cache));
        channel.recv(ByteBuffer.wrap(reply));

        // A reply not starting with '.' means the message was not accepted;
        // a real application could resend here
        final boolean accepted = reply[0] == '.';
        if (!accepted) {
            System.out.println("MISMATCH!");
        }
    }
}

重点说明:
发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。

顶部