feat(新增): 🚀 netty的 ByteBuf和打印输出工具类

This commit is contained in:
Bunny 2024-05-24 22:33:36 +08:00
parent 8275bdc281
commit 596a22efd0
15 changed files with 45 additions and 550 deletions

View File

@ -1,51 +0,0 @@
package cn.bunny.common.utils;
import cn.bunny.vo.system.comment.CommentVo;
import java.util.ArrayList;
import java.util.List;
public class CommentUtil {
/**
* 构建树型结构
*
* @param commentList 评论列表
* @return 结构列表
*/
public static List<CommentVo> buildTree(List<CommentVo> commentList) {
// 构建树形结构
List<CommentVo> tree = new ArrayList<>();
// 遍历评论列表
for (CommentVo comment : commentList) {
// 找到顶级评论没有父评论
if (comment.getPCommentId() == 0) {
// 递归构建子评论
comment.setChildren(getChildren(comment.getId(), commentList));
tree.add(comment);
}
}
return tree;
}
/**
* 递归获取子评论
*
* @param commentId 当前评论ID
* @param commentList 评论列表
* @return 子评论列表
*/
private static List<CommentVo> getChildren(Long commentId, List<CommentVo> commentList) {
List<CommentVo> children = new ArrayList<>();
// 遍历评论列表
for (CommentVo comment : commentList) {
// 找到当前评论的子评论
if (Long.valueOf(comment.getPCommentId()).equals(commentId)) {
// 递归构建子评论的子评论
comment.setChildren(getChildren(comment.getId(), commentList));
children.add(comment);
}
}
return children;
}
}

View File

@ -0,0 +1,20 @@
package cn.bunny.service;
import io.netty.buffer.ByteBuf;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class NettyLogUtil {
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf);
}
}

View File

@ -0,0 +1,25 @@
package cn.bunny.service.netty.demo3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static cn.bunny.service.NettyLogUtil.log;
public class ByteBufDemo {
public static void main(String[] args) {
// 堆内存
// ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
// 非池化的
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf);
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 300; i++) {
stringBuilder.append("a");
}
buf.writeBytes(stringBuilder.toString().getBytes());
System.out.println(buf);
log(buf);
}
}

View File

@ -1,62 +0,0 @@
package cn.bunny;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class InputClientTest {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture future = new Bootstrap()
// .group(new NioEventLoopGroup())
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
}).connect(new InetSocketAddress("localhost", 8080));
Channel channel = future.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取CloseFuture 对象同步处理处理关闭 异步处理要关闭
ChannelFuture channelFuture = channel.closeFuture();
System.out.println("等待关闭......");
// 第一种方式关闭
// channelFuture.sync();
// log.debug("处理之后的关闭");
// 第二种方式关闭
channelFuture.addListener((ChannelFutureListener) future1 -> {
log.debug("处理之后的关闭");
group.shutdownGracefully();
});
}
}

View File

@ -1,54 +0,0 @@
package cn.bunny;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.Date;
public class TestDemoClient {
public static void main(String[] args) throws InterruptedException {
// 启动类
Channel channel = new Bootstrap()
// 添加EventLoop
.group(new NioEventLoopGroup())
// 设置客户端Channel类型
.channel(NioSocketChannel.class)
// 选择客户端 Channel
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override// 在连接建立后调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);// 使用debug方式发送消息...
System.out.println();
// // 向服务器发送数据
// .writeAndFlush("你好啊啊啊");
}
private static void m1() throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioSocketChannel.class) // 2
.handler(new ChannelInitializer<NioSocketChannel>() { // 3
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder()); // 8
}
})
.connect("127.0.0.1", 8080) // 4
.sync() // 5
.channel() // 6
.writeAndFlush(new Date() + ": hello world!"); // 7
}
}

View File

@ -1,36 +0,0 @@
package cn.bunny.demo1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class TestDemoServer {
public static void main(String[] args) {
// 服务器端启动器负责组装 netty 组件启动服务器
new ServerBootstrap()
// 2. BossEventLoop,WorkEventLoop group组
.group(new NioEventLoopGroup())
// 3. 选择服务器ServerSocketChannel 实现
.channel(NioServerSocketChannel.class)
// 4. boss 负责处理连接决定work可以做哪些事情
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 将By特Buffer转为字符串
nioSocketChannel.pipeline().addLast(new StringDecoder());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 自定义handler
@Override// 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 消息字符串
System.out.println(msg);
}
});
}
}).bind(8080);
}
}

View File

@ -1,44 +0,0 @@
package cn.bunny.demo2;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.NettyRuntime;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TestGroupLoop {
public static void main(String[] args) {
// io 事件普通任务定时任务
NioEventLoopGroup group = new NioEventLoopGroup();
// 获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
// 执行普通任务
group.next().submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("执行完成");
});
// 执行定时任务
group.next().scheduleAtFixedRate(() -> {
log.debug("定时任务的执行");
}, 0, 1, TimeUnit.SECONDS);
}
private static void test1() {
// io 事件普通任务定时任务
NioEventLoopGroup group = new NioEventLoopGroup();
// 普通任务定时任务
DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup();
// 查看系统是几核
System.out.println(NettyRuntime.availableProcessors());
}
}

View File

@ -1,36 +0,0 @@
package cn.bunny.demo3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class EventLoopServerTest {
public static void main(String[] args) {
// 创建服务器
new ServerBootstrap()
// BOOS只负责accept事件worker 只负责 SocketChannel 上的读写
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))// 设置工作线程为2个
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
}).bind(8080);
}
}

View File

@ -1,45 +0,0 @@
package cn.bunny.demo4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class TestDefaultServerLoopGroup {
public static void main(String[] args) {
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast("handler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString((Charset.defaultCharset())));
// 让消息继续传递下去
ctx.fireChannelRead(msg);
}
}).addLast(group, "hander2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
}
}

View File

@ -1,21 +0,0 @@
package cn.bunny.demo5;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestAsyncFuture {
public static void main(String[] args) {
NioEventLoopGroup loopGroup = new NioEventLoopGroup();
EventLoop eventLoop = loopGroup.next();
Future<Integer> future = eventLoop.submit(() -> {
log.debug("执行计算...");
Thread.sleep(1000);
return 70;
});
future.addListener(future1 -> log.debug("接受结果:{}", future1.getNow()));
}
}

View File

@ -1,26 +0,0 @@
package cn.bunny.demo5;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(() -> {
Thread.sleep(1000);
return 70;
});
// 主线程通过 future 来获取结果
log.debug("主线程等待结果");
future.get();
log.debug("结果是:{}", future.get());
}
}

View File

@ -1,26 +0,0 @@
package cn.bunny.demo5;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class UdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 提交任务
Future<Integer> future = service.submit(() -> {
Thread.sleep(1000);
return 50;
});
// 主线程通过 future 来获取结果
log.debug("主线程等待结果");
future.get();
log.debug("结果是:{}", future.get());
}
}

View File

@ -1,47 +0,0 @@
package cn.bunny.demo6;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
public class TestEmbedded {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(1);
ctx.fireChannelRead(msg); // 1
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(2);
ctx.channel().write(msg); // 2
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(4);
ctx.write(msg, promise); // 3
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(5);
ctx.write(msg, promise); // 4
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello world".getBytes()));
}
}

View File

@ -1,35 +0,0 @@
package cn.bunny.demo6;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 准备EventLoop 对象
EventLoop eventLoop = new NioEventLoopGroup().next();
// 可以创建promise 结果容器
Promise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
// 任意线程进行计算
System.out.println("开始计算...");
// 计算完成后向promise填充结果
try {
Thread.sleep(1000);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
promise.setSuccess(80);
}).start();
// 除了可以等待结果也可以处理异常
log.debug("等待结果...");
log.debug("结果是:{}", promise.get());
}
}

View File

@ -1,67 +0,0 @@
package cn.bunny.demo6;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestPipelineDemo {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(1);
ctx.fireChannelRead(msg); // 1
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(2);
ctx.fireChannelRead(msg); // 2
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(3);
ctx.channel().write(msg); // 3
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(4);
ctx.write(msg, promise); // 4
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(5);
ctx.write(msg, promise); // 5
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(6);
ctx.write(msg, promise); // 6
}
});
}
})
.bind(8080);
}
}