12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- import os
- import shutil
- import subprocess
- import sys
- import time
- import pytest
- import fsspec
- from fsspec.implementations.cached import CachingFileSystem
@pytest.fixture()
def m():
    """
    Fixture providing a memory filesystem.

    The filesystem's ``store`` is emptied and its ``pseudo_dirs`` list is
    reset to ``[""]`` both before the instance is handed to the test and
    again once the test has finished (even if it raised), so a test starts
    from, and leaves behind, an empty filesystem.
    """
    memfs = fsspec.filesystem("memory")

    def _reset():
        # Drop all stored files, then restore pseudo_dirs to just the root entry.
        memfs.store.clear()
        memfs.pseudo_dirs.clear()
        memfs.pseudo_dirs.append("")

    _reset()
    try:
        yield memfs
    finally:
        _reset()
@pytest.fixture
def ftp_writable(tmpdir):
    """
    Fixture providing a writable FTP filesystem.

    Seeds ``tmpdir`` with a file named ``out`` containing ``b"hello" * 10000``,
    then launches ``python -m pyftpdlib`` as a subprocess serving that
    directory with write access for user ``user`` / password ``pass``.
    The test is skipped if ``pyftpdlib`` cannot be imported.

    Yields
    ------
    tuple
        ``("localhost", 2121, "user", "pass")`` -- host, port, username and
        password. No port is passed on the command line, so 2121 is
        presumably the server's default -- confirm if pyftpdlib changes.

    On teardown the server process is terminated (and killed if it does not
    exit promptly) and ``tmpdir`` is removed on a best-effort basis.
    """
    pytest.importorskip("pyftpdlib")
    from fsspec.implementations.ftp import FTPFileSystem

    FTPFileSystem.clear_instance_cache()  # remove lingering connections
    CachingFileSystem.clear_instance_cache()
    d = str(tmpdir)
    with open(os.path.join(d, "out"), "wb") as f:
        f.write(b"hello" * 10000)
    P = subprocess.Popen(
        [sys.executable, "-m", "pyftpdlib", "-d", d, "-u", "user", "-P", "pass", "-w"]
    )
    try:
        # Crude fixed wait, presumably to let the server start listening.
        time.sleep(1)
        yield "localhost", 2121, "user", "pass"
    finally:
        P.terminate()
        try:
            # Bound the wait so a server that ignores SIGTERM cannot hang
            # the whole test session during teardown.
            P.wait(timeout=10)
        except subprocess.TimeoutExpired:
            P.kill()
            P.wait()
        # Best-effort cleanup: failing to delete the temp dir must not fail
        # the test.
        shutil.rmtree(tmpdir, ignore_errors=True)
|