Allow for possible duplicate file_complete reports for any given file.
The Celery transaction-model settings that guarantee we never lose a file
can result in duplicate file-complete reports. Handle this by adding a
unique constraint to the transform_result table: attempt to insert the
record and, if that fails, just report a warning and don't increment the
file counter.
BenGalewsky committed Jan 14, 2025
1 parent 4a4ca9a commit 0e4d51d
Showing 6 changed files with 162 additions and 70 deletions.
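
To make the approach concrete, here is a minimal, self-contained sketch of the pattern described in the commit message: a composite unique constraint on (file_id, request_id) plus an insert that treats a constraint violation as a duplicate report. It runs against an in-memory SQLite database; the stand-in TransformResult model, the report_file_complete helper, and the counters dict are illustrative only, not the actual ServiceX code.

import logging

from sqlalchemy import Column, Integer, String, UniqueConstraint, create_engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, declarative_base

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("file_complete_sketch")

Base = declarative_base()


class TransformResult(Base):
    # Simplified stand-in for the real transform_result table
    __tablename__ = "transform_result"
    id = Column(Integer, primary_key=True)
    request_id = Column(String, nullable=False)
    file_id = Column(Integer, nullable=False)
    __table_args__ = (
        UniqueConstraint("file_id", "request_id", name="uix_file_request"),
    )


def report_file_complete(session: Session, request_id: str, file_id: int,
                         counters: dict) -> None:
    """Insert the result; on a duplicate report, warn and leave the counter alone."""
    try:
        with session.begin():
            session.add(TransformResult(request_id=request_id, file_id=file_id))
    except IntegrityError:
        logger.warning("Ignoring duplicate result report for file %s", file_id)
        return
    counters["files_completed"] += 1


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
counters = {"files_completed": 0}
with Session(engine) as session:
    report_file_complete(session, "1234", 42, counters)  # first report is counted
    report_file_complete(session, "1234", 42, counters)  # duplicate is ignored
print(counters)  # {'files_completed': 1}
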
24 changes: 24 additions & 0 deletions servicex_app/migrations/versions/1.5.7.py
@@ -0,0 +1,24 @@
"""Add unique constraint to TransformResult on request_id and file_id
Revision ID: v1_5_7
Revises: v1_5_6
Create Date: 2025-01-13
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = 'v1_5_7'
down_revision = 'v1_5_6'
branch_labels = None
depends_on = None

def upgrade():
# Add unique constraint
op.create_unique_constraint('uix_file_request', 'transform_result', ['file_id', 'request_id'])


def downgrade():
# Remove unique constraint
op.drop_constraint('uix_file_request', 'transform_result', type_='unique')
4 changes: 4 additions & 0 deletions servicex_app/servicex_app/models.py
@@ -357,6 +357,10 @@ class TransformationResult(db.Model):
total_bytes = db.Column(db.BigInteger, nullable=True)
avg_rate = db.Column(db.Float, nullable=True)

__table_args__ = (
db.UniqueConstraint('file_id', 'request_id', name='uix_file_request'),
)

@classmethod
def to_json_list(cls, a_list):
return [TransformationResult.to_json(msg) for msg in a_list]
@@ -31,13 +31,15 @@
from functools import wraps
from logging import Logger

from flask import request, current_app
from flask import current_app, request
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, \
before_sleep_log, after_log
from tenacity import after_log, before_sleep_log, retry, retry_if_not_exception_type, \
stop_after_attempt, wait_exponential_jitter

from servicex_app import TransformerManager
from servicex_app.models import TransformRequest, TransformationResult, db, TransformStatus
from servicex_app.models import TransformRequest, TransformStatus, TransformationResult, \
db
from servicex_app.resources.servicex_resource import ServiceXResource


@@ -65,7 +67,8 @@ def wrapper(*args, **kwargs):
logger = logging.getLogger(__name__)

return retry(
stop=stop_after_attempt(3),
retry=retry_if_not_exception_type(IntegrityError),
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=0.1, max=30),
before_sleep=before_sleep_log(logger, logging.INFO),
after=after_log(logger, logging.INFO)
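
The retry-decorator change above means transient database errors still get retried (now up to five attempts), while an IntegrityError, which signals a duplicate report rather than a transient failure, is raised on the first attempt without retries. A rough, self-contained sketch of that behavior, assuming tenacity and SQLAlchemy are installed; the flaky_db_write function, the ConnectionError stand-in for a transient failure, and the shortened wait times are illustrative only:

import logging

from sqlalchemy.exc import IntegrityError
from tenacity import (retry, retry_if_not_exception_type, stop_after_attempt,
                      wait_exponential_jitter)

logging.basicConfig(level=logging.INFO)
attempts = {"count": 0}


@retry(
    retry=retry_if_not_exception_type(IntegrityError),  # duplicates are not transient
    stop=stop_after_attempt(5),
    wait=wait_exponential_jitter(initial=0.01, max=0.1, jitter=0.01),  # shortened for the demo
)
def flaky_db_write(error: Exception) -> None:
    attempts["count"] += 1
    raise error


# A transient failure is retried until stop_after_attempt(5) is hit,
# after which tenacity raises RetryError.
try:
    flaky_db_write(ConnectionError("database unavailable"))
except Exception:
    print("transient failure attempts:", attempts["count"])  # 5

# An IntegrityError is re-raised immediately, with no retries.
attempts["count"] = 0
try:
    flaky_db_write(IntegrityError("INSERT ...", params=None, orig=Exception("duplicate key")))
except IntegrityError:
    print("duplicate report attempts:", attempts["count"])  # 1
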
@@ -85,39 +88,59 @@ def put(self, request_id):
start_time = time.time()
info = request.get_json()
logger = current_app.logger
logger.info("FileComplete", extra={'requestId': request_id, 'metric': info})
log_extra = {
'requestId': request_id,
'file-id': info['file-id']
}

session = db.session

# Lookup the transformation request and increment either the successful or failed file count
transform_req = self.record_file_complete(session, current_app.logger, request_id, info)

if transform_req is None:
return "Request not found", 404

# Add the transformation result to the database
files_remaining = self.save_transform_result(transform_req, info, session)

# Now we can see if we are done with the transformation and
# can shut down the transformers. Files remaining is None if
# we are still waiting for final results from the DID finder

if files_remaining is not None and files_remaining == 0:
self.transform_complete(session, current_app.logger, transform_req, self.transformer_manager)
logger.info("FileComplete", extra={**log_extra, 'metric': info})
try:
session = db.session

try:
# Add the transformation result to the database and verify that
# we've not processed this file already
self.save_transform_result(request_id, info, session)
except IntegrityError:
logger.warning("Ignoring duplicate result report",
extra=log_extra)
return "Ignoring duplicate result report", 200

# Lookup the transformation request and increment either the successful or failed file count
transform_req = self.record_file_complete(session,
current_app.logger,
request_id,
info, log_extra)

current_app.logger.info("FileComplete. Request state.", extra={
'requestId': request_id,
'files_remaining': transform_req.files_remaining,
'files_completed': transform_req.files_completed,
'files_failed': transform_req.files_failed,
'report_processed_time': (time.time() - start_time)
})
return "Ok"
if transform_req is None:
logger.error("Request not found", extra=log_extra)
return "Request not found", 404

# Now we can see if we are done with the transformation and
# can shut down the transformers. Files remaining is None if
# we are still waiting for final results from the DID finder
with session.begin():
files_remaining = transform_req.files_remaining
if files_remaining is not None and files_remaining == 0:
self.transform_complete(session, current_app.logger, transform_req, self.transformer_manager)

current_app.logger.info("FileComplete. Request state.", extra={
**log_extra,
'files_remaining': transform_req.files_remaining,
'files_completed': transform_req.files_completed,
'files_failed': transform_req.files_failed,
'report_processed_time': (time.time() - start_time)
})
return "Ok", 200
except Exception as e:
logger.exception("Error processing file complete",
extra=log_extra, exc_info=e)
return "Error processing file complete", 500

@staticmethod
@file_complete_ops_retry
def record_file_complete(session: Session, logger: Logger, request_id: str,
info: dict[str, str]) -> TransformRequest | None:
info: dict[str, str], log_extra: dict[str, str]) -> TransformRequest | None:

with session.begin():
# Lock the row for update
@@ -126,25 +149,23 @@ def record_file_complete(session: Session, logger: Logger, request_id: str,

if transform_req is None:
msg = f"Request not found with id: '{request_id}'"
logger.error(msg, extra={'requestId': request_id})
logger.error(msg, extra=log_extra)
return None

if info['status'] == 'success':
transform_req.files_completed += 1
else:
transform_req.files_failed += 1

session.flush() # Flush the changes to the database

return transform_req

@staticmethod
@file_complete_ops_retry
def save_transform_result(transform_req: TransformRequest, info: dict[str, str], session: Session):
def save_transform_result(request_id: str, info: dict[str, str], session: Session):
with session.begin():
rec = TransformationResult(
file_id=info['file-id'],
request_id=transform_req.request_id,
request_id=request_id,
file_path=info['file-path'],
transform_status=info['status'],
transform_time=info['total-time'],
@@ -153,16 +174,14 @@ def save_transform_result(transform_req: TransformRequest, info: dict[str, str],
avg_rate=info['avg-rate']
)
session.add(rec)
return transform_req.files_remaining

@staticmethod
@file_complete_ops_retry
def transform_complete(session: Session, logger: Logger, transform_req: TransformRequest,
transformer_manager: TransformerManager):
with session.begin():
transform_req.status = TransformStatus.complete
transform_req.finish_time = datetime.now(tz=timezone.utc)
session.add(transform_req)
transform_req.status = TransformStatus.complete
transform_req.finish_time = datetime.now(tz=timezone.utc)
session.add(transform_req)

logger.info("Request completed. Shutting down transformers",
extra={'requestId': transform_req.request_id})
@@ -28,6 +28,7 @@

import psycopg2
import pytest
from sqlalchemy.exc import IntegrityError

from servicex_app.models import TransformationResult, TransformRequest, TransformStatus
from servicex_app.transformer_manager import TransformerManager
@@ -151,6 +152,45 @@ def test_put_transform_file_complete_no_files_remaining(self,
mock_transformer_manager.shutdown_transformer_job.assert_called_with('1234',
'my-ws')

def test_put_transform_file_complete_duplicate_report(self,
mocker,
mock_transformer_manager,
db_session,
mock_transform_request_lookup,
fake_transform_request,
file_complete_response,
test_client):
fake_transform_request.files_completed = 6
fake_transform_request.files_failed = 2
db_session.add.side_effect = [
None,
IntegrityError('duplicate key value violates unique constraint',
params=['request_id'], orig=Exception())
]

response1 = test_client.put(
'/servicex/internal/transformation/1234/file-complete',
json=file_complete_response)

response2 = test_client.put(
'/servicex/internal/transformation/1234/file-complete',
json=file_complete_response)

assert response1.status_code == 200
assert response2.status_code == 200

db_session.query.return_value.filter_by.assert_called_with(request_id='1234')
assert fake_transform_request.files_completed == 7
assert fake_transform_request.files_failed == 2

assert db_session.add.call_count == 2
assert isinstance(db_session.add.mock_calls[0][1][0], TransformationResult)
assert db_session.add.mock_calls[0][1][0].file_id == 42
assert db_session.add.mock_calls[1][1][0].file_id == 42

def test_put_transform_file_complete_unknown_request_id(self,
mock_transformer_manager,
db_session,
Expand Down Expand Up @@ -208,7 +248,6 @@ def test_database_error_request_update(self,
assert fake_transform_request.finish_time is None

# Verify that we retried after the database error
assert db_session.flush.call_count == 2
db_session.query.return_value.filter_by.assert_called_with(request_id='1234')

mock_transformer_manager.shutdown_transformer_job.assert_not_called()