hyb
2025-10-24 43c4449e6c9231446895ad26d169825ca7a65c9a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# SPDX-FileCopyrightText: 2015 Eric Larson
#
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
 
import hashlib
import os
import tempfile
from textwrap import dedent
from typing import IO, TYPE_CHECKING
from pathlib import Path
 
from pip._vendor.cachecontrol.cache import BaseCache, SeparateBodyBaseCache
from pip._vendor.cachecontrol.controller import CacheController
 
if TYPE_CHECKING:
    from datetime import datetime
 
    from filelock import BaseFileLock
 
 
class _FileCacheMixin:
    """Shared implementation for both FileCache variants."""
 
    def __init__(
        self,
        directory: str | Path,
        forever: bool = False,
        filemode: int = 0o0600,
        dirmode: int = 0o0700,
        lock_class: type[BaseFileLock] | None = None,
    ) -> None:
        try:
            if lock_class is None:
                from filelock import FileLock
 
                lock_class = FileLock
        except ImportError:
            notice = dedent(
                """
            NOTE: In order to use the FileCache you must have
            filelock installed. You can install it via pip:
              pip install cachecontrol[filecache]
            """
            )
            raise ImportError(notice)
 
        self.directory = directory
        self.forever = forever
        self.filemode = filemode
        self.dirmode = dirmode
        self.lock_class = lock_class
 
    @staticmethod
    def encode(x: str) -> str:
        return hashlib.sha224(x.encode()).hexdigest()
 
    def _fn(self, name: str) -> str:
        # NOTE: This method should not change as some may depend on it.
        #       See: https://github.com/ionrock/cachecontrol/issues/63
        hashed = self.encode(name)
        parts = list(hashed[:5]) + [hashed]
        return os.path.join(self.directory, *parts)
 
    def get(self, key: str) -> bytes | None:
        name = self._fn(key)
        try:
            with open(name, "rb") as fh:
                return fh.read()
 
        except FileNotFoundError:
            return None
 
    def set(
        self, key: str, value: bytes, expires: int | datetime | None = None
    ) -> None:
        name = self._fn(key)
        self._write(name, value)
 
    def _write(self, path: str, data: bytes) -> None:
        """
        Safely write the data to the given path.
        """
        # Make sure the directory exists
        dirname = os.path.dirname(path)
        os.makedirs(dirname, self.dirmode, exist_ok=True)
 
        with self.lock_class(path + ".lock"):
            # Write our actual file
            (fd, name) = tempfile.mkstemp(dir=dirname)
            try:
                os.write(fd, data)
            finally:
                os.close(fd)
            os.chmod(name, self.filemode)
            os.replace(name, path)
 
    def _delete(self, key: str, suffix: str) -> None:
        name = self._fn(key) + suffix
        if not self.forever:
            try:
                os.remove(name)
            except FileNotFoundError:
                pass
 
 
class FileCache(_FileCacheMixin, BaseCache):
    """
    Traditional FileCache: body is stored in memory, so not suitable for large
    downloads.
    """
 
    def delete(self, key: str) -> None:
        self._delete(key, "")
 
 
class SeparateBodyFileCache(_FileCacheMixin, SeparateBodyBaseCache):
    """
    Memory-efficient FileCache: body is stored in a separate file, reducing
    peak memory usage.
    """
 
    def get_body(self, key: str) -> IO[bytes] | None:
        name = self._fn(key) + ".body"
        try:
            return open(name, "rb")
        except FileNotFoundError:
            return None
 
    def set_body(self, key: str, body: bytes) -> None:
        name = self._fn(key) + ".body"
        self._write(name, body)
 
    def delete(self, key: str) -> None:
        self._delete(key, "")
        self._delete(key, ".body")
 
 
def url_to_file_path(url: str, filecache: FileCache) -> str:
    """Return the file cache path based on the URL.
 
    This does not ensure the file exists!
    """
    key = CacheController.cache_url(url)
    return filecache._fn(key)