jupyter.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import base64
  2. import io
  3. import re
  4. import requests
  5. import fsspec
  6. class JupyterFileSystem(fsspec.AbstractFileSystem):
  7. """View of the files as seen by a Jupyter server (notebook or lab)"""
  8. protocol = ("jupyter", "jlab")
  9. def __init__(self, url, tok=None, **kwargs):
  10. """
  11. Parameters
  12. ----------
  13. url : str
  14. Base URL of the server, like "http://127.0.0.1:8888". May include
  15. token in the string, which is given by the process when starting up
  16. tok : str
  17. If the token is obtained separately, can be given here
  18. kwargs
  19. """
  20. if "?" in url:
  21. if tok is None:
  22. try:
  23. tok = re.findall("token=([a-z0-9]+)", url)[0]
  24. except IndexError as e:
  25. raise ValueError("Could not determine token") from e
  26. url = url.split("?", 1)[0]
  27. self.url = url.rstrip("/") + "/api/contents"
  28. self.session = requests.Session()
  29. if tok:
  30. self.session.headers["Authorization"] = f"token {tok}"
  31. super().__init__(**kwargs)
  32. def ls(self, path, detail=True, **kwargs):
  33. path = self._strip_protocol(path)
  34. r = self.session.get(f"{self.url}/{path}")
  35. if r.status_code == 404:
  36. return FileNotFoundError(path)
  37. r.raise_for_status()
  38. out = r.json()
  39. if out["type"] == "directory":
  40. out = out["content"]
  41. else:
  42. out = [out]
  43. for o in out:
  44. o["name"] = o.pop("path")
  45. o.pop("content")
  46. if o["type"] == "notebook":
  47. o["type"] = "file"
  48. if detail:
  49. return out
  50. return [o["name"] for o in out]
  51. def cat_file(self, path, start=None, end=None, **kwargs):
  52. path = self._strip_protocol(path)
  53. r = self.session.get(f"{self.url}/{path}")
  54. if r.status_code == 404:
  55. return FileNotFoundError(path)
  56. r.raise_for_status()
  57. out = r.json()
  58. if out["format"] == "text":
  59. # data should be binary
  60. b = out["content"].encode()
  61. else:
  62. b = base64.b64decode(out["content"])
  63. return b[start:end]
  64. def pipe_file(self, path, value, **_):
  65. path = self._strip_protocol(path)
  66. json = {
  67. "name": path.rsplit("/", 1)[-1],
  68. "path": path,
  69. "size": len(value),
  70. "content": base64.b64encode(value).decode(),
  71. "format": "base64",
  72. "type": "file",
  73. }
  74. self.session.put(f"{self.url}/{path}", json=json)
  75. def mkdir(self, path, create_parents=True, **kwargs):
  76. path = self._strip_protocol(path)
  77. if create_parents and "/" in path:
  78. self.mkdir(path.rsplit("/", 1)[0], True)
  79. json = {
  80. "name": path.rsplit("/", 1)[-1],
  81. "path": path,
  82. "size": None,
  83. "content": None,
  84. "type": "directory",
  85. }
  86. self.session.put(f"{self.url}/{path}", json=json)
  87. def _rm(self, path):
  88. path = self._strip_protocol(path)
  89. self.session.delete(f"{self.url}/{path}")
  90. def _open(self, path, mode="rb", **kwargs):
  91. path = self._strip_protocol(path)
  92. if mode == "rb":
  93. data = self.cat_file(path)
  94. return io.BytesIO(data)
  95. else:
  96. return SimpleFileWriter(self, path, mode="wb")
  97. class SimpleFileWriter(fsspec.spec.AbstractBufferedFile):
  98. def _upload_chunk(self, final=False):
  99. """Never uploads a chunk until file is done
  100. Not suitable for large files
  101. """
  102. if final is False:
  103. return False
  104. self.buffer.seek(0)
  105. data = self.buffer.read()
  106. self.fs.pipe_file(self.path, data)