mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-06-18 10:17:36 +07:00
feat(nodes): per-node client IP attribution for IP-limit
Record each panel's own Xray IP observations under its panelGuid and merge each node's guid-keyed report on the master, so the panel can tell which node a client IP is connecting through (the flat inbound_client_ips union is pushed back to every node and cannot attribute). Adds the NodeClientIp model + migration, the clientIpsByGuid endpoint and node-sync merge, node-name labels in the client IP log, and cleanup on node deletion.
This commit is contained in:
33
frontend/src/lib/clients/ip-log.ts
Normal file
33
frontend/src/lib/clients/ip-log.ts
Normal file
@ -0,0 +1,33 @@
|
||||
// Shape of one entry in a client's IP log, as returned by
|
||||
// POST /panel/api/clients/ips/:email. `node` is the name of the node the IP is
|
||||
// connecting through, or '' when it is on this local panel (or unattributed).
|
||||
export type ClientIpInfo = {
|
||||
ip: string;
|
||||
time: string;
|
||||
node: string;
|
||||
};
|
||||
|
||||
// normalizeClientIps accepts the API payload and returns typed entries. It also
|
||||
// tolerates the legacy shape (a plain array of "ip (time)" strings) so the UI
|
||||
// keeps working against older panels.
|
||||
export function normalizeClientIps(obj: unknown): ClientIpInfo[] {
|
||||
if (!Array.isArray(obj)) return [];
|
||||
const out: ClientIpInfo[] = [];
|
||||
for (const x of obj) {
|
||||
if (typeof x === 'string') {
|
||||
if (x.length > 0) out.push({ ip: x, time: '', node: '' });
|
||||
continue;
|
||||
}
|
||||
if (x && typeof x === 'object') {
|
||||
const o = x as Record<string, unknown>;
|
||||
const ip = typeof o.ip === 'string' ? o.ip : '';
|
||||
if (!ip) continue;
|
||||
out.push({
|
||||
ip,
|
||||
time: typeof o.time === 'string' ? o.time : '',
|
||||
node: typeof o.node === 'string' ? o.node : '',
|
||||
});
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}
|
||||
@ -24,6 +24,7 @@ import dayjs from 'dayjs';
|
||||
import type { Dayjs } from 'dayjs';
|
||||
import { HttpUtil, RandomUtil } from '@/utils';
|
||||
import { formatInboundLabel } from '@/lib/inbounds/label';
|
||||
import { normalizeClientIps, type ClientIpInfo } from '@/lib/clients/ip-log';
|
||||
import { DateTimePicker, SelectAllClearButtons } from '@/components/form';
|
||||
import { TLS_FLOW_CONTROL } from '@/schemas/primitives';
|
||||
import type { ClientRecord, InboundOption, ExternalLink, ExternalLinkInput } from '@/hooks/useClients';
|
||||
@ -177,7 +178,7 @@ export default function ClientFormModal({
|
||||
const [form, setForm] = useState<FormState>(emptyForm);
|
||||
const [submitting, setSubmitting] = useState(false);
|
||||
const [resetting, setResetting] = useState(false);
|
||||
const [clientIps, setClientIps] = useState<string[]>([]);
|
||||
const [clientIps, setClientIps] = useState<ClientIpInfo[]>([]);
|
||||
const [ipsLoading, setIpsLoading] = useState(false);
|
||||
const [ipsClearing, setIpsClearing] = useState(false);
|
||||
const [ipsModalOpen, setIpsModalOpen] = useState(false);
|
||||
@ -355,8 +356,7 @@ export default function ClientFormModal({
|
||||
try {
|
||||
const msg = await HttpUtil.post(`/panel/api/clients/ips/${encodeURIComponent(client.email)}`) as ApiMsg<unknown[]>;
|
||||
if (!msg?.success) { setClientIps([]); return; }
|
||||
const arr = Array.isArray(msg.obj) ? msg.obj : [];
|
||||
setClientIps(arr.filter((x): x is string => typeof x === 'string' && x.length > 0));
|
||||
setClientIps(normalizeClientIps(msg.obj));
|
||||
} finally {
|
||||
setIpsLoading(false);
|
||||
}
|
||||
@ -806,7 +806,7 @@ export default function ClientFormModal({
|
||||
>
|
||||
{clientIps.length > 0 ? (
|
||||
<div style={{ maxHeight: 360, overflowY: 'auto' }}>
|
||||
{clientIps.map((ip, idx) => (
|
||||
{clientIps.map((entry, idx) => (
|
||||
<Tag
|
||||
key={idx}
|
||||
color="blue"
|
||||
@ -819,7 +819,10 @@ export default function ClientFormModal({
|
||||
fontFamily: 'ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace',
|
||||
}}
|
||||
>
|
||||
{ip}
|
||||
{entry.ip}{entry.time ? ` (${entry.time})` : ''}
|
||||
{entry.node ? (
|
||||
<span style={{ marginInlineStart: 6, opacity: 0.85, fontWeight: 600 }}>@ {entry.node}</span>
|
||||
) : null}
|
||||
</Tag>
|
||||
))}
|
||||
</div>
|
||||
|
||||
@ -5,6 +5,7 @@ import { CopyOutlined, EyeOutlined, QrcodeOutlined, ReloadOutlined } from '@ant-
|
||||
|
||||
import { ClipboardManager, HttpUtil, IntlUtil, SizeFormatter } from '@/utils';
|
||||
import { formatInboundLabel } from '@/lib/inbounds/label';
|
||||
import { normalizeClientIps, type ClientIpInfo } from '@/lib/clients/ip-log';
|
||||
import { useDatepicker } from '@/hooks/useDatepicker';
|
||||
import type { ClientRecord, InboundOption } from '@/hooks/useClients';
|
||||
import { isPostQuantumLink } from '@/lib/xray/inbound-link';
|
||||
@ -80,7 +81,7 @@ export default function ClientInfoModal({
|
||||
const dateLabel = (ts?: number) => (!ts || ts <= 0 ? '-' : IntlUtil.formatDate(ts, datepicker));
|
||||
const [messageApi, messageContextHolder] = message.useMessage();
|
||||
const [links, setLinks] = useState<string[]>([]);
|
||||
const [clientIps, setClientIps] = useState<string[]>([]);
|
||||
const [clientIps, setClientIps] = useState<ClientIpInfo[]>([]);
|
||||
const [ipsLoading, setIpsLoading] = useState(false);
|
||||
const [ipsClearing, setIpsClearing] = useState(false);
|
||||
const [ipsModalOpen, setIpsModalOpen] = useState(false);
|
||||
@ -144,8 +145,7 @@ export default function ClientInfoModal({
|
||||
try {
|
||||
const msg = await HttpUtil.post(`/panel/api/clients/ips/${encodeURIComponent(client.email)}`) as ApiMsg<unknown[]>;
|
||||
if (!msg?.success) { setClientIps([]); return; }
|
||||
const arr = Array.isArray(msg.obj) ? msg.obj : [];
|
||||
setClientIps(arr.filter((x): x is string => typeof x === 'string' && x.length > 0));
|
||||
setClientIps(normalizeClientIps(msg.obj));
|
||||
} finally {
|
||||
setIpsLoading(false);
|
||||
}
|
||||
@ -503,7 +503,7 @@ export default function ClientInfoModal({
|
||||
>
|
||||
{clientIps.length > 0 ? (
|
||||
<div style={{ maxHeight: 360, overflowY: 'auto' }}>
|
||||
{clientIps.map((ip, idx) => (
|
||||
{clientIps.map((entry, idx) => (
|
||||
<Tag
|
||||
key={idx}
|
||||
color="blue"
|
||||
@ -516,7 +516,10 @@ export default function ClientInfoModal({
|
||||
fontFamily: 'ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, monospace',
|
||||
}}
|
||||
>
|
||||
{ip}
|
||||
{entry.ip}{entry.time ? ` (${entry.time})` : ''}
|
||||
{entry.node ? (
|
||||
<span style={{ marginInlineStart: 6, opacity: 0.85, fontWeight: 600 }}>@ {entry.node}</span>
|
||||
) : null}
|
||||
</Tag>
|
||||
))}
|
||||
</div>
|
||||
|
||||
@ -73,6 +73,7 @@ func initModels() error {
|
||||
&model.ClientGroup{},
|
||||
&model.InboundFallback{},
|
||||
&model.NodeClientTraffic{},
|
||||
&model.NodeClientIp{},
|
||||
&model.ClientGlobalTraffic{},
|
||||
&model.OutboundSubscription{},
|
||||
}
|
||||
|
||||
@ -50,6 +50,7 @@ func migrationModels() []any {
|
||||
&model.ClientExternalLink{},
|
||||
&model.InboundFallback{},
|
||||
&model.NodeClientTraffic{},
|
||||
&model.NodeClientIp{},
|
||||
&model.OutboundSubscription{},
|
||||
}
|
||||
}
|
||||
|
||||
27
internal/database/model/node_client_ip.go
Normal file
27
internal/database/model/node_client_ip.go
Normal file
@ -0,0 +1,27 @@
|
||||
package model
|
||||
|
||||
// ClientIpEntry is the wire/JSON shape of a single observed client IP with the
|
||||
// last time it was seen (unix seconds). It mirrors job.IPWithTimestamp and the
|
||||
// service-internal clientIpEntry so the per-node attribution blob round-trips
|
||||
// with the existing inbound_client_ips storage.
|
||||
type ClientIpEntry struct {
|
||||
IP string `json:"ip"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
}
|
||||
|
||||
// NodeClientIp records which panel (identified by its stable panelGuid) observed
|
||||
// a client's IPs on its own Xray. Unlike InboundClientIps (a flattened,
|
||||
// cluster-wide union used for IP-limit counting and that is pushed back to every
|
||||
// node), this table preserves attribution: it never mixes in IPs a parent pushed
|
||||
// down, so the master can tell exactly which node a given IP is connecting to.
|
||||
//
|
||||
// Rows under the local panel's own panelGuid are written by check_client_ip_job
|
||||
// from local Xray observations; rows under remote guids are merged in by the node
|
||||
// sync job from each node's clientIpsByGuid report (its own panelGuid subtree plus
|
||||
// any descendants), so attribution survives across a chain of nodes.
|
||||
type NodeClientIp struct {
|
||||
Id int `json:"id" gorm:"primaryKey;autoIncrement"`
|
||||
NodeGuid string `json:"nodeGuid" gorm:"uniqueIndex:idx_nodeip_guid_email,priority:1;not null"`
|
||||
Email string `json:"email" gorm:"uniqueIndex:idx_nodeip_guid_email,priority:2;not null"`
|
||||
Ips string `json:"ips"`
|
||||
}
|
||||
@ -1,11 +1,8 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
|
||||
"github.com/mhsanaei/3x-ui/v3/internal/web/service"
|
||||
@ -74,6 +71,7 @@ func (a *ClientController) initRouter(g *gin.RouterGroup) {
|
||||
g.POST("/clearIps/:email", a.clearIps)
|
||||
g.POST("/onlines", a.onlines)
|
||||
g.POST("/onlinesByGuid", a.onlinesByGuid)
|
||||
g.POST("/clientIpsByGuid", a.clientIpsByGuid)
|
||||
g.POST("/activeInbounds", a.activeInbounds)
|
||||
g.POST("/lastOnline", a.lastOnline)
|
||||
}
|
||||
@ -402,38 +400,13 @@ func (a *ClientController) updateTrafficByEmail(c *gin.Context) {
|
||||
|
||||
func (a *ClientController) getIps(c *gin.Context) {
|
||||
email := c.Param("email")
|
||||
ips, err := a.inboundService.GetInboundClientIps(email)
|
||||
if err != nil || ips == "" {
|
||||
jsonObj(c, "No IP Record", nil)
|
||||
return
|
||||
}
|
||||
type ipWithTimestamp struct {
|
||||
IP string `json:"ip"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
}
|
||||
var ipsWithTime []ipWithTimestamp
|
||||
if err := json.Unmarshal([]byte(ips), &ipsWithTime); err == nil && len(ipsWithTime) > 0 {
|
||||
formatted := make([]string, 0, len(ipsWithTime))
|
||||
for _, item := range ipsWithTime {
|
||||
if item.IP == "" {
|
||||
continue
|
||||
}
|
||||
if item.Timestamp > 0 {
|
||||
ts := time.Unix(item.Timestamp, 0).Local().Format("2006-01-02 15:04:05")
|
||||
formatted = append(formatted, fmt.Sprintf("%s (%s)", item.IP, ts))
|
||||
continue
|
||||
}
|
||||
formatted = append(formatted, item.IP)
|
||||
}
|
||||
jsonObj(c, formatted, nil)
|
||||
return
|
||||
}
|
||||
var oldIps []string
|
||||
if err := json.Unmarshal([]byte(ips), &oldIps); err == nil && len(oldIps) > 0 {
|
||||
jsonObj(c, oldIps, nil)
|
||||
return
|
||||
}
|
||||
jsonObj(c, ips, nil)
|
||||
infos, err := a.inboundService.GetClientIpsWithNodes(email)
|
||||
jsonObj(c, infos, err)
|
||||
}
|
||||
|
||||
func (a *ClientController) clientIpsByGuid(c *gin.Context) {
|
||||
data, err := a.inboundService.GetClientIpsByGuid()
|
||||
jsonObj(c, data, err)
|
||||
}
|
||||
|
||||
func (a *ClientController) clearIps(c *gin.Context) {
|
||||
|
||||
@ -252,6 +252,10 @@ func (j *CheckClientIpJob) processLogFile(enforce bool) bool {
|
||||
// hours ago is still live even though its timestamp is old.
|
||||
func (j *CheckClientIpJob) processObserved(observed map[string]map[string]int64, enforce, observedAreLive bool) bool {
|
||||
shouldCleanLog := false
|
||||
now := time.Now().Unix()
|
||||
// attribution accumulates this scan's local observations per email so they can
|
||||
// be recorded under this panel's own guid for cross-node IP attribution.
|
||||
attribution := make(map[string][]model.ClientIpEntry, len(observed))
|
||||
for email, ipTimestamps := range observed {
|
||||
|
||||
// The observations can still reference a client that was just renamed
|
||||
@ -271,8 +275,20 @@ func (j *CheckClientIpJob) processObserved(observed map[string]map[string]int64,
|
||||
|
||||
// Convert to IPWithTimestamp slice
|
||||
ipsWithTime := make([]IPWithTimestamp, 0, len(ipTimestamps))
|
||||
attrEntries := make([]model.ClientIpEntry, 0, len(ipTimestamps))
|
||||
for ip, timestamp := range ipTimestamps {
|
||||
ipsWithTime = append(ipsWithTime, IPWithTimestamp{IP: ip, Timestamp: timestamp})
|
||||
// Live API observations may carry an old lastSeen (connection start),
|
||||
// so stamp attribution with now; otherwise the stale cutoff would evict
|
||||
// an IP that is connected right now.
|
||||
attrTs := timestamp
|
||||
if observedAreLive {
|
||||
attrTs = now
|
||||
}
|
||||
attrEntries = append(attrEntries, model.ClientIpEntry{IP: ip, Timestamp: attrTs})
|
||||
}
|
||||
if len(attrEntries) > 0 {
|
||||
attribution[email] = attrEntries
|
||||
}
|
||||
|
||||
clientIpsRecord, err := j.getInboundClientIps(email)
|
||||
@ -284,9 +300,27 @@ func (j *CheckClientIpJob) processObserved(observed map[string]map[string]int64,
|
||||
shouldCleanLog = j.updateInboundClientIps(clientIpsRecord, inbound, email, ipsWithTime, enforce, observedAreLive) || shouldCleanLog
|
||||
}
|
||||
|
||||
j.recordLocalAttribution(attribution)
|
||||
|
||||
return shouldCleanLog
|
||||
}
|
||||
|
||||
// recordLocalAttribution stores this scan's local observations under this panel's
|
||||
// own guid so a parent panel can attribute each IP to the node it is on.
|
||||
// Best-effort: attribution is advisory and must never block IP-limit enforcement.
|
||||
func (j *CheckClientIpJob) recordLocalAttribution(attribution map[string][]model.ClientIpEntry) {
|
||||
if len(attribution) == 0 {
|
||||
return
|
||||
}
|
||||
guid, err := (&service.SettingService{}).GetPanelGuid()
|
||||
if err != nil || guid == "" {
|
||||
return
|
||||
}
|
||||
if err := (&service.InboundService{}).RecordLocalClientIps(guid, attribution); err != nil {
|
||||
logger.Debug("[LimitIP] record local ip attribution failed:", err)
|
||||
}
|
||||
}
|
||||
|
||||
// mergeClientIps folds this scan's observations into the persisted set,
|
||||
// dropping entries older than staleCutoff. newAlwaysLive exempts the new
|
||||
// entries from that cutoff: an API-observed IP is a live connection by
|
||||
|
||||
@ -294,4 +294,15 @@ func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSy
|
||||
logger.Warning("node traffic sync: push client ips to", n.Name, "failed:", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Per-node IP attribution: pull the node's guid-keyed subtree (its own
|
||||
// observations plus any descendants) so the master can tell which node each
|
||||
// IP is on. Old nodes without the endpoint just return an error — skip them.
|
||||
if guidTrees, err := rt.FetchClientIpsByGuid(ctx); err != nil {
|
||||
logger.Debug("node traffic sync: fetch client ip attribution from", n.Name, "failed:", err)
|
||||
} else if len(guidTrees) > 0 {
|
||||
if err := j.inboundService.MergeClientIpsByGuid(guidTrees); err != nil {
|
||||
logger.Warning("node traffic sync: merge client ip attribution from", n.Name, "failed:", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -617,3 +617,21 @@ func (r *Remote) PushAllClientIps(ctx context.Context, ips []model.InboundClient
|
||||
_, err := r.do(ctx, http.MethodPost, "panel/api/server/clientIps", ips)
|
||||
return err
|
||||
}
|
||||
|
||||
// FetchClientIpsByGuid pulls the node's per-node IP attribution subtree
|
||||
// (guid -> email -> observed IPs). Unlike FetchAllClientIps (the flat union the
|
||||
// master also pushes back), this preserves which physical node each IP is on.
|
||||
// Returns an empty map for older nodes that lack the endpoint.
|
||||
func (r *Remote) FetchClientIpsByGuid(ctx context.Context) (map[string]map[string][]model.ClientIpEntry, error) {
|
||||
env, err := r.do(ctx, http.MethodPost, "panel/api/clients/clientIpsByGuid", nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := map[string]map[string][]model.ClientIpEntry{}
|
||||
if len(env.Obj) > 0 {
|
||||
if err := json.Unmarshal(env.Obj, &out); err != nil {
|
||||
return nil, fmt.Errorf("decode client ips by guid: %w", err)
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
269
internal/web/service/inbound_node_ips.go
Normal file
269
internal/web/service/inbound_node_ips.go
Normal file
@ -0,0 +1,269 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/mhsanaei/3x-ui/v3/internal/database"
|
||||
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
|
||||
|
||||
"gorm.io/gorm/clause"
|
||||
)
|
||||
|
||||
// node_client_ips.go implements per-node client-IP attribution. The flat
|
||||
// inbound_client_ips table is a cluster-wide union (used for IP-limit counting
|
||||
// and pushed back to every node), so it cannot tell which node a given IP is
|
||||
// on. NodeClientIp keeps that attribution: each panel records its own Xray
|
||||
// observations under its panelGuid, and the master merges every node's
|
||||
// guid-keyed report — never mixing in IPs a parent pushed down.
|
||||
|
||||
// mergeModelClientIpEntries unions old and incoming observations, drops anything
|
||||
// older than cutoff, keeps the newest timestamp per IP, and sorts newest-first.
|
||||
// It mirrors mergeClientIpEntries but operates on the exported wire type.
|
||||
func mergeModelClientIpEntries(old, incoming []model.ClientIpEntry, cutoff int64) []model.ClientIpEntry {
|
||||
ipMap := make(map[string]int64, len(old)+len(incoming))
|
||||
for _, e := range old {
|
||||
if e.IP == "" || e.Timestamp < cutoff {
|
||||
continue
|
||||
}
|
||||
ipMap[e.IP] = e.Timestamp
|
||||
}
|
||||
for _, e := range incoming {
|
||||
if e.IP == "" || e.Timestamp < cutoff {
|
||||
continue
|
||||
}
|
||||
if cur, ok := ipMap[e.IP]; !ok || e.Timestamp > cur {
|
||||
ipMap[e.IP] = e.Timestamp
|
||||
}
|
||||
}
|
||||
out := make([]model.ClientIpEntry, 0, len(ipMap))
|
||||
for ip, ts := range ipMap {
|
||||
out = append(out, model.ClientIpEntry{IP: ip, Timestamp: ts})
|
||||
}
|
||||
sort.Slice(out, func(i, j int) bool { return out[i].Timestamp > out[j].Timestamp })
|
||||
return out
|
||||
}
|
||||
|
||||
// upsertNodeClientIps folds a guid's per-email observations into NodeClientIp,
|
||||
// merging with whatever is already stored for that (guid, email) and dropping
|
||||
// stale entries. Empty merged results delete the row so the table stays bounded.
|
||||
func upsertNodeClientIps(guid string, perEmail map[string][]model.ClientIpEntry) error {
|
||||
if guid == "" || len(perEmail) == 0 {
|
||||
return nil
|
||||
}
|
||||
db := database.GetDB()
|
||||
cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
|
||||
|
||||
var existing []model.NodeClientIp
|
||||
if err := db.Where("node_guid = ?", guid).Find(&existing).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
existingByEmail := make(map[string]*model.NodeClientIp, len(existing))
|
||||
for i := range existing {
|
||||
existingByEmail[existing[i].Email] = &existing[i]
|
||||
}
|
||||
|
||||
tx := db.Begin()
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
tx.Rollback()
|
||||
}
|
||||
}()
|
||||
|
||||
for email, incoming := range perEmail {
|
||||
if email == "" {
|
||||
continue
|
||||
}
|
||||
var old []model.ClientIpEntry
|
||||
if cur, ok := existingByEmail[email]; ok && cur.Ips != "" {
|
||||
_ = json.Unmarshal([]byte(cur.Ips), &old)
|
||||
}
|
||||
merged := mergeModelClientIpEntries(old, incoming, cutoff)
|
||||
if len(merged) == 0 {
|
||||
// Nothing fresh: drop any stale row so attribution doesn't linger.
|
||||
if _, ok := existingByEmail[email]; ok {
|
||||
if err := tx.Where("node_guid = ? AND email = ?", guid, email).
|
||||
Delete(&model.NodeClientIp{}).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
b, _ := json.Marshal(merged)
|
||||
row := model.NodeClientIp{NodeGuid: guid, Email: email, Ips: string(b)}
|
||||
if err := tx.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "node_guid"}, {Name: "email"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"ips"}),
|
||||
}).Create(&row).Error; err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
}
|
||||
return tx.Commit().Error
|
||||
}
|
||||
|
||||
// RecordLocalClientIps stores this panel's own Xray observations under its
|
||||
// panelGuid. Called by check_client_ip_job each scan with the live per-email IPs
|
||||
// the local core reported.
|
||||
func (s *InboundService) RecordLocalClientIps(panelGuid string, observed map[string][]model.ClientIpEntry) error {
|
||||
return upsertNodeClientIps(panelGuid, observed)
|
||||
}
|
||||
|
||||
// MergeClientIpsByGuid folds a node's guid-keyed attribution report (its own
|
||||
// panelGuid subtree plus any descendants) into the local table, preserving which
|
||||
// physical node each IP is on across a chain.
|
||||
func (s *InboundService) MergeClientIpsByGuid(trees map[string]map[string][]model.ClientIpEntry) error {
|
||||
for guid, perEmail := range trees {
|
||||
if err := upsertNodeClientIps(guid, perEmail); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetClientIpsByGuid returns this panel's full attribution subtree (guid -> email
|
||||
// -> fresh IPs), dropping stale entries. It is what the clientIpsByGuid endpoint
|
||||
// serves to a parent panel.
|
||||
func (s *InboundService) GetClientIpsByGuid() (map[string]map[string][]model.ClientIpEntry, error) {
|
||||
db := database.GetDB()
|
||||
var rows []model.NodeClientIp
|
||||
if err := db.Find(&rows).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
|
||||
out := make(map[string]map[string][]model.ClientIpEntry)
|
||||
for _, row := range rows {
|
||||
if row.NodeGuid == "" || row.Email == "" || row.Ips == "" {
|
||||
continue
|
||||
}
|
||||
var entries []model.ClientIpEntry
|
||||
if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil {
|
||||
continue
|
||||
}
|
||||
fresh := mergeModelClientIpEntries(nil, entries, cutoff)
|
||||
if len(fresh) == 0 {
|
||||
continue
|
||||
}
|
||||
if out[row.NodeGuid] == nil {
|
||||
out[row.NodeGuid] = make(map[string][]model.ClientIpEntry)
|
||||
}
|
||||
out[row.NodeGuid][row.Email] = fresh
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// GetClientIpNodeAttribution returns, for one client email, a map of IP -> the
|
||||
// guid that most recently observed it (within the stale window). Used to label
|
||||
// each IP in the panel with the node it is connecting to.
|
||||
func (s *InboundService) GetClientIpNodeAttribution(email string) (map[string]string, error) {
|
||||
db := database.GetDB()
|
||||
var rows []model.NodeClientIp
|
||||
if err := db.Where("email = ?", email).Find(&rows).Error; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
|
||||
ipGuid := make(map[string]string)
|
||||
ipTs := make(map[string]int64)
|
||||
for _, row := range rows {
|
||||
if row.NodeGuid == "" || row.Ips == "" {
|
||||
continue
|
||||
}
|
||||
var entries []model.ClientIpEntry
|
||||
if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil {
|
||||
continue
|
||||
}
|
||||
for _, e := range entries {
|
||||
if e.IP == "" || e.Timestamp < cutoff {
|
||||
continue
|
||||
}
|
||||
if cur, ok := ipTs[e.IP]; !ok || e.Timestamp > cur {
|
||||
ipTs[e.IP] = e.Timestamp
|
||||
ipGuid[e.IP] = row.NodeGuid
|
||||
}
|
||||
}
|
||||
}
|
||||
return ipGuid, nil
|
||||
}
|
||||
|
||||
// ClientIpInfo is one IP shown in the panel's per-client IP log, labelled with
|
||||
// the node it is connecting through ("" = this local panel).
|
||||
type ClientIpInfo struct {
|
||||
IP string `json:"ip"`
|
||||
Time string `json:"time"`
|
||||
Node string `json:"node"`
|
||||
}
|
||||
|
||||
// GetClientIpsWithNodes returns a client's recorded IPs (from the flat
|
||||
// inbound_client_ips display set) annotated with the node each IP is on, using
|
||||
// the per-node attribution table. Local IPs (and any IP without attribution)
|
||||
// carry an empty Node.
|
||||
func (s *InboundService) GetClientIpsWithNodes(email string) ([]ClientIpInfo, error) {
|
||||
raw, err := s.GetInboundClientIps(email)
|
||||
if err != nil || raw == "" {
|
||||
// Record-not-found (or empty) is "no IPs", not an error for the UI.
|
||||
return []ClientIpInfo{}, nil
|
||||
}
|
||||
|
||||
var entries []model.ClientIpEntry
|
||||
if jerr := json.Unmarshal([]byte(raw), &entries); jerr != nil || len(entries) == 0 {
|
||||
// Legacy shape: a plain JSON array of IP strings.
|
||||
var oldIps []string
|
||||
if json.Unmarshal([]byte(raw), &oldIps) == nil {
|
||||
entries = entries[:0]
|
||||
for _, ip := range oldIps {
|
||||
entries = append(entries, model.ClientIpEntry{IP: ip})
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(entries) == 0 {
|
||||
return []ClientIpInfo{}, nil
|
||||
}
|
||||
|
||||
attr, _ := s.GetClientIpNodeAttribution(email)
|
||||
guidName := s.nodeGuidNameMap()
|
||||
localGuid, _ := (&SettingService{}).GetPanelGuid()
|
||||
|
||||
out := make([]ClientIpInfo, 0, len(entries))
|
||||
for _, e := range entries {
|
||||
if e.IP == "" {
|
||||
continue
|
||||
}
|
||||
info := ClientIpInfo{IP: e.IP}
|
||||
if e.Timestamp > 0 {
|
||||
info.Time = time.Unix(e.Timestamp, 0).Local().Format("2006-01-02 15:04:05")
|
||||
}
|
||||
if guid, ok := attr[e.IP]; ok && guid != "" && guid != localGuid {
|
||||
info.Node = guidName[guid]
|
||||
}
|
||||
out = append(out, info)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// nodeGuidNameMap maps each known node's stable guid to its display name.
|
||||
func (s *InboundService) nodeGuidNameMap() map[string]string {
|
||||
db := database.GetDB()
|
||||
var nodes []model.Node
|
||||
if err := db.Model(&model.Node{}).Find(&nodes).Error; err != nil {
|
||||
return map[string]string{}
|
||||
}
|
||||
m := make(map[string]string, len(nodes))
|
||||
for _, n := range nodes {
|
||||
if n.Guid != "" {
|
||||
m[n.Guid] = n.Name
|
||||
}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// DeleteNodeClientIpsByGuid removes all attribution rows for a guid (e.g. when a
|
||||
// node is deleted) so its IPs stop being reported and counted.
|
||||
func (s *InboundService) DeleteNodeClientIpsByGuid(guid string) error {
|
||||
if guid == "" {
|
||||
return nil
|
||||
}
|
||||
db := database.GetDB()
|
||||
return db.Where("node_guid = ?", guid).Delete(&model.NodeClientIp{}).Error
|
||||
}
|
||||
168
internal/web/service/inbound_node_ips_test.go
Normal file
168
internal/web/service/inbound_node_ips_test.go
Normal file
@ -0,0 +1,168 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mhsanaei/3x-ui/v3/internal/database"
|
||||
"github.com/mhsanaei/3x-ui/v3/internal/database/model"
|
||||
)
|
||||
|
||||
func TestRecordLocalClientIps_RoundTripByGuid(t *testing.T) {
|
||||
setupClientIpTestDB(t)
|
||||
now := time.Now().Unix()
|
||||
svc := &InboundService{}
|
||||
|
||||
if err := svc.RecordLocalClientIps("guid-A", map[string][]model.ClientIpEntry{
|
||||
"u@x": {{IP: "1.1.1.1", Timestamp: now}, {IP: "2.2.2.2", Timestamp: now - 10}},
|
||||
}); err != nil {
|
||||
t.Fatalf("record: %v", err)
|
||||
}
|
||||
|
||||
trees, err := svc.GetClientIpsByGuid()
|
||||
if err != nil {
|
||||
t.Fatalf("byGuid: %v", err)
|
||||
}
|
||||
got := trees["guid-A"]["u@x"]
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("want 2 entries, got %v", got)
|
||||
}
|
||||
if got[0].IP != "1.1.1.1" { // newest-first ordering
|
||||
t.Fatalf("want newest first, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRecordLocalClientIps_MergesAndDropsStale(t *testing.T) {
|
||||
setupClientIpTestDB(t)
|
||||
now := time.Now().Unix()
|
||||
svc := &InboundService{}
|
||||
|
||||
if err := svc.RecordLocalClientIps("g", map[string][]model.ClientIpEntry{
|
||||
"u@x": {{IP: "keep", Timestamp: now - 60}},
|
||||
}); err != nil {
|
||||
t.Fatalf("record 1: %v", err)
|
||||
}
|
||||
// Second scan refreshes keep, adds a stale entry (must be dropped) and a fresh one.
|
||||
if err := svc.RecordLocalClientIps("g", map[string][]model.ClientIpEntry{
|
||||
"u@x": {{IP: "keep", Timestamp: now}, {IP: "stale", Timestamp: now - 4000}, {IP: "new", Timestamp: now - 5}},
|
||||
}); err != nil {
|
||||
t.Fatalf("record 2: %v", err)
|
||||
}
|
||||
|
||||
trees, _ := svc.GetClientIpsByGuid()
|
||||
got := map[string]int64{}
|
||||
for _, e := range trees["g"]["u@x"] {
|
||||
got[e.IP] = e.Timestamp
|
||||
}
|
||||
if got["keep"] != now {
|
||||
t.Fatalf("keep should refresh to now: %v", got)
|
||||
}
|
||||
if _, ok := got["stale"]; ok {
|
||||
t.Fatalf("stale entry should be dropped: %v", got)
|
||||
}
|
||||
if got["new"] != now-5 {
|
||||
t.Fatalf("new missing: %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpsertNodeClientIps_EmptyMergeDeletesRow(t *testing.T) {
|
||||
setupClientIpTestDB(t)
|
||||
now := time.Now().Unix()
|
||||
db := database.GetDB()
|
||||
svc := &InboundService{}
|
||||
|
||||
// Seed an already-stale row, then record another all-stale observation: the
|
||||
// merge yields nothing fresh, so the row must be removed (not left lingering).
|
||||
staleIps, _ := json.Marshal([]model.ClientIpEntry{{IP: "old", Timestamp: now - 999999}})
|
||||
if err := db.Create(&model.NodeClientIp{NodeGuid: "g", Email: "u@x", Ips: string(staleIps)}).Error; err != nil {
|
||||
t.Fatalf("seed: %v", err)
|
||||
}
|
||||
if err := svc.RecordLocalClientIps("g", map[string][]model.ClientIpEntry{
|
||||
"u@x": {{IP: "old2", Timestamp: now - 999999}},
|
||||
}); err != nil {
|
||||
t.Fatalf("record: %v", err)
|
||||
}
|
||||
|
||||
var count int64
|
||||
database.GetDB().Model(&model.NodeClientIp{}).
|
||||
Where("node_guid = ? AND email = ?", "g", "u@x").Count(&count)
|
||||
if count != 0 {
|
||||
t.Fatalf("row should be deleted when merge is empty, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetClientIpNodeAttribution_NewestGuidWins(t *testing.T) {
|
||||
setupClientIpTestDB(t)
|
||||
now := time.Now().Unix()
|
||||
svc := &InboundService{}
|
||||
|
||||
// Same IP observed on two panels; the most recent observation attributes it.
|
||||
if err := svc.RecordLocalClientIps("gA", map[string][]model.ClientIpEntry{
|
||||
"u@x": {{IP: "9.9.9.9", Timestamp: now - 100}},
|
||||
}); err != nil {
|
||||
t.Fatalf("record gA: %v", err)
|
||||
}
|
||||
if err := svc.MergeClientIpsByGuid(map[string]map[string][]model.ClientIpEntry{
|
||||
"gB": {"u@x": {{IP: "9.9.9.9", Timestamp: now}}},
|
||||
}); err != nil {
|
||||
t.Fatalf("merge gB: %v", err)
|
||||
}
|
||||
|
||||
attr, err := svc.GetClientIpNodeAttribution("u@x")
|
||||
if err != nil {
|
||||
t.Fatalf("attribution: %v", err)
|
||||
}
|
||||
if attr["9.9.9.9"] != "gB" {
|
||||
t.Fatalf("newest guid should win, got %q", attr["9.9.9.9"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetClientIpsWithNodes_LabelsNodes(t *testing.T) {
|
||||
setupClientIpTestDB(t)
|
||||
now := time.Now().Unix()
|
||||
db := database.GetDB()
|
||||
svc := &InboundService{}
|
||||
|
||||
panelGuid, err := (&SettingService{}).GetPanelGuid()
|
||||
if err != nil || panelGuid == "" {
|
||||
t.Fatalf("panel guid: %v", err)
|
||||
}
|
||||
|
||||
if err := db.Create(&model.Node{Name: "edge-1", Guid: "node-guid", Address: "x", Port: 2053, ApiToken: "t"}).Error; err != nil {
|
||||
t.Fatalf("seed node: %v", err)
|
||||
}
|
||||
|
||||
// Flat display set (what the IP-log lists) holds both IPs.
|
||||
flat, _ := json.Marshal([]model.ClientIpEntry{{IP: "1.1.1.1", Timestamp: now}, {IP: "2.2.2.2", Timestamp: now}})
|
||||
if err := db.Create(&model.InboundClientIps{ClientEmail: "u@x", Ips: string(flat)}).Error; err != nil {
|
||||
t.Fatalf("seed flat ips: %v", err)
|
||||
}
|
||||
|
||||
// Attribution: 1.1.1.1 seen locally, 2.2.2.2 seen on the node.
|
||||
if err := svc.RecordLocalClientIps(panelGuid, map[string][]model.ClientIpEntry{
|
||||
"u@x": {{IP: "1.1.1.1", Timestamp: now}},
|
||||
}); err != nil {
|
||||
t.Fatalf("record local: %v", err)
|
||||
}
|
||||
if err := svc.MergeClientIpsByGuid(map[string]map[string][]model.ClientIpEntry{
|
||||
"node-guid": {"u@x": {{IP: "2.2.2.2", Timestamp: now}}},
|
||||
}); err != nil {
|
||||
t.Fatalf("merge node: %v", err)
|
||||
}
|
||||
|
||||
infos, err := svc.GetClientIpsWithNodes("u@x")
|
||||
if err != nil {
|
||||
t.Fatalf("getIpsWithNodes: %v", err)
|
||||
}
|
||||
byIP := map[string]string{}
|
||||
for _, in := range infos {
|
||||
byIP[in.IP] = in.Node
|
||||
}
|
||||
if byIP["1.1.1.1"] != "" {
|
||||
t.Fatalf("local IP should have empty node, got %q", byIP["1.1.1.1"])
|
||||
}
|
||||
if byIP["2.2.2.2"] != "edge-1" {
|
||||
t.Fatalf("node IP should be labelled edge-1, got %q", byIP["2.2.2.2"])
|
||||
}
|
||||
}
|
||||
@ -433,12 +433,24 @@ func FilterNodeSnapshot(n *model.Node, snap *runtime.TrafficSnapshot) {
|
||||
|
||||
func (s *NodeService) Delete(id int) error {
|
||||
db := database.GetDB()
|
||||
// Capture the node's guid before deleting the row so we can drop its per-node
|
||||
// IP attribution (NodeClientIp is keyed by guid, not node id).
|
||||
var guid string
|
||||
var n model.Node
|
||||
if err := db.Select("guid").Where("id = ?", id).First(&n).Error; err == nil {
|
||||
guid = n.Guid
|
||||
}
|
||||
if err := db.Where("id = ?", id).Delete(model.Node{}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if err := db.Where("node_id = ?", id).Delete(&model.NodeClientTraffic{}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
if guid != "" {
|
||||
if err := db.Where("node_guid = ?", guid).Delete(&model.NodeClientIp{}).Error; err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if mgr := runtime.GetManager(); mgr != nil {
|
||||
mgr.InvalidateNode(id)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user