Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Asynkrona strömmar modellerar en strömmande datakälla. Dataströmmar hämtar eller genererar ofta element asynkront. De tillhandahåller en naturlig programmeringsmodell för asynkrona strömmande datakällor.
I den här självstudien får du lära dig att:
- Skapa en datakälla som genererar en sekvens med dataelement asynkront.
- Använd datakällan asynkront.
- Stöd för annullering och insamlade kontexter för asynkrona strömmar.
- Identifiera när det nya gränssnittet och datakällan föredras framför tidigare synkrona datasekvenser.
Förutsättningar
Du måste konfigurera datorn för att köra .NET, inklusive C#-kompilatorn. C#-kompilatorn är tillgänglig med Visual Studio 2022 eller .NET SDK.
Du måste skapa en GitHub-åtkomsttoken så att du kan komma åt GitHub GraphQL-slutpunkten. Välj följande behörigheter för din GitHub-åtkomsttoken:
- repo:status
- offentligt arkiv
Spara åtkomsttoken på en säker plats så att du kan använda den för att få åtkomst till GitHub API-slutpunkten.
Varning
Skydda din personliga åtkomsttoken. Alla program med din personliga åtkomsttoken kan göra GitHub API-anrop med hjälp av dina åtkomsträttigheter.
Den här handledningen förutsätter att du är bekant med C# och .NET, inklusive Visual Studio eller .NET CLI.
Kör startprogrammet
Du kan hämta koden för startprogrammet som används i den här självstudien från dotnet/docs-lagringsplatsen i mappen asynchronous-programming/snippets .
Startprogrammet är ett konsolprogram som använder GitHub GraphQL-gränssnittet för att hämta de senaste problemen som skrivits på dotnet/docs-lagringsplatsen . Börja med att titta på följande kod för startappmetoden Main :
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
Du kan antingen ange en GitHubKey miljövariabel till din personliga åtkomsttoken eller ersätta det sista argumentet i anropet till GetEnvVariable med din personliga åtkomsttoken. Placera inte din åtkomstkod i källkoden om du ska dela källan med andra. Ladda aldrig upp åtkomstkoder till en lagringsplats för delad källa.
När du har skapat GitHub-klienten skapar koden i Main ett förloppsrapporteringsobjekt och en annulleringstoken. När dessa objekt har skapats Main anropas RunPagedQueryAsync för att hämta de senaste 250 problem som skapats. När aktiviteten har slutförts visas resultatet.
När du kör startprogrammet kan du göra några viktiga observationer om hur det här programmet körs. Framstegen rapporteras för varje sida som returneras från GitHub. Du kan observera en märkbar paus innan GitHub returnerar varje ny sida med problem. Slutligen visas problemen först när alla 10 sidor har hämtats från GitHub.
Granska implementeringen
Implementeringen visar varför du observerade beteendet som beskrevs i föregående avsnitt. Granska koden för RunPagedQueryAsync:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Det allra första den här metoden gör är att skapa POST-objektet med hjälp av GraphQLRequest klassen:
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
som hjälper till att bilda POST-objekttexten och korrekt konvertera den till JSON som presenteras som en enskild sträng med ToJsonText-metoden, vilket tar bort alla nyradstecken från begärandetexten och markerar dem med escape-tecknet (omvänt snedstreck) \.
Nu ska vi koncentrera oss på bläddringsalgoritmen och den asynkrona strukturen i föregående kod. (Du kan läsa GitHub GraphQL-dokumentationen för mer information om GitHub GraphQL-API:et.) Metoden RunPagedQueryAsync räknar upp problemen från den senaste till den äldsta. Den begär 25 problem per sida och undersöker pageInfo strukturen för svaret för att fortsätta med föregående sida. Detta följer GraphQL:s standardsystem för paginering av flersidiga svar. Svaret innehåller ett pageInfo objekt som innehåller ett hasPreviousPages värde och ett startCursor värde som används för att begära föregående sida. Problemen finns i matrisen nodes . Metoden RunPagedQueryAsync lägger till dessa noder i en matris som innehåller alla resultat från alla sidor.
När en sida med resultat har hämtats och återställts, rapporterar RunPagedQueryAsync förloppet och kontrollerar om det har avbrutits. Om annullering har begärts kastar RunPagedQueryAsync en OperationCanceledException.
Det finns flera element i den här koden som kan förbättras. Viktigast av allt, RunPagedQueryAsync måste allokera lagring för alla problem som returneras. Det här exemplet stoppas vid 250 problem eftersom hämtning av alla öppna problem skulle kräva mycket mer minne för att lagra alla hämtade problem. Protokollen för stöd för förloppsrapporter och annullering gör algoritmen svårare att förstå vid första behandlingen. Fler typer och API:er ingår. Du måste spåra kommunikationen via CancellationTokenSource och dess associerade CancellationToken för att förstå var annullering begärs och var den beviljas.
Asynkrona strömmar ger ett bättre sätt
Asynkrona strömmar och tillhörande språkstöd hanterar alla dessa problem. Koden som genererar sekvensen kan nu användas yield return för att returnera element i en metod som deklarerades med async modifieraren. Du kan konsumera en asynkron ström med en await foreach-loop precis som du konsumerar en valfri sekvens med en foreach-loop.
Dessa nya språkfunktioner beror på tre nya gränssnitt som lagts till i .NET Standard 2.1 och implementerats i .NET Core 3.0:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
De här tre gränssnitten bör vara bekanta för de flesta C#-utvecklare. De beter sig på ett sätt som liknar deras synkrona motsvarigheter:
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
En typ som kanske inte är bekant är System.Threading.Tasks.ValueTask. Structen ValueTask tillhandahåller ett liknande API som System.Threading.Tasks.Task klassen.
ValueTask används i dessa gränssnitt av prestandaskäl.
Konvertera till asynkrona strömmar
Sedan konverterar du RunPagedQueryAsync-metoden för att generera en asynkron ström. Ändra först signaturen RunPagedQueryAsync för att returnera en IAsyncEnumerable<JToken> och ta bort annulleringstoken och förloppobjekten från parameterlistan enligt följande kod:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
Startkoden bearbetar varje sida när sidan hämtas, enligt följande kod:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
Ersätt dessa tre rader med följande kod:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
Du kan också ta bort deklarationen av finalResults tidigare i denna metod och instruktionen return som följer den loop som du ändrade.
Du har slutfört ändringarna för att generera en asynkron ström. Den färdiga metoden bör likna följande kod:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Därefter ändrar du koden som använder samlingen för att använda asynkron dataström. Hitta följande kod i Main som bearbetar insamlingen av problem:
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
Ersätt koden med följande await foreach loop:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
Det nya gränssnittet IAsyncEnumerator<T> härleds från IAsyncDisposable. Det innebär att föregående loop tar bort strömmen asynkront när loopen är klar. Du kan tänka dig att loopen ser ut som följande kod:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
Som standard bearbetas dataströmelement i den insamlade kontexten. Om du vill inaktivera infångning av kontexten använder du TaskAsyncEnumerableExtensions.ConfigureAwait tilläggsmetoden. Mer information om synkroniseringskontexter och insamling av den aktuella kontexten finns i artikeln om hur du använder det aktivitetsbaserade asynkrona mönstret.
Async-strömmar stöder annullering med samma protokoll som andra async metoder. Du ändrar signaturen för metoden async iterator enligt följande för att stödja annullering:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Attributet System.Runtime.CompilerServices.EnumeratorCancellationAttribute gör att kompilatorn genererar kod för IAsyncEnumerator<T> som gör att den token som skickas till GetAsyncEnumerator synlig för kroppen i en asynkron iterator som det aktuella argumentet. Inuti runQueryAsync kan du undersöka tokenets tillstånd och avbryta arbetet om så önskas.
Du använder en annan tilläggsmetod, WithCancellation, för att skicka annulleringstoken till asynkron dataströmmen. Du skulle ändra loopen som räknar upp problemen på följande sätt:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
Du kan hämta koden för den färdiga självstudien från dotnet/docs-lagringsplatsen i mappen asynchronous-programming/snippets .
Kör det färdiga programmet
Kör programmet igen. Kontrastera dess beteende med startprogrammets beteende. Den första sidan med resultat räknas upp så snart den är tillgänglig. Det finns en observerbar paus när varje ny sida begärs och hämtas. Nästa sidas resultat räknas snabbt upp. Blocket try / catch behövs inte för att hantera annullering: anroparen kan sluta räkna upp samlingen. Förloppet rapporteras tydligt eftersom asynkron dataström genererar resultat när varje sida laddas ned. Statusen för varje problem som returneras ingår sömlöst i loopen await foreach . Du behöver inget återkopplingsobjekt för att följa utvecklingen.
Du kan se förbättringar i minnesanvändningen genom att undersöka koden. Du behöver inte längre allokera en samling för att lagra alla resultat innan de räknas upp. Anroparen kan avgöra hur resultatet ska användas och om en lagringssamling behövs.
Kör både startprogram och färdiga program och du kan se skillnaderna mellan implementeringarna själv. Du kan ta bort GitHub-åtkomsttoken som du skapade när du startade den här självstudien efter att du är klar. Om en angripare får åtkomst till denna token kan de komma åt GitHub-API:er med dina autentiseringsuppgifter.
I den här självstudien använde du asynkrona strömmar för att läsa enskilda objekt från ett nätverks-API som returnerar datasidor. Asynkrona strömmar kan också läsas från "aldrig sinande strömmar" som en aktie ticker eller sensorenhet. Anropet till MoveNextAsync returnerar nästa objekt så snart det är tillgängligt.