Compare commits

...

8 Commits

21 changed files with 311 additions and 550 deletions

View File

@ -1,51 +0,0 @@
package cn.bunny.common.utils;
import cn.bunny.vo.system.comment.CommentVo;
import java.util.ArrayList;
import java.util.List;
public class CommentUtil {
/**
* 构建树型结构
*
* @param commentList 评论列表
* @return 结构列表
*/
public static List<CommentVo> buildTree(List<CommentVo> commentList) {
// 构建树形结构
List<CommentVo> tree = new ArrayList<>();
// 遍历评论列表
for (CommentVo comment : commentList) {
// 找到顶级评论没有父评论
if (comment.getPCommentId() == 0) {
// 递归构建子评论
comment.setChildren(getChildren(comment.getId(), commentList));
tree.add(comment);
}
}
return tree;
}
/**
* 递归获取子评论
*
* @param commentId 当前评论ID
* @param commentList 评论列表
* @return 子评论列表
*/
private static List<CommentVo> getChildren(Long commentId, List<CommentVo> commentList) {
List<CommentVo> children = new ArrayList<>();
// 遍历评论列表
for (CommentVo comment : commentList) {
// 找到当前评论的子评论
if (Long.valueOf(comment.getPCommentId()).equals(commentId)) {
// 递归构建子评论的子评论
comment.setChildren(getChildren(comment.getId(), commentList));
children.add(comment);
}
}
return children;
}
}

View File

@ -0,0 +1,20 @@
package cn.bunny.service;
import io.netty.buffer.ByteBuf;
import static io.netty.buffer.ByteBufUtil.appendPrettyHexDump;
import static io.netty.util.internal.StringUtil.NEWLINE;
public class NettyLogUtil {
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf);
}
}

View File

@ -0,0 +1,25 @@
package cn.bunny.service.netty.demo3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static cn.bunny.service.NettyLogUtil.log;
public class ByteBufDemo {
public static void main(String[] args) {
// 堆内存
// ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer();
// 非池化的
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf);
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < 300; i++) {
stringBuilder.append("a");
}
buf.writeBytes(stringBuilder.toString().getBytes());
System.out.println(buf);
log(buf);
}
}

View File

@ -0,0 +1,35 @@
package cn.bunny.service.netty.demo3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static cn.bunny.service.NettyLogUtil.log;
public class ByteBufSlice {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
// +-------------------------------------------------+
// | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
// +--------+-------------------------------------------------+----------------+
// |00000000| 61 62 63 64 65 66 67 68 69 6a 6b 6c |abcdefghijkl |
// +--------+-------------------------------------------------+----------------+
buf.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l'});
log(buf);
// +-------------------------------------------------+
// | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
// +--------+-------------------------------------------------+----------------+
// |00000000| 61 62 63 64 65 |abcde |
// +--------+-------------------------------------------------+----------------+
ByteBuf buf1 = buf.slice(0, 5);
log(buf1);
// +-------------------------------------------------+
// | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
// +--------+-------------------------------------------------+----------------+
// |00000000| 66 67 68 69 6a |fghij |
// +--------+-------------------------------------------------+----------------+
ByteBuf buf2 = buf.slice(5, 5);
log(buf2);
}
}

View File

@ -0,0 +1,31 @@
package cn.bunny.service.netty.demo3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import static cn.bunny.service.NettyLogUtil.log;
public class ByteByteBufAllocator {
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
log(buf);
ByteBuf buf1 = buf.slice(0, 5);
buf1.retain();
log(buf1);
ByteBuf buf2 = buf.slice(5, 5);
buf2.retain();
log(buf2);
// 释放原有的buf 内容
buf.release();
log(buf);
// 释放 buf1和buf2内容
buf1.release();
buf2.release();
}
}

View File

