GitHub Repository: ibm/watson-machine-learning-samples
Path: blob/master/cloud/notebooks/python_sdk/deployments/ai_services/Use watsonx to run generate_batch job using AI service.ipynb
Kernel: test_env


Use watsonx to run generate_batch job using AI service

Disclaimers

  • Use only Projects and Spaces that are available in watsonx context.

Notebook content

This notebook provides a detailed demonstration of the steps and code required to work with watsonx.ai AI services.

Some familiarity with Python is helpful. This notebook uses Python 3.11.

Learning goal

The primary objective of this notebook is to illustrate how to utilize watsonx.ai AI services to execute a generate_batch job, facilitating the ingestion of documents into a Milvus vector database.

Table of Contents

This notebook contains the following parts:

  • Set up the environment

  • Create AI service

  • Deploy AI service

  • Example of executing an AI service

  • Cleanup

  • Summary and next steps

Set up the environment

Before you use the sample code in this notebook, you must perform the following setup tasks:

Install required packages

%pip install -U langchain_community | tail -n 1
%pip install -U "ibm_watsonx_ai>=1.3.20" | tail -n 1

Define the watsonx.ai credentials

Use the code cell below to define the watsonx.ai credentials that are required to work with watsonx Foundation Model inferencing.

Action: Provide the IBM Cloud user API key. For details, see Managing user API keys.

import getpass

from ibm_watsonx_ai import Credentials

credentials = Credentials(
    url="https://us-south.ml.cloud.ibm.com",
    api_key=getpass.getpass("Enter your watsonx.ai api key and hit enter: "),
)

Working with spaces

You need to create a space that will be used for your work. If you do not have a space, you can use Deployment Spaces Dashboard to create one.

  • Click New Deployment Space

  • Create an empty space

  • Select Cloud Object Storage

  • Select watsonx.ai Runtime instance and press Create

  • Go to Manage tab

  • Copy Space GUID and paste it below

Tip: You can also use the SDK to prepare the space for your work. More information can be found here.

Action: assign the space ID below.

import os

try:
    space_id = os.environ["SPACE_ID"]
except KeyError:
    space_id = input("Please enter your space_id (hit enter): ")

Create an instance of APIClient with authentication details.

from ibm_watsonx_ai import APIClient

api_client = APIClient(credentials=credentials, space_id=space_id)

Create an embedding function for VectorStore

Note that you can supply a custom embedding function to be used by Milvus. The performance of Milvus may differ depending on the embedding model used.

Note: To list available embedding models use:

api_client.foundation_models.EmbeddingModels.show()
from ibm_watsonx_ai.foundation_models import Embeddings

embedding_model_id = "ibm/slate-125m-english-rtrvr-v2"

embeddings = Embeddings(model_id=embedding_model_id, api_client=api_client)

Set up connectivity information to Milvus

This notebook focuses on a self-managed Milvus cluster using IBM watsonx.data.

The following cell retrieves the Milvus username, password, host, and port from the environment (if available) and prompts you to provide them manually in case of failure.

You can provide a connection asset ID to read all required connection data from it. Before doing so, make sure that a connection asset was created in your space.

import os
import getpass

milvus_connection_id = (
    input(
        "Provide connection asset ID in your space. Skip this, if you wish to type credentials by hand and hit enter: "
    )
    or None
)

if milvus_connection_id is None:
    try:
        username = os.environ["USERNAME"]
    except KeyError:
        username = input("Please enter your Milvus user name and hit enter: ")

    try:
        password = os.environ["PASSWORD"]
    except KeyError:
        password = getpass.getpass("Please enter your Milvus password and hit enter: ")

    try:
        host = os.environ["HOST"]
    except KeyError:
        host = input("Please enter your Milvus hostname and hit enter: ")

    try:
        port = os.environ["PORT"]
    except KeyError:
        port = input("Please enter your Milvus port number and hit enter: ")

    try:
        ssl = os.environ["SSL"]
    except KeyError:
        # Any non-empty answer enables SSL; an empty answer disables it.
        ssl = bool(
            input(
                "Please enter ('y'/anything) if your Milvus instance has SSL enabled. Skip if it is not: "
            )
        )

    # Create connection
    milvus_data_source_type_id = api_client.connections.get_datasource_type_uid_by_name(
        "milvus"
    )

    details = api_client.connections.create(
        {
            api_client.connections.ConfigurationMetaNames.NAME: "Milvus Connection",
            api_client.connections.ConfigurationMetaNames.DESCRIPTION: "Connection created by the sample notebook",
            api_client.connections.ConfigurationMetaNames.DATASOURCE_TYPE: milvus_data_source_type_id,
            api_client.connections.ConfigurationMetaNames.PROPERTIES: {
                "host": host,
                "port": port,
                "username": username,
                "password": password,
                "ssl": ssl,
            },
        }
    )

    milvus_connection_id = api_client.connections.get_id(details)
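The repeated try/except blocks above all follow the same pattern: read a value from the environment, and fall back to an interactive prompt when the variable is unset. As an illustrative aside (not part of the original notebook), the pattern can be factored into a small helper; `env_or` and the example values below are hypothetical:

```python
import os


def env_or(name, fallback):
    """Return environment variable `name`, or call `fallback()` if it is unset.

    `fallback` would typically be a prompt such as `input` or
    `getpass.getpass` in an interactive notebook.
    """
    try:
        return os.environ[name]
    except KeyError:
        return fallback()


# Demonstration with non-interactive fallbacks (hypothetical values):
os.environ["HOST"] = "milvus.example.com"  # simulate a variable that is set
os.environ.pop("PORT", None)               # simulate a variable that is unset

host = env_or("HOST", lambda: "localhost")  # found in the environment
port = env_or("PORT", lambda: "19530")      # falls back to the default
```

This keeps the credential-gathering cell short while preserving the same behavior as the inline try/except version.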

Set up VectorStore with Milvus credentials

Create a VectorStore object that automatically detects the database type (in our case it will be Milvus) and allows us to add, search, and delete documents.

It works as a wrapper for LangChain VectorStore classes. You can customize the settings as long as they are supported. Consult the LangChain documentation for more information about the Milvus connector.
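To illustrate the wrapper idea in isolation, here is a minimal, hypothetical sketch: a thin class that delegates `add_documents`, `count`, and `clear` to whatever backend connector it is given. This is not the actual `VectorStore` implementation from `ibm_watsonx_ai`, just a toy model of the pattern:

```python
class InMemoryBackend:
    """Stand-in for a concrete vector database connector (e.g. Milvus)."""

    def __init__(self):
        self._docs = []

    def add(self, docs):
        self._docs.extend(docs)

    def count(self):
        return len(self._docs)

    def clear(self):
        self._docs.clear()


class VectorStoreWrapper:
    """Toy wrapper that exposes a uniform API and delegates to a backend."""

    def __init__(self, backend):
        self._backend = backend

    def add_documents(self, docs):
        self._backend.add(docs)

    def count(self):
        return self._backend.count()

    def clear(self):
        self._backend.clear()


store = VectorStoreWrapper(InMemoryBackend())
store.add_documents(["doc one", "doc two"])
```

The real class detects the backend from the connection asset instead of receiving it directly, but the delegation structure is the same.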

Provide the name of your Milvus index for subsequent operations:

index_name = input("Please enter Milvus index name and hit enter: ")
print(f"{index_name=}")
index_name='example_milvus_index_name'
from ibm_watsonx_ai.foundation_models.extensions.rag.vector_stores import VectorStore

vector_store = VectorStore(
    api_client=api_client,
    connection_id=milvus_connection_id,
    index_name=index_name,
    embeddings=embeddings,
)

Verify that the index in the Milvus instance is empty.

Note: If the collection is not empty, you can use the clear method on the VectorStore object:

vector_store.clear()
vector_store.count()
0

References to input data

This example uses the ModelInference description from the ibm_watsonx_ai documentation.

from langchain_community.document_loaders import WebBaseLoader

url = "https://ibm.github.io/watsonx-ai-python-sdk/fm_model_inference.html"

docs = WebBaseLoader(url).load()
model_inference_content = docs[0].page_content
import os

document_filename = "ModelInference.txt"

if not os.path.isfile(document_filename):
    with open(document_filename, "w") as file:
        file.write(model_inference_content)

Upload the input data to the space as a data asset.

document_asset_details = api_client.data_assets.create(
    name=document_filename, file_path=document_filename
)
document_asset_id = api_client.data_assets.get_id(document_asset_details)
document_asset_id
Creating data asset... SUCCESS
'45f7c1e3-0b04-439a-940e-c642e498213a'

Define a connection to the input data.

from ibm_watsonx_ai.helpers import DataConnection

data_connection = DataConnection(data_asset_id=document_asset_id)
data_connection.set_client(api_client=api_client)

Create input_data_references as a list of dict representations.

import json

input_data_references = [data_connection.to_dict()]
print(json.dumps(input_data_references, indent=2))
[
  {
    "type": "data_asset",
    "location": {
      "href": "/v2/assets/45f7c1e3-0b04-439a-940e-c642e498213a?space_id=9f44cc2b-b3d0-4472-824e-4941afb1617b",
      "id": "45f7c1e3-0b04-439a-940e-c642e498213a"
    }
  }
]

Create AI service

Prepare the function that will be deployed as an AI service.

Specify the default parameters that will be passed to the function.

def deployable_ai_service(
    context,
    url=None,
    embedding_model_id=None,
    milvus_connection_id=None,
    index_name=None,
):
    from ibm_watsonx_ai import APIClient
    from ibm_watsonx_ai import Credentials
    from ibm_watsonx_ai.helpers import DataConnection
    from ibm_watsonx_ai.foundation_models.embeddings import Embeddings
    from ibm_watsonx_ai.foundation_models.extensions.rag.vector_stores import (
        VectorStore,
    )
    from ibm_watsonx_ai.data_loaders.datasets.documents import DocumentsIterableDataset
    from ibm_watsonx_ai.foundation_models.extensions.rag.chunker.langchain_chunker import (
        LangChainChunker,
    )

    api_client = APIClient(
        credentials=Credentials(url=url, token=context.generate_token()),
        space_id=context.get_space_id(),
    )
    print("Successfully initialized APIClient")

    embeddings = Embeddings(model_id=embedding_model_id, api_client=api_client)
    print("Successfully initialized Embeddings")

    def generate_batch(
        input_data_references: list[dict], output_data_reference: dict | None = None
    ) -> None:
        vector_store = VectorStore(
            api_client=api_client,
            connection_id=milvus_connection_id,
            index_name=index_name,
            embeddings=embeddings,
        )
        print("Successfully initialized VectorStore")

        connections = []
        for input_data_reference in input_data_references:
            connections.append(DataConnection.from_dict(input_data_reference))

        dataset = DocumentsIterableDataset(
            connections=connections, enable_sampling=False, api_client=api_client
        )
        print("Successfully initialized DocumentsIterableDataset")

        chunker = LangChainChunker(
            chunk_size=512,
        )
        print("Successfully initialized LangChainChunker")

        documents = chunker.split_documents(dataset)
        print("Successfully split documents")

        vector_store.add_documents(documents)
        print("Documents added")

        vector_store_count = vector_store.count()
        print(f"Vector Store count: {vector_store_count}")

    return generate_batch
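The chunking step above splits each loaded document into pieces of roughly chunk_size characters before they are embedded and indexed. As a rough illustration of the idea only (the real LangChainChunker respects text separators and configurable overlap), here is a hypothetical naive character-level chunker:

```python
def chunk_text(text, chunk_size=512, overlap=64):
    """Naive fixed-size character chunker with overlap (illustration only).

    Consecutive chunks share `overlap` trailing/leading characters so that
    sentences cut at a chunk boundary still appear intact in one chunk.
    """
    if chunk_size <= overlap:
        raise ValueError("chunk_size must be greater than overlap")
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        start += chunk_size - overlap  # step forward, keeping the overlap
    return chunks


# A 1200-character text with chunk_size=512 and overlap=64 yields 3 chunks:
# [0:512], [448:960], [896:1200].
chunks = chunk_text("x" * 1200, chunk_size=512, overlap=64)
```

Overlapping chunks cost some extra storage in the vector index but reduce the chance that a relevant passage is split across two chunks at retrieval time.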

Testing AI service's function locally

You can test the AI service's function locally. First, initialize a RuntimeContext.

from ibm_watsonx_ai.deployments import RuntimeContext

context = RuntimeContext(api_client=api_client)

generate_batch = deployable_ai_service(
    context,
    url=credentials["url"],
    embedding_model_id=embedding_model_id,
    milvus_connection_id=milvus_connection_id,
    index_name=index_name,
)
Successfully initialized APIClient
Successfully initialized Embeddings
generate_batch(input_data_references)
Successfully initialized VectorStore
Successfully initialized DocumentsIterableDataset
Successfully initialized LangChainChunker
Successfully split documents
Documents added
Vector Store count: 82

Verify the total number of documents currently stored within the Milvus collection.

Note: Due to the implementation specifics of Milvus, it is necessary to initialize a new VectorStore instance in order to accurately retrieve the count of indexed elements.

vector_store = VectorStore(
    api_client=api_client,
    connection_id=milvus_connection_id,
    index_name=index_name,
    embeddings=embeddings,
)
vector_store.count()
82

Once the collection accurately reflects the expected number of items, proceed to clear its contents to prepare the environment for subsequent testing activities.

vector_store.clear()
vector_store.count()
0

Deploy AI service

Store the AI service with the selected software specification.

sw_spec_id = api_client.software_specifications.get_id_by_name("runtime-24.1-py3.11")
print(f"{sw_spec_id=}")
sw_spec_id='45f12dfe-aa78-5b8d-9f38-0ee223c47309'
meta_props = {
    api_client.repository.AIServiceMetaNames.NAME: "AI service with generate_batch",
    api_client.repository.AIServiceMetaNames.DESCRIPTION: "Sample AI service with implemented generate_batch",
    api_client.repository.AIServiceMetaNames.SOFTWARE_SPEC_ID: sw_spec_id,
}

stored_ai_service_details = api_client.repository.store_ai_service(
    deployable_ai_service, meta_props
)
print(json.dumps(stored_ai_service_details, indent=2))
{
  "entity": {
    "code_type": "python",
    "software_spec": {
      "id": "45f12dfe-aa78-5b8d-9f38-0ee223c47309",
      "name": "runtime-24.1-py3.11"
    }
  },
  "metadata": {
    "created_at": "2025-05-27T08:01:26.399Z",
    "description": "Sample AI service with implemented generate_batch",
    "id": "ea282d35-57d6-4e27-9cfa-e1c496832b56",
    "modified_at": "2025-05-27T08:01:26.399Z",
    "name": "AI service with generate_batch",
    "owner": "IBMid-55000091VC",
    "space_id": "9f44cc2b-b3d0-4472-824e-4941afb1617b"
  },
  "system": {
    "warnings": []
  }
}
ai_service_id = api_client.repository.get_ai_service_id(stored_ai_service_details)
print(f"{ai_service_id=}")
ai_service_id='ea282d35-57d6-4e27-9cfa-e1c496832b56'

Create a batch deployment of the AI service.

deployment_details = api_client.deployments.create(
    artifact_id=ai_service_id,
    meta_props={
        api_client.deployments.ConfigurationMetaNames.NAME: "Vector Store Batch Deployment",
        api_client.deployments.ConfigurationMetaNames.BATCH: {
            "parameters": {
                "url": credentials["url"],
                "embedding_model_id": embedding_model_id,
                "milvus_connection_id": milvus_connection_id,
                "index_name": index_name,
            }
        },
        api_client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {
            "id": api_client.hardware_specifications.get_id_by_name("L")
        },
    },
)

deployment_id = api_client.deployments.get_id(deployment_details)
print(f"{deployment_id=}")
######################################################################################

Synchronous deployment creation for id: 'ea282d35-57d6-4e27-9cfa-e1c496832b56' started

######################################################################################

ready.

-----------------------------------------------------------------------------------------------
Successfully finished deployment creation, deployment_id='66820b23-48e2-4bde-9717-f6db689cc06e'
-----------------------------------------------------------------------------------------------

deployment_id='66820b23-48e2-4bde-9717-f6db689cc06e'

Example of executing an AI service

Run the following cells to create and run a job with the deployed AI service.

def poll_async_job(wml_client, job_id):
    import time

    while True:
        job_status = wml_client.deployments.get_job_status(job_id)
        print(job_status)
        state = job_status["state"]
        if state == "completed" or "fail" in state:
            return wml_client.deployments.get_job_details(job_id)
        time.sleep(5)
batch_reference_payload = {
    "input_data_references": input_data_references,
}

job_details = api_client.deployments.create_job(deployment_id, batch_reference_payload)
job_id = api_client.deployments.get_job_id(job_details)
print(f"{job_id=}")
job_id='f5337ad8-b78f-454b-a6e4-6e54be26dc1e'
job_details = poll_async_job(api_client, job_id)
{'completed_at': '', 'running_at': '', 'state': 'queued'}
{'completed_at': '', 'running_at': '', 'state': 'queued'}
{'completed_at': '', 'running_at': '', 'state': 'queued'}
{'completed_at': '', 'running_at': '2025-05-27T08:02:24.619Z', 'state': 'running'}
{'completed_at': '', 'running_at': '2025-05-27T08:02:24.619Z', 'state': 'running'}
{'completed_at': '', 'running_at': '2025-05-27T08:02:24.619Z', 'state': 'running'}
{'completed_at': '2025-05-27T08:02:44.115706Z', 'running_at': '2025-05-27T08:02:24.552221Z', 'state': 'completed'}

Verify the total number of documents currently stored within the Milvus collection.

Note: Due to the implementation specifics of Milvus, it is necessary to initialize a new VectorStore instance in order to accurately retrieve the count of indexed elements.

vector_store = VectorStore(
    api_client=api_client,
    connection_id=milvus_connection_id,
    index_name=index_name,
    embeddings=embeddings,
)
vector_store.count()
82

Cleanup

Execute the following commands to clean up and decommission all resources provisioned during the execution of this notebook.

# Delete deployment job
api_client.deployments.delete_job(job_id)
'SUCCESS'
# Delete batch deployment
api_client.deployments.delete(deployment_id)
'SUCCESS'
# Delete AI service asset
api_client.repository.delete(ai_service_id)
'SUCCESS'
# Delete `ModelInference.txt` asset
api_client.data_assets.delete(document_asset_id)
'SUCCESS'
# Delete `ModelInference.txt` file locally
import os

if os.path.exists(document_filename):
    os.remove(document_filename)
    print(f"{document_filename} has been deleted.")
else:
    print(f"{document_filename} does not exist.")
ModelInference.txt has been deleted.

Summary and next steps

You successfully completed this notebook!

You have successfully learned how to design and deploy an AI service utilizing the generate_batch functionality, leveraging the capabilities of the ibm_watsonx_ai SDK.

Check out our Online Documentation for more samples, tutorials, documentation, how-tos, and blog posts.

Author

Mateusz Szewczyk, Software Engineer at watsonx.ai.

Copyright © 2025 IBM. This notebook and its source code are released under the terms of the MIT License.