首先,创建一个SQLite数据库,并在其中插入一些包含身份证号、手机号和姓名的数据。
1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE personal_info ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, id_card TEXT NOT NULL, phone_number TEXT NOT NULL ); INSERT INTO personal_info (name, id_card, phone_number) VALUES ('张三', '110101199001011234', '13912345678'), ('李四', '110101198501019876', '15998765432'), ('王五', '110101199801013456', '15876543210');
然后我们来编写数据脱敏函数。查询personal_info表中的数据,并对每条记录的姓名、身份证号和手机号进行脱敏处理。最后将脱敏后的数据更新回数据库。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import sqlite3import redef mask_id_card (id_card ): return id_card[:6 ] + '*' * 8 + id_card[-4 :] def mask_phone_number (phone_number ): return phone_number[:3 ] + '*' * 4 + phone_number[-4 :] def mask_name (name ): return name[0 ] + '*' * (len (name) - 1 ) def mask_db_data (): conn = sqlite3.connect('data.db' ) cursor = conn.cursor() cursor.execute('SELECT id, name, id_card, phone_number FROM personal_info' ) rows = cursor.fetchall() for row in rows: masked_name = mask_name(row[1 ]) masked_id_card = mask_id_card(row[2 ]) masked_phone_number = mask_phone_number(row[3 ]) cursor.execute('UPDATE personal_info SET name=?, id_card=?, phone_number=? WHERE id=?' , (masked_name, masked_id_card, masked_phone_number, row[0 ])) conn.commit() conn.close() if __name__ == "__main__" : mask_db_data()
在某些情况下,原始数据需要进行保留,这时候可以采用动态脱敏的方法(可能会导致查询性能降低,特别是在处理大量数据时)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import sqlite3import redef mask_id_card (id_card ): return id_card[:6 ] + '*' * 8 + id_card[-4 :] def mask_phone_number (phone_number ): return phone_number[:3 ] + '*' * 4 + phone_number[-4 :] def mask_name (name ): return name[0 ] + '*' * (len (name) - 1 ) def get_masked_data (query ): conn = sqlite3.connect('data.db' ) cursor = conn.cursor() cursor.execute(query) rows = cursor.fetchall() masked_rows = [] for row in rows: masked_name = mask_name(row[1 ]) masked_id_card = mask_id_card(row[2 ]) masked_phone_number = mask_phone_number(row[3 ]) masked_rows.append((row[0 ], masked_name, masked_id_card, masked_phone_number)) conn.close() return masked_rows if __name__ == "__main__" : query = 'SELECT id, name, id_card, phone_number FROM personal_info' masked_data = get_masked_data(query) for data in masked_data: print (data)