|
@@ -10,6 +10,7 @@ import (
|
|
|
"os"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
var (
|
|
@@ -22,15 +23,17 @@ var (
|
|
|
upGrader = websocket.Upgrader{
|
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
|
}
|
|
|
- hlSyncMap sync.Map
|
|
|
- gm = &sync.Mutex{}
|
|
|
- gchan, isPrint = make(chan string), false
|
|
|
+ hlSyncMap sync.Map
|
|
|
+ gm = &sync.Mutex{}
|
|
|
+ // 默认超时时间,没有得到数据的超时时间 单位:秒
|
|
|
+ defaultTimeout = 30
|
|
|
+ isPrint = false
|
|
|
)
|
|
|
|
|
|
type Clients struct {
|
|
|
clientGroup string
|
|
|
clientName string
|
|
|
- Data map[string]string
|
|
|
+ actionData map[string]chan string
|
|
|
clientWs *websocket.Conn
|
|
|
}
|
|
|
|
|
@@ -51,7 +54,7 @@ func NewClient(group string, name string, ws *websocket.Conn) *Clients {
|
|
|
return &Clients{
|
|
|
clientGroup: group,
|
|
|
clientName: name,
|
|
|
- Data: make(map[string]string),
|
|
|
+ actionData: make(map[string]chan string), // action有消息后就保存到chan里
|
|
|
clientWs: ws,
|
|
|
}
|
|
|
}
|
|
@@ -62,16 +65,16 @@ func ws(c *gin.Context) {
|
|
|
if group == "" || name == "" {
|
|
|
return
|
|
|
}
|
|
|
- ws, err := upGrader.Upgrade(c.Writer, c.Request, nil)
|
|
|
+ wsClient, err := upGrader.Upgrade(c.Writer, c.Request, nil)
|
|
|
if err != nil {
|
|
|
fmt.Println("websocket err:", err)
|
|
|
return
|
|
|
}
|
|
|
- client := NewClient(group, name, ws)
|
|
|
+ client := NewClient(group, name, wsClient)
|
|
|
hlSyncMap.Store(group+"->"+name, client)
|
|
|
for {
|
|
|
//等待数据
|
|
|
- _, message, err := ws.ReadMessage()
|
|
|
+ _, message, err := wsClient.ReadMessage()
|
|
|
if err != nil {
|
|
|
break
|
|
|
}
|
|
@@ -80,9 +83,8 @@ func ws(c *gin.Context) {
|
|
|
strIndex := strings.Index(msg, string(check))
|
|
|
if strIndex >= 1 {
|
|
|
action := msg[:strIndex]
|
|
|
- client.Data[action] = msg[strIndex+5:]
|
|
|
- logPrint("get_message:", client.Data[action])
|
|
|
- gchan <- msg[strIndex+5:]
|
|
|
+ client.actionData[action] <- msg[strIndex+5:]
|
|
|
+ logPrint("get_message:", msg[strIndex+5:])
|
|
|
hlSyncMap.Store(group+"->"+name, client)
|
|
|
} else {
|
|
|
fmt.Println(msg, "message error")
|
|
@@ -99,7 +101,7 @@ func ws(c *gin.Context) {
|
|
|
}
|
|
|
return true
|
|
|
})
|
|
|
- }(ws)
|
|
|
+ }(wsClient)
|
|
|
}
|
|
|
|
|
|
func wsTest(c *gin.Context) {
|
|
@@ -128,14 +130,17 @@ func GQueryFunc(client *Clients, funcName string, param string, resChan chan<- s
|
|
|
WriteDate.Param = param
|
|
|
}
|
|
|
data, _ := json.Marshal(WriteDate)
|
|
|
- ws := client.clientWs
|
|
|
+ clientWs := client.clientWs
|
|
|
+ if client.actionData[funcName] == nil {
|
|
|
+ client.actionData[funcName] = make(chan string, 1) //此次action初始化1个消息
|
|
|
+ }
|
|
|
gm.Lock()
|
|
|
- err := ws.WriteMessage(2, data)
|
|
|
+ err := clientWs.WriteMessage(2, data)
|
|
|
gm.Unlock()
|
|
|
if err != nil {
|
|
|
fmt.Println(err, "写入数据失败")
|
|
|
}
|
|
|
- res := <-gchan
|
|
|
+ res := <-client.actionData[funcName]
|
|
|
resChan <- res
|
|
|
}
|
|
|
|
|
@@ -171,11 +176,24 @@ func ResultSet(c *gin.Context) {
|
|
|
|
|
|
c2 := make(chan string)
|
|
|
go GQueryFunc(client, Action, Param, c2)
|
|
|
+ go checkTimeout(c2)
|
|
|
//把管道传过去,获得值就返回了
|
|
|
c.JSON(200, gin.H{"status": 200, "group": client.clientGroup, "name": client.clientName, "data": <-c2})
|
|
|
|
|
|
}
|
|
|
|
|
|
+func checkTimeout(c2 chan string) {
|
|
|
+ // 100ms检查一次
|
|
|
+ for i := 0; i < defaultTimeout*10; i++ {
|
|
|
+ if len(c2) > 0 {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ time.Sleep(time.Millisecond * 100)
|
|
|
+ }
|
|
|
+ // 循环完了还是没有数据,那就超时退出
|
|
|
+ c2 <- "黑脸怪:timeout"
|
|
|
+}
|
|
|
+
|
|
|
func Execjs(c *gin.Context) {
|
|
|
var getGroup, getName, JsCode string
|
|
|
Action := "_execjs"
|