| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- package client
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "strings"
- "time"
- "github.com/gorilla/websocket"
- )
// Client is a GraphQL client for the ARP API. It sends every operation as an
// HTTP POST to a single endpoint and optionally authenticates with a bearer
// token.
type Client struct {
	// baseURL is the endpoint every request is posted to.
	baseURL string

	// httpClient performs the HTTP round trips.
	httpClient *http.Client

	// token, when non-empty, is sent as "Authorization: Bearer <token>".
	token string
}
- // New creates a new GraphQL client
- func New(baseURL string) *Client {
- return &Client{
- baseURL: baseURL,
- httpClient: &http.Client{
- Timeout: 30 * time.Second,
- },
- }
- }
- // SetToken sets the authentication token
- func (c *Client) SetToken(token string) {
- c.token = token
- }
// GraphQLRequest represents a GraphQL request as it is serialized into the
// HTTP body. Variables and OperationName are left out of the JSON when empty.
type GraphQLRequest struct {
	Query         string                 `json:"query"`
	Variables     map[string]interface{} `json:"variables,omitempty"`
	OperationName string                 `json:"operationName,omitempty"`
}
- // GraphQLResponse represents a GraphQL response
- type GraphQLResponse struct {
- Data json.RawMessage `json:"data,omitempty"`
- Errors []GraphQLError `json:"errors,omitempty"`
- }
// GraphQLError represents a GraphQL error, i.e. one entry of the "errors"
// array in a response.
type GraphQLError struct {
	// Message is the error description.
	Message string `json:"message"`

	// Locations lists the line/column positions attached to the error.
	Locations []struct {
		Line   int `json:"line"`
		Column int `json:"column"`
	} `json:"locations,omitempty"`

	// Path holds the decoded "path" array; its elements are whatever JSON
	// values the server sent.
	Path []interface{} `json:"path,omitempty"`
}
- // Error returns the error message
- func (e GraphQLError) Error() string {
- return e.Message
- }
- // Query executes a GraphQL query
- func (c *Client) Query(query string, variables map[string]interface{}) (*GraphQLResponse, error) {
- return c.doRequest(query, variables, "")
- }
- // Mutation executes a GraphQL mutation
- func (c *Client) Mutation(query string, variables map[string]interface{}) (*GraphQLResponse, error) {
- return c.doRequest(query, variables, "")
- }
- func (c *Client) doRequest(query string, variables map[string]interface{}, operationName string) (*GraphQLResponse, error) {
- reqBody := GraphQLRequest{
- Query: query,
- Variables: variables,
- OperationName: operationName,
- }
- bodyBytes, err := json.Marshal(reqBody)
- if err != nil {
- return nil, fmt.Errorf("failed to marshal request: %w", err)
- }
- req, err := http.NewRequest("POST", c.baseURL, bytes.NewReader(bodyBytes))
- if err != nil {
- return nil, fmt.Errorf("failed to create request: %w", err)
- }
- req.Header.Set("Content-Type", "application/json")
- if c.token != "" {
- req.Header.Set("Authorization", "Bearer "+c.token)
- }
- resp, err := c.httpClient.Do(req)
- if err != nil {
- return nil, fmt.Errorf("request failed: %w", err)
- }
- defer resp.Body.Close()
- body, err := io.ReadAll(resp.Body)
- if err != nil {
- return nil, fmt.Errorf("failed to read response: %w", err)
- }
- var gqlResp GraphQLResponse
- if err := json.Unmarshal(body, &gqlResp); err != nil {
- return nil, fmt.Errorf("failed to parse response: %w", err)
- }
- if len(gqlResp.Errors) > 0 {
- return &gqlResp, fmt.Errorf("GraphQL error: %s", gqlResp.Errors[0].Message)
- }
- return &gqlResp, nil
- }
- // WebSocketClient handles GraphQL subscriptions
- type WebSocketClient struct {
- conn *websocket.Conn
- baseURL string
- token string
- done chan struct{}
- messages chan json.RawMessage
- errors chan error
- }
- // NewWebSocketClient creates a new WebSocket client for subscriptions
- func NewWebSocketClient(baseURL string, token string) *WebSocketClient {
- // Convert HTTP URL to WebSocket URL
- wsURL := strings.Replace(baseURL, "http://", "ws://", 1)
- wsURL = strings.Replace(wsURL, "https://", "wss://", 1)
- return &WebSocketClient{
- baseURL: wsURL,
- token: token,
- done: make(chan struct{}),
- messages: make(chan json.RawMessage, 100),
- errors: make(chan error, 10),
- }
- }
- // Connect establishes a WebSocket connection
- func (w *WebSocketClient) Connect() error {
- u, err := url.Parse(w.baseURL)
- if err != nil {
- return fmt.Errorf("failed to parse URL: %w", err)
- }
- headers := http.Header{}
- if w.token != "" {
- headers.Set("Authorization", "Bearer "+w.token)
- }
- dialer := websocket.DefaultDialer
- conn, _, err := dialer.Dial(u.String(), headers)
- if err != nil {
- return fmt.Errorf("failed to connect: %w", err)
- }
- w.conn = conn
- // Send connection init
- initMsg := map[string]interface{}{
- "type": "connection_init",
- "payload": map[string]interface{}{
- "Authorization": "Bearer " + w.token,
- },
- }
- if err := w.conn.WriteJSON(initMsg); err != nil {
- return fmt.Errorf("failed to send init: %w", err)
- }
- // Wait for connection_ack
- _, msg, err := w.conn.ReadMessage()
- if err != nil {
- return fmt.Errorf("failed to read ack: %w", err)
- }
- var ack map[string]interface{}
- if err := json.Unmarshal(msg, &ack); err != nil {
- return fmt.Errorf("failed to parse ack: %w", err)
- }
- if ack["type"] != "connection_ack" {
- return fmt.Errorf("expected connection_ack, got: %s", ack["type"])
- }
- // Start reading messages
- go w.readMessages()
- return nil
- }
- // Subscribe starts a subscription
- func (w *WebSocketClient) Subscribe(id string, query string, variables map[string]interface{}) error {
- subMsg := map[string]interface{}{
- "id": id,
- "type": "start",
- "payload": map[string]interface{}{
- "query": query,
- "variables": variables,
- },
- }
- return w.conn.WriteJSON(subMsg)
- }
- // Unsubscribe stops a subscription
- func (w *WebSocketClient) Unsubscribe(id string) error {
- stopMsg := map[string]interface{}{
- "id": id,
- "type": "stop",
- }
- return w.conn.WriteJSON(stopMsg)
- }
- // Messages returns the message channel
- func (w *WebSocketClient) Messages() <-chan json.RawMessage {
- return w.messages
- }
- // Errors returns the error channel
- func (w *WebSocketClient) Errors() <-chan error {
- return w.errors
- }
- // Done returns the done channel
- func (w *WebSocketClient) Done() <-chan struct{} {
- return w.done
- }
- func (w *WebSocketClient) readMessages() {
- defer close(w.done)
- defer close(w.messages)
- defer close(w.errors)
- for {
- _, msg, err := w.conn.ReadMessage()
- if err != nil {
- select {
- case w.errors <- err:
- default:
- }
- return
- }
- var parsed map[string]interface{}
- if err := json.Unmarshal(msg, &parsed); err != nil {
- continue
- }
- msgType, ok := parsed["type"].(string)
- if !ok {
- continue
- }
- switch msgType {
- case "data":
- if payload, ok := parsed["payload"].(map[string]interface{}); ok {
- if data, ok := payload["data"]; ok {
- dataBytes, _ := json.Marshal(data)
- select {
- case w.messages <- dataBytes:
- default:
- }
- }
- if errs, ok := payload["errors"].([]interface{}); ok && len(errs) > 0 {
- if errBytes, err := json.Marshal(errs); err == nil {
- w.errors <- fmt.Errorf("subscription error: %s", string(errBytes))
- }
- }
- }
- case "complete":
- return
- case "error":
- if payload, ok := parsed["payload"].([]interface{}); ok {
- if errBytes, err := json.Marshal(payload); err == nil {
- w.errors <- fmt.Errorf("subscription error: %s", string(errBytes))
- }
- }
- }
- }
- }
- // Close closes the WebSocket connection
- func (w *WebSocketClient) Close() error {
- if w.conn != nil {
- return w.conn.Close()
- }
- return nil
- }
|