Skip to content

Commit

Permalink
feat(pkg/task/transfer.go): transform loop (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhu327 authored Aug 15, 2022
1 parent 0833906 commit 5549a34
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions pkg/task/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,11 @@ func NewGroupAlterEventTransfer(producer producer.Producer) *GroupAlterEventTran
func (t *GroupAlterEventTransfer) Run() {
logger := logging.GetWorkerLogger()

// run every 30 seconds
for range time.Tick(30 * time.Second) {
for {
logger.Info("Start transfer group alter event to subject action alter event")
t.stats.totalCount += 1

err := t.transform()
count, err := t.transform()
if err != nil {
t.stats.failCount += 1
logger.WithError(err).Error("transform fail")
Expand All @@ -128,8 +127,15 @@ func (t *GroupAlterEventTransfer) Run() {
"error": err.Error(),
},
)

time.Sleep(30 * time.Second)
} else {
t.stats.successCount += 1

// 时间段内的消息处理完成后, 休眠30秒
if count < int(eventLimit) {
time.Sleep(30 * time.Second)
}
}

if t.stats.totalCount%1000 == 0 || time.Since(t.stats.lastShowProcessTime) > 30*time.Second {
Expand All @@ -140,32 +146,33 @@ func (t *GroupAlterEventTransfer) Run() {
}
}

func (t *GroupAlterEventTransfer) transform() error {
func (t *GroupAlterEventTransfer) transform() (count int, err error) {
errorWrapf := errorx.NewLayerFunctionErrorWrapf(transferLayer, "transform")

createdAt := time.Now().Add(-30 * time.Second).Unix()
events, err := t.service.ListBeforeCreateAt(createdAt, eventLimit)
if err != nil {
return errorWrapf(err, "service.ListBeforeCreateAt fail createdAt=`%d`", createdAt)
return 0, errorWrapf(err, "service.ListBeforeCreateAt fail createdAt=`%d`", createdAt)
}

if len(events) == 0 {
return nil
count = len(events)
if count == 0 {
return count, nil
}

// 生成subject action alter event
subjectActionAlterEvents := convertToSubjectActionAlterEvent(events)

tx, err := database.GenerateDefaultDBTx()
if err != nil {
return errorWrapf(err, "database.GenerateDefaultDBTx fail", "")
return 0, errorWrapf(err, "database.GenerateDefaultDBTx fail", "")
}
defer database.RollBackWithLog(tx)

// 生成 subject action alter event 并同时删除 group alter event
err = t.subjectActionAlterEventService.BulkCreateWithTx(tx, subjectActionAlterEvents)
if err != nil {
return errorWrapf(
return 0, errorWrapf(
err,
"subjectActionAlterEventService.BulkCreateWithTx fail events=`%v`",
subjectActionAlterEvents,
Expand All @@ -179,12 +186,12 @@ func (t *GroupAlterEventTransfer) transform() error {

err = t.service.BulkDeleteWithTx(tx, eventUUIDs)
if err != nil {
return errorWrapf(err, "service.BulkDeleteWithTx fail eventUUIDs=`%v`", eventUUIDs)
return 0, errorWrapf(err, "service.BulkDeleteWithTx fail eventUUIDs=`%v`", eventUUIDs)
}

err = tx.Commit()
if err != nil {
return errorWrapf(err, "tx.Commit fail", "")
return 0, errorWrapf(err, "tx.Commit fail", "")
}

messageUUIDs := make([]string, 0, len(subjectActionAlterEvents))
Expand All @@ -195,21 +202,21 @@ func (t *GroupAlterEventTransfer) transform() error {
// 发送消息
err = t.producer.Publish(messageUUIDs...)
if err != nil {
return errorWrapf(err, "producer.Publish fail messageUUIDs=`%v`", messageUUIDs)
return 0, errorWrapf(err, "producer.Publish fail messageUUIDs=`%v`", messageUUIDs)
}

// 变更消息状态为已推送
err = t.subjectActionAlterEventService.BulkUpdateStatus(messageUUIDs, types.SubjectActionAlterEventStatusPushed)
if err != nil {
return errorWrapf(
return 0, errorWrapf(
err,
"subjectActionAlterEventService.BulkUpdateStatus fail messageUUIDs=`%v`, status=`%d`",
messageUUIDs,
types.SubjectActionAlterEventStatusPushed,
)
}

return nil
return count, nil
}

func convertToSubjectActionAlterEvent(events []types.GroupAlterEvent) []types.SubjectActionAlterEvent {
Expand Down

0 comments on commit 5549a34

Please sign in to comment.