Dela via


Använda strömning i ASP.NET Core SignalR

Av Brennan Conroy

ASP.NET Core SignalR stöder direktuppspelning från klient till server och från server till klient. Detta är användbart för scenarier där fragment av data anländer över tid. Vid direktuppspelning skickas varje fragment till klienten eller servern så snart det blir tillgängligt, i stället för att vänta på att alla data ska bli tillgängliga.

Visa eller ladda ned exempelkod (hur du laddar ned)

Konfigurera en hubb för direktuppspelning

En hubbmetod blir automatiskt en direktuppspelningshubbmetod när den returnerar IAsyncEnumerable<T>, ChannelReader<T>, Task<IAsyncEnumerable<T>>eller Task<ChannelReader<T>>.

Direktuppspelning från server till klient

Direktuppspelningshubbmetoder kan returnera IAsyncEnumerable<T> förutom ChannelReader<T>. Det enklaste sättet att returnera IAsyncEnumerable<T> är att göra hubbmetoden till en asynkron iteratormetod, vilket visas i följande exempel. Hub async-iteratormetoder kan acceptera en CancellationToken parameter som utlöses när klienten avregistrerar sig från dataströmmen. Async-iteratormetoder undviker problem som ofta förekommer med kanaler, såsom att inte returnera ChannelReader tillräckligt tidigt eller att avsluta metoden utan att slutföra ChannelWriter<T>.

Note

Följande exempel kräver C# 8.0 eller senare.

public class AsyncEnumerableHub : Hub
{
    public async IAsyncEnumerable<int> Counter(
        int count,
        int delay,
        [EnumeratorCancellation]
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < count; i++)
        {
            // Check the cancellation token regularly so that the server will stop
            // producing items if the client disconnects.
            cancellationToken.ThrowIfCancellationRequested();

            yield return i;

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
}

Följande exempel visar grunderna i strömmande data till klienten med hjälp av Kanaler. När ett objekt skrivs till ChannelWriter<T>skickas objektet omedelbart till klienten. I slutet av ChannelWriter slutförs för att meddela klienten att strömmen har avslutats.

Note

Skriv till ChannelWriter<T> på en bakgrundstråd och returnera ChannelReader så snart som möjligt. Andra hubbanrop blockeras tills en ChannelReader returneras.

Omslut logik i ett try ... catch-uttryck. Slutför Channel i ett finally block. Om du vill hantera ett fel, fångar du det i catch-blocket och skriver det i finally-blocket.

public ChannelReader<int> Counter(
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    var channel = Channel.CreateUnbounded<int>();

    // We don't want to await WriteItemsAsync, otherwise we'd end up waiting 
    // for all the items to be written before returning the channel back to
    // the client.
    _ = WriteItemsAsync(channel.Writer, count, delay, cancellationToken);

    return channel.Reader;
}

private async Task WriteItemsAsync(
    ChannelWriter<int> writer,
    int count,
    int delay,
    CancellationToken cancellationToken)
{
    Exception localException = null;
    try
    {
        for (var i = 0; i < count; i++)
        {
            await writer.WriteAsync(i, cancellationToken);

            // Use the cancellationToken in other APIs that accept cancellation
            // tokens so the cancellation can flow down to them.
            await Task.Delay(delay, cancellationToken);
        }
    }
    catch (Exception ex)
    {
        localException = ex;
    }
    finally
    {
        writer.Complete(localException);
    }
}

Server-till-klient-metoder för direktuppspelningshubbar kan acceptera en CancellationToken parameter som utlöses när klienten avregistrerar sig från dataströmmen. Använd den här token för att stoppa serveråtgärden och frigöra resurser om klienten kopplas från före strömmens slut.

Direktuppspelning från klient till server

En hubbmetod blir automatiskt en strömningshubb för klient-till-server när den accepterar ett eller flera objekt av typen ChannelReader<T> eller IAsyncEnumerable<T>. Följande exempel visar grunderna i att läsa strömmande data som skickas från klienten. När klienten skriver till ChannelWriter<T>skrivs data till ChannelReader på den server som hubbmetoden läser från.

public async Task UploadStream(ChannelReader<string> stream)
{
    while (await stream.WaitToReadAsync())
    {
        while (stream.TryRead(out var item))
        {
            // do something with the stream item
            Console.WriteLine(item);
        }
    }
}

En IAsyncEnumerable<T> version av metoden följer.

Note

Följande exempel kräver C# 8.0 eller senare.

public async Task UploadStream(IAsyncEnumerable<string> stream)
{
    await foreach (var item in stream)
    {
        Console.WriteLine(item);
    }
}

.NET-klient

Direktuppspelning från server till klient

Metoderna StreamAsync och StreamAsChannelAsyncHubConnection används för att anropa strömningsmetoder från server till klient. Skicka hubbmetodens namn och argument som definierats i hubbmetoden till StreamAsync eller StreamAsChannelAsync. Den allmänna parametern på StreamAsync<T> och StreamAsChannelAsync<T> anger vilken typ av objekt som returneras av strömningsmetoden. Ett objekt av typen IAsyncEnumerable<T> eller ChannelReader<T> returneras från strömanropet och representerar strömmen på klienten.

Ett StreamAsync exempel som returnerar IAsyncEnumerable<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var stream = hubConnection.StreamAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

await foreach (var count in stream)
{
    Console.WriteLine($"{count}");
}

Console.WriteLine("Streaming completed");

Ett motsvarande StreamAsChannelAsync exempel som returnerar ChannelReader<int>:

// Call "Cancel" on this CancellationTokenSource to send a cancellation message to
// the server, which will trigger the corresponding token in the hub method.
var cancellationTokenSource = new CancellationTokenSource();
var channel = await hubConnection.StreamAsChannelAsync<int>(
    "Counter", 10, 500, cancellationTokenSource.Token);

// Wait asynchronously for data to become available
while (await channel.WaitToReadAsync())
{
    // Read all currently available data synchronously, before waiting for more data
    while (channel.TryRead(out var count))
    {
        Console.WriteLine($"{count}");
    }
}

Console.WriteLine("Streaming completed");

I föregående kod:

