Sunday, June 1, 2025
HomeTechnologyArtificial IntelligenceUsing Amazon OpenSearch ML connector APIs | Amazon Web Services TechTricks365

Using Amazon OpenSearch ML connector APIs | Amazon Web Services TechTricks365


When ingesting data into Amazon OpenSearch, customers often need to augment data before putting it into their indexes. For instance, you might be ingesting log files with an IP address and want to get a geographic location for the IP address, or you might be ingesting customer comments and want to identify the language they are in. Traditionally, this requires an external process that complicates data ingest pipelines and can cause a pipeline to fail. OpenSearch offers a wide range of third-party machine learning (ML) connectors to support this augmentation.

This post highlights two of these third-party ML connectors. The first connector we demonstrate is the Amazon Comprehend connector. In this post, we show you how to use this connector to invoke the LangDetect API to detect the languages of ingested documents.

The second connector we demonstrate is the Amazon Bedrock connector to invoke the Amazon Titan Text Embeddings v2 model so that you can create embeddings from ingested documents and perform semantic search.

Solution overview

We use Amazon OpenSearch with Amazon Comprehend to demonstrate the language detection feature. To help you replicate this setup, we’ve provided the necessary source code, an Amazon SageMaker notebook, and an AWS CloudFormation template. You can find these resources in the sample-opensearch-ml-rest-api GitHub repo.

The reference architecture shown in the preceding figure shows the components used in this solution. A SageMaker notebook is used as a convenient way to execute the code that is provided in the Github repository provided above.

Prerequisites

To run the full demo using the sample-opensearch-ml-rest-api, make sure you have an AWS account with access to:

Part 1: The Amazon Comprehend ML connector

Set up OpenSearch to access Amazon Comprehend

Before you can use Amazon Comprehend, you need to make sure that OpenSearch can call Amazon Comprehend. You do this by supplying OpenSearch with an IAM role that has access to invoke the DetectDominantLanguage API. This requires the OpenSearch Cluster to have fine grained access control enabled. The CloudFormation template creates a role for this called --SageMaker-OpenSearch-demo-role. Use the following steps to attach this role to the OpenSearch cluster.

  1. Open the OpenSearch Dashboard console—you can find the URL in the output of the CloudFormation template—and sign in using the username and password you provided.OpenSearch Dashboards landing page featuring navigation sidebar, visualization tools, and data management options
  1. Choose Security in the left-hand menu (if you don’t see the menu, choose the three horizontal lines icon at the top left of the dashboard).OpenSearch security setup guide detailing role creation and user mapping processes with action buttons
  2. From the security menu, select Roles to manage the OpenSearch roles.OpenSearch roles dashboard with detailed permissions matrix showing security analytics, alerting, and snapshot management access controls
  3. In the search box. enter ml_full_access role.OpenSearch roles management screen with filtered view of ML full access role, showing detailed permissions and reserved status
  4. Select the Mapped users link to map the IAM role to this OpenSearch role.AWS IAM console displaying full access role with security restrictions and role duplication option
  5. On the Mapped users screen, choose Manage mapping to edit the current mappings.AWS IAM role management interface showing zero mapped users with creation and mapping controls
  6. Add the IAM role mentioned previously to map it to the ml_full_access role, this will allow OpenSearch to access the needed AWS resources from the ml-commons plugin. Enter your IAM role Amazon Resource Name (ARN) (arn:aws:iam:::role/--SageMaker-OpenSearch-demo-role) in the backend roles field and choose Map.AWS IAM console showing user and backend role mapping options with explanations for role inheritance

Set up the OpenSearch ML connector to Amazon Comprehend

In this step, you set up the ML connector to connect Amazon Comprehend to OpenSearch.

  1. Get an authorization token to use when making the call to OpenSearch from the SageMaker notebook. The token uses an IAM role attached to the notebook by the CloudFormation template that has permissions to call OpenSearch. That same role is mapped to the OpenSearch admin role in the same way you just mapped the role to access Amazon Comprehend. Use the following code to set this up:
awsauth = AWS4Auth(credentials.access_key,
credentials.secret_key,
region,
'es',
session_token=credentials.token)

  1. Create the connector. It needs a few pieces of information:
    1. It needs a protocol. For this example, use aws_sigv4, which allows OpenSearch to use an IAM role to call Amazon Comprehend.
    2. Provide the ARN for this role, which is the same role you used to set up permissions for the ml_full_access role.
    3. Provide comprehend as the service_name, and DetectDominateLanguage as the api_name.
    4. Provide the URL to Amazon Comprehend and set up how to call the API and what data to pass to it.

