main.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/gin-gonic/gin"
  6. "github.com/gorilla/websocket"
  7. log "github.com/sirupsen/logrus"
  8. "github.com/unrolled/secure"
  9. "net/http"
  10. "os"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. var (
  16. // BasicPort The original port without SSL certificate
  17. BasicPort = `:12080`
  18. // SSLPort "Secure" port with SSL certificate
  19. SSLPort = `:12443`
  20. // websocket.Upgrader specifies parameters for upgrading an HTTP connection to a
  21. // WebSocket connection.
  22. upGrader = websocket.Upgrader{
  23. CheckOrigin: func(r *http.Request) bool { return true },
  24. }
  25. hlSyncMap sync.Map
  26. gm = &sync.Mutex{}
  27. // 默认超时时间,没有得到数据的超时时间 单位:秒
  28. defaultTimeout = 30
  29. isPrint = false
  30. )
  31. type Clients struct {
  32. clientGroup string
  33. clientName string
  34. actionData map[string]chan string
  35. clientWs *websocket.Conn
  36. }
  37. type Message struct {
  38. Action string `json:"action"`
  39. Param string `json:"param"`
  40. }
  41. type logWriter struct{}
  42. func (w logWriter) Write(p []byte) (n int, err error) {
  43. return len(p), nil
  44. }
  45. // is print?
  46. func logPrint(p ...interface{}) {
  47. if isPrint {
  48. log.Infoln(p)
  49. }
  50. }
  51. // NewClient initializes a new Clients instance
  52. func NewClient(group string, name string, ws *websocket.Conn) *Clients {
  53. return &Clients{
  54. clientGroup: group,
  55. clientName: name,
  56. actionData: make(map[string]chan string, 1), // action有消息后就保存到chan里
  57. clientWs: ws,
  58. }
  59. }
  60. // ws, provides inject function for a job
  61. func ws(c *gin.Context) {
  62. group, name := c.Query("group"), c.Query("name")
  63. if group == "" || name == "" {
  64. return
  65. }
  66. wsClient, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  67. if err != nil {
  68. fmt.Println("websocket err:", err)
  69. return
  70. }
  71. client := NewClient(group, name, wsClient)
  72. hlSyncMap.Store(group+"->"+name, client)
  73. for {
  74. //等待数据
  75. _, message, err := wsClient.ReadMessage()
  76. if err != nil {
  77. break
  78. }
  79. msg := string(message)
  80. check := []uint8{104, 108, 94, 95, 94}
  81. strIndex := strings.Index(msg, string(check))
  82. if strIndex >= 1 {
  83. action := msg[:strIndex]
  84. client.actionData[action] <- msg[strIndex+5:]
  85. logPrint("get_message:", msg[strIndex+5:])
  86. //hlSyncMap.Store(group+"->"+name, client)
  87. } else {
  88. fmt.Println(msg, "message error")
  89. }
  90. }
  91. defer func(ws *websocket.Conn) {
  92. _ = ws.Close()
  93. logPrint(group+"->"+name, "下线了")
  94. hlSyncMap.Range(func(key, value interface{}) bool {
  95. //client, _ := value.(*Clients)
  96. if key == group+"->"+name {
  97. hlSyncMap.Delete(key)
  98. }
  99. return true
  100. })
  101. }(wsClient)
  102. }
  103. func wsTest(c *gin.Context) {
  104. testClient, _ := upGrader.Upgrade(c.Writer, c.Request, nil)
  105. for {
  106. //等待数据
  107. _, message, err := testClient.ReadMessage()
  108. if err != nil {
  109. break
  110. }
  111. msg := string(message)
  112. logPrint("接收到测试消息", msg)
  113. _ = testClient.WriteMessage(1, []byte(msg))
  114. }
  115. defer func(ws *websocket.Conn) {
  116. _ = ws.Close()
  117. }(testClient)
  118. }
  119. func GQueryFunc(client *Clients, funcName string, param string, resChan chan<- string) {
  120. WriteDate := Message{}
  121. WriteDate.Action = funcName
  122. if param == "" {
  123. WriteDate.Param = ""
  124. } else {
  125. WriteDate.Param = param
  126. }
  127. data, _ := json.Marshal(WriteDate)
  128. clientWs := client.clientWs
  129. if client.actionData[funcName] == nil {
  130. client.actionData[funcName] = make(chan string, 1) //此次action初始化1个消息
  131. }
  132. gm.Lock()
  133. err := clientWs.WriteMessage(1, data)
  134. gm.Unlock()
  135. if err != nil {
  136. fmt.Println(err, "写入数据失败")
  137. }
  138. resultFlag := false
  139. for i := 0; i < defaultTimeout*10; i++ {
  140. if len(client.actionData[funcName]) > 0 {
  141. res := <-client.actionData[funcName]
  142. resChan <- res
  143. resultFlag = true
  144. break
  145. }
  146. time.Sleep(time.Millisecond * 100)
  147. }
  148. // 循环完了还是没有数据,那就超时退出
  149. if true != resultFlag {
  150. resChan <- "黑脸怪:timeout"
  151. }
  152. defer func() {
  153. close(resChan)
  154. }()
  155. }
  156. func ResultSet(c *gin.Context) {
  157. var getGroup, getName, Action, Param string
  158. //获取参数
  159. getGroup, getName, Action, Param = c.Query("group"), c.Query("name"), c.Query("action"), c.Query("param")
  160. //如果获取不到 说明是post提交的
  161. if getGroup == "" && getName == "" {
  162. //切换post获取方式
  163. getGroup, getName, Action, Param = c.PostForm("group"), c.PostForm("name"), c.PostForm("action"), c.PostForm("param")
  164. }
  165. if getGroup == "" || getName == "" {
  166. c.JSON(400, gin.H{"status": 400, "data": "input group and name"})
  167. return
  168. }
  169. clientName, ok := hlSyncMap.Load(getGroup + "->" + getName)
  170. if ok == false {
  171. c.JSON(400, gin.H{"status": 400, "data": "注入了ws?没有找到当前组和名字"})
  172. return
  173. }
  174. if Action == "" {
  175. c.JSON(200, gin.H{"group": getGroup, "name": getName})
  176. return
  177. }
  178. //取一个ws客户端
  179. client, ko := clientName.(*Clients)
  180. if !ko {
  181. return
  182. }
  183. c2 := make(chan string, 1)
  184. go GQueryFunc(client, Action, Param, c2)
  185. //把管道传过去,获得值就返回了
  186. c.JSON(200, gin.H{"status": 200, "group": client.clientGroup, "name": client.clientName, "data": <-c2})
  187. }
  188. func checkTimeout(c2 chan string) {
  189. // 100ms检查一次
  190. for i := 0; i < defaultTimeout*10; i++ {
  191. if len(c2) > 0 {
  192. return
  193. }
  194. time.Sleep(time.Millisecond * 100)
  195. }
  196. // 循环完了还是没有数据,那就超时退出
  197. c2 <- "黑脸怪:timeout"
  198. }
  199. func Execjs(c *gin.Context) {
  200. var getGroup, getName, JsCode string
  201. Action := "_execjs"
  202. //获取参数
  203. getGroup, getName, JsCode = c.Query("group"), c.Query("name"), c.Query("jscode")
  204. //如果获取不到 说明是post提交的
  205. if getGroup == "" && getName == "" {
  206. //切换post获取方式
  207. getGroup, getName, JsCode = c.PostForm("group"), c.PostForm("name"), c.PostForm("jscode")
  208. }
  209. if getGroup == "" || getName == "" {
  210. c.JSON(400, gin.H{"status": 400, "data": "input group and name"})
  211. return
  212. }
  213. logPrint(getGroup, getName, JsCode)
  214. clientName, ok := hlSyncMap.Load(getGroup + "->" + getName)
  215. if ok == false {
  216. c.JSON(400, gin.H{"status": 400, "data": "注入了ws?没有找到当前组和名字"})
  217. return
  218. }
  219. //取一个ws客户端
  220. client, ko := clientName.(*Clients)
  221. if !ko {
  222. return
  223. }
  224. c2 := make(chan string)
  225. go GQueryFunc(client, Action, JsCode, c2)
  226. c.JSON(200, gin.H{"status": "200", "group": client.clientGroup, "name": client.clientName, "data": <-c2})
  227. }
  228. func getList(c *gin.Context) {
  229. resList := "黑脸怪:\r\n\t"
  230. hlSyncMap.Range(func(key, value interface{}) bool {
  231. resList += key.(string) + "\r\n\t"
  232. return true
  233. })
  234. c.String(200, resList)
  235. }
  236. func Index(c *gin.Context) {
  237. c.String(200, "你好,我是黑脸怪~")
  238. }
  239. func TlsHandler() gin.HandlerFunc {
  240. return func(c *gin.Context) {
  241. secureMiddleware := secure.New(secure.Options{
  242. SSLRedirect: true,
  243. SSLHost: SSLPort,
  244. })
  245. err := secureMiddleware.Process(c.Writer, c.Request)
  246. if err != nil {
  247. c.Abort()
  248. return
  249. }
  250. c.Next()
  251. }
  252. }
  253. func main() {
  254. for _, v := range os.Args {
  255. if v == "log" {
  256. isPrint = true
  257. }
  258. }
  259. // 将默认的日志输出器设置为空
  260. //gin.DefaultWriter = logWriter{}
  261. fmt.Println("欢迎使用jsrpc~")
  262. gin.SetMode(gin.ReleaseMode)
  263. r := gin.Default()
  264. r.GET("/", Index)
  265. r.GET("/go", ResultSet)
  266. r.POST("/go", ResultSet)
  267. r.GET("/ws", ws)
  268. r.GET("/wst", wsTest)
  269. r.GET("/execjs", Execjs)
  270. r.POST("/execjs", Execjs)
  271. r.GET("/list", getList)
  272. r.Use(TlsHandler())
  273. //编译https版放开下面这行注释代码
  274. //go func() {
  275. // err := r.RunTLS(SSLPort, "zhengshu.pem", "zhengshu.key")
  276. // if err != nil {
  277. // fmt.Println(err)
  278. // }
  279. //}()
  280. _ = r.Run(BasicPort)
  281. }