21 lines
610 B
Python
21 lines
610 B
Python
"""DB connector: query database (stub; extend with SQL driver)."""
|
|
|
|
from typing import Any
|
|
|
|
from fusionagi.tools.connectors.base import BaseConnector
|
|
|
|
|
|
class DBConnector(BaseConnector):
|
|
name = "db"
|
|
|
|
def __init__(self) -> None:
|
|
pass
|
|
|
|
def invoke(self, action: str, params: dict[str, Any]) -> Any:
|
|
if action == "query":
|
|
return {"rows": [], "error": "DBConnector stub: implement query"}
|
|
return {"error": f"Unknown action: {action}"}
|
|
|
|
def schema(self) -> dict[str, Any]:
|
|
return {"name": self.name, "actions": ["query"], "parameters": {"query": "string"}}
|