Distributed training of XGBoost models using xgboost.spark
Important
This feature is in Public Preview.
The Python package xgboost>=1.7 contains a new module, xgboost.spark. This module provides the XGBoost PySpark estimators xgboost.spark.SparkXGBRegressor, xgboost.spark.SparkXGBClassifier, and xgboost.spark.SparkXGBRanker. These classes let you include XGBoost estimators in SparkML Pipelines. For API details, see the XGBoost Python Spark API documentation.
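As a rough illustration of the pipeline support described above, the following sketch places a SparkXGBRegressor after a VectorAssembler stage. The function name build_xgb_pipeline and its arguments are hypothetical, and the "features" output column is an arbitrary choice. The imports are deferred so the definition loads even where pyspark and xgboost are not installed. Calling the function assumes a running Spark session with xgboost>=1.7 available.

```python
def build_xgb_pipeline(feature_cols, label_col, num_workers=1):
    """Return an unfitted Pipeline: VectorAssembler -> SparkXGBRegressor.

    Hypothetical helper for illustration; not part of xgboost.spark.
    """
    # Deferred imports: these need pyspark and xgboost>=1.7 on the cluster.
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    from xgboost.spark import SparkXGBRegressor

    # Combine the raw feature columns into a single vector column.
    assembler = VectorAssembler(inputCols=list(feature_cols), outputCol="features")

    # The estimator reads the assembled vector column and the label column.
    regressor = SparkXGBRegressor(
        features_col="features",
        label_col=label_col,
        num_workers=num_workers,
    )
    return Pipeline(stages=[assembler, regressor])
```

On a cluster, something like build_xgb_pipeline(["x1", "x2"], "y").fit(train_df) would then fit both stages, where train_df and its column names are placeholders.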
Requirements
Databricks Runtime 12.0 ML and above.
xgboost.spark parameters
The estimators defined in the xgboost.spark module support most of the same parameters and arguments used in standard XGBoost.
- The parameters for the class constructor, fit method, and predict method are largely identical to those in the xgboost.sklearn module.
- Naming, values, and defaults are mostly identical to those described in XGBoost parameters.
- Exceptions are a few unsupported parameters (such as gpu_id, nthread, sample_weight, eval_set), and the pyspark estimator specific parameters that have been added (such as featuresCol, labelCol, use_gpu, validationIndicatorCol). For details, see XGBoost Python Spark API documentation.
Distributed training
PySpark estimators defined in the xgboost.spark module support distributed XGBoost training using the num_workers parameter. To use distributed training, create a classifier or regressor and set num_workers to the number of Spark tasks that run concurrently during distributed training. To use all Spark task slots, set num_workers=sc.defaultParallelism.
For example:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism)
Note
- You cannot use mlflow.xgboost.autolog with distributed XGBoost. To log an xgboost Spark model using MLflow, use mlflow.spark.log_model(spark_xgb_model, artifact_path).
- You cannot use distributed XGBoost on a cluster that has autoscaling enabled. New worker nodes that start in this elastic scaling paradigm cannot receive new sets of tasks and remain idle. For instructions to disable autoscaling, see Enable autoscaling.
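The logging note above can be sketched as follows. This is a minimal example under stated assumptions, not a definitive workflow: train_and_log is a hypothetical helper, train_df is assumed to already contain the estimator's default "features" and "label" columns, and the artifact path is arbitrary. The imports are deferred so the definition loads without mlflow or xgboost installed.

```python
def train_and_log(train_df, num_workers, artifact_path="xgb_spark_model"):
    """Fit a distributed classifier and log it with the MLflow Spark flavor.

    Hypothetical helper for illustration; run it on a cluster without autoscaling.
    """
    # Deferred imports: these need mlflow, pyspark, and xgboost>=1.7.
    import mlflow
    import mlflow.spark
    from xgboost.spark import SparkXGBClassifier

    classifier = SparkXGBClassifier(num_workers=num_workers)
    model = classifier.fit(train_df)

    # mlflow.xgboost.autolog is not usable here, so log the model explicitly.
    with mlflow.start_run():
        mlflow.spark.log_model(model, artifact_path)
    return model
```

A call such as train_and_log(train_df, sc.defaultParallelism) would use all Spark task slots, as described in the section above.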
Enable optimization for training on sparse features dataset
PySpark estimators defined in the xgboost.spark module support optimization for training on datasets with sparse features.
To enable this optimization, pass the fit method a dataset whose features column contains values of type pyspark.ml.linalg.SparseVector, and set the estimator parameter enable_sparse_data_optim to True. You must also set the missing parameter to 0.0.
For example:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(enable_sparse_data_optim=True, missing=0.0)
classifier.fit(dataset_with_sparse_features_col)
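To make the sparse representation concrete, the sketch below splits a dense row into the size, indices, and values that a sparse vector stores. The helper to_sparse_parts is hypothetical and uses plain Python only. Zero entries are simply not stored, which is consistent with the requirement above to set missing to 0.0.

```python
def to_sparse_parts(dense_row):
    """Split a dense row into (size, indices, values), keeping non-zero entries only."""
    indices = [i for i, value in enumerate(dense_row) if value != 0.0]
    values = [dense_row[i] for i in indices]
    return len(dense_row), indices, values


size, indices, values = to_sparse_parts([0.0, 2.5, 0.0, 1.0])
print(size, indices, values)  # 4 [1, 3] [2.5, 1.0]

# On a cluster, the same three parts could build a SparseVector:
#   from pyspark.ml.linalg import Vectors
#   vector = Vectors.sparse(size, indices, values)
```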
GPU training
PySpark estimators defined in the xgboost.spark module support training on GPUs. Set the parameter use_gpu to True to enable GPU training.
Note
For each Spark task used in XGBoost distributed training, only one GPU is used in training when the use_gpu argument is set to True. Databricks recommends using the default value of 1 for the Spark cluster configuration spark.task.resource.gpu.amount. Otherwise, the additional GPUs allocated to this Spark task are idle.
For example:
from xgboost.spark import SparkXGBClassifier
classifier = SparkXGBClassifier(num_workers=sc.defaultParallelism, use_gpu=True)
Troubleshooting
During multi-node training, if you encounter an NCCL failure: remote process exited or there was a network error message, it typically indicates a problem with network communication among GPUs. This issue arises when NCCL (NVIDIA Collective Communications Library) cannot use certain network interfaces for GPU communication.
To resolve this, set the cluster's sparkConf for spark.executorEnv.NCCL_SOCKET_IFNAME to eth. This sets the environment variable NCCL_SOCKET_IFNAME to eth for all of the workers in a node.
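The two Spark settings mentioned in this section can be collected in one place, as in the sketch below. It assumes the cluster's Spark config field accepts one space-separated key and value per line; check this against your workspace. The names gpu_cluster_conf and render_spark_conf are hypothetical.

```python
# Settings taken from the GPU training note and the troubleshooting step above.
gpu_cluster_conf = {
    "spark.executorEnv.NCCL_SOCKET_IFNAME": "eth",
    "spark.task.resource.gpu.amount": "1",
}


def render_spark_conf(conf):
    """Render a dict as 'key value' lines (assumed cluster Spark config format)."""
    return "\n".join(f"{key} {value}" for key, value in conf.items())


print(render_spark_conf(gpu_cluster_conf))
```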
Example notebook
This notebook shows the use of the Python package xgboost.spark with Spark MLlib.
PySpark-XGBoost notebook
Migration guide for the deprecated sparkdl.xgboost module
- Replace from sparkdl.xgboost import XgboostRegressor with from xgboost.spark import SparkXGBRegressor and replace from sparkdl.xgboost import XgboostClassifier with from xgboost.spark import SparkXGBClassifier.
- Change all parameter names in the estimator constructor from camelCase style to snake_case style. For example, change XgboostRegressor(featuresCol=XXX) to SparkXGBRegressor(features_col=XXX).
- The parameters use_external_storage and external_storage_precision have been removed. xgboost.spark estimators use the DMatrix data iteration API to use memory more efficiently. There is no longer a need to use the inefficient external storage mode. For extremely large datasets, Databricks recommends that you increase the num_workers parameter, which makes each training task partition the data into smaller, more manageable data partitions. Consider setting num_workers = sc.defaultParallelism, which sets num_workers to the total number of Spark task slots in the cluster.
- For estimators defined in xgboost.spark, setting num_workers=1 executes model training using a single Spark task. This utilizes the number of CPU cores specified by the Spark cluster configuration setting spark.task.cpus, which is 1 by default. To use more CPU cores to train the model, increase num_workers or spark.task.cpus. You cannot set the nthread or n_jobs parameter for estimators defined in xgboost.spark. This behavior is different from the previous behavior of estimators defined in the deprecated sparkdl.xgboost package.
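The renaming and removal rules in the list above can be sketched as a small helper for constructor arguments. This is an illustrative heuristic, not the library's own mapping: camel_to_snake and migrate_constructor_kwargs are hypothetical names, and the converted names should be checked against the XGBoost Python Spark API documentation.

```python
import re

# Parameters the list above says were removed or can no longer be set.
DROPPED_PARAMS = frozenset(
    {"use_external_storage", "external_storage_precision", "nthread", "n_jobs"}
)


def camel_to_snake(name):
    """Convert a camelCase name to snake_case; snake_case input is unchanged."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def migrate_constructor_kwargs(sparkdl_kwargs):
    """Rename camelCase keys and drop parameters that xgboost.spark rejects."""
    migrated = {}
    for key, value in sparkdl_kwargs.items():
        if key in DROPPED_PARAMS:
            continue
        migrated[camel_to_snake(key)] = value
    return migrated


old_kwargs = {"featuresCol": "f", "labelCol": "y", "max_depth": 4, "nthread": 8}
print(migrate_constructor_kwargs(old_kwargs))
# {'features_col': 'f', 'label_col': 'y', 'max_depth': 4}
```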
Convert sparkdl.xgboost model into xgboost.spark model
sparkdl.xgboost models are saved in a different format than xgboost.spark models and have different parameter settings. Use the following utility function to convert the model:
def convert_sparkdl_model_to_xgboost_spark_model(
    xgboost_spark_estimator_cls,
    sparkdl_xgboost_model,
):
    """
    :param xgboost_spark_estimator_cls:
        `xgboost.spark` estimator class, e.g. `xgboost.spark.SparkXGBRegressor`
    :param sparkdl_xgboost_model:
        `sparkdl.xgboost` model instance e.g. the instance of
        `sparkdl.xgboost.XgboostRegressorModel` type.

    :return:
        A `xgboost.spark` model instance
    """

    def convert_param_key(key):
        # Map a sparkdl.xgboost parameter name to its xgboost.spark name.
        # Returns None for parameters that xgboost.spark does not accept.
        from xgboost.spark.core import _inverse_pyspark_param_alias_map
        if key == "baseMarginCol":
            return "base_margin_col"
        if key in _inverse_pyspark_param_alias_map:
            return _inverse_pyspark_param_alias_map[key]
        if key in ['use_external_storage', 'external_storage_precision', 'nthread', 'n_jobs', 'base_margin_eval_set']:
            return None
        return key

    # Collect every parameter that has a value (explicitly set or default).
    xgboost_spark_params_dict = {}
    for param in sparkdl_xgboost_model.params:
        if param.name == "arbitraryParamsDict":
            continue
        if sparkdl_xgboost_model.isDefined(param):
            xgboost_spark_params_dict[param.name] = sparkdl_xgboost_model.getOrDefault(param)

    # Merge in the extra keyword arguments stored in arbitraryParamsDict.
    xgboost_spark_params_dict.update(sparkdl_xgboost_model.getOrDefault("arbitraryParamsDict"))

    # Rename the keys and drop the parameters that are no longer supported.
    xgboost_spark_params_dict = {
        convert_param_key(k): v
        for k, v in xgboost_spark_params_dict.items()
        if convert_param_key(k) is not None
    }

    # Rebuild the model around the trained booster.
    booster = sparkdl_xgboost_model.get_booster()
    booster_bytes = booster.save_raw("json")
    booster_config = booster.save_config()
    estimator = xgboost_spark_estimator_cls(**xgboost_spark_params_dict)
    sklearn_model = estimator._convert_to_sklearn_model(booster_bytes, booster_config)
    return estimator._copyValues(estimator._create_pyspark_model(sklearn_model))

# Example
from xgboost.spark import SparkXGBRegressor

# `model` is an existing fitted sparkdl.xgboost model.
new_model = convert_sparkdl_model_to_xgboost_spark_model(
    xgboost_spark_estimator_cls=SparkXGBRegressor,
    sparkdl_xgboost_model=model,
)
If you have a pyspark.ml.PipelineModel whose last stage is a sparkdl.xgboost model, you can replace that stage with the converted xgboost.spark model.
pipeline_model.stages[-1] = convert_sparkdl_model_to_xgboost_spark_model(
    xgboost_spark_estimator_cls=SparkXGBRegressor,
    sparkdl_xgboost_model=pipeline_model.stages[-1],
)