Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/pre-processing-service/app/model/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,12 @@ class S3UploadData(BaseModel):
uploaded_at: str = Field(
..., title="업로드 완료 시간", description="S3 업로드 완료 시간"
)
# 🆕 임시: 콘텐츠 생성용 단일 상품만 추가 (나중에 삭제 예정)
selected_product_for_content: Optional[Dict] = Field(
None,
title="콘텐츠 생성용 선택 상품",
description="임시: 블로그 콘텐츠 생성을 위해 선택된 단일 상품 정보",
)


# 최종 응답 모델
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Dict, Optional
# app/service/blog/blog_publish_service.py
from typing import Dict
from app.errors.CustomException import CustomException
from app.model.schemas import RequestBlogPublish
from app.service.blog.blog_service_factory import BlogServiceFactory
Expand All @@ -10,28 +11,23 @@ class BlogPublishService:
def __init__(self):
self.factory = BlogServiceFactory()

def publish_content(
self,
request: RequestBlogPublish,
) -> Dict:
def publish_content(self, request: RequestBlogPublish) -> Dict:
"""
생성된 블로그 콘텐츠를 배포합니다.

Args:
request: 블로그 발행 요청 데이터
blog_id: 블로그 아이디
blog_password: 블로그 비밀번호
request: RequestBlogPublish 객체
"""
try:
# 팩토리를 통해 적절한 서비스 생성
# 블로그 서비스 생성 (네이버, 티스토리, 블로거 등)
blog_service = self.factory.create_service(
request.tag,
blog_id=request.blog_id,
blog_password=request.blog_pw,
blog_name=request.blog_name,
)

# 공통 인터페이스로 포스팅 실행
# 콘텐츠 포스팅
response_data = blog_service.post_content(
title=request.post_title,
content=request.post_content,
Expand All @@ -40,16 +36,20 @@ def publish_content(

if not response_data:
raise CustomException(
500, f"{request.tag} 블로그 포스팅에 실패했습니다.", "POSTING_FAIL"
detail=f"{request.tag} 블로그 포스팅에 실패했습니다.",
status_code=500,
code="POSTING_FAIL",
)

return response_data

except CustomException:
# 이미 처리된 예외는 그대로 전달
# 이미 CustomException이면 그대로 전달
raise
except Exception as e:
# 예상치 못한 예외 처리
raise CustomException(
500, f"블로그 포스팅 중 오류가 발생했습니다: {str(e)}", "ERROR"
detail=f"블로그 포스팅 중 오류가 발생했습니다: {str(e)}",
status_code=500,
code="ERROR",
)
95 changes: 94 additions & 1 deletion apps/pre-processing-service/app/service/s3_upload_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,16 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict:
if product_index < len(crawled_products):
await asyncio.sleep(1)

# 🆕 임시: 콘텐츠 생성용 단일 상품 선택 로직
selected_product_for_content = self._select_single_product_for_content(
crawled_products, upload_results
)

logger.success(
f"S3 업로드 서비스 완료: 총 성공 이미지 {total_success_images}개, 총 실패 이미지 {total_fail_images}개"
)

# 간소화된 응답 데이터 구성
# 기존 응답 데이터 구성
data = {
"upload_results": upload_results,
"summary": {
Expand All @@ -115,6 +120,8 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict:
"total_fail_images": total_fail_images,
},
"uploaded_at": time.strftime("%Y-%m-%d %H:%M:%S"),
# 🆕 임시: 콘텐츠 생성용 단일 상품만 추가 (나중에 삭제 예정)
"selected_product_for_content": selected_product_for_content,
}

message = f"S3 업로드 완료: {total_success_images}개 이미지 업로드 성공, 상품 데이터 JSON 파일 포함"
Expand All @@ -123,3 +130,89 @@ async def upload_crawled_products_to_s3(self, request: RequestS3Upload) -> dict:
except Exception as e:
logger.error(f"S3 업로드 서비스 전체 오류: {e}")
raise InvalidItemDataException()

def _select_single_product_for_content(
self, crawled_products: List[Dict], upload_results: List[Dict]
) -> Dict:
"""
🆕 임시: 콘텐츠 생성을 위한 단일 상품 선택 로직
우선순위: 1) S3 업로드 성공한 상품 중 이미지 개수가 많은 것
2) 없다면 크롤링 성공한 첫 번째 상품
"""
try:
# 1순위: S3 업로드 성공하고 이미지가 있는 상품들
successful_uploads = [
result
for result in upload_results
if result.get("status") == "completed"
and result.get("success_count", 0) > 0
]

if successful_uploads:
# 이미지 개수가 가장 많은 상품 선택
best_upload = max(
successful_uploads, key=lambda x: x.get("success_count", 0)
)
selected_index = best_upload["product_index"]

# 원본 크롤링 데이터에서 해당 상품 찾기
for product_info in crawled_products:
if product_info.get("index") == selected_index:
logger.info(
f"콘텐츠 생성용 상품 선택: index={selected_index}, "
f"title='{product_info.get('product_detail', {}).get('title', 'Unknown')[:30]}', "
f"images={best_upload.get('success_count', 0)}개"
)
return {
"selection_reason": "s3_upload_success_with_most_images",
"product_info": product_info,
"s3_upload_info": best_upload,
}

# 2순위: 크롤링 성공한 첫 번째 상품 (S3 업로드 실패해도)
for product_info in crawled_products:
if product_info.get("status") == "success" and product_info.get(
"product_detail"
):

# 해당 상품의 S3 업로드 정보 찾기
upload_info = None
for result in upload_results:
if result.get("product_index") == product_info.get("index"):
upload_info = result
break

logger.info(
f"콘텐츠 생성용 상품 선택 (fallback): index={product_info.get('index')}, "
f"title='{product_info.get('product_detail', {}).get('title', 'Unknown')[:30]}'"
)
return {
"selection_reason": "first_crawl_success",
"product_info": product_info,
"s3_upload_info": upload_info,
}

# 3순위: 아무거나 (모든 상품이 실패한 경우)
if crawled_products:
logger.warning("모든 상품이 크롤링 실패 - 첫 번째 상품으로 fallback")
return {
"selection_reason": "fallback_first_product",
"product_info": crawled_products[0],
"s3_upload_info": upload_results[0] if upload_results else None,
}

logger.error("선택할 상품이 없습니다")
return {
"selection_reason": "no_products_available",
"product_info": None,
"s3_upload_info": None,
}

except Exception as e:
logger.error(f"단일 상품 선택 오류: {e}")
return {
"selection_reason": "selection_error",
"product_info": crawled_products[0] if crawled_products else None,
"s3_upload_info": upload_results[0] if upload_results else None,
"error": str(e),
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public ObjectNode build(Task task, Map<String, JsonNode> workflowContext) {
.ifPresent(tagsNode -> body.set("post_tags", tagsNode));
});

body.put("tag", "tistory");
body.put("blog_id", "[email protected]");
body.put("blog_pw", "kdyn2641*");
body.put("tag", "NAVER_BLOG");
body.put("blog_id", "wtecho331");
body.put("blog_pw", "wt505033@#");

return body;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class BlogRagBodyBuilder implements TaskBodyBuilder {
private final ObjectMapper objectMapper;
private static final String TASK_NAME = "블로그 RAG 생성 태스크";
private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크";
private static final String CRAWL_SOURCE_TASK = "상품 정보 크롤링 태스크";
private static final String S3_UPLOAD_SOURCE_TASK = "S3 업로드 태스크"; // 변경: 크롤링 → S3 업로드

@Override
public boolean supports(String taskName) {
Expand All @@ -36,9 +36,14 @@ public ObjectNode build(Task task, Map<String, JsonNode> workflowContext) {
.map(node -> node.path("data").path("keyword"))
.ifPresent(keywordNode -> body.set("keyword", keywordNode));

// 크롤링된 상품 정보 가져오기
Optional.ofNullable(workflowContext.get(CRAWL_SOURCE_TASK))
.map(node -> node.path("data").path("product_detail"))
// S3 업로드에서 선택된 상품 정보 가져오기 (변경된 부분)
Optional.ofNullable(workflowContext.get(S3_UPLOAD_SOURCE_TASK))
.map(
node ->
node.path("data")
.path("selected_product_for_content")
.path("product_info")
.path("product_detail"))
.ifPresent(productNode -> body.set("product_info", productNode));

// 기본 콘텐츠 설정
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import lombok.RequiredArgsConstructor;
Expand All @@ -30,11 +31,27 @@ public boolean supports(String taskName) {
public ObjectNode build(Task task, Map<String, JsonNode> workflowContext) {
ObjectNode body = objectMapper.createObjectNode();

// 유사도 분석에서 선택된 상품의 URL 가져오기
// ArrayNode 준비 (product_urls 배열로 변경)
ArrayNode productUrls = objectMapper.createArrayNode();

// 유사도 분석에서 선택된 상품들의 URL 가져오기 (복수로 변경)
Optional.ofNullable(workflowContext.get(SIMILARITY_SOURCE_TASK))
.map(node -> node.path("data").path("selected_product").path("url"))
.filter(urlNode -> !urlNode.isMissingNode() && !urlNode.asText().isEmpty())
.ifPresent(urlNode -> body.set("product_url", urlNode));
.ifPresent(
node -> {
JsonNode topProducts = node.path("data").path("top_products");
if (topProducts.isArray()) {
// top_products 배열에서 각 상품의 URL 추출
topProducts.forEach(
product -> {
JsonNode urlNode = product.path("url");
if (!urlNode.isMissingNode() && !urlNode.asText().isEmpty()) {
productUrls.add(urlNode.asText());
}
});
}
});

body.set("product_urls", productUrls);

return body;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package site.icebang.domain.workflow.runner.fastapi.body;

import java.util.Map;
import java.util.Optional;

import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import lombok.RequiredArgsConstructor;

import site.icebang.domain.workflow.model.Task;

/**
 * Builds the FastAPI request body for the "S3 upload" workflow task.
 *
 * <p>Collects the keyword produced by the keyword-search task and the product
 * list produced by the crawling task out of the shared {@code workflowContext},
 * and assembles them (plus a fixed base folder) into the JSON payload expected
 * by the pre-processing service's {@code /products/s3-upload} endpoint.
 */
@Component
@RequiredArgsConstructor
public class S3UploadBodyBuilder implements TaskBodyBuilder {

  private final ObjectMapper objectMapper;
  // Task display names; TASK_NAME selects this builder, the others are the
  // context keys under which upstream task results are stored.
  private static final String TASK_NAME = "S3 업로드 태스크";
  private static final String KEYWORD_SOURCE_TASK = "키워드 검색 태스크";
  private static final String CRAWL_SOURCE_TASK = "상품 정보 크롤링 태스크";

  /** Matches only the S3 upload task by its exact display name. */
  @Override
  public boolean supports(String taskName) {
    return TASK_NAME.equals(taskName);
  }

  /**
   * Assembles the request body from prior task outputs.
   *
   * <p>Missing upstream results are tolerated: absent fields are simply left
   * out of the body (only {@code base_folder} is always present).
   *
   * @param task the task being executed (unused here; required by the interface)
   * @param workflowContext task-name → result JSON accumulated so far
   * @return JSON body with optional {@code keyword}, optional
   *     {@code crawled_products}, and a fixed {@code base_folder}
   */
  @Override
  public ObjectNode build(Task task, Map<String, JsonNode> workflowContext) {
    ObjectNode body = objectMapper.createObjectNode();

    // Keyword from the keyword-search task result, if available.
    Optional.ofNullable(workflowContext.get(KEYWORD_SOURCE_TASK))
        .map(node -> node.path("data").path("keyword"))
        .ifPresent(keywordNode -> body.set("keyword", keywordNode));

    // Crawled product list from the crawling task result, if available.
    Optional.ofNullable(workflowContext.get(CRAWL_SOURCE_TASK))
        .map(node -> node.path("data").path("crawled_products"))
        .ifPresent(crawledProductsNode -> body.set("crawled_products", crawledProductsNode));

    // Default S3 folder prefix for the uploaded assets.
    body.put("base_folder", "product");

    return body;
  }
}
34 changes: 22 additions & 12 deletions apps/user-service/src/main/resources/sql/03-insert-workflow.sql
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,29 @@ INSERT INTO `task` (`id`, `name`, `type`, `parameters`) VALUES
)),
(5, '상품 정보 크롤링 태스크', 'FastAPI', JSON_OBJECT(
'endpoint', '/products/crawl', 'method', 'POST',
'body', JSON_OBJECT('product_url', 'String') -- { "product_url": str }
'body', JSON_OBJECT('product_urls', 'List') -- { "product_urls": List[str] } 수정됨
)),
-- 🆕 S3 업로드 태스크 추가
(6, 'S3 업로드 태스크', 'FastAPI', JSON_OBJECT(
'endpoint', '/products/s3-upload', 'method', 'POST',
'body', JSON_OBJECT( -- { keyword: str, crawled_products: List, base_folder: str }
'keyword', 'String',
'crawled_products', 'List',
'base_folder', 'String'
)
)),
-- RAG관련 request body는 추후에 결정될 예정
(6, '블로그 RAG 생성 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')),
(7, '블로그 발행 태스크', 'FastAPI', JSON_OBJECT(
(7, '블로그 RAG 생성 태스크', 'FastAPI', JSON_OBJECT('endpoint', '/blogs/rag/create', 'method', 'POST')),
(8, '블로그 발행 태스크', 'FastAPI', JSON_OBJECT(
'endpoint', '/blogs/publish', 'method', 'POST',
'body', JSON_OBJECT( -- { tag: str, blog_id: str, ... }
'tag', 'String',
'blog_id', 'String',
'blog_pw', 'String',
'blog_name', 'String',
'post_title', 'String',
'post_content', 'String',
'post_tags', 'List'
'tag', 'NAVER_BLOG',
'blog_id', 'wtecho331',
'blog_pw', 'wt505033@#',
'blog_name', '박스박스dasdsafs.',
'post_title', '박스박스dasdsafs.',
'post_content', '퉁퉁퉁퉁퉁퉁퉁사후르',
'post_tags', '[]'
)
))
ON DUPLICATE KEY UPDATE name = VALUES(name), type = VALUES(type), parameters = VALUES(parameters), updated_at = NOW();
Expand All @@ -80,8 +89,9 @@ INSERT INTO `workflow_job` (`workflow_id`, `job_id`, `execution_order`) VALUES

-- Job-Task 연결
INSERT INTO `job_task` (`job_id`, `task_id`, `execution_order`) VALUES
(1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5),
(2, 6, 1), (2, 7, 2)
-- Job 1: 상품 분석 (키워드검색 → 상품검색 → 매칭 → 유사도 → 크롤링 → S3업로드)
(1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, 5), (1, 6, 6),
(2, 7, 1), (2, 8, 2)
ON DUPLICATE KEY UPDATE execution_order = VALUES(execution_order);

-- 스케줄 설정 (매일 오전 8시)
Expand Down
Loading