Commit 83ddb662 authored by zhushanglei's avatar zhushanglei

websocket和tcpsocket

parent d36b947b
package server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
public class ChannelActiveHandler extends ChannelInboundHandlerAdapter {
private static Logger logger = LoggerFactory.getLogger(ChannelActiveHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = insocket.getAddress().getHostAddress();
String clientPort = String.valueOf(insocket.getPort());
logger.info("新的连接:"+ clientIP +":"+ clientPort);
}
}
package server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class DeviceServerHandler extends SimpleChannelInboundHandler<Object> {
private static Logger logger = LoggerFactory.getLogger(DeviceServerHandler.class);
private static Map<String, Channel> map = new ConcurrentHashMap<>();
//由于继承了SimpleChannelInboundHandler,这个方法必须实现,否则报错
//但实际应用中,这个方法没被调用
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buff = (ByteBuf) msg;
String info = buff.toString(CharsetUtil.UTF_8);
logger.info("收到消息内容:"+info);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
logger.info("handlerAdded::"+ctx.channel().id().asLongText());
InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
String hostAddress = socketAddress.getAddress().getHostAddress();
logger.info("IP:{}",hostAddress);
String clientId = ctx.channel().id().toString();
map.put(clientId,ctx.channel());
logger.info("map:{}",map.toString());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object object) throws Exception {
// WebSocket消息处理
String msg = "";
if (object instanceof WebSocketFrame) {
logger.info("WebSocket消息处理************************************************************");
msg = ((TextWebSocketFrame) object).text().trim();
logger.info("收到webSocket消息:" + msg);
}
else{ // Socket消息处理
logger.info("Socket消息处理=================================");
// ByteBuf buff = (ByteBuf) object;
// String socketInfo = buff.toString(CharsetUtil.UTF_8).trim();;
msg = String.valueOf(object);
logger.info("收到socket消息:"+msg);
for (String key: map.keySet()) {
if (key.equals(ctx.channel().id().toString())){
continue;
}
Channel channel = map.get(key);
ChannelFuture channelFuture = channel.writeAndFlush(msg);
if (!channelFuture.isSuccess()) {
channel.writeAndFlush(new TextWebSocketFrame(msg));
}
}
}
}
/*******************************************************************************************/
// @Override
// public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// if (evt instanceof IdleStateEvent) {
// IdleState state = ((IdleStateEvent) evt).state();
// if (state == IdleState.READER_IDLE) {
// // 在规定时间内没有收到客户端的上行数据, 主动断开连接
////******************* socketChannelMap.remove((SocketChannel)ctx.channel());
// ctx.disconnect();
// logger.info("心跳检测触发,socket连接断开!");
// }
// } else {
// super.userEventTriggered(ctx, evt);
// }
// }
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
logger.warn("handlerRemoved::"+ctx.channel().id().asLongText());
String clientId = ctx.channel().id().toString();
map.remove(clientId);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
InetSocketAddress reAddr = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIP = reAddr.getAddress().getHostAddress();
String clientPort = String.valueOf(reAddr.getPort());
logger.info("连接断开:"+ clientIP +":"+ clientPort);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught::"+cause.getMessage());
String clientId = ctx.channel().id().toString();
map.remove(clientId);
ctx.close();
}
}
package server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.List;
/**
* 协议初始化解码器. *
* 用来判定实际使用什么协议.</b> *
*/
public class SocketChooseHandler extends ByteToMessageDecoder {
/** 默认暗号长度为23 */
private static final int MAX_LENGTH = 23;
/** WebSocket握手的协议前缀 */
private static final String WEBSOCKET_PREFIX = "GET /";
// private SpringContextUtil springContextUtil;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
String protocol = getBufStart(in);
if (protocol.startsWith(WEBSOCKET_PREFIX)) {
// springContextUtil.getBean(PipelineAdd.class).websocketAdd(ctx);
//对于 webSocket ,不设置超时断开
ctx.pipeline().remove(IdleStateHandler.class);
ctx.pipeline().remove(LengthFieldBasedFrameDecoder.class);
}
in.resetReaderIndex();
ctx.pipeline().remove(this.getClass());
}
private String getBufStart(ByteBuf in){
int length = in.readableBytes();
if (length > MAX_LENGTH) {
length = MAX_LENGTH;
}
// 标记读位置
in.markReaderIndex();
byte[] content = new byte[length];
in.readBytes(content);
return new String(content);
}
}
......@@ -2,18 +2,26 @@ package server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.ByteOrder;
import java.util.Properties;
/**
......@@ -29,7 +37,7 @@ public class TcpServer {
// private static final String IP = "192.168.0.112";
// private static final String IP = "192.168.0.107";
// public static String IP = "8.143.203.103";
private static final int PORT = 9093;
private static final int PORT = 9095;
......@@ -52,11 +60,13 @@ public class TcpServer {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.childOption(ChannelOption.SO_REUSEADDR, true); //快速复用端口
b.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS,1000);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ChannelPipeline pipeline = ch.pipeline();
// Decode是对发送的信息进行编码、
// @param maxFrameLength 帧的最大长度
// @param lengthFieldOffset length字段偏移的地址
......@@ -64,9 +74,31 @@ public class TcpServer {
// @param lengthAdjustment 修改帧数据长度字段中定义的值,
// 可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
// @param initialBytesToStrip 解析时候跳过多少个长度
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(new TcpServerHandler());
// pipeline.addLast(new StringEncoder());
// pipeline.addLast(new StringDecoder());
// pipeline.addLast(new TcpServerHandler());
//因为基于http协议,使用http的编码和解码器
// pipeline.addLast(new HttpServerCodec());
// //是以块方式写,添加ChunkedWriteHandler处理器
// pipeline.addLast(new ChunkedWriteHandler());
// // http数据聚合器 用于将大数据量分段传输的数据 聚合
// pipeline.addLast(new HttpObjectAggregator(8192));
// // websocket协议处理器
// pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// // 自定义的业务处理
// pipeline.addLast(new TcpServerHandler());
ch.pipeline().addLast("active",new ChannelActiveHandler() );
//Socket 连接心跳检测
ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 0, 0));
ch.pipeline().addLast("socketChoose",new SocketChooseHandler());
//注意,这个专门针对 Socket 信息的解码器只能放在 SocketChooseHandler 之后,否则会导致 webSocket 连接出错
ch.pipeline().addLast("myDecoder",new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN,1024*1024,0,4,0,4,true));
ch.pipeline().addLast("commonhandler",new DeviceServerHandler());
}
});
// 异步绑定端口
......
......@@ -8,6 +8,7 @@ import com.alibaba.fastjson.JSON;
import com.fazecast.jSerialComm.SerialPort;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -22,7 +23,7 @@ import java.util.Map;
* @author yuli
* @ClassName TcpServerHandler
**/
public class TcpServerHandler extends SimpleChannelInboundHandler<Object> {
public class TcpServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static Logger logger = LoggerFactory.getLogger(TcpServerHandler.class);
// public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
......@@ -49,15 +50,40 @@ public class TcpServerHandler extends SimpleChannelInboundHandler<Object> {
// * @paractx = {DefaultChannelHandlerContext@2874} "ChannelHandlerContext(TcpServerHandler#0, [id: 0xe24a52fc, L:/127.0.0.1:9200 - R:/127.0.0.1:61801])"m [ctx, msg]
* @return void
*/
// @Override
// protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//// logger.info("**接收到的基站数据<<start:" + msg.toString() +">>end");
// if(TcpClient.client.equalsIgnoreCase(msg.toString())){ //移动站
// String clinetIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress() + ctx.channel().id();
// logger.warn("移动站IP:" + clinetIp);
// addContext(clinetIp,ctx);
// logger.warn("if>>hashmap移动站:" + gpsServerContextHashMap);
// ctx.writeAndFlush("OK!!"); //TextWebSocketFrame
// }else{ //基站
// logger.info("else>>基站:" + msg.toString());
// logger.info("else>>hashmap基站:" + gpsServerContextHashMap);
// gpsServerContextHashMap.forEach((k,v)->{
// logger.error("基站发送给移动站:" + msg.toString());
// v.writeAndFlush(msg.toString());
// });
// }
// }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught! cause:" + cause.toString());
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// logger.info("**接收到的基站数据<<start:" + msg.toString() +">>end");
if(TcpClient.client.equalsIgnoreCase(msg.toString())){ //移动站
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
if(TcpClient.client.equalsIgnoreCase(msg.text())){ //移动站
String clinetIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress() + ctx.channel().id();
logger.warn("移动站IP:" + clinetIp);
addContext(clinetIp,ctx);
logger.warn("if>>hashmap移动站:" + gpsServerContextHashMap);
ctx.writeAndFlush("OK!!"); //TextWebSocketFrame
// ctx.writeAndFlush("OK!!"); //TextWebSocketFrame
ctx.channel().writeAndFlush(new TextWebSocketFrame("OK!! " + msg.text()));
}else{ //基站
logger.info("else>>基站:" + msg.toString());
logger.info("else>>hashmap基站:" + gpsServerContextHashMap);
......@@ -67,10 +93,4 @@ public class TcpServerHandler extends SimpleChannelInboundHandler<Object> {
});
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("exceptionCaught! cause:" + cause.toString());
ctx.close();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment