engine.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package core
  2. import (
  3. "JsRpc/config"
  4. "JsRpc/utils"
  5. "encoding/json"
  6. log "github.com/sirupsen/logrus"
  7. "math/rand"
  8. "time"
  9. )
  10. // GQueryFunc 发送请求到客户端
  11. func (c *Clients) GQueryFunc(funcName string, param string, resChan chan<- string) {
  12. if c.actionData[funcName] == nil {
  13. c.actionData[funcName] = make(map[string]chan string)
  14. }
  15. MessageId := ""
  16. gm.Lock()
  17. for {
  18. MessageId = utils.GetUUID()
  19. // 先判断action是否需要初始化
  20. if c.actionData[funcName][MessageId] == nil {
  21. c.actionData[funcName][MessageId] = make(chan string, 1) //此次action初始化1个消息
  22. //只有不存在的MessageId才会继续,
  23. break
  24. } else {
  25. utils.LogPrint("存在的消息id,跳过")
  26. }
  27. }
  28. gm.Unlock()
  29. WriteData := Message{Param: param, MessageId: MessageId, Action: funcName}
  30. data, _ := json.Marshal(WriteData)
  31. clientWs := c.clientWs
  32. err := clientWs.WriteMessage(1, data)
  33. if err != nil {
  34. log.Error(err, "写入数据失败")
  35. resChan <- "rpc发送数据失败"
  36. }
  37. select {
  38. case res := <-c.actionData[funcName][MessageId]:
  39. resChan <- res
  40. case <-time.After(time.Duration(config.DefaultTimeout) * time.Second):
  41. utils.LogPrint(MessageId + "超时了")
  42. resChan <- "黑脸怪:timeout"
  43. }
  44. // 清理资源
  45. gm.Lock()
  46. delete(c.actionData[funcName], MessageId)
  47. gm.Unlock()
  48. close(resChan)
  49. }
  50. func getRandomClient(group string, clientId string) *Clients {
  51. var client *Clients
  52. // 不传递clientId时候,从group分组随便拿一个
  53. if clientId != "" {
  54. clientName, ok := hlSyncMap.Load(group + "->" + clientId)
  55. if ok == false {
  56. return nil
  57. }
  58. client, _ = clientName.(*Clients)
  59. return client
  60. }
  61. groupClients := make([]*Clients, 0)
  62. //循环读取syncMap 获取group名字的
  63. hlSyncMap.Range(func(_, value interface{}) bool {
  64. tmpClients, ok := value.(*Clients)
  65. if !ok {
  66. return true
  67. }
  68. if tmpClients.clientGroup == group {
  69. groupClients = append(groupClients, tmpClients)
  70. }
  71. return true
  72. })
  73. if len(groupClients) == 0 {
  74. return nil
  75. }
  76. // 使用随机数发生器
  77. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  78. randomIndex := r.Intn(len(groupClients))
  79. client = groupClients[randomIndex]
  80. return client
  81. }