 class AsyncCrawlerSiteContent:
-    def __init__(self, domain_list, sqlite_db_path, max_requests, max_embedding_input, document_embedder_obj, redis_lock):
+    def __init__(self, domain_list, sqlite_db_path, max_requests, max_embedding_input, document_embedder_obj, distributed_lock):
         logger.info(f"[CRAWL_CONTENT] init, domain_list:{domain_list}")
         self.domain_list = domain_list
         self.sqlite_db_path = sqlite_db_path
         self.semaphore = asyncio.Semaphore(max_requests)
         self.max_embedding_input = max_embedding_input
         self.document_embedder_obj = document_embedder_obj
-        self.redis_lock = redis_lock
+        self.distributed_lock = distributed_lock
         self.count = 0
         self.batch_size = max_requests * 2
@@ -112,20 +112,19 @@ async def update_doc_status(self, doc_id_list, doc_status):
        logger.info(f"[CRAWL_CONTENT] update_doc_status, doc_id_list:{doc_id_list}, doc_status:{doc_status}")
        timestamp = int(time.time())
        async with aiosqlite.connect(self.sqlite_db_path) as db:
-            # Enable WAL mode for better concurrency
            await db.execute("PRAGMA journal_mode=WAL;")

-            if await self.redis_lock.aacquire_lock():
-                try:
+            try:
+                with self.distributed_lock.lock():
                    await db.execute(
                        "UPDATE t_raw_tab SET doc_status = ?, mtime = ? WHERE id IN ({placeholders})".format(
                            placeholders=','.join(['?' for _ in doc_id_list])
                        ),
                        [doc_status, timestamp] + doc_id_list
                    )
                    await db.commit()
-                finally:
-                    await self.redis_lock.arelease_lock()
+            except Exception as e:
+                logger.error(f"process distributed_lock exception: {e}")

    async def fetch_existing_contents(self, doc_id_list):
        """
@@ -134,7 +133,6 @@ async def fetch_existing_contents(self, doc_id_list):
        logger.info(f"[CRAWL_CONTENT] fetch_existing_contents, doc_id_list:{doc_id_list}")
        query = "SELECT id, content FROM t_raw_tab WHERE id IN ({})".format(', '.join('?' for _ in doc_id_list))
        async with aiosqlite.connect(self.sqlite_db_path) as db:
-            # Enable WAL mode for better concurrency
            await db.execute("PRAGMA journal_mode=WAL;")

            cursor = await db.execute(query, doc_id_list)
@@ -192,33 +190,31 @@ async def process_updated_contents(self, updated_contents, url_dict):
            content_update_queries.append((content_json, content_length, 3, timestamp, doc_id))

        # Lock to ensure database operations are atomic
-        if await self.redis_lock.aacquire_lock():
-            try:
-                async with aiosqlite.connect(self.sqlite_db_path) as db:
-                    # Enable WAL mode for better concurrency
-                    await db.execute("PRAGMA journal_mode=WAL;")
+        async with aiosqlite.connect(self.sqlite_db_path) as db:
+            await db.execute("PRAGMA journal_mode=WAL;")

+            try:
+                with self.distributed_lock.lock():
                    # Update content details in t_raw_tab
                    await db.executemany(
                        "UPDATE t_raw_tab SET content = ?, content_length = ?, doc_status = ?, mtime = ? WHERE id = ?",
                        content_update_queries
                    )
                    await db.commit()
-            finally:
-                await self.redis_lock.arelease_lock()
-
+            except Exception as e:
+                logger.error(f"process distributed_lock exception: {e}")
+
        # Delete old embeddings
        doc_id_list = list(updated_contents.keys())
        await self.delete_embedding_doc(doc_id_list)

        # Prepare data for updating embeddings and database records
        data_for_embedding = [(doc_id, url_dict[doc_id], chunk_text_vec) for doc_id, chunk_text_vec in updated_contents.items()]
-        if await self.redis_lock.aacquire_lock():
-            try:
+        try:
+            with self.distributed_lock.lock():
                records_to_add, records_to_update = await self.document_embedder_obj.aadd_content_embedding(data_for_embedding)
                # Insert new embedding records and update t_raw_tab doc_status to 4
                async with aiosqlite.connect(self.sqlite_db_path) as db:
-                    # Enable WAL mode for better concurrency
                    await db.execute("PRAGMA journal_mode=WAL;")

                    if records_to_add:
@@ -229,25 +225,24 @@ async def process_updated_contents(self, updated_contents, url_dict):
                    if records_to_update:
                        await db.executemany("UPDATE t_raw_tab SET doc_status = ?, mtime = ? WHERE id = ?", records_to_update)
                    await db.commit()
-            finally:
-                await self.redis_lock.arelease_lock()
+        except Exception as e:
+            logger.error(f"process distributed_lock exception: {e}")

    async def update_unchanged_contents_status(self, unchanged_doc_ids):
        """
        Update the status of unchanged contents in the database to reflect they have been processed.
        """
        logger.info(f"[CRAWL_CONTENT] update_unchanged_contents_status, unchanged_doc_ids:{unchanged_doc_ids}")
        async with aiosqlite.connect(self.sqlite_db_path) as db:
-            # Enable WAL mode for better concurrency
            await db.execute("PRAGMA journal_mode=WAL;")

-            if await self.redis_lock.aacquire_lock():
-                try:
+            try:
+                with self.distributed_lock.lock():
                    async with aiosqlite.connect(self.sqlite_db_path) as db:
                        await db.execute("UPDATE t_raw_tab SET doc_status = 4 WHERE id IN ({})".format(', '.join('?' for _ in unchanged_doc_ids)), unchanged_doc_ids)
                        await db.commit()
-                finally:
-                    await self.redis_lock.arelease_lock()
+            except Exception as e:
+                logger.error(f"process distributed_lock exception: {e}")

    async def add_content(self, url_dict):
        """Begin processing URLs from url_dict in batches for add."""
@@ -294,26 +289,25 @@ async def delete_embedding_doc(self, doc_id_vec):
        doc_id_tuple = tuple(doc_id_vec)
        placeholder = ','.join('?' * len(doc_id_vec))  # Create placeholders
        async with aiosqlite.connect(self.sqlite_db_path) as db:
-            # Enable WAL mode for better concurrency
            await db.execute("PRAGMA journal_mode=WAL;")

            cursor = await db.execute(f"SELECT embedding_id_list FROM t_doc_embedding_map_tab WHERE doc_id IN ({placeholder})", doc_id_tuple)
            rows = await cursor.fetchall()
            # Parse embedding_id_list and flatten the list
            embedding_id_vec = [id for row in rows for id in json.loads(row[0])]

-            if await self.redis_lock.aacquire_lock():
-                try:
+            try:
+                with self.distributed_lock.lock():
                    if embedding_id_vec:
                        logger.info(f"[CRAWL_CONTENT] delete_embedding_doc, document_embedder_obj.delete_content_embedding:{embedding_id_vec}")
                        self.document_embedder_obj.delete_content_embedding(embedding_id_vec)

                    # Delete records from t_doc_embedding_map_tab
                    await db.execute(f"DELETE FROM t_doc_embedding_map_tab WHERE doc_id IN ({placeholder})", doc_id_tuple)
                    await db.commit()
-                finally:
-                    await self.redis_lock.arelease_lock()
-
+            except Exception as e:
+                logger.error(f"process distributed_lock exception: {e}")
+
    async def delete_content(self, url_dict, delete_raw_table=True):
        """Begin processing URLs from url_dict in batches for deletion."""
        begin_time = int(time.time())
@@ -345,12 +339,13 @@ async def process_delete_batch(self, batch, delete_raw_table):
        # Delete records from t_raw_tab after deleting embeddings
        async with aiosqlite.connect(self.sqlite_db_path) as db:
            await db.execute("PRAGMA journal_mode=WAL;")
-            if await self.redis_lock.aacquire_lock():
-                try:
+
+            try:
+                with self.distributed_lock.lock():
                    await db.execute(f"DELETE FROM t_raw_tab WHERE id IN ({placeholder})", doc_id_tuple)
                    await db.commit()
-                finally:
-                    await self.redis_lock.arelease_lock()
+            except Exception as e:
+                logger.error(f"process distributed_lock exception: {e}")

    async def update_content(self, url_dict):
        logger.info(f"[CRAWL_CONTENT] update_content begin, url_dict:{url_dict}")
@@ -361,7 +356,6 @@ async def update_content(self, url_dict):
    async def check_and_update_domain_status(self):
        logger.info(f"[CRAWL_CONTENT] check_and_update_domain_status")
        async with aiosqlite.connect(self.sqlite_db_path) as db:
-            # Enable WAL mode for better concurrency
            await db.execute("PRAGMA journal_mode=WAL;")

            timestamp = int(time.time())
@@ -375,13 +369,12 @@ async def check_and_update_domain_status(self):
                "SELECT COUNT(*) FROM t_raw_tab WHERE domain = ? AND doc_status < 4", (domain,))
            count_row = await cursor.fetchone()
            if count_row[0] == 0:  # If no records have doc_status < 4
-                if await self.redis_lock.aacquire_lock():
-                    try:
+                try:
+                    with self.distributed_lock.lock():
                        # Step 3: Update domain_status to 4 in t_domain_tab
                        await db.execute(
                            "UPDATE t_domain_tab SET domain_status = ?, mtime = ? WHERE domain = ?", (4, timestamp, domain))
                        await db.commit()
-                    finally:
-                        await self.redis_lock.arelease_lock()
+                except Exception as e:
+                    logger.error(f"process distributed_lock exception: {e}")
                logger.info(f"[CRAWL_CONTENT] check_and_update_domain_status, Domain status updated to 4 for domain:'{domain}'")
-
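Note: the diff assumes `self.distributed_lock.lock()` returns a synchronous context manager that acquires the lock on entry, releases it on exit, and raises on failure, which the surrounding `try/except` then logs. The sketch below illustrates one way such an object could look; the class name `DistributedLock`, the Redis backing, the key name, and the timeout values are assumptions for illustration, not the project's actual implementation.

```python
# Hypothetical sketch of the distributed_lock interface assumed by this diff.
# Backing it with redis-py is only one option; a file lock or DB lock would fit
# the same `with distributed_lock.lock():` usage.
from contextlib import contextmanager

import redis


class DistributedLock:
    def __init__(self, redis_url="redis://localhost:6379/0",
                 lock_key="crawler:content_lock",
                 timeout=30, blocking_timeout=10):
        self._client = redis.Redis.from_url(redis_url)
        self._lock_key = lock_key
        self._timeout = timeout                    # auto-release after N seconds
        self._blocking_timeout = blocking_timeout  # give up waiting after N seconds

    @contextmanager
    def lock(self):
        # redis-py's Lock acquires on __enter__ and releases on __exit__;
        # it raises LockError if acquisition times out, which the caller's
        # try/except around `with self.distributed_lock.lock():` logs.
        with self._client.lock(self._lock_key, timeout=self._timeout,
                               blocking_timeout=self._blocking_timeout):
            yield
```

Because `lock()` is synchronous, acquiring it briefly blocks the event loop inside these async methods; the critical sections here are short SQLite writes, so that trade-off appears to be accepted in exchange for a simpler interface than the old `aacquire_lock()`/`arelease_lock()` pair.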