engine.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package core
  2. import (
  3. "JsRpc/config"
  4. "JsRpc/utils"
  5. "context"
  6. "encoding/json"
  7. log "github.com/sirupsen/logrus"
  8. "math/rand"
  9. "time"
  10. )
  11. // GQueryFunc 发送请求到客户端
  12. func (c *Clients) GQueryFunc(funcName string, param string, resChan chan<- string) {
  13. if c.actionData[funcName] == nil {
  14. rwMu.Lock()
  15. c.actionData[funcName] = make(map[string]chan string)
  16. rwMu.Unlock()
  17. }
  18. var MessageId string
  19. for {
  20. MessageId = utils.GetUUID()
  21. if c.readFromMap(funcName, MessageId) == nil {
  22. rwMu.Lock()
  23. c.actionData[funcName][MessageId] = make(chan string, 1)
  24. rwMu.Unlock()
  25. break
  26. }
  27. utils.LogPrint("存在的消息id,跳过")
  28. }
  29. // 确保资源释放
  30. defer func() {
  31. rwMu.Lock()
  32. delete(c.actionData[funcName], MessageId)
  33. rwMu.Unlock()
  34. close(resChan)
  35. }()
  36. // 构造消息并发送
  37. WriteData := Message{Param: param, MessageId: MessageId, Action: funcName}
  38. data, err := json.Marshal(WriteData)
  39. if err != nil {
  40. log.Error(err, "JSON序列化失败")
  41. resChan <- "JSON序列化失败"
  42. return
  43. }
  44. rwMu.Lock()
  45. err = c.clientWs.WriteMessage(1, data)
  46. rwMu.Unlock()
  47. if err != nil {
  48. log.Error(err, "写入数据失败")
  49. resChan <- "rpc发送数据失败"
  50. return
  51. }
  52. // 使用 context 控制超时
  53. ctx, cancel := context.WithTimeout(context.Background(), time.Duration(config.DefaultTimeout)*time.Second)
  54. defer cancel()
  55. resultChan := c.readFromMap(funcName, MessageId)
  56. if resultChan == nil {
  57. resChan <- "消息ID对应的管道不存在"
  58. return
  59. }
  60. select {
  61. case res := <-resultChan:
  62. resChan <- res
  63. case <-ctx.Done():
  64. utils.LogPrint(MessageId + "超时了")
  65. resChan <- "获取结果超时 timeout"
  66. }
  67. }
  68. func getRandomClient(group string, clientId string) *Clients {
  69. var client *Clients
  70. // 不传递clientId时候,从group分组随便拿一个
  71. if clientId != "" {
  72. clientName, ok := hlSyncMap.Load(group + "->" + clientId)
  73. if ok == false {
  74. return nil
  75. }
  76. client, _ = clientName.(*Clients)
  77. return client
  78. }
  79. groupClients := make([]*Clients, 0)
  80. //循环读取syncMap 获取group名字的
  81. hlSyncMap.Range(func(_, value interface{}) bool {
  82. tmpClients, ok := value.(*Clients)
  83. if !ok {
  84. return true
  85. }
  86. if tmpClients.clientGroup == group {
  87. groupClients = append(groupClients, tmpClients)
  88. }
  89. return true
  90. })
  91. if len(groupClients) == 0 {
  92. return nil
  93. }
  94. // 使用随机数发生器
  95. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  96. randomIndex := r.Intn(len(groupClients))
  97. client = groupClients[randomIndex]
  98. return client
  99. }