Commit ab2e76dd authored by Tomaž Jerman's avatar Tomaž Jerman Committed by Denis Arh
Browse files

Add support for batch record operations

Allows us to perform multiple operations with a single request
under the same transaction.
parent a662fcb0
......@@ -898,8 +898,14 @@
{
"type": "types.RecordValueSet",
"name": "values",
"required": true,
"required": false,
"title": "Record values"
},
{
"type": "types.BulkRecordSet",
"name": "records",
"required": false,
"title": "Records"
}
]
}
......@@ -938,8 +944,14 @@
{
"type": "types.RecordValueSet",
"name": "values",
"required": true,
"required": false,
"title": "Record values"
},
{
"type": "types.BulkRecordSet",
"name": "records",
"required": false,
"title": "Records"
}
]
}
......
......@@ -241,9 +241,15 @@
"post": [
{
"name": "values",
"required": true,
"required": false,
"title": "Record values",
"type": "types.RecordValueSet"
},
{
"name": "records",
"required": false,
"title": "Records",
"type": "types.BulkRecordSet"
}
]
}
......@@ -281,9 +287,15 @@
"post": [
{
"name": "values",
"required": true,
"required": false,
"title": "Record values",
"type": "types.RecordValueSet"
},
{
"name": "records",
"required": false,
"title": "Records",
"type": "types.BulkRecordSet"
}
]
}
......
......@@ -5,12 +5,13 @@ import (
"encoding/csv"
"encoding/json"
"fmt"
"github.com/cortezaproject/corteza-server/compose/service/values"
"github.com/cortezaproject/corteza-server/pkg/payload"
"net/http"
"path"
"strings"
"github.com/cortezaproject/corteza-server/compose/service/values"
"github.com/cortezaproject/corteza-server/pkg/payload"
"github.com/titpetric/factory/resputil"
"github.com/pkg/errors"
......@@ -33,13 +34,15 @@ type (
recordPayload struct {
*types.Record
Records types.RecordSet `json:"records,omitempty"`
CanUpdateRecord bool `json:"canUpdateRecord"`
CanDeleteRecord bool `json:"canDeleteRecord"`
}
recordSetPayload struct {
Filter types.RecordFilter `json:"filter"`
Set []*recordPayload `json:"set"`
Filter *types.RecordFilter `json:"filter,omitempty"`
Set []*recordPayload `json:"set"`
}
Record struct {
......@@ -103,7 +106,7 @@ func (ctrl *Record) List(ctx context.Context, r *request.RecordList) (interface{
rr, filter, err := ctrl.record.With(ctx).Find(rf)
return ctrl.makeFilterPayload(ctx, m, rr, filter, err)
return ctrl.makeFilterPayload(ctx, m, rr, &filter, err)
}
func (ctrl *Record) Read(ctx context.Context, r *request.RecordRead) (interface{}, error) {
......@@ -136,17 +139,42 @@ func (ctrl *Record) Create(ctx context.Context, r *request.RecordCreate) (interf
return nil, err
}
record, err := ctrl.record.With(ctx).Create(&types.Record{
NamespaceID: r.NamespaceID,
ModuleID: r.ModuleID,
Values: r.Values,
})
oo := make([]*types.BulkRecordOperation, 0)
// If defined, initialize parent record
if r.Values != nil {
rr := &types.Record{
NamespaceID: r.NamespaceID,
ModuleID: r.ModuleID,
Values: r.Values,
}
oo = append(oo, &types.BulkRecordOperation{
Record: rr,
Operation: types.OperationTypeCreate,
})
}
// If defined, initialize sub records for creation
oob, err := r.Records.ToBulkOperations(r.ModuleID, r.NamespaceID)
if err != nil {
return nil, err
}
// Validate returned bulk operations
for _, o := range oob {
if o.LinkBy != "" && len(oo) == 0 {
return nil, errors.New("missing parent record definition")
}
}
oo = append(oo, oob...)
rr, err := ctrl.record.With(ctx).Bulk(oo...)
if rve, is := err.(*types.RecordValueErrorSet); is && !rve.IsValid() {
return ctrl.handleValidationError(rve), nil
}
return ctrl.makePayload(ctx, m, record, err)
return ctrl.makePayloadBatch(ctx, m, err, rr...)
}
func (ctrl *Record) Update(ctx context.Context, r *request.RecordUpdate) (interface{}, error) {
......@@ -159,18 +187,47 @@ func (ctrl *Record) Update(ctx context.Context, r *request.RecordUpdate) (interf
return nil, err
}
record, err := ctrl.record.With(ctx).Update(&types.Record{
ID: r.RecordID,
NamespaceID: r.NamespaceID,
ModuleID: r.ModuleID,
Values: r.Values,
})
oo := make([]*types.BulkRecordOperation, 0)
// If defined, initialize parent record for creation
if r.Values != nil {
rr := &types.Record{
ID: r.RecordID,
NamespaceID: r.NamespaceID,
ModuleID: r.ModuleID,
Values: r.Values,
}
oo = append(oo, &types.BulkRecordOperation{
Record: rr,
Operation: types.OperationTypeUpdate,
})
}
// If defined, initialize sub records for creation
oob, err := r.Records.ToBulkOperations(r.ModuleID, r.NamespaceID)
if err != nil {
return nil, err
}
// Validate returned bulk operations
for _, o := range oob {
if o.LinkBy != "" && len(oo) == 0 {
return nil, errors.New("missing parent record definition")
}
}
oo = append(oo, oob...)
if err != nil {
return nil, err
}
rr, err := ctrl.record.With(ctx).Bulk(oo...)
if rve, is := err.(*types.RecordValueErrorSet); is && !rve.IsValid() {
return ctrl.handleValidationError(rve), nil
}
return ctrl.makePayload(ctx, m, record, err)
return ctrl.makePayloadBatch(ctx, m, err, rr...)
}
func (ctrl *Record) Delete(ctx context.Context, r *request.RecordDelete) (interface{}, error) {
......@@ -467,6 +524,20 @@ func (ctrl *Record) TriggerScriptOnList(ctx context.Context, r *request.RecordTr
return resputil.OK(), err
}
func (ctrl Record) makePayloadBatch(ctx context.Context, m *types.Module, err error, rr ...*types.Record) (*recordPayload, error) {
if err != nil || rr == nil {
return nil, err
}
return &recordPayload{
Record: rr[0],
Records: rr[1:],
CanUpdateRecord: ctrl.ac.CanUpdateRecord(ctx, m),
CanDeleteRecord: ctrl.ac.CanDeleteRecord(ctx, m),
}, nil
}
func (ctrl Record) makePayload(ctx context.Context, m *types.Module, r *types.Record, err error) (*recordPayload, error) {
if err != nil || r == nil {
return nil, err
......@@ -480,7 +551,7 @@ func (ctrl Record) makePayload(ctx context.Context, m *types.Module, r *types.Re
}, nil
}
func (ctrl Record) makeFilterPayload(ctx context.Context, m *types.Module, rr types.RecordSet, f types.RecordFilter, err error) (*recordSetPayload, error) {
func (ctrl Record) makeFilterPayload(ctx context.Context, m *types.Module, rr types.RecordSet, f *types.RecordFilter, err error) (*recordSetPayload, error) {
if err != nil {
return nil, err
}
......
......@@ -712,6 +712,10 @@ type RecordCreate struct {
rawValues string
Values types.RecordValueSet
hasRecords bool
rawRecords string
Records types.BulkRecordSet
hasNamespaceID bool
rawNamespaceID string
NamespaceID uint64 `json:",string"`
......@@ -731,6 +735,7 @@ func (r RecordCreate) Auditable() map[string]interface{} {
var out = map[string]interface{}{}
out["values"] = r.Values
out["records"] = r.Records
out["namespaceID"] = r.NamespaceID
out["moduleID"] = r.ModuleID
......@@ -868,6 +873,10 @@ type RecordUpdate struct {
hasValues bool
rawValues string
Values types.RecordValueSet
hasRecords bool
rawRecords string
Records types.BulkRecordSet
}
// NewRecordUpdate request
......@@ -883,6 +892,7 @@ func (r RecordUpdate) Auditable() map[string]interface{} {
out["namespaceID"] = r.NamespaceID
out["moduleID"] = r.ModuleID
out["values"] = r.Values
out["records"] = r.Records
return out
}
......@@ -1908,6 +1918,21 @@ func (r *RecordCreate) GetValues() types.RecordValueSet {
return r.Values
}
// HasRecords returns true if records was set
func (r *RecordCreate) HasRecords() bool {
return r.hasRecords
}
// RawRecords returns raw value of records parameter
func (r *RecordCreate) RawRecords() string {
return r.rawRecords
}
// GetRecords returns casted value of records parameter
func (r *RecordCreate) GetRecords() types.BulkRecordSet {
return r.Records
}
// HasNamespaceID returns true if namespaceID was set
func (r *RecordCreate) HasNamespaceID() bool {
return r.hasNamespaceID
......@@ -2043,6 +2068,21 @@ func (r *RecordUpdate) GetValues() types.RecordValueSet {
return r.Values
}
// HasRecords returns true if records was set
func (r *RecordUpdate) HasRecords() bool {
return r.hasRecords
}
// RawRecords returns raw value of records parameter
func (r *RecordUpdate) RawRecords() string {
return r.rawRecords
}
// GetRecords returns casted value of records parameter
func (r *RecordUpdate) GetRecords() types.BulkRecordSet {
return r.Records
}
// HasRecordIDs returns true if recordIDs was set
func (r *RecordBulkDelete) HasRecordIDs() bool {
return r.hasRecordIDs
......
......@@ -85,6 +85,7 @@ type (
Create(record *types.Record) (*types.Record, error)
Update(record *types.Record) (*types.Record, error)
Bulk(oo ...*types.BulkRecordOperation) (types.RecordSet, error)
DeleteByID(namespaceID, moduleID uint64, recordID ...uint64) error
......@@ -435,78 +436,257 @@ func (svc record) Export(filter types.RecordFilter, enc Encoder) (err error) {
return svc.recordAction(svc.ctx, aProps, RecordActionImport, err)
}
func (svc record) Create(new *types.Record) (rec *types.Record, err error) {
// Bulk handles provided set of bulk record operations.
// It's able to create or update records in a single transaction.
//
// Feature used mainly by Record Lines, but should be used when ever we wish to perform
// multiple operations over records.
func (svc record) Bulk(oo ...*types.BulkRecordOperation) (rec types.RecordSet, err error) {
ctr := map[string]uint{}
return rec, svc.db.Transaction(func() (err error) {
for _, p := range oo {
r := p.Record
res := fmt.Sprintf("compose:module:%d", r.ModuleID)
if _, ok := ctr[res]; !ok {
ctr[res] = 0
}
// Handle any pre processing, such as defining parent recordID.
if p.LinkBy != "" {
// As is, we can use the first record as the master record.
// This is valid, since we do not allow this, if the master record is not defined
r.Values = r.Values.Set(&types.RecordValue{
Name: p.LinkBy,
Value: strconv.FormatUint(rec[0].ID, 10),
Ref: rec[0].ID,
})
}
switch p.Operation {
case types.OperationTypeCreate:
r, err = svc.create(r)
break
case types.OperationTypeUpdate:
r, err = svc.update(r)
break
case types.OperationTypeDelete:
r, err = svc.delete(r.NamespaceID, r.ModuleID, r.ID)
break
default:
return errors.Errorf("unknown record bulk operation %s", p.Operation)
}
if err != nil {
if rve, ok := err.(*types.RecordValueErrorSet); ok {
// Attach additional meta to each value error for FE identification
for _, re := range rve.Set {
re.Meta["resource"] = res
re.Meta["item"] = ctr[res]
}
return rve
}
return err
}
rec = append(rec, r)
ctr[res]++
}
return nil
})
}
// Raw create function that is responsible for value validation, event dispatching
// and creation.
func (svc record) create(new *types.Record) (rec *types.Record, err error) {
var (
aProps = &recordActionProps{changed: new}
invokerID = auth.GetIdentityFromContext(svc.ctx).Identity()
ns *types.Namespace
m *types.Module
)
err = svc.db.Transaction(func() (err error) {
var (
ns *types.Namespace
m *types.Module
)
ns, m, _, err = svc.loadCombo(new.NamespaceID, new.ModuleID, 0)
if err != nil {
return
}
ns, m, _, err = svc.loadCombo(new.NamespaceID, new.ModuleID, 0)
if err != nil {
return
}
aProps.setNamespace(ns)
aProps.setModule(m)
aProps.setNamespace(ns)
aProps.setModule(m)
if !svc.ac.CanCreateRecord(svc.ctx, m) {
return nil, RecordErrNotAllowedToCreate()
}
if err = svc.generalValueSetValidation(m, new.Values); err != nil {
return
}
if !svc.ac.CanCreateRecord(svc.ctx, m) {
return RecordErrNotAllowedToCreate()
var (
rve *types.RecordValueErrorSet
)
if svc.optEmitEvents {
// Handle input payload
if rve = svc.procCreate(invokerID, m, new); !rve.IsValid() {
return nil, rve
}
if err = svc.generalValueSetValidation(m, new.Values); err != nil {
new.Values = svc.formatter.Run(m, new.Values)
if err = svc.eventbus.WaitFor(svc.ctx, event.RecordBeforeCreate(new, nil, m, ns, rve)); err != nil {
return
} else if !rve.IsValid() {
return nil, rve
}
}
var (
rve *types.RecordValueErrorSet
)
// Assign defaults (only on missing values)
new.Values = svc.setDefaultValues(m, new.Values)
if svc.optEmitEvents {
// Handle input payload
if rve = svc.procCreate(invokerID, m, new); !rve.IsValid() {
return rve
}
// Handle payload from automation scripts
if rve = svc.procCreate(invokerID, m, new); !rve.IsValid() {
return nil, rve
}
if new, err = svc.recordRepo.Create(new); err != nil {
return
}
if err = svc.recordRepo.UpdateValues(new.ID, new.Values); err != nil {
return
}
// At this point we can return the value
rec = new
if svc.optEmitEvents {
defer func() {
new.Values = svc.formatter.Run(m, new.Values)
if err = svc.eventbus.WaitFor(svc.ctx, event.RecordBeforeCreate(new, nil, m, ns, rve)); err != nil {
return
} else if !rve.IsValid() {
return rve
}
}
svc.eventbus.Dispatch(svc.ctx, event.RecordAfterCreateImmutable(new, nil, m, ns, nil))
}()
}
// Assign defaults (only on missing values)
new.Values = svc.setDefaultValues(m, new.Values)
return
}
// Handle payload from automation scripts
if rve = svc.procCreate(invokerID, m, new); !rve.IsValid() {
return rve
}
// Raw update function that is responsible for value validation, event dispatching
// and update.
func (svc record) update(upd *types.Record) (rec *types.Record, err error) {
var (
aProps = &recordActionProps{changed: upd}
invokerID = auth.GetIdentityFromContext(svc.ctx).Identity()
if new, err = svc.recordRepo.Create(new); err != nil {
return
ns *types.Namespace
m *types.Module
old *types.Record
)
if upd.ID == 0 {
return nil, RecordErrInvalidID()
}
ns, m, old, err = svc.loadCombo(upd.NamespaceID, upd.ModuleID, upd.ID)
if err != nil {
return
}
aProps.setNamespace(ns)
aProps.setModule(m)
aProps.setRecord(old)
if !svc.ac.CanUpdateRecord(svc.ctx, m) {
return nil, RecordErrNotAllowedToUpdate()
}
// Test if stale (update has an older version of data)
if isStale(upd.UpdatedAt, old.UpdatedAt, old.CreatedAt) {
return nil, RecordErrStaleData()
}
if err = svc.generalValueSetValidation(m, upd.Values); err != nil {
return
}
// Preload old record values so we can send it together with event
if err = svc.preloadValues(m, old); err != nil {
return
}
var (
rve *types.RecordValueErrorSet
)
if svc.optEmitEvents {
// Handle input payload
if rve = svc.procUpdate(invokerID, m, upd, old); !rve.IsValid() {
return nil, rve
}
if err = svc.recordRepo.UpdateValues(new.ID, new.Values); err != nil {
// Before we pass values to record-before-update handling events
// values needs do be cleaned up
//
// Value merge inside procUpdate sets delete flag we need
// when changes are applied but we do not want deleted values
// to be sent to handler
upd.Values = upd.Values.GetClean()
// Before we pass values to automation scripts, they should be formatted
upd.Values = svc.formatter.Run(m, upd.Values)
// Scripts can (besides simple error value) return complex record value error set
// that is passed back to the UI or any other API consumer
//
// rve (record-validation-errorset) struct is passed so it can be
// used & filled by automation scripts
if err = svc.eventbus.WaitFor(svc.ctx, event.RecordBeforeUpdate(upd, old, m, ns, rve)); err != nil {
return
} else if !rve.IsValid() {
return nil, rve
}
}
// At this point we can return the value
rec = new
// Handle payload from automation scripts
if rve = svc.procUpdate(invokerID, m, upd, old); !rve.IsValid() {
return nil, rve
}
if svc.optEmitEvents {
defer func() {
new.Values = svc.formatter.Run(m, new.Values)
svc.eventbus.Dispatch(svc.ctx, event.RecordAfterCreateImmutable(new, nil, m, ns, nil))
}()
}
if upd, err = svc.recordRepo.Update(upd); err != nil {
return
}
if err = svc.recordRepo.UpdateValues(upd.ID, upd.Values); err != nil {
return
}
// Final value cleanup
// These (clean) values are returned (and sent to after-update handler)
upd.Values = upd.Values.GetClean()
// At this point we can return the value
rec = upd
if svc.optEmitEvents {
defer func() {
// Before we pass values to automation scripts, they should be formatted
upd.Values = svc.formatter.Run(m, upd.Values)
svc.eventbus.Dispatch(svc.ctx, event.RecordAfterUpdateImmutable(upd, old, m, ns, nil))
}()
}
return
}
func (svc record) Create(new *types.Record) (rec *types.Record, err error) {
var (
aProps = &recordActionProps{changed: new}
)
err = svc.db.Transaction(func() error {
rec, err = svc.create(new)
aProps.setRecord(rec)
return err
})
return rec, svc.recordAction(svc.ctx, aProps, RecordActionCreate, err)
......@@ -549,103 +729,13 @@ func (svc record) procCreate(invokerID uint64, m *types.Module, new *types.Recor
func (svc record) Update(upd *types.Record) (rec *types.Record, err error) {
var (
aProps = &recordActionProps{changed: upd}
invokerID = auth.GetIdentityFromContext(svc.ctx).Identity()
aProps = &recordActionProps{changed: upd}
)
err = svc.db.Transaction(func() (err error) {
if upd.ID == 0 {
return RecordErrInvalidID()
}
ns, m, old, err := svc.loadCombo(upd.NamespaceID, upd.ModuleID, upd.ID)
if err != nil {
return
}
aProps.setNamespace(ns)
aProps.setModule(m)
aProps.setRecord(old)
if err = svc.generalValueSetValidation(m, upd.Values); err != nil {
return
}
// Test if stale (update has an older version of data)
if isStale(upd.UpdatedAt, old.UpdatedAt, old.CreatedAt) {
return RecordErrStaleData()
}
if !svc.ac.CanUpdateRecord(svc.ctx, m) {
return RecordErrNotAllowedToUpdate()
}
// Preload old record values so we can send it together with event
if err = svc.preloadValues(m, old); err != nil {
return
}
var (
rve *types.RecordValueErrorSet
)
if svc.optEmitEvents {
// Handle input payload
if rve = svc.procUpdate(invokerID, m, upd, old); !rve.IsValid() {
return rve
}
// Before we pass values to record-before-update handling events
// values needs do be cleaned up
//
// Value merge inside procUpdate sets delete flag we need
// when changes are applied but we do not want deleted values
// to be sent to handler
upd.Values = upd.Values.GetClean()
// Before we pass values to automation scripts, they should be formatted
upd.Values = svc.formatter.Run(m, upd.Values)
// Scripts can (besides simple error value) return complex record value error set
// that is passed back to the UI or any other API consumer
//
// rve (record-validation-errorset) struct is passed so it can be
// used & filled by automation scripts
if err = svc.eventbus.WaitFor(svc.ctx, event.RecordBeforeUpdate(upd, old, m, ns, rve)); err != nil {
return
} else if !rve.IsValid() {
return rve
}
}
// Handle payload from automation scripts
if rve = svc.procUpdate(invokerID, m, upd, old); !rve.IsValid() {
return rve
}
if upd, err = svc.recordRepo.Update(upd); err != nil {
return
}
if err = svc.recordRepo.UpdateValues(upd.ID, upd.Values); err != nil {
return
}
// Final value cleanup
// These (clean) values are returned (and sent to after-update handler)
upd.Values = upd.Values.GetClean()
// At this point we can return the value
rec = upd
if svc.optEmitEvents {
defer func() {
// Before we pass values to automation scripts, they should be formatted
upd.Values = svc.formatter.Run(m, upd.Values)
svc.eventbus.Dispatch(svc.ctx, event.RecordAfterUpdateImmutable(upd, old, m, ns, nil))
}()
}
return
err = svc.db.Transaction(func() error {
rec, err = svc.update(upd)
aProps.setRecord(rec)
return err
})
return rec, svc.recordAction(svc.ctx, aProps, RecordActionUpdate, err)
......@@ -706,27 +796,85 @@ func (svc record) recordInfoUpdate(r *types.Record) {
r.UpdatedBy = auth.GetIdentityFromContext(svc.ctx).Identity()
}
func (svc record) delete(namespaceID, moduleID, recordID uint64) (del *types.Record, err error) {
var (
ns *types.Namespace
m *types.Module
invokerID = auth.GetIdentityFromContext(svc.ctx).Identity()
)
if namespaceID == 0 {
return nil, RecordErrInvalidNamespaceID()
}
if moduleID == 0 {
return nil, RecordErrInvalidModuleID()
}
if recordID == 0 {
return nil, RecordErrInvalidID()
}
ns, m, del, err = svc.loadCombo(namespaceID, moduleID, recordID)
if err != nil {
return nil, err
}
if !svc.ac.CanDeleteRecord(svc.ctx, m) {
return nil, RecordErrNotAllowedToDelete()
}
if svc.optEmitEvents {
// Preload old record values so we can send it together with event
if err = svc.preloadValues(m, del); err != nil {
return nil, err
}
// Calling before-record-delete scripts
if err = svc.eventbus.WaitFor(svc.ctx, event.RecordBeforeDelete(nil, del, m, ns, nil)); err != nil {
return nil, err
}
}
del.DeletedAt = nowPtr()
del.DeletedBy = invokerID
if err = svc.recordRepo.Delete(del); err != nil {
return nil, err
}
if err = svc.recordRepo.DeleteValues(del); err != nil {
return nil, err
}
if svc.optEmitEvents {
defer svc.eventbus.Dispatch(svc.ctx, event.RecordAfterDeleteImmutable(nil, del, m, ns, nil))
}
return del, nil
}
// DeleteByID removes one or more records (all from the same module and namespace)
//
// Before and after each record is deleted beforeDelete and afterDelete events are emitted
// If beforeRecord aborts the action it does so for that specific record only
func (svc record) DeleteByID(namespaceID, moduleID uint64, recordIDs ...uint64) (err error) {
var (
isBulkDelete = len(recordIDs) > 0
now = *nowPtr()
aProps = &recordActionProps{
namespace: &types.Namespace{ID: namespaceID},
module: &types.Module{ID: moduleID},
}
isBulkDelete = len(recordIDs) > 1
ns *types.Namespace
m *types.Module
aProps = &recordActionProps{record: &types.Record{NamespaceID: namespaceID, ModuleID: moduleID}}
invokerID = auth.GetIdentityFromContext(svc.ctx).Identity()
r *types.Record
)
err = func() error {
if namespaceID == 0 {
return RecordErrInvalidNamespaceID()
}
if moduleID == 0 {
return RecordErrInvalidModuleID()
}
......@@ -751,72 +899,28 @@ func (svc record) DeleteByID(namespaceID, moduleID uint64, recordIDs ...uint64)
}
for _, recordID := range recordIDs {
if recordID == 0 {
// ignore invalid IDs, no harm done
continue
}
err = svc.db.Transaction(func() error {
var (
del *types.Record
)
err := svc.db.Transaction(func() (err error) {
r, err = svc.delete(namespaceID, moduleID, recordID)
aProps.setRecord(r)
del, err = svc.FindByID(namespaceID, recordID)
if err != nil {
return err
}
aProps.setRecord(del)
if svc.optEmitEvents {
// Preload old record values so we can send it together with event
if err = svc.preloadValues(m, del); err != nil {
return err
}
// Calling before-record-delete scripts
if err = svc.eventbus.WaitFor(svc.ctx, event.RecordBeforeDelete(nil, del, m, ns, nil)); err != nil {
return err
}
}
del.DeletedAt = &now
del.DeletedBy = invokerID
if err = svc.recordRepo.Delete(del); err != nil {
return err
}
if err = svc.recordRepo.DeleteValues(del); err != nil {
return err
}
if svc.optEmitEvents {
defer svc.eventbus.Dispatch(svc.ctx, event.RecordAfterDeleteImmutable(nil, del, m, ns, nil))
}
// Record successful action
return svc.recordAction(svc.ctx, aProps, RecordActionDelete, nil)
// Record each record deletion action
return svc.recordAction(svc.ctx, aProps, RecordActionDelete, err)
})
// handle error & record action
if err != nil {
if isBulkDelete {
// When doing bulk delete, errors are recorded, but ignored
_ = svc.recordAction(svc.ctx, aProps, RecordActionDelete, err)
return nil
}
return svc.recordAction(svc.ctx, aProps, RecordActionDelete, err)
// We'll not break for failed delete,
// if we are deleting records in bulk.
if err != nil && !isBulkDelete {
return err
}
}
// all errors (if any) were recorded
// and in case of error for a non-bulk record deletion
// error is already returned
return nil
}
// Organize - Record organizer
//
// Reorders records & sets field value
func (svc record) Organize(namespaceID, moduleID, recordID uint64, posField, position, filter, grpField, group string) (err error) {
var (
ns *types.Namespace
......
......@@ -9,6 +9,16 @@ import (
)
type (
bulkPreProcess func(m, r *Record) (*Record, error)
OperationType string
BulkRecordOperation struct {
Record *Record
LinkBy string
Operation OperationType
}
// Record is a stored row in the `record` table
Record struct {
ID uint64 `json:"recordID,string" db:"id"`
......@@ -40,6 +50,12 @@ type (
}
)
const (
OperationTypeCreate OperationType = "create"
OperationTypeUpdate OperationType = "update"
OperationTypeDelete OperationType = "delete"
)
// UserIDs returns a slice of user IDs from all items in the set
func (set RecordSet) UserIDs() (IDs []uint64) {
IDs = make([]uint64, 0)
......
package types
type (
BulkRecord struct {
RefField string `json:"refField,omitempty"`
Set RecordSet `json:"set,omitempty"`
}
BulkRecordSet []*BulkRecord
)
func (set BulkRecordSet) ToBulkOperations(dftModule uint64, dftNamespace uint64) (oo []*BulkRecordOperation, err error) {
for _, br := range set {
for _, rr := range br.Set {
// No use in allowing cross-namespace record creation.
rr.NamespaceID = dftNamespace
// default module
if rr.ModuleID == 0 {
rr.ModuleID = dftModule
}
b := &BulkRecordOperation{
Record: rr,
Operation: OperationTypeUpdate,
LinkBy: br.RefField,
}
// If no RecordID is defined, we should create it
if rr.ID == 0 {
b.Operation = OperationTypeCreate
}
if rr.DeletedAt != nil {
b.Operation = OperationTypeDelete
}
oo = append(oo, b)
}
}
return
}
package types
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestToBulkOperations(t *testing.T) {
tests := []struct {
name string
bb BulkRecordSet
size int
}{
{
name: "Return nothing if empty",
bb: BulkRecordSet{},
size: 0,
},
{
name: "Return all sets all records",
bb: BulkRecordSet{
&BulkRecord{
RefField: "f1",
Set: RecordSet{&Record{}},
},
&BulkRecord{
RefField: "f2",
Set: RecordSet{&Record{}},
},
},
size: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
rr, err := tt.bb.ToBulkOperations(0, 0)
if err != nil {
t.Errorf("unexpected error = %v,", err)
}
require.Equal(t,
tt.size,
len(rr))
})
}
}
func TestToBulkOperationsDefaultModule(t *testing.T) {
bb := BulkRecordSet{
&BulkRecord{
RefField: "f1",
Set: RecordSet{&Record{
ModuleID: 1000,
}},
},
&BulkRecord{
RefField: "f2",
Set: RecordSet{&Record{}},
},
}
rr, err := bb.ToBulkOperations(2000, 0)
if err != nil {
t.Errorf("unexpected error = %v,", err)
}
require.Equal(t,
uint64(1000),
rr[0].Record.ModuleID,
)
require.Equal(t,
uint64(2000),
rr[1].Record.ModuleID,
"Expected default value of \n%d got \n%d",
2000,
rr[1].Record.ModuleID,
)
}
func TestToBulkOperationsDetermineOperation(t *testing.T) {
bb := BulkRecordSet{
&BulkRecord{
RefField: "f1",
Set: RecordSet{&Record{
ID: 1000,
}},
},
&BulkRecord{
RefField: "f2",
Set: RecordSet{&Record{}},
},
}
rr, err := bb.ToBulkOperations(0, 0)
if err != nil {
t.Errorf("unexpected error = %v,", err)
}
require.Equal(t,
OperationTypeUpdate,
rr[0].Operation,
)
require.Equal(t,
OperationTypeCreate,
rr[1].Operation,
)
}
......@@ -127,9 +127,7 @@
| URI | Protocol | Method | Authentication |
| --- | -------- | ------ | -------------- |
| `/automation/` | HTTP/S | GET |
Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.crust/corteza-server/codegen/templates/README.tpl on line 32
|
| `/automation/` | HTTP/S | GET | |
#### Request parameters
......@@ -148,9 +146,7 @@ Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.cru
| URI | Protocol | Method | Authentication |
| --- | -------- | ------ | -------------- |
| `/automation/{bundle}-{type}.{ext}` | HTTP/S | GET |
Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.crust/corteza-server/codegen/templates/README.tpl on line 32
|
| `/automation/{bundle}-{type}.{ext}` | HTTP/S | GET | |
#### Request parameters
......@@ -166,9 +162,7 @@ Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.cru
| URI | Protocol | Method | Authentication |
| --- | -------- | ------ | -------------- |
| `/automation/trigger` | HTTP/S | POST |
Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.crust/corteza-server/codegen/templates/README.tpl on line 32
|
| `/automation/trigger` | HTTP/S | POST | |
#### Request parameters
......@@ -979,7 +973,8 @@ Compose records
| Parameter | Type | Method | Description | Default | Required? |
| --------- | ---- | ------ | ----------- | ------- | --------- |
| values | types.RecordValueSet | POST | Record values | N/A | YES |
| values | types.RecordValueSet | POST | Record values | N/A | NO |
| records | types.BulkRecordSet | POST | Records | N/A | NO |
| namespaceID | uint64 | PATH | Namespace ID | N/A | YES |
| moduleID | uint64 | PATH | Module ID | N/A | YES |
......@@ -1014,7 +1009,8 @@ Compose records
| recordID | uint64 | PATH | Record ID | N/A | YES |
| namespaceID | uint64 | PATH | Namespace ID | N/A | YES |
| moduleID | uint64 | PATH | Module ID | N/A | YES |
| values | types.RecordValueSet | POST | Record values | N/A | YES |
| values | types.RecordValueSet | POST | Record values | N/A | NO |
| records | types.BulkRecordSet | POST | Records | N/A | NO |
## Delete record row from module section
......
......@@ -451,9 +451,7 @@ Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.cru
| URI | Protocol | Method | Authentication |
| --- | -------- | ------ | -------------- |
| `/automation/` | HTTP/S | GET |
Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.crust/corteza-server/codegen/templates/README.tpl on line 32
|
| `/automation/` | HTTP/S | GET | |
#### Request parameters
......@@ -472,9 +470,7 @@ Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.cru
| URI | Protocol | Method | Authentication |
| --- | -------- | ------ | -------------- |
| `/automation/{bundle}-{type}.{ext}` | HTTP/S | GET |
Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.crust/corteza-server/codegen/templates/README.tpl on line 32
|
| `/automation/{bundle}-{type}.{ext}` | HTTP/S | GET | |
#### Request parameters
......@@ -490,9 +486,7 @@ Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.cru
| URI | Protocol | Method | Authentication |
| --- | -------- | ------ | -------------- |
| `/automation/trigger` | HTTP/S | POST |
Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.crust/corteza-server/codegen/templates/README.tpl on line 32
|
| `/automation/trigger` | HTTP/S | POST | |
#### Request parameters
......@@ -1189,9 +1183,7 @@ An organisation may have many roles. Roles may have many channels available. Acc
| URI | Protocol | Method | Authentication |
| --- | -------- | ------ | -------------- |
| `/subscription/` | HTTP/S | GET |
Warning: implode(): Invalid arguments passed in /private/tmp/Users/darh/Work.crust/corteza-server/codegen/templates/README.tpl on line 32
|
| `/subscription/` | HTTP/S | GET | |
#### Request parameters
......
package compose
import (
"fmt"
"net/http"
"strconv"
"testing"
"github.com/cortezaproject/corteza-server/compose/types"
"github.com/cortezaproject/corteza-server/tests/helpers"
jsonpath "github.com/steinfletcher/apitest-jsonpath"
)
func TestRecordCreate_batch(t *testing.T) {
h := newHelper(t)
ns := h.repoMakeNamespace("batch testing namespace")
module := h.repoMakeRecordModuleWithFieldsOnNs("record testing module", ns)
childModule := h.repoMakeRecordModuleWithFieldsOnNs("record testing module child", ns)
h.allow(types.ModulePermissionResource.AppendWildcard(), "record.create")
h.apiInit().
Post(fmt.Sprintf("/namespace/%d/module/%d/record/", module.NamespaceID, module.ID)).
JSON(fmt.Sprintf(`{"values": [], "records": [{"refField": "another_record", "set": [{"moduleID": "%d", "values": []}]}]}`, childModule.ID)).
Expect(t).
Status(http.StatusOK).
Assert(helpers.AssertNoErrors).
Assert(jsonpath.Present(`$.response.recordID`)).
Assert(jsonpath.Len(`$.response.records`, 1)).
Assert(jsonpath.Present(`$.response.records[0].recordID`)).
End()
}
func TestRecordUpdate_batch(t *testing.T) {
h := newHelper(t)
ns := h.repoMakeNamespace("batch testing namespace")
module := h.repoMakeRecordModuleWithFieldsOnNs("record testing module", ns)
childModule := h.repoMakeRecordModuleWithFieldsOnNs("record testing module child", ns)
h.allow(types.ModulePermissionResource.AppendWildcard(), "record.update")
record := h.repoMakeRecord(module)
childRecord := h.repoMakeRecord(childModule, &types.RecordValue{Name: "another_record", Value: strconv.FormatUint(record.ID, 10), Ref: record.ID})
h.apiInit().
Post(fmt.Sprintf("/namespace/%d/module/%d/record/%d", module.NamespaceID, module.ID, record.ID)).
JSON(fmt.Sprintf(`{"values": [{"name": "name", "value": "Some Name"}], "records": [{"refField": "another_record", "set": [{"moduleID": "%d", "recordID": "%d", "values": [{"name": "name", "value": "Another Name"}]}]}]}`, childModule.ID, childRecord.ID)).
Expect(t).
Status(http.StatusOK).
Assert(helpers.AssertNoErrors).
Assert(jsonpath.Equal(`$.response.recordID`, strconv.FormatUint(record.ID, 10))).
Assert(jsonpath.Len(`$.response.records`, 1)).
Assert(jsonpath.Equal(`$.response.records[0].recordID`, strconv.FormatUint(childRecord.ID, 10))).
End()
}
func TestRecordDelete_batch(t *testing.T) {
h := newHelper(t)
ns := h.repoMakeNamespace("batch testing namespace")
module := h.repoMakeRecordModuleWithFieldsOnNs("record testing module", ns)
childModule := h.repoMakeRecordModuleWithFieldsOnNs("record testing module child", ns)
h.allow(types.ModulePermissionResource.AppendWildcard(), "record.update")
h.allow(types.ModulePermissionResource.AppendWildcard(), "record.delete")
record := h.repoMakeRecord(module)
childRecord := h.repoMakeRecord(childModule, &types.RecordValue{Name: "another_record", Value: strconv.FormatUint(record.ID, 10), Ref: record.ID})
h.apiInit().
Post(fmt.Sprintf("/namespace/%d/module/%d/record/%d", module.NamespaceID, module.ID, record.ID)).
JSON(fmt.Sprintf(`{"values": [{"name": "name", "value": "Some Name"}], "records": [{"refField": "another_record", "set": [{"deletedAt": "2020-05-15T15:01:02+02:00", "moduleID": "%d", "recordID": "%d", "values": [{"name": "name", "value": "Another Name"}]}]}]}`, childModule.ID, childRecord.ID)).
Expect(t).
Status(http.StatusOK).
Assert(helpers.AssertNoErrors).
Assert(jsonpath.Equal(`$.response.recordID`, strconv.FormatUint(record.ID, 10))).
Assert(jsonpath.Len(`$.response.records`, 1)).
Assert(jsonpath.Equal(`$.response.records[0].recordID`, strconv.FormatUint(childRecord.ID, 10))).
End()
_, err := h.repoRecord().FindByID(module.NamespaceID, childRecord.ID)
h.a.Error(err, "compose.repository.RecordNotFound")
}
func TestRecordMixed_batch(t *testing.T) {
h := newHelper(t)
ns := h.repoMakeNamespace("batch testing namespace")
module := h.repoMakeRecordModuleWithFieldsOnNs("record testing module", ns)
childModule := h.repoMakeRecordModuleWithFieldsOnNs("record testing module child", ns)
h.allow(types.ModulePermissionResource.AppendWildcard(), "record.update")
h.allow(types.ModulePermissionResource.AppendWildcard(), "record.create")
record := h.repoMakeRecord(module)
childRecord := h.repoMakeRecord(childModule, &types.RecordValue{Name: "another_record", Value: strconv.FormatUint(record.ID, 10), Ref: record.ID})
h.apiInit().
Post(fmt.Sprintf("/namespace/%d/module/%d/record/%d", module.NamespaceID, module.ID, record.ID)).
JSON(fmt.Sprintf(`{"values": [{"name": "name", "value": "Some Name"}], "records": [{"refField": "another_record", "set": [{"moduleID": "%d", "values": [{"name": "name", "value": "Added Name"}]},{"moduleID": "%d", "recordID": "%d", "values": [{"name": "name", "value": "Another Name"}]}]}]}`, childModule.ID, childModule.ID, childRecord.ID)).
Expect(t).
Status(http.StatusOK).
Assert(helpers.AssertNoErrors).
Assert(jsonpath.Equal(`$.response.recordID`, strconv.FormatUint(record.ID, 10))).
Assert(jsonpath.Len(`$.response.records`, 2)).
Assert(jsonpath.Present(`$.response.records[0].recordID`)).
Assert(jsonpath.Equal(`$.response.records[1].recordID`, strconv.FormatUint(childRecord.ID, 10))).
End()
}
......@@ -30,6 +30,37 @@ func (h helper) repoRecord() repository.RecordRepository {
return repository.Record(context.Background(), db())
}
func (h helper) repoMakeRecordModuleWithFieldsOnNs(name string, namespace *types.Namespace, ff ...*types.ModuleField) *types.Module {
h.allow(types.NamespacePermissionResource.AppendWildcard(), "read")
h.allow(types.ModulePermissionResource.AppendWildcard(), "read")
h.allow(types.ModulePermissionResource.AppendWildcard(), "record.read")
if len(ff) == 0 {
// Default fields
ff = types.ModuleFieldSet{
&types.ModuleField{
Name: "name",
},
&types.ModuleField{
Name: "email",
},
&types.ModuleField{
Name: "options",
Multi: true,
},
&types.ModuleField{
Name: "description",
},
&types.ModuleField{
Name: "another_record",
Kind: "Record",
},
}
}
return h.repoMakeModule(namespace, name, ff...)
}
func (h helper) repoMakeRecordModuleWithFields(name string, ff ...*types.ModuleField) *types.Module {
namespace := h.repoMakeNamespace("record testing namespace")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment