123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- import base64
- import io
- import re
- import requests
- import fsspec
class JupyterFileSystem(fsspec.AbstractFileSystem):
    """View of the files as seen by a Jupyter server (notebook or lab)

    Every operation is a request against the server's ``/api/contents``
    endpoint, issued through one shared ``requests.Session``.
    """

    protocol = ("jupyter", "jlab")

    def __init__(self, url, tok=None, **kwargs):
        """
        Parameters
        ----------
        url : str
            Base URL of the server, like "http://127.0.0.1:8888". May include
            token in the string, which is given by the process when starting up
        tok : str
            If the token is obtained separately, can be given here
        kwargs
            Passed on to the parent class constructor

        Raises
        ------
        ValueError
            If ``url`` contains a query string, ``tok`` is not given, and no
            ``token=...`` value can be found in ``url``.
        """
        if "?" in url:
            if tok is None:
                try:
                    tok = re.findall(r"token=([a-z0-9]+)", url)[0]
                except IndexError as e:
                    raise ValueError("Could not determine token") from e
            # The query string is dropped even when ``tok`` was passed in.
            url = url.split("?", 1)[0]
        self.url = url.rstrip("/") + "/api/contents"
        self.session = requests.Session()
        if tok:
            self.session.headers["Authorization"] = f"token {tok}"

        super().__init__(**kwargs)

    def ls(self, path, detail=True, **kwargs):
        """List the entry or entries found at ``path``

        Parameters
        ----------
        path : str
            Location on the server; any protocol prefix is stripped
        detail : bool
            If True, return the list of info dicts; otherwise only the names

        Returns
        -------
        list of dict, or list of str if ``detail`` is False

        Raises
        ------
        FileNotFoundError
            If the server answers 404 for ``path``
        """
        path = self._strip_protocol(path)
        r = self.session.get(f"{self.url}/{path}")
        if r.status_code == 404:
            # Must be raised: returning the exception object would hand the
            # caller something that looks like a successful result.
            raise FileNotFoundError(path)
        r.raise_for_status()
        out = r.json()

        # A directory carries its children in "content"; anything else is
        # listed as a single entry describing itself.
        entries = out["content"] if out["type"] == "directory" else [out]
        for entry in entries:
            entry["name"] = entry.pop("path")
            # Drop the payload from the listing; tolerate entries without it.
            entry.pop("content", None)
            if entry["type"] == "notebook":
                entry["type"] = "file"
        if detail:
            return entries
        return [entry["name"] for entry in entries]

    def cat_file(self, path, start=None, end=None, **kwargs):
        """Fetch the contents of the file at ``path`` as bytes

        Parameters
        ----------
        path : str
            Location on the server; any protocol prefix is stripped
        start, end : int or None
            Slice bounds applied to the complete downloaded bytes

        Returns
        -------
        bytes

        Raises
        ------
        FileNotFoundError
            If the server answers 404 for ``path``
        """
        path = self._strip_protocol(path)
        r = self.session.get(f"{self.url}/{path}")
        if r.status_code == 404:
            raise FileNotFoundError(path)
        r.raise_for_status()
        out = r.json()
        if out["format"] == "text":
            # data should be binary
            data = out["content"].encode()
        else:
            data = base64.b64decode(out["content"])
        return data[start:end]

    def pipe_file(self, path, value, **_):
        """Write the bytes ``value`` to ``path`` in a single PUT request

        Parameters
        ----------
        path : str
            Destination on the server; any protocol prefix is stripped
        value : bytes
            Complete file contents; sent base64-encoded

        Raises
        ------
        requests.HTTPError
            If the server answers with a 4xx/5xx status
        """
        path = self._strip_protocol(path)
        payload = {
            "name": path.rsplit("/", 1)[-1],
            "path": path,
            "size": len(value),
            "content": base64.b64encode(value).decode(),
            "format": "base64",
            "type": "file",
        }
        r = self.session.put(f"{self.url}/{path}", json=payload)
        # A rejected write must not pass silently, or data is lost unnoticed.
        r.raise_for_status()

    def mkdir(self, path, create_parents=True, **kwargs):
        """Create a directory at ``path``

        Parameters
        ----------
        path : str
            Directory to create; any protocol prefix is stripped
        create_parents : bool
            If True, issue the same request for each parent directory first

        Notes
        -----
        The server response is not inspected.
        """
        path = self._strip_protocol(path)
        if create_parents and "/" in path:
            # Work from the outermost parent inwards.
            self.mkdir(path.rsplit("/", 1)[0], True)
        payload = {
            "name": path.rsplit("/", 1)[-1],
            "path": path,
            "size": None,
            "content": None,
            "type": "directory",
        }
        self.session.put(f"{self.url}/{path}", json=payload)

    def _rm(self, path):
        """Send a DELETE request for ``path``; the response is not inspected"""
        path = self._strip_protocol(path)
        self.session.delete(f"{self.url}/{path}")

    def _open(self, path, mode="rb", **kwargs):
        """Return a file-like object for ``path``

        For ``mode="rb"`` the whole file is downloaded and wrapped in an
        in-memory buffer. Any other mode yields a ``SimpleFileWriter`` opened
        with mode "wb", whatever mode was actually requested.
        """
        path = self._strip_protocol(path)
        if mode == "rb":
            data = self.cat_file(path)
            return io.BytesIO(data)
        else:
            return SimpleFileWriter(self, path, mode="wb")
class SimpleFileWriter(fsspec.spec.AbstractBufferedFile):
    """Writer that sends the entire buffered content in one request"""

    def _upload_chunk(self, final=False):
        """Send the buffer through the filesystem, but only on the final call

        Nothing is uploaded until the file is done, so the complete content
        is held in the buffer. Not suitable for large files.

        Returns False on any non-final call (presumably so the base class
        keeps the buffer intact -- confirm against fsspec), None otherwise.
        """
        if final is not False:
            # Last call: rewind and ship everything accumulated so far.
            self.buffer.seek(0)
            payload = self.buffer.read()
            self.fs.pipe_file(self.path, payload)
            return None
        return False
|