@ -0,0 +1,24 @@
package cn.bunny.service.netty.demo3;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import static cn.bunny.service.NettyLogUtil.log;
public class ByteCompositeByteBuff {
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 将前两个字节拼接在一起
CompositeByteBuf buffer = ByteBufAllocator.DEFAULT.compositeBuffer();
buffer.addComponents(true, buf1, buf2);
log(buffer);
}
}

View File

@ -0,0 +1,58 @@
package cn.bunny.service.netty.demo4;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PackageClient {
public static void main(String[] args) {
// 10 次发送
for (int i = 0; i < 10; i++) {
send();
}
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.debug("conneted...");
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18});
ctx.writeAndFlush(buffer);
// 发完即关
ctx.close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,53 @@
package cn.bunny.service.netty.demo4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PackageServer {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, work);
// 接受缓冲区接受为10个字节
// serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception exception) {
log.error("服务器错误。。。", exception);
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,65 @@
package cn.bunny.service.netty.demo5;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class RedisSend {
public static void main(String[] args) {
final byte[] LINE = {13, 10};
NioEventLoopGroup work = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(work);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler());
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 在连接建立时发送
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("*2".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("get".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("$3".getBytes());
buf.writeBytes(LINE);
buf.writeBytes("aaa".getBytes());
buf.writeBytes(LINE);
ctx.writeAndFlush(buf);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("47.120.65.66", 6379);
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("服务器错误", e);
} finally {
work.shutdownGracefully();
}
}
}

View File

@ -1,62 +0,0 @@
package cn.bunny;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Scanner;
@Slf4j
public class InputClientTest {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture future = new Bootstrap()
// .group(new NioEventLoopGroup())
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
}).connect(new InetSocketAddress("localhost", 8080));
Channel channel = future.sync().channel();
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 获取CloseFuture 对象同步处理处理关闭 异步处理要关闭
ChannelFuture channelFuture = channel.closeFuture();
System.out.println("等待关闭......");
// 第一种方式关闭
// channelFuture.sync();
// log.debug("处理之后的关闭");
// 第二种方式关闭
channelFuture.addListener((ChannelFutureListener) future1 -> {
log.debug("处理之后的关闭");
group.shutdownGracefully();
});
}
}

View File

@ -1,54 +0,0 @@
package cn.bunny;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.Date;
public class TestDemoClient {
public static void main(String[] args) throws InterruptedException {
// 启动类
Channel channel = new Bootstrap()
// 添加EventLoop
.group(new NioEventLoopGroup())
// 设置客户端Channel类型
.channel(NioSocketChannel.class)
// 选择客户端 Channel
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override// 在连接建立后调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
System.out.println(channel);// 使用debug方式发送消息...
System.out.println();
// // 向服务器发送数据
// .writeAndFlush("你好啊啊啊");
}
private static void m1() throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup()) // 1
.channel(NioSocketChannel.class) // 2
.handler(new ChannelInitializer<NioSocketChannel>() { // 3
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new StringEncoder()); // 8
}
})
.connect("127.0.0.1", 8080) // 4
.sync() // 5
.channel() // 6
.writeAndFlush(new Date() + ": hello world!"); // 7
}
}

View File

@ -1,36 +0,0 @@
package cn.bunny.demo1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class TestDemoServer {
public static void main(String[] args) {
// 服务器端启动器负责组装 netty 组件启动服务器
new ServerBootstrap()
// 2. BossEventLoop,WorkEventLoop group组
.group(new NioEventLoopGroup())
// 3. 选择服务器ServerSocketChannel 实现
.channel(NioServerSocketChannel.class)
// 4. boss 负责处理连接决定work可以做哪些事情
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 将By特Buffer转为字符串
nioSocketChannel.pipeline().addLast(new StringDecoder());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 自定义handler
@Override// 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 消息字符串
System.out.println(msg);
}
});
}
}).bind(8080);
}
}

View File

