Ver Fonte

implement subscriptions with event filtering

david há 6 dias atrás
pai
commit
6bb7cea6b2
13 ficheiros alterados com 1788 adições e 30 exclusões
  1. 1091 0
      CLIENT_GUIDE.md
  2. 0 2
      go.mod
  3. 0 1
      go.sum
  4. 22 9
      graph/converters.go
  5. 132 0
      graph/generated.go
  6. 212 0
      graph/integration_test.go
  7. 2 0
      graph/model/models_gen.go
  8. 100 0
      graph/resolver.go
  9. 2 0
      graph/schema.graphqls
  10. 195 9
      graph/schema.resolvers.go
  11. 0 4
      graph/testutil/client.go
  12. 2 1
      models/models.go
  13. 30 4
      server.go

+ 1091 - 0
CLIENT_GUIDE.md

@@ -0,0 +1,1091 @@
+# ARP Server Client Implementation Guide
+
+This document provides the necessary information to implement a client for the ARP (Agent-native ERP) GraphQL server.
+
+## Table of Contents
+
+1. [Overview](#overview)
+2. [Connection & Endpoint](#connection--endpoint)
+3. [Authentication](#authentication)
+4. [Authorization & Permissions](#authorization--permissions)
+5. [GraphQL Operations](#graphql-operations)
+6. [Error Handling](#error-handling)
+7. [Subscriptions](#subscriptions)
+
+---
+
+## Overview
+
+The ARP server exposes a **GraphQL API** for managing users, roles, permissions, services, tasks, notes, channels, and messages. All operations except `login` require authentication via a JWT bearer token.
+
+---
+
+## Connection & Endpoint
+
+- **Protocol**: HTTP/HTTPS
+- **Endpoint**: `/query` (GraphQL endpoint)
+- **Content-Type**: `application/json`
+- **Request Method**: `POST` for queries and mutations
+
+### Example Request Structure
+
+```json
+{
+  "query": "string (GraphQL query or mutation)",
+  "variables": "object (optional variables)",
+  "operationName": "string (optional operation name)"
+}
+```
+
+---
+
+## Authentication
+
+### Login Flow
+
+1. Call the `login` mutation with email and password
+2. Receive a JWT token in the response
+3. Include the token in subsequent requests via the `Authorization` header
+
+### Login Mutation
+
+```graphql
+mutation Login($email: String!, $password: String!) {
+  login(email: $email, password: $password) {
+    token
+    user {
+      id
+      email
+      roles {
+        id
+        name
+        permissions {
+          id
+          code
+          description
+        }
+      }
+    }
+  }
+}
+```
+
+**Variables:**
+```json
+{
+  "email": "user@example.com",
+  "password": "your-password"
+}
+```
+
+**Response:**
+```json
+{
+  "data": {
+    "login": {
+      "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
+      "user": {
+        "id": "1",
+        "email": "user@example.com",
+        "roles": [...]
+      }
+    }
+  }
+}
+```
+
+### Using the Token
+
+Include the token in all subsequent requests:
+
+```
+Authorization: Bearer <token>
+```
+
+### Token Details
+
+| Property | Value |
+|----------|-------|
+| Algorithm | HS256 |
+| Expiration | 10 years from issuance |
+| Claims | `user_id`, `email`, `roles`, `permissions` |
+
+---
+
+## Authorization & Permissions
+
+### Permission-Based Access Control
+
+Many operations require specific permissions. The permission format is `resource:action`.
+
+### Required Permissions by Operation
+
+| Operation | Required Permission |
+|-----------|---------------------|
+| `updateUser` | `user:update` |
+| `deleteUser` | `user:delete` |
+| `updateNote` | `note:update` |
+| `deleteNote` | `note:delete` |
+| `updateRole` | `role:update` |
+| `deleteRole` | `role:delete` |
+| `updatePermission` | `permission:update` |
+| `deletePermission` | `permission:delete` |
+| `updateService` | `service:update` |
+| `deleteService` | `service:delete` |
+| `updateTask` | `task:update` |
+| `deleteTask` | `task:delete` |
+| `updateTaskStatus` | `taskstatus:update` |
+| `deleteTaskStatus` | `taskstatus:delete` |
+| `updateChannel` | `channel:update` |
+| `deleteChannel` | `channel:delete` |
+| `updateMessage` | `message:update` |
+| `deleteMessage` | `message:delete` |
+
+### Authorization Errors
+
+If authentication or authorization fails, the API returns an error:
+
+```json
+{
+  "errors": [
+    {
+      "message": "unauthorized: authentication required"
+    }
+  ]
+}
+```
+
+Or for missing permissions:
+
+```json
+{
+  "errors": [
+    {
+      "message": "unauthorized: missing user:update permission"
+    }
+  ]
+}
+```
+
+---
+
+## GraphQL Operations
+
+### Queries (Read Operations)
+
+All queries require authentication.
+
+#### Users
+
+```graphql
+# Get all users
+query Users {
+  users {
+    id
+    email
+    roles {
+      id
+      name
+    }
+    createdAt
+    updatedAt
+  }
+}
+
+# Get single user by ID
+query User($id: ID!) {
+  user(id: $id) {
+    id
+    email
+    roles {
+      id
+      name
+      permissions {
+        id
+        code
+        description
+      }
+    }
+    createdAt
+    updatedAt
+  }
+}
+```
+
+#### Notes
+
+```graphql
+# Get all notes
+query Notes {
+  notes {
+    id
+    title
+    content
+    userId
+    user { id email }
+    serviceId
+    service { id name }
+    createdAt
+    updatedAt
+  }
+}
+
+# Get single note by ID
+query Note($id: ID!) {
+  note(id: $id) {
+    id
+    title
+    content
+    userId
+    serviceId
+    createdAt
+    updatedAt
+  }
+}
+```
+
+#### Roles
+
+```graphql
+# Get all roles
+query Roles {
+  roles {
+    id
+    name
+    description
+    permissions {
+      id
+      code
+      description
+    }
+  }
+}
+
+# Get single role by ID
+query Role($id: ID!) {
+  role(id: $id) {
+    id
+    name
+    description
+    permissions {
+      id
+      code
+    }
+  }
+}
+```
+
+#### Permissions
+
+```graphql
+# Get all permissions
+query Permissions {
+  permissions {
+    id
+    code
+    description
+  }
+}
+
+# Get single permission by ID
+query Permission($id: ID!) {
+  permission(id: $id) {
+    id
+    code
+    description
+  }
+}
+```
+
+#### Services
+
+```graphql
+# Get all services
+query Services {
+  services {
+    id
+    name
+    description
+    createdById
+    createdBy { id email }
+    participants { id email }
+    tasks { id title }
+    createdAt
+    updatedAt
+  }
+}
+
+# Get single service by ID
+query Service($id: ID!) {
+  service(id: $id) {
+    id
+    name
+    description
+    participants { id email }
+    tasks { id title status { code label } }
+    createdAt
+    updatedAt
+  }
+}
+```
+
+#### Tasks
+
+```graphql
+# Get all tasks
+query Tasks {
+  tasks {
+    id
+    title
+    content
+    createdById
+    createdBy { id email }
+    updatedById
+    updatedBy { id email }
+    assigneeId
+    assignee { id email }
+    statusId
+    status { id code label }
+    dueDate
+    priority
+    createdAt
+    updatedAt
+  }
+}
+
+# Get single task by ID
+query Task($id: ID!) {
+  task(id: $id) {
+    id
+    title
+    content
+    createdById
+    createdBy { id email }
+    updatedById
+    updatedBy { id email }
+    assignee { id email }
+    status { code label }
+    dueDate
+    priority
+    createdAt
+    updatedAt
+  }
+}
+```
+
+#### Task Statuses
+
+```graphql
+# Get all task statuses
+query TaskStatuses {
+  taskStatuses {
+    id
+    code
+    label
+    tasks { id title }
+    createdAt
+    updatedAt
+  }
+}
+
+# Get single task status by ID
+query TaskStatus($id: ID!) {
+  taskStatus(id: $id) {
+    id
+    code
+    label
+    tasks { id title }
+    createdAt
+    updatedAt
+  }
+}
+```
+
+#### Channels
+
+```graphql
+# Get all channels
+query Channels {
+  channels {
+    id
+    participants { id email }
+    createdAt
+    updatedAt
+  }
+}
+
+# Get single channel by ID
+query Channel($id: ID!) {
+  channel(id: $id) {
+    id
+    participants { id email }
+    createdAt
+    updatedAt
+  }
+}
+```
+
+#### Messages
+
+```graphql
+# Get all messages
+query Messages {
+  messages {
+    id
+    conversationId
+    senderId
+    sender { id email }
+    content
+    sentAt
+    createdAt
+    updatedAt
+  }
+}
+
+# Get single message by ID
+query Message($id: ID!) {
+  message(id: $id) {
+    id
+    conversationId
+    senderId
+    sender { id email }
+    content
+    sentAt
+    createdAt
+    updatedAt
+  }
+}
+```
+
+### Mutations (Write Operations)
+
+All mutations require authentication. Some require additional permissions (see Authorization section).
+
+#### Authentication
+
+```graphql
+mutation Login($email: String!, $password: String!) {
+  login(email: $email, password: $password) {
+    token
+    user { id email }
+  }
+}
+```
+
+#### Users
+
+```graphql
+# Create user
+mutation CreateUser($input: NewUser!) {
+  createUser(input: $input) {
+    id
+    email
+    roles { id name }
+    createdAt
+  }
+}
+# Variables: { "input": { "email": "...", "password": "...", "roles": ["1", "2"] } }
+
+# Update user (requires user:update permission)
+mutation UpdateUser($id: ID!, $input: UpdateUserInput!) {
+  updateUser(id: $id, input: $input) {
+    id
+    email
+    roles { id name }
+    updatedAt
+  }
+}
+# Variables: { "id": "1", "input": { "email": "new@example.com" } }
+
+# Delete user (requires user:delete permission)
+mutation DeleteUser($id: ID!) {
+  deleteUser(id: $id)
+}
+```
+
+#### Notes
+
+```graphql
+# Create note
+mutation CreateNote($input: NewNote!) {
+  createNote(input: $input) {
+    id
+    title
+    content
+    userId
+    serviceId
+    createdAt
+  }
+}
+# Variables: { "input": { "title": "...", "content": "...", "userId": "1", "serviceId": "1" } }
+
+# Update note (requires note:update permission)
+mutation UpdateNote($id: ID!, $input: UpdateNoteInput!) {
+  updateNote(id: $id, input: $input) {
+    id
+    title
+    content
+    updatedAt
+  }
+}
+
+# Delete note (requires note:delete permission)
+mutation DeleteNote($id: ID!) {
+  deleteNote(id: $id)
+}
+```
+
+#### Roles
+
+```graphql
+# Create role
+mutation CreateRole($input: NewRole!) {
+  createRole(input: $input) {
+    id
+    name
+    description
+    permissions { id code }
+    createdAt
+  }
+}
+# Variables: { "input": { "name": "...", "description": "...", "permissions": ["1", "2"] } }
+
+# Update role (requires role:update permission)
+mutation UpdateRole($id: ID!, $input: UpdateRoleInput!) {
+  updateRole(id: $id, input: $input) {
+    id
+    name
+    permissions { id code }
+    updatedAt
+  }
+}
+
+# Delete role (requires role:delete permission)
+mutation DeleteRole($id: ID!) {
+  deleteRole(id: $id)
+}
+```
+
+#### Permissions
+
+```graphql
+# Create permission
+mutation CreatePermission($input: NewPermission!) {
+  createPermission(input: $input) {
+    id
+    code
+    description
+    createdAt
+  }
+}
+# Variables: { "input": { "code": "resource:action", "description": "..." } }
+
+# Update permission (requires permission:update permission)
+mutation UpdatePermission($id: ID!, $input: UpdatePermissionInput!) {
+  updatePermission(id: $id, input: $input) {
+    id
+    code
+    updatedAt
+  }
+}
+
+# Delete permission (requires permission:delete permission)
+mutation DeletePermission($id: ID!) {
+  deletePermission(id: $id)
+}
+```
+
+#### Services
+
+```graphql
+# Create service
+mutation CreateService($input: NewService!) {
+  createService(input: $input) {
+    id
+    name
+    description
+    createdById
+    participants { id email }
+    createdAt
+  }
+}
+# Variables: { "input": { "name": "...", "description": "...", "createdById": "1", "participants": ["1", "2"] } }
+
+# Update service (requires service:update permission)
+mutation UpdateService($id: ID!, $input: UpdateServiceInput!) {
+  updateService(id: $id, input: $input) {
+    id
+    name
+    participants { id email }
+    updatedAt
+  }
+}
+
+# Delete service (requires service:delete permission)
+mutation DeleteService($id: ID!) {
+  deleteService(id: $id)
+}
+```
+
+#### Tasks
+
+```graphql
+# Create task
+mutation CreateTask($input: NewTask!) {
+  createTask(input: $input) {
+    id
+    title
+    content
+    createdById
+    assigneeId
+    statusId
+    dueDate
+    priority
+    createdAt
+  }
+}
+# Variables: { "input": { "title": "...", "content": "...", "createdById": "1", "priority": "high" } }
+
+# Update task (requires task:update permission)
+# Note: updatedBy is automatically set to the current authenticated user
+mutation UpdateTask($id: ID!, $input: UpdateTaskInput!) {
+  updateTask(id: $id, input: $input) {
+    id
+    title
+    status { code label }
+    assignee { id email }
+    updatedBy { id email }
+    dueDate
+    priority
+    updatedAt
+  }
+}
+
+# Delete task (requires task:delete permission)
+mutation DeleteTask($id: ID!) {
+  deleteTask(id: $id)
+}
+```
+
+#### Task Statuses
+
+```graphql
+# Create task status
+mutation CreateTaskStatus($input: NewTaskStatus!) {
+  createTaskStatus(input: $input) {
+    id
+    code
+    label
+    createdAt
+  }
+}
+# Variables: { "input": { "code": "in_progress", "label": "In Progress" } }
+
+# Update task status (requires taskstatus:update permission)
+mutation UpdateTaskStatus($id: ID!, $input: UpdateTaskStatusInput!) {
+  updateTaskStatus(id: $id, input: $input) {
+    id
+    code
+    label
+    updatedAt
+  }
+}
+
+# Delete task status (requires taskstatus:delete permission)
+mutation DeleteTaskStatus($id: ID!) {
+  deleteTaskStatus(id: $id)
+}
+```
+
+#### Channels
+
+```graphql
+# Create channel
+mutation CreateChannel($input: NewChannel!) {
+  createChannel(input: $input) {
+    id
+    participants { id email }
+    createdAt
+  }
+}
+# Variables: { "input": { "participants": ["1", "2"] } }
+
+# Update channel (requires channel:update permission)
+mutation UpdateChannel($id: ID!, $input: UpdateChannelInput!) {
+  updateChannel(id: $id, input: $input) {
+    id
+    participants { id email }
+    updatedAt
+  }
+}
+
+# Delete channel (requires channel:delete permission)
+mutation DeleteChannel($id: ID!) {
+  deleteChannel(id: $id)
+}
+```
+
+#### Messages
+
+```graphql
+# Create message
+mutation CreateMessage($input: NewMessage!) {
+  createMessage(input: $input) {
+    id
+    conversationId
+    senderId
+    sender { id email }
+    content
+    sentAt
+    createdAt
+  }
+}
+# Variables: { "input": { "conversationId": "1", "senderId": "1", "content": "Hello!" } }
+
+# Update message (requires message:update permission)
+mutation UpdateMessage($id: ID!, $input: UpdateMessageInput!) {
+  updateMessage(id: $id, input: $input) {
+    id
+    content
+    updatedAt
+  }
+}
+
+# Delete message (requires message:delete permission)
+mutation DeleteMessage($id: ID!) {
+  deleteMessage(id: $id)
+}
+```
+
+---
+
+## Error Handling
+
+### Error Response Format
+
+Errors are returned in the standard GraphQL error format:
+
+```json
+{
+  "errors": [
+    {
+      "message": "Error description",
+      "path": ["fieldName"],
+      "locations": [{ "line": 1, "column": 2 }]
+    }
+  ],
+  "data": null
+}
+```
+
+### Common Error Messages
+
+| Error Message | Cause |
+|---------------|-------|
+| `unauthorized: authentication required` | Missing or invalid JWT token |
+| `unauthorized: missing X:Y permission` | User lacks required permission |
+| `invalid credentials` | Wrong email/password on login |
+| `invalid X ID` | Malformed ID provided |
+| `X not found` | Resource with given ID doesn't exist |
+| `failed to create/update/delete X` | Database operation failed |
+
+---
+
+## Subscriptions
+
+The API supports real-time updates via GraphQL subscriptions. Subscriptions use WebSocket connections and provide filtered events based on user context.
+
+### Event Filtering
+
+Subscriptions are **filtered by user context**. Users only receive events that are relevant to them:
+
+| Subscription | Filtering Rule |
+|--------------|----------------|
+| `taskCreated` | Only if user is the **assignee** |
+| `taskUpdated` | Only if user is the **assignee** |
+| `taskDeleted` | Only if user is the **assignee** |
+| `messageAdded` | Only if user is a **participant** in the channel |
+
+This means:
+- A user will only receive task events for tasks assigned to them
+- A user will only receive message events for channels they are a participant in
+- Unassigned tasks do not trigger notifications to any user
+
+### Available Subscriptions
+
+```graphql
+# Task created - received only by assignee
+subscription TaskCreated {
+  taskCreated {
+    id
+    title
+    content
+    assigneeId
+    status { code label }
+    priority
+    createdAt
+  }
+}
+
+# Task updated - received only by assignee
+subscription TaskUpdated {
+  taskUpdated {
+    id
+    title
+    content
+    assigneeId
+    updatedBy { id email }
+    status { code label }
+    priority
+    updatedAt
+  }
+}
+
+# Task deleted - received only by assignee
+subscription TaskDeleted {
+  taskDeleted {
+    id
+    title
+    assigneeId
+  }
+}
+
+# Message added - received by all channel participants
+subscription MessageAdded {
+  messageAdded {
+    id
+    conversationId
+    senderId
+    sender { id email }
+    content
+    sentAt
+  }
+}
+```
+
+### WebSocket Connection
+
+To use subscriptions, establish a WebSocket connection:
+
+1. **Protocol**: WebSocket over HTTP/HTTPS
+2. **Endpoint**: `/query` (same as GraphQL endpoint)
+3. **Authentication**: Include the JWT token in the connection parameters or headers
+
+### WebSocket Subprotocol
+
+The server uses the standard GraphQL over WebSocket protocol (`graphql-ws`):
+
+#### Connection Initialization
+
+```json
+{
+  "type": "connection_init",
+  "payload": {
+    "Authorization": "Bearer <token>"
+  }
+}
+```
+
+#### Subscribe to a Topic
+
+```json
+{
+  "id": "1",
+  "type": "start",
+  "payload": {
+    "query": "subscription { taskCreated { id title assigneeId } }"
+  }
+}
+```
+
+#### Receive Events
+
+```json
+{
+  "id": "1",
+  "type": "data",
+  "payload": {
+    "data": {
+      "taskCreated": {
+        "id": "5",
+        "title": "New Task",
+        "assigneeId": "2"
+      }
+    }
+  }
+}
+```
+
+#### Unsubscribe
+
+```json
+{
+  "id": "1",
+  "type": "stop"
+}
+```
+
+### Example: JavaScript/TypeScript Client
+
+Using the `graphql-ws` library:
+
+```javascript
+import { createClient } from 'graphql-ws';
+
+const client = createClient({
+  url: 'wss://api.example.com/query',
+  connectionParams: {
+    Authorization: `Bearer ${token}`,
+  },
+});
+
+// Subscribe to taskCreated events
+const unsubscribe = client.subscribe(
+  {
+    query: `subscription { taskCreated { id title assigneeId } }`,
+  },
+  {
+    next: (data) => {
+      console.log('Task created:', data.data.taskCreated);
+    },
+    error: (error) => {
+      console.error('Subscription error:', error);
+    },
+    complete: () => {
+      console.log('Subscription closed');
+    },
+  }
+);
+
+// Later: unsubscribe
+unsubscribe();
+```
+
+### Example: Go Client
+
+Using `github.com/99designs/gqlgen/client`:
+
+```go
+import (
+    "github.com/99designs/gqlgen/client"
+)
+
+// Create WebSocket client with authentication
+wsClient := client.New(server, client.AddHeader("Authorization", "Bearer "+token))
+
+// Subscribe to taskCreated
+subscription := wsClient.Websocket(`subscription { taskCreated { id title } }`)
+defer subscription.Close()
+
+for {
+    var response struct {
+        TaskCreated *model.Task `json:"taskCreated"`
+    }
+    err := subscription.Next(&response)
+    if err != nil {
+        break // Connection closed or error
+    }
+    if response.TaskCreated != nil {
+        fmt.Printf("Task created: %s\n", response.TaskCreated.Title)
+    }
+}
+```
+
+### Subscription Use Cases
+
+| Use Case | Subscription | Notes |
+|----------|--------------|-------|
+| Task assignment notifications | `taskCreated` | User receives event when assigned a new task |
+| Task status updates | `taskUpdated` | User receives event when their assigned task is modified |
+| Task removal | `taskDeleted` | User receives event when their assigned task is deleted |
+| Chat messages | `messageAdded` | All channel participants receive new message events |
+
+### Best Practices
+
+1. **Reconnect on Disconnect**: Implement automatic reconnection with exponential backoff
+2. **Handle Auth Errors**: If authentication fails, re-authenticate and retry
+3. **Filter Client-Side**: Even though server filters, consider additional client-side filtering if needed
+4. **Connection Management**: Close subscriptions when no longer needed to free resources
+5. **Error Handling**: Always handle subscription errors gracefully
+
+---
+
+## Data Types Reference
+
+### Scalar Types
+
+| Type | Description |
+|------|-------------|
+| `ID` | Unique identifier (string representation) |
+| `String` | UTF-8 string |
+| `Boolean` | true or false |
+
+### Enum Values
+
+**Task Priority**: `low`, `medium`, `high` (string values)
+
+**Task Status Codes**: Customizable (e.g., `open`, `in_progress`, `done`)
+
+### Date/Time Format
+
+All timestamps use **ISO 8601 / RFC 3339** format:
+
+```
+2024-01-15T10:30:00Z
+```
+
+---
+
+## Quick Reference
+
+### Authentication Header
+
+```
+Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
+```
+
+### Typical Client Flow
+
+1. **Login** → Obtain JWT token
+2. **Store Token** → Securely persist the token
+3. **Authenticated Requests** → Include token in all subsequent requests
+4. **Handle Errors** → Check for authentication/authorization errors
+5. **Token Refresh** → Token has long expiration (10 years), but handle expiry if needed
+
+### Permission Check Pattern
+
+Before performing privileged operations, verify the user has the required permission by checking the `permissions` array in the JWT claims or the user's roles.
+
+---
+
+## Implementation Tips
+
+### Language-Agnostic Considerations
+
+1. **Use a GraphQL Client Library**: Most languages have mature GraphQL clients (Apollo, urql, gql-request, etc.)
+2. **Handle JWT Securely**: Store tokens securely; never in localStorage for web apps (use httpOnly cookies or secure storage)
+3. **Implement Retry Logic**: For network failures, implement exponential backoff
+4. **Cache Responses**: Use client-side caching to reduce redundant queries
+5. **Batch Requests**: Combine multiple queries in a single request when possible
+
+### Example HTTP Request
+
+```http
+POST /query HTTP/1.1
+Host: api.example.com
+Content-Type: application/json
+Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
+
+{
+  "query": "query { users { id email } }"
+}
+```
+
+### Example cURL Command
+
+```bash
+curl -X POST https://api.example.com/query \
+  -H "Content-Type: application/json" \
+  -H "Authorization: Bearer YOUR_TOKEN" \
+  -d '{"query": "query { users { id email } }"}'
+```

+ 0 - 2
go.mod

@@ -6,7 +6,6 @@ require (
 	github.com/99designs/gqlgen v0.17.87
 	github.com/bradleyjkemp/cupaloy/v2 v2.8.0
 	github.com/golang-jwt/jwt/v5 v5.3.1
-	github.com/stretchr/testify v1.11.1
 	github.com/vektah/gqlparser/v2 v2.5.32
 	golang.org/x/crypto v0.48.0
 	gorm.io/driver/sqlite v1.6.0
@@ -31,7 +30,6 @@ require (
 	golang.org/x/sync v0.19.0 // indirect
 	golang.org/x/text v0.34.0 // indirect
 	golang.org/x/tools v0.42.0 // indirect
-	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
 
 tool github.com/99designs/gqlgen

+ 0 - 1
go.sum

@@ -64,7 +64,6 @@ golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk=
 golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA=
 golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
 golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

+ 22 - 9
graph/converters.go

@@ -188,16 +188,29 @@ func convertTask(t models.Task) *model.Task {
 		statusID = &sid
 		status = convertTaskStatus(t.Status)
 	}
+	var assigneeID *string
+	var assignee *model.User
+	if t.AssigneeID != nil && t.Assignee != nil {
+		aid := strconv.FormatUint(uint64(*t.AssigneeID), 10)
+		assigneeID = &aid
+		assignee = convertUser(*t.Assignee)
+	}
 	return &model.Task{
-		ID:        strconv.FormatUint(uint64(t.ID), 10),
-		Title:     t.Title,
-		Content:   t.Content,
-		StatusID:  statusID,
-		Status:    status,
-		DueDate:   dueDate,
-		Priority:  t.Priority,
-		CreatedAt: t.CreatedAt.String(),
-		UpdatedAt: t.UpdatedAt.String(),
+		ID:          strconv.FormatUint(uint64(t.ID), 10),
+		Title:       t.Title,
+		Content:     t.Content,
+		CreatedByID: strconv.FormatUint(uint64(t.CreatedByID), 10),
+		CreatedBy:   convertUser(t.CreatedBy),
+		UpdatedByID: strconv.FormatUint(uint64(t.UpdatedByID), 10),
+		UpdatedBy:   convertUser(t.UpdatedBy),
+		AssigneeID:  assigneeID,
+		Assignee:    assignee,
+		StatusID:    statusID,
+		Status:      status,
+		DueDate:     dueDate,
+		Priority:    t.Priority,
+		CreatedAt:   t.CreatedAt.String(),
+		UpdatedAt:   t.UpdatedAt.String(),
 	}
 }
 

+ 132 - 0
graph/generated.go

@@ -170,6 +170,8 @@ type ComplexityRoot struct {
 		StatusID    func(childComplexity int) int
 		Title       func(childComplexity int) int
 		UpdatedAt   func(childComplexity int) int
+		UpdatedBy   func(childComplexity int) int
+		UpdatedByID func(childComplexity int) int
 	}
 
 	TaskStatus struct {
@@ -1070,6 +1072,18 @@ func (e *executableSchema) Complexity(ctx context.Context, typeName, field strin
 		}
 
 		return e.ComplexityRoot.Task.UpdatedAt(childComplexity), true
+	case "Task.updatedBy":
+		if e.ComplexityRoot.Task.UpdatedBy == nil {
+			break
+		}
+
+		return e.ComplexityRoot.Task.UpdatedBy(childComplexity), true
+	case "Task.updatedById":
+		if e.ComplexityRoot.Task.UpdatedByID == nil {
+			break
+		}
+
+		return e.ComplexityRoot.Task.UpdatedByID(childComplexity), true
 
 	case "TaskStatus.code":
 		if e.ComplexityRoot.TaskStatus.Code == nil {
@@ -3091,6 +3105,10 @@ func (ec *executionContext) fieldContext_Mutation_createTask(ctx context.Context
 				return ec.fieldContext_Task_createdById(ctx, field)
 			case "createdBy":
 				return ec.fieldContext_Task_createdBy(ctx, field)
+			case "updatedById":
+				return ec.fieldContext_Task_updatedById(ctx, field)
+			case "updatedBy":
+				return ec.fieldContext_Task_updatedBy(ctx, field)
 			case "assigneeId":
 				return ec.fieldContext_Task_assigneeId(ctx, field)
 			case "assignee":
@@ -3160,6 +3178,10 @@ func (ec *executionContext) fieldContext_Mutation_updateTask(ctx context.Context
 				return ec.fieldContext_Task_createdById(ctx, field)
 			case "createdBy":
 				return ec.fieldContext_Task_createdBy(ctx, field)
+			case "updatedById":
+				return ec.fieldContext_Task_updatedById(ctx, field)
+			case "updatedBy":
+				return ec.fieldContext_Task_updatedBy(ctx, field)
 			case "assigneeId":
 				return ec.fieldContext_Task_assigneeId(ctx, field)
 			case "assignee":
@@ -4598,6 +4620,10 @@ func (ec *executionContext) fieldContext_Query_tasks(_ context.Context, field gr
 				return ec.fieldContext_Task_createdById(ctx, field)
 			case "createdBy":
 				return ec.fieldContext_Task_createdBy(ctx, field)
+			case "updatedById":
+				return ec.fieldContext_Task_updatedById(ctx, field)
+			case "updatedBy":
+				return ec.fieldContext_Task_updatedBy(ctx, field)
 			case "assigneeId":
 				return ec.fieldContext_Task_assigneeId(ctx, field)
 			case "assignee":
@@ -4656,6 +4682,10 @@ func (ec *executionContext) fieldContext_Query_task(ctx context.Context, field g
 				return ec.fieldContext_Task_createdById(ctx, field)
 			case "createdBy":
 				return ec.fieldContext_Task_createdBy(ctx, field)
+			case "updatedById":
+				return ec.fieldContext_Task_updatedById(ctx, field)
+			case "updatedBy":
+				return ec.fieldContext_Task_updatedBy(ctx, field)
 			case "assigneeId":
 				return ec.fieldContext_Task_assigneeId(ctx, field)
 			case "assignee":
@@ -5452,6 +5482,10 @@ func (ec *executionContext) fieldContext_Service_tasks(_ context.Context, field
 				return ec.fieldContext_Task_createdById(ctx, field)
 			case "createdBy":
 				return ec.fieldContext_Task_createdBy(ctx, field)
+			case "updatedById":
+				return ec.fieldContext_Task_updatedById(ctx, field)
+			case "updatedBy":
+				return ec.fieldContext_Task_updatedBy(ctx, field)
 			case "assigneeId":
 				return ec.fieldContext_Task_assigneeId(ctx, field)
 			case "assignee":
@@ -5567,6 +5601,10 @@ func (ec *executionContext) fieldContext_Subscription_taskCreated(_ context.Cont
 				return ec.fieldContext_Task_createdById(ctx, field)
 			case "createdBy":
 				return ec.fieldContext_Task_createdBy(ctx, field)
+			case "updatedById":
+				return ec.fieldContext_Task_updatedById(ctx, field)
+			case "updatedBy":
+				return ec.fieldContext_Task_updatedBy(ctx, field)
 			case "assigneeId":
 				return ec.fieldContext_Task_assigneeId(ctx, field)
 			case "assignee":
@@ -5624,6 +5662,10 @@ func (ec *executionContext) fieldContext_Subscription_taskUpdated(_ context.Cont
 				return ec.fieldContext_Task_createdById(ctx, field)
 			case "createdBy":
 				return ec.fieldContext_Task_createdBy(ctx, field)
+			case "updatedById":
+				return ec.fieldContext_Task_updatedById(ctx, field)
+			case "updatedBy":
+				return ec.fieldContext_Task_updatedBy(ctx, field)
 			case "assigneeId":
 				return ec.fieldContext_Task_assigneeId(ctx, field)
 			case "assignee":
@@ -5681,6 +5723,10 @@ func (ec *executionContext) fieldContext_Subscription_taskDeleted(_ context.Cont
 				return ec.fieldContext_Task_createdById(ctx, field)
 			case "createdBy":
 				return ec.fieldContext_Task_createdBy(ctx, field)
+			case "updatedById":
+				return ec.fieldContext_Task_updatedById(ctx, field)
+			case "updatedBy":
+				return ec.fieldContext_Task_updatedBy(ctx, field)
 			case "assigneeId":
 				return ec.fieldContext_Task_assigneeId(ctx, field)
 			case "assignee":
@@ -5910,6 +5956,78 @@ func (ec *executionContext) fieldContext_Task_createdBy(_ context.Context, field
 	return fc, nil
 }
 
+func (ec *executionContext) _Task_updatedById(ctx context.Context, field graphql.CollectedField, obj *model.Task) (ret graphql.Marshaler) {
+	return graphql.ResolveField(
+		ctx,
+		ec.OperationContext,
+		field,
+		ec.fieldContext_Task_updatedById,
+		func(ctx context.Context) (any, error) {
+			return obj.UpdatedByID, nil
+		},
+		nil,
+		ec.marshalNID2string,
+		true,
+		true,
+	)
+}
+
+func (ec *executionContext) fieldContext_Task_updatedById(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+	fc = &graphql.FieldContext{
+		Object:     "Task",
+		Field:      field,
+		IsMethod:   false,
+		IsResolver: false,
+		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+			return nil, errors.New("field of type ID does not have child fields")
+		},
+	}
+	return fc, nil
+}
+
+func (ec *executionContext) _Task_updatedBy(ctx context.Context, field graphql.CollectedField, obj *model.Task) (ret graphql.Marshaler) {
+	return graphql.ResolveField(
+		ctx,
+		ec.OperationContext,
+		field,
+		ec.fieldContext_Task_updatedBy,
+		func(ctx context.Context) (any, error) {
+			return obj.UpdatedBy, nil
+		},
+		nil,
+		ec.marshalNUser2ᚖgogsᚗdmscᚗdevᚋarpᚋgraphᚋmodelᚐUser,
+		true,
+		true,
+	)
+}
+
+func (ec *executionContext) fieldContext_Task_updatedBy(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) {
+	fc = &graphql.FieldContext{
+		Object:     "Task",
+		Field:      field,
+		IsMethod:   false,
+		IsResolver: false,
+		Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) {
+			switch field.Name {
+			case "id":
+				return ec.fieldContext_User_id(ctx, field)
+			case "email":
+				return ec.fieldContext_User_email(ctx, field)
+			case "password":
+				return ec.fieldContext_User_password(ctx, field)
+			case "roles":
+				return ec.fieldContext_User_roles(ctx, field)
+			case "createdAt":
+				return ec.fieldContext_User_createdAt(ctx, field)
+			case "updatedAt":
+				return ec.fieldContext_User_updatedAt(ctx, field)
+			}
+			return nil, fmt.Errorf("no field named %q was found under type User", field.Name)
+		},
+	}
+	return fc, nil
+}
+
 func (ec *executionContext) _Task_assigneeId(ctx context.Context, field graphql.CollectedField, obj *model.Task) (ret graphql.Marshaler) {
 	return graphql.ResolveField(
 		ctx,
@@ -6291,6 +6409,10 @@ func (ec *executionContext) fieldContext_TaskStatus_tasks(_ context.Context, fie
 				return ec.fieldContext_Task_createdById(ctx, field)
 			case "createdBy":
 				return ec.fieldContext_Task_createdBy(ctx, field)
+			case "updatedById":
+				return ec.fieldContext_Task_updatedById(ctx, field)
+			case "updatedBy":
+				return ec.fieldContext_Task_updatedBy(ctx, field)
 			case "assigneeId":
 				return ec.fieldContext_Task_assigneeId(ctx, field)
 			case "assignee":
@@ -9907,6 +10029,16 @@ func (ec *executionContext) _Task(ctx context.Context, sel ast.SelectionSet, obj
 			if out.Values[i] == graphql.Null {
 				out.Invalids++
 			}
+		case "updatedById":
+			out.Values[i] = ec._Task_updatedById(ctx, field, obj)
+			if out.Values[i] == graphql.Null {
+				out.Invalids++
+			}
+		case "updatedBy":
+			out.Values[i] = ec._Task_updatedBy(ctx, field, obj)
+			if out.Values[i] == graphql.Null {
+				out.Invalids++
+			}
 		case "assigneeId":
 			out.Values[i] = ec._Task_assigneeId(ctx, field, obj)
 		case "assignee":

+ 212 - 0
graph/integration_test.go

@@ -5,11 +5,13 @@ import (
 	"fmt"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/99designs/gqlgen/client"
 	"github.com/99designs/gqlgen/graphql/handler"
 	"github.com/bradleyjkemp/cupaloy/v2"
 	"gogs.dmsc.dev/arp/auth"
+	"gogs.dmsc.dev/arp/graph/model"
 	"gogs.dmsc.dev/arp/graph/testutil"
 	"gorm.io/gorm"
 )
@@ -533,6 +535,216 @@ func TestIntegration_Delete(t *testing.T) {
 	})
 }
 
+// TestIntegration_Subscriptions tests subscription functionality
+func TestIntegration_Subscriptions(t *testing.T) {
+	db, err := testutil.SetupAndBootstrapTestDB()
+	if err != nil {
+		t.Fatalf("Failed to setup test database: %v", err)
+	}
+
+	resolver := NewResolver(db)
+	schema := NewExecutableSchema(Config{Resolvers: resolver})
+	srv := handler.NewDefaultServer(schema)
+	authSrv := auth.AuthMiddleware(srv)
+
+	// Create admin client
+	adminClient := client.New(authSrv)
+
+	// Login as admin
+	var loginResponse struct {
+		Login struct {
+			Token string `json:"token"`
+			User  struct {
+				ID    string `json:"id"`
+				Email string `json:"email"`
+			} `json:"user"`
+		} `json:"login"`
+	}
+	loginQuery := `mutation { login(email: "admin@example.com", password: "secret123") { token user { id email } } }`
+	err = adminClient.Post(loginQuery, &loginResponse)
+	if err != nil {
+		t.Fatalf("Failed to login as admin: %v", err)
+	}
+	adminToken := loginResponse.Login.Token
+	adminID := loginResponse.Login.User.ID
+	adminClient = client.New(authSrv, client.AddHeader("Authorization", "Bearer "+adminToken))
+
+	// Create user1
+	var createUserResponse struct {
+		CreateUser struct {
+			ID    string `json:"id"`
+			Email string `json:"email"`
+		} `json:"createUser"`
+	}
+
+	// Get user role ID
+	var rolesResponse struct {
+		Roles []struct {
+			ID   string `json:"id"`
+			Name string `json:"name"`
+		} `json:"roles"`
+	}
+	adminClient.Post(`query { roles { id name } }`, &rolesResponse)
+	var userRoleID string
+	for _, role := range rolesResponse.Roles {
+		if role.Name == "user" {
+			userRoleID = role.ID
+			break
+		}
+	}
+
+	createUserQuery := fmt.Sprintf(`mutation { createUser(input: {email: "user1@example.com", password: "password123", roles: ["%s"]}) { id email } }`, userRoleID)
+	err = adminClient.Post(createUserQuery, &createUserResponse)
+	if err != nil {
+		t.Fatalf("Failed to create user1: %v", err)
+	}
+	user1ID := createUserResponse.CreateUser.ID
+
+	// Login as user1
+	var user1LoginResponse struct {
+		Login struct {
+			Token string `json:"token"`
+		} `json:"login"`
+	}
+	user1LoginQuery := `mutation { login(email: "user1@example.com", password: "password123") { token } }`
+	err = adminClient.Post(user1LoginQuery, &user1LoginResponse)
+	if err != nil {
+		t.Fatalf("Failed to login as user1: %v", err)
+	}
+	user1Token := user1LoginResponse.Login.Token
+
+	// Create WebSocket client for user1
+	user1WSClient := client.New(authSrv, client.AddHeader("Authorization", "Bearer "+user1Token))
+
+	// Subscribe to taskCreated as user1
+	taskCreatedSub := `subscription { taskCreated { id title content assigneeId } }`
+	taskCreatedChan := make(chan *model.Task, 1)
+	taskCreatedSubscription := user1WSClient.Websocket(taskCreatedSub)
+	defer taskCreatedSubscription.Close()
+	go func() {
+		for {
+			var taskCreatedResponse struct {
+				TaskCreated *model.Task `json:"taskCreated"`
+			}
+			err := taskCreatedSubscription.Next(&taskCreatedResponse)
+			if err != nil {
+				return
+			}
+			if taskCreatedResponse.TaskCreated != nil {
+				select {
+				case taskCreatedChan <- taskCreatedResponse.TaskCreated:
+				default:
+				}
+			}
+		}
+	}()
+
+	// Subscribe to messageAdded as user1
+	messageAddedSub := `subscription { messageAdded { id content conversationId senderId } }`
+	messageAddedChan := make(chan *model.Message, 1)
+	messageAddedSubscription := user1WSClient.Websocket(messageAddedSub)
+	defer messageAddedSubscription.Close()
+	go func() {
+		for {
+			var messageAddedResponse struct {
+				MessageAdded *model.Message `json:"messageAdded"`
+			}
+			err := messageAddedSubscription.Next(&messageAddedResponse)
+			if err != nil {
+				return
+			}
+			if messageAddedResponse.MessageAdded != nil {
+				select {
+				case messageAddedChan <- messageAddedResponse.MessageAdded:
+				default:
+				}
+			}
+		}
+	}()
+
+	// Give subscriptions time to connect
+	time.Sleep(100 * time.Millisecond)
+
+	// Create TestConversation with admin and user1 as participants
+	var createChannelResponse struct {
+		CreateChannel struct {
+			ID string `json:"id"`
+		} `json:"createChannel"`
+	}
+	createChannelQuery := fmt.Sprintf(`mutation { createChannel(input: {participants: ["%s", "%s"]}) { id } }`, adminID, user1ID)
+	err = adminClient.Post(createChannelQuery, &createChannelResponse)
+	if err != nil {
+		t.Fatalf("Failed to create channel: %v", err)
+	}
+	channelID := createChannelResponse.CreateChannel.ID
+
+	// Admin sends a message to the channel
+	var createMessageResponse struct {
+		CreateMessage struct {
+			ID string `json:"id"`
+		} `json:"createMessage"`
+	}
+	createMessageQuery := fmt.Sprintf(`mutation { createMessage(input: {conversationId: "%s", senderId: "%s", content: "Hello user1!"}) { id } }`, channelID, adminID)
+	err = adminClient.Post(createMessageQuery, &createMessageResponse)
+	if err != nil {
+		t.Fatalf("Failed to create message: %v", err)
+	}
+
+	// Wait for messageAdded event
+	select {
+	case msg := <-messageAddedChan:
+		if msg == nil {
+			t.Error("Expected messageAdded event, got nil")
+		} else if msg.Content != "Hello user1!" {
+			t.Errorf("Expected message content 'Hello user1!', got '%s'", msg.Content)
+		} else {
+			t.Log("user1 received messageAdded event successfully")
+		}
+	case <-time.After(2 * time.Second):
+		t.Error("Timeout waiting for messageAdded event")
+	}
+
+	// Get a task status ID
+	var statusesResponse struct {
+		TaskStatuses []struct {
+			ID   string `json:"id"`
+			Code string `json:"code"`
+		} `json:"taskStatuses"`
+	}
+	adminClient.Post(`query { taskStatuses { id code } }`, &statusesResponse)
+	var statusID string
+	if len(statusesResponse.TaskStatuses) > 0 {
+		statusID = statusesResponse.TaskStatuses[0].ID
+	}
+
+	// Admin creates a task assigned to user1
+	var createTaskResponse struct {
+		CreateTask struct {
+			ID    string `json:"id"`
+			Title string `json:"title"`
+		} `json:"createTask"`
+	}
+	createTaskQuery := fmt.Sprintf(`mutation { createTask(input: {title: "Task for user1", content: "This is assigned to user1", createdById: "%s", assigneeId: "%s", statusId: "%s", priority: "medium"}) { id title } }`, adminID, user1ID, statusID)
+	err = adminClient.Post(createTaskQuery, &createTaskResponse)
+	if err != nil {
+		t.Fatalf("Failed to create task: %v", err)
+	}
+
+	// Wait for taskCreated event
+	select {
+	case task := <-taskCreatedChan:
+		if task == nil {
+			t.Error("Expected taskCreated event, got nil")
+		} else if task.Title != "Task for user1" {
+			t.Errorf("Expected task title 'Task for user1', got '%s'", task.Title)
+		} else {
+			t.Log("user1 received taskCreated event successfully")
+		}
+	case <-time.After(2 * time.Second):
+		t.Error("Timeout waiting for taskCreated event")
+	}
+}
+
 // bootstrapData creates all entities for testing (skips existing items)
 func bootstrapData(t *testing.T, tc *TestClient, tracker *IDTracker, seed testutil.SeedData) {
 	// Create Permissions (skip if already exists)

+ 2 - 0
graph/model/models_gen.go

@@ -133,6 +133,8 @@ type Task struct {
 	Content     string      `json:"content"`
 	CreatedByID string      `json:"createdById"`
 	CreatedBy   *User       `json:"createdBy"`
+	UpdatedByID string      `json:"updatedById"`
+	UpdatedBy   *User       `json:"updatedBy"`
 	AssigneeID  *string     `json:"assigneeId,omitempty"`
 	Assignee    *User       `json:"assignee,omitempty"`
 	StatusID    *string     `json:"statusId,omitempty"`

+ 100 - 0
graph/resolver.go

@@ -1,6 +1,9 @@
 package graph
 
 import (
+	"sync"
+
+	"gogs.dmsc.dev/arp/graph/model"
 	"gorm.io/gorm"
 )
 
@@ -9,6 +12,103 @@ import (
 // It serves as dependency injection for your app, add any dependencies you require
 // here.
 
+// TaskEvent represents a task event for subscriptions
+type TaskEvent struct {
+	Task      *model.Task
+	UserID    uint   // The user who should receive this event (assignee)
+	EventType string // "created", "updated", "deleted"
+}
+
+// MessageEvent represents a message event for subscriptions
+type MessageEvent struct {
+	Message        *model.Message
+	ChannelID      uint
+	ParticipantIDs []uint // IDs of users who should receive this event
+}
+
+// Resolver is the main resolver struct
 type Resolver struct {
 	DB *gorm.DB
+
+	// Pub/Sub channels for subscriptions
+	taskSubscribers   map[uint]chan TaskEvent // keyed by user ID
+	taskSubscribersMu sync.RWMutex
+
+	messageSubscribers   map[uint]chan MessageEvent // keyed by user ID
+	messageSubscribersMu sync.RWMutex
+}
+
+// NewResolver creates a new resolver with initialized pub/sub
+func NewResolver(db *gorm.DB) *Resolver {
+	return &Resolver{
+		DB:                 db,
+		taskSubscribers:    make(map[uint]chan TaskEvent),
+		messageSubscribers: make(map[uint]chan MessageEvent),
+	}
+}
+
+// SubscribeToTasks registers a user for task events and returns their event channel
+func (r *Resolver) SubscribeToTasks(userID uint) <-chan TaskEvent {
+	r.taskSubscribersMu.Lock()
+	defer r.taskSubscribersMu.Unlock()
+
+	ch := make(chan TaskEvent, 10)
+	r.taskSubscribers[userID] = ch
+	return ch
+}
+
+// PublishTaskEvent sends a task event to the assignee
+func (r *Resolver) PublishTaskEvent(task *model.Task, assigneeID *uint, eventType string) {
+	if assigneeID == nil {
+		return // No assignee, no one to notify
+	}
+
+	event := TaskEvent{
+		Task:      task,
+		UserID:    *assigneeID,
+		EventType: eventType,
+	}
+
+	r.taskSubscribersMu.RLock()
+	defer r.taskSubscribersMu.RUnlock()
+
+	if ch, ok := r.taskSubscribers[event.UserID]; ok {
+		select {
+		case ch <- event:
+		default:
+			// Channel full, skip event
+		}
+	}
+}
+
+// SubscribeToMessages registers a user for message events and returns their event channel
+func (r *Resolver) SubscribeToMessages(userID uint) <-chan MessageEvent {
+	r.messageSubscribersMu.Lock()
+	defer r.messageSubscribersMu.Unlock()
+
+	ch := make(chan MessageEvent, 10)
+	r.messageSubscribers[userID] = ch
+	return ch
+}
+
+// PublishMessageEvent sends a message event to all channel participants
+func (r *Resolver) PublishMessageEvent(message *model.Message, channelID uint, participantIDs []uint) {
+	event := MessageEvent{
+		Message:        message,
+		ChannelID:      channelID,
+		ParticipantIDs: participantIDs,
+	}
+
+	r.messageSubscribersMu.RLock()
+	defer r.messageSubscribersMu.RUnlock()
+
+	for _, userID := range participantIDs {
+		if ch, ok := r.messageSubscribers[userID]; ok {
+			select {
+			case ch <- event:
+			default:
+				// Channel full, skip event
+			}
+		}
+	}
 }

+ 2 - 0
graph/schema.graphqls

@@ -65,6 +65,8 @@ type Task {
   content: String!
   createdById: ID!
   createdBy: User!
+  updatedById: ID!
+  updatedBy: User!
   assigneeId: ID
   assignee: User
   statusId: ID

+ 195 - 9
graph/schema.resolvers.go

@@ -619,8 +619,12 @@ func (r *mutationResolver) CreateTask(ctx context.Context, input model.NewTask)
 	// Reload with associations
 	r.DB.Preload("CreatedBy").Preload("Assignee").Preload("Status").First(&task, task.ID)
 
+	// Publish task created event to assignee
+	graphqlTask := convertTask(task)
+	r.PublishTaskEvent(graphqlTask, task.AssigneeID, "created")
+
 	logging.LogMutation(ctx, "CREATE", "TASK", task.Title)
-	return convertTask(task), nil
+	return graphqlTask, nil
 }
 
 // UpdateTask is the resolver for the updateTask field.
@@ -633,6 +637,12 @@ func (r *mutationResolver) UpdateTask(ctx context.Context, id string, input mode
 		return nil, errors.New("unauthorized: missing task:update permission")
 	}
 
+	// Get current user for UpdatedBy
+	currentUser, err := auth.CurrentUser(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get current user: %w", err)
+	}
+
 	taskID, err := toID(id)
 	if err != nil {
 		return nil, fmt.Errorf("invalid task ID: %w", err)
@@ -686,15 +696,22 @@ func (r *mutationResolver) UpdateTask(ctx context.Context, id string, input mode
 		existing.Priority = *input.Priority
 	}
 
+	// Set UpdatedByID to current user
+	existing.UpdatedByID = currentUser.ID
+
 	if err := r.DB.Save(&existing).Error; err != nil {
 		return nil, fmt.Errorf("failed to update task: %w", err)
 	}
 
 	// Reload with associations for response
-	r.DB.Preload("CreatedBy").Preload("Assignee").Preload("Status").First(&existing, existing.ID)
+	r.DB.Preload("CreatedBy").Preload("UpdatedBy").Preload("Assignee").Preload("Status").First(&existing, existing.ID)
+
+	// Publish task updated event to assignee
+	graphqlTask := convertTask(existing)
+	r.PublishTaskEvent(graphqlTask, existing.AssigneeID, "updated")
 
 	logging.LogMutation(ctx, "UPDATE", "TASK", existing.Title)
-	return convertTask(existing), nil
+	return graphqlTask, nil
 }
 
 // DeleteTask is the resolver for the deleteTask field.
@@ -930,6 +947,20 @@ func (r *mutationResolver) CreateMessage(ctx context.Context, input model.NewMes
 	// Reload with associations
 	r.DB.Preload("Sender").First(&message, message.ID)
 
+	// Get channel participants for publishing the message event
+	var channel models.Channel
+	if err := r.DB.Preload("Participants").First(&channel, conversationID).Error; err == nil {
+		// Build list of participant IDs
+		participantIDs := make([]uint, len(channel.Participants))
+		for i, participant := range channel.Participants {
+			participantIDs[i] = participant.ID
+		}
+
+		// Publish message event to channel participants
+		graphqlMessage := convertMessage(message)
+		r.PublishMessageEvent(graphqlMessage, conversationID, participantIDs)
+	}
+
 	logging.LogMutation(ctx, "CREATE", "MESSAGE", fmt.Sprintf("id=%d", message.ID))
 	return convertMessage(message), nil
 }
@@ -1192,7 +1223,7 @@ func (r *queryResolver) Tasks(ctx context.Context) ([]*model.Task, error) {
 	}
 
 	var tasks []models.Task
-	if err := r.DB.Preload("CreatedBy").Preload("Assignee").Preload("Status").Find(&tasks).Error; err != nil {
+	if err := r.DB.Preload("CreatedBy").Preload("UpdatedBy").Preload("Assignee").Preload("Status").Find(&tasks).Error; err != nil {
 		return nil, fmt.Errorf("failed to fetch tasks: %w", err)
 	}
 	logging.LogQuery(ctx, "TASKS", "all")
@@ -1212,7 +1243,7 @@ func (r *queryResolver) Task(ctx context.Context, id string) (*model.Task, error
 	}
 
 	var task models.Task
-	if err := r.DB.Preload("CreatedBy").Preload("Assignee").Preload("Status").First(&task, taskID).Error; err != nil {
+	if err := r.DB.Preload("CreatedBy").Preload("UpdatedBy").Preload("Assignee").Preload("Status").First(&task, taskID).Error; err != nil {
 		return nil, fmt.Errorf("task not found: %w", err)
 	}
 
@@ -1329,23 +1360,178 @@ func (r *queryResolver) Message(ctx context.Context, id string) (*model.Message,
 }
 
 // TaskCreated is the resolver for the taskCreated field.
+// Users only receive events for tasks where they are the assignee.
 func (r *subscriptionResolver) TaskCreated(ctx context.Context) (<-chan *model.Task, error) {
-	return nil, nil
+	// Get current user
+	user, err := auth.CurrentUser(ctx)
+	if err != nil {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+
+	// Subscribe to task events
+	eventChan := r.SubscribeToTasks(user.ID)
+
+	// Create output channel
+	outputChan := make(chan *model.Task, 10)
+
+	// Start goroutine to filter and forward events
+	go func() {
+		defer close(outputChan)
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case event, ok := <-eventChan:
+				if !ok {
+					return
+				}
+				// Only forward "created" events
+				if event.EventType == "created" && event.Task != nil {
+					select {
+					case outputChan <- event.Task:
+					default:
+						// Channel full, skip
+					}
+				}
+			}
+		}
+	}()
+
+	return outputChan, nil
 }
 
 // TaskUpdated is the resolver for the taskUpdated field.
+// Users only receive events for tasks where they are the assignee.
 func (r *subscriptionResolver) TaskUpdated(ctx context.Context) (<-chan *model.Task, error) {
-	return nil, nil
+	// Get current user
+	user, err := auth.CurrentUser(ctx)
+	if err != nil {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+
+	// Subscribe to task events
+	eventChan := r.SubscribeToTasks(user.ID)
+
+	// Create output channel
+	outputChan := make(chan *model.Task, 10)
+
+	// Start goroutine to filter and forward events
+	go func() {
+		defer close(outputChan)
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case event, ok := <-eventChan:
+				if !ok {
+					return
+				}
+				// Only forward "updated" events
+				if event.EventType == "updated" && event.Task != nil {
+					select {
+					case outputChan <- event.Task:
+					default:
+						// Channel full, skip
+					}
+				}
+			}
+		}
+	}()
+
+	return outputChan, nil
 }
 
 // TaskDeleted is the resolver for the taskDeleted field.
+// Users only receive events for tasks where they are the assignee.
 func (r *subscriptionResolver) TaskDeleted(ctx context.Context) (<-chan *model.Task, error) {
-	return nil, nil
+	// Get current user
+	user, err := auth.CurrentUser(ctx)
+	if err != nil {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+
+	// Subscribe to task events
+	eventChan := r.SubscribeToTasks(user.ID)
+
+	// Create output channel
+	outputChan := make(chan *model.Task, 10)
+
+	// Start goroutine to filter and forward events
+	go func() {
+		defer close(outputChan)
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case event, ok := <-eventChan:
+				if !ok {
+					return
+				}
+				// Only forward "deleted" events
+				if event.EventType == "deleted" && event.Task != nil {
+					select {
+					case outputChan <- event.Task:
+					default:
+						// Channel full, skip
+					}
+				}
+			}
+		}
+	}()
+
+	return outputChan, nil
 }
 
 // MessageAdded is the resolver for the messageAdded field.
+// Users only receive events for messages in channels where they are participants.
 func (r *subscriptionResolver) MessageAdded(ctx context.Context) (<-chan *model.Message, error) {
-	return nil, nil
+	// Get current user
+	user, err := auth.CurrentUser(ctx)
+	if err != nil {
+		return nil, errors.New("unauthorized: authentication required")
+	}
+
+	// Subscribe to message events
+	eventChan := r.SubscribeToMessages(user.ID)
+
+	// Create output channel
+	outputChan := make(chan *model.Message, 10)
+
+	// Start goroutine to filter and forward events
+	go func() {
+		defer close(outputChan)
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case event, ok := <-eventChan:
+				if !ok {
+					return
+				}
+				// Check if user is in the participant list
+				isParticipant := false
+				for _, participantID := range event.ParticipantIDs {
+					if participantID == user.ID {
+						isParticipant = true
+						break
+					}
+				}
+				if isParticipant && event.Message != nil {
+					select {
+					case outputChan <- event.Message:
+					default:
+						// Channel full, skip
+					}
+				}
+			}
+		}
+	}()
+
+	return outputChan, nil
 }
 
 // Mutation returns MutationResolver implementation.

+ 0 - 4
graph/testutil/client.go

@@ -1,4 +0,0 @@
-package testutil
-
-// This file previously contained a TestClient that caused an import cycle.
-// The TestClient is now defined directly in integration_test.go.

+ 2 - 1
models/models.go

@@ -81,7 +81,8 @@ type Task struct {
 	CreatedBy   User `gorm:"foreignKey:CreatedByID"`
 
 	// Who updated the task
-	UpdatedBy User `gorm:"foreignKey:CreatedByID"`
+	UpdatedByID uint
+	UpdatedBy   User `gorm:"foreignKey:UpdatedByID"`
 
 	// Assignment – can be nil (unassigned) or point to a user/agent
 	AssigneeID *uint

+ 30 - 4
server.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"log"
 	"net/http"
 	"os"
@@ -38,16 +39,41 @@ func main() {
 		log.Fatal("failed to migrate database:", err)
 	}
 
-	// Create resolver with DB instance
-	resolver := &graph.Resolver{
-		DB: db,
-	}
+	// Create resolver with DB instance using NewResolver for pub/sub support
+	resolver := graph.NewResolver(db)
 
 	srv := handler.New(graph.NewExecutableSchema(graph.Config{Resolvers: resolver}))
 
 	srv.AddTransport(transport.Options{})
 	srv.AddTransport(transport.GET{})
 	srv.AddTransport(transport.POST{})
+	srv.AddTransport(&transport.Websocket{
+		// Authenticate WebSocket connections
+		InitFunc: func(ctx context.Context, initPayload transport.InitPayload) (context.Context, *transport.InitPayload, error) {
+			// Get token from connection params
+			token, ok := initPayload["Authorization"].(string)
+			if !ok {
+				return ctx, nil, nil
+			}
+
+			// Validate token and add user to context
+			claims, err := auth.ValidateToken(token)
+			if err != nil {
+				return ctx, nil, nil
+			}
+
+			// Convert Claims to UserContext
+			userCtx := &auth.UserContext{
+				ID:          claims.UserID,
+				Email:       claims.Email,
+				Roles:       claims.Roles,
+				Permissions: claims.Permissions,
+			}
+
+			ctx = auth.WithUser(ctx, userCtx)
+			return ctx, nil, nil
+		},
+	})
 
 	srv.SetQueryCache(lru.New[*ast.QueryDocument](1000))