filesystem.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. from pathlib import Path
  20. from typing import Any
  21. from airflow.hooks.base import BaseHook
  22. class FSHook(BaseHook):
  23. """
  24. Allows for interaction with an file server.
  25. Connection should have a name and a path specified under extra:
  26. example:
  27. Connection Id: fs_test
  28. Connection Type: File (path)
  29. Host, Schema, Login, Password, Port: empty
  30. Extra: {"path": "/tmp"}
  31. """
  32. conn_name_attr = "fs_conn_id"
  33. default_conn_name = "fs_default"
  34. conn_type = "fs"
  35. hook_name = "File (path)"
  36. @classmethod
  37. def get_connection_form_widgets(cls) -> dict[str, Any]:
  38. """Return connection widgets to add to connection form."""
  39. from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
  40. from flask_babel import lazy_gettext
  41. from wtforms import StringField
  42. return {"path": StringField(lazy_gettext("Path"), widget=BS3TextFieldWidget())}
  43. @classmethod
  44. def get_ui_field_behaviour(cls) -> dict[str, Any]:
  45. """Return custom field behaviour."""
  46. return {
  47. "hidden_fields": ["host", "schema", "port", "login", "password", "extra"],
  48. "relabeling": {},
  49. "placeholders": {},
  50. }
  51. def __init__(self, fs_conn_id: str = default_conn_name, **kwargs):
  52. super().__init__(**kwargs)
  53. conn = self.get_connection(fs_conn_id)
  54. self.basepath = conn.extra_dejson.get("path", "")
  55. self.conn = conn
  56. def get_conn(self) -> None:
  57. pass
  58. def get_path(self) -> str:
  59. """
  60. Get the path to the filesystem location.
  61. :return: the path.
  62. """
  63. return self.basepath
  64. def test_connection(self):
  65. """Test File connection."""
  66. try:
  67. p = self.get_path()
  68. if not p:
  69. return False, "File Path is undefined."
  70. if not Path(p).exists():
  71. return False, f"Path {p} does not exist."
  72. return True, f"Path {p} is existing."
  73. except Exception as e:
  74. return False, str(e)