Implement MCP-compatible auth flow with PKCE support

main
CupOfOwls 2 months ago
parent e713b2a7b6
commit d29de20a9a

@ -0,0 +1,41 @@
# Changelog
All notable changes to the `kroger-mcp` package will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.2.0] - 2025-05-28
### Added
- **MCP-Compatible Authentication Flow**: Implemented a new authentication flow designed for MCP environments
- New `start_authentication` tool to begin the OAuth flow
- New `complete_authentication` tool to finish the OAuth flow with a redirect URL
- Better error handling and messaging for authentication issues
### Changed
- **PKCE Support**: Updated to use the Proof Key for Code Exchange (PKCE) extension for enhanced OAuth security
- **Updated Dependencies**: Now requires kroger-api >= 0.2.0 for PKCE support
- **Improved Error Messaging**: Better error messages for authentication issues
### Removed
- **Browser-Based Authentication**: Removed the automatic browser-opening authentication flow, replaced with MCP-compatible flow
### Security
- Enhanced OAuth security with PKCE support, mitigating authorization code interception attacks
## [0.1.0] - 2025-05-23
### Added
- Initial release of the Kroger MCP server
- Support for FastMCP tools to interact with the Kroger API
- Location search and management
- Product search and details
- Cart management with local tracking
- Chain and department information
- User profile and authentication

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "kroger-mcp"
version = "0.1.0"
version = "0.2.0"
description = "FastMCP server for Kroger API integration"
license = {file = "LICENSE"}
authors = [
@ -28,7 +28,7 @@ classifiers = [
dependencies = [
"fastmcp>=2.0.0",
"kroger-api",
"kroger-api>=0.2.0",
"requests",
"pydantic>=2.0.0",
"python-dotenv",

@ -2,7 +2,7 @@
fastmcp
# Kroger API client
kroger-api
kroger-api>=0.2.0
# Additional dependencies (if not already included in kroger-api)
requests

@ -26,6 +26,7 @@ from .tools import cart_tools
from .tools import info_tools
from .tools import profile_tools
from .tools import utility_tools
from .tools import auth_tools
# Import prompts
from . import prompts
@ -54,7 +55,11 @@ def create_server() -> FastMCP:
5. View current cart with view_current_cart
6. Mark order as placed with mark_order_placed
Authentication is handled automatically - OAuth flows will open in your browser when needed.
Authentication Flow:
1. Use start_authentication to get an authorization URL
2. Open the URL in your browser and authorize the application
3. Copy the full redirect URL from your browser
4. Use complete_authentication with the redirect URL to finish the process
Cart Tracking:
This server maintains a local record of items added to your cart since the Kroger API
@ -70,6 +75,7 @@ def create_server() -> FastMCP:
info_tools.register_tools(mcp)
profile_tools.register_tools(mcp)
utility_tools.register_tools(mcp)
auth_tools.register_tools(mcp)
# Register prompts
prompts.register_prompts(mcp)

@ -7,5 +7,6 @@ This package contains all the tool modules organized by functionality:
- cart_tools: Shopping cart management with local tracking
- info_tools: Chain and department information
- profile_tools: User profile and authentication
- auth_tools: OAuth authentication tools
- shared: Common utilities and client management
"""

@ -0,0 +1,193 @@
"""
Authentication tools module for Kroger MCP server
This module provides OAuth authentication tools designed specifically for MCP context,
where the browser-based authentication flow needs to be handled through user interaction
rather than automated browser opening.
"""
import os
from typing import Dict, Any, Optional
from fastmcp import Context
from dotenv import load_dotenv
from urllib.parse import urlparse, parse_qs
# Import the PKCE utilities from kroger-api
from kroger_api.utils import generate_pkce_parameters
from kroger_api import KrogerAPI
# Load environment variables
load_dotenv()
# Store PKCE parameters between steps
_pkce_params = None
_auth_state = None
def register_auth_tools(mcp):
"""Register authentication-specific tools with the FastMCP server"""
@mcp.tool()
async def start_authentication(ctx: Context = None) -> Dict[str, Any]:
"""
Start the OAuth authentication flow with Kroger.
This tool returns a URL that the user needs to open in their browser
to authenticate with Kroger. After authorization, the user will be
redirected to a callback URL that they need to copy and paste back.
Returns:
Dictionary with authorization URL and instructions
"""
global _pkce_params, _auth_state
# Generate PKCE parameters
_pkce_params = generate_pkce_parameters()
# Generate a state parameter for CSRF protection
_auth_state = _pkce_params.get('state', _pkce_params.get('code_verifier')[:16])
# Get client_id and redirect_uri from environment
client_id = os.environ.get("KROGER_CLIENT_ID")
redirect_uri = os.environ.get("KROGER_REDIRECT_URI", "http://localhost:8000/callback")
if not client_id:
if ctx:
await ctx.error("Missing KROGER_CLIENT_ID environment variable")
return {
"error": True,
"message": "Missing KROGER_CLIENT_ID environment variable. Please set up your Kroger API credentials."
}
# Initialize the Kroger API client
kroger = KrogerAPI()
# Scopes needed for Kroger API (cart.basic:write is needed for cart operations)
scopes = "product.compact cart.basic:write"
# Get the authorization URL with PKCE
auth_url = kroger.authorization.get_authorization_url(
scope=scopes,
state=_auth_state,
code_challenge=_pkce_params["code_challenge"],
code_challenge_method=_pkce_params["code_challenge_method"]
)
if ctx:
await ctx.info(f"Generated auth URL with PKCE: {auth_url}")
return {
"auth_url": auth_url,
"instructions": (
"1. Open this URL in your browser\n"
"2. Log in to your Kroger account and authorize the application\n"
"3. After authorization, you'll be redirected to a callback URL\n"
"4. Copy the FULL redirect URL from your browser's address bar\n"
"5. Use the complete_authentication tool with that URL to complete the process"
)
}
@mcp.tool()
async def complete_authentication(redirect_url: str, ctx: Context = None) -> Dict[str, Any]:
"""
Complete the OAuth flow using the redirect URL from Kroger.
After opening the auth URL in your browser and authorizing the app,
you'll be redirected to a callback URL. Copy that entire URL and
pass it to this tool to complete the authentication process.
Args:
redirect_url: The full URL from your browser after authorization
Returns:
Dictionary indicating authentication status
"""
global _pkce_params, _auth_state
if not _pkce_params or not _auth_state:
if ctx:
await ctx.error("Authentication flow not started")
return {
"error": True,
"message": "Authentication flow not started. Please use start_authentication first."
}
try:
# Parse the redirect URL
parsed_url = urlparse(redirect_url)
query_params = parse_qs(parsed_url.query)
# Extract code and state
if 'code' not in query_params:
if ctx:
await ctx.error("Authorization code not found in redirect URL")
return {
"error": True,
"message": "Authorization code not found in redirect URL. Please check the URL and try again."
}
auth_code = query_params['code'][0]
received_state = query_params.get('state', [None])[0]
# Verify state parameter to prevent CSRF attacks
if received_state != _auth_state:
if ctx:
await ctx.error(f"State mismatch: expected {_auth_state}, got {received_state}")
return {
"error": True,
"message": "State parameter mismatch. This could indicate a CSRF attack. Please try authenticating again."
}
# Get client credentials
client_id = os.environ.get("KROGER_CLIENT_ID")
client_secret = os.environ.get("KROGER_CLIENT_SECRET")
redirect_uri = os.environ.get("KROGER_REDIRECT_URI", "http://localhost:8000/callback")
if not client_id or not client_secret:
if ctx:
await ctx.error("Missing Kroger API credentials")
return {
"error": True,
"message": "Missing Kroger API credentials. Please set KROGER_CLIENT_ID and KROGER_CLIENT_SECRET."
}
# Initialize Kroger API client
kroger = KrogerAPI()
# Exchange the authorization code for tokens with the code verifier
if ctx:
await ctx.info(f"Exchanging authorization code for tokens with code_verifier")
# Use the code_verifier from the PKCE parameters
token_info = kroger.authorization.get_token_with_authorization_code(
auth_code,
code_verifier=_pkce_params["code_verifier"]
)
# Clear PKCE parameters and state after successful exchange
_pkce_params = None
_auth_state = None
if ctx:
await ctx.info(f"Authentication successful!")
# Return success response
return {
"success": True,
"message": "Authentication successful! You can now use Kroger API tools that require authentication.",
"token_info": {
"expires_in": token_info.get("expires_in"),
"token_type": token_info.get("token_type"),
"scope": token_info.get("scope"),
"has_refresh_token": "refresh_token" in token_info
}
}
except Exception as e:
error_message = str(e)
if ctx:
await ctx.error(f"Authentication error: {error_message}")
return {
"error": True,
"message": f"Authentication failed: {error_message}"
}

@ -0,0 +1,9 @@
"""
Separate module for registering authentication tools to avoid circular imports
"""
from .auth import register_auth_tools
def register_tools(mcp):
"""Register authentication tools with the FastMCP server"""
register_auth_tools(mcp)

@ -3,17 +3,16 @@ Shared utilities and client management for Kroger MCP server
"""
import os
import socket
import threading
import time
import webbrowser
import json
from typing import Optional
from typing import Optional, Dict, Any
from dotenv import load_dotenv
from kroger_api.kroger_api import KrogerAPI
from kroger_api.auth import authenticate_user
from kroger_api.utils.env import load_and_validate_env, get_zip_code, get_redirect_uri
from kroger_api.utils.oauth import start_oauth_server, generate_random_state, extract_port_from_redirect_uri
from kroger_api.token_storage import load_token, save_token
from kroger_api.utils.env import load_and_validate_env, get_zip_code
from kroger_api.token_storage import load_token
# Load environment variables
load_dotenv()
# Global state for clients and preferred location
_authenticated_client: Optional[KrogerAPI] = None
@ -27,227 +26,95 @@ def get_client_credentials_client() -> KrogerAPI:
"""Get or create a client credentials authenticated client for public data"""
global _client_credentials_client
if _client_credentials_client is None:
try:
load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET"])
_client_credentials_client = KrogerAPI()
# Try to load existing token first
token_file = ".kroger_token_client_product.compact.json"
token_info = load_token(token_file)
if token_info:
# Test if the token is still valid
_client_credentials_client.client.token_info = token_info
if _client_credentials_client.test_current_token():
# Token is valid, use it
pass
else:
# Token is invalid, get a new one
token_info = _client_credentials_client.authorization.get_token_with_client_credentials("product.compact")
else:
# No existing token, get a new one
token_info = _client_credentials_client.authorization.get_token_with_client_credentials("product.compact")
except Exception as e:
raise Exception(f"Failed to get client credentials: {str(e)}")
if _client_credentials_client is not None and _client_credentials_client.test_current_token():
return _client_credentials_client
_client_credentials_client = None
return _client_credentials_client
try:
load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET"])
_client_credentials_client = KrogerAPI()
# Try to load existing token first
token_file = ".kroger_token_client_product.compact.json"
token_info = load_token(token_file)
if token_info:
# Test if the token is still valid
_client_credentials_client.client.token_info = token_info
if _client_credentials_client.test_current_token():
# Token is valid, use it
return _client_credentials_client
# Token is invalid or not found, get a new one
token_info = _client_credentials_client.authorization.get_token_with_client_credentials("product.compact")
return _client_credentials_client
except Exception as e:
raise Exception(f"Failed to get client credentials: {str(e)}")
def get_authenticated_client() -> KrogerAPI:
"""Get or create a user-authenticated client for cart operations with browser-based OAuth"""
"""Get or create a user-authenticated client for cart operations
This function attempts to load an existing token or prompts for authentication.
In an MCP context, the user needs to explicitly call start_authentication and
complete_authentication tools to authenticate.
Returns:
KrogerAPI: Authenticated client
Raises:
Exception: If no valid token is available and authentication is required
"""
global _authenticated_client
if _authenticated_client is None:
try:
load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET", "KROGER_REDIRECT_URI"])
# Try to load existing user token first
token_file = ".kroger_token_user.json"
token_info = load_token(token_file)
if token_info:
# Test if the token is still valid
_authenticated_client = KrogerAPI()
_authenticated_client.client.token_info = token_info
_authenticated_client.client.token_file = token_file
if _authenticated_client.test_current_token():
# Token is valid, use it
return _authenticated_client
else:
# Token is invalid, try to refresh it
if "refresh_token" in token_info:
try:
new_token_info = _authenticated_client.authorization.refresh_token(token_info["refresh_token"])
# Token refreshed successfully
return _authenticated_client
except Exception as e:
print(f"Token refresh failed: {str(e)}")
# Refresh failed, need to re-authenticate
_authenticated_client = None
# No valid token available, need to authenticate with browser
if _authenticated_client is None:
_authenticated_client = _authenticate_with_browser()
except Exception as e:
raise Exception(f"Authentication failed: {str(e)}")
if _authenticated_client is not None and _authenticated_client.test_current_token():
# Client exists and token is still valid
return _authenticated_client
# Clear the reference if token is invalid
_authenticated_client = None
return _authenticated_client
def _authenticate_with_browser() -> KrogerAPI:
"""Authenticate user by opening browser and handling OAuth flow"""
server = None
try:
redirect_uri = get_redirect_uri()
port = extract_port_from_redirect_uri(redirect_uri)
# Check if port is already in use and try alternative ports
original_port = port
max_attempts = 5
load_and_validate_env(["KROGER_CLIENT_ID", "KROGER_CLIENT_SECRET", "KROGER_REDIRECT_URI"])
for attempt in range(max_attempts):
try:
# Test if port is available
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('127.0.0.1', port))
# Port is available, break out of loop
break
except OSError:
if attempt < max_attempts - 1:
port += 1
print(f"Port {port - 1} is in use, trying port {port}...")
else:
raise Exception(f"Ports {original_port}-{port} are all in use. Please free up port {original_port} or restart your system.")
# Try to load existing user token first
token_file = ".kroger_token_user.json"
token_info = load_token(token_file)
# Update redirect URI if we had to change the port
if port != original_port:
redirect_uri = redirect_uri.replace(f":{original_port}", f":{port}")
print(f"Using alternative port {port} for OAuth callback.")
# Create the API client
kroger = KrogerAPI()
# Generate a random state value for security
state = generate_random_state()
# Variables to store the authorization code
auth_code = None
auth_state = None
auth_event = threading.Event()
auth_error = None
# Callback for when the authorization code is received
def on_code_received(code, received_state):
nonlocal auth_code, auth_state, auth_error
try:
auth_code = code
auth_state = received_state
auth_event.set()
except Exception as e:
auth_error = str(e)
auth_event.set()
# Start the server to handle the OAuth2 redirect
try:
server, server_thread = start_oauth_server(port, on_code_received)
except Exception as e:
raise Exception(f"Failed to start OAuth server on port {port}: {str(e)}")
try:
# Get the authorization URL with the potentially updated redirect URI
if port != original_port:
# Temporarily override the redirect URI for this authentication
original_redirect = os.environ.get('KROGER_REDIRECT_URI')
os.environ['KROGER_REDIRECT_URI'] = redirect_uri
auth_url = kroger.authorization.get_authorization_url(
scope="cart.basic:write profile.compact",
state=state
)
# Restore original redirect URI if we changed it
if port != original_port and original_redirect:
os.environ['KROGER_REDIRECT_URI'] = original_redirect
print("\n" + "="*60)
print("KROGER AUTHENTICATION REQUIRED")
print("="*60)
print("Opening your browser for Kroger login...")
print("Please log in and authorize the application.")
if port != original_port:
print(f"Note: Using port {port} instead of {original_port} due to port conflict.")
print("This window will close automatically after authentication.")
print("If the browser doesn't open, copy this URL:")
print(f" {auth_url}")
print("="*60)
# Open the authorization URL in the default browser
try:
webbrowser.open(auth_url)
except Exception as e:
print(f"Could not open browser automatically: {e}")
print("Please manually open the URL above in your browser.")
# Wait for the authorization code (timeout after 5 minutes)
if not auth_event.wait(timeout=300):
raise Exception("Authentication timed out after 5 minutes. Please try again.")
if auth_error:
raise Exception(f"OAuth callback error: {auth_error}")
if not auth_code:
raise Exception("No authorization code received. Authentication may have been cancelled.")
# Verify the state parameter to prevent CSRF attacks
if auth_state != state:
raise Exception(f"State mismatch. Expected {state}, got {auth_state}. This could be a security issue.")
# Exchange the authorization code for an access token
try:
token_info = kroger.authorization.get_token_with_authorization_code(auth_code)
except Exception as e:
raise Exception(f"Failed to exchange authorization code for token: {str(e)}")
print("\n" + "="*60)
print("AUTHENTICATION SUCCESSFUL!")
print("="*60)
print("You can now use cart operations and user-specific features.")
print("Your authentication token has been saved for future use.")
print("="*60 + "\n")
return kroger
finally:
# Ensure the server is shut down properly
if server:
if token_info:
# Create a new client with the loaded token
_authenticated_client = KrogerAPI()
_authenticated_client.client.token_info = token_info
_authenticated_client.client.token_file = token_file
if _authenticated_client.test_current_token():
# Token is valid, use it
return _authenticated_client
# Token is invalid, try to refresh it
if "refresh_token" in token_info:
try:
server.shutdown()
# Give it a moment to fully shut down
time.sleep(0.5)
except Exception as e:
print(f"Note: OAuth server cleanup had an issue: {e}")
except Exception as e:
error_msg = str(e)
print(f"\nAuthentication failed: {error_msg}")
print("\nTo resolve this issue:")
print("1. Make sure KROGER_REDIRECT_URI is set correctly in your .env file")
print("2. Ensure the redirect URI matches what's registered in Kroger Developer Portal")
print("3. If port issues persist, restart Claude Desktop or try a different port")
print("4. You can change the port by updating KROGER_REDIRECT_URI to http://localhost:8001/callback")
print("5. Make sure you have a stable internet connection")
_authenticated_client.authorization.refresh_token(token_info["refresh_token"])
# If refresh was successful, return the client
if _authenticated_client.test_current_token():
return _authenticated_client
except Exception:
# Refresh failed, need to re-authenticate
_authenticated_client = None
# Re-raise with a cleaner error message
if "timed out" in error_msg.lower():
raise Exception("Authentication timed out. Please try again and complete the login process more quickly.")
elif "port" in error_msg.lower():
raise Exception(f"Port conflict: {error_msg}")
elif "connection" in error_msg.lower():
raise Exception(f"Connection error: {error_msg}")
# No valid token available, need user-initiated authentication
raise Exception(
"Authentication required. Please use the start_authentication tool to begin the OAuth flow, "
"then complete it with the complete_authentication tool."
)
except Exception as e:
if "Authentication required" in str(e):
# This is an expected error when authentication is needed
raise
else:
raise Exception(f"Authentication failed: {error_msg}")
# Other unexpected errors
raise Exception(f"Authentication failed: {str(e)}")
def invalidate_authenticated_client():

Loading…
Cancel
Save