file.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. from __future__ import annotations
  2. from collections.abc import Callable, Mapping
  3. from io import SEEK_SET, UnsupportedOperation
  4. from os import PathLike
  5. from pathlib import Path
  6. from typing import Any, BinaryIO, cast
  7. from .. import (
  8. BrokenResourceError,
  9. ClosedResourceError,
  10. EndOfStream,
  11. TypedAttributeSet,
  12. to_thread,
  13. typed_attribute,
  14. )
  15. from ..abc import ByteReceiveStream, ByteSendStream
  16. class FileStreamAttribute(TypedAttributeSet):
  17. #: the open file descriptor
  18. file: BinaryIO = typed_attribute()
  19. #: the path of the file on the file system, if available (file must be a real file)
  20. path: Path = typed_attribute()
  21. #: the file number, if available (file must be a real file or a TTY)
  22. fileno: int = typed_attribute()
  23. class _BaseFileStream:
  24. def __init__(self, file: BinaryIO):
  25. self._file = file
  26. async def aclose(self) -> None:
  27. await to_thread.run_sync(self._file.close)
  28. @property
  29. def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
  30. attributes: dict[Any, Callable[[], Any]] = {
  31. FileStreamAttribute.file: lambda: self._file,
  32. }
  33. if hasattr(self._file, "name"):
  34. attributes[FileStreamAttribute.path] = lambda: Path(self._file.name)
  35. try:
  36. self._file.fileno()
  37. except UnsupportedOperation:
  38. pass
  39. else:
  40. attributes[FileStreamAttribute.fileno] = lambda: self._file.fileno()
  41. return attributes
  42. class FileReadStream(_BaseFileStream, ByteReceiveStream):
  43. """
  44. A byte stream that reads from a file in the file system.
  45. :param file: a file that has been opened for reading in binary mode
  46. .. versionadded:: 3.0
  47. """
  48. @classmethod
  49. async def from_path(cls, path: str | PathLike[str]) -> FileReadStream:
  50. """
  51. Create a file read stream by opening the given file.
  52. :param path: path of the file to read from
  53. """
  54. file = await to_thread.run_sync(Path(path).open, "rb")
  55. return cls(cast(BinaryIO, file))
  56. async def receive(self, max_bytes: int = 65536) -> bytes:
  57. try:
  58. data = await to_thread.run_sync(self._file.read, max_bytes)
  59. except ValueError:
  60. raise ClosedResourceError from None
  61. except OSError as exc:
  62. raise BrokenResourceError from exc
  63. if data:
  64. return data
  65. else:
  66. raise EndOfStream
  67. async def seek(self, position: int, whence: int = SEEK_SET) -> int:
  68. """
  69. Seek the file to the given position.
  70. .. seealso:: :meth:`io.IOBase.seek`
  71. .. note:: Not all file descriptors are seekable.
  72. :param position: position to seek the file to
  73. :param whence: controls how ``position`` is interpreted
  74. :return: the new absolute position
  75. :raises OSError: if the file is not seekable
  76. """
  77. return await to_thread.run_sync(self._file.seek, position, whence)
  78. async def tell(self) -> int:
  79. """
  80. Return the current stream position.
  81. .. note:: Not all file descriptors are seekable.
  82. :return: the current absolute position
  83. :raises OSError: if the file is not seekable
  84. """
  85. return await to_thread.run_sync(self._file.tell)
  86. class FileWriteStream(_BaseFileStream, ByteSendStream):
  87. """
  88. A byte stream that writes to a file in the file system.
  89. :param file: a file that has been opened for writing in binary mode
  90. .. versionadded:: 3.0
  91. """
  92. @classmethod
  93. async def from_path(
  94. cls, path: str | PathLike[str], append: bool = False
  95. ) -> FileWriteStream:
  96. """
  97. Create a file write stream by opening the given file for writing.
  98. :param path: path of the file to write to
  99. :param append: if ``True``, open the file for appending; if ``False``, any
  100. existing file at the given path will be truncated
  101. """
  102. mode = "ab" if append else "wb"
  103. file = await to_thread.run_sync(Path(path).open, mode)
  104. return cls(cast(BinaryIO, file))
  105. async def send(self, item: bytes) -> None:
  106. try:
  107. await to_thread.run_sync(self._file.write, item)
  108. except ValueError:
  109. raise ClosedResourceError from None
  110. except OSError as exc:
  111. raise BrokenResourceError from exc