@ -1,44 +0,0 @@
package cn.bunny.demo2;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.NettyRuntime;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TestGroupLoop {
public static void main(String[] args) {
// io 事件普通任务定时任务
NioEventLoopGroup group = new NioEventLoopGroup();
// 获取下一个事件循环对象
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
// 执行普通任务
group.next().submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("执行完成");
});
// 执行定时任务
group.next().scheduleAtFixedRate(() -> {
log.debug("定时任务的执行");
}, 0, 1, TimeUnit.SECONDS);
}
private static void test1() {
// io 事件普通任务定时任务
NioEventLoopGroup group = new NioEventLoopGroup();
// 普通任务定时任务
DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup();
// 查看系统是几核
System.out.println(NettyRuntime.availableProcessors());
}
}

View File

@ -1,36 +0,0 @@
package cn.bunny.demo3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class EventLoopServerTest {
public static void main(String[] args) {
// 创建服务器
new ServerBootstrap()
// BOOS只负责accept事件worker 只负责 SocketChannel 上的读写
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))// 设置工作线程为2个
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
}).bind(8080);
}
}

View File

@ -1,45 +0,0 @@
package cn.bunny.demo4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class TestDefaultServerLoopGroup {
public static void main(String[] args) {
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast("handler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString((Charset.defaultCharset())));
// 让消息继续传递下去
ctx.fireChannelRead(msg);
}
}).addLast(group, "hander2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
super.channelRead(ctx, msg);
}
});
}
}).bind(8080);
}
}

View File

@ -1,21 +0,0 @@
package cn.bunny.demo5;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestAsyncFuture {
public static void main(String[] args) {
NioEventLoopGroup loopGroup = new NioEventLoopGroup();
EventLoop eventLoop = loopGroup.next();
Future<Integer> future = eventLoop.submit(() -> {
log.debug("执行计算...");
Thread.sleep(1000);
return 70;
});
future.addListener(future1 -> log.debug("接受结果:{}", future1.getNow()));
}
}

View File

@ -1,26 +0,0 @@
package cn.bunny.demo5;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(() -> {
Thread.sleep(1000);
return 70;
});
// 主线程通过 future 来获取结果
log.debug("主线程等待结果");
future.get();
log.debug("结果是:{}", future.get());
}
}

View File

@ -1,26 +0,0 @@
package cn.bunny.demo5;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class UdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 线程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 提交任务
Future<Integer> future = service.submit(() -> {
Thread.sleep(1000);
return 50;
});
// 主线程通过 future 来获取结果
log.debug("主线程等待结果");
future.get();
log.debug("结果是:{}", future.get());
}
}

View File

@ -1,47 +0,0 @@
package cn.bunny.demo6;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
public class TestEmbedded {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(1);
ctx.fireChannelRead(msg); // 1
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(2);
ctx.channel().write(msg); // 2
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(4);
ctx.write(msg, promise); // 3
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(5);
ctx.write(msg, promise); // 4
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello world".getBytes()));
}
}

View File

@ -1,35 +0,0 @@
package cn.bunny.demo6;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 准备EventLoop 对象
EventLoop eventLoop = new NioEventLoopGroup().next();
// 可以创建promise 结果容器
Promise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
// 任意线程进行计算
System.out.println("开始计算...");
// 计算完成后向promise填充结果
try {
Thread.sleep(1000);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
promise.setSuccess(80);
}).start();
// 除了可以等待结果也可以处理异常
log.debug("等待结果...");
log.debug("结果是:{}", promise.get());
}
}

View File

@ -1,67 +0,0 @@
package cn.bunny.demo6;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestPipelineDemo {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(1);
ctx.fireChannelRead(msg); // 1
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(2);
ctx.fireChannelRead(msg); // 2
}
});
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(3);
ctx.channel().write(msg); // 3
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(4);
ctx.write(msg, promise); // 4
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(5);
ctx.write(msg, promise); // 5
}
});
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) {
System.out.println(6);
ctx.write(msg, promise); // 6
}
});
}
})
.bind(8080);
}
}