U kunt het pull-model van de wijzigingenfeed gebruiken om de Azure Cosmos DB-wijzigingenfeed in uw eigen tempo te gebruiken. Net als bij de verwerker van de wijzigingenfeed kunt u het pull-model van de wijzigingenfeed gebruiken om de verwerking van wijzigingen voor meerdere gebruikers van wijzigingenfeeds te parallelliseren.
Je kunt vervolgtokens echter niet naar een lease converteren en andersom.
Hier volgen enkele belangrijke verschillen tussen de wijzigingenfeedprocessor en het pull-model van de wijzigingenfeed:
Als u de wijzigingenfeed wilt verwerken met behulp van het pull-model, maakt u een exemplaar van FeedIterator. Wanneer u in eerste instantie maakt FeedIterator, moet u een vereiste ChangeFeedStartFrom waarde opgeven, die bestaat uit zowel de beginpositie voor het lezen van wijzigingen als de waarde waarvoor u wilt gebruiken FeedRange. Het FeedRange is een bereik van partitiesleutelwaarden en specificeert de items die kunnen worden gelezen uit de veranderingsfeed met behulp van die specifieke FeedIterator. U moet ook een vereiste ChangeFeedMode waarde opgeven voor de modus waarin u wijzigingen wilt verwerken: de nieuwste versie of alle versies en verwijderingen. Gebruik ChangeFeedMode.LatestVersion of ChangeFeedMode.AllVersionsAndDeletes om aan te geven welke modus u wilt gebruiken om de wijzigingenfeed te lezen. Wanneer u de modus Alle versies en Verwijderen gebruikt, moet u een startwaarde voor de wijzigingsfeed selecteren, ofwel Now() of een specifiek vervolgtoken.
U kunt desgewenst ChangeFeedRequestOptions opgeven om een PageSizeHint in te stellen. Wanneer deze eigenschap is ingesteld, wordt het maximum aantal ontvangen items per pagina ingesteld. Als bewerkingen in de bewaakte verzameling worden uitgevoerd via opgeslagen procedures, blijft het transactiebereik behouden bij het lezen van items uit de wijzigingenfeed. Als gevolg hiervan kan het aantal ontvangen items hoger zijn dan de opgegeven waarde, zodat de items die door dezelfde transactie zijn gewijzigd, worden geretourneerd als onderdeel van één atomische batch.
Hier volgt een voorbeeld van hoe u FeedIterator kunt verkrijgen in de meest recente versieversiemodus die entiteitsobjecten teruggeeft, in dit geval een User object.
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Advies
Voor versies ouder dan 3.34.0, kan de meest recente versiemodus worden gebruikt door de instelling in te stellen ChangeFeedMode.Incremental. Zowel Incremental als LatestVersion verwijzen naar de nieuwste versiemodus van de wijzigingenfeed, en toepassingen die beide modus gebruiken, zien hetzelfde gedrag.
Alle versies en verwijderingsmodus is in preview en kan worden gebruikt met preview .NET SDK-versies >= 3.32.0-preview. Hier volgt een voorbeeld voor het verkrijgen van FeedIterator in alle versies en de verwijdermodus die User objecten teruggeeft.
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
De wijzigingenfeed via streams gebruiken
FeedIterator Voor beide modi van de wijzigingsfeeds zijn twee opties beschikbaar. Naast de voorbeelden die entiteitsobjecten retourneren, kunt u ook het antwoord verkrijgen met Stream ondersteuning. Met streams kunt u gegevens lezen zonder deze eerst gedeserialiseerd te hebben, zodat u op clientresources bespaart.
Hier volgt een voorbeeld van hoe u de meest recente versiemodus kunt verkrijgen FeedIterator die retourneert Stream:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
De wijzigingen voor een hele container verwerken
Als u geen FeedRange-parameter aan FeedIterator opgeeft, kunt u de wijzigingenfeed van een hele container in uw eigen tempo verwerken. Hier volgt een voorbeeld, dat begint met het lezen van alle wijzigingen, beginnend bij het huidige tijdstip met behulp van de nieuwste versiemodus:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Omdat de wijzigingenfeed effectief een oneindige lijst is met items die alle toekomstige schrijf- en updates omvatten, is de waarde altijd HasMoreResultstrue. Wanneer u de wijzigingenfeed probeert te lezen en er geen nieuwe wijzigingen beschikbaar zijn, ontvangt u een antwoord met NotModified de status. Dit verschilt van het ontvangen van een antwoord zonder wijzigingen en OK status. Het is mogelijk om lege reacties op wijzigingenfeeds te krijgen terwijl er meer wijzigingen beschikbaar zijn en u moet doorgaan met peilen totdat u deze ontvangt NotModified. In het voorgaande voorbeeld wordt NotModified verwerkt door vijf seconden te wachten voordat je opnieuw controleert op wijzigingen.
De wijzigingen voor een partitiesleutel gebruiken
In sommige gevallen wilt u mogelijk alleen de wijzigingen voor een specifieke partitiesleutel verwerken. U kunt een specifieke partitiesleutel verkrijgen FeedIterator en de wijzigingen op dezelfde manier verwerken als voor een hele container.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
FeedRange gebruiken voor parallelle uitvoering
In de change feed-verwerker wordt werk automatisch verspreid over meerdere gebruikers. In het wijzigingenfeed-pullmodel kunt u de FeedRange gebruiken om de verwerking van de wijzigingenfeed te parallelliseren. A FeedRange vertegenwoordigt een bereik van partitiesleutelwaarden.
Hier is een voorbeeld dat laat zien hoe u een lijst met intervallen voor uw container kunt opvragen.
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Wanneer u een lijst FeedRange met waarden voor uw container krijgt, krijgt u er één FeedRange per fysieke partitie.
Met behulp van een FeedRangekunt u een FeedIterator maken om de verwerking van de wijzigingenfeed te parallelliseren op meerdere computers of threads. In tegenstelling tot het vorige voorbeeld dat laat zien hoe u een FeedIterator voor de hele container of één partitiesleutel kunt verkrijgen, kunt u FeedRanges gebruiken om meerdere FeedIterators te verkrijgen, waarmee de wijzigingenfeed parallel kan worden verwerkt.
In het geval dat u FeedRanges wilt gebruiken, moet u een orchestratorproces hebben dat FeedRanges verkrijgt en distribueert naar die machines. Deze distributie kan het volgende zijn:
- Het gebruik van
FeedRange.ToJsonString en de distributie van deze tekenreekswaarde. De consumenten kunnen deze waarde gebruiken met FeedRange.FromJsonString.
- Als de distributie nog aan de gang is, geef dan de
FeedRange objectverwijzing door.
Hier volgt een voorbeeld waarin wordt uitgelegd hoe u kunt lezen vanaf het begin van de wijzigingenfeed van de container met behulp van twee hypothetische afzonderlijke machines die parallel lezen.
Machine 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Machine 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Vervolgtokens opslaan
U kunt de positie van uw FeedIterator bestand opslaan door het vervolgtoken te verkrijgen. Een vervolgtoken is een tekenreekswaarde die de laatst verwerkte wijzigingen van uw FeedIterator bijhoudt en de FeedIterator laatste verwerkte wijzigingen later kan hervatten. Het vervolgtoken, indien opgegeven, heeft voorrang op de begintijd en begint vanaf beginwaarden. De volgende code leest de wijzigingenfeed door sinds het maken van de container. Nadat er geen wijzigingen meer beschikbaar zijn, wordt een vervolgtoken opgeslagen, zodat het verbruik van de wijzigingenfeed later kan worden hervat.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Wanneer u de nieuwste versiemodus gebruikt, verloopt het FeedIterator vervolgtoken nooit zolang de Azure Cosmos DB-container nog steeds bestaat. Wanneer u alle versies gebruikt en de modus verwijdert, is het FeedIterator vervolgtoken geldig zolang de wijzigingen in het bewaarvenster voor continue back-ups zijn opgetreden.
Als u de wijzigingenfeed wilt verwerken met behulp van het pull-model, maakt u een exemplaar van Iterator<FeedResponse<JsonNode>> responseIterator. Wanneer u CosmosChangeFeedRequestOptions maakt, moet u specificeren waar u moet beginnen met het lezen van de wijzigingenfeed en de FeedRange parameter doorgeven die u wilt gebruiken. Het FeedRange is een bereik van partitiesleutelwaarden waarmee de items worden opgegeven die uit de wijzigingenfeed kunnen worden gelezen.
Als u de wijzigingenfeed wilt lezen in alle versies en de modus Voor verwijderen, moet u ook opgeven allVersionsAndDeletes() wanneer u de CosmosChangeFeedRequestOptionsfeed maakt. Alle versies en verwijderingsmodus bieden geen ondersteuning voor het verwerken van de wijzigingenfeed vanaf het begin of vanaf een bepaald tijdstip. U moet wijzigingen van nu of van een vervolgtoken verwerken. Alle versies en verwijdermodus is in preview en is beschikbaar in Java SDK-versie >= 4.42.0.
De wijzigingen voor een hele container verwerken
Als u opgeeft FeedRange.forFullRange(), kunt u de wijzigingenfeed voor een hele container in uw eigen tempo verwerken. U kunt desgewenst een waarde opgeven in byPage(). Wanneer deze eigenschap is ingesteld, wordt het maximum aantal ontvangen items per pagina ingesteld.
Hier volgt een voorbeeld van het verkrijgen van een responseIterator waarde in de meest recente versiemodus:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Hier is een voorbeeld van hoe je een responseIterator kunt krijgen in alle versies en verwijdermodus:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Vervolgens kunnen we de resultaten herhalen. Omdat de wijzigingenfeed effectief een oneindige lijst met items is die alle toekomstige schrijf- en updates omvat, is de waarde altijd responseIterator.hasNext()true. Hier volgt een voorbeeld in de nieuwste versiemodus, waarin alle wijzigingen worden gelezen, beginnend vanaf het begin. Elke iteratie blijft een vervolgtoken behouden nadat alle gebeurtenissen zijn verwerkt. Het gaat verder vanaf het laatst verwerkte punt in de wijzigingenstroom en wordt verwerkt met behulp van createForProcessingFromContinuation:
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
De wijzigingen van een partitiesleutel gebruiken
In sommige gevallen wilt u mogelijk alleen de wijzigingen voor een specifieke partitiesleutel verwerken. U kunt de wijzigingen voor een specifieke partitiesleutel op dezelfde manier verwerken als voor een hele container. Hier volgt een voorbeeld waarin de meest recente versiemodus wordt gebruikt:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
FeedRange gebruiken voor parallelle uitvoering
In de change feed-verwerker wordt werk automatisch verspreid over meerdere gebruikers. In het wijzigingenfeed-pullmodel kunt u de FeedRange gebruiken om de verwerking van de wijzigingenfeed te parallelliseren. A FeedRange vertegenwoordigt een bereik van partitiesleutelwaarden.
Hier volgt een voorbeeld waarin de meest recente versiemodus wordt gebruikt, waarin wordt getoond hoe u een lijst met bereiken voor uw container kunt ophalen:
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
Wanneer u een lijst met FeedRanges voor uw container ophaalt, krijgt u er één FeedRange per fysieke partitie.
Met behulp van een FeedRangekunt u de verwerking van de wijzigingenfeed parallelliseren op meerdere computers of threads. In tegenstelling tot het vorige voorbeeld waarin u hebt gezien hoe u wijzigingen voor de hele container of één partitiesleutel verwerkt, kunt u FeedRanges gebruiken om de wijzigingenfeed parallel te verwerken.
In het geval dat u FeedRanges wilt gebruiken, moet u een orchestratorproces hebben dat FeedRanges verkrijgt en distribueert naar die machines. Deze distributie kan het volgende zijn:
- Het gebruik van
FeedRange.toString() en de distributie van deze tekenreekswaarde.
- Als de distributie nog aan de gang is, geef dan de
FeedRange objectverwijzing door.
Hier volgt een voorbeeld waarin de meest recente versiemodus wordt gebruikt. Het laat zien hoe u vanaf het begin van de wijzigingenfeed van de container kunt lezen met behulp van twee hypothetische afzonderlijke machines die parallel worden gelezen:
Machine 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Machine 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Als u de wijzigingenfeed wilt verwerken met behulp van het pull-model, maakt u een exemplaar van responseIterator met het type ItemPaged[Dict[str, Any]].
Wanneer u de change feed-API aanroept, moet u opgeven waar u moet beginnen met het lezen van de wijzigingenfeed en de feed_range parameter doorgeven die u wilt gebruiken.
Het feed_range is een bereik van partitiesleutelwaarden waarmee de items worden opgegeven die uit de wijzigingenfeed kunnen worden gelezen.
U kunt ook de parameter opgeven mode voor de wijzigingsfeedmodus waarin u wijzigingen wilt verwerken: LatestVersion of AllVersionsAndDeletes. De standaardwaarde is LatestVersion.
Gebruik LatestVersion of AllVersionsAndDeletes om aan te geven welke modus u wilt gebruiken om de wijzigingenfeed te lezen.
Wanneer u de AllVersionsAndDeletes modus gebruikt, kunt u beginnen met het verwerken van wijzigingen vanaf nu of vanaf een continuation token.
Het lezen van de wijzigingenfeed vanaf het begin of vanaf een bepaald tijdstip wordt start_time niet ondersteund.
De wijzigingen voor een hele container verwerken
Als u geen parameter opgeeft feed_range , kunt u de wijzigingenfeed van een hele container in uw eigen tempo verwerken.
Hier is een voorbeeld van hoe je responseIterator kunt verkrijgen in de LatestVersion-modus vanuit Beginning. Omdat LatestVersion dit een standaardmodus is, mode hoeft de parameter niet te worden doorgegeven:
responseIterator = container.query_items_change_feed(start_time="Beginning")
Hier volgt een voorbeeld van hoe u in responseIterator de modus kunt verkrijgen AllVersionsAndDeletes van Now, Omdat Now dit een standaardwaarde van start_time de parameter is, hoeft deze niet te worden doorgegeven:
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
Vervolgens kunnen we de resultaten herhalen. Omdat de wijzigingenfeed in feite een oneindige lijst met items is die alle toekomstige schrijf- en updates omvat, responseIterator kan deze oneindig worden herhaald.
Hier volgt een voorbeeld in de nieuwste versiemodus, waarin alle wijzigingen worden gelezen, beginnend vanaf het begin.
Bij elke iteratie worden wijzigingenfeeds voor documenten afgedrukt.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
De wijzigingen van een partitiesleutel gebruiken
In sommige gevallen wilt u mogelijk alleen de wijzigingen voor een specifieke partitiesleutel verwerken.
U kunt de wijzigingen op dezelfde manier verwerken als voor een hele container met de partition_key parameter.
Hier volgt een voorbeeld waarin de modus wordt gebruikt LatestVersion :
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
FeedRange gebruiken voor parallelle uitvoering
In het wijzigingenfeed-pullmodel kunt u de feed_range gebruiken om de verwerking van de wijzigingenfeed te parallelliseren.
A feed_range vertegenwoordigt een bereik van partitiesleutelwaarden.
Hier is een voorbeeld dat laat zien hoe u een lijst met reeksen voor uw container kunt verkrijgen.
list met de opdracht wordt iterator geconverteerd naar een lijst:
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
Wanneer u een lijst feed_range met waarden voor uw container krijgt, krijgt u er één feed_range per fysieke partitie.
Met behulp van een feed_range kunt u een iterator creëren om het verwerken van de wijzigingenfeed op meerdere computers of threads te parallelliseren.
In tegenstelling tot het vorige voorbeeld dat laat zien hoe u een responseIterator voor de hele container of één partitiesleutel kunt verkrijgen, kunt u meerdere feed_range iterators verkrijgen, waarmee de wijzigingenfeed parallel kan worden verwerkt.
Hier volgt een voorbeeld waarin wordt uitgelegd hoe u kunt lezen vanaf het begin van de wijzigingenfeed van de container met behulp van twee hypothetische afzonderlijke machines die parallel lezen.
Machine 1:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
Machine 2:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
Vervolgtokens opslaan
U kunt de positie van uw iterator opslaan door het vervolgtoken te verkrijgen.
Een vervolgtoken is een tekenreeks die uw responseIterator laatst verwerkte wijzigingen bijhoudt en stelt de iterator in staat om later op dit punt verder te gaan.
Het vervolgtoken, indien opgegeven, heeft voorrang op de begintijd en begint vanaf beginwaarden.
De volgende code leest de wijzigingenfeed door sinds het maken van de container.
Nadat er geen wijzigingen meer beschikbaar zijn, wordt een vervolgtoken opgeslagen, zodat het verbruik van de wijzigingenfeed later kan worden hervat.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
Notitie
Aangezien het continuation token de eerder gebruikte mode parameter bevat, wordt de continuation parameter genegeerd als mode is gebruikt, en wordt in plaats daarvan het mode van het continuation token gebruikt.
Hier volgt een voorbeeld waarin wordt getoond hoe u de wijzigingenfeed van de container kunt lezen met behulp van een continuation token:
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
Als u de wijzigingenfeed wilt verwerken met behulp van het pull-model, maakt u een exemplaar van ChangeFeedPullModelIterator. Wanneer u in eerste instantie maakt ChangeFeedPullModelIterator, moet u een vereiste changeFeedStartFrom waarde opgeven binnen de ChangeFeedIteratorOptions, die bestaat uit zowel de beginpositie voor het lezen van wijzigingen als de resource (een partitiesleutel of een FeedRange) waarvoor wijzigingen moeten worden opgehaald.
Notitie
Als er geen changeFeedStartFrom waarde is opgegeven, wordt de wijzigingenfeed opgehaald voor een hele container van Now().
Op dit moment wordt alleen de nieuwste versie ondersteund door de JavaScript SDK en is deze standaard geselecteerd.
U kunt desgewenst maxItemCount in ChangeFeedIteratorOptions gebruiken om het maximum aantal ontvangen items per pagina in te stellen.
Hier volgt een voorbeeld van het verkrijgen van de iterator in de meest recente versiemodus die entiteitsobjecten retourneert:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
De wijzigingen voor een hele container verwerken
Als u geen FeedRange of PartitionKey parameter invoert binnen ChangeFeedStartFrom, kunt u de wijzigingenfeed van een hele container in uw eigen tempo verwerken. Hier volgt een voorbeeld, dat begint met het lezen van alle wijzigingen, beginnend op het huidige tijdstip:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Omdat de wijzigingenfeed effectief een oneindige lijst is met items die alle toekomstige schrijf- en updates omvatten, is de waarde altijd hasMoreResultstrue. Wanneer u de wijzigingenfeed probeert te lezen en er geen nieuwe wijzigingen beschikbaar zijn, ontvangt u een antwoord met NotModified de status. Dit verschilt van het ontvangen van een antwoord zonder wijzigingen en OK status. Het is mogelijk om lege reacties op wijzigingenfeeds te krijgen terwijl er meer wijzigingen beschikbaar zijn en u moet doorgaan met peilen totdat u deze ontvangt NotModified. In het voorgaande voorbeeld wordt NotModified verwerkt door vijf seconden te wachten voordat je opnieuw controleert op wijzigingen.
De wijzigingen voor een partitiesleutel gebruiken
In sommige gevallen wilt u mogelijk alleen de wijzigingen voor een specifieke partitiesleutel verwerken. U kunt iterator verkrijgen voor een specifieke partitiesleutel en de wijzigingen op dezelfde manier verwerken als voor een hele container.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
FeedRange gebruiken voor parallelle uitvoering
In het wijzigingenfeed-pullmodel kunt u de FeedRange gebruiken om de verwerking van de wijzigingenfeed te parallelliseren. A FeedRange vertegenwoordigt een bereik van partitiesleutelwaarden.
Hier is een voorbeeld dat laat zien hoe u een lijst met intervallen voor uw container kunt opvragen.
const ranges = await container.getFeedRanges();
Wanneer u een lijst FeedRange met waarden voor uw container krijgt, krijgt u er één FeedRange per fysieke partitie.
Met behulp van een FeedRange kunt u een iterator creëren om het verwerken van de wijzigingenfeed op meerdere computers of threads te parallelliseren. In tegenstelling tot het vorige voorbeeld dat laat zien hoe u een wijzigingsfeed-iterator voor de hele container of één partitiesleutel kunt verkrijgen, kunt u FeedRanges gebruiken om meerdere iterators te verkrijgen, waardoor de wijzigingenfeed parallel kan worden verwerkt.
Hier volgt een voorbeeld waarin wordt uitgelegd hoe u kunt lezen vanaf het begin van de wijzigingenfeed van de container met behulp van twee hypothetische afzonderlijke machines die parallel lezen.
Machine 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Machine 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Vervolgtokens opslaan
U kunt de positie van uw iterator opslaan door een vervolgtoken te verkrijgen. Een vervolgtoken is een tekenreekswaarde die de laatst verwerkte wijzigingen van de wisselfeed-iterator bijhoudt en waarmee de iterator later op dit punt kan hervatten. Het vervolgtoken, indien opgegeven, heeft voorrang op de begintijd en begint vanaf beginwaarden. De volgende code leest de wijzigingenfeed door sinds het maken van de container. Nadat er geen wijzigingen meer beschikbaar zijn, wordt een vervolgtoken opgeslagen, zodat het verbruik van de wijzigingenfeed later kan worden hervat.
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Het vervolgtoken verloopt nooit zolang de Azure Cosmos DB-container nog bestaat.
AsyncIterator gebruiken
U kunt JavaScript AsyncIterator gebruiken om de wijzigingenfeed op te halen. Hier volgt een voorbeeld van AsyncIterator.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}