Apache Mina Source Code Revisited, Part 4: An Analysis of IoSession.write()
IoSession provides two methods for writing data.
Writes the specified message to remote peer. This operation is asynchronous; IoHandler.messageSent(IoSession,Object) will be invoked when the message is actually sent to remote peer. You can also wait for the returned WriteFuture if you want to wait for
the message actually written.
When you implement a client that receives a broadcast message from a server such as DHCP server, the client might need to send a response message for the broadcast message the server sent. Because the remote address of the session is not the address of the
server in case of broadcasting, there should be a way to specify the destination when you write the response message. This interface provides write(Object, SocketAddress) method so you can specify the destination.
1、WriteFuture write(Object message)
2、WriteFuture write(Object message, SocketAddress destination)
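1、Writes the given message object to the remote peer. The write is asynchronous: IoHandler.messageSent(IoSession,Object) is invoked once the message has actually been written to the remote peer. The caller can also wait on the returned WriteFuture until the message has been written.
2、When a server broadcasts a message to clients (for example, a DHCP server), the session's remote address is not the server's address. A client that wants to send a response back to the server can use the destination argument to specify where the response goes.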
public WriteFuture write(Object message, SocketAddress remoteAddress) {
    if (message == null) {
        throw new IllegalArgumentException("Trying to write a null message : not allowed");
    }

    // We can't send a message to a connected session if we don't have
    // the remote address
    if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
        throw new UnsupportedOperationException();
    }

    // If the session has been closed or is closing, we can't either
    // send a message to the remote side. We generate a future
    // containing an exception.
    if (isClosing() || !isConnected()) {
        WriteFuture future = new DefaultWriteFuture(this);
        WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
        WriteException writeException = new WriteToClosedSessionException(request);
        future.setException(writeException);
        return future;
    }

    FileChannel openedFileChannel = null;

    // TODO: remove this code as soon as we use InputStream
    // instead of Object for the message.
    try {
        if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
            // Nothing to write : probably an error in the user code
            throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
        } else if (message instanceof FileChannel) {
            FileChannel fileChannel = (FileChannel) message;
            message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
        } else if (message instanceof File) {
            File file = (File) message;
            openedFileChannel = new FileInputStream(file).getChannel();
            message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
        }
    } catch (IOException e) {
        ExceptionMonitor.getInstance().exceptionCaught(e);
        return DefaultWriteFuture.newNotWrittenFuture(this, e);
    }

    // Now, we can write the message. First, create a future
    WriteFuture writeFuture = new DefaultWriteFuture(this);
    WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);

    // Then, get the chain and inject the WriteRequest into it
    IoFilterChain filterChain = getFilterChain();
    filterChain.fireFilterWrite(writeRequest);

    // TODO : This is not our business ! The caller has created a
    // FileChannel,
    // he has to close it !
    if (openedFileChannel != null) {
        // If we opened a FileChannel, it needs to be closed when the write
        // has completed
        final FileChannel finalChannel = openedFileChannel;
        writeFuture.addListener(new IoFutureListener<WriteFuture>() {
            public void operationComplete(WriteFuture future) {
                try {
                    finalChannel.close();
                } catch (IOException e) {
                    ExceptionMonitor.getInstance().exceptionCaught(e);
                }
            }
        });
    }

    // Return the WriteFuture.
    return writeFuture;
}
The body of IoSession.write(Object message, SocketAddress remoteAddress) is straightforward. After validating its arguments, it creates a WriteRequest and calls
// Then, get the chain and inject the WriteRequest into it
IoFilterChain filterChain = getFilterChain();
filterChain.fireFilterWrite(writeRequest);
that is, IoFilterChain.fireFilterWrite(), and then returns the WriteFuture.
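The shape of write() described above (validate the message, report a closed session through the future rather than by throwing, otherwise hand a request onward and return at once) can be sketched without MINA. What follows is a minimal single-threaded model, not MINA code: SketchSession, Request and pendingCount() are hypothetical names, CompletableFuture stands in for WriteFuture, and a plain queue stands in for the filter chain.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for a session; not a MINA class. */
class SketchSession {
    /** Pairs a message with the future that reports its outcome. */
    static final class Request {
        final Object message;
        final CompletableFuture<Void> future;

        Request(Object message, CompletableFuture<Void> future) {
            this.message = message;
            this.future = future;
        }
    }

    private final Queue<Request> pending = new ArrayDeque<>();
    private boolean closed;

    void close() {
        closed = true;
    }

    int pendingCount() {
        return pending.size();
    }

    /** Same outline as write(): validate, fail through the future if closed, else queue. */
    CompletableFuture<Void> write(Object message) {
        if (message == null) {
            throw new IllegalArgumentException("Trying to write a null message : not allowed");
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (closed) {
            // A closed session does not throw; it returns a future carrying the error.
            future.completeExceptionally(new IllegalStateException("session is closed"));
            return future;
        }
        // Stand-in for filterChain.fireFilterWrite(writeRequest).
        pending.add(new Request(message, future));
        return future;
    }

    public static void main(String[] args) {
        SketchSession session = new SketchSession();
        CompletableFuture<Void> queued = session.write("hello");
        System.out.println("done right after write(): " + queued.isDone());
        session.close();
        CompletableFuture<Void> rejected = session.write("too late");
        System.out.println("failed on closed session: " + rejected.isCompletedExceptionally());
    }
}
```

The point of the model is that the caller never blocks: the returned future is still pending when write() returns, and something else has to complete it later.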
In HeadFilter, the method public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) places the message in the queue of pending writes,
that is, the session's WriteRequestQueue.
private class HeadFilter extends IoFilterAdapter {
    @SuppressWarnings("unchecked")
    @Override
    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
        AbstractIoSession s = (AbstractIoSession) session;

        // Maintain counters.
        if (writeRequest.getMessage() instanceof IoBuffer) {
            IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
            // I/O processor implementation will call buffer.reset()
            // it after the write operation is finished, because
            // the buffer will be specified with messageSent event.
            buffer.mark();
            int remaining = buffer.remaining();

            if (remaining > 0) {
                s.increaseScheduledWriteBytes(remaining);
            }
        } else {
            s.increaseScheduledWriteMessages();
        }

        WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();

        if (!s.isWriteSuspended()) {
            if (writeRequestQueue.isEmpty(session)) {
                // We can write directly the message
                s.getProcessor().write(s, writeRequest);
            } else {
                s.getWriteRequestQueue().offer(s, writeRequest);
                s.getProcessor().flush(s);
            }
        } else {
            s.getWriteRequestQueue().offer(s, writeRequest);
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
        ((AbstractIoSession) session).getProcessor().remove(session);
    }
}
SimpleIoProcessorPool binds each IoSession to a single IoProcessor, so a session's writes are carried out on the same thread as its reads.
/**
 * Find the processor associated to a session. If it hasn't been stored into
 * the session's attributes, pick a new processor and store it.
 */
@SuppressWarnings("unchecked")
private IoProcessor<S> getProcessor(S session) {
    // Look up the object stored under the PROCESSOR key in the IoSession
    IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);

    // If the IoSession is not yet associated with an IoProcessor, associate one as follows
    if (processor == null) {
        if (disposed || disposing) {
            throw new IllegalStateException("A disposed processor cannot be accessed.");
        }

        // Pick the processor from the session id and the size of the processor pool,
        // so that each session id maps to a fixed slot in the pool
        processor = pool[Math.abs((int) session.getId()) % pool.length];

        if (processor == null) {
            throw new IllegalStateException("A disposed processor cannot be accessed.");
        }

        // Store the IoProcessor under the PROCESSOR key
        session.setAttributeIfAbsent(PROCESSOR, processor);
    }

    return processor;
}
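To make the session-to-processor mapping concrete, the index expression from getProcessor() can be tried in isolation. This is only a sketch of the arithmetic: ProcessorIndexSketch and indexFor are hypothetical names, and the pool size of 3 is an assumed value chosen for illustration.

```java
/** Hypothetical helper that reproduces only the index arithmetic from getProcessor(). */
class ProcessorIndexSketch {
    /** Same expression as pool[Math.abs((int) session.getId()) % pool.length]. */
    static int indexFor(long sessionId, int poolSize) {
        return Math.abs((int) sessionId) % poolSize;
    }

    public static void main(String[] args) {
        int poolSize = 3; // assumed pool size, for illustration only
        for (long id = 1; id <= 6; id++) {
            System.out.println("session " + id + " -> processor " + indexFor(id, poolSize));
        }
    }
}
```

Because the result depends only on the session id and the pool size, a given session always lands on the same slot, which is why its reads and writes share one thread.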
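At this point the thread that called IoSession.write(Object message) has finished its work and returns the WriteFuture. The rest of the job is completed on the Processor thread.
When the Processor thread determines that a session is writable, it adds the IoSession to the flushingSessions queue.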
/**
 * Deal with session ready for the read or write operations, or both.
 */
private void process(S session) {
    // Process Reads
    if (isReadable(session) && !session.isReadSuspended()) {
        read(session);
    }

    // Process writes
    if (isWritable(session) && !session.isWriteSuspended()) {
        // add the session to the queue, if it's not already there
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
        }
    }
}
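The guard session.setScheduledForFlush(true) is what keeps a session from being queued twice. A minimal sketch of that idea follows, assuming the flag behaves like an atomic compare-and-set. FlushScheduleSketch, FlagSession, scheduleFlush and pollForFlush are hypothetical names used for illustration; only the two flag method names are taken from the code quoted in this article.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class FlushScheduleSketch {
    /** Hypothetical session that holds only the scheduled-for-flush flag. */
    static final class FlagSession {
        private final AtomicBoolean scheduledForFlush = new AtomicBoolean();

        /** Returns true only for the caller that actually flipped the flag to true. */
        boolean setScheduledForFlush(boolean schedule) {
            if (schedule) {
                return scheduledForFlush.compareAndSet(false, true);
            }
            scheduledForFlush.set(false);
            return true;
        }

        void unscheduledForFlush() {
            scheduledForFlush.set(false);
        }
    }

    final Queue<FlagSession> flushingSessions = new ConcurrentLinkedQueue<>();

    /** Queues the session unless it is already waiting to be flushed. */
    void scheduleFlush(FlagSession session) {
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
        }
    }

    /** Takes the next session and clears its flag so it can be scheduled again. */
    FlagSession pollForFlush() {
        FlagSession session = flushingSessions.poll();
        if (session != null) {
            session.unscheduledForFlush();
        }
        return session;
    }

    public static void main(String[] args) {
        FlushScheduleSketch sketch = new FlushScheduleSketch();
        FlagSession session = new FlagSession();
        sketch.scheduleFlush(session);
        sketch.scheduleFlush(session); // ignored: already scheduled
        System.out.println("queued entries: " + sketch.flushingSessions.size());
    }
}
```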
Next, the Processor works through the flushingSessions queue: it polls one IoSession after another until the queue is empty.
/**
 * Write all the pending messages
 */
private void flush(long currentTime) {
    if (flushingSessions.isEmpty()) {
        return;
    }

    do {
        S session = flushingSessions.poll(); // the same one with firstSession

        if (session == null) {
            // Just in case ... It should not happen.
            break;
        }

        // Reset the Schedule for flush flag for this session,
        // as we are flushing it now
        session.unscheduledForFlush();

        SessionState state = getState(session);

        switch (state) {
        case OPENED:
            try {
                boolean flushedAll = flushNow(session, currentTime);

                if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
                        && !session.isScheduledForFlush()) {
                    scheduleFlush(session);
                }
            } catch (Exception e) {
                scheduleRemove(session);
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireExceptionCaught(e);
            }

            break;

        case CLOSING:
            // Skip if the channel is already closed.
            break;

        case OPENING:
            // Retry later if session is not yet fully initialized.
            // (In case that Session.write() is called before addSession()
            // is processed)
            scheduleFlush(session);
            return;

        default:
            throw new IllegalStateException(String.valueOf(state));
        }
    } while (!flushingSessions.isEmpty());
}
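flushNow(S session, long currentTime) iterates over the requests in the session's WriteRequestQueue and calls writeBuffer to write each one into the operating system's buffer.
Once a request has been written to the buffer, its WriteFuture is marked as done, and the following method is invoked: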
void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception;
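The drain loop just described can be modelled in a few lines. This is a rough sketch under stated assumptions, not MINA's flushNow: FlushNowSketch, Request, offer, sent and drainOsBuffer are hypothetical names, a fixed-size ByteBuffer plays the role of the operating system's send buffer, a list records the messageSent notifications, and the handling of a partially written request (it stays at the head of the queue until the next pass) is an assumption added for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class FlushNowSketch {
    /** Hypothetical write request: payload bytes plus a completion future. */
    static final class Request {
        final ByteBuffer buffer;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Request(String text) {
            this.buffer = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        }
    }

    final Queue<Request> writeRequestQueue = new ArrayDeque<>();
    final List<Request> sent = new ArrayList<>(); // stand-in for messageSent events
    final ByteBuffer osBuffer;                    // stand-in for the OS send buffer

    FlushNowSketch(int osBufferCapacity) {
        this.osBuffer = ByteBuffer.allocate(osBufferCapacity);
    }

    Request offer(String text) {
        Request request = new Request(text);
        writeRequestQueue.add(request);
        return request;
    }

    /** Returns true if the queue was drained, false if the "OS buffer" filled up first. */
    boolean flushNow() {
        Request request;
        while ((request = writeRequestQueue.peek()) != null) {
            // Copy as many bytes as currently fit.
            int n = Math.min(request.buffer.remaining(), osBuffer.remaining());
            ByteBuffer slice = request.buffer.duplicate();
            slice.limit(slice.position() + n);
            osBuffer.put(slice);
            request.buffer.position(request.buffer.position() + n);

            if (request.buffer.hasRemaining()) {
                return false; // partially written: keep it at the head for the next pass
            }
            writeRequestQueue.poll();
            request.future.complete(null); // mark the future as done
            sent.add(request);             // then report "message sent"
        }
        return true;
    }

    /** Simulates the OS having transmitted everything it buffered. */
    void drainOsBuffer() {
        osBuffer.clear();
    }

    public static void main(String[] args) {
        FlushNowSketch sketch = new FlushNowSketch(8);
        sketch.offer("hello");
        sketch.offer("world!");
        System.out.println("first pass drained queue: " + sketch.flushNow());
        sketch.drainOsBuffer();
        System.out.println("second pass drained queue: " + sketch.flushNow());
    }
}
```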
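Summary of the Apache Mina IoSession.write(Object message) write path:
1、The Processor thread first polls each IoSession in newSessions, then places it in the IoService's managedSessions so that it can be managed and tracked.
2、When IoSession.write(Object message) is called, it creates a WriteFuture and a WriteRequest, adds the request to the WriteRequestQueue associated with that IoSession, and returns the WriteFuture.
3、The Processor calls the select() system function to determine which IoSessions can accept data. When an IoSession is writable, the Processor thread adds it to the flushingSessions queue.
4、The Processor thread polls the flushingSessions queue, then walks the WriteRequestQueue of each IoSession, removing the queued requests and writing them in order to the system buffer.
5、Once a WriteRequest has been written to the buffer, its WriteFuture is marked as done, and then messageSent() is called.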
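The hand-off between the calling thread and the Processor thread in steps 2 to 5 can be sketched end to end with plain JDK classes. This is a simplified model under stated assumptions rather than MINA's implementation: WritePathSketch, Request, wire and await are hypothetical names, a BlockingQueue replaces both the WriteRequestQueue and the selector wake-up, and the future is completed with the name of the writing thread purely so the example can show which thread did the work.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class WritePathSketch {
    /** Hypothetical write request: a message plus its completion future. */
    static final class Request {
        final String message;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Request(String message) {
            this.message = message;
        }
    }

    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();
    final List<String> wire = new CopyOnWriteArrayList<>(); // stand-in for the socket
    private final Thread processor;

    WritePathSketch() {
        processor = new Thread(this::runLoop, "sketch-processor");
        processor.setDaemon(true);
        processor.start();
    }

    /** Caller side: queue the request and return immediately. */
    CompletableFuture<String> write(String message) {
        Request request = new Request(message);
        queue.add(request);
        return request.future;
    }

    /** Processor side: take requests in order, "send" them, complete the futures. */
    private void runLoop() {
        try {
            while (true) {
                Request request = queue.take();
                wire.add(request.message);
                request.future.complete(Thread.currentThread().getName());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // shutdown requested
        }
    }

    void shutdown() {
        processor.interrupt();
    }

    /** Waits up to five seconds and wraps checked exceptions. */
    static String await(CompletableFuture<String> future) {
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        WritePathSketch session = new WritePathSketch();
        CompletableFuture<String> future = session.write("ping");
        System.out.println("written by thread: " + await(future));
        session.shutdown();
    }
}
```

A caller that wants synchronous behaviour waits on the future, which mirrors waiting on the WriteFuture returned by IoSession.write(); a caller that does not care simply ignores it.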