Asynchronous Communication
This package implements an asynchronous messaging system with support for multiple providers (AWS SNS/SQS, Google Cloud Pub/Sub, and RabbitMQ), covering message production and consumption, observability, and error handling features.
Important: Before using the asynchronous communication mechanism, all topic and queue structures must have been previously created in the chosen provider.
Configuration
The system automatically detects the configured cloud provider via environment variables.
AWS / GCP (Default)
By default, the COLIBRI_MESSAGING variable is set to CLOUD_DEFAULT.
RabbitMQ
To use RabbitMQ, configure the following variables:
COLIBRI_MESSAGING: Set toRABBITMQ.RABBITMQ_URL: Service access URL (e.g.,amqp://guest:guest@localhost:5672/).
Note: When using RabbitMQ, cloud services (SNS/SQS and Pub/Sub) are ignored.
Initialization
To enable messaging features, add initialization in the main.go function:
// Messaging system initialization
messaging.Initialize()
Main Components
1. Publishers (Producers)
Used to send messages to a specific topic.
// Creating a producer
producer := messaging.NewProducer("TOPIC_NAME")
// Message publication
// The second parameter "action" helps identify the message's purpose
err := producer.Publish(ctx, "create", myMessage)
Features:
- Strong typing support.
- Automatic authentication context propagation.
- Message tracking via UUID.
- Integrated monitoring.
2. Consumers
To consume messages, implement the consumer interface and register it with the system.
// Consumer implementation
type MyConsumer struct{}
func (c *MyConsumer) QueueName() string {
return "QUEUE_NAME"
}
func (c *MyConsumer) Consume(ctx context.Context, msg *ProviderMessage) error {
var data MyStructure
if err := msg.DecodeAndValidateMessage(&data); err != nil {
return err
}
// Message processing logic
return nil
}
// Consumer registration
messaging.NewConsumer(&MyConsumer{})
3. Message Structure (ProviderMessage)
Messages received by the consumer follow the structure below:
type ProviderMessage struct {
ID uuid.UUID
Origin string
Action string
Message any
AuthContext *security.AuthenticationContext
}
Advanced Features
1. Multi-Cloud Support
Complete abstraction for:
- AWS: SNS for topics and SQS for queues.
- Google Cloud: Pub/Sub for topics and subscriptions.
- RabbitMQ: Exchanges and Queues.
2. Observability and Resilience
- Native integration with OpenTelemetry.
- Structured logging and message tracking.
- Dead Letter Queue (DLQ) support for failure handling.
Usage Examples
1. Publishing a New User
type User struct {
Name string `json:"name"`
Email string `json:"email"`
}
func PublishNewUser(ctx context.Context, user User) error {
producer := messaging.NewProducer("USERS_CREATED")
return producer.Publish(ctx, "create", user)
}
2. Processing with Authentication Context
func (p *MyConsumer) Consume(ctx context.Context, msg *ProviderMessage) error {
// The authentication context is automatically populated
// if provided in the original message metadata.
tenantID := msg.AuthContext.TenantID
userID := msg.AuthContext.UserID
// Processing with data isolation or specific permissions
return nil
}
Best Practices
- Naming: Use descriptive uppercase names with underscores (e.g.,
ORDERS_PROCESSED), preferably prefixed by the origin service name. - Idempotency: Ensure message processing is idempotent to avoid side effects in case of reprocessing.
- Validation: Always use
msg.DecodeAndValidateMessageto ensure the received payload is as expected. - Monitoring: Track queue size and error rate to identify bottlenecks or consumption logic failures.