(安全) 文件传输,唯一的“飞行”... 或者说是“复制”方式

工程 | Josh Long | 2010年8月23日 | ...

实现方法有很多种。当今许多应用依赖于消息传递(AMQP、JMS)来弥合不同系统和数据之间的差距。另一些则依赖于 RPC(通常是 Web 服务或 REST)。然而,对于许多应用程序来说,文件传输是非常重要的方式!有几种常见的支持方式,其中最常见的三种是使用共享挂载或文件夹、使用 FTP 服务器,以及——对于更安全的交换——使用 SSH(或 SFTP)。虽然众所周知 Spring 一直为消息传递(JMS、AMQP)和 RPC 提供了头等支持(远程调用选项太多无法一一列举!),但许多人可能会对 Spring Integration 项目提供的强大文件传输选项感到惊讶。在本文中,我将基于即将发布的 Spring Integration 2.0 框架中一些令人兴奋的支持进行构建,该框架允许你在新文件到达时接入事件,并将文件发送到远程端点,例如 FTP 或 SFTP 服务器,或共享挂载。

我们将使用一对熟悉的 Java 类——一个用于生成出站数据,另一个用于接收入站数据,无论它们是用于 SFTP、FTP 还是普通的旧式文件系统,这都不重要。所有适配器都将 java.io.File 对象作为其入站负载传递,我们可以将 File、String 或 byte[] 发送到远程系统。首先,让我们看看我们的标准客户端。在 Spring Integration 中,响应入站消息执行逻辑的类称为“服务激活器”(service activators)。你只需配置一个 <service-activator> 元素,并告诉它你要使用哪个 bean 来处理 Message。它会遵循几种不同的启发式方法来帮助你确定调度消息到哪个方法。在这里,我们只是显式地进行注解。因此,以下是我们将在整个文章中使用的客户端代码

import org.springframework.integration.annotation.*;
import org.springframework.stereotype.Component;
import java.io.File;
import java.util.Map;

@Component
public class InboundFileProcessor {

    @ServiceActivator
    public void onNewFileArrival(
            @Headers Map&lt;String, Object&gt; headers,
            @Payload File file) {

        System.out.printf("A new file has arrived deposited into " +
                          "the accounting folder at the absolute " +
                          "path %s \n", file.getAbsolutePath());

        System.out.println("The headers are:");
        for (String k : headers.keySet())
            System.out.println(String.format("%s=%s", k, headers.get(k)));

    }
}

并且,这里是我们将用于合成数据,最终将数据存储到文件系统中作为文件的代码

import org.springframework.integration.annotation.Header;
import org.springframework.integration.aop.Publisher;
import org.springframework.integration.file.FileHeaders;
import org.springframework.stereotype.Component;

@Component
public class OutboundFileProducer {

    @Publisher(channel = "outboundFiles")
    public String writeReportToDisk (
             @Header("customerId") long customerId,
             @Header(FileHeaders.FILENAME) String fileName    ) {
        return String.format("this is a message tailor made for customer # %s", customerId);
    }

}

最后一点是 Spring Integration 乃至整个 Spring 中我最喜欢的特性之一:接口透明性。OutboundFileProducer 类定义了一个使用 @Publisher 注解的方法。@Publisher 注解告诉 Spring Integration 将此方法调用的返回值转发到一个通道(这里我们通过注解为其命名 - outboundFiles)。这与你直接注入一个 org.springframework.integration.MessageChannel 实例并直接在该实例上发送 Message 的效果是一样的。不同之处在于,现在这一切都隐藏在一个简洁干净的 POJO 后面!任何人都可以随意注入这个 bean——当他们调用该方法时,返回值会被写入某个地方的一个 File 中 :-) 这是我们的秘密。要激活此特性,我们在 Spring 上下文中安装一个 Spring BeanPostProcessor。Bean 后处理器机制使你能够轻松扫描 Spring 上下文中的 bean,并在适当时增强它们的定义。在这种情况下,我们正在增强带有 @Publisher 注解的 bean。安装 BeanPostProcessor 就像实例化它一样简单

<beans:bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

现在,我可以创建一个注入此 bean 的客户端(或者直接从上下文中访问它),并像使用其他任何服务一样使用它

@Autowired
private OutboundFileProducer outboundFileProducer ; 

 // ... 

outboundFileProducer.writeReportToDisk(1L, "1.txt") ;

最后,在我所有的 Spring 上下文中,我都会启用 <context:component-scan ... />,让 Java 代码来完成大部分工作并处理业务逻辑。我使用 XML 的唯一地方是描述全局集成解决方案的流程和配置。

文件系统

第一个选择——共享挂载——非常普遍。构建此类解决方案的方法越来越多。大多数操作系统都有一个机制,允许你在文件到达时接收通知。Win32 / .NET 为 Windows 提供了钩子,而在 Linux 上有许多机制,例如内核级的 inotify。在 Java 平台上,Java 7 计划在 NIO.2 包中包含一个 WatchService。然而,在此期间,你需要编写代码来执行目录轮询、维护状态,然后分派事件。这听起来不是很令人兴奋,对吗?请注意,我们将讨论的所有适配器都需要某种形式的轮询。轮询运行得足够好,但需要你进行一定的校准。首先,扫描目录时完全有可能捡到仍在写入的文件,除非你适当地屏蔽文件。通常,系统会将文件存放在某个挂载点上,写入完成后再将其重命名,以便它能匹配适配器上的正则表达式屏蔽:这确保了适配器在文件完成写入之前不会“看到”该文件。

在这里,Spring Integration 提供了很大的帮助——让你免于编写所有的目录轮询代码,并让你能够自由地编写对你重要的逻辑。如果你以前使用过 Spring Integration,那么你就知道从外部系统接收事件就像插入一个适配器一样简单,然后让适配器告诉你何时有值得响应的事情。设置很简单:监控一个文件文件夹以查找新文件,当新文件到达并(可选地)匹配某些条件时,Spring Integration 会转发一个 Message,其负载是已添加文件的 java.io.File 引用。

你可以使用 file:inbound-channel-adapter 来达到此目的。该适配器以固定的时间间隔(由 poller 元素配置)监控一个目录,并在检测到新文件时发布一个 Message。让我们看看如何在 Spring Integration 中配置它

<?xml version="1.0" encoding="UTF-8"?>

<beans:beans ... xmlns:file="http://www.springframework.org/schema/integration/file" >
    <context:component-scan base-package="org.springframework.integration.examples.filetransfer.core"/>

    <file:inbound-channel-adapter channel="inboundFiles"
                                  auto-create-directory="true"
                                  filename-pattern=".*?csv"
                                  directory="#{systemProperties['user.home']}/accounting">
        <poller fixed-rate="10000"/>
    </file:inbound-channel-adapter>

    <channel id="inboundFiles"/>

    <service-activator input-channel="inboundFiles" ref="inboundFileProcessor"/>

</beans:beans>

我认为这些选项都相当直观。filename-pattern 是一个正则表达式,将针对目录中的每个文件名进行评估。如果文件名匹配正则表达式,则将被处理。适配器标签内的 poller 元素告诉适配器每隔 10,000 毫秒(即 10 秒)重新检查目录。directory 属性当然允许你指定要监控的目录,而 channel 则描述当适配器发现内容时应将消息转发到哪个命名通道。在此示例中,与所有后续示例一样,我们将让它将消息转发到一个连接到 <service-activator> 元素的命名通道。服务激活器就是你提供的 Java 代码,Spring Integration 会在收到新消息时调用这些代码。你可以在那里做任何想做的事情。

写入文件系统挂载则是完全不同的事情;它更容易!

<?xml version="1.0" encoding="UTF-8"?>

<beans:beans ... xmlns:file="http://www.springframework.org/schema/integration/file" >

    <context:component-scan base-package="org.springframework.integration.examples.filetransfer.core"/>
    <beans:bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

    <channel id="outboundFiles"/>

    <file:outbound-channel-adapter
            channel="outboundFiles"
            auto-create-directory="true"
            directory="#{systemProperties['user.home']}/Desktop/sales"/>

</beans:beans>

在这个例子中,我们描述了一个命名通道和一个出站适配器。回想一下,出站通道是我们之前创建的 Publisher 类中引用的。在所有示例中,当你调用 writeReportToDisk 方法时,它会将一个 Message 放入通道(outboundFiles)中,并且这些消息会一直传输直到到达出站适配器。当你调用 writeReportToDisk 方法时,返回值(一个 String)被用作 Message 的负载,并且用 @Header 元素注解的两个方法参数会作为消息头添加到 Message 中。键为 FileHeaders.FILENAME@Header 用于告诉出站适配器在配置的目录中写入文件时要使用的文件名。如果我们没有指定它,它会根据一个 UUID 为我们合成一个。相当巧妙吧?

FTP(文件传输协议)

FTP 是一种非常常见的文件存储方式。FTP 支持基本认证,所以它不是最安全的协议。它无处不在:所有操作系统都有免费客户端,事实上很多非技术人员也知道如何使用它,这使得它成为在你的系统和客户之间集成和启用文件共享的好方法。要在 Spring Integration 中使用 FTP 适配器,你需要告诉它如何连接到你的 FTP 服务器,并且你需要告诉它在入站场景中你想将文件下载到本地系统的什么位置。

让我们看看如何配置 Spring Integration 以从远程 FTP 服务器接收新文件。

<?xml version="1.0" encoding="UTF-8"?>
<beans  ... xmlns:ftp="http://www.springframework.org/schema/integration/ftp">

    <context:component-scan base-package="org.springframework.integration.examples.filetransfer.core"/>
    <context:property-placeholder location="file://${user.home}/Desktop/ftp.properties" ignore-unresolvable="true"/>

    <ftp:inbound-channel-adapter
            remote-directory="${ftp.remotedir}"
            channel="ftpIn"
            auto-create-directories="true"
            host="${ftp.host}"
            auto-delete-remote-files-on-sync="false"
            username="${ftp.username}" password="${ftp.password}"
            port="2222"
            client-mode="passive-local-data-connection-mode"
            filename-pattern=".*?jpg"
            local-working-directory="#{systemProperties['user.home']}/received_ftp_files"
            >
        <int:poller fixed-rate="10000"/>
    </ftp:inbound-channel-adapter>

    <int:channel id="ftpIn"/>

    <int:service-activator input-channel="ftpIn" ref="inboundFileProcessor"/>

</beans>

你可以看到有很多选项!其中大多数都是可选的——但知道它们存在总是好的。这个适配器将下载符合指定 filename-pattern 的文件,然后将其作为 payload 为 java.io.FileMessage 传递,就像之前一样。这就是为什么我们可以简单地重用之前的 inboundFileProcessor bean。如果你想进一步控制哪些文件被下载,哪些不被下载,可以考虑使用 filename-pattern 指定一个掩码。请注意,这里暴露了相当多的控制选项,包括连接模式的控制以及文件交付后是否删除源文件。

出站适配器看起来与我们为文件支持配置的出站适配器非常相似。执行时,它将整理传入 payload 的内容,然后将这些内容存储在 FTP 服务器上。目前已内置支持对 Stringbyte[]java.io.File 对象进行整理。

<?xml version="1.0" encoding="UTF-8"?>
<beans ... xmlns:ftp="http://www.springframework.org/schema/integration/ftp">

    <context:component-scan base-package="org.springframework.integration.examples.filetransfer.core"/>
    <context:property-placeholder location="file://${user.home}/Desktop/ftp.properties" ignore-unresolvable="true"/>

    <int:channel id="outboundFiles"/>

    <ftp:outbound-channel-adapter
            remote-directory="${ftp.remotedir}"
            channel="outboundFiles"
            host="${ftp.host}"
            username="${ftp.username}" password="${ftp.password}" port="2222"
            client-mode="passive-local-data-connection-mode"
            />
</beans>

与出站文件适配器一样,我们使用 OutboundFileProducer 类生成要存储的内容,因此无需回顾这部分。剩下的就是通道和适配器本身的配置,其中规定了你期望看到的所有内容:服务器配置以及用于存放 payload 的远程目录。

继续....

SSH 文件传输协议(或安全文件传输协议)

