在看文章前可以先去 https://zznq.hyuga.icu 体验下。
之前写的代码太拉自己都看不下去一直想重构来着,正好瞅见了 0成本快速创建高匿HTTP/DNS LOG 文章中总结 Hyuga 的优缺点:
一次性给它改进下,打造一款:the best tools for monitoring out-of-band traffic.
这次首先移除中间件redis,持久化采用 leveldb,而用户的oob record存入带有超时缓存cache的 lru 中,代码如下:
type Recorder struct {
pool *cache.Cache
eventbus *event.EventBus
}
func NewRecorder(eventbus *event.EventBus) *Recorder {
c := cache.New(defaultCacheExpiration, defaultCacheCleanup)
c.OnEvicted(func(key string, v any) {
logrus.Debugf("[db][recorder] key:%s deleted", key)
l, ok := v.(*lru.Cache[int, any])
if ok {
putlru(l)
}
})
return &Recorder{
pool: c,
eventbus: eventbus,
}
}
func (r *Recorder) Record(userid string, v any) error {
if r.eventbus != nil {
r.eventbus.Publish(userid, v)
}
lru_, ok := r.pool.Get(userid)
if !ok {
l := getlru()
l.Add(0, v)
r.pool.Set(userid, l, cache.DefaultExpiration)
return nil
} else {
l := lru_.(*lru.Cache[int, any])
key := l.Len()
if key >= defaultCacheSize {
key = l.Keys()[l.Len()-1]
key++
}
l.Add(key, v)
r.pool.Set(userid, l, cache.DefaultExpiration)
}
return nil
}
func (r *Recorder) Get(userid string) ([]any, error) {
lru_, ok := r.pool.Get(userid)
if !ok {
return nil, nil
}
l := lru_.(*lru.Cache[int, any])
if l.Len() == 0 {
return nil, nil
}
res := make([]any, 0, l.Len())
for _, key := range l.Keys() {
v, ok := l.Get(key)
if ok {
res = append(res, v)
}
}
return res, nil
}
实现个简单 eventbus 通过websocket推送到用户页面,也支持第三方应用的消息推送,使用 https://github.com/moonD4rk/notifier 库实现,在服务启动时订阅全部主题,读取用户配置以此推送到第三方应用:
g.Go(func() error {
// subscribe all event
s := eventbus.Subscribe("*")
defer eventbus.Unsubscribe(s)
for {
select {
case <-ctx.Done():
logrus.Info("[server][notify] shutdown")
return ctx.Err()
case msg := <-s.Out():
r, ok := msg.(oob.Record)
if !ok {
continue
}
logrus.Infof("[server][notify] eventbus '*' receive msg'%s', '%s'", r.Type.String(), r.Name)
u, err := db.GetUserBySid(r.Sid)
if err == nil && u != nil && u.Notify.Enable {
if u.Notify.Bark.Key != "" {
notifier.WithBark(u.Notify.Bark.Key, u.Notify.Bark.Server, r.Type.String(), r.Name)
}
...
}
}
}
})
websocket 那里只要订阅用户自己的消息:
func (w *restfulHandler) record(c *gin.Context) {
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
logrus.Warnf("[restful] upgrade websocket failed: %v", err)
return
}
sid := c.GetString("sid")
go func() {
defer ws.Close()
logrus.Infof("[restful] start record stream")
c := make(chan struct{})
go func() {
if _, _, err := ws.ReadMessage(); err != nil {
close(c)
}
}()
// get user all records
records, err := w.recorder.Get(sid)
if err != nil {
logrus.Warnf("[restful] get user records err: %s", err.Error())
return
}
for _, r := range records {
if err = ws.WriteJSON(r); err != nil {
logrus.Infof("[restful][stream] push record err: %s", err.Error())
}
}
// subscribe user record event
s := w.eventbus.Subscribe(sid)
defer w.eventbus.Unsubscribe(s)
for {
select {
case <-c:
logrus.Infof("[restful] close record stream")
return
case msg := <-s.Out():
logrus.Infof("[restful][stream] push record msg: %v", msg)
if err = ws.WriteJSON(msg); err != nil {
return
}
}
}
}()
}
Hyuga 通过github action 自动发布,使用 embed 打包好前端资源文件,所以部署特别简单:
hyuga.icu
,准备一台vps,如公网ip 1.1.1.1
。