使用 Azure AI 搜索推送 API 为任何数据编制索引
REST API 是将数据推送到 Azure AI 搜索索引的最灵活方式。 可以使用任何编程语言或与任何可以将 JSON 请求发布到终结点的应用交互。
在这里,你将了解如何有效地使用 REST API 并探索可用的操作。 然后,你将了解 .NET Core 代码,并了解如何通过 API 优化添加大量数据的过程。
支持的 REST API 操作
AI 搜索提供了两个受支持的 REST API。 搜索和管理 API。 本模块重点介绍搜索 REST API,这些 API 提供对五项搜索功能的操作:
| 功能 / 特点 | 操作 |
|---|---|
| 索引 | 创建、删除、更新和配置。 |
| 文档 | 获取、添加、更新和删除。 |
| 索引 | 在有限的数据源上配置数据源和计划。 |
| 技能组 | 获取、创建、删除、列出和更新。 |
| 同义词映射 | 获取、创建、删除、列出和更新。 |
如何调用搜索 REST API
如果要调用任何搜索 API,需要:
- 使用搜索服务提供的 HTTPS 终结点(通过默认端口 443),必须在 URI 中包含一个 api-version。
- 请求标头必须包含 api-key 属性。
若要查找终结点、api-version 和 api-key,请转到 Azure 门户。
在门户中,导航到搜索服务,然后选择“搜索资源管理器”。 REST API 终结点位于请求 URL 字段中。 URL 的第一部分是终结点(例如 https://azsearchtest.search.windows.net),而查询字符串显示 api-version(例如 api-version=2023-07-01-Preview)。
若要查找左侧的 api-key,请选择“密钥”。 如果使用 REST API 不仅仅是来查询索引,则可以使用主管理密钥或辅助管理密钥。 如果只需要搜索索引,可以创建和使用查询键。
若要在索引中添加、更新或删除数据,需要使用管理密钥。
将数据添加到索引
通过以下格式的索引功能使用 HTTP POST 请求:
POST https://[service name].search.windows.net/indexes/[index name]/docs/index?api-version=[api-version]
请求正文需要让 REST 终结点知道要对文档执行的操作、应用操作的文档以及要使用的数据。
JSON 必须采用以下格式:
{
"value": [
{
"@search.action": "upload (default) | merge | mergeOrUpload | delete",
"key_field_name": "unique_key_of_document", (key/value pair for key field from index schema)
"field_name": field_value (key/value pairs matching index schema)
...
},
...
]
}
| 行动 | 说明 |
|---|---|
| upload | 与 SQL 中的更新插入类似,将创建或替换文档。 |
| 合并 | Merge 使用指定字段更新现有文档。 如果找不到任何文档,merge 将失败。 |
| mergeOrUpload | Merge 使用指定字段更新现有文档,如果文档不存在,则上传该文档。 |
| 删除 | 若要删除整个文档,只需指定 key_field_name。 |
如果请求成功,API 将返回 200 状态代码。
注释
有关所有响应代码和错误消息的完整列表,请参阅添加、更新或删除文档(Azure AI 搜索 REST API)
此示例 JSON 在上一单元中上传客户记录:
{
"value": [
{
"@search.action": "upload",
"id": "5fed1b38309495de1bc4f653",
"firstName": "Sims",
"lastName": "Arnold",
"isAlive": false,
"age": 35,
"address": {
"streetAddress": "Sumner Place",
"city": "Canoochee",
"state": "Palau",
"postalCode": "1558"
},
"phoneNumbers": [
{
"phoneNumber": {
"type": "home",
"number": "+1 (830) 465-2965"
}
},
{
"phoneNumber": {
"type": "home",
"number": "+1 (889) 439-3632"
}
}
]
}
]
}
可以根据需要在值数组中添加任意数量的文档。 但是,为了获得最佳性能,请考虑将请求中的文档批量处理为最多 1,000 个文档,或总大小为 16 MB。
使用 .NET Core 为任何数据编制索引
为了获得最佳性能,请使用最新的 Azure.Search.Document 客户端库(当前版本为 11)。 可以使用 NuGet 安装客户端库:
dotnet add package Azure.Search.Documents --version 11.4.0
索引的执行情况取决于六项关键因素:
- 搜索服务层级以及启用的副本和分区数。
- 索引架构的复杂性。 减少每个字段拥有的属性(可搜索、可分面、可排序)。
- 每批中的文档数,最佳大小将取决于索引架构和文档的大小。
- 方法的多线程程度。
- 处理错误和限制。 使用指数退避重试策略。
- 数据所在的位置,请尝试将数据索引编制为尽可能接近搜索索引。 例如,从 Azure 环境内部运行 upload。
确定最佳批大小
由于确定最佳批大小是提高性能的关键因素,让我们看一下代码中的一种方法。
public static async Task TestBatchSizesAsync(SearchClient searchClient, int min = 100, int max = 1000, int step = 100, int numTries = 3)
{
DataGenerator dg = new DataGenerator();
Console.WriteLine("Batch Size \t Size in MB \t MB / Doc \t Time (ms) \t MB / Second");
for (int numDocs = min; numDocs <= max; numDocs += step)
{
List<TimeSpan> durations = new List<TimeSpan>();
double sizeInMb = 0.0;
for (int x = 0; x < numTries; x++)
{
List<Hotel> hotels = dg.GetHotels(numDocs, "large");
DateTime startTime = DateTime.Now;
await UploadDocumentsAsync(searchClient, hotels).ConfigureAwait(false);
DateTime endTime = DateTime.Now;
durations.Add(endTime - startTime);
sizeInMb = EstimateObjectSize(hotels);
}
var avgDuration = durations.Average(timeSpan => timeSpan.TotalMilliseconds);
var avgDurationInSeconds = avgDuration / 1000;
var mbPerSecond = sizeInMb / avgDurationInSeconds;
Console.WriteLine("{0} \t\t {1} \t\t {2} \t\t {3} \t {4}", numDocs, Math.Round(sizeInMb, 3), Math.Round(sizeInMb / numDocs, 3), Math.Round(avgDuration, 3), Math.Round(mbPerSecond, 3));
// Pausing 2 seconds to let the search service catch its breath
Thread.Sleep(2000);
}
Console.WriteLine();
}
该方法是增加批大小并监视接收有效响应所需的时间。 代码在 100 个文档步骤中从 100 循环到 1000。 对于每个批大小,它会输出文档大小、获得响应的时间以及每 MB 的平均时间。 运行此代码会得到如下结果:
在上面的示例中,吞吐量的最佳批大小是每秒 2.499 MB,每批 800 个文档。
实现指数回退重试策略
如果索引由于过载而开始限制请求,它会以 503(请求因负载过重而被拒绝)或 207(某些文档在批处理中失败)状态作出响应。 必须处理这些响应,退避是个不错的策略。 退避意味着暂停一段时间,然后再重试请求。 如果为每个错误增加此时间,你将以指数方式退避。
查看以下代码:
// Implement exponential backoff
do
{
try
{
attempts++;
result = await searchClient.IndexDocumentsAsync(batch).ConfigureAwait(false);
var failedDocuments = result.Results.Where(r => r.Succeeded != true).ToList();
// handle partial failure
if (failedDocuments.Count > 0)
{
if (attempts == maxRetryAttempts)
{
Console.WriteLine("[MAX RETRIES HIT] - Giving up on the batch starting at {0}", id);
break;
}
else
{
Console.WriteLine("[Batch starting at doc {0} had partial failure]", id);
Console.WriteLine("[Retrying {0} failed documents] \n", failedDocuments.Count);
// creating a batch of failed documents to retry
var failedDocumentKeys = failedDocuments.Select(doc => doc.Key).ToList();
hotels = hotels.Where(h => failedDocumentKeys.Contains(h.HotelId)).ToList();
batch = IndexDocumentsBatch.Upload(hotels);
Task.Delay(delay).Wait();
delay = delay * 2;
continue;
}
}
return result;
}
catch (RequestFailedException ex)
{
Console.WriteLine("[Batch starting at doc {0} failed]", id);
Console.WriteLine("[Retrying entire batch] \n");
if (attempts == maxRetryAttempts)
{
Console.WriteLine("[MAX RETRIES HIT] - Giving up on the batch starting at {0}", id);
break;
}
Task.Delay(delay).Wait();
delay = delay * 2;
}
} while (true);
代码跟踪批处理中失败的文档。 如果发生错误,它会等待延迟,然后将下一个错误的延迟加倍。
最后,有一个最大的重试次数,如果达到这个最大次数,该程序便运作。
使用线程处理来提高性能
可以通过将上述退避策略与线程处理方法相结合来完善你的文档上传应用。 下面是一些示例代码:
public static async Task IndexDataAsync(SearchClient searchClient, List<Hotel> hotels, int batchSize, int numThreads)
{
int numDocs = hotels.Count;
Console.WriteLine("Uploading {0} documents...\n", numDocs.ToString());
DateTime startTime = DateTime.Now;
Console.WriteLine("Started at: {0} \n", startTime);
Console.WriteLine("Creating {0} threads...\n", numThreads);
// Creating a list to hold active tasks
List<Task<IndexDocumentsResult>> uploadTasks = new List<Task<IndexDocumentsResult>>();
for (int i = 0; i < numDocs; i += batchSize)
{
List<Hotel> hotelBatch = hotels.GetRange(i, batchSize);
var task = ExponentialBackoffAsync(searchClient, hotelBatch, i);
uploadTasks.Add(task);
Console.WriteLine("Sending a batch of {0} docs starting with doc {1}...\n", batchSize, i);
// Checking if we've hit the specified number of threads
if (uploadTasks.Count >= numThreads)
{
Task<IndexDocumentsResult> firstTaskFinished = await Task.WhenAny(uploadTasks);
Console.WriteLine("Finished a thread, kicking off another...");
uploadTasks.Remove(firstTaskFinished);
}
}
// waiting for the remaining results to finish
await Task.WhenAll(uploadTasks);
DateTime endTime = DateTime.Now;
TimeSpan runningTime = endTime - startTime;
Console.WriteLine("\nEnded at: {0} \n", endTime);
Console.WriteLine("Upload time total: {0}", runningTime);
double timePerBatch = Math.Round(runningTime.TotalMilliseconds / (numDocs / batchSize), 4);
Console.WriteLine("Upload time per batch: {0} ms", timePerBatch);
double timePerDoc = Math.Round(runningTime.TotalMilliseconds / numDocs, 4);
Console.WriteLine("Upload time per document: {0} ms \n", timePerDoc);
}
此代码对实现退避策略的函数 ExponentialBackoffAsync 使用异步调用。 可以使用线程(例如,处理器的核心数)来调用函数。 使用了最大线程数时,代码将等待任何线程完成。 然后,它会创建一个新线程,直到上传所有文档。

