你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
Azure IoT 中心支持 OASIS 高级消息队列协议 (AMQP) 版本 1.0 ,通过面向设备和面向服务的终结点提供各种功能。 本文档介绍如何使用 AMQP 客户端连接到 IoT 中心以使用 IoT 中心功能。
服务客户端
连接到 IoT 中心并进行身份验证(服务客户端)
若要使用 AMQP 连接到 IoT 中心,客户端可以使用 基于声明的安全(CBS) 或 简单身份验证和安全层(SASL)身份验证。
服务客户端需要以下信息:
| 信息 | 价值 |
|---|---|
| IoT 中心主机名 | <iot-hub-name>.azure-devices.net |
| 密钥名称 | service |
| 访问密钥 | 与服务关联的主密钥或辅助密钥 |
| 共享访问签名 | 采用以下格式的短期共享访问签名: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} 若要获取生成此签名的代码,请参阅 控制对 IoT 中心的访问。 |
以下代码片段使用 Python 中的 uAMQP 库 通过发送方链接连接到 IoT 中心。
import uamqp
import urllib
import time
# Use generate_sas_token implementation available here:
# https://free.blessedness.top/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)
调用云到设备的消息(服务端客户端)
若要了解服务与 IoT 中心之间以及设备和 IoT 中心之间的云到设备消息交换,请参阅 了解 IoT 中心的云到设备消息传送。 服务客户端使用两个链接来发送消息,并接收以前从设备发送的消息的反馈,如下表所述:
| 创建者 | 链接类型 | 链接路径 | Description |
|---|---|---|---|
| 服务 | 发件人链接 | /messages/devicebound |
服务向设备发送云到设备消息的链接。 通过此链接发送的消息将属性 To 设置为目标设备的接收方链接路径 /devices/<deviceID>/messages/devicebound。 |
| 服务 | 接收器链接 | /messages/serviceBound/feedback |
设备通过此链接向服务发送的完成、拒绝和放弃反馈消息。 有关反馈消息的详细信息,请参阅 从 IoT 中心了解云到设备的消息传送。 |
以下代码片段演示如何使用 Python 中的 uAMQP 库创建云到设备的消息并将其发送到设备。
import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
application_properties=app_props)
# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()
# Close the client if it's not needed
send_client.close()
若要接收反馈,服务客户端将创建接收方链接。 以下代码片段演示如何在 Python 中使用 uAMQP 库创建链接:
import json
operation = '/messages/serviceBound/feedback'
# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
print('received a message')
# Check content_type in message property to identify feedback messages coming from device
if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
msg_body_raw = msg.get_data()
msg_body_str = ''.join(msg_body_raw)
msg_body = json.loads(msg_body_str)
print(json.dumps(msg_body, indent=2))
print('******************')
for feedback in msg_body:
print('feedback received')
print('\tstatusCode: ' + str(feedback['statusCode']))
print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
print('\tdeviceId: ' + str(feedback['deviceId']))
print
else:
print('unknown message:', msg.properties.content_type)
如前面的代码所示,云到设备的反馈消息的内容类型为application/vnd.microsoft.iothub.feedback.json。 可以使用消息 JSON 正文中的属性推断原始消息的传递状态:
反馈正文中的键
statusCode具有以下值之一: Success、 Expired、 DeliveryCountExceeded、 Rejected 或 Purged。反馈正文中的密钥
deviceId具有目标设备的 ID。反馈正文中的密钥
originalMessageId包含服务发送的原始云到设备消息的 ID。 可以使用此传递状态将反馈信息关联到设备云消息。
接收遥测消息(服务客户端)
默认情况下,IoT 中心将引入的设备遥测消息存储在内置事件中心。 服务客户端可以使用 AMQP 协议来接收存储的事件。
为此,服务客户端首先需要连接到 IoT 中心终结点,并接收到内置事件中心的重定向地址。 然后,服务客户端使用提供的地址连接到内置事件中心。
在每个步骤中,客户端需要提供以下信息:
有效的服务凭据(服务共享访问签名令牌)。
格式正确的路径,指向其计划从中检索消息的使用者组分区。 对于给定使用者组和分区 ID,路径具有以下格式:
/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>(默认使用者组为$Default)。一个可选的筛选谓词,用于指定分区中的起点。 此谓词可以采用序列号、偏移量或排队时间戳的形式。
以下代码片段使用 Python 中的 uAMQP 库 来演示前面的步骤:
import json
import uamqp
import urllib
import time
# Use the generate_sas_token implementation that's available here: https://free.blessedness.top/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
consumer_group='$Default', p_id=0)
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
source_uri = uamqp.address.Source(uri)
source_uri.set_filter(endpoint_filter)
return source_uri
receive_client = uamqp.ReceiveClient(
set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
redirect.address, policy_name, access_key)
receive_client = uamqp.ReceiveClient(set_endpoint_filter(
redirect.address, endpoint_filter), auth=sas_auth, debug=True)
# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
print('\t: ' + str(msg.annotations['x-opt-offset']))
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
对于给定的设备 ID,IoT 中心使用设备 ID 的哈希来确定要在其中存储其消息的分区。 前面的代码片段演示了如何从单个此类分区接收事件。 但是,典型的应用程序通常需要检索存储在所有事件中心分区中的事件。
设备客户端
连接到 IoT 中心并进行身份验证(设备客户端)
若要使用 AMQP 连接到 IoT 中心,设备可以使用 基于声明的安全(CBS) 或 简单身份验证和安全层(SASL) 身份验证。
设备客户端需要以下信息:
| 信息 | 价值 |
|---|---|
| IoT 中心主机名 | <iot-hub-name>.azure-devices.net |
| 访问密钥 | 与设备关联的主密钥或辅助密钥 |
| 共享访问签名 | 采用以下格式的短期共享访问签名: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} 若要获取生成此签名的代码,请参阅 使用共享访问签名控制对 IoT 中心的访问权限。 |
以下代码片段使用 Python 中的 uAMQP 库 通过发送方链接连接到 IoT 中心。
import uamqp
import urllib
import uuid
# Use generate_sas_token implementation available here:
# https://free.blessedness.top/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
hostname=hostname, device_id=device_id), access_key, None)
# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)
支持以下链接路径作为设备操作:
| 创建者 | 链接类型 | 链接路径 | Description |
|---|---|---|---|
| 设备 | 接收器链接 | /devices/<deviceID>/messages/devicebound |
每个目的设备接收云到设备消息的链接,这些消息是专门发送给设备的。 |
| 设备 | 发件人链接 | /devices/<deviceID>/messages/events |
从设备发送的设备到云的消息通过此链接发送。 |
| 设备 | 发件人链接 | /messages/serviceBound/feedback |
设备通过此链接发送到服务的云到设备消息反馈。 |
接收云到设备命令(设备客户端)
发送到设备的云到设备命令会到达 /devices/<deviceID>/messages/devicebound 链接。 设备可以批量接收这些消息,并根据需要在消息中使用消息数据有效负载、消息属性、批注或应用程序属性。
以下代码片段使用 Python 中的 uAMQP 库 通过设备接收云到设备的消息。
# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
# Property 'to' is set to: '/devices/device1/messages/devicebound',
print('\tto: ' + str(msg.properties.to))
# Property 'message_id' is set to value provided by the service
print('\tmessage_id: ' + str(msg.properties.message_id))
# Other properties are present if they were provided by the service
print('\tcreation_time: ' + str(msg.properties.creation_time))
print('\tcorrelation_id: ' +
str(msg.properties.correlation_id))
print('\tcontent_type: ' + str(msg.properties.content_type))
print('\treply_to_group_id: ' +
str(msg.properties.reply_to_group_id))
print('\tsubject: ' + str(msg.properties.subject))
print('\tuser_id: ' + str(msg.properties.user_id))
print('\tgroup_sequence: ' +
str(msg.properties.group_sequence))
print('\tcontent_encoding: ' +
str(msg.properties.content_encoding))
print('\treply_to: ' + str(msg.properties.reply_to))
print('\tabsolute_expiry_time: ' +
str(msg.properties.absolute_expiry_time))
print('\tgroup_id: ' + str(msg.properties.group_id))
# Message sequence number in the built-in event hub
print('\tx-opt-sequence-number: ' +
str(msg.annotations['x-opt-sequence-number']))
发送遥测消息(设备客户端)
还可以使用 AMQP 从设备发送遥测消息。 设备可以选择提供应用程序属性字典或各种消息属性,例如消息 ID。
若要基于消息正文路由消息,必须将 content_type 属性设置为 application/json;charset=utf-8。 若要详细了解如何基于消息属性或消息正文路由消息,请参阅 IoT 中心消息路由查询语法 文档。
以下代码片段使用 Python 中的 uAMQP 库 从设备发送设备到云的消息。
# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
send_client = uamqp.SendClient(uri, debug=True)
# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None
# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }
# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)
send_client.queue_message(message)
results = send_client.send_all_messages()
for result in results:
if result == uamqp.constants.MessageState.SendFailed:
print result
其他说明
AMQP 连接可能因网络故障或身份验证令牌过期(在代码中生成)而中断。 服务客户端必须处理这些情况,并根据需要重新建立连接和链接。 如果身份验证令牌过期,客户端可以通过在令牌过期前主动续订令牌来避免连接断开。
客户端有时必须能够正确处理链接重定向。 若要了解此类操作,请参阅 AMQP 客户端文档。
后续步骤
若要了解有关 AMQP 协议的详细信息,请参阅 AMQP v1.0 规范。
若要了解有关 IoT 中心消息传送的详细信息,请参阅: