1 # Automatic source code provision (AGPL compliance)
10 class SourceShipmentPreparer():
11 def __init__(s
, destdir
):
12 # caller may modify, and should read after calling generate()
13 s
.output_name
= 'srcbomb.tar.gz'
14 # defaults, caller can modify after creation
15 s
.src_filter
= s
.src_filter_glob
16 s
.src_package_globs
= ['!/usr/local/*', '/usr*']
17 s
.src_filter_globs
= ['!/etc/*']
18 s
.src_likeparent
= s
.src_likeparent_git
19 s
.report_from_packages
= s
.report_from_packages_debian
21 s
.find_rune_base
= "find -type f -perm -004 \! -path '*/tmp/*'"
22 s
.excludes
= ['*~', '*.bak', '*.tmp', '#*#',
23 '[0-9][0-9][0-9][0-9]-src.cpio']
24 s
.rune_shell
= ['/bin/bash', '-ec']
25 s
.show_pathnames
= True
30 # ^ by default, is find ... -print0
32 cpio -Hustar -o --quiet -0 -R 1000:1000 || \
33 cpio -Hustar -o --quiet -0
36 s
.rune_portmanteau
= r
'''
39 GZIP=-1 tar zcf "$outfile" "$@"
41 s
.manifest_name
='0000-MANIFEST.txt'
47 s
._package_files
= { } # map filename => infol
49 def thing_matches_globs(s
, thing
, globs
):
51 negate
= pat
.startswith('!')
52 if negate
: pat
= pat
[1:]
53 if fnmatch
.fnmatch(thing
, pat
):
def src_filter_glob(s, src):  # default s.src_filter
    """Check `src` against the glob patterns in ``s.src_filter_globs``.

    Hands `src` and ``s.src_filter_globs`` to ``s.thing_matches_globs``
    and returns that call's result unchanged.
    """
    matcher = s.thing_matches_globs
    verdict = matcher(src, s.src_filter_globs)
    return verdict
60 def src_likeparent_git(s
, src
):
62 os
.stat(os
.path
.join(src
, '.git/.'))
63 except FileNotFoundError
:
68 def src_parentfinder(s
, src
, infol
): # callers may monkey-patch away
69 for deref
in (False,True):
74 search
= os
.path
.realpath(search
)
78 xinfo
.append(os
.path
.basename(search
))
79 search
= os
.path
.dirname(search
)
82 stab
= os
.lstat(search
)
83 except FileNotFoundError
:
85 if stat
.S_ISREG(stab
.st_mode
):
88 while not os
.path
.ismount(search
):
89 if s
.src_likeparent(search
):
91 if len(xinfo
): infol
.append('want=' + os
.path
.join(*xinfo
))
96 # no .git found anywhere
def path_prenormaliser(s, d, infol):  # callers may monkey-patch away
    """Return an absolute, normalised form of `d`, resolved against ``s.cwd``.

    `d` is a path; if it is relative it is interpreted relative to
    ``s.cwd``, if it is already absolute ``s.cwd`` has no effect.
    `infol` is accepted for interface compatibility and is not used here.

    The join must happen *before* ``abspath``: ``os.path.join`` discards
    every earlier component once it meets an absolute one, so joining
    ``s.cwd`` onto an already-absolutised path would silently ignore
    ``s.cwd`` and resolve against the process's current directory instead.
    """
    return os.path.abspath(os.path.join(s.cwd, d))
102 def srcdir_find_rune(s
, d
):
103 script
= s
.find_rune_base
104 for excl
in s
.excludes
+ [s
.output_name
, s
.manifest_name
]:
105 assert("'" not in excl
)
106 script
+= r
" \! -name '%s'" % excl
def manifest_append(s, name, infol):
    """Add a manifest entry for the file `name`.

    Appends to ``s._manifest`` a dict whose 'file' key holds `name` and
    whose 'info' key holds the items of `infol` joined by single spaces.
    """
    record = s._manifest.append
    description = ' '.join(infol)
    record({'file': name, 'info': description})
def manifest_append_absentfile(s, name, infol):
    """Add a manifest entry keyed 'file_print' rather than 'file'.

    Appends to ``s._manifest`` a dict whose 'file_print' key holds `name`
    and whose 'info' key holds the items of `infol` joined by single
    spaces.
    """
    record = s._manifest.append
    description = ' '.join(infol)
    record({'file_print': name, 'info': description})
116 def new_output_name(s
, nametail
, infol
):
118 name
= '%04d-%s' %
(s
._outcounter
, nametail
)
119 s
.manifest_append(name
, infol
)
def open_output_fh(s, name, mode):
    """Open the file `name` inside ``s._destdir`` and return the file object.

    `mode` is passed straight through to the builtin ``open``.
    """
    target = os.path.join(s._destdir, name)
    return open(target, mode)
125 def src_dir(s
, d
, infol
):
126 try: name
= s
._dirmap
[d
]
127 except KeyError: pass
129 s
.manifest_append(name
, infol
)
132 if s
.show_pathnames
: infol
.append(d
)
133 find_rune
= s
.srcdir_find_rune(d
)
134 total_rune
= s
.rune_cpio % find_rune
136 name
= s
.new_output_name('src.cpio', infol
)
138 fh
= s
.open_output_fh(name
, 'wb')
140 subprocess
.run(s
.rune_shell
+ [total_rune
],
142 stdin
=subprocess
.DEVNULL
,
144 restore_signals
=True,
148 def src_indir(s
, d
, infol
):
149 d
= s
.path_prenormaliser(d
, infol
)
150 if not s
.src_filter(d
): return
152 d
= s
.src_parentfinder(d
, infol
)
156 def report_from_packages_debian(s
, files
):
157 dpkg_S_in
= tempfile
.TemporaryFile(mode
='w+')
158 for (file, infols
) in files
.items():
159 assert('\n' not in file)
160 dpkg_S_in
.write(file)
161 dpkg_S_in
.write('\0')
163 cmdl
= ['xargs','-0r','dpkg','-S','--']
164 dpkg_S
= subprocess
.Popen(cmdl
,
167 stdout
=subprocess
.PIPE
,
170 dpkg_show_in
= tempfile
.TemporaryFile(mode
='w+')
172 for l
in dpkg_S
.stdout
:
173 l
= l
.strip(b
'\n').decode('utf-8')
174 (pkgs
, fname
) = l
.split(': ',1)
175 pks
= pkgs
.split(', ')
177 pkginfos
.setdefault(pk
,{'files':[]})['files'].append(fname
)
178 print(pk
, file=dpkg_show_in
)
179 assert(dpkg_S
.wait() == 0)
181 cmdl
= ['xargs','-r','dpkg-query',
182 r
'-f${binary:Package}\t${Package}\t${Architecture}\t${Version}\t${source:Package}\t${source:Version}\n',
184 dpkg_show
= subprocess
.Popen(cmdl
,
187 stdout
=subprocess
.PIPE
,
190 for l
in dpkg_show
.stdout
:
191 l
= l
.strip(b
'\n').decode('utf-8')
192 (pk
,p
,a
,v
,sp
,sv
) = l
.split('\t')
193 pkginfos
[pk
]['binary'] = p
194 pkginfos
[pk
]['arch'] = a
195 pkginfos
[pk
]['version'] = v
196 pkginfos
[pk
]['source'] = sp
197 pkginfos
[pk
]['sourceversion'] = sv
198 assert(dpkg_show
.wait() == 0)
199 for pk
in sorted(pkginfos
.keys()):
201 debfname
= '%s_%s_%s.deb' %
(pi
['binary'], pi
['version'], pi
['arch'])
202 dscfname
= '%s_%s.dsc' %
(pi
['source'], pi
['sourceversion'])
203 s
.manifest_append_absentfile(dscfname
, [debfname
])
204 for fname
in pi
['files']:
206 if s
.show_pathnames
: infol
= infol
+ [fname
]
207 s
.manifest_append_absentfile(' \t' + debfname
, infol
)
def thing_ought_packaged(s, fname):
    """Check `fname` against the glob patterns in ``s.src_package_globs``.

    Hands `fname` and ``s.src_package_globs`` to ``s.thing_matches_globs``
    and returns that call's result unchanged.
    """
    matcher = s.thing_matches_globs
    verdict = matcher(fname, s.src_package_globs)
    return verdict
def src_file_packaged(s, fname, infol):
    """Accumulate the items of `infol` under `fname` in ``s._package_files``.

    A new empty list is created for `fname` the first time it is seen;
    later calls extend the same list.
    """
    collected = s._package_files.setdefault(fname, [])
    collected.extend(infol)
215 def src_file(s
, fname
, infol
):
218 infol_copy
= infol
.copy()
219 yield (infol_copy
, s
.path_prenormaliser(fname
, infol_copy
))
220 yield (infol
, os
.path
.realpath(fname
))
222 for (tinfol
, tfname
) in fngens():
223 if s
.thing_ought_packaged(tfname
):
224 s
.src_file_packaged(tfname
, tinfol
)
227 s
.src_indir(fname
, infol
)
def src_argv0(s, program, infol):
    """Process `program` by passing it and `infol` to ``s.src_file``.

    The result of ``s.src_file`` is not returned.
    """
    handle_file = s.src_file
    handle_file(program, infol)
def src_syspath(s, fname, infol):
    """Process `fname` by passing it and `infol` to ``s.src_indir``.

    The result of ``s.src_indir`` is not returned.
    """
    handle_dir = s.src_indir
    handle_dir(fname, infol)
235 def src_module(s
, m
, infol
):
236 try: fname
= m
.__file__
237 except AttributeError: return
238 infol
.append(m
.__name__
)
240 if s
.thing_ought_packaged(fname
):
241 s
.src_file_packaged(fname
, infol
)
243 s
.src_indir(fname
, infol
)
245 def srcs_allitems(s
, dirs
=sys
.path
):
246 s
.src_argv0(sys
.argv
[0], ['argv[0]'])
248 s
.src_syspath(d
, ['sys.path'])
249 for m
in sys
.modules
.values():
250 s
.src_module(m
, ['sys.modules'])
251 s
.report_from_packages(s
._package_files
)
253 def mk_portmanteau(s
):
254 cmdl
= s
.rune_shell
+ [ s
.rune_portmanteau
, 'x',
255 s
.output_name
, s
.manifest_name
]
256 mfh
= s
.open_output_fh(s
.manifest_name
,'w')
257 for me
in s
._manifest
:
258 try: fname
= me
['file']
259 except KeyError: fname
= me
.get('file_print','')
260 else: cmdl
.append(fname
)
261 print('%s\t%s' %
(fname
, me
['info']), file=mfh
)
265 stdin
=subprocess
.DEVNULL
,
267 restore_signals
=True,