Dela via


detect_anomalous_new_entity_fl()

Gäller för: ✅Microsoft FabricAzure Data ExplorerAzure MonitorMicrosoft Sentinel

Identifiera utseendet på avvikande nya entiteter i tidsstämplade data.

Funktionen detect_anomalous_new_entity_fl() är en UDF (användardefinierad funktion) som identifierar utseendet på avvikande nya entiteter – till exempel IP-adresser eller användare – i tidsstämplade data, till exempel trafikloggar. I cybersäkerhetssammanhang kan sådana händelser vara misstänkta och tyda på en potentiell attack eller kompromiss.

Avvikelsemodellen baseras på en Poisson-distribution som representerar antalet nya entiteter som visas per tidsintervall (till exempel en dag) för varje omfång. Poisson-distributionsparametern beräknas baserat på utseendet på nya entiteter under träningsperioden, med en extra förfallsfaktor som återspeglar det faktum att de senaste framträdandena är viktigare än gamla. Därför beräknar vi sannolikheten för att stöta på en ny entitet under den definierade identifieringsperioden per viss omfattning , till exempel en prenumeration eller ett konto. Modellutdata styrs av flera valfria parametrar, till exempel minimalt tröskelvärde för avvikelse, parameter för avvikelsefrekvens och andra.

Modellens direkta utdata är en avvikelsepoäng som beräknas baserat på inverteringen av den uppskattade sannolikheten att stöta på en ny entitet. Avvikelsepoängen varierar från 0 till 1, där 1 anger en mycket avvikande entitet. Förutom avvikelsepoängen finns det en binär flagga för identifierade avvikelser (styrs av en minimal tröskelvärdesparameter) och andra förklarande fält.

Syntax

detect_anomalous_new_entity_fl( entityColumnName, scopeColumnName, timeColumnName, startTraining, startDetection, endDetection, [maxEntitiesThresh], [minTrainingDaysThresh], [decayParam], [anomalyScoreThresh])

Läs mer om syntaxkonventioner.

Parametrar

Namn Typ Krävs Beskrivning
entityColumnName string ✔️ Namnet på den indatatabellkolumn som innehåller namnen eller ID:na för de entiteter som avvikelsemodellen beräknas för.
scopeColumnName string ✔️ Namnet på den indatatabellkolumn som innehåller partitionen eller omfånget, så att en annan avvikelsemodell skapas för varje omfång.
timeColumnName string ✔️ Namnet på den indatatabellkolumn som innehåller tidsstämplarna, som används för att definiera tränings- och identifieringsperioderna.
startTraining datetime ✔️ Början av träningsperioden för avvikelsemodellen. Dess slut definieras i början av identifieringsperioden.
startDetection datetime ✔️ Början av identifieringsperioden för avvikelseidentifiering.
endDetection datetime ✔️ Slutet av identifieringsperioden för avvikelseidentifiering.
maxEntitiesThresh int Det maximala antalet befintliga entiteter i omfånget för att beräkna avvikelser. Om antalet entiteter ligger över tröskelvärdet anses omfånget vara för bullrigt och avvikelser beräknas inte. Standardvärdet är 60.
minTrainingDaysThresh int Det minsta antalet dagar under träningsperioden som det finns ett omfång för att beräkna avvikelser. Om det är under tröskelvärdet anses omfånget vara för nytt och okänt, så avvikelser beräknas inte. Standardvärdet är 14.
decayParam real Parametern för avvikelsefrekvens för avvikelsemodellen, ett tal i intervallet (0,1]. Lägre värden innebär snabbare sönderfall, så mer vikt läggs vid senare framträdanden under träningsperioden. Värdet 1 innebär inget sönderfall, så ett enkelt medelvärde används för poisson-fördelningsparameteruppskattning. Standardvärdet är 0,95.
anomalyScoreThresh real Det minsta värdet för avvikelsepoäng som en avvikelse identifieras för, ett tal i intervallet [0, 1]. Högre värden innebär att endast mer betydande fall anses vara avvikande, så färre avvikelser identifieras (högre precision, lägre träffsäkerhet). Standardvärdet är 0,9.

Funktionsdefinition

Du kan definiera funktionen genom att antingen bädda in koden som en frågedefinierad funktion eller skapa den som en lagrad funktion i databasen enligt följande:

Definiera funktionen med hjälp av följande låt -instruktionen. Inga behörigheter krävs.

Viktig

En låta instruktionen inte kan köras på egen hand. Den måste följas av en tabelluttrycksinstruk. Om du vill köra ett fungerande exempel på detect_anomalous_new_entity_fl()läser du Exempel.

let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
                                        , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                        , maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
    processedData
    | summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet) 
        by scope, entity
    | extend firstSeenSet = dataSet
    | project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
    entityData
    | summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
        , firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0) 
            by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
    entityData
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | where firstSeenSet == 'trainSet'
    | summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
        by scope, firstSeenSetOnScope = firstSeenSet, firstSeenEntityOnScope = firstSeenEntity
    | extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntityOnScope)
// adding exponentially decaying weights to counts
    | extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
    | extend decayingValue = countAddedEntities * decayingWeight
    | summarize   newEntityProbability = round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
                , countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntityOnScope), slicesOnScope = max(slicesInTrainingScope)///for explainability
        by scope, firstSeenSetOnScope
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
    | extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
    | extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (modelData) on scope
    | join kind = inner (entityData | where firstSeenSet == 'detectSet') on scope, entity, $left.sliceTime == $right.firstSeenEntity
    | project-away scope1, scope2, entity1
    | where isAnomalousNewEntity == 1
    | summarize arg_min(sliceTime, *) by scope, entity
    | extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ',  slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
        , ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
    | join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
    | project-away scope1
);
resultsData
};
// Write your query to use the function here.

Exempel

I följande exempel används anropa operatorn för att köra funktionen.

Om du vill använda en frågedefinierad funktion anropar du den efter den inbäddade funktionsdefinitionen.

