七的博客

MINA通信入门(三)-理解网络通信枢纽 IoService

网络通信电力Mina

MINA通信入门(三)-理解网络通信枢纽 IoService

通过上一个章节,我们已经动手实现了 Mina 的 HelloWorld 例子。这一章节开始,将对 Mina 的三个核心组件进行讲解。这对我们开发实际的生产应用是至关重要的,也许通过 HelloWorld 你可以慢慢改成生产的代码,但是不了解这几个组件很有可能会开发出有bug的应用。

讲解中将会设计到一些源码,稍微看一看,入门课有个大概印象即可,不用在源码上面纠结太多。

1. IoService 是什么

IoService 是 Mina 框架的核心接口, 它提供基本的 I/O 服务并管理 I/O 会话。IoService接口及其子接口的实现类是处理大部分底层I/O操作的地方。

IoService流程在整个流程位置

2. IoService 的职责有哪些

图片来自于 Mina 官网:

IoService职责

  • 会话管理:创建和删除会话,检测空闲状态。

  • 过滤器链管理:管理过滤器链,允许用户动态修改链。

  • 处理程序调用:在收到新消息时调用处理程序等。

  • 统计管理:更新发送的消息数、发送的字节数等统计信息。

  • 监听器管理:管理用户可以设置的监听器。

  • 通信管理:处理数据的双向传输。

3. IoService 接口的定义

IoService 接口的定义如下:

public interface IoService {
    /**
     * @return the {@link TransportMetadata} that this service runs on.
     */
    // 返回IoAcceptor或IoConnector正在运行的传输元数据。通常包括提供程序名称(如nio、apr、rxtx)、连接类型(无连接(UDP)/面向连接(TCP))等的详细信息
    TransportMetadata getTransportMetadata();

    /**
     * Adds an {@link IoServiceListener} that listens any events related with
     * this service.
     * 
     * @param listener The listener to add
     */
    // 添加一个监听器,这个监听器会接收到服务的各种事件通知,比如连接开启、关闭等。
    void addListener(IoServiceListener listener);

    /**
     * Removed an existing {@link IoServiceListener} that listens any events
     * related with this service.
     * 
     * @param listener The listener to use
     */
    // 移除一个已经添加的监听器,这样这个监听器就不再接收事件通知了。
    void removeListener(IoServiceListener listener);

    /**
     * @return <tt>true</tt> if and if only {@link #dispose()} method has
     * been called.  Please note that this method will return <tt>true</tt>
     * even after all the related resources are released.
     */
    // 告诉你这个服务是否正在关闭中。即使服务的所有资源都已经清理完毕,这个方法也会返回 true
    boolean isDisposing();

    /**
     * @return <tt>true</tt> if and if only all resources of this processor
     * have been disposed.
     */
    // 告诉你这个服务是否已经完全关闭,所有资源都被释放了。
    boolean isDisposed();

    /**
     * Releases any resources allocated by this service.  Please note that
     * this method might block as long as there are any sessions managed by
     * this service.
     */
    // 关闭这个服务,并释放它使用的所有资源。这个操作可能会等待,直到所有正在处理的连接都完全关闭。
    void dispose();

    /**
     * Releases any resources allocated by this service.  Please note that
     * this method might block as long as there are any sessions managed by this service.
     *
     * Warning : calling this method from a IoFutureListener with <code>awaitTermination</code> = true
     * will probably lead to a deadlock.
     *
     * @param awaitTermination When true this method will block until the underlying ExecutorService is terminated
     */
    //  关闭服务,并可以选择是否等待直到所有资源完全释放。如果你选择等待 (awaitTermination 为 true),这个方法会阻塞直到所有的后台处理都结束。
    void dispose(boolean awaitTermination);

    /**
     * @return the handler which will handle all connections managed by this service.
     */
    //  获取当前处理所有连接的处理器对象。
    IoHandler getHandler();

    /**
     * Sets the handler which will handle all connections managed by this service.
     * 
     * @param handler The IoHandler to use
     */
    // 设置一个处理器,这个处理器负责处理所有网络连接的数据。
    void setHandler(IoHandler handler);

    /**
     * @return the map of all sessions which are currently managed by this
     * service.  The key of map is the {@link IoSession#getId() ID} of the
     * session. An empty collection if there's no session.
     */
    // 获取一个包含所有当前连接会话的列表,每个会话都有一个唯一标识符。
    Map<Long, IoSession> getManagedSessions();

    /**
     * @return the number of all sessions which are currently managed by this
     * service.
     */
    // 获取当前服务管理的连接会话的总数。
    int getManagedSessionCount();

    /**
     * @return the default configuration of the new {@link IoSession}s
     * created by this service.
     */
    // 获取创建新会话时使用的配置信息。
    IoSessionConfig getSessionConfig();

    /**
     * @return the {@link IoFilterChainBuilder} which will build the
     * {@link IoFilterChain} of all {@link IoSession}s which is created
     * by this service.
     * The default value is an empty {@link DefaultIoFilterChainBuilder}.
     */
    // 获取一个构建器,这个构建器用来设置连接的处理流程,比如加密、压缩等。 (Builder设计模式)
    IoFilterChainBuilder getFilterChainBuilder();

    /**
     * Sets the {@link IoFilterChainBuilder} which will build the
     * {@link IoFilterChain} of all {@link IoSession}s which is created
     * by this service.
     * If you specify <tt>null</tt> this property will be set to
     * an empty {@link DefaultIoFilterChainBuilder}.
     * 
     * @param builder The filter chain builder to use
     */
    // 设置连接的处理流程构建者 (Builder设计模式)
    void setFilterChainBuilder(IoFilterChainBuilder builder);

    /**
     * A shortcut for <tt>( ( DefaultIoFilterChainBuilder ) </tt>{@link #getFilterChainBuilder()}<tt> )</tt>.
     * Please note that the returned object is not a <b>real</b> {@link IoFilterChain}
     * but a {@link DefaultIoFilterChainBuilder}.  Modifying the returned builder
     * won't affect the existing {@link IoSession}s at all, because
     * {@link IoFilterChainBuilder}s affect only newly created {@link IoSession}s.
     *
     * @return The filter chain in use
     * @throws IllegalStateException if the current {@link IoFilterChainBuilder} is
     *                               not a {@link DefaultIoFilterChainBuilder}
     */
    // 获取当前使用的过滤链构建器,可以用来修改新会话的处理流程。
    DefaultIoFilterChainBuilder getFilterChain();

    /**
     * @return a value of whether or not this service is active
     */
    // 检查服务是否正在运行。
    boolean isActive();

    /**
     * @return the time when this service was activated.  It returns the last
     * time when this service was activated if the service is not active now.
     */
    // 获取服务最后一次被激活(开始运行)的时间。
    long getActivationTime();

    /**
     * Writes the specified {@code message} to all the {@link IoSession}s
     * managed by this service.  This method is a convenience shortcut for
     * {@link IoUtil#broadcast(Object, Collection)}.
     * 
     * @param message the message to broadcast
     * @return The set of WriteFuture associated to the message being broadcasted
     */
    // 向所有管理的会话发送同一个消息,比如广播通知。
    Set<WriteFuture> broadcast(Object message);

    /**
     * @return the {@link IoSessionDataStructureFactory} that provides
     * related data structures for a new session created by this service.
     */
    // 不常用
    IoSessionDataStructureFactory getSessionDataStructureFactory();

    /**
     * Sets the {@link IoSessionDataStructureFactory} that provides
     * related data structures for a new session created by this service.
     * 
     * @param sessionDataStructureFactory The factory to use
     */
     // 不常用
    void setSessionDataStructureFactory(IoSessionDataStructureFactory sessionDataStructureFactory);

    /**
     * @return The number of bytes scheduled to be written
     */
    // 获取当前所有等待发送的数据的总字节数(即存储在内存中等待套接字准备好写入的字节)
    int getScheduledWriteBytes();

    /**
     * @return The number of messages scheduled to be written
     */
    // 获取当前所有等待发送的消息的数量(即存储在内存中等待套接字准备好写入的消息)。
    int getScheduledWriteMessages();

    /**
     * @return The statistics object for this service.
     */
    // 获取这个服务的统计信息,比如发送了多少数据、处理了多少连接等。
    IoServiceStatistics getStatistics();
}

在这些方法中,有些方法是比较实用的,比如:

  • 通过getManagedSessions()方法,可以获取当前服务管理的所有会话,并对其进行相应的处理,如空闲会话的检测和关闭等。 这个是比较实用的,尽管有时候我们可能会自己去想 HashMap 之类的数据结构,维护连接跟 Session 之间的关系,但是有些场景下也可以直接使用这个方法去管理会话。

4. IoService 怎么去用

IoService 是一个接口,公共的抽象定义,所以不能直接使用,我们通常会使用两个具体的实现 IoAcceptorIoConnector, 下个小节我们将进行讲解。

5. IoService 接口的具体实现

IoService 有两个子接口: IoAcceptorIoConnector , 分别表示服务端的实现和客户端的实现。实现类都会实现这些方法,从而提供完整的 I/O 服务功能。

5.1 服务端实现 - IoAcceptor

5.1.1 IoAcceptor 是什么

IoAcceptor 是 IoService 接口的一个实现,用于接受传入的连接请求并创建相应的IoSession。它主要用于服务器端 , 监听指定的端口 ,等待客户端的连接。 当有新的连接请求到达时, IoAcceptor 会创建一个新的 IoSession ,并将其交给 IoHandler 处理。

5.1.2 IoAcceptor 的定义

简单看下接口 IoAcceptor 的定义:

public interface IoAcceptor extends IoService {
    /**
     * Returns the local address which is bound currently.  If more than one
     * address are bound, only one of them will be returned, but it's not
     * necessarily the firstly bound address.
     * 
     * @return The bound LocalAddress
     */
    // 如果你的服务器在多个地址上等待连接,这个方法可以让你知道其中一个正在被使用的地址。
    SocketAddress getLocalAddress();

    /**
     * Returns a {@link Set} of the local addresses which are bound currently.
     * 
     * @return The Set of bound LocalAddresses
     */
    Set<SocketAddress> getLocalAddresses();

    /**
     * Returns the default local address to bind when no argument is specified
     * in {@link #bind()} method.  Please note that the default will not be
     * used if any local address is specified.  If more than one address are
     * set, only one of them will be returned, but it's not necessarily the
     * firstly specified address in {@link #setDefaultLocalAddresses(List)}.
     * 
     * @return The default bound LocalAddress
     */
    SocketAddress getDefaultLocalAddress();

    /**
     * Returns a {@link List} of the default local addresses to bind when no
     * argument is specified in {@link #bind()} method.  Please note that the
     * default will not be used if any local address is specified.
     * 
     * @return The list of default bound LocalAddresses
     */
    List<SocketAddress> getDefaultLocalAddresses();

    /**
     * Sets the default local address to bind when no argument is specified in
     * {@link #bind()} method.  Please note that the default will not be used
     * if any local address is specified.
     * 
     * @param localAddress The local addresses to bind the acceptor on
     */
    void setDefaultLocalAddress(SocketAddress localAddress);

    /**
     * Sets the default local addresses to bind when no argument is specified
     * in {@link #bind()} method.  Please note that the default will not be
     * used if any local address is specified.
     * @param firstLocalAddress The first local address to bind the acceptor on
     * @param otherLocalAddresses The other local addresses to bind the acceptor on
     */
    void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);

    /**
     * Sets the default local addresses to bind when no argument is specified
     * in {@link #bind()} method.  Please note that the default will not be
     * used if any local address is specified.
     * 
     * @param localAddresses The local addresses to bind the acceptor on
     */
    void setDefaultLocalAddresses(Iterable<? extends SocketAddress> localAddresses);

    /**
     * Sets the default local addresses to bind when no argument is specified
     * in {@link #bind()} method.  Please note that the default will not be
     * used if any local address is specified.
     * 
     * @param localAddresses The local addresses to bind the acceptor on
     */
    void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses);

    /**
     * Returns <tt>true</tt> if and only if all clients are closed when this
     * acceptor unbinds from all the related local address (i.e. when the
     * service is deactivated).
     * 
     * @return <tt>true</tt> if the service sets the closeOnDeactivation flag
     */
    boolean isCloseOnDeactivation();

    /**
     * Sets whether all client sessions are closed when this acceptor unbinds
     * from all the related local addresses (i.e. when the service is
     * deactivated).  The default value is <tt>true</tt>.
     * 
     * @param closeOnDeactivation <tt>true</tt> if we should close on deactivation
     */
    void setCloseOnDeactivation(boolean closeOnDeactivation);

    /**
     * Binds to the default local address(es) and start to accept incoming
     * connections.
     *
     * @throws IOException if failed to bind
     */
    // 如果你已经设置了默认监听地址,使用这个方法可以快速开始监听,无需再次指定地址。不常用。
    void bind() throws IOException;

    /**
     * Binds to the specified local address and start to accept incoming
     * connections.
     *
     * @param localAddress The SocketAddress to bind to
     * 
     * @throws IOException if failed to bind
     */
    // 直接指定一个地址并在该地址上开始监听。 最常用!!!
    void bind(SocketAddress localAddress) throws IOException;

    /**
     * Binds to the specified local addresses and start to accept incoming
     * connections. If no address is given, bind on the default local address.
     * 
     * @param firstLocalAddress The first address to bind to
     * @param addresses The SocketAddresses to bind to
     * 
     * @throws IOException if failed to bind
     */
    void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException;

    /**
     * Binds to the specified local addresses and start to accept incoming
     * connections. If no address is given, bind on the default local address.
     * 
     * @param addresses The SocketAddresses to bind to
     *
     * @throws IOException if failed to bind
     */
    void bind(SocketAddress... addresses) throws IOException;

    /**
     * Binds to the specified local addresses and start to accept incoming
     * connections.
     *
     * @param localAddresses The local address we will be bound to
     * @throws IOException if failed to bind
     */
    void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;

    /**
     * Unbinds from all local addresses that this service is bound to and stops
     * to accept incoming connections.  All managed connections will be closed
     * if {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property
     * is <tt>true</tt>.  This method returns silently if no local address is
     * bound yet.
     */
    // 停止在所有当前绑定的地址上的监听。
    void unbind();

    /**
     * Unbinds from the specified local address and stop to accept incoming
     * connections.  All managed connections will be closed if
     * {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property is
     * <tt>true</tt>.  This method returns silently if the default local
     * address is not bound yet.
     * 
     * @param localAddress The local address we will be unbound from
     */
    // 停止在特定的本地地址上的监听。
    void unbind(SocketAddress localAddress);

    /**
     * Unbinds from the specified local addresses and stop to accept incoming
     * connections.  All managed connections will be closed if
     * {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property is
     * <tt>true</tt>.  This method returns silently if the default local
     * addresses are not bound yet.
     * 
     * @param firstLocalAddress The first local address to be unbound from
     * @param otherLocalAddresses The other local address to be unbound from
     */
    void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);

    /**
     * Unbinds from the specified local addresses and stop to accept incoming
     * connections.  All managed connections will be closed if
     * {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property is
     * <tt>true</tt>.  This method returns silently if the default local
     * addresses are not bound yet.
     * 
     * @param localAddresses The local address we will be unbound from
     */
    void unbind(Iterable<? extends SocketAddress> localAddresses);

    /**
     * (Optional) Returns an {@link IoSession} that is bound to the specified
     * <tt>localAddress</tt> and the specified <tt>remoteAddress</tt> which
     * reuses the local address that is already bound by this service.
     * <p>
     * This operation is optional.  Please throw {@link UnsupportedOperationException}
     * if the transport type doesn't support this operation.  This operation is
     * usually implemented for connectionless transport types.
     *
     * @param remoteAddress The remote address bound to the service
     * @param localAddress The local address the session will be bound to
     * @throws UnsupportedOperationException if this operation is not supported
     * @throws IllegalStateException if this service is not running.
     * @throws IllegalArgumentException if this service is not bound to the
     *                                  specified <tt>localAddress</tt>.
     * @return The session bound to the the given localAddress and remote address
     */
    // 允许服务器主动发起与特定客户端的通信会话,常用于无连接协议或特定的网络交互场景。
    IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress);
}

5.1.3 IoAcceptor 的实现

图片来自于 Mina 官网:

IoAcceptor实现

  • NioSocketAcceptor : 基于Java NIO (Non-blocking I/O)的Socket传输IoAcceptor实现。它使用了 Java NIO的 Selector 和 Channel 机制,能够支持大量的并发连接,同时保持较低的系统资源占用。

主要特点是:

  • 适用于高并发、长连接的应用场景,如即时通讯服务器、游戏服务器等。

  • 非阻塞I/O,支持高并发连接。

  • 基于Java NIO,可以跨平台使用。

  • 提供了丰富的配置选项,如SSL、TCP参数等。

  • NioDatagramAcceptor :基于Java NIO 的 UDP 传输 IoAcceptor 实现。与 NioSocketAcceptor 类似,它也使用了 Java NIO 的 Selector 和Channel 机制,能够支持大量的并发 UDP 连接。

主要特点是:

  • 适用于基于UDP的应用场景,如DNS服务器、SNMP 服务器等。

  • 非阻塞I/O,支持高并发UDP连接。

  • 基于Java NIO,可以跨平台使用。

  • 提供了UDP相关的配置选项,如接收缓冲区大小等。

  • AprSocketAcceptor : 基于APR (Apache Portable Runtime)的阻塞 Socket 传输 IoAcceptor 实现。与 NioSocketAcceptor 不同,它使用了传统的阻塞 I/O 模型,每个连接都需要独立的线程来处理。

主要特点是:

  • 适用于对性能要求较高,但并发连接数不太大的应用场景。

  • 阻塞I/O,每个连接需要独立的线程处理。

  • 基于APR,需要单独安装APR库。

  • 在某些情况下,性能可能优于NioSocketAcceptor。

  • VmPipeSocketAcceptor : 适用于需要在同一个 JVM 内部进行高效通信的场景,如某些基于 OSGi 的应用。

主要特点是:

  • 适用于需要在同一个JVM内部进行高效通信的场景,如某些基于OSGi的应用。

  • 进程内通信,避免了网络I/O开销。

  • 使用 Java 的 PipedInputStream 和 PipedOutputStream 模拟 Socket 。

  • 适用于同一 JVM 内部的高效通信场景。

5.1.4 IoAcceptor 的使用

在上一章节中的 HelloWorld 例子中,服务端的代码如下:

public static void main(String[] args) throws IOException {
        // 创建 IoAcceptor 来处理连接的接入,注意这一行
        IoAcceptor acceptor = new NioSocketAcceptor();

        // 设置 IoHandler 来处理数据读取、数据写入等等业务逻辑
        acceptor.setHandler(new ServerHandler());
        
            // 添加一个处理文本行的编码器和解码器,这个很重要,没有将会报错
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

        // 设置连接配置,这里暂时不要去纠结
        acceptor.getSessionConfig().setReadBufferSize(2048);
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

        // 绑定端口并启动服务器
        acceptor.bind(new InetSocketAddress(8080));
        System.out.println("Mina服务暴露在端口8080 ");
    }

其中第一行代码

IoAcceptor acceptor = new NioSocketAcceptor();

这里使用的接口定义是 IoAcceptor ,实现使用的是 NioSocketAcceptor。 NioSocketAcceptor 主要是来处理基于 TCP/IP 协议的请求,在通信项目中用的也是比较多。

5.2 客户端实现 - IoConnector

5.2.1 IoConnector是什么

IoConnector 是 IoService 接口的另一个实现,用于与远程服务建立连接并创建相应的 IoSession。它通常用于客户端,主动连接指定的服务器地址和端口。当连接成功建立后, IoConnector 会创建一个新的 IoSession ,并将其交给 IoHandler 处理。

5.2.2 IoConnector的定义

下面是 IoConnector 接口的主要方法:

public interface IoConnector extends IoService {

    /**
     * @return the connect timeout in milliseconds.  The default value is 1 minute.
     */
    // 获取当前设置的连接超时时间,单位是毫秒
    long getConnectTimeoutMillis();


    /**
     * Sets the connect timeout in milliseconds.  The default value is 1 minute.
     * 
     * @param connectTimeoutInMillis The time out for the connection
     */
    // 设置连接的超时时间,单位是毫秒
    void setConnectTimeoutMillis(long connectTimeoutInMillis);

    /**
     * @return the default remote address to connect to when no argument
     * is specified in {@link #connect()} method.
     */
    // 获取默认的远程地址,如果你在调用 connect() 方法时没有指定地址,系统就会尝试连接到这个默认地址。
    SocketAddress getDefaultRemoteAddress();

    /**
     * Sets the default remote address to connect to when no argument is
     * specified in {@link #connect()} method.
     * 
     * @param defaultRemoteAddress The default remote address
     */
    // 设置默认的远程地址
    void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress);

    /**
     * @return the default local address
     */
    SocketAddress getDefaultLocalAddress();

    /**
     * Sets the default local address
     * 
     * @param defaultLocalAddress The default local address
     */
    void setDefaultLocalAddress(SocketAddress defaultLocalAddress);

    /**
     * Connects to the {@link #setDefaultRemoteAddress(SocketAddress) default
     * remote address}.
     * 
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     * @throws IllegalStateException
     *             if no default remoted address is set.
     */
    // 连接到通过 setDefaultRemoteAddress 方法设置的默认远程地址。返回一个 ConnectFuture 对象,你可以用它来监听连接是否成功。
    ConnectFuture connect();

    /**
     * Connects to the {@link #setDefaultRemoteAddress(SocketAddress) default
     * remote address} and invokes the <code>ioSessionInitializer</code> when
     * the IoSession is created but before {@link IoHandler#sessionCreated(IoSession)}
     * is invoked.  There is <em>no</em> guarantee that the <code>ioSessionInitializer</code>
     * will be invoked before this method returns.
     * 
     * @param sessionInitializer  the callback to invoke when the {@link IoSession} object is created
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     * 
     * @throws IllegalStateException if no default remote address is set.
     */
    // 连接建立并创建会话(IoSession)时,会调用 sessionInitializer 初始化器。这可以在会话开始前做一些准备工作。
    ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

    /**
     * Connects to the specified remote address.
     * 
     * @param remoteAddress The remote address to connect to
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    // 直接连接到指定的远程地址。这个方法允许你动态指定要连接的远程地址,而不是使用默认地址。 最常用!!!
    ConnectFuture connect(SocketAddress remoteAddress);

    /**
     * Connects to the specified remote address and invokes
     * the <code>ioSessionInitializer</code> when the IoSession is created but before
     * {@link IoHandler#sessionCreated(IoSession)} is invoked.  There is <em>no</em>
     * guarantee that the <code>ioSessionInitializer</code> will be invoked before
     * this method returns.
     * 
     * @param remoteAddress  the remote address to connect to
     * @param sessionInitializer  the callback to invoke when the {@link IoSession} object is created
     * 
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    / 直接连接到指定的远程地址这个方法允许你动态指定要连接的远程地址并在会话创建时调用一个初始化器 也最常用!!!
    ConnectFuture connect(SocketAddress remoteAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

    /**
     * Connects to the specified remote address binding to the specified local address.
     *
     * @param remoteAddress The remote address to connect
     * @param localAddress The local address to bind
     * 
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

    /**
     * Connects to the specified remote address binding to the specified local
     * address and and invokes the <code>ioSessionInitializer</code> when the
     * IoSession is created but before {@link IoHandler#sessionCreated(IoSession)}
     * is invoked.  There is <em>no</em> guarantee that the <code>ioSessionInitializer</code>
     * will be invoked before this method returns.
     * 
     * @param remoteAddress  the remote address to connect to
     * @param localAddress  the local interface to bind to
     * @param sessionInitializer  the callback to invoke when the {@link IoSession} object is created
     *
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
            IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
}

5.2.3 IoConnector 的实现

图片来自于 Mina 官网:

IoConnector实现

5.2.3.1 NioSocketConnector

NioSocketConnector 是基于 Java NIO 的非阻塞 Socket 传输 IoConnector 实现。它使用了 Java NIO 的 Selector 和 Channel 机制,能够支持大量的并发连接,同时保持较低的系统资源占用。适用于需要与服务器建立大量连接的客户端应用,如Web浏览器、移动应用等。主要特点是:

  • 非阻塞I/O,支持高并发连接。
  • 基于Java NIO,可以跨平台使用。
  • 提供了连接超时、连接失败重试等高级功能。
5.2.3.2 NioDatagramConnector

基于 Java NIO 的非阻塞 UDP 传输 IoConnector 实现。与 NioSocketConnector 类似,它也使用了 Java NIO 的 Selector 和 Channel 机制,能够支持大量的并发UDP连接。适用于基于UDP的客户端应用,如DNS客户端、SNMP客户端等。主要特点是:

  • 非阻塞I/O,支持高并发UDP连接。
  • 基于Java NIO,可以跨平台使用。
  • 提供了 UDP 相关的配置选项,如发送缓冲区大小等。
5.2.3.3 AprSocketConnector

基于 APR 的阻塞 Socket 传输 IoConnector 实现。与 NioSocketConnector 不同,它使用了传统的阻塞I/O模型,每个连接都需要独立的线程来处理。适用于对性能要求较高,但并发连接数不太大的客户端应用。主要特点是:

  • 阻塞I/O,每个连接需要独立的线程处理。

  • 基于APR,需要单独安装APR库。

  • 在某些情况下,性能可能优于NioSocketConnector。

5.2.3.4 ProxyConnector

ProxyConnector是一个提供代理支持的 IoConnector 实现。它允许客户端通过 HTTP、SOCKS 等代理协议与服务器建立连接。

适用于需要通过代理服务器访问的客户端应用。主要特点是:

  • 支持HTTP、SOCKS4、SOCKS5等多种代理协议。
  • 可以与其他 IoConnector 实现(如 NioSocketConnector )组合使用。
  • 简化了代理环境下的客户端开发。
5.2.3.5 SerialConnector

SerialConnector 是一个用于串行端口通信的 IoConnector 实现。它允许客户端通过 RS232 等串行端口与设备建立连接。

适用于需要与串行设备通信的客户端应用,如工业控制系统、科学仪器等。主要特点是:

  • 支持RS232、RS422、RS485等多种串行协议。

  • 提供了丰富的串口配置选项,如波特率、数据位、停止位等。

这个在电表通信的场景里面,有时候受限于网络条件直接会通过串口去通信,比方说 485 或者光口,这种场景下这个实现就有用武之地了。 写上位机的用这个实现会多一点。

5.2.3.6 VmPipeConnector

VmPipeConnector 是一种特殊的 IoConnector 实现,用于在同一个 Java 虚拟机内部进行通信。它使用了 Java 的 PipedInputStream 和PipedOutputStream 来模拟 Socket 的行为。适用于需要在同一个 JVM 内部进行高效通信的客户端应用,如某些基于 OSGi 的系统。主要特点是:

  • 进程内通信,避免了网络I/O开销。
  • 使用 Java 的 PipedInputStream 和 PipedOutputStream 模拟Socket。
  • 适用于同一JVM内部的高效通信场景。

5.2.4 IoConnector怎么去用

在上一章节中的 HelloWorld 例子中,客户端的代码如下:

public static void main(String[] args) throws IOException {
        // 创建IoConnector
        IoConnector connector = new NioSocketConnector();

        // 设置IoHandler
        connector.setHandler(new ClientHandler());

        connector.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

        // 设置连接配置,这里先不要纠结这几个参数
        connector.getSessionConfig().setReadBufferSize(2048);
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

        // 连接服务器
        ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", "8080"));
        future.awaitUninterruptibly();

        // 获取IoSession
        IoSession session = future.getSession();

        // 发送消息给服务器
        session.write("HelloWorld, Mina!");

        // 等待接收服务器返回的消息
        future.awaitUninterruptibly();

        // 关闭连接
        session.closeNow();
        System.out.println("Mina 客户端声明周期结束.");
    }

其中第一行代码

 IoConnector connector = new NioSocketConnector();

这里接口定义的是 IoConnector ,实现类使用的是 NioSocketConnector。NioSocketConnector 主要是来处理基于 TCP/IP 协议的请求,但是是客户端连接到服务器端的实现。

6. 总结

本章节详细介绍了 Mina 框架核心组件 IoService 的作用和用法,理解 IoService 是理解 Mina 应用很重要的一个环节。

7. 参考链接