本文介绍如何在 Unity 目录中或旧工作区功能存储中使用特征工程训练模型。 必须先创建训练数据集,该数据集定义要使用的特征以及如何联接它们。 然后,在训练模型时,该模型将保留对这些特征的引用。
使用 Unity Catalog 中的特征工程训练模型时,可以在目录资源管理器中查看模型的世系。 自动跟踪和显示用于创建模型的表和函数。 请参阅特征治理和世系。
将模型用于推理时,可以选择让该模型从特征存储中检索特征值。 还可以使用模型服务来处理模型,它会自动查找发布到联机存储的特征。 特征存储模型还与 MLflow pyfunc 接口兼容,因此可以使用 MLflow 对特征表执行批量推理。
如果模型使用环境变量,请详细了解如何在联机为模型提供服务时使用这些变量,详见配置对来自模型服务终结点的资源的访问权限。
一个模型最多可以使用 50 个表和 100 个函数进行训练。
创建训练数据集
若要从特征表中选择特定的特征用于模型训练,可以使用 FeatureEngineeringClient.create_training_set(适用于 Unity Catalog 中的特征工程)或 FeatureStoreClient.create_training_set(适用于工作区特征存储)API 和名为 FeatureLookup 的对象创建训练数据集。 
              FeatureLookup 指定要在训练集中使用的每个特征,包括特征表的名称、特征的名称,以及在将特征表与传递给 create_training_set 的数据帧联接时要使用的键。 有关详细信息,请参阅特征查找。
创建 feature_names 时,请使用 FeatureLookup 参数。
              feature_names 取值为单个特征名称、特征名称列表,或取值 None,表示在创建训练集时查找特征表中的所有特征(不包括主键)。
注意
该数据帧中 lookup_key 列的类型和顺序必须与引用特征表的主键(时间戳键除外)的类型和顺序相匹配。
本文包含了上述两个语法版本的代码示例。
在此示例中,trainingSet.load_df 返回的数据帧包含 feature_lookups 中每个特征对应的一列。 它会保留提供给的 create_training_set 的数据帧的所有列,但使用 exclude_columns 排除的列除外。
Unity Catalog 中的特征工程
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]
fe = FeatureEngineeringClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df()
工作区特征存储
from databricks.feature_store import FeatureLookup, FeatureStoreClient
# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]
fs = FeatureStoreClient()
# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)
training_df = training_set.load_df()
当查找键与主键不匹配时创建训练集
将 lookup_key 中的参数 FeatureLookup 用于训练集中的列名。 
              create_training_set 使用创建特征表时指定主键的顺序,在 lookup_key 参数中指定的训练集内的列之间执行有序联接。
在此示例中,recommender_system.customer_features 具有以下主键:customer_id、dt。
              recommender_system.product_features 特征表具有主键 product_id。
如果 training_df 包含以下列:
- cid
- transaction_dt
- product_id
- rating
则以下代码将为 TrainingSet 创建正确的特征查找:
Unity Catalog 中的特征工程
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]
工作区特征存储
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]
调用 create_training_set 时,它会通过执行左联接创建一个训练数据集。该左联接操作使用对应于 (recommender_system.customer_features,training_df) 的键 (customer_id,dt) 来联接 cid 和 transaction_dt 表,如以下代码中所示:
Unity Catalog 中的特征工程
customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")
training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)
工作区特征存储
customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")
training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)
创建包含来自不同特征表的两个同名特征的训练集
在 output_name 中使用可选参数 FeatureLookup。 提供的名称用于取代 TrainingSet.load_df 返回的数据帧中的特征名称。 例如,在以下代码中,training_set.load_df 返回的数据帧包含 customer_height 和 product_height 列。
Unity Catalog 中的特征工程
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]
fe = FeatureEngineeringClient()
with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()
工作区特征存储
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]
fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()
多次使用相同特征创建训练集
若要使用由不同查找键联接的相同特征创建训练集,请使用多个 FeatureLookup。
为每个 FeatureLookup 输出使用唯一 output_name。
Unity Catalog 中的特征工程
feature_lookups = [
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]
工作区特征存储
feature_lookups = [
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]
为非监督式机器学习模型创建一个训练集
在为非监督式学习模型创建训练集时设置 label=None。 例如,以下训练集可用于根据不同客户的兴趣将不同的客户分组:
Unity Catalog 中的特征工程
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]
fe = FeatureEngineeringClient()
with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()
工作区特征存储
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]
fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()
在使用视图作为特征表时创建 TrainingSet
若要将视图用作功能表,必须使用 databricks-feature-engineering 版本 0.7.0 或更高版本,该版本内置于 Databricks Runtime 16.0 ML 中。
该视图必须是源 Delta 表中的简单 SELECT 视图。 简单的 SELECT 视图定义为从 Unity 目录中的单个 Delta 表创建的视图,该表可用作特征表,其主键在没有 JOIN、GROUP BY或 DISTINCT 子句的情况下被选定。 SQL 语句中的可接受关键字是 SELECT、FROM、WHERE、ORDER BY、LIMIT和 OFFSET。
在以下示例中, ml.recommender_system.customer_table 具有主键 cid , dt其中是 dt 时序列。 该示例假定数据帧 training_df 具有列 cid、dt和 label:
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
customer_features_df = spark.sql("CREATE OR REPLACE VIEW ml.recommender_system.customer_features AS SELECT cid, dt, pid, rating FROM ml.recommender_system.customer_table WHERE rating > 3")
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['pid', 'rating'],
      lookup_key=['cid'],
      timestamp_lookup_key='dt'
    ),
]
fe = FeatureEngineeringClient()
training_set = fe.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='label'
)
training_df = training_set.load_df()
使用默认值创建训练集
创建训练数据集时,可以为特征指定默认值,以便在特征存储没有为某个 ID 计算特征值的情况下进行处理。
若要指定默认值,请使用 default_values . 中的 FeatureLookup参数。
以下示例演示如何为一组功能指定默认值:
feature_lookups = [
    FeatureLookup(
        table_name="ml.recommender_system.customer_features",
        feature_names=[
            "membership_tier",
            "age",
            "page_views_count_30days",
        ],
        lookup_key="customer_id",
        default_values={
          "age": 18,
          "membership_tier": "bronze"
        },
    ),
]
如果使用 rename_outputs 参数重命名特征列,则 default_values 必须使用重命名后的特征名称。
FeatureLookup(
  table_name = 'main.default.table',
  feature_names = ['materialized_feature_value'],
  lookup_key = 'id',
  rename_outputs={"materialized_feature_value": "feature_value"},
  default_values={
    "feature_value": 0
  }
)
训练模型并使用特征表执行批量推理
使用特征存储中的特征训练模型时,该模型将保留对这些特征的引用。 将模型用于推理时,可以选择让该模型从特征存储中检索特征值。 必须提供模型中使用的特征的主键。 模型从工作区中的特征存储检索所需的特征。 然后,它会在评分期间根据需要联接特征值。
若要在推理时支持特征查找:
- 必须使用 log_model(适用于 Unity Catalog 中的特征工程)或FeatureEngineeringClient(适用于工作区特征存储)的FeatureStoreClient方法来记录模型。
- 必须使用 TrainingSet.load_df返回的数据帧训练模型。 如果在使用此数据帧训练模型之前以任何方式对其进行修改,在将该模型用于推理时将不会应用这些修改。 这会降低模型的性能。
- 模型类型在 MLflow 中必须有对应的 python_flavor。 MLflow 支持大多数 Python 模型训练框架,包括:- scikit-learn
- keras
- PyTorch
- SparkML
- LightGBM
- XGBoost
- TensorFlow Keras(使用 python_flavormlflow.keras)
 
- 自定义的 MLflow pyfunc 模型
Unity Catalog 中的特征工程
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]
fe = FeatureEngineeringClient()
with mlflow.start_run():
  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )
  training_df = training_set.load_df().toPandas()
  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating
  model = linear_model.LinearRegression().fit(X_train, y_train)
  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fe = FeatureEngineeringClient()
# batch_df has columns 'customer_id' and 'product_id'
predictions = fe.score_batch(
    model_uri=model_uri,
    df=batch_df
)
# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'
工作区特征存储
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]
fs = FeatureStoreClient()
with mlflow.start_run():
  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )
  training_df = training_set.load_df().toPandas()
  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating
  model = linear_model.LinearRegression().fit(X_train, y_train)
  fs.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )
# Batch inference
# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.
fs = FeatureStoreClient()
# batch_df has columns 'customer_id' and 'product_id'
predictions = fs.score_batch(
    model_uri=model_uri,
    df=batch_df
)
# The 'predictions' DataFrame has these columns:
# 'customer_id', 'product_id', 'total_purchases_30d', 'category', 'prediction'
对随特征元数据一起打包的模型评分时使用自定义特征值
默认情况下,随特征元数据一起打包的模型在推理时将从特征表中查找特征。 若要使用自定义特征值进行评分,请在传递给 FeatureEngineeringClient.score_batch(适用于 Unity Catalog 中的特征工程)或 FeatureStoreClient.score_batch(适用于工作区特征存储)的数据帧中包含它们。
例如,假设你要将模型随以下两个特征一起打包:
Unity Catalog 中的特征工程
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]
工作区特征存储
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]
在推理时,可以通过对包含 account_creation_date 列的数据帧调用 score_batch,来为特征 account_creation_date 提供自定义值。 在这种情况下,API 只会在特征存储中查找 num_lifetime_purchases 特征,并使用提供的自定义 account_creation_date 列值进行模型评分。
Unity Catalog 中的特征工程
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)
工作区特征存储
# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)
结合使用特征存储特征和驻留在特征存储外部的数据对模型进行训练和评分
可以结合使用特征存储特征和特征存储外部的数据对模型进行训练和评分。 将模型随特征元数据一起打包时,该模型将从特征存储中检索特征值用于推理。
若要训练模型,请将额外数据作为列包含在传递给 FeatureEngineeringClient.create_training_set(适用于 Unity Catalog 中的特征工程)或 FeatureStoreClient.create_training_set(适用于工作区特征存储)的数据帧中。 此示例使用特征存储中的特征 total_purchases_30d 以及外部列 browser。
Unity Catalog 中的特征工程
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]
fe = FeatureEngineeringClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)
工作区特征存储
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]
fs = FeatureStoreClient()
# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)
在推理时,FeatureStoreClient.score_batch 中使用的数据帧必须包含 browser 列。
Unity Catalog 中的特征工程
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
  model_uri=model_uri,
  df=batch_df
)
工作区特征存储
# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri=model_uri,
  df=batch_df
)
使用 MLflow 加载模型并执行批量推理
使用 log_model(适用于 Unity Catalog 中的特征工程)或 FeatureEngineeringClient(适用于工作区特征存储)的 FeatureStoreClient 方法来记录模型日志后,可以在推理时使用 MLflow。 
              mlflow.pyfunc.predict 从特征存储中检索特征值,并联接推理时提供的任何值。 必须提供模型中使用的特征的主键。
注意
使用 MLflow 进行批量推理需要 MLflow 2.11 及更高版本。 使用时序特征表的模型不受支持。 若要使用时序特征表进行批量推理,请使用 score_batch。 请参阅训练模型并使用特征表执行批量推理。
# Train model
import mlflow
from sklearn import linear_model
feature_lookups = [
  FeatureLookup(
    table_name='ml.recommender_system.customer_features',
    feature_names=['total_purchases_30d'],
    lookup_key='customer_id',
  ),
  FeatureLookup(
    table_name='ml.recommender_system.product_features',
    feature_names=['category'],
    lookup_key='product_id'
  )
]
fe = FeatureEngineeringClient()
with mlflow.start_run():
  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )
  training_df = training_set.load_df().toPandas()
  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating
  model = linear_model.LinearRegression().fit(X_train, y_train)
  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model",
    #refers to the default value of "result_type" if not provided at inference
    params={"result_type":"double"},
  )
# Batch inference with MLflow
# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result as shown below.
# batch_df has columns 'customer_id' and 'product_id'
model = mlflow.pyfunc.load_model(model_version_uri)
# If result_type parameter is provided in log_model
predictions = model.predict(df, {"result_type":"double"})
# If result_type parameter is NOT provided in log_model
model._model_impl.set_result_type("double")
predictions = model.predict(df)
处理缺失特征值
将不存在的查找键传递给模型进行预测时,FeatureLookup 提取的特征值可能是 None 或 NaN,具体取决于环境。 模型实现应能够处理这两个值。
- 对于使用 fe.score_batch的脱机应用程序,缺失特征的返回值为NaN。
- 对于使用“模型服务”的在线应用程序,返回值可能是 None或NaN:- 如果提供的所有查找键均不存在,则值为 None。
- 如果仅有一部分查找键不存在,则值为 NaN。
 
- 如果提供的所有查找键均不存在,则值为 
若要在使用按需功能时处理缺失特征值,请参阅如何处理缺失特征值。
示例笔记本
基本笔记本演示如何创建特征表、使用它来训练模型,以及如何使用自动功能查找运行批量评分。 它还显示了功能工程 UI,可用于搜索特征,并了解功能的创建和使用方式。
Unity Catalog 示例笔记本中的基本功能设计
出租车示例笔记本演示了创建特征、更新特征并将特征用于模型训练和批量推理的过程。