2021-07-06

厉害了,Netty 轻松实现文件上传!

作者:rickiyang

出处:www.cnblogs.com/rickiyang/p/11074222.html

今天我们来完成一个使用netty进行文件传输的任务。在实际项目中,文件传输通常采用FTP或者HTTP附件的方式。事实上通过TCP Socket+File的方式进行文件传输也有一定的应用场景,尽管不是主流,但是掌握这种文件传输方式还是比较重要的,特别是针对两个跨主机的JVM进程之间进行持久化数据的相互交换。

而使用netty来进行文件传输也是利用netty天然的优势:零拷贝功能。很多同学都听说过netty的"零拷贝"功能,但是具体体现在哪里又不知道,下面我们就简要介绍下:

Netty的"零拷贝"主要体现在如下三个方面:

  1. Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。

  2. Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。

  3. Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。

具体的分析在此就不多做介绍,有兴趣的可以查阅相关文档。我们还是把重点放在文件传输上。Netty作为高性能的服务器端异步IO框架必然也离不开文件读写功能,我们可以使用netty模拟http的形式通过网页上传文件写入服务器,当然要使用http的形式那你也用不着netty!大材小用。

netty4中如果想使用http形式上传文件你还得借助第三方jar包:okhttp。使用该jar完成http请求的发送。但是在netty5 中已经为我们写好了,我们可以直接调用netty5的API就可以实现。所以netty4和5的差别还是挺大的,至于使用哪个,那就看你们公司选择哪一个了!本文目前使用netty4来实现文件上传功能。下面我们上代码:

pom文件:

<dependency>  <groupId>io.netty</groupId>  <artifactId>netty-all</artifactId>  <version>4.1.5.Final</version></dependency>

server端:

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;public class FileUploadServer { public void bind(int port) throws Exception {  EventLoopGroup bossGroup = new NioEventLoopGroup();  EventLoopGroup workerGroup = new NioEventLoopGroup();  try {   ServerBootstrap b = new ServerBootstrap();   b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception {     ch.pipeline().addLast(new ObjectEncoder());     ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))); // 最大长度     ch.pipeline().addLast(new FileUploadServerHandler());    }   });   ChannelFuture f = b.bind(port).sync();   f.channel().closeFuture().sync();  } finally {   bossGroup.shutdownGracefully();   workerGroup.shutdownGracefully();  } } public static void main(String[] args) {  int port = 8080;  if (args != null && args.length > 0) {   try {    port = Integer.valueOf(args[0]);   } catch (NumberFormatException e) {    e.printStackTrace();   }  }  try {   new FileUploadServer().bind(port);  } catch (Exception e) {   e.printStackTrace();  } }}

server端handler:

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.io.File;import java.io.RandomAccessFile;public class FileUploadServerHandler extends ChannelInboundHandlerAdapter { private int byteRead; private volatile int start = 0; private String file_dir = "D:"; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  if (msg instanceof FileUploadFile) {   FileUploadFile ef = (FileUploadFile) msg;   byte[] bytes = ef.getBytes();   byteRead = ef.getEndPos();   String md5 = ef.getFile_md5();//文件名   String path = file_dir + File.separator + md5;   File file = new File(path);   RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");   randomAccessFile.seek(start);   randomAccessFile.write(bytes);   start = start + byteRead;   if (byteRead > 0) {    ctx.writeAndFlush(start);   } else {    randomAccessFile.close();    ctx.close();   }  } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  cause.printStackTrace();  ctx.close(); }}

client端:

import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.serialization.ClassResolvers;import io.netty.handler.codec.serialization.ObjectDecoder;import io.netty.handler.codec.serialization.ObjectEncoder;import java.io.File;public class FileUploadClient { public void connect(int port, String host, final FileUploadFile fileUploadFile) throws Exception {  EventLoopGroup group = new NioEventLoopGroup();  try {   Bootstrap b = new Bootstrap();   b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<Channel>() {    @Override    protected void initChannel(Channel ch) throws Exception {     ch.pipeline().addLast(new ObjectEncoder());     ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));     ch.pipeline().addLast(new FileUploadClientHandler(fileUploadFile));    }   });   ChannelFuture f = b.connect(host, port).sync();   f.channel().closeFuture().sync();  } finally {   group.shutdownGracefully();  } } public static void main(String[] args) {  int port = 8080;  if (args != null && args.length > 0) {   try {    port = Integer.valueOf(args[0]);   } catch (NumberFormatException e) {    e.printStackTrace();   }  }  try {   FileUploadFile uploadFile = new FileUploadFile();   File file = new File("c:/1.txt");   String fileMd5 = file.getName();// 文件名   uploadFile.setFile(file);   uploadFile.setFile_md5(fileMd5);   uploadFile.setStarPos(0);// 文件开始位置   new FileUploadClient().connect(port, "127.0.0.1", uploadFile);  } catch (Exception e) {   e.printStackTrace();  } }}

client端handler:

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.io.FileNotFoundException;import java.io.IOException;import java.io.RandomAccessFile;public class FileUploadClientHandler extends ChannelInboundHandlerAdapter { private int byteRead; private volatile int start = 0; private volatile int lastLength = 0; public RandomAccessFile randomAccessFile; private FileUploadFile fileUploadFile; public FileUploadClientHandler(FileUploadFile ef) {  if (ef.getFile().exists()) {   if (!ef.getFile().isFile()) {    System.out.println("Not a file :" + ef.getFile());    return;   }  }  this.fileUploadFile = ef; } public void channelActive(ChannelHandlerContext ctx) {  try {   randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");   randomAccessFile.seek(fileUploadFile.getStarPos());   lastLength = (int) randomAccessFile.length() / 10;   byte[] bytes = new byte[lastLength];   if ((byteRead = randomAccessFile.read(bytes)) != -1) {    fileUploadFile.setEndPos(byteRead);    fileUploadFile.setBytes(bytes);    ctx.writeAndFlush(fileUploadFile);   } else {    System.out.println("文件已经读完");   }  } catch (FileNotFoundException e) {   e.printStackTrace();  } catch (IOException i) {   i.printStackTrace();  } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  if (msg instanceof Integer) {   start = (Integer) msg;   if (start != -1) {    randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");    randomAccessFile.seek(start);    System.out.println("块儿长度:" + (randomAccessFile.length() / 10));    System.out.println("长度:" + (randomAccessFile.length() - start));    int a = (int) (randomAccessFile.length() - start);    int b = (int) (randomAccessFile.length() / 10);    if (a < b) {     lastLength = a;    }    byte[] bytes = new byte[lastLength];    System.out.println("-----------------------------" + bytes.length);    if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length() - start) > 0) {     System.out.println("byte 长度:" + bytes.length);     fileUploadFile.setEndPos(byteRead);     fileUploadFile.setBytes(bytes);     try {      ctx.writeAndFlush(fileUploadFile);     } catch (Exception e) {      e.printStackTrace();     }    } else {     randomAccessFile.close();     ctx.close();     System.out.println("文件已经读完--------" + byteRead);    }   }  } } // @Override // public void channelRead(ChannelHandlerContext ctx, Object msg) throws // Exception { // System.out.println("Server is speek :"+msg.toString()); // FileRegion filer = (FileRegion) msg; // String path = "E://Apk//APKMD5.txt"; // File fl = new File(path); // fl.createNewFile(); // RandomAccessFile rdafile = new RandomAccessFile(path, "rw"); // FileRegion f = new DefaultFileRegion(rdafile.getChannel(), 0, // rdafile.length()); // // System.out.println("This is" + ++counter + "times receive server:[" // + msg + "]"); // } // @Override // public void channelReadComplete(ChannelHandlerContext ctx) throws // Exception { // ctx.flush(); // } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  cause.printStackTrace();  ctx.close(); } // @Override // protected void channelRead0(ChannelHandlerContext ctx, String msg) // throws Exception { // String a = msg; // System.out.println("This is"+ // ++counter+"times receive server:["+msg+"]"); // }}

我们还自定义了一个对象,用于统计文件上传进度的:

