How to Add an HTTP API to GraphRAG

Jul 30, 2024

Kimi Lu

Introduction

GraphRAG is a structured, hierarchical approach to Retrieval-Augmented Generation (RAG), developed by Microsoft Research. Unlike traditional RAG methods that rely on plain text snippets, GraphRAG uses LLM-generated knowledge graphs to enhance document analysis and improve question-answering performance. This tutorial shows how to add an HTTP API to GraphRAG using FastAPI, so that you can run indexing and queries with simple HTTP requests.

Prerequisites

Ensure you have the following installed:

  • Python 3.10+ (GraphRAG does not support older versions)

  • FastAPI

  • Uvicorn

  • GraphRAG

You can install the necessary packages using pip:

pip install fastapi uvicorn graphrag

Project Structure

Organize your project directory as follows:

rag/
  ├── settings.yml   # GraphRAG settings
  ├── input/         # Folder to put all the text documents for GraphRAG
  ├── output/        # Folder for GraphRAG to put all the results and logs
  ├── cache/         # Folder for caches generated by GraphRAG
  ├── prompts/       # Folder to store all the prompts used by GraphRAG
  └── api.py         # Python script to run the API server
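
Before starting the server, it can be useful to confirm that the folder matches the layout above. The following is a minimal sketch; the helper name missing_entries is hypothetical, and the file and folder names are simply copied from the tree above.

```python
from pathlib import Path

# Names copied from the project layout above; the helper itself is hypothetical.
EXPECTED_FILES = ["settings.yml", "api.py"]
EXPECTED_DIRS = ["input", "output", "cache", "prompts"]


def missing_entries(root: str) -> list[str]:
    """Return the expected files and folders that are absent under root."""
    base = Path(root)
    missing = [name for name in EXPECTED_FILES if not (base / name).is_file()]
    missing += [name + "/" for name in EXPECTED_DIRS if not (base / name).is_dir()]
    return missing


if __name__ == "__main__":
    gaps = missing_entries(".")
    print("Layout looks complete." if not gaps else f"Missing: {', '.join(gaps)}")
```

Run it from inside the rag/ folder. An empty result means every entry from the tree is present.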

api.py Script

Here is the complete code for the api.py script, which sets up the FastAPI server and defines endpoints for running indexing and querying through GraphRAG:

from fastapi import FastAPI, HTTPException
from typing import Optional
import asyncio
from contextlib import asynccontextmanager

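@asynccontextmanager
async def lifespan(app: FastAPI):
    """Context manager for FastAPI lifespan, to start the background job processor."""
    # Keep a reference so the task is not garbage-collected, and cancel it on shutdown.
    processor = asyncio.create_task(background_job_processor())
    yield
    processor.cancel()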

app = FastAPI(lifespan=lifespan)
job_queue = asyncio.Queue()
running_jobs = set()
queue_lock = asyncio.Lock()

async def read_stream(stream, prefix):
    """Read and print lines from an async stream."""
    output = []
    async for line in stream:
        decoded_line = line.decode().strip()
        output.append(decoded_line)
        print(f"{prefix}: {decoded_line}")
    return '\n'.join(output)

async def run_command(root: str):
    """Run the indexing command as an async subprocess."""
    job_id = root
    running_jobs.add(job_id)
    print(f"Running job: {job_id}")

    try:
        process = await asyncio.create_subprocess_exec(
            "python", "-m", "graphrag.index", "--root", root,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout_output, stderr_output = await asyncio.gather(
            read_stream(process.stdout, "stdout"),
            read_stream(process.stderr, "stderr")
        )

        rc = await process.wait()
        print(f"Indexing finished with return code: {rc}")

        return {
            "status": "success" if rc == 0 else "error",
            "message": "Job completed",
            "stdout": stdout_output,
            "stderr": stderr_output
        }
    except Exception as e:
        print(f"Error in run_command: {str(e)}")
        return {"status": "error", "message": str(e)}
    finally:
        running_jobs.remove(job_id)
        await process_next_job()

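async def process_next_job():
    """Start the next queued job, but only if no job is already running."""
    if running_jobs or job_queue.empty():
        return
    next_root = job_queue.get_nowait()
    # Mark the job as running before scheduling it, so a concurrent caller
    # cannot start a second job before run_command begins executing.
    running_jobs.add(next_root)
    asyncio.create_task(run_command(next_root))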

async def background_job_processor():
    """Continuously process jobs from the queue."""
    while True:
        if not job_queue.empty() and len(running_jobs) == 0:
            await process_next_job()
        await asyncio.sleep(1)

@app.get("/run-index")
async def run_index(root: Optional[str] = "."):
    """Endpoint to start the indexing job."""
    print(f"Received request for /run-index - root: {root}")

    async with queue_lock:
        if root in running_jobs:
            return {"status": "running", "message": "Job is currently running.", "queueSize": job_queue.qsize()}

        await job_queue.put(root)
        if len(running_jobs) == 0:
            asyncio.create_task(process_next_job())
            return {"status": "running", "message": "Job started.", "queueSize": job_queue.qsize()}
        else:
            return {"status": "queued", "message": "Job queued.", "queueSize": job_queue.qsize()}

@app.get("/query")
async def query(query: str):
    """Endpoint to run a query command."""
    print(f"Received request for /query - query: {query}")

    try:
        process = await asyncio.create_subprocess_exec(
            "python", "-m", "graphrag.query", "--root", ".", "--method", "global", query,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout_output, stderr_output = await asyncio.gather(
            read_stream(process.stdout, "stdout"),
            read_stream(process.stderr, "stderr")
        )

        rc = await process.wait()
        return {
            "status": "success" if rc == 0 else "error",
            "message": "Query completed",
            "stdout": stdout_output,
            "stderr": stderr_output
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}

@app.get("/status")
async def status():
    """Endpoint to get the status of the job queue and running jobs."""
    queue_size = job_queue.qsize()
    running_status = list(running_jobs) if running_jobs else "No jobs running"
    return {"queueSize": queue_size, "runningJobs": running_status}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=3000)
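
The script above serializes indexing jobs with a set of running jobs, a polling loop, and a chained call in the finally block. The same one-at-a-time behavior can also be had from a single worker coroutine that awaits the queue. This is a minimal sketch of that alternative pattern, not part of the original script; fake_index is a hypothetical stand-in for the indexing subprocess.

```python
import asyncio


async def fake_index(root: str) -> str:
    """Hypothetical stand-in for the indexing subprocess."""
    await asyncio.sleep(0.01)
    return f"indexed {root}"


async def worker(queue: asyncio.Queue, results: list) -> None:
    """Run queued jobs strictly one at a time, in arrival order."""
    while True:
        root = await queue.get()  # sleeps until a job arrives, no polling needed
        try:
            results.append(await fake_index(root))
        finally:
            queue.task_done()  # lets queue.join() know this job has finished


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    task = asyncio.create_task(worker(queue, results))
    for root in ("./a", "./b", "./c"):
        queue.put_nowait(root)
    await queue.join()  # wait until every queued job has been processed
    task.cancel()  # stop the now-idle worker
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because only the worker ever takes items off the queue, two jobs cannot overlap, and no lock or running-jobs set is needed to enforce that.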

Running the Server

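Navigate to the project directory where api.py is located and run the following command to start the server. As configured in the uvicorn.run call, it listens on port 3000 on all network interfaces (host 0.0.0.0):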

python api.py
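
If you start the server from a script and send requests right away, the first request may arrive before the server is ready. A small helper that waits for the port to accept connections avoids this. This is a sketch using only the standard library; wait_for_port is a hypothetical name, and port 3000 is the value from api.py.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.2)  # not up yet, retry shortly
    return False
```

For example, wait_for_port("localhost", 3000) returns True as soon as the API server is reachable.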

Using the API

You can interact with the API using tools like curl or Postman. Here are some sample requests and their responses:

Check Status

curl http://localhost:3000/status

{
    "queueSize": 0,
    "runningJobs": "No jobs running"
}

Run Indexing Job

The root parameter is the GraphRAG project root (the folder that contains settings.yml and input/), not the input folder itself. It defaults to the current directory.

curl -X GET "http://localhost:3000/run-index?root=."

{
    "status": "running",
    "message": "Job started.",
    "queueSize": 1
}

Query

curl -X GET "http://localhost:3000/query?query=what's+the+document+about?"

{
    "status": "success",
    "message": "Query completed",
    "stdout": "Global Search Response: ...<some response here>",
    "stderr": ""
}
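
The same requests can be sent from Python. The sketch below uses only the standard library and percent-encodes query parameters, which matters for questions containing characters such as apostrophes or question marks. The helper names are hypothetical. The "Global Search Response:" marker is taken from the sample response above and is an assumption about the output format, which may differ between GraphRAG versions.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

BASE_URL = "http://localhost:3000"  # host and port from the uvicorn.run call in api.py


def build_url(path: str, **params: str) -> str:
    """Build a request URL with percent-encoded query parameters."""
    query = urlencode(params)
    return f"{BASE_URL}{path}?{query}" if query else f"{BASE_URL}{path}"


def call(path: str, **params: str) -> dict:
    """Send a GET request to the API and decode the JSON response."""
    with urlopen(build_url(path, **params)) as response:
        return json.load(response)


def extract_answer(stdout: str, marker: str = "Global Search Response:") -> str:
    """Return the text after the marker, or the whole output if the marker is absent."""
    # Assumption: the answer follows the marker shown in the sample response.
    _, found, answer = stdout.partition(marker)
    return answer.strip() if found else stdout.strip()
```

With the server running, call("/status") returns the queue state, call("/run-index", root=".") starts indexing, and extract_answer(call("/query", query="what's the document about?")["stdout"]) returns just the answer text.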

Conclusion

With this setup, you now have a functioning HTTP API for GraphRAG. Indexing jobs are queued and run one at a time in the background, and queries run as subprocesses without blocking the server. This API can be expanded and customized further to suit your specific needs.

Feel free to ask any questions or request further assistance. Happy coding!