  • Metoden StreamAsChannelAsyncHubConnection används för att anropa en server-till-klient-strömningsmetod. Skicka hubbmetodens namn och argument som definierats i hubbmetoden till StreamAsChannelAsync.
  • Den generiska parametern på StreamAsChannelAsync<T> anger vilken typ av objekt som returneras av strömningsmetoden.
  • En ChannelReader<T> returneras från strömanropet och representerar strömmen på klienten.

Direktuppspelning från klient till server

Det finns två sätt att anropa en klient-till-server-strömningshubbmetod från .NET-klienten. Du kan antingen skicka in en IAsyncEnumerable<T> eller ett ChannelReader som ett argument till SendAsync, InvokeAsynceller StreamAsChannelAsync, beroende på vilken hubbmetod som anropas.

När data skrivs till IAsyncEnumerable eller-objektet ChannelWriter tar hubbmetoden på servern emot ett nytt objekt med data från klienten.

Om du använder ett IAsyncEnumerable objekt avslutas strömmen när metoden som returnerar dataströmobjekt avslutas.

Note

Följande exempel kräver C# 8.0 eller senare.

async IAsyncEnumerable<string> clientStreamData()
{
    for (var i = 0; i < 5; i++)
    {
        var data = await FetchSomeData();
        yield return data;
    }
    //After the for loop has completed and the local function exits the stream completion will be sent.
}

await connection.SendAsync("UploadStream", clientStreamData());

Eller om du använder en ChannelWriterslutför du kanalen med channel.Writer.Complete():

var channel = Channel.CreateBounded<string>(10);
await connection.SendAsync("UploadStream", channel.Reader);
await channel.Writer.WriteAsync("some data");
await channel.Writer.WriteAsync("some more data");
channel.Writer.Complete();

JavaScript-klient

Direktuppspelning från server till klient

JavaScript-klienter anropar metoder för server-till-klient-strömning på hubbar med connection.stream. Metoden stream accepterar två argument:

  • Namnet på hubbmetoden. I följande exempel är hubbmetodens namn Counter.
  • Argumenten som definierats i hubbmetoden. I följande exempel är argumenten ett antal för antalet dataströmsobjekt som ska tas emot och fördröjningen mellan dataströmobjekt.

connection.stream returnerar en IStreamResult, som innehåller en subscribe metod. Skicka en IStreamSubscriber till subscribe och ange återanropen next, error, och complete för att ta emot meddelanden från stream-anropet.

connection.stream("Counter", 10, 500)
    .subscribe({
        next: (item) => {
            var li = document.createElement("li");
            li.textContent = item;
            document.getElementById("messagesList").appendChild(li);
        },
        complete: () => {
            var li = document.createElement("li");
            li.textContent = "Stream completed";
            document.getElementById("messagesList").appendChild(li);
        },
        error: (err) => {
            var li = document.createElement("li");
            li.textContent = err;
            document.getElementById("messagesList").appendChild(li);
        },
});

Om du vill avsluta strömmen från klienten, anropa metoden disposeISubscription som returneras från subscribe-metoden. Om du anropar den här metoden avbryts parametern CancellationToken för Hub-metoden, om du angav en.

Direktuppspelning från klient till server

JavaScript-klienter anropar strömningsmetoder från klient till server på hubbar genom att skicka in ett Subject som argument till send, invokeeller stream, beroende på vilken hubbmetod som anropas. Subject är en klass som ser ut som en Subject. I RxJS kan du till exempel använda klassen Ämne från biblioteket.

const subject = new signalR.Subject();
yield connection.send("UploadStream", subject);
var iteration = 0;
const intervalHandle = setInterval(() => {
    iteration++;
    subject.next(iteration.toString());
    if (iteration === 10) {
        clearInterval(intervalHandle);
        subject.complete();
    }
}, 500);

Anropa subject.next(item) med ett objekt skriver objektet till strömmen och hubbmetoden tar emot objektet på servern.

Om du vill avsluta strömmen anropar du subject.complete().

Java-klient

Direktuppspelning från server till klient

Java-klienten SignalR använder stream metoden för att anropa strömningsmetoder. stream accepterar tre eller fler argument:

  • Den förväntade typen av strömobjekt.
  • Namnet på hubbmetoden.
  • Argumenten som definierats i hubbmetoden.
hubConnection.stream(String.class, "ExampleStreamingHubMethod", "Arg1")
    .subscribe(
        (item) -> {/* Define your onNext handler here. */ },
        (error) -> {/* Define your onError handler here. */},
        () -> {/* Define your onCompleted handler here. */});

Metoden streamHubConnection returnerar en observerbar dataströmobjekttyp. Metoden för den observerbara typen subscribe är där onNext, onError och onCompleted hanterare definieras.

Direktuppspelning från klient till server

SignalR Java-klienten kan anropa strömningsmetoder från klient till server på hubbar genom att skicka in ett observerbart argument till send, invokeeller stream, beroende på vilken hubbmetod som anropas.

ReplaySubject<String> stream = ReplaySubject.create();
hubConnection.send("UploadStream", stream);
stream.onNext("FirstItem");
stream.onNext("SecondItem");
stream.onComplete();

Anropa stream.onNext(item) med ett objekt skriver objektet till strömmen och hubbmetoden tar emot objektet på servern.

Om du vill avsluta strömmen anropar du stream.onComplete().

Ytterligare resurser