@jonathan-buttner (Contributor) commented Dec 11, 2025

This PR implements chat completion for Amazon Bedrock. It is based on PR #133697.

Testing

Create the endpoint

PUT _inference/chat_completion/test-chat?timeout=30s
{
    "service": "amazonbedrock",
    "service_settings": {
        "provider": "anthropic",
        "model": "anthropic.claude-3-5-sonnet-20240620-v1:0",
        "region": "us-east-1",
        "access_key": "<access key>",
        "secret_key": "<secret>"
    }
}

Complex request

POST _inference/chat_completion/test-chat/_stream
{
    "messages": [
        {
            "role": "user",
            "content": "test"
        },
        {
            "role": "assistant",
            "content": "tool call",
            "tool_calls": [
                {
                    "function": {
                        "name": "context",
                        "arguments": "{}"
                    },
                    "id": "803434",
                    "type": "function"
                }
            ]
        },
        {
            "role": "tool",
            "content": "{\"screen_description\":\"The user is looking at http://localhost:5601/app/observability/overview?rangeFrom=now-15m&rangeTo=now. The current time range is 2024-12-13T01:18:25.752Z - 2024-12-13T01:33:25.752Z.\\n\\nThe user is viewing the Overview page which shows a summary of the following apps: {\\\"universal_profiling\\\":{\\\"hasData\\\":false,\\\"status\\\":\\\"success\\\"},\\\"alert\\\":{\\\"hasData\\\":false,\\\"status\\\":\\\"success\\\"},\\\"uptime\\\":{\\\"hasData\\\":false,\\\"indices\\\":\\\"heartbeat-*\\\",\\\"status\\\":\\\"success\\\"},\\\"infra_metrics\\\":{\\\"hasData\\\":false,\\\"indices\\\":\\\"metrics-*,metricbeat-*\\\",\\\"status\\\":\\\"success\\\"},\\\"ux\\\":{\\\"hasData\\\":false,\\\"indices\\\":\\\"traces-apm*,apm-*,traces-*.otel-*,logs-apm*,apm-*,logs-*.otel-*,metrics-apm*,apm-*,metrics-*.otel-*\\\",\\\"status\\\":\\\"success\\\"},\\\"infra_logs\\\":{\\\"hasData\\\":false,\\\"indices\\\":\\\"logs-*-*,logs-*,filebeat-*\\\",\\\"status\\\":\\\"success\\\"},\\\"apm\\\":{\\\"hasData\\\":false,\\\"indices\\\":{\\\"transaction\\\":\\\"traces-apm*,apm-*,traces-*.otel-*\\\",\\\"span\\\":\\\"traces-apm*,apm-*,traces-*.otel-*\\\",\\\"error\\\":\\\"logs-apm*,apm-*,logs-*.otel-*\\\",\\\"metric\\\":\\\"metrics-apm*,apm-*,metrics-*.otel-*\\\",\\\"onboarding\\\":\\\"apm-*\\\",\\\"sourcemap\\\":\\\"apm-*\\\"},\\\"status\\\":\\\"success\\\"}}\",\"learnings\":[]}",
            "tool_call_id": "803434"
        }
    ],
    "tool_choice": "auto",
    "tools": [
        {
            "type": "function",
            "function": {
                "name": "query",
                "description": "This function generates, executes and/or visualizes a query\n      based on the user's request. It also explains how ES|QL works and how to\n      convert queries from one language to another. Make sure you call one of\n      the get_dataset functions first if you need index or field names. This\n      function takes no input.",
                "parameters": {
                    "type": "object",
                    "properties": {}
                }
            }
        },
        {
            "type": "function",
            "function": {
                "name": "get_alerts_dataset_info",
                "description": "Use this function to get information about alerts data.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "start": {
                            "type": "string",
                            "description": "The start of the current time range, in datemath, like now-24h or an ISO timestamp"
                        },
                        "end": {
                            "type": "string",
                            "description": "The end of the current time range, in datemath, like now-24h or an ISO timestamp"
                        }
                    }
                }
            }
        },
        
        {
            "type": "function",
            "function": {
                "name": "execute_connector",
                "description": "Use this function when user explicitly asks to call a kibana connector.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "id": {
                            "type": "string",
                            "description": "The id of the connector"
                        },
                        "params": {
                            "type": "object",
                            "description": "The connector parameters"
                        }
                    },
                    "required": [
                        "id",
                        "params"
                    ]
                }
            }
        }
    ]
}
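
For anyone who wants to drive the streaming endpoint above from code rather than the console, here is a minimal sketch using the JDK's built-in HTTP client types. The base URL `http://localhost:9200`, the class name, and the absence of authentication are assumptions for illustration; only the `_inference/chat_completion/test-chat/_stream` path and the first user message come from the request above. The sketch only builds the request and does not send it.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical helper for illustration; not part of this PR.
final class StreamRequestSketch {
    // Builds (but does not send) a POST to the streaming chat completion endpoint.
    static HttpRequest build(String baseUrl, String endpointId) {
        // Hard-coded body with a single user message, so no JSON escaping is needed.
        String body = "{\"messages\":[{\"role\":\"user\",\"content\":\"test\"}]}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_inference/chat_completion/" + endpointId + "/_stream"))
            .timeout(Duration.ofSeconds(30))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:9200", "test-chat"); // assumed local cluster
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending it with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofLines())` would let the caller read the streamed response line by line; a secured cluster would also need an authorization header.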

Evgenii-Kazannik and others added 30 commits October 17, 2025 13:40
…Completions-support' into Add-Amazon-Bedrock-Unified-Chat-Completions-support

# Conflicts:
#	x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/amazonbedrock/request/completion/ToolAwareUnifiedPublisher.java
…Completions-support' into Add-Amazon-Bedrock-Unified-Chat-Completions-support
@elasticsearchmachine (Collaborator)

Hi @jonathan-buttner, I've created a changelog YAML for you.

inferenceResultsListener
);
chatCompletionRequest.executeChatCompletionRequest(awsBedrockClient, chatCompletionResponseListener);
// Chat completions only supports streaming
Contributor Author

This is new

} catch (IOException e) {
listener.onFailure(new RuntimeException(e));
}
throw new UnsupportedOperationException("Unsupported operation, use streaming execution instead");
Contributor Author (@jonathan-buttner, Dec 11, 2025)

This is new. This class is only used for chat completion. Chat completion does not support non-streaming requests for any provider (not just Bedrock), so we don't need this method.

.modelId(amazonBedrockModel.model())
.messages(getConverseMessageList(requestEntity.messages()))
.additionalModelResponseFieldPaths(requestEntity.additionalModelFields());
public Flow.Publisher<StreamingUnifiedChatCompletionResults.Results> executeStreamChatCompletionRequest(
Contributor Author

This logic is all new

package org.elasticsearch.xpack.inference.services.amazonbedrock.request.completion;

import software.amazon.awssdk.services.bedrockruntime.model.ConverseRequest;
import software.amazon.awssdk.core.document.Document;
Contributor Author

Please review the changes in this class carefully.


package org.elasticsearch.xpack.inference.services.amazonbedrock.request.completion;

import software.amazon.awssdk.core.document.Document;
Contributor Author

Please review the changes in this class carefully; it's new content added after the other PR.

/**
* The task types that the {@link InferenceAction.Request} can accept.
*/
private static final EnumSet<TaskType> SUPPORTED_INFERENCE_ACTION_TASK_TYPES = EnumSet.of(TaskType.TEXT_EMBEDDING, TaskType.COMPLETION);
Contributor Author

This is new so we can return an error if we call a chat_completion endpoint without _stream.
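
A minimal sketch of the check described above, assuming a simplified enum and a hypothetical `validate` helper that returns an error message instead of calling a listener. The real code uses `createUnsupportedTaskTypeStatusException` and `ElasticsearchStatusException`; the message wording below is invented for illustration.

```java
import java.util.EnumSet;
import java.util.Optional;

// Hypothetical, simplified stand-in for the real TaskType enum.
enum SketchTaskType { TEXT_EMBEDDING, COMPLETION, CHAT_COMPLETION }

final class TaskTypeCheckSketch {
    // Task types accepted by the non-streaming path (mirrors the EnumSet in the diff).
    static final EnumSet<SketchTaskType> SUPPORTED = EnumSet.of(SketchTaskType.TEXT_EMBEDDING, SketchTaskType.COMPLETION);

    // Returns an error message when the task type is unsupported, or empty when it is supported.
    static Optional<String> validate(SketchTaskType taskType) {
        if (SUPPORTED.contains(taskType) == false) {
            return Optional.of("Task type [" + taskType + "] is not supported; supported types: " + SUPPORTED);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validate(SketchTaskType.COMPLETION).isPresent());      // false
        System.out.println(validate(SketchTaskType.CHAT_COMPLETION).isPresent()); // true
    }
}
```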

action.execute(inputs, timeout, listener);
} else {
if (SUPPORTED_INFERENCE_ACTION_TASK_TYPES.contains(model.getTaskType()) == false) {
listener.onFailure(createUnsupportedTaskTypeStatusException(model, SUPPORTED_INFERENCE_ACTION_TASK_TYPES));
Contributor Author

New change is here

responseString = responseString + " " + useChatCompletionUrlMessage(model);
}
listener.onFailure(new ElasticsearchStatusException(responseString, RestStatus.BAD_REQUEST));
listener.onFailure(createUnsupportedTaskTypeStatusException(model, SUPPORTED_INFERENCE_ACTION_TASK_TYPES));
Contributor Author

Using the same helper

responseString = responseString + " " + useChatCompletionUrlMessage(model);
}
listener.onFailure(new ElasticsearchStatusException(responseString, RestStatus.BAD_REQUEST));
listener.onFailure(createUnsupportedTaskTypeStatusException(model, SUPPORTED_INFERENCE_ACTION_TASK_TYPES));
Contributor Author

Using the same helper and adding a return because we had a fall-through bug here.

Contributor

Nice catch. Is it possible to add a test that would fail without the return for this (maybe you already have)?

Contributor Author

Added one
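
To illustrate the kind of fall-through discussed in this thread, here is a generic sketch. The full method is not visible in the diff context, so `RecordingListener` and both `execute...` methods are hypothetical; the point is only that a missing `return` after `onFailure` lets execution continue, and that a test can detect it by counting listener callbacks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener that records every callback it receives.
final class RecordingListener {
    final List<String> events = new ArrayList<>();
    void onFailure(String reason) { events.add("failure: " + reason); }
    void onResponse(String result) { events.add("response: " + result); }
}

final class FallThroughSketch {
    // Buggy shape: after reporting the failure, execution continues and the listener is called again.
    static void executeWithoutReturn(boolean supported, RecordingListener listener) {
        if (supported == false) {
            listener.onFailure("unsupported task type");
        }
        listener.onResponse("result");
    }

    // Fixed shape: return immediately after reporting the failure.
    static void executeWithReturn(boolean supported, RecordingListener listener) {
        if (supported == false) {
            listener.onFailure("unsupported task type");
            return;
        }
        listener.onResponse("result");
    }

    public static void main(String[] args) {
        RecordingListener buggy = new RecordingListener();
        executeWithoutReturn(false, buggy);
        System.out.println(buggy.events.size()); // 2

        RecordingListener fixed = new RecordingListener();
        executeWithReturn(false, fixed);
        System.out.println(fixed.events.size()); // 1
    }
}
```

A test along these lines asserts that exactly one callback fires for an unsupported task type, so it fails if the `return` is removed.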

);
}

public static ElasticsearchStatusException createUnsupportedTaskTypeStatusException(Model model, EnumSet<TaskType> supportedTaskTypes) {
Contributor Author

New helper

assertThat(exceptionThrown.getCause().getMessage(), containsString("test exception"));
}

public void testExecute_ChatCompletionRequest_NonStreaming_Fails() {
Contributor Author

We're lacking coverage around returning streamed Converse responses and the stream processor logic that parses them. I'll work on those in a follow-up PR because I think we'll need to refactor how we mock the internal client. Unfortunately, it's not as straightforward as testing streaming for other services that don't use an SDK, like OpenAI.

Member

Makes sense. I'm fine with this being in a follow-up PR.
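
One possible direction for that follow-up, sketched with JDK classes only: drive a `Flow.Subscriber` from a `SubmissionPublisher` and assert on what it collects. This is not how the PR mocks the Bedrock client; `CollectingSubscriber` and `StreamTestSketch` are hypothetical names, and plain strings stand in for the SDK's streamed Converse events.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: collects every item a publisher emits and signals when the stream ends.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> items = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    volatile Throwable error;

    @Override public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // unbounded demand keeps the sketch simple
    }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable throwable) { error = throwable; done.countDown(); }
    @Override public void onComplete() { done.countDown(); }
}

final class StreamTestSketch {
    // Publishes the chunks through a SubmissionPublisher and returns what the subscriber received.
    static List<String> collect(List<String> chunks) {
        CollectingSubscriber<String> subscriber = new CollectingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            chunks.forEach(publisher::submit);
        } // close() completes the stream after the submitted items are delivered
        try {
            if (subscriber.done.await(5, TimeUnit.SECONDS) == false) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(subscriber.items);
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("Hel", "lo")));
    }
}
```

In a real test the subscriber would sit downstream of the stream processor, and the published items would be whatever event type the mocked client emits.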

@jonathan-buttner jonathan-buttner marked this pull request as ready for review December 12, 2025 19:26
@elasticsearchmachine (Collaborator)

Pinging @elastic/search-inference-team (Team:Search - Inference)

@DonalEvans (Contributor) left a comment

I wasn't able to finish looking at all the test changes today, but I have a few comments/questions for the non-test code.

Comment on lines 32 to 34
var requestTaskSettings = AmazonBedrockCompletionRequestTaskSettings.fromMap(taskSettings);
var taskSettingsToUse = AmazonBedrockCompletionTaskSettings.of(completionModel.getTaskSettings(), requestTaskSettings);
return new AmazonBedrockChatCompletionModel(completionModel, taskSettingsToUse);
Contributor

If requestTaskSettings is equal to taskSettingsToUse, we can return the original model and avoid creating an identical object.

Contributor Author

AmazonBedrockCompletionRequestTaskSettings and AmazonBedrockCompletionTaskSettings are separate classes. I'll check whether the result of AmazonBedrockCompletionTaskSettings.of is the same as the task settings from the model passed in; if so, we can return the same completionModel.
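
A rough sketch of that check, using hypothetical record stand-ins rather than the real `AmazonBedrockCompletionTaskSettings` and `AmazonBedrockChatCompletionModel` classes. The two fields and the "request value overrides the model value when present" merge rule are assumptions made for illustration.

```java
import java.util.Objects;

// Hypothetical stand-in for the task settings; the fields are illustrative only.
record SketchTaskSettings(Double temperature, Integer maxNewTokens) {
    // Assumed merge rule: a non-null request value overrides the model's value.
    static SketchTaskSettings of(SketchTaskSettings original, SketchTaskSettings request) {
        return new SketchTaskSettings(
            request.temperature() != null ? request.temperature() : original.temperature(),
            request.maxNewTokens() != null ? request.maxNewTokens() : original.maxNewTokens()
        );
    }
}

// Hypothetical stand-in for the chat completion model.
record SketchModel(String modelId, SketchTaskSettings taskSettings) {
    static SketchModel withOverrides(SketchModel model, SketchTaskSettings requestSettings) {
        SketchTaskSettings merged = SketchTaskSettings.of(model.taskSettings(), requestSettings);
        // Nothing changed, so reuse the existing instance instead of creating an identical copy.
        if (Objects.equals(merged, model.taskSettings())) {
            return model;
        }
        return new SketchModel(model.modelId(), merged);
    }
}

final class ModelOverrideDemo {
    public static void main(String[] args) {
        SketchModel model = new SketchModel("example-model", new SketchTaskSettings(0.5, 100));
        System.out.println(SketchModel.withOverrides(model, new SketchTaskSettings(null, null)) == model); // true
        System.out.println(SketchModel.withOverrides(model, new SketchTaskSettings(0.9, null)) == model);  // false
    }
}
```

The comparison is between the merged settings and the model's existing settings, not between the request settings and the merged settings, since those are different types in the real code.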

@jonathan-buttner jonathan-buttner added the Feature:GenAI Features around GenAI label Dec 16, 2025