filesystem.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import os
  2. import os.path
  3. import shutil
  4. import stat
  5. from contextlib import contextmanager
  6. from tempfile import NamedTemporaryFile
  7. # NOTE: retrying is not annotated in typeshed as on 2017-07-17, which is
  8. # why we ignore the type on this import.
  9. from pip._vendor.retrying import retry # type: ignore
  10. from pip._vendor.six import PY2
  11. from pip._internal.utils.compat import get_path_uid
  12. from pip._internal.utils.misc import cast
  13. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  14. if MYPY_CHECK_RUNNING:
  15. from typing import BinaryIO, Iterator
  16. class NamedTemporaryFileResult(BinaryIO):
  17. @property
  18. def file(self):
  19. # type: () -> BinaryIO
  20. pass
  21. def check_path_owner(path):
  22. # type: (str) -> bool
  23. # If we don't have a way to check the effective uid of this process, then
  24. # we'll just assume that we own the directory.
  25. if not hasattr(os, "geteuid"):
  26. return True
  27. previous = None
  28. while path != previous:
  29. if os.path.lexists(path):
  30. # Check if path is writable by current user.
  31. if os.geteuid() == 0:
  32. # Special handling for root user in order to handle properly
  33. # cases where users use sudo without -H flag.
  34. try:
  35. path_uid = get_path_uid(path)
  36. except OSError:
  37. return False
  38. return path_uid == 0
  39. else:
  40. return os.access(path, os.W_OK)
  41. else:
  42. previous, path = path, os.path.dirname(path)
  43. return False # assume we don't own the path
  44. def copy2_fixed(src, dest):
  45. # type: (str, str) -> None
  46. """Wrap shutil.copy2() but map errors copying socket files to
  47. SpecialFileError as expected.
  48. See also https://bugs.python.org/issue37700.
  49. """
  50. try:
  51. shutil.copy2(src, dest)
  52. except (OSError, IOError):
  53. for f in [src, dest]:
  54. try:
  55. is_socket_file = is_socket(f)
  56. except OSError:
  57. # An error has already occurred. Another error here is not
  58. # a problem and we can ignore it.
  59. pass
  60. else:
  61. if is_socket_file:
  62. raise shutil.SpecialFileError("`%s` is a socket" % f)
  63. raise
  64. def is_socket(path):
  65. # type: (str) -> bool
  66. return stat.S_ISSOCK(os.lstat(path).st_mode)
  67. @contextmanager
  68. def adjacent_tmp_file(path):
  69. # type: (str) -> Iterator[NamedTemporaryFileResult]
  70. """Given a path to a file, open a temp file next to it securely and ensure
  71. it is written to disk after the context reaches its end.
  72. """
  73. with NamedTemporaryFile(
  74. delete=False,
  75. dir=os.path.dirname(path),
  76. prefix=os.path.basename(path),
  77. suffix='.tmp',
  78. ) as f:
  79. result = cast('NamedTemporaryFileResult', f)
  80. try:
  81. yield result
  82. finally:
  83. result.file.flush()
  84. os.fsync(result.file.fileno())
  85. _replace_retry = retry(stop_max_delay=1000, wait_fixed=250)
  86. if PY2:
  87. @_replace_retry
  88. def replace(src, dest):
  89. # type: (str, str) -> None
  90. try:
  91. os.rename(src, dest)
  92. except OSError:
  93. os.remove(dest)
  94. os.rename(src, dest)
  95. else:
  96. replace = _replace_retry(os.replace)