最后,我们来到 SFTP 适配器。可以说,这是这三个适配器中配置最复杂的一个,但也是最容易测试的一个。SFTP 通常在任何你有 SSH 访问权限的地方都可以工作,尽管它并非严格限于此。SFTP 不是通过 SSH 的 FTP,而是一个完全不同的协议。它通常比 SCP 更普遍和一致,规范了许多 SCP 留待解释的地方。SFTP 本身是一个相对精简的协议,因为它对其通信的连接做出了许多假设:它假定——除其他外——客户端用户的身份是已知的,通信是通过安全通道进行的,并且已经发生了身份验证。它由设计 SSH2 的同一个工作组设计,并作为 SSH2 子系统工作良好;可以想象你可以在 SSH1 服务器上运行 SFTP。由于 SFTP 在提供认证机制的 SSH 之上工作,它支持相同的认证选项,包括用户名、密码,以及/或者公钥(公钥本身可能可选地带有密码)。如果你运行的是相对较新版本的 OpenSSH(它本身运行在 AIX、HP-UX、Iris、Linux、Cygwin、Mac OSX、Solaris、SNI、Digital Unix/Tru64/OSF、NeXT (!) 、SCO 等操作系统上),那么你很可能已经安装了它并可以继续。换句话说,找到一台支持某种形式的 SFTP 的计算机比找到一台支持你可以挂载的文件系统的计算机更容易。你看,我告诉过你测试很容易!

要开始使用入站适配器,只需复制粘贴 FTP 适配器,将所有出现的 FTP 重命名为 SFTP,酌情更改相关的配置值(端口、主机等),删除 client-mode 选项,然后就完成了!当然还有其他选项——许多其他选项允许你指定你的认证机制;例如公钥或用户名。这是一个熟悉的例子

<?xml version="1.0" encoding="UTF-8"?>
<beans ... xmlns:sftp="http://www.springframework.org/schema/integration/sftp">

    <context:component-scan base-package="org.springframework.integration.examples.filetransfer.core"/>
    <context:property-placeholder location="file://${user.home}/Desktop/sftp.properties" ignore-unresolvable="true"/>

    <sftp:inbound-channel-adapter
            remote-directory="${sftp.remotedir}"
            channel="sftpIn"
            auto-create-directories="true"
            host="${sftp.host}"
            auto-delete-remote-files-on-sync="false"
            username="${sftp.username}"
            password="${sftp.password}"
            filename-pattern=".*?jpg"
            local-working-directory="#{systemProperties['user.home']}/received_sftp_files"
            >
        <int:poller fixed-rate="10000"/>
    </sftp:inbound-channel-adapter>

    <int:channel id="sftpIn"/>

    <int:service-activator input-channel="sftpIn" ref="inboundFileProcessor"/>

</beans>

相当方便吧?规则与之前的例子相同:你的客户端代码将接收到一个 java.io.File 实例,你可以随意处理它。SFTP 出站适配器完善了这组功能

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:sftp="http://www.springframework.org/schema/integration/sftp">

    <context:component-scan base-package="org.springframework.integration.examples.filetransfer.core"/>
    <context:property-placeholder location="file://${user.home}/Desktop/sftp.properties" ignore-unresolvable="true"/>

    <int:channel id="outboundFiles"/>

    <sftp:outbound-channel-adapter
            remote-directory="${sftp.remotedir}"
            channel="outboundFiles"
            host="${sftp.host}"
            username="${sftp.username}"
            password="${sftp.password}"
    />
</beans>

何去何从?

思考一下哪些问题通常是文件导向的,或者本质上是批处理导向的,这很有用。Spring Integration 在通知你世界上有趣的事件(“新文件已放入文件夹!”)和整合数据方面做得非常出色;Spring Integration 是实现事件驱动架构的好方法。然而,一个包含一百万行记录的文件并不是一个事件。Spring Integration 在框架中没有处理大型批量文件负载的内置功能——那是 Spring Batch 的工作。因此,可以考虑一种方法,利用 Spring Integration 检测文件可用性作为作业的起源,然后启动一个 Spring Batch 作业。Spring Batch 没有处理不了的大型作业。Spring Batch 可以帮助你将包含一百万条记录的文件分解成事件大小的记录,以便 Spring Integration 更乐于处理。我喜欢将这两个框架想象成在事件驱动和数据处理的精妙芭蕾中相互交织的舞者!

总结

在本文中,我们讨论了 Spring Integration 中各种文件传输适配器,它们使得使用直接的文件系统挂载、FTP 和 SFTP 进行基于文件的集成变得非常愉快。

订阅 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

超越他人

VMware 提供培训和认证,助你加速进步。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将发生的活动

查看 Spring 社区所有即将发生的活动。

查看全部