Commit 10b1dc09 authored by Denis Arh's avatar Denis Arh
Browse files

Messaging repo ported to store and cleaned

parent 677469c5
......@@ -19,7 +19,7 @@ type (
Name string `json:"name"`
Config ChartConfig `json:"config"`
NamespaceID uint64 `json:"namespaceID,string" db:"rel_namespace,string"`
NamespaceID uint64 `json:"namespaceID,string"`
CreatedAt time.Time `json:"createdAt,omitempty"`
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
......
......@@ -16,7 +16,7 @@ type (
Handle string `json:"handle"`
Name string `json:"name"`
Meta types.JSONText `json:"meta"`
Fields ModuleFieldSet `json:"fields" db:"-"`
Fields ModuleFieldSet `json:"fields"`
NamespaceID uint64 `json:"namespaceID,string"`
......
......@@ -28,7 +28,7 @@ type (
Blocks PageBlocks `json:"blocks"`
Children PageSet `json:"children,omitempty" db:"-"`
Children PageSet `json:"children,omitempty"`
Visible bool `json:"visible"`
Weight int `json:"weight"`
......
package repository
//import (
// "context"
// "time"
//
// "github.com/Masterminds/squirrel"
// "github.com/titpetric/factory"
//
// "github.com/cortezaproject/corteza-server/messaging/types"
// "github.com/cortezaproject/corteza-server/pkg/rh"
//)
//
//type (
// AttachmentRepository interface {
// With(ctx context.Context, db *factory.DB) AttachmentRepository
//
// FindAttachmentByID(id uint64) (*types.Attachment, error)
// FindAttachmentByMessageID(IDs ...uint64) (types.AttachmentSet, error)
//
// CreateAttachment(mod *types.Attachment) (*types.Attachment, error)
// DeleteAttachmentByID(id uint64) error
//
// BindAttachment(attachmentId, messageId uint64) error
// }
//
// attachment struct {
// *repository
// }
//)
//
//const (
// ErrAttachmentNotFound = repositoryError("AttachmentNotFound")
//)
//
//func Attachment(ctx context.Context, db *factory.DB) AttachmentRepository {
// return (&attachment{}).With(ctx, db)
//}
//
//func (r attachment) With(ctx context.Context, db *factory.DB) AttachmentRepository {
// return &attachment{
// repository: r.repository.With(ctx, db),
// }
//}
//
//func (r attachment) table() string {
// return "messaging_attachment"
//}
//
//func (r attachment) tableMessage() string {
// return "messaging_message_attachment"
//}
//
//func (r attachment) columns() []string {
// return []string{
// "a.id",
// "a.rel_user",
// "a.url",
// "a.preview_url",
// "a.name",
// "a.meta",
// "a.created_at",
// "a.updated_at",
// "a.deleted_at",
// }
//}
//
//func (r attachment) query() squirrel.SelectBuilder {
// return squirrel.
// Select(r.columns()...).
// From(r.table() + " AS a").
// Where("a.deleted_at IS NULL")
//
//}
//
//func (r attachment) FindAttachmentByID(ID uint64) (*types.Attachment, error) {
// return r.findOneBy("id", ID)
//}
//
//func (r attachment) findOneBy(field string, value interface{}) (*types.Attachment, error) {
// var (
// p = &types.Attachment{}
//
// q = r.query().
// Where(squirrel.Eq{field: value})
//
// err = rh.FetchOne(r.db(), q, p)
// )
//
// if err != nil {
// return nil, err
// } else if p.ID == 0 {
// return nil, ErrAttachmentNotFound
// }
//
// return p, nil
//}
//
//func (r attachment) FindAttachmentByMessageID(IDs ...uint64) (rval types.AttachmentSet, err error) {
// rval = types.AttachmentSet{}
//
// if len(IDs) == 0 {
// return
// }
//
// query := r.query().
// Columns("ma.rel_message").
// Join(r.tableMessage() + " AS ma ON (a.id = ma.rel_attachment)").
// Where(squirrel.Eq{"rel_message": IDs})
//
// return rval, rh.FetchAll(r.db(), query, &rval)
//}
//
//func (r attachment) CreateAttachment(mod *types.Attachment) (*types.Attachment, error) {
// if mod.ID == 0 {
// mod.ID = factory.Sonyflake.NextID()
// }
//
// mod.CreatedAt = time.Now()
//
// return mod, r.db().Insert(r.table(), mod)
//}
//
//func (r attachment) DeleteAttachmentByID(ID uint64) error {
// return rh.UpdateColumns(r.db(), r.table(), rh.Set{"deleted_at": time.Now()}, squirrel.Eq{"id": ID})
//}
//
//func (r attachment) BindAttachment(attachmentId, messageId uint64) error {
// bond := struct {
// RelAttachment uint64 `db:"rel_attachment"`
// RelMessage uint64 `db:"rel_message"`
// }{attachmentId, messageId}
//
// return r.db().Insert(r.tableMessage(), bond)
//}
package repository
import (
"context"
"strings"
"time"
"github.com/Masterminds/squirrel"
"github.com/titpetric/factory"
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/rh"
)
type (
ChannelRepository interface {
With(ctx context.Context, db *factory.DB) ChannelRepository
FindByID(id uint64) (*types.Channel, error)
FindByMemberSet(memberID ...uint64) (*types.Channel, error)
Find(types.ChannelFilter) (types.ChannelSet, types.ChannelFilter, error)
Create(mod *types.Channel) (*types.Channel, error)
Update(mod *types.Channel) (*types.Channel, error)
ArchiveByID(id uint64) error
UnarchiveByID(id uint64) error
DeleteByID(id uint64) error
UndeleteByID(id uint64) error
}
channel struct {
*repository
}
)
const (
ErrChannelNotFound = repositoryError("ChannelNotFound")
)
func Channel(ctx context.Context, db *factory.DB) ChannelRepository {
return (&channel{}).With(ctx, db)
}
func (r channel) With(ctx context.Context, db *factory.DB) ChannelRepository {
return &channel{
repository: r.repository.With(ctx, db),
}
}
func (r channel) table() string {
return "messaging_channel"
}
func (r channel) columns() []string {
return []string{
"c.id",
"c.name",
"c.meta",
"c.membership_policy",
"c.created_at",
"c.updated_at",
"c.archived_at",
"c.deleted_at",
"c.rel_organisation",
"c.rel_creator",
"c.type",
"c.rel_last_message",
"c.topic",
}
}
func (r channel) query() squirrel.SelectBuilder {
return squirrel.
Select(r.columns()...).
From(r.table() + " AS c")
}
func (r channel) FindByID(ID uint64) (*types.Channel, error) {
return r.findOneBy(squirrel.Eq{"c.id": ID})
}
// FindByMemberSet searches for channel (group!) with exactly the same membership structure
func (r channel) FindByMemberSet(memberIDs ...uint64) (*types.Channel, error) {
return r.findOneBy(
squirrel.And{
squirrel.Eq{"type": types.ChannelTypeGroup},
squirrel.ConcatExpr("c.id IN (", (channelMember{}).queryExactMembers(memberIDs...), ")"),
})
}
func (r channel) findOneBy(cnd squirrel.Sqlizer) (*types.Channel, error) {
var (
ch = &types.Channel{}
q = r.query().
Where(cnd)
err = rh.FetchOne(r.db(), q, ch)
)
if err != nil {
return nil, err
} else if ch.ID == 0 {
return nil, ErrChannelNotFound
}
return ch, nil
}
func (r channel) Find(filter types.ChannelFilter) (set types.ChannelSet, f types.ChannelFilter, err error) {
f = filter
//if f.Sort == "" {
// f.Sort = "c.name ASC"
//}
query := r.query()
query = query.Where(squirrel.Eq{"c.archived_at": nil})
if !f.IncludeDeleted {
query = query.Where(squirrel.Eq{"c.deleted_at": nil})
}
if len(f.ChannelID) > 0 {
query = query.Where(squirrel.Eq{"c.id": f.ChannelID})
}
if f.Query != "" {
q := "%" + strings.ToLower(f.Query) + "%"
query = query.Where(squirrel.Like{"LOWER(name)": q})
}
if f.CurrentUserID > 0 {
query = query.Where(squirrel.Or{
squirrel.Eq{"c.type": types.ChannelTypePublic},
squirrel.ConcatExpr("c.id IN (", (channelMember{}).queryAnyMember(f.CurrentUserID), ")"),
})
}
//var orderBy []string
//
//if orderBy, err = rh.ParseOrder(f.Sort, r.columns()...); err != nil {
// return
//} else {
// query = query.OrderBy(orderBy...)
//}
return set, f, rh.FetchAll(r.db(), query, &set)
}
func (r channel) Create(mod *types.Channel) (*types.Channel, error) {
mod.ID = factory.Sonyflake.NextID()
rh.SetCurrentTimeRounded(&mod.CreatedAt)
mod.UpdatedAt = nil
if mod.Type == "" {
mod.Type = types.ChannelTypePublic
}
return mod, r.db().Insert("messaging_channel", mod)
}
func (r channel) Update(mod *types.Channel) (*types.Channel, error) {
rh.SetCurrentTimeRounded(&mod.UpdatedAt)
if mod.Type == "" {
mod.Type = types.ChannelTypePublic
}
whitelist := []string{"id", "name", "type", "membership_policy", "topic", "meta", "updated_at"}
return mod, r.db().UpdatePartial("messaging_channel", mod, whitelist, "id")
}
func (r channel) ArchiveByID(ID uint64) error {
return rh.UpdateColumns(r.db(), r.table(), rh.Set{"archived_at": time.Now()}, squirrel.Eq{"id": ID})
}
func (r channel) UnarchiveByID(ID uint64) error {
return rh.UpdateColumns(r.db(), r.table(), rh.Set{"archived_at": nil}, squirrel.Eq{"id": ID})
}
func (r channel) DeleteByID(ID uint64) error {
return rh.UpdateColumns(r.db(), r.table(), rh.Set{"deleted_at": time.Now()}, squirrel.Eq{"id": ID})
}
func (r channel) UndeleteByID(ID uint64) error {
return rh.UpdateColumns(r.db(), r.table(), rh.Set{"deleted_at": nil}, squirrel.Eq{"id": ID})
}
package repository
import (
"context"
"sort"
"strconv"
"github.com/Masterminds/squirrel"
"github.com/titpetric/factory"
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/rh"
)
type (
// ChannelMemberRepository interface to channel member repository
ChannelMemberRepository interface {
With(ctx context.Context, db *factory.DB) ChannelMemberRepository
Find(filter types.ChannelMemberFilter) (types.ChannelMemberSet, error)
Create(mod *types.ChannelMember) (*types.ChannelMember, error)
Update(mod *types.ChannelMember) (*types.ChannelMember, error)
Delete(channelID, userID uint64) error
}
channelMember struct {
*repository
}
)
// ChannelMember creates new instance of channel member repository
func ChannelMember(ctx context.Context, db *factory.DB) ChannelMemberRepository {
return (&channelMember{}).With(ctx, db)
}
// With context...
func (r *channelMember) With(ctx context.Context, db *factory.DB) ChannelMemberRepository {
return &channelMember{
repository: r.repository.With(ctx, db),
}
}
func (r channelMember) table() string {
return "messaging_channel_member"
}
func (r channelMember) columns() []string {
return []string{
"cm.rel_channel",
"cm.rel_user",
"cm.type",
"cm.flag",
"cm.created_at",
"cm.updated_at",
}
}
func (r channelMember) query() squirrel.SelectBuilder {
return squirrel.
Select(r.columns()...).
From(r.table() + " AS cm")
}
// Finds channel ID(s) with any of the members
//
// Builds a (sub)query that returns list of channel IDs at least one of the members
//
func (r channelMember) queryAnyMember(memberIDs ...uint64) squirrel.SelectBuilder {
return squirrel.
Select("cm.rel_channel").
From(r.table() + " AS cm").
Where(squirrel.Eq{"cm.rel_user": memberIDs})
}
// Finds channel ID(s) with exact membership
//
// Builds a (sub)query that returns list of channel IDs that have this exact membership
//
func (r channelMember) queryExactMembers(memberIDs ...uint64) squirrel.SelectBuilder {
if len(memberIDs) == 0 {
return squirrel.
Select("null")
}
// Make sure members are sorted
sort.Slice(memberIDs, func(i, j int) bool {
return memberIDs[i] < memberIDs[j]
})
// Concatentating members fore
membersConcat := ""
for i := range memberIDs {
// Don't panic, we're adding , in the SQL as well
membersConcat += strconv.FormatUint(memberIDs[i], 10) + ","
}
return r.queryAnyMember(memberIDs...).
GroupBy("cm.rel_channel").
Having(squirrel.Eq{
"COUNT(*)": len(memberIDs),
"CONCAT(GROUP_CONCAT(cm.rel_user ORDER BY 1 ASC SEPARATOR ','),',')": membersConcat,
})
}
// Find fetches membership info
func (r *channelMember) Find(filter types.ChannelMemberFilter) (set types.ChannelMemberSet, err error) {
query := r.query()
if len(filter.MemberID) > 0 {
query = query.Where(squirrel.Eq{"cm.rel_user": filter.MemberID})
}
if len(filter.ChannelID) > 0 {
query = query.Where(squirrel.Eq{"cm.rel_channel": filter.ChannelID})
}
return set, rh.FetchAll(r.db(), query, &set)
}
// Create adds channel membership record
func (r *channelMember) Create(mod *types.ChannelMember) (*types.ChannelMember, error) {
rh.SetCurrentTimeRounded(&mod.CreatedAt)
mod.UpdatedAt = nil
return mod, r.db().Insert("messaging_channel_member", mod)
}
// Update modifies existing channel membership record
func (r *channelMember) Update(mod *types.ChannelMember) (*types.ChannelMember, error) {
rh.SetCurrentTimeRounded(&mod.UpdatedAt)
whitelist := []string{"type", "flag", "updated_at", "rel_channel", "rel_user"}
return mod, r.db().UpdatePartial("messaging_channel_member", mod, whitelist, "rel_channel", "rel_user")
}
// Delete removes existing channel membership record
func (r *channelMember) Delete(channelID, userID uint64) error {
return rh.Delete(r.db(), r.table(), squirrel.Eq{
"rel_channel": channelID,
"rel_user": userID,
})
}
package repository
import (
"github.com/pkg/errors"
)
type (
repositoryError string
)
const (
ErrEventsPullClosed = repositoryError("EventsPullClosed")
)
func (e repositoryError) Error() string {
return e.String()
}
func (e repositoryError) String() string {
return "messaging.repository." + string(e)
}
func (e repositoryError) Eq(err error) bool {
return err != nil && e.Error() == err.Error()
}
func (e repositoryError) New() error {
return errors.WithStack(e)
}
......@@ -2,6 +2,7 @@ package repository
import (
"context"
"fmt"
"github.com/titpetric/factory"
......@@ -29,6 +30,7 @@ The reading of the event queue table is triggered by pubsub.
clients, while the websocket API (currently), performs a local
broadcast, triggering the event poll only on other servers
*/
type (
......@@ -55,7 +57,7 @@ func (r *events) Pull(ctx context.Context) (*types.EventQueueItem, error) {
select {
case res, ok := <-r.pipe:
if !ok {
return res, ErrEventsPullClosed.New()
return res, fmt.Errorf("event pull closed")
}
return res, nil
case <-ctx.Done():
......
package repository
//import (
// "context"
// "time"
//
// "github.com/Masterminds/squirrel"
// "github.com/titpetric/factory"
//
// "github.com/cortezaproject/corteza-server/messaging/types"
// "github.com/cortezaproject/corteza-server/pkg/rh"
//)
//
//type (
// MentionRepository interface {
// With(ctx context.Context, db *factory.DB) MentionRepository
//
// FindByUserIDs(IDs ...uint64) (mm types.MentionSet, err error)
// FindByMessageIDs(IDs ...uint64) (mm types.MentionSet, err error)
// Create(m *types.Mention) (*types.Mention, error)
// DeleteByMessageID(ID uint64) error
// DeleteByID(ID uint64) error
// }
//
// mention struct {
// *repository
// }
//)
//
//var (
// ErrMentionNotFound = repositoryError("MentionNotFound")
//)
//
//func Mention(ctx context.Context, db *factory.DB) MentionRepository {
// return (&mention{}).With(ctx, db)
//}
//
//func (r mention) With(ctx context.Context, db *factory.DB) MentionRepository {
// return &mention{
// repository: r.repository.With(ctx, db),
// }
//}
//
//func (r mention) table() string {
// return "messaging_mention"
//}
//
//func (r mention) columns() []string {
// return []string{
// "mm.id",
// "mm.rel_message",
// "mm.rel_channel",
// "mm.rel_user",
// "mm.rel_mentioned_by",
// "mm.created_at",
// }
//}
//
//func (r mention) query() squirrel.SelectBuilder {
// return squirrel.
// Select(r.columns()...).
// From(r.table() + " AS mm")
//}
//
//func (r mention) FindByUserIDs(IDs ...uint64) (types.MentionSet, error) {
// return r.findAllBy(squirrel.Eq{"rel_user": IDs})
//}
//
//func (r mention) FindByMessageIDs(IDs ...uint64) (types.MentionSet, error) {
// return r.findAllBy(squirrel.Eq{"rel_message": IDs})
//}
//
//func (r mention) findAllBy(cnd squirrel.Sqlizer) (mm types.MentionSet, err error) {
// return mm, rh.FetchAll(r.db(), r.query().Where(cnd), &mm)
//}
//
//func (r mention) Create(m *types.Mention) (*types.Mention, error) {
// m.ID = factory.Sonyflake.NextID()
// m.CreatedAt = time.Now()
// return m, r.db().Insert(r.table(), m)
//}
//
//func (r mention) DeleteByMessageID(ID uint64) error {
// return rh.Delete(r.db(), r.table(), squirrel.Eq{"rel_message": ID})
//}
//
//func (r mention) DeleteByID(ID uint64) error {
// return rh.Delete(r.db(), r.table(), squirrel.Eq{"id": ID})
//}
package repository
import (
"context"
"io"
"strings"
"time"
"github.com/Masterminds/squirrel"
"github.com/titpetric/factory"
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/rh"
)
type (
MessageRepository interface {
With(ctx context.Context, db *factory.DB) MessageRepository
FindByID(id uint64) (*types.Message, error)
Find(types.MessageFilter) (types.MessageSet, types.MessageFilter, error)
FindThreads(types.MessageFilter) (types.MessageSet, types.MessageFilter, error)
CountFromMessageID(channelID, threadID, messageID uint64) (uint32, error)
LastMessageID(channelID, threadID uint64) (uint64, error)
PrefillThreadParticipants(mm types.MessageSet) error
Create(mod *types.Message) (*types.Message, error)
Update(mod *types.Message) (*types.Message, error)
DeleteByID(ID uint64) error
BindAvatar(message *types.Message, avatar io.Reader) (*types.Message, error)
IncReplyCount(ID uint64) error
DecReplyCount(ID uint64) error
}
message struct {
*repository
}
)
const (
MESSAGES_MAX_LIMIT = 100
sqlCountFromMessageID = "SELECT COUNT(*) AS count " +
"FROM messaging_message " +
"WHERE rel_channel = ? " +
"AND reply_to = ? " +
"AND COALESCE(type, '') NOT IN (?) " +
"AND id > ? AND deleted_at IS NULL"
sqlLastMessageID = "SELECT COALESCE(MAX(id), 0) AS last " +
"FROM messaging_message " +
"WHERE rel_channel = ? " +
"AND reply_to = ? " +
"AND COALESCE(type, '') NOT IN (?) " +
"AND deleted_at IS NULL"
sqlMessageRepliesIncCount = `UPDATE messaging_message SET replies = replies + 1 WHERE id = ? AND reply_to = 0`
sqlMessageRepliesDecCount = `UPDATE messaging_message SET replies = replies - 1 WHERE id = ? AND reply_to = 0`
ErrMessageNotFound = repositoryError("MessageNotFound")
)
func Message(ctx context.Context, db *factory.DB) MessageRepository {
return (&message{}).With(ctx, db)
}
func (r *message) With(ctx context.Context, db *factory.DB) MessageRepository {
return &message{
repository: r.repository.With(ctx, db),
}
}
func (r message) table() string {
return "messaging_message"
}
func (r message) columns() []string {
return []string{
"m.id",
"COALESCE(m.type,'') AS type",
"m.message",
"m.rel_user",
"m.rel_channel",
"m.reply_to",
"m.replies",
"m.created_at",
"m.updated_at",
"m.deleted_at",
}
}
func (r message) query() squirrel.SelectBuilder {
return squirrel.
Select(r.columns()...).
From(r.table() + " AS m").
Where(squirrel.Eq{"m.deleted_at": nil})
}
func (r message) FindByID(id uint64) (*types.Message, error) {
return r.findOneBy(squirrel.Eq{"m.id": id})
}
func (r message) findOneBy(cnd squirrel.Sqlizer) (*types.Message, error) {
var (
ch = &types.Message{}
q = r.query().
Where(cnd)
err = rh.FetchOne(r.db(), q, ch)
)
if err != nil {
return nil, err
} else if ch.ID == 0 {
return nil, ErrMessageNotFound
}
return ch, nil
}
func (r message) Find(filter types.MessageFilter) (set types.MessageSet, f types.MessageFilter, err error) {
f = r.sanitizeFilter(filter)
query := r.query()
if f.Query != "" {
q := "%" + strings.ToLower(f.Query) + "%"
query = query.Where(squirrel.Like{"LOWER(m.message)": q})
}
if len(f.ChannelID) > 0 {
query = query.Where(squirrel.Eq{"m.rel_channel": f.ChannelID})
}
if len(f.UserID) > 0 {
query = query.Where(squirrel.Eq{"m.rel_user": f.UserID})
}
if len(f.ThreadID) > 0 {
query = query.Where(squirrel.Eq{"m.reply_to": f.ThreadID})
} else {
query = query.Where(squirrel.Eq{"m.reply_to": 0})
}
if f.AttachmentsOnly {
// Override Type filter
f.Type = []string{
types.MessageTypeAttachment.String(),
types.MessageTypeInlineImage.String(),
}
}
if len(f.Type) > 0 {
query = query.Where(squirrel.Eq{"m.type": f.Type})
}
// first, exclusive
if f.AfterID > 0 {
query = query.OrderBy("m.id ASC")
query = query.Where(squirrel.Gt{"m.id": f.AfterID})
}
// from, inclusive
if f.FromID > 0 {
query = query.OrderBy("m.id ASC")
query = query.Where(squirrel.GtOrEq{"m.id": f.FromID})
}
// last, exclusive
if f.BeforeID > 0 {
query = query.OrderBy("m.id DESC")
query = query.Where(squirrel.Lt{"m.id": f.BeforeID})
}
// to, inclusive
if f.ToID > 0 {
query = query.OrderBy("m.id DESC")
query = query.Where(squirrel.LtOrEq{"m.id": f.ToID})
}
if f.BookmarkedOnly || f.PinnedOnly {
flag := types.MessageFlagBookmarkedMessage
if f.PinnedOnly {
flag = types.MessageFlagPinnedToChannel
}
query = query.
Where(squirrel.ConcatExpr("m.id IN(", (messageFlag{}).queryMessagesWithFlags(flag), ")"))
}
query = query.
OrderBy("id DESC").
Limit(uint64(f.Limit))
return set, f, rh.FetchAll(r.db(), query, &set)
}
func (r *message) FindThreads(filter types.MessageFilter) (set types.MessageSet, f types.MessageFilter, err error) {
f = r.sanitizeFilter(filter)
// Selecting first valid (deleted_at IS NULL) messages in threads (replies > 0 && reply_to = 0)
// that belong to filtered channels and we've contributed to (or stated it)
originals := squirrel.
Select("id AS original_id").
From(r.table()).
Where(squirrel.And{
squirrel.Eq{
"deleted_at": nil,
"rel_channel": f.ChannelID,
"reply_to": 0,
},
squirrel.Gt{"replies": 0},
squirrel.Or{
squirrel.Eq{"rel_user": filter.CurrentUserID},
squirrel.Expr(
"id IN (SELECT DISTINCT reply_to FROM messaging_message WHERE rel_user = ?)",
filter.CurrentUserID),
},
}).
OrderBy("id DESC").
Limit(uint64(f.Limit))
// Prepare the actual message selector
query := r.query().Join("originals ON (original_id IN (id, reply_to))")
if f.Query != "" {
q := "%" + strings.ToLower(f.Query) + "%"
query = query.Where(squirrel.Like{"LOWER(m.message)": q})
}
// And create CTE
cte := squirrel.ConcatExpr("WITH originals AS (", originals, ") ", query)
return set, f, rh.FetchAll(r.db(), cte, &set)
}
func (r *message) CountFromMessageID(channelID, threadID, lastReadMessageID uint64) (uint32, error) {
if lastReadMessageID == 0 {
// No need for counting, zero unread messages...
return 0, nil
}
rval := struct{ Count uint32 }{}
return rval.Count, r.db().Get(&rval,
sqlCountFromMessageID,
channelID,
threadID,
types.MessageTypeChannelEvent,
lastReadMessageID,
)
}
func (r *message) LastMessageID(channelID, threadID uint64) (uint64, error) {
rval := struct{ Last uint64 }{}
return rval.Last, r.db().Get(&rval,
sqlLastMessageID,
channelID,
threadID,
types.MessageTypeChannelEvent,
)
}
func (r *message) PrefillThreadParticipants(mm types.MessageSet) (err error) {
var rval []struct {
ReplyTo uint64 `db:"reply_to"`
UserID uint64 `db:"rel_user"`
}
// Filter out only relevant messages -- ones with replies
mm, _ = mm.Filter(func(m *types.Message) (b bool, e error) {
return m.Replies > 0, nil
})
if len(mm) == 0 {
return nil
}
query := squirrel.
Select("reply_to", "rel_user").
From(r.table()).
Where(squirrel.Eq{"reply_to": mm.IDs()})
err = rh.FetchAll(r.db(), query, &rval)
if err != nil {
return
}
for _, p := range rval {
mm.FindByID(p.ReplyTo).RepliesFrom = append(mm.FindByID(p.ReplyTo).RepliesFrom, p.UserID)
}
return nil
}
func (r *message) sanitizeFilter(f types.MessageFilter) types.MessageFilter {
if f.Limit == 0 || f.Limit > MESSAGES_MAX_LIMIT {
f.Limit = MESSAGES_MAX_LIMIT
}
return f
}
func (r *message) Create(mod *types.Message) (*types.Message, error) {
mod.ID = factory.Sonyflake.NextID()
rh.SetCurrentTimeRounded(&mod.CreatedAt)
return mod, r.db().Insert("messaging_message", mod)
}
func (r *message) Update(mod *types.Message) (*types.Message, error) {
rh.SetCurrentTimeRounded(&mod.UpdatedAt)
return mod, r.db().Replace("messaging_message", mod)
}
func (r *message) BindAvatar(in *types.Message, avatar io.Reader) (*types.Message, error) {
// @todo: implement setting avatar on a message
in.Meta.Avatar = ""
return in, nil
}
func (r *message) DeleteByID(ID uint64) error {
return rh.UpdateColumns(r.db(), r.table(), rh.Set{"deleted_at": time.Now()}, squirrel.Eq{"id": ID})
}
func (r *message) IncReplyCount(ID uint64) error {
_, err := r.db().Exec(sqlMessageRepliesIncCount, ID)
return err
}
func (r *message) DecReplyCount(ID uint64) error {
_, err := r.db().Exec(sqlMessageRepliesDecCount, ID)
return err
}
package repository
import (
"context"
"time"
"github.com/Masterminds/squirrel"
"github.com/titpetric/factory"
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/rh"
)
type (
MessageFlagRepository interface {
With(ctx context.Context, db *factory.DB) MessageFlagRepository
FindByID(ID uint64) (*types.MessageFlag, error)
FindByMessageIDs(IDs ...uint64) (types.MessageFlagSet, error)
FindByFlag(messageID, userID uint64, flag string) (*types.MessageFlag, error)
Create(mod *types.MessageFlag) (*types.MessageFlag, error)
DeleteByID(ID uint64) error
}
messageFlag struct {
*repository
}
)
const (
ErrMessageFlagNotFound = repositoryError("MessageFlagNotFound")
)
func MessageFlag(ctx context.Context, db *factory.DB) MessageFlagRepository {
return (&messageFlag{}).With(ctx, db)
}
func (r messageFlag) columns() []string {
return []string{
"mf.id",
"mf.rel_user",
"mf.rel_message",
"mf.rel_channel",
"mf.flag",
"mf.created_at",
}
}
func (r messageFlag) table() string {
return "messaging_message_flag"
}
func (r messageFlag) query() squirrel.SelectBuilder {
return squirrel.
Select(r.columns()...).
From(r.table() + " AS mf")
}
func (r messageFlag) queryMessagesWithFlags(flags ...string) squirrel.SelectBuilder {
return squirrel.
Select("mf.rel_message").
From(r.table() + " AS mf").
Where(squirrel.Eq{"flag": flags})
}
func (r messageFlag) With(ctx context.Context, db *factory.DB) MessageFlagRepository {
return &messageFlag{
repository: r.repository.With(ctx, db),
}
}
func (r messageFlag) FindByID(ID uint64) (*types.MessageFlag, error) {
return r.findOneBy(squirrel.Eq{"id": ID})
}
func (r messageFlag) FindByFlag(messageID, userID uint64, flag string) (*types.MessageFlag, error) {
cnd := squirrel.Eq{
"rel_message": messageID,
"flag": flag,
}
if userID > 0 {
cnd["rel_user"] = userID
}
return r.findOneBy(cnd)
}
func (r messageFlag) findOneBy(cnd squirrel.Sqlizer) (*types.MessageFlag, error) {
var (
mf = &types.MessageFlag{}
q = r.query().
Where(cnd)
err = rh.FetchOne(r.db(), q, mf)
)
if err != nil {
return nil, err
} else if mf.ID == 0 {
return nil, ErrMessageFlagNotFound
}
return mf, nil
}
// FindByMessageIDs returns all flags by message id range
func (r messageFlag) FindByMessageIDs(IDs ...uint64) (set types.MessageFlagSet, err error) {
if len(IDs) == 0 {
return
}
return set, rh.FetchAll(r.db(), r.query().Where(squirrel.Eq{"rel_message": IDs}), &set)
}
func (r messageFlag) Create(mod *types.MessageFlag) (*types.MessageFlag, error) {
mod.ID = factory.Sonyflake.NextID()
mod.CreatedAt = time.Now()
return mod, r.db().Insert(r.table(), mod)
}
func (r messageFlag) DeleteByID(ID uint64) error {
return rh.Delete(r.db(), r.table(), squirrel.Eq{"id": ID})
}
package repository
import (
"context"
)
type (
PubSub struct {
client pubSubModule
}
PubSubClient interface {
pubSubModule
Event(ctx context.Context, message string) error
}
pubSubModule interface {
Subscribe(ctx context.Context, channel string, onStart func() error, onMessage func(channel string, message []byte) error) error
Publish(ctx context.Context, channel string, message string) error
}
PubSubPayload struct {
Channel string
Message []byte
}
)
// var pubsub *PubSub
func (PubSub) New() *PubSub {
panic("pending reimplementation")
// @todo should be configured much earlier, do not depend on flags here
// // return singleton client
// if pubsub != nil {
// return pubsub
// }
//
// // store the singleton instance
// save := func(client pubSubModule) *PubSub {
// pubsub = &PubSub{client}
// return pubsub
// }
// // create isntances based on mode
// if flags != nil && flags.PubSub.Mode == "redis" {
// return save(PubSubRedis{}.New(flags.PubSub))
// }
// return save(PubSubMemory{}.New(flags.PubSub))
}
func (ps *PubSub) Subscribe(ctx context.Context, channel string, onStart func() error, onMessage func(channel string, message []byte) error) error {
return ps.client.Subscribe(ctx, channel, onStart, onMessage)
}
func (ps *PubSub) Event(ctx context.Context, message string) error {
return ps.Publish(ctx, "events", message)
}
func (ps *PubSub) Publish(ctx context.Context, channel, message string) error {
return ps.client.Publish(ctx, channel, message)
}
package repository
import (
"context"
"time"
"github.com/pkg/errors"
)
type PubSubMemory struct {
pollingInterval time.Duration
input chan *PubSubPayload
}
func (PubSubMemory) New(pollingInterval time.Duration) *PubSubMemory {
return &PubSubMemory{
pollingInterval: pollingInterval,
input: make(chan *PubSubPayload, 512),
}
}
func (ps *PubSubMemory) Subscribe(ctx context.Context, channel string, onStart func() error, onMessage func(channel string, payload []byte) error) error {
polling := func() error {
if err := onStart(); err != nil {
return err
}
for {
select {
// context cancelled
case <-ctx.Done():
return ctx.Err()
// triggered local event
case msg := <-ps.input:
if msg.Channel == channel {
onMessage(msg.Channel, msg.Message)
}
// polling event
case <-time.After(ps.pollingInterval):
onMessage(channel, []byte("pubsub tick event"))
}
}
}
defer func() {
close(ps.input)
}()
return polling()
}
func (ps *PubSubMemory) Publish(ctx context.Context, channel string, message string) (err error) {
defer func() {
// trying to send on closed channel panic, recover
if r := recover(); r != nil {
err = errors.Errorf("PubSubMemory.Publish: %+v", r)
}
}()
payload := &PubSubPayload{
channel,
[]byte(message),
}
select {
case ps.input <- payload:
case <-time.After(50 * time.Millisecond):
return errors.New("PubSubMemory.Publish: send timeout")
}
return nil
}
package repository
import (
"context"
"testing"
"time"
)
func TestPubSubMemory(t *testing.T) {
p := PubSubMemory{}.New(time.Second)
calledOnConnect := false
calledOnMessage := 0
onConnect := func() error {
calledOnConnect = true
return nil
}
onMessage := func(channel string, message []byte) error {
calledOnMessage++
return nil
}
ctx, cancel := context.WithCancel(context.Background())
done := make(chan error, 1)
go func() {
done <- p.Subscribe(ctx, "events", onConnect, onMessage)
}()
_ = p.Publish(ctx, "events", "new message event")
_ = p.Publish(ctx, "events", "new message event")
time.Sleep(2 * time.Millisecond)
if !calledOnConnect {
t.Fatalf("Expected initial call to 'onConnect'")
}
if calledOnMessage != 2 {
t.Fatalf("Expected calledOnMessage to be 2, got %d", calledOnMessage)
}
cancel()
select {
case <-done:
case <-time.After(10 * time.Millisecond):
t.Fatalf("Expected PubSub channel exit after context cancellation")
}
if err := p.Publish(ctx, "events", "new message event"); err == nil {
t.Fatalf("Expected error from sending message on closed channel")
}
}
package repository
import (
"context"
"time"
"github.com/gomodule/redigo/redis"
)
type PubSubRedis struct {
addr string
timeout time.Duration
pingTimeout time.Duration
pingPeriod time.Duration
}
func (PubSubRedis) New(addr string, to, pt, pp time.Duration) *PubSubRedis {
return &PubSubRedis{
addr: addr,
timeout: to,
pingTimeout: pt,
pingPeriod: pp,
}
}
func (ps *PubSubRedis) dial() (redis.Conn, error) {
return redis.Dial(
"tcp",
ps.addr,
redis.DialReadTimeout(ps.pingTimeout+ps.timeout),
redis.DialWriteTimeout(ps.timeout),
)
}
func (ps *PubSubRedis) Subscribe(ctx context.Context, channel string, onStart func() error, onMessage func(channel string, payload []byte) error) error {
// main redis connection
conn, err := ps.dial()
if err != nil {
return err
}
defer conn.Close()
// pubsub object
psc := redis.PubSubConn{Conn: conn}
if err := psc.Subscribe(redis.Args{}.Add(channel)...); err != nil {
return err
}
done := make(chan error, 1)
// Start a goroutine to receive notifications from the server.
go func() {
for {
switch n := psc.Receive().(type) {
case error:
done <- n
return
case redis.Message:
if err := onMessage(n.Channel, n.Data); err != nil {
done <- err
return
}
case redis.Subscription:
switch n.Count {
case 1:
// Notify application when all channels are subscribed.
if err := onStart(); err != nil {
done <- err
return
}
case 0:
// Return from the goroutine when all channels are unsubscribed.
done <- nil
return
}
}
}
}()
cleanup := func(err error) error {
psc.Unsubscribe()
return err
}
for {
select {
case <-time.After(ps.pingPeriod):
if err := psc.Ping(""); err != nil {
return cleanup(err)
}
case <-ctx.Done():
return cleanup(ctx.Err())
case err := <-done:
return err
}
}
}
func (ps *PubSubRedis) Publish(ctx context.Context, channel, message string) error {
conn, err := ps.dial()
if err != nil {
return err
}
defer conn.Close()
_, err = conn.Do("PUBLISH", channel, message)
return err
}
package repository
import (
"context"
"github.com/titpetric/factory"
)
type (
repository struct {
ctx context.Context
dbh *factory.DB
}
)
// DB produces a contextual DB handle
func DB(ctx context.Context) *factory.DB {
return factory.Database.MustGet().With(ctx)
}
// With updates repository and database contexts
func (r *repository) With(ctx context.Context, db *factory.DB) *repository {
return &repository{
ctx: ctx,
dbh: db,
}
}
// Context returns current active repository context
func (r *repository) Context() context.Context {
return r.ctx
}
// db returns context-aware db handle
func (r *repository) db() *factory.DB {
if r.dbh != nil {
return r.dbh
}
return DB(r.ctx)
}
package repository
import (
"context"
"github.com/Masterminds/squirrel"
"github.com/titpetric/factory"
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/rh"
)
type (
// UnreadRepository interface to channel member repository
UnreadRepository interface {
With(ctx context.Context, db *factory.DB) UnreadRepository
Count(userID, channelID uint64, threadIDs ...uint64) (types.UnreadSet, error)
CountThreads(userID, channelID uint64) (types.UnreadSet, error)
Preset(channelID, threadID uint64, userIDs ...uint64) (err error)
Record(userID, channelID, threadID, lastReadMessageID uint64, count uint32) error
Inc(channelID, replyTo, userID uint64) error
Dec(channelID, replyTo, userID uint64) error
ClearThreads(channelID, userID uint64) (err error)
}
unread struct {
*repository
}
)
const (
sqlResetThreads = `UPDATE messaging_unread
SET count = 0
WHERE rel_reply_to > 0 AND rel_channel = ? AND rel_user = ?`
sqlUnreadIncCount = `UPDATE messaging_unread
SET count = count + 1
WHERE rel_channel = ? AND rel_reply_to = ? AND rel_user <> ?`
sqlUnreadDecCount = `UPDATE messaging_unread
SET count = count - 1
WHERE rel_channel = ? AND rel_reply_to = ? AND count > 0`
sqlResetCount = `REPLACE INTO messaging_unread (rel_channel, rel_reply_to, rel_user, count) VALUES (?, ?, ?, 0)`
sqlUnreadPresetChannel = `INSERT IGNORE INTO messaging_unread (rel_channel, rel_reply_to, rel_user) VALUES (?, ?, ?)`
sqlUnreadPresetThreads = `INSERT IGNORE INTO messaging_unread (rel_channel, rel_reply_to, rel_user)
SELECT rel_channel, id, ?
FROM messaging_message
WHERE rel_channel = ?
AND replies > 0`
)
// Unread creates new instance of channel member repository
func Unread(ctx context.Context, db *factory.DB) UnreadRepository {
return (&unread{}).With(ctx, db)
}
func (r unread) table() string {
return "messaging_unread"
}
// With context...
func (r *unread) With(ctx context.Context, db *factory.DB) UnreadRepository {
return &unread{
repository: r.repository.With(ctx, db),
}
}
// Count returns counts unread channel info
func (r *unread) Count(userID, channelID uint64, threadIDs ...uint64) (types.UnreadSet, error) {
var (
uu = types.UnreadSet{}
q = squirrel.
Select(
"rel_channel",
"rel_last_message",
"rel_user",
"rel_reply_to",
"count",
).
From(r.table())
)
if userID > 0 {
q = q.Where("rel_user = ?", userID)
}
if channelID > 0 {
q = q.Where("rel_channel = ?", channelID)
}
if len(threadIDs) == 0 {
q = q.Where("rel_reply_to = 0")
} else {
q = q.Where(squirrel.Eq{"rel_reply_to": threadIDs})
}
return uu, rh.FetchAll(r.db(), q, &uu)
}
// CountReplies counts unread thread info
func (r unread) CountThreads(userID, channelID uint64) (types.UnreadSet, error) {
type (
u struct {
Rel_channel, Rel_user uint64
Total, Count uint32
}
)
var (
err error
uu = types.UnreadSet{}
temp = []*u{}
q = squirrel.
Select(
"rel_channel",
"rel_user",
"sum(count) AS count",
"sum(CASE WHEN count > 0 THEN 1 ELSE 0 END) AS total",
).
From(r.table()).
Where("rel_reply_to > 0 AND count > 0").
GroupBy("rel_channel", "rel_user")
)
if userID > 0 {
q = q.Where("rel_user = ?", userID)
}
if channelID > 0 {
q = q.Where("rel_channel = ?", channelID)
}
err = rh.FetchAll(r.db(), q, &temp)
if err != nil {
return nil, err
}
for _, t := range temp {
uu = append(uu, &types.Unread{
ChannelID: t.Rel_channel,
UserID: t.Rel_user,
ThreadCount: t.Count,
ThreadTotal: t.Total,
})
}
return uu, nil
}
func (r unread) ClearThreads(channelID, userID uint64) (err error) {
_, err = r.db().Exec(sqlResetThreads, channelID, userID)
return
}
// Preset channel unread records for all users (and threads in that channel)
//
// Whenever channel member is added or a new thread is created
// we generate records
func (r unread) Preset(channelID, threadID uint64, userIDs ...uint64) (err error) {
if channelID == 0 {
return
}
for _, userID := range userIDs {
if userID == 0 {
continue
}
_, err = r.db().Exec(sqlUnreadPresetChannel, channelID, threadID, userID)
if err != nil {
return
}
if threadID == 0 {
// Preset for all threads in the channel
_, err = r.db().Exec(sqlUnreadPresetThreads, userID, channelID)
if err != nil {
return
}
}
}
return
}
// Record channel/thread view
func (r *unread) Record(userID, channelID, threadID, lastReadMessageID uint64, count uint32) error {
mod := &types.Unread{
ChannelID: channelID,
UserID: userID,
ReplyTo: threadID,
LastMessageID: lastReadMessageID,
Count: count,
}
return r.db().Replace("messaging_unread", mod)
}
// Inc increments unread message count on a channel/thread for all but one user
func (r *unread) Inc(channelID, threadID, userID uint64) (err error) {
_, err = r.db().Exec(sqlUnreadIncCount, channelID, threadID, userID)
if err != nil {
return err
}
return nil
}
// Dec decrements unread message count on a channel/thread for all but one user
func (r *unread) Dec(channelID, threadID, userID uint64) (err error) {
_, err = r.db().Exec(sqlUnreadDecCount, channelID, threadID)
if err != nil {
return err
}
_, err = r.db().Exec(sqlResetCount, channelID, threadID, userID)
if err != nil {
return err
}
return nil
}
......@@ -4,13 +4,11 @@ import (
"context"
"errors"
"fmt"
"github.com/cortezaproject/corteza-server/messaging/repository"
"github.com/cortezaproject/corteza-server/messaging/types"
"github.com/cortezaproject/corteza-server/pkg/actionlog"
"github.com/cortezaproject/corteza-server/pkg/auth"
"github.com/cortezaproject/corteza-server/pkg/id"
"github.com/cortezaproject/corteza-server/store"
"time"
)
type (
......@@ -338,7 +336,7 @@ func (svc *channel) Create(new *types.Channel) (ch *types.Channel, err error) {
m.ChannelID = ch.ID
// Create member
if m, err = svc.createMember(ctx, s, m); err != nil {
if err = svc.createMember(ctx, s, m); err != nil {
return err
}
......@@ -483,8 +481,11 @@ func (svc *channel) Update(upd *types.Channel) (ch *types.Channel, err error) {
if !changed {
return nil
}
ch.UpdatedAt = now()
// Save the updated channel
if ch, err = svc.channel.Update(upd); err != nil {
if err = store.UpdateMessagingChannel(ctx, s, ch); err != nil {
return
}
......@@ -521,18 +522,15 @@ func (svc *channel) Delete(ID uint64) (ch *types.Channel, err error) {
if ch.DeletedAt != nil {
return ChannelErrAlreadyDeleted()
} else {
now := time.Now()
ch.DeletedAt = &now
}
// Set deletedAt timestamp so that our clients can react properly...
ch.DeletedAt = now()
svc.scheduleSystemMessage(ch, "<@%d> deleted this channel", userID)
if err = svc.channel.DeleteByID(ID); err != nil {
if err = store.UpdateMessagingChannel(ctx, s, ch); err != nil {
return
} else {
// Set deletedAt timestamp so that our clients can react properly...
ch.DeletedAt = now()
}
_ = svc.sendChannelEvent(ch)
......@@ -569,16 +567,15 @@ func (svc *channel) Undelete(ID uint64) (ch *types.Channel, err error) {
return ChannelErrNotDeleted()
}
ch.DeletedAt = nil
svc.scheduleSystemMessage(ch, "<@%d> undeleted this channel", userID)
if err = svc.channel.UndeleteByID(ID); err != nil {
if err = store.UpdateMessagingChannel(ctx, s, ch); err != nil {
return
} else {
// Remove deletedAt timestamp so that our clients can react properly...
ch.DeletedAt = nil
}
svc.flushSystemMessages()
_ = svc.flushSystemMessages()
return svc.sendChannelEvent(ch)
})
......@@ -603,7 +600,8 @@ func (svc *channel) SetFlag(ID uint64, flag types.ChannelMembershipFlag) (ch *ty
aProps.setChannel(ch)
if members, err := svc.cmember.Find(types.ChannelMemberFilter{ChannelID: []uint64{ch.ID}, MemberID: []uint64{userID}}); err != nil {
f := types.ChannelMemberFilter{ChannelID: []uint64{ch.ID}, MemberID: []uint64{userID}}
if members, _, err := store.SearchMessagingChannelMembers(ctx, s, f); err != nil {
return err
} else if len(members) == 1 {
membership = members[0]
......@@ -614,7 +612,9 @@ func (svc *channel) SetFlag(ID uint64, flag types.ChannelMembershipFlag) (ch *ty
return ChannelErrNotMember()
}
if ch.Member, err = svc.cmember.Update(membership); err != nil {
ch.Member = membership
ch.Member.UpdatedAt = now()
if err = store.UpdateMessagingChannelMember(ctx, s, membership); err != nil {
return
}
......@@ -653,13 +653,12 @@ func (svc *channel) Archive(ID uint64) (ch *types.Channel, err error) {
return ChannelErrAlreadyArchived()
}
ch.ArchivedAt = now()
svc.scheduleSystemMessage(ch, "<@%d> archived this channel", userID)
if err = svc.channel.ArchiveByID(ID); err != nil {
if err = store.UpdateMessagingChannel(ctx, s, ch); err != nil {
return
} else {
// Set archivedAt timestamp so that our clients can react properly...
ch.ArchivedAt = now()
}
_ = svc.flushSystemMessages()
......@@ -694,11 +693,10 @@ func (svc *channel) Unarchive(ID uint64) (ch *types.Channel, err error) {
return ChannelErrNotArchived()
}
if err = svc.channel.UnarchiveByID(ID); err != nil {
ch.ArchivedAt = nil
if err = store.UpdateMessagingChannel(ctx, s, ch); err != nil {
return
} else {
// Unset archivedAt timestamp so that our clients can react properly...
ch.ArchivedAt = nil
}
svc.scheduleSystemMessage(ch, "<@%d> unarchived this channel", userID)
......@@ -748,7 +746,8 @@ func (svc *channel) InviteUser(channelID uint64, memberIDs ...uint64) (out types
return ChannelErrNotAllowedToManageMembers()
}
if existing, err = svc.cmember.Find(types.ChannelMemberFilterChannels(channelID)); err != nil {
f := types.ChannelMemberFilterChannels(channelID)
if existing, _, err = store.SearchMessagingChannelMembers(ctx, s, f); err != nil {
return
}
......@@ -762,12 +761,13 @@ func (svc *channel) InviteUser(channelID uint64, memberIDs ...uint64) (out types
svc.scheduleSystemMessage(ch, "<@%d> invited <@%d> to the channel", userID, memberID)
member := &types.ChannelMember{
CreatedAt: *now(),
ChannelID: channelID,
UserID: memberID,
Type: types.ChannelMembershipTypeInvitee,
}
if member, err = svc.createMember(member); err != nil {
if err = store.CreateMessagingChannelMember(ctx, s, member); err != nil {
return err
}
......@@ -814,7 +814,8 @@ func (svc *channel) AddMember(channelID uint64, memberIDs ...uint64) (out types.
return ChannelErrUnableToManageGroupMembers()
}
if existing, err = svc.cmember.Find(types.ChannelMemberFilterChannels(channelID)); err != nil {
f := types.ChannelMemberFilterChannels(channelID)
if existing, _, err = store.SearchMessagingChannelMembers(ctx, s, f); err != nil {
return
}
......@@ -851,9 +852,10 @@ func (svc *channel) AddMember(channelID uint64, memberIDs ...uint64) (out types.
}
if exists {
member, err = svc.cmember.Update(member)
member.UpdatedAt = now()
err = store.UpdateMessagingChannelMember(ctx, s, member)
} else {
member, err = svc.createMember(member)
err = svc.createMember(ctx, s, member)
}
if err != nil {
......@@ -877,13 +879,15 @@ func (svc *channel) AddMember(channelID uint64, memberIDs ...uint64) (out types.
}
// createMember orchestrates member creation
func (svc channel) createMember(ctx context.Context, s store.Storable, member *types.ChannelMember) (m *types.ChannelMember, err error) {
if m, err = svc.cmember.Create(member); err != nil {
func (svc channel) createMember(ctx context.Context, s store.Storable, m *types.ChannelMember) (err error) {
m.CreatedAt = *now()
if err = store.CreateMessagingChannelMember(ctx, s, m); err != nil {
return
}
// Create zero-count unread record
if err = svc.unread.Preset(m.ChannelID, 0, m.UserID); err != nil {
if err = store.PresetMessagingUnread(ctx, s, m.ChannelID, 0, m.UserID); err != nil {
return
}
......@@ -912,7 +916,8 @@ func (svc *channel) DeleteMember(channelID uint64, memberIDs ...uint64) (err err
return ChannelErrUnableToManageGroupMembers()
}
if existing, err = svc.cmember.Find(types.ChannelMemberFilterChannels(channelID)); err != nil {
f := types.ChannelMemberFilterChannels(channelID)
if existing, _, err = store.SearchMessagingChannelMembers(ctx, s, f); err != nil {
return
}
......@@ -934,7 +939,7 @@ func (svc *channel) DeleteMember(channelID uint64, memberIDs ...uint64) (err err
svc.scheduleSystemMessage(ch, "<@%d> removed from the channel", memberID)
}
if err = svc.cmember.Delete(channelID, memberID); err != nil {
if err = store.DeleteMessagingChannelMemberByChannelIDUserID(ctx, s, channelID, memberID); err != nil {
return err
}
......@@ -963,7 +968,10 @@ func (svc *channel) flushSystemMessages() (err error) {
}()
return svc.sysmsgs.Walk(func(msg *types.Message) error {
if msg, err = svc.message.Create(msg); err != nil {
msg.ID = id.Next()
msg.CreatedAt = *now()
if err = store.CreateMessagingMessage(svc.ctx, svc.store, msg); err != nil {
return err
} else {
return svc.event.Message(msg)
......@@ -978,7 +986,8 @@ func (svc *channel) sendChannelEvent(ch *types.Channel) (err error) {
// Preload members, if needed
if len(ch.Members) == 0 || ch.Member == nil {
if mm, err := svc.cmember.Find(types.ChannelMemberFilterChannels(ch.ID)); err != nil {
f := types.ChannelMemberFilterChannels(ch.ID)
if mm, _, err := store.SearchMessagingChannelMembers(svc.ctx, svc.store, f); err != nil {
return err
} else {
ch.Members = mm.AllMemberIDs()
......
......@@ -191,17 +191,15 @@ func (svc message) Create(msg *types.Message) (*types.Message, error) {
// First reply,
//
// reset unreads for all members
//var mm types.ChannelMemberSet
//mm, _, err = store.SearchMessagingChannelMembers(svc.ctx, svc.store, types.ChannelMemberFilterChannels(original.ChannelID))
//if err != nil {
// return err
//}
panic("reimplement this")
//err = svc.unread.Preset(original.ChannelID, original.ID, mm.AllMemberIDs()...)
//if err != nil {
// return err
//}
var mm types.ChannelMemberSet
mm, _, err = store.SearchMessagingChannelMembers(svc.ctx, svc.store, types.ChannelMemberFilterChannels(original.ChannelID))
if err != nil {
return err
}
if err = store.PresetMessagingUnread(ctx, s, original.ChannelID, original.ID, mm.AllMemberIDs()...); err != nil {
return err
}
}
// Increment counter, on struct and in store.
......
......@@ -2,7 +2,6 @@ package types
import (
"github.com/cortezaproject/corteza-server/pkg/filter"
"github.com/cortezaproject/corteza-server/pkg/rh"
"time"
"github.com/jmoiron/sqlx/types"
......@@ -12,44 +11,44 @@ import (
type (
Channel struct {
ID uint64 `json:"channelID" db:"id"`
Name string `json:"name" db:"name"`
Topic string `json:"topic" db:"topic"`
Type ChannelType `json:"type" db:"type"`
Meta types.JSONText `json:"-" db:"meta"`
MembershipPolicy ChannelMembershipPolicy `json:"membershipPolicy" db:"membership_policy""`
CreatorID uint64 `json:"creatorId" db:"rel_creator"`
OrganisationID uint64 `json:"organisationId" db:"rel_organisation"`
CreatedAt time.Time `json:"createdAt,omitempty" db:"created_at"`
UpdatedAt *time.Time `json:"updatedAt,omitempty" db:"updated_at"`
ArchivedAt *time.Time `json:"archivedAt,omitempty" db:"archived_at"`
DeletedAt *time.Time `json:"deletedAt,omitempty" db:"deleted_at"`
LastMessageID uint64 `json:",omitempty" db:"rel_last_message"`
CanJoin bool `json:"-" db:"-"`
CanPart bool `json:"-" db:"-"`
CanObserve bool `json:"-" db:"-"`
CanSendMessages bool `json:"-" db:"-"`
CanDeleteMessages bool `json:"-" db:"-"`
CanDeleteOwnMessages bool `json:"-" db:"-"`
CanUpdateMessages bool `json:"-" db:"-"`
CanUpdateOwnMessages bool `json:"-" db:"-"`
CanChangeMembers bool `json:"-" db:"-"`
CanChangeMembershipPolicy bool `json:"-" db:"-"`
CanUpdate bool `json:"-" db:"-"`
CanArchive bool `json:"-" db:"-"`
CanUnarchive bool `json:"-" db:"-"`
CanDelete bool `json:"-" db:"-"`
CanUndelete bool `json:"-" db:"-"`
Member *ChannelMember `json:"-" db:"-"`
Members []uint64 `json:"-" db:"-"`
Unread *Unread `json:"-" db:"-"`
ID uint64 `json:"channelID"`
Name string `json:"name"`
Topic string `json:"topic"`
Type ChannelType `json:"type"`
Meta types.JSONText `json:"-"`
MembershipPolicy ChannelMembershipPolicy `json:"membershipPolicy"`
CreatorID uint64 `json:"creatorId"`
OrganisationID uint64 `json:"organisationId"`
CreatedAt time.Time `json:"createdAt,omitempty"`
UpdatedAt *time.Time `json:"updatedAt,omitempty"`
ArchivedAt *time.Time `json:"archivedAt,omitempty"`
DeletedAt *time.Time `json:"deletedAt,omitempty"`
LastMessageID uint64 `json:",omitempty"`
CanJoin bool `json:"-"`
CanPart bool `json:"-"`
CanObserve bool `json:"-"`
CanSendMessages bool `json:"-"`
CanDeleteMessages bool `json:"-"`
CanDeleteOwnMessages bool `json:"-"`
CanUpdateMessages bool `json:"-"`
CanUpdateOwnMessages bool `json:"-"`
CanChangeMembers bool `json:"-"`
CanChangeMembershipPolicy bool `json:"-"`
CanUpdate bool `json:"-"`
CanArchive bool `json:"-"`
CanUnarchive bool `json:"-"`
CanDelete bool `json:"-"`
CanUndelete bool `json:"-"`
Member *ChannelMember `json:"-"`
Members []uint64 `json:"-"`
Unread *Unread `json:"-"`
}
ChannelFilter struct {
......@@ -64,8 +63,6 @@ type (
// @deprecated
IncludeDeleted bool
Deleted rh.FilterState `json:"deleted"`
// Check fn is called by store backend for each resource found function can
// modify the resource and return false if store should not return it
//
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment