Commit 8cedea05 authored by zhushanglei's avatar zhushanglei

websocket和tcpsocket

parent 83ddb662
package server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
public class ChannelActiveHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = LoggerFactory.getLogger(ChannelActiveHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = insocket.getAddress().getHostAddress();
String clientPort = String.valueOf(insocket.getPort());
logger.info("新的连接:"+ clientIP +":"+ clientPort);
}
}
package server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
private static Logger logger = LoggerFactory.getLogger(DeviceServerHandler.class);
private static Map<String, Channel> map = new ConcurrentHashMap<>();
//由于继承了SimpleChannelInboundHandler,这个方法必须实现,否则报错
//但实际应用中,这个方法没被调用
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buff = (ByteBuf) msg;
String info = buff.toString(CharsetUtil.UTF_8);
logger.info("收到消息内容:"+info);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
logger.info("handlerAdded::"+ctx.channel().id().asLongText());
InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
String hostAddress = socketAddress.getAddress().getHostAddress();
logger.info("IP:{}",hostAddress);
String clientId = ctx.channel().id().toString();
map.put(clientId,ctx.channel());
logger.info("map:{}",map.toString());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object object) throws Exception {
// WebSocket消息处理
String msg = "";
if (object instanceof WebSocketFrame) {
logger.info("WebSocket消息处理************************************************************");
msg = ((TextWebSocketFrame) object).text().trim();
logger.info("收到webSocket消息:" + msg);
}
else{ // Socket消息处理
logger.info("Socket消息处理=================================");
// ByteBuf buff = (ByteBuf) object;
// String socketInfo = buff.toString(CharsetUtil.UTF_8).trim();;
msg = String.valueOf(object);
logger.info("收到socket消息:"+msg);
for (String key: map.keySet()) {
if (key.equals(ctx.channel().id().toString())){
continue;
}
Channel channel = map.get(key);
ChannelFuture channelFuture = channel.writeAndFlush(msg);
if (!channelFuture.isSuccess()) {
channel.writeAndFlush(new TextWebSocketFrame(msg));
}
}
}
}
/*******************************************************************************************/
// @Override
// public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// if (evt instanceof IdleStateEvent) {
// IdleState state = ((IdleStateEvent) evt).state();
// if (state == IdleState.READER_IDLE) {
// // 在规定时间内没有收到客户端的上行数据, 主动断开连接
////******************* socketChannelMap.remove((SocketChannel)ctx.channel());
// ctx.disconnect();
// logger.info("心跳检测触发,socket连接断开!");
// }
// } else {
// super.userEventTriggered(ctx, evt);
// }
// }
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
logger.warn("handlerRemoved::"+ctx.channel().id().asLongText());
String clientId = ctx.channel().id().toString();
map.remove(clientId);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress reAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = reAddr.getAddress().getHostAddress();
String clientPort = String.valueOf(reAddr.getPort());
logger.info("连接断开:"+ clientIP +":"+ clientPort);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught::"+cause.getMessage());
String clientId = ctx.channel().id().toString();
map.remove(clientId);
ctx.close();
}
}
...@@ -4,6 +4,13 @@ import io.netty.buffer.ByteBuf; ...@@ -4,6 +4,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import java.util.List; import java.util.List;
...@@ -17,22 +24,28 @@ public class SocketChooseHandler extends ByteToMessageDecoder { ...@@ -17,22 +24,28 @@ public class SocketChooseHandler extends ByteToMessageDecoder {
private static final int MAX_LENGTH = 23; private static final int MAX_LENGTH = 23;
/** WebSocket握手的协议前缀 */ /** WebSocket握手的协议前缀 */
private static final String WEBSOCKET_PREFIX = "GET /"; private static final String WEBSOCKET_PREFIX = "GET /";
// private SpringContextUtil springContextUtil;
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
String protocol = getBufStart(in); String protocol = getBufStart(in);
if (protocol.startsWith(WEBSOCKET_PREFIX)) { if (protocol.startsWith(WEBSOCKET_PREFIX)) {
// springContextUtil.getBean(PipelineAdd.class).websocketAdd(ctx); ctx.pipeline().addBefore("commonhandler","http-codec",new HttpServerCodec());
// HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
ctx.pipeline().addBefore("commonhandler","aggregator",new HttpObjectAggregator(65535));
//对于 webSocket ,不设置超时断开 // ChunkedWriteHandler:向客户端发送HTML5文件,文件过大会将内存撑爆
ctx.pipeline().remove(IdleStateHandler.class); ctx.pipeline().addBefore("commonhandler","http-chunked",new ChunkedWriteHandler());
ctx.pipeline().remove(LengthFieldBasedFrameDecoder.class); ctx.pipeline().addBefore("commonhandler","WebSocketAggregator",new WebSocketFrameAggregator(65535));
//用于处理websocket, /ws为访问websocket时的uri
ctx.pipeline().addBefore("commonhandler","ProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
// 此次要移除socket 相关的编码
ctx.pipeline().remove(StringDecoder.class);
ctx.pipeline().remove(StringEncoder.class);
} }
in.resetReaderIndex(); in.resetReaderIndex();
ctx.pipeline().remove(this.getClass()); ctx.pipeline().remove(this.getClass());
} }
private String getBufStart(ByteBuf in){ private String getBufStart(ByteBuf in){
int length = in.readableBytes(); int length = in.readableBytes();
if (length > MAX_LENGTH) { if (length > MAX_LENGTH) {
......
...@@ -16,6 +16,7 @@ import io.netty.handler.codec.string.StringDecoder; ...@@ -16,6 +16,7 @@ import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -60,8 +61,8 @@ public class TcpServer { ...@@ -60,8 +61,8 @@ public class TcpServer {
ServerBootstrap b = new ServerBootstrap(); ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup); b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class); b.channel(NioServerSocketChannel.class);
b.childOption(ChannelOption.SO_REUSEADDR, true); //快速复用端口 // b.childOption(ChannelOption.SO_REUSEADDR, true); //快速复用端口
b.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS,1000); // b.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS,1000);
b.childHandler(new ChannelInitializer<SocketChannel>() { b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
...@@ -89,18 +90,27 @@ public class TcpServer { ...@@ -89,18 +90,27 @@ public class TcpServer {
// pipeline.addLast(new TcpServerHandler()); // pipeline.addLast(new TcpServerHandler());
ch.pipeline().addLast("active",new ChannelActiveHandler() ); // ch.pipeline().addLast("active",new ChannelActiveHandler() );
//Socket 连接心跳检测 // //Socket 连接心跳检测
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 0, 0)); // ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 0, 0));
//
// ch.pipeline().addLast("socketChoose",new SocketChooseHandler());
//
// //注意,这个专门针对 Socket 信息的解码器只能放在 SocketChooseHandler 之后,否则会导致 webSocket 连接出错
// ch.pipeline().addLast("myDecoder",new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN,1024*1024,0,4,0,4,true));
// ch.pipeline().addLast("commonhandler",new DeviceServerHandler());
ch.pipeline().addLast("socketChoose",new SocketChooseHandler());
//注意,这个专门针对 Socket 信息的解码器只能放在 SocketChooseHandler 之后,否则会导致 webSocket 连接出错 ch.pipeline().addLast("socketChoose",new SocketChooseHandler());
ch.pipeline().addLast("myDecoder",new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN,1024*1024,0,4,0,4,true)); ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast("commonhandler",new DeviceServerHandler()); ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast("commonhandler",new WebSocketHandler());
} }
}); });
b.option(ChannelOption.SO_BACKLOG, 500);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
// 异步绑定端口 // 异步绑定端口
b.bind(PORT).sync(); b.bind(PORT).sync();
logger.info("TCP Server Started"); logger.info("TCP Server Started");
......
package server;
import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
private static Map<String, Channel> map = new ConcurrentHashMap<>();
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object object) throws Exception {
String msg = "";
if (object instanceof TextWebSocketFrame){
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame)object;
msg = textWebSocketFrame.text();
}else {
msg = String.valueOf(object);
}
System.out.println("msg:"+msg);
for (String key: map.keySet()) {
if (key.equals(ctx.channel().id().toString())){
continue;
}
Channel channel = map.get(key);
ChannelFuture channelFuture = channel.writeAndFlush(msg);
if (!channelFuture.isSuccess()) {
channel.writeAndFlush(new TextWebSocketFrame(msg));
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded::"+ctx.channel().id().asLongText());
InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
String hostAddress = socketAddress.getAddress().getHostAddress();
logger.info("IP:{}",hostAddress);
String clientId = ctx.channel().id().toString();
map.put(clientId,ctx.channel());
logger.info("map:{}",map.toString());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved::"+ctx.channel().id().asLongText());
String clientId = ctx.channel().id().toString();
map.remove(clientId);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught::"+cause.getMessage());
String clientId = ctx.channel().id().toString();
map.remove(clientId);
ctx.close();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment