/
server.go
171 lines (155 loc) · 4.42 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/rkonfj/hu60bot/convo"
"github.com/rkonfj/hu60bot/pkg/hu60"
"github.com/sirupsen/logrus"
)
type WebsocketManager struct {
upgrader websocket.Upgrader
hu60Client *hu60.Client
websocketConnMap map[int]*websocket.Conn
cm *convo.ConversationManager
options ServerOptions
}
type Hu60Msg struct {
ID int `json:"id"`
ToUID int `json:"touid"`
ByUID int `json:"byuid"`
Type int `json:"type"`
Read int `json:"isread"`
Content string `json:"content"`
CreateTime time.Time `json:"ctime"`
}
type BotEvent struct {
Event string `json:"event"`
Data any `json:"data"`
}
type BotCmd struct {
Action string `json:"action"`
Data any `json:"data"`
}
type ChatResponse struct {
NewConversation bool `json:"newConversation"`
Response string `json:"response"`
}
func NewWebsocketManager(opts ServerOptions, cm *convo.ConversationManager) *WebsocketManager {
return &WebsocketManager{
upgrader: websocket.Upgrader{},
hu60Client: hu60.NewClient(opts.Hu60wap6APIURL),
websocketConnMap: make(map[int]*websocket.Conn),
cm: cm,
options: opts,
}
}
func (m *WebsocketManager) Push(msg *Hu60Msg) error {
if ws, ok := m.websocketConnMap[msg.ToUID]; ok {
err := ws.WriteJSON(BotEvent{Event: "msg", Data: msg})
if err != nil {
return err
}
return nil
} else {
return fmt.Errorf("uid %d not online", msg.ToUID)
}
}
func (m *WebsocketManager) Run() error {
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
ws, err := m.upgrader.Upgrade(w, r, nil)
if err != nil {
logrus.Error("ws upgrade error: ", err.Error())
return
}
sid, err := r.Cookie("hu60_sid")
if err != nil {
m.responseUnauthenticated(ws)
logrus.Warn("unauthenticated: ", err.Error())
return
}
res, err := m.hu60Client.GetProfile(context.Background(), sid.Value)
if err != nil {
m.responseUnauthenticated(ws)
logrus.Warn("unauthenticated: ", err.Error())
return
}
err = ws.WriteMessage(websocket.TextMessage, []byte(`{"event": "connected"}`))
if err != nil {
logrus.Error(err)
return
}
m.websocketConnMap[res.Uid] = ws
logrus.Info("user ", res.Name, " is connected")
go func(userProfile hu60.GetProfileResponse, ws *websocket.Conn) {
for {
_, msg, err := ws.ReadMessage()
if err != nil {
logrus.Debugf("sid is %d, readMessage error: %w", userProfile.Uid, err)
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
ws.Close()
}
delete(m.websocketConnMap, userProfile.Uid)
logrus.Info("user ", userProfile.Name, " is disconnected")
break
}
var cmd BotCmd
err = json.NewDecoder(strings.NewReader(string(msg))).Decode(&cmd)
if err != nil {
m.responseError(ws, err)
return
}
m.processBotAction(cmd, res.Uid, ws)
}
}(res, ws)
})
logrus.Info("bot listening on ", m.options.Listen, " for interact now. websocket endpoint is /v1/ws")
return http.ListenAndServe(m.options.Listen, nil)
}
func (m *WebsocketManager) responseUnauthenticated(ws *websocket.Conn) {
ws.WriteMessage(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.ClosePolicyViolation, "Unauthenticated"))
ws.Close()
}
func (m *WebsocketManager) responseError(ws *websocket.Conn, err error) {
e := ws.WriteJSON(BotEvent{Event: "error", Data: err.Error()})
if e != nil {
logrus.Error(e.Error())
}
}
func (m *WebsocketManager) processBotAction(cmd BotCmd, uid int, ws *websocket.Conn) {
if cmd.Action == "chat" {
conversationKey := fmt.Sprintf("%d", uid)
if d, ok := cmd.Data.(string); ok {
responseText, newConversation, err := m.cm.Ask(d, conversationKey)
cr := ChatResponse{
NewConversation: newConversation,
Response: responseText,
}
if err != nil {
cr = ChatResponse{
NewConversation: true,
Response: err.Error(),
}
}
ws.WriteJSON(BotEvent{Event: "chat", Data: cr})
}
return
}
if cmd.Action == "rmconvo" {
conversationKey := fmt.Sprintf("%d", uid)
m.cm.MarkExpired(conversationKey)
ws.WriteJSON(BotEvent{Event: "rmconvo", Data: "ok"})
return
}
if cmd.Action == "ping" {
ws.WriteJSON(BotEvent{Event: "ping", Data: "pong"})
return
}
m.responseError(ws, errors.New("unsupported action"))
}