Implementing Image Processing Pipeline (resize, crop, watermark, format conversion)
Image Processing Pipeline — an automated sequence of image operations (resizing, cropping, watermarking, format conversion) executed on upload or on demand, to optimize storage and speed up page loads.
Pipeline Architecture
Upload → Validation → Processing Queue → Transform → Storage → CDN
↓
resize → crop → watermark → convert → compress
Synchronous Pipeline (on Upload)
// Node.js + Sharp
const sharp = require('sharp')
const path = require('path')
class ImagePipeline {
  /**
   * @param {Object} config
   * @param {Object.<string, {width: number, height: number, fit?: string}>} config.sizes
   *   Named output sizes, e.g. { thumbnail: { width: 150, height: 150, fit: 'cover' } }.
   * @param {boolean} [config.watermark] - When true, also emit a watermarked variant.
   */
  constructor(config) {
    this.config = config
    // Lazy, one-time load of the watermark overlay (the original re-read and
    // re-resized the PNG from disk on every _addWatermark call).
    this._watermarkPromise = null
  }

  /**
   * Run the full pipeline on one uploaded image.
   * @param {Buffer} inputBuffer - Raw image bytes.
   * @param {string} filename - Original filename (kept for interface compatibility).
   * @returns {Promise<Object.<string, Buffer>>} Variant name -> encoded buffer:
   *   'webp', '<size>', '<size>_webp' for each configured size, plus
   *   'watermarked' when config.watermark is set.
   */
  async process(inputBuffer, filename) {
    const results = {}
    const image = sharp(inputBuffer)
    const metadata = await image.metadata()

    // Every variant is derived independently from the decoded original, so
    // generate them in parallel instead of awaiting one at a time.
    const tasks = []
    tasks.push(
      this._toWebP(image.clone(), metadata).then((buf) => { results.webp = buf })
    )
    for (const [size, dims] of Object.entries(this.config.sizes)) {
      tasks.push(
        this._resize(image.clone(), dims).then((buf) => { results[size] = buf })
      )
      tasks.push(
        this._resize(image.clone(), dims, 'webp').then((buf) => { results[`${size}_webp`] = buf })
      )
    }
    if (this.config.watermark) {
      tasks.push(
        this._addWatermark(image.clone()).then((buf) => { results.watermarked = buf })
      )
    }
    await Promise.all(tasks)
    return results
  }

  /**
   * Resize to the given dimensions and encode.
   * withoutEnlargement keeps small originals at their native size; the white
   * background fills letterboxing when fit produces padding.
   * @returns {Promise<Buffer>} JPEG by default, WebP when format === 'webp'.
   */
  async _resize(image, { width, height, fit = 'inside' }, format = null) {
    let processed = image.resize(width, height, {
      fit,
      withoutEnlargement: true,
      background: { r: 255, g: 255, b: 255, alpha: 1 }
    })
    if (format === 'webp') {
      processed = processed.webp({ quality: 85, effort: 4 })
    } else {
      processed = processed.jpeg({ quality: 85, mozjpeg: true })
    }
    return processed.toBuffer()
  }

  /** Encode the full-size image as WebP. */
  async _toWebP(image, metadata) {
    return image
      .webp({ quality: 85, effort: 4, smartSubsample: true })
      .toBuffer()
  }

  /** Read and resize the watermark overlay once; reuse the buffer afterwards. */
  _loadWatermark() {
    if (!this._watermarkPromise) {
      this._watermarkPromise = sharp('./assets/watermark.png')
        .resize(200)
        .toBuffer()
    }
    return this._watermarkPromise
  }

  /** Composite the cached watermark into the bottom-right corner. */
  async _addWatermark(image) {
    const watermark = await this._loadWatermark()
    return image
      .composite([{
        input: watermark,
        gravity: 'southeast',
        blend: 'over'
      }])
      .toBuffer()
  }
}
// Standard pipeline configuration: three size variants, watermarking enabled.
const pipelineConfig = {
  watermark: true,
  sizes: {
    thumbnail: { width: 150, height: 150, fit: 'cover' },
    medium: { width: 800, height: 600 },
    large: { width: 1920, height: 1080 },
  },
}
const pipeline = new ImagePipeline(pipelineConfig)
Asynchronous Pipeline via Queue
Under high load, move processing off the request path and run it in the background:
# tasks.py (Celery)
from celery import Celery
# ImageOps was missing from the original import but is used below for
# EXIF orientation normalization.
from PIL import Image, ImageOps
import io, boto3
app = Celery('image_tasks', broker='redis://redis:6379')
@app.task(bind=True, max_retries=3)
def process_image(self, image_id: int):
    """Generate WebP size variants for an uploaded image and store them in S3.

    Loads the original from the 'uploads' bucket, normalizes EXIF
    orientation, flattens transparency onto white, writes one WebP per
    entry in SIZES to the 'media' bucket, then records the variant keys
    in the database. Retries up to 3 times, 60 s apart, on failure.
    """
    try:
        # Load original image bytes from object storage.
        record = db.get_image(image_id)
        raw = s3.get_object(Bucket='uploads', Key=record.original_key)['Body'].read()
        img = Image.open(io.BytesIO(raw))

        # Apply the EXIF Orientation tag so pixel data is stored upright.
        img = ImageOps.exif_transpose(img)

        # Flatten transparency onto a white background so variants render
        # consistently. The original only handled 'RGBA'; palette ('P') and
        # grayscale-with-alpha ('LA') images can also carry transparency.
        if img.mode in ('RGBA', 'LA', 'P'):
            img = img.convert('RGBA')
            background = Image.new('RGB', img.size, (255, 255, 255))
            # Use the alpha channel as the paste mask.
            background.paste(img, mask=img.split()[3])
            img = background

        variants = {}
        for name, (w, h) in SIZES.items():
            # thumbnail() resizes in place (aspect-preserving, never enlarges),
            # so copy first to keep each variant independent of the others.
            resized = img.copy()
            resized.thumbnail((w, h), Image.LANCZOS)

            buf = io.BytesIO()
            resized.save(buf, 'WEBP', quality=85, method=6)
            buf.seek(0)

            key = f"processed/{image_id}/{name}.webp"
            s3.put_object(
                Bucket='media',
                Key=key,
                Body=buf,
                ContentType='image/webp',
                # Variant keys are immutable, so cache aggressively (1 year).
                CacheControl='public, max-age=31536000'
            )
            variants[name] = key

        db.update_image_variants(image_id, variants)
    except Exception as exc:
        # NOTE(review): this retries on *any* exception, including permanent
        # failures such as a corrupt image; consider classifying retryable
        # errors (network/S3) separately from fatal ones.
        raise self.retry(exc=exc, countdown=60)
On-the-fly Transformation (imgproxy)
An alternative to pre-generating variants is imgproxy — a service that transforms images on the fly, driven by signed URLs:
# Run imgproxy with URL signing enabled. KEY and SALT are hex strings that
# must match the ones used by sign_imgproxy_url below.
# MAX_SRC_RESOLUTION caps the source image resolution (value is in
# megapixels per imgproxy docs — confirm for your imgproxy version).
docker run -p 8080:8080 \
-e IMGPROXY_KEY=YOUR_KEY \
-e IMGPROXY_SALT=YOUR_SALT \
-e IMGPROXY_MAX_SRC_RESOLUTION=50 \
darthsim/imgproxy
import hmac, hashlib, base64
def sign_imgproxy_url(path: str, key: str, salt: str) -> str:
    """Compute the signature imgproxy expects as the first URL path segment.

    The signature is HMAC-SHA256 over (salt + path), keyed with the binary
    key, then URL-safe base64-encoded with '=' padding stripped. ``key`` and
    ``salt`` are the hex-encoded IMGPROXY_KEY / IMGPROXY_SALT values.
    """
    message = bytes.fromhex(salt) + path.encode()
    digest = hmac.new(bytes.fromhex(key), message, hashlib.sha256).digest()
    signature = base64.urlsafe_b64encode(digest).rstrip(b'=')
    return signature.decode()
# URL format: /SIGNATURE/resize:fill:WIDTH:HEIGHT:ENLARGE/format:webp/base64(URL)
# (the original comment said resize:fit, but the path below uses resize:fill)
encoded_source = base64.urlsafe_b64encode(b'https://storage.com/original/photo.jpg').decode()
path = f"/resize:fill:800:600:1/format:webp/{encoded_source}"
signature = sign_imgproxy_url(path, IMGPROXY_KEY, IMGPROXY_SALT)
image_url = f"https://img.company.com/{signature}{path}"
Timeline
Implementation of image processing pipeline with Sharp or imgproxy + S3 integration — 2–3 working days.







