Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
0202zc committed Feb 24, 2023
1 parent 0797210 commit 9bf356a
Show file tree
Hide file tree
Showing 21 changed files with 149 additions and 101 deletions.
7 changes: 4 additions & 3 deletions cmd/api/handler/comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package handler

import (
"context"
"github.com/bytedance-youthcamp-jbzx/tiktok/cmd/api/rpc"
"github.com/bytedance-youthcamp-jbzx/tiktok/internal/response"
kitex "github.com/bytedance-youthcamp-jbzx/tiktok/kitex/kitex_gen/comment"
"github.com/cloudwego/hertz/pkg/app"
"net/http"
"strconv"

"github.com/bytedance-youthcamp-jbzx/tiktok/cmd/api/rpc"
"github.com/bytedance-youthcamp-jbzx/tiktok/internal/response"
kitex "github.com/bytedance-youthcamp-jbzx/tiktok/kitex/kitex_gen/comment"
)

func CommentAction(ctx context.Context, c *app.RequestContext) {
Expand Down
10 changes: 10 additions & 0 deletions cmd/api/handler/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ func MessageAction(ctx context.Context, c *app.RequestContext) {
return
}

if len(c.Query("content")) == 0 {
c.JSON(http.StatusOK, response.MessageAction{
Base: response.Base{
StatusCode: -1,
StatusMsg: "参数 content 不能为空",
},
})
return
}

// 调用kitex/kitex_gen
req := &kitex.MessageActionRequest{
Token: token,
Expand Down
22 changes: 21 additions & 1 deletion cmd/api/handler/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,27 @@ func Login(ctx context.Context, c *app.RequestContext) {
func UserInfo(ctx context.Context, c *app.RequestContext) {
userId := c.Query("user_id")
token := c.Query("token")
id, _ := strconv.ParseInt(userId, 10, 64)
if len(token) == 0 {
c.JSON(http.StatusOK, response.UserInfo{
Base: response.Base{
StatusCode: -1,
StatusMsg: "token 已过期",
},
User: nil,
})
return
}
id, err := strconv.ParseInt(userId, 10, 64)
if err != nil {
c.JSON(http.StatusOK, response.UserInfo{
Base: response.Base{
StatusCode: -1,
StatusMsg: "user_id 不合法",
},
User: nil,
})
return
}

//调用kitex/kitex_genit
req := &kitex.UserInfoRequest{
Expand Down
2 changes: 1 addition & 1 deletion cmd/api/handler/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Feed(ctx context.Context, c *app.RequestContext) {
}
res, _ := rpc.Feed(ctx, req)
if res.StatusCode == -1 {
c.JSON(http.StatusOK, response.FavoriteList{
c.JSON(http.StatusOK, response.Feed{
Base: response.Base{
StatusCode: -1,
StatusMsg: res.StatusMsg,
Expand Down
5 changes: 2 additions & 3 deletions cmd/favorite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ func init() {
}

func main() {
// logger = z.InitLogger()
// defer logger.Sync()
defer service.FavoriteMq.Destroy()

// 服务注册
r, err := etcd.NewEtcdRegistry([]string{etcdAddr})
Expand All @@ -52,7 +51,7 @@ func main() {
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
// server.WithSuite(tracing.NewServerSuite()),
//server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
)

Expand Down
16 changes: 15 additions & 1 deletion cmd/favorite/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,25 @@ package service
import (
"context"
"encoding/json"
"fmt"
"github.com/bytedance-youthcamp-jbzx/tiktok/dal/db"
"github.com/bytedance-youthcamp-jbzx/tiktok/dal/redis"
favorite "github.com/bytedance-youthcamp-jbzx/tiktok/kitex/kitex_gen/favorite"
user "github.com/bytedance-youthcamp-jbzx/tiktok/kitex/kitex_gen/user"
video "github.com/bytedance-youthcamp-jbzx/tiktok/kitex/kitex_gen/video"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/minio"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/rabbitmq"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
amqp "github.com/rabbitmq/amqp091-go"
"strings"
)

// FavoriteServiceImpl implements the last service interface defined in the IDL.
type FavoriteServiceImpl struct{}

// FavoriteAction implements the FavoriteServiceImpl interface.
func (s *FavoriteServiceImpl) FavoriteAction(ctx context.Context, req *favorite.FavoriteActionRequest) (resp *favorite.FavoriteActionResponse, err error) {
logger := zap.InitLogger()
// 解析token,获取用户id
claims, err := Jwt.ParseToken(req.Token)
if err != nil {
Expand All @@ -36,8 +42,16 @@ func (s *FavoriteServiceImpl) FavoriteAction(ctx context.Context, req *favorite.
//CreatedAt: time.Now(),
}
jsonFC, _ := json.Marshal(fc)
fmt.Println("Publish new message: ", fc)
if err = FavoriteMq.PublishSimple(ctx, jsonFC); err != nil {
logger.Errorf("消息队列发布错误:%v", err.Error())
logger.Errorf("消息队列发布错误:%v", err.Error())
if strings.Contains(err.Error(), amqp.ErrClosed.Reason) {
// 检测到通道关闭,则重连
FavoriteMq.Destroy()
FavoriteMq = rabbitmq.NewRabbitMQSimple("favorite")
logger.Errorln("消息队列通道尝试重连:favorite")
go consume()
}
res := &favorite.FavoriteActionResponse{
StatusCode: -1,
StatusMsg: "操作失败:服务器内部错误",
Expand Down
3 changes: 2 additions & 1 deletion cmd/favorite/service/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ var (

func Init(signingKey string) {
Jwt = jwt.NewJWT([]byte(signingKey))
GoCron()
//GoCron()
go consume()
}
8 changes: 5 additions & 3 deletions cmd/favorite/service/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"

"github.com/bytedance-youthcamp-jbzx/tiktok/dal/redis"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/gocron"
)
Expand All @@ -16,19 +15,22 @@ func consume() {
msgs, err := FavoriteMq.ConsumeSimple()
if err != nil {
fmt.Println(err.Error())
logger.Errorf("FavoriteMQ Err: %s", err.Error())
}
// 将消息队列的消息全部取出
for msg := range msgs {
fmt.Printf("==> Get new message: %v", msg.MessageId)
fc := new(redis.FavoriteCache)
// 解析json
if err = json.Unmarshal(msg.Body, &fc); err != nil {
logger.Errorf("json unmarshal error: %s", err.Error())
fmt.Println("json unmarshal error:" + err.Error())
continue
}
fmt.Printf("==> Get new message: %v\n", fc)
// 将结构体存入redis
if err = redis.UpdateFavorite(context.Background(), fc); err != nil {
fmt.Println("add to redis error:" + err.Error())
logger.Errorf("json unmarshal error: %s", err.Error())
fmt.Println("json unmarshal error:" + err.Error())
continue
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/message/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func main() {
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
// server.WithSuite(tracing.NewServerSuite()),
//server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
)

Expand Down
23 changes: 22 additions & 1 deletion cmd/message/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"

"github.com/bytedance-youthcamp-jbzx/tiktok/dal/db"
"github.com/bytedance-youthcamp-jbzx/tiktok/dal/redis"
"github.com/bytedance-youthcamp-jbzx/tiktok/internal/tool"
Expand Down Expand Up @@ -39,8 +40,9 @@ func (s *MessageServiceImpl) MessageChat(ctx context.Context, req *message.Messa
}

var results []*db.Message
if lastTimestamp == 0 {
if lastTimestamp == -1 {
results, err = db.GetMessagesByUserIDs(ctx, userID, req.ToUserId, int64(lastTimestamp))
lastTimestamp = 0
} else {
results, err = db.GetMessagesByUserToUser(ctx, req.ToUserId, userID, int64(lastTimestamp))
}
Expand Down Expand Up @@ -123,6 +125,25 @@ func (s *MessageServiceImpl) MessageAction(ctx context.Context, req *message.Mes

toUserID, actionType := req.ToUserId, req.ActionType

if userID == toUserID {
logger.Errorln("不能给自己发送消息")
res := &message.MessageActionResponse{
StatusCode: -1,
StatusMsg: "消息发送失败:不能给自己发送消息",
}
return res, nil
}

relation, err := db.GetRelationByUserIDs(ctx, userID, toUserID)
if relation == nil {
logger.Errorf("消息发送失败:非朋友关系,无法发送")
res := &message.MessageActionResponse{
StatusCode: -1,
StatusMsg: "消息发送失败:非朋友关系,无法发送",
}
return res, nil
}

rsaContent, err := tool.RsaEncrypt([]byte(req.Content), publicKey)
if err != nil {
logger.Errorf("rsa encrypt error: %v\n", err.Error())
Expand Down
9 changes: 4 additions & 5 deletions cmd/relation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import (
"fmt"
"net"

"github.com/cloudwego/kitex/pkg/limit"
"github.com/cloudwego/kitex/pkg/rpcinfo"

"github.com/bytedance-youthcamp-jbzx/tiktok/cmd/relation/service"
"github.com/bytedance-youthcamp-jbzx/tiktok/kitex/kitex_gen/relation/relationservice"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/etcd"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/middleware"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/viper"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
"github.com/cloudwego/kitex/pkg/limit"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
)

Expand All @@ -30,7 +29,7 @@ func init() {
}

func main() {
// defer logger.Sync()
defer service.RelationMq.Destroy()

// 服务注册
r, err := etcd.NewEtcdRegistry([]string{etcdAddr})
Expand All @@ -51,7 +50,7 @@ func main() {
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
// server.WithSuite(tracing.NewServerSuite()),
//server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
)

Expand Down
12 changes: 11 additions & 1 deletion cmd/relation/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
relation "github.com/bytedance-youthcamp-jbzx/tiktok/kitex/kitex_gen/relation"
user "github.com/bytedance-youthcamp-jbzx/tiktok/kitex/kitex_gen/user"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/minio"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/rabbitmq"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
amqp "github.com/rabbitmq/amqp091-go"
"strings"
)

// RelationServiceImpl implements the last service interface defined in the IDL.
Expand Down Expand Up @@ -68,7 +71,14 @@ func (s *RelationServiceImpl) RelationAction(ctx context.Context, req *relation.
}
jsonRc, _ := json.Marshal(relationCache)
if err = RelationMq.PublishSimple(ctx, jsonRc); err != nil {
logger.Errorln(err.Error())
logger.Errorf("消息队列发布错误:%v", err.Error())
if strings.Contains(err.Error(), amqp.ErrClosed.Reason) {
// 检测到通道关闭,则重连
RelationMq.Destroy()
RelationMq = rabbitmq.NewRabbitMQSimple("relation")
logger.Errorln("消息队列通道尝试重连:relation")
go consume()
}
res := &relation.RelationActionResponse{
StatusCode: -1,
StatusMsg: "服务器内部错误:操作失败",
Expand Down
3 changes: 2 additions & 1 deletion cmd/relation/service/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ var (
func Init(signingKey string) {
Jwt = jwt.NewJWT([]byte(signingKey))
privateKey, _ = tool.ReadKeyFromFile(tool.PrivateKeyFilePath)
GoCron()
//GoCron()
go consume()
}
9 changes: 7 additions & 2 deletions cmd/relation/service/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,31 @@ import (
const frequency = 10

// 点赞服务消息队列消费者
func consume() {
func consume() error {
msgs, err := RelationMq.ConsumeSimple()
if err != nil {
fmt.Println(err.Error())
logger.Errorf("RelationMQ Err: %s", err.Error())
return err
}
// 将消息队列的消息全部取出
for msg := range msgs {
fmt.Printf("==> Get new message: %v", msg.MessageId)
rc := new(redis.RelationCache)
// 解析json
if err = json.Unmarshal(msg.Body, &rc); err != nil {
fmt.Println("json unmarshal error:" + err.Error())
logger.Errorf("RelationMQ Err: %s", err.Error())
continue
}
fmt.Printf("==> Get new message: %v\n", rc)
// 将结构体存入redis
if err = redis.UpdateRelation(context.Background(), rc); err != nil {
fmt.Println("add to redis error:" + err.Error())
logger.Errorf("RelationMQ Err: %s", err.Error())
continue
}
}
return nil
}

// gocron定时任务,每隔一段时间就让Consumer消费消息队列的所有消息
Expand Down
3 changes: 1 addition & 2 deletions cmd/user/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"net"

"github.com/bytedance-youthcamp-jbzx/tiktok/cmd/user/service"
"github.com/kitex-contrib/obs-opentelemetry/tracing"

"github.com/bytedance-youthcamp-jbzx/tiktok/kitex/kitex_gen/user/userservice"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/etcd"
Expand Down Expand Up @@ -52,7 +51,7 @@ func main() {
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
server.WithSuite(tracing.NewServerSuite()),
//server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/video/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func main() {
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
// server.WithSuite(tracing.NewServerSuite()),
//server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
)

Expand Down
4 changes: 2 additions & 2 deletions dal/db/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ func CreateUsers(ctx context.Context, users []*User) error {
// CreateUser
//
// @Description: 新增一条用户数据
// @Date 2023-02-22 11:47:43
// @Date 2023-02-22 11:46:43
// @param ctx 数据库操作上下文
// @param users 用户数据
// @param user 用户数据
// @return error
func CreateUser(ctx context.Context, user *User) error {
err := GetDB().Clauses(dbresolver.Write).WithContext(ctx).Transaction(func(tx *gorm.DB) error {
Expand Down
Loading

0 comments on commit 9bf356a

Please sign in to comment.