Skip to content

Commit

Permalink
converted VehicleDatabase class to module; added index to pattern table
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpeckham committed Feb 12, 2024
1 parent a3699c6 commit f8e0b4d
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 92 deletions.
18 changes: 8 additions & 10 deletions src/vin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from vin.constants import VIN_MODEL_YEAR_CHARACTERS
from vin.constants import VIN_POSITION_WEIGHTS
from vin.database import DecodedVehicle
from vin.database import VehicleDatabase
from vin.database import lookup_vehicle


class DecodingError(Exception):
Expand Down Expand Up @@ -130,15 +130,13 @@ def _decode_vin(self) -> None:
DecodingError: Unable to decode VIN using NHTSA vPIC.
"""
vehicle: DecodedVehicle = None
db_path = files("vin").joinpath("vehicle.db")
with VehicleDatabase(path=db_path) as db:
model_year = self._decode_model_year()
if model_year > 0:
vehicle = db.lookup_vehicle(self.wmi, self.descriptor, model_year)
else:
vehicle = db.lookup_vehicle(self.wmi, self.descriptor, abs(model_year))
if not vehicle:
vehicle = db.lookup_vehicle(self.wmi, self.descriptor, abs(model_year) - 30)
model_year = self._decode_model_year()
if model_year > 0:
vehicle = lookup_vehicle(self.wmi, self.descriptor, model_year)
else:
vehicle = lookup_vehicle(self.wmi, self.descriptor, abs(model_year))
if not vehicle:
vehicle = lookup_vehicle(self.wmi, self.descriptor, abs(model_year) - 30)
if vehicle is None:
raise DecodingError()

Expand Down
149 changes: 67 additions & 82 deletions src/vin/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,26 @@
import re
import sqlite3
from dataclasses import dataclass
from importlib.resources import files


log = logging.getLogger(__name__)

DATABASE_PATH = files("vin").joinpath("vehicle.db")


def regex(value, pattern) -> bool:
"""REGEXP shim for SQLite versions bundled with Python 3.11 and earlier"""
return re.match(pattern, value) is not None
# found = re.match(pattern, value) is not None
# print(f"{value=} {pattern=} {'found' if found else '---'}")
# return found


connection = sqlite3.connect(DATABASE_PATH, detect_types=sqlite3.PARSE_DECLTYPES)
connection.row_factory = sqlite3.Row
connection.create_function("REGEXP", 2, regex)


@dataclass
class DecodedVehicle:
Expand All @@ -20,88 +36,57 @@ class DecodedVehicle:
truck_type: str


def regex(value, pattern):
"""REGEXP shim for SQLite versions bundled with Python 3.11 and earlier"""
found = re.match(pattern, value) is not None
print(f"{value=} {pattern=} {'found' if found else '---'}")
return found


class VehicleDatabase:
def __init__(self, path):
"""return a SQLite3 database connection"""
assert path.exists()
self._path = path

def __enter__(self) -> "VehicleDatabase":
"""connect to the database
Build the database and schema if requested.
"""
log.debug(f"Opening database {self._path.absolute()}")
connection = sqlite3.connect(self._path, detect_types=sqlite3.PARSE_DECLTYPES)
connection.row_factory = sqlite3.Row
# version = sqlite3.sqlite_version_info
connection.create_function("REGEXP", 2, regex)
self._connection = connection
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self._connection.in_transaction:
log.debug("Auto commit")
self._connection.commit()
self._connection.close()

def query(self, sql: str, args: tuple = ()) -> list[sqlite3.Row]:
"""insert rows and return rowcount"""
cursor = self._connection.cursor()
results = cursor.execute(sql, args).fetchall()
cursor.close()

# print(sql)
print(args)
for result in results:
print(dict(result))

return results

def lookup_vehicle(self, wmi: str, vds: str, model_year: int) -> DecodedVehicle | None:
"""get vehicle details
Args:
vin: The 17-digit Vehicle Identification Number.
Returns:
Vehicle: the vehicle details
"""
if results := self.query(sql=LOOKUP_VEHICLE_SQL, args=(wmi, model_year, vds)):
details = {"series": None, "trim": None, "model_year": model_year}
for row in results:
if row["model"] is not None:
details.update(
{
k: row[k]
for k in [
"manufacturer",
"make",
"model",
"vehicle_type",
"truck_type",
"country",
]
}
)
elif row["series"] is not None:
details["series"] = row["series"]
elif row["trim"] is not None:
details["trim"] = row["trim"]
else:
raise Exception(
f"expected model and series WMI {wmi} VDS {vds} "
f"model year {model_year}, but got {row}"
)
return DecodedVehicle(**details)
return None
def query(sql: str, args: tuple = ()) -> list[sqlite3.Row]:
"""insert rows and return rowcount"""
cursor = connection.cursor()
results = cursor.execute(sql, args).fetchall()
cursor.close()

# print(sql)
print(args)
for result in results:
print(dict(result))

return results


def lookup_vehicle(wmi: str, vds: str, model_year: int) -> DecodedVehicle | None:
"""get vehicle details
Args:
vin: The 17-digit Vehicle Identification Number.
Returns:
Vehicle: the vehicle details
"""
if results := query(sql=LOOKUP_VEHICLE_SQL, args=(wmi, model_year, vds)):
details = {"series": None, "trim": None, "model_year": model_year}
for row in results:
if row["model"] is not None:
details.update(
{
k: row[k]
for k in [
"manufacturer",
"make",
"model",
"vehicle_type",
"truck_type",
"country",
]
}
)
elif row["series"] is not None:
details["series"] = row["series"]
elif row["trim"] is not None:
details["trim"] = row["trim"]
else:
raise Exception(
f"expected model and series WMI {wmi} VDS {vds} "
f"model year {model_year}, but got {row}"
)
return DecodedVehicle(**details)
return None


LOOKUP_VEHICLE_SQL = """
Expand Down

0 comments on commit f8e0b4d

Please sign in to comment.