transaction.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. from collections import deque
  2. class Transaction:
  3. """Filesystem transaction write context
  4. Gathers files for deferred commit or discard, so that several write
  5. operations can be finalized semi-atomically. This works by having this
  6. instance as the ``.transaction`` attribute of the given filesystem
  7. """
  8. def __init__(self, fs, **kwargs):
  9. """
  10. Parameters
  11. ----------
  12. fs: FileSystem instance
  13. """
  14. self.fs = fs
  15. self.files = deque()
  16. def __enter__(self):
  17. self.start()
  18. return self
  19. def __exit__(self, exc_type, exc_val, exc_tb):
  20. """End transaction and commit, if exit is not due to exception"""
  21. # only commit if there was no exception
  22. self.complete(commit=exc_type is None)
  23. if self.fs:
  24. self.fs._intrans = False
  25. self.fs._transaction = None
  26. self.fs = None
  27. def start(self):
  28. """Start a transaction on this FileSystem"""
  29. self.files = deque() # clean up after previous failed completions
  30. self.fs._intrans = True
  31. def complete(self, commit=True):
  32. """Finish transaction: commit or discard all deferred files"""
  33. while self.files:
  34. f = self.files.popleft()
  35. if commit:
  36. f.commit()
  37. else:
  38. f.discard()
  39. self.fs._intrans = False
  40. self.fs._transaction = None
  41. self.fs = None
  42. class FileActor:
  43. def __init__(self):
  44. self.files = []
  45. def commit(self):
  46. for f in self.files:
  47. f.commit()
  48. self.files.clear()
  49. def discard(self):
  50. for f in self.files:
  51. f.discard()
  52. self.files.clear()
  53. def append(self, f):
  54. self.files.append(f)
  55. class DaskTransaction(Transaction):
  56. def __init__(self, fs):
  57. """
  58. Parameters
  59. ----------
  60. fs: FileSystem instance
  61. """
  62. import distributed
  63. super().__init__(fs)
  64. client = distributed.default_client()
  65. self.files = client.submit(FileActor, actor=True).result()
  66. def complete(self, commit=True):
  67. """Finish transaction: commit or discard all deferred files"""
  68. if commit:
  69. self.files.commit().result()
  70. else:
  71. self.files.discard().result()
  72. self.fs._intrans = False
  73. self.fs = None