Initial commit
This commit is contained in:
9
src/kroger_mcp/__init__.py
Normal file
9
src/kroger_mcp/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
||||
"""
|
||||
Kroger MCP Server Package
|
||||
|
||||
A FastMCP server that provides access to Kroger's API for grocery shopping functionality.
|
||||
"""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
__author__ = "Your Name"
|
||||
__email__ = "your.email@example.com"
|
||||
79
src/kroger_mcp/cli.py
Normal file
79
src/kroger_mcp/cli.py
Normal file
@@ -0,0 +1,79 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
CLI entry point for Kroger MCP server
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
def main():
|
||||
"""CLI entry point with argument parsing"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Kroger MCP Server - FastMCP server for Kroger API integration"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--client-id",
|
||||
help="Kroger API client ID (can also use KROGER_CLIENT_ID env var)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--client-secret",
|
||||
help="Kroger API client secret (can also use KROGER_CLIENT_SECRET env var)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--redirect-uri",
|
||||
default="http://localhost:8000/callback",
|
||||
help="OAuth redirect URI (default: http://localhost:8000/callback)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--zip-code",
|
||||
help="Default zip code for location searches"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--transport",
|
||||
choices=["stdio", "streamable-http", "sse"],
|
||||
default="stdio",
|
||||
help="Transport protocol (default: stdio)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host",
|
||||
default="127.0.0.1",
|
||||
help="Host for HTTP transports (default: 127.0.0.1)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--port",
|
||||
type=int,
|
||||
default=8000,
|
||||
help="Port for HTTP transports (default: 8000)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Set environment variables from CLI args if provided
|
||||
if args.client_id:
|
||||
os.environ["KROGER_CLIENT_ID"] = args.client_id
|
||||
if args.client_secret:
|
||||
os.environ["KROGER_CLIENT_SECRET"] = args.client_secret
|
||||
if args.redirect_uri:
|
||||
os.environ["KROGER_REDIRECT_URI"] = args.redirect_uri
|
||||
if args.zip_code:
|
||||
os.environ["KROGER_USER_ZIP_CODE"] = args.zip_code
|
||||
|
||||
# Import and create server
|
||||
from kroger_mcp.server import create_server
|
||||
|
||||
server = create_server()
|
||||
|
||||
# Run with specified transport
|
||||
if args.transport == "stdio":
|
||||
server.run()
|
||||
elif args.transport == "streamable-http":
|
||||
server.run(transport="streamable-http", host=args.host, port=args.port)
|
||||
elif args.transport == "sse":
|
||||
server.run(transport="sse", host=args.host, port=args.port)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
97
src/kroger_mcp/prompts.py
Normal file
97
src/kroger_mcp/prompts.py
Normal file
@@ -0,0 +1,97 @@
|
||||
"""
|
||||
MCP prompts for the Kroger MCP server
|
||||
"""
|
||||
|
||||
from typing import List, Dict, Any, Optional
|
||||
from fastmcp import Context
|
||||
|
||||
|
||||
def register_prompts(mcp):
|
||||
"""Register prompts with the FastMCP server"""
|
||||
|
||||
@mcp.prompt()
|
||||
async def grocery_list_store_path(grocery_list: str, ctx: Context = None) -> str:
|
||||
"""
|
||||
Generate a prompt asking for the optimal path through a store based on a grocery list.
|
||||
|
||||
Args:
|
||||
grocery_list: A list of grocery items the user wants to purchase
|
||||
|
||||
Returns:
|
||||
A prompt asking for the optimal shopping path
|
||||
"""
|
||||
return f"""I'm planning to go grocery shopping at Kroger with this list:
|
||||
|
||||
{grocery_list}
|
||||
|
||||
Can you help me find the most efficient path through the store? Please search for these products to determine their aisle locations, then arrange them in a logical shopping order.
|
||||
|
||||
If you can't find exact matches for items, please suggest similar products that are available.
|
||||
|
||||
IMPORTANT: Please only organize my shopping path - DO NOT add any items to my cart.
|
||||
"""
|
||||
|
||||
@mcp.prompt()
|
||||
async def pharmacy_open_check(ctx: Context = None) -> str:
|
||||
"""
|
||||
Generate a prompt asking whether a pharmacy at the preferred Kroger location is open.
|
||||
|
||||
Returns:
|
||||
A prompt asking about pharmacy status
|
||||
"""
|
||||
return """Can you tell me if the pharmacy at my preferred Kroger store is currently open?
|
||||
|
||||
Please check the department information for the pharmacy department and let me know:
|
||||
1. If there is a pharmacy at my preferred store
|
||||
2. If it's currently open
|
||||
3. What the hours are for today
|
||||
4. What services are available at this pharmacy
|
||||
|
||||
Please use the get_location_details tool to find this information for my preferred location.
|
||||
"""
|
||||
|
||||
@mcp.prompt()
|
||||
async def set_preferred_store(zip_code: Optional[str] = None, ctx: Context = None) -> str:
|
||||
"""
|
||||
Generate a prompt to help the user set their preferred Kroger store.
|
||||
|
||||
Args:
|
||||
zip_code: Optional zip code to search near
|
||||
|
||||
Returns:
|
||||
A prompt asking for help setting a preferred store
|
||||
"""
|
||||
zip_phrase = f" near zip code {zip_code}" if zip_code else ""
|
||||
|
||||
return f"""I'd like to set my preferred Kroger store{zip_phrase}. Can you help me with this process?
|
||||
|
||||
Please:
|
||||
1. Search for nearby Kroger stores{zip_phrase}
|
||||
2. Show me a list of the closest options with their addresses
|
||||
3. Let me choose one from the list
|
||||
4. Set that as my preferred location
|
||||
|
||||
For each store, please show the full address, distance, and any special features or departments.
|
||||
"""
|
||||
|
||||
@mcp.prompt()
|
||||
async def add_recipe_to_cart(recipe_type: str = "classic apple pie", ctx: Context = None) -> str:
|
||||
"""
|
||||
Generate a prompt to find a specific recipe and add ingredients to cart. (default: classic apple pie)
|
||||
|
||||
Args:
|
||||
recipe_type: The type of recipe to search for (e.g., "chicken curry", "vegetarian lasagna")
|
||||
|
||||
Returns:
|
||||
A prompt asking for a recipe and to add ingredients to cart
|
||||
"""
|
||||
return f"""I'd like to make a recipe: {recipe_type}. Can you help me with the following:
|
||||
|
||||
1. Search the web for a good {recipe_type} recipe
|
||||
2. Present the recipe with ingredients and instructions
|
||||
3. Look up each ingredient in my local Kroger store
|
||||
4. Add all the ingredients I'll need to my cart using bulk_add_to_cart
|
||||
5. If any ingredients aren't available, suggest alternatives
|
||||
|
||||
Before adding items to cart, please ask me if I prefer pickup or delivery for these items.
|
||||
"""
|
||||
87
src/kroger_mcp/server.py
Normal file
87
src/kroger_mcp/server.py
Normal file
@@ -0,0 +1,87 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
FastMCP Server for Kroger API
|
||||
|
||||
This server provides MCP tools for interacting with the Kroger API, including:
|
||||
- Location management (search stores, get details, set preferred location)
|
||||
- Product search and details
|
||||
- Cart management (add items, bulk operations, tracking)
|
||||
- Chain and department information
|
||||
- User profile and authentication
|
||||
|
||||
Environment Variables Required:
|
||||
- KROGER_CLIENT_ID: Your Kroger API client ID
|
||||
- KROGER_CLIENT_SECRET: Your Kroger API client secret
|
||||
- KROGER_REDIRECT_URI: Redirect URI for OAuth2 flow (default: http://localhost:8000/callback)
|
||||
- KROGER_USER_ZIP_CODE: Default zip code for location searches (optional)
|
||||
"""
|
||||
|
||||
import sys
|
||||
from fastmcp import FastMCP
|
||||
|
||||
# Import all tool modules
|
||||
from .tools import location_tools
|
||||
from .tools import product_tools
|
||||
from .tools import cart_tools
|
||||
from .tools import info_tools
|
||||
from .tools import profile_tools
|
||||
from .tools import utility_tools
|
||||
|
||||
# Import prompts
|
||||
from . import prompts
|
||||
|
||||
|
||||
def create_server() -> FastMCP:
|
||||
"""Create and configure the FastMCP server instance"""
|
||||
# Initialize the FastMCP server
|
||||
mcp = FastMCP(
|
||||
name="Kroger API Server",
|
||||
instructions="""
|
||||
This MCP server provides access to Kroger's API for grocery shopping functionality.
|
||||
|
||||
Key Features:
|
||||
- Search and manage store locations
|
||||
- Find and search products
|
||||
- Add items to shopping cart with local tracking
|
||||
- Access chain and department information
|
||||
- User profile management
|
||||
|
||||
Common workflows:
|
||||
1. Set a preferred location with set_preferred_location
|
||||
2. Search for products with search_products
|
||||
3. Add items to cart with add_items_to_cart
|
||||
4. Use bulk_add_to_cart for multiple items at once
|
||||
5. View current cart with view_current_cart
|
||||
6. Mark order as placed with mark_order_placed
|
||||
|
||||
Authentication is handled automatically - OAuth flows will open in your browser when needed.
|
||||
|
||||
Cart Tracking:
|
||||
This server maintains a local record of items added to your cart since the Kroger API
|
||||
doesn't provide cart viewing functionality. When you place an order through the Kroger
|
||||
website/app, use mark_order_placed to move the current cart to order history.
|
||||
"""
|
||||
)
|
||||
|
||||
# Register all tools from the modules
|
||||
location_tools.register_tools(mcp)
|
||||
product_tools.register_tools(mcp)
|
||||
cart_tools.register_tools(mcp)
|
||||
info_tools.register_tools(mcp)
|
||||
profile_tools.register_tools(mcp)
|
||||
utility_tools.register_tools(mcp)
|
||||
|
||||
# Register prompts
|
||||
prompts.register_prompts(mcp)
|
||||
|
||||
return mcp
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point for the Kroger MCP server"""
|
||||
mcp = create_server()
|
||||
mcp.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
11
src/kroger_mcp/tools/__init__.py
Normal file
11
src/kroger_mcp/tools/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""
|
||||
Tools package for Kroger MCP server
|
||||
|
||||
This package contains all the tool modules organized by functionality:
|
||||
- location_tools: Store location search and management
|
||||
- product_tools: Product search and details
|
||||
- cart_tools: Shopping cart management with local tracking
|
||||
- info_tools: Chain and department information
|
||||
- profile_tools: User profile and authentication
|
||||
- shared: Common utilities and client management
|
||||
"""
|
||||
504
src/kroger_mcp/tools/cart_tools.py
Normal file
504
src/kroger_mcp/tools/cart_tools.py
Normal file
@@ -0,0 +1,504 @@
|
||||
"""
|
||||
Cart tracking and management functionality
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Dict, Any, List
|
||||
|
||||
from fastmcp import Context
|
||||
from .shared import get_authenticated_client
|
||||
|
||||
|
||||
# Cart storage file
|
||||
CART_FILE = "kroger_cart.json"
|
||||
ORDER_HISTORY_FILE = "kroger_order_history.json"
|
||||
|
||||
|
||||
def _load_cart_data() -> Dict[str, Any]:
|
||||
"""Load cart data from file"""
|
||||
try:
|
||||
if os.path.exists(CART_FILE):
|
||||
with open(CART_FILE, 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
pass
|
||||
return {"current_cart": [], "last_updated": None, "preferred_location_id": None}
|
||||
|
||||
|
||||
def _save_cart_data(cart_data: Dict[str, Any]) -> None:
|
||||
"""Save cart data to file"""
|
||||
try:
|
||||
with open(CART_FILE, 'w') as f:
|
||||
json.dump(cart_data, f, indent=2)
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not save cart data: {e}")
|
||||
|
||||
|
||||
def _load_order_history() -> List[Dict[str, Any]]:
|
||||
"""Load order history from file"""
|
||||
try:
|
||||
if os.path.exists(ORDER_HISTORY_FILE):
|
||||
with open(ORDER_HISTORY_FILE, 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
pass
|
||||
return []
|
||||
|
||||
|
||||
def _save_order_history(history: List[Dict[str, Any]]) -> None:
|
||||
"""Save order history to file"""
|
||||
try:
|
||||
with open(ORDER_HISTORY_FILE, 'w') as f:
|
||||
json.dump(history, f, indent=2)
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not save order history: {e}")
|
||||
|
||||
|
||||
def _add_item_to_local_cart(product_id: str, quantity: int, modality: str, product_details: Dict[str, Any] = None) -> None:
|
||||
"""Add an item to the local cart tracking"""
|
||||
cart_data = _load_cart_data()
|
||||
current_cart = cart_data.get("current_cart", [])
|
||||
|
||||
# Check if item already exists in cart
|
||||
existing_item = None
|
||||
for item in current_cart:
|
||||
if item.get("product_id") == product_id and item.get("modality") == modality:
|
||||
existing_item = item
|
||||
break
|
||||
|
||||
if existing_item:
|
||||
# Update existing item quantity
|
||||
existing_item["quantity"] = existing_item.get("quantity", 0) + quantity
|
||||
existing_item["last_updated"] = datetime.now().isoformat()
|
||||
else:
|
||||
# Add new item
|
||||
new_item = {
|
||||
"product_id": product_id,
|
||||
"quantity": quantity,
|
||||
"modality": modality,
|
||||
"added_at": datetime.now().isoformat(),
|
||||
"last_updated": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
# Add product details if provided
|
||||
if product_details:
|
||||
new_item.update(product_details)
|
||||
|
||||
current_cart.append(new_item)
|
||||
|
||||
cart_data["current_cart"] = current_cart
|
||||
cart_data["last_updated"] = datetime.now().isoformat()
|
||||
_save_cart_data(cart_data)
|
||||
|
||||
|
||||
def register_tools(mcp):
|
||||
"""Register cart-related tools with the FastMCP server"""
|
||||
|
||||
@mcp.tool()
|
||||
async def add_items_to_cart(
|
||||
product_id: str,
|
||||
quantity: int = 1,
|
||||
modality: str = "PICKUP",
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Add a single item to the user's Kroger cart and track it locally.
|
||||
|
||||
If the user doesn't specifically indicate a preference for pickup or delivery,
|
||||
you should ask them which modality they prefer before calling this tool.
|
||||
|
||||
Args:
|
||||
product_id: The product ID or UPC to add to cart
|
||||
quantity: Quantity to add (default: 1)
|
||||
modality: Fulfillment method - PICKUP or DELIVERY
|
||||
|
||||
Returns:
|
||||
Dictionary confirming the item was added to cart
|
||||
"""
|
||||
try:
|
||||
if ctx:
|
||||
await ctx.info(f"Adding {quantity}x {product_id} to cart with {modality} modality")
|
||||
|
||||
# Get authenticated client
|
||||
client = get_authenticated_client()
|
||||
|
||||
# Format the item for the API
|
||||
cart_item = {
|
||||
"upc": product_id,
|
||||
"quantity": quantity,
|
||||
"modality": modality
|
||||
}
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Calling Kroger API to add item: {cart_item}")
|
||||
|
||||
# Add the item to the actual Kroger cart
|
||||
# Note: add_to_cart returns None on success, raises exception on failure
|
||||
client.cart.add_to_cart([cart_item])
|
||||
|
||||
if ctx:
|
||||
await ctx.info("Successfully added item to Kroger cart")
|
||||
|
||||
# Add to local cart tracking
|
||||
_add_item_to_local_cart(product_id, quantity, modality)
|
||||
|
||||
if ctx:
|
||||
await ctx.info("Item added to local cart tracking")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Successfully added {quantity}x {product_id} to cart",
|
||||
"product_id": product_id,
|
||||
"quantity": quantity,
|
||||
"modality": modality,
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Failed to add item to cart: {str(e)}")
|
||||
|
||||
# Provide helpful error message for authentication issues
|
||||
error_message = str(e)
|
||||
if "401" in error_message or "Unauthorized" in error_message:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Authentication failed. Please run force_reauthenticate and try again.",
|
||||
"details": error_message
|
||||
}
|
||||
elif "400" in error_message or "Bad Request" in error_message:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Invalid request. Please check the product ID and try again.",
|
||||
"details": error_message
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to add item to cart: {error_message}",
|
||||
"product_id": product_id,
|
||||
"quantity": quantity,
|
||||
"modality": modality
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def bulk_add_to_cart(
|
||||
items: List[Dict[str, Any]],
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Add multiple items to the user's Kroger cart in a single operation.
|
||||
|
||||
If the user doesn't specifically indicate a preference for pickup or delivery,
|
||||
you should ask them which modality they prefer before calling this tool.
|
||||
|
||||
Args:
|
||||
items: List of items to add. Each item should have:
|
||||
- product_id: The product ID or UPC
|
||||
- quantity: Quantity to add (default: 1)
|
||||
- modality: PICKUP or DELIVERY (default: PICKUP)
|
||||
|
||||
Returns:
|
||||
Dictionary with results for each item
|
||||
"""
|
||||
try:
|
||||
if ctx:
|
||||
await ctx.info(f"Adding {len(items)} items to cart in bulk")
|
||||
|
||||
client = get_authenticated_client()
|
||||
|
||||
# Format items for the API
|
||||
cart_items = []
|
||||
for item in items:
|
||||
cart_item = {
|
||||
"upc": item["product_id"],
|
||||
"quantity": item.get("quantity", 1),
|
||||
"modality": item.get("modality", "PICKUP")
|
||||
}
|
||||
cart_items.append(cart_item)
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Calling Kroger API to add {len(cart_items)} items")
|
||||
|
||||
# Add all items to the actual Kroger cart
|
||||
client.cart.add_to_cart(cart_items)
|
||||
|
||||
if ctx:
|
||||
await ctx.info("Successfully added all items to Kroger cart")
|
||||
|
||||
# Add all items to local cart tracking
|
||||
for item in items:
|
||||
_add_item_to_local_cart(
|
||||
item["product_id"],
|
||||
item.get("quantity", 1),
|
||||
item.get("modality", "PICKUP")
|
||||
)
|
||||
|
||||
if ctx:
|
||||
await ctx.info("All items added to local cart tracking")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Successfully added {len(items)} items to cart",
|
||||
"items_added": len(items),
|
||||
"timestamp": datetime.now().isoformat()
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Failed to bulk add items to cart: {str(e)}")
|
||||
|
||||
error_message = str(e)
|
||||
if "401" in error_message or "Unauthorized" in error_message:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Authentication failed. Please run force_reauthenticate and try again.",
|
||||
"details": error_message
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to add items to cart: {error_message}",
|
||||
"items_attempted": len(items)
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def view_current_cart(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
View the current cart contents tracked locally.
|
||||
|
||||
Note: This tool can only see items that were added via this MCP server.
|
||||
The Kroger API does not provide permission to query the actual user cart contents.
|
||||
|
||||
Returns:
|
||||
Dictionary containing current cart items and summary
|
||||
"""
|
||||
try:
|
||||
cart_data = _load_cart_data()
|
||||
current_cart = cart_data.get("current_cart", [])
|
||||
|
||||
# Calculate summary
|
||||
total_quantity = sum(item.get("quantity", 0) for item in current_cart)
|
||||
pickup_items = [item for item in current_cart if item.get("modality") == "PICKUP"]
|
||||
delivery_items = [item for item in current_cart if item.get("modality") == "DELIVERY"]
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"current_cart": current_cart,
|
||||
"summary": {
|
||||
"total_items": len(current_cart),
|
||||
"total_quantity": total_quantity,
|
||||
"pickup_items": len(pickup_items),
|
||||
"delivery_items": len(delivery_items),
|
||||
"last_updated": cart_data.get("last_updated")
|
||||
}
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to view cart: {str(e)}"
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def remove_from_cart(
|
||||
product_id: str,
|
||||
modality: str = None,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Remove an item from the local cart tracking only.
|
||||
|
||||
IMPORTANT: This tool CANNOT remove items from the actual Kroger cart in the app/website.
|
||||
It only updates our local tracking to stay in sync. The user must remove the item from
|
||||
their actual cart through the Kroger app or website themselves.
|
||||
|
||||
Use this tool only when:
|
||||
1. The user has already removed an item from their Kroger cart through the app/website
|
||||
2. You need to update the local tracking to reflect that change
|
||||
|
||||
Args:
|
||||
product_id: The product ID to remove
|
||||
modality: Specific modality to remove (if None, removes all instances)
|
||||
|
||||
Returns:
|
||||
Dictionary confirming the removal from local tracking
|
||||
"""
|
||||
try:
|
||||
cart_data = _load_cart_data()
|
||||
current_cart = cart_data.get("current_cart", [])
|
||||
original_count = len(current_cart)
|
||||
|
||||
if modality:
|
||||
# Remove specific modality
|
||||
cart_data["current_cart"] = [
|
||||
item for item in current_cart
|
||||
if not (item.get("product_id") == product_id and item.get("modality") == modality)
|
||||
]
|
||||
else:
|
||||
# Remove all instances
|
||||
cart_data["current_cart"] = [
|
||||
item for item in current_cart
|
||||
if item.get("product_id") != product_id
|
||||
]
|
||||
|
||||
items_removed = original_count - len(cart_data["current_cart"])
|
||||
|
||||
if items_removed > 0:
|
||||
cart_data["last_updated"] = datetime.now().isoformat()
|
||||
_save_cart_data(cart_data)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Removed {items_removed} items from local cart tracking",
|
||||
"items_removed": items_removed,
|
||||
"product_id": product_id,
|
||||
"modality": modality
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to remove from cart: {str(e)}"
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def clear_current_cart(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Clear all items from the local cart tracking only.
|
||||
|
||||
IMPORTANT: This tool CANNOT remove items from the actual Kroger cart in the app/website.
|
||||
It only clears our local tracking. The user must remove items from their actual cart
|
||||
through the Kroger app or website themselves.
|
||||
|
||||
Use this tool only when:
|
||||
1. The user has already cleared their Kroger cart through the app/website
|
||||
2. You need to update the local tracking to reflect that change
|
||||
3. Or when the local tracking is out of sync with the actual cart
|
||||
|
||||
Returns:
|
||||
Dictionary confirming the local cart tracking was cleared
|
||||
"""
|
||||
try:
|
||||
cart_data = _load_cart_data()
|
||||
items_count = len(cart_data.get("current_cart", []))
|
||||
|
||||
cart_data["current_cart"] = []
|
||||
cart_data["last_updated"] = datetime.now().isoformat()
|
||||
_save_cart_data(cart_data)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Cleared {items_count} items from local cart tracking",
|
||||
"items_cleared": items_count
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to clear cart: {str(e)}"
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def mark_order_placed(
|
||||
order_notes: str = None,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Mark the current cart as an order that has been placed and move it to order history.
|
||||
Use this after you've completed checkout on the Kroger website/app.
|
||||
|
||||
Args:
|
||||
order_notes: Optional notes about the order
|
||||
|
||||
Returns:
|
||||
Dictionary confirming the order was recorded
|
||||
"""
|
||||
try:
|
||||
cart_data = _load_cart_data()
|
||||
current_cart = cart_data.get("current_cart", [])
|
||||
|
||||
if not current_cart:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "No items in current cart to mark as placed"
|
||||
}
|
||||
|
||||
# Create order record
|
||||
order_record = {
|
||||
"items": current_cart.copy(),
|
||||
"placed_at": datetime.now().isoformat(),
|
||||
"item_count": len(current_cart),
|
||||
"total_quantity": sum(item.get("quantity", 0) for item in current_cart),
|
||||
"notes": order_notes
|
||||
}
|
||||
|
||||
# Load and update order history
|
||||
order_history = _load_order_history()
|
||||
order_history.append(order_record)
|
||||
_save_order_history(order_history)
|
||||
|
||||
# Clear current cart
|
||||
cart_data["current_cart"] = []
|
||||
cart_data["last_updated"] = datetime.now().isoformat()
|
||||
_save_cart_data(cart_data)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Marked order with {order_record['item_count']} items as placed",
|
||||
"order_id": len(order_history), # Simple order ID based on history length
|
||||
"items_placed": order_record["item_count"],
|
||||
"total_quantity": order_record["total_quantity"],
|
||||
"placed_at": order_record["placed_at"]
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to mark order as placed: {str(e)}"
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def view_order_history(
|
||||
limit: int = 10,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
View the history of placed orders.
|
||||
|
||||
Note: This tool can only see orders that were explicitly marked as placed via this MCP server.
|
||||
The Kroger API does not provide permission to query the actual order history from Kroger's systems.
|
||||
|
||||
Args:
|
||||
limit: Number of recent orders to show (1-50)
|
||||
|
||||
Returns:
|
||||
Dictionary containing order history
|
||||
"""
|
||||
try:
|
||||
# Ensure limit is within bounds
|
||||
limit = max(1, min(50, limit))
|
||||
|
||||
order_history = _load_order_history()
|
||||
|
||||
# Sort by placed_at date (most recent first) and limit
|
||||
sorted_orders = sorted(order_history, key=lambda x: x.get("placed_at", ""), reverse=True)
|
||||
limited_orders = sorted_orders[:limit]
|
||||
|
||||
# Calculate summary stats
|
||||
total_orders = len(order_history)
|
||||
total_items_all_time = sum(order.get("item_count", 0) for order in order_history)
|
||||
total_quantity_all_time = sum(order.get("total_quantity", 0) for order in order_history)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"orders": limited_orders,
|
||||
"showing": len(limited_orders),
|
||||
"summary": {
|
||||
"total_orders": total_orders,
|
||||
"total_items_all_time": total_items_all_time,
|
||||
"total_quantity_all_time": total_quantity_all_time
|
||||
}
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Failed to view order history: {str(e)}"
|
||||
}
|
||||
274
src/kroger_mcp/tools/info_tools.py
Normal file
274
src/kroger_mcp/tools/info_tools.py
Normal file
@@ -0,0 +1,274 @@
|
||||
"""
|
||||
Chain and department information tools for Kroger MCP server
|
||||
"""
|
||||
|
||||
from typing import Dict, List, Any, Optional
|
||||
from fastmcp import Context
|
||||
|
||||
from .shared import get_client_credentials_client
|
||||
|
||||
|
||||
def register_tools(mcp):
|
||||
"""Register information-related tools with the FastMCP server"""
|
||||
|
||||
@mcp.tool()
|
||||
async def list_chains(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Get a list of all Kroger-owned chains.
|
||||
|
||||
Returns:
|
||||
Dictionary containing chain information
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info("Getting list of Kroger chains")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
chains = client.location.list_chains()
|
||||
|
||||
if not chains or "data" not in chains or not chains["data"]:
|
||||
return {
|
||||
"success": False,
|
||||
"message": "No chains found",
|
||||
"data": []
|
||||
}
|
||||
|
||||
# Format chain data
|
||||
formatted_chains = [
|
||||
{
|
||||
"name": chain.get("name"),
|
||||
"division_numbers": chain.get("divisionNumbers", [])
|
||||
}
|
||||
for chain in chains["data"]
|
||||
]
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Found {len(formatted_chains)} chains")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"count": len(formatted_chains),
|
||||
"data": formatted_chains
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error listing chains: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"data": []
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_chain_details(
|
||||
chain_name: str,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get detailed information about a specific Kroger chain.
|
||||
|
||||
Args:
|
||||
chain_name: Name of the chain to get details for
|
||||
|
||||
Returns:
|
||||
Dictionary containing chain details
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info(f"Getting details for chain: {chain_name}")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
chain_details = client.location.get_chain(chain_name)
|
||||
|
||||
if not chain_details or "data" not in chain_details:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"Chain '{chain_name}' not found"
|
||||
}
|
||||
|
||||
chain = chain_details["data"]
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"name": chain.get("name"),
|
||||
"division_numbers": chain.get("divisionNumbers", [])
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error getting chain details: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def check_chain_exists(
|
||||
chain_name: str,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Check if a chain exists in the Kroger system.
|
||||
|
||||
Args:
|
||||
chain_name: Name of the chain to check
|
||||
|
||||
Returns:
|
||||
Dictionary indicating whether the chain exists
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info(f"Checking if chain '{chain_name}' exists")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
exists = client.location.chain_exists(chain_name)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"chain_name": chain_name,
|
||||
"exists": exists,
|
||||
"message": f"Chain '{chain_name}' {'exists' if exists else 'does not exist'}"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error checking chain existence: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def list_departments(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Get a list of all available departments in Kroger stores.
|
||||
|
||||
Returns:
|
||||
Dictionary containing department information
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info("Getting list of departments")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
departments = client.location.list_departments()
|
||||
|
||||
if not departments or "data" not in departments or not departments["data"]:
|
||||
return {
|
||||
"success": False,
|
||||
"message": "No departments found",
|
||||
"data": []
|
||||
}
|
||||
|
||||
# Format department data
|
||||
formatted_departments = [
|
||||
{
|
||||
"department_id": dept.get("departmentId"),
|
||||
"name": dept.get("name")
|
||||
}
|
||||
for dept in departments["data"]
|
||||
]
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Found {len(formatted_departments)} departments")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"count": len(formatted_departments),
|
||||
"data": formatted_departments
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error listing departments: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"data": []
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_department_details(
|
||||
department_id: str,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get detailed information about a specific department.
|
||||
|
||||
Args:
|
||||
department_id: The unique identifier for the department
|
||||
|
||||
Returns:
|
||||
Dictionary containing department details
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info(f"Getting details for department: {department_id}")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
dept_details = client.location.get_department(department_id)
|
||||
|
||||
if not dept_details or "data" not in dept_details:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"Department '{department_id}' not found"
|
||||
}
|
||||
|
||||
dept = dept_details["data"]
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"department_id": dept.get("departmentId"),
|
||||
"name": dept.get("name")
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error getting department details: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def check_department_exists(
|
||||
department_id: str,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Check if a department exists in the Kroger system.
|
||||
|
||||
Args:
|
||||
department_id: The department ID to check
|
||||
|
||||
Returns:
|
||||
Dictionary indicating whether the department exists
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info(f"Checking if department '{department_id}' exists")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
exists = client.location.department_exists(department_id)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"department_id": department_id,
|
||||
"exists": exists,
|
||||
"message": f"Department '{department_id}' {'exists' if exists else 'does not exist'}"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error checking department existence: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
332
src/kroger_mcp/tools/location_tools.py
Normal file
332
src/kroger_mcp/tools/location_tools.py
Normal file
@@ -0,0 +1,332 @@
|
||||
"""
|
||||
Location management tools for Kroger MCP server
|
||||
"""
|
||||
|
||||
from typing import Dict, List, Any, Optional
|
||||
from pydantic import Field
|
||||
from fastmcp import Context
|
||||
|
||||
from .shared import (
|
||||
get_client_credentials_client,
|
||||
get_preferred_location_id,
|
||||
set_preferred_location_id,
|
||||
get_default_zip_code
|
||||
)
|
||||
|
||||
|
||||
def register_tools(mcp):
|
||||
"""Register location-related tools with the FastMCP server"""
|
||||
|
||||
@mcp.tool()
|
||||
async def search_locations(
|
||||
zip_code: Optional[str] = None,
|
||||
radius_in_miles: int = Field(default=10, ge=1, le=100, description="Search radius in miles (1-100)"),
|
||||
limit: int = Field(default=10, ge=1, le=200, description="Number of results to return (1-200)"),
|
||||
chain: Optional[str] = None,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Search for Kroger store locations near a zip code.
|
||||
|
||||
Args:
|
||||
zip_code: Zip code to search near (uses environment default if not provided)
|
||||
radius_in_miles: Search radius in miles (1-100)
|
||||
limit: Number of results to return (1-200)
|
||||
chain: Filter by specific chain name
|
||||
|
||||
Returns:
|
||||
Dictionary containing location search results
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info(f"Searching for Kroger locations near {zip_code or 'default zip code'}")
|
||||
|
||||
if not zip_code:
|
||||
zip_code = get_default_zip_code()
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
locations = client.location.search_locations(
|
||||
zip_code=zip_code,
|
||||
radius_in_miles=radius_in_miles,
|
||||
limit=limit,
|
||||
chain=chain
|
||||
)
|
||||
|
||||
if not locations or "data" not in locations or not locations["data"]:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"No locations found near zip code {zip_code}",
|
||||
"data": []
|
||||
}
|
||||
|
||||
# Format location data for easier consumption
|
||||
formatted_locations = []
|
||||
for loc in locations["data"]:
|
||||
address = loc.get("address", {})
|
||||
formatted_loc = {
|
||||
"location_id": loc.get("locationId"),
|
||||
"name": loc.get("name"),
|
||||
"chain": loc.get("chain"),
|
||||
"phone": loc.get("phone"),
|
||||
"address": {
|
||||
"street": address.get("addressLine1", ""),
|
||||
"city": address.get("city", ""),
|
||||
"state": address.get("state", ""),
|
||||
"zip_code": address.get("zipCode", "")
|
||||
},
|
||||
"full_address": f"{address.get('addressLine1', '')}, {address.get('city', '')}, {address.get('state', '')} {address.get('zipCode', '')}",
|
||||
"coordinates": loc.get("geolocation", {}),
|
||||
"departments": [dept.get("name") for dept in loc.get("departments", [])],
|
||||
"department_count": len(loc.get("departments", []))
|
||||
}
|
||||
|
||||
# Add hours info if available
|
||||
if "hours" in loc and "monday" in loc["hours"]:
|
||||
monday = loc["hours"]["monday"]
|
||||
if monday.get("open24", False):
|
||||
formatted_loc["hours_monday"] = "Open 24 hours"
|
||||
elif "open" in monday and "close" in monday:
|
||||
formatted_loc["hours_monday"] = f"{monday['open']} - {monday['close']}"
|
||||
else:
|
||||
formatted_loc["hours_monday"] = "Hours not available"
|
||||
|
||||
formatted_locations.append(formatted_loc)
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Found {len(formatted_locations)} locations")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"search_params": {
|
||||
"zip_code": zip_code,
|
||||
"radius_miles": radius_in_miles,
|
||||
"limit": limit,
|
||||
"chain": chain
|
||||
},
|
||||
"count": len(formatted_locations),
|
||||
"data": formatted_locations
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error searching locations: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"data": []
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_location_details(
|
||||
location_id: str,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get detailed information about a specific Kroger store location.
|
||||
|
||||
Args:
|
||||
location_id: The unique identifier for the store location
|
||||
|
||||
Returns:
|
||||
Dictionary containing detailed location information
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info(f"Getting details for location {location_id}")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
location_details = client.location.get_location(location_id)
|
||||
|
||||
if not location_details or "data" not in location_details:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"Location {location_id} not found"
|
||||
}
|
||||
|
||||
loc = location_details["data"]
|
||||
|
||||
# Format department information
|
||||
departments = []
|
||||
for dept in loc.get("departments", []):
|
||||
dept_info = {
|
||||
"department_id": dept.get("departmentId"),
|
||||
"name": dept.get("name"),
|
||||
"phone": dept.get("phone")
|
||||
}
|
||||
|
||||
# Add department hours
|
||||
if "hours" in dept and "monday" in dept["hours"]:
|
||||
monday = dept["hours"]["monday"]
|
||||
if monday.get("open24", False):
|
||||
dept_info["hours_monday"] = "Open 24 hours"
|
||||
elif "open" in monday and "close" in monday:
|
||||
dept_info["hours_monday"] = f"{monday['open']} - {monday['close']}"
|
||||
|
||||
departments.append(dept_info)
|
||||
|
||||
# Format the response
|
||||
address = loc.get("address", {})
|
||||
result = {
|
||||
"success": True,
|
||||
"location_id": loc.get("locationId"),
|
||||
"name": loc.get("name"),
|
||||
"chain": loc.get("chain"),
|
||||
"phone": loc.get("phone"),
|
||||
"address": {
|
||||
"street": address.get("addressLine1", ""),
|
||||
"street2": address.get("addressLine2", ""),
|
||||
"city": address.get("city", ""),
|
||||
"state": address.get("state", ""),
|
||||
"zip_code": address.get("zipCode", "")
|
||||
},
|
||||
"coordinates": loc.get("geolocation", {}),
|
||||
"departments": departments,
|
||||
"department_count": len(departments)
|
||||
}
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error getting location details: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def set_preferred_location(
|
||||
location_id: str,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Set a preferred store location for future operations.
|
||||
|
||||
Args:
|
||||
location_id: The unique identifier for the store location
|
||||
|
||||
Returns:
|
||||
Dictionary confirming the preferred location has been set
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info(f"Setting preferred location to {location_id}")
|
||||
|
||||
# Verify the location exists
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
exists = client.location.location_exists(location_id)
|
||||
if not exists:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Location {location_id} does not exist"
|
||||
}
|
||||
|
||||
# Get location details for confirmation
|
||||
location_details = client.location.get_location(location_id)
|
||||
loc_data = location_details.get("data", {})
|
||||
|
||||
set_preferred_location_id(location_id)
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Preferred location set to {loc_data.get('name', location_id)}")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"preferred_location_id": location_id,
|
||||
"location_name": loc_data.get("name"),
|
||||
"message": f"Preferred location set to {loc_data.get('name', location_id)}"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error setting preferred location: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_preferred_location(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Get the currently set preferred store location.
|
||||
|
||||
Returns:
|
||||
Dictionary containing the preferred location information
|
||||
"""
|
||||
preferred_location_id = get_preferred_location_id()
|
||||
|
||||
if not preferred_location_id:
|
||||
return {
|
||||
"success": False,
|
||||
"message": "No preferred location set. Use set_preferred_location to set one."
|
||||
}
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Getting preferred location details for {preferred_location_id}")
|
||||
|
||||
# Get location details
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
location_details = client.location.get_location(preferred_location_id)
|
||||
loc_data = location_details.get("data", {})
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"preferred_location_id": preferred_location_id,
|
||||
"location_details": {
|
||||
"name": loc_data.get("name"),
|
||||
"chain": loc_data.get("chain"),
|
||||
"phone": loc_data.get("phone"),
|
||||
"address": loc_data.get("address", {})
|
||||
}
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error getting preferred location details: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"preferred_location_id": preferred_location_id
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def check_location_exists(
|
||||
location_id: str,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Check if a location exists in the Kroger system.
|
||||
|
||||
Args:
|
||||
location_id: The unique identifier for the store location
|
||||
|
||||
Returns:
|
||||
Dictionary indicating whether the location exists
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info(f"Checking if location {location_id} exists")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
exists = client.location.location_exists(location_id)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"location_id": location_id,
|
||||
"exists": exists,
|
||||
"message": f"Location {location_id} {'exists' if exists else 'does not exist'}"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error checking location existence: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
484
src/kroger_mcp/tools/product_tools.py
Normal file
484
src/kroger_mcp/tools/product_tools.py
Normal file
@@ -0,0 +1,484 @@
|
||||
"""
|
||||
Product search and management tools for Kroger MCP server
|
||||
"""
|
||||
|
||||
from typing import Dict, List, Any, Optional, Literal
|
||||
from pydantic import Field
|
||||
from fastmcp import Context, Image
|
||||
import requests
|
||||
from io import BytesIO
|
||||
|
||||
from .shared import (
|
||||
get_client_credentials_client,
|
||||
get_preferred_location_id,
|
||||
format_currency
|
||||
)
|
||||
|
||||
|
||||
def register_tools(mcp):
|
||||
"""Register product-related tools with the FastMCP server"""
|
||||
|
||||
@mcp.tool()
|
||||
async def get_product_images(
|
||||
product_id: str,
|
||||
perspective: str = "front",
|
||||
location_id: Optional[str] = None,
|
||||
ctx: Context = None
|
||||
) -> Image:
|
||||
"""
|
||||
Get an image for a specific product from the requested perspective.
|
||||
|
||||
Use get_product_details first to see what perspectives are available (typically "front", "back", "left", "right").
|
||||
|
||||
Args:
|
||||
product_id: The unique product identifier
|
||||
perspective: The image perspective to retrieve (default: "front")
|
||||
location_id: Store location ID (uses preferred if not provided)
|
||||
|
||||
Returns:
|
||||
The product image from the requested perspective
|
||||
"""
|
||||
# Use preferred location if none provided
|
||||
if not location_id:
|
||||
location_id = get_preferred_location_id()
|
||||
if not location_id:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "No location_id provided and no preferred location set. Use set_preferred_location first."
|
||||
}
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Fetching images for product {product_id} at location {location_id}")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
# Get product details to extract image URLs
|
||||
product_details = client.product.get_product(
|
||||
product_id=product_id,
|
||||
location_id=location_id
|
||||
)
|
||||
|
||||
if not product_details or "data" not in product_details:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"Product {product_id} not found"
|
||||
}
|
||||
|
||||
product = product_details["data"]
|
||||
|
||||
# Check if images are available
|
||||
if "images" not in product or not product["images"]:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"No images available for product {product_id}"
|
||||
}
|
||||
|
||||
# Find the requested perspective image
|
||||
perspective_image = None
|
||||
available_perspectives = []
|
||||
|
||||
for img_data in product["images"]:
|
||||
img_perspective = img_data.get("perspective", "unknown")
|
||||
available_perspectives.append(img_perspective)
|
||||
|
||||
# Skip if not the requested perspective
|
||||
if img_perspective != perspective:
|
||||
continue
|
||||
|
||||
if not img_data.get("sizes"):
|
||||
continue
|
||||
|
||||
# Find the best image size (prefer large, fallback to xlarge or other available)
|
||||
img_url = None
|
||||
size_preference = ["large", "xlarge", "medium", "small", "thumbnail"]
|
||||
|
||||
# Create a map of available sizes for quick lookup
|
||||
available_sizes = {size.get("size"): size.get("url") for size in img_data.get("sizes", []) if size.get("size") and size.get("url")}
|
||||
|
||||
# Select best size based on preference order
|
||||
for size in size_preference:
|
||||
if size in available_sizes:
|
||||
img_url = available_sizes[size]
|
||||
break
|
||||
|
||||
if img_url:
|
||||
try:
|
||||
if ctx:
|
||||
await ctx.info(f"Downloading {perspective} image from {img_url}")
|
||||
|
||||
# Download image
|
||||
response = requests.get(img_url)
|
||||
response.raise_for_status()
|
||||
|
||||
# Create Image object
|
||||
perspective_image = Image(
|
||||
data=response.content,
|
||||
format="jpeg" # Kroger images are typically JPEG
|
||||
)
|
||||
break
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.warning(f"Failed to download {perspective} image: {str(e)}")
|
||||
|
||||
# If the requested perspective wasn't found
|
||||
if not perspective_image:
|
||||
available_str = ", ".join(available_perspectives) if available_perspectives else "none"
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"No image found for perspective '{perspective}'. Available perspectives: {available_str}"
|
||||
}
|
||||
|
||||
return perspective_image
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error getting product images: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def search_products(
|
||||
search_term: str,
|
||||
location_id: Optional[str] = None,
|
||||
limit: int = Field(default=10, ge=1, le=50, description="Number of results to return (1-50)"),
|
||||
fulfillment: Optional[Literal["csp", "delivery", "pickup"]] = None,
|
||||
brand: Optional[str] = None,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Search for products at a Kroger store.
|
||||
|
||||
Args:
|
||||
search_term: Product search term (e.g., "milk", "bread", "organic apples")
|
||||
location_id: Store location ID (uses preferred location if not provided)
|
||||
limit: Number of results to return (1-50)
|
||||
fulfillment: Filter by fulfillment method (csp=curbside pickup, delivery, pickup)
|
||||
brand: Filter by brand name
|
||||
|
||||
Returns:
|
||||
Dictionary containing product search results
|
||||
"""
|
||||
# Use preferred location if none provided
|
||||
if not location_id:
|
||||
location_id = get_preferred_location_id()
|
||||
if not location_id:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "No location_id provided and no preferred location set. Use set_preferred_location first."
|
||||
}
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Searching for '{search_term}' at location {location_id}")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
products = client.product.search_products(
|
||||
term=search_term,
|
||||
location_id=location_id,
|
||||
limit=limit,
|
||||
fulfillment=fulfillment,
|
||||
brand=brand
|
||||
)
|
||||
|
||||
if not products or "data" not in products or not products["data"]:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"No products found matching '{search_term}'",
|
||||
"data": []
|
||||
}
|
||||
|
||||
# Format product data
|
||||
formatted_products = []
|
||||
for product in products["data"]:
|
||||
formatted_product = {
|
||||
"product_id": product.get("productId"),
|
||||
"upc": product.get("upc"),
|
||||
"description": product.get("description"),
|
||||
"brand": product.get("brand"),
|
||||
"categories": product.get("categories", []),
|
||||
"country_origin": product.get("countryOrigin"),
|
||||
"temperature": product.get("temperature", {})
|
||||
}
|
||||
|
||||
# Add item information (size, price, etc.)
|
||||
if "items" in product and product["items"]:
|
||||
item = product["items"][0]
|
||||
formatted_product["item"] = {
|
||||
"size": item.get("size"),
|
||||
"sold_by": item.get("soldBy"),
|
||||
"inventory": item.get("inventory", {}),
|
||||
"fulfillment": item.get("fulfillment", {})
|
||||
}
|
||||
|
||||
# Add pricing information
|
||||
if "price" in item:
|
||||
price = item["price"]
|
||||
formatted_product["pricing"] = {
|
||||
"regular_price": price.get("regular"),
|
||||
"sale_price": price.get("promo"),
|
||||
"regular_per_unit": price.get("regularPerUnitEstimate"),
|
||||
"formatted_regular": format_currency(price.get("regular")),
|
||||
"formatted_sale": format_currency(price.get("promo")),
|
||||
"on_sale": price.get("promo") is not None and price.get("promo") < price.get("regular", float('inf'))
|
||||
}
|
||||
|
||||
# Add aisle information
|
||||
if "aisleLocations" in product:
|
||||
formatted_product["aisle_locations"] = [
|
||||
{
|
||||
"description": aisle.get("description"),
|
||||
"number": aisle.get("number"),
|
||||
"side": aisle.get("side"),
|
||||
"shelf_number": aisle.get("shelfNumber")
|
||||
}
|
||||
for aisle in product["aisleLocations"]
|
||||
]
|
||||
|
||||
# Add image information
|
||||
if "images" in product and product["images"]:
|
||||
formatted_product["images"] = [
|
||||
{
|
||||
"perspective": img.get("perspective"),
|
||||
"url": img["sizes"][0].get("url") if img.get("sizes") else None,
|
||||
"size": img["sizes"][0].get("size") if img.get("sizes") else None
|
||||
}
|
||||
for img in product["images"]
|
||||
if img.get("sizes")
|
||||
]
|
||||
|
||||
formatted_products.append(formatted_product)
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Found {len(formatted_products)} products")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"search_params": {
|
||||
"search_term": search_term,
|
||||
"location_id": location_id,
|
||||
"limit": limit,
|
||||
"fulfillment": fulfillment,
|
||||
"brand": brand
|
||||
},
|
||||
"count": len(formatted_products),
|
||||
"data": formatted_products
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error searching products: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"data": []
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_product_details(
|
||||
product_id: str,
|
||||
location_id: Optional[str] = None,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get detailed information about a specific product.
|
||||
|
||||
Args:
|
||||
product_id: The unique product identifier
|
||||
location_id: Store location ID for pricing/availability (uses preferred if not provided)
|
||||
|
||||
Returns:
|
||||
Dictionary containing detailed product information
|
||||
"""
|
||||
# Use preferred location if none provided
|
||||
if not location_id:
|
||||
location_id = get_preferred_location_id()
|
||||
if not location_id:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "No location_id provided and no preferred location set. Use set_preferred_location first."
|
||||
}
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Getting details for product {product_id} at location {location_id}")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
product_details = client.product.get_product(
|
||||
product_id=product_id,
|
||||
location_id=location_id
|
||||
)
|
||||
|
||||
if not product_details or "data" not in product_details:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"Product {product_id} not found"
|
||||
}
|
||||
|
||||
product = product_details["data"]
|
||||
|
||||
# Format the detailed product information
|
||||
result = {
|
||||
"success": True,
|
||||
"product_id": product.get("productId"),
|
||||
"upc": product.get("upc"),
|
||||
"description": product.get("description"),
|
||||
"brand": product.get("brand"),
|
||||
"categories": product.get("categories", []),
|
||||
"country_origin": product.get("countryOrigin"),
|
||||
"temperature": product.get("temperature", {}),
|
||||
"location_id": location_id
|
||||
}
|
||||
|
||||
# Add detailed item information
|
||||
if "items" in product and product["items"]:
|
||||
item = product["items"][0]
|
||||
result["item_details"] = {
|
||||
"size": item.get("size"),
|
||||
"sold_by": item.get("soldBy"),
|
||||
"inventory": item.get("inventory", {}),
|
||||
"fulfillment": item.get("fulfillment", {})
|
||||
}
|
||||
|
||||
# Add detailed pricing
|
||||
if "price" in item:
|
||||
price = item["price"]
|
||||
result["pricing"] = {
|
||||
"regular_price": price.get("regular"),
|
||||
"sale_price": price.get("promo"),
|
||||
"regular_per_unit": price.get("regularPerUnitEstimate"),
|
||||
"formatted_regular": format_currency(price.get("regular")),
|
||||
"formatted_sale": format_currency(price.get("promo")),
|
||||
"on_sale": price.get("promo") is not None and price.get("promo") < price.get("regular", float('inf')),
|
||||
"savings": price.get("regular", 0) - price.get("promo", price.get("regular", 0)) if price.get("promo") else 0
|
||||
}
|
||||
|
||||
# Add aisle locations
|
||||
if "aisleLocations" in product:
|
||||
result["aisle_locations"] = [
|
||||
{
|
||||
"description": aisle.get("description"),
|
||||
"aisle_number": aisle.get("number"),
|
||||
"side": aisle.get("side"),
|
||||
"shelf_number": aisle.get("shelfNumber")
|
||||
}
|
||||
for aisle in product["aisleLocations"]
|
||||
]
|
||||
|
||||
# Add images
|
||||
if "images" in product and product["images"]:
|
||||
result["images"] = [
|
||||
{
|
||||
"perspective": img.get("perspective"),
|
||||
"sizes": [
|
||||
{
|
||||
"size": size.get("size"),
|
||||
"url": size.get("url")
|
||||
}
|
||||
for size in img.get("sizes", [])
|
||||
]
|
||||
}
|
||||
for img in product["images"]
|
||||
]
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error getting product details: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def search_products_by_id(
|
||||
product_id: str,
|
||||
location_id: Optional[str] = None,
|
||||
ctx: Context = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Search for products by their specific product ID.
|
||||
|
||||
Args:
|
||||
product_id: The product ID to search for
|
||||
location_id: Store location ID (uses preferred location if not provided)
|
||||
|
||||
Returns:
|
||||
Dictionary containing matching products
|
||||
"""
|
||||
# Use preferred location if none provided
|
||||
if not location_id:
|
||||
location_id = get_preferred_location_id()
|
||||
if not location_id:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "No location_id provided and no preferred location set. Use set_preferred_location first."
|
||||
}
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Searching for products with ID '{product_id}' at location {location_id}")
|
||||
|
||||
client = get_client_credentials_client()
|
||||
|
||||
try:
|
||||
products = client.product.search_products(
|
||||
product_id=product_id,
|
||||
location_id=location_id
|
||||
)
|
||||
|
||||
if not products or "data" not in products or not products["data"]:
|
||||
return {
|
||||
"success": False,
|
||||
"message": f"No products found with ID '{product_id}'",
|
||||
"data": []
|
||||
}
|
||||
|
||||
# Format product data (similar to search_products but simpler)
|
||||
formatted_products = []
|
||||
for product in products["data"]:
|
||||
formatted_product = {
|
||||
"product_id": product.get("productId"),
|
||||
"upc": product.get("upc"),
|
||||
"description": product.get("description"),
|
||||
"brand": product.get("brand"),
|
||||
"categories": product.get("categories", [])
|
||||
}
|
||||
|
||||
# Add basic pricing if available
|
||||
if "items" in product and product["items"] and "price" in product["items"][0]:
|
||||
price = product["items"][0]["price"]
|
||||
formatted_product["pricing"] = {
|
||||
"regular_price": price.get("regular"),
|
||||
"sale_price": price.get("promo"),
|
||||
"formatted_regular": format_currency(price.get("regular")),
|
||||
"formatted_sale": format_currency(price.get("promo"))
|
||||
}
|
||||
|
||||
formatted_products.append(formatted_product)
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Found {len(formatted_products)} products with ID '{product_id}'")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"search_params": {
|
||||
"product_id": product_id,
|
||||
"location_id": location_id
|
||||
},
|
||||
"count": len(formatted_products),
|
||||
"data": formatted_products
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error searching products by ID: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"data": []
|
||||
}
|
||||
184
src/kroger_mcp/tools/profile_tools.py
Normal file
184
src/kroger_mcp/tools/profile_tools.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""
|
||||
User profile and authentication tools for Kroger MCP server
|
||||
"""
|
||||
|
||||
from typing import Dict, List, Any, Optional
|
||||
from fastmcp import Context
|
||||
|
||||
from .shared import get_authenticated_client, invalidate_authenticated_client
|
||||
|
||||
|
||||
def register_tools(mcp):
|
||||
"""Register profile-related tools with the FastMCP server"""
|
||||
|
||||
@mcp.tool()
|
||||
async def get_user_profile(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Get the authenticated user's Kroger profile information.
|
||||
|
||||
Returns:
|
||||
Dictionary containing user profile data
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info("Getting user profile information")
|
||||
|
||||
try:
|
||||
client = get_authenticated_client()
|
||||
profile = client.identity.get_profile()
|
||||
|
||||
if profile and "data" in profile:
|
||||
profile_id = profile["data"].get("id", "N/A")
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Retrieved profile for user ID: {profile_id}")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"profile_id": profile_id,
|
||||
"message": "User profile retrieved successfully",
|
||||
"note": "The Kroger Identity API only provides the profile ID for privacy reasons."
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"message": "Failed to retrieve user profile"
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error getting user profile: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def test_authentication(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Test if the current authentication token is valid.
|
||||
|
||||
Returns:
|
||||
Dictionary indicating authentication status
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info("Testing authentication token validity")
|
||||
|
||||
try:
|
||||
client = get_authenticated_client()
|
||||
is_valid = client.test_current_token()
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Authentication test result: {'valid' if is_valid else 'invalid'}")
|
||||
|
||||
result = {
|
||||
"success": True,
|
||||
"token_valid": is_valid,
|
||||
"message": f"Authentication token is {'valid' if is_valid else 'invalid'}"
|
||||
}
|
||||
|
||||
# Check for refresh token availability
|
||||
if hasattr(client.client, 'token_info') and client.client.token_info:
|
||||
has_refresh_token = "refresh_token" in client.client.token_info
|
||||
result["has_refresh_token"] = has_refresh_token
|
||||
result["can_auto_refresh"] = has_refresh_token
|
||||
|
||||
if has_refresh_token:
|
||||
result["message"] += ". Token can be automatically refreshed when it expires."
|
||||
else:
|
||||
result["message"] += ". No refresh token available - will need to re-authenticate when token expires."
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error testing authentication: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"token_valid": False
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def get_authentication_info(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Get information about the current authentication state and token.
|
||||
|
||||
Returns:
|
||||
Dictionary containing authentication information
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info("Getting authentication information")
|
||||
|
||||
try:
|
||||
client = get_authenticated_client()
|
||||
|
||||
result = {
|
||||
"success": True,
|
||||
"authenticated": True,
|
||||
"message": "User is authenticated"
|
||||
}
|
||||
|
||||
# Get token information if available
|
||||
if hasattr(client.client, 'token_info') and client.client.token_info:
|
||||
token_info = client.client.token_info
|
||||
|
||||
result.update({
|
||||
"token_type": token_info.get("token_type", "Unknown"),
|
||||
"has_refresh_token": "refresh_token" in token_info,
|
||||
"expires_in": token_info.get("expires_in"),
|
||||
"scope": token_info.get("scope", "Unknown")
|
||||
})
|
||||
|
||||
# Don't expose the actual tokens for security
|
||||
result["access_token_preview"] = f"{token_info.get('access_token', '')[:10]}..." if token_info.get('access_token') else "N/A"
|
||||
|
||||
if "refresh_token" in token_info:
|
||||
result["refresh_token_preview"] = f"{token_info['refresh_token'][:10]}..."
|
||||
|
||||
# Get token file information if available
|
||||
if hasattr(client.client, 'token_file') and client.client.token_file:
|
||||
result["token_file"] = client.client.token_file
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error getting authentication info: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e),
|
||||
"authenticated": False
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def force_reauthenticate(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Force re-authentication by clearing the current authentication token.
|
||||
Use this if you're having authentication issues or need to log in as a different user.
|
||||
|
||||
Returns:
|
||||
Dictionary indicating the re-authentication was initiated
|
||||
"""
|
||||
if ctx:
|
||||
await ctx.info("Forcing re-authentication by clearing current token")
|
||||
|
||||
try:
|
||||
# Clear the current authenticated client
|
||||
invalidate_authenticated_client()
|
||||
|
||||
if ctx:
|
||||
await ctx.info("Authentication token cleared. Next cart operation will trigger re-authentication.")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": "Authentication token cleared. The next cart operation will open your browser for re-authentication.",
|
||||
"note": "You will need to log in again when you next use cart-related tools."
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Error clearing authentication: {str(e)}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
307
src/kroger_mcp/tools/shared.py
Normal file
307
src/kroger_mcp/tools/shared.py
Normal file
@@ -0,0 +1,307 @@
|
||||
"""
|
||||
Shared utilities and client management for Kroger MCP server
|
||||
"""
|
||||
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
import webbrowser
|
||||
import json
|
||||
from typing import Optional
|
||||
from kroger_api.kroger_api import KrogerAPI
|
||||
from kroger_api.auth import authenticate_user
|
||||
from kroger_api.utils.env import load_and_validate_env, get_zip_code, get_redirect_uri
|
||||
from kroger_api.utils.oauth import start_oauth_server, generate_random_state, extract_port_from_redirect_uri
|
||||
from kroger_api.token_storage import load_token, save_token
|
||||
|
||||
# Global state for clients and preferred location
|
||||
_authenticated_client: Optional[KrogerAPI] = None
|
||||
_client_credentials_client: Optional[KrogerAPI] = None
|
||||
|
||||
# JSON files for configuration storage
|
||||
PREFERENCES_FILE = "kroger_preferences.json"
|
||||
|
||||
|
||||
def get_client_credentials_client() -> KrogerAPI:
|
||||
"""Get or create a client credentials authenticated client for public data"""
|
||||
global _client_credentials_client
|
||||
|
||||
if _client_credentials_client is None:
|
||||
try:
|
||||
load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET"])
|
||||
_client_credentials_client = KrogerAPI()
|
||||
|
||||
# Try to load existing token first
|
||||
token_file = ".kroger_token_client_product.compact.json"
|
||||
token_info = load_token(token_file)
|
||||
|
||||
if token_info:
|
||||
# Test if the token is still valid
|
||||
_client_credentials_client.client.token_info = token_info
|
||||
if _client_credentials_client.test_current_token():
|
||||
# Token is valid, use it
|
||||
pass
|
||||
else:
|
||||
# Token is invalid, get a new one
|
||||
token_info = _client_credentials_client.authorization.get_token_with_client_credentials("product.compact")
|
||||
else:
|
||||
# No existing token, get a new one
|
||||
token_info = _client_credentials_client.authorization.get_token_with_client_credentials("product.compact")
|
||||
except Exception as e:
|
||||
raise Exception(f"Failed to get client credentials: {str(e)}")
|
||||
|
||||
return _client_credentials_client
|
||||
|
||||
|
||||
def get_authenticated_client() -> KrogerAPI:
|
||||
"""Get or create a user-authenticated client for cart operations with browser-based OAuth"""
|
||||
global _authenticated_client
|
||||
|
||||
if _authenticated_client is None:
|
||||
try:
|
||||
load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET", "KROGER_REDIRECT_URI"])
|
||||
|
||||
# Try to load existing user token first
|
||||
token_file = ".kroger_token_user.json"
|
||||
token_info = load_token(token_file)
|
||||
|
||||
if token_info:
|
||||
# Test if the token is still valid
|
||||
_authenticated_client = KrogerAPI()
|
||||
_authenticated_client.client.token_info = token_info
|
||||
_authenticated_client.client.token_file = token_file
|
||||
|
||||
if _authenticated_client.test_current_token():
|
||||
# Token is valid, use it
|
||||
return _authenticated_client
|
||||
else:
|
||||
# Token is invalid, try to refresh it
|
||||
if "refresh_token" in token_info:
|
||||
try:
|
||||
new_token_info = _authenticated_client.authorization.refresh_token(token_info["refresh_token"])
|
||||
# Token refreshed successfully
|
||||
return _authenticated_client
|
||||
except Exception as e:
|
||||
print(f"Token refresh failed: {str(e)}")
|
||||
# Refresh failed, need to re-authenticate
|
||||
_authenticated_client = None
|
||||
|
||||
# No valid token available, need to authenticate with browser
|
||||
if _authenticated_client is None:
|
||||
_authenticated_client = _authenticate_with_browser()
|
||||
except Exception as e:
|
||||
raise Exception(f"Authentication failed: {str(e)}")
|
||||
|
||||
return _authenticated_client
|
||||
|
||||
|
||||
def _authenticate_with_browser() -> KrogerAPI:
|
||||
"""Authenticate user by opening browser and handling OAuth flow"""
|
||||
server = None
|
||||
try:
|
||||
redirect_uri = get_redirect_uri()
|
||||
port = extract_port_from_redirect_uri(redirect_uri)
|
||||
|
||||
# Check if port is already in use and try alternative ports
|
||||
original_port = port
|
||||
max_attempts = 5
|
||||
|
||||
for attempt in range(max_attempts):
|
||||
try:
|
||||
# Test if port is available
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.bind(('127.0.0.1', port))
|
||||
# Port is available, break out of loop
|
||||
break
|
||||
except OSError:
|
||||
if attempt < max_attempts - 1:
|
||||
port += 1
|
||||
print(f"Port {port - 1} is in use, trying port {port}...")
|
||||
else:
|
||||
raise Exception(f"Ports {original_port}-{port} are all in use. Please free up port {original_port} or restart your system.")
|
||||
|
||||
# Update redirect URI if we had to change the port
|
||||
if port != original_port:
|
||||
redirect_uri = redirect_uri.replace(f":{original_port}", f":{port}")
|
||||
print(f"Using alternative port {port} for OAuth callback.")
|
||||
|
||||
# Create the API client
|
||||
kroger = KrogerAPI()
|
||||
|
||||
# Generate a random state value for security
|
||||
state = generate_random_state()
|
||||
|
||||
# Variables to store the authorization code
|
||||
auth_code = None
|
||||
auth_state = None
|
||||
auth_event = threading.Event()
|
||||
auth_error = None
|
||||
|
||||
# Callback for when the authorization code is received
|
||||
def on_code_received(code, received_state):
|
||||
nonlocal auth_code, auth_state, auth_error
|
||||
try:
|
||||
auth_code = code
|
||||
auth_state = received_state
|
||||
auth_event.set()
|
||||
except Exception as e:
|
||||
auth_error = str(e)
|
||||
auth_event.set()
|
||||
|
||||
# Start the server to handle the OAuth2 redirect
|
||||
try:
|
||||
server, server_thread = start_oauth_server(port, on_code_received)
|
||||
except Exception as e:
|
||||
raise Exception(f"Failed to start OAuth server on port {port}: {str(e)}")
|
||||
|
||||
try:
|
||||
# Get the authorization URL with the potentially updated redirect URI
|
||||
if port != original_port:
|
||||
# Temporarily override the redirect URI for this authentication
|
||||
original_redirect = os.environ.get('KROGER_REDIRECT_URI')
|
||||
os.environ['KROGER_REDIRECT_URI'] = redirect_uri
|
||||
|
||||
auth_url = kroger.authorization.get_authorization_url(
|
||||
scope="cart.basic:write profile.compact",
|
||||
state=state
|
||||
)
|
||||
|
||||
# Restore original redirect URI if we changed it
|
||||
if port != original_port and original_redirect:
|
||||
os.environ['KROGER_REDIRECT_URI'] = original_redirect
|
||||
|
||||
print("\n" + "="*60)
|
||||
print("KROGER AUTHENTICATION REQUIRED")
|
||||
print("="*60)
|
||||
print("Opening your browser for Kroger login...")
|
||||
print("Please log in and authorize the application.")
|
||||
if port != original_port:
|
||||
print(f"Note: Using port {port} instead of {original_port} due to port conflict.")
|
||||
print("This window will close automatically after authentication.")
|
||||
print("If the browser doesn't open, copy this URL:")
|
||||
print(f" {auth_url}")
|
||||
print("="*60)
|
||||
|
||||
# Open the authorization URL in the default browser
|
||||
try:
|
||||
webbrowser.open(auth_url)
|
||||
except Exception as e:
|
||||
print(f"Could not open browser automatically: {e}")
|
||||
print("Please manually open the URL above in your browser.")
|
||||
|
||||
# Wait for the authorization code (timeout after 5 minutes)
|
||||
if not auth_event.wait(timeout=300):
|
||||
raise Exception("Authentication timed out after 5 minutes. Please try again.")
|
||||
|
||||
if auth_error:
|
||||
raise Exception(f"OAuth callback error: {auth_error}")
|
||||
|
||||
if not auth_code:
|
||||
raise Exception("No authorization code received. Authentication may have been cancelled.")
|
||||
|
||||
# Verify the state parameter to prevent CSRF attacks
|
||||
if auth_state != state:
|
||||
raise Exception(f"State mismatch. Expected {state}, got {auth_state}. This could be a security issue.")
|
||||
|
||||
# Exchange the authorization code for an access token
|
||||
try:
|
||||
token_info = kroger.authorization.get_token_with_authorization_code(auth_code)
|
||||
except Exception as e:
|
||||
raise Exception(f"Failed to exchange authorization code for token: {str(e)}")
|
||||
|
||||
print("\n" + "="*60)
|
||||
print("AUTHENTICATION SUCCESSFUL!")
|
||||
print("="*60)
|
||||
print("You can now use cart operations and user-specific features.")
|
||||
print("Your authentication token has been saved for future use.")
|
||||
print("="*60 + "\n")
|
||||
|
||||
return kroger
|
||||
|
||||
finally:
|
||||
# Ensure the server is shut down properly
|
||||
if server:
|
||||
try:
|
||||
server.shutdown()
|
||||
# Give it a moment to fully shut down
|
||||
time.sleep(0.5)
|
||||
except Exception as e:
|
||||
print(f"Note: OAuth server cleanup had an issue: {e}")
|
||||
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
print(f"\nAuthentication failed: {error_msg}")
|
||||
print("\nTo resolve this issue:")
|
||||
print("1. Make sure KROGER_REDIRECT_URI is set correctly in your .env file")
|
||||
print("2. Ensure the redirect URI matches what's registered in Kroger Developer Portal")
|
||||
print("3. If port issues persist, restart Claude Desktop or try a different port")
|
||||
print("4. You can change the port by updating KROGER_REDIRECT_URI to http://localhost:8001/callback")
|
||||
print("5. Make sure you have a stable internet connection")
|
||||
|
||||
# Re-raise with a cleaner error message
|
||||
if "timed out" in error_msg.lower():
|
||||
raise Exception("Authentication timed out. Please try again and complete the login process more quickly.")
|
||||
elif "port" in error_msg.lower():
|
||||
raise Exception(f"Port conflict: {error_msg}")
|
||||
elif "connection" in error_msg.lower():
|
||||
raise Exception(f"Connection error: {error_msg}")
|
||||
else:
|
||||
raise Exception(f"Authentication failed: {error_msg}")
|
||||
|
||||
|
||||
def invalidate_authenticated_client():
|
||||
"""Invalidate the authenticated client to force re-authentication"""
|
||||
global _authenticated_client
|
||||
_authenticated_client = None
|
||||
|
||||
|
||||
def invalidate_client_credentials_client():
|
||||
"""Invalidate the client credentials client to force re-authentication"""
|
||||
global _client_credentials_client
|
||||
_client_credentials_client = None
|
||||
|
||||
|
||||
def _load_preferences() -> dict:
|
||||
"""Load preferences from file"""
|
||||
try:
|
||||
if os.path.exists(PREFERENCES_FILE):
|
||||
with open(PREFERENCES_FILE, 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not load preferences: {e}")
|
||||
return {"preferred_location_id": None}
|
||||
|
||||
|
||||
def _save_preferences(preferences: dict) -> None:
|
||||
"""Save preferences to file"""
|
||||
try:
|
||||
with open(PREFERENCES_FILE, 'w') as f:
|
||||
json.dump(preferences, f, indent=2)
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not save preferences: {e}")
|
||||
|
||||
|
||||
def get_preferred_location_id() -> Optional[str]:
|
||||
"""Get the current preferred location ID from preferences file"""
|
||||
preferences = _load_preferences()
|
||||
return preferences.get("preferred_location_id")
|
||||
|
||||
|
||||
def set_preferred_location_id(location_id: str) -> None:
|
||||
"""Set the preferred location ID in preferences file"""
|
||||
preferences = _load_preferences()
|
||||
preferences["preferred_location_id"] = location_id
|
||||
_save_preferences(preferences)
|
||||
|
||||
|
||||
def format_currency(value: Optional[float]) -> str:
|
||||
"""Format a value as currency"""
|
||||
if value is None:
|
||||
return "N/A"
|
||||
return f"${value:.2f}"
|
||||
|
||||
|
||||
def get_default_zip_code() -> str:
|
||||
"""Get the default zip code from environment or fallback"""
|
||||
return get_zip_code(default="10001")
|
||||
33
src/kroger_mcp/tools/utility_tools.py
Normal file
33
src/kroger_mcp/tools/utility_tools.py
Normal file
@@ -0,0 +1,33 @@
|
||||
"""
|
||||
Utility tools for the Kroger MCP server
|
||||
"""
|
||||
|
||||
from typing import Dict, Any
|
||||
from datetime import datetime
|
||||
from fastmcp import Context
|
||||
|
||||
|
||||
def register_tools(mcp):
|
||||
"""Register utility tools with the FastMCP server"""
|
||||
|
||||
@mcp.tool()
|
||||
async def get_current_datetime(ctx: Context = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Get the current system date and time.
|
||||
|
||||
This tool is useful for comparing with cart checkout dates, order history,
|
||||
or any other time-sensitive operations.
|
||||
|
||||
Returns:
|
||||
Dictionary containing current date and time information
|
||||
"""
|
||||
now = datetime.now()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"datetime": now.isoformat(),
|
||||
"date": now.date().isoformat(),
|
||||
"time": now.time().isoformat(),
|
||||
"timestamp": int(now.timestamp()),
|
||||
"formatted": now.strftime("%A, %B %d, %Y at %I:%M:%S %p")
|
||||
}
|
||||
Reference in New Issue
Block a user