import java.io.File;import java.io.Serializable;public class FileUploadFile implements Serializable { private static final long serialVersionUID = 1L; private File file;// 文件 private String file_md5;// 文件名 private int starPos;// 开始位置 private byte[] bytes;// 文件字节数组 private int endPos;// 结尾位置 public int getStarPos() {  return starPos; } public void setStarPos(int starPos) {  this.starPos = starPos; } public int getEndPos() {  return endPos; } public void setEndPos(int endPos) {  this.endPos = endPos; } public byte[] getBytes() {  return bytes; } public void setBytes(byte[] bytes) {  this.bytes = bytes; } public File getFile() {  return file; } public void setFile(File file) {  this.file = file; } public String getFile_md5() {  return file_md5; } public void setFile_md5(String file_md5) {  this.file_md5 = file_md5; }}

输出为:

块儿长度:894长度:8052-----------------------------894byte 长度:894块儿长度:894长度:7158-----------------------------894byte 长度:894块儿长度:894长度:6264-----------------------------894byte 长度:894块儿长度:894长度:5370-----------------------------894byte 长度:894块儿长度:894长度:4476-----------------------------894byte 长度:894块儿长度:894长度:3582-----------------------------894byte 长度:894块儿长度:894长度:2688-----------------------------894byte 长度:894块儿长度:894长度:1794-----------------------------894byte 长度:894块儿长度:894长度:900-----------------------------894byte 长度:894块儿长度:894长度:6-----------------------------6byte 长度:6块儿长度:894长度:0-----------------------------0文件已经读完--------0Process finished with exit code 0

这样就实现了服务器端文件的上传,当然我们也可以使用http的形式。

server端:

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;public class HttpFileServer implements Runnable { private int port; public HttpFileServer(int port) {  super();  this.port = port; } @Override public void run() {  EventLoopGroup bossGroup = new NioEventLoopGroup(1);  EventLoopGroup workerGroup = new NioEventLoopGroup();  ServerBootstrap serverBootstrap = new ServerBootstrap();  serverBootstrap.group(bossGroup, workerGroup);  serverBootstrap.channel(NioServerSocketChannel.class);  //serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));  serverBootstrap.childHandler(new HttpChannelInitlalizer());  try {   ChannelFuture f = serverBootstrap.bind(port).sync();   f.channel().closeFuture().sync();  } catch (InterruptedException e) {   e.printStackTrace();  } finally {   bossGroup.shutdownGracefully();   workerGroup.shutdownGracefully();  } } public static void main(String[] args) {  HttpFileServer b = new HttpFileServer(9003);  new Thread(b).start(); }}

Server端initializer:

import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.stream.ChunkedWriteHandler;public class HttpChannelInitlalizer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception {  ChannelPipeline pipeline = ch.pipeline();  pipeline.addLast(new HttpServerCodec());  pipeline.addLast(new HttpObjectAggregator(65536));  pipeline.addLast(new ChunkedWriteHandler());  pipeline.addLast(new HttpChannelHandler()); }}

server端hadler:

import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelProgressiveFuture;import io.netty.channel.ChannelProgressiveFutureListener;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.DefaultFullHttpResponse;import io.netty.handler.codec.http.DefaultHttpResponse;import io.netty.handler.codec.http.FullHttpRequest;import io.netty.handler.codec.http.FullHttpResponse;import io.netty.handler.codec.http.HttpChunkedInput;import io.netty.handler.codec.http.HttpHeaders;import io.netty.handler.codec.http.HttpResponse;import io.netty.handler.codec.http.HttpResponseStatus;import io.netty.handler.codec.http.HttpVersion;import io.netty.handler.codec.http.LastHttpContent;import io.netty.handler.stream.ChunkedFile;import io.netty.util.CharsetUtil;import io.netty.util.internal.SystemPropertyUtil;import java.io.File;import java.io.FileNotFoundException;import java.io.RandomAccessFile;import java.io.UnsupportedEncodingException;import java.net.URLDecoder;import java.util.regex.Pattern;import javax.activation.MimetypesFileTypeMap;public class HttpChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> { public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; public static final String HTTP_DATE_GMT_TIMEZONE = "GMT"; public static final int HTTP_CACHE_SECONDS = 60; @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {  // 监测解码情况  if (!request.getDecoderResult().isSuccess()) {   sendError(ctx, BAD_REQUEST);   return;  }  final String uri = request.getUri();  final String path = sanitizeUri(uri);  System.out.println("get file:"+path);  if (path == null) {   sendError(ctx, FORBIDDEN);   return;  }  //读取要下载的文件  File file = new File(path);  if (file.isHidden() || !file.exists()) {   sendError(ctx, NOT_FOUND);   return;  }  if (!file.isFile()) {   sendError(ctx, FORBIDDEN);   return;  }  RandomAccessFile raf;  try {   raf = new RandomAccessFile(file, "r");  } catch (FileNotFoundException ignore) {   sendError(ctx, NOT_FOUND);   return;  }  long fileLength = raf.length();  HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);  HttpHeaders.setContentLength(response, fileLength);  setContentTypeHeader(response, file);  //setDateAndCacheHeaders(response, file);  if (HttpHeaders.isKeepAlive(request)) {   response.headers().set("CONNECTION", HttpHeaders.Values.KEEP_ALIVE);  }  // Write the initial line and the header.  ctx.write(response);  // Write the content.  ChannelFuture sendFileFuture =  ctx.write(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), ctx.newProgressivePromise());  //sendFuture用于监视发送数据的状态  sendFileFuture.addListener(new ChannelProgressiveFutureListener() {   @Override   public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {    if (total < 0) { // total unknown     System.err.println(future.channel() + " Transfer progress: " + progress);    } else {     System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);    }   }   @Override   public void operationComplete(ChannelProgressiveFuture future) {    System.err.println(future.channel() + " Transfer complete.");   }  });  // Write the end marker  ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);  // Decide whether to close the connection or not.  if (!HttpHeaders.isKeepAlive(request)) {   // Close the connection when the whole content is written out.   lastContentFuture.addListener(ChannelFutureListener.CLOSE);  } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  cause.printStackTrace();  if (ctx.channel().isActive()) {   sendError(ctx, INTERNAL_SERVER_ERROR);  }  ctx.close(); } private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*"); private static String sanitizeUri(String uri) {  // Decode the path.  try {   uri = URLDecoder.decode(uri, "UTF-8");  } catch (UnsupportedEncodingException e) {   throw new Error(e);  }  if (!uri.startsWith("/")) {   return null;  }  // Convert file separators.  uri = uri.replace('/', File.separatorChar);  // Simplistic dumb security check.  // You will have to do something serious in the production environment.  if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri.startsWith(".") || uri.endsWith(".")    || INSECURE_URI.matcher(uri).matches()) {   return null;  }  // Convert to absolute path.  return SystemPropertyUtil.get("user.dir") + File.separator + uri; } private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {  FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));  response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");  // Close the connection as soon as the error message is sent.  ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } /**  * Sets the content type header for the HTTP Response  *  * @param response  *   HTTP response  * @param file  *   file to extract content type  */ private static void setContentTypeHeader(HttpResponse response, File file) {  MimetypesFileTypeMap m = ......

原文转载:http://www.shaoqun.com/a/849284.html

跨境电商:https://www.ikjzd.com/

aca:https://www.ikjzd.com/w/1371

百思买:https://www.ikjzd.com/w/394

跨境通电子商务网站:https://www.ikjzd.com/w/1329


作者:rickiyang出处:www.cnblogs.com/rickiyang/p/11074222.html今天我们来完成一个使用netty进行文件传输的任务。在实际项目中,文件传输通常采用FTP或者HTTP附件的方式。事实上通过TCPSocket+File的方式进行文件传输也有一定的应用场景,尽管不是主流,但是掌握这种文件传输方式还是比较重要的,特别是针对两个跨主机的JVM进程之间进行持久化
亚马逊落榜?东南亚最受买家好评的跨境电商平台出炉:https://www.ikjzd.com/articles/91262
美国海关没有暂停清关,但明日起或面临高额税款与罚金!:https://www.ikjzd.com/articles/91263
中美贸易关税战下,美国企业和消费者不得不埋单!:https://www.ikjzd.com/articles/91268
紧急!美亚Prime Day秒杀的申报明天截止!:https://www.ikjzd.com/articles/91269
我被外国黑人3p过程 太粗太长弄死了我了:http://lady.shaoqun.com/a/256971.html
男友扒了我的奶罩吻我的胸 男朋友摸进我内裤揉搓:http://www.30bags.com/m/a/249767.html
舌尖分开她的细缝舔舐 他的舌尖在她的小核上逗弄:http://www.30bags.com/m/a/249712.html
将她双腿向左右开的更大 把美腿扛在肩上疯狂输出:http://www.30bags.com/m/a/249917.html
"做事"后有五个症状,说明太频繁了!看看你有没有:http://lady.shaoqun.com/a/403805.html
女性性生活过多的症状!每个年龄段的正常性生活频率是多少?:http://lady.shaoqun.com/a/403806.html
怎么追到比自己值钱得多的白?:http://lady.shaoqun.com/a/403807.html
WEEE申诉指南,实例分享~:https://www.ikjzd.com/articles/146384

No comments:

Post a Comment