1
1
#!/usr/bin/env python3
2
2
3
- import os
3
+ import json
4
+ import pathlib
4
5
import subprocess
5
6
import time
6
7
7
8
import pytest
8
9
9
10
10
- @pytest .fixture ()
11
- def private_key (scope = 'session' ):
12
- basepath = os .path .dirname (os .path .realpath (__file__ ))
13
- keypath = os .path .join (basepath , 'files' , 'private.key' )
14
- binargs = [
15
- 'openssl' ,
16
- 'genrsa' ,
17
- '-out' ,
18
- keypath ,
19
- '2048' ,
20
- ]
21
- subprocess .run (binargs )
22
-
23
- pubpath = os .path .join (basepath , 'files' , 'public.key' )
24
- binargs = [
25
- 'openssl' ,
26
- 'rsa' ,
27
- '-in' ,
28
- keypath ,
29
- '-pubout' ,
30
- ]
31
- res = subprocess .run (binargs , capture_output = True , text = True )
32
- with open (pubpath , 'w' ) as f :
33
- key = '' .join (res .stdout .splitlines ()[1 :- 1 ])
34
- f .write (
35
- 'sel._domainkey.dkimpy.example.com v=DKIM1; k=rsa; '
36
- 'p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqf/MoqRqzK3/bcCyLSx5'
37
- 'CDvyPotNDBjLLFHdMmcWDiSZ8saslFyNR6FkFxuNtw843m7MkwOSJ9TRd9p+OoRLDv'
38
- 'H0jDR1Dqq22QOJKiG5XQ91aZwin9jpWKkuoRoRZRhWrzUOJWAybHarsEQm9iCPh2zn'
39
- 'dbSPSzPQL1OsjURIuw5G9+/nr5rhJ72Qi6v86zofWUKdXhLf+oVmho79D0xGMFFm0f'
40
- 'b98xIeZlgJTnmrj/zuxIKHeVmGKI1j6L3xttdcDiUVRGxoubkFzg9TIBGhdeFkpa0C'
41
- 'ZuhB/1/U3f1oG3Upx5o/jXTQk/dwVaaeEXnRmTsfGYn4GQ9ziity1ijLsQIDAQAB\n '
42
- )
43
- f .write (f'elpmaxe._domainkey.example.com v=DKIM1; k=rsa; h=sha256; p={ key } \n ' )
44
- f .write (f'xn--2j5b._domainkey.xn--vv4b606a.example.com v=DKIM1; k=rsa; h=sha256; p={ key } \n ' )
11
+ @pytest .fixture (scope = 'session' )
12
+ def private_key (tmp_path_factory , tool_path ):
13
+ basepath = tmp_path_factory .mktemp ('keys' )
45
14
15
+ for s , d in [
16
+ ['elpmaxe' , 'example.com' ],
17
+ ['xn--2j5b' , 'xn--vv4b606a.example.com' ],
18
+ ['unsafe' , 'example.com' ],
19
+ ]:
20
+ binargs = [
21
+ tool_path ('contrib/openarc-keygen' ),
22
+ '-D' ,
23
+ str (basepath ),
24
+ '-d' ,
25
+ d ,
26
+ '-s' ,
27
+ s ,
28
+ '--hash-algorithms' ,
29
+ 'sha256' ,
30
+ '-f' ,
31
+ 'testkey' ,
32
+ ]
33
+ subprocess .run (binargs , check = True )
34
+
35
+ basepath .joinpath ('unsafe._domainkey.example.com.key' ).chmod (0o644 )
36
+
37
+ testkeys = (
38
+ 'sel._domainkey.dkimpy.example.com v=DKIM1; k=rsa; '
39
+ 'p=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqf/MoqRqzK3/bcCyLSx5'
40
+ 'CDvyPotNDBjLLFHdMmcWDiSZ8saslFyNR6FkFxuNtw843m7MkwOSJ9TRd9p+OoRLDv'
41
+ 'H0jDR1Dqq22QOJKiG5XQ91aZwin9jpWKkuoRoRZRhWrzUOJWAybHarsEQm9iCPh2zn'
42
+ 'dbSPSzPQL1OsjURIuw5G9+/nr5rhJ72Qi6v86zofWUKdXhLf+oVmho79D0xGMFFm0f'
43
+ 'b98xIeZlgJTnmrj/zuxIKHeVmGKI1j6L3xttdcDiUVRGxoubkFzg9TIBGhdeFkpa0C'
44
+ 'ZuhB/1/U3f1oG3Upx5o/jXTQk/dwVaaeEXnRmTsfGYn4GQ9ziity1ijLsQIDAQAB\n '
45
+ )
46
+
47
+ for fname in [
48
+ 'elpmaxe._domainkey.example.com.txt' ,
49
+ 'xn--2j5b._domainkey.xn--vv4b606a.example.com.txt' ,
50
+ ]:
51
+ with open (basepath .joinpath (fname ), 'r' ) as f :
52
+ testkeys += f .read ()
46
53
47
- @pytest .fixture ()
48
- def tool_path (scope = 'session' ):
54
+ keyfile = basepath .joinpath ('public.key' )
55
+ with open (keyfile , 'w' ) as f :
56
+ f .write (testkeys )
57
+
58
+ return {
59
+ 'basepath' : basepath ,
60
+ 'public_keys' : str (keyfile ),
61
+ }
62
+
63
+
64
+ @pytest .fixture (scope = 'session' )
65
+ def tool_path ():
49
66
def _tool_path (tool ):
50
- binpath = os .path .dirname (os .path .realpath (__file__ ))
51
- binpath = os .path .join (binpath , '..' , tool )
52
- return os .path .realpath (binpath )
67
+ return pathlib .Path (__file__ ).parent .parent .joinpath (tool ).absolute ()
53
68
54
69
return _tool_path
55
70
56
71
57
72
@pytest .fixture ()
58
73
def milter_config (request , tmp_path , private_key ):
59
- base_path = os .path .join (request .fspath .dirname , 'files' )
74
+ base_path = request .path .parent .joinpath ('files' )
75
+
60
76
config = {
61
- 'cwd' : base_path ,
62
- 'file' : os .path .join (base_path , 'milter.conf' ),
63
- 'sock' : tmp_path .joinpath ('milter.sock' ),
77
+ 'Domain' : 'example.com' ,
78
+ 'AuthservID' : 'example.com' ,
79
+ 'TestKeys' : private_key ['public_keys' ],
80
+ 'Selector' : 'elpmaxe' ,
81
+ 'KeyFile' : 'elpmaxe._domainkey.example.com.key' ,
82
+ 'Mode' : 'sv' ,
83
+ 'FixedTimestamp' : '1234567890' ,
84
+ 'RequireSafeKeys' : 'false' , # tmp is world writeable
64
85
}
86
+
65
87
for candidate in [
66
- request .fspath . basename , # test file
88
+ request .path . name , # test file
67
89
request .function .__name__ , # test function
68
90
]:
69
- fname = os .path .join (base_path , '.' .join ([candidate , 'conf' ]))
70
- if os .path .isfile (fname ):
71
- config ['file' ] = fname
72
- return config
91
+ fname = base_path .joinpath (f'{ candidate } .conf' )
92
+ if fname .exists ():
93
+ config .update (json .loads (fname .read_text ()))
94
+
95
+ if config ['KeyFile' ]:
96
+ config ['KeyFile' ] = private_key ['basepath' ].joinpath (config ['KeyFile' ])
73
97
74
- return config
98
+ for static_file in ['PeerList' , 'InternalHosts' ]:
99
+ if config .get (static_file ):
100
+ config [static_file ] = base_path .joinpath (config [static_file ])
101
+
102
+ fname = tmp_path .joinpath ('milter.conf' )
103
+ with open (fname , 'w' ) as f :
104
+ for k , v in config .items ():
105
+ if v is not None :
106
+ f .write (f'{ k } { v } \n ' )
107
+
108
+ return {
109
+ 'file' : fname ,
110
+ 'sock' : tmp_path .joinpath ('milter.sock' ),
111
+ }
75
112
76
113
77
114
@pytest .fixture ()
@@ -89,8 +126,8 @@ def milter_cmdline(tmp_path, tool_path, milter_config):
89
126
90
127
@pytest .fixture ()
91
128
def milter (milter_cmdline , milter_config ):
92
- milter_proc = subprocess .Popen (milter_cmdline , cwd = milter_config [ 'cwd' ] )
93
- while not milter_proc .poll () and not os . path . exists ( milter_config ['sock' ]):
129
+ milter_proc = subprocess .Popen (milter_cmdline )
130
+ while not milter_proc .poll () and not milter_config ['sock' ]. exists ( ):
94
131
time .sleep (0.1 )
95
132
96
133
yield milter_proc
0 commit comments