The final call looks like:

comprehend = boto3.client('comprehend', region_name="us-east-1")
path="/_plugins/_ml/connectors/_create"
url = host + path

payload = {
  "name": "Comprehend lang identification",
  "description": "comprehend model",
  "version": 1,
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": sageMakerOpenSearchRoleArn
  },
  "parameters": {
    "region": "us-east-1",
    "service_name": "comprehend",
    "api_version": "20171127",
    "api_name": "DetectDominantLanguage",
    "api": "Comprehend_${parameters.api_version}.${parameters.api_name}",
    "response_filter": "$"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://${parameters.service_name}.${parameters.region}.amazonaws.com",
      "headers": {
        "content-type": "application/x-amz-json-1.1",
        "X-Amz-Target": "${parameters.api}"
      },
      "request_body": "{"Text": "${parameters.Text}"}" 
    }
  ]
}

comprehend_connector_response = requests.post(url, auth=awsauth, json=payload)
comprehend_connector = comprehend_connector_response.json()["connector_id"]

Register the Amazon Comprehend API connector

The next step is to register the Amazon Comprehend API connector with OpenSearch using the Register Model API from OpenSearch.

  • Use the comprehend_connector that you saved from the last step.
path="/_plugins/_ml/models/_register"
url = host + path

payload = {
    "name": "comprehend lang id API",
    "function_name": "remote",
    "description": "API to detect the language of text",
    "connector_id": comprehend_connector
}
headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
comprehend_model_id = response.json()['model_id']

As of OpenSearch 2.13, when the model is first invoked, it’s automatically deployed. Prior to 2.13 you would have to manually deploy the model within OpenSearch.

Test the Amazon Comprehend API in OpenSearch

With the connector in place, you need to test the API to make sure it was set up and configured correctly.

  1. Make the following call to OpenSearch.
path="/_plugins/_ml/models/"+ comprehend_model_id + '/_predict'
url = host + path

headers = {"Content-Type": "application/json"}
payload = {
    "parameters": {
        "Text": "你知道厕所在哪里吗"
    }
}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(response.json())

  1. You should get the following result from the call, showing the language code as zh with a score of 1.0:
{
   "inference_results":[
      {
         "output":[
            {
               "name":"response",
               "dataAsMap":{
                  "response":{
                     "Languages":[
                        {
                           "LanguageCode":"zh",
                           "Score":1.0
                        }
                     ]
                  }
               }
            }
         ],
         "status_code":200
      }
   ]
}

Create an ingest pipeline that uses the Amazon Comprehend API to annotate the language

The next step is to create a pipeline in OpenSearch that calls the Amazon Comprehend API and adds the results of the call to the document being indexed. To do this, you provide both an input_map and an output_map. You use these to tell OpenSearch what to send to the API and how to handle what comes back from the call.

path="/_ingest/pipeline/comprehend_language_identification_pipeline"
url = host + path

payload = {
  "description": "ingest identify lang with the comprehend API",
  "processors":[
    {
      "ml_inference": {
        "model_id": comprehend_model_id,
        "input_map": [
            {
               "Text": "Text"
            }
        ],
        "output_map": [
            {  
               "detected_language": "response.Languages[0].LanguageCode",
               "language_score": "response.Languages[0].Score"
            }
        ]
      }
    }
  ]
}
headers = {"Content-Type": "application/json"}
response = requests.put(url, auth=awsauth, json=payload, headers=headers)

You can see from the preceding code that you are pulling back both the top language result and its score from Amazon Comprehend and adding those fields to the document.

Part 2: The Amazon Bedrock ML connector

In this section, you use Amazon OpenSearch with Amazon Bedrock through the ml-commons plugin to perform a multilingual semantic search. Make sure that you have the solution prerequisites in place before attempting this section.

In the SageMaker instance that was deployed for you, you can see the following files: english.json, french.json, german.json.

These documents have sentences in their respective languages that talk about the term spring in different contexts. These contexts include spring as a verb meaning to move suddenly, as a noun meaning the season of spring, and finally spring as a noun meaning a mechanical part. In this section, you deploy Amazon Titan Text Embeddings model v2 using the ml connector for Amazon Bedrock. You then use this embeddings model to create vectors of text in three languages by ingesting the different language JSON files. Finally, these vectors are stored in Amazon OpenSearch to enable semantic searches to be used across the language sets.

