Skip to content

Fix newly introduced support for opener argument in open #690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ The released versions correspond to PyPi releases.

## Unreleased

### Fixes
* fixed support for `opener` introduced in previous patch release
(see [#689](../../issues/689))

## [Version 4.6.1](https://pypi.python.org/pypi/pyfakefs/4.6.1) (2022-07-13)
Fixes incompatibility with Python 3.11 beta 4.

Expand Down
24 changes: 22 additions & 2 deletions pyfakefs/fake_filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -5746,7 +5746,7 @@ def call(self, file_: Union[AnyStr, int],
if opener is not None:
# opener shall return a file descriptor, which will be handled
# here as if directly passed
file_ = opener(file_, open_modes)
file_ = opener(file_, self._open_flags_from_open_modes(open_modes))

file_object, file_path, filedes, real_path = self._handle_file_arg(
file_)
Expand All @@ -5770,7 +5770,7 @@ def call(self, file_: Union[AnyStr, int],
if not filedes:
closefd = True

if (open_modes.must_not_exist and
if (not opener and open_modes.must_not_exist and
(file_object or self.filesystem.islink(file_path) and
not self.filesystem.is_windows_fs)):
self.filesystem.raise_os_error(errno.EEXIST, file_path)
Expand Down Expand Up @@ -5819,6 +5819,26 @@ def call(self, file_: Union[AnyStr, int],
fakefile.filedes = self.filesystem._add_open_file(fakefile)
return fakefile

@staticmethod
def _open_flags_from_open_modes(open_modes: _OpenModes) -> int:
flags = 0
if open_modes.can_read and open_modes.can_write:
flags |= os.O_RDWR
elif open_modes.can_read:
flags |= os.O_RDONLY
elif open_modes.can_write:
flags |= os.O_WRONLY

if open_modes.append:
flags |= os.O_APPEND
if open_modes.truncate:
flags |= os.O_TRUNC
if not open_modes.must_exist and open_modes.can_write:
flags |= os.O_CREAT
if open_modes.must_not_exist and open_modes.can_write:
flags |= os.O_EXCL
return flags

def _init_file_object(self, file_object: Optional[FakeFile],
file_path: AnyStr,
open_modes: _OpenModes,
Expand Down
92 changes: 92 additions & 0 deletions pyfakefs/tests/fake_open_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,98 @@ def use_real_fs(self):
return True


class FakeFileOpenWithOpenerTest(FakeFileOpenTestBase):
def opener(self, path, flags):
return self.os.open(path, flags)

def test_use_opener_with_read(self):
file_path = self.make_path('foo')
self.create_file(file_path, contents='test')
with self.open(file_path, opener=self.opener) as f:
assert f.read() == 'test'
with self.assertRaises(OSError):
f.write('foo')

def test_use_opener_with_read_plus(self):
file_path = self.make_path('foo')
self.create_file(file_path, contents='test')
with self.open(file_path, 'r+', opener=self.opener) as f:
assert f.read() == 'test'
assert f.write('bar') == 3
with self.open(file_path) as f:
assert f.read() == 'testbar'

def test_use_opener_with_write(self):
file_path = self.make_path('foo')
self.create_file(file_path, contents='foo')
with self.open(file_path, 'w', opener=self.opener) as f:
with self.assertRaises(OSError):
f.read()
assert f.write('bar') == 3
with self.open(file_path) as f:
assert f.read() == 'bar'

def test_use_opener_with_write_plus(self):
file_path = self.make_path('foo')
self.create_file(file_path, contents='test')
with self.open(file_path, 'w+', opener=self.opener) as f:
assert f.read() == ''
assert f.write('bar') == 3
with self.open(file_path) as f:
assert f.read() == 'bar'

def test_use_opener_with_append(self):
file_path = self.make_path('foo')
self.create_file(file_path, contents='foo')
with self.open(file_path, 'a', opener=self.opener) as f:
assert f.write('bar') == 3
with self.assertRaises(OSError):
f.read()
with self.open(file_path) as f:
assert f.read() == 'foobar'

def test_use_opener_with_append_plus(self):
file_path = self.make_path('foo')
self.create_file(file_path, contents='foo')
with self.open(file_path, 'a+', opener=self.opener) as f:
assert f.read() == ''
assert f.write('bar') == 3
with self.open(file_path) as f:
assert f.read() == 'foobar'

def test_use_opener_with_exclusive_write(self):
file_path = self.make_path('foo')
self.create_file(file_path, contents='test')
with self.assertRaises(OSError):
self.open(file_path, 'x', opener=self.opener)

file_path = self.make_path('bar')
with self.open(file_path, 'x', opener=self.opener) as f:
assert f.write('bar') == 3
with self.assertRaises(OSError):
f.read()
with self.open(file_path) as f:
assert f.read() == 'bar'

def test_use_opener_with_exclusive_plus(self):
file_path = self.make_path('foo')
self.create_file(file_path, contents='test')
with self.assertRaises(OSError):
self.open(file_path, 'x+', opener=self.opener)

file_path = self.make_path('bar')
with self.open(file_path, 'x+', opener=self.opener) as f:
assert f.write('bar') == 3
assert f.read() == ''
with self.open(file_path) as f:
assert f.read() == 'bar'


class RealFileOpenWithOpenerTest(FakeFileOpenWithOpenerTest):
def use_real_fs(self):
return True


@unittest.skipIf(sys.version_info < (3, 8),
'open_code only present since Python 3.8')
class FakeFilePatchedOpenCodeTest(FakeFileOpenTestBase):
Expand Down