MINA通信入门(三)-理解网络通信枢纽 IoService
MINA通信入门(三)-理解网络通信枢纽 IoService
通过上一个章节,我们已经动手实现了 Mina 的 HelloWorld 例子。这一章节开始,将对 Mina 的三个核心组件进行讲解。这对我们开发实际的生产应用是至关重要的,也许通过 HelloWorld 你可以慢慢改成生产的代码,但是不了解这几个组件很有可能会开发出有bug的应用。
讲解中将会设计到一些源码,稍微看一看,入门课有个大概印象即可,不用在源码上面纠结太多。
1. IoService 是什么
IoService 是 Mina 框架的核心接口, 它提供基本的 I/O 服务并管理 I/O 会话。IoService接口及其子接口的实现类是处理大部分底层I/O操作的地方。
2. IoService 的职责有哪些
图片来自于 Mina 官网:
会话管理:创建和删除会话,检测空闲状态。
过滤器链管理:管理过滤器链,允许用户动态修改链。
处理程序调用:在收到新消息时调用处理程序等。
统计管理:更新发送的消息数、发送的字节数等统计信息。
监听器管理:管理用户可以设置的监听器。
通信管理:处理数据的双向传输。
3. IoService 接口的定义
IoService 接口的定义如下:
public interface IoService {
/**
* @return the {@link TransportMetadata} that this service runs on.
*/
// 返回IoAcceptor或IoConnector正在运行的传输元数据。通常包括提供程序名称(如nio、apr、rxtx)、连接类型(无连接(UDP)/面向连接(TCP))等的详细信息
TransportMetadata getTransportMetadata();
/**
* Adds an {@link IoServiceListener} that listens any events related with
* this service.
*
* @param listener The listener to add
*/
// 添加一个监听器,这个监听器会接收到服务的各种事件通知,比如连接开启、关闭等。
void addListener(IoServiceListener listener);
/**
* Removed an existing {@link IoServiceListener} that listens any events
* related with this service.
*
* @param listener The listener to use
*/
// 移除一个已经添加的监听器,这样这个监听器就不再接收事件通知了。
void removeListener(IoServiceListener listener);
/**
* @return <tt>true</tt> if and if only {@link #dispose()} method has
* been called. Please note that this method will return <tt>true</tt>
* even after all the related resources are released.
*/
// 告诉你这个服务是否正在关闭中。即使服务的所有资源都已经清理完毕,这个方法也会返回 true
boolean isDisposing();
/**
* @return <tt>true</tt> if and if only all resources of this processor
* have been disposed.
*/
// 告诉你这个服务是否已经完全关闭,所有资源都被释放了。
boolean isDisposed();
/**
* Releases any resources allocated by this service. Please note that
* this method might block as long as there are any sessions managed by
* this service.
*/
// 关闭这个服务,并释放它使用的所有资源。这个操作可能会等待,直到所有正在处理的连接都完全关闭。
void dispose();
/**
* Releases any resources allocated by this service. Please note that
* this method might block as long as there are any sessions managed by this service.
*
* Warning : calling this method from a IoFutureListener with <code>awaitTermination</code> = true
* will probably lead to a deadlock.
*
* @param awaitTermination When true this method will block until the underlying ExecutorService is terminated
*/
// 关闭服务,并可以选择是否等待直到所有资源完全释放。如果你选择等待 (awaitTermination 为 true),这个方法会阻塞直到所有的后台处理都结束。
void dispose(boolean awaitTermination);
/**
* @return the handler which will handle all connections managed by this service.
*/
// 获取当前处理所有连接的处理器对象。
IoHandler getHandler();
/**
* Sets the handler which will handle all connections managed by this service.
*
* @param handler The IoHandler to use
*/
// 设置一个处理器,这个处理器负责处理所有网络连接的数据。
void setHandler(IoHandler handler);
/**
* @return the map of all sessions which are currently managed by this
* service. The key of map is the {@link IoSession#getId() ID} of the
* session. An empty collection if there's no session.
*/
// 获取一个包含所有当前连接会话的列表,每个会话都有一个唯一标识符。
Map<Long, IoSession> getManagedSessions();
/**
* @return the number of all sessions which are currently managed by this
* service.
*/
// 获取当前服务管理的连接会话的总数。
int getManagedSessionCount();
/**
* @return the default configuration of the new {@link IoSession}s
* created by this service.
*/
// 获取创建新会话时使用的配置信息。
IoSessionConfig getSessionConfig();
/**
* @return the {@link IoFilterChainBuilder} which will build the
* {@link IoFilterChain} of all {@link IoSession}s which is created
* by this service.
* The default value is an empty {@link DefaultIoFilterChainBuilder}.
*/
// 获取一个构建器,这个构建器用来设置连接的处理流程,比如加密、压缩等。 (Builder设计模式)
IoFilterChainBuilder getFilterChainBuilder();
/**
* Sets the {@link IoFilterChainBuilder} which will build the
* {@link IoFilterChain} of all {@link IoSession}s which is created
* by this service.
* If you specify <tt>null</tt> this property will be set to
* an empty {@link DefaultIoFilterChainBuilder}.
*
* @param builder The filter chain builder to use
*/
// 设置连接的处理流程构建者 (Builder设计模式)
void setFilterChainBuilder(IoFilterChainBuilder builder);
/**
* A shortcut for <tt>( ( DefaultIoFilterChainBuilder ) </tt>{@link #getFilterChainBuilder()}<tt> )</tt>.
* Please note that the returned object is not a <b>real</b> {@link IoFilterChain}
* but a {@link DefaultIoFilterChainBuilder}. Modifying the returned builder
* won't affect the existing {@link IoSession}s at all, because
* {@link IoFilterChainBuilder}s affect only newly created {@link IoSession}s.
*
* @return The filter chain in use
* @throws IllegalStateException if the current {@link IoFilterChainBuilder} is
* not a {@link DefaultIoFilterChainBuilder}
*/
// 获取当前使用的过滤链构建器,可以用来修改新会话的处理流程。
DefaultIoFilterChainBuilder getFilterChain();
/**
* @return a value of whether or not this service is active
*/
// 检查服务是否正在运行。
boolean isActive();
/**
* @return the time when this service was activated. It returns the last
* time when this service was activated if the service is not active now.
*/
// 获取服务最后一次被激活(开始运行)的时间。
long getActivationTime();
/**
* Writes the specified {@code message} to all the {@link IoSession}s
* managed by this service. This method is a convenience shortcut for
* {@link IoUtil#broadcast(Object, Collection)}.
*
* @param message the message to broadcast
* @return The set of WriteFuture associated to the message being broadcasted
*/
// 向所有管理的会话发送同一个消息,比如广播通知。
Set<WriteFuture> broadcast(Object message);
/**
* @return the {@link IoSessionDataStructureFactory} that provides
* related data structures for a new session created by this service.
*/
// 不常用
IoSessionDataStructureFactory getSessionDataStructureFactory();
/**
* Sets the {@link IoSessionDataStructureFactory} that provides
* related data structures for a new session created by this service.
*
* @param sessionDataStructureFactory The factory to use
*/
// 不常用
void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory);
/**
* @return The number of bytes scheduled to be written
*/
// 获取当前所有等待发送的数据的总字节数(即存储在内存中等待套接字准备好写入的字节)
int getScheduledWriteBytes();
/**
* @return The number of messages scheduled to be written
*/
// 获取当前所有等待发送的消息的数量(即存储在内存中等待套接字准备好写入的消息)。
int getScheduledWriteMessages();
/**
* @return The statistics object for this service.
*/
// 获取这个服务的统计信息,比如发送了多少数据、处理了多少连接等。
IoServiceStatistics getStatistics();
}
在这些方法中,有些方法是比较实用的,比如:
- 通过getManagedSessions()方法,可以获取当前服务管理的所有会话,并对其进行相应的处理,如空闲会话的检测和关闭等。 这个是比较实用的,尽管有时候我们可能会自己去想 HashMap 之类的数据结构,维护连接跟 Session 之间的关系,但是有些场景下也可以直接使用这个方法去管理会话。
4. IoService 怎么去用
IoService 是一个接口,公共的抽象定义,所以不能直接使用,我们通常会使用两个具体的实现 IoAcceptor 和 IoConnector, 下个小节我们将进行讲解。
5. IoService 接口的具体实现
IoService 有两个子接口: IoAcceptor 和 IoConnector , 分别表示服务端的实现和客户端的实现。实现类都会实现这些方法,从而提供完整的 I/O 服务功能。
5.1 服务端实现 - IoAcceptor
5.1.1 IoAcceptor 是什么
IoAcceptor 是 IoService 接口的一个实现,用于接受传入的连接请求并创建相应的IoSession。它主要用于服务器端 , 监听指定的端口 ,等待客户端的连接。 当有新的连接请求到达时, IoAcceptor 会创建一个新的 IoSession ,并将其交给 IoHandler 处理。
5.1.2 IoAcceptor 的定义
简单看下接口 IoAcceptor 的定义:
public interface IoAcceptor extends IoService {
/**
* Returns the local address which is bound currently. If more than one
* address are bound, only one of them will be returned, but it's not
* necessarily the firstly bound address.
*
* @return The bound LocalAddress
*/
// 如果你的服务器在多个地址上等待连接,这个方法可以让你知道其中一个正在被使用的地址。
SocketAddress getLocalAddress();
/**
* Returns a {@link Set} of the local addresses which are bound currently.
*
* @return The Set of bound LocalAddresses
*/
Set<SocketAddress> getLocalAddresses();
/**
* Returns the default local address to bind when no argument is specified
* in {@link #bind()} method. Please note that the default will not be
* used if any local address is specified. If more than one address are
* set, only one of them will be returned, but it's not necessarily the
* firstly specified address in {@link #setDefaultLocalAddresses(List)}.
*
* @return The default bound LocalAddress
*/
SocketAddress getDefaultLocalAddress();
/**
* Returns a {@link List} of the default local addresses to bind when no
* argument is specified in {@link #bind()} method. Please note that the
* default will not be used if any local address is specified.
*
* @return The list of default bound LocalAddresses
*/
List<SocketAddress> getDefaultLocalAddresses();
/**
* Sets the default local address to bind when no argument is specified in
* {@link #bind()} method. Please note that the default will not be used
* if any local address is specified.
*
* @param localAddress The local addresses to bind the acceptor on
*/
void setDefaultLocalAddress(SocketAddress localAddress);
/**
* Sets the default local addresses to bind when no argument is specified
* in {@link #bind()} method. Please note that the default will not be
* used if any local address is specified.
* @param firstLocalAddress The first local address to bind the acceptor on
* @param otherLocalAddresses The other local addresses to bind the acceptor on
*/
void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);
/**
* Sets the default local addresses to bind when no argument is specified
* in {@link #bind()} method. Please note that the default will not be
* used if any local address is specified.
*
* @param localAddresses The local addresses to bind the acceptor on
*/
void setDefaultLocalAddresses(Iterable<? extends SocketAddress> localAddresses);
/**
* Sets the default local addresses to bind when no argument is specified
* in {@link #bind()} method. Please note that the default will not be
* used if any local address is specified.
*
* @param localAddresses The local addresses to bind the acceptor on
*/
void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses);
/**
* Returns <tt>true</tt> if and only if all clients are closed when this
* acceptor unbinds from all the related local address (i.e. when the
* service is deactivated).
*
* @return <tt>true</tt> if the service sets the closeOnDeactivation flag
*/
boolean isCloseOnDeactivation();
/**
* Sets whether all client sessions are closed when this acceptor unbinds
* from all the related local addresses (i.e. when the service is
* deactivated). The default value is <tt>true</tt>.
*
* @param closeOnDeactivation <tt>true</tt> if we should close on deactivation
*/
void setCloseOnDeactivation(boolean closeOnDeactivation);
/**
* Binds to the default local address(es) and start to accept incoming
* connections.
*
* @throws IOException if failed to bind
*/
// 如果你已经设置了默认监听地址,使用这个方法可以快速开始监听,无需再次指定地址。不常用。
void bind() throws IOException;
/**
* Binds to the specified local address and start to accept incoming
* connections.
*
* @param localAddress The SocketAddress to bind to
*
* @throws IOException if failed to bind
*/
// 直接指定一个地址并在该地址上开始监听。 最常用!!!
void bind(SocketAddress localAddress) throws IOException;
/**
* Binds to the specified local addresses and start to accept incoming
* connections. If no address is given, bind on the default local address.
*
* @param firstLocalAddress The first address to bind to
* @param addresses The SocketAddresses to bind to
*
* @throws IOException if failed to bind
*/
void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException;
/**
* Binds to the specified local addresses and start to accept incoming
* connections. If no address is given, bind on the default local address.
*
* @param addresses The SocketAddresses to bind to
*
* @throws IOException if failed to bind
*/
void bind(SocketAddress... addresses) throws IOException;
/**
* Binds to the specified local addresses and start to accept incoming
* connections.
*
* @param localAddresses The local address we will be bound to
* @throws IOException if failed to bind
*/
void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;
/**
* Unbinds from all local addresses that this service is bound to and stops
* to accept incoming connections. All managed connections will be closed
* if {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property
* is <tt>true</tt>. This method returns silently if no local address is
* bound yet.
*/
// 停止在所有当前绑定的地址上的监听。
void unbind();
/**
* Unbinds from the specified local address and stop to accept incoming
* connections. All managed connections will be closed if
* {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property is
* <tt>true</tt>. This method returns silently if the default local
* address is not bound yet.
*
* @param localAddress The local address we will be unbound from
*/
// 停止在特定的本地地址上的监听。
void unbind(SocketAddress localAddress);
/**
* Unbinds from the specified local addresses and stop to accept incoming
* connections. All managed connections will be closed if
* {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property is
* <tt>true</tt>. This method returns silently if the default local
* addresses are not bound yet.
*
* @param firstLocalAddress The first local address to be unbound from
* @param otherLocalAddresses The other local address to be unbound from
*/
void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);
/**
* Unbinds from the specified local addresses and stop to accept incoming
* connections. All managed connections will be closed if
* {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property is
* <tt>true</tt>. This method returns silently if the default local
* addresses are not bound yet.
*
* @param localAddresses The local address we will be unbound from
*/
void unbind(Iterable<? extends SocketAddress> localAddresses);
/**
* (Optional) Returns an {@link IoSession} that is bound to the specified
* <tt>localAddress</tt> and the specified <tt>remoteAddress</tt> which
* reuses the local address that is already bound by this service.
* <p>
* This operation is optional. Please throw {@link UnsupportedOperationException}
* if the transport type doesn't support this operation. This operation is
* usually implemented for connectionless transport types.
*
* @param remoteAddress The remote address bound to the service
* @param localAddress The local address the session will be bound to
* @throws UnsupportedOperationException if this operation is not supported
* @throws IllegalStateException if this service is not running.
* @throws IllegalArgumentException if this service is not bound to the
* specified <tt>localAddress</tt>.
* @return The session bound to the the given localAddress and remote address
*/
// 允许服务器主动发起与特定客户端的通信会话,常用于无连接协议或特定的网络交互场景。
IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress);
}
5.1.3 IoAcceptor 的实现
图片来自于 Mina 官网:
- NioSocketAcceptor : 基于Java NIO (Non-blocking I/O)的Socket传输IoAcceptor实现。它使用了 Java NIO的 Selector 和 Channel 机制,能够支持大量的并发连接,同时保持较低的系统资源占用。
主要特点是:
适用于高并发、长连接的应用场景,如即时通讯服务器、游戏服务器等。
非阻塞I/O,支持高并发连接。
基于Java NIO,可以跨平台使用。
提供了丰富的配置选项,如SSL、TCP参数等。
NioDatagramAcceptor :基于Java NIO 的 UDP 传输 IoAcceptor 实现。与 NioSocketAcceptor 类似,它也使用了 Java NIO 的 Selector 和Channel 机制,能够支持大量的并发 UDP 连接。
主要特点是:
适用于基于UDP的应用场景,如DNS服务器、SNMP 服务器等。
非阻塞I/O,支持高并发UDP连接。
基于Java NIO,可以跨平台使用。
提供了UDP相关的配置选项,如接收缓冲区大小等。
AprSocketAcceptor : 基于APR (Apache Portable Runtime)的阻塞 Socket 传输 IoAcceptor 实现。与 NioSocketAcceptor 不同,它使用了传统的阻塞 I/O 模型,每个连接都需要独立的线程来处理。
主要特点是:
适用于对性能要求较高,但并发连接数不太大的应用场景。
阻塞I/O,每个连接需要独立的线程处理。
基于APR,需要单独安装APR库。
在某些情况下,性能可能优于NioSocketAcceptor。
VmPipeSocketAcceptor : 适用于需要在同一个 JVM 内部进行高效通信的场景,如某些基于 OSGi 的应用。
主要特点是:
适用于需要在同一个JVM内部进行高效通信的场景,如某些基于OSGi的应用。
进程内通信,避免了网络I/O开销。
使用 Java 的 PipedInputStream 和 PipedOutputStream 模拟 Socket 。
适用于同一 JVM 内部的高效通信场景。
5.1.4 IoAcceptor 的使用
在上一章节中的 HelloWorld 例子中,服务端的代码如下:
public static void main(String[] args) throws IOException {
// 创建 IoAcceptor 来处理连接的接入,注意这一行
IoAcceptor acceptor = new NioSocketAcceptor();
// 设置 IoHandler 来处理数据读取、数据写入等等业务逻辑
acceptor.setHandler(new ServerHandler());
// 添加一个处理文本行的编码器和解码器,这个很重要,没有将会报错
acceptor.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
// 设置连接配置,这里暂时不要去纠结
acceptor.getSessionConfig().setReadBufferSize(2048);
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
// 绑定端口并启动服务器
acceptor.bind(new InetSocketAddress(8080));
System.out.println("Mina服务暴露在端口8080 ");
}
其中第一行代码
IoAcceptor acceptor = new NioSocketAcceptor();
这里使用的接口定义是 IoAcceptor ,实现使用的是 NioSocketAcceptor。 NioSocketAcceptor 主要是来处理基于 TCP/IP 协议的请求,在通信项目中用的也是比较多。
5.2 客户端实现 - IoConnector
5.2.1 IoConnector是什么
IoConnector 是 IoService 接口的另一个实现,用于与远程服务建立连接并创建相应的 IoSession。它通常用于客户端,主动连接指定的服务器地址和端口。当连接成功建立后, IoConnector 会创建一个新的 IoSession ,并将其交给 IoHandler 处理。
5.2.2 IoConnector的定义
下面是 IoConnector 接口的主要方法:
public interface IoConnector extends IoService {
/**
* @return the connect timeout in milliseconds. The default value is 1 minute.
*/
// 获取当前设置的连接超时时间,单位是毫秒
long getConnectTimeoutMillis();
/**
* Sets the connect timeout in milliseconds. The default value is 1 minute.
*
* @param connectTimeoutInMillis The time out for the connection
*/
// 设置连接的超时时间,单位是毫秒
void setConnectTimeoutMillis(long connectTimeoutInMillis);
/**
* @return the default remote address to connect to when no argument
* is specified in {@link #connect()} method.
*/
// 获取默认的远程地址,如果你在调用 connect() 方法时没有指定地址,系统就会尝试连接到这个默认地址。
SocketAddress getDefaultRemoteAddress();
/**
* Sets the default remote address to connect to when no argument is
* specified in {@link #connect()} method.
*
* @param defaultRemoteAddress The default remote address
*/
// 设置默认的远程地址
void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress);
/**
* @return the default local address
*/
SocketAddress getDefaultLocalAddress();
/**
* Sets the default local address
*
* @param defaultLocalAddress The default local address
*/
void setDefaultLocalAddress(SocketAddress defaultLocalAddress);
/**
* Connects to the {@link #setDefaultRemoteAddress(SocketAddress) default
* remote address}.
*
* @return the {@link ConnectFuture} instance which is completed when the
* connection attempt initiated by this call succeeds or fails.
* @throws IllegalStateException
* if no default remoted address is set.
*/
// 连接到通过 setDefaultRemoteAddress 方法设置的默认远程地址。返回一个 ConnectFuture 对象,你可以用它来监听连接是否成功。
ConnectFuture connect();
/**
* Connects to the {@link #setDefaultRemoteAddress(SocketAddress) default
* remote address} and invokes the <code>ioSessionInitializer</code> when
* the IoSession is created but before {@link IoHandler#sessionCreated(IoSession)}
* is invoked. There is <em>no</em> guarantee that the <code>ioSessionInitializer</code>
* will be invoked before this method returns.
*
* @param sessionInitializer the callback to invoke when the {@link IoSession} object is created
* @return the {@link ConnectFuture} instance which is completed when the
* connection attempt initiated by this call succeeds or fails.
*
* @throws IllegalStateException if no default remote address is set.
*/
// 连接建立并创建会话(IoSession)时,会调用 sessionInitializer 初始化器。这可以在会话开始前做一些准备工作。
ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
/**
* Connects to the specified remote address.
*
* @param remoteAddress The remote address to connect to
* @return the {@link ConnectFuture} instance which is completed when the
* connection attempt initiated by this call succeeds or fails.
*/
// 直接连接到指定的远程地址。这个方法允许你动态指定要连接的远程地址,而不是使用默认地址。 最常用!!!
ConnectFuture connect(SocketAddress remoteAddress);
/**
* Connects to the specified remote address and invokes
* the <code>ioSessionInitializer</code> when the IoSession is created but before
* {@link IoHandler#sessionCreated(IoSession)} is invoked. There is <em>no</em>
* guarantee that the <code>ioSessionInitializer</code> will be invoked before
* this method returns.
*
* @param remoteAddress the remote address to connect to
* @param sessionInitializer the callback to invoke when the {@link IoSession} object is created
*
* @return the {@link ConnectFuture} instance which is completed when the
* connection attempt initiated by this call succeeds or fails.
*/
/ 直接连接到指定的远程地址。这个方法允许你动态指定要连接的远程地址,并在会话创建时调用一个初始化器。 也最常用!!!
ConnectFuture connect(SocketAddress remoteAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
/**
* Connects to the specified remote address binding to the specified local address.
*
* @param remoteAddress The remote address to connect
* @param localAddress The local address to bind
*
* @return the {@link ConnectFuture} instance which is completed when the
* connection attempt initiated by this call succeeds or fails.
*/
ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
/**
* Connects to the specified remote address binding to the specified local
* address and and invokes the <code>ioSessionInitializer</code> when the
* IoSession is created but before {@link IoHandler#sessionCreated(IoSession)}
* is invoked. There is <em>no</em> guarantee that the <code>ioSessionInitializer</code>
* will be invoked before this method returns.
*
* @param remoteAddress the remote address to connect to
* @param localAddress the local interface to bind to
* @param sessionInitializer the callback to invoke when the {@link IoSession} object is created
*
* @return the {@link ConnectFuture} instance which is completed when the
* connection attempt initiated by this call succeeds or fails.
*/
ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
}
5.2.3 IoConnector 的实现
图片来自于 Mina 官网:
5.2.3.1 NioSocketConnector
NioSocketConnector 是基于 Java NIO 的非阻塞 Socket 传输 IoConnector 实现。它使用了 Java NIO 的 Selector 和 Channel 机制,能够支持大量的并发连接,同时保持较低的系统资源占用。适用于需要与服务器建立大量连接的客户端应用,如Web浏览器、移动应用等。主要特点是:
- 非阻塞I/O,支持高并发连接。
- 基于Java NIO,可以跨平台使用。
- 提供了连接超时、连接失败重试等高级功能。
5.2.3.2 NioDatagramConnector
基于 Java NIO 的非阻塞 UDP 传输 IoConnector 实现。与 NioSocketConnector 类似,它也使用了 Java NIO 的 Selector 和 Channel 机制,能够支持大量的并发UDP连接。适用于基于UDP的客户端应用,如DNS客户端、SNMP客户端等。主要特点是:
- 非阻塞I/O,支持高并发UDP连接。
- 基于Java NIO,可以跨平台使用。
- 提供了 UDP 相关的配置选项,如发送缓冲区大小等。
5.2.3.3 AprSocketConnector
基于 APR 的阻塞 Socket 传输 IoConnector 实现。与 NioSocketConnector 不同,它使用了传统的阻塞I/O模型,每个连接都需要独立的线程来处理。适用于对性能要求较高,但并发连接数不太大的客户端应用。主要特点是:
阻塞I/O,每个连接需要独立的线程处理。
基于APR,需要单独安装APR库。
在某些情况下,性能可能优于NioSocketConnector。
5.2.3.4 ProxyConnector
ProxyConnector是一个提供代理支持的 IoConnector 实现。它允许客户端通过 HTTP、SOCKS 等代理协议与服务器建立连接。
适用于需要通过代理服务器访问的客户端应用。主要特点是:
- 支持HTTP、SOCKS4、SOCKS5等多种代理协议。
- 可以与其他 IoConnector 实现(如 NioSocketConnector )组合使用。
- 简化了代理环境下的客户端开发。
5.2.3.5 SerialConnector
SerialConnector 是一个用于串行端口通信的 IoConnector 实现。它允许客户端通过 RS232 等串行端口与设备建立连接。
适用于需要与串行设备通信的客户端应用,如工业控制系统、科学仪器等。主要特点是:
支持RS232、RS422、RS485等多种串行协议。
提供了丰富的串口配置选项,如波特率、数据位、停止位等。
这个在电表通信的场景里面,有时候受限于网络条件直接会通过串口去通信,比方说 485 或者光口,这种场景下这个实现就有用武之地了。 写上位机的用这个实现会多一点。
5.2.3.6 VmPipeConnector
VmPipeConnector 是一种特殊的 IoConnector 实现,用于在同一个 Java 虚拟机内部进行通信。它使用了 Java 的 PipedInputStream 和PipedOutputStream 来模拟 Socket 的行为。适用于需要在同一个 JVM 内部进行高效通信的客户端应用,如某些基于 OSGi 的系统。主要特点是:
- 进程内通信,避免了网络I/O开销。
- 使用 Java 的 PipedInputStream 和 PipedOutputStream 模拟Socket。
- 适用于同一JVM内部的高效通信场景。
5.2.4 IoConnector怎么去用
在上一章节中的 HelloWorld 例子中,客户端的代码如下:
public static void main(String[] args) throws IOException {
// 创建IoConnector
IoConnector connector = new NioSocketConnector();
// 设置IoHandler
connector.setHandler(new ClientHandler());
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
// 设置连接配置,这里先不要纠结这几个参数
connector.getSessionConfig().setReadBufferSize(2048);
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
// 连接服务器
ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", "8080"));
future.awaitUninterruptibly();
// 获取IoSession
IoSession session = future.getSession();
// 发送消息给服务器
session.write("HelloWorld, Mina!");
// 等待接收服务器返回的消息
future.awaitUninterruptibly();
// 关闭连接
session.closeNow();
System.out.println("Mina 客户端声明周期结束.");
}
其中第一行代码
IoConnector connector = new NioSocketConnector();
这里接口定义的是 IoConnector ,实现类使用的是 NioSocketConnector。NioSocketConnector 主要是来处理基于 TCP/IP 协议的请求,但是是客户端连接到服务器端的实现。
6. 总结
本章节详细介绍了 Mina 框架核心组件 IoService 的作用和用法,理解 IoService 是理解 Mina 应用很重要的一个环节。
7. 参考链接
Mina 2.0 用户指南 https://mina.apache.org/mina-project/userguide/user-guide-toc.html
MINA based Application Architecture https://mina.apache.org/mina-project/userguide/ch2-basics/ch2.1-application-architecture.html
IoService Introduction https://mina.apache.org/mina-project/userguide/ch3-service/ch3.1-io-service.html
IoService Details https://mina.apache.org/mina-project/userguide/ch3-service/ch3.2-io-service-details.html