This page describes how to identify and delete empty Vector Search endpoints. Because Vector Search endpoints are workspace-specific resources, you must repeat this process separately for each workspace.
Requirements
- Databricks SDK for Python (databricks-sdk).
- Databricks Vector Search Python SDK (databricks-vectorsearch).
- Configured authentication (OAuth, PAT, or configuration profiles).
- CAN_MANAGE permission on the Vector Search endpoints in the target workspace.
To install the required SDKs in a Databricks notebook or a local Python environment, run the following:
# In a Databricks notebook
%pip install databricks-sdk databricks-vectorsearch
# In local Python environment
# pip install databricks-sdk databricks-vectorsearch
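To confirm that both packages installed correctly, you can check the installed versions with the standard library (a quick sanity check):

from importlib.metadata import version

# Confirm both packages are importable and print their installed versions
print(version("databricks-sdk"))
print(version("databricks-vectorsearch"))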
Identify empty endpoints
In the Databricks UI, Vector Search endpoints appear on the Vector Search tab of the Compute screen. Toggle the Empty endpoints checkbox to show only endpoints that have no indexes associated with them. Empty endpoints are also marked with a warning triangle icon.
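You can also identify empty endpoints programmatically. The following is a minimal sketch using the same list_endpoints and list_indexes calls as the cleanup script below, assuming default notebook authentication (see Authentication):

from databricks.vector_search.client import VectorSearchClient

client = VectorSearchClient()  # assumes notebook authentication; see Authentication below

# Flag endpoints that have no indexes attached
for endpoint in client.list_endpoints().get("endpoints", []):
    indexes = client.list_indexes(name=endpoint["name"]).get("vector_indexes", [])
    if not indexes:
        print(f"Empty endpoint: {endpoint['name']}")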
Authentication
This section describes the available authentication options.
Option 1. Run in a Databricks notebook
When you run the code in a Databricks workspace notebook, authentication is automatic:
from databricks.vector_search.client import VectorSearchClient
# Credentials are picked up automatically from notebook context
client = VectorSearchClient()
Option 2. Personal access token (PAT)
For external environments, provide explicit credentials:
from databricks.vector_search.client import VectorSearchClient

client = VectorSearchClient(
    workspace_url="https://<your-instance>.cloud.databricks.com",
    personal_access_token="dapiXXXXXXXXXXXXXXXXXXXXXXXX"
)
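To avoid hardcoding a PAT in source code, you can read the credentials from environment variables instead. A minimal sketch (the DATABRICKS_HOST and DATABRICKS_TOKEN variable names are illustrative here):

import os
from databricks.vector_search.client import VectorSearchClient

client = VectorSearchClient(
    workspace_url=os.environ["DATABRICKS_HOST"],           # e.g. https://<your-instance>.cloud.databricks.com
    personal_access_token=os.environ["DATABRICKS_TOKEN"]   # keeps the PAT out of source control
)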
Option 3. Use configuration profiles (recommended for multiple workspaces)
Create a .databrickscfg file in your home directory that includes a profile for each workspace:
[DEFAULT]
host = https://workspace1.cloud.databricks.com
token = dapiXXXXXXXXXXXXXXXXXXXXXXXX
[PRODUCTION]
host = https://workspace2.cloud.databricks.com
token = dapiYYYYYYYYYYYYYYYYYYYYYYYY
[DEVELOPMENT]
host = https://workspace3.cloud.databricks.com
token = dapiZZZZZZZZZZZZZZZZZZZZZZZZ
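With the profiles in place, you can build a VectorSearchClient for a given workspace by reading a profile through the Databricks SDK. This is the same pattern that the cleanup_workspace function uses below:

from databricks.sdk import WorkspaceClient
from databricks.vector_search.client import VectorSearchClient

# Read host and token from the PRODUCTION profile in ~/.databrickscfg
w = WorkspaceClient(profile="PRODUCTION")
client = VectorSearchClient(
    workspace_url=w.config.host,
    personal_access_token=w.config.token
)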
If you prefer not to use configuration profiles, you can specify credentials directly (the cleanup_multiple_workspaces function is defined in Delete endpoints across multiple workspaces below):
# Define workspaces with explicit credentials
workspace_configs = [
    {
        'workspace_url': 'https://workspace1.cloud.databricks.com',
        'token': 'dapiXXXXXXXXXXXXXXXXXXXXXXXX'
    },
    {
        'workspace_url': 'https://workspace2.cloud.databricks.com',
        'token': 'dapiYYYYYYYYYYYYYYYYYYYYYYYY'
    }
]

# Run cleanup; set `dry_run=False` to perform actual deletion
results = cleanup_multiple_workspaces(workspace_configs, dry_run=True)
Delete endpoints in a single workspace
Vector Search endpoints are workspace-specific. The following basic script finds and deletes empty endpoints in a single workspace. To clean up empty endpoints across multiple workspaces, see Delete endpoints across multiple workspaces.
Important
Endpoint deletion is irreversible. Run with dry_run=True to review the list of endpoints that would be deleted. After you confirm that the list is correct, run the script with dry_run=False.
from databricks.vector_search.client import VectorSearchClient

def cleanup_empty_endpoints(client, dry_run=True):
    """
    Find and delete empty Vector Search endpoints.

    Args:
        client: VectorSearchClient instance
        dry_run: If True, only print what would be deleted without actually deleting

    Returns:
        List of deleted endpoint names
    """
    deleted_endpoints = []

    # List all Vector Search endpoints
    endpoints = client.list_endpoints()

    for endpoint in endpoints.get("endpoints", []):
        name = endpoint["name"]
        # List indexes in this endpoint
        indexes = list(client.list_indexes(name=name).get("vector_indexes", []))
        if len(indexes) == 0:
            if dry_run:
                print(f"[DRY RUN] Would delete empty endpoint: '{name}'")
            else:
                print(f"Deleting empty endpoint: '{name}'")
                try:
                    client.delete_endpoint(name)
                    deleted_endpoints.append(name)
                    print(f"✓ Successfully deleted: {name}")
                except Exception as e:
                    print(f"✗ Failed to delete {name}: {e}")
        else:
            print(f"Endpoint '{name}' has {len(indexes)} indexes - keeping")

    return deleted_endpoints

# Example usage
client = VectorSearchClient()  # Uses default authentication

# Set `dry_run=False` when you are ready to delete endpoints
deleted = cleanup_empty_endpoints(client, dry_run=True)
print(f"\nTotal endpoints deleted: {len(deleted)}")
Delete endpoints across multiple workspaces
To clean up empty endpoints across multiple workspaces, iterate over your configuration profiles:
Important
Endpoint deletion is irreversible. Run with dry_run=True to review the list of endpoints that would be deleted. After you confirm that the list is correct, run the script with dry_run=False. When processing many workspaces, be mindful of API rate limits. If necessary, add a delay between workspaces:

import time

for config in workspace_configs:
    # Set `dry_run=False` to perform actual deletion
    result = cleanup_workspace(**config, dry_run=True)
    time.sleep(2)  # Add delay between workspaces
from databricks.sdk import WorkspaceClient
from databricks.vector_search.client import VectorSearchClient
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def cleanup_workspace(profile_name=None, workspace_url=None, token=None, dry_run=True):
    """
    Clean up empty endpoints in a specific workspace.

    Args:
        profile_name: Name of configuration profile to use
        workspace_url: Direct workspace URL (if not using profile)
        token: PAT token (if not using profile)
        dry_run: If True, only show what would be deleted

    Returns:
        Dict with cleanup results
    """
    try:
        # Initialize client based on authentication method
        if profile_name:
            # Use Databricks SDK to get credentials from profile
            w = WorkspaceClient(profile=profile_name)
            workspace_url = w.config.host
            client = VectorSearchClient(
                workspace_url=workspace_url,
                personal_access_token=w.config.token
            )
            logger.info(f"Connected to workspace using profile '{profile_name}': {workspace_url}")
        elif workspace_url and token:
            client = VectorSearchClient(
                workspace_url=workspace_url,
                personal_access_token=token
            )
            logger.info(f"Connected to workspace: {workspace_url}")
        else:
            # Use default authentication (notebook context)
            client = VectorSearchClient()
            logger.info("Connected using default authentication")

        # Perform cleanup
        deleted = cleanup_empty_endpoints(client, dry_run=dry_run)

        return {
            'workspace': workspace_url or 'default',
            'success': True,
            'deleted_count': len(deleted),
            'deleted_endpoints': deleted
        }
    except Exception as e:
        logger.error(f"Failed to process workspace: {str(e)}")
        return {
            'workspace': workspace_url or profile_name or 'default',
            'success': False,
            'error': str(e)
        }

def cleanup_multiple_workspaces(workspace_configs, dry_run=True):
    """
    Clean up empty endpoints across multiple workspaces.

    Args:
        workspace_configs: List of workspace configurations
        dry_run: If True, only show what would be deleted

    Returns:
        Summary of cleanup results
    """
    results = []
    for config in workspace_configs:
        logger.info(f"\n{'='*60}")
        result = cleanup_workspace(**config, dry_run=dry_run)
        results.append(result)
        logger.info(f"{'='*60}\n")

    # Print summary
    total_deleted = sum(r['deleted_count'] for r in results if r['success'])
    successful = sum(1 for r in results if r['success'])
    failed = sum(1 for r in results if not r['success'])

    logger.info("\n" + "="*60)
    logger.info("CLEANUP SUMMARY")
    logger.info("="*60)
    logger.info(f"Workspaces processed: {len(results)}")
    logger.info(f"Successful: {successful}")
    logger.info(f"Failed: {failed}")
    logger.info(f"Total endpoints deleted: {total_deleted}")

    if failed > 0:
        logger.warning("\nFailed workspaces:")
        for r in results:
            if not r['success']:
                logger.warning(f"  - {r['workspace']}: {r['error']}")

    return results

# Example: Clean up using configuration profiles
workspace_configs = [
    {'profile_name': 'DEFAULT'},
    {'profile_name': 'PRODUCTION'},
    {'profile_name': 'DEVELOPMENT'}
]

# Set `dry_run=False` to do actual deletion.
results = cleanup_multiple_workspaces(workspace_configs, dry_run=True)
Custom filtering
You can add custom logic to exclude certain endpoints from deletion, as shown below:
def should_delete_endpoint(endpoint, indexes):
    """
    Custom logic to determine if an endpoint should be deleted.

    Args:
        endpoint: Endpoint dict as returned by list_endpoints()
        indexes: List of indexes in the endpoint

    Returns:
        Boolean indicating if endpoint should be deleted
    """
    name = endpoint["name"]

    # Don't delete if it has indexes
    if len(indexes) > 0:
        return False

    # Don't delete endpoints with specific naming patterns
    protected_patterns = ['prod-', 'critical-', 'do-not-delete']
    for pattern in protected_patterns:
        if pattern in name.lower():
            logger.warning(f"Skipping protected endpoint: {name}")
            return False

    # Add more custom logic as needed
    return True
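To apply this filter, replace the len(indexes) == 0 check in the loop of cleanup_empty_endpoints with a call to should_delete_endpoint. A sketch of the modified loop body:

# Sketch: inside the endpoint loop of cleanup_empty_endpoints
name = endpoint["name"]
if should_delete_endpoint(endpoint, indexes):
    if dry_run:
        print(f"[DRY RUN] Would delete endpoint: '{name}'")
    else:
        client.delete_endpoint(name)
        deleted_endpoints.append(name)
else:
    print(f"Keeping endpoint: '{name}'")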
Export results
To save the cleanup results to a file for auditing, run the following:
import json
from datetime import datetime

def export_results(results, filename=None):
    """Export cleanup results to JSON file."""
    if not filename:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f'vector_search_cleanup_{timestamp}.json'

    with open(filename, 'w') as f:
        json.dump({
            'timestamp': datetime.now().isoformat(),
            'results': results
        }, f, indent=2)

    logger.info(f"Results exported to: {filename}")
Troubleshooting
Authentication issues
- Verify that the PAT token is valid and has not expired.
- Ensure that configuration profiles are formatted correctly.
- Check that the token has the necessary permissions.
Permission errors
Verify that the user or service principal has CAN_MANAGE permission on the Vector Search endpoints.
Network issues
For environments with proxy requirements, configure the SDK appropriately:
import os
os.environ['HTTPS_PROXY'] = 'http://your-proxy:port'
Next steps
- Schedule this script to run periodically with Lakeflow Jobs (see the sketch after this list).
- Integrate it into infrastructure-as-code pipelines.
- Add email or Slack notifications for cleanup summaries.
- Create a dashboard to track endpoint usage across workspaces.
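As a starting point for scheduling, the following is a minimal sketch that creates a daily job with the Databricks SDK. The notebook path, job name, and cron expression are illustrative assumptions; depending on your workspace, you may also need to specify compute for the task:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient(profile="DEFAULT")

# Hypothetical notebook path containing the cleanup script above
job = w.jobs.create(
    name="vector-search-empty-endpoint-cleanup",
    tasks=[
        jobs.Task(
            task_key="cleanup",
            notebook_task=jobs.NotebookTask(notebook_path="/Tools/vector_search_cleanup"),
        )
    ],
    # Run daily at 02:00 UTC; adjust as needed
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 2 * * ?",
        timezone_id="UTC",
    ),
)
print(f"Created job {job.job_id}")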