数据脱敏技术

首先,创建一个SQLite数据库,并在其中插入一些包含身份证号、手机号和姓名的数据。

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE personal_info (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
id_card TEXT NOT NULL,
phone_number TEXT NOT NULL
);

INSERT INTO personal_info (name, id_card, phone_number)
VALUES ('张三', '110101199001011234', '13912345678'),
('李四', '110101198501019876', '15998765432'),
('王五', '110101199801013456', '15876543210');

然后我们来编写数据脱敏函数。查询personal_info表中的数据,并对每条记录的姓名、身份证号和手机号进行脱敏处理。最后将脱敏后的数据更新回数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import sqlite3
import re

def mask_id_card(id_card):
return id_card[:6] + '*' * 8 + id_card[-4:]

def mask_phone_number(phone_number):
return phone_number[:3] + '*' * 4 + phone_number[-4:]

def mask_name(name):
return name[0] + '*' * (len(name) - 1)

def mask_db_data():
conn = sqlite3.connect('data.db')
cursor = conn.cursor()

cursor.execute('SELECT id, name, id_card, phone_number FROM personal_info')
rows = cursor.fetchall()

for row in rows:
masked_name = mask_name(row[1])
masked_id_card = mask_id_card(row[2])
masked_phone_number = mask_phone_number(row[3])

cursor.execute('UPDATE personal_info SET name=?, id_card=?, phone_number=? WHERE id=?', (masked_name, masked_id_card, masked_phone_number, row[0]))

conn.commit()
conn.close()

if __name__ == "__main__":
mask_db_data()

在某些情况下,原始数据需要进行保留,这时候可以采用动态脱敏的方法(可能会导致查询性能降低,特别是在处理大量数据时)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import sqlite3
import re

def mask_id_card(id_card):
return id_card[:6] + '*' * 8 + id_card[-4:]

def mask_phone_number(phone_number):
return phone_number[:3] + '*' * 4 + phone_number[-4:]

def mask_name(name):
return name[0] + '*' * (len(name) - 1)

def get_masked_data(query):
conn = sqlite3.connect('data.db')
cursor = conn.cursor()

cursor.execute(query)
rows = cursor.fetchall()

masked_rows = []
for row in rows:
masked_name = mask_name(row[1])
masked_id_card = mask_id_card(row[2])
masked_phone_number = mask_phone_number(row[3])
masked_rows.append((row[0], masked_name, masked_id_card, masked_phone_number))

conn.close()
return masked_rows

if __name__ == "__main__":
query = 'SELECT id, name, id_card, phone_number FROM personal_info'
masked_data = get_masked_data(query)
for data in masked_data:
print(data)