[MS] Discoverable - Observable MCP Server - devamazonaws.blogspot.com
Introduction: The Problem
We recently conducted a customer engagement that started as a series of data science experiments and gradually moved into a production capability by "promoting" experimental code into MCP Tools or Agents. Throughout this article "tool" should be considered to be synonymous with Tool or Agent in the MCP sense. The production MCP Server was hosted in an Azure Container App and exposed a set of tools to GitHub Copilot in VS Code. We defined 22 requirements for this MCP Server. The relevant ones for this article are:
- The tools should be discoverable at runtime
- Individual tools must be enabled or disabled using Feature Flags
- The tools should be observable in the cloud
- The tools must be stateless - all state to be externalized
- Tools should be chainable, allowing one tool to invoke either in-process or external MCP tools.
The Journey: Our Approach and Solution
This section briefly describes the solution for each of the stated requirements.
Discoverable tools
A key requirement from the experimentation phase was runtime tool discovery. The initial approach required each new tool to add many lines of code to the server's runtime configuration. This approach quickly resulted in a multi-thousand line Python file listing all of the tools we provided. The solution was to separate tool implementation from the MCP Server code. We introduced an abstract base class called ToolBase. This class defines the properties and methods that all derived tools inherit or need to implement.
from abc import ABC, abstractmethod
from typing import Any, ClassVar, Dict

# ToolArguments and Context are defined elsewhere and are not shown in this article.
class ToolBase(ABC):
    """Abstract base class for dynamically loaded MCP tools."""
    name: ClassVar[str]  # Name of the tool.
    version: ClassVar[str]  # Version of the tool.
    description: str  # Tool description, exposed through the MCP protocol and visible to clients.
    input_schema: Dict[str, Any]  # Weakly typed input schema defining the parameters the tool expects. Expected values are textually defined in the description.
    ...

    @staticmethod
    @abstractmethod
    async def is_enabled() -> bool:
        """Is this specific tool version enabled?"""

    @abstractmethod
    async def initialize(self) -> None:
        """Used for any initialization logic; only called once."""

    @abstractmethod
    async def validate_arguments(self, arguments: Dict[str, Any]) -> ToolArguments:
        """Converts the weakly typed arguments dictionary to a strongly typed parameter class derived from ToolArguments. Raises exceptions if required parameters are missing."""

    async def run(self, arguments: Dict[str, Any], ctx: Context) -> Any:
        """Run method called by the MCP Server. Sets up Langfuse spans (see the Tool Observability section), validates arguments using self.validate_arguments, and then calls the private _run() function."""

    @abstractmethod
    async def _run(self, arguments: ToolArguments, ctx: Context) -> Any:
        """Abstract method implemented by the derived classes that contains their business logic."""
During MCP Server startup, a custom ToolsManager iterates through a configurable directory location looking for any Python classes that derive from ToolBase. For each class it finds, it calls the static is_enabled method. This call uses Feature Flags based on Azure App Configuration Service to determine whether this version of this tool is enabled. If it is enabled, the server creates an instance of the tool and calls initialize() to perform any tool-specific setup, for example checking for the existence of database tables or access to storage accounts. The tool type is then added to a list of enabled tools. This process allows a hierarchy of tools to be created in a folder separate from the MCP Server:
src
|--app                  # The MCP Server
|--tools                # Separate folder containing all the tools
   |--Tool_A
   |  |--Version_1.0
   |  |--Version_1.1
   |--Tool_B
      |--Version_2.0
Each tool/version can be enabled/disabled via feature flags. Adding a new tool or a new version of a tool is simply a matter of adding a new folder, creating a new class derived from ToolBase, and configuring the Feature Flags. Restart the MCP Server and it discovers the new tool at runtime without any change to the server's code. Once the server has iterated through the folder structure and has a list of enabled tools, it builds an instance of a "callable tool wrapper" around each type and registers it with the MCP server. This wrapper is GitHub Copilot generated code that takes the Python type of the tool and converts it into a class that can be instantiated per MCP tool call. Per-call instances were necessary because we needed to support multiple concurrent users, so each tool call needed a unique tool instance; our original approach used singletons and supported only a single local user. Once this tool discovery process is complete, the MCP Server knows which tools it is exposing (based on the feature flags) and has a mechanism for mapping an incoming call to a unique tool instance to process it.
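The discovery step described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the project's ToolsManager: the function names load_tool_classes, discover_enabled_tools, and make_callable are hypothetical, the ToolBase here is a cut-down stand-in, and the sketch injects ToolBase into each loaded module only so that the example fits in one file (a real tool module would import it normally). The initialize() call is omitted for brevity.

```python
import importlib.util
import inspect
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Type


class ToolBase(ABC):
    """Cut-down stand-in for the article's ToolBase (assumption for this sketch)."""

    name: str = "base"
    version: str = "0.0"

    @staticmethod
    @abstractmethod
    async def is_enabled() -> bool: ...

    @abstractmethod
    async def run(self, arguments: Dict[str, Any]) -> Any: ...


def load_tool_classes(tools_dir: Path, base: Type[ToolBase] = ToolBase) -> List[Type[ToolBase]]:
    """Import every .py file under tools_dir and collect concrete subclasses of base."""
    found: List[Type[ToolBase]] = []
    for index, path in enumerate(sorted(tools_dir.rglob("*.py"))):
        spec = importlib.util.spec_from_file_location(f"discovered_tool_{index}", path)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        # Sketch-only shortcut: make ToolBase visible without an import statement.
        module.ToolBase = base
        spec.loader.exec_module(module)
        for _, obj in inspect.getmembers(module, inspect.isclass):
            if issubclass(obj, base) and obj is not base and not inspect.isabstract(obj):
                found.append(obj)
    return found


async def discover_enabled_tools(tools_dir: Path) -> List[Type[ToolBase]]:
    """Keep only the tool types whose static is_enabled() check passes."""
    enabled: List[Type[ToolBase]] = []
    for tool_cls in load_tool_classes(tools_dir):
        if await tool_cls.is_enabled():
            enabled.append(tool_cls)
    return enabled


def make_callable(tool_cls: Type[ToolBase]) -> Callable[[Dict[str, Any]], Awaitable[Any]]:
    """Wrap a tool type so that every incoming call gets a fresh instance."""

    async def call(arguments: Dict[str, Any]) -> Any:
        return await tool_cls().run(arguments)

    return call
```

Because the wrapper constructs a new instance on every call, no per-call state lives on a shared object, which is the property that lets several users call the same tool concurrently.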
Feature Flags for tool enablement
As described in the Discoverable tools section, feature flags control which tools the MCP Server exposes in any given run. The system supports finding these flags in several sources:
- Environment variables, including a .env file
- Azure App Configuration Service
- Azure Key Vault
The following is_enabled method looks for a feature called FILE_ENDPOINTS:
@staticmethod
async def is_enabled() -> bool:
    # Check if the upload file tool is enabled
    return Configuration.is_enabled("FILE_ENDPOINTS", f"enable_{upload_file_tool.versioned_name}", default=True)
The Configuration class uses a provider pattern to read configuration values from multiple sources, including environment variables, Azure App Configuration Service, and Azure Key Vault. These sources are checked in the listed order; the first match wins. This class allows general feature sets to be defined in Azure (App Config Service or Key Vault) and allows developers to override specific versions or features locally using their .env file. The .is_enabled function takes three parameters:
- Feature name - The name of the feature set to check.
- Versioned tool name - The key that determines whether this individual version of the tool is enabled in the configuration.
- Default value - If no configuration matching the feature name and version is found, use this default value.
This scheme lets a whole feature set, such as FILE_ENDPOINTS, be enabled or disabled centrally while giving developers control over which version they run locally.
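A provider chain of this kind can be sketched as below. This is an illustration only: the article does not show the Configuration class, so the provider signature, the dict_provider stand-in for App Configuration or Key Vault, the accepted truthy strings, and the rule that a versioned key takes precedence over the feature-set key are all assumptions. The sketch also uses an instance rather than the class-level call shown in the article.

```python
import os
from typing import Callable, Dict, List, Optional

# A provider maps a key to a raw string value, or None when the key is absent.
Provider = Callable[[str], Optional[str]]


def env_provider(key: str) -> Optional[str]:
    """Read from process environment variables (a loaded .env file ends up here too)."""
    return os.environ.get(key)


def dict_provider(values: Dict[str, str]) -> Provider:
    """Hypothetical stand-in for a remote source such as App Configuration or Key Vault."""
    return values.get


class Configuration:
    def __init__(self, providers: List[Provider]):
        # Providers are consulted in list order; the first match wins.
        self.providers = providers

    def get_value(self, key: str, default: Optional[str] = None) -> Optional[str]:
        for provider in self.providers:
            value = provider(key)
            if value is not None:
                return value
        return default

    def is_enabled(self, feature: str, versioned_key: str, default: bool = True) -> bool:
        # Assumption: the most specific key (tool version) overrides the feature set.
        for key in (versioned_key, feature):
            raw = self.get_value(key)
            if raw is not None:
                return raw.strip().lower() in {"1", "true", "yes", "on"}
        return default
```

With a local source listed before a cloud source, a developer can switch off one tool version locally while the cloud setting keeps the rest of the feature set enabled.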
Tool Observability
Observability is an important requirement for any deployed solution. We wanted to split our observability requirements into two groups: Data Science observability and Software Engineering observability.
Data Science observability
Langfuse captures data-science-specific logging. This library integrates with the MCP Server and provides information such as token usage and cost per tool call. It also captures the inputs and outputs of tool calls, with no code required in the derived tool classes. During startup, a configuration step connects the Langfuse client using a connection string. In the ToolBase class, from which all tools derive, the run method contains the following code. Note the use of the Langfuse @observe decorator.
@observe()
async def run(self, arguments: Dict[str, Any], ctx: Context) -> Any:
    """Execute the tool with provided arguments."""
    lf = get_langfuse_client()
    lf.update_current_span(name=self.name)
    self.sessionId = str(id(ctx.session))
    tracer = await self._get_tracing_span(ctx)
    with tracer.start_as_current_span(self.sessionId) as span:
        span.set_attribute("sessionId", self.sessionId)
        self.logger.info(f"{self.versioned_name} tool called")
        # validate_arguments is declared async on ToolBase, so it must be awaited.
        validated_arguments = await self.validate_arguments(arguments)
        return await self._run(validated_arguments, ctx)
The @observe decorator tells Langfuse to create a span with an id based on the current session. The span captures all activity within the self._run method, and Langfuse logs all LLM activity automatically. This approach provides a consistent, low-effort mechanism for logging data-science-related values from all tools, without adding observability code to each tool individually.
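The reason derived tools need no observability code is the shape of the base class: the public run method is decorated once and delegates to the private _run. The sketch below shows that shape with the standard library only. It does not use Langfuse; record_call and TRACE_LOG are hypothetical stand-ins for a tracing decorator and its backend, and WordCountTool is an invented example tool.

```python
import asyncio
import functools
from abc import ABC, abstractmethod
from typing import Any, Dict, List

TRACE_LOG: List[Dict[str, Any]] = []  # Stand-in for an observability backend.


def record_call(func):
    """Hypothetical stand-in for a tracing decorator: records inputs and outputs."""

    @functools.wraps(func)
    async def wrapper(self, arguments: Dict[str, Any]) -> Any:
        entry: Dict[str, Any] = {"tool": self.name, "input": arguments}
        try:
            entry["output"] = await func(self, arguments)
            return entry["output"]
        finally:
            # Recorded even when the tool raises, so failures are visible too.
            TRACE_LOG.append(entry)

    return wrapper


class ToolBase(ABC):
    name = "base"

    @record_call
    async def run(self, arguments: Dict[str, Any]) -> Any:
        # The only decorated method; every derived tool is traced through it.
        return await self._run(arguments)

    @abstractmethod
    async def _run(self, arguments: Dict[str, Any]) -> Any: ...


class WordCountTool(ToolBase):
    name = "word_count"

    async def _run(self, arguments: Dict[str, Any]) -> Any:
        # Business logic only; no tracing code here.
        return len(arguments["text"].split())
```

A new tool written against this base class is traced from its first call, because the server only ever invokes run.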
Software Engineering observability
For traditional observability (monitoring usage, errors, and logs), we used Azure Application Insights. We configured an OpenTelemetry Span-aware provider to correlate calls across tool invocations, which proved useful for observing the Orchestrator's workflow.
# Import paths match the opentelemetry-sdk releases that expose LogData; adjust for other versions.
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter, AzureMonitorTraceExporter
from opentelemetry import trace
from opentelemetry.sdk._logs import LogData, LoggerProvider, LogRecordProcessor
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# tracer_provider and connection_string are created earlier in the setup code (not shown).
# Add Azure Monitor trace exporter
trace_exporter = AzureMonitorTraceExporter(connection_string=connection_string)
span_processor = BatchSpanProcessor(trace_exporter)
tracer_provider.add_span_processor(span_processor)

# This private class is used to copy span attributes to log records
# so that logs can be correlated with traces in Azure Application Insights.
# It is not part of the public API, but is necessary for this integration.
class SpanAttributesLogProcessor(LogRecordProcessor):
    def on_emit(self, log_data: LogData):
        span = trace.get_current_span()
        if span and span.is_recording():
            # copy every attr from the span into the log's attributes
            for k, v in span.attributes.items():
                log_data.log_record.attributes[k] = v

    def shutdown(self):
        """Shutdown the processor."""

    def force_flush(self, timeout_millis: int = 30000):
        """Force flush the processor."""

# Wire up LoggerProvider → AzureMonitorLogExporter
# This custom Log Processor copies span attributes to log records
# so that logs can be correlated with traces in Azure Application Insights.
logger_provider = LoggerProvider()
logger_provider.add_log_record_processor(SpanAttributesLogProcessor())
log_exporter = AzureMonitorLogExporter(
    connection_string=connection_string,
)
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
This configuration integrates with standard Python logging capabilities, so all logger.{level}(...) calls are sent to Application Insights.
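The idea behind the custom log processor, copying attributes of the active span onto each log record, can be illustrated with the standard library alone. The sketch below is an analogy and not OpenTelemetry code: a context variable plays the role of the current span's attributes, a logging.Filter plays the role of the processor, and all names (current_attributes, ContextAttributesFilter, ListHandler, build_logger) are hypothetical.

```python
import contextvars
import logging
from typing import List

# Stand-in for "attributes of the current span"; an empty dict means no active span.
current_attributes = contextvars.ContextVar("current_attributes", default={})


class ContextAttributesFilter(logging.Filter):
    """Copy the active attributes onto every record passing through the handler."""

    def filter(self, record: logging.LogRecord) -> bool:
        for key, value in current_attributes.get().items():
            setattr(record, key, value)
        return True  # Never drop the record; this filter only enriches it.


class ListHandler(logging.Handler):
    """Collects records in memory so the effect can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: List[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


def build_logger(name: str, handler: logging.Handler) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler.addFilter(ContextAttributesFilter())
    logger.addHandler(handler)
    return logger
```

Any record logged while attributes are set carries them (for example a sessionId), which is what makes it possible to filter logs by session in the monitoring backend.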
Stateless tools
The nature of the experiments we conducted required access to both files and a vector store. During development, we wanted to use the local file system and an in-memory vector store, but once deployed to the cloud, we wanted to externalize these resources to Azure Storage and a Databricks-based vector database. To avoid rewriting code to support both scenarios, we introduced the StorageFacade and VectorFacade classes. These classes use a provider pattern internally, based on configuration flags, to hide the implementation. Here is the constructor of the StorageFacade class, which builds the private storage provider.
def __init__(self, storage_type: Optional[str] = None):
    """
    Initialize the storage facade with a specific storage type.
    ONLY specify the storage type if you want to override the default - in general
    you should rely on the configuration as this adapts per environment.
    """
    # Storage backend can be swapped via configuration without touching call sites.
    if storage_type is None:
        storage_type = Configuration().get_value("STORAGE_TYPE", "azure")
    self.storage: StorageProvider
    if storage_type == FileStorageProvider.name:
        self.storage = FileStorageProvider()
    elif storage_type == AzureStorageProvider.name:
        self.storage = AzureStorageProvider()
    else:
        raise ValueError(f"Unknown storage type: {storage_type}")
The StorageFacade then provides the following methods:
async def write(
    self,
    path: str,
    contents: bytes,
    file_name: Optional[str] = None,
    overwrite_existing: bool = False,
    create_path_if_not_exists: bool = False,
) -> None: ...
async def write_string(
    self,
    path: str,
    contents: str,
    file_name: Optional[str] = None,
    overwrite_existing: bool = False,
    create_path_if_not_exists: bool = False,
) -> None: ...
async def read(
    self,
    path: str,
    file_name: Optional[str] = None,
) -> bytes: ...
async def read_string(
    self,
    path: str,
    file_name: Optional[str] = None,
) -> str: ...
async def delete(
    self,
    path: str,
    file_name: Optional[str] = None,
) -> None: ...
async def list_files(self, path: str) -> list[str]: ...
async def check_health(self) -> bool: ...
The FileStorageProvider and AzureStorageProvider handle the details of working with either the local file system (dev) or Azure Storage (production). The VectorFacade class provides a similar set of vector-specific functionality. Externalizing storage and vectors in this way makes the MCP Tools semi-stateless. They can't be fully stateless because tool calls can be relatively long-lived—with conversations occurring between the tool and the client via the MCP elicitation mechanism. However, when tools produce results (RAG embeddings or generated code in our case), they store those results outside the MCP Server process in the vector database or Azure Storage. Other tools can then access and use these results.
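How a facade lets the same tool code run against different backends can be sketched as follows. This is a simplified illustration, not the project's implementation: MemoryStorageProvider is a hypothetical stand-in for the cloud-backed provider, the method set is reduced to read and write, and the root argument and temporary-directory default are assumptions made so the example is self-contained.

```python
import asyncio
import tempfile
from pathlib import Path
from typing import Dict, Optional, Protocol


class StorageProvider(Protocol):
    async def write(self, path: str, contents: bytes) -> None: ...
    async def read(self, path: str) -> bytes: ...


class FileStorageProvider:
    name = "file"

    def __init__(self, root: str) -> None:
        self.root = Path(root)

    async def write(self, path: str, contents: bytes) -> None:
        target = self.root / path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(contents)

    async def read(self, path: str) -> bytes:
        return (self.root / path).read_bytes()


class MemoryStorageProvider:
    """Hypothetical stand-in for a cloud-backed provider; keeps blobs in a dict."""

    name = "memory"

    def __init__(self) -> None:
        self.blobs: Dict[str, bytes] = {}

    async def write(self, path: str, contents: bytes) -> None:
        self.blobs[path] = contents

    async def read(self, path: str) -> bytes:
        return self.blobs[path]


class StorageFacade:
    def __init__(self, storage_type: str, root: Optional[str] = None) -> None:
        self.storage: StorageProvider
        if storage_type == FileStorageProvider.name:
            self.storage = FileStorageProvider(root or tempfile.mkdtemp())
        elif storage_type == MemoryStorageProvider.name:
            self.storage = MemoryStorageProvider()
        else:
            raise ValueError(f"Unknown storage type: {storage_type}")

    async def write_string(self, path: str, contents: str) -> None:
        await self.storage.write(path, contents.encode("utf-8"))

    async def read_string(self, path: str) -> str:
        return (await self.storage.read(path)).decode("utf-8")
```

Tool code calls only write_string and read_string on the facade, so switching the storage type in configuration changes where the data lives without touching any call site.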
Chainable tools
We investigated two approaches to building complex workflows: Client Side and Server Side orchestration. Both required multiple tools to be called in order to achieve the code generation use case.
Client Side Orchestration
In the Client Side orchestration case, GitHub Copilot was provided with a relatively complex instructions file describing the goal it was trying to achieve and containing hints as to which tools to use in order to achieve this goal. This file was useful during development as it provided a quick dev cycle, allowing the developers to update the instructions and iterate easily. The downside appears once the code is productionized and deployed to the Azure Container App. There are a few hundred users of the system, so each one needs the latest instructions file on their machine. This process becomes complex and tedious to manage; rolling out new versions of the instructions is time-consuming and error-prone.
Server Side Orchestration
To overcome the production issues of client side instruction files, we investigated orchestrating the workflow using Semantic Kernel in the MCP Server. This orchestration allowed us to expose to the client a smaller number of tools to choose from and have the server side orchestrator manage the complexity of calling more tools. When changes were made to the tools or the orchestration workflow, we were able to update the MCP Server container apps and all users would implicitly pick up those changes with no extra steps on the client.
Remote Tool calling
To perform the workflow, the server side orchestration tool chains together many MCP tools exposed by the server to perform its work. Output from one is passed as an input to the next. During development, these tools were called by creating an instance of the required tool class in code and calling its run method. This workflow was quick and effective, but it did not provide a pattern for calling remote MCP tools hosted in other processes. To solve this problem, we provided a RemoteTool class that hid the location of the MCP Tool being called.
import json
from logging import getLogger
from typing import Any

# Client, Context, ElicitResult, and TextContent come from the MCP client library in use;
# Configuration, ElicitationRegistry, and MCPToolResponse are project classes (imports not shown).
class RemoteTool:
    def __init__(self, name: str, ctx: Context):
        self.name = name
        self.ctx = ctx
        self.logger = getLogger(__class__.__name__)
        remote_server_url = Configuration.get_value("REMOTE_MCP_SERVER_URL")
        self.client = Client(
            remote_server_url, elicitation_handler=self.elicitation_handler, progress_handler=self.progress_handler
        )

    async def elicitation_handler(self, message: str, response_type: type, params, context):
        # Forward the remote tool's elicitation request to the original client.
        schema = ElicitationRegistry.get_elicitation_type(params.requestedSchema["title"])
        result = await self.ctx.elicit(message=message, schema=schema)
        if result.action == "accept":
            return ElicitResult(action="accept", content=result.data)
        elif result.action == "decline":
            return ElicitResult(action="decline", content="")
        else:
            return ElicitResult(action="cancel", content="Unknown error")

    async def progress_handler(self, progress: float, total: float | None, message: str | None):
        # report_progress is a coroutine, so it must be awaited to take effect.
        await self.ctx.report_progress(progress, total, message)

    async def run(self, args: dict[str, Any]):
        async with self.client:
            response = await self.client.call_tool(self.name, args)
            # Extract content from the first TextContent object in the response
            if response.content and len(response.content) > 0:
                content_item = response.content[0]
                if isinstance(content_item, TextContent):
                    text = content_item.text
                    result = json.loads(text)
                    return MCPToolResponse(**result)
        return None
This class supports MCP elicitation and progress functionality. The ElicitationRegistry provides tools a mechanism for registering the types to be used during the elicitation process. This decoupling means the MCP Server itself does not need to know the tools it hosts; the tools simply register their elicitation types during their initialize call. It made calling remote tools as simple as:
# Build tool arguments based on agent type
tool_args = self._build_agent_arguments(
    agent_name,
    ...
)
remote_tool = RemoteTool(agent_name, ctx)
mcp_response = await remote_tool.run(tool_args)
This pattern provides several benefits. In addition to calling externally managed MCP tools—for example, GitHub-hosted tools—this approach also allows the orchestrator's tools to move to a private container app. This change reduces the number of tools exposed by the public-facing MCP Server, simplifying the client's decision-making and reducing the attack surface. As described in the Stateless tools section, externalizing storage means that when passing large volumes of data between tools—generated code in this case—the system passes a storage descriptor rather than a large block of text. This approach improves performance, as passing large text strings between MCP tools appears to be inefficient. This data passing is storage-agnostic, so it works whether the code is written to local disk or stored in Azure Storage. Developers can still work locally, and the code requires no changes when pushed to the deployed container app.
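Passing a storage descriptor between chained tools, rather than the generated text itself, might look like the sketch below. The article does not show the descriptor's real shape, so StorageDescriptor, its fields, the two example tools, and the in-memory STORE that stands in for the StorageFacade are all hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class StorageDescriptor:
    """Hypothetical pointer to a stored artifact; small enough to pass between tools."""

    path: str
    file_name: str

    @property
    def key(self) -> str:
        return f"{self.path}/{self.file_name}"


STORE: Dict[str, str] = {}  # Stand-in for the StorageFacade backend.


async def generate_code_tool(prompt: str) -> StorageDescriptor:
    """First tool: writes its (potentially large) output to storage."""
    code = f"# generated for: {prompt}\nprint('hello')\n"
    descriptor = StorageDescriptor(path="generated", file_name="main.py")
    STORE[descriptor.key] = code
    return descriptor


async def review_code_tool(descriptor: StorageDescriptor) -> int:
    """Second tool: loads the artifact by descriptor and returns its line count."""
    code = STORE[descriptor.key]
    return len(code.splitlines())


async def orchestrate(prompt: str) -> int:
    # Only the small descriptor travels between the two tool calls.
    descriptor = await generate_code_tool(prompt)
    return await review_code_tool(descriptor)
```

The orchestrator never handles the generated code directly; each tool resolves the descriptor against whichever storage backend the environment is configured to use.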
The Destination: Outcomes and Learnings
The work presented in this article creates a tool-agnostic MCP Server with built-in Data Science and Software Engineering observability that can call MCP tools locally or remotely in a consistent manner. This architecture allows for the externalization of storage and vectors to increase resilience and efficiency. Tools are discovered at runtime and are enabled or disabled via cloud-hosted feature flags. Developers need zero code changes to run locally versus deploying to a cloud container; only configuration changes via a local .env file are required. We expect multiple projects within the customer's organization to use this MCP Server, extending its benefits to a wider audience.
The feature image was generated using Copilot.
Post Updated on April 17, 2026 at 08:00AM