Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions backend/helpers/pluginhelper/api/api_extractor_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ limitations under the License.
package api

import (
"fmt"
"reflect"

"github.com/apache/incubator-devlake/core/dal"
Expand All @@ -30,7 +29,6 @@ import (
// StatefulApiExtractorArgs is a struct that contains the arguments for a stateful api extractor
type StatefulApiExtractorArgs struct {
*SubtaskCommonArgs
Table string // subtask normally reads from a table
Extract func(row *RawData) ([]any, errors.Error)
}

Expand All @@ -41,10 +39,6 @@ type StatefulApiExtractor struct {

// NewStatefulApiExtractor creates a new StatefulApiExtractor
func NewStatefulApiExtractor(args *StatefulApiExtractorArgs) (*StatefulApiExtractor, errors.Error) {
// process args
if args.BatchSize == 0 {
args.BatchSize = 500
}
stateManager, err := NewSubtaskStateManager(args.SubtaskCommonArgs)
if err != nil {
return nil, err
Expand All @@ -60,13 +54,14 @@ func (extractor *StatefulApiExtractor) Execute() errors.Error {
// load data from database
db := extractor.GetDal()
logger := extractor.GetLogger()
table := fmt.Sprintf("_raw_%s", extractor.Table)
table := extractor.GetRawDataTable()
params := extractor.GetRawDataParams()
if !db.HasTable(table) {
return nil
}
clauses := []dal.Clause{
dal.From(table),
dal.Where("params = ?", extractor.Params),
dal.Where("params = ?", params),
dal.Orderby("id ASC"),
}

Expand All @@ -86,10 +81,10 @@ func (extractor *StatefulApiExtractor) Execute() errors.Error {
if err != nil {
return errors.Default.Wrap(err, "error running DB query")
}
logger.Info("get data from %s where params=%s and got %d", table, extractor.Params, count)
logger.Info("get data from %s where params=%s and got %d", table, params, count)
defer cursor.Close()
// batch save divider
divider := NewBatchSaveDivider(extractor.SubTaskContext, extractor.BatchSize, table, extractor.Params)
divider := NewBatchSaveDivider(extractor.SubTaskContext, extractor.GetBatchSize(), table, params)
divider.SetIncrementalMode(extractor.IsIncremental())

// progress
Expand Down Expand Up @@ -121,8 +116,8 @@ func (extractor *StatefulApiExtractor) Execute() errors.Error {
// set raw data origin field
setRawDataOrigin(result, common.RawDataOrigin{
RawDataTable: table,
RawDataParams: params,
RawDataId: row.ID,
RawDataParams: row.Params,
})
// records get saved into db when the batch slots are maxed out
err = batch.Add(result)
Expand Down
129 changes: 129 additions & 0 deletions backend/helpers/pluginhelper/api/data_convertor_stateful.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
"reflect"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
plugin "github.com/apache/incubator-devlake/core/plugin"
)

// StatefulDataConverterArgs holds the arguments for building a StatefulDataConverter.
// InputType is the row type each record of the Input cursor is fetched into before
// being handed to Convert.
type StatefulDataConverterArgs[InputType any] struct {
	// embedded common subtask arguments, forwarded to NewSubtaskStateManager by the constructor
	*SubtaskCommonArgs
	// Input receives the subtask state manager and returns the cursor of rows to convert
	Input func(*SubtaskStateManager) (dal.Rows, errors.Error)
	// Convert turns one fetched input row into the records to be batch-saved
	Convert func(row *InputType) ([]any, errors.Error)
	// BatchSize is the size handed to the batch save divider; the constructor replaces 0 with 500.
	// NOTE(review): SubtaskCommonArgs appears to declare a BatchSize field as well, which this
	// outer field would shadow - confirm which one callers are expected to set.
	BatchSize int
}

// StatefulDataConverter is the converter subtask created by NewStatefulDataConverter.
// It embeds both its arguments and a SubtaskStateManager, so the fields and methods
// of the two are promoted onto the converter itself.
type StatefulDataConverter[InputType any] struct {
	*StatefulDataConverterArgs[InputType]
	*SubtaskStateManager
}

// NewStatefulDataConverter creates a StatefulDataConverter from the given arguments.
//
// A zero args.BatchSize is replaced with 500 (the args struct is modified in place).
// The state manager is created from args.SubtaskCommonArgs; any error it reports is
// returned unchanged together with a nil converter.
//
// The OptType type parameter is not referenced by the body; it is kept because it is
// part of the signature callers instantiate.
func NewStatefulDataConverter[
	OptType any,
	InputType any,
](
	args *StatefulDataConverterArgs[InputType],
) (*StatefulDataConverter[InputType], errors.Error) {
	const defaultBatchSize = 500

	// fall back to the default when the caller left the batch size unset
	if args.BatchSize == 0 {
		args.BatchSize = defaultBatchSize
	}

	manager, err := NewSubtaskStateManager(args.SubtaskCommonArgs)
	if err != nil {
		return nil, err
	}

	converter := &StatefulDataConverter[InputType]{
		StatefulDataConverterArgs: args,
		SubtaskStateManager:       manager,
	}
	return converter, nil
}

// Execute runs the conversion: it obtains the input cursor from Input, fetches every
// row into a fresh InputType, passes it to Convert and adds each returned record to a
// per-type batch. When the input row and a result both carry a RawDataOrigin field of
// compatible type, the input's origin is copied onto the result before it is batched.
//
// The loop stops early with the context's error when the subtask context is cancelled.
// After the cursor is exhausted the remaining batches are flushed and the subtask
// state is saved.
//
// It returns the first error reported by Input, row fetching, Convert, batching,
// the final flush, or the state manager.
func (converter *StatefulDataConverter[InputType]) Execute() errors.Error {
	// name of the field that carries the raw data origin on input rows and results
	const rawDataOriginField = "RawDataOrigin"

	// load data from database
	db := converter.GetDal()

	table := converter.GetRawDataTable()
	params := converter.GetRawDataParams()

	// batch save divider
	divider := NewBatchSaveDivider(converter, converter.BatchSize, table, params)
	divider.SetIncrementalMode(converter.IsIncremental())

	// set progress
	converter.SetProgress(0, -1)

	cursor, err := converter.Input(converter.SubtaskStateManager)
	if err != nil {
		return err
	}
	defer cursor.Close()
	ctx := converter.GetContext()
	// iterate all rows
	for cursor.Next() {
		select {
		case <-ctx.Done():
			return errors.Convert(ctx.Err())
		default:
		}
		inputRow := new(InputType)
		err := db.Fetch(cursor, inputRow)
		if err != nil {
			return errors.Default.Wrap(err, "error fetching rows")
		}

		results, err := converter.Convert(inputRow)
		if err != nil {
			return errors.Default.Wrap(err, "error calling Converter plugin implementation")
		}

		// Resolve the origin of the input row once per row. It stays invalid when
		// InputType is not a struct or has no such field, in which case there is
		// nothing to propagate (calling Set with an invalid Value would panic).
		var inputOrigin reflect.Value
		if inputValue := reflect.ValueOf(inputRow).Elem(); inputValue.Kind() == reflect.Struct {
			inputOrigin = inputValue.FieldByName(rawDataOriginField)
		}

		for _, result := range results {
			// get the batch operator for the specific type
			batch, err := divider.ForType(reflect.TypeOf(result))
			if err != nil {
				return errors.Default.Wrap(err, "error getting batch from result")
			}
			// set raw data origin field
			inheritRawDataOrigin(result, inputOrigin, rawDataOriginField)
			// records get saved into db when the batch slots are maxed out
			err = batch.Add(result)
			if err != nil {
				return errors.Default.Wrap(err, "error adding result to batch")
			}
		}
		converter.IncProgress(1)
	}

	// save the last batches
	err = divider.Close()
	if err != nil {
		return err
	}
	// save the incremental state
	return converter.SubtaskStateManager.Close()
}

// inheritRawDataOrigin copies inputOrigin into the field named fieldName of the struct
// that result points to. It does nothing when inputOrigin is invalid, when result is
// not a non-nil pointer to a struct, or when the target field is missing, not settable,
// or of an incompatible type.
func inheritRawDataOrigin(result any, inputOrigin reflect.Value, fieldName string) {
	if !inputOrigin.IsValid() {
		return
	}
	resultValue := reflect.ValueOf(result)
	if resultValue.Kind() != reflect.Pointer || resultValue.IsNil() {
		return
	}
	target := resultValue.Elem()
	if target.Kind() != reflect.Struct {
		return
	}
	origin := target.FieldByName(fieldName)
	if !origin.IsValid() || !origin.CanSet() {
		return
	}
	if !inputOrigin.Type().AssignableTo(origin.Type()) {
		return
	}
	origin.Set(inputOrigin)
}

// Compile-time check that *StatefulDataConverter satisfies the plugin.SubTask interface
var _ plugin.SubTask = (*StatefulDataConverter[any])(nil)
39 changes: 33 additions & 6 deletions backend/helpers/pluginhelper/api/subtask_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,48 @@ limitations under the License.
package api

import (
"fmt"
"reflect"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/models"
plugin "github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
)

// SubtaskCommonArgs is a struct that contains the common arguments for a subtask
type SubtaskCommonArgs struct {
plugin.SubTaskContext
Params string // for filtering rows belonging to the scope (jira board, github repo) of the subtask
SubtaskConfig string // for determining whether the subtask should run in incremental or full sync mode
Table string // raw table name
Params any // for filtering rows belonging to the scope (jira board, github repo) of the subtask
SubtaskConfig any // for determining whether the subtask should run in incremental or full sync mode
BatchSize int // batch size for saving data
}

// GetRawDataTable returns the raw data table name, which is the Table field
// prefixed with "_raw_".
func (args *SubtaskCommonArgs) GetRawDataTable() string {
	const rawTablePrefix = "_raw_"
	return fmt.Sprintf("%s%s", rawTablePrefix, args.Table)
}

// GetRawDataParams returns Params serialized with utils.ToJsonString.
// It panics when Params is nil or holds the zero value of its type.
func (args *SubtaskCommonArgs) GetRawDataParams() string {
	raw := args.Params

	// a nil interface must be ruled out first: IsZero cannot be called on an invalid Value
	missing := raw == nil
	if !missing {
		missing = reflect.ValueOf(raw).IsZero()
	}
	if missing {
		panic(errors.Default.New("Params is nil"))
	}

	return utils.ToJsonString(raw)
}

// GetSubtaskConfig returns SubtaskConfig serialized with utils.ToJsonString.
func (args *SubtaskCommonArgs) GetSubtaskConfig() string {
	config := args.SubtaskConfig
	return utils.ToJsonString(config)
}

// GetBatchSize returns the batch size used for saving data. When BatchSize is zero,
// the default of 500 is stored back into the receiver and returned.
func (args *SubtaskCommonArgs) GetBatchSize() int {
	const defaultBatchSize = 500

	if args.BatchSize != 0 {
		return args.BatchSize
	}
	args.BatchSize = defaultBatchSize
	return defaultBatchSize
}

// SubtaskStateManager manages the state of a subtask. It is used to determine whether
// the subtask should run in incremental mode or full sync mode and what time range to collect.
type SubtaskStateManager struct {
Expand All @@ -56,15 +82,16 @@ func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskState
if syncPolicy == nil {
syncPolicy = &models.SyncPolicy{}
}
params := args.GetRawDataParams()
// load the previous state from the database
state := &models.SubtaskState{}
err = db.First(state, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, args.Params))
err = db.First(state, dal.Where(`plugin = ? AND subtask =? AND params = ?`, plugin, subtask, params))
if err != nil {
if db.IsErrorNotFound(err) {
state = &models.SubtaskState{
Plugin: plugin,
Subtask: subtask,
Params: args.Params,
Params: params,
}
err = nil
} else {
Expand All @@ -81,7 +108,7 @@ func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskState
isIncremental: false,
since: syncPolicy.TimeAfter,
until: &now,
config: args.SubtaskConfig,
config: utils.ToJsonString(args.SubtaskConfig),
}
// fallback to the previous timeAfter if no new value
if stateManager.since == nil {
Expand All @@ -94,7 +121,7 @@ func NewSubtaskStateManager(args *SubtaskCommonArgs) (stateManager *SubtaskState
// if timeAfter is not set or NOT before the previous value, we are in the incremental mode
if (syncPolicy.TimeAfter == nil || state.TimeAfter == nil || !syncPolicy.TimeAfter.Before(*state.TimeAfter)) &&
// and the previous config is the same as the current config
(state.PrevConfig == stateManager.config) {
(state.PrevConfig == "" || state.PrevConfig == stateManager.config) {
stateManager.isIncremental = true
stateManager.since = state.PrevStartedAt
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestSubtaskStateManager(t *testing.T) {
},
{
name: "config no changed",
state: &models.SubtaskState{TimeAfter: &time1, PrevStartedAt: &time2, PrevConfig: "hello"},
state: &models.SubtaskState{TimeAfter: &time1, PrevStartedAt: &time2, PrevConfig: `"hello"`},
syncPolicy: &models.SyncPolicy{TimeAfter: &time1},
config: "hello",
expectedIsIncremental: true,
Expand Down Expand Up @@ -173,6 +173,7 @@ func TestSubtaskStateManager(t *testing.T) {
stateManager, err := NewSubtaskStateManager(&SubtaskCommonArgs{
SubTaskContext: mockSubtaskCtx,
SubtaskConfig: fmt.Sprintf("%v", tc.config),
Params: "whatever",
})
assert.Nil(t, err)
assert.Equal(t, tc.expectedSince, stateManager.since)
Expand Down
9 changes: 5 additions & 4 deletions backend/plugins/gitextractor/parser/clone_gitcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/core/utils"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
)

Expand All @@ -52,6 +51,7 @@ func NewGitcliCloner(basicRes context.BasicRes) *GitcliCloner {
// CloneRepoConfig is the configuration for the CloneRepo method
// the subtask should run in Full Sync mode whenever the configuration is changed
type CloneRepoConfig struct {
UseGoGit *bool
SkipCommitStat *bool
SkipCommitFiles *bool
NoShallowClone bool
Expand All @@ -63,12 +63,13 @@ func (g *GitcliCloner) CloneRepo(ctx plugin.SubTaskContext, localDir string) err
if !taskData.Options.NoShallowClone {
stateManager, err := api.NewSubtaskStateManager(&api.SubtaskCommonArgs{
SubTaskContext: ctx,
Params: utils.ToJsonString(taskData.Options.GitExtractorApiParams),
SubtaskConfig: utils.ToJsonString(CloneRepoConfig{
Params: taskData.Options.GitExtractorApiParams,
SubtaskConfig: CloneRepoConfig{
UseGoGit: taskData.Options.UseGoGit,
SkipCommitStat: taskData.Options.SkipCommitStat,
SkipCommitFiles: taskData.Options.SkipCommitFiles,
NoShallowClone: taskData.Options.NoShallowClone,
}),
},
})
if err != nil {
return err
Expand Down
Loading