Netty实战,Springboot + netty +websocket 实现推送消息

Netty实战,Springboot + netty +websocket 实现推送消息

前言

Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 Java NIO 提供的 API 实现。它提供了对TCP、UDP 和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞的,本文基于Netty实现消息推送。

另一篇日志实时查看见《SpringBoot 整合websocket 实现日志实时查看》

正文

1.引入依赖

  <dependency>
      <groupId>org.Springframework.data</groupId>
      <artifactId>spring-data-Redis</artifactId>
      <version>1.8.4.RELEASE</version>
  </dependency>
  <dependency>
      <groupId>com.github.sazzad16</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.2</version>
  </dependency>

  <!--websocket日志预览-->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-websocket</artifactId>
  </dependency>
  <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.36.Final</version>
  </dependency>
  <!-- 模板引擎 -->
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-thymeleaf</artifactId>
  </dependency>

2.SpringUtils

从已有的Spring上下文取得已实例化的bean

@Component
public class SpringUtils implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        if(SpringUtils.applicationContext == null){
            SpringUtils.applicationContext = applicationContext;
        }
    }

    public static ApplicationContext getApplicationContext(){
        return applicationContext;
    }

    //根据name
    public static Object getBean(String name){
        return getApplicationContext().getBean(name);
    }

    //根据类型
    public static <T> T getBean(Class<T> clazz){
        return getApplicationContext().getBean(clazz);
    }

    public static <T> T getBean(String name,Class<T> clazz){
        return getApplicationContext().getBean(name,clazz);
    }
}

3.编写NettyServer

/**
 * NettyServer Netty服务器配置
 */
public class NettyServer {

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            sb.group(group, bossGroup) // 绑定线程池
                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                    .localAddress(12346)// 绑定监听端口
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //websocket协议本身是基于http协议的,所以这边也要使用http解编码器
                            ch.pipeline().addLast(new HttpServerCodec());
                            //以块的方式来写的处理器
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            // 自定义的handler,处理业务逻辑
                            ch.pipeline().addLast(new MyWebSocketHandler());
                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", null, true, 65536 * 10));
                        }
                    });
            // 服务器异步创建绑定
            ChannelFuture cf = sb.bind().sync();
            System.out.println(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
            // 对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            // 释放线程池资源
            group.shutdownGracefully().sync();
            bossGroup.shutdownGracefully().sync();
        }
    }
}

4.编写通道组池,管理所有websocket连接

public class MyChannelHandlerPool {
    public MyChannelHandlerPool(){}

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

5.自定义Handler

  • channelActive与客户端建立连接
  • channelInactive与客户端断开连接
  • channelRead客户端发送消息处理
/**
 * MyWebSocketHandler
 * WebSocket处理器,处理websocket连接相关
 */
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("与客户端建立连接,通道开启!");
        RedisService redisService = (RedisService) SpringUtils.getBean("redisService");
        redisService.getRedisTemplate().opsForValue().increment("socket:oline",1);
        //添加到channelGroup通道组
        MyChannelHandlerPool.channelGroup.add(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //移除channelGroup 通道组
        MyChannelHandlerPool.channelGroup.remove(ctx.channel());
        RedisService redisService = (RedisService) SpringUtils.getBean("redisService");
        redisService.getRedisTemplate().opsForValue().increment("socket:oline",-1);
        AttributeKey<String> key = AttributeKey.valueOf("name");
        String name = ctx.channel().attr(key).get();
        System.out.println(name+"与客户端断开连接,通道关闭!");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //首次连接是FullHttpRequest,处理参数
        if (null != msg && msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            String uri = request.uri();
            Map paramMap = getUrlParams(uri);
            //将用户名称作为自定义属性加入到channel中,方便随时channel中获取用户
            AttributeKey<String> key = AttributeKey.valueOf("name");
            ctx.channel().attr(key).setIfAbsent(paramMap.get("name")+"");
            //如果url包含参数,需要特殊处理
            if(uri.contains("?")){
                String newUri=uri.substring(0,uri.indexOf("?"));
                request.setUri(newUri);
            }
        }else if(msg instanceof TextWebSocketFrame){
            RedisService redisService = (RedisService) SpringUtils.getBean("redisService");
            //正常的TEXT消息类型
            TextWebSocketFrame frame=(TextWebSocketFrame)msg;
            sendAllMessage(frame.text()+",当前在线人数:"+redisService.getRedisTemplate().opsForValue().get("socket:oline"));
        }
        super.channelRead(ctx, msg);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {

    }

    private void sendAllMessage(String message){
        //收到信息后,群发给所有channel
        MyChannelHandlerPool.channelGroup.writeAndFlush( new TextWebSocketFrame(message));
    }

    private static Map getUrlParams(String url) throws UnsupportedEncodingException {
        Map<String,String> map = new HashMap<>();
        url = url.replace("?",";");
        if (!url.contains(";")){
            return map;
        }
        if (url.split(";").length > 0){
            String[] arr = url.split(";")[1].split("&");
            for (String s : arr){
                String key = s.split("=")[0];
                String value = s.split("=")[1];
                //解决路径携带中文乱码
                map.put(key,URLDecoder.decode(value, "UTF-8"));
            }
            return  map;

        }else{
            return map;
        }
    }
}

6.RedisTemplate工具类

小编这里自己实现的多机房多数据源,你也可以通过自动配置方式装配Redis

@Configuration
@Component
public class RedisMultiConfiguration extends CachingConfigurerSupport {
    protected List<RedisTemplate> redisTemplist = new ArrayList<RedisTemplate>();
    @Autowired
    private Environment env;

    public List<RedisTemplate> getRedisTempMap() {
        if (redisTemplist.size() != 0) {
            return redisTemplist;
        } else {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxIdle(Integer.parseInt(env.getProperty("spring.redis.pool.max-idle")));
            jedisPoolConfig.setMaxTotal(Integer.parseInt(env.getProperty("spring.redis.pool.max-total")));
            jedisPoolConfig.setMaxWaitMillis(Integer.parseInt(env.getProperty("spring.redis.pool.max-wait")));
            jedisPoolConfig.setMinIdle(Integer.parseInt(env.getProperty("spring.redis.pool.min-idle")));
            String[] host = env.getProperty("spring.redis.nodes").split(",");
            for (String redisHost : host) {
                String[] item = redisHost.split(":");
                String ip = "";
                String port = "";
                String password = "";
                String database = "0";
                if (item.length == 4) {
                    ip = item[0];
                    port = item[1];
                    password = item[2];
                    database = item[3];
                } else {
                    ip = item[0];
                    port = item[1];
                    password = item[2];
                }
                JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
                jedisConnectionFactory.setHostName(ip);
                jedisConnectionFactory.setDatabase(Integer.parseInt(database));
                jedisConnectionFactory.setPassword(password);
                jedisConnectionFactory.setPort(Integer.parseInt(port));

                jedisConnectionFactory.setUsePool(true);
                jedisConnectionFactory.setPoolConfig(jedisPoolConfig);

                jedisConnectionFactory.setTimeout(Integer.parseInt(env.getProperty("spring.redis.timeout")));
                jedisConnectionFactory.setUseSsl(Boolean.parseBoolean(env.getProperty("spring.redis.ssl")));
                jedisConnectionFactory.afterPropertiesSet();


                RedisTemplate<String, String> redisTemplate = new RedisTemplate<String, String>();
                redisTemplate.setConnectionFactory(jedisConnectionFactory);
                Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
                ObjectMapper objectMapper = new ObjectMapper();
                objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
                jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
                redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
                redisTemplate.setKeySerializer(new StringRedisSerializer());

                redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
                redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

                redisTemplate.setDefaultSerializer(jackson2JsonRedisSerializer);
                redisTemplate.setEnableDefaultSerializer(true);
                redisTemplate.afterPropertiesSet();
                redisTemplist.add(redisTemplate);
            }
            return redisTemplist;
        }
    }

}
@Component
public class RedisDataSourceConfig {
    @Autowired
    protected RedisMultiConfiguration config;

    @Bean
    public List<RedisTemplate> getTemplateMap() {
        return config.getRedisTempMap();
    }
}
@Service
public class RedisService {

    @Value("${spring.redis.keyExpire}")
    private long expireTime;
    @Autowired
    private RedisDataSourceConfig redisDataSourceConfig;

    public boolean set(final String key, final String value) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();

                connection.setEx(serializer.serialize(key),expireTime, serializer.serialize(value));
                return true;
            }
        });
        return result;
    }

    public boolean setNoExpire(final String key, final String value) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                connection.set(serializer.serialize(key), serializer.serialize(value));
                return true;
            }
        });
        return result;
    }

    public boolean set(final String key, final JSONObject value) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                connection.setEx(serializer.serialize(key),expireTime, serializer.serialize(value.toString()));

                return true;
            }
        });
        return result;
    }


    public boolean setNoExpire(final String key, final JSONObject value) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                connection.set(serializer.serialize(key),serializer.serialize(value.toString()));

                return true;
            }
        });
        return result;
    }

    public String get(final String key) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        String result = redisTemplate.execute(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                byte[] value = connection.get(serializer.serialize(key));
                return serializer.deserialize(value);
            }
        });
        return result;
    }

    public JSONObject getJSON(final String key) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        String result = redisTemplate.execute(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                byte[] value = connection.get(serializer.serialize(key));
                return serializer.deserialize(value);
            }
        });
        return JSONObject.parseObject(result);
    }

    public boolean setNoExpireTime(final String key, final String value) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                connection.set(serializer.serialize(key),serializer.serialize(value));
                return true;
            }
        });
        return result;
    }

    public boolean setNoExpireTime(final String key, final JSONObject value) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        boolean result = redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
                connection.set(serializer.serialize(key),serializer.serialize(value.toString()));
                return true;
            }
        });
        return result;
    }

    public boolean expire(final String key, long expire) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        return redisTemplate.expire(key, expire, TimeUnit.SECONDS);
    }

    public boolean hasKey(final String key) {
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        return redisTemplate.hasKey(key);
    }

    public void delkey(String key){
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        RedisTemplate<String, ?> redisTemplate = redisTemplates != null ? redisTemplates.get(0) : null;
        redisTemplate.delete(key);
    }


    public RedisTemplate getRedisTemplate(){
        List<RedisTemplate> redisTemplates = redisDataSourceConfig.getTemplateMap();
        return redisTemplates != null ? redisTemplates.get(0) : null;
    }
}

7.页面

如果不想引入页面,可以通过在线http://www.websocket-test.com/测试

<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <title>Netty-Websocket</title>
    <script type="text/JavaScript">
        var socket;
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }
        if (window.WebSocket) {
            socket = new WebSocket("ws://127.0.0.1:12346/ws?name=一安未来");
            socket.onmessage = function (event) {
                var ta = document.getElementById('responseText');
                ta.value += event.data + "rn";
            };
            socket.onopen = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。连接  rn";
            };
            socket.onclose = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服务器。。。。。。关闭 rn";
            };
        } else {
            alert("您的浏览器不支持WebSocket协议!");
        }

        function send(message) {
            if (!window.WebSocket) {
                return;
            }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(message);
            } else {
                alert("WebSocket 连接没有建立成功!");
            }

        }

    </script>
</head>
<body>
<form onSubmit="return false;">
    <label>编号</label><input type="text" name="uid" th:value="${uid}"/> <br/>
    <label>内容</label><input type="text" name="message" value="这里输入消息"/> <br/>
    <br/> <input type="button" value="发送ws消息"
                 onClick="send(this.form.uid.value+':'+this.form.message.value)"/>
    <hr color="black"/>
    <h3>服务端返回的应答消息</h3>
    <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>

8.控制层

@Controller
public class IndexController {

    @GetMapping(value = {"/","index.html"})
    private String indexPage(Model model) {
        model.addAttribute("uid", RandomUtil.randomNumbers(6));
        return "index";
    }
}

9.启动类

@SpringBootApplication
@EnableAutoConfiguration(exclude = {RedisAutoConfiguration.class})
public class PaasTestApplication {
    public static void main(String[] args) throws Exception {
        SpringApplication.run(PaasTestApplication.class, args);
        new NettyServer().start();
    }
}

演示

Netty实战,Springboot + netty +websocket 实现推送消息

号外!号外!

如果这篇文章对你有所帮助,或者有所启发的话,帮忙点赞、在看、转发、收藏,你的支持就是我坚持下去的最大动力!

Netty实战,Springboot + netty +websocket 实现推送消息

Spring Boot 实现跨域的 5 种方式,总有一种适合你


别用Date了,Java8新特性之日期处理,现在学会也不迟!


面试官:在浏览器输入URL回车之后发生了什么

Netty实战,Springboot + netty +websocket 实现推送消息


原文始发于微信公众号(一安未来):Netty实战,Springboot + netty +websocket 实现推送消息

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/44625.html

(1)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!