Java 7:如何正确关闭NIO.2文件通道

ImportNew  •  扫码分享
我是创始人李岩:很抱歉!给自己产品做个广告,点击进来看看。  
分享到:


本文由 ImportNew - 范婵香 翻译自 javacodegeeks。欢迎加入Java小组。转载请参见文章末尾的要求。

关闭一个异步文件通道可能会非常困难。如果你向异步通道提交I/O任务,你需要确保这些任务被正确执行。实际上这可以算是异步通道上一个棘手的要求,有以下几个原因:默认的通道组将守护线程当作辅助线程来使用,这不是一个好的选择,因为如果JVM退出,这些线程会放弃执行。如果你使用自定义的带有非守护线程的线程池执行者,你需要管理好你自己的线程池的生命周期。 如果你不这样做,当主线程退出时,那些线程会继续执行任务。因此,一旦JVM没有退出的话,你所能做的就是杀掉JVM线程。

另一个问题是在关闭异步通道时出现的,在AsynchronousFileChannel的Javadoc中有提到: “关闭执行器服务,而通道却是开放的,这样会导致未指定的行为发生。”这是因为AsynchronousFileChannel的close()操作向相关的执行服务发起任务,模拟挂起的I / O操作(在同一个线程池)抛出AsynchronousCloseException,从而操作失败。因此,如果你先关闭了相关的执行服务,然后在一个异步文件通道实例中执行close()方法,你就会得到RejectedExecutionException。

可以这么说,所提出来的关于安全地配置文件通道和关闭该通道的方式,就像下面这样:

public class SimpleChannelClose_AsynchronousCloseException {

  private static final String FILE_NAME = "E:/temp/afile.out";
  private static AsynchronousFileChannel outputfile;
  private static AtomicInteger fileindex = new AtomicInteger(0);
  private static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());

  public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
   outputfile = AsynchronousFileChannel.open(
   Paths.get(FILE_NAME),
   new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.WRITE, 
              StandardOpenOption.CREATE,StandardOpenOption.DELETE_ON_CLOSE)), pool);
   List<Future<Integer>> futures = new ArrayList<>();
   for (int i = 0; i < 10000; i++) {
    futures.add(outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5));
   }
   outputfile.close();
   pool.shutdown();
   pool.awaitTermination(60, TimeUnit.SECONDS);
   for (Future<Integer> future : futures) {
    try {
     future.get();
    } catch (ExecutionException e) {
     System.out.println("Task wasn't executed!");
    }
   }
  }
}

自定义线程池执行服务在第6行和第7行中定义。文件通道在第10行到第13行之间定义。 从第18行到第20行,异步通道是有序关闭的。首先,通道自身关闭,然后执行服务关闭,最后,也是最重要的,线程等待线程池执行者的终止。

虽然关闭一个带有自定义执行服务的通道是一种安全的方式,然而这里要引入一个新的问题。客户端提交异步写入任务(第16行),而且可以肯定,一旦提交成功,这些任务将会被执行。不选择一直等待Future.get()方法返回(第23行),原因在于,很多情况下,这将导致异步文件通道ad adsurdum。上面的代码片断会返回大量“任务没有执行!”的消息,因为写操作被提交到通道(第18行)后,通道就立即关闭。为了避免这样的“数据丢失”,你可以实现自己的CompletionHandler并传递到所请求的写操作。

public class SimpleChannelClose_CompletionHandler {
...
 public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
...
   outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5, "", defaultCompletionHandler);
...
 }

 private static CompletionHandler<integer, string=""> defaultCompletionHandler = new CompletionHandler<Integer, String>() {
  @Override
  public void completed(Integer result, String attachment) {
   // NOP
  }

  @Override
  public void failed(Throwable exc, String attachment) {
  System.out.println("Do something to avoid data loss ...");
  }
 };
}

CompletionHandler.failed()方法(第16行)捕获任务处理过程中的任何运行时异常。你可以在这里实现任何补偿代码,以避免数据丢失。当您处理任务关键数据时,使用CompletionHandler可能是一个好主意。但是还有另外一个问题。该客户端可以提交任务,但他们不知道线程池是否会成功处理这些任务。这种情况下的成功意味着提交的字节实际上已经到达他们的目的地(硬盘上的文件)。如果你想确保在线程池关闭前所有提交的任务都被处理,这样就有点棘手。你需要一个‘优雅’的关闭机制,即等到工作队列为空,并且在这之前它确实关闭了通道和相关的执行服务(使用标准的生命周期方法是不可能的)。

介绍GracefulAsynchronousChannel

我最后一段介绍GracefulAsynchronousFileChannel。 你可以在我的Git资源库得到完整的代码。该通道的行为是这样的: 如果通道准备关闭,保证处理所有成功提交的写操作并抛出NonWritableChannelException。实现该行为需要做两件事。首先,你需要实现ThreadPoolExecutor 继承类中的afterExecute()方法,该继承类会在队列为空的时候发送信号。 DefensiveThreadPoolExecutor就是这么做的。

private class DefensiveThreadPoolExecutor extends ThreadPoolExecutor {

 public DefensiveThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
   LinkedBlockingQueue<Runnable> workQueue, ThreadFactory factory, RejectedExecutionHandler handler) {
  super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory, handler);
 }

 /**
  * "Last" task issues a signal that queue is empty after task processing was completed.
  */
 @Override
 protected void afterExecute(Runnable r, Throwable t) {
  if (state == PREPARE) {
   closeLock.lock(); // only one thread will pass when closer thread is awaiting signal
   try {
    if (getQueue().isEmpty() && state < SHUTDOWN) {
     System.out.println("Issueing signal that queue is empty ...");
     isEmpty.signal();
     state = SHUTDOWN; // -> no other thread can issue empty-signal
    }
   } finally {
    closeLock.unlock();
   }
  }
  super.afterExecute(r, t);
 }
}

