可以在 SingalR 客户端中接收 HPC Pack 作业和任务事件。 有不同的编程语言的客户端。 下面我们以 C# 为例,演示如何接收作业和任务事件。
安装 C# 客户端
Microsoft.AspNet.SignalR.Client。在代码中
usingMicrosoft.AspNet.SignalR.Client和Microsoft.AspNet.SignalR.Client.Transports。然后创建
HubConnection实例,例如:var hubConnection = new HubConnection(url)此处
url为“https://{your-cluster-name-or-ip}/hpc”。(可选)可以通过以下方式跟踪连接错误
var hubConnection = new HubConnection(url) { TraceLevel = TraceLevels.All, TraceWriter = Console.Error };还可以添加错误处理程序,例如:
hubConnection.Error += ex => Console.Error.WriteLine($"HubConnection Exception:\n{ex}");必须提供用于访问事件的凭据。 HTTP 基本身份验证在此处使用:
hubConnection.Headers.Add("Authorization", BasicAuthHeader(username, password));然后,从中心连接创建中心代理对象,并从中侦听事件。
对于作业事件,请创建
JobEventHub并侦听JobStateChange事件:var jobEventHubProxy = hubConnection.CreateHubProxy("JobEventHub"); jobEventHubProxy.On("JobStateChange", (int id, string state, string previousState) => { Console.WriteLine($"Job: {id}, State: {state}, Previous State: {previousState}"); });对于任务事件,请创建
TaskEventHub并侦听TaskStateChange事件:var taskEventHubProxy = hubConnection.CreateHubProxy("TaskEventHub"); taskEventHubProxy.On("TaskStateChange", (int id, int taskId, int instanceId, string state, string previousState) => { Console.WriteLine($"Job: {id}, Task: {taskId}, Instance: {instanceId}, State: {state}, Previous State: {previousState}"); });然后,可以通过以下方式连接到服务器:
await hubConnection.Start(new WebSocketTransport())在这里,我们显式指定
WebSocketTransport。 SignalR 支持各种传输方式。 但我们选择了 WebSocket。 如果使用除 WebSocket 以外的传输,则可能不会收到任何事件或根本无法连接。最后,在中心代理对象上调用
BeginListen方法以开始侦听。 必须指定要侦听其作业/任务事件的作业 ID。try { await jobEventHubProxy.Invoke("BeginListen", jobId); await taskEventHubProxy.Invoke("BeginListen", jobId); } catch (Exception ex) { Console.Error.WriteLine($"Exception on invoking server method:\n{ex}"); return null; }调用
BeginListen方法时,catch块将捕获服务器上的错误。 对于中心连接的其他错误,可能需要使用类似于上一步中的错误处理程序。
整个代码片段是:
using Microsoft.AspNet.SignalR.Client;
using Microsoft.AspNet.SignalR.Client.Transports;
//...
string BasicAuthHeader(string username, string password)
{
string credentials = $"{username}:{password}";
byte[] bytes = Encoding.ASCII.GetBytes(credentials);
string base64 = Convert.ToBase64String(bytes);
return $"Basic {base64}";
}
async Task<HubConnection> ListenToJobAndTasks(string url, int jobId, string username, string password)
{
var hubConnection = new HubConnection(url) { TraceLevel = TraceLevels.All, TraceWriter = Console.Error };
hubConnection.Headers.Add("Authorization", BasicAuthHeader(username, password));
hubConnection.Error += ex => Console.Error.WriteLine($"HubConnection Exception:\n{ex}");
var jobEventHubProxy = hubConnection.CreateHubProxy("JobEventHub");
jobEventHubProxy.On("JobStateChange", (int id, string state, string previousState) =>
{
Console.WriteLine($"Job: {id}, State: {state}, Previous State: {previousState}");
});
var taskEventHubProxy = hubConnection.CreateHubProxy("TaskEventHub");
taskEventHubProxy.On("TaskStateChange", (int id, int taskId, int instanceId, string state, string previousState) =>
{
Console.WriteLine($"Job: {id}, Task: {taskId}, Instance: {instanceId}, State: {state}, Previous State: {previousState}");
});
Console.WriteLine($"Connecting to {url} ...");
try
{
await hubConnection.Start(new WebSocketTransport());
}
catch (Exception ex)
{
Console.Error.WriteLine($"Exception on starting:\n{ex}");
return null;
}
Console.WriteLine($"Begin to listen...");
try
{
await jobEventHubProxy.Invoke("BeginListen", jobId);
await taskEventHubProxy.Invoke("BeginListen", jobId);
}
catch (Exception ex)
{
Console.Error.WriteLine($"Exception on invoking server method:\n{ex}");
return null;
}
return hubConnection;
}