diff --git a/.idea/misc.xml b/.idea/misc.xml index be09ff0..04e58ef 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -10,7 +10,7 @@ - + \ No newline at end of file diff --git a/netty/model/pom.xml b/netty/model/pom.xml index d3756f7..4cb1141 100644 --- a/netty/model/pom.xml +++ b/netty/model/pom.xml @@ -77,4 +77,16 @@ 1.6.14 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 14 + 14 + + + + diff --git a/netty/pom.xml b/netty/pom.xml index 8c943dd..efb5fc8 100644 --- a/netty/pom.xml +++ b/netty/pom.xml @@ -25,7 +25,7 @@ 22 22 - 21 + 22 3.8.1 3.5.6 8.0.30 diff --git a/netty/service/src/main/java/cn/bunny/service/netty/demo3/ChannelClient.java b/netty/service/src/main/java/cn/bunny/service/netty/demo3/ChannelClient.java new file mode 100644 index 0000000..7a58a4d --- /dev/null +++ b/netty/service/src/main/java/cn/bunny/service/netty/demo3/ChannelClient.java @@ -0,0 +1,45 @@ +package cn.bunny.service.netty.demo3; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringEncoder; +import lombok.extern.slf4j.Slf4j; + +import java.net.InetSocketAddress; + +@Slf4j +public class ChannelClient { + public static void main(String[] args) throws InterruptedException { + ChannelFuture channelFuture = new Bootstrap() + .group(new NioEventLoopGroup()) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { + nioSocketChannel.pipeline().addLast(new StringEncoder()); + } + }) + .connect(new InetSocketAddress("localhost", 8080)); + // 使用 sycn 方法同步处理结果,五阻塞当前线程 + // channelFuture.sync(); + // 五阻塞向下执行获取Channel + // Channel channel = channelFuture.channel(); + // channel.writeAndFlush("哈哈哈哈"); + // 使用addLietener 方法异步处理结果 + + channelFuture.addListener(new ChannelFutureListener() { + @Override + // 在nio线程连接建立好之后,会调用aperationComplate + public void operationComplete(ChannelFuture channelFuture) throws Exception { + Channel channel = channelFuture.channel(); + log.debug("当前的 Channel: {}", channel); + channel.writeAndFlush("使用监听方法"); + } + }); + } +} diff --git a/netty/service/src/main/java/cn/bunny/service/netty/demo3/CloseFutureClient.java b/netty/service/src/main/java/cn/bunny/service/netty/demo3/CloseFutureClient.java new file mode 100644 index 0000000..39321fd --- /dev/null +++ b/netty/service/src/main/java/cn/bunny/service/netty/demo3/CloseFutureClient.java @@ -0,0 +1,70 @@ +package cn.bunny.service.netty.demo3; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import lombok.extern.slf4j.Slf4j; + +import java.net.InetSocketAddress; +import java.util.Scanner; + +@Slf4j +public class CloseFutureClient { + public static void main(String[] args) throws InterruptedException { + NioEventLoopGroup group = new NioEventLoopGroup(); + ChannelFuture channelFuture = new Bootstrap() + .group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { + nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); + nioSocketChannel.pipeline().addLast(new StringEncoder()); + } + }) + .connect(new InetSocketAddress("localhost", 8080)); + + Channel channel = channelFuture.sync().channel(); + new Thread(() -> { + Scanner scanner = new Scanner(System.in); + while (true) { + String line = scanner.nextLine(); + if ("q".equals(line)) { + channel.close();// 1秒之后关闭(异步)不一定是这个先关闭 + break; + } + channel.writeAndFlush(line); + } + }, "input").start(); + + // 获取CloseFuture 1、同步处理关闭 2、一步处理关闭 + ChannelFuture closeFuture = channel.closeFuture(); + // 同步关闭 + /** + * * 同步方式关闭,目前关闭控制台并没有关闭 + * log.debug("等待关闭。。。"); + * closeFuture.sync(); + * log.debug("处理完成关闭"); + * group.shutdownGracefully(); + */ + + /** + * * 异步方式关闭 + */ + closeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + // 不是立刻停止,等待消息发送完成后在关闭 + group.shutdownGracefully(); + log.debug("处理完成关闭"); + } + }); + } +} diff --git a/netty/service/src/main/java/cn/bunny/service/netty/demo4/JdkFuture.java b/netty/service/src/main/java/cn/bunny/service/netty/demo4/JdkFuture.java new file mode 100644 index 0000000..5ed1e96 --- /dev/null +++ b/netty/service/src/main/java/cn/bunny/service/netty/demo4/JdkFuture.java @@ -0,0 +1,25 @@ +package cn.bunny.service.netty.demo4; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.*; + +@Slf4j +public class JdkFuture { + public static void main(String[] args) throws ExecutionException, InterruptedException { + // 线程池 + ExecutorService service = Executors.newFixedThreadPool(2); + // 提交任务 + Future future = service.submit(new Callable() { + @Override + public Integer call() throws Exception { + log.debug("执行计算"); + Thread.sleep(1000); + return 50; + } + }); + + log.debug("等待结果"); + log.debug("结果是 {}", future.get()); + } +} diff --git a/netty/service/src/main/java/cn/bunny/service/netty/demo4/NettyFuture.java b/netty/service/src/main/java/cn/bunny/service/netty/demo4/NettyFuture.java new file mode 100644 index 0000000..dc6d3b5 --- /dev/null +++ b/netty/service/src/main/java/cn/bunny/service/netty/demo4/NettyFuture.java @@ -0,0 +1,39 @@ +package cn.bunny.service.netty.demo4; + +import io.netty.channel.EventLoop; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +@Slf4j +public class NettyFuture { + public static void main(String[] args) throws ExecutionException, InterruptedException { + NioEventLoopGroup group = new NioEventLoopGroup(); + EventLoop eventLoop = group.next(); + + Future future = eventLoop.submit(new Callable() { + @Override + public Integer call() throws Exception { + log.debug("执行计算"); + Thread.sleep(1000); + return 70; + } + }); + + // 同步方式等待结果 + log.debug("等待结果。。。"); + log.debug("结果是:{}", future.get()); + + // 异步方式接受结果 + future.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + log.debug("接受结果:{}", future.getNow()); + } + }); + } +} diff --git a/netty/service/src/main/java/cn/bunny/service/netty/demo4/NettyPromise.java b/netty/service/src/main/java/cn/bunny/service/netty/demo4/NettyPromise.java new file mode 100644 index 0000000..81f90b2 --- /dev/null +++ b/netty/service/src/main/java/cn/bunny/service/netty/demo4/NettyPromise.java @@ -0,0 +1,34 @@ +package cn.bunny.service.netty.demo4; + +import io.netty.channel.EventLoop; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultPromise; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ExecutionException; + +@Slf4j +public class NettyPromise { + public static void main(String[] args) throws ExecutionException, InterruptedException { + // 准备 EventLoop 对象 + EventLoop eventLoop = new NioEventLoopGroup().next(); + // 可以主动创建Promise 结果容器 + DefaultPromise promise = new DefaultPromise<>(eventLoop); + + new Thread(() -> { + log.debug("开始计算..."); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // 添加失败结果 + promise.setFailure(e); + } + // 填充正常结果,也可以填充异常结果,如果 + promise.setSuccess(80); + }).start(); + + // 接受结果的线程 + log.debug("等待结果"); + log.debug("结果是:{}", promise.get()); + } +} diff --git a/netty/service/src/main/java/cn/bunny/service/netty/demo5/PipLineServerInbound.java b/netty/service/src/main/java/cn/bunny/service/netty/demo5/PipLineServerInbound.java new file mode 100644 index 0000000..570f140 --- /dev/null +++ b/netty/service/src/main/java/cn/bunny/service/netty/demo5/PipLineServerInbound.java @@ -0,0 +1,84 @@ +package cn.bunny.service.netty.demo5; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +@Slf4j +public class PipLineServerInbound { + public static void main(String[] args) { + new ServerBootstrap() + .group(new NioEventLoopGroup()) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { + // 通过 Channel 拿到pipLine + ChannelPipeline pipeline = nioSocketChannel.pipeline(); + // handler 处理结果 + pipeline.addLast("h1", new ChannelInboundHandlerAdapter() { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("第一个"); + ByteBuf buf = (ByteBuf) msg; + String name = buf.toString(Charset.defaultCharset()); + super.channelRead(ctx, name); + } + }); + + // handler 第二个处理结果 + pipeline.addLast("h2", new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("第二个"); + // 将数据传输给下一个链,如果不使用下一个不会接受到相应内容 + super.channelRead(ctx, msg); + } + }); + // handler 第三个处理结果 + pipeline.addLast("h3", new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("第三个 是否拿到上一个参数:{}", msg); + super.channelRead(ctx, msg); + nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("服务".getBytes())); + } + }); + + // 第四个出栈内容 + pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log.info("第四个 出栈"); + super.write(ctx, msg, promise); + } + }); + + // 第五个出栈内容 + pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log.info("第五个 出栈"); + super.write(ctx, msg, promise); + } + }); + + // 第六个出栈内容 + pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log.info("第六个 出栈"); + super.write(ctx, msg, promise); + } + }); + } + }).bind(8080); + + // 进栈:按照顺序进行 + // 出栈:从后往前走 + } +} diff --git a/netty/service/src/main/java/cn/bunny/service/netty/demo5/PipLineServerOutbound.java b/netty/service/src/main/java/cn/bunny/service/netty/demo5/PipLineServerOutbound.java new file mode 100644 index 0000000..91e780d --- /dev/null +++ b/netty/service/src/main/java/cn/bunny/service/netty/demo5/PipLineServerOutbound.java @@ -0,0 +1,90 @@ +package cn.bunny.service.netty.demo5; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; + +@Slf4j +public class PipLineServerOutbound { + public static void main(String[] args) { + new ServerBootstrap() + .group(new NioEventLoopGroup()) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { + // 通过 Channel 拿到pipLine + ChannelPipeline pipeline = nioSocketChannel.pipeline(); + // handler 处理结果 + pipeline.addLast("h1", new ChannelInboundHandlerAdapter() { + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("第一个"); + ByteBuf buf = (ByteBuf) msg; + String name = buf.toString(Charset.defaultCharset()); + super.channelRead(ctx, name); + } + }); + + // handler 第二个处理结果 + pipeline.addLast("h2", new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("第二个"); + // 将数据传输给下一个链,如果不使用下一个不会接受到相应内容 + super.channelRead(ctx, msg); + } + }); + + + // 第四个出栈内容 + pipeline.addLast("h4", new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log.info("第四个 出栈"); + super.write(ctx, msg, promise); + } + }); + + // handler 第三个处理结果 + pipeline.addLast("h3", new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("第三个 是否拿到上一个参数:{}", msg); + ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("服务。。。".getBytes())); + } + }); + + // 第五个出栈内容 + pipeline.addLast("h5", new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log.info("第五个 出栈"); + super.write(ctx, msg, promise); + } + }); + + // 第六个出栈内容 + pipeline.addLast("h6", new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + log.info("第六个 出栈"); + super.write(ctx, msg, promise); + } + }); + } + }).bind(8080); + + // 进栈:按照顺序进行 + // 出栈:从后往前走 + /** + * ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("服务。。。".getBytes())); 出栈 + * 顺序和正常不一样,从当前调用的往前找,所以这个顺序是1234,因为4在3前面,当三执行完成后,4是出栈,所以之后才会执行4 + */ + } +} diff --git a/netty/service/src/main/java/cn/bunny/service/netty/demo6/ByteBufDemo.java b/netty/service/src/main/java/cn/bunny/service/netty/demo6/ByteBufDemo.java new file mode 100644 index 0000000..c1b3b3b --- /dev/null +++ b/netty/service/src/main/java/cn/bunny/service/netty/demo6/ByteBufDemo.java @@ -0,0 +1,18 @@ +package cn.bunny.service.netty.demo6; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; + +public class ByteBufDemo { + public static void main(String[] args) { + ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); + System.out.println(buf); + StringBuilder stringBuilder = new StringBuilder(); + + for (int i = 0; i < 300; i++) { + stringBuilder.append("a"); + } + buf.writeBytes(stringBuilder.toString().getBytes()); + System.out.println(buf); + } +} diff --git a/netty/service/src/main/java/cn/bunny/service/netty/demo7/HelloClient.java b/netty/service/src/main/java/cn/bunny/service/netty/demo7/HelloClient.java new file mode 100644 index 0000000..6352c89 --- /dev/null +++ b/netty/service/src/main/java/cn/bunny/service/netty/demo7/HelloClient.java @@ -0,0 +1,41 @@ +package cn.bunny.service.netty.demo7; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class HelloClient { + public static void main(String[] args) { + NioEventLoopGroup group = new NioEventLoopGroup(); + try { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.channel(NioSocketChannel.class); + bootstrap.group(group); + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ByteBuf buf = ctx.alloc().buffer(16); + buf.writeBytes(new byte[]{0, 1, 2, 3, 4}); + ctx.writeAndFlush(buf); + ctx.channel().close(); + } + }); + } + }); + } catch (Exception e) { + log.debug("客户端错误:{}", e); + } finally { + group.shutdownGracefully(); + } + } +}