Skip to content

Commit

Permalink
Merge pull request #173 from kookmin-sw/server/develop
Browse files Browse the repository at this point in the history
Server/develop
  • Loading branch information
donggook-me authored May 24, 2024
2 parents 5425452 + cc720e6 commit 2d64db7
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/model/fastapi/kafkaProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def kafkaSend(cameraId, interestFrameCnt, passedFrameCnt, arriveTime, leaveTime,
# 로컬 서버
# bootstrap_servers=['localhost:9092'],
# 클라우드 카프카 서버 -> 테스트시 가능하면 해당 서버로 보내주세요.
bootstrap_servers=['43.202.60.64:29092'],
bootstrap_servers=['3.34.47.236:29092'],
value_serializer=lambda x:dumps(x).encode('utf-8')
)

Expand Down
71 changes: 71 additions & 0 deletions src/model/fastapi/kafkaProducerTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from kafka import KafkaProducer
from json import dumps
import time
import datetime

# 사용 라이브러리 -> pip install kafka-python

# Parameter 값 설명

# cameraId => 임의로 보내는 카메라 Id 값 (하나의 모델이라고 가정시 같은 Integer 값으로 보내면 된다.) => 테스트시 1,2,3 으로 보내기 !

# interestFrameCnt => 등장한 인물의 관심 있는 것으로 추정되는 프레임의 개수 (int)

# passedFrameCnt => 등장한 인물의 관심 있는 것으로 추정되는 프레임의 개수 (int)

# arriveTime -> 인물의 등장 시간 - time.localtime() 형식 (문제 있을 시 변경 요청 해주셔도 됩니다.)

# leaveTime -> 인물의 퇴장 시간 - time.localtime()

# staringframeData -> 인물 등장하고 관심 표현 여부 0,1로 나타낸 리스트

# male -> 남성 : 1 / 여성 : 0

# age -> 추정 나이 - ex: 23 (integer)

# fps -> frame per second - 1 로 통일


def kafkaSend(producer, cameraId, interestFrameCnt, passedFrameCnt, arriveTime, leaveTime, staringframeData, male, age, fps):
data = {'cameraId' : cameraId, 'startAt' : arriveTime, 'leaveAt' : leaveTime,
'passedFrameCnt' : passedFrameCnt, 'interestFrameCnt' : interestFrameCnt, "staringData" : staringframeData,
"male": male, "age" : age, "fps" : fps}
producer.send('drm-face-topic', value=data)
# producer.poll()



if __name__ == '__main__':
producer = KafkaProducer(
acks=0,
compression_type='gzip',
# bootstrap_servers=['43.203.218.109:29092'],
bootstrap_servers=['localhost:29092'],
value_serializer=lambda x:dumps(x).encode('utf-8')
)
exampleTime = time.localtime(time.time())
printTime = datetime.datetime.now()

print("KAFKA TEST START")
print("TEST TIME : ", printTime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
for i in range(1000):
# 아래의 테스트 전송 10개를 1 suite 로 지정
if(i % 50 == 0):
print("TEST PROCESSING : ", i)
kafkaSend(producer, 2, 0, 5, exampleTime, exampleTime, [0,0,0,0,0], 0, 32, 10)
kafkaSend(producer, 2, 9, 18, exampleTime, exampleTime, [0,0,0,1,1,1,0,0,0,1,1,1,0,0,0,1,1,1], 0, 18, 10)
kafkaSend(producer, 2, 1, 5, exampleTime, exampleTime, [0,0,0,0,1], 1, 45, 5)

kafkaSend(producer, 6, 9, 18, exampleTime, exampleTime, [0,0,0,1,1,1,0,0,0,1,1,1,0,0,0,1,1,1], 1, 80, 10)
kafkaSend(producer, 6, 0, 5, exampleTime, exampleTime, [0,0,0,0,0], 0, 25, 10)
kafkaSend(producer, 6, 6, 14, exampleTime, exampleTime, [0,0,0,1,1,1,0, 0,0,0,1,1,1,0], 0, 23, 10)
kafkaSend(producer, 6, 9, 18, exampleTime, exampleTime, [0,0,0,1,1,1,0,0,0,1,1,1,0,0,0,1,1,1], 0, 22, 5)

kafkaSend(producer, 3, 9, 18, exampleTime, exampleTime, [0,0,0,1,1,1,0,0,0,1,1,1,0,0,0,1,1,1], 1, 15, 10)
kafkaSend(producer, 3, 6, 14, exampleTime, exampleTime, [0,0,0,1,1,1,0, 0,0,0,1,1,1,0], 1, 20, 10)
kafkaSend(producer, 3, 9, 18, exampleTime, exampleTime, [0,0,0,1,1,1,0,0,0,1,1,1,0,0,0,1,1,1], 0, 27, 5)
producer.flush()
printEndTime = datetime.datetime.now()
print("TEST TIME : ", printTime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
print("TEST END TIME : ", printEndTime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])
print("TEST SEND DONE")
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public dataPerDay(){
public static class dataFilter {
private boolean male;
private boolean female;
@Schema(description = "필터링 나이대(10대 이하, 10대, 20대, 30대, 40대, 50대, 60대 이상) 총 7개 분류", example = "[true, false, false, false, false, false, true]")
@Schema(description = "필터링 나이대(10대, 20대, 30대, 40대, 50대, 60대 이상) 총 6개 분류", example = "[true, false, false, false, false, false]")
private List<Boolean> ageRanges; // Add this field for age ranges
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ public DashboardDetailDataInfo(){
this.attentionRatio = 0F;
}

public void addDetailDataInfo(DashboardDetailDataInfo inputInfo){
// divide by 0 를 막기 위해 우선적으로 더해주는 것이 좋음.
this.totalPeopleCount += inputInfo.getTotalPeopleCount();
this.InterestPeopleCnt += inputInfo.getInterestPeopleCnt();
this.avgStaringTime = calculateStaringTime(inputInfo.getAvgStaringTime(), inputInfo.getInterestPeopleCnt());
this.attentionRatio = calculateAttentionRatio();
}
// public void addDetailDataInfo(DashboardDetailDataInfo inputInfo){
// // divide by 0 를 막기 위해 우선적으로 더해주는 것이 좋음.
// this.totalPeopleCount += inputInfo.getTotalPeopleCount();
// this.InterestPeopleCnt += inputInfo.getInterestPeopleCnt();
// this.avgStaringTime = calculateStaringTime(inputInfo.getAvgStaringTime(), inputInfo.getInterestPeopleCnt());
// this.attentionRatio = calculateAttentionRatio();
// }
public void addDetailData(DailyDetailBoard board){
this.totalPeopleCount += board.getTotalPeopleCount();
this.InterestPeopleCnt += board.getInterestCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,58 @@ public class DetectedFaceConsumer {
private final DailyDetailBoardService dailyDetailBoardService;


//컨슈머가 캐치하는 구간
// 컨슈머가 캐치하는 구간
// @KafkaListener(topics = "drm-face-topic")
//// @KafkaListener(topics = "drm-face-topic", containerFactory = "kafkaBatchListeningFactory")
// public void updateQty(String kafkaMessage) {
// log.info("얼굴인식 Kafka Message: ->" + kafkaMessage);
//
// Map<Object, Object> map = new HashMap<>();
// ObjectMapper mapper = new ObjectMapper();
// try {
// map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
// } catch (JsonProcessingException ex) {
// ex.printStackTrace();
// }
//
// List<Integer> startTimeList = (List<Integer>) map.get("startAt");
// List<Integer> arriveTimeList = (List<Integer>) map.get("leaveAt");
//
// DetectedFace detectedFace = DetectedFace.toEntity(map, startTimeList, arriveTimeList);
//
// MediaApplication mediaApplication = mediaApplicationService.findByCameraIdAndDate((Integer) map.get("cameraId"), detectedFace.getArriveAt());
// detectedFace.updateMediaApplication(mediaApplication);
// DetectedFace savedDetectedFace = detectedFaceRepository.save(detectedFace);
// DailyMediaBoard dailyMediaBoard = dailyMediaBoardService.updateDailyBoard(savedDetectedFace);
// dailyDetailBoardService.updateDatailBoard(detectedFace,dailyMediaBoard);
// }

// 컨슈머가 캐치하는 구간 -> Batch Listener
@KafkaListener(topics = "drm-face-topic")
public void updateQty(String kafkaMessage) {
log.info("얼굴인식 Kafka Message: ->" + kafkaMessage);

Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
public void updateBatchQty(List<String> kafkaMessages) {
for (String kafkaMessage : kafkaMessages) {
log.info("얼굴인식 Kafka Message: ->" + kafkaMessage);

List<Integer> startTimeList = (List<Integer>) map.get("startAt");
List<Integer> arriveTimeList = (List<Integer>) map.get("leaveAt");
// Your existing processing logic for each Kafka message
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {
});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}

DetectedFace detectedFace = DetectedFace.toEntity(map, startTimeList, arriveTimeList);
List<Integer> startTimeList = (List<Integer>) map.get("startAt");
List<Integer> arriveTimeList = (List<Integer>) map.get("leaveAt");

MediaApplication mediaApplication = mediaApplicationService.findByCameraIdAndDate((Integer) map.get("cameraId"), detectedFace.getArriveAt());
detectedFace.updateMediaApplication(mediaApplication);
DetectedFace savedDetectedFace = detectedFaceRepository.save(detectedFace);
DailyMediaBoard dailyMediaBoard = dailyMediaBoardService.updateDailyBoard(savedDetectedFace);
dailyDetailBoardService.updateDatailBoard(detectedFace,dailyMediaBoard);
DetectedFace detectedFace = DetectedFace.toEntity(map, startTimeList, arriveTimeList);

MediaApplication mediaApplication = mediaApplicationService.findByCameraIdAndDate((Integer) map.get("cameraId"), detectedFace.getArriveAt());
detectedFace.updateMediaApplication(mediaApplication);
DetectedFace savedDetectedFace = detectedFaceRepository.save(detectedFace);
DailyMediaBoard dailyMediaBoard = dailyMediaBoardService.updateDailyBoard(savedDetectedFace);
dailyDetailBoardService.updateDatailBoard(detectedFace, dailyMediaBoard);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,27 @@
@EnableKafka
@Configuration
public class KafkaConsumerConfig {


@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
// properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://3.35.141.194:29092"); // 배포 환경 설정
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:29092");
// properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://43.203.218.109:29092"); // 배포 환경 설정
// properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "175.45.203.201:9092"); // 배포 환경 설정
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());

kafkaListenerContainerFactory.setConcurrency(5);
kafkaListenerContainerFactory.setBatchListener(true);
return kafkaListenerContainerFactory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public DashboardResponse.DashboardDataInfo getDayBoards(Long userId, Long mediaA
// 광고 집행 단위가 유저의 것인지 확인
User user = userService.getUser(userId);
MediaApplication mediaApplication = mediaApplicationService.findById(mediaApplicationId);
mediaApplicationService.deleteVerify(mediaApplication, user);
mediaApplicationService.verifyUser(mediaApplication, user);
// 일별 데이터 조회
DailyMediaBoard board = dailyMediaBoardService.findDailyBoardByDateAndApplication(mediaApplication, date);

Expand Down Expand Up @@ -169,7 +169,7 @@ public void deleteAll() {

public DashboardResponse.DashboardDetailDataInfo getDashboardFiltered(Long userId, Long dashboardId, boolean male, boolean female, List<Boolean> ageRanges) {
verifyUser(userId, dashboardId);
if(ageRanges.size() != 7) throw new IllegalArgumentException("FILTER DASHBOARD SEARCH FAILED : AGE RANGES DOES NOT HAVE 7 COLUMN");
if(ageRanges.size() != 6) throw new IllegalArgumentException("FILTER DASHBOARD SEARCH FAILED : AGE RANGES DOES NOT HAVE 6 COLUMN");
DashboardResponse.DashboardDetailDataInfo infoSum = new DashboardResponse.DashboardDetailDataInfo();
List<MediaApplication> mediaAppList = findMediaApplicationByDashboardId(userId, dashboardId);

Expand All @@ -180,24 +180,44 @@ public DashboardResponse.DashboardDetailDataInfo getDashboardFiltered(Long userI
// 해당 집행된 광고에 대한 age / male or female 여부에 따른 합계
for (int i = 0; i < ageRanges.size(); i++) {
if (ageRanges.get(i)) {
if (male) {
List<DailyDetailBoard> detailBoards = dailyDetailBoardService.findDetailBoardByDailyBoard(board, (i + 1), true);
for (DailyDetailBoard detailBoard : detailBoards) {
infoSum.addDetailData(detailBoard);
// 마지막 range -> 60대 이상의 경우 60,70,80,90대 조회 후 합계
int range = i+1; // 나이대 range -> 1,2,3,4,5,6 (10대,20대,30대,40대,50대,60대 이상)
if(range == 6){
for (int j=i; j<=9; j++){
if (male) {
addDetailDashboardData(board, infoSum, j, true);
}
if (female) {
addDetailDashboardData(board, infoSum, j, false);
}
}
}
if (female) {
List<DailyDetailBoard> detailBoards = dailyDetailBoardService.findDetailBoardByDailyBoard(board, (i + 1), false);
for (DailyDetailBoard detailBoard : detailBoards) {
infoSum.addDetailData(detailBoard);
// 10,20,30,40,50 대의 경우 해당 구간만 조회하여 반영
else {
if (male) {
addDetailDashboardData(board, infoSum, range, true);
}
if (female) {
addDetailDashboardData(board, infoSum, range, false);
}
}

}
}
}
}
return infoSum;

}

private DashboardResponse.DashboardDetailDataInfo addDetailDashboardData(DailyMediaBoard board, DashboardResponse.DashboardDetailDataInfo infoSum, int ageRangeVal, Boolean maleCheck){
List<DailyDetailBoard> detailBoards = dailyDetailBoardService.findDetailBoardByDailyBoard(board, ageRangeVal, maleCheck);
for (DailyDetailBoard detailBoard : detailBoards) {
infoSum.addDetailData(detailBoard);
}
return infoSum;
}


}

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.drm.server.domain.user.User;
import com.drm.server.exception.ForbiddenException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import software.amazon.ion.NullValueException;
Expand All @@ -25,6 +26,7 @@

import static com.drm.server.domain.mediaApplication.Status.WAITING;

@Slf4j
@Service
@RequiredArgsConstructor
public class MediaApplicationService {
Expand All @@ -37,7 +39,9 @@ public MediaApplication createMediaApplication(Media media, Location location, S
MediaApplication mediaApplication = MediaApplication.toEntity(startDate, endDate, media, location);
// 같은 장소에 송출 날짜 겹치는 경우 예외
verifyDuplicateMediaInSameLocation(mediaApplication);
return mediaApplicationRepository.save(mediaApplication);
MediaApplication saved = mediaApplicationRepository.save(mediaApplication);
log.info("CREATE MEDIA APPLICATION : " + saved.getMediaApplicationId());
return saved;
}
public void deleteMediaApplication(Long mediaId, Long mediaApplicationId, User user){
MediaApplication mediaApplication = findById( mediaApplicationId);
Expand Down Expand Up @@ -94,9 +98,12 @@ public void verifyMedia(MediaApplication mediaApplication, Long mediaId ){
if(mediaApplication.getMedia().getMediaId() != mediaId) throw new IllegalArgumentException("mediaId가 잘못되었습니다");
}
public void verifyUser(MediaApplication mediaApplication, User user){
if(!mediaApplication.getMedia().getDashboard().getUser().equals(user)) throw new ForbiddenException("해당 신청에 접근 권한이 없습니다");
// 객체 동등한지 여부 확인 -> Id 값 수정으로 변경 (Test 코드에서 복사해와서 함수 실행하는 부분 편리를 위해)
// if(!mediaApplication.getMedia().getDashboard().getUser().equals(user)) throw new ForbiddenException("해당 신청에 접근 권한이 없습니다");
if(mediaApplication.getMedia().getDashboard().getUser().getUserId() != user.getUserId()) throw new ForbiddenException("해당 신청에 접근 권한이 없습니다");
}

// 아래는 삭제할때 MediaApp - user 의 일치를 확인하는 verify
public void deleteVerify(MediaApplication mediaApplication, User user){
verifyUser(mediaApplication,user);
if(!mediaApplication.getStatus().equals(WAITING)) throw new ForbiddenException("신청 대기일때만 삭제 가능합니다");
Expand Down
Loading

0 comments on commit 2d64db7

Please sign in to comment.