let detect_anomalous_new_entity_fl = (T:(*), entityColumnName:string, scopeColumnName:string
                                        , timeColumnName:string, startTraining:datetime, startDetection:datetime, endDetection:datetime
                                        , maxEntitiesThresh:int = 60, minTrainingDaysThresh:int = 14, decayParam:real = 0.95, anomalyScoreThresh:real = 0.9)
{
//pre-process the input data by adding standard column names and dividing to datasets
let timePeriodBinSize = 'day';      // we assume a reasonable bin for time is day, so the probability model is built per that bin size
let processedData = (
    T
    | extend scope      = column_ifexists(scopeColumnName, '')
    | extend entity     = column_ifexists(entityColumnName, '')
    | extend sliceTime  = todatetime(column_ifexists(timeColumnName, ''))
    | where isnotempty(scope) and isnotempty(entity) and isnotempty(sliceTime)
    | extend dataSet = case((sliceTime >= startTraining and sliceTime < startDetection), 'trainSet'
                           , sliceTime >= startDetection and sliceTime <= endDetection,  'detectSet'
                                                                                       , 'other')
    | where dataSet in ('trainSet', 'detectSet')
);
// summarize the data by scope and entity. this will be used to create a distribution of entity appearances based on first seen data
let entityData = (
    processedData
    | summarize countRowsEntity = count(), firstSeenEntity = min(sliceTime), lastSeenEntity = max(sliceTime), firstSeenSet = arg_min(sliceTime, dataSet) 
        by scope, entity
    | extend firstSeenSet = dataSet
    | project-away dataSet
);
// aggregate entity data per scope and get the number of entities appearing over time
let aggregatedCandidateScopeData = (
    entityData
    | summarize countRowsScope = sum(countRowsEntity), countEntitiesScope = dcount(entity), countEntitiesScopeInTrain = dcountif(entity, firstSeenSet == 'trainSet')
        , firstSeenScope = min(firstSeenEntity), lastSeenScope = max(lastSeenEntity), hasNewEntities = iff(dcountif(entity,firstSeenSet == 'detectSet') > 0, 1, 0) 
            by scope
    | extend slicesInTrainingScope = datetime_diff(timePeriodBinSize, startDetection, firstSeenScope)
    | where countEntitiesScopeInTrain <= maxEntitiesThresh and slicesInTrainingScope >= minTrainingDaysThresh and lastSeenScope >= startDetection and hasNewEntities == 1
);
let modelData = (
    entityData
    | join kind = inner (aggregatedCandidateScopeData) on scope 
    | where firstSeenSet == 'trainSet'
    | summarize countAddedEntities = dcount(entity), firstSeenScope = min(firstSeenScope), slicesInTrainingScope = max(slicesInTrainingScope), countEntitiesScope = max(countEntitiesScope)
        by scope, firstSeenSetOnScope = firstSeenSet, firstSeenEntityOnScope = firstSeenEntity
    | extend diffInDays = datetime_diff(timePeriodBinSize, startDetection, firstSeenEntityOnScope)
// adding exponentially decaying weights to counts
    | extend decayingWeight = pow(base = decayParam, exponent = diffInDays)
    | extend decayingValue = countAddedEntities * decayingWeight
    | summarize   newEntityProbability =  round(1 - exp(-1.0 * sum(decayingValue)/max(diffInDays)), 4)
                , countKnownEntities = sum(countAddedEntities), lastNewEntityTimestamp = max(firstSeenEntityOnScope), slicesOnScope = max(slicesInTrainingScope)///for explainability
        by scope, firstSeenSetOnScope
// anomaly score is based on probability to get no new entities, calculated using Poisson distribution (P(X=0) = exp(-avg)) with added decay on average
    | extend newEntityAnomalyScore = round(1 - newEntityProbability, 4)
    | extend isAnomalousNewEntity = iff(newEntityAnomalyScore >= anomalyScoreThresh, 1, 0)
);
let resultsData = (
    processedData
    | where dataSet == 'detectSet'
    | join kind = inner (modelData) on scope
    | join kind = inner (entityData | where firstSeenSet == 'detectSet') on scope, entity, $left.sliceTime == $right.firstSeenEntity
    | project-away scope1, scope2, entity1
    | where isAnomalousNewEntity == 1
    | summarize arg_min(sliceTime, *) by scope, entity
    | extend anomalyType = strcat('newEntity_', entityColumnName), anomalyExplainability = strcat('The ', entityColumnName, ' ', entity, ' wasn\'t seen on ', scopeColumnName, ' ', scope, ' during the last ',  slicesOnScope, ' ', timePeriodBinSize, 's. Previously, ', countKnownEntities
        , ' entities were seen, the last one of them appearing at ', format_datetime(lastNewEntityTimestamp, 'yyyy-MM-dd HH:mm'), '.')
    | join kind = leftouter (entityData | where firstSeenSet == 'trainSet' | extend entityFirstSeens = strcat(entity, ' : ', format_datetime(firstSeenEntity, 'yyyy-MM-dd HH:mm')) | sort by scope, firstSeenEntity asc | summarize anomalyState = make_list(entityFirstSeens) by scope) on scope
    | project-away scope1
);
resultsData
};
// synthetic data generation
let detectPeriodStart   = datetime(2022-04-30 05:00:00.0000000);
let trainPeriodStart    = datetime(2022-03-01 05:00);
let names               = pack_array("Admin", "Dev1", "Dev2", "IT-support");
let countNames          = array_length(names);
let testData            = range t from 1 to 24*60 step 1
    | extend timeSlice      = trainPeriodStart + 1h * t
    | extend countEvents    = round(2*rand() + iff((t/24)%7>=5, 10.0, 15.0) - (((t%24)/10)*((t%24)/10)), 2) * 100 // generate a series with weekly seasonality
    | extend userName       = tostring(names[toint(rand(countNames))])
    | extend deviceId       = hash_md5(rand())
    | extend accountName    = iff(((rand() < 0.2) and (timeSlice < detectPeriodStart)), 'testEnvironment', 'prodEnvironment')
    | extend userName       = iff(timeSlice == detectPeriodStart, 'H4ck3r', userName)
    | extend deviceId       = iff(timeSlice == detectPeriodStart, 'abcdefghijklmnoprtuvwxyz012345678', deviceId)
    | sort by timeSlice desc
;
testData
| invoke detect_anomalous_new_entity_fl(entityColumnName    = 'userName'  //principalName for positive, deviceId for negative
                                , scopeColumnName           = 'accountName'
                                , timeColumnName            = 'timeSlice'
                                , startTraining             = trainPeriodStart
                                , startDetection            = detectPeriodStart
                                , endDetection              = detectPeriodStart
                            )

utdata

omfattning enhet sliceTime t timeSlice countEvents userName deviceId accountName dataset firstSeenSetOnScope newEntityProbability countKnownEntities lastNewEntityTimestamp slicesOnScope newEntityAnomalyScore isAnomalousNewEntity anomalyType anomalyExplainability anomalyState
prodEnvironment H4ck3r 2022-04-30 05:00:00.0000000 1440 2022-04-30 05:00:00.0000000 1687 H4ck3r abcdefghijklmnoprtuvwxyz012345678 prodEnvironment detectSet trainSet 0.0031 4 2022-03-01 09:00:00.0000000 60 0.9969 1 newEntity_userName UserName H4ck3r sågs inte på accountName prodEnvironment under de senaste 60 dagarna. Tidigare sågs fyra entiteter, varav den sista visades 2022-03-01 09:00. ["IT-support: 2022-03-01 07:00", "Admin : 2022-03-01 08:00", "Dev2 : 2022-03-01 09:00", "Dev1 : 2022-03-01 14:00"]

Utdata från att köra funktionen är den första raden i testdatauppsättningen för varje entitet per omfång, filtrerad efter nya entiteter (vilket innebär att de inte visades under träningsperioden) som taggades som avvikande (vilket innebär att entitetens avvikelsepoäng var över anomalyScoreThresh). Några andra fält läggs till för tydlighetens skull:

  • dataSet: aktuell datauppsättning (är alltid detectSet).
  • firstSeenSetOnScope: datauppsättning där omfånget först sågs (bör vara "trainSet").
  • newEntityProbability: sannolikhet att se en ny entitet baserat på Poisson-modelluppskattning.
  • countKnownEntities: befintliga entiteter i omfånget.
  • lastNewEntityTimestamp: förra gången en ny entitet sågs före den avvikande.
  • slicesOnScope: antal sektorer per omfång.
  • newEntityAnomalyScore: avvikelsepoäng var den nya entiteten i intervallet [0, 1], högre värden som betyder mer avvikelse.
  • isAnomalousNewEntity: binär flagga för avvikande nya entiteter
  • anomalyType: visar typen av avvikelse (användbar när du kör flera avvikelseidentifieringslogik tillsammans).
  • anomalyExplainability: textomslutning för genererad avvikelse och dess förklaring.
  • anomalyState: påse med befintliga entiteter i omfånget med deras första sedda tider.

Om du kör den här funktionen på användare per konto med standardparametrar identifieras en tidigare osynlig och avvikande användare ("H4ck3r") med hög avvikelsepoäng på 0,9969, vilket innebär att detta är oväntat (på grund av ett litet antal befintliga användare under träningsperioden).

När vi kör funktionen med standardparametrar på deviceId som entitet ser vi ingen avvikelse på grund av det stora antalet befintliga enheter , vilket gör att ett nytt förväntas. Men om vi sänker parameteravvikelsenScoreThresh till 0,0001 och höjer parametern maxEntitiesThresh till 10000, minskar vi effektivt precisionen till förmån för återkallande och identifierar en avvikelse (med låg avvikelsepoäng) på enheten "abcdefghijklmnoprtuvwxyz012345678".

Utdata visar avvikande entiteter tillsammans med förklaringsfält i standardiserat format. De här fälten är användbara för att undersöka avvikelsen och för att köra avvikande entitetsidentifiering på flera entiteter eller köra andra algoritmer tillsammans.

Den föreslagna användningen i cybersäkerhetskontexten kör funktionen på meningsfulla entiteter , till exempel användarnamn eller IP-adresser , enligt meningsfulla omfång, till exempel prenumeration på konton. En identifierad avvikande ny entitet innebär att dess utseende inte förväntas i omfånget och kan vara misstänkt.