Java用NIO写一个通讯服务端

Home / Article MrLee 2014-12-19 2967

Java NIO
Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。
Java NIO出现不只是一个技术性能的提高,你会发现网络上到处在介绍它,因为它具有里程碑意义,从JDK1.4开始,Java开始提高性能相关的功能,从而使得Java在底层或者并行分布式计算等操作上已经可以和C或Perl等语言并驾齐驱。
如果你至今还是在怀疑Java的性能,说明你的思想和观念已经完全落伍了,Java一两年就应该用新的名词来定义。从JDK1.5开始又要提供关于线程、并发等新性能的支持,Java应用在游戏等适时领域方面的机会已经成熟,Java在稳定自己中间件地位后,开始蚕食传统C的领域。那么今天就用Java NIO写一个服务端。
线程池 ThreadPoolExecutor:
ThreadPoolExecutor是一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
数据库连接池ComboPooledDataSource:
关于数据库连接池的使用,首先我们要明白我们为什么要用它,对应普通的数据库连接操作,通常会涉及到以下一些操作是比较耗时的: 网络通讯,涉及到网络延时及协议通讯 身份验证,涉及安全性检查 连接合法性检查,主要是检查所连接的数据库是否存在 并发控制机制 构造并初始化输出缓冲区 连接成功后的信息保存,日志存储 服务器性能 数据库配置优化 系统分配内存资源 等等~~~状况,导致数据库连接操作比较耗时,~~~而且每次都得花费0.05s~1s的时间 但是使用连接池技术,本质上就是在一个请求对应的连接,都由一个线程池来维护着,也就是说“上下文切换”的代价是线程级别(所谓的纳秒级),对于大规模的并发访问,就算以每秒几亿级别的访问量都是不成问题的。
上代码
SocketConnectThread 类负责处理客户端的连接,然后分发读取数据任务
package com.soft.nio.connector.socket;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.soft.nio.connector.core.NetTool;
import com.soft.nio.connector.core.ThreadPool;
/**
 * 监听连接线程
 *
 * @author leehom
 *
 */
public class SocketConnectThread implements Runnable {
	/** selector专门用来监听client连接 **/
	protected Selector selector;
	protected SocketWorkThread workThread;
	private ServerSocketChannel server;
	private static SocketConnectThread socketConnectThread;
	private boolean stop = false;
	public static final int BLOCK = 3;// 100KB数据,多了自动断开连接
	private static Map<String, Object> black_map;
	private SocketConnectThread(int port) {
		super();
		// TODO Auto-generated constructor stub
		// 初始大小10000
		black_map = new HashMap<String, Object>(10000);
		try {
			// 打开一个选择器
			selector = Selector.open();
			// 打开服务器套接字通道
			server = ServerSocketChannel.open();
			// 启用/禁用 SO_REUSEADDR 套接字选项
			server.socket().setReuseAddress(true);
			// 调整此通道的阻塞模式
			server.configureBlocking(false);
			// 将 ServerSocket 绑定到特定地址
			server.socket().bind(new InetSocketAddress(port));
			// 向给定的选择器注册此通道,返回一个选择键
			server.register(selector, SelectionKey.OP_ACCEPT);
			workThread = new SocketWorkThread();
			ThreadPool.getInstance().execute(this);
			System.out.println("open socket server->port:" + port);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	public static SocketConnectThread newInstance(int port) {
		if (socketConnectThread == null)
			socketConnectThread = new SocketConnectThread(port);
		return socketConnectThread;
	}
	public static SocketConnectThread getInstance() {
		return socketConnectThread;
	}
	public static boolean inBlackList(String ip) {
		return black_map.containsKey(ip);
	}
	public static void addBlackList(String ip) {
		black_map.put(ip, null);
	}
	public static void resetBlackList() {
		// 重黑黑名单
		System.out.println("重黑黑名单");
		black_map.clear();
		try {
			List<String> list = NetTool.readBlackList();
			for (int i = 0; i < list.size(); i++) {
				String ip = list.get(i);
				addBlackList(ip);
				System.out.println("读取黑名单一条:" + ip);
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	@Override
	public void run() {
		// TODO Auto-generated method stub
		try {
			processConnector();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	public void stop() {
		selector.wakeup();
		stop = true;
		workThread.stop();
	}
	/** 等待客户端连接 **/
	private void processConnector() throws IOException {
		// 选择ACCEPT连接一组键
		ServerSocketChannel server = null;
		SocketChannel client = null;
		while (!stop && selector.select() > 0) {
			// 返回此选择器的已选择键集
			Set<SelectionKey> set = selector.selectedKeys();
			Iterator<SelectionKey> keys = set.iterator();
			while (keys.hasNext()) {
				SelectionKey key = keys.next();
				// 置空迭代器
				keys.remove();
				// 测试此键的通道是否已准备好接受新的套接字连接
				try {
					if (key.isAcceptable()) {
						// 返回为之创建此键的通道
						server = (ServerSocketChannel) key.channel();
						// 接受到此通道套接字的连接
						client = server.accept();
						String ip = client.socket().getInetAddress()
								.getHostAddress();
						boolean isBlack = inBlackList(ip);
						System.out.println(ip + "是否黑名字:" + isBlack);
						if (isBlack) {
							server.close();
							client.close();
							key.cancel();
						} else {
							// 调整此通道的阻塞模式
							client.configureBlocking(false);
							// 向给定的选择器注册此通道,返回一个选择键
							client.register(workThread.getSelector(),
									SelectionKey.OP_READ,
									new ByteArrayOutputStream());
						}
					}
				} catch (Exception exception) {
					exception.printStackTrace();
					if (key != null) {
						key.cancel();
						key.channel().close();
					}
				}
			}
		}
	}
	/**
	 * ServerSocketChannel: ServerSocket 的替代类, 支持阻塞通信与非阻塞通信 SocketChannel:Socket
	 * 的替代类, 支持阻塞通信与非阻塞通信 Selector: 为ServerSocketChannel 监控接收客户端连接就绪事件
	 * 为SocketChannel 监控连接服务器就绪, 读就绪和写就绪事件. SelectionKey: 代表 ServerSocketChannel
	 * 及 SocketChannel 向 Selector 注册事件的句柄.当一个 SelectionKey 对象位于Selector 对象的
	 * selected-keys 集合中时, 就表示与这个SelectionKey 对象相关的事件发生了
	 * SelectionKey.OP_ACCEPT->客户端连接就绪事件 等于监听serversocket.accept() 返回一个socket
	 * SelectionKey.OP_CONNECT ->准备连接服务器就绪跟上面类似,只不过是对于 socket的 相当于监听了
	 * socket.connect() SelectionKey.OP_READ->读就绪事件, 表示输入流中已经有了可读数据, 可以执行读操作了
	 * SelectionKey.OP_WRITE->写就绪事件
	 */
}

SocketWorkThread负责读取数据再分到写入线程
package com.soft.nio.connector.socket;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import com.soft.nio.connector.core.ThreadPool;
public class SocketWorkThread implements Runnable {
	/** selector专门用来监听client是否有数据传过来,可读 **/
	protected Selector selector;
	protected boolean stop;
	protected SocketWorkThread() {
		try {
			selector = Selector.open();
			ThreadPool.getInstance().execute(this);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	public void stop() {
		this.stop = true;
	}
	@Override
	public void run() {
		// TODO Auto-generated method stub
		// selector进入read,write非阻塞状态 不能用while(select()>0)循环
		// 没有数据会跳出循环
		try {
			while (!stop) {
				listener();
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			System.out.println("读取关闭异常");
		}
	}
	/** 监听是否有数据可读取 **/
	protected void listener() throws IOException {
		SelectionKey selectionKey = null;
		try {
			if (selector.select(10L) > 0) {
				Set<SelectionKey> set = selector.selectedKeys();
				Iterator<SelectionKey> iterator = set.iterator();
				while (iterator.hasNext()) {
					selectionKey = (SelectionKey) iterator.next();
					iterator.remove();// 用一个清除一个
					readBufer(selectionKey);
				}
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			if (selectionKey != null) {
				SocketChannel client = (SocketChannel) selectionKey.channel();
				String ip = client.socket().getInetAddress().getHostAddress();
				System.out.println(ip + "离线了");
				client.socket().close();
				client.close();
				selectionKey.cancel();
			}
		}
	}
	/** 读取数据 **/
	protected void readBufer(SelectionKey selectionKey) throws IOException {
		// /////监听连接线程丢到读取线程///////////
		SocketChannel client = (SocketChannel) selectionKey.channel();
		ByteArrayOutputStream bos = (ByteArrayOutputStream) selectionKey
				.attachment();
		ByteBuffer buffer = ByteBuffer.allocate(10240);// 10kb缓存
		int actual = 0;
		while ((actual = client.read(buffer)) > 0) {
			buffer.flip();
			int limit = buffer.limit();
			byte b[] = new byte[limit];
			buffer.get(b);
			bos.write(b);
			buffer.clear();// 清空
		}
		if (actual < 0) {
			// 出现异常
			String ip = client.socket().getInetAddress().getHostAddress();
			System.out.println(ip + "数据读取异常,连接断开");
			client.socket().close();
			client.close();
			selectionKey.cancel();
			return;
		}
		ThreadPool.getInstance().execute(new SocketWriteThread(selectionKey));
	}
	public final Selector getSelector() {
		return selector;
	}
}

SocketWriteThread负责把读取到数据和要返回客户端的数据处理
package com.soft.nio.connector.socket;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import com.soft.nio.connector.server.LogicPro;
public class SocketWriteThread implements Runnable {
	private SelectionKey sk;
	public SocketWriteThread(SelectionKey s) {
		sk = s;
	}
	public void run() {
		// TODO Auto-generated method stub
		try {
			writeBufer(sk);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			System.out.println("写入关闭异常");
		}
	}
	/** 处理读取到数据和写入返回客户庙数据 **/
	protected void writeBufer(SelectionKey selectionKey) throws IOException {
		// /////写入数据////////
		SocketChannel client = null;
		try {
			client = (SocketChannel) selectionKey.channel();
			ByteArrayOutputStream stream = (ByteArrayOutputStream) selectionKey
					.attachment();
			stream.flush();
			// 预写入数据
			ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
			DataOutputStream outputStream = new DataOutputStream(
					arrayOutputStream);
			// 预读取数据
			ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(
					stream.toByteArray());
			DataInputStream inputStream = new DataInputStream(arrayInputStream);
			String name = LogicPro.class.getName();
			LogicPro logic = (LogicPro) Class.forName(name).newInstance();
			// 处理客户端读取到数据和服务端要发送的数据
			logic.processClientData(inputStream, outputStream, client.socket()
					.getInetAddress());
			inputStream.close();
			arrayInputStream.close();
			outputStream.flush();
			arrayOutputStream.flush();
			byte[] data = arrayOutputStream.toByteArray();
			outputStream.close();
			arrayOutputStream.close();
			ByteBuffer writeBuffer = ByteBuffer.wrap(data);
			while (writeBuffer.hasRemaining())
				client.write(writeBuffer);
			stream.reset();// 清空缓冲区,准备下次缓冲
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			if (client != null) {
				String ip = client.socket().getInetAddress().getHostAddress();
				System.out.println(ip + "数据写入异常,连接断开");
				client.socket().close();
				client.close();
			}
			selectionKey.cancel();
		}
	}
}

下面是开发者使用的,上面全部是封装的内部机制,相当于服务端的通讯内核不需要开发者改动。 LogicPro处理服务端读取到的数据和要写的数据(面对开发者)
package com.soft.nio.connector.server;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.soft.nio.connector.core.Logic;
/**
 * 所有的数据交互均在此类中完成 此类由服务端单独分发线程完成 ProcessData 函数不支持异步操作数据流
 *
 * @author leehom
 */
public class LogicPro implements Logic {
	public SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	@Override
	public void processClientData(DataInputStream dis, DataOutputStream dos,
			InetAddress add) throws IOException {
		// TODO Auto-generated method stub
		int size = dis.available();
		byte[] data = new byte[size];
		dis.read(data);
		System.out.println(new String(data));
		String sendStr = sdf.format(new Date());
		byte[] tmp = sendStr.getBytes();
		byte[] sendData = new byte[data.length + tmp.length];
		System.arraycopy(tmp, 0, sendData, 0, tmp.length);
		System.arraycopy(data, 0, sendData, tmp.length, data.length);
		dos.write(sendData);
	}
}

SocketServer启动服务
package com.soft.nio.connector.server;
import com.soft.nio.connector.socket.SocketConnectThread;
public class SocketServer {
	public static void main(String[] args) {
		SocketConnectThread.newInstance(8888);
	}
}

SocketClient测试客户端,也可以用SocketChannel加Selector实现,也是非阻塞模式,看项目需求。实现和服务端基本上相似,可参考服务端实现。
package com.soft.nio.connector.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
public class SocketClient implements Runnable {
	Socket socket;
	InputStream is;
	OutputStream os;
	public SocketClient() {
		// TODO Auto-generated constructor stub
		new Thread(this).start();
		scanData();
	}
	private void scanData() {
		try {
			Scanner scanner = new Scanner(System.in);
			while (scanner.hasNext()) {
				String cmd = scanner.nextLine();
				if (cmd.equals("exit")) {
					scanner.close();
					os.close();
					is.close();
					socket.close();
					break;
				} else if (os != null) {
					byte[] data = cmd.getBytes();
					os.write(data);
					os.flush();
					System.out.println("send:" + cmd);
				}
			}
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	@Override
	public void run() {
		// TODO Auto-generated method stub
		try {
			socket = new Socket("127.0.0.1", 8888);
			os = socket.getOutputStream();
			is = socket.getInputStream();
			while (true) {
				byte[] buf = new byte[512];
				int len = is.read(buf);
				if (len == -1) {
					System.out.println("断开连接了");
					break;
				}
				System.out.println("recv:" + new String(buf, 0, len));
			}
		} catch (UnknownHostException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		new SocketClient();
	}
}

源码下载

本文链接:https://www.it72.com/472.htm

推荐阅读
最新回复 (0)
返回