Skip to content

mecfs_bio.build_system.task.sqlite_to_parquet_task

Task to query a SQLite database with DuckDB and write the result as Parquet. Initial version written by Claude.

Classes:

  • SqliteToParquetTask

    Attaches a SQLite database as alias '_src', executes query against it, and writes the result to parquet.

Attributes:

logger module-attribute

logger = get_logger()

SqliteToParquetTask

Bases: Task

Attaches a SQLite database as alias '_src', executes query against it, and writes the result to parquet.

The query field must be a SELECT statement whose table references are prefixed with the alias '_src.' (e.g. 'SELECT * FROM _src.my_table').

Methods:

Attributes:

deps property

deps: list[Task]

meta instance-attribute

meta: Meta

query instance-attribute

query: str

source_task instance-attribute

source_task: Task

target_compression class-attribute instance-attribute

target_compression: str = 'zstd'

create classmethod

create(
    source_task: Task,
    asset_id: str,
    query: str,
    target_compression: str = "zstd",
    override_subfolder: PurePath | None = None,
) -> SqliteToParquetTask
Source code in mecfs_bio/build_system/task/sqlite_to_parquet_task.py
@classmethod
def create(
    cls,
    source_task: Task,
    asset_id: str,
    query: str,
    target_compression: str = "zstd",
    override_subfolder: PurePath | None = None,
) -> "SqliteToParquetTask":
    """Build a task whose output metadata is derived from ``source_task``.

    The new ``ReferenceFileMeta`` copies ``group`` and ``sub_group`` from the
    source task's meta. Its ``sub_folder`` is ``override_subfolder`` when
    given, otherwise the source meta's ``sub_folder``.

    Args:
        source_task: Task whose meta must be a ``ReferenceFileMeta``.
        asset_id: Identifier for the produced asset.
        query: SQL query stored on the task.
        target_compression: Compression name; also used as the suffix of the
            ``.parquet.<compression>`` extension.
        override_subfolder: Optional replacement for the source sub-folder.

    Raises:
        ValueError: If the source task's meta is not a ``ReferenceFileMeta``.
    """
    source_meta = source_task.meta

    # Guard clause: only reference-file metadata is supported.
    if not isinstance(source_meta, ReferenceFileMeta):
        raise ValueError(f"Unsupported source meta type: {type(source_meta)}")

    sub_folder = (
        source_meta.sub_folder if override_subfolder is None else override_subfolder
    )
    parquet_read_spec = DataFrameReadSpec(format=DataFrameParquetFormat())
    output_meta = ReferenceFileMeta(
        group=source_meta.group,
        sub_group=source_meta.sub_group,
        sub_folder=sub_folder,
        id=AssetId(asset_id),
        extension=f".parquet.{target_compression}",
        read_spec=parquet_read_spec,
    )
    return cls(
        meta=output_meta,
        source_task=source_task,
        query=query,
        target_compression=target_compression,
    )

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/sqlite_to_parquet_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    """Run ``self.query`` against the source SQLite file and write parquet.

    The source asset is fetched, attached to an in-memory DuckDB connection
    under the alias ``_DB_ALIAS`` in read-only mode, and the query result is
    copied to ``output_file.parquet`` inside ``scratch_dir``.

    Args:
        scratch_dir: Directory in which the parquet file is written.
        fetch: Callable used to obtain the source task's asset by asset id.
        wf: Not used by this task.

    Returns:
        A ``FileAsset`` pointing at the written parquet file.
    """
    source_asset = fetch(self.source_task.meta.asset_id)
    assert isinstance(source_asset, FileAsset)
    source_path = source_asset.path
    out_path = scratch_dir / "output_file.parquet"

    # Paths are embedded in single-quoted SQL string literals, so any single
    # quote in a path must be doubled or the statement is malformed.
    source_literal = str(source_path).replace("'", "''")
    out_literal = str(out_path).replace("'", "''")

    copy_sql = f"""
        COPY ({self.query})
        TO '{out_literal}' (FORMAT 'PARQUET', CODEC '{self.target_compression}');
    """

    conn = duckdb.connect()
    try:
        conn.execute(
            f"ATTACH '{source_literal}' AS {_DB_ALIAS} (TYPE SQLITE, READ_ONLY)"
        )
        logger.info("running sql command", sql_command=copy_sql)
        conn.execute(copy_sql)
    finally:
        # Close the connection even when ATTACH or COPY raises, so the
        # handle on the SQLite file is not leaked.
        conn.close()
    return FileAsset(out_path)