-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathdb.mjs
142 lines (125 loc) · 3.92 KB
/
db.mjs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import pg from 'pg'
import { AppEnv } from './env.mjs'
import { logger } from './logger.mjs'
import { whereBuilder } from './util.mjs'
const { Client, Pool } = pg
import dayjs from 'dayjs'
const pool = new Pool({
user: AppEnv.DBUser,
password: AppEnv.DBPasswd,
host: AppEnv.DBAddr,
port: AppEnv.DBPort,
database: AppEnv.DBName,
idleTimeoutMillis: 30000,
max: 20,
connectionTimeoutMillis: 2000,
})
function getTableNameByDay(day) {
let today = dayjs().format('YYYY-MM-DD')
if (day === today) {
return 'records'
}
return `records_${day.replaceAll('-', '')}`
}
export async function tableSplit() {
let yestoday = dayjs().subtract(1, 'day').format("YYYYMMDD")
let today = dayjs().format("YYYYMMDD")
logger.info('tableSplit')
const client = await pool.connect()
try {
await client.query('BEGIN')
await client.query('CREATE table if not exists records_tmp (LIKE public.records INCLUDING all)')
await client.query(`ALTER TABLE records RENAME TO records_${yestoday}`)
await client.query('ALTER TABLE records_tmp RENAME TO records')
await client.query(`CREATE SEQUENCE record_id_seq_${today} OWNED BY records.id`)
await client.query(`ALTER TABLE records ALTER COLUMN id SET DEFAULT nextval('record_id_seq_${today}')`)
await client.query('COMMIT')
} catch (e) {
await client.query('ROLLBACK')
throw e
} finally {
client.release()
}
}
export async function deleteTable() {
let res = await pool.query(`
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public' and
table_name like 'records_%'
order by table_name desc
offset ${AppEnv.dataKeepDays};
`)
if (res.rows.length === 0) {
logger.info("没有需要删除的表")
}
for (const ele of res.rows) {
console.log(ele.table_name)
logger.info(`try delete table ${ele.table_name}`)
await pool.query(`DROP TABLE IF EXISTS ${ele.table_name} CASCADE;`)
}
}
export async function queryRecord(c) {
logger.info(c)
let wh = whereBuilder(c)
const sql = `
select
sip_call_id as "CallID",
to_char(min(create_time),'HH24:MI:SS') as "startTime",
to_char(min(create_time),'YYYY-MM-DD') as "day",
to_char(max(create_time),'HH24:MI:SS') as "stopTime",
to_char(max(create_time) - min(create_time),'HH24:MI:SS') as "duration",
min(from_user) as "caller",
min(to_user) as "callee",
count(*)::int as "msgTotal",
max(user_agent) as "UA",
max(response_code)::int as "finalCode",
max(cseq_method) as "cseq_method",
max(leg_uid) as "uid",
max(src_host) as "srcHost",
max(dst_host) as "dstHost",
string_agg(DISTINCT CASE WHEN response_code BETWEEN 170 AND 190 THEN response_code::text END, ',') AS "tempCode"
from
public.${getTableNameByDay(c.day)}
where
${wh.join(' and ')}
group by sip_call_id
having count(*) >= ${c.msg_min}
order by "startTime" desc
limit ${AppEnv.QueryLimit}
`
logger.info(sql)
const res = await pool.query(sql)
return res
}
export async function queryById(id, day) {
const sql = `
select
sip_call_id,
sip_method,
to_char(create_time,
'YYYY-MM-DD HH24:MI:SS') as create_time,
timestamp_micro,
raw_msg,
cseq_number,
case
when sip_protocol = 6 then 'TCP'
when sip_protocol = 17 then 'UDP'
when sip_protocol = 22 then 'TLS'
when sip_protocol = 50 then 'ESP'
else 'Unknown'
end as sip_protocl,
replace(src_host,':','_') as src_host,
replace(dst_host,':','_') as dst_host,
response_desc,
length(raw_msg) as msg_len
from
public.${getTableNameByDay(day)}
where
sip_call_id = '${id}'
order by create_time , timestamp_micro
`
logger.info(sql)
const res = await pool.query(sql)
return res
}