Skip to content
Snippets Groups Projects
Unverified Commit 9779ea2d authored by Timo Furrer's avatar Timo Furrer
Browse files

Implement autoflow CLI to run Flow script locally

Example Flow script:

    def check_status(resp, expected_status_code = 200):
        if resp.status_code != expected_status_code:
            fail("Unexpected HTTP status code", resp.status_code)

    def check_content_type(resp, expected_content_type = "application/json"):
        ct = resp.header.get("Content-Type", [])
        if len(ct) == 0 or not ct[0].startswith(expected_content_type):
            fail("Unexpected Content-Type", ct)

    def handle_issue_updated(w, ev):
        print("running handle_issue_updated event handler")
        project_id = ev["data"]["project_id"]
        issue_id = ev["data"]["issue_id"]

        issues_resource_url = "{}/api/v4/projects/{}/issues/{}".format(w.vars.gitlab_url, project_id, issue_id)

        resp = w.http.do(
          method = "GET",
          url = "{}/related_merge_requests".format(issues_resource_url),
        )
        check_status(resp)
        check_content_type(resp)
        body = json.decode(str(resp.body))

        open_related_merge_requests = [x for x in body if x["state"] == "opened"]
        if len(open_related_merge_requests) == 0:
          print("no related merge requests found, exiting handler")
          return

        resp = w.http.do(
          method = "PUT",
          url = issues_resource_url,
          body = bytes(json.encode({
            "labels": [
              "workflow::in progress",
            ],
          })),
          header = {
            "Private-Token": w.vars.gitlab_token,
            "Content-Type": "application/json",
          },
        )
        check_status(resp)

    on_event(
        type="com.gitlab.events.issue_updated",
        handler=handle_issue_updated,
    )

Example Event:

    {
      "id": "test",
      "source": "test",
      "spec_version": "v1",
      "type": "com.gitlab.events.issue_updated",
      "attributes": {
        "datacontenttype": {
          "ce_string": "application/json"
        }
      },
      "text_data": "{\"project_id\": 40819621, \"issue_id\": 3}"
    }

Example Invocation:

    cat test-event | autoflow run-local -f flow.star
