# SFTP 适配器

# SFTP 适配器

Spring 集成为 SFTP 上的文件传输操作提供了支持。

安全文件传输协议(英语:Secure File Transfer Protocol,SFTP)是一种网络协议,允许你通过任何可靠的流在互联网上的两台计算机之间传输文件。

SFTP 协议需要一个安全的通道(如 SSH),并且在 SFTP会话中对客户端身份的可见性。

Spring 集成通过提供三个客户端端点来支持通过 SFTP 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。它还提供了方便的名称空间配置来定义这些客户机组件。

你需要在项目中包含此依赖项:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-sftp</artifactId>
    <version>5.5.9</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-sftp:5.5.9"

要在 XML 配置中包含 SFTP 命名空间,请在根元素上包含以下属性:

xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp
    https://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd"

# sftp会话工厂

从版本 3.0 开始,默认情况下会话不再缓存。
参见SFTP Session Caching

在配置 SFTP 适配器之前,你必须配置一个 SFTP会话工厂。你可以使用常规的 Bean 定义来配置 SFTP会话工厂,如下例所示:

<beans:bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="localhost"/>
    <beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
    <beans:property name="privateKeyPassphrase" value="springIntegration"/>
    <beans:property name="port" value="22"/>
    <beans:property name="user" value="kermit"/>
</beans:bean>

每当适配器从其SessionFactory请求会话对象时,就会创建一个新的 SFTP会话。在这种情况下,SFTP会话工厂依赖于JSch (opens new window)库来提供 SFTP 功能。

然而, Spring 集成还支持对 SFTP 会话的缓存。有关更多信息,请参见SFTP Session Caching

JSCH 支持在与服务器的连接上进行多个通道(操作),
默认情况下, Spring 集成会话工厂为每个通道使用单独的物理连接,
自 Spring 集成 3.0 以来,你可以配置会话工厂(使用布尔构造函数 arg-defaultfalse)来使用到服务器的单个连接,并在该单个连接上创建多个JSch通道。,

当使用此功能时,你必须将会话工厂包装在缓存会话工厂中,作为稍后描述,以便当一个操作完成时连接不是物理关闭的。

如果缓存被重置,会话仅在关闭最后一个通道时断开连接。

如果在新操作获得会话时发现断开连接,则刷新该连接。
如果你遇到连接问题,并且希望跟踪会话创建并查看轮询的会话,则可以通过将记录器设置为TRACE级别(例如,log4j.category.org.springframework.integration.sftp=TRACE)来启用跟踪。
参见SFTP/JSCH 日志记录

现在,你需要做的就是将这个 SFTP会话工厂注入到适配器中。

为 SFTP会话工厂提供值的一种更实用的方法是使用 Spring 的属性占位符支持 (opens new window)

# 配置属性

下面的列表描述了[DefaultSftpSessionFactory](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/sftp/会话/defaultsftpsessionfactory.html)所公开的所有属性。

isSharedSession(构造函数参数)::当true时,使用单个连接,并且JSch Channels是多路复用的。它的默认值为false

clientVersion::允许你设置客户机版本属性。它的默认值取决于底层的 JSCH 版本,但它看起来会是:SSH-2.0-JSCH-0.1.45

enableDaemonThread::如果true,所有线程都是守护进程线程。如果设置为false,则使用普通的非守护进程线程。此属性设置在底层session (opens new window)上。在这里,此属性默认为false

host:你想要连接的主机的 URL。必须的。

hostKeyAlias:设置主机键的别名,在将主机键与已知的主机列表进行比较时使用该别名。

knownHostsResource::指定用于主密钥存储库的文件资源。该文件的格式与 OpenSSH 的known_hosts文件相同,并且是必需的,如果allowUnknownKeys为 false,则必须预先填充该文件。

password:对远程主机进行身份验证的密码。如果没有提供密码,则需要privateKey属性。如果你设置userInfo,则不允许这样做。密码是从那个对象获得的。

port:应在其上建立 SFTP 连接的端口。如果没有指定,该值默认为22。如果指定,此属性必须是正数。

privateKey::允许你设置一个resource (opens new window),该位置表示用于对远程主机进行身份验证的私钥的位置。如果不提供privateKey,则需要password属性。

privateKeyPassphrase:私钥的密码。如果设置userInfo,则不允许设置privateKeyPassphrase。密码是从那个物体上取得的。可选的。

proxy::允许指定基于 JSCH 的proxy (opens new window)。如果设置了,代理对象将用于通过代理创建到远程主机的连接。有关配置代理的方便方法,请参见Proxy Factory Bean

serverAliveCountMax::指定服务器激活消息的数量,这些消息在断开连接之前不需要服务器的任何回复就可以发送。如果未设置,此属性默认为1

serverAliveInterval:设置发送服务器激活消息之前的超时间隔(以毫秒为单位),以防服务器未接收到任何消息。

sessionConfig::通过使用Properties,你可以在底层 JSCH会话上设置额外的配置设置。

socketFactory::让你传入一个[SocketFactory](https://epaul.github.com/jsch-documentation/javadoc/com/jcraft/jsch/socketfactory.html)。套接字工厂用于创建到目标主机的套接字。当使用代理时,套接字工厂将传递给代理。默认情况下,使用的是普通的 TCP 套接字。

timeout::超时属性用作套接字超时参数,以及默认的连接超时。默认值为0,这意味着不会发生超时。

user:远程用户使用。必须的。

allowUnknownKeys::设置为true,以允许使用未知(或更改)键连接到主机。它的默认值是“false”。只有在不提供userInfo的情况下,才应用它。如果false,则需要一个预先填充的knownHosts文件。

userInfo:设置一个在身份验证期间使用的自定义UserInfo。特别地,当接收到未知(或更改的)主机键时,将调用promptYesNo()。另见[allowUnknownKeys]。当你提供一个UserInfo时,password和私钥passphrase是从它获得的,并且你不能设置离散的passwordprivateKeyPassphrase属性。

# 代理工厂 Bean

Jsch提供了一种通过 HTTP 或 SOCKS 代理连接到服务器的机制。要使用此功能,请配置Proxy,并提供对DefaultSftpSessionFactory的引用,如前面讨论的那样。由Jsch提供三种实现方式:HTTPSOCKS4SOCKS5。 Spring Integration4.3 引入了FactoryBean,通过允许属性注入来简化这些代理的配置,如下例所示:

<bean id="proxySocks5" class="org.springframework.integration.sftp.session.JschProxyFactoryBean">
    <constructor-arg value="SOCKS5" />
    <constructor-arg value="${sftp.proxy.address}" />
    <constructor-arg value="${sftp.proxy.port}" />
    <constructor-arg value="${sftp.proxy.user}" />
    <constructor-arg value="${sftp.proxy.pw}" />
</bean>

<bean id="sessionFactory"
          class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory" >
    ...
    <property name="proxy" ref="proxySocks5" />
    ...
</bean>

# 委托会话工厂

版本 4.2 引入了DelegatingSessionFactory,它允许在运行时选择实际的会话工厂。在调用 SFTP 端点之前,你可以在工厂上调用setThreadKey()将一个键与当前线程关联。然后使用该键查找要使用的实际会话工厂。使用后,你可以通过调用clearThreadKey()清除密钥。

我们添加了方便的方法,这样你就可以更容易地从消息流中实现这一点,如下例所示:

<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
    <constructor-arg>
        <bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
            <!-- delegate factories here -->
        </bean>
    </constructor-arg>
</bean>

<int:service-activator input-channel="in" output-channel="c1"
        expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />

<int-sftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />

<int:service-activator input-channel="c2" output-channel="out"
        expression="@dsf.clearThreadKey(#root)" />
当使用会话缓存(参见SFTP Session Caching)时,每个委托都应该被缓存。
你不能缓存DelegatingSessionFactory本身。

从版本 5.0.7 开始,DelegatingSessionFactory可以与RotatingServerAdvice一起用于轮询多个服务器;请参见入站通道适配器:轮询多个服务器和目录

# SFTP会话缓存

从 Spring Integration Version3.0 开始,默认情况下不再缓存会话。
端点不再支持cache-sessions属性。
如果你希望缓存会话,则必须使用CachingSessionFactory(参见下一个示例)。

在 3.0 之前的版本中,默认情况下会话是自动缓存的。有一个cache-sessions属性可用于禁用自动缓存,但该解决方案没有提供一种配置其他会话-缓存属性的方法。例如,你无法限制创建的会话的数量。为了支持该需求和其他配置选项,我们添加了CachingSessionFactory。它提供sessionCacheSizesessionWaitTimeout属性。顾名思义,sessionCacheSize属性控制工厂在其缓存中维护的活动会话的数量(默认情况是无界的)。如果已经达到sessionCacheSize阈值,则试图获取另一个会话块,直到其中一个缓存的会话变得可用,或者直到一个会话的等待时间过期(默认的等待时间是Integer.MAX_VALUE)。sessionWaitTimeout属性允许配置等待时间。

如果你希望你的会话被缓存,那么配置你的默认会话工厂(作为前面描述的),然后将其包装到CachingSessionFactory的实例中,你可以在其中提供这些附加属性。下面的示例展示了如何做到这一点:

<bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="localhost"/>
</bean>

<bean id="cachingSessionFactory"
    class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="sftpSessionFactory"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>

前面的示例创建一个CachingSessionFactory,其sessionCacheSize设置为10,其sessionWaitTimeout设置为一秒(1000 毫秒)。

从 Spring 集成版本 3.0 开始,CachingConnectionFactory提供了一个resetCache()方法。当调用时,所有空闲会话都会立即关闭,而当正在使用的会话被返回到缓存时,它们会被关闭。当使用isSharedSession=true时,只有当最后一个通道关闭时,通道才关闭,共享会话才关闭。新的届会请求在必要时设立新的届会。

从版本 5.1 开始,CachingSessionFactory有一个新的属性testSession。当为真时,会话将通过执行stat(getHome())命令来进行测试,以确保它仍然处于活动状态;如果不是,则将从缓存中删除它;如果缓存中没有活动会话,则将创建一个新的会话。

# 使用RemoteFileTemplate

Spring 集成版本 3.0 在SftpSession对象上提供了一个新的抽象。该模板提供了发送、检索(作为InputStream)、删除和重命名文件的方法。此外,我们提供了一个execute方法来让调用方在会话上运行多个操作。在所有情况下,模板都会可靠地关闭会话。有关更多信息,请参见[Javadoc forRemoteFileTemplate](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/file/file/remotefiletemplate.html)SFTP 有一个子类:[SftpRemoteFileTemplate](https://DOCS. Spring.template/ Spring-integration/api/org/springframework/integration/sftepramework/sftefilefilemplate/sp/会话/sf

我们在版本 4.1 中添加了其他方法,包括getClientInstance()。它提供了对底层ChannelSftp的访问,从而能够访问底层 API。

5.0 版引入了RemoteFileOperations.invoke(OperationsCallback<F, T> action)方法。这个方法允许在同一个线程有界的Session的范围内调用几个RemoteFileOperations调用。当你需要将RemoteFileTemplate作为一个工作单元执行多个高级操作时,这是非常有用的。例如,AbstractRemoteFileOutboundGateway将其与mput命令实现一起使用,其中我们对提供的目录中的每个文件执行put操作,并递归地对其子目录执行该操作。有关更多信息,请参见Javadoc (opens new window)

# SFTP 入站通道适配器

SFTP 入站通道适配器是一种特殊的侦听器,它连接到服务器并侦听远程目录事件(例如正在创建的新文件),此时它将启动文件传输。下面的示例展示了如何配置 SFTP 入站通道适配器:

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
              session-factory="sftpSessionFactory"
            channel="requestChannel"
            filename-pattern="*.txt"
            remote-directory="/foo/bar"
            preserve-timestamp="true"
            local-directory="file:target/foo"
            auto-create-local-directory="true"
            local-filename-generator-expression="#this.toUpperCase() + '.a'"
            scanner="myDirScanner"
            local-filter="myFilter"
            temporary-file-suffix=".writing"
            max-fetch-size="-1"
            delete-remote-files="false">
        <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

前面的配置示例展示了如何为各种属性提供值,包括以下内容:

  • local-directory:文件要传输到的位置

  • remote-directory:文件将从其中传输的远程源目录

  • session-factory:对我们之前配置的 Bean 的引用

默认情况下,传输的文件带有与原始文件相同的名称。如果要重写此行为,可以设置local-filename-generator-expression属性,该属性允许你提供一个 SPEL 表达式来生成本地文件的名称。与出站网关和适配器(SPEL 求值上下文的根对象是Message)不同,此入站适配器在求值时还没有消息,因为这是它最终以传输的文件作为有效负载生成的消息。因此,SPEL 求值上下文的根对象是远程文件的原始名称(aString)。

入站通道适配器首先将文件检索到本地目录,然后根据 Poller 配置发出每个文件。从版本 5.0 开始,当需要新的文件检索时,可以限制从 SFTP 服务器获取的文件的数量。当目标文件很大时,或者在具有持久文件列表过滤器的集群系统中运行时,这可能是有益的,本文将在后面讨论。为此,请使用max-fetch-size。负值(默认值)意味着没有限制,所有匹配的文件都会被检索到。有关更多信息,请参见入站通道适配器:控制远程文件获取。从版本 5.0 开始,你还可以通过设置scanner属性,为inbound-channel-adapter提供一个自定义的DirectoryScanner实现。

从 Spring Integration3.0 开始,你可以指定preserve-timestamp属性(默认值是false)。当true时,本地文件的修改时间戳被设置为从服务器检索到的值。否则,它被设置为当前时间。

从版本 4.2 开始,你可以指定remote-directory-expression而不是remote-directory,这使你可以动态地确定每个轮询上的目录——例如,remote-directory-expression="@myBean.determineRemoteDir()"

有时,基于通过filename-pattern属性指定的简单模式的文件过滤可能还不够。如果是这种情况,可以使用filename-regex属性来指定正则表达式(例如,filename-regex=".*\.test$")。如果需要完整的控制,可以使用filter属性提供对org.springframework.integration.file.filters.FileListFilter的自定义实现的引用,这是用于过滤文件列表的策略接口。此筛选器确定要检索哪些远程文件。你还可以使用CompositeFileListFilter将基于模式的过滤器与其他过滤器(例如AcceptOnceFileListFilter,以避免同步先前已获取的文件)结合起来。

AcceptOnceFileListFilter将其状态存储在内存中。如果你希望该状态在系统重新启动后仍然有效,请考虑使用SftpPersistentAcceptOnceFileListFilter代替。这个过滤器将接受的文件名存储在MetadataStore策略的实例中(参见元数据存储)。此筛选器匹配文件名和远程修改时间。

从版本 4.0 开始,这个过滤器需要ConcurrentMetadataStore。当与共享数据存储一起使用时(例如RedisRedisMetadataStore),这使得过滤器键可以在多个应用程序或服务器实例之间共享。

从版本 5.0 开始,对于SftpInboundFileSynchronizer,默认情况下会应用带有内存SimpleMetadataStoreSftpPersistentAcceptOnceFileListFilter。这个筛选器也将应用于 XML 配置中的regexpattern选项,以及 Java DSL 中的SftpInboundChannelAdapterSpec选项。你可以通过使用CompositeFileListFilter(或ChainFileListFilter)来处理任何其他用例。

上面的讨论涉及在检索文件之前对文件进行过滤。检索完文件后,将对文件系统上的文件应用一个额外的过滤器。默认情况下,这是一个AcceptOnceFileListFilter,正如本节讨论的那样,它在内存中保留状态,并且不考虑文件的修改时间。除非你的应用程序在处理后删除文件,否则在应用程序重新启动后,适配器默认会重新处理磁盘上的文件。

另外,如果将filter配置为使用SftpPersistentAcceptOnceFileListFilter并更改远程文件的时间戳(导致重新获取它),则默认的本地过滤器不允许处理此新文件。

有关此过滤器的详细信息,以及如何使用它,请参见远程持久文件列表过滤器

你可以使用local-filter属性来配置本地文件系统过滤器的行为。从版本 4.3.8 开始,默认情况下配置了FileSystemPersistentAcceptOnceFileListFilter。该过滤器将接受的文件名和修改的时间戳存储在MetadataStore策略的实例中(参见元数据存储),并检测本地文件修改时间的更改。默认的MetadataStore是在内存中存储状态的SimpleMetadataStore

从版本 4.1.5 开始,这些过滤器有一个新的属性flushOnUpdate,这会导致它们在每次更新时刷新元数据存储(如果存储实现Flushable)。

此外,如果使用分布式MetadataStore(例如Redis 元数据存储Gemfire 元数据存储),则可以使用同一个适配器或应用程序的多个实例,并确保只有一个实例处理一个文件。

实际的本地过滤器是一个CompositeFileListFilter,其中包含提供的过滤器和一个模式过滤器,该过滤器防止正在下载过程中的文件被处理(基于temporary-file-suffix)。文件是用这个后缀下载的(默认值是.writing),当传输完成时,文件被重命名为它们的最终名称,使它们对过滤器“可见”。

有关这些属性的更多详细信息,请参见schema (opens new window)

SFTP 入站通道适配器是一个轮询消费者。因此,你必须配置一个 Poller(一个全局默认值或一个局部元素)。一旦文件被传输到本地目录,将生成一条以java.io.File为有效负载类型的消息,并将其发送到由channel属性标识的通道。

# 更多关于文件过滤和大文件的信息

有时,刚刚出现在监视(远程)目录中的文件是不完整的。通常,这样的文件是以某种临时扩展名编写的(例如在名为something.txt.writing的文件上使用.writing),然后在编写过程完成后重新命名。在大多数情况下,开发人员只对完整的文件感兴趣,并且只想过滤那些文件。要处理这些场景,可以使用filename-patternfilename-regexfilter属性提供的过滤支持。如果需要自定义过滤器实现,可以通过设置filter属性在适配器中包含引用。下面的示例展示了如何做到这一点:

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="receiveChannel"
            session-factory="sftpSessionFactory"
            filter="customFilter"
            local-directory="file:/local-test-dir"
            remote-directory="/remote-test-dir">
        <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

# 从故障中恢复

你应该了解适配器的体系结构。文件同步器获取文件,FileReadingMessageSource为每个同步文件发出一条消息。由于前面讨论过,涉及两个过滤器。filter属性(和模式)引用远程文件列表,以避免获取已经被获取的文件。FileReadingMessageSource使用local-filter来确定哪些文件将作为消息发送。

同步器列出远程文件并查看其过滤器。然后文件被转移。如果在文件传输过程中发生 IO 错误,那么已经添加到过滤器中的所有文件都将被删除,以便在下一次投票时有资格重新获取它们。这仅在过滤器实现ReversibleFileListFilter(例如AcceptOnceFileListFilter)时才适用。

如果在同步文件后,处理文件的下游流出现错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。

如果你希望在失败后重新处理此类文件,则可以使用类似于以下的配置,以便于从筛选器中删除失败的文件:

<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

前面的配置适用于任何ResettableFileListFilter

从版本 5.0 开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。这也可以是一个远程子路径。为了能够根据层次结构支持递归地读取本地目录以进行修改,你现在可以基于Files.walk()算法提供一个内部FileReadingMessageSource和一个新的RecursiveDirectoryScanner。参见[AbstractInboundFileSynchronizingMessageSource.setScanner()](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/file/remote/synchronizer/abstractinboundfilesynchronizingmessagesource.html#setscanner-org.springframework.integration.file.directoryscanner)了解更多信息。此外,你现在可以通过使用setUseWatchService()选项将AbstractInboundFileSynchronizingMessageSource切换到基于WatchServiceDirectoryScanner。它还被配置为所有WatchEventType实例,以便对本地目录中的任何修改做出反应。前面显示的再处理示例基于FileReadingMessageSource.WatchServiceDirectoryScanner的内置功能,当从本地目录中删除文件(StandardWatchEventKinds.ENTRY_DELETE)时,它使用ResettableFileListFilter.remove()。有关更多信息,请参见[WatchServiceDirectoryScanner](./file.html#watch-service-directory-scanner)。

# 使用 Java 配置进行配置

Spring 以下引导应用程序展示了如何使用 Java 配置入站适配器的示例:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

# 使用 Java DSL 进行配置

Spring 以下引导应用程序展示了如何使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlows
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

# 处理不完整数据

处理不完整的数据

SftpSystemMarkerFilePresentFileListFilter用于过滤远程系统上没有相应标记文件的远程文件。有关配置信息,请参见Javadoc (opens new window)

# SFTP 流媒体入站通道适配器

版本 4.3 引入了流入站通道适配器。这个适配器生成的消息的有效负载类型为InputStream,这样你就可以在不写到本地文件系统的情况下获取文件。由于会话保持打开,所以当文件已被消费时,消费应用程序负责关闭会话。会话在closeableResource报头(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)中提供。标准框架组件,例如FileSplitterStreamTransformer,会自动关闭会话。有关这些组件的更多信息,请参见文件拆分器流变压器。下面的示例展示了如何配置 SFTP 流入站通道适配器:

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

你只能使用filename-patternfilename-regexfilterfilter-expression中的一个。

从版本 5.0 开始,默认情况下,SftpStreamingMessageSource适配器通过使用基于内存SimpleMetadataStoreSftpPersistentAcceptOnceFileListFilter来防止远程文件的重复。
默认情况下,该过滤器还与文件名模式(或正则表达式)一起应用。,如果你需要允许重复,
,你可以使用AcceptAllFileListFilter
你可以通过使用CompositeFileListFilter(或ChainFileListFilter)来处理任何其他用例。
Java 配置稍后显示显示了一种在处理后删除远程文件的技术,从而避免重复。

有关SftpPersistentAcceptOnceFileListFilter及其使用方式的更多信息,请参见远程持久文件列表过滤器

你可以使用max-fetch-size属性来限制在需要获取时在每个轮询中获取的文件数量。将其设置为1,并在集群环境中运行时使用持久过滤器。有关更多信息,请参见入站通道适配器:控制远程文件获取

适配器将远程目录和文件名分别放入头文件中(FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE)。从版本 5.0 开始,FileHeaders.REMOTE_FILE_INFO头提供了额外的远程文件信息(在 JSON 中)。如果将SftpStreamingMessageSource上的fileInfoJson属性设置为false,则头包含一个SftpFileInfo对象。你可以使用SftpFileInfo.getFileInfo()方法访问底层 JSCH 库提供的LsEntry对象。当你使用 XML 配置时,fileInfoJson属性是不可用的,但是你可以通过将SftpStreamingMessageSource注入到你的一个配置类中来设置它。另见远程文件信息

从版本 5.1 开始,comparator的通用类型是LsEntry。在此之前,它是AbstractFileInfo<LsEntry>。这是因为排序现在是在处理的较早阶段执行的,在过滤和应用maxFetch之前。

# 使用 Java 配置进行配置

Spring 以下引导应用程序展示了如何使用 Java 配置入站适配器的示例:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

请注意,在本例中,Transformer 下游的消息处理程序有一个建议,该建议在处理后删除远程文件。

# 入站通道适配器:轮询多个服务器和目录

从版本 5.0.7 开始,RotatingServerAdvice是可用的;当配置为 Poller 建议时,入站适配器可以轮询多个服务器和目录。配置建议,并将其正常添加到 Poller 的建议链中。aDelegatingSessionFactory用于选择服务器,有关更多信息,请参见Delegating Session Factory。通知配置由RotationPolicy.KeyDirectory对象列表组成。

例子

@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

此建议将轮询服务器foo上的目录one,直到没有新文件存在,然后移动到目录bar,然后在服务器baz上的目录two,等等。

可以使用公平构造函数 arg 修改此默认行为:

fair

@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

在这种情况下,无论上一次投票是否返回了文件,通知都将移动到下一个服务器/目录。

或者,你可以提供自己的RotationPolicy,以便根据需要重新配置消息源:

政策

public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}

and

习惯

@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}

local-filename-generator-expression属性(同步器上的localFilenameGeneratorExpression)现在可以包含#remoteDirectory变量。这允许将从不同目录检索到的文件下载到本地类似的目录中:

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(Sftp.inboundAdapter(sf())
                    .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                    .localDirectory(new File(tmpDir))
                    .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                    .remoteDirectory("."),
                e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
            .channel(MessageChannels.queue("files"))
            .get();
}
使用此建议时,不要在 Poller 上配置TaskExecutor;有关更多信息,请参见消息源的条件 Poller

# 入站通道适配器:控制远程文件获取

在配置入站通道适配器时,应该考虑两个属性。max-messages-per-poll,与所有的 Poller 一样,可以用来限制在每个轮询上发出的消息的数量(如果超过配置的值已经准备好)。max-fetch-size(自版本 5.0 起)可以限制一次从远程服务器检索到的文件的数量。

以下场景假定起始状态是一个空的本地目录:

  • max-messages-per-poll=2max-fetch-size=1:适配器获取一个文件,发出它,获取下一个文件,并发出它。然后它会一直睡到下一次投票。

  • max-messages-per-poll=2max-fetch-size=2):适配器获取这两个文件,然后发射每个文件。

  • max-messages-per-poll=2max-fetch-size=4:适配器获取最多 4 个文件(如果可用)并发出前两个文件(如果至少有两个)。接下来的两个文件将在下一次投票时发出。

  • max-messages-per-poll=2max-fetch-size未指定:适配器获取所有远程文件并发出前两个文件(如果至少有两个)。后续的文件将在后续的轮询中发出(一次两个)。当所有的文件都用完后,将再次尝试远程获取,以获取任何新的文件。

当你部署一个应用程序的多个实例时,我们建议设置一个小的max-fetch-size,以避免一个实例“抓取”所有文件并使其他实例无法使用。

max-fetch-size的另一种用法是,当你想要停止获取远程文件,但要继续处理已经获取的文件时。在MessageSource上设置maxFetchSize属性(通过编程方式,通过 JMX 或通过控制总线)可以有效地阻止适配器获取更多文件,但可以让 Poller 继续为以前已获取的文件发送消息。如果属性更改时 poller 处于活动状态,则更改将在下一次投票时生效。

从版本 5.1 开始,同步器可以提供Comparator<LsEntry>。这在限制使用maxFetchSize获取的文件数量时很有用。

# SFTP 出站通道适配器

SFTP 出站通道适配器是一种特殊的MessageHandler,它连接到远程目录,并为它作为传入Message的有效负载接收的每个文件发起文件传输。它还支持文件的几种表示方式,因此不限于File对象。与 FTP 出站适配器类似,SFTP 出站通道适配器支持以下有效负载:

  • java.io.File:实际的文件对象

  • byte[]:表示文件内容的字节数组

  • java.lang.String:表示文件内容的文本

  • java.io.InputStream:要传输到远程文件的数据流

  • org.springframework.core.io.Resource:用于将数据传输到远程文件的资源

下面的示例展示了如何配置 SFTP 出站通道适配器:

<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
    session-factory="sftpSessionFactory"
    channel="inputChannel"
    charset="UTF-8"
    remote-file-separator="/"
    remote-directory="foo/bar"
    remote-filename-generator-expression="payload.getName() + '-mysuffix'"
    filename-generator="fileNameGenerator"
    use-temporary-filename="true"
    chmod="600"
    mode="REPLACE"/>

有关这些属性的更多详细信息,请参见schema (opens new window)

# spel 和 SFTP 出站适配器

与 Spring 集成中的许多其他组件一样,在配置 SFTP 出站通道适配器时,可以使用 Spring 表达式语言,方法是指定两个属性:remote-directory-expressionremote-filename-generator-expression前面描述的)。表达式求值上下文将消息作为其根对象,它允许你使用表达式,这些表达式可以基于消息中的数据(来自“payload”或“headers”)动态计算文件名或现有目录路径。在前面的示例中,我们使用一个表达式值来定义remote-filename-generator-expression属性,该表达式值根据原始名称计算文件名,同时还附加一个后缀:“-mysuffix”。

从版本 4.1 开始,你可以在传输文件时指定mode。默认情况下,现有文件将被覆盖。模式由FileExistsMode枚举定义,其中包括以下值:

  • REPLACE(默认)

  • REPLACE_IF_MODIFIED

  • APPEND

  • APPEND_NO_FLUSH

  • IGNORE

  • FAIL

使用IGNOREFAIL时,文件不会被传输。FAIL会导致抛出异常,而IGNORE会静默地忽略传输(尽管生成了DEBUG日志条目)。

4.3 版本引入了chmod属性,你可以使用该属性在上传后更改远程文件权限。你可以使用常规的 UNIX 八进制格式(例如,600仅允许文件所有者进行读写)。当使用 Java 配置适配器时,可以使用setChmodOctal("600")setChmod(0600)

# 避免部分写入的文件

处理文件传输时的常见问题之一是处理部分文件的可能性。文件在实际传输完成之前可能会出现在文件系统中。

Spring 为了处理这个问题,集成 SFTP 适配器使用一种常见的算法,在这种算法中,文件以临时名称进行传输,并在文件完全传输后重新命名。

默认情况下,每个正在传输过程中的文件都会出现在文件系统中,并带有一个附加的后缀,默认情况下,后缀是.writing。可以通过设置temporary-file-suffix属性进行更改。

但是,在某些情况下,你可能不想使用这种技术(例如,如果服务器不允许重命名文件)。对于这样的情况,可以通过将use-temporary-file-name设置为false来禁用此功能(默认值为true)。当此属性false时,文件将以其最终名称写入,而使用该属性的应用程序需要其他一些机制来检测该文件是否已完全上传,然后才能访问它。

# 使用 Java 配置进行配置

Spring 以下引导应用程序展示了如何使用 Java 配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class SftpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                    new SpringApplicationBuilder(SftpJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToSftp(new File("/foo/bar.txt"));
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel")
    public MessageHandler handler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
        handler.setFileNameGenerator(new FileNameGenerator() {

            @Override
            public String generateFileName(Message<?> message) {
                 return "handlerContent.test";
            }

        });
        return handler;
    }

    @MessagingGateway
    public interface MyGateway {

         @Gateway(requestChannel = "toSftpChannel")
         void sendToSftp(File file);

    }
}

# 使用 Java DSL 进行配置

Spring 以下引导应用程序展示了如何使用 Java DSL 配置出站适配器的示例:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow() {
        return IntegrationFlows.from("toSftpChannel")
            .handle(Sftp.outboundAdapter(this.sftpSessionFactory, FileExistsMode.FAIL)
                         .useTemporaryFileName(false)
                         .remoteDirectory("/foo")
            ).get();
    }

}

# SFTP 出站网关

SFTP 出站网关提供了一组有限的命令,允许你与远程 SFTP 服务器进行交互:

  • ls(列表文件)

  • nlst(列出文件名)

  • get(检索文件)

  • mget(检索多个文件)

  • rm(删除文件)

  • mv(移动和重命名文件)

  • put(发送文件)

  • mput(发送多个文件)

# 使用ls命令

ls列出了远程文件并支持以下选项:

  • -1:检索文件名列表。默认值是检索FileInfo对象的列表

  • -a:包含所有文件(包括以“.”开头的文件)

  • -f:不对列表进行排序

  • -dirs:包括目录(默认排除)

  • -links:包括符号链接(默认排除)

  • -R:递归列出远程目录

此外,文件名过滤是以与inbound-channel-adapter相同的方式提供的。

ls操作产生的消息有效负载是文件名列表或FileInfo对象列表(取决于你是否使用-1开关)。这些对象提供诸如修改时间、权限等信息。

ls命令所作用的远程目录在file_remoteDirectory报头中提供。

当使用递归选项(-R)时,fileName包括任意子目录元素,并表示文件的相对路径(相对于远程目录)。如果使用-dirs选项,那么每个递归目录也将作为列表中的一个元素返回。在这种情况下,我们建议你不要使用-1选项,因为你将无法区分文件和目录,这在使用FileInfo对象时是可以做到的。

# 使用nlst命令

版本 5 引入了对nlst命令的支持。

nlst列出远程文件名,并且只支持一个选项:

  • -f:不要对列表进行排序

nlst操作产生的消息有效负载是一个文件名列表。

file_remoteDirectory头保存了执行nlst命令的远程目录。

SFTP 协议不提供列出名称的功能。此命令相当于带有-1选项的ls命令,在此添加此命令是为了方便。

# 使用get命令

get检索远程文件并支持以下选项:

  • -P:保留远程文件的时间戳。

  • -stream:以流的形式检索远程文件。

  • -D:成功传输后删除远程文件。如果忽略传输,则不会删除远程文件,因为FileExistsModeIGNORE,并且本地文件已经存在。

file_remoteDirectory头保存远程目录,file_remoteFile头保存文件名。

get操作产生的消息有效负载是表示检索到的文件的File对象。如果使用-stream选项,则有效负载是InputStream,而不是File。对于文本文件,一个常见的用例是将此操作与文件拆分器流变压器结合起来。当以流的形式使用远程文件时,你要负责在流被使用后关闭Session。为了方便起见,SessioncloseableResource头中提供了IntegrationMessageHeaderAccessor提供了方便的方法:

Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
    closeable.close();
}

框架组件,例如文件拆分器流变压器,在传输数据后自动关闭会话。

下面的示例展示了如何将文件作为流使用:

<int-sftp:outbound-gateway session-factory="ftpSessionFactory"
                            request-channel="inboundGetStream"
                            command="get"
                            command-options="-stream"
                            expression="payload"
                            remote-directory="ftpTarget"
                            reply-channel="stream" />

<int-file:splitter input-channel="stream" output-channel="lines" />
如果在自定义组件中使用输入流,则必须关闭Session
你可以在自定义代码中这样做,也可以将消息的副本路由到service-activator并使用 SPEL,如下例所示:
<int:service-activator input-channel="closeSession"
    expression="headers['closeableResource'].close()" />

# 使用mget命令

mget基于模式检索多个远程文件,并支持以下选项:

  • -P:保留远程文件的时间戳。

  • -R:递归地检索整个目录树。

  • -x:如果没有与模式匹配的文件,则抛出异常(否则,将返回一个空列表)。

  • -D:成功传输后删除每个远程文件。如果忽略传输,则不会删除远程文件,因为FileExistsModeIGNORE,并且本地文件已经存在。

mget操作产生的消息有效负载是一个List<File>对象(即ListFile对象,每个对象代表一个检索到的文件)。

从版本 5.0 开始,如果FileExistsModeIGNORE,则输出消息的有效负载不再包含由于文件已经存在而未被获取的文件。
以前,该数组包含所有文件,包括已经存在的文件。

你使用的表达式确定远程路径应该产生一个以``结尾的结果,例如myfiles/myfiles下获取完整的树。

从版本 5.0 开始,你可以使用递归的MGET,并结合FileExistsMode.REPLACE_IF_MODIFIED模式,在本地定期同步整个远程目录树。此模式将本地文件上一次修改的时间戳设置为远程文件的时间戳,而不考虑-P(保留时间戳)选项。

Notes for when using recursion (-R)

The pattern is ignored and * is assumed.
By default, the entire remote tree is retrieved.
However, you can filter files in the tree by providing a FileListFilter.
You can also filter directories in the tree this way.
A FileListFilter can be provided by reference or by filename-pattern or filename-regex attributes.
For example, filename-regex="(subDir|.*1.TXT)“ retrieves all files ending with 1.TXT in the remote directory and the subdirectory subdir.
However, we describe an alternative available after this note.

If you filter a subdirectory, no additional traversal of that subdirectory is performed.

The -dirs option is not allowed (the recursive mget uses the recursive ls to obtain the directory tree and the directories themselves cannot be included in the list).

Typically, you would use the #remoteDirectory variable in the local-directory-expression 使远程目录结构在本地保留。

持久文件列表过滤器现在有一个布尔属性forRecursion。将此属性设置为true,还设置alwaysAcceptDirectories,这意味着出站网关上的递归操作(lsmget)现在每次都将遍历完整目录树。这是为了解决未检测到目录树中深层更改的问题。此外,forRecursion=true会导致文件的完整路径被用作元数据存储键;这解决了一个问题,即如果同名文件在不同的目录中多次出现,则过滤器无法正常工作。重要提示:这意味着,对于顶层目录下的文件,将找不到持久性元数据存储中的现有密钥。由于这个原因,默认情况下,该属性是false;这可能会在将来的版本中发生变化。

从版本 5.0 开始,你可以将SftpSimplePatternFileListFilterSftpRegexPatternFileListFilter配置为始终通过将alwaysAcceptDirectorties设置为true的目录。这样做允许对简单模式进行递归,如下例所示:

<bean id="starDotTxtFilter"
            class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
    <constructor-arg value="*.txt" />
    <property name="alwaysAcceptDirectories" value="true" />
</bean>

<bean id="dotStarDotTxtFilter"
            class="org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter">
    <constructor-arg value="^.*\.txt$" />
    <property name="alwaysAcceptDirectories" value="true" />
</bean>

你可以通过使用网关上的filter属性来提供这些过滤器之一。

另请参见[出站网关部分成功(mgetmput)]。

# 使用put命令

put将文件发送到远程服务器。消息的有效负载可以是java.io.Filebyte[]String。使用remote-filename-generator(或表达式)为远程文件命名。其他可用的属性包括remote-directorytemporary-remote-directory以及它们的*-expression等价物:use-temporary-file-nameauto-create-directory。有关更多信息,请参见模式文档 (opens new window)

put操作产生的消息有效负载是String,其中包含了传输后服务器上文件的完整路径。

版本 4.3 引入了chmod属性,该属性在上传后更改远程文件权限。你可以使用常规的 UNIX 八进制格式(例如,600仅允许文件所有者进行读写)。当使用 Java 配置适配器时,可以使用setChmod(0600)

# 使用mput命令

mput向服务器发送多个文件,并支持以下选项:

  • -R:递归——发送目录和子目录中的所有文件(可能经过筛选)

消息有效负载必须是表示本地目录的java.io.File(或String)。从版本 5.1 开始,还支持FileString的集合。

支持与[put命令](#sftp-put-command)相同的属性。此外,你还可以使用mput-patternmput-regexmput-filtermput-filter-expression中的一个来过滤本地目录中的文件。只要子目录本身通过筛选器,筛选器就可以使用递归。不通过筛选器的子目录不会被递归。

mput操作产生的消息有效负载是List<String>对象(即由传输产生的远程文件路径的List)。

另请参见[出站网关部分成功(mgetmput)]。

版本 4.3 引入了chmod属性,它允许你在上传后更改远程文件权限。你可以使用常规的 UNIX 八进制格式(例如,600仅允许文件所有者进行读写)。当使用 Java 配置适配器时,可以使用setChmodOctal("600")setChmod(0600)

# 使用rm命令

rm命令没有选项。

如果删除操作成功,则得到的消息有效负载为Boolean.TRUE。否则,消息有效负载为Boolean.FALSEfile_remoteDirectory头保存远程目录,file_remoteFile头保存文件名。

# 使用mv命令

mv命令没有选项。

expression属性定义了“from”路径,rename-expression属性定义了“to”路径。默认情况下,rename-expressionheaders['file_renameTo']。这个表达式不能计算为空或空String。如有必要,将创建所需的任何远程目录。结果消息的有效负载是Boolean.TRUEfile_remoteDirectory报头保存原始远程目录,file_remoteFile报头保存文件名。file_renameTo头保存新路径。

从版本 5.5.6 开始,remoteDirectoryExpression可以在mv命令中使用,以方便使用。如果“from”文件不是完整的文件路径,则将remoteDirectoryExpression的结果用作远程目录。这同样适用于“to”文件,例如,如果任务只是重命名某个目录中的远程文件。

# 附加命令信息

getmget命令支持local-filename-generator-expression属性。它定义了一个 SPEL 表达式,在传输过程中生成本地文件的名称。求值上下文的根对象是请求消息。remoteFileName变量也是可用的。它对于mget特别有用(例如:local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo")。

getmget命令支持local-directory-expression属性。它定义了一个 SPEL 表达式,在传输过程中生成本地目录的名称。求值上下文的根对象是请求消息。remoteDirectory变量也是可用的。它对 MGET 特别有用(例如:local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.myheader")。这个属性与local-directory属性是互斥的。

对于所有命令,网关的“表达式”属性保存命令在其上执行的路径。对于mget命令,表达式可以计算为``,这意味着检索所有文件,somedirectory/,以及以*结尾的其他值。

下面的示例显示了为ls命令配置的网关:

<int-ftp:outbound-gateway id="gateway1"
        session-factory="ftpSessionFactory"
        request-channel="inbound1"
        command="ls"
        command-options="-1"
        expression="payload"
        reply-channel="toSplitter"/>

发送到toSplitter通道的消息的有效负载是String对象的列表,每个对象都包含一个文件的名称。如果省略command-options="-1",则有效负载将是FileInfo对象的列表。你可以以空格分隔的列表的形式提供选项(例如,command-options="-1 -dirs -links")。

从版本 4.2 开始,GETMGETPUTMPUT命令支持FileExistsMode属性(使用名称空间支持时mode)。这会影响当本地文件存在(GETMGET)或远程文件存在(PUTMPUT)时的行为。支持的模式有REPLACEAPPENDFAILIGNORE。对于向后兼容性,PUTMPUT操作的默认模式是REPLACE。对于GETMGET操作,缺省值是FAIL

# 使用 Java 配置进行配置

Spring 以下引导应用程序展示了如何使用 Java 配置出站网关的示例:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new SftpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
    }

}

# 使用 Java DSL 进行配置

Spring 以下引导应用程序展示了如何使用 Java DSL 配置出站网关的示例:

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    @Bean
    public QueueChannelSpec remoteFileOutputChannel() {
        return MessageChannels.queue();
    }

    @Bean
    public IntegrationFlow sftpMGetFlow() {
        return IntegrationFlows.from("sftpMgetInputChannel")
            .handle(Sftp.outboundGateway(sftpSessionFactory(),
                            AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
                    .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
                    .regexFileNameFilter("(subSftpSource|.*1.txt)")
                    .localDirectoryExpression("'myDir/' + #remoteDirectory")
                    .localFilenameExpression("#remoteFileName.replaceFirst('sftpSource', 'localTarget')"))
            .channel("remoteFileOutputChannel")
            .get();
    }

}

# 出站网关部分成功(mgetmput

在对多个文件执行操作时(通过使用mgetmput),在传输一个或多个文件后的一段时间内可能会发生异常。在这种情况下(从版本 4.2 开始),将抛出一个PartialSuccessException。除了通常的MessagingException属性(failedMessagecause)外,此异常还具有两个附加属性:

  • partialResults:成功的转移结果。

  • derivedInput:从请求消息中生成的文件列表(例如要传输的本地文件为mput)。

这些属性允许你确定哪些文件已成功传输,哪些文件未成功传输。

在递归mput的情况下,PartialSuccessException可能嵌套了PartialSuccessException实例。

考虑以下目录结构:

root/
|- file1.txt
|- subdir/
   | - file2.txt
   | - file3.txt
|- zoo.txt

如果异常发生在file3.txt上,则网关抛出的PartialSuccessException具有derivedInputoffile1.txtsubdir,以及zoo.txtpartialResultsoffile1.txt。它的cause是另一个PartialSuccessExceptionderivedInputfile2.txtfile3.txtpartialResultsfile2.txt之<gtr="。

# SFTP/JSCH 日志记录

由于我们使用 JSCH 库来提供 SFTP 支持,因此有时你可能需要从 JSCHAPI 本身获得更多信息,尤其是在某些事情不能正常工作(例如身份验证异常)的情况下。遗憾的是,JSCH 不使用commons-logging,而是依赖于其com.jcraft.jsch.Logger接口的自定义实现。在 Spring Integration2.0.1 中,我们已经实现了这个接口。因此,现在,要启用 JSCH 日志记录,你可以按照通常的方式配置你的日志记录程序。例如,下面的示例是使用 log4j 的记录器的有效配置:

log4j.category.com.jcraft.jsch=DEBUG

# MessagesessionCallback

从 Spring Integration Version4.2 开始,你可以使用带有<int-sftp:outbound-gateway/>SftpOutboundGateway)的MessageSessionCallback<F, T>实现来执行带有requestMessage上下文的Session<LsEntry>上的任何操作。你可以将它用于任何非标准或低级别的 SFTP 操作(或多个),例如允许从集成流定义或功能接口实现注入进行访问。下面的示例使用 lambda:

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler sftpOutboundGateway(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
    return new SftpOutboundGateway(sessionFactory,
         (session, requestMessage) -> session.list(requestMessage.getPayload()));
}

另一个示例可能是对要发送或检索的文件数据进行预处理或后处理。

在使用 XML 配置时,<int-sftp:outbound-gateway/>提供了一个session-callback属性,该属性允许你指定MessageSessionCallback Bean 名称。

session-callbackcommandexpression属性是互斥的。
当使用 Java 进行配置时,SftpOutboundGateway类提供了不同的构造函数。

# Apache Mina SFTP 服务器事件

在版本 5.2 中添加的ApacheMinaSftpEventListener监听某些 Apache Mina SFTP 服务器事件,并将其发布为ApplicationEvents,该事件可以由任何ApplicationListener Bean、@EventListener Bean 方法或事件入站通道适配器方法接收。

目前支持的活动有:

  • SessionOpenedEvent-打开了一个客户端会话

  • DirectoryCreatedEvent-创建了一个目录

  • FileWrittenEvent-一个文件被写入到

  • PathMovedEvent-重命名了一个文件或目录

  • PathRemovedEvent-删除了一个文件或目录

  • SessionClosedEvent-客户端已断开连接

每一个都是ApacheMinaSftpEvent的子类;你可以配置一个侦听器来接收所有的事件类型。每个事件的source属性是一个ServerSession,你可以从它获得诸如客户机地址之类的信息;在抽象事件上提供了一个方便的getSession()方法。

要用侦听器(必须是 Spring Bean)配置服务器,只需将其添加到SftpSubsystemFactory中:

server = SshServer.setUpDefaultServer();
...
SftpSubsystemFactory sftpFactory = new SftpSubsystemFactory();
sftpFactory.addSftpEventListener(apacheMinaSftpEventListenerBean);
...

要使用 Spring 集成事件适配器来使用这些事件:

@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
    ApplicationEventListeningMessageProducer producer =
        new ApplicationEventListeningMessageProducer();
    producer.setEventTypes(ApacheMinaSftpEvent.class);
    producer.setOutputChannel(eventChannel());
    return producer;
}

# 远程文件信息

从版本 5.2 开始,SftpStreamingMessageSourceSFTP 流入站通道适配器)、SftpInboundFileSynchronizingMessageSourceSFTP 入站通道适配器)和“read”---命令的SftpOutboundGatewaySFTP 出站网关)在消息中提供额外的头,以生成有关远程文件的信息:

  • FileHeaders.REMOTE_HOST_PORT-在文件传输操作期间,远程会话已连接到的 host:port pair;

  • FileHeaders.REMOTE_DIRECTORY-已执行操作的远程目录;

  • FileHeaders.REMOTE_FILE-远程文件名;仅适用于单个文件操作。

由于SftpInboundFileSynchronizingMessageSource不会针对远程文件生成消息,而是使用本地副本,因此AbstractInboundFileSynchronizer在同步操作期间以 URI 样式(protocol://host:port/remoteDirectory#remoteFileName)在MetadataStore(可以在外部配置)中存储有关远程文件的信息。当对本地文件进行轮询时,将通过SftpInboundFileSynchronizingMessageSource检索此元数据。当本地文件被删除时,建议删除其元数据条目。AbstractInboundFileSynchronizer为此提供了一个removeRemoteFileMetadata()回调。此外还有一个setMetadataStorePrefix()要在元数据中使用的键。建议将该前缀与基于MetadataStoreFileListFilter实现中使用的前缀不同,当相同的MetadataStore实例在这些组件之间共享时,为了避免条目重写,因为 filter 和AbstractInboundFileSynchronizer都对元数据项使用相同的本地文件名。