graphql.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package client
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "strings"
  10. "time"
  11. "github.com/gorilla/websocket"
  12. )
  13. // Client is a GraphQL client for the ARP API
  14. type Client struct {
  15. baseURL string
  16. httpClient *http.Client
  17. token string
  18. }
  19. // New creates a new GraphQL client
  20. func New(baseURL string) *Client {
  21. return &Client{
  22. baseURL: baseURL,
  23. httpClient: &http.Client{
  24. Timeout: 30 * time.Second,
  25. },
  26. }
  27. }
  28. // SetToken sets the authentication token
  29. func (c *Client) SetToken(token string) {
  30. c.token = token
  31. }
  32. // GraphQLRequest represents a GraphQL request
  33. type GraphQLRequest struct {
  34. Query string `json:"query"`
  35. Variables map[string]interface{} `json:"variables,omitempty"`
  36. OperationName string `json:"operationName,omitempty"`
  37. }
  38. // GraphQLResponse represents a GraphQL response
  39. type GraphQLResponse struct {
  40. Data json.RawMessage `json:"data,omitempty"`
  41. Errors []GraphQLError `json:"errors,omitempty"`
  42. }
  43. // GraphQLError represents a GraphQL error
  44. type GraphQLError struct {
  45. Message string `json:"message"`
  46. Locations []struct {
  47. Line int `json:"line"`
  48. Column int `json:"column"`
  49. } `json:"locations,omitempty"`
  50. Path []interface{} `json:"path,omitempty"`
  51. }
  52. // Error returns the error message
  53. func (e GraphQLError) Error() string {
  54. return e.Message
  55. }
  56. // Query executes a GraphQL query
  57. func (c *Client) Query(query string, variables map[string]interface{}) (*GraphQLResponse, error) {
  58. return c.doRequest(query, variables, "")
  59. }
  60. // Mutation executes a GraphQL mutation
  61. func (c *Client) Mutation(query string, variables map[string]interface{}) (*GraphQLResponse, error) {
  62. return c.doRequest(query, variables, "")
  63. }
  64. func (c *Client) doRequest(query string, variables map[string]interface{}, operationName string) (*GraphQLResponse, error) {
  65. reqBody := GraphQLRequest{
  66. Query: query,
  67. Variables: variables,
  68. OperationName: operationName,
  69. }
  70. bodyBytes, err := json.Marshal(reqBody)
  71. if err != nil {
  72. return nil, fmt.Errorf("failed to marshal request: %w", err)
  73. }
  74. req, err := http.NewRequest("POST", c.baseURL, bytes.NewReader(bodyBytes))
  75. if err != nil {
  76. return nil, fmt.Errorf("failed to create request: %w", err)
  77. }
  78. req.Header.Set("Content-Type", "application/json")
  79. if c.token != "" {
  80. req.Header.Set("Authorization", "Bearer "+c.token)
  81. }
  82. resp, err := c.httpClient.Do(req)
  83. if err != nil {
  84. return nil, fmt.Errorf("request failed: %w", err)
  85. }
  86. defer resp.Body.Close()
  87. body, err := io.ReadAll(resp.Body)
  88. if err != nil {
  89. return nil, fmt.Errorf("failed to read response: %w", err)
  90. }
  91. var gqlResp GraphQLResponse
  92. if err := json.Unmarshal(body, &gqlResp); err != nil {
  93. return nil, fmt.Errorf("failed to parse response: %w", err)
  94. }
  95. if len(gqlResp.Errors) > 0 {
  96. return &gqlResp, fmt.Errorf("GraphQL error: %s", gqlResp.Errors[0].Message)
  97. }
  98. return &gqlResp, nil
  99. }
  100. // WebSocketClient handles GraphQL subscriptions
  101. type WebSocketClient struct {
  102. conn *websocket.Conn
  103. baseURL string
  104. token string
  105. done chan struct{}
  106. messages chan json.RawMessage
  107. errors chan error
  108. }
  109. // NewWebSocketClient creates a new WebSocket client for subscriptions
  110. func NewWebSocketClient(baseURL string, token string) *WebSocketClient {
  111. // Convert HTTP URL to WebSocket URL
  112. wsURL := strings.Replace(baseURL, "http://", "ws://", 1)
  113. wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
  114. return &WebSocketClient{
  115. baseURL: wsURL,
  116. token: token,
  117. done: make(chan struct{}),
  118. messages: make(chan json.RawMessage, 100),
  119. errors: make(chan error, 10),
  120. }
  121. }
  122. // Connect establishes a WebSocket connection
  123. func (w *WebSocketClient) Connect() error {
  124. u, err := url.Parse(w.baseURL)
  125. if err != nil {
  126. return fmt.Errorf("failed to parse URL: %w", err)
  127. }
  128. headers := http.Header{}
  129. if w.token != "" {
  130. headers.Set("Authorization", "Bearer "+w.token)
  131. }
  132. dialer := websocket.DefaultDialer
  133. conn, _, err := dialer.Dial(u.String(), headers)
  134. if err != nil {
  135. return fmt.Errorf("failed to connect: %w", err)
  136. }
  137. w.conn = conn
  138. // Send connection init
  139. initMsg := map[string]interface{}{
  140. "type": "connection_init",
  141. "payload": map[string]interface{}{
  142. "Authorization": "Bearer " + w.token,
  143. },
  144. }
  145. if err := w.conn.WriteJSON(initMsg); err != nil {
  146. return fmt.Errorf("failed to send init: %w", err)
  147. }
  148. // Wait for connection_ack
  149. _, msg, err := w.conn.ReadMessage()
  150. if err != nil {
  151. return fmt.Errorf("failed to read ack: %w", err)
  152. }
  153. var ack map[string]interface{}
  154. if err := json.Unmarshal(msg, &ack); err != nil {
  155. return fmt.Errorf("failed to parse ack: %w", err)
  156. }
  157. if ack["type"] != "connection_ack" {
  158. return fmt.Errorf("expected connection_ack, got: %s", ack["type"])
  159. }
  160. // Start reading messages
  161. go w.readMessages()
  162. return nil
  163. }
  164. // Subscribe starts a subscription
  165. func (w *WebSocketClient) Subscribe(id string, query string, variables map[string]interface{}) error {
  166. subMsg := map[string]interface{}{
  167. "id": id,
  168. "type": "start",
  169. "payload": map[string]interface{}{
  170. "query": query,
  171. "variables": variables,
  172. },
  173. }
  174. return w.conn.WriteJSON(subMsg)
  175. }
  176. // Unsubscribe stops a subscription
  177. func (w *WebSocketClient) Unsubscribe(id string) error {
  178. stopMsg := map[string]interface{}{
  179. "id": id,
  180. "type": "stop",
  181. }
  182. return w.conn.WriteJSON(stopMsg)
  183. }
  184. // Messages returns the message channel
  185. func (w *WebSocketClient) Messages() <-chan json.RawMessage {
  186. return w.messages
  187. }
  188. // Errors returns the error channel
  189. func (w *WebSocketClient) Errors() <-chan error {
  190. return w.errors
  191. }
  192. // Done returns the done channel
  193. func (w *WebSocketClient) Done() <-chan struct{} {
  194. return w.done
  195. }
  196. func (w *WebSocketClient) readMessages() {
  197. defer close(w.done)
  198. defer close(w.messages)
  199. defer close(w.errors)
  200. for {
  201. _, msg, err := w.conn.ReadMessage()
  202. if err != nil {
  203. select {
  204. case w.errors <- err:
  205. default:
  206. }
  207. return
  208. }
  209. var parsed map[string]interface{}
  210. if err := json.Unmarshal(msg, &parsed); err != nil {
  211. continue
  212. }
  213. msgType, ok := parsed["type"].(string)
  214. if !ok {
  215. continue
  216. }
  217. switch msgType {
  218. case "data":
  219. if payload, ok := parsed["payload"].(map[string]interface{}); ok {
  220. if data, ok := payload["data"]; ok {
  221. dataBytes, _ := json.Marshal(data)
  222. select {
  223. case w.messages <- dataBytes:
  224. default:
  225. }
  226. }
  227. if errs, ok := payload["errors"].([]interface{}); ok && len(errs) > 0 {
  228. if errBytes, err := json.Marshal(errs); err == nil {
  229. w.errors <- fmt.Errorf("subscription error: %s", string(errBytes))
  230. }
  231. }
  232. }
  233. case "complete":
  234. return
  235. case "error":
  236. if payload, ok := parsed["payload"].([]interface{}); ok {
  237. if errBytes, err := json.Marshal(payload); err == nil {
  238. w.errors <- fmt.Errorf("subscription error: %s", string(errBytes))
  239. }
  240. }
  241. }
  242. }
  243. }
  244. // Close closes the WebSocket connection
  245. func (w *WebSocketClient) Close() error {
  246. if w.conn != nil {
  247. return w.conn.Close()
  248. }
  249. return nil
  250. }