Amazon Bedrock provides streamlined access to various powerful AI foundation models through a single API interface. This managed service includes models from Amazon and other leading AI companies. You can test different models to find the ideal match for your specific needs, while maintaining security, privacy, and responsible AI practices. The service enables you to customize these models with your own data through methods such as fine-tuning and Retrieval Augmented Generation (RAG). Additionally, you can use Amazon Bedrock to create AI agents that can interact with enterprise systems and data, making it a comprehensive solution for developing generative AI applications.

AWS architecture diagram showing document ingestion and processing flow between OpenSearch, SageMaker Notebook, and Bedrock ML

The reference architecture in the preceding figure shows the components used in this solution.

(1) First we must create the OpenSearch ML connector via running code within the Amazon SageMaker notebook. The connector essentially creates a Rest API call to any model, we specifically want to create a connector to call the Titan Embeddings model within Amazon Bedrock.

(2) Next, we must create an index to later index our language documents into. When creating an index, you can specify its mappings, settings, and aliases.

(3) After creating an index within Amazon OpenSearch, we want to create an OpenSearch Ingestion pipeline that will allow us to streamline data processing and preparation for indexing, making it easier to manage and utilize the data. (4) Now that we have created an index and set up a pipeline, we can start indexing our documents into the pipeline.

(5 – 6) We use the pipeline in OpenSearch that calls the Titan Embeddings model API. We send our language documents to the titan embeddings model, and the model returns vector embeddings of the sentences.

(7) We store the vector embeddings within our index and perform vector semantic search.

While this post highlights only specific areas of the overall solution, the SageMaker notebook has the code and instructions to run the full demo yourself.

Before you can use Amazon Bedrock, you need to make sure that OpenSearch can call Amazon Bedrock. .

Load sentences from the JSON documents into dataframes

Start by loading the JSON document sentences into dataframes for more structured organization. Each row can contain the text, embeddings, and additional contextual information:

import json
import pandas as pd

def load_sentences(file_name):
    sentences = []
    with open(file_name, 'r', encoding='utf-8') as file:
        for line in file:
            try:
                data = json.loads(line)
                if 'sentence' in data and 'sentence_english' in data:
                    sentences.append({
                        'sentence': data['sentence'],
                        'sentence_english': data['sentence_english']
                    })
            except json.JSONDecodeError:
                # Skip lines that are not valid JSON (like the index lines)
                continue
    
    return pd.DataFrame(sentences)

# Usage
german_df = load_sentences('german.json')
english_df = load_sentences('english.json')
french_df = load_sentences('french.json')
# print(french_df.head())

Create the OpenSearch ML connector to Amazon Bedrock

After loading the JSON documents into dataframes, you’re ready to set up the OpenSearch ML connector to connect Amazon Bedrock to OpenSearch.

  1. The connector needs the following information.
    1. It needs a protocol. For this solution, use aws_sigv4, which allows OpenSearch to use an IAM role to call Amazon Bedrock.
    2. Provide the same role used earlier to set up permissions for the ml_full_access role.
    3. Provide the service_name, model, dimensions of the model, and embedding type.

The final call looks like the following:

payload = {
  "name": "Amazon Bedrock Connector: embedding",
  "description": "The connector to bedrock Titan embedding model",
  "version": 1,
  "protocol": "aws_sigv4",
  "parameters": {
    "region": "us-east-1",
    "service_name": "bedrock",
    "model": "amazon.titan-embed-text-v2:0",
    "dimensions": 1024,
    "normalize": True,
    "embeddingTypes": ["float"]
  },
  "credential": {
    "roleArn": sageMakerOpenSearchRoleArn
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/${parameters.model}/invoke",
      "headers": {
        "content-type": "application/json",
        "x-amz-content-sha256": "required"
      },
      "request_body": "{ "inputText": "${parameters.inputText}", "dimensions": ${parameters.dimensions}, "normalize": ${parameters.normalize}, "embeddingTypes": ${parameters.embeddingTypes} }",
      "pre_process_function": "connector.pre_process.bedrock.embedding",
      "post_process_function": "connector.post_process.bedrock.embedding"
    }
  ]
}

bedrock_connector_response = requests.post(url, auth=awsauth, json=payload, headers=headers)

bedrock_connector_3 = bedrock_connector_response.json()["connector_id"]
print('Connector id: ' + bedrock_connector_3)

Test the Amazon Titan Embeddings model in OpenSearch

After registering and deploying the Amazon Titan Embeddings model using the Amazon Bedrock connector, you can test the API to verify that it was set up and configured correctly. To do this, make the following call to OpenSearch:

headers = {"Content-Type": "application/json"}
payload = {
  "parameters": {
    "inputText": "It's nice to see the flowers bloom and hear the birds sing in the spring"
  }
}
response = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(response.json())

