diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..535be58 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,41 @@ +# Changelog + +All notable changes to the `kroger-mcp` package will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.2.0] - 2025-05-28 + +### Added + +- **MCP-Compatible Authentication Flow**: Implemented a new authentication flow designed for MCP environments + - New `start_authentication` tool to begin the OAuth flow + - New `complete_authentication` tool to finish the OAuth flow with a redirect URL + - Better error handling and messaging for authentication issues + +### Changed + +- **PKCE Support**: Updated to use the Proof Key for Code Exchange (PKCE) extension for enhanced OAuth security +- **Updated Dependencies**: Now requires kroger-api >= 0.2.0 for PKCE support +- **Improved Error Messaging**: Better error messages for authentication issues + +### Removed + +- **Browser-Based Authentication**: Removed the automatic browser-opening authentication flow, replaced with MCP-compatible flow + +### Security + +- Enhanced OAuth security with PKCE support, mitigating authorization code interception attacks + +## [0.1.0] - 2025-05-23 + +### Added + +- Initial release of the Kroger MCP server +- Support for FastMCP tools to interact with the Kroger API +- Location search and management +- Product search and details +- Cart management with local tracking +- Chain and department information +- User profile and authentication diff --git a/pyproject.toml b/pyproject.toml index f117615..214dd77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "kroger-mcp" -version = "0.1.0" +version = "0.2.0" description = "FastMCP server for Kroger API integration" license = {file = "LICENSE"} authors = [ @@ -28,7 +28,7 @@ classifiers = [ dependencies = [ "fastmcp>=2.0.0", - "kroger-api", + "kroger-api>=0.2.0", "requests", "pydantic>=2.0.0", "python-dotenv", diff --git a/requirements.txt b/requirements.txt index 11db932..e29435b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ fastmcp # Kroger API client -kroger-api +kroger-api>=0.2.0 # Additional dependencies (if not already included in kroger-api) requests diff --git a/src/kroger_mcp/server.py b/src/kroger_mcp/server.py index 0ea4491..733742f 100644 --- a/src/kroger_mcp/server.py +++ b/src/kroger_mcp/server.py @@ -26,6 +26,7 @@ from .tools import cart_tools from .tools import info_tools from .tools import profile_tools from .tools import utility_tools +from .tools import auth_tools # Import prompts from . import prompts @@ -54,7 +55,11 @@ def create_server() -> FastMCP: 5. View current cart with view_current_cart 6. Mark order as placed with mark_order_placed - Authentication is handled automatically - OAuth flows will open in your browser when needed. + Authentication Flow: + 1. Use start_authentication to get an authorization URL + 2. Open the URL in your browser and authorize the application + 3. Copy the full redirect URL from your browser + 4. Use complete_authentication with the redirect URL to finish the process Cart Tracking: This server maintains a local record of items added to your cart since the Kroger API @@ -70,6 +75,7 @@ def create_server() -> FastMCP: info_tools.register_tools(mcp) profile_tools.register_tools(mcp) utility_tools.register_tools(mcp) + auth_tools.register_tools(mcp) # Register prompts prompts.register_prompts(mcp) diff --git a/src/kroger_mcp/tools/__init__.py b/src/kroger_mcp/tools/__init__.py index e7e16e5..04e1149 100644 --- a/src/kroger_mcp/tools/__init__.py +++ b/src/kroger_mcp/tools/__init__.py @@ -7,5 +7,6 @@ This package contains all the tool modules organized by functionality: - cart_tools: Shopping cart management with local tracking - info_tools: Chain and department information - profile_tools: User profile and authentication +- auth_tools: OAuth authentication tools - shared: Common utilities and client management """ diff --git a/src/kroger_mcp/tools/auth.py b/src/kroger_mcp/tools/auth.py new file mode 100644 index 0000000..968f775 --- /dev/null +++ b/src/kroger_mcp/tools/auth.py @@ -0,0 +1,193 @@ +""" +Authentication tools module for Kroger MCP server + +This module provides OAuth authentication tools designed specifically for MCP context, +where the browser-based authentication flow needs to be handled through user interaction +rather than automated browser opening. +""" +import os +from typing import Dict, Any, Optional +from fastmcp import Context +from dotenv import load_dotenv +from urllib.parse import urlparse, parse_qs + +# Import the PKCE utilities from kroger-api +from kroger_api.utils import generate_pkce_parameters +from kroger_api import KrogerAPI + +# Load environment variables +load_dotenv() + +# Store PKCE parameters between steps +_pkce_params = None +_auth_state = None + +def register_auth_tools(mcp): + """Register authentication-specific tools with the FastMCP server""" + + @mcp.tool() + async def start_authentication(ctx: Context = None) -> Dict[str, Any]: + """ + Start the OAuth authentication flow with Kroger. + + This tool returns a URL that the user needs to open in their browser + to authenticate with Kroger. After authorization, the user will be + redirected to a callback URL that they need to copy and paste back. + + Returns: + Dictionary with authorization URL and instructions + """ + global _pkce_params, _auth_state + + # Generate PKCE parameters + _pkce_params = generate_pkce_parameters() + + # Generate a state parameter for CSRF protection + _auth_state = _pkce_params.get('state', _pkce_params.get('code_verifier')[:16]) + + # Get client_id and redirect_uri from environment + client_id = os.environ.get("KROGER_CLIENT_ID") + redirect_uri = os.environ.get("KROGER_REDIRECT_URI", "http://localhost:8000/callback") + + if not client_id: + if ctx: + await ctx.error("Missing KROGER_CLIENT_ID environment variable") + return { + "error": True, + "message": "Missing KROGER_CLIENT_ID environment variable. Please set up your Kroger API credentials." + } + + # Initialize the Kroger API client + kroger = KrogerAPI() + + # Scopes needed for Kroger API (cart.basic:write is needed for cart operations) + scopes = "product.compact cart.basic:write" + + # Get the authorization URL with PKCE + auth_url = kroger.authorization.get_authorization_url( + scope=scopes, + state=_auth_state, + code_challenge=_pkce_params["code_challenge"], + code_challenge_method=_pkce_params["code_challenge_method"] + ) + + if ctx: + await ctx.info(f"Generated auth URL with PKCE: {auth_url}") + + return { + "auth_url": auth_url, + "instructions": ( + "1. Open this URL in your browser\n" + "2. Log in to your Kroger account and authorize the application\n" + "3. After authorization, you'll be redirected to a callback URL\n" + "4. Copy the FULL redirect URL from your browser's address bar\n" + "5. Use the complete_authentication tool with that URL to complete the process" + ) + } + + @mcp.tool() + async def complete_authentication(redirect_url: str, ctx: Context = None) -> Dict[str, Any]: + """ + Complete the OAuth flow using the redirect URL from Kroger. + + After opening the auth URL in your browser and authorizing the app, + you'll be redirected to a callback URL. Copy that entire URL and + pass it to this tool to complete the authentication process. + + Args: + redirect_url: The full URL from your browser after authorization + + Returns: + Dictionary indicating authentication status + """ + global _pkce_params, _auth_state + + if not _pkce_params or not _auth_state: + if ctx: + await ctx.error("Authentication flow not started") + return { + "error": True, + "message": "Authentication flow not started. Please use start_authentication first." + } + + try: + # Parse the redirect URL + parsed_url = urlparse(redirect_url) + query_params = parse_qs(parsed_url.query) + + # Extract code and state + if 'code' not in query_params: + if ctx: + await ctx.error("Authorization code not found in redirect URL") + return { + "error": True, + "message": "Authorization code not found in redirect URL. Please check the URL and try again." + } + + auth_code = query_params['code'][0] + received_state = query_params.get('state', [None])[0] + + # Verify state parameter to prevent CSRF attacks + if received_state != _auth_state: + if ctx: + await ctx.error(f"State mismatch: expected {_auth_state}, got {received_state}") + return { + "error": True, + "message": "State parameter mismatch. This could indicate a CSRF attack. Please try authenticating again." + } + + # Get client credentials + client_id = os.environ.get("KROGER_CLIENT_ID") + client_secret = os.environ.get("KROGER_CLIENT_SECRET") + redirect_uri = os.environ.get("KROGER_REDIRECT_URI", "http://localhost:8000/callback") + + if not client_id or not client_secret: + if ctx: + await ctx.error("Missing Kroger API credentials") + return { + "error": True, + "message": "Missing Kroger API credentials. Please set KROGER_CLIENT_ID and KROGER_CLIENT_SECRET." + } + + # Initialize Kroger API client + kroger = KrogerAPI() + + # Exchange the authorization code for tokens with the code verifier + if ctx: + await ctx.info(f"Exchanging authorization code for tokens with code_verifier") + + # Use the code_verifier from the PKCE parameters + token_info = kroger.authorization.get_token_with_authorization_code( + auth_code, + code_verifier=_pkce_params["code_verifier"] + ) + + # Clear PKCE parameters and state after successful exchange + _pkce_params = None + _auth_state = None + + if ctx: + await ctx.info(f"Authentication successful!") + + # Return success response + return { + "success": True, + "message": "Authentication successful! You can now use Kroger API tools that require authentication.", + "token_info": { + "expires_in": token_info.get("expires_in"), + "token_type": token_info.get("token_type"), + "scope": token_info.get("scope"), + "has_refresh_token": "refresh_token" in token_info + } + } + + except Exception as e: + error_message = str(e) + + if ctx: + await ctx.error(f"Authentication error: {error_message}") + + return { + "error": True, + "message": f"Authentication failed: {error_message}" + } diff --git a/src/kroger_mcp/tools/auth_tools.py b/src/kroger_mcp/tools/auth_tools.py new file mode 100644 index 0000000..9474b40 --- /dev/null +++ b/src/kroger_mcp/tools/auth_tools.py @@ -0,0 +1,9 @@ +""" +Separate module for registering authentication tools to avoid circular imports +""" + +from .auth import register_auth_tools + +def register_tools(mcp): + """Register authentication tools with the FastMCP server""" + register_auth_tools(mcp) diff --git a/src/kroger_mcp/tools/shared.py b/src/kroger_mcp/tools/shared.py index ac3a3ab..886ade1 100644 --- a/src/kroger_mcp/tools/shared.py +++ b/src/kroger_mcp/tools/shared.py @@ -3,17 +3,16 @@ Shared utilities and client management for Kroger MCP server """ import os -import socket -import threading -import time -import webbrowser import json -from typing import Optional +from typing import Optional, Dict, Any +from dotenv import load_dotenv + from kroger_api.kroger_api import KrogerAPI -from kroger_api.auth import authenticate_user -from kroger_api.utils.env import load_and_validate_env, get_zip_code, get_redirect_uri -from kroger_api.utils.oauth import start_oauth_server, generate_random_state, extract_port_from_redirect_uri -from kroger_api.token_storage import load_token, save_token +from kroger_api.utils.env import load_and_validate_env, get_zip_code +from kroger_api.token_storage import load_token + +# Load environment variables +load_dotenv() # Global state for clients and preferred location _authenticated_client: Optional[KrogerAPI] = None @@ -27,227 +26,95 @@ def get_client_credentials_client() -> KrogerAPI: """Get or create a client credentials authenticated client for public data""" global _client_credentials_client - if _client_credentials_client is None: - try: - load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET"]) - _client_credentials_client = KrogerAPI() - - # Try to load existing token first - token_file = ".kroger_token_client_product.compact.json" - token_info = load_token(token_file) - - if token_info: - # Test if the token is still valid - _client_credentials_client.client.token_info = token_info - if _client_credentials_client.test_current_token(): - # Token is valid, use it - pass - else: - # Token is invalid, get a new one - token_info = _client_credentials_client.authorization.get_token_with_client_credentials("product.compact") - else: - # No existing token, get a new one - token_info = _client_credentials_client.authorization.get_token_with_client_credentials("product.compact") - except Exception as e: - raise Exception(f"Failed to get client credentials: {str(e)}") + if _client_credentials_client is not None and _client_credentials_client.test_current_token(): + return _client_credentials_client + + _client_credentials_client = None - return _client_credentials_client + try: + load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET"]) + _client_credentials_client = KrogerAPI() + + # Try to load existing token first + token_file = ".kroger_token_client_product.compact.json" + token_info = load_token(token_file) + + if token_info: + # Test if the token is still valid + _client_credentials_client.client.token_info = token_info + if _client_credentials_client.test_current_token(): + # Token is valid, use it + return _client_credentials_client + + # Token is invalid or not found, get a new one + token_info = _client_credentials_client.authorization.get_token_with_client_credentials("product.compact") + return _client_credentials_client + except Exception as e: + raise Exception(f"Failed to get client credentials: {str(e)}") def get_authenticated_client() -> KrogerAPI: - """Get or create a user-authenticated client for cart operations with browser-based OAuth""" + """Get or create a user-authenticated client for cart operations + + This function attempts to load an existing token or prompts for authentication. + In an MCP context, the user needs to explicitly call start_authentication and + complete_authentication tools to authenticate. + + Returns: + KrogerAPI: Authenticated client + + Raises: + Exception: If no valid token is available and authentication is required + """ global _authenticated_client - if _authenticated_client is None: - try: - load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET", "KROGER_REDIRECT_URI"]) - - # Try to load existing user token first - token_file = ".kroger_token_user.json" - token_info = load_token(token_file) - - if token_info: - # Test if the token is still valid - _authenticated_client = KrogerAPI() - _authenticated_client.client.token_info = token_info - _authenticated_client.client.token_file = token_file - - if _authenticated_client.test_current_token(): - # Token is valid, use it - return _authenticated_client - else: - # Token is invalid, try to refresh it - if "refresh_token" in token_info: - try: - new_token_info = _authenticated_client.authorization.refresh_token(token_info["refresh_token"]) - # Token refreshed successfully - return _authenticated_client - except Exception as e: - print(f"Token refresh failed: {str(e)}") - # Refresh failed, need to re-authenticate - _authenticated_client = None - - # No valid token available, need to authenticate with browser - if _authenticated_client is None: - _authenticated_client = _authenticate_with_browser() - except Exception as e: - raise Exception(f"Authentication failed: {str(e)}") + if _authenticated_client is not None and _authenticated_client.test_current_token(): + # Client exists and token is still valid + return _authenticated_client + + # Clear the reference if token is invalid + _authenticated_client = None - return _authenticated_client - - -def _authenticate_with_browser() -> KrogerAPI: - """Authenticate user by opening browser and handling OAuth flow""" - server = None try: - redirect_uri = get_redirect_uri() - port = extract_port_from_redirect_uri(redirect_uri) - - # Check if port is already in use and try alternative ports - original_port = port - max_attempts = 5 + load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET", "KROGER_REDIRECT_URI"]) - for attempt in range(max_attempts): - try: - # Test if port is available - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('127.0.0.1', port)) - # Port is available, break out of loop - break - except OSError: - if attempt < max_attempts - 1: - port += 1 - print(f"Port {port - 1} is in use, trying port {port}...") - else: - raise Exception(f"Ports {original_port}-{port} are all in use. Please free up port {original_port} or restart your system.") + # Try to load existing user token first + token_file = ".kroger_token_user.json" + token_info = load_token(token_file) - # Update redirect URI if we had to change the port - if port != original_port: - redirect_uri = redirect_uri.replace(f":{original_port}", f":{port}") - print(f"Using alternative port {port} for OAuth callback.") - - # Create the API client - kroger = KrogerAPI() - - # Generate a random state value for security - state = generate_random_state() - - # Variables to store the authorization code - auth_code = None - auth_state = None - auth_event = threading.Event() - auth_error = None - - # Callback for when the authorization code is received - def on_code_received(code, received_state): - nonlocal auth_code, auth_state, auth_error - try: - auth_code = code - auth_state = received_state - auth_event.set() - except Exception as e: - auth_error = str(e) - auth_event.set() - - # Start the server to handle the OAuth2 redirect - try: - server, server_thread = start_oauth_server(port, on_code_received) - except Exception as e: - raise Exception(f"Failed to start OAuth server on port {port}: {str(e)}") - - try: - # Get the authorization URL with the potentially updated redirect URI - if port != original_port: - # Temporarily override the redirect URI for this authentication - original_redirect = os.environ.get('KROGER_REDIRECT_URI') - os.environ['KROGER_REDIRECT_URI'] = redirect_uri - - auth_url = kroger.authorization.get_authorization_url( - scope="cart.basic:write profile.compact", - state=state - ) - - # Restore original redirect URI if we changed it - if port != original_port and original_redirect: - os.environ['KROGER_REDIRECT_URI'] = original_redirect - - print("\n" + "="*60) - print("KROGER AUTHENTICATION REQUIRED") - print("="*60) - print("Opening your browser for Kroger login...") - print("Please log in and authorize the application.") - if port != original_port: - print(f"Note: Using port {port} instead of {original_port} due to port conflict.") - print("This window will close automatically after authentication.") - print("If the browser doesn't open, copy this URL:") - print(f" {auth_url}") - print("="*60) - - # Open the authorization URL in the default browser - try: - webbrowser.open(auth_url) - except Exception as e: - print(f"Could not open browser automatically: {e}") - print("Please manually open the URL above in your browser.") - - # Wait for the authorization code (timeout after 5 minutes) - if not auth_event.wait(timeout=300): - raise Exception("Authentication timed out after 5 minutes. Please try again.") - - if auth_error: - raise Exception(f"OAuth callback error: {auth_error}") - - if not auth_code: - raise Exception("No authorization code received. Authentication may have been cancelled.") - - # Verify the state parameter to prevent CSRF attacks - if auth_state != state: - raise Exception(f"State mismatch. Expected {state}, got {auth_state}. This could be a security issue.") - - # Exchange the authorization code for an access token - try: - token_info = kroger.authorization.get_token_with_authorization_code(auth_code) - except Exception as e: - raise Exception(f"Failed to exchange authorization code for token: {str(e)}") - - print("\n" + "="*60) - print("AUTHENTICATION SUCCESSFUL!") - print("="*60) - print("You can now use cart operations and user-specific features.") - print("Your authentication token has been saved for future use.") - print("="*60 + "\n") - - return kroger - - finally: - # Ensure the server is shut down properly - if server: + if token_info: + # Create a new client with the loaded token + _authenticated_client = KrogerAPI() + _authenticated_client.client.token_info = token_info + _authenticated_client.client.token_file = token_file + + if _authenticated_client.test_current_token(): + # Token is valid, use it + return _authenticated_client + + # Token is invalid, try to refresh it + if "refresh_token" in token_info: try: - server.shutdown() - # Give it a moment to fully shut down - time.sleep(0.5) - except Exception as e: - print(f"Note: OAuth server cleanup had an issue: {e}") - - except Exception as e: - error_msg = str(e) - print(f"\nAuthentication failed: {error_msg}") - print("\nTo resolve this issue:") - print("1. Make sure KROGER_REDIRECT_URI is set correctly in your .env file") - print("2. Ensure the redirect URI matches what's registered in Kroger Developer Portal") - print("3. If port issues persist, restart Claude Desktop or try a different port") - print("4. You can change the port by updating KROGER_REDIRECT_URI to http://localhost:8001/callback") - print("5. Make sure you have a stable internet connection") + _authenticated_client.authorization.refresh_token(token_info["refresh_token"]) + # If refresh was successful, return the client + if _authenticated_client.test_current_token(): + return _authenticated_client + except Exception: + # Refresh failed, need to re-authenticate + _authenticated_client = None - # Re-raise with a cleaner error message - if "timed out" in error_msg.lower(): - raise Exception("Authentication timed out. Please try again and complete the login process more quickly.") - elif "port" in error_msg.lower(): - raise Exception(f"Port conflict: {error_msg}") - elif "connection" in error_msg.lower(): - raise Exception(f"Connection error: {error_msg}") + # No valid token available, need user-initiated authentication + raise Exception( + "Authentication required. Please use the start_authentication tool to begin the OAuth flow, " + "then complete it with the complete_authentication tool." + ) + except Exception as e: + if "Authentication required" in str(e): + # This is an expected error when authentication is needed + raise else: - raise Exception(f"Authentication failed: {error_msg}") + # Other unexpected errors + raise Exception(f"Authentication failed: {str(e)}") def invalidate_authenticated_client():