File tree Expand file tree Collapse file tree 2 files changed +51
-2
lines changed Expand file tree Collapse file tree 2 files changed +51
-2
lines changed Original file line number Diff line number Diff line change 33import os
44import pathlib
55import shutil
6+ import stat
67import sys
78import tempfile
89from collections import OrderedDict
@@ -61,7 +62,7 @@ def __init__(
6162
6263 @contextmanager
6364 def _get_stream (self ) -> Iterator [IO [str ]]:
64- if self .dotenv_path and os . path . isfile (self .dotenv_path ):
65+ if self .dotenv_path and _is_file_or_fifo (self .dotenv_path ):
6566 with open (self .dotenv_path , encoding = self .encoding ) as stream :
6667 yield stream
6768 elif self .stream is not None :
@@ -325,7 +326,7 @@ def _is_debugger():
325326
326327 for dirname in _walk_to_root (path ):
327328 check_path = os .path .join (dirname , filename )
328- if os . path . isfile (check_path ):
329+ if _is_file_or_fifo (check_path ):
329330 return check_path
330331
331332 if raise_error_if_not_found :
@@ -417,3 +418,18 @@ def dotenv_values(
417418 override = True ,
418419 encoding = encoding ,
419420 ).dict ()
421+
422+
423+ def _is_file_or_fifo (path : StrPath ) -> bool :
424+ """
425+ Return True if `path` exists and is either a regular file or a FIFO.
426+ """
427+ if os .path .isfile (path ):
428+ return True
429+
430+ try :
431+ st = os .stat (path )
432+ except (FileNotFoundError , OSError ):
433+ return False
434+
435+ return stat .S_ISFIFO (st .st_mode )
Original file line number Diff line number Diff line change 1+ import os
2+ import pathlib
3+ import sys
4+ import threading
5+
6+ import pytest
7+
8+ from dotenv import load_dotenv
9+
10+ pytestmark = pytest .mark .skipif (
11+ sys .platform .startswith ("win" ), reason = "FIFOs are Unix-only"
12+ )
13+
14+
15+ def test_load_dotenv_from_fifo (tmp_path : pathlib .Path , monkeypatch ):
16+ fifo = tmp_path / ".env"
17+ os .mkfifo (fifo ) # create named pipe
18+
19+ def writer ():
20+ with open (fifo , "w" , encoding = "utf-8" ) as w :
21+ w .write ("MY_PASSWORD=pipe-secret\n " )
22+
23+ t = threading .Thread (target = writer )
24+ t .start ()
25+
26+ # Ensure env is clean
27+ monkeypatch .delenv ("MY_PASSWORD" , raising = False )
28+
29+ ok = load_dotenv (dotenv_path = str (fifo ), override = True )
30+ t .join (timeout = 2 )
31+
32+ assert ok is True
33+ assert os .getenv ("MY_PASSWORD" ) == "pipe-secret"
You can’t perform that action at this time.
0 commit comments