Delen via


Pull-model voor wijzigingenfeeds in Azure Cosmos DB

VAN TOEPASSING OP: NoSQL

U kunt het pull-model van de wijzigingenfeed gebruiken om de Azure Cosmos DB-wijzigingenfeed in uw eigen tempo te gebruiken. Net als bij de verwerker van de wijzigingenfeed kunt u het pull-model van de wijzigingenfeed gebruiken om de verwerking van wijzigingen voor meerdere gebruikers van wijzigingenfeeds te parallelliseren.

Vergelijken met de verwerker van de wijzigingenfeed

Veel scenario's kunnen de wijzigingenfeed verwerken met behulp van de wijzigingenfeedprocessor of het pull-model van de wijzigingenfeed. De vervolgtokens van het pull-model en de leasecontainer van de wijzigingenfeedprocessor werken allebei als bladwijzers voor het laatst verwerkte item of de laatste batch items in de wijzigingenfeed.

Je kunt vervolgtokens echter niet naar een lease converteren en andersom.

Notitie

In de meeste gevallen, wanneer u uit de wijzigingenfeed moet lezen, is de eenvoudigste optie om de verwerker van de wijzigingenfeed te gebruiken.

Overweeg het gebruik van het pull-model in deze scenario's:

  • Wijzigingen van een specifieke partitiesleutel lezen
  • Het tempo bepalen waarop uw klant wijzigingen ontvangt voor verwerking
  • Een eenmalige leesbewerking van de bestaande gegevens in de wijzigingenfeed uitvoeren (bijvoorbeeld om een gegevensmigratie uit te voeren)

Hier volgen enkele belangrijke verschillen tussen de wijzigingenfeedprocessor en het pull-model van de wijzigingenfeed:

Kenmerk Verwerker van wijzigingenfeed Wijzig pull-model voor feed
Het huidige punt bijhouden bij het verwerken van de wijzigingenfeed Lease (opgeslagen in een Azure Cosmos DB-container) Vervolgtoken (opgeslagen in het geheugen of handmatig persistent)
Mogelijkheid om eerdere wijzigingen opnieuw af te spelen Ja, met pushmodel Ja, met pullmodel
Peiling voor toekomstige wijzigingen Automatisch controleren op wijzigingen op basis van de door de gebruiker opgegeven WithPollInterval waarde Handmatig
Gedrag wanneer er geen nieuwe wijzigingen zijn Automatisch wachten op de waarde WithPollInterval en vervolgens opnieuw controleren Moet de status controleren en handmatig opnieuw controleren
Verwerk wijzigingen van een hele container Ja, en automatisch geparallelliseerd voor meerdere threads en machines die van dezelfde container gebruikmaken Ja, en handmatig geparallelliseerd met behulp van FeedRange
Verwerk wijzigingen van slechts één partitiesleutel Niet ondersteund Ja

Notitie

Wanneer u het pull-model gebruikt, moet u, in tegenstelling tot bij het lezen met behulp van de verwerker van de wijzigingenfeed, expliciet zaken afhandelen waarbij er geen nieuwe wijzigingen zijn. Dit wordt aangegeven door een HTTP 304 NotModified. Een reactie van een wijzigingenfeed die 0 documenten met een HTTP 200-statuscode OK retourneert, betekent niet per se dat u het einde van de wijzigingenfeed hebt bereikt en dat u moet doorgaan met peilen.

Werken met het pull-model

Als u de wijzigingenfeed wilt verwerken met behulp van het pull-model, maakt u een exemplaar van FeedIterator. Wanneer u in eerste instantie maakt FeedIterator, moet u een vereiste ChangeFeedStartFrom waarde opgeven, die bestaat uit zowel de beginpositie voor het lezen van wijzigingen als de waarde waarvoor u wilt gebruiken FeedRange. Het FeedRange is een bereik van partitiesleutelwaarden en specificeert de items die kunnen worden gelezen uit de veranderingsfeed met behulp van die specifieke FeedIterator. U moet ook een vereiste ChangeFeedMode waarde opgeven voor de modus waarin u wijzigingen wilt verwerken: de nieuwste versie of alle versies en verwijderingen. Gebruik ChangeFeedMode.LatestVersion of ChangeFeedMode.AllVersionsAndDeletes om aan te geven welke modus u wilt gebruiken om de wijzigingenfeed te lezen. Wanneer u de modus Alle versies en Verwijderen gebruikt, moet u een startwaarde voor de wijzigingsfeed selecteren, ofwel Now() of een specifiek vervolgtoken.

U kunt desgewenst ChangeFeedRequestOptions opgeven om een PageSizeHint in te stellen. Wanneer deze eigenschap is ingesteld, wordt het maximum aantal ontvangen items per pagina ingesteld. Als bewerkingen in de bewaakte verzameling worden uitgevoerd via opgeslagen procedures, blijft het transactiebereik behouden bij het lezen van items uit de wijzigingenfeed. Als gevolg hiervan kan het aantal ontvangen items hoger zijn dan de opgegeven waarde, zodat de items die door dezelfde transactie zijn gewijzigd, worden geretourneerd als onderdeel van één atomische batch.

Hier volgt een voorbeeld van hoe u FeedIterator kunt verkrijgen in de meest recente versieversiemodus die entiteitsobjecten teruggeeft, in dit geval een User object.

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Advies

Voor versies ouder dan 3.34.0, kan de meest recente versiemodus worden gebruikt door de instelling in te stellen ChangeFeedMode.Incremental. Zowel Incremental als LatestVersion verwijzen naar de nieuwste versiemodus van de wijzigingenfeed, en toepassingen die beide modus gebruiken, zien hetzelfde gedrag.

Alle versies en verwijderingsmodus is in preview en kan worden gebruikt met preview .NET SDK-versies >= 3.32.0-preview. Hier volgt een voorbeeld voor het verkrijgen van FeedIterator in alle versies en de verwijdermodus die User objecten teruggeeft.

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Notitie

In de nieuwste versiemodus ontvangt u objecten die het item vertegenwoordigen dat is gewijzigd, met enkele extra metagegevens. Alle versies en verwijdermodus retourneert een ander gegevensmodel.

U kunt het volledige voorbeeld ophalen voor de modus van de nieuwste versie of de modus van alle versies en verwijderingen.

De wijzigingenfeed via streams gebruiken

FeedIterator Voor beide modi van de wijzigingsfeeds zijn twee opties beschikbaar. Naast de voorbeelden die entiteitsobjecten retourneren, kunt u ook het antwoord verkrijgen met Stream ondersteuning. Met streams kunt u gegevens lezen zonder deze eerst gedeserialiseerd te hebben, zodat u op clientresources bespaart.

Hier volgt een voorbeeld van hoe u de meest recente versiemodus kunt verkrijgen FeedIterator die retourneert Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

De wijzigingen voor een hele container verwerken

Als u geen FeedRange-parameter aan FeedIterator opgeeft, kunt u de wijzigingenfeed van een hele container in uw eigen tempo verwerken. Hier volgt een voorbeeld, dat begint met het lezen van alle wijzigingen, beginnend bij het huidige tijdstip met behulp van de nieuwste versiemodus:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Omdat de wijzigingenfeed effectief een oneindige lijst is met items die alle toekomstige schrijf- en updates omvatten, is de waarde altijd HasMoreResultstrue. Wanneer u de wijzigingenfeed probeert te lezen en er geen nieuwe wijzigingen beschikbaar zijn, ontvangt u een antwoord met NotModified de status. Dit verschilt van het ontvangen van een antwoord zonder wijzigingen en OK status. Het is mogelijk om lege reacties op wijzigingenfeeds te krijgen terwijl er meer wijzigingen beschikbaar zijn en u moet doorgaan met peilen totdat u deze ontvangt NotModified. In het voorgaande voorbeeld wordt NotModified verwerkt door vijf seconden te wachten voordat je opnieuw controleert op wijzigingen.

De wijzigingen voor een partitiesleutel gebruiken

In sommige gevallen wilt u mogelijk alleen de wijzigingen voor een specifieke partitiesleutel verwerken. U kunt een specifieke partitiesleutel verkrijgen FeedIterator en de wijzigingen op dezelfde manier verwerken als voor een hele container.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

FeedRange gebruiken voor parallelle uitvoering

In de change feed-verwerker wordt werk automatisch verspreid over meerdere gebruikers. In het wijzigingenfeed-pullmodel kunt u de FeedRange gebruiken om de verwerking van de wijzigingenfeed te parallelliseren. A FeedRange vertegenwoordigt een bereik van partitiesleutelwaarden.

Hier is een voorbeeld dat laat zien hoe u een lijst met intervallen voor uw container kunt opvragen.

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Wanneer u een lijst FeedRange met waarden voor uw container krijgt, krijgt u er één FeedRange per fysieke partitie.

Met behulp van een FeedRangekunt u een FeedIterator maken om de verwerking van de wijzigingenfeed te parallelliseren op meerdere computers of threads. In tegenstelling tot het vorige voorbeeld dat laat zien hoe u een FeedIterator voor de hele container of één partitiesleutel kunt verkrijgen, kunt u FeedRanges gebruiken om meerdere FeedIterators te verkrijgen, waarmee de wijzigingenfeed parallel kan worden verwerkt.

In het geval dat u FeedRanges wilt gebruiken, moet u een orchestratorproces hebben dat FeedRanges verkrijgt en distribueert naar die machines. Deze distributie kan het volgende zijn:

  • Het gebruik van FeedRange.ToJsonString en de distributie van deze tekenreekswaarde. De consumenten kunnen deze waarde gebruiken met FeedRange.FromJsonString.
  • Als de distributie nog aan de gang is, geef dan de FeedRange objectverwijzing door.

Hier volgt een voorbeeld waarin wordt uitgelegd hoe u kunt lezen vanaf het begin van de wijzigingenfeed van de container met behulp van twee hypothetische afzonderlijke machines die parallel lezen.

Machine 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Machine 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Vervolgtokens opslaan

U kunt de positie van uw FeedIterator bestand opslaan door het vervolgtoken te verkrijgen. Een vervolgtoken is een tekenreekswaarde die de laatst verwerkte wijzigingen van uw FeedIterator bijhoudt en de FeedIterator laatste verwerkte wijzigingen later kan hervatten. Het vervolgtoken, indien opgegeven, heeft voorrang op de begintijd en begint vanaf beginwaarden. De volgende code leest de wijzigingenfeed door sinds het maken van de container. Nadat er geen wijzigingen meer beschikbaar zijn, wordt een vervolgtoken opgeslagen, zodat het verbruik van de wijzigingenfeed later kan worden hervat.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Wanneer u de nieuwste versiemodus gebruikt, verloopt het FeedIterator vervolgtoken nooit zolang de Azure Cosmos DB-container nog steeds bestaat. Wanneer u alle versies gebruikt en de modus verwijdert, is het FeedIterator vervolgtoken geldig zolang de wijzigingen in het bewaarvenster voor continue back-ups zijn opgetreden.

Volgende stappen