Bases: Task
Attaches the SQLite database produced by source_task (read-only, in DuckDB) under the alias '_src', executes query against it,
and writes the result to a Parquet file compressed with target_compression.
The query field must be a SELECT statement in which every table reference is
qualified with the alias '_src.' (e.g. 'SELECT * FROM _src.my_table').
Methods:
create (classmethod), execute — documented below.
Attributes:
source_task — instance attribute.
target_compression — class attribute and instance attribute.
target_compression: str = 'zstd'
create
classmethod
create(
source_task: Task,
asset_id: str,
query: str,
target_compression: str = "zstd",
override_subfolder: PurePath | None = None,
) -> SqliteToParquetTask
Source code in mecfs_bio/build_system/task/sqlite_to_parquet_task.py
@classmethod
def create(
    cls,
    source_task: Task,
    asset_id: str,
    query: str,
    target_compression: str = "zstd",
    override_subfolder: PurePath | None = None,
) -> "SqliteToParquetTask":
    """Build a task that turns ``source_task``'s SQLite output into parquet.

    The new task's metadata inherits ``group``, ``sub_group`` and (unless
    overridden) ``sub_folder`` from the source task's ``ReferenceFileMeta``.
    Its extension is ``.parquet.<target_compression>`` and it is read back as
    a parquet DataFrame.

    Args:
        source_task: Task whose asset is the SQLite database to query.
        asset_id: Identifier for the produced parquet asset.
        query: SELECT statement run against the attached SQLite database.
        target_compression: Parquet codec name; also used in the extension.
        override_subfolder: Sub-folder for the output. Defaults to the source
            task's sub-folder when ``None``.

    Returns:
        A new ``SqliteToParquetTask`` instance.

    Raises:
        ValueError: If ``source_task.meta`` is not a ``ReferenceFileMeta``.
    """
    src_meta = source_task.meta
    # Only reference-file sources are supported; reject anything else up front.
    if not isinstance(src_meta, ReferenceFileMeta):
        raise ValueError(f"Unsupported source meta type: {type(src_meta)}")

    sub_folder = (
        src_meta.sub_folder if override_subfolder is None else override_subfolder
    )
    parquet_meta = ReferenceFileMeta(
        group=src_meta.group,
        sub_group=src_meta.sub_group,
        sub_folder=sub_folder,
        id=AssetId(asset_id),
        extension=f".parquet.{target_compression}",
        read_spec=DataFrameReadSpec(format=DataFrameParquetFormat()),
    )
    return cls(
        meta=parquet_meta,
        source_task=source_task,
        query=query,
        target_compression=target_compression,
    )
execute
execute(scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset
Source code in mecfs_bio/build_system/task/sqlite_to_parquet_task.py
def execute(self, scratch_dir: Path, fetch: Fetch, wf: WF) -> Asset:
    """Run ``self.query`` against the source SQLite file and write parquet.

    The SQLite file produced by ``self.source_task`` is attached read-only
    under the alias ``_DB_ALIAS`` in a fresh DuckDB connection. The query
    result is then written with DuckDB's ``COPY ... TO`` statement using the
    ``self.target_compression`` codec.

    Args:
        scratch_dir: Directory in which ``output_file.parquet`` is written.
        fetch: Called with the source task's asset id to obtain its asset;
            the result must be a ``FileAsset``.
        wf: Not used by this method.

    Returns:
        A ``FileAsset`` pointing at ``scratch_dir / "output_file.parquet"``.
    """

    def _sql_literal(value: object) -> str:
        # Render ``value`` as a single-quoted SQL string literal, doubling any
        # embedded quotes so paths containing ``'`` do not break the statement.
        return "'" + str(value).replace("'", "''") + "'"

    source_asset = fetch(self.source_task.meta.asset_id)
    assert isinstance(source_asset, FileAsset)
    source_path = source_asset.path
    out_path = scratch_dir / "output_file.parquet"
    conn = duckdb.connect()
    try:
        conn.execute(
            f"ATTACH {_sql_literal(source_path)} AS {_DB_ALIAS} (TYPE SQLITE, READ_ONLY)"
        )
        copy_sql = f"""
        COPY ({self.query})
        TO {_sql_literal(out_path)} (FORMAT 'PARQUET', CODEC {_sql_literal(self.target_compression)});
        """
        logger.info("running sql command", sql_command=copy_sql)
        conn.execute(copy_sql)
    finally:
        # Always release the connection (and its handle on the attached
        # SQLite file), even when ATTACH or COPY raises.
        conn.close()
    return FileAsset(out_path)