# webhdfs.py
  1. # https://hadoop.apache.org/docs/r1.0.4/webhdfs.html
  2. import logging
  3. import os
  4. import secrets
  5. import shutil
  6. import tempfile
  7. import uuid
  8. from contextlib import suppress
  9. from urllib.parse import quote
  10. import requests
  11. from ..spec import AbstractBufferedFile, AbstractFileSystem
  12. from ..utils import infer_storage_options, tokenize
# Module-level logger; handlers/level are configured by the consuming application.
logger = logging.getLogger("webhdfs")
  14. class WebHDFS(AbstractFileSystem):
  15. """
  16. Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways.
  17. Four auth mechanisms are supported:
  18. insecure: no auth is done, and the user is assumed to be whoever they
  19. say they are (parameter ``user``), or a predefined value such as
  20. "dr.who" if not given
  21. spnego: when kerberos authentication is enabled, auth is negotiated by
  22. requests_kerberos https://github.com/requests/requests-kerberos .
  23. This establishes a session based on existing kinit login and/or
  24. specified principal/password; parameters are passed with ``kerb_kwargs``
  25. token: uses an existing Hadoop delegation token from another secured
  26. service. Indeed, this client can also generate such tokens when
  27. not insecure. Note that tokens expire, but can be renewed (by a
  28. previously specified user) and may allow for proxying.
  29. basic-auth: used when both parameter ``user`` and parameter ``password``
  30. are provided.
  31. """
  32. tempdir = str(tempfile.gettempdir())
  33. protocol = "webhdfs", "webHDFS"
  34. def __init__(
  35. self,
  36. host,
  37. port=50070,
  38. kerberos=False,
  39. token=None,
  40. user=None,
  41. password=None,
  42. proxy_to=None,
  43. kerb_kwargs=None,
  44. data_proxy=None,
  45. use_https=False,
  46. session_cert=None,
  47. session_verify=True,
  48. **kwargs,
  49. ):
  50. """
  51. Parameters
  52. ----------
  53. host: str
  54. Name-node address
  55. port: int
  56. Port for webHDFS
  57. kerberos: bool
  58. Whether to authenticate with kerberos for this connection
  59. token: str or None
  60. If given, use this token on every call to authenticate. A user
  61. and user-proxy may be encoded in the token and should not be also
  62. given
  63. user: str or None
  64. If given, assert the user name to connect with
  65. password: str or None
  66. If given, assert the password to use for basic auth. If password
  67. is provided, user must be provided also
  68. proxy_to: str or None
  69. If given, the user has the authority to proxy, and this value is
  70. the user in who's name actions are taken
  71. kerb_kwargs: dict
  72. Any extra arguments for HTTPKerberosAuth, see
  73. `<https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py>`_
  74. data_proxy: dict, callable or None
  75. If given, map data-node addresses. This can be necessary if the
  76. HDFS cluster is behind a proxy, running on Docker or otherwise has
  77. a mismatch between the host-names given by the name-node and the
  78. address by which to refer to them from the client. If a dict,
  79. maps host names ``host->data_proxy[host]``; if a callable, full
  80. URLs are passed, and function must conform to
  81. ``url->data_proxy(url)``.
  82. use_https: bool
  83. Whether to connect to the Name-node using HTTPS instead of HTTP
  84. session_cert: str or Tuple[str, str] or None
  85. Path to a certificate file, or tuple of (cert, key) files to use
  86. for the requests.Session
  87. session_verify: str, bool or None
  88. Path to a certificate file to use for verifying the requests.Session.
  89. kwargs
  90. """
  91. if self._cached:
  92. return
  93. super().__init__(**kwargs)
  94. self.url = f"{'https' if use_https else 'http'}://{host}:{port}/webhdfs/v1"
  95. self.kerb = kerberos
  96. self.kerb_kwargs = kerb_kwargs or {}
  97. self.pars = {}
  98. self.proxy = data_proxy or {}
  99. if token is not None:
  100. if user is not None or proxy_to is not None:
  101. raise ValueError(
  102. "If passing a delegation token, must not set "
  103. "user or proxy_to, as these are encoded in the"
  104. " token"
  105. )
  106. self.pars["delegation"] = token
  107. self.user = user
  108. self.password = password
  109. if password is not None:
  110. if user is None:
  111. raise ValueError(
  112. "If passing a password, the user must also be"
  113. "set in order to set up the basic-auth"
  114. )
  115. else:
  116. if user is not None:
  117. self.pars["user.name"] = user
  118. if proxy_to is not None:
  119. self.pars["doas"] = proxy_to
  120. if kerberos and user is not None:
  121. raise ValueError(
  122. "If using Kerberos auth, do not specify the "
  123. "user, this is handled by kinit."
  124. )
  125. self.session_cert = session_cert
  126. self.session_verify = session_verify
  127. self._connect()
  128. self._fsid = f"webhdfs_{tokenize(host, port)}"
  129. @property
  130. def fsid(self):
  131. return self._fsid
  132. def _connect(self):
  133. self.session = requests.Session()
  134. if self.session_cert:
  135. self.session.cert = self.session_cert
  136. self.session.verify = self.session_verify
  137. if self.kerb:
  138. from requests_kerberos import HTTPKerberosAuth
  139. self.session.auth = HTTPKerberosAuth(**self.kerb_kwargs)
  140. if self.user is not None and self.password is not None:
  141. from requests.auth import HTTPBasicAuth
  142. self.session.auth = HTTPBasicAuth(self.user, self.password)
  143. def _call(self, op, method="get", path=None, data=None, redirect=True, **kwargs):
  144. path = self._strip_protocol(path) if path is not None else ""
  145. url = self._apply_proxy(self.url + quote(path, safe="/="))
  146. args = kwargs.copy()
  147. args.update(self.pars)
  148. args["op"] = op.upper()
  149. logger.debug("sending %s with %s", url, method)
  150. out = self.session.request(
  151. method=method.upper(),
  152. url=url,
  153. params=args,
  154. data=data,
  155. allow_redirects=redirect,
  156. )
  157. if out.status_code in [400, 401, 403, 404, 500]:
  158. try:
  159. err = out.json()
  160. msg = err["RemoteException"]["message"]
  161. exp = err["RemoteException"]["exception"]
  162. except (ValueError, KeyError):
  163. pass
  164. else:
  165. if exp in ["IllegalArgumentException", "UnsupportedOperationException"]:
  166. raise ValueError(msg)
  167. elif exp in ["SecurityException", "AccessControlException"]:
  168. raise PermissionError(msg)
  169. elif exp in ["FileNotFoundException"]:
  170. raise FileNotFoundError(msg)
  171. else:
  172. raise RuntimeError(msg)
  173. out.raise_for_status()
  174. return out
  175. def _open(
  176. self,
  177. path,
  178. mode="rb",
  179. block_size=None,
  180. autocommit=True,
  181. replication=None,
  182. permissions=None,
  183. **kwargs,
  184. ):
  185. """
  186. Parameters
  187. ----------
  188. path: str
  189. File location
  190. mode: str
  191. 'rb', 'wb', etc.
  192. block_size: int
  193. Client buffer size for read-ahead or write buffer
  194. autocommit: bool
  195. If False, writes to temporary file that only gets put in final
  196. location upon commit
  197. replication: int
  198. Number of copies of file on the cluster, write mode only
  199. permissions: str or int
  200. posix permissions, write mode only
  201. kwargs
  202. Returns
  203. -------
  204. WebHDFile instance
  205. """
  206. block_size = block_size or self.blocksize
  207. return WebHDFile(
  208. self,
  209. path,
  210. mode=mode,
  211. block_size=block_size,
  212. tempdir=self.tempdir,
  213. autocommit=autocommit,
  214. replication=replication,
  215. permissions=permissions,
  216. )
  217. @staticmethod
  218. def _process_info(info):
  219. info["type"] = info["type"].lower()
  220. info["size"] = info["length"]
  221. return info
  222. @classmethod
  223. def _strip_protocol(cls, path):
  224. return infer_storage_options(path)["path"]
  225. @staticmethod
  226. def _get_kwargs_from_urls(urlpath):
  227. out = infer_storage_options(urlpath)
  228. out.pop("path", None)
  229. out.pop("protocol", None)
  230. if "username" in out:
  231. out["user"] = out.pop("username")
  232. return out
  233. def info(self, path):
  234. out = self._call("GETFILESTATUS", path=path)
  235. info = out.json()["FileStatus"]
  236. info["name"] = path
  237. return self._process_info(info)
  238. def ls(self, path, detail=False):
  239. out = self._call("LISTSTATUS", path=path)
  240. infos = out.json()["FileStatuses"]["FileStatus"]
  241. for info in infos:
  242. self._process_info(info)
  243. info["name"] = path.rstrip("/") + "/" + info["pathSuffix"]
  244. if detail:
  245. return sorted(infos, key=lambda i: i["name"])
  246. else:
  247. return sorted(info["name"] for info in infos)
  248. def content_summary(self, path):
  249. """Total numbers of files, directories and bytes under path"""
  250. out = self._call("GETCONTENTSUMMARY", path=path)
  251. return out.json()["ContentSummary"]
  252. def ukey(self, path):
  253. """Checksum info of file, giving method and result"""
  254. out = self._call("GETFILECHECKSUM", path=path, redirect=False)
  255. if "Location" in out.headers:
  256. location = self._apply_proxy(out.headers["Location"])
  257. out2 = self.session.get(location)
  258. out2.raise_for_status()
  259. return out2.json()["FileChecksum"]
  260. else:
  261. out.raise_for_status()
  262. return out.json()["FileChecksum"]
  263. def home_directory(self):
  264. """Get user's home directory"""
  265. out = self._call("GETHOMEDIRECTORY")
  266. return out.json()["Path"]
  267. def get_delegation_token(self, renewer=None):
  268. """Retrieve token which can give the same authority to other uses
  269. Parameters
  270. ----------
  271. renewer: str or None
  272. User who may use this token; if None, will be current user
  273. """
  274. if renewer:
  275. out = self._call("GETDELEGATIONTOKEN", renewer=renewer)
  276. else:
  277. out = self._call("GETDELEGATIONTOKEN")
  278. t = out.json()["Token"]
  279. if t is None:
  280. raise ValueError("No token available for this user/security context")
  281. return t["urlString"]
  282. def renew_delegation_token(self, token):
  283. """Make token live longer. Returns new expiry time"""
  284. out = self._call("RENEWDELEGATIONTOKEN", method="put", token=token)
  285. return out.json()["long"]
  286. def cancel_delegation_token(self, token):
  287. """Stop the token from being useful"""
  288. self._call("CANCELDELEGATIONTOKEN", method="put", token=token)
  289. def chmod(self, path, mod):
  290. """Set the permission at path
  291. Parameters
  292. ----------
  293. path: str
  294. location to set (file or directory)
  295. mod: str or int
  296. posix epresentation or permission, give as oct string, e.g, '777'
  297. or 0o777
  298. """
  299. self._call("SETPERMISSION", method="put", path=path, permission=mod)
  300. def chown(self, path, owner=None, group=None):
  301. """Change owning user and/or group"""
  302. kwargs = {}
  303. if owner is not None:
  304. kwargs["owner"] = owner
  305. if group is not None:
  306. kwargs["group"] = group
  307. self._call("SETOWNER", method="put", path=path, **kwargs)
  308. def set_replication(self, path, replication):
  309. """
  310. Set file replication factor
  311. Parameters
  312. ----------
  313. path: str
  314. File location (not for directories)
  315. replication: int
  316. Number of copies of file on the cluster. Should be smaller than
  317. number of data nodes; normally 3 on most systems.
  318. """
  319. self._call("SETREPLICATION", path=path, method="put", replication=replication)
  320. def mkdir(self, path, **kwargs):
  321. self._call("MKDIRS", method="put", path=path)
  322. def makedirs(self, path, exist_ok=False):
  323. if exist_ok is False and self.exists(path):
  324. raise FileExistsError(path)
  325. self.mkdir(path)
  326. def mv(self, path1, path2, **kwargs):
  327. self._call("RENAME", method="put", path=path1, destination=path2)
  328. def rm(self, path, recursive=False, **kwargs):
  329. self._call(
  330. "DELETE",
  331. method="delete",
  332. path=path,
  333. recursive="true" if recursive else "false",
  334. )
  335. def rm_file(self, path, **kwargs):
  336. self.rm(path)
  337. def cp_file(self, lpath, rpath, **kwargs):
  338. with self.open(lpath) as lstream:
  339. tmp_fname = "/".join([self._parent(rpath), f".tmp.{secrets.token_hex(16)}"])
  340. # Perform an atomic copy (stream to a temporary file and
  341. # move it to the actual destination).
  342. try:
  343. with self.open(tmp_fname, "wb") as rstream:
  344. shutil.copyfileobj(lstream, rstream)
  345. self.mv(tmp_fname, rpath)
  346. except BaseException:
  347. with suppress(FileNotFoundError):
  348. self.rm(tmp_fname)
  349. raise
  350. def _apply_proxy(self, location):
  351. if self.proxy and callable(self.proxy):
  352. location = self.proxy(location)
  353. elif self.proxy:
  354. # as a dict
  355. for k, v in self.proxy.items():
  356. location = location.replace(k, v, 1)
  357. return location
  358. class WebHDFile(AbstractBufferedFile):
  359. """A file living in HDFS over webHDFS"""
  360. def __init__(self, fs, path, **kwargs):
  361. super().__init__(fs, path, **kwargs)
  362. kwargs = kwargs.copy()
  363. if kwargs.get("permissions", None) is None:
  364. kwargs.pop("permissions", None)
  365. if kwargs.get("replication", None) is None:
  366. kwargs.pop("replication", None)
  367. self.permissions = kwargs.pop("permissions", 511)
  368. tempdir = kwargs.pop("tempdir")
  369. if kwargs.pop("autocommit", False) is False:
  370. self.target = self.path
  371. self.path = os.path.join(tempdir, str(uuid.uuid4()))
  372. def _upload_chunk(self, final=False):
  373. """Write one part of a multi-block file upload
  374. Parameters
  375. ==========
  376. final: bool
  377. This is the last block, so should complete file, if
  378. self.autocommit is True.
  379. """
  380. out = self.fs.session.post(
  381. self.location,
  382. data=self.buffer.getvalue(),
  383. headers={"content-type": "application/octet-stream"},
  384. )
  385. out.raise_for_status()
  386. return True
  387. def _initiate_upload(self):
  388. """Create remote file/upload"""
  389. kwargs = self.kwargs.copy()
  390. if "a" in self.mode:
  391. op, method = "APPEND", "POST"
  392. else:
  393. op, method = "CREATE", "PUT"
  394. kwargs["overwrite"] = "true"
  395. out = self.fs._call(op, method, self.path, redirect=False, **kwargs)
  396. location = self.fs._apply_proxy(out.headers["Location"])
  397. if "w" in self.mode:
  398. # create empty file to append to
  399. out2 = self.fs.session.put(
  400. location, headers={"content-type": "application/octet-stream"}
  401. )
  402. out2.raise_for_status()
  403. # after creating empty file, change location to append to
  404. out2 = self.fs._call("APPEND", "POST", self.path, redirect=False, **kwargs)
  405. self.location = self.fs._apply_proxy(out2.headers["Location"])
  406. def _fetch_range(self, start, end):
  407. start = max(start, 0)
  408. end = min(self.size, end)
  409. if start >= end or start >= self.size:
  410. return b""
  411. out = self.fs._call(
  412. "OPEN", path=self.path, offset=start, length=end - start, redirect=False
  413. )
  414. out.raise_for_status()
  415. if "Location" in out.headers:
  416. location = out.headers["Location"]
  417. out2 = self.fs.session.get(self.fs._apply_proxy(location))
  418. return out2.content
  419. else:
  420. return out.content
  421. def commit(self):
  422. self.fs.mv(self.path, self.target)
  423. def discard(self):
  424. self.fs.rm(self.path)