You should get a formatted result, similar to the following, from the call that shows the generated embedding from the Amazon Titan Embeddings model:

{'inference_results': [{'output': [{'name': 'sentence_embedding', 'data_type': 'FLOAT32', 'shape': [1024], 'data': [-0.04092199727892876, 0.052057236433029175, -0.03354490175843239, 0.04398418962955475, -0.001235315459780395, -0.03284895047545433, -0.014197427779436111, 0.0098129278048…

The preceding result is significantly shortened compared to the actual embedding result you might receive. The purpose of this snippet is to show you the format.

Create the index pipeline that uses the Amazon Titan Embeddings model

Create a pipeline in OpenSearch. You use this pipeline to tell OpenSearch to send the fields you want embeddings for to the embeddings model.

pipeline_name = "titan_embedding_pipeline_v2"
url = f"{host}/_ingest/pipeline/{pipeline_name}"

pipeline_body = {
    "description": "Titan embedding pipeline",
    "processors": [
        {
            "text_embedding": {
                "model_id": bedrock_model_id,
                "field_map": {
                    "sentence": "sentence_vector"
                }
            }
        }
    ]
}

response = requests.put(url, auth=awsauth, json=pipeline_body, headers={"Content-Type": "application/json"})
print(response.text)

Create an index

With the pipeline in place, the next step is to create an index that will use the pipeline. There are three fields in the index:

  • sentence_vector – This is where the vector embedding will be stored when returned from Amazon Bedrock.
  • sentence – This is the non-English language sentence.
  • sentence_english – this is the English translation of the sentence. Include this to see how well the model is translating the original sentence.
index_name="bedrock-knn-index-v2"
url = f'{host}/{index_name}'
mapping = {
    "mappings": {
        "properties": {
            "sentence_vector": {
                "type": "knn_vector",
                "dimension": 1024,  
                "method": {
                    "name": "hnsw",
                    "space_type": "l2",
                    "engine": "nmslib"
                },
                "store":True
            },
            "sentence":{
                "type": "text",
                "store": True
            },
            "sentence_english":{
                "type": "text",
                "store": True
            }
        }
    },
    "settings": {
        "index": {
            "knn": True,
            "knn.space_type": "cosinesimil",
            "default_pipeline": pipeline_name
        }
    }
}

response = requests.put(url, auth=awsauth, json=mapping, headers={"Content-Type": "application/json"})
print(f"Index creation response: {response.text}")

Load dataframes into the index

Earlier in this section, you loaded the sentences from the JSON documents into dataframes. Now, you can index the documents and generate embeddings for them using the Amazon Titan Text Embeddings Model v2. The embeddings will be stored in the sentence_vector field.

index_name = "bedrock-knn-index-v2"

def index_documents(df, batch_size=100):
    total = len(df)
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        batch = df.iloc[start:end]

        bulk_data = []
        for _, row in batch.iterrows():
            # Prepare the action metadata
            action = {
                "index": {
                    "_index": index_name
                }
            }
            # Prepare the document data
            doc = {
                "sentence": row['sentence'],
                "sentence_english": row['sentence_english']
            }
            
            # Add the action and document to the bulk data
            bulk_data.append(json.dumps(action))
            bulk_data.append(json.dumps(doc))

        # Join the bulk data with newlines
        bulk_body = "n".join(bulk_data) + "n"

        # Send the bulk request
        bulk_url = f"{host}/_bulk"
        response = requests.post(bulk_url, auth=awsauth, data=bulk_body, headers={"Content-Type": "application/x-ndjson"})

        if response.status_code == 200:
            print(f"Successfully indexed batch {start}-{end} of {total}")
        else:
            print(f"Error indexing batch {start}-{end} of {total}: {response.text}")

        # Optional: add a small delay to avoid overwhelming the cluster
        time.sleep(1)

# Index your documents
print("Indexing German documents:")
index_documents(german_df)
print("nIndexing English documents:")
index_documents(english_df)
print("nIndexing French documents:")
index_documents(french_df)

Perform semantic k-NN across the documents

The final step is to perform a k-nearest neighbor (k-NN) search across the documents.

# Define your OpenSearch host and index name
index_name = "bedrock-knn-index-v2"
def semantic_search(query_text, k=5):
    search_url = f"{host}/{index_name}/_search"
    # First, index the query to generate its embedding
    index_doc = {
        "sentence": query_text,
        "sentence_english": query_text  # Assuming the query is in English
    }
    index_url = f"{host}/{index_name}/_doc"
    index_response = requests.post(index_url, auth=awsauth, json=index_doc, headers={"Content-Type": "application/json"})
    
    if index_response.status_code != 201:
        print(f"Failed to index query document: {index_response.text}")
        return []
    
    # Retrieve the indexed query document to get its vector
    doc_id = index_response.json()['_id']
    get_url = f"{host}/{index_name}/_doc/{doc_id}"
    get_response = requests.get(get_url, auth=awsauth)
    query_vector = get_response.json()['_source']['sentence_vector']
    
    # Now perform the KNN search
    search_query = {
        "size": 30,
        "query": {
            "knn": {
                "sentence_vector": {
                    "vector": query_vector,
                    "k": 30
                }
            }
        },
        "_source": ["sentence", "sentence_english"]
    }

    search_response = requests.post(search_url, auth=awsauth, json=search_query, headers={"Content-Type": "application/json"})
    
    if search_response.status_code != 200:
        print(f"Search failed with status code {search_response.status_code}")
        print(search_response.text)
        return []

    # Clean up - delete the temporary query document
    delete_url = f"{host}/{index_name}/_doc/{doc_id}"
    requests.delete(delete_url, auth=awsauth)

    return search_response.json()['hits']['hits']

# Example usage
query = "le soleil brille"
results = semantic_search(query)

if results:
    print(f"Search results for: '{query}'")
    for result in results:
        print(f"Score: {result['_score']}")
        print(f"Sentence: {result['_source']['sentence']}")
        print(f"English: {result['_source']['sentence_english']}")
        print()
else:
    print("No results found or search failed.")

The example query is in French and can be translated to the sun is shining. Keeping in mind that the JSON documents have sentences that use spring in different contexts, you’re looking for query results and vector matches of sentences that use spring in the context of the season of spring.

Here are some of the results from this query:

Search results for: ' le soleil brille'
Score: 0.40515712
Sentence: Les premiers rayons de soleil au printemps réchauffent la terre.
English: The first rays of spring sunshine warm the earth.

Score: 0.40117615
Sentence: Die ersten warmen Sonnenstrahlen kitzeln auf der Haut im Frühling.
English: The first warm sun rays tickle the skin in spring.

Score: 0.3999985
Sentence: Die ersten Sonnenstrahlen im Frühling wecken die Lebensgeister.
English: The first rays of sunshine in spring awaken the spirits.

This shows that the model can provide results across all three languages. It is important to note that the confidence scores for these results might be low because you’ve only ingested a couple documents with a handful of sentences in each for this demo. To increase confidence scores and accuracy, ingest a robust dataset with multiple languages and plenty of sentences for reference.

Clean Up

To avoid incurring future charges, go to the AWS Management Console for CloudFormation console and delete the stack you deployed. This will terminate the resources used in this solution.

Benefits of using the ML connector for machine learning model integration with OpenSearch

There are many ways you can perform k-nn semantic vector searches; a popular methods is to deploy external Hugging Face sentence transformer models to a SageMaker endpoint. The following are the benefits of using the ML connector approach we showed in this post, and why should you use it instead of deploying models to a SageMaker endpoint:

  • Simplified architecture
    • Single system to manage
    • Native OpenSearch integration
    • Simpler deployment
    • Unified monitoring
  • Operational benefits
    • Less infrastructure to maintain
    • Built-in scaling with OpenSearch
    • Simplified security model
    • Straightforward updates and maintenance
  • Cost efficiency
    • Single system costs
    • Pay-per-use Amazon Bedrock pricing
    • No endpoint management costs
    • Simplified billing

Conclusion

Now that you’ve seen how you can use the OpenSearch ML connector to augment your data with external REST calls, we recommend that you visit the GitHub repo if you haven’t already and walk through the full demo yourselves. The full demo shows how you can use Amazon Comprehend for language detection and how to use Amazon Bedrock for multilingual semantic vector search, using the ml-connector plugin for both use cases. It also has sample text and JSON documents to ingest so you can see how the pipeline works.


About the Authors

John Trollinger photo

John Trollinger is a Principal Solutions Architect supporting the World Wide Public Sector with a focus on OpenSearch and Data Analytics. John has been working with public sector customers over the past 25 years helping them deliver mission capabilities. Outside of work, John likes to collect AWS certifications and compete in triathlons.

Shwetha Radhakrishnan photo

Shwetha Radhakrishnan is a Solutions Architect for Amazon Web Services (AWS) with a focus in Data Analytics & Machine Learning. She has been building solutions that drive cloud adoption and help empower organizations to make data-driven decisions within the public sector. Outside of work, she loves dancing, spending time with friends and family, and traveling.


RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments