@@ -1,255 +0,0 @@
-'''
-FCC ULS Database to SQLite Database Conversion Tool
- Version 0.1
-
-Copyright (c) 2023 Chris Goff <mailbag@chrisapproved.com>
-
-Permission to use, copy, modify, and distribute this software for any
-purpose with or without fee is hereby granted, provided that the above
-copyright notice and this permission notice appear in all copies.
-
-THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
-ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
-OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-'''
-
-'''
-Reference Documentation:
-ULS Databases:
- https://www.fcc.gov/wireless/data/public-access-files-database-downloads
-Valid ULS definitions:
- https://www.fcc.gov/sites/default/files/public_access_database_definitions_sql_v6_0_0.txt
-Valid ULS record types:
- https://www.fcc.gov/sites/default/files/pubacc_intro_11122014.pdf
-'''
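-
-'''
-Example usage (script and data file names are illustrative):
-  python uls_to_sqlite.py -i EN.dat -e uls.db   # split one pipe-delimited ULS file by record type and import it
-  python uls_to_sqlite.py -d -e uls.db          # import every .DAT file in the current directory
-'''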
-
-import os
-import re
-import csv
-import subprocess
-import argparse
-from tqdm import tqdm
-from concurrent.futures import ProcessPoolExecutor
-
-
-# Define function to parse SQL file and generate SQLite schema
-def parse_sql_file_to_sqlite_schema(sql_file_path):
- '''Parses SQL file and generates SQLite schema'''
-
- # Read the SQL file
- with open(sql_file_path, "r", encoding="windows-1252") as f:
- sql_text = f.read()
-
- # Define regex patterns
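-    # table_pattern captures each table's name and its column-definition body from
-    # the "create table dbo.PUBACC_XX ( ... )" blocks; column_pattern pulls the
-    # leading identifier (the column name) from every line inside that body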
- table_pattern = re.compile(r"create table (dbo\..*?)\n(.*?)\n\)", re.DOTALL)
- column_pattern = re.compile(r"^\s*([a-zA-Z_][a-zA-Z0-9_]*)\s.*$", re.MULTILINE)
-
- # Extract table definitions
- table_matches = table_pattern.findall(sql_text)
-
- # Build dictionary of table definitions
- table_definitions = {}
- for table_match in table_matches:
- table_name = table_match[0]
- column_definitions = column_pattern.findall(table_match[1])
- table_definitions[table_name] = column_definitions
-
- # Transform into SQLite schema definitions
- sqlite_schemas = {}
- for table_name, column_names in table_definitions.items():
- sqlite_schemas[table_name] = ", ".join(column_names)
-
- return sqlite_schemas
-
-
-def get_record_types_from_table_names(table_names):
- '''Returns a list of record types from a list of table names'''
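-    # e.g. "dbo.PUBACC_AM" -> "AM"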
- record_types = [table_name.split("_")[-1].upper() for table_name in table_names]
- return record_types
-
-
-def add_definitions_to_file(record_type, definitions, directory, file_extension=".csv"):
- '''Adds definitions as header to a file'''
-
- # Generate file path
- file_path = os.path.join(directory, f"{record_type}{file_extension}")
-
- # Split definitions into list
- headers = definitions.split(", ")
-
- # Check if file already exists
- if os.path.isfile(file_path):
- # Read existing data into a list
- with open(file_path, "r", encoding="windows-1252") as f:
- existing_data = f.readlines()
-
-        # If the file is empty or has only a single line, leave it untouched
- if len(existing_data) <= 1:
- return
-
- # Write headers and existing data to a temporary list
- data_to_write = [headers]
- for line in existing_data:
- data_to_write.append(line.strip().split("|"))
-
- # Write the combined data back to the file
- with open(file_path, "w", newline="", encoding="windows-1252") as f:
- writer = csv.writer(f, delimiter="|")
- writer.writerows(data_to_write)
- else:
- # Write headers to new file
- with open(file_path, "w", newline="", encoding="windows-1252") as f:
- writer = csv.writer(f, delimiter="|")
- writer.writerow(headers)
-
-
-def insert_csv_to_sqlite(db_name, table_name, csv_filename, delimiter):
- '''Inserts data from a CSV file into a SQLite database using the sqlite-utils command line tool'''
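-    # sqlite-utils creates the target table if it does not already exist and
-    # appends rows on subsequent inserts; the encoding matches the FCC source files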
- command = [
- "sqlite-utils",
- "insert",
- db_name,
- table_name,
- csv_filename,
- "--csv",
- "--delimiter",
- delimiter,
- "--encoding=windows-1252",
- ]
- try:
-        subprocess.run(command, capture_output=True, check=True, text=True)
- except subprocess.CalledProcessError as e:
- print(
- f"Error inserting data from {csv_filename} to {db_name}. Error message: {e.stderr}"
- )
- return
-
-
-def remove_files(valid_records):
- '''Removes files from the current directory'''
- for record in valid_records:
- csv_filename = f"{record}.csv"
- if os.path.exists(csv_filename):
- os.remove(csv_filename)
-
-
-def process_record(record, file_extension=".csv"):
- '''Processes a single record'''
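-    # Relies on module-level globals (sqlite_schemas, directory, db_name, delimiter)
-    # that are populated before the process pool is created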
- filename = f"{record}{file_extension}"
-    definitions = sqlite_schemas.get(f"dbo.PUBACC_{record}")
-    if definitions is None:
-        # Skip record types with no matching table definition in the SQL file
-        print(f"No schema definition found for record type {record}, skipping")
-        return
- add_definitions_to_file(record, definitions, directory, file_extension)
- insert_csv_to_sqlite(db_name, record, filename, delimiter)
-
-
-def process_dat_files():
- '''Processes all .DAT files in the current directory'''
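-    # The FCC's weekly ULS archives extract to one pipe-delimited file per
-    # record type (e.g. AM.dat, EN.dat, HD.dat)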
-
- # Check for both .DAT and .dat files in the current directory
- dat_files = [f for f in os.listdir() if f.lower().endswith(".dat")]
-
-    # Process each file, preserving its actual extension so .DAT and .dat both work
-    for dat_file in tqdm(dat_files, desc="Processing .DAT files"):
-        record, extension = os.path.splitext(dat_file)
-        process_record(record, file_extension=extension)
-
-# Dictionary of open output file handles, keyed by record type
-files = {}
-
-# SQL definitions file path, used to determine valid record types
-sql_file_path = "public_access_database_definitions_sql_v6_0_0.txt"
-
-# Check if file exists
-if os.path.isfile(sql_file_path):
- # Parse SQL file
- sqlite_schemas = parse_sql_file_to_sqlite_schema(sql_file_path)
-else:
-    # Nothing else can run without the schema definitions, so stop here
-    raise SystemExit(f"File does not exist: {sql_file_path}")
-
-# Define command-line argument parser
-parser = argparse.ArgumentParser(description="Convert FCC ULS data files into a SQLite database")
-# Argument for import/input file
-parser.add_argument(
- "-i",
- "--import_file",
- metavar="<filename>",
- type=str,
- help="FCC ULS pipe-delimited file to import",
-)
-# Argument for export file
-parser.add_argument(
- "-e",
- "--export_file",
- metavar="<filename>",
- type=str,
- help="SQLite database file to export",
-)
-# Argument parsing for detecting -d or --dat parameter for process_dat_files()
-parser.add_argument(
- "-d",
- "--dat",
- action="store_true",
- help="Process .DAT files in the current directory.",
-)
-
-args = parser.parse_args()
-
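-# Values read by process_record() when building headers and running the import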
-directory = "./"
-db_name = args.export_file
-delimiter = "|"
-
-# Get valid record types
-valid_record_types = get_record_types_from_table_names(sqlite_schemas.keys())
-
-# Code to process single ULS database file
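-# The input file is split into one pipe-delimited CSV per two-letter record type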
-if args.import_file:
- with open(args.import_file, "r", encoding="windows-1252") as in_file:
- for i, line in enumerate(in_file, start=1):
- fields = line.strip().split("|")
- record_type = fields[0]
-
- # If the record type is not two letters, skip this line
- if len(record_type) != 2 or not record_type.isalpha():
- continue
-
- # If the record type is not in the valid record types, skip this line
- if record_type not in valid_record_types:
- continue
-
- # If we haven't seen this record type before, open a new file
- if record_type not in files:
- files[record_type] = open(
- f"{record_type}.csv", "w", encoding="windows-1252"
- )
-
- # Write the line to the appropriate file
- files[record_type].write(line)
-
-# Code to process .DAT files in the current directory
-elif args.dat:
- process_dat_files()
-
-# Close all per-record-type output files so buffered data is flushed before importing
-for file in files.values():
- file.close()
-
-# Reuse the valid record types computed above for the import and cleanup steps
-valid_records = valid_record_types
-
-
-# Using a process pool to parallelize the operations for each record
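-# Each worker prepends the schema header to its record type's CSV and then
-# shells out to sqlite-utils to load it into the database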
-with ProcessPoolExecutor() as executor:
- list(
- tqdm(
- executor.map(process_record, valid_records),
- total=len(valid_records),
-            desc="Importing records into SQLite",
- )
- )
-
-remove_files(valid_records)