在每个处理任务之后,该afterExecute()方法(第12行)由处理该给定任务的线程执行。该实现方法在第18行发送isEmpty 信号。你需要两个正常关闭通道的第二部分是AsynchronousFileChannel的close()方法的自定义实现。

/**
 * Method that closes this file channel gracefully without loosing any data.
 */
@Override
public void close() throws IOException {
 AsynchronousFileChannel writeableChannel = innerChannel;
 System.out.println("Starting graceful shutdown ...");
 closeLock.lock();
 try {
  state = PREPARE;
  innerChannel = AsynchronousFileChannel.open(Paths.get(uri),
    new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.READ)), pool);
  System.out.println("Channel blocked for write access ...");
  if (!pool.getQueue().isEmpty()) {
   System.out.println("Waiting for signal that queue is empty ...");
   isEmpty.await();
   System.out.println("Received signal that queue is empty ... closing");
  } else {
   System.out.println("Don't have to wait, queue is empty ...");
  }
 } catch (InterruptedException e) {
  Thread.interrupted();
  throw new RuntimeException("Interrupted on awaiting Empty-Signal!", e);
 } catch (Exception e) {
  throw new RuntimeException("Unexpected error" + e);
 } finally {
  closeLock.unlock();
  writeableChannel.force(false);
  writeableChannel.close(); // close the writable channel
  innerChannel.close(); // close the read-only channel
  System.out.println("File closed ...");
  pool.shutdown(); // allow clean up tasks from previous close() operation to finish safely
  try {
   pool.awaitTermination(1, TimeUnit.MINUTES);
  } catch (InterruptedException e) {
   Thread.interrupted();
   throw new RuntimeException("Could not terminate thread pool!", e);
  }
  System.out.println("Pool closed ...");
 }
}

研究一下代码。有趣的片段在第11行,其中innerChannel 被一个只读通道取代。这使任何后续的异步写入出现nonwritablechannelexception异常并请求失败。第16行的close()方法等待isEmpty 信号发生。 在最后的写任务完成之后发送这个信号,close()方法会继续执行有序的关机程序(第27行起)。基本上,该代码在文件通道和相关的线程池内添加了一个共同的生命周期状态。这样,两个对象就可以在关机过程中进行沟通,避免数据丢失。
下面是一个使用GracefulAsynchronousFileChannel的日志客户端。

public class MyLoggingClient {
 private static AtomicInteger fileindex = new AtomicInteger(0);
 private static final String FILE_URI = "file:/E:/temp/afile.out";

 public static void main(String[] args) throws IOException {
  new Thread(new Runnable() { // arbitrary thread that writes stuff into an asynchronous I/O data sink

     @Override
     public void run() {
      try {
       for (;;) {
        GracefulAsynchronousFileChannel.get(FILE_URI).write(ByteBuffer.wrap("Hello".getBytes()),
          fileindex.getAndIncrement() * 5);
       }
      } catch (NonWritableChannelException e) {
       System.out.println("Deal with the fact that the channel was closed asynchronously ... "
         + e.toString());
      } catch (Exception e) {
       e.printStackTrace();
      }
     }
    }).start();

  Timer timer = new Timer(); // asynchronous channel closer
  timer.schedule(new TimerTask() {
   public void run() {
    try {
     GracefulAsynchronousFileChannel.get(FILE_URI).close();
     long size = Files.size(Paths.get("E:/temp/afile.out"));
     System.out.println("Expected file size (bytes): " + (fileindex.get() - 1) * 5);
     System.out.println("Actual file size (bytes): " + size);
     if (size == (fileindex.get() - 1) * 5)
      System.out.println("No write operation was lost!");
     Files.delete(Paths.get("E:/temp/afile.out"));
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }, 1000);


 }
}

该客户端启动两个线程,其中一个线程在一个无限循环中执行写操作(第6行起)。 另外一个线程在一秒钟的处理之后异步关闭文件通道(第25行起)。如果您运行该客户端,那么会产生下面的输出效果:

Starting graceful shutdown ...
Deal with the fact that the channel was closed asynchronously ... java.nio.channels.NonWritableChannelException
Channel blocked for write access ...
Waiting for signal that queue is empty ...
Issueing signal that queue is empty ...
Received signal that queue is empty ... closing
File closed ...
Pool closed ...
Expected file size (bytes): 400020
Actual file size (bytes): 400020
No write operation was lost!

该输出显示参与线程的有序关闭程序。日志线程需要面对的事实是,该通道是异步关闭。队列中的任务处理完之后,通道资源就关闭了。没有数据丢失,客户端执行的一切都写入了文件目标。在这样一个优雅的关闭程序内没有AsynchronousClosedException或RejectedExecutionException。

关于安全关闭异步文件通道的内容就是这些了。 在我的Git资源库有完整的代码。我希望你喜欢这篇文章。期待您的宝贵意见。


原文链接: javacodegeeks 翻译: ImportNew.com - 范婵香
译文链接: http://www.importnew.com/10496.html
[ 转载请保留原文出处、译者和译文链接。]



相关文章

  • 5种调优Java NIO和NIO.2的方式
  • Scala教程:高级类型
  • 传闻:Google高层施密特已经开始使用Moto X
  • SPRING SECURITY JAVA配置:简介
  • SPRING SECURITY JAVA配置:Method Security
  • 快到极致的Android模拟器——Genymotion
  • 真的吗?Java开发者喜欢编写非Java程序
  • 如何写一个不可变类?
  • Java反编译器剖析(中)
  • Java问答:终极父类(上)

随意打赏

提交建议
微信扫一扫,分享给好友吧。