How to Add HTTP API for GraphRAG?

How to Add HTTP API for GraphRAG?

Kimi Lu

Abstract background with a sophisticated and elegant feel, featuring a blend of soft, diffused colors like blues, purples, and golds. The colorful, blurry elements create a refined and high-end look suitable for professional content.

Introduction

GraphRAG is a structured, hierarchical approach to Retrieval Augmented Generation (RAG), developed by Microsoft Research. Unlike traditional RAG methods that rely on plain text snippets, GraphRAG uses LLM-generated knowledge graphs to enhance document analysis and improve question-answering performance. This tutorial will guide you through the process of adding an HTTP API to GraphRAG using FastAPI, enabling you to interact with GraphRAG via simple HTTP requests.

Prerequisites

Ensure you have the following installed:

  • Python 3.7+

  • FastAPI

  • Uvicorn

  • GraphRAG

You can install the necessary packages using pip:

pip install fastapi uvicorn graphrag

Project Structure

Organize your project directory as follows:

rag/
  ├── settings.yml   # GraphRAG settings
  ├── input/         # Folder to put all the text documents for GraphRAG
  ├── output/        # Folder for GraphRAG to put all the results and logs
  ├── cache/         # Folder for caches generated by GraphRAG
  ├── prompts/       # Folder to store all the prompts used by GraphRAG
  └── api.py         # Python script to run the API server

api.py Script

Here is the complete code for the api.py script, which sets up the FastAPI server and defines endpoints for running indexing and querying through GraphRAG:

from fastapi import FastAPI, HTTPException
from typing import Optional
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Context manager for FastAPI lifespan, to start the background job processor."""
    asyncio.create_task(background_job_processor())
    yield

app = FastAPI(lifespan=lifespan)
job_queue = asyncio.Queue()
running_jobs = set()
queue_lock = asyncio.Lock()

async def read_stream(stream, prefix):
    """Read and print lines from an async stream."""
    output = []
    async for line in stream:
        decoded_line = line.decode().strip()
        output.append(decoded_line)
        print(f"{prefix}: {decoded_line}")
    return '\n'.join(output)

async def run_command(root: str):
    """Run the indexing command as an async subprocess."""
    job_id = root
    running_jobs.add(job_id)
    print(f"Running job: {job_id}")

    try:
        process = await asyncio.create_subprocess_exec(
            "python", "-m", "graphrag.index", "--root", root,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout_output, stderr_output = await asyncio.gather(
            read_stream(process.stdout, "stdout"),
            read_stream(process.stderr, "stderr")
        )

        rc = await process.wait()
        print(f"Indexing finished with return code: {rc}")

        return {
            "status": "success" if rc == 0 else "error",
            "message": "Job completed",
            "stdout": stdout_output,
            "stderr": stderr_output
        }
    except Exception as e:
        print(f"Error in run_command: {str(e)}")
        return {"status": "error", "message": str(e)}
    finally:
        running_jobs.remove(job_id)
        await process_next_job()

async def process_next_job():
    """Process the next job in the queue."""
    if not job_queue.empty():
        next_root = await job_queue.get()
        asyncio.create_task(run_command(next_root))

async def background_job_processor():
    """Continuously process jobs from the queue."""
    while True:
        if not job_queue.empty() and len(running_jobs) == 0:
            await process_next_job()
        await asyncio.sleep(1)

@app.get("/run-index")
async def run_index(root: Optional[str] = "."):
    """Endpoint to start the indexing job."""
    print(f"Received request for /run-index - root: {root}")

    async with queue_lock:
        if root in running_jobs:
            return {"status": "running", "message": "Job is currently running.", "queueSize": job_queue.qsize()}

        await job_queue.put(root)
        if len(running_jobs) == 0:
            asyncio.create_task(process_next_job())
            return {"status": "running", "message": "Job started.", "queueSize": job_queue.qsize()}
        else:
            return {"status": "queued", "message": "Job queued.", "queueSize": job_queue.qsize()}

@app.get("/query")
async def query(query: str):
    """Endpoint to run a query command."""
    print(f"Received request for /query - query: {query}")

    try:
        process = await asyncio.create_subprocess_exec(
            "python", "-m", "graphrag.query", "--root", ".", "--method", "global", query,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout_output, stderr_output = await asyncio.gather(
            read_stream(process.stdout, "stdout"),
            read_stream(process.stderr, "stderr")
        )

        rc = await process.wait()
        return {
            "status": "success" if rc == 0 else "error",
            "message": "Query completed",
            "stdout": stdout_output,
            "stderr": stderr_output
        }
    except Exception as e:
        return {"status": "error", "message": str(e)}

@app.get("/status")
async def status():
    """Endpoint to get the status of the job queue and running jobs."""
    queue_size = job_queue.qsize()
    running_status = list(running_jobs) if running_jobs else "No jobs running"
    return {"queueSize": queue_size, "runningJobs": running_status}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=3000)

Running the Server

Navigate to the project directory where api.py is located and run the following command to start the server:

python api.py

Using the API

You can interact with the API using tools like curl or Postman. Here are some sample usages:

Check Status
curl http://localhost:3000/status
{
    "queueSize": 0,
    "runningJobs": "No jobs running"
}
Run Indexing Job
curl -X GET "http://localhost:3000/run-index?root=./input"
{
    "status": "running",
    "message": "Job started.",
    "queueSize": 1
}
Query
curl -X GET "http://localhost:3000/query?query=what's+the+document+about?"
{
    "status": "success",
    "message": "Query completed",
    "stdout": "Global Search Response: ...<some response here>",
    "stderr": ""
}

Conclusion

With this setup, you now have a functioning HTTP API for GraphRAG that can process indexing jobs and handle queries asynchronously. This API can be expanded and customized further to suit your specific needs.

Feel free to ask any questions or request further assistance. Happy coding!

Home

Home

Home

Integrations

Integrations

Integrations

Vault

Vault

Vault

Audit

Audit

Audit

Arana Grande

Arana Grande

Arana Grande

Free

Free

Free

30-day audit summary

30-day audit summary

30-day audit summary

Daily action-call volume and the latest receipts from the Loadout audit trail.

Daily action-call volume and the latest receipts from the Loadout audit trail.

Daily action-call volume and the latest receipts from the Loadout audit trail.

View Audit

View Audit

View Audit

Loadout usage

Loadout usage

Loadout usage

617 action calls in the last 30 days

617 action calls in the last 30 days

617 action calls in the last 30 days

May 19 - Jun 17

May 19 - Jun 17

May 19 - Jun 17

10 active days

10 active days

10 active days

Less

Less

Less

More

More

More

Recent activity

Recent activity

Recent activity

Latest action-call receipts from connected agents

Latest action-call receipts from connected agents

Latest action-call receipts from connected agents

Apr 23, 09:23 AM

Apr 23, 09:23 AM

Apr 23, 09:23 AM

Shopify

Shopify

Shopify

Creates Or Updates An Asset For A Theme

Creates Or Updates An Asset For A Theme

Creates Or Updates An Asset For A Theme

Success

Success

Success

Apr 23, 09:21 AM

Apr 23, 09:21 AM

Apr 23, 09:21 AM

Shopify

Shopify

Shopify

Update Products Param Product Id

Update Products Param Product Id

Update Products Param Product Id

Success

Success

Success

Apr 23, 08:53 AM

Apr 23, 08:53 AM

Apr 23, 08:53 AM

Shopify

Shopify

Shopify

Update Products Param Product Id

Update Products Param Product Id

Update Products Param Product Id

Failed

Failed

Failed

Apr 22, 22:13 PM

Apr 22, 22:13 PM

Apr 22, 22:13 PM

Shopify

Shopify

Shopify

Create Product Image

Create Product Image

Create Product Image

Success

Success

Success

Apr 22, 22:12 PM

Apr 22, 22:12 PM

Apr 22, 22:12 PM

Shopify

Shopify

Shopify

Create Product Image

Create Product Image

Create Product Image

Success

Success

Success

Connected integration coverage

Connected integration coverage

Connected integration coverage

162

162

162

of 753 accessible connected

of 753 accessible connected

of 753 accessible connected

Callable actions

Callable actions

Callable actions

1,126

1,126

1,126

Vault credentials

Vault credentials

Vault credentials

8

8

8

Explore what's possible

Explore what's possible

Explore what's possible

See all Integrations

See all Integrations

See all Integrations

Google Ads

Google Ads

Google Ads

All available Goolge Ads tools via...

All available Goolge Ads tools via...

All available Goolge Ads tools via...

X (twitter)

X (twitter)

X (twitter)

All available X tools via...

All available X tools via...

All available X tools via...

Github

Github

Github

All available Github tools via...

All available Github tools via...

All available Github tools via...

Notion

Notion

Notion

All available Notion tools via...

All available Notion tools via...

All available Notion tools via...

Slack

Slack

Slack

All available Slack tools via...

All available Slack tools via...

All available Slack tools via...

Firecrawl

Firecrawl

Firecrawl

All available Firecrawl tools via...

All available Firecrawl tools via...

All available Firecrawl tools via...

753 integrations are available for loadouts.

753 integrations are available for loadouts.

753 integrations are available for loadouts.

Plug your entire stack into your AI agents.

Plug your entire stack into your AI agents.

Plug your entire stack into your AI agents.

Skip the integration headache. Plug 750+ tools into Claude Code, Codex, and OpenClaw in one go, and let your agents execute today.

Skip the integration headache. Plug 750+ tools into Claude Code, Codex, and OpenClaw in one go, and let your agents execute today.