// Package client provides a GraphQL client for the ARP API: an HTTP client
// for queries and mutations, and a WebSocket client for subscriptions.
package client

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/gorilla/websocket"
)

// ErrNotConnected is returned by WebSocketClient methods that need an
// established connection when Connect has not completed successfully.
var ErrNotConnected = errors.New("websocket client is not connected")

// ErrAlreadyConnected is returned by Connect when the client already holds a
// connection. The client's channels are closed when its reader stops, so a
// WebSocketClient cannot be reused; create a new one instead.
var ErrAlreadyConnected = errors.New("websocket client is already connected")

// Client is a GraphQL client for the ARP API.
type Client struct {
	baseURL    string
	httpClient *http.Client
	token      string
}

// New creates a new GraphQL client that posts requests to baseURL using an
// HTTP client with a 30 second timeout.
func New(baseURL string) *Client {
	return &Client{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: 30 * time.Second,
		},
	}
}

// SetToken sets the authentication token. When non-empty it is sent as a
// Bearer token in the Authorization header of every request.
func (c *Client) SetToken(token string) {
	c.token = token
}

// GraphQLRequest represents a GraphQL request.
type GraphQLRequest struct {
	Query         string                 `json:"query"`
	Variables     map[string]interface{} `json:"variables,omitempty"`
	OperationName string                 `json:"operationName,omitempty"`
}

// GraphQLResponse represents a GraphQL response. Data is left undecoded so
// callers can unmarshal it into their own types.
type GraphQLResponse struct {
	Data   json.RawMessage `json:"data,omitempty"`
	Errors []GraphQLError  `json:"errors,omitempty"`
}

// GraphQLError represents a GraphQL error.
type GraphQLError struct {
	Message   string `json:"message"`
	Locations []struct {
		Line   int `json:"line"`
		Column int `json:"column"`
	} `json:"locations,omitempty"`
	Path []interface{} `json:"path,omitempty"`
}

// Error returns the error message.
func (e GraphQLError) Error() string {
	return e.Message
}

// Query executes a GraphQL query.
//
// When the response contains GraphQL errors, both the response and a non-nil
// error wrapping the first GraphQLError are returned.
func (c *Client) Query(query string, variables map[string]interface{}) (*GraphQLResponse, error) {
	return c.doRequest(query, variables, "")
}

// Mutation executes a GraphQL mutation. It sends exactly the same request as
// Query; the separate name exists for readability at call sites.
func (c *Client) Mutation(query string, variables map[string]interface{}) (*GraphQLResponse, error) {
	return c.doRequest(query, variables, "")
}

// doRequest posts a single GraphQL operation to the client's base URL and
// decodes the response.
//
// It returns:
//   - (resp, nil) on a 2xx response without GraphQL errors;
//   - (resp, err) when the body carries GraphQL errors; err wraps the first
//     GraphQLError so callers can use errors.As;
//   - (nil, err) on transport failures, undecodable bodies, or a non-2xx
//     status that carries no GraphQL errors.
func (c *Client) doRequest(query string, variables map[string]interface{}, operationName string) (*GraphQLResponse, error) {
	reqBody := GraphQLRequest{
		Query:         query,
		Variables:     variables,
		OperationName: operationName,
	}

	bodyBytes, err := json.Marshal(reqBody)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}

	req, err := http.NewRequest("POST", c.baseURL, bytes.NewReader(bodyBytes))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	if c.token != "" {
		req.Header.Set("Authorization", "Bearer "+c.token)
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response: %w", err)
	}

	statusOK := resp.StatusCode >= 200 && resp.StatusCode <= 299

	var gqlResp GraphQLResponse
	if err := json.Unmarshal(body, &gqlResp); err != nil {
		// A proxy or gateway failure usually yields a non-JSON body; the HTTP
		// status is the useful information in that case, not the parse error.
		if !statusOK {
			return nil, fmt.Errorf("unexpected HTTP status %s", resp.Status)
		}
		return nil, fmt.Errorf("failed to parse response: %w", err)
	}

	// GraphQL errors take precedence over the status code: servers may report
	// them with either a 2xx or a 4xx status.
	if len(gqlResp.Errors) > 0 {
		return &gqlResp, fmt.Errorf("GraphQL error: %w", gqlResp.Errors[0])
	}

	// A JSON body without GraphQL errors on a failing status (for example an
	// auth gateway's 401 payload) must not be reported as success.
	if !statusOK {
		return nil, fmt.Errorf("unexpected HTTP status %s", resp.Status)
	}

	return &gqlResp, nil
}

// WebSocketClient handles GraphQL subscriptions over a single WebSocket
// connection.
//
// The underlying gorilla/websocket connection supports only one concurrent
// writer, so calls to Subscribe and Unsubscribe must be serialized by the
// caller.
type WebSocketClient struct {
	conn     *websocket.Conn
	baseURL  string
	token    string
	done     chan struct{}
	messages chan json.RawMessage
	errors   chan error
}

// NewWebSocketClient creates a new WebSocket client for subscriptions.
//
// A leading "http://" or "https://" scheme in baseURL is converted to "ws://"
// or "wss://" respectively; any other URL is used unchanged. Only the scheme
// prefix is rewritten, so URLs embedded in the path or query are left intact.
func NewWebSocketClient(baseURL string, token string) *WebSocketClient {
	return &WebSocketClient{
		baseURL:  toWebSocketURL(baseURL),
		token:    token,
		done:     make(chan struct{}),
		messages: make(chan json.RawMessage, 100),
		errors:   make(chan error, 10),
	}
}

// toWebSocketURL swaps a leading HTTP(S) scheme for its WebSocket equivalent.
func toWebSocketURL(baseURL string) string {
	switch {
	case strings.HasPrefix(baseURL, "https://"):
		return "wss://" + strings.TrimPrefix(baseURL, "https://")
	case strings.HasPrefix(baseURL, "http://"):
		return "ws://" + strings.TrimPrefix(baseURL, "http://")
	}
	return baseURL
}

// Connect establishes a WebSocket connection, performs the
// connection_init / connection_ack handshake, and starts the background
// reader that feeds Messages and Errors.
//
// It returns ErrAlreadyConnected if the client already holds a connection.
// If the handshake fails the dialed connection is closed before returning.
func (w *WebSocketClient) Connect() error {
	if w.conn != nil {
		return ErrAlreadyConnected
	}

	u, err := url.Parse(w.baseURL)
	if err != nil {
		return fmt.Errorf("failed to parse URL: %w", err)
	}

	headers := http.Header{}
	if w.token != "" {
		headers.Set("Authorization", "Bearer "+w.token)
	}

	conn, _, err := websocket.DefaultDialer.Dial(u.String(), headers)
	if err != nil {
		return fmt.Errorf("failed to connect: %w", err)
	}

	if err := handshake(conn, w.token); err != nil {
		// Do not leak the socket; the handshake error is the one worth
		// reporting, so a failure to close is deliberately ignored.
		_ = conn.Close()
		return err
	}

	w.conn = conn

	// Start reading messages.
	go w.readMessages()

	return nil
}

// handshake sends connection_init on conn and waits for connection_ack.
// The Authorization entry is included in the init payload only when token is
// non-empty, mirroring how the HTTP header is handled.
func handshake(conn *websocket.Conn, token string) error {
	payload := map[string]interface{}{}
	if token != "" {
		payload["Authorization"] = "Bearer " + token
	}
	initMsg := map[string]interface{}{
		"type":    "connection_init",
		"payload": payload,
	}
	if err := conn.WriteJSON(initMsg); err != nil {
		return fmt.Errorf("failed to send init: %w", err)
	}

	_, msg, err := conn.ReadMessage()
	if err != nil {
		return fmt.Errorf("failed to read ack: %w", err)
	}

	var ack struct {
		Type string `json:"type"`
	}
	if err := json.Unmarshal(msg, &ack); err != nil {
		return fmt.Errorf("failed to parse ack: %w", err)
	}
	if ack.Type != "connection_ack" {
		// %q keeps a missing or empty type visible in the message.
		return fmt.Errorf("expected connection_ack, got: %q", ack.Type)
	}
	return nil
}

// Subscribe starts a subscription identified by id by sending a "start"
// message with the given query and variables.
//
// It returns ErrNotConnected if Connect has not succeeded.
func (w *WebSocketClient) Subscribe(id string, query string, variables map[string]interface{}) error {
	if w.conn == nil {
		return ErrNotConnected
	}
	subMsg := map[string]interface{}{
		"id":   id,
		"type": "start",
		"payload": map[string]interface{}{
			"query":     query,
			"variables": variables,
		},
	}
	return w.conn.WriteJSON(subMsg)
}

// Unsubscribe stops the subscription identified by id by sending a "stop"
// message.
//
// It returns ErrNotConnected if Connect has not succeeded.
func (w *WebSocketClient) Unsubscribe(id string) error {
	if w.conn == nil {
		return ErrNotConnected
	}
	stopMsg := map[string]interface{}{
		"id":   id,
		"type": "stop",
	}
	return w.conn.WriteJSON(stopMsg)
}

// Messages returns the channel on which subscription data payloads are
// delivered. The channel is buffered; payloads that arrive while the buffer
// is full are dropped. It is closed when the reader stops.
func (w *WebSocketClient) Messages() <-chan json.RawMessage {
	return w.messages
}

// Errors returns the channel on which read and subscription errors are
// delivered. The channel is buffered; errors that arrive while the buffer is
// full are dropped. It is closed when the reader stops.
func (w *WebSocketClient) Errors() <-chan error {
	return w.errors
}

// Done returns a channel that is closed when the reader stops.
func (w *WebSocketClient) Done() <-chan struct{} {
	return w.done
}

// reportError delivers err on the errors channel without blocking, so that a
// caller who never drains Errors cannot stall the read loop.
func (w *WebSocketClient) reportError(err error) {
	select {
	case w.errors <- err:
	default:
	}
}

// readMessages is the background read loop started by Connect. It dispatches
// incoming frames by their "type" field and, on exit, closes the errors,
// messages and done channels (in that order).
func (w *WebSocketClient) readMessages() {
	defer close(w.done)
	defer close(w.messages)
	defer close(w.errors)

	for {
		_, msg, err := w.conn.ReadMessage()
		if err != nil {
			w.reportError(err)
			return
		}

		var parsed map[string]interface{}
		if err := json.Unmarshal(msg, &parsed); err != nil {
			continue // ignore frames that are not JSON objects
		}

		msgType, ok := parsed["type"].(string)
		if !ok {
			continue
		}

		switch msgType {
		case "data":
			payload, ok := parsed["payload"].(map[string]interface{})
			if !ok {
				continue
			}
			if data, ok := payload["data"]; ok {
				dataBytes, err := json.Marshal(data)
				if err != nil {
					w.reportError(fmt.Errorf("failed to encode subscription data: %w", err))
				} else {
					select {
					case w.messages <- dataBytes:
					default: // buffer full: drop rather than stall the reader
					}
				}
			}
			if errs, ok := payload["errors"].([]interface{}); ok && len(errs) > 0 {
				if errBytes, err := json.Marshal(errs); err == nil {
					w.reportError(fmt.Errorf("subscription error: %s", string(errBytes)))
				}
			}

		case "complete":
			// NOTE(review): a "complete" frame for any subscription id stops
			// the reader for the whole connection; confirm that a single
			// subscription per connection is the intended usage.
			return

		case "error":
			// The payload may be a list of errors or a single error object;
			// report whichever form the server sent.
			if payload, ok := parsed["payload"]; ok && payload != nil {
				if errBytes, err := json.Marshal(payload); err == nil {
					w.reportError(fmt.Errorf("subscription error: %s", string(errBytes)))
				}
			}
		}
	}
}

// Close closes the WebSocket connection. It is a no-op when the client was
// never connected.
func (w *WebSocketClient) Close() error {
	if w.conn != nil {
		return w.conn.Close()
	}
	return nil
}