payload_streamer.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. """
Payload implementation for coroutines as data provider.

As a simple case, you can upload data from a file::

   @aiohttp.streamer
   async def file_sender(writer, file_name=None):
       with open(file_name, 'rb') as f:
           chunk = f.read(2**16)
           while chunk:
               await writer.write(chunk)
               chunk = f.read(2**16)

Then you can use `file_sender` like this::

    async with session.post('http://httpbin.org/post',
                            data=file_sender(file_name='huge_file')) as resp:
        print(await resp.text())

.. note:: Coroutine must accept `writer` as first argument
  16. """
  17. import types
  18. import warnings
  19. from typing import Any, Awaitable, Callable, Dict, Tuple
  20. from .abc import AbstractStreamWriter
  21. from .payload import Payload, payload_type
# Names exported by ``from <module> import *``: only ``streamer``; the
# wrapper and payload classes defined below are deliberately left out.
__all__ = ("streamer",)
  23. class _stream_wrapper:
  24. def __init__(
  25. self,
  26. coro: Callable[..., Awaitable[None]],
  27. args: Tuple[Any, ...],
  28. kwargs: Dict[str, Any],
  29. ) -> None:
  30. self.coro = types.coroutine(coro)
  31. self.args = args
  32. self.kwargs = kwargs
  33. async def __call__(self, writer: AbstractStreamWriter) -> None:
  34. await self.coro(writer, *self.args, **self.kwargs)
  35. class streamer:
  36. def __init__(self, coro: Callable[..., Awaitable[None]]) -> None:
  37. warnings.warn(
  38. "@streamer is deprecated, use async generators instead",
  39. DeprecationWarning,
  40. stacklevel=2,
  41. )
  42. self.coro = coro
  43. def __call__(self, *args: Any, **kwargs: Any) -> _stream_wrapper:
  44. return _stream_wrapper(self.coro, args, kwargs)
  45. @payload_type(_stream_wrapper)
  46. class StreamWrapperPayload(Payload):
  47. async def write(self, writer: AbstractStreamWriter) -> None:
  48. await self._value(writer)
  49. def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
  50. raise TypeError("Unable to decode.")
  51. @payload_type(streamer)
  52. class StreamPayload(StreamWrapperPayload):
  53. def __init__(self, value: Any, *args: Any, **kwargs: Any) -> None:
  54. super().__init__(value(), *args, **kwargs)
  55. async def write(self, writer: AbstractStreamWriter) -> None:
  56. await self._value(writer)