parent cfefdf67
No related merge requests found
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
go_library(
name = "autoflow_lib",
srcs = ["main.go"],
importpath = "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/cmd/autoflow",
visibility = ["//visibility:private"],
deps = [
"//cmd",
"//cmd/autoflow/autoflowapp",
],
)
go_binary(
name = "autoflow",
embed = [":autoflow_lib"],
visibility = ["//visibility:public"],
)
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "autoflowapp",
srcs = ["app.go"],
importpath = "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/cmd/autoflow/autoflowapp",
visibility = ["//visibility:public"],
deps = [
"//cmd/autoflow/autoflowapp/commands/run-local",
"//internal/tool/logz",
"@com_github_spf13_cobra//:cobra",
],
)
package autoflowapp
import (
"log/slog"
"os"
"github.com/spf13/cobra"
runlocal "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/cmd/autoflow/autoflowapp/commands/run-local"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/internal/tool/logz"
)
type App struct {
log *slog.Logger
}
func NewCommand() *cobra.Command {
lockedWriter := &logz.LockedWriter{
Writer: os.Stderr,
}
a := App{
log: slog.New(slog.NewTextHandler(lockedWriter, &slog.HandlerOptions{Level: slog.LevelDebug})),
}
c := &cobra.Command{
Use: "autoflow",
Short: "GitLab AutoFlow command line utility",
Args: cobra.NoArgs,
SilenceUsage: true,
}
c.AddCommand(runlocal.NewCommand(a.log))
return c
}
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("//build:build.bzl", "go_custom_test")
go_library(
name = "run-local",
srcs = ["command.go"],
importpath = "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/cmd/autoflow/autoflowapp/commands/run-local",
visibility = ["//visibility:public"],
deps = [
"//internal/module/autoflow/engine",
"//internal/module/autoflow/flow",
"//internal/tool/httpz",
"//internal/tool/logz",
"//pkg/event",
"@com_github_spf13_cobra//:cobra",
"@net_starlark_go//lib/time",
"@net_starlark_go//starlark",
"@org_golang_google_protobuf//encoding/protojson",
],
)
go_custom_test(
name = "run-local_test",
srcs = ["command_test.go"],
embed = [":run-local"],
deps = ["//internal/module/autoflow/flow"],
)
package runlocal
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"reflect"
"time"
"github.com/spf13/cobra"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/internal/module/autoflow/engine"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/internal/module/autoflow/flow"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/internal/tool/httpz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/internal/tool/logz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/pkg/event"
startime "go.starlark.net/lib/time"
"go.starlark.net/starlark"
"google.golang.org/protobuf/encoding/protojson"
)
const defaultGitLabURL = "https://gitlab.com"
type runLocalOptions struct {
log *slog.Logger
flowScriptFilePath string
event *event.CloudEvent
gitlabURL string
gitlabToken string
}
func NewCommand(log *slog.Logger) *cobra.Command {
o := &runLocalOptions{
log: log,
}
cmd := &cobra.Command{
Use: "run-local",
Short: "Run an AutoFlow script locally for a given event",
Long: `This command can be used to locally run an event handler from an AutoFlow script given an event.
The event data is received in stdin while the AutoFlow script must be provided as a script file with the --flow-script flag.
The GitLab URL and token are injected from either environment variables or the --gitlab-url and --gitlab-token flags, respectively.
`,
Example: `cat test-event | autoflow run-local -f flow.star`,
RunE: func(cmd *cobra.Command, args []string) error {
o.complete()
if err := o.validate(); err != nil {
return err
}
if err := o.consumeIn(cmd.InOrStdin()); err != nil {
return err
}
return o.run(cmd.Context())
},
}
flags := cmd.Flags()
flags.StringVarP(&o.flowScriptFilePath, "flow-script", "f", "", "Filepath to the AutoFlow script to run")
flags.StringVarP(&o.gitlabURL, "gitlab-url", "u", defaultGitLabURL, "URL to the GitLab instance")
cobra.CheckErr(cmd.MarkFlagRequired("flow-script"))
return cmd
}
func (o *runLocalOptions) complete() {
o.gitlabToken = os.Getenv("GITLAB_TOKEN")
}
func (o *runLocalOptions) validate() error {
if o.gitlabToken == "" {
return errors.New("missing GitLab token. Use $GITLAB_TOKEN to set it")
}
return nil
}
func (o *runLocalOptions) consumeIn(in io.Reader) error {
eventData, err := io.ReadAll(in)
if err != nil {
return err
}
event := &event.CloudEvent{}
err = protojson.Unmarshal(eventData, event)
if err != nil {
return err
}
o.event = event
return nil
}
func (o *runLocalOptions) run(ctx context.Context) error {
log := o.log.With(logz.FlowScript(o.flowScriptFilePath))
log.Info("Running flow script for event", logz.EventType(o.event.Type))
flowScript, err := os.ReadFile(o.flowScriptFilePath)
if err != nil {
return err
}
modules := []flow.ModuleSource{
{
SourceName: "",
File: o.flowScriptFilePath,
Data: flowScript,
},
}
builtins := &localFlowBuiltins{
log: log,
}
runtime, err := flow.NewRuntimeFromSource(ctx, flow.RuntimeOptions{
Entrypoint: o.flowScriptFilePath,
Builtins: builtins,
Source: flow.NewTestingSource(modules),
})
if err != nil {
return err
}
eventDict, err := engine.EventToDict(o.event)
if err != nil {
return err
}
eventHandlerIDs, err := runtime.EvaluateEventHandlerIDs(o.event.Type, eventDict)
if err != nil {
return err
}
log.Info("Found event handlers", logz.NumberOfEventHandlers(len(eventHandlerIDs)))
fc := &localFlowContext{
ctx: ctx,
rt: http.DefaultTransport,
}
vars := starlark.StringDict{
"gitlab_url": starlark.String(o.gitlabURL),
"gitlab_token": starlark.String(o.gitlabToken),
}
for i, eventHandlerID := range eventHandlerIDs {
l := log.With(logz.EventHandler(eventHandlerID))
l.Info("Running event handler")
runtimeErr := runtime.Execute(o.event.Type, uint32(i), fc, vars, eventDict) //nolint:gosec
if runtimeErr != nil {
l.Error("Executed event handler with error", logz.Error(runtimeErr))
} else {
l.Info("Executed event handler successfully")
}
}
return nil
}
type localFlowBuiltins struct {
log *slog.Logger
}
func (b *localFlowBuiltins) Now() time.Time {
return time.Now()
}
func (b *localFlowBuiltins) Print(msg string) {
b.log.Debug(msg)
}
type localFlowContext struct {
ctx context.Context
rt http.RoundTripper
}
func (f *localFlowContext) HTTPDo(reqURL, method string, query url.Values, header http.Header, body []byte) (*flow.HTTPResponse, error) {
u, err := url.Parse(reqURL)
if err != nil {
return nil, err
}
url := httpz.MergeURLPathAndQuery(u, "", query)
statusCode, status, header, data, err := engine.HTTPDo(f.ctx, f.rt, url, method, body, header)
if err != nil {
return nil, err
}
return &flow.HTTPResponse{
Status: status,
StatusCode: uint32(statusCode), //nolint:gosec
Header: header,
Body: data,
}, nil
}
func (f *localFlowContext) Select(cases *starlark.Dict, defaultCase starlark.Value, timeout startime.Duration, timeoutValue starlark.Value) (starlark.Value, error) {
selectCases := make([]reflect.SelectCase, 0, cases.Len())
lookup := make([]starlark.Value, 0, cases.Len())
for _, kw := range cases.Items() {
k, ok := kw[0].(starlark.String)
if !ok {
return nil, fmt.Errorf("key must be a string, got %s", kw[0].Type())
}
v := kw[1]
switch v := v.(type) {
// NOTE: I think we need another abstraction, not future - but timer or channel or something.
case *flow.FutureType:
// In this case it's a timer ...
timer, ok := v.Unwrap().(*time.Timer)
if !ok {
return nil, errors.New("internal error: future type did not return a wrapped timer")
}
selectCases = append(selectCases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(timer.C)})
lookup = append(lookup, k)
case *flow.ReceiveChannelType:
// TODO: support receiving from channel
return nil, errors.New("the run-local runtime does not implement receiving from a channel yet")
default:
return nil, fmt.Errorf("unsupported value type %s for the select key %q", v.Type(), k)
}
}
if timeout != 0 {
timer := time.NewTimer(time.Duration(timeout))
selectCases = append(selectCases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(timer.C)})
lookup = append(lookup, timeoutValue)
}
if defaultCase != starlark.None {
selectCases = append(selectCases, reflect.SelectCase{Dir: reflect.SelectDefault})
lookup = append(lookup, defaultCase)
}
chosen, _, ok := reflect.Select(selectCases)
if !ok {
// we have the default case
return defaultCase, nil
}
if chosen >= len(lookup) {
return nil, errors.New("selected case out of range")
}
selected := lookup[chosen]
return selected, nil
}
func (f *localFlowContext) Signal(workflowID string, signalName string, payloadJSON starlark.Value, runID string) (starlark.Value, error) {
// TODO: support signal
return nil, errors.New("the run-local runtime does not implement the signal feature yet")
}
func (f *localFlowContext) SignalChannel(signalName string) (flow.ReceiveChannel, error) {
// TODO: support signal channel
return nil, errors.New("the run-local runtime does not implement the signal channel feature yet")
}
func (f *localFlowContext) Sleep(duration startime.Duration) (starlark.Value, error) {
select {
case <-f.ctx.Done():
return nil, fmt.Errorf("aborted sleep: %w", f.ctx.Err())
case <-time.After(time.Duration(duration)):
return starlark.None, nil
}
}
func (f *localFlowContext) Timer(duration startime.Duration) flow.Future {
return &localTimer{
ctx: f.ctx,
t: time.NewTimer(time.Duration(duration)),
}
}
type localTimer struct {
ctx context.Context
t *time.Timer
}
func (t *localTimer) Get() error {
select {
case <-t.ctx.Done():
return t.ctx.Err()
case <-t.t.C:
return nil
}
}
func (t *localTimer) Unwrap() any {
return t.t
}
package runlocal
import "gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/internal/module/autoflow/flow"
var (
_ flow.Builtins = (*localFlowBuiltins)(nil)
_ flow.EventHandlerContext = (*localFlowContext)(nil)
_ flow.Future = (*localTimer)(nil)
)
package main
import (
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/cmd"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v17/cmd/autoflow/autoflowapp"
)
func main() {
cmd.Run(autoflowapp.NewCommand())
}
......@@ -253,3 +253,19 @@ func ListenerName(name string) slog.Attr {
func Attempt(attempt int) slog.Attr {
return slog.Int("attempt", attempt)
}
func FlowScript(path string) slog.Attr {
return slog.String("flow_script", path)
}
func EventType(typ string) slog.Attr {
return slog.String("event_type", typ)
}
func EventHandler(name string) slog.Attr {
return slog.String("event_handler", name)
}
func NumberOfEventHandlers(n int) slog.Attr {
return slog.Int("number_of_event_handlers", n)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment