diff --git a/core/utils.py b/core/utils.py
index bf1d250..f14b926 100644
--- a/core/utils.py
+++ b/core/utils.py
@@ -7,6 +7,7 @@ import ssl
 import asyncio
 import functools
 
+from pathlib import Path
 from typing import List, Optional
 
 from googleapiclient.errors import HttpError
@@ -29,6 +30,100 @@ class UserInputError(Exception):
     pass
 
 
+# Directories from which local file reads are allowed.
+# The user's home directory is the default safe base.
+# Override via the ALLOWED_FILE_DIRS env var (os.pathsep-separated: ":" on POSIX, ";" on Windows).
+_ALLOWED_FILE_DIRS_ENV = "ALLOWED_FILE_DIRS"
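+# Example (illustrative, POSIX): ALLOWED_FILE_DIRS="/srv/uploads:/home/shared"
+# would permit local reads only under those two trees.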
+
+
+def _get_allowed_file_dirs() -> list[Path]:
+    """Return the list of directories from which local file access is permitted."""
+    env_val = os.environ.get(_ALLOWED_FILE_DIRS_ENV)
+    if env_val:
+        return [Path(p.strip()).resolve() for p in env_val.split(os.pathsep) if p.strip()]
+    try:
+        return [Path.home().resolve()]
+    except RuntimeError:
+        # Path.home() raises RuntimeError when the home directory cannot be
+        # determined; a truthiness check on a Path object would never fire.
+        return []
+
+
+def validate_file_path(file_path: str) -> Path:
+    """
+    Validate that a file path is safe to read from the server filesystem.
+
+    Resolves the path canonically (following symlinks), then verifies it falls
+    within one of the allowed base directories. Rejects paths to sensitive
+    system locations regardless of allowlist.
+
+    Args:
+        file_path: The raw file path string to validate.
+
+    Returns:
+        Path: The resolved, validated Path object.
+
+    Raises:
+        ValueError: If the path is outside the allowed directories or targets
+                    a sensitive location. Existence is not checked here;
+                    callers verify it separately.
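+
+    Example (illustrative; the doctest is skipped because output depends on
+    the local home directory):
+        >>> validate_file_path(str(Path.home() / "docs" / "report.pdf"))  # doctest: +SKIP
+        PosixPath('/home/alice/docs/report.pdf')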
+    """
+    resolved = Path(file_path).resolve()
+
+    # Block sensitive file patterns regardless of allowlist
+    resolved_str = str(resolved)
+    file_name = resolved.name.lower()
+
+    # Block .env files and variants (.env, .env.local, .env.production, etc.)
+    if file_name == ".env" or file_name.startswith(".env."):
+        raise ValueError(
+            f"Access to '{resolved_str}' is not allowed: "
+            ".env files may contain secrets and cannot be read, uploaded, or attached."
+        )
+
+    # Block well-known sensitive system paths (including macOS /private variants)
+    sensitive_prefixes = (
+        "/proc", "/sys", "/dev",
+        "/etc/shadow", "/etc/passwd",
+        "/private/etc/shadow", "/private/etc/passwd",
+    )
+    for prefix in sensitive_prefixes:
+        if resolved_str == prefix or resolved_str.startswith(prefix + "/"):
+            raise ValueError(
+                f"Access to '{resolved_str}' is not allowed: "
+                "path is in a restricted system location."
+            )
+
+    # Block other credential/secret file patterns
+    sensitive_names = {
+        ".credentials", ".credentials.json", "credentials.json",
+        "client_secret.json", "client_secrets.json",
+        "service_account.json", "service-account.json",
+        ".npmrc", ".pypirc", ".netrc", ".docker/config.json",
+    }
+    if file_name in sensitive_names:
+        raise ValueError(
+            f"Access to '{resolved_str}' is not allowed: "
+            "this file commonly contains secrets or credentials."
+        )
+
+    allowed_dirs = _get_allowed_file_dirs()
+    if not allowed_dirs:
+        raise ValueError(
+            "No allowed file directories configured. "
+            "Set the ALLOWED_FILE_DIRS environment variable or ensure a home directory exists."
+        )
+
+    for allowed in allowed_dirs:
+        # Path.is_relative_to (Python 3.9+) replaces the try/except dance
+        # around relative_to().
+        if resolved.is_relative_to(allowed):
+            return resolved
+
+    raise ValueError(
+        f"Access to '{resolved_str}' is not allowed: "
+        f"path is outside permitted directories ({', '.join(str(d) for d in allowed_dirs)}). "
+        "Set ALLOWED_FILE_DIRS to adjust."
+    )
+
+
 def check_credentials_directory_permissions(credentials_dir: str = None) -> None:
     """
     Check if the service has appropriate permissions to create and write to the .credentials directory.
diff --git a/gdrive/drive_tools.py b/gdrive/drive_tools.py
index ac2d92c..08fca3b 100644
--- a/gdrive/drive_tools.py
+++ b/gdrive/drive_tools.py
@@ -24,7 +24,7 @@ from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload
 from auth.service_decorator import require_google_service
 from auth.oauth_config import is_stateless_mode
 from core.attachment_storage import get_attachment_storage, get_attachment_url
-from core.utils import extract_office_xml_text, handle_http_errors
+from core.utils import extract_office_xml_text, handle_http_errors, validate_file_path
 from core.server import server
 from core.config import get_transport_mode
 from gdrive.drive_helpers import (
@@ -521,8 +521,8 @@ async def create_drive_file(
                 raw_path = f"//{netloc}{raw_path}"
             file_path = url2pathname(raw_path)
 
-            # Verify file exists
-            path_obj = Path(file_path)
+            # Validate path safety and verify file exists
+            path_obj = validate_file_path(file_path)
             if not path_obj.exists():
                 extra = (
                     " The server is running via streamable-http, so file:// URLs must point to files inside the container or remote host."
@@ -570,21 +570,20 @@ async def create_drive_file(
 
             # when running in stateless mode, deployment may not have access to local file system
             if is_stateless_mode():
-                async with httpx.AsyncClient(follow_redirects=True) as client:
-                    resp = await client.get(fileUrl)
-                    if resp.status_code != 200:
-                        raise Exception(
-                            f"Failed to fetch file from URL: {fileUrl} (status {resp.status_code})"
-                        )
-                    file_data = await resp.aread()
-                    # Try to get MIME type from Content-Type header
-                    content_type = resp.headers.get("Content-Type")
-                    if content_type and content_type != "application/octet-stream":
-                        mime_type = content_type
-                        file_metadata["mimeType"] = content_type
-                        logger.info(
-                            f"[create_drive_file] Using MIME type from Content-Type header: {content_type}"
-                        )
+                resp = await _ssrf_safe_fetch(fileUrl)
+                if resp.status_code != 200:
+                    raise Exception(
+                        f"Failed to fetch file from URL: {fileUrl} (status {resp.status_code})"
+                    )
+                file_data = resp.content
+                # Try to get MIME type from Content-Type header
+                content_type = resp.headers.get("Content-Type")
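+                # e.g. a server-supplied "text/csv" is kept, while the generic
+                # octet-stream default is ignored.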
+                if content_type and content_type != "application/octet-stream":
+                    mime_type = content_type
+                    file_metadata["mimeType"] = content_type
+                    logger.info(
+                        f"[create_drive_file] Using MIME type from Content-Type header: {content_type}"
+                    )
 
                 media = MediaIoBaseUpload(
                     io.BytesIO(file_data),
@@ -607,36 +606,32 @@ async def create_drive_file(
                 # Use NamedTemporaryFile to stream download and upload
                 with NamedTemporaryFile() as temp_file:
                     total_bytes = 0
-                    # follow redirects
-                    async with httpx.AsyncClient(follow_redirects=True) as client:
-                        async with client.stream("GET", fileUrl) as resp:
-                            if resp.status_code != 200:
-                                raise Exception(
-                                    f"Failed to fetch file from URL: {fileUrl} (status {resp.status_code})"
-                                )
-
-                            # Stream download in chunks
-                            async for chunk in resp.aiter_bytes(
-                                chunk_size=DOWNLOAD_CHUNK_SIZE_BYTES
-                            ):
-                                await asyncio.to_thread(temp_file.write, chunk)
-                                total_bytes += len(chunk)
-
-                            logger.info(
-                                f"[create_drive_file] Downloaded {total_bytes} bytes from URL before upload."
-                            )
-
-                            # Try to get MIME type from Content-Type header
-                            content_type = resp.headers.get("Content-Type")
-                            if (
-                                content_type
-                                and content_type != "application/octet-stream"
-                            ):
-                                mime_type = content_type
-                                file_metadata["mimeType"] = mime_type
-                                logger.info(
-                                    f"[create_drive_file] Using MIME type from Content-Type header: {mime_type}"
-                                )
+                    # SSRF-safe fetch (validates each redirect target); buffers
+                    # the full response in memory rather than streaming chunks.
+                    resp = await _ssrf_safe_fetch(fileUrl)
+                    if resp.status_code != 200:
+                        raise Exception(
+                            f"Failed to fetch file from URL: {fileUrl} (status {resp.status_code})"
+                        )
+
+                    file_data = resp.content
+                    await asyncio.to_thread(temp_file.write, file_data)
+                    total_bytes = len(file_data)
+
+                    logger.info(
+                        f"[create_drive_file] Downloaded {total_bytes} bytes from URL before upload."
+                    )
+
+                    # Try to get MIME type from Content-Type header
+                    content_type = resp.headers.get("Content-Type")
+                    if (
+                        content_type
+                        and content_type != "application/octet-stream"
+                    ):
+                        mime_type = content_type
+                        file_metadata["mimeType"] = mime_type
+                        logger.info(
+                            f"[create_drive_file] Using MIME type from Content-Type header: {mime_type}"
+                        )
 
                     # Reset file pointer to beginning for upload
                     temp_file.seek(0)
@@ -708,16 +703,18 @@ GOOGLE_DOCS_IMPORT_FORMATS = {
 GOOGLE_DOCS_MIME_TYPE = "application/vnd.google-apps.document"
 
 
-def _validate_url_not_internal(url: str) -> None:
+def _resolve_and_validate_host(hostname: Optional[str]) -> list[str]:
     """
-    Validate that a URL doesn't point to internal/private networks (SSRF protection).
+    Resolve a hostname to IP addresses and validate none are private/internal.
+
+    Uses getaddrinfo to handle both IPv4 and IPv6. Fails closed on DNS errors.
+
+    Returns:
+        list[str]: Validated resolved IP address strings.
 
     Raises:
-        ValueError: If URL points to localhost or private IP ranges
+        ValueError: If hostname resolves to private/internal IPs or DNS fails.
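+
+    Example (illustrative hostnames; actual addresses depend on DNS):
+        _resolve_and_validate_host("example.com")    # -> its public IP strings
+        _resolve_and_validate_host("intranet.corp")  # resolves to 10.x -> ValueError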
     """
-    parsed = urlparse(url)
-    hostname = parsed.hostname
-
     if not hostname:
         raise ValueError("Invalid URL: no hostname")
 
@@ -725,15 +722,92 @@ def _validate_url_not_internal(url: str) -> None:
     if hostname.lower() in ("localhost", "127.0.0.1", "::1", "0.0.0.0"):
         raise ValueError("URLs pointing to localhost are not allowed")
 
-    # Resolve hostname and check if it's a private IP
+    # Resolve hostname using getaddrinfo (handles both IPv4 and IPv6)
     try:
-        ip = ipaddress.ip_address(socket.gethostbyname(hostname))
-        if ip.is_private or ip.is_loopback or ip.is_reserved:
+        addr_infos = socket.getaddrinfo(hostname, None)
+    except socket.gaierror as e:
+        raise ValueError(
+            f"Cannot resolve hostname '{hostname}': {e}. "
+            "Refusing request (fail-closed)."
+        )
+
+    if not addr_infos:
+        raise ValueError(f"No addresses found for hostname: {hostname}")
+
+    resolved_ips: list[str] = []
+    for family, _type, _proto, _canonname, sockaddr in addr_infos:
+        ip_str = sockaddr[0]
+        ip = ipaddress.ip_address(ip_str)
+        if ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local:
             raise ValueError(
-                f"URLs pointing to private/internal networks are not allowed: {hostname}"
+                f"URLs pointing to private/internal networks are not allowed: "
+                f"{hostname} resolves to {ip_str}"
             )
-    except socket.gaierror:
-        pass  # Can't resolve, let httpx handle it
+        resolved_ips.append(ip_str)
+
+    return resolved_ips
+
+
+def _validate_url_not_internal(url: str) -> list[str]:
+    """
+    Validate that a URL doesn't point to internal/private networks (SSRF protection).
+
+    Returns:
+        list[str]: Validated resolved IP addresses for the hostname.
+
+    Raises:
+        ValueError: If URL points to localhost or private IP ranges.
+    """
+    parsed = urlparse(url)
+    return _resolve_and_validate_host(parsed.hostname)
+
+
+async def _ssrf_safe_fetch(url: str) -> httpx.Response:
+    """
+    Fetch a URL with SSRF protection applied to every hop of the redirect chain.
+
+    Disables automatic redirect following, validates the scheme and hostname of
+    the initial URL and of each redirect target against private/internal
+    networks, and follows redirects manually. Note that httpx re-resolves the
+    hostname at connect time, so this narrows but does not fully close the
+    DNS-rebinding window.
+
+    Args:
+        url: The URL to fetch.
+
+    Returns:
+        httpx.Response with the final response content.
+
+    Raises:
+        ValueError: If any URL in the redirect chain uses a disallowed scheme
+                    or points to a private/internal network.
+        Exception: If the HTTP request fails or the redirect limit is exceeded.
+    """
+    max_redirects = 10
+    current_url = url
+
+    async with httpx.AsyncClient(follow_redirects=False) as client:
+        for _ in range(max_redirects):
+            parsed = urlparse(current_url)
+            if parsed.scheme not in ("http", "https"):
+                raise ValueError(f"Disallowed URL scheme: {parsed.scheme}")
+            _validate_url_not_internal(current_url)
+
+            resp = await client.get(current_url)
+
+            if resp.status_code in (301, 302, 303, 307, 308):
+                location = resp.headers.get("location")
+                if not location:
+                    raise Exception(f"Redirect with no Location header from {current_url}")
+                # httpx.URL.join resolves relative and protocol-relative
+                # redirects against the current URL per RFC 3986.
+                current_url = str(httpx.URL(current_url).join(location))
+                continue
+
+            return resp
+
+    raise Exception(f"Too many redirects (max {max_redirects}) fetching {url}")
 
 
 def _detect_source_format(file_name: str, content: Optional[str] = None) -> str:
@@ -865,7 +939,7 @@ async def import_to_google_doc(
                 f"file_path should be a local path or file:// URL, got: {file_path}"
             )
 
-        path_obj = Path(actual_path)
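+        # Enforce the local-file allowlist before touching the filesystem;
+        # raises ValueError for disallowed paths.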
+        path_obj = validate_file_path(actual_path)
         if not path_obj.exists():
             raise FileNotFoundError(f"File not found: {actual_path}")
         if not path_obj.is_file():
@@ -887,16 +961,13 @@ async def import_to_google_doc(
         if parsed_url.scheme not in ("http", "https"):
             raise ValueError(f"file_url must be http:// or https://, got: {file_url}")
 
-        # SSRF protection: block internal/private network URLs
-        _validate_url_not_internal(file_url)
-
-        async with httpx.AsyncClient(follow_redirects=True) as client:
-            resp = await client.get(file_url)
-            if resp.status_code != 200:
-                raise Exception(
-                    f"Failed to fetch file from URL: {file_url} (status {resp.status_code})"
-                )
-            file_data = resp.content
+        # SSRF protection: block internal/private network URLs and validate redirects
+        resp = await _ssrf_safe_fetch(file_url)
+        if resp.status_code != 200:
+            raise Exception(
+                f"Failed to fetch file from URL: {file_url} (status {resp.status_code})"
+            )
+        file_data = resp.content
 
         logger.info(
             f"[import_to_google_doc] Downloaded from URL: {len(file_data)} bytes"
diff --git a/gmail/gmail_tools.py b/gmail/gmail_tools.py
index a762610..2c16d47 100644
--- a/gmail/gmail_tools.py
+++ b/gmail/gmail_tools.py
@@ -22,7 +22,7 @@ from email.utils import formataddr
 from pydantic import Field
 
 from auth.service_decorator import require_google_service
-from core.utils import handle_http_errors
+from core.utils import handle_http_errors, validate_file_path
 from core.server import server
 from auth.scopes import (
     GMAIL_SEND_SCOPE,
@@ -288,7 +288,7 @@ def _prepare_gmail_message(
             try:
                 # If path is provided, read and encode the file
                 if file_path:
-                    path_obj = Path(file_path)
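+                    # May raise ValueError for paths outside the allowlist or
+                    # for known secret files (see core.utils.validate_file_path).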
+                    path_obj = validate_file_path(file_path)
                     if not path_obj.exists():
                         logger.error(f"File not found: {file_path}")
                         continue
