可以将 SQL 适配器配置为接收 SQL Server 表或视图的定期数据更改消息。 可以指定让适配器执行的轮询语句,以对数据库进行轮询。 轮询语句可以是 SELECT 语句或返回结果集的存储过程。
有关适配器如何支持轮询的详细信息,请参阅 对使用轮询的入站呼叫的支持。
重要
如果要在单个应用程序中执行多个轮询作,则必须将 InboundID 连接属性指定为连接 URI 的一部分,使其唯一。 指定的入站 ID 将添加到操作命名空间,使其唯一。
本主题如何演示轮询
在本主题中,为演示 SQL 适配器如何支持接收数据更改消息,请创建一个用于轮询操作的 .NET 应用程序。 对于本主题,请将 PolledDataAvailableStatement 指定为:
SELECT COUNT(*) FROM Employee  
PolledDataAvailableStatement 必须返回包含正值的第一个单元格的结果集。 如果第一个单元格不包含正值,适配器不会执行轮询语句。
作为轮询语句的一部分,执行以下操作:
- 从 Employee 表中选择所有行。 
- 执行存储过程(MOVE_EMP_DATA),将所有记录从 Employee 表移动到 EmployeeHistory 表。 
- 执行存储过程(ADD_EMP_DETAILS),将新记录添加到 Employee 表。 此过程采用员工名称、指定和工资作为参数。 - 若要执行这些作,必须为 PollingStatement 绑定属性指定以下内容: 
SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000   
执行查询语句后,将选择 Employee 表中的所有记录,并接收来自 SQL Server 的消息。 适配器执行MOVE_EMP_DATA存储过程后,所有记录都会移动到 EmployeeHistory 表。 然后,执行ADD_EMP_DETAILS存储过程以向 Employee 表添加新记录。 下一次轮询执行将仅返回单个记录。 此周期将继续,直到关闭通道侦听器。
使用 SQL 适配器的绑定属性来配置轮询查询
下表汇总了用于配置适配器以接收数据更改消息的 SQL 适配器绑定属性。 必须将这些绑定属性指定为用于轮询的 .NET 应用程序的一部分。
| Binding 属性 | DESCRIPTION | 
|---|---|
| InboundOperationType | 指定是否要执行 轮询、类型轮询 还是 通知 入站作业。 默认值为 轮询。 | 
| 数据可用性声明 | 指定适配器执行的 SQL 语句,以确定是否有任何数据可用于轮询。 SQL 语句必须返回由行和列组成的结果集。 仅当某行可用时,才会执行 为 PollingStatement 绑定属性指定的 SQL 语句。 | 
| PollingIntervalInSeconds | 指定 SQL 适配器执行 为 PolledDataAvailableStatement 绑定属性指定的语句的间隔(以秒为单位)。 默认值为 30 秒。 轮询间隔定义了连续轮询之间的时间间隔。 如果语句在指定间隔内执行,适配器将等待间隔中的剩余时间。 | 
| PollingStatement | 指定要轮询 SQL Server 数据库表的 SQL 语句。 可以为轮询语句指定简单的 SELECT 语句或存储过程。 默认值为 null。 必须为 PollingStatement 指定一个值才能启用轮询。 仅当有数据可用于轮询时,才会执行轮询语句,该数据由 PolledDataAvailableStatement 绑定属性确定。 可以指定用分号分隔的任意数量的 SQL 语句。 | 
| PollWhileDataFound | 指定 SQL 适配器是否忽略轮询间隔并连续执行 为 PolledDataAvailableStatement 绑定属性指定的 SQL 语句(如果正在轮询的表中的数据可用)。 如果表中没有可用数据,适配器将还原为按指定的轮询间隔执行 SQL 语句。 默认值为 false。 | 
有关这些属性的更完整说明,请参阅 有关适用于 SQL Server 适配器绑定属性的 BizTalk 适配器的信息。 有关如何使用 SQL 适配器轮询 SQL Server 的完整说明,请阅读本主题的其余部分。
使用轮询请求消息
适配器调用代码中的轮询操作以轮询 SQL Server 数据库。 也就是说,适配器发送了一个轮询请求消息,该消息是通过 IInputChannel 通道接收到的。 轮询请求消息包含 PollingStatement 绑定属性指定的查询的结果集。 可以通过以下两种方式之一处理轮询消息:
- 若要使用节点值流式处理使用消息,必须在响应消息上调用 WriteBodyContents 方法,并向其传递实现节点值流式处理的 XmlDictionaryWriter 。 
- 若要使用节点流式处理来使用消息,可以在响应消息上调用 GetReaderAtBodyContents 以获取 XmlReader。 
关于本主题中使用的示例
本主题中的示例查询 Employee 表。 该示例还使用MOVE_EMP_DATA和ADD_EMP_DETAILS存储过程。 提供的示例中附带了一个脚本,用于生成这些构件。 有关示例的详细信息,请参阅 SQL 适配器的示例。 提供了一个基于本主题的示例 Polling_ChannelModel,还包括在 SQL 适配器示例中。
使用 WCF 通道模型接收用于轮询操作的入站消息
本部分提供有关如何编写 .NET 应用程序(通道模型)以使用 SQL 适配器接收入站轮询消息的说明。
从 SQL 适配器接收查询消息
- 在 Visual Studio 中创建Microsoft Visual C# 项目。 对于本主题,请创建控制台应用程序。 
- 在解决方案资源管理器中,添加对 - Microsoft.Adapters.Sql、- Microsoft.ServiceModel.Channels、- System.ServiceModel和- System.Runtime.Serialization的引用。
- 打开Program.cs文件并添加以下命名空间: - Microsoft.Adapters.Sql
- System.ServiceModel
- System.ServiceModel.Description
- System.ServiceModel.Channels
- System.Xml
 
- 指定连接 URI。 有关适配器连接 URI 的详细信息,请参阅 创建 SQL Server 连接 URI。 - Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");
- 创建 SqlAdapterBinding 的实例,并设置配置轮询所需的绑定属性。 至少必须设置 InboundOperationType、 PolledDataAvailableStatement 和 PollingStatement 绑定属性。 有关用于配置轮询的绑定属性的详细信息,请参阅 支持使用轮询的入站呼叫。 - SqlAdapterBinding binding = new SqlAdapterBinding(); binding.InboundOperationType = InboundOperation.Polling; binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE"; binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";
- 创建绑定参数集合并设置凭据。 - ClientCredentials credentials = new ClientCredentials(); credentials.UserName.UserName = "<Enter user name here>"; credentials.UserName.Password = "<Enter password here>"; BindingParameterCollection bindingParams = new BindingParameterCollection(); bindingParams.Add(credentials);
- 创建通道侦听器并打开它。 通过在 SqlAdapterBinding 上调用 BuildChannelListener<IInputChannel> 方法来创建侦听器。 - IChannelListener<IInputChannel> listener = binding.BuildChannelListener<IInputChannel>(connectionUri, bindingParams); listener.Open();
- 通过在侦听器上调用 AcceptChannel 方法并打开它来获取 IInputChannel 通道。 - IInputChannel channel = listener.AcceptChannel(); channel.Open();
- 调用通道上的 Receive ,从适配器获取下一条 POLLINGSTMT 消息。 - Message message = channel.Receive();
- 使用 POLLINGSTMT 操作返回的结果集。 可以使用 XmlReader 或 XmlDictionaryWriter 来处理消息。 - XmlReader reader = message.GetReaderAtBodyContents();
- 完成处理请求后关闭通道。 - channel.Close()- 重要 - 在处理完 POLLINGSTMT 操作后,必须关闭通道。 未能关闭通道可能会影响代码的行为。 
- 在接收完数据更改消息后,关闭侦听器。 - listener.Close()- 重要 - 关闭侦听器不会关闭使用侦听器创建的通道。 必须显式关闭使用侦听器创建的每个通道。 
示例:
以下示例演示执行 Employee 表的轮询查询。 轮询语句执行以下任务:
- 从 Employee 表中选择所有记录。 
- 执行MOVE_EMP_DATA存储过程,将所有记录从 Employee 表移动到 EmployeeHistory 表。 
- 执行ADD_EMP_DETAILS存储过程,将单个记录添加到 Employee 表。 - 轮询消息保存在 - C:\PollingOutput.xml.
using System;  
using Microsoft.Adapters.Sql;  
using System.ServiceModel;  
using System.ServiceModel.Description;  
using System.ServiceModel.Channels;  
using System.Xml;  
namespace ConsoleApplication1  
{  
    class Program  
    {  
        static void Main(string[] args)  
        {  
            Console.WriteLine("Sample started. This sample will poll 5 times and will perform the following tasks:");  
            Console.WriteLine("Press any key to start polling...");  
            Console.ReadLine();  
            IChannelListener<IInputChannel> listener = null;  
            IInputChannel channel = null;  
            try  
            {  
                TimeSpan messageTimeout = new TimeSpan(0, 0, 30);  
                SqlAdapterBinding binding = new SqlAdapterBinding();  
                binding.InboundOperationType = InboundOperation.Polling;  
                binding.PolledDataAvailableStatement = "SELECT COUNT (*) FROM EMPLOYEE";  
                binding.PollingStatement = "SELECT * FROM Employee;EXEC MOVE_EMP_DATA;EXEC ADD_EMP_DETAILS John, Tester, 100000";  
                Uri ConnectionUri = new Uri("mssql://mysqlserver//mydatabase?");  
                ClientCredentials credentials = new ClientCredentials();  
                credentials.UserName.UserName = "<Enter user name here>";  
                credentials.UserName.Password = "<Enter password here>";  
                BindingParameterCollection bindingParams = new BindingParameterCollection();  
                bindingParams.Add(credentials);  
                listener = binding.BuildChannelListener<IInputChannel>(ConnectionUri, bindingParams);  
                listener.Open();  
                channel = listener.AcceptChannel();  
                channel.Open();  
                Console.WriteLine("Channel and Listener opened...");  
                Console.WriteLine("\nWaiting for polled data...");  
                Console.WriteLine("Receive request timeout is {0}", messageTimeout);  
                // Poll five times with the specified message timeout   
                // If a timeout occurs polling will be aborted  
                for (int i = 0; i < 5; i++)  
                {  
                    Console.WriteLine("Polling: " + i);  
                    Message message = null;  
                    XmlReader reader = null;  
                    try  
                    {  
                        //Message is received so process the results  
                        message = channel.Receive(messageTimeout);  
                    }  
                    catch (System.TimeoutException toEx)  
                    {  
                        Console.WriteLine("\nNo data for request number {0}: {1}", i + 1, toEx.Message);  
                        continue;  
                    }  
                    // Get the query results using an XML reader  
                    try  
                    {  
                        reader = message.GetReaderAtBodyContents();  
                    }  
                    catch (Exception ex)  
                    {  
                        Console.WriteLine("Exception :" + ex);  
                        throw;  
                    }  
                    XmlDocument doc = new XmlDocument();  
                    doc.Load(reader);  
                    using (XmlWriter writer = XmlWriter.Create("C:\\PollingOutput.xml"))  
                    {  
                        doc.WriteTo(writer);  
                        Console.WriteLine("The polling response is saved at 'C:\\PollingOutput.xml'");  
                    }  
                    // return the cursor  
                    Console.WriteLine();  
                    // close the reader  
                    reader.Close();  
                    message.Close();  
                }  
                Console.WriteLine("\nPolling done -- hit <RETURN> to finish");  
                Console.ReadLine();  
            }  
            catch (Exception ex)  
            {  
                Console.WriteLine("Exception is: " + ex.Message);  
                if (ex.InnerException != null)  
                {  
                    Console.WriteLine("Inner Exception is: " + ex.InnerException.Message);  
                }  
            }  
            finally  
            {  
                // IMPORTANT: close the channel and listener to stop polling  
                if (channel != null)  
                {  
                    if (channel.State == CommunicationState.Opened)  
                        channel.Close();  
                    else  
                        channel.Abort();  
                }  
                if (listener != null)  
                {  
                    if (listener.State == CommunicationState.Opened)  
                        listener.Close();  
                    else  
                        listener.Abort();  
                }  
            }  
        }  
    }  
}