A lightweight functional reactive programming (FRP) library for composing asynchronous operations with dependency injection.
npm install superpipe

import superpipe from 'superpipe'

// Create a pipeline factory with dependencies
const sp = superpipe({
  greet: name => `Hello, ${name}!`
})

// Build a pipeline
const pipeline = sp('greeting-pipeline')
  .input(['name'])
  .pipe('greet', 'name', 'message')
  .pipe(message => console.log(message), 'message')
  .end()

// Execute the pipeline
pipeline('World') // Output: Hello, World!

A pipeline is a sequence of pipes that execute in order. Each pipe can:
- Transform data
- Produce outputs that become available to subsequent pipes
- Control flow (stop execution by returning false)
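The ordering and stop-on-false behaviour listed above can be illustrated with a small stand-alone model. This is only a sketch and does not use SuperPipe itself: runPipes, the pipes array, and the shared store object are hypothetical names chosen for illustration, and the library's internals may differ.

```javascript
// Hypothetical stand-alone model of sequential pipes; not SuperPipe's own code.
// Each pipe receives a shared store and may write new values into it.
function runPipes(pipes, store) {
  for (const pipe of pipes) {
    // A pipe that returns exactly false stops the remaining pipes.
    if (pipe(store) === false) return store
  }
  return store
}

const pipes = [
  store => { store.doubled = store.x * 2 }, // transform data and produce an output
  store => store.doubled < 100,             // guard: returning false stops the run
  store => { store.reached = true }         // runs only when the guard passed
]

const result = runPipes(pipes, { x: 5 })   // guard passes, last pipe runs
const stopped = runPipes(pipes, { x: 80 }) // guard returns false, last pipe is skipped
```

In this model a pipe that returns nothing (undefined) does not stop the run; only a strict false does.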
Each pipe has three components:
- Function: The operation to perform (can be a function or a string referencing an injected dependency)
- Input: Dependencies the function needs (retrieved from the store or injected deps)
- Output: Names to assign to the return values
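To make the three components concrete, here is a minimal sketch of how a single pipe could be resolved and run. It is a stand-alone model rather than SuperPipe's implementation: runPipe is a hypothetical helper, the store-before-deps lookup order is an assumption, and only a single output name is handled.

```javascript
// Hypothetical model of one pipe's three components; not SuperPipe's own code.
// `deps` stands in for injected dependencies, `store` for values produced so far.
function runPipe(pipe, store, deps) {
  const [fn, input = [], output] = pipe
  // Function: either a function, or the name of an injected dependency.
  const callable = typeof fn === 'string' ? deps[fn] : fn
  // Input: accept a string or an array of names.
  // Assumption: each name is looked up in the store first, then in deps.
  const names = [].concat(input)
  const args = names.map(name => (name in store ? store[name] : deps[name]))
  const value = callable(...args)
  // Output: assign the return value to the given name (single name only here).
  if (typeof output === 'string') store[output] = value
  return store
}

const deps = { greet: name => `Hello, ${name}!` }
const store = { name: 'World' }
runPipe(['greet', 'name', 'message'], store, deps)
// store.message now holds the greeting built by the injected greet function

const sums = runPipe([(a, b) => a + b, ['x', 'y'], 'sum'], { x: 1, y: 2 }, {})
```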
Dependencies are passed when creating the superpipe factory and are available to all pipelines:
const sp = superpipe({
  db: databaseConnection,
  logger: loggingService,
  config: appConfig
})

superpipe(deps) creates a pipeline factory function.
- deps (optional): Object containing dependencies available to all pipelines
- Returns a function (name, defs?) => PipelineAPI | executor
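The two return shapes (a chainable API or an executor) can be modelled with a short sketch. Everything here is hypothetical and heavily simplified: makeFactory is not part of SuperPipe, steps are plain functions that receive the deps object directly, and the name argument is treated as a label only.

```javascript
// Hypothetical model of the factory's two call shapes; not SuperPipe's own code.
function makeFactory(deps = {}) {
  // `name` is only a label in this sketch and is not used further.
  return function (name, defs) {
    const steps = []
    // Executor: runs every collected step, passing deps plus the call arguments.
    const executor = (...args) => steps.map(step => step(deps, ...args))
    if (Array.isArray(defs)) {
      // Declarative form: definitions are given up front, executor is returned.
      steps.push(...defs)
      return executor
    }
    // Builder form: a chainable API that is finalized with end().
    const api = {
      pipe(step) { steps.push(step); return api },
      end() { return executor }
    }
    return api
  }
}

const factory = makeFactory({ prefix: '> ' })
const viaBuilder = factory('builder').pipe((deps, text) => deps.prefix + text).end()
const viaDefs = factory('declarative', [(deps, text) => deps.prefix + text])
```

Both viaBuilder and viaDefs are plain functions in this model, which mirrors how the builder form and the declarative form each end in something callable.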
.input() maps positional arguments to named dependencies.

sp('my-pipeline')
  .input(['userId', 'action']) // First arg -> userId, second -> action

.pipe() adds a pipe to the pipeline.
- fn: Function to execute, or string name of an injected dependency
- input: String or array of dependency names to pass as arguments
- output: String or array of names to assign to return values
// Direct function
.pipe((a, b) => a + b, ['x', 'y'], 'sum')

// Injected function by name
.pipe('myFunction', ['arg1', 'arg2'], 'result')

// Using special inputs
.pipe((next, value) => {
  setTimeout(() => next(null, 'key', value * 2), 100)
}, ['next', 'value'])

.error() sets an error handler for the pipeline. Only one error handler is allowed per pipeline.

.error((error) => console.error('Pipeline error:', error), 'error')

.end() finalizes the pipeline and returns an executor function.
const run = sp('my-pipeline')
  .input(['x'])
  .pipe(x => x * 2, 'x', 'doubled')
  .end()

run(5) // Executes the pipeline with x=5

Pipelines can also be defined declaratively:
const run = sp('math-pipeline', [
  ['input', ['a', 'b']],
  [(a, b) => a + b, ['a', 'b'], 'sum'],
  [(sum) => console.log('Sum:', sum), 'sum']
])

run(3, 4) // Output: Sum: 7

Two special inputs can be requested by name:
- next: Control when to proceed to the next pipe (for async operations)
- set: Manually set output values
// Async operation with next
.pipe((next, value) => {
  fetchData(value, (err, result) => {
    next(err, 'data', result)
  })
}, ['next', 'value'], 'data')

// Manual output with set
.pipe((set, value) => {
  set('output1', value * 2)
  set('output2', value * 3)
}, ['set', 'value'], ['output1', 'output2'])

When a pipe returns false, the pipeline stops (useful for guards/validation):
.pipe(user => user.isAdmin, 'user') // Stops if not admin
.pipe(() => console.log('Admin access granted'))

Prefix a function name with ! to invert its boolean result:

.pipe('!isBlocked', 'user') // Continues if isBlocked returns false

Prefix a function name with ? to skip the pipe if the dependency is undefined:

.pipe('?optionalHandler', 'maybeValue') // Skips if optionalHandler or maybeValue is undefined

Rename outputs using originalName:newName syntax:

.pipe(getData, 'id', 'result:userProfile') // Maps 'result' output to 'userProfile'

Returning a plain object automatically sets all its keys as outputs:

.pipe(() => ({ name: 'John', age: 30 })) // Sets both 'name' and 'age'

Errors can be triggered by:
- Calling next(error) with an error
- Setting set('error', error)
- Throwing an exception in a pipe function
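The three routes above all end at the same handler. The following stand-alone sketch models that idea without using SuperPipe: runWithErrorHandler is a hypothetical helper, pipes receive set and next through a single object for brevity, and stopping after the first error is an assumption of this model rather than documented behaviour.

```javascript
// Hypothetical model of the three error routes; not SuperPipe's own code.
function runWithErrorHandler(pipes, onError) {
  let failed = false
  const fail = error => { failed = true; onError(error) }
  // set('error', value) is treated as an error report in this model.
  const set = (key, value) => { if (key === 'error') fail(value) }
  // next(error) with a truthy first argument is treated the same way.
  const next = error => { if (error) fail(error) }
  for (const pipe of pipes) {
    if (failed) break // assumption: remaining pipes are skipped after an error
    try {
      pipe({ set, next })
    } catch (error) {
      fail(error) // thrown exceptions reach the same handler
    }
  }
}

const seen = []
const record = error => seen.push(error.message)
runWithErrorHandler([({ next }) => next(new Error('via next'))], record)
runWithErrorHandler([({ set }) => set('error', new Error('via set'))], record)
runWithErrorHandler([() => { throw new Error('via throw') }], record)
// seen now lists one message per route, in call order
```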
sp('safe-pipeline')
  .input(['data'])
  .pipe((data) => {
    if (!data) throw new Error('Data required')
    return data
  }, 'data', 'validated')
  .error((error) => {
    console.error('Error:', error.message)
  }, 'error')
  .end()

SuperPipe is written in TypeScript and includes type definitions:
import superpipe, { Dependencies, PipelineAPI } from 'superpipe'

interface MyDeps extends Dependencies {
  logger: (msg: string) => void
}

const sp = superpipe<MyDeps>({
  logger: console.log
})