-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathImportServerUtilities.py
237 lines (219 loc) · 9.4 KB
/
ImportServerUtilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#!/usr/bin/env python3
# Utility class for functions to be run on the server
import csv
import shutil
from pathlib import Path
from urllib.parse import unquote
from typing import Optional, List
import FoxmlWorker as FW
import ImportUtilities as IU
import json
import pickle
class ImportServerUtilities:
    """Utility functions intended to be run directly on the Fedora server.

    Reads FOXML objects from the local objectStore and managed datastreams
    from the datastreamStore, then stages files and builds CSV records for
    migration.  Table/namespace names are interpolated into SQL statements,
    so they must come from trusted configuration, never user input.
    """

    def __init__(self, namespace):
        self.namespace = namespace
        self.objectStore = '/usr/local/fedora/data/objectStore'
        self.datastreamStore = '/usr/local/fedora/data/datastreamStore'
        self.staging_dir = 'staging'
        self.iu = IU.ImportUtilities(namespace)
        # Maps MIME types found in FOXML datastreams to the file extension
        # used for staged copies.
        self.mimemap = {"image/jpeg": ".jpg",
                        "image/jp2": ".jp2",
                        "image/png": ".png",
                        "image/tiff": ".tif",
                        "text/xml": ".xml",
                        "text/plain": ".txt",
                        "application/pdf": ".pdf",
                        "application/xml": ".xml",
                        "audio/x-wav": ".wav",
                        "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
                        "application/octet-stream": ".bib",
                        "audio/mpeg": ".mp3",
                        "video/mp4": ".mp4",
                        "video/x-m4v": ".m4v",
                        "audio/vnd.wave": '.wav'
                        }

    # Retrieves FOXml object store with pid
    def get_foxml_from_pid(self, pid):
        """Return a FoxmlWorker for *pid*, or None if it cannot be loaded.

        Callers must check for None before using the result.
        """
        foxml_file = self.iu.dereference(pid)
        foxml = f"{self.objectStore}/{foxml_file}"
        try:
            return FW.FWorker(foxml)
        except Exception:  # was a bare except; keep best-effort semantics
            print(f"No results found for {pid}")
            return None

    # Gets PIDS, filtered by namespace directly from objectStore
    @IU.ImportUtilities.timeit
    def get_pids_from_objectstore(self, namespace=''):
        """Return all PIDs found in the objectStore.

        If *namespace* is non-empty, only PIDs in that namespace are
        returned; otherwise every PID is returned.
        """
        # Decide on the wildcard BEFORE URL-encoding: the encoded prefix is
        # always non-empty, so the original truthiness test could never
        # select the unfiltered '*/*' branch.
        if namespace:
            # Object files on disk are percent-encoded, e.g.
            # 'info%3Afedora%2Fns%3A1', so the glob pattern must be too.
            wildcard = f'*/info%3Afedora%2F{namespace}%3A*'
        else:
            wildcard = '*/*'
        pids = []
        for path in Path(self.objectStore).rglob(wildcard):
            pids.append(unquote(path.name).replace('info:fedora/', ''))
        print(f"Total number of PIDs found: {len(pids)}")
        return pids

    # Gets all dc datastream from objectstore
    def get_all_dc(self):
        """Write a CSV of pid -> Dublin Core for every row in the namespace table."""
        cursor = self.iu.conn.cursor()
        statement = f"select pid from {self.namespace}"
        headers = 'pid', 'dublin_core'
        csv_file_path = f"{self.staging_dir}/{self.namespace}_dc.csv"
        with open(csv_file_path, mode="w", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(file, fieldnames=headers)
            writer.writeheader()
            for row in cursor.execute(statement):
                pid = row['pid']
                foxml_file = self.iu.dereference(pid)
                foxml = f"{self.objectStore}/{foxml_file}"
                try:
                    fw = FW.FWorker(foxml)
                except Exception:  # narrow from bare except; skip bad records
                    print(f"No record found for {pid}")
                    continue
                dc = fw.get_dc()
                writer.writerow({'pid': pid, 'dublin_core': dc})

    # Copies digital assets from dataStream store to staging directory
    @IU.ImportUtilities.timeit
    def stage_files(self, content_model: Optional[str] = None, datastreams: Optional[List] = None) -> None:
        """Copy the requested datastreams for each PID into the staging dir.

        Files are named '{nid}_{datastream}{extension}'.  PIDs with no node
        mapping, no loadable FOXML, or an unmapped MIME type are skipped
        with a message rather than aborting the whole run.
        """
        if datastreams is None:
            datastreams = ['OBJ']
        if content_model is None:
            pids = self.get_pids_from_objectstore(self.namespace)
        else:
            pids = self.iu.get_pids_by_content_model(self.namespace, content_model)
        for pid in pids:
            nid = self.iu.get_nid_from_pid(self.namespace, pid)
            if nid == '':
                continue
            fw = self.get_foxml_from_pid(pid)
            if fw is None:
                # get_foxml_from_pid already printed the failure; previously
                # this fell through to an AttributeError on fw.get_file_data().
                continue
            all_files = fw.get_file_data()
            for datastream in datastreams:
                if datastream in all_files:
                    file_info = all_files[datastream]
                    source = f"{self.datastreamStore}/{self.iu.dereference(file_info['filename'])}"
                    extension = self.mimemap.get(file_info['mimetype'])
                    if extension is None:
                        # Previously an unmapped MIME type raised KeyError and
                        # killed the whole staging run.
                        print(f"Unmapped mimetype {file_info['mimetype']} for {pid}; skipping {datastream}")
                        continue
                    destination = f"{self.staging_dir}/{nid}_{datastream}{extension}"
                    shutil.copy(source, destination)
                    print(f"{nid} {pid} {destination}")
                else:
                    print(f"Datastream not found for {nid}")

    @IU.ImportUtilities.timeit
    def stage_files_from_list(self, input_file):
        """Load a pickled list of PIDs from *input_file*.

        TODO: this function loads the PID list but never stages anything —
        the staging step appears to be unimplemented.
        SECURITY NOTE: pickle.load executes arbitrary code; only use this on
        files you created yourself.
        """
        with open(input_file, 'rb') as file:
            pids = pickle.load(file)

    # Builds record directly from objectStore
    @IU.ImportUtilities.timeit
    def build_record_from_pids(self, namespace, output_file):
        """Write a CSV of title/pid/RELS-EXT relations for every Active object."""
        pids = self.get_pids_from_objectstore(namespace)
        headers = [
            'title',
            'pid',
            'content_model',
            'collection_pid',
            'page_of',
            'sequence',
            'constituent_of']
        with open(output_file, 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=headers)
            writer.writeheader()
            for pid in pids:
                foxml_file = self.iu.dereference(pid)
                foxml = f"{self.objectStore}/{foxml_file}"
                # The original tested 'if (foxml)' on an f-string, which is
                # always truthy; actually check that the file exists so the
                # "missing" branch can fire.
                if foxml_file and Path(foxml).exists():
                    fw = FW.FWorker(foxml)
                    if fw.get_state() != 'Active':
                        continue
                    relations = fw.get_rels_ext_values()
                    row = {'title': fw.get_label(), 'pid': pid}
                    for relation, value in relations.items():
                        if relation in self.iu.rels_map:
                            row[self.iu.rels_map[relation]] = value
                    writer.writerow(row)
                else:
                    print(f"FoXML file for {pid} is missing")

    # Adds all MODS records from datastreamStore to database.
    @IU.ImportUtilities.timeit
    def add_mods_to_database(self, table):
        """Store each Active object's MODS XML in the given table.

        Prefers a managed MODS datastream; falls back to inline MODS.
        """
        cursor = self.iu.conn.cursor()
        pids = self.get_pids_from_objectstore(table)
        for pid in pids:
            foxml_file = self.iu.dereference(pid)
            foxml = f"{self.objectStore}/{foxml_file}"
            fw = FW.FWorker(foxml)
            if fw.get_state() != 'Active':
                continue
            mapping = fw.get_file_data()
            mods_info = mapping.get('MODS')
            if mods_info:
                mods_path = f"{self.datastreamStore}/{self.iu.dereference(mods_info['filename'])}"
                mods_xml = Path(mods_path).read_text()
            else:
                mods_xml = fw.get_inline_mods()
            if mods_xml:
                # Bind values instead of interpolating the XML into the SQL
                # string — the old manual quote-doubling was fragile.  The
                # table name still cannot be bound and must be trusted.
                command = f"UPDATE {table} set mods = ? where pid = ?"
                cursor.execute(command, (mods_xml, pid))
        self.iu.conn.commit()

    def get_dsids_with_count(self, namespace):
        """Count datastream IDs across all Active objects in *namespace*
        and dump the tallies to dsid.json."""
        dsids = {}
        # Previously hardcoded 'ivoices', silently ignoring the parameter.
        pids = self.get_pids_from_objectstore(namespace)
        for pid in pids:
            foxml_file = self.iu.dereference(pid)
            foxml = f"{self.objectStore}/{foxml_file}"
            if foxml_file and Path(foxml).exists():
                fw = FW.FWorker(foxml)
                if fw.get_state() != 'Active':
                    continue
                for datastream in fw.get_datastream_types().keys():
                    dsids[datastream] = dsids.get(datastream, 0) + 1
        with open("dsid.json", "w") as file:
            json.dump(dsids, file, indent=4)

    def get_inline_datastreams(self):
        """Write a CSV of pid, Dublin Core, PBCore and MODS for the namespace."""
        cursor = self.iu.conn.cursor()
        statement = f"select pid from {self.namespace}"
        headers = ['pid', 'dublin_core', 'pb_core', 'mods']
        csv_file_path = f"{self.staging_dir}/{self.namespace}_inline.csv"
        with open(csv_file_path, mode="w", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(file, fieldnames=headers)
            writer.writeheader()
            for row in cursor.execute(statement):
                pid = row['pid']
                foxml_file = self.iu.dereference(pid)
                foxml = f"{self.objectStore}/{foxml_file}"
                try:
                    fw = FW.FWorker(foxml)
                except Exception:  # narrow from bare except; skip bad records
                    print(f"No record found for {pid}")
                    continue
                dc = fw.get_dc()
                pb = fw.get_inline_pbcore()
                mods = fw.get_inline_mods()
                writer.writerow({'pid': pid, 'dublin_core': dc, 'pb_core': pb, 'mods': mods})

    def stage_inline_pb(self):
        """Extract inline PBCore for rows missing it and write one XML file per node.

        Files are written to the current working directory as
        '{nid}_PBCORE.xml' (NOTE(review): possibly intended for
        self.staging_dir like the other stage_* methods — confirm).
        """
        cursor = self.iu.conn.cursor()
        statement = f"select pid, nid from {self.namespace} where pbcore = ''"
        for row in cursor.execute(statement):
            pid = row['pid']
            foxml_file = self.iu.dereference(pid)
            foxml = f"{self.objectStore}/{foxml_file}"
            try:
                fw = FW.FWorker(foxml)
            except Exception:  # narrow from bare except; skip bad records
                print(f"No record found for {pid}")
                continue
            pb = fw.get_inline_pbcore()
            if pb:
                # BUG FIX: the file was opened in the default read mode, so
                # f.write(pb) raised io.UnsupportedOperation.  Open for
                # writing and use a context manager so it is always closed.
                with open(f"{row['nid']}_PBCORE.xml", "w") as f:
                    f.write(pb)
if __name__ == '__main__':
    # Script entry point: dump inline datastreams for the 'ivoices' namespace.
    utilities = ImportServerUtilities('ivoices')
    utilities.get_inline_datastreams()