mecfs_bio.build_system.task.compressed_csv_to_parquet_task

Enable efficient SQL operations by converting CSV to parquet.
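At its core the conversion is a single DuckDB COPY statement wrapped around read_csv. A minimal standalone sketch of the same idea (the file names are illustrative, not part of this module):

import duckdb

# Read a gzipped CSV and write a zstd-compressed parquet file in one statement.
duckdb.sql("""
    COPY (SELECT * FROM read_csv('input.csv.gz',
                                 AUTO_DETECT=TRUE,
                                 compression='gzip'))
    TO 'output.parquet' (FORMAT 'PARQUET', CODEC 'zstd');
""")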

Classes:

CompressedCSVToParquetTask

Attributes:

logger

logger module-attribute

logger = get_logger()

CompressedCSVToParquetTask

Bases: Task

Task for converting a gzipped CSV-type file to a parquet file. Its main use is processing the SNP151 SNP database files.

Methods:

create
execute

Attributes:

csv_task
deps
meta
select_list
source_compression
target_compression
type_dict

csv_task instance-attribute

csv_task: Task

deps property

deps: list[Task]

meta property

meta: Meta

select_list class-attribute instance-attribute

select_list: list[str] | None = None

source_compression class-attribute instance-attribute

source_compression: str | None = 'gzip'

target_compression class-attribute instance-attribute

target_compression: str = 'zstd'

type_dict class-attribute instance-attribute

type_dict: Mapping[str, str] | None = None
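When set, the mapping is interpolated into read_csv's types argument, so a plain dict whose repr is a valid DuckDB struct literal works; the column names below are illustrative:

type_dict = {"chrom": "VARCHAR", "chromStart": "BIGINT"}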

create classmethod

create(
    csv_task: Task,
    asset_id: str,
    target_compression: str = "zstd",
    source_compression: str | None = "gzip",
    select_list: list[str] | None = None,
    type_dict: Mapping[str, str] | None = None,
) -> CompressedCSVToParquetTask
Source code in mecfs_bio/build_system/task/compressed_csv_to_parquet_task.py
@classmethod
def create(
    cls,
    csv_task: Task,
    asset_id: str,
    target_compression: str = "zstd",
    source_compression: str | None = "gzip",
    select_list: list[str] | None = None,
    type_dict: Mapping[str, str] | None = None,
) -> "CompressedCSVToParquetTask":
    source_meta = csv_task.meta
    if isinstance(source_meta, ReferenceFileMeta):
        # Derive the parquet asset's metadata from the source CSV task's metadata,
        # swapping in the new asset id, extension, and a parquet read spec.
        meta = ReferenceFileMeta(
            group=source_meta.group,
            sub_group=source_meta.sub_group,
            sub_folder=source_meta.sub_folder,
            id=AssetId(asset_id),
            extension=f".parquet.{target_compression}",
            read_spec=DataFrameReadSpec(format=DataFrameParquetFormat()),
        )
        return cls(
            meta=meta,
            csv_task=csv_task,
            target_compression=target_compression,
            source_compression=source_compression,
            select_list=select_list,
            type_dict=type_dict,
        )
    raise ValueError(f"Unknown Meta: {type(source_meta).__name__}")
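A hypothetical call, assuming an existing snp151_csv_task whose meta is a ReferenceFileMeta (the task name, asset id, and column names are illustrative):

task = CompressedCSVToParquetTask.create(
    csv_task=snp151_csv_task,
    asset_id="snp151_parquet",
    select_list=["chrom", "chromStart", "name"],
    type_dict={"chrom": "VARCHAR", "chromStart": "BIGINT"},
)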

execute

execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/compressed_csv_to_parquet_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    source_asset = fetch(self._source_id)
    assert isinstance(source_asset, FileAsset)
    source_path = source_asset.path
    read_spec = self._source_meta.read_spec()
    format = _get_format(read_spec)
    assert format.null_values is None
    delim = _get_sep(format)
    col_names = _get_column_names(format)
    out_path = scratch_dir / "output_file.parquet"
    # Render the column names as a DuckDB list literal, e.g. ['chrom', 'pos'].
    name_list_str = "[" + ", ".join(f"'{nm}'" for nm in col_names) + "]"
    select_list = "*" if self.select_list is None else ",".join(self.select_list)
    # Build a single COPY statement that reads the compressed CSV and writes parquet.
    sql_command = f"""
        COPY (SELECT {select_list}
        FROM read_csv('{source_path}',
        AUTO_DETECT=TRUE,
        HEADER={format.has_header},
        NAMES = {name_list_str},
        delim = '{delim}'"""
    if format.comment_char is not None:
        sql_command += f""",
        comment = '{format.comment_char}'"""
    if self.source_compression is not None:
        sql_command += f""",
        compression='{self.source_compression}'"""
    if self.type_dict is not None:
        # A plain dict's repr is a valid DuckDB struct literal, e.g. {'pos': 'BIGINT'}.
        sql_command += f""",
        types={self.type_dict}"""

    sql_command += "))"
    sql_command += f"""
        TO '{out_path}' (FORMAT 'PARQUET', CODEC '{self.target_compression}');
        """

    logger.info(f"running sql command:\n {sql_command}")
    duckdb.sql(sql_command)
    return FileAsset(out_path)
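For a comma-separated, gzipped source with a header and a two-column select list, the generated statement takes roughly this shape (paths, column names, and types are illustrative):

COPY (SELECT chrom, chromStart
      FROM read_csv('/scratch/source.csv.gz',
                    AUTO_DETECT=TRUE,
                    HEADER=True,
                    NAMES = ['chrom', 'chromStart', 'name'],
                    delim = ',',
                    compression='gzip',
                    types={'chrom': 'VARCHAR', 'chromStart': 'BIGINT'}))
TO '/scratch/output_file.parquet' (FORMAT 'PARQUET', CODEC 'zstd');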