Machine is a library for creating data workflows. These workflows can be very concise or quite complex, and they can even contain cycles for flows that need retry or self-healing mechanisms.
It supports OpenTelemetry spans and metrics out of the box.
It also supports building dynamic pipelines using
Components is a repository of vertex and plugin implementations.
Installation
Add the primary library to your project
go get -u github.com/whitaker-io/machine
Data is a library for getting and setting values in a map[string]interface{}.
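To make the idea of getting and setting values in a map[string]interface{} concrete, here is a minimal sketch in plain Go. It does not use the Data library and does not reflect its API; the helper names getPath and setPath are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// getPath walks nested map[string]interface{} values along keys and
// reports whether a value was found at the end of the path.
// Hypothetical helper for illustration; not the Data library's API.
func getPath(m map[string]interface{}, keys ...string) (interface{}, bool) {
	var cur interface{} = m
	for _, k := range keys {
		next, ok := cur.(map[string]interface{})
		if !ok {
			// The current value is not a map, so the path cannot continue.
			return nil, false
		}
		cur, ok = next[k]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

// setPath stores value at the nested location named by keys, creating
// intermediate maps as needed. It returns an error if no keys are given
// or if an existing intermediate value is not a map. m must be non-nil.
// Hypothetical helper for illustration; not the Data library's API.
func setPath(m map[string]interface{}, value interface{}, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("setPath: no keys given")
	}
	cur := m
	for _, k := range keys[:len(keys)-1] {
		child, ok := cur[k].(map[string]interface{})
		if !ok {
			if _, exists := cur[k]; exists {
				return fmt.Errorf("setPath: %q is not a map", k)
			}
			child = map[string]interface{}{}
			cur[k] = child
		}
		cur = child
	}
	cur[keys[len(keys)-1]] = value
	return nil
}

func main() {
	d := map[string]interface{}{}
	if err := setPath(d, "ok", "status", "state"); err != nil {
		panic(err)
	}
	v, found := getPath(d, "status", "state")
	fmt.Println(v, found) // prints "ok true"
}
```

Returning a found flag from the getter, rather than a bare nil, lets callers tell a missing key apart from a key that is explicitly set to nil.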
Documentation
Example
Basic receive -> process -> send Flow
    stream := NewStream("unique_id1",
        func(c context.Context) chan []Data {
            channel := make(chan []Data)

            // setup channel to collect data as long as
            // the context has not completed

            return channel
        },
        &Option{FIFO: boolP(false)},
        &Option{Metrics: boolP(true)},
        &Option{Span: boolP(false)},
    )

    stream.Builder().Map("unique_id2",
        func(m Data) error {
            var err error

            // ...do some processing

            return err
        },
    ).Publish("publish_left_id", publishFN(func(d []data.Data) error {
        // send the data somewhere

        return nil
    }),
    )

    if err := stream.Run(context.Background()); err != nil {
        // Run will return an error in the case that
        // one of the paths is not terminated (i.e. missing a Publish)
        panic(err)
    }

🤝 Contributing
Contributions, issues and feature requests are welcome.
Feel free to check the issues page if you want to contribute.
Check the contributing guide.
Author
- Twitter: @io_whitaker
- Github: @jonathan-whitaker
Show your support
Please
License
Machine is provided under the MIT License.
The MIT License (MIT)
Copyright (c) 2020 Jonathan Whitaker

