On a quest for enterprise RAG, in this article we explore how to craft RAG microservices from an RAG pipeline POC developed in a Colab notebook. We take the following approach:

  • Generate boilerplate RAG microservices with LlamaIndex's create-llama command line tool.
  • Develop two microservices, ingestion-service and inference-service, to cover the two main stages of RAG.
  • Port the code logic from the Colab notebook into the microservices.
  • Add Milvus vector database integration to our new microservices.
  • Add NeMo Guardrails to inference-service to moderate user inputs, LLM outputs, and topics, with a custom action to integrate with LlamaIndex.

The notebook

For rapid prototyping, a Colab notebook is a perfect option thanks to its ease of use, accessibility, and free tier.

For example, this Colab notebook demonstrates how to use Metadata replacement + node sentence window in an RAG pipeline, which serves as a chatbot for the NVIDIA AI Enterprise user guide.

SentenceWindowNodeParser breaks documents down into individual sentences, but for each sentence it also captures the surrounding sentences as metadata, building a richer picture of its context. Its counterpart at query time is MetadataReplacementPostProcessor: before the retrieved nodes are sent to the LLM, it swaps each isolated sentence for its surrounding window, giving the model a smoother, more informed passage to reason over. This approach shines for large documents, where grasping nuances in context is crucial.

Since we know a reranker helps with retrieval accuracy, we also added CohereRerank as one of the node postprocessors.
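
For reference, the heart of the notebook can be condensed into a short sketch like the one below. Treat it as an approximation rather than the exact notebook code: it assumes OpenAI defaults for the LLM and embeddings, a local ./data directory holding the user guide PDF, and an illustrative question.

import os
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.node_parser import SentenceWindowNodeParser
from llama_index.core.postprocessor import MetadataReplacementPostProcessor
from llama_index.postprocessor.cohere_rerank import CohereRerank

# split documents into sentence nodes, each carrying a 3-sentence window as metadata
node_parser = SentenceWindowNodeParser.from_defaults(
    window_size=3,
    window_metadata_key="window",
    original_text_metadata_key="original_text",
)

documents = SimpleDirectoryReader("data").load_data()
nodes = node_parser.get_nodes_from_documents(documents)
index = VectorStoreIndex(nodes)  # in-memory index; no vector database yet

query_engine = index.as_query_engine(
    similarity_top_k=2,
    node_postprocessors=[
        # replace each retrieved sentence with its surrounding window before synthesis
        MetadataReplacementPostProcessor(target_metadata_key="window"),
        # rerank the retrieved nodes and keep the 2 most relevant ones
        CohereRerank(api_key=os.environ["COHERE_API_KEY"], top_n=2),
    ],
)
print(query_engine.query("What is NVIDIA AI Enterprise?"))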

Our POC is complete, and we are ready to proceed to the next step on our production RAG journey.

Two microservices

Following some of the key microservice design principles, such as single responsibility, loose coupling, and separation of concerns, we are going to create two microservices from our one notebook:

  • ingestion-service: handles data loading, indexing, and storing embeddings in the vector database.
  • inference-service: handles retrieval and generation.

An RAG app is a multi-staged pipeline. We divide our RAG pipeline into two separate microservices based on their distinct functionalities. At a high level, ingestion-service is mainly responsible for data loading, indexing, and writing embeddings to the vector database, while inference-service focuses on the retrieval and generation aspects of the RAG pipeline. The scalability, availability, maintainability, and resilience requirements of these two stages differ, which makes splitting an RAG pipeline into two microservices the preferred option.

[Image adapted from LlamaIndex documentation on RAG concepts]

Depending on your use case, for a small and simple RAG app the overhead of a standalone ingestion-service may outweigh the benefits; it's perfectly fine to have one microservice handle both main stages, ingestion and inference.

However, if you expect high volumes of data or queries for your RAG pipeline, data ingestion and indexing can be computationally expensive. Splitting these ingestion tasks into a separate ingestion-service can improve the performance of the inference-service. Thus, having two microservices can help your RAG pipeline scale efficiently.

In this article, we will be implementing both ingestion-service and inference-service.

Now, let's dive into the details of developing our new microservices.

The command line tool, create-llama

create-llama is a command line tool to generate LlamaIndex apps. Crafted by Marcus Schiesser, create-llama is a convenient way to create the boilerplate code for your RAG microservices. Simply run the following command to start building your app:

npx create-llama

You will be prompted with a series of questions to help guide your RAG microservice creation. See below a sample of how I created our inference-service microservice.

[Screenshot: create-llama prompts and answers for inference-service]

As indicated in the answers to the prompts for create-llama, we selected the following stack for our inference-service:

  • FastAPI for the backend: FastAPI is a modern web framework for building Python APIs, offering high performance, automatic validation, interactive documentation, async support, and dependency injection. It is known for its ease of use, speed, and features that streamline development.
  • NextJS for the frontend: NextJS is an open-source framework built on top of React that's specifically designed for building fast, performant, and SEO-friendly web applications. It simplifies React development by providing tools and conventions to handle common tasks like routing, data fetching, rendering, and image optimization.

Similarly, we use create-llama to bootstrap our ingestion-service:

[Screenshot: create-llama prompts and answers for ingestion-service]

Once create-llama completes, the shells of our new RAG microservices are ready. See the screenshot below for the sample file structure of inference-service.

[Screenshot: generated file structure for inference-service]

With FastAPI, many suggested project structures can be found online, with more preferences than standards. I like what create-llama has generated; let's keep this file structure.

Now, let's explore further in our RAG microservices development.

Milvus as Vector Database

For the POC in the notebook, we didn't use any vector database; the pipeline ran successfully with metadata replacement + sentence window retrieval and the Cohere reranker, and there is no need to mandate a vector database to manage embeddings for a simple POC. In a microservice, however, we want to manage embeddings properly by introducing a vector database. We pick Milvus.

Why Milvus?

Milvus is an open-source cloud-native vector database designed for large-scale similarity search applications. It's gaining popularity for its impressive features and capabilities, including scalability, high performance, flexible data handling, and cost-effectiveness.

In addition, Milvus offers GPU acceleration through its GPU-enabled distribution. This means you can leverage the processing power of GPUs to achieve significant performance improvements, especially for:

  • Search: GPU acceleration can significantly speed up similarity search queries, particularly when working with large-scale datasets and high-dimensional vectors.
  • Index building: Building indexes (such as HNSW or IVF_FLAT) can be accelerated using GPUs, reducing the time it takes to prepare your data for efficient search.

Overall, GPU acceleration in Milvus is valuable for handling large-scale workloads and demanding search applications.

The simplest way to experiment with Milvus is through Zilliz Cloud, a fully managed version of Milvus. Follow its quick start guide to stand up a Milvus cluster and create a new collection ai_enterprise for our microservices.

Both ingestion-service and inference-service need to access Milvus — one for writing, the other for reading.

We store the Milvus connection details in the .env file under the backend directory. Together with the OpenAI API key, our .env file now looks like the following. Feel free to revise the values according to the configuration for your RAG pipeline:

# OpenAI
MODEL=gpt-3.5-turbo-0125
OPENAI_API_KEY=sk-###

# Milvus
MILVUS_API_KEY=###
MILVUS_URI="https://###.api.gcp-us-west1.zillizcloud.com"
MILVUS_COLLECTION="ai_enterprise"
MILVUS_DIMENSION=1536

# Cohere
COHERE_API_KEY=###
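
Before wiring Milvus into the two services, it can help to verify the connection details with a quick, optional check. The snippet below is a small sketch that assumes pymilvus is installed and reads the same .env values:

import os
from dotenv import load_dotenv
from pymilvus import MilvusClient

load_dotenv()  # pick up the MILVUS_* values from .env

client = MilvusClient(
    uri=os.getenv("MILVUS_URI"),
    token=os.getenv("MILVUS_API_KEY"),
)

# confirm the cluster is reachable and the ai_enterprise collection is visible
print(client.list_collections())
print(client.has_collection(os.getenv("MILVUS_COLLECTION")))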

ingestion-service

On the ingestion-service side, we are going to use one key file that comes with the create-llama template:

  • generate.py: this file generates the embeddings of our PDF document in the ./data directory and saves them in the Milvus collection. See the code snippet below for the generate_datasource function.
import os

from dotenv import load_dotenv
from llama_index.core import SimpleDirectoryReader, StorageContext, VectorStoreIndex
from llama_index.core.node_parser import SentenceWindowNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.vector_stores.milvus import MilvusVectorStore

load_dotenv()

# Embedding model used for indexing; assumed OpenAI defaults here, whose 1536-dimensional
# output matches the MILVUS_DIMENSION value in our .env.
embed_model = OpenAIEmbedding()


def generate_datasource():

    try:
        milvus_uri = os.getenv("MILVUS_URI")
        milvus_api_key = os.getenv("MILVUS_API_KEY")
        milvus_collection = os.getenv("MILVUS_COLLECTION")
        milvus_dimension = int(os.getenv("MILVUS_DIMENSION"))

        if not all([milvus_uri, milvus_api_key, milvus_collection, milvus_dimension]):
            raise ValueError("Missing required environment variables.")

        # Create MilvusVectorStore
        vector_store = MilvusVectorStore(
            uri=milvus_uri,
            token=milvus_api_key,
            collection_name=milvus_collection,
            dim=milvus_dimension,  # mandatory for new collection creation
            overwrite=True,  # mandatory for new collection creation
        )

        # Create StorageContext backed by the Milvus vector store
        storage_context = StorageContext.from_defaults(vector_store=vector_store)

        # Create the sentence window node parser
        node_parser = SentenceWindowNodeParser.from_defaults(
            window_size=3,
            window_metadata_key="window",
            original_text_metadata_key="original_text",
        )

        # Load the PDF(s) from ./data, split into sentence nodes, and index them into Milvus
        documents = SimpleDirectoryReader("data").load_data()
        nodes = node_parser.get_nodes_from_documents(documents)
        index = VectorStoreIndex(nodes, storage_context=storage_context, embed_model=embed_model)

    except (KeyError, ValueError) as e:
        raise ValueError(f"Invalid environment variables: {e}")
    except ConnectionError as e:
        raise ConnectionError(f"Failed to connect to Milvus: {e}")


if __name__ == "__main__":
    generate_datasource()

inference-service

On the inference-service side, we are going to use another key file that comes with the create-llama template:

  • index.py: this file gets called when we need to define the index and query engine. It loads the existing Milvus collection created by ingestion-service, creates the index, and defines the query engine.
import os

from dotenv import load_dotenv
from llama_index.core import VectorStoreIndex
from llama_index.core.postprocessor import MetadataReplacementPostProcessor
from llama_index.llms.openai import OpenAI
from llama_index.postprocessor.cohere_rerank import CohereRerank
from llama_index.vector_stores.milvus import MilvusVectorStore

load_dotenv()


def get_index_and_query_engine():

    model = os.getenv("MODEL", "gpt-3.5-turbo")
    llm = OpenAI(model=model)

    cohere_api_key = os.environ["COHERE_API_KEY"]
    cohere_rerank = CohereRerank(api_key=cohere_api_key, top_n=2)  # return top 2 nodes from reranker

    try:
        milvus_uri = os.getenv("MILVUS_URI")
        milvus_api_key = os.getenv("MILVUS_API_KEY")
        milvus_collection = os.getenv("MILVUS_COLLECTION")
        milvus_dim = int(os.getenv("MILVUS_DIMENSION"))

        if not all([milvus_uri, milvus_api_key, milvus_collection, milvus_dim]):
            raise ValueError("Missing required environment variables.")

        # Load the existing collection created by ingestion-service
        vector_store = MilvusVectorStore(
            uri=milvus_uri,
            token=milvus_api_key,
            collection_name=milvus_collection,
            dim=milvus_dim,
            overwrite=False,  # keep the data populated by ingestion-service
        )

        # Build the index on top of the existing vector store (no re-embedding needed)
        index = VectorStoreIndex.from_vector_store(vector_store)

        query_engine = index.as_query_engine(
            similarity_top_k=2,
            llm=llm,
            # the target key defaults to `window` to match the node_parser's default
            node_postprocessors=[
                MetadataReplacementPostProcessor(target_metadata_key="window"),
                cohere_rerank,
            ],
        )

    except (KeyError, ValueError) as e:
        raise ValueError(f"Invalid environment variables: {e}")
    except ConnectionError as e:
        raise ConnectionError(f"Failed to connect to Milvus: {e}")

    return query_engine
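
With the same .env in place, the query engine can be smoke-tested outside FastAPI, for example from a Python shell in the backend directory. This is just an illustrative check; the question is a made-up example:

from app.engine.index import get_index_and_query_engine

# build the query engine against the existing ai_enterprise collection and ask one question
query_engine = get_index_and_query_engine()
response = query_engine.query("Which GPUs does NVIDIA AI Enterprise support?")
print(response)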

With the vector database in place, let's add NeMo Guardrails to our inference-service. This microservice interfaces with end users, so we need to ensure guardrails are properly configured for input, output, and topical moderation.

NeMo Guardrails

We explored the implementation details of adding NeMo Guardrails to an RAG pipeline in our last article, NeMo Guardrails, the Ultimate Open-Source LLM Security Toolkit. Let's now try implementing it in our new inference-service.

We copy the config directory containing the four guardrails configuration files from our last article's GitHub repo, and add it under the backend/app directory of inference-service. The only file we need to change is actions.py, where we define a custom action to integrate LlamaIndex with NeMo Guardrails.

from typing import Optional
from nemoguardrails.actions import action
from llama_index.core.base.base_query_engine import BaseQueryEngine
from llama_index.core.base.response.schema import StreamingResponse
from app.engine.index import get_index_and_query_engine

# Global variable to cache the query_engine
query_engine_cache = None

def init():
    global query_engine_cache  # Declare to use the global variable
    # Check if the query_engine is already initialized
    if query_engine_cache is not None:
        print('Using cached query engine')
        return query_engine_cache

    query_engine_cache = get_index_and_query_engine()

    return query_engine_cache

def get_query_response(query_engine: BaseQueryEngine, query: str) -> str:
    """
    Function to query based on the query_engine and query string passed in.
    """
    response = query_engine.query(query)
    if isinstance(response, StreamingResponse):
        typed_response = response.get_response()
    else:
        typed_response = response
    response_str = typed_response.response
    if response_str is None:
        return ""
    return response_str

@action(is_system_action=True)
async def user_query(context: Optional[dict] = None):
    """
    Function to invoke the query_engine to query user message.
    """
    user_message = context.get("user_message")
    print('user_message is ', user_message)
    query_engine = init()
    return get_query_response(query_engine, user_message)

As you can see, the key line calls the get_index_and_query_engine function from index.py to obtain the query engine for executing the user query; init() caches the engine in a global variable so it is only built once.

The other three files under the config directory, config.yml, prompts.yml, and bot_flows.co, remain unchanged, as this is the same AI Enterprise chatbot from the last article, with the same flows and prompts.

Now that all the major components are lined up, let's string them together and expose the endpoints through FastAPI.

Define FastAPI app and router

For inference-service, in main.py under the backend directory, we define the FastAPI app and include the chat_router, making its endpoints accessible under the /api/chat path prefix. A router groups related API endpoints together; in our case, we define only one router, chat_router.

from fastapi import FastAPI
from app.api.routers.chat import chat_router  # chat_router is defined in chat.py (create-llama layout)

app = FastAPI()

app.include_router(chat_router, prefix="/api/chat")
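
Since we start the backend with python main.py (see the launch section below), main.py also needs a small entry point. The create-llama template provides one; a rough sketch, assuming the default host and port, looks like this:

import uvicorn

if __name__ == "__main__":
    # serve the FastAPI app locally; the Swagger UI becomes available at http://localhost:8000/docs
    uvicorn.run(app="main:app", host="0.0.0.0", port=8000, reload=True)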

In chat.py, we define a POST endpoint on the chat_router. It takes in the request data, which contains the user query. The key logic lies in constructing the RailsConfig and creating LLMRails from it; we then call its generate_async function to answer the question.

from typing import List
from fastapi import APIRouter, HTTPException, status
from llama_index.core.llms import MessageRole
from pydantic import BaseModel
from nemoguardrails import LLMRails, RailsConfig

chat_router = r = APIRouter()

class _Message(BaseModel):
    role: MessageRole
    content: str

class _ChatData(BaseModel):
    messages: List[_Message]

@r.post("")
async def chat(data: _ChatData):
    # check preconditions and get last message
    if len(data.messages) == 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No messages provided",
        )
    lastMessage = data.messages.pop()
    if lastMessage.role != MessageRole.USER:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Last message must be from user",
        )

    # Load a guardrails configuration from the specified path.
    config = RailsConfig.from_path("./app/config")
    rails = LLMRails(config)

    # call generate_async
    response = await rails.generate_async(prompt=lastMessage.content)

    return response
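
Once the backend is up, the chat endpoint can also be exercised without the frontend. Here is a small hedged example using the requests library, assuming the service listens on the default http://localhost:8000:

import requests

payload = {
    "messages": [
        {"role": "user", "content": "What is NVIDIA AI Enterprise?"},
    ]
}

# POST the user message to the chat endpoint guarded by NeMo Guardrails
resp = requests.post("http://localhost:8000/api/chat", json=payload)
print(resp.status_code)
print(resp.json())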

For ingestion-service, we have two options to trigger the data ingestion:

  • run python app/engine/generate.py.
  • trigger a POST call to the endpoint /api/ingestion by navigating to the Swagger UI of the API at http://localhost:8000/docs (or programmatically, as in the sketch after this list).
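
For the second option, the same POST can of course be made from code. A minimal sketch, assuming the endpoint accepts an empty POST body:

import requests

# trigger the ingestion endpoint of ingestion-service running locally
resp = requests.post("http://localhost:8000/api/ingestion")
print(resp.status_code, resp.text)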

Keep in mind that these two microservices deal with a pretty simple use case and only serve as starters. Feel free to customize and expand them to incorporate your domain-specific requirements.

Microservices Launch

By now, our draft of the two microservices is in good shape. Let's launch them and see if they work as expected.

ingestion-service

We run the following commands to kick off the ingestion-service.

python -m venv venv
.\venv\Scripts\activate  # on macOS/Linux: source venv/bin/activate
poetry install
poetry shell
python app/engine/generate.py

If all goes well, you should see the following Milvus collection ai_enterprise populated through Zilliz Cloud:

[Screenshot: the ai_enterprise collection populated, viewed in Zilliz Cloud]

Our source document has been successfully loaded, indexed, and its embeddings persisted into the ai_enterprise Milvus collection.

inference-service

We launch both the backend and frontend of inference-service to start the microservice:

# backend
python -m venv venv
.\venv\Scripts\activate  # on macOS/Linux: source venv/bin/activate
cd backend
poetry install
poetry shell
python main.py
# frontend (run from the frontend directory)
npm install
npm run dev

Now, let's launch our chatbot UI and ask a few questions:

[Screenshots: chatbot Q&A in the UI, with NeMo Guardrails moderating the responses]

Pretty impressive! I am happy with how NeMo Guardrails performs.

Summary

We explored the RAG journey from one notebook to two microservices in this article. Specifically, we touched on the topics of:

  • The create-llama command line tool, which bootstraps new microservice development.
  • Crafting two microservices, ingestion-service and inference-service, to cover the two main stages of RAG.
  • Integrating the Milvus vector database, which also offers GPU acceleration, into our new microservices.
  • Adding NeMo Guardrails to inference-service to moderate user inputs, LLM outputs, and topics, with a custom action to integrate with LlamaIndex.

This is only the beginning of our exploration into Enterprise RAG stacks. I plan to continue this blog series in the coming weeks so we can dive deeper into the many facets of Enterprise RAG. Let's keep adding more features to the RAG starters we built today.

The complete source code for this article can be found in my GitHub repo, which contains the Colab notebook and two microservices.

Happy coding!

References: