springboot使用netty整合WebSocket-netty(三)

人生之路坎坎坷坷,跌跌撞撞在所难免。但是,不论跌了多少次,你都必须坚强勇敢地站起来。任何时候,无论你面临着生命的何等困惑抑或经受着多少挫折,无论道路多艰难,希望变得如何渺茫,请你不要绝望,再试一次,坚持到底,成功终将属于勇不言败的你。

导读:本篇文章讲解 springboot使用netty整合WebSocket-netty(三),希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

链接: Netty与Spring Boot的整合

链接: springboot项目中使用netty+websocket 实现消息推送

链接: spring boot的websocket五种实现方式

链接: 慕课网_《Netty入门之WebSocket初体验》学习总结

导入依赖

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.36.Final</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
package com.zm.webscoket.config;

import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class MyChannelHandler {

    public MyChannelHandler() {
    }

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private static ConcurrentMap<String, ChannelId> ChannelMap = new ConcurrentHashMap();

    public static void addChannel(String apiToken, Channel channel) {
        channelGroup.add(channel);
        if (null != apiToken) {
            ChannelMap.put(apiToken, channel.id());
        }
    }

    public static void updateChannel(String apiToken, Channel channel) {
        Channel chan = channelGroup.find(channel.id());
        if (null == chan) {
            addChannel(apiToken, channel);
        } else {
            ChannelMap.put(apiToken, channel.id());
        }
    }

    public static void removeChannel(Channel channel) {
        channelGroup.remove(channel);
        channel.close();
        Collection<ChannelId> values = ChannelMap.values();
        values.remove(channel.id());
    }

    public static Channel findChannel(String apiToken) {
        ChannelId chanId = ChannelMap.get(apiToken);
        if (null == chanId) {
            return null;
        }
        return channelGroup.find(ChannelMap.get(apiToken));
    }

    public static void sendToAll(String message) {
        channelGroup.writeAndFlush(new TextWebSocketFrame(message));
    }

    //给每个人发送消息,除发消息人外
    private void SendAllExceptMy(String apiToken, String msg) {
        Channel myChannel = channelGroup.find(ChannelMap.get(apiToken));
        if(null != myChannel){
            for(Channel channel:channelGroup){
                if(!channel.id().asLongText().equals(myChannel.id().asLongText())){
                    channel.writeAndFlush(new TextWebSocketFrame(msg));
                }
            }
        }
    }

    public static void sendToSimple(String apiToken, String message) {
        channelGroup.find(ChannelMap.get(apiToken)).writeAndFlush(new TextWebSocketFrame(message));
    }


}

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * TextWebSocketFrame是netty用于处理websocket发来的文本对象
 */
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().id() + "与客户端建立连接,通道开启!");
        //添加到channelGroup通道组
        MyChannelHandler.channelGroup.add(ctx.channel());
    }

    /**
     * 要想实现客户端感知服务端的存活情况,需要进行双向的心跳;
     * Netty中的channelInactive()方法是通过Socket连接关闭时挥手数据包触发的,
     * 因此可以通过channelInactive()方法感知正常的下线情况,但是因为网络异常等非正常下线则无法感知;
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().id() + "与客户端断开连接,通道关闭!");
        //添加到channelGroup 通道组
        MyChannelHandler.channelGroup.remove(ctx.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        ChannelId id = channel.id();
        //首次连接是FullHttpRequest,处理参数
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            String uri = request.uri();
            Map paramMap = getUrlParams(uri);
            System.out.println("接收到的参数是:" + paramMap);
            //如果url包含参数,需要处理
            if (uri.contains("?")) {
                String newUri = uri.substring(0, uri.indexOf("?"));
                System.out.println(newUri);
                request.setUri(newUri);
            }
        }
        if (msg instanceof TextWebSocketFrame) {
            //正常的TEXT消息类型
            TextWebSocketFrame frame = (TextWebSocketFrame) msg;
            System.out.println(new Date() + "客户端收到服务器数据:" + frame.text());
            MyChannelHandler.sendToAll(frame.text());
        }
        super.channelRead(ctx, msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("异常发生了...");
        cause.printStackTrace();
        ctx.close();
    }


    private static Map getUrlParams(String url) {
        Map<String, String> map = new HashMap<>();
        url = url.replace("?", ";");
        if (!url.contains(";")) {
            return map;
        }
        if (url.split(";").length > 0) {
            String[] arr = url.split(";")[1].split("&");
            for (String s : arr) {
                String key = s.split("=")[0];
                String value = s.split("=")[1];
                map.put(key, value);
            }
            return map;

        } else {
            return map;
        }
    }


}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.Date;

/**
 * 检查客户端心跳机制
 * IdleStateHandler心跳检测主要是通过向线程任务队列中添加定时任务,判断channelRead()方法或write()方法是否调用空闲超时,如果超时则触发超时事件执行自定义userEventTrigger()方法;
 *
 * Netty通过IdleStateHandler实现最常见的心跳机制不是一种双向心跳的PING-PONG模式,而是客户端发送心跳数据包,服务端接收心跳但不回复,
 * 因为如果服务端同时有上千个连接,心跳的回复需要消耗大量网络资源;如果服务端一段时间内没有收到客户端的心跳数据包则认为客户端已经下线,
 * 将通道关闭避免资源的浪费;在这种心跳模式下服务端可以感知客户端的存活情况,无论是宕机的正常下线还是网络问题的非正常下线,
 * 服务端都能感知到,而客户端不能感知到服务端的非正常下线;
 */
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj){
        if (obj instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)obj;
            if (event.state()== IdleState.READER_IDLE){
                System.out.println(ctx.channel().id() +"客户端读超时" + new Date());
                MyChannelHandler.removeChannel(ctx.channel());
            }else if (event.state()== IdleState.WRITER_IDLE){
                System.out.println(ctx.channel().id() +"客户端写超时" + new Date());
            }else if (event.state()==IdleState.ALL_IDLE){
                System.out.println(ctx.channel().id() +"客户端所有操作超时");
            }
        }
    }

}


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;

@Component
public class NettyServer {

    @Value("${server.port:8080}")
    private Integer port;

    @PostConstruct
    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();// 主线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();//创建从线程组,处理主线程组分配下来的io操作
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);// 存放已完成三次握手的请求的等待队列
            //  要求高实时性,有数据时马上发送,就将该选项设置为true关闭Nagle算法;
            //  如果要减少发送次数,就设置为false,会累积一定大小后再发送
            sb.option(ChannelOption.TCP_NODELAY,true);
            sb.group(group, bossGroup) // 绑定线程池
                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                    .localAddress(this.port)// 绑定监听端口
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
                        @Override
                        protected void initChannel(SocketChannel ch){
                            System.out.println("收到新连接"+ new Date());
                            //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //以块的方式来写的处理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                             /*
                            说明
                                1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
                                2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
                             */
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            // 对客户端,如果在40秒内没有向服务端发送心跳,就主动断开
                            // 前三个的参数解释如下:
                            //1)readerIdleTime:为读超时时间(即服务端一定时间内未接收到客户端消息的时间,服务端一段时间内没有数据读取)
                            //2)writerIdleTime:为写超时时间(即服务端一定时间内未向客户端发送消息的时间,服务端一段时间内没有数据发送)
                            //3)allIdleTime:所有类型的超时时间(以上两种满足其中一个即可)
                            ch.pipeline().addLast(new IdleStateHandler(40,0,0));
                            ch.pipeline().addLast(new HeartBeatHandler());
                            ch.pipeline().addLast(new MyWebSocketHandler());
                             /*
                            说明
                                1. 对应websocket ,它的数据是以 帧(frame) 形式传递
                                2. 可以看到WebSocketFrame 下面有六个子类
                                3. 浏览器请求时 ws://localhost:8888/hello 表示请求的uri
                                4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
                                5. 是通过一个 状态码 101
                             */
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));

                        }
                    });
            ChannelFuture cf = sb.bind().sync(); // 启动server 服务器异步创建绑定
            cf.channel().closeFuture().sync(); // 监听服务器关闭channel通道
            if (cf.isSuccess()) {
                System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
            }
        } finally {
            System.out.println("释放线程池资源");
            group.shutdownGracefully().sync(); // 释放线程池资源
            bossGroup.shutdownGracefully().sync();
        }
    }


}

测试地址

链接: 在线websocket测试网站.
输入 ws://127.0.0.1:8080/ws 连接即可

注意:

原因:netty 绑定端口的时候阻塞了主线程,导致 springboot 到达不了启动 tomcat 内置容器的那一步。
解决办法: 新开一个线程去启动

springboot 启动netty时,springboot内置tomcat无法启动

public class Application implements CommandLineRunner {
    

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
	@Autowired
    private NettyServer nettyServer;
    
    @Override
    public void run(String... args) throws Exception {
        new Thread(nettyServer).start();
    }

}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/133917.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!