Comunicação Assíncrona
Este pacote implementa um sistema de mensageria assíncrona com suporte para múltiplos provedores (AWS SNS/SQS, Google Cloud Pub/Sub e RabbitMQ), abrangendo recursos de produção e consumo de mensagens, observabilidade e tratamento de erros.
Importante: Antes de utilizar o mecanismo de comunicação assíncrona, é necessário que toda a estrutura de tópicos e filas tenha sido criada previamente no provedor escolhido.
Configuração
O sistema detecta automaticamente o provedor de nuvem configurado através das variáveis de ambiente.
AWS / GCP (Padrão)
Por padrão, a variável COLIBRI_MESSAGING é definida como CLOUD_DEFAULT.
RabbitMQ
Para utilizar o RabbitMQ, configure as seguintes variáveis:
COLIBRI_MESSAGING: Defina comoRABBITMQ.RABBITMQ_URL: URL de acesso ao serviço (ex:amqp://guest:guest@localhost:5672/).
Nota: Ao utilizar RabbitMQ, os serviços de nuvem (SNS/SQS e Pub/Sub) são ignorados.
Inicialização
Para habilitar os recursos de mensageria, adicione a inicialização na função main.go:
// Inicialização do sistema de mensageria
messaging.Initialize()
Componentes Principais
1. Produtores (Publishers)
Utilizados para enviar mensagens para um tópico específico.
// Criação de um produtor
producer := messaging.NewProducer("NOME_DO_TOPICO")
// Publicação de mensagem
// O segundo parâmetro "action" ajuda a identificar o propósito da mensagem
err := producer.Publish(ctx, "create", minhaMensagem)
Características:
- Suporte a tipagem forte.
- Propagação automática do contexto de autenticação.
- Rastreamento de mensagens via UUID.
- Monitoramento integrado.
2. Consumidores (Consumers)
Para consumir mensagens, implemente a interface de consumidor e registre-a no sistema.
// Implementação do consumidor
type MeuConsumidor struct{}
func (c *MeuConsumidor) QueueName() string {
return "NOME_DA_FILA"
}
func (c *MeuConsumidor) Consume(ctx context.Context, msg *ProviderMessage) error {
var dados MinhaEstrutura
if err := msg.DecodeAndValidateMessage(&dados); err != nil {
return err
}
// Lógica de processamento da mensagem
return nil
}
// Registro do consumidor
messaging.NewConsumer(&MeuConsumidor{})
3. Estrutura da Mensagem (ProviderMessage)
As mensagens recebidas pelo consumidor seguem a estrutura abaixo:
type ProviderMessage struct {
ID uuid.UUID
Origin string
Action string
Message any
AuthContext *security.AuthenticationContext
}
Recursos Avançados
1. Suporte Multi-Cloud
Abstração completa para:
- AWS: SNS para tópicos e SQS para filas.
- Google Cloud: Pub/Sub para tópicos e assinaturas.
- RabbitMQ: Exchanges e Queues.
2. Observabilidade e Resiliência
- Integração nativa com OpenTelemetry.
- Logging estruturado e rastreamento de mensagens.
- Suporte a Dead Letter Queue (DLQ) para tratamento de falhas.
Exemplos de Uso
1. Publicando um Novo Usuário
type Usuario struct {
Nome string `json:"nome"`
Email string `json:"email"`
}
func PublicarNovoUsuario(ctx context.Context, usuario Usuario) error {
producer := messaging.NewProducer("USUARIOS_CRIADOS")
return producer.Publish(ctx, "create", usuario)
}
2. Processamento com Contexto de Autenticação
func (p *MeuConsumidor) Consume(ctx context.Context, msg *ProviderMessage) error {
// O contexto de autenticação é preenchido automaticamente
// se fornecido nos metadados da mensagem original.
tenantID := msg.AuthContext.TenantID
userID := msg.AuthContext.UserID
// Processamento com isolamento de dados ou permissões específicas
return nil
}
Boas Práticas
- Nomenclatura: Use nomes descritivos em maiúsculas com underscore (ex:
PEDIDOS_PROCESSADOS), preferencialmente prefixados pelo nome do serviço de origem. - Idempotência: Garanta que o processamento da mensagem seja idempotente para evitar efeitos colaterais em caso de reprocessamento.
- Validação: Utilize sempre
msg.DecodeAndValidateMessagepara garantir que o payload recebido está conforme o esperado. - Monitoramento: Acompanhe o tamanho das filas e a taxa de erros para identificar gargalos ou falhas na lógica de consumo.