.png)
In a previous tutorial, sending an email directly from a request can slow down the user experience. Tasks such as generating PDF attachments, connecting to an SMTP server, and delivering emails may take several seconds to complete. During this time, the user is left waiting for the operation to finish. A better approach is to move email delivery to a background worker. In this tutorial, we will use Redis as a job queue, RQ (Redis Queue) as the background worker, and PostgreSQL to maintain a permanent audit trail of all outgoing emails
Prerequisite:This tutorial is part of the Automated Sales Invoice Series.
📚 View the Complete Automated Sales Invoice Series
⬅ Previous Part
Preliminary:
In this tutorial, I will walk you through how to queue jobs in Redis and update the details and status in storage. The file and folder are shown as follows:All the files remain intact as in the previous tutorial, except now I will add 3 files: poller.py, tasks.py, and worker.py, the app folder, and database-related files, shown as in the above diagram.
Every time the sales invoices are created and emailed, they will be marked as emailed and updated to the PostgreSQL database as soon as possible for user reference.
To record this activity, I have created a folder, which is the app folder, and under the app folder, there are the following:
- __init__.py
- create_database.py
- crud.py,
- db.py, and
- model.py
Step 1: Create a Docker Compose.yaml, Dockerfile and requirements.txt files
In fact, I have explained the basics of Docker containers and Dockerfiles in the last tutorial; please go ahead and check it out.
Before I proceed, I will launch the desktop Docker and allow the Docker engine to run in the background of the local computer.
Once Docker is on, I can now code the "docker-compose.yaml" file as follows:
services:
redis:
image: redis:7-alpine
container_name: redis
restart: unless-stopped
ports:
- "6379:6379"
redis-commander:
image: rediscommander/redis-commander:latest
container_name: redis_commander
restart: unless-stopped
environment:
REDIS_HOSTS: local:redis:6379
ports:
- "8081:8081"
depends_on:
- redis
postgres:
image: postgres:17
container_name: postgre_db
environment:
POSTGRES_DB: mydb
POSTGRES_USER: postgres
POSTGRES_PASSWORD: secret
ports:
- "5432:5432"
restart: unless-stopped
volumes:
- postgres_data:/var/lib/postgresql/data
pgadmin:
image: dpage/pgadmin4:latest
container_name: pgadmin
restart: unless-stopped
environment:
PGADMIN_DEFAULT_EMAIL: admin@example.com
PGADMIN_DEFAULT_PASSWORD: admin123
ports:
- "8080:80"
depends_on:
- postgres
volumes:
- pgadmin_data:/var/lib/pgadmin
poller:
build: .
container_name: poller
command: python poller.py
depends_on:
- redis
- postgres
restart: unless-stopped
environment:
REDIS_URL: redis://redis:6379/0
DATABASE_URL: postgresql://postgres:secret@postgres:5432/mydb
rq_worker:
build: .
container_name: rq_worker
command: rq worker default
depends_on:
- redis
- postgres
restart: unless-stopped
environment:
REDIS_URL: redis://redis:6379/0
DATABASE_URL: postgresql://postgres:secret@postgres:5432/mydb
volumes:
postgres_data:
pgadmin_data:What is the Docker Compose YAML file?
Docker Compose definition that creates/manages a container. In the above YAML file, I have included 6 container services within the same "docker-compose.yaml" file; they are as follows: Container name Image created
- Redis, redis:7-alpine
- Redis-commander, rediscommander/redis-commander:latest
- Postgresql, postgres:17
- PgAdmin, dpage/pgadmin4:latest
- Poller, built from Dockerfile (
build: .) - Rq_worler built from Dockerfile (
build: .)
For each service, I also assigned a container name for easy reference. To run PostgreSQL in a container, I use
postgresql://postgres:secret@postgres:5432/mydb
Meanwhile, if I decide to run on my local computer, I will use
postgresql://postgres:secret@localhost:5432/mydbIt depends on how I defined it in my 'Docker Compose YAML' file. Since I defined the service inside the container as 'postgres', it will look for the container's IP address. Both pooler and rq_worker within the containers will also connect to the 'postgres' container.
DATABASE_URL: postgresql://postgres:secret@postgres:5432/mydbOn the other hand, for convenience in creating a database table on a local computer, I will use the PowerShell terminal.
python -m app.create_database
But I have created a table in poller.py with create_table(); therefore, it is not relevant for this tutorial. I will explain in the following step.I also define how to log into PgAdmin, the login ID, and the password. PostgreSQL also has its own user ID, password, and DB name. Use the right authentication for the right container.
One thing worth noticing is that both poller and rq_worker relied on Redis and PostgreSQL URLs.
Why?
1) Redis
The poller will use Redis to enqueue jobs in Redis, while the rq_worker needs it to listen and fetch jobs from the queue.
Without Redis, no queue and no jobs were passing; therefore, the workflow would break.
2) PostgreSQL
The poller needs PostgreSQL to check if a record exists and avoid duplicate enqueuing. While rq_worker needs it to mark email sent and update
invoice_status.Without PostgreSQL, duplicate emails will be sent, and the same Google Sheet rows will be reprocessed forever. It is known as idempotency.
What is a Dockerfile?
If the Docker Compose YAML file is deemed a 'map', then the Dockerfile is a 'car', and it contains a sequential list of commands and instructions that Docker uses to automatically build a container image. Therefore, it defines exactly what operating system, dependencies, libraries, files, and environment configurations are needed to package and run the automation script identically on any machine. Below is my Dockerfile code:
# Use an official, lightweight Python 3.11 image
# as the foundation. "slim" keeps the image size small by omitting unnecessary
# development packages.
FROM python:3.11-slim
# Set the working directory inside the container to /app.
# All subsequent commands (like COPY and CMD) will run from this folder.
WORKDIR /app
# Copy all files and folders from your current local directory into
# the /app directory of the container.
COPY . /app
# Run pip to install the Python dependencies listed in your requirements.txt file.
RUN pip install -r requirements.txt
# Set an environment variable so Python knows it should look for modules/packages
# inside the /app directory. This prevents import errors.
ENV PYTHONPATH=/app
# Define the default command that runs when the container starts.
# This executes your 'worker.py' script using the Python interpreter.
CMD ["python", "worker.py"]
Since Docker sets up a partition to separate it from the local computer, what it requires is pointing to a folder in the container, copying all the relevant files and folders from the local computer to the container, and then installing the required dependencies inside Docker. Below are my Python modules:
psycopg2-binary==2.9.12
sqlalchemy==2.0.51
reportlab==5.0.0
dotenv==0.9.9
rq==1.16.2
redis==5.0.1
pygsheets==2.0.6
I have also mentioned the module with the current version precisely, which is to avoid future updates of the module. The Python path is very important, as it informs Docker where to locate the specific file, especially those files under the app folder. Without setting the Python path, it will render an error: module not found.
Finally, I also determined the Python command for how to get the worker.py up and running. Both the poller and Rq_worker are built from here.
What are their respective ports?
The following are the respective ports:
- Redis - http://localhost:6379,
- Redis Commander (GUI) - http://localhost:8081,
- Postgresql - http://localhost:5432, and
- PgAdmin (GUI) - http://localhost:8080
Step 2: Set up the PostgreSQL database and create a table under the app folder
Before diving in further, let us understand the following containers
(a) PostgreSQL and PgAdmin
(1) PostgreSQL - a powerful, open-source relational database management system (RDBMS). It is responsible for safely storing, organising, and retrieving data for the applications.
They only interact through your Python code inside the worker.
(2) PgAdmin - is the most popular open-source graphical user interface (GUI) management and administration tool designed specifically for PostgreSQL
I have mentioned above that I have maintained a PostgreSQL database to store the sales invoice details and mark them when a sales invoice is generated and sent. Therefore, now I need to set them up.
(i) __init__.py
from .db import Base, engine, SessionLocal
from .models import InvoiceStatus
It means I import those files from the same app folder; now I can import them to any Python file later on. Therefore, I intentionally mark the app as a package and provide a centralised export to create_table.py, db.py, or models.py. (ii) create_database.py
from app.db import engine, Base
from app.models import InvoiceStatus
# Initializes the database schema by creating all defined tables.
def create_tables():
Base.metadata.create_all(bind=engine)
print("Tables created successfully")
if __name__ == "__main__":
create_tables()
In the above, I have mentioned I can either create it manually, request it to create a database table only the first time, or use an idempotent method. When running poller.py, it will always check whether the database table exists. If yes, it does nothing at all; on the other hand, if it does not exist, it will create such tables for me. Please check the poller section below.from app.models import InvoiceStatus
from datetime import datetime
# Fetch a customer's invoice status by their email address.
def get_by_email(db, email):
return (
db.query(InvoiceStatus)
.filter(
InvoiceStatus.customer_email == email
)
.first()
)
# Create and stage a new invoice status record.
# NOTE: This function calls db.add() but does not commit the transaction.
def create_invoice_status(db, row):
record = InvoiceStatus(
customer_name=row["customer_name"],
customer_email=row["customer_email"],
email_sent=True,
sent_at=datetime.now()
)
db.add(record)
return recordI have 2 functions here. The first one is a query of a database: whether the email address exists in the database. If yes, the poller does nothing; if no, the poller will read from the Google Sheets.
Meanwhile, the second one helps me to insert a fresh record into the database. It checks whether the email already exists; skip the processing. If not, then generate a sales invoice and email it out, and finally mark it as email sent. Since I marked the email address as a reference, it will be detected by the email address; therefore, no duplicate email will be sent.
To summarise, the first one serves as a read from the database to determine whether this customer has already been processed, and it executes before generating the PDF and sending the email. Meanwhile, the second serves to create a database to record that processing was completed after the email has been sent successfully.
Even though I have set up querying and inserting, the database does not execute/commit here; instead, it executes/commits in tasks.py.
from sqlalchemy import create_engine
from sqlalchemy.orm import (sessionmaker,
declarative_base)
import os
# Database Configuration and Initialization
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql://postgres:secret@localhost:5432/mydb"
)
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine,
autocommit=False,
autoflush=False)
Base = declarative_base()In my previous tutorial, I mostly used a raw database, whether it was a SQLite database or a PostgreSQL database or not. Here, I use SQLAlchemy as my database module. SQLAlchemy is a Pythonic object-relational mapper (ORM); it provides me with better code maintainability and database flexibility, which is the reason I am adopting it here.
The benefits of using SQLAlchemy:
- Much less code.
- Cleaner and easier to read.
- Works with multiple databases.
- Automatic escaping helps reduce SQL injection risks.
(v) models.py
from sqlalchemy import (Column, Integer, Text, String,
Boolean, DateTime)
from datetime import datetime
from app.db import Base
# Database Models / Schema Definitions
class InvoiceStatus(Base):
__tablename__ = "invoice_status"
id = Column(Integer, primary_key=True)
customer_name = Column(String)
customer_email = Column(String, unique=True)
email_sent = Column(Boolean, default=False)
sent_at = Column(DateTime)
Here is the database schema, the structure of the database. It sets up the name and column of the table, with which the querying and inserting rows will be in accordance with the corresponding column. Same as above, it is used in crud.py, poller.py, and worker.py.Step 3: Configure the poller, tasks, worker function, and Redis-commander.
In step 2 above, I have discussed the PostgreSQL database and PgAdmin; now I will go ahead to share about Redis and Redis Commander.
(a) Redis and Redis Commander
(1) Redis - Redis is an in-memory data store in RAM.
- Stores queued jobs
- Acts as a message broker
- Extremely fast,
- Usually temporary,
- Often used for caching, queues, and sessions,
- Holds things like:
- job IDs
- job status
- serialised task data
(2) Redis Commander - provides a web GUI for viewing and managing Redis data.
The entire workflow begins with the poller and worker and finally tasks; each stage plays an important role.
(b) Python Workflow Management
(i) poller.py
import time
from gsheet import get_gsheet_data
from redis import Redis
from rq import Queue
from tasks import process_invoice
from app.create_database import create_tables
# Connect to the Redis server
redis_conn = Redis(host="redis", port=6379)
# Create an RQ (Redis Queue) named "default"
queue = Queue("default", connection=redis_conn)
# Create database tables if they do not already exist
create_tables()
def process():
# Display a message indicating polling has started
print("Polling Google Sheet...")
# Retrieve all rows from the Google Sheet
rows = get_gsheet_data()
# Loop through each row retrieved from the sheet
for row in rows:
# Print the row that is about to be queued
print("Enqueue:", row)
# Add the processing task to the Redis queue
# Either of the following works:
# queue.enqueue("tasks.process_invoice", row)
queue.enqueue(process_invoice, row)
# Log progress for debugging
print("Polling Google Sheet...")
print(row)
print("Enqueueing job...")
# Continuously poll the Google Sheet every 10 seconds
while True:
# Fetch rows and enqueue processing jobs
process()
# Wait 10 seconds before polling again
time.sleep(10)
The poller will decide whether to create a job. The job will be queued in Redis, and I have also created a database table that is related to the create_database.py above.
What it does:
- Read Google Sheets
- Check for new/unprocessed rows
- Avoid duplicates (very important)
- Push job into Redis queue
queue.enqueue(process_invoice, row)
(ii) worker.py
import os
from redis import Redis
from rq import Worker, Queue
# Read the Redis connection URL from the environment variable.
# If REDIS_URL is not set, use the default Redis server running
# on the Docker service named "redis" at port 6379, database 0.
redis_url = os.getenv("REDIS_URL", "redis://redis:6379/0")
# Create a Redis connection using the URL.
redis_conn = Redis.from_url(redis_url)
# Create or connect to the "default" RQ queue.
# This is the queue the worker will monitor for incoming jobs.
queue = Queue("default", connection=redis_conn)
# Only execute this block when the script is run directly.
if __name__ == "__main__":
# Display the Redis server the worker is connecting to.
print(f"Connecting to Redis: {redis_url}")
# Create a worker that listens to the "default" queue.
worker = Worker(
[queue],
connection=redis_conn
)
# Inform the user that the worker is ready.
print("Worker is starting and listening on 'default' queue...")
# Start the worker.
# It continuously waits for new jobs, processes them,
# and remains running until it is stopped.
worker.work()
The worker listens to the Redis queue and executes jobs.
What it does:
- Connect to Redis
- Wait for jobs in the queue
- Pick a job from the queue
- Call the function inside tasks.py
worker = Worker([queue], connection=redis_conn)
worker.work()
(iii) Tasks.py
from pdf import generate_and_send_invoice
from email_util import send_email_with_attachment, SMTP_USER
from app.db import SessionLocal
from app.crud import (
get_by_email,
create_invoice_status
)
def process_invoice(row):
# Create a new database session
db = SessionLocal()
try:
# Business rule:
# If the customer's email already exists in the database,
# skip generating the invoice and sending the email.
if get_by_email(db, row["customer_email"]):
print(
f"{row['customer_email']} already exists. "
"No invoice generated and no email sent."
)
return
# Generate the invoice PDF using the row data
print("Generating PDF...")
pdf_path = generate_and_send_invoice(row)
# Send the generated invoice as an email attachment
print("Sending email...")
send_email_with_attachment(
from_addr=SMTP_USER, # Sender's email address
to_addrs={row["customer_email"]}, # Recipient's email address
subject="Sales Invoice", # Email subject
body="Please find your invoice attached.", # Email body
attachment_path=pdf_path # Path to the generated PDF
)
# Confirm that the email was sent successfully
print("Email sent successfully")
# Save the invoice processing record to PostgreSQL
create_invoice_status(db, row)
# Commit the transaction to make the changes permanent
db.commit()
print("PostgreSQL updated successfully")
except Exception as e:
# If any error occurs, undo any database changes
db.rollback()
# Display the error message
print("FAILED:", e)
# Re-raise the exception so the RQ worker marks the job as failed
raise
finally:
# Always close the database session to free resources
db.close()
The tasks where the real work happens. It determined whether an email existed or was inserted into the database when a sales invoice was generated and emailed.
PostgreSQL executed/committed here.
What it does:
- Generate invoice PDF
- Send email
- Update PostgreSQL
- Handle errors
def process_invoice(row):generate_and_send_invoice(row)
send_email_with_attachment()
create_invoice_status(db, row)
Step 4: Test run the entire function and monitor the activity in Redis Commander and PgAdmin
After all is set, now is the time to test-run the entire Python automation script. First, I need to launch Docker Desktop in the background of my local computer. Then, I need to run these few commands on the PowerShell terminal.
cd Redis_RQ
docker compose up -d --build
docker ps
Then, the terminal will be displayed as follows:
It will display all 6 containers that I have coded in the 'docker-compose.yaml' file. Docker is now up and running.Amazing, now I will go ahead to open a new browser and input the URL as localhost:8080 and log in to PgAdmin with the ID and password shown in the YAML file above; next, in the query input box, I will input the PostgreSQL command as follows:
SELECT * FROM invoice_status
Alternatively, I can run the query inside a PostgreSQL database container as follow:
docker exec -it postgre_db psql -U postgres -d mydb -c
"SELECT * FROM invoice_status;"
It will display as follows:
I could notice only one record in my PostgreSQL database, which is Orange Inc. Then, I fill out the Google Form to create and email a new sales invoice.
This Google Form will add a new row in Google Sheets since I have linked the Google Form to a Google Sheet.
I have noticed on my PowerShell terminal that the poller now detected a new row in Google Sheets and started enqueuing the job to Redis.
Great, I noticed Redis Commander actually captured the data and marked it as finished as I was processed by the rq worker.
rq_worker continues to run the following tasks:
1) Generate sales invoice PDF,
2) email the sales invoice, and
3) Update the PostgreSQL database.
To confirm it really works, I open my Gmail to check whether I have received an email.
True, I have received it. It proves that the entire process is working effectively. I returned to PgAdmin to check the PostgreSQL database.
My personal thoughts
What is amazing to me?
Yes, once the sales invoice is emailed, it will not email a duplicate email, as the script will check the database to see whether the same email exists in the database. If yes, abort the job; otherwise, continue the process. Therefore, this automation script is really 'smart'!
However, since I use the customer's email as a reference, the poller will detect whether the email exists in the database. Therefore, this is a weakness. It will hinder the same customer from placing a repeat order. I will resolve this issue in the future tutorial.
Final wrap-up:
In this tutorial, we built a production-oriented invoice processing system using Redis, RQ, and PostgreSQL to automatically generate and email sales invoices. By separating responsibilities between the poller, worker, task, and database layers, we created a scalable and maintainable workflow where Redis handles background job queues, PostgreSQL tracks processed invoices to prevent duplicate emails, and SQLAlchemy manages database operations cleanly. This architecture provides a solid foundation for developing reliable, asynchronous Python applications that can be easily extended with features such as retries, scheduling, and monitoring in future projects.
Published: Jun 2026
Last Updated: Jun 2026
Kelvin Loh is a Python developer focused on Flask, desktop applications, and business automation solutions. He shares practical tutorials and real-world coding projects to help developers and small businesses build useful applications.

%20docker_not%20sent.png)

%20polling_job.png)
%20redis_commander_record.png)
%20rq_worker_send_email.png)
%20email.png)
%20docker_sent.png)
%20rq_worker.png)
Comments
Post a Comment