resolver.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package graph
  2. import (
  3. "sync"
  4. "gogs.dmsc.dev/arp/graph/model"
  5. "gorm.io/gorm"
  6. )
// This file will not be regenerated automatically.
//
// It serves as dependency injection for your app; add any dependencies you
// require here.
  11. // TaskEvent represents a task event for subscriptions
  12. type TaskEvent struct {
  13. Task *model.Task
  14. UserID uint // The user who should receive this event (assignee)
  15. EventType string // "created", "updated", "deleted"
  16. }
  17. // MessageEvent represents a message event for subscriptions
  18. type MessageEvent struct {
  19. Message *model.Message
  20. ReceiverIDs []uint // IDs of users who should receive this event
  21. }
  22. // Resolver is the main resolver struct
  23. type Resolver struct {
  24. DB *gorm.DB
  25. // Pub/Sub channels for subscriptions (slice of channels per user for multiple subscriptions)
  26. taskSubscribers map[uint][]chan TaskEvent // keyed by user ID
  27. taskSubscribersMu sync.RWMutex
  28. messageSubscribers map[uint][]chan MessageEvent // keyed by user ID
  29. messageSubscribersMu sync.RWMutex
  30. }
  31. // NewResolver creates a new resolver with initialized pub/sub
  32. func NewResolver(db *gorm.DB) *Resolver {
  33. return &Resolver{
  34. DB: db,
  35. taskSubscribers: make(map[uint][]chan TaskEvent),
  36. messageSubscribers: make(map[uint][]chan MessageEvent),
  37. }
  38. }
  39. // SubscribeToTasks registers a user for task events and returns their event channel
  40. // Each subscription gets its own channel to avoid competition between goroutines
  41. func (r *Resolver) SubscribeToTasks(userID uint) <-chan TaskEvent {
  42. r.taskSubscribersMu.Lock()
  43. defer r.taskSubscribersMu.Unlock()
  44. ch := make(chan TaskEvent, 10)
  45. r.taskSubscribers[userID] = append(r.taskSubscribers[userID], ch)
  46. return ch
  47. }
  48. // PublishTaskEvent sends a task event to all subscribers for the assignee
  49. func (r *Resolver) PublishTaskEvent(task *model.Task, assigneeID *uint, eventType string) {
  50. if assigneeID == nil {
  51. return // No assignee, no one to notify
  52. }
  53. event := TaskEvent{
  54. Task: task,
  55. UserID: *assigneeID,
  56. EventType: eventType,
  57. }
  58. r.taskSubscribersMu.RLock()
  59. defer r.taskSubscribersMu.RUnlock()
  60. // Send to all channels for this user
  61. for _, ch := range r.taskSubscribers[event.UserID] {
  62. select {
  63. case ch <- event:
  64. default:
  65. // Channel full, skip event
  66. }
  67. }
  68. }
  69. // SubscribeToMessages registers a user for message events and returns their event channel
  70. // Each subscription gets its own channel to avoid competition between goroutines
  71. func (r *Resolver) SubscribeToMessages(userID uint) <-chan MessageEvent {
  72. r.messageSubscribersMu.Lock()
  73. defer r.messageSubscribersMu.Unlock()
  74. ch := make(chan MessageEvent, 10)
  75. r.messageSubscribers[userID] = append(r.messageSubscribers[userID], ch)
  76. return ch
  77. }
  78. // PublishMessageEvent sends a message event to all subscribers who are receivers
  79. func (r *Resolver) PublishMessageEvent(message *model.Message, receiverIDs []uint) {
  80. event := MessageEvent{
  81. Message: message,
  82. ReceiverIDs: receiverIDs,
  83. }
  84. r.messageSubscribersMu.RLock()
  85. defer r.messageSubscribersMu.RUnlock()
  86. for _, userID := range receiverIDs {
  87. // Send to all channels for this user
  88. for _, ch := range r.messageSubscribers[userID] {
  89. select {
  90. case ch <- event:
  91. default:
  92. // Channel full, skip event
  93. }
  94. }
  95. }
  96. }