Integración de PostgreSQL con RabbitMQ mediante pg_amqp
Resumen
En este documento, exploramos cómo extender la funcionalidad de PostgreSQL para trabajar directamente con RabbitMQ, un sistema avanzado de mensajería, utilizando la extensión pg_amqp
. Este enfoque permite que PostgreSQL actúe como productor de mensajes para arquitecturas distribuidas y sistemas basados en eventos, eliminando la necesidad de procesos intermediarios. A través de un caso de uso simple, demostramos cómo implementar esta integración de manera eficiente.
Introducción
Las arquitecturas modernas dependen en gran medida de la comunicación asíncrona y la reactividad en tiempo real. RabbitMQ es una solución de mensajería ampliamente utilizada que permite la distribución de eventos en sistemas distribuidos. PostgreSQL, con su capacidad de manejar lógica empresarial directamente en la base de datos, puede integrarse de manera nativa con RabbitMQ utilizando la extensión pg_amqp
.
Este documento presenta un caso práctico en el que PostgreSQL notifica cambios en una tabla directamente a RabbitMQ, donde otros servicios pueden procesar los eventos.
Preparación del entorno
Requisitos
PostgreSQL (versión 12+).
RabbitMQ (versión 3.8+).
Extensión
pg_amqp
.
Instalación de pg_amqp
Clonar el repositorio:
git clone https://github.com/omniti-labs/pg_amqp.git cd pg_amqp
Instalar dependencias:
sudo apt-get install postgresql-server-dev-all libssl-dev libcurl4-openssl-dev make gcc
Compilar e instalar:
make sudo make install
Habilitar la extensión en PostgreSQL:
CREATE EXTENSION amqp;
Caso de uso: Notificaciones de pedidos
Escenario
Imagina un sistema de comercio electrónico en el que se gestionan pedidos en la base de datos PostgreSQL. Cuando se inserta un nuevo pedido, la base de datos envía un mensaje a RabbitMQ con los detalles del pedido. Este mensaje puede ser procesado por servicios que manejan notificaciones, facturación o envío.
Implementación
1. Configurar conexión a RabbitMQ
Primero, registra el servidor RabbitMQ en PostgreSQL:
SELECT amqp.register_server(
'rabbit_server', -- Alias del servidor
'amqp://guest:guest@localhost:5672/' -- URL de conexión al RabbitMQ
);
2. Crear una función para publicar mensajes
Crea una función en PostgreSQL que envíe mensajes a RabbitMQ:
CREATE OR REPLACE FUNCTION publicar_pedido()
RETURNS trigger AS $$
BEGIN
-- Publicar un mensaje en RabbitMQ
PERFORM amqp.publish(
'rabbit_server', -- Alias del servidor RabbitMQ
'pedidos_exchange', -- Exchange en RabbitMQ
'nuevos_pedidos', -- Routing key
json_build_object(
'id_pedido', NEW.id,
'cliente', NEW.cliente,
'total', NEW.total,
'fecha', NEW.fecha
)::text -- Mensaje en formato JSON
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
3. Crear un trigger para automatizar el envío
Asocia la función a un trigger que se dispare al insertar un nuevo pedido:
CREATE TRIGGER trigger_pedido
AFTER INSERT ON pedidos
FOR EACH ROW
EXECUTE FUNCTION publicar_pedido();
4. Configurar RabbitMQ
Crea el exchange y la cola en RabbitMQ para recibir los mensajes:
# Conéctate a RabbitMQ
rabbitmqadmin declare exchange name=pedidos_exchange type=direct
rabbitmqadmin declare queue name=cola_pedidos
rabbitmqadmin declare binding source=pedidos_exchange destination=cola_pedidos routing_key=nuevos_pedidos
5. Consumir mensajes desde RabbitMQ
Ejemplo de consumidor en Python
Este script consume los mensajes enviados por PostgreSQL:
import pika
def callback(ch, method, properties, body):
print("Pedido recibido:", body.decode())
# Conexión a RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Configurar la cola
channel.queue_declare(queue='cola_pedidos')
# Consumir mensajes
channel.basic_consume(queue='cola_pedidos', on_message_callback=callback, auto_ack=True)
print('Esperando pedidos...')
channel.start_consuming()
Ejemplo completo
Insertar un pedido:
INSERT INTO pedidos (id, cliente, total, fecha) VALUES (1, 'Juan Perez', 100.00, NOW());
Mensaje publicado en RabbitMQ:
{ "id_pedido": 1, "cliente": "Juan Perez", "total": 100.00, "fecha": "2025-01-05T12:00:00Z" }
Consumidor procesa el mensaje:
Pedido recibido: {"id_pedido": 1, "cliente": "Juan Perez", "total": 100.00, "fecha": "2025-01-05T12:00:00Z"}
Ventajas de este enfoque
Desacoplamiento: PostgreSQL se integra directamente con RabbitMQ, permitiendo que otros servicios reaccionen a los eventos sin afectar la base de datos.
Escalabilidad: RabbitMQ maneja múltiples consumidores y distribuye la carga eficientemente.
Reactividad: Los eventos se procesan en tiempo real, ideal para sistemas dinámicos.
Conclusión
La integración de PostgreSQL con RabbitMQ mediante pg_amqp
simplifica la comunicación entre la base de datos y sistemas distribuidos. Este enfoque permite que PostgreSQL se convierta en un productor de eventos confiable, reduciendo la complejidad de la arquitectura y mejorando la reactividad del sistema.