Pipeline


architecture, design patterns

Bronislav Klučka, Aug 22, 2025, 04:43 PM

In my last article I discussed the rules engine pattern, a pattern motivated by separation of concerns, high cohesion, and low coupling. Today, we will continue with another pattern that supports these principles: the pipeline.

Let's start with two examples.

Example 1

The task is to open a CSV file with orders, filter orders for specific products, and send these orders via API to a service that processes orders.

Pseudo-implementation

function uploadOrders() {
	const file = fileSystem.open('./file.csv');
	let row;
	while ((row = fileSystem.readCSV(file))) {
		if (row[3] === 'item1') {
			const customer = customerService.get(row[0]);
			const price = priceService.get(row[2]);
			const data = {
				name: customer.lastName + ' ' + customer.firstName,
				count: row[1],
				unitPrice: price.value,
				id: row[3],
			}
			net.post('https://company.com/api/order', { body: data });
		}
	}
	fileSystem.close(file);
}

The code looks fine and it's reasonably short, but try to imagine the functionality growing: what if there are more sources, not just CSV? What if there are more conditions? What if the API format becomes more complex? At some point, the code may become unreadable.

Example 2

The task will be to authenticate and authorize requests to the API.

Pseudo-implementation

function authRequest() {
	const headers = request.headersAsMap();
	let user = null;
	if (headers.authorization && headers.authorization.startsWith('Bearer ')) {
		user = authModule.getUserByAuthzToken(headers.authorization); // returns User | null
	}
	if (user === null) throw Error('...')

	const allowedRoutes = routesModule.getValidRoutesForUser(user);
	if (!allowedRoutes[request.method] || !allowedRoutes[request.method].includes(request.path)) {
		throw Error('...')
	}

	let data = url.parseQuery(request.queryParams) ?? {};
	data = merge(data, JSON.parse(request.body));
	if (!validateApiRequest(request.method, request.path, data)) {
		throw Error('...')
	}

	return {
		user,
		data
	}
}

What if we have more authentication methods, or they become more complex? What if we change the authorization logic to a complex permission system? Even this example is pushing the limits...

Pipeline


A pipeline is a pattern that divides a sequence of processing into individual steps and orchestrates them. Instead of one large process, the pipeline pattern consists of several independent subprocesses that are unaware of each other; it is up to the parent process, the orchestrator, to execute them in the required order (a minimal generic sketch follows the feature list below).

Basic features:

  • sequential data processing
  • depends on the order of steps
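
To make the orchestrator idea concrete before the examples, here is a minimal generic sketch; the Step type, runPipeline, and the string steps are illustrative names for this article only, not part of the examples below.

// each step is an independent function; the orchestrator only knows their order
type Step<I, O> = (input: I) => O;

function runPipeline<T>(input: T, steps: Step<T, T>[]): T {
	// feed the output of one step into the next
	return steps.reduce((value, step) => step(value), input);
}

// usage: three tiny steps, none of which knows about the others
const greeting = runPipeline('  Hello Pipeline  ', [
	(s) => s.trim(),
	(s) => s.toLowerCase(),
	(s) => s.replace(' ', '-'),
]);
// greeting === 'hello-pipeline'

Real pipelines replace the string steps with readers, filters, transformations, and senders, as in the examples below.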

Example 1

interface Order {
	customerId: string;
	priceId: string;
	count: number;
	itemId: string;
}

interface ApiOrder {
	name: string;
	unitPrice: number;
	count: number;
	id: string;
}

function getOrders(): Order[] {
	const file = fileSystem.open('./file.csv');
	const result: Order[] = [];

	let row;
	while ((row = fileSystem.readCSV(file))) {
		result.push({
			customerId: row[0],
			count: row[1],
			priceId: row[2],
			itemId: row[3],
		});
	}
	fileSystem.close(file);
	return result;
}

function filterOrder(order: Order): boolean {
	return order.itemId === 'item1';
}

function transformOrder(order: Order): ApiOrder {
	const customer = customerService.get(order.customerId);
	const price = priceService.get(order.priceId);
	return {
		name: customer.lastName + ' ' + customer.firstName,
		count: order.count,
		unitPrice: price.value,
		id: order.itemId,
	}
}

function sendOrder(order: ApiOrder): boolean {
	return net.post('https://company.com/api/order', { body: order });
}

// now, let's have fun

function processAllOrders1() {
	getOrders()
		.filter(filterOrder)
		.map(transformOrder)
		.map(sendOrder);
}

/**
* but what if getOrders() returned a million records? how about memory consumption?
* generators to the rescue
*/

function* getOrders2(): Generator<Order> {
	const file = fileSystem.open('./file.csv');

	let row;
	while ((row = fileSystem.readCSV(file))) {
		yield {
			customerId: row[0],
			count: row[1],
			priceId: row[2],
			itemId: row[3],
		};
	}
	fileSystem.close(file);
}

function processAllOrders2() {

	// build the iterator chain lazily
	// (filter/map on iterators are the iterator helpers available in recent JavaScript runtimes)
	const iter = getOrders2()
		.filter(filterOrder)
		.map(transformOrder)
		.map(sendOrder);

	// run it: each order is read, transformed, and sent one at a time
	let result = iter.next();
	while (!result.done) {
		result = iter.next();
	}
}

Suddenly, if we have a different source of orders, the only thing we change is the source of orders, without affecting anything else.
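
As a sketch of such a swap (ordersApi.fetchPage is a made-up name for this illustration, assumed to return a page of Order records or an empty array), only the source step changes; filterOrder, transformOrder, and sendOrder stay exactly as defined above.

// hypothetical alternative source: same Order shape, different origin
function* getOrdersFromApi(): Generator<Order> {
	let page = 0;
	let batch = ordersApi.fetchPage(page); // assumed: Order[] per page, [] when exhausted
	while (batch.length > 0) {
		for (const order of batch) {
			yield order;
		}
		page++;
		batch = ordersApi.fetchPage(page);
	}
}

function processAllOrders3() {
	// identical pipeline, only the first step differs
	const iter = getOrdersFromApi()
		.filter(filterOrder)
		.map(transformOrder)
		.map(sendOrder);

	let result = iter.next();
	while (!result.done) {
		result = iter.next();
	}
}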

Example 2

interface Context {
	user: User | null;
	routeAuthorized: boolean;
	data: object | null;
}

function authnUser(request: Request, context: Context): void {
	const headers = request.headersAsMap();
	if (headers.authorization && headers.authorization.startsWith('Bearer ')) {
		context.user = authModule.getUserByAuthzToken(headers.authorization); // returns User | null
	}
}

function authzUser(request: Request, context: Context): void {
	if (context.user !== null) {
		const allowedRoutes = routesModule.getValidRoutesForUser(context.user);
		context.routeAuthorized = !!(allowedRoutes[request.method] && allowedRoutes[request.method].includes(request.path));
	}
}

function requestData(request: Request, context: Context): void {
	if (context.routeAuthorized) {
		let data = url.parseQuery(request.queryParams) ?? {};
		data = merge(data, JSON.parse(request.body));
		if (validateApiRequest(request.method, request.path, data)) {
			context.data = data
		}
	}
}


function authRequest(request: Request): Context {
	const result: Context = {
		user: null,
		routeAuthorized: false,
		data: null,
	};
	authnUser(request, result);
	if (result.user === null) throw Error('...');
	authzUser(request, result);
	if (result.routeAuthorized === false) throw Error('...');
	requestData(request, result);
	return result;
}

Clear division of responsibilities, easy expandability.
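
As a sketch of that expandability, assuming a hypothetical authModule.getUserByApiKey lookup (not part of the module above), a second authentication method becomes one more independent step, and the orchestrator simply slots it into the sequence:

// hypothetical extra step: API key authentication as a fallback
function authnUserByApiKey(request: Request, context: Context): void {
	const headers = request.headersAsMap();
	if (context.user === null && headers['x-api-key']) {
		context.user = authModule.getUserByApiKey(headers['x-api-key']); // assumed to return User | null
	}
}

function authRequest2(request: Request): Context {
	const result: Context = {
		user: null,
		routeAuthorized: false,
		data: null,
	};
	authnUser(request, result);
	authnUserByApiKey(request, result); // new step; existing steps untouched
	if (result.user === null) throw Error('...');
	authzUser(request, result);
	if (result.routeAuthorized === false) throw Error('...');
	requestData(request, result);
	return result;
}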

What can you use the pipeline pattern for?

  • ETL / ELT processes
  • applying filters/transformations to data, e.g. images (see the short sketch after this list)
  • compiler - tokenization, parsing, compilation, assembly, linking
  • interpreter - tokenization, parsing, execution
  • staging - a complex process in several stages in several modules (e.g., order processing - payment, warehouse, shipping, notification)
  • CI/CD pipeline

And many more.
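
For instance, the image filters item above could look like this minimal sketch, reusing the Step and runPipeline helpers sketched earlier; the Pixel type and both filters are illustrative, not a real image library.

type Pixel = { r: number; g: number; b: number };

// two independent filters
const grayscale: Step<Pixel[], Pixel[]> = (pixels) =>
	pixels.map((p) => {
		const v = Math.round((p.r + p.g + p.b) / 3);
		return { r: v, g: v, b: v };
	});

const invert: Step<Pixel[], Pixel[]> = (pixels) =>
	pixels.map((p) => ({ r: 255 - p.r, g: 255 - p.g, b: 255 - p.b }));

// the orchestrator decides the order of filters
const image: Pixel[] = [{ r: 10, g: 200, b: 30 }];
const processed = runPipeline(image, [grayscale, invert]);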
