Apache MINA: An Analysis of IoSession.write(Object msg)
In Apache MINA, writing data through an IoSession returns a future (a WriteFuture) from which the result of the write can be obtained.
NioSocketSession(AbstractIoSession).write(Object)
NioSocketSession(AbstractIoSession).write(Object, SocketAddress)
The IoSession writes the data out through its filter chain (IoFilterChain).
    // Now, we can write the message. First, create a future
    WriteFuture writeFuture = new DefaultWriteFuture(this);
    WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);

    // Then, get the chain and inject the WriteRequest into it
    IoFilterChain filterChain = getFilterChain();
    filterChain.fireFilterWrite(writeRequest);
DefaultIoFilterChain.callPreviousFilterWrite(IoFilterChain$Entry, IoSession, WriteRequest)
ProtocolCodecFilter.filterWrite(IoFilter$NextFilter, IoSession, WriteRequest)
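The frames above show the write request moving backwards through the chain (callPreviousFilterWrite) until it reaches the head. The following is a minimal plain-Java sketch of that direction of travel, not MINA's implementation: WriteChainSketch, Filter, visited and writeQueue are hypothetical names invented for this illustration, and the "codec" filter only stands in for ProtocolCodecFilter by turning a String into bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.UnaryOperator;

/** Toy model of a write request travelling from the tail of a filter chain to its head. */
final class WriteChainSketch {

    /** Hypothetical stand-in for a filter: a name plus a transformation of the outgoing message. */
    static final class Filter {
        final String name;
        final UnaryOperator<Object> onWrite;

        Filter(String name, UnaryOperator<Object> onWrite) {
            this.name = name;
            this.onWrite = onWrite;
        }
    }

    final List<Filter> filters = new ArrayList<>();      // index 0 sits next to the head
    final List<String> visited = new ArrayList<>();      // order in which filters saw the write
    final Queue<Object> writeQueue = new ArrayDeque<>(); // what the head finally enqueues

    /** Walks the chain backwards, as a write does, and lets the head enqueue the result. */
    void fireFilterWrite(Object message) {
        Object current = message;
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            visited.add(filter.name);
            current = filter.onWrite.apply(current);
        }
        visited.add("head");
        writeQueue.add(current);
    }

    public static void main(String[] args) {
        WriteChainSketch chain = new WriteChainSketch();
        // The codec encodes a String into bytes; the logger passes the message through.
        chain.filters.add(new Filter("codec",
                m -> ((String) m).getBytes(StandardCharsets.UTF_8)));
        chain.filters.add(new Filter("logger", m -> m));

        chain.fireFilterWrite("hello");
        System.out.println(chain.visited);
    }
}
```

The filter added last is visited first, which is why an application-level Object has already been encoded by the time it arrives at the head.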
ProtocolCodecFilter encodes the Object message, which then reaches HeadFilter. HeadFilter is responsible for putting the message into the write request queue.
    private class HeadFilter extends IoFilterAdapter {
        @SuppressWarnings("unchecked")
        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
            AbstractIoSession s = (AbstractIoSession) session;

            // Maintain counters.
            if (writeRequest.getMessage() instanceof IoBuffer) {
                IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
                // I/O processor implementation will call buffer.reset()
                // it after the write operation is finished, because
                // the buffer will be specified with messageSent event.
                buffer.mark();
                int remaining = buffer.remaining();

                if (remaining > 0) {
                    s.increaseScheduledWriteBytes(remaining);
                }
            } else {
                s.increaseScheduledWriteMessages();
            }

            // HeadFilter puts the message to be written into the WriteRequestQueue
            WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue();

            if (!s.isWriteSuspended()) {
                if (writeRequestQueue.isEmpty(session)) {
                    // We can write directly the message
                    s.getProcessor().write(s, writeRequest);
                } else {
                    s.getWriteRequestQueue().offer(s, writeRequest);
                    s.getProcessor().flush(s);
                }
            } else {
                s.getWriteRequestQueue().offer(s, writeRequest);
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
            ((AbstractIoSession) session).getProcessor().remove(session);
        }
    }
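The branching at the end of filterWrite() is easier to follow when isolated. The sketch below models only those three branches in plain Java, under the assumption (taken from the processor's write() method quoted in this article) that a direct write enqueues the request and then schedules a flush. HeadFilterSketch, processorCalls and the String-typed requests are hypothetical simplifications, not MINA types.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of the three branches HeadFilter.filterWrite() chooses between. */
final class HeadFilterSketch {

    final Queue<String> writeRequestQueue = new ArrayDeque<>();
    final List<String> processorCalls = new ArrayList<>(); // which processor method was invoked
    boolean writeSuspended;

    void filterWrite(String writeRequest) {
        if (!writeSuspended) {
            if (writeRequestQueue.isEmpty()) {
                // Empty queue: hand the request straight to the processor.
                processorWrite(writeRequest);
            } else {
                // Requests are already pending: queue behind them, then ask for a flush.
                writeRequestQueue.offer(writeRequest);
                processorFlush();
            }
        } else {
            // Writes are suspended: only queue, do not involve the processor.
            writeRequestQueue.offer(writeRequest);
        }
    }

    /** Models the processor's write(): enqueue, then schedule a flush. */
    private void processorWrite(String writeRequest) {
        processorCalls.add("write");
        writeRequestQueue.offer(writeRequest);
        processorFlush();
    }

    private void processorFlush() {
        processorCalls.add("flush");
    }

    public static void main(String[] args) {
        HeadFilterSketch head = new HeadFilterSketch();
        head.filterWrite("first");   // empty queue, goes through write()
        head.filterWrite("second");  // queue not empty, offer + flush
        head.writeSuspended = true;
        head.filterWrite("third");   // suspended, offer only
        System.out.println(head.processorCalls + " queued=" + head.writeRequestQueue.size());
    }
}
```

In every branch the request ends up in the queue; the branches differ only in whether, and how, the processor is told about it.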
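During the write, the IoSession is bound to a processor thread: a processor is picked from the pool by session id and cached in the session's attributes.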
    /**
     * Find the processor associated to a session. If it hasen't be stored into
     * the session's attributes, pick a new processor and stores it.
     */
    @SuppressWarnings("unchecked")
    private IoProcessor<S> getProcessor(S session) {
        IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);

        if (processor == null) {
            if (disposed || disposing) {
                throw new IllegalStateException("A disposed processor cannot be accessed.");
            }

            processor = pool[Math.abs((int) session.getId()) % pool.length];

            if (processor == null) {
                throw new IllegalStateException("A disposed processor cannot be accessed.");
            }

            session.setAttributeIfAbsent(PROCESSOR, processor);
        }

        return processor;
    }
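The selection rule in getProcessor() can be tried out in isolation. This is a minimal sketch under simplifying assumptions: processors are represented by plain strings, and a ConcurrentHashMap keyed by session id stands in for the session attribute. ProcessorPoolSketch and its members are hypothetical names, not part of MINA.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of picking a processor by session id and remembering the choice. */
final class ProcessorPoolSketch {

    private final String[] pool;
    // Stands in for the PROCESSOR attribute stored on each session.
    private final Map<Long, String> bound = new ConcurrentHashMap<>();

    ProcessorPoolSketch(int size) {
        pool = new String[size];
        for (int i = 0; i < size; i++) {
            pool[i] = "processor-" + i;
        }
    }

    String getProcessor(long sessionId) {
        String processor = bound.get(sessionId);
        if (processor == null) {
            // Same arithmetic as the excerpt: absolute value of the id, modulo the pool size.
            processor = pool[Math.abs((int) sessionId) % pool.length];
            String previous = bound.putIfAbsent(sessionId, processor);
            if (previous != null) {
                processor = previous; // another thread bound the session first
            }
        }
        return processor;
    }

    public static void main(String[] args) {
        ProcessorPoolSketch sketch = new ProcessorPoolSketch(3);
        for (long id = 1; id <= 6; id++) {
            System.out.println("session " + id + " -> " + sketch.getProcessor(id));
        }
    }
}
```

Because the index depends only on the session id, every write for a given session lands on the same processor, and so on the same thread.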
    public void write(S session, WriteRequest writeRequest) {
        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();

        writeRequestQueue.offer(session, writeRequest);

        if (!session.isWriteSuspended()) {
            this.flush(session);
        }
    }
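NioProcessor appends the write request to the WriteRequestQueue and, at the same time, adds the IoSession that is waiting to be written to the flushingSessions concurrent queue. Both steps still run on the thread that called write().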
    public final void flush(S session) {
        // add the session to the queue if it's not already
        // in the queue, then wake up the select()
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
            wakeup();
        }
    }
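The guard in flush() keeps a session from being queued twice, however many writes arrive before the processor gets to it. The sketch below illustrates that idea with an AtomicBoolean and a ConcurrentLinkedQueue. It assumes setScheduledForFlush(true) behaves like a compare-and-set from false to true; FlushQueueSketch, Session and the wakeups counter are hypothetical names for this illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of scheduling a session for flush at most once at a time. */
final class FlushQueueSketch {

    static final class Session {
        private final AtomicBoolean scheduledForFlush = new AtomicBoolean();

        /** Returns true only for the caller that actually changes the flag to "scheduled". */
        boolean setScheduledForFlush(boolean schedule) {
            if (schedule) {
                return scheduledForFlush.compareAndSet(false, true);
            }
            scheduledForFlush.set(false);
            return true;
        }
    }

    final Queue<Session> flushingSessions = new ConcurrentLinkedQueue<>();
    final AtomicInteger wakeups = new AtomicInteger(); // counts simulated selector wake-ups

    void flush(Session session) {
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
            wakeups.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        FlushQueueSketch processor = new FlushQueueSketch();
        Session session = new Session();
        processor.flush(session);
        processor.flush(session); // ignored: the session is already scheduled
        System.out.println("queued=" + processor.flushingSessions.size()
                + " wakeups=" + processor.wakeups.get());
    }
}
```

Once the processor has taken the session off the queue and cleared the flag, a later flush() schedules it again.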
At this point the work of the thread that called IoSession.write(Object msg) is done, and a future is returned. Everything that remains is handled by the IoProcessor thread.
On the NioProcessor thread, NioProcessor(AbstractPollingIoProcessor<S>).flush(long) is where the actual I/O reads and writes are performed.
DefaultIoFilterChain.fireMessageSent(WriteRequest) line: 553
NioProcessor(AbstractPollingIoProcessor<S>).fireMessageSent(S, WriteRequest) line: 988
NioProcessor(AbstractPollingIoProcessor<S>).writeBuffer(S, WriteRequest, boolean, int, long) line: 947
NioProcessor(AbstractPollingIoProcessor<S>).flushNow(S, long) line: 854
NioProcessor(AbstractPollingIoProcessor<S>).flush(long) line: 781
AbstractPollingIoProcessor<S>.access$11(AbstractPollingIoProcessor, long) line: 758
AbstractPollingIoProcessor$Processor.run() line: 1131
DefaultIoFilterChain.fireMessageSent(WriteRequest): when the messageSent event is fired, the result of the future is set by calling DefaultWriteFuture.setWritten().
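The hand-off between the two threads can be illustrated without MINA: the caller enqueues a request and receives a future immediately, and a separate processor thread completes that future once the "write" has happened. This is only a sketch of the pattern. CompletableFuture stands in for WriteFuture, and WriteFutureSketch, Request and the thread name "processor-sketch" are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model: the caller gets a future at once, a processor thread completes it later. */
final class WriteFutureSketch {

    static final class Request {
        final String message;
        // Completed with the name of the thread that performed the write.
        final CompletableFuture<String> future = new CompletableFuture<>();

        Request(String message) {
            this.message = message;
        }
    }

    private final BlockingQueue<Request> writeRequestQueue = new LinkedBlockingQueue<>();

    WriteFutureSketch() {
        Thread processor = new Thread(() -> {
            try {
                while (true) {
                    Request request = writeRequestQueue.take();
                    // A real processor would write bytes to the channel here.
                    request.future.complete(Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "processor-sketch");
        processor.setDaemon(true); // let the JVM exit when main() returns
        processor.start();
    }

    /** Runs on the caller's thread: enqueue and return without waiting for the write. */
    CompletableFuture<String> write(String message) {
        Request request = new Request(message);
        writeRequestQueue.add(request);
        return request.future;
    }

    public static void main(String[] args) {
        WriteFutureSketch session = new WriteFutureSketch();
        CompletableFuture<String> future = session.write("hello");
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("completed by:  " + future.join());
    }
}
```

The caller may block on the future, as main() does here, or attach a callback. In both cases the completion is triggered from the processor thread rather than the thread that called write().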