API reference

Auto-generated from the source docstrings. Everything below is available as a top-level attribute of freshdata (e.g. import freshdata as fd; fd.clean(...)).

Cleaning

freshdata.clean

clean(df: DataFrame, config: CleanConfig | Mapping[str, object] | None = None, *, return_report: bool = False, source_provenance: dict[str, object] | None = None, provenance_confidence_threshold: float = 0.7, contract: object | None = None, on_unexpected: str = 'warn', on_missing: str = 'fail', domain: str | None = None, column_map: dict[str, str] | None = None, gtfs_file: str | None = None, fhir_resource: str | None = None, media_type: str | None = None, finance_mode: str | None = None, audit_include_phi: bool = False, domain_kwargs: dict[str, object] | None = None, engine: str = 'pandas', output_format: str = 'pandas', engine_config: EngineConfig | None = None, memory: object | None = None, context: str | None = None, policy: object | None = None, strict: bool = False, **options: object) -> pd.DataFrame | tuple[pd.DataFrame, CleanReport]

Clean a DataFrame and return a new, repaired one.

Two layers run in order. Representation repair always happens first:

  1. column_names — snake_case column names, deduplicate collisions.
  2. strip_whitespace — trim surrounding whitespace in text cells.
  3. normalize_sentinels — turn "N/A", "null", "-", "" … into missing.
  4. drop_empty_columns / drop_empty_rows — remove all-missing ones.
  5. fix_dtypes — text that is really numeric / datetime / boolean gets the right dtype (validated; numeric_threshold of values must parse).
  6. drop_duplicates — resolve duplicate rows (duplicate_keep chooses first/last/drop/aggregate; time-indexed frames are protected).
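To make the first three repair steps concrete, here is a minimal pure-Python sketch of snake_casing column names with collision suffixes, trimming text cells, and mapping sentinel strings to missing. It is an illustration only, not freshdata's implementation: the helper names, the `_2` suffix scheme, and the `SENTINELS` set are assumptions.

```python
import re

# Hypothetical sentinel set; the library's actual list is not shown above.
SENTINELS = {"n/a", "null", "-", ""}


def snake_case(name: str) -> str:
    """Lower-case a label and join its word-like parts with underscores."""
    # Split camelCase boundaries first ("customerID" -> "customer ID").
    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", name.strip())
    parts = re.findall(r"[A-Za-z0-9]+", spaced)
    return "_".join(p.lower() for p in parts) or "column"


def normalize_columns(names: list) -> list:
    """snake_case every name and suffix later collisions with _2, _3, ..."""
    seen = {}
    out = []
    for raw in names:
        base = snake_case(raw)
        seen[base] = seen.get(base, 0) + 1
        out.append(base if seen[base] == 1 else f"{base}_{seen[base]}")
    return out


def normalize_cell(value):
    """Strip whitespace from text and map sentinel strings to None."""
    if not isinstance(value, str):
        return value  # non-text cells pass through untouched
    stripped = value.strip()
    return None if stripped.lower() in SENTINELS else stripped


columns = normalize_columns(["Customer ID", "customerID", " Total Revenue "])
cells = [normalize_cell(v) for v in ["  42 ", "N/A", "-", "ok", 3.5]]
print(columns)
print(cells)
```

Note that "42" stays a string here; converting it to a number is the job of a later, validated dtype step.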

Then, with strategy="balanced" (the default), the decision engine profiles every column: its missing ratio, dtype, skewness, cardinality, inferred role (id / target / datetime / text / categorical), and whether missingness looks informative. It then applies threshold rules for missing values and outliers. Nothing is done silently: every action, including a deliberate decision to preserve a column, is logged with a rationale, a risk level, and a confidence score. strategy="conservative" disables the engine; imputation and outlier handling are then opt-in via impute= / outliers=.
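The "threshold rules plus logged rationale" idea can be sketched as follows. This is a hedged illustration under stated assumptions: the `Action` record, the decision names, and the `low` / `high` cut-offs are hypothetical and are not the engine's real rules or defaults.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Action:
    column: str
    decision: str
    rationale: str
    risk: str
    confidence: float


def decide_missing(column, values, role, low=0.05, high=0.6):
    """Pick a missing-value action from the missing ratio and the role.

    Thresholds and decision names are illustrative assumptions.
    """
    ratio = sum(v is None for v in values) / len(values) if values else 0.0
    if role in {"id", "target"}:
        # Protected roles are preserved, and that choice is logged too.
        return Action(column, "preserve", f"role={role} is never imputed", "low", 0.95)
    if ratio == 0:
        return Action(column, "none", "no missing values", "low", 1.0)
    if ratio <= low:
        return Action(column, "impute", f"missing ratio {ratio:.2f} <= {low}", "low", 0.9)
    if ratio < high:
        return Action(column, "impute_and_flag", f"missing ratio {ratio:.2f} is moderate", "medium", 0.7)
    return Action(column, "review", f"missing ratio {ratio:.2f} >= {high}", "high", 0.5)


log = [
    decide_missing("customer_id", ["a", None, "c", "d"], role="id"),
    decide_missing("age", [31, None, 44, 52, 29, 60, 41, 38, 27, 35], role="numeric"),
    decide_missing("notes", [None, None, None, "x"], role="text"),
]
for action in log:
    print(action)
```

Every branch returns a record, so a column that is left alone still appears in the log with its reason.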

Parameters

  • df: The DataFrame to clean.
  • config: A prebuilt freshdata.CleanConfig to start from.
  • return_report: If True, return (cleaned_df, CleanReport). The report carries per-action rationale/risk/confidence, missing counts before/after, warnings, and recommendations for manual review.
  • domain: Optional domain validator pack (e.g. "finance"). When set, generic cleaning runs first (defaulting to strategy="conservative" so the statistical engine never silently alters ledgers/IDs unless you pass an explicit strategy), then the pack validates in layers and repairs separately; findings and a domain_trust_score are folded into the report. Unknown names raise freshdata.domains.UnknownDomainError.
  • column_map: Optional {actual_column: canonical_field} overrides for the domain pack's column detection. Requires domain to be set.
  • gtfs_file: File selector for a single-frame feed-domain run, such as "stops.txt" with domain="transport". Full feeds can instead be passed as a dict.
  • fhir_resource: FHIR resource selector for domain="healthcare" ("Patient", "Observation", "Encounter"); auto-detected from columns if omitted.
  • media_type: Sub-schema selector for domain="media" ("content" / "release"); auto-detected from columns if omitted.
  • audit_include_phi: For PHI-aware packs (healthcare, education), include raw PHI values in the audit trail instead of masking them as [PHI]. Defaults to False.
  • domain_kwargs: Optional pack-specific constructor arguments. These are forwarded for both single-frame and feed-domain runs.
  • **options: Any freshdata.CleanConfig field as a keyword override, e.g. strategy ("balanced" default / "aggressive" / "conservative"), missing_threshold_low/_medium/_high, duplicate_threshold, outlier_method, outlier_action, preserve_original, verbose, preserve_columns, target_column, duplicate_keep, impute, outliers. Unknown names raise TypeError.

Examples

import freshdata as fd
cleaned = fd.clean(df)
cleaned, rep = fd.clean(df, return_report=True)
print(rep.summary())

fd.clean(df, outlier_action="flag", target_column="churn",
         preserve_columns=("notes",), verbose=False)

ledger = fd.clean(df, domain="finance")  # validate + repair
ledger, rep = fd.clean(df, domain="finance", return_report=True)
rep.domain_trust_score  # 0–1

Natural-language cleaning rules compile deterministically into a freshdata.ContextPolicy that governs the run (protected columns, id columns, per-column semantic hints):

cleaned = fd.clean(df, context="CustomerID is unique. Never modify revenue.")
policy = fd.compile_context("...", df=df)  # inspect/review first
cleaned = fd.clean(df, policy=policy)  # skip parsing, use as-is

Source code in src/freshdata/api.py
def clean(
    df: pd.DataFrame,
    config: CleanConfig | Mapping[str, object] | None = None,
    *,
    return_report: bool = False,
    source_provenance: dict[str, object] | None = None,
    provenance_confidence_threshold: float = 0.7,
    contract: object | None = None,
    on_unexpected: str = "warn",
    on_missing: str = "fail",
    domain: str | None = None,
    column_map: dict[str, str] | None = None,
    gtfs_file: str | None = None,
    fhir_resource: str | None = None,
    media_type: str | None = None,
    finance_mode: str | None = None,
    audit_include_phi: bool = False,
    domain_kwargs: dict[str, object] | None = None,
    engine: str = "pandas",
    output_format: str = "pandas",
    engine_config: EngineConfig | None = None,
    memory: object | None = None,
    context: str | None = None,
    policy: object | None = None,
    strict: bool = False,
    **options: object,
) -> pd.DataFrame | tuple[pd.DataFrame, CleanReport]:
    """Clean a DataFrame and return a new, repaired one.

    Two layers run in order. **Representation repair** always happens first:

    1.  ``column_names`` — snake_case column names, deduplicate collisions.
    2.  ``strip_whitespace`` — trim surrounding whitespace in text cells.
    3.  ``normalize_sentinels`` — turn "N/A", "null", "-", "" … into missing.
    4.  ``drop_empty_columns`` / ``drop_empty_rows`` — remove all-missing ones.
    5.  ``fix_dtypes`` — text that is really numeric / datetime / boolean gets
        the right dtype (validated; ``numeric_threshold`` of values must parse).
    6.  ``drop_duplicates`` — resolve duplicate rows (``duplicate_keep``
        chooses first/last/drop/aggregate; time-indexed frames are protected).

    Then, with ``strategy="balanced"`` (the default), the **decision engine**
    profiles every column — missing ratio, dtype, skewness, cardinality,
    inferred role (id / target / datetime / text / categorical), whether
    missingness looks informative — and applies threshold rules for missing
    values and outliers. Nothing is done silently: every action (including
    deliberately preserving a column) is logged with a rationale, a risk
    level, and a confidence score. ``strategy="conservative"`` disables the
    engine; imputation and outlier handling are then opt-in via ``impute=`` /
    ``outliers=``.

    Parameters
    ----------
    df:
        The DataFrame to clean.
    config:
        A prebuilt :class:`~freshdata.CleanConfig` to start from.
    return_report:
        If True, return ``(cleaned_df, CleanReport)``. The report carries
        per-action rationale/risk/confidence, missing counts before/after,
        warnings, and recommendations for manual review.
    domain:
        Optional domain validator pack (e.g. ``"finance"``). When set, generic
        cleaning runs first (defaulting to ``strategy="conservative"`` so the
        statistical engine never silently alters ledgers/IDs unless you pass an
        explicit ``strategy``), then the pack validates in layers and repairs
        separately; findings and a ``domain_trust_score`` are folded into the
        report. Unknown names raise :class:`~freshdata.domains.UnknownDomainError`.
    column_map:
        Optional ``{actual_column: canonical_field}`` overrides for the domain
        pack's column detection. Requires ``domain`` to be set.
    gtfs_file:
        File selector for a single-frame feed-domain run, such as ``"stops.txt"``
        with ``domain="transport"``. Full feeds can instead be passed as a dict.
    fhir_resource:
        FHIR resource selector for ``domain="healthcare"`` (``"Patient"``,
        ``"Observation"``, ``"Encounter"``); auto-detected from columns if omitted.
    media_type:
        Sub-schema selector for ``domain="media"`` (``"content"`` / ``"release"``);
        auto-detected from columns if omitted.
    audit_include_phi:
        For PHI-aware packs (healthcare, education), include raw PHI values in the
        audit trail instead of masking them as ``[PHI]``. Defaults to False.
    domain_kwargs:
        Optional pack-specific constructor arguments. These are forwarded for
        both single-frame and feed-domain runs.
    **options:
        Any :class:`~freshdata.CleanConfig` field as a keyword override — e.g.
        ``strategy`` (``"balanced"`` default / ``"aggressive"`` / ``"conservative"``),
        ``missing_threshold_low``/``_medium``/``_high``, ``duplicate_threshold``,
        ``outlier_method``, ``outlier_action``, ``preserve_original``, ``verbose``,
        ``preserve_columns``, ``target_column``, ``duplicate_keep``, ``impute``,
        ``outliers``. Unknown names raise :class:`TypeError`.

    Examples
    --------
    >>> import freshdata as fd
    >>> cleaned = fd.clean(df)
    >>> cleaned, rep = fd.clean(df, return_report=True)
    >>> print(rep.summary())

    >>> fd.clean(df, outlier_action="flag", target_column="churn",
    ...          preserve_columns=("notes",), verbose=False)

    >>> ledger = fd.clean(df, domain="finance")          # validate + repair
    >>> ledger, rep = fd.clean(df, domain="finance", return_report=True)
    >>> rep.domain_trust_score                            # 0–1

    Natural-language cleaning rules compile deterministically into a
    :class:`~freshdata.ContextPolicy` that governs the run (protected columns,
    id columns, per-column semantic hints)::

    >>> cleaned = fd.clean(df, context="CustomerID is unique. Never modify revenue.")
    >>> policy = fd.compile_context("...", df=df)         # inspect/review first
    >>> cleaned = fd.clean(df, policy=policy)             # skip parsing, use as-is
    """
    config, options, return_report = _normalize_clean_call(config, options, return_report)

    _fold_context_options(options, context=context, policy=policy, strict=strict)
    domain_kwargs = _merge_pack_selectors(
        domain_kwargs,
        domain,
        fhir_resource=fhir_resource,
        media_type=media_type,
        finance_mode=finance_mode,
        audit_include_phi=audit_include_phi,
    )
    if domain is not None:
        if isinstance(df, dict) or gtfs_file is not None:
            return _clean_feed(
                df,
                domain,
                gtfs_file,
                column_map,
                domain_kwargs,
                config,
                return_report,
                options,
            )
        if getattr(validator_class(domain), "multi_frame", False):
            raise TypeError(
                f"domain {domain!r} requires a feed dict or a single frame with gtfs_file="
            )
        return _clean_with_domain(
            df, domain, column_map, domain_kwargs, config, return_report, options
        )
    if column_map is not None:
        raise TypeError("column_map requires a domain= to be set")
    if gtfs_file is not None:
        raise TypeError("gtfs_file requires domain='transport' (or another feed domain)")

    # Contract gate (F1c): explain incoming schema drift *before* repair.
    # In-memory pandas only — keeps the gate predictable and reproducible.
    contract_diff = None
    if contract is not None:
        if not isinstance(df, pd.DataFrame):
            raise TypeError("contract= requires an in-memory pandas DataFrame")
        if engine != "pandas" or output_format != "pandas" or engine_config is not None:
            raise TypeError("contract= is only supported on the in-memory pandas engine")
        from .enterprise.contracts import ContractViolation, diff_schema as _diff_schema  # noqa: I001, PLC0415

        contract_diff = _diff_schema(
            df,
            contract=contract,  # type: ignore[arg-type]
            on_unexpected=on_unexpected,  # type: ignore[arg-type]
            on_missing=on_missing,  # type: ignore[arg-type]
        )
        if contract_diff.n_errors > 0:
            raise ContractViolation(contract_diff)

    # Cleaning-memory replay (F3a): apply previously accepted decisions when the
    # new data still matches what the memory learned; otherwise it is ignored and
    # the report explains why. In-memory pandas only, like the contract gate.
    mem_match = None
    if memory is not None:
        if not isinstance(df, pd.DataFrame):
            raise TypeError("memory= requires an in-memory pandas DataFrame")
        if engine != "pandas" or output_format != "pandas" or engine_config is not None:
            raise TypeError("memory= is only supported on the in-memory pandas engine")
        from .memory import CleaningMemory, apply_memory  # noqa: PLC0415

        if not isinstance(memory, CleaningMemory):
            raise TypeError("memory= must be a CleaningMemory (see fd.learn_cleaning_memory)")
        options, mem_match = apply_memory(df, memory, dict(options))

    native_source = _is_native_engine_source(df)
    if (
        engine != "pandas"
        or output_format != "pandas"
        or engine_config is not None
        or isinstance(df, str)
        or native_source
    ):
        return _clean_out_of_core(
            df,
            config,
            options,
            engine="auto" if native_source and engine == "pandas" else engine,
            output_format=output_format,
            engine_config=engine_config,
            return_report=return_report,
        )

    want_report = return_report or memory is not None
    cleaner = Cleaner(config=config, **options)
    result = cleaner.clean(df, report=want_report, memory=memory)
    if want_report:
        cleaned, rep = result
        if memory is not None and mem_match is not None:
            from .memory import annotate_report  # noqa: PLC0415

            annotate_report(rep, memory, mem_match)
        if source_provenance is not None:
            from .provenance import (  # noqa: PLC0415
                annotate_provenance,
            )

            annotate_provenance(
                rep, source_provenance,
                confidence_threshold=provenance_confidence_threshold,
            )
        if contract_diff is not None:
            rep.contract_violations = contract_diff.to_dict()
        if return_report:
            return from_pandas(cleaned, df), rep
        return from_pandas(cleaned, df)
    if source_provenance is not None:
        raise ValueError("source_provenance requires return_report=True")
    return from_pandas(result, df)

freshdata.clean_csv

clean_csv(path: str | Path, config: CleanConfig | Mapping[str, object] | None = None, *, output_path: str | Path | None = None, return_report: bool = False, read_csv_kwargs: dict[str, object] | None = None, to_csv_kwargs: dict[str, object] | None = None, context: str | None = None, policy: object | None = None, strict: bool = False, **options: object) -> pd.DataFrame | tuple[pd.DataFrame, CleanReport]

Read a CSV file, clean it, and optionally write the result to disk.

Parameters

  • path: Path to the input CSV file.
  • output_path: Optional path to write the cleaned CSV.
  • return_report: If True, return (cleaned_df, CleanReport).
  • read_csv_kwargs: Optional keyword arguments forwarded to pandas.read_csv.
  • to_csv_kwargs: Optional keyword arguments forwarded to DataFrame.to_csv. index defaults to False unless explicitly overridden.
  • context / policy / strict: Natural-language rules or a pre-compiled freshdata.ContextPolicy, forwarded to freshdata.clean.
  • **options: Any freshdata.CleanConfig field accepted by freshdata.clean.
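The "index defaults to False unless explicitly overridden" behaviour comes down to a dict merge in which caller-supplied keys win. A small sketch of that pattern, mirroring the expression used in the source listing for this function; the helper name `resolve_to_csv_kwargs` is hypothetical and exists only for this illustration.

```python
def resolve_to_csv_kwargs(to_csv_kwargs=None):
    """Return the keyword arguments that would be passed to DataFrame.to_csv.

    Later keys win in a dict display, so anything the caller supplies
    replaces the {"index": False} default.
    """
    return {"index": False, **(to_csv_kwargs or {})}


default_kwargs = resolve_to_csv_kwargs()
custom_kwargs = resolve_to_csv_kwargs({"index": True, "sep": ";"})
print(default_kwargs)
print(custom_kwargs)
```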

Examples

import freshdata as fd
cleaned = fd.clean_csv("input.csv")
fd.clean_csv("input.csv", output_path="cleaned.csv")
cleaned, report = fd.clean_csv("input.csv", return_report=True)
fd.clean_csv("input.csv", context="Emails must be valid.")

Source code in src/freshdata/api.py
def clean_csv(
    path: str | Path,
    config: CleanConfig | Mapping[str, object] | None = None,
    *,
    output_path: str | Path | None = None,
    return_report: bool = False,
    read_csv_kwargs: dict[str, object] | None = None,
    to_csv_kwargs: dict[str, object] | None = None,
    context: str | None = None,
    policy: object | None = None,
    strict: bool = False,
    **options: object,
) -> pd.DataFrame | tuple[pd.DataFrame, CleanReport]:
    """Read a CSV file, clean it, and optionally write the result to disk.

    Parameters
    ----------
    path:
        Path to the input CSV file.
    output_path:
        Optional path to write the cleaned CSV.
    return_report:
        If True, return ``(cleaned_df, CleanReport)``.
    read_csv_kwargs:
        Optional keyword arguments forwarded to ``pandas.read_csv``.
    to_csv_kwargs:
        Optional keyword arguments forwarded to ``DataFrame.to_csv``.
        ``index`` defaults to False unless explicitly overridden.
    context / policy / strict:
        Natural-language rules or a pre-compiled
        :class:`~freshdata.ContextPolicy`, forwarded to :func:`freshdata.clean`.
    **options:
        Any :class:`~freshdata.CleanConfig` field accepted by
        :func:`freshdata.clean`.

    Examples
    --------
    >>> import freshdata as fd
    >>> cleaned = fd.clean_csv("input.csv")
    >>> fd.clean_csv("input.csv", output_path="cleaned.csv")
    >>> cleaned, report = fd.clean_csv("input.csv", return_report=True)
    >>> fd.clean_csv("input.csv", context="Emails must be valid.")
    """
    if "report" in options:
        return_report = bool(options.pop("report"))
    df = pd.read_csv(path, **(read_csv_kwargs or {}))
    result = clean(
        df,
        config=config,
        return_report=return_report,
        context=context,
        policy=policy,
        strict=strict,
        **options,  # type: ignore[arg-type]
    )
    cleaned_df = cast(pd.DataFrame, result[0] if return_report else result)
    if output_path is not None:
        cleaned_df.to_csv(output_path, **{"index": False, **(to_csv_kwargs or {})})
    return result

freshdata.Cleaner

Cleaner(config: CleanConfig | Mapping[str, object] | None = None, **options: object)

A configured, reusable cleaning pipeline.

Useful when the same settings are applied to many frames (e.g. every file in a directory), or when you want the report after the fact:

cleaner = fd.Cleaner(impute="median", drop_constant_columns=True)
for path in paths:
    cleaned = cleaner.clean(pd.read_csv(path))
    print(cleaner.report_.summary())

Attributes

  • config: The immutable freshdata.CleanConfig in effect.
  • report_: The freshdata.CleanReport from the most recent clean call (None before the first call).
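One way to picture an immutable config that accepts a prebuilt object, a mapping, or keyword overrides is a frozen dataclass combined with `dataclasses.replace`. The sketch below is an assumption-laden stand-in: `DemoConfig` and `merge_demo_options` are hypothetical names with three made-up fields, and the library's own `merge_options` may work differently.

```python
from dataclasses import FrozenInstanceError, dataclass, fields, replace
from typing import Mapping, Optional, Union


@dataclass(frozen=True)
class DemoConfig:
    """Hypothetical stand-in for CleanConfig, with three fields only."""
    strategy: str = "balanced"
    impute: Optional[str] = None
    verbose: bool = True


def merge_demo_options(
    config: Union[DemoConfig, Mapping[str, object], None] = None,
    **options: object,
) -> DemoConfig:
    """Build a new config; keyword options win over the starting config."""
    if isinstance(config, Mapping):
        options = {**config, **options}
        config = None
    known = {f.name for f in fields(DemoConfig)}
    unknown = sorted(set(options) - known)
    if unknown:
        raise TypeError(f"unknown option(s): {', '.join(unknown)}")
    # replace() returns a fresh instance, so the input is never mutated.
    return replace(config or DemoConfig(), **options)


base = merge_demo_options({"impute": "median"}, verbose=False)
tuned = merge_demo_options(base, strategy="conservative")

mutation_blocked = False
try:
    base.strategy = "aggressive"  # frozen dataclasses reject assignment
except FrozenInstanceError:
    mutation_blocked = True
print(base, tuned, mutation_blocked)
```

Because each merge yields a new object, one configured pipeline can be reused across many frames without settings drifting between runs.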

Source code in src/freshdata/cleaner.py
def __init__(
    self,
    config: CleanConfig | Mapping[str, object] | None = None,
    **options: object,
) -> None:
    if isinstance(config, Mapping):
        merged = dict(config)
        merged.update(options)
        self.config = merge_options(None, **merged)
    else:
        self.config = merge_options(config, **options)
    self.report_: CleanReport | None = None

clean

clean(df: DataFrame, *, report: bool = False, memory: object | None = None) -> pd.DataFrame | tuple[pd.DataFrame, CleanReport]

Clean df and return the result (the input is left unchanged unless preserve_original=False was configured).

With report=True, returns (cleaned_df, CleanReport) instead. The latest report is always available as report_. memory (a freshdata.CleaningMemory) lets the semantic stage replay compatible learned repairs; see the memory= argument of freshdata.clean.

Source code in src/freshdata/cleaner.py
def clean(
    self, df: pd.DataFrame, *, report: bool = False, memory: object | None = None
) -> pd.DataFrame | tuple[pd.DataFrame, CleanReport]:
    """Clean *df* and return the result (the input is left unchanged
    unless ``preserve_original=False`` was configured).

    With ``report=True``, returns ``(cleaned_df, CleanReport)`` instead.
    The latest report is always available as :attr:`report_`. ``memory``
    (a :class:`~freshdata.CleaningMemory`) lets the semantic stage replay
    compatible learned repairs; see :func:`freshdata.clean`'s ``memory=``.
    """
    cleaned, rep = run_pipeline(df, self.config, memory=memory)
    self.report_ = rep
    if self.config.verbose:
        print(rep.brief())
    return (cleaned, rep) if report else cleaned

Profiling & inspection

freshdata.profile

Read-only data profiling: what is in this frame, and what would clean() do?

build_profile reuses the exact inference code from the cleaning steps, so every "would convert to …" suggestion is a faithful preview, not a guess. Profiling never modifies the input.

ColumnProfile dataclass

ColumnProfile(name: str, dtype: str, non_null: int, missing: int, missing_pct: float, unique: int | None, sample_values: list[Any], suggested_dtype: str | None, issues: list[str] = list())

Statistics and detected issues for one column.

Profile dataclass

Profile(n_rows: int, n_cols: int, memory: int, duplicate_rows: int | None, missing_cells: int, missing_pct: float, columns: list[ColumnProfile], materialization: dict[str, Any] | None = None)

Bases: HtmlReprMixin

A whole-table profile. Render with print(profile), export with to_frame or to_dict, or display the interactive quality cockpit with show / _repr_html_.

to_frame

to_frame() -> pd.DataFrame

One row per column — convenient to sort/filter in a notebook.

Source code in src/freshdata/profile.py
def to_frame(self) -> pd.DataFrame:
    """One row per column — convenient to sort/filter in a notebook."""
    return pd.DataFrame(
        {
            "dtype": [c.dtype for c in self.columns],
            "non_null": [c.non_null for c in self.columns],
            "missing": [c.missing for c in self.columns],
            "missing_pct": [round(c.missing_pct, 2) for c in self.columns],
            "unique": [c.unique for c in self.columns],
            "suggested_dtype": [c.suggested_dtype for c in self.columns],
            "issues": ["; ".join(c.issues) for c in self.columns],
        },
        index=pd.Index([c.name for c in self.columns], name="column"),
    )

build_profile

build_profile(df: DataFrame, config: CleanConfig, *, sample: int | None = None, max_columns: int | None = None, lazy: bool = False) -> Profile

Profile df without modifying it.

Wide-schema / large-frame perf controls (all optional, behaviour-preserving when unset):

  • sample: profile a deterministic random_state=0 row sample of this size when the frame is larger; per-column statistics become estimates.
  • max_columns: profile only the first this-many columns; the rest are recorded as omitted.
  • lazy: skip the expensive full-frame duplicate-row scan (duplicate_rows is left None).

When any control changes the work set, the returned Profile describes the profiled subset and carries the totals in profile.materialization.
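The interplay of the three controls can be sketched without pandas, using a list of dict rows. This is an analogue under stated assumptions rather than the function itself: `select_work_set` is a hypothetical helper, and `random.Random(0)` stands in for the seeded `DataFrame.sample` call in the source listing.

```python
import random


def select_work_set(rows, columns, *, sample=None, max_columns=None, lazy=False):
    """Apply the three controls and report what was actually profiled."""
    rows_total, cols_total = len(rows), len(columns)
    sampled = sample is not None and rows_total > sample
    if sampled:
        # A fixed seed makes the sample reproducible across runs.
        rows = random.Random(0).sample(rows, sample)
    omitted = 0
    if max_columns is not None and cols_total > max_columns:
        omitted = cols_total - max_columns
        columns = columns[:max_columns]
    work = [tuple(row[c] for c in columns) for row in rows]
    # lazy skips the duplicate scan, so the count stays unknown (None).
    duplicate_rows = None if lazy else len(work) - len(set(work))
    materialization = None
    if sampled or omitted or lazy:
        materialization = {
            "rows_total": rows_total, "rows_profiled": len(work),
            "columns_total": cols_total, "columns_profiled": len(columns),
            "columns_omitted": omitted, "sampled": sampled, "lazy": lazy,
        }
    return work, duplicate_rows, materialization


rows = [{"a": i % 3, "b": "x", "c": i} for i in range(10)]
cols = ["a", "b", "c"]
full = select_work_set(rows, cols)
narrow = select_work_set(rows, cols, max_columns=2)
quick = select_work_set(rows, cols, sample=4, lazy=True)
print(full[1:], narrow[1:], quick[1:])
```

In the `narrow` case the duplicate count describes the two profiled columns only, which is why the totals are carried separately.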

Source code in src/freshdata/profile.py
def build_profile(
    df: pd.DataFrame,
    config: CleanConfig,
    *,
    sample: int | None = None,
    max_columns: int | None = None,
    lazy: bool = False,
) -> Profile:
    """Profile *df* without modifying it.

    Wide-schema / large-frame perf controls (all optional, behaviour-preserving
    when unset):

    - ``sample``: profile a deterministic ``random_state=0`` row sample of this
      size when the frame is larger; per-column statistics become estimates.
    - ``max_columns``: profile only the first this-many columns; the rest are
      recorded as omitted.
    - ``lazy``: skip the expensive full-frame duplicate-row scan
      (``duplicate_rows`` is left ``None``).

    When any control changes the work set, the returned :class:`Profile`
    describes the *profiled subset* and carries the totals in
    ``profile.materialization``.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError(f"expected a pandas DataFrame, got {type(df).__name__}")

    rows_total, cols_total = len(df), df.shape[1]
    work = df
    sampled = sample is not None and rows_total > sample
    if sampled:
        work = df.sample(n=sample, random_state=0)
    columns_omitted = 0
    if max_columns is not None and work.shape[1] > max_columns:
        columns_omitted = work.shape[1] - max_columns
        work = work.iloc[:, :max_columns]

    if lazy:
        duplicate_rows: int | None = None
    else:
        try:
            duplicate_rows = int(work.duplicated().sum())
        except TypeError:
            duplicate_rows = None
    n_cells = int(work.size)
    missing_cells = int(work.isna().sum().sum())
    sentinels = active_sentinels(config)
    # Positional access tolerates duplicate column labels.
    columns = [
        _profile_column(work.columns[i], work.iloc[:, i], config, sentinels)
        for i in range(work.shape[1])
    ]

    materialization: dict[str, Any] | None = None
    if sampled or columns_omitted or lazy:
        materialization = {
            "rows_total": rows_total,
            "rows_profiled": len(work),
            "columns_total": cols_total,
            "columns_profiled": work.shape[1],
            "columns_omitted": columns_omitted,
            "sampled": sampled,
            "lazy": lazy,
            "duplicate_scan": not lazy,
        }
    return Profile(
        n_rows=len(work),
        n_cols=work.shape[1],
        memory=memory_bytes(work),
        duplicate_rows=duplicate_rows,
        missing_cells=missing_cells,
        missing_pct=100.0 * missing_cells / n_cells if n_cells else 0.0,
        columns=columns,
        materialization=materialization,
    )

freshdata.infer_roles

infer_roles(df: DataFrame, *, strategy: str = 'balanced', config: CleanConfig | None = None, **options: object) -> pd.DataFrame

Infer column roles and primary missing models without mutating data.

Also reports a per-column semantic_type (email/phone/url/...; see freshdata.semantic.semantic_types.SEMANTIC_TYPES) with a confidence and a compact evidence string. Semantic-type inference is deterministic and model-free; an explicit semantic_context hint always wins. These three columns are additive: the columns that were already in the output are unchanged.
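A deterministic, model-free inference of this kind can be approximated with pattern matching, where confidence is the share of values that match and an explicit hint short-circuits the scan. The sketch below is illustrative only: the three patterns, the `min_share` cut-off, and the `Inferred` record are assumptions, not the contents of the library's SEMANTIC_TYPES registry.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Hypothetical patterns, checked in insertion order.
PATTERNS = {
    "email": re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+"),
    "url": re.compile(r"https?://\S+"),
    "phone": re.compile(r"\+?[\d\s().-]{7,}"),
}


@dataclass(frozen=True)
class Inferred:
    semantic_type: Optional[str]
    confidence: float
    evidence: str


def infer_type(values, hint=None, min_share=0.8):
    """Return the best-matching type, or None if nothing is convincing."""
    if hint:
        # An explicit hint always wins over pattern evidence.
        return Inferred(hint, 1.0, "hint: explicit semantic_context")
    texts = [v for v in values if isinstance(v, str) and v]
    best, best_share = None, 0.0
    for name, pattern in PATTERNS.items():
        share = sum(bool(pattern.fullmatch(t)) for t in texts) / len(texts) if texts else 0.0
        if share > best_share:
            best, best_share = name, share
    if best is None or best_share < min_share:
        return Inferred(None, best_share, "pattern: no type reached the threshold")
    return Inferred(best, best_share, f"pattern: {best} matched {best_share:.0%} of values")


emails = infer_type(["a@x.io", "b@y.org", "c@z.com", "d@w.net", "oops"])
unknown = infer_type(["abc", "def"])
hinted = infer_type(["abc"], hint="phone")
print(emails, unknown, hinted, sep="\n")
```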

Source code in src/freshdata/api.py
def infer_roles(
    df: pd.DataFrame,
    *,
    strategy: str = "balanced",
    config: CleanConfig | None = None,
    **options: object,
) -> pd.DataFrame:
    """Infer column roles and primary missing models without mutating data.

    Also reports a per-column ``semantic_type`` (email/phone/url/... —
    see :data:`freshdata.semantic.semantic_types.SEMANTIC_TYPES`) with a
    confidence and a compact evidence string. Semantic-type inference is
    deterministic and model-free; an explicit ``semantic_context`` hint always
    wins. These three columns are additive — the original output is unchanged.
    """
    from .semantic.semantic_types import infer_semantic_type  # noqa: PLC0415 — lazy

    cfg = merge_options(config, strategy=strategy, **options)
    frame = to_pandas(df)
    contexts = build_contexts(frame, cfg)
    mode = _engine_mode(cfg)
    hints = cfg.semantic_context if isinstance(cfg.semantic_context, dict) else {}
    column_hints = hints.get("columns", {}) if isinstance(hints.get("columns"), dict) else {}
    rows = []
    for col, ctx in sorted(contexts.items()):
        primary = None
        if ctx.missing_ratio > 0:
            primary = rank_missing_models(frame, col, ctx, cfg, mode=mode).primary
        hint = None
        col_hint = column_hints.get(col)
        if isinstance(col_hint, dict):
            hint = col_hint.get("semantic_type")
        inferred = infer_semantic_type(
            col,
            frame[col],
            role=ctx.role,
            hint=str(hint) if hint else None,
            sample_size=cfg.semantic_sample_size,
        )
        rows.append(
            {
                "column": col,
                "role": ctx.role,
                "missing_pct": round(ctx.missing_ratio * 100, 2),
                "cardinality": ctx.nunique,
                "skew": ctx.skew,
                "domain_sensitive": ctx.domain_sensitive,
                "primary_missing_model": primary.model_id if primary else None,
                "semantic_type": inferred.semantic_type,
                "semantic_type_confidence": round(inferred.confidence, 4),
                "semantic_type_evidence": "; ".join(
                    f"{e.kind}: {e.detail}" for e in inferred.evidence
                ),
            }
        )
    return ReportFrame.wrap(pd.DataFrame(rows), "infer_roles")

freshdata.explain_clean

explain_clean(df: DataFrame, *, strategy: str = 'balanced', config: CleanConfig | None = None, **options: object) -> ExplainReport

Run clean() and return a structured before/after explanation.

Source code in src/freshdata/explain.py
def explain_clean(
    df: pd.DataFrame,
    *,
    strategy: str = "balanced",
    config: CleanConfig | None = None,
    **options: object,
) -> ExplainReport:
    """Run clean() and return a structured before/after explanation."""
    cfg = merge_options(config, strategy=strategy, **options)
    df = to_pandas(df)  # accept polars frames like the other public entry points
    before_stats = _column_stats(df)
    cleaned, report = run_pipeline(df, cfg)

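    # Forward the resolved strategy: infer_roles defaults strategy to "balanced"
    # and merges it over the config, which would otherwise reset cfg.strategy.
    roles_df = infer_roles(df, strategy=cfg.strategy, config=cfg)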

    actions_by_step: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for action in report:
        actions_by_step[action.step].append({
            "column": action.column,
            "description": action.description,
            "count": action.count,
            "rationale": action.rationale,
            "risk": action.risk,
            "confidence": round(action.confidence, 4),
            "model_id": action.model_id,
        })

    post_contexts = build_contexts(cleaned, cfg)
    return ExplainReport(
        strategy=cfg.strategy,
        rows_before=len(df),
        rows_after=len(cleaned),
        cols_before=df.shape[1],
        cols_after=cleaned.shape[1],
        before_stats=before_stats,
        after_stats=_column_stats(cleaned),
        cell_changes=_cell_changes(df, cleaned),
        actions_by_step=dict(actions_by_step),
        narratives=_narratives(post_contexts, list(report), strategy=cfg.strategy),
        report=report,
        roles=roles_df,
    )

Context policies

freshdata.compile_context

compile_context(text: str, df: DataFrame | None = None, *, columns: Sequence[str] | None = None, config: CleanConfig | None = None, strict: bool = False) -> Any

Compile natural-language cleaning rules into a freshdata.ContextPolicy.

Deterministic, model-free, and offline: the same text always compiles to the same policy. With a frame (or explicit columns), column phrases are resolved against the effective post-normalization schema; without one, the policy compiles schema-free and resolves when it meets a frame. The result is inspectable (policy.summary()), reviewable (policy.to_json()), and reusable (fd.clean(df, policy=policy)).
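To show what "deterministic and model-free" can look like in practice, the sketch below compiles two sentence shapes with fixed regular expressions and resolves column phrases against a schema when one is given. The grammar, the `compile_rules` helper, and the dict-shaped policy are all hypothetical; the real compiler's rule set and the ContextPolicy structure are not described on this page.

```python
import json
import re

# A tiny illustrative grammar, not the library's.
RULES = [
    (re.compile(r"^(?P<col>\w+) is unique$", re.I), "unique"),
    (re.compile(r"^never modify (?P<col>\w+)(?: values)?$", re.I), "protected"),
]


def _key(name):
    """Compare names ignoring case and separators (CustomerID ~ customer_id)."""
    return re.sub(r"[^a-z0-9]", "", name.lower())


def compile_rules(text, columns=None):
    """Turn sentences into a policy dict; same text in, same policy out."""
    policy = {"unique": [], "protected": [], "unresolved": []}
    lookup = {_key(c): c for c in columns} if columns else None
    for sentence in filter(None, (s.strip() for s in text.split("."))):
        for pattern, rule in RULES:
            match = pattern.match(sentence)
            if not match:
                continue
            phrase = match.group("col")
            if lookup is None:
                policy[rule].append(phrase)  # schema-free: keep the phrase
            elif _key(phrase) in lookup:
                policy[rule].append(lookup[_key(phrase)])
            else:
                policy["unresolved"].append(phrase)
            break
        else:
            policy["unresolved"].append(sentence)  # no rule matched
    return policy


text = "CustomerID is unique. Never modify revenue values."
resolved = compile_rules(text, columns=["customer_id", "revenue"])
schema_free = compile_rules(text)
print(json.dumps(resolved, indent=2))
```

Keeping unmatched sentences in an "unresolved" bucket, instead of dropping them, is what makes a policy reviewable before it is applied.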

Examples

import freshdata as fd
policy = fd.compile_context(
    "CustomerID is unique. Never modify revenue values.", df=df)
print(policy.summary())
policy.to_json("policy.json")
cleaned = fd.clean(df, policy=policy)

Source code in src/freshdata/api.py
def compile_context(
    text: str,
    df: pd.DataFrame | None = None,
    *,
    columns: Sequence[str] | None = None,
    config: CleanConfig | None = None,
    strict: bool = False,
) -> Any:
    """Compile natural-language cleaning rules into a :class:`~freshdata.ContextPolicy`.

    Deterministic, model-free, and offline: the same text always compiles to the
    same policy. With a frame (or explicit ``columns``) column phrases are
    resolved against the effective post-normalization schema; without one the
    policy compiles schema-free and resolves when it meets a frame. The result
    is inspectable (``policy.summary()``), reviewable (``policy.to_json()``),
    and reusable (``fd.clean(df, policy=policy)``).

    Examples
    --------
    >>> import freshdata as fd
    >>> policy = fd.compile_context(
    ...     "CustomerID is unique. Never modify revenue values.", df=df)
    >>> print(policy.summary())
    >>> policy.to_json("policy.json")
    >>> cleaned = fd.clean(df, policy=policy)
    """
    from .context import compile_context as _compile  # noqa: PLC0415

    return _compile(text, df=df, columns=columns, config=config, strict=strict)

freshdata.validate

validate(df: DataFrame, *, context: str | None = None, policy: object | None = None, config: CleanConfig | None = None, strict: bool = False, **options: object) -> Any

Check df against a context policy without mutating anything.

Returns a freshdata.FindingList (a plain list of freshdata.QualityFinding items with .errors / .warnings shortcuts) covering unresolved references, compile issues, protected columns, and unique / allowed-values / range violations.

Examples

>>> findings = fd.validate(df, context="CustomerID is unique.")
>>> assert not findings.errors
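
To make the .errors / .warnings shortcuts concrete, here is a minimal stand-alone sketch of a list subclass with filtered views. `Finding`, `Findings`, and the `severity` field are hypothetical stand-ins; the real QualityFinding fields are not shown on this page and may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Finding:
    # Hypothetical stand-in for QualityFinding.
    severity: str  # assumed values: "error" or "warning"
    message: str


class Findings(list):
    """A plain list plus filtered views, mirroring the described shortcuts."""

    @property
    def errors(self):
        return [f for f in self if f.severity == "error"]

    @property
    def warnings(self):
        return [f for f in self if f.severity == "warning"]


findings = Findings(
    [
        Finding("warning", "column 'revenue' is protected"),
        Finding("error", "customer_id has 2 duplicate values"),
    ]
)

# The usual gate in a pipeline: fail only on errors, surface warnings.
has_errors = bool(findings.errors)
```

Because the container is still a list, it can be iterated, sliced, and measured with len() like any other sequence.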

Source code in src/freshdata/api.py
def validate(
    df: pd.DataFrame,
    *,
    context: str | None = None,
    policy: object | None = None,
    config: CleanConfig | None = None,
    strict: bool = False,
    **options: object,
) -> Any:
    """Check *df* against a context policy without mutating anything.

    Returns a :class:`~freshdata.FindingList` (a plain ``list`` of
    :class:`~freshdata.QualityFinding` with ``.errors`` / ``.warnings``
    shortcuts) covering unresolved references, compile issues, protected
    columns, and unique / allowed-values / range violations.

    Examples
    --------
    >>> findings = fd.validate(df, context="CustomerID is unique.")
    >>> assert not findings.errors
    """
    _fold_context_options(options, context=context, policy=policy, strict=strict)
    cfg = merge_options(config, **options)
    if cfg.context is None and cfg.policy is None:
        raise TypeError("fd.validate needs context= (rules text) or policy= (a ContextPolicy)")
    from .context.validate import validate_frame  # noqa: PLC0415

    return validate_frame(to_pandas(df), cfg)

freshdata.ContextPolicy dataclass

ContextPolicy(policy_version: str = POLICY_VERSION, dataset_domain: str | None = None, constraints: tuple[ColumnConstraint, ...] = (), unresolved: tuple[UnresolvedRef, ...] = (), issues: tuple[PolicyIssue, ...] = (), source_text_sha256: str | None = None, strict: bool = False)

The compiled, inspectable contract between the user's prose and the engine.

Immutable and JSON-round-trippable (to_json / from_json), so a policy can be reviewed in a pull request like code and passed back verbatim via fd.clean(df, policy=...).

protected_columns property

protected_columns: tuple[str, ...]

Resolved columns under a protected constraint, in policy order.

constraints_for

constraints_for(column: str) -> tuple[ColumnConstraint, ...]

All constraints that apply to column (post-normalization name).
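
The matching rule can be sketched without the library. A constraint applies when its resolved column equals the requested name, or when it is still unresolved and its original phrase normalizes to the same name. The `snake_ref` below is a hypothetical approximation of the real normalizer, and `Constraint` is a stand-in with only the fields this sketch reads.

```python
from __future__ import annotations

import re
from dataclasses import dataclass


def snake_ref(name: str) -> str:
    # Hypothetical normalizer: split camelCase, collapse punctuation, lowercase.
    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name.strip())
    return re.sub(r"[^0-9a-zA-Z]+", "_", spaced).strip("_").lower()


@dataclass(frozen=True)
class Constraint:
    column: str | None  # None while the reference is unresolved
    resolved_from: str  # the phrase as written in the rules text
    rule: str


def constraints_for(constraints, column):
    """Return every constraint that applies to the post-normalization name."""
    return tuple(
        c
        for c in constraints
        if c.column == column
        or (c.column is None and snake_ref(c.resolved_from) == snake_ref(column))
    )


def is_protected(constraints, column):
    return any(c.rule == "protected" for c in constraints_for(constraints, column))


policy = (
    Constraint("customer_id", "CustomerID", "unique"),
    Constraint(None, "Order Date", "valid_format"),  # compiled schema-free
    Constraint("revenue", "revenue", "protected"),
)
late_bound = constraints_for(policy, "order_date")
```

The second branch is what lets a policy compiled without a frame still match once it meets one.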

Source code in src/freshdata/context/types.py
def constraints_for(self, column: str) -> tuple[ColumnConstraint, ...]:
    """All constraints that apply to *column* (post-normalization name)."""
    from .normalize import snake_ref  # noqa: PLC0415 - avoid import cycle at load

    matched = []
    for c in self.constraints:
        if c.column == column or (
            c.column is None and snake_ref(c.resolved_from) == snake_ref(column)
        ):
            matched.append(c)
    return tuple(matched)

is_protected

is_protected(column: str) -> bool

True when the policy forbids any modification of column.

Source code in src/freshdata/context/types.py
def is_protected(self, column: str) -> bool:
    """True when the policy forbids any modification of *column*."""
    return any(c.rule == "protected" for c in self.constraints_for(column))

thresholds

thresholds(column: str, kind: str, cfg: CleanConfig) -> Thresholds

Effective (auto, review, floor) thresholds for column and kind.

impute_missing constraints raise the auto threshold for kind="impute" (or "missing"); everything else falls back to the global config thresholds.
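
A stand-alone sketch of the arithmetic in the source below may help. `effective_thresholds` is an illustrative name and `Thresholds` is re-declared here as a stand-in; the inputs 0.95 and 0.7 are the CleanConfig defaults for semantic_auto_threshold and semantic_review_threshold.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Thresholds:
    # Stand-in for the library's Thresholds container.
    auto: float
    review: float
    floor: float
    from_policy: bool


def effective_thresholds(global_auto, global_review, min_confidences):
    """Combine global thresholds with per-column impute_missing confidences."""
    auto = global_auto
    from_policy = False
    for value in min_confidences:
        if isinstance(value, (int, float)):
            # A policy can only tighten the auto threshold, never loosen it.
            auto = max(auto, float(value))
            from_policy = True
    # The review threshold is clamped so it never exceeds the auto threshold.
    return Thresholds(
        auto=auto, review=min(global_review, auto), floor=0.50, from_policy=from_policy
    )


stricter = effective_thresholds(0.95, 0.7, [0.99])
looser = effective_thresholds(0.95, 0.7, [0.80])
untouched = effective_thresholds(0.95, 0.7, [])
```

Note the middle case: a min_confidence below the global threshold leaves auto at 0.95, yet from_policy is still set because a numeric policy value was seen.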

Source code in src/freshdata/context/types.py
def thresholds(self, column: str, kind: str, cfg: CleanConfig) -> Thresholds:
    """Effective (auto, review, floor) thresholds for *column* and *kind*.

    ``impute_missing`` constraints raise the auto threshold for
    ``kind="impute"`` (or ``"missing"``); everything else falls back to the
    global config thresholds.
    """
    auto = cfg.semantic_auto_threshold
    review = cfg.semantic_review_threshold
    from_policy = False
    if kind in ("impute", "missing"):
        for c in self.constraints_for(column):
            if c.rule == "impute_missing":
                min_conf = c.params.get("min_confidence")
                if isinstance(min_conf, (int, float)):
                    auto = max(auto, float(min_conf))
                    from_policy = True
    return Thresholds(
        auto=auto, review=min(review, auto), floor=0.50, from_policy=from_policy
    )

to_json

to_json(path: str | Path | None = None) -> str

Serialize to JSON; optionally also write it to path.

Source code in src/freshdata/context/types.py
def to_json(self, path: str | Path | None = None) -> str:
    """Serialize to JSON; optionally also write it to *path*."""
    text = json.dumps(self.to_dict(), indent=2, sort_keys=False)
    if path is not None:
        Path(path).write_text(text + "\n", encoding="utf-8")
    return text

from_json classmethod

from_json(data: str | Path) -> ContextPolicy

Load a policy from a JSON string or a path to a .json file.
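
The string-or-path dispatch can be sketched on its own. `read_policy_payload` is an illustrative helper and the payload shape is hypothetical; only the dispatch rule is taken from the source below: a Path, or a string whose first non-blank character is not "{", is treated as a file path.

```python
import json
import tempfile
from pathlib import Path


def read_policy_payload(data):
    """Parse a JSON string, or read and parse a JSON file given its path."""
    looks_like_path = isinstance(data, Path) or (
        isinstance(data, str) and data.lstrip()[:1] != "{"
    )
    text = Path(data).read_text(encoding="utf-8") if looks_like_path else data
    return json.loads(text)


payload = {"policy_version": "1", "constraints": []}  # hypothetical shape
text = json.dumps(payload, indent=2)

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "policy.json"
    path.write_text(text + "\n", encoding="utf-8")  # same layout to_json writes
    from_file = read_policy_payload(path)
    from_str_path = read_policy_payload(str(path))

from_text = read_policy_payload(text)
```

One consequence of this rule is that any string not starting with "{", including an empty one, is handed to the filesystem rather than to the JSON parser.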

Source code in src/freshdata/context/types.py
@classmethod
def from_json(cls, data: str | Path) -> ContextPolicy:
    """Load a policy from a JSON string or a path to a ``.json`` file."""
    if isinstance(data, Path) or (isinstance(data, str) and data.lstrip()[:1] != "{"):
        text = Path(data).read_text(encoding="utf-8")
    else:
        text = data
    return cls.from_dict(json.loads(text))

summary

summary() -> str

Human-readable one-screen description of the compiled policy.

Source code in src/freshdata/context/types.py
def summary(self) -> str:
    """Human-readable one-screen description of the compiled policy."""
    lines = [f"freshdata context policy (v{self.policy_version})"]
    if self.dataset_domain:
        lines.append(f"  domain: {self.dataset_domain}")
    lines.append(f"  constraints: {len(self.constraints)}")
    for c in self.constraints:
        col = c.column if c.column is not None else f"?{c.resolved_from!r}"
        detail = ""
        if c.rule in ("valid_format", "locale_format"):
            detail = f" format={c.params.get('format')}"
            if c.params.get("region"):
                detail += f" region={c.params['region']}"
        elif c.rule == "impute_missing":
            detail = f" min_confidence={c.params.get('min_confidence')}"
        elif c.rule == "allowed_values":
            detail = f" values={list(c.params.get('values', ()))}"
        elif c.rule == "range":
            detail = f" [{c.params.get('lo')}, {c.params.get('hi')}]"
        elif c.rule == "dedup_key":
            detail = f" columns={list(c.params.get('columns', ()))}"
        elif c.rule == "custom":
            detail = f" kind={c.params.get('kind')}"
        hard = " [hard]" if c.enforcement == "hard" else ""
        src = ""
        if c.resolved_from and c.resolved_from != c.column:
            src = f"  (from {c.resolved_from!r}, {c.resolution_confidence:.2f})"
        lines.append(f"    {c.id}: {c.rule:<14} {col}{detail}{hard}{src}")
    if self.unresolved:
        lines.append(f"  unresolved references: {len(self.unresolved)}")
        for u in self.unresolved:
            cands = ", ".join(f"{c} ({s:.2f})" for c, s in u.candidates[:3])
            suffix = f" — candidates: {cands}" if cands else ""
            lines.append(f"    {u.ref!r}: {u.reason}{suffix}")
    if self.issues:
        lines.append(f"  issues: {len(self.issues)}")
        for issue in self.issues:
            lines.append(f"    [{issue.severity}] {issue.kind}: {issue.message}")
    return "\n".join(lines)

lower

lower(cfg: CleanConfig) -> CleanConfig

Return a new CleanConfig with this policy folded in.

Pure: cfg is never mutated. Protected columns are appended to preserve_columns, unique columns to id_columns, per-column semantic hints (semantic_type, region, allowed_values, range, impute confidence, mutability) are merged into semantic_context, and the policy itself is attached as cfg.policy (with context cleared so the text is never recompiled downstream).
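
The "pure" contract rests on dataclasses.replace, which builds a new instance instead of editing the old one. A minimal sketch follows, using a hypothetical two-field `Config` stand-in and an illustrative `fold` helper; skipping names that are already listed is an assumption of this sketch, since the folding helper itself is not shown on this page.

```python
from __future__ import annotations

import dataclasses
from dataclasses import dataclass


@dataclass(frozen=True)
class Config:
    # Hypothetical stand-in with two of the CleanConfig fields named above.
    preserve_columns: tuple[str, ...] = ()
    id_columns: tuple[str, ...] = ()


def fold(cfg: Config, protected: list[str], unique: list[str]) -> Config:
    """Return a new Config; the input instance is left untouched."""
    preserve = list(cfg.preserve_columns)
    ids = list(cfg.id_columns)
    # Assumption: names already present are not appended a second time.
    preserve += [c for c in protected if c not in preserve]
    ids += [c for c in unique if c not in ids]
    return dataclasses.replace(
        cfg, preserve_columns=tuple(preserve), id_columns=tuple(ids)
    )


base = Config(preserve_columns=("notes",))
lowered = fold(base, protected=["revenue"], unique=["customer_id"])
```

Because the original config survives unchanged, the same base config can be lowered against several policies and the results compared.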

Source code in src/freshdata/context/types.py
def lower(self, cfg: CleanConfig) -> CleanConfig:
    """Return a new :class:`CleanConfig` with this policy folded in.

    Pure: *cfg* is never mutated. Protected columns are appended to
    ``preserve_columns``, unique columns to ``id_columns``, per-column
    semantic hints (semantic_type, region, allowed_values, range,
    impute confidence, mutability) are merged into ``semantic_context``,
    and the policy itself is attached as ``cfg.policy`` (with ``context``
    cleared so the text is never recompiled downstream).
    """
    from ..config import CleanConfig  # noqa: PLC0415 - runtime import, cycle-safe

    if not isinstance(cfg, CleanConfig):
        raise TypeError(f"lower() expects a CleanConfig, got {type(cfg).__name__}")

    preserve = list(cfg.preserve_columns)
    id_cols = list(cfg.id_columns)
    duplicate_subset = cfg.duplicate_subset

    semantic: dict[str, Any] = {}
    if isinstance(cfg.semantic_context, dict):
        semantic = dict(cfg.semantic_context)
    columns_meta: dict[str, dict[str, Any]] = {
        str(k): dict(v)
        for k, v in (semantic.get("columns") or {}).items()
        if isinstance(v, dict)
    }

    if self.dataset_domain and not semantic.get("dataset"):
        semantic["dataset"] = self.dataset_domain

    protected = set(self.protected_columns)
    for c in self.constraints:
        if c.rule == "dedup_key":
            # Group constraint (column=None by design): resolved members
            # live in params["columns"].
            cols = tuple(str(x) for x in c.params.get("columns", ()))
            if cols and duplicate_subset is None and c.resolution_confidence > 0.0:
                duplicate_subset = cols
            continue
        if c.column is None:
            continue
        meta = columns_meta.setdefault(c.column, {})
        _fold_constraint(c, meta, preserve, id_cols)
        if c.rule != "protected" and c.column in protected:
            # Protection wins ties: a protected column never gains a hint
            # that could be read as permission to mutate.
            meta.pop("impute_min_confidence", None)
            meta["mutable"] = False

    if columns_meta:
        semantic["columns"] = columns_meta

    return dataclasses.replace(
        cfg,
        preserve_columns=tuple(preserve),
        id_columns=tuple(id_cols),
        duplicate_subset=duplicate_subset,
        semantic_context=semantic if semantic else cfg.semantic_context,
        policy=self,
        context=None,
    )

Planning & comparison

freshdata.suggest_plan

suggest_plan(df: DataFrame, *, config: CleanConfig | None = None, contract: object | None = None, on_unexpected: str = 'warn', on_missing: str = 'fail', context: str | None = None, policy: object | None = None, strict: bool = False, **options: object) -> CleanPlan

Preview engine model choices without mutating df.

With contract= (a freshdata.DataContract or its mapping), attaches a baseline-free schema diff at plan.schema_diff (a DriftReport) explaining incoming structural drift before any repair; on_unexpected (fail|warn|preserve) and on_missing (fail|warn|ignore) grade undeclared and missing columns. See freshdata.diff_schema.

context= (natural-language rules) or policy= (a pre-compiled freshdata.ContextPolicy) folds a deterministic context policy into the planned config first; protected columns, id columns, and per-column semantic hints then shape the plan exactly as they would shape freshdata.clean. strict=True raises freshdata.PolicyError on unresolved or unparsed context. The returned plan's config carries the compiled policy at plan.config.policy.
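
For each numeric column, the source below passes an outlier share to the action selector: flagged values divided by non-null values. The sketch below reproduces only that ratio. The detector is an assumption (Tukey IQR fences with the conventional factor 1.5, chosen because outlier_method defaults to 'iqr'); the library's own detector is not shown on this page, and `outlier_share` is an illustrative name.

```python
import statistics


def outlier_share(values, factor=1.5):
    """Return (n_outliers, share of non-null values outside the IQR fences)."""
    present = sorted(v for v in values if v is not None)
    q1, _, q3 = statistics.quantiles(present, n=4)
    spread = q3 - q1
    low, high = q1 - factor * spread, q3 + factor * spread
    n_outliers = sum(1 for v in present if v < low or v > high)
    # Missing cells are excluded from the denominator.
    return n_outliers, n_outliers / len(present)


column = [10, 11, 12, None, 13, 14, 15, 16, 17, 18, 100, None]
n_outliers, share = outlier_share(column)
```

Here one of ten non-null values is flagged, so the share is 0.1; the two missing cells do not dilute it.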

Source code in src/freshdata/plan.py
def suggest_plan(
    df: pd.DataFrame,
    *,
    config: CleanConfig | None = None,
    contract: object | None = None,
    on_unexpected: str = "warn",
    on_missing: str = "fail",
    context: str | None = None,
    policy: object | None = None,
    strict: bool = False,
    **options: object,
) -> CleanPlan:
    """Preview engine model choices without mutating *df*.

    With ``contract=`` (a :class:`~freshdata.DataContract` or its mapping),
    attaches a baseline-free schema diff at ``plan.schema_diff`` (a
    ``DriftReport``) explaining incoming structural drift before any repair;
    ``on_unexpected`` (``fail|warn|preserve``) and ``on_missing``
    (``fail|warn|ignore``) grade undeclared and missing columns. See
    :func:`freshdata.diff_schema`.

    ``context=`` (natural-language rules) or ``policy=`` (a pre-compiled
    :class:`~freshdata.ContextPolicy`) fold a deterministic context policy into
    the planned config first — protected columns, id columns, and per-column
    semantic hints then shape the plan exactly as they would shape
    :func:`freshdata.clean`. ``strict=True`` raises
    :class:`~freshdata.PolicyError` on unresolved or unparsed context.
    The returned plan's config carries the compiled policy at ``plan.config.policy``.
    """
    if context is not None:
        options["context"] = context
    if policy is not None:
        options["policy"] = policy
    if strict:
        options["strict"] = strict
    cfg = merge_options(config, **options)
    if cfg.context is not None or cfg.policy is not None:
        from .context import apply_policy_to_config  # noqa: PLC0415

        cfg = apply_policy_to_config(cfg, df=df)
    semantic_counts = _semantic_counts(df, cfg) if cfg.semantic_enabled else {}
    repair_plan = _build_repair_plan(df, cfg)
    if cfg.engine_mode is None:
        plans = _semantic_only_plans(semantic_counts)
        return _attach_schema_diff(
            CleanPlan(config=cfg, column_plans=plans, repair_plan=repair_plan),
            df, contract, on_unexpected, on_missing
        )
    preview = _repair_preview(df, cfg)
    if preview.empty:
        plans = _semantic_only_plans(semantic_counts)
        return _attach_schema_diff(
            CleanPlan(config=cfg, column_plans=plans, repair_plan=repair_plan),
            df, contract, on_unexpected, on_missing
        )
    mode = cast(EngineMode, cfg.engine_mode)
    assert mode in ("balanced", "aggressive")
    contexts = build_contexts(preview, cfg)
    plans = {}
    for col in preview.columns:
        ctx = contexts[col]
        missing_choice: ModelChoice | None = None
        missing_alts: tuple[ModelChoice, ...] = ()
        if int(preview[col].isna().sum()) > 0:
            sel = rank_missing_models(preview, col, ctx, cfg, mode=mode)
            missing_choice = sel.primary
            missing_alts = sel.alternatives
        outlier_choice: ModelChoice | None = None
        outlier_action: str | None = None
        n_outliers = 0
        s = preview[col]
        if (
            is_numeric_dtype(s)
            and not is_bool_dtype(s)
            and int(s.notna().sum()) >= _MIN_NON_NULL
            and cfg.outliers is None
        ):
            detected = _detect(s, cfg)
            if detected is not None:
                mask, _, _, _ = detected
                n_outliers = int(mask.sum())
                if n_outliers:
                    share = n_outliers / int(s.notna().sum())
                    action, choice = select_outlier_action(
                        ctx,
                        cfg,
                        mode=mode,
                        share=share,
                    )
                    outlier_choice = choice
                    outlier_action = action
        if missing_choice or outlier_choice:
            plans[str(col)] = ColumnPlan(
                column=str(col),
                missing=missing_choice,
                missing_alternatives=missing_alts,
                outlier=outlier_choice,
                outlier_action=outlier_action,
                n_outliers=n_outliers,
            )
    _merge_semantic_counts(plans, semantic_counts)
    return _attach_schema_diff(
        CleanPlan(config=cfg, column_plans=plans, repair_plan=repair_plan),
        df, contract, on_unexpected, on_missing
    )

freshdata.compare_plans

compare_plans(df: DataFrame, *, strategies: tuple[str, ...] = ('conservative', 'balanced', 'aggressive'), config: CleanConfig | None = None, include_metrics: bool = False, **options: object) -> pd.DataFrame

Side-by-side primary models for each strategy.

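With include_metrics=True, each row also carries the actual clean outcomes missing_after and duration_seconds from compare_clean. These are per-strategy totals, so they repeat on every column row of the same strategy.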
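
The shape of the result is easier to see with plain dictionaries. In this sketch the plans, model ids, and metric values are all made up for illustration; only the joining pattern follows the source below.

```python
# Hypothetical per-strategy plans: column -> primary missing-value model id.
plans = {
    "balanced": {"age": "median", "city": "mode"},
    "aggressive": {"age": "knn"},
}

# Hypothetical strategy-level outcomes, one record per strategy.
metrics = {
    "balanced": {"missing_after": 4, "duration_seconds": 0.12},
    "aggressive": {"missing_after": 0, "duration_seconds": 0.31},
}

rows = []
for strategy, column_plans in plans.items():
    for column, model in column_plans.items():
        row = {"column": column, "strategy": strategy, "missing_model": model}
        if strategy in metrics:
            # Strategy-level numbers are copied onto each column row.
            row.update(metrics[strategy])
        rows.append(row)
```

Both "balanced" rows end up with the same missing_after value, which is why the metric columns should be read per strategy and not summed across rows.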

Source code in src/freshdata/plan.py
def compare_plans(
    df: pd.DataFrame,
    *,
    strategies: tuple[str, ...] = ("conservative", "balanced", "aggressive"),
    config: CleanConfig | None = None,
    include_metrics: bool = False,
    **options: object,
) -> pd.DataFrame:
    """Side-by-side primary models for each strategy.

    With ``include_metrics=True``, adds actual clean outcomes (missing_after,
    duration_seconds, …) from :func:`compare_clean`.
    """
    base = merge_options(config, **options)
    rows: list[dict[str, Any]] = []
    metrics: pd.DataFrame | None = None
    if include_metrics:
        metrics = compare_clean(df, strategies=strategies, config=base)
        metrics = metrics.set_index("strategy")
    for strategy in strategies:
        plan = suggest_plan(df, config=merge_options(base, strategy=strategy))
        for col, cp in plan.column_plans.items():
            row: dict[str, Any] = {
                "column": col,
                "strategy": strategy,
                "missing_model": cp.missing.model_id if cp.missing else None,
                "outlier_action": cp.outlier_action,
                "n_outliers": cp.n_outliers,
            }
            if metrics is not None and strategy in metrics.index:
                m = metrics.loc[strategy]
                row["missing_after"] = m["missing_after"]
                row["duration_seconds"] = m["duration_seconds"]
            rows.append(row)
    if not rows:
        empty = pd.DataFrame(
            columns=[
                "column",
                "strategy",
                "missing_model",
                "outlier_action",
                "n_outliers",
            ]
        )
        return ReportFrame.wrap(empty, "compare_plans")
    return ReportFrame.wrap(pd.DataFrame(rows), "compare_plans")

freshdata.compare_clean

compare_clean(df: DataFrame, *, strategies: tuple[str, ...] = ('conservative', 'balanced', 'aggressive'), config: CleanConfig | None = None, **options: object) -> pd.DataFrame

Run clean under each strategy and compare quality + efficiency metrics.
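
Three of the returned columns are derived from the others. The sketch below mirrors that arithmetic from the source below with an illustrative helper name, `derived_metrics`; the sample numbers are made up.

```python
def derived_metrics(
    n_rows, missing_before, missing_after, cols_before, cols_after, duration_seconds
):
    """Compute the delta and throughput columns for one strategy run."""
    return {
        # Negative deltas mean fewer missing cells or fewer columns afterwards.
        "missing_delta": missing_after - missing_before,
        "cols_delta": cols_after - cols_before,
        # Throughput uses the input row count and is undefined for a zero duration.
        "rows_per_second": round(n_rows / duration_seconds, 1)
        if duration_seconds > 0
        else None,
    }


typical = derived_metrics(1000, 120, 5, 12, 11, 0.25)
instant = derived_metrics(1000, 120, 5, 12, 11, 0.0)
```

The None for a zero duration avoids a division error on very small frames where the timer resolution rounds the run down to zero.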

Source code in src/freshdata/plan.py
def compare_clean(
    df: pd.DataFrame,
    *,
    strategies: tuple[str, ...] = ("conservative", "balanced", "aggressive"),
    config: CleanConfig | None = None,
    **options: object,
) -> pd.DataFrame:
    """Run clean under each strategy and compare quality + efficiency metrics."""
    base = merge_options(config, **options)
    rows: list[dict[str, Any]] = []
    n_rows = len(df)
    for strategy in strategies:
        cfg = merge_options(base, strategy=strategy, verbose=False)
        _, report = run_pipeline(df, cfg)
        rows.append(
            {
                "strategy": strategy,
                "rows_before": report.rows_before,
                "rows_after": report.rows_after,
                "cols_before": report.cols_before,
                "cols_after": report.cols_after,
                "missing_before": report.missing_before,
                "missing_after": report.missing_after,
                "missing_delta": report.missing_after - report.missing_before,
                "cols_delta": report.cols_after - report.cols_before,
                "duplicates_removed": report.duplicates_removed,
                "outliers_handled": report.outliers_handled,
                "columns_dropped": len(report.columns_dropped),
                "columns_imputed": len(report.columns_imputed),
                "duration_seconds": round(report.duration_seconds, 4),
                "rows_per_second": round(n_rows / report.duration_seconds, 1)
                if report.duration_seconds > 0
                else None,
                "primary_models": json.dumps(_primary_models_from_report(report)),
            }
        )
    return ReportFrame.wrap(pd.DataFrame(rows), "compare_clean")

Configuration

freshdata.CleanConfig dataclass

CleanConfig(column_names: bool = True, drop_empty_rows: bool = True, drop_empty_columns: bool = True, drop_constant_columns: bool = False, strip_whitespace: bool = True, normalize_sentinels: bool = True, extra_sentinels: tuple[str, ...] = (), string_case: str | None = None, fix_dtypes: bool = True, numeric_threshold: float = 0.95, datetime_threshold: float = 0.95, preserve_leading_zeros: bool = True, dayfirst: bool | str = 'auto', decimal: str = '.', thousands: str = ',', drop_duplicates: bool = True, duplicate_subset: tuple[str, ...] | None = None, strategy: str = 'balanced', missing_threshold_low: float = 0.05, missing_threshold_medium: float = 0.3, missing_threshold_high: float = 0.6, duplicate_threshold: float = 0.1, outlier_action: str | None = 'auto', preserve_original: bool = True, verbose: bool = True, preserve_columns: tuple[str, ...] = (), target_column: str | None = None, id_columns: tuple[str, ...] = (), duplicate_keep: str = 'first', allow_timeseries_duplicates: bool = False, advanced_imputation: bool | str = 'auto', missing_indicators: bool | str = 'auto', impute: str | None = None, impute_strategy: Mapping[str, str] | None = None, missforest_max_iter: int = 5, missforest_n_estimators: int = 100, missforest_random_state: int = 42, missforest_min_rows_for_model: int = 50, missforest_add_indicators: bool | str = 'auto', outliers: str | None = None, outlier_method: str = 'iqr', outlier_factor: float | None = None, optimize_memory: bool = False, category_threshold: float = 0.5, reset_index: bool = False, sample_size: int = 10000, random_state: int = 0, semantic_mode: str | None = None, semantic_auto_threshold: float = 0.95, semantic_review_threshold: float = 0.7, semantic_max_distinct_values: int = 500, semantic_sample_size: int = 10000, semantic_backends: tuple[str, ...] = ('deterministic',), semantic_context: object | None = None, semantic_privacy_policy: str = 'local_only', semantic_budget: dict[str, object] | None = None, semantic_embedding_cache_size: int = 65536, context: str | None = None, policy: object | None = None, strict: bool = False)

Options controlling what freshdata.clean does.

Two layers of cleaning are controlled here:

  • Representation repair (whitespace, sentinel strings, wrong dtypes, exact duplicate rows, structurally empty rows/columns) — always safe, on by default.
  • The decision engine (strategy="balanced", the default) — profiles every column and applies accuracy-first rules for missing values and outliers. Use strategy="aggressive" for zero-NaN scrubbing (KNN, column drops, capping). Set strategy="conservative" to disable the engine and only repair representation; statistical changes are then opt-in via impute / outliers.

semantic_enabled property

semantic_enabled: bool

True when semantic_mode actually engages the semantic layer.

engine_mode property

engine_mode: str | None

"balanced", "aggressive", or None when the engine is off.

Reports & results

freshdata.CleanReport dataclass

CleanReport(actions: list[Action] = list(), rows_before: int = 0, rows_after: int = 0, cols_before: int = 0, cols_after: int = 0, memory_before: int = 0, memory_after: int = 0, duration_seconds: float = 0.0, missing_before: int = 0, missing_after: int = 0, duplicates_removed: int = 0, outliers_handled: int = 0, columns_dropped: list[str] = list(), columns_imputed: list[str] = list(), columns_preserved: list[str] = list(), warnings: list[str] = list(), recommendations: list[str] = list(), domain: str | None = None, domain_trust_score: float | None = None, domain_findings: list[dict[str, Any]] = list(), domain_repairs: list[dict[str, Any]] = list(), streaming: dict[str, Any] | None = None, backend: str | None = None, materialized: bool = True, fallback_events: list[dict[str, Any]] = list(), backend_differences: list[dict[str, Any]] = list(), stage_timings: list[dict[str, Any]] = list(), source_provenance: dict[str, Any] | None = None, contract_violations: dict[str, Any] | None = None, decisions_hash: str | None = None, undo_log: dict[str, Any] | None = None)

Bases: HtmlReprMixin

Everything one freshdata.clean run did, in order.

Iterable and sized: len(report) is the number of actions, and for action in report walks them in execution order. bool(report) is True iff anything was changed.

Beyond the action log, the report carries a cleaning summary (missing cells before/after, duplicates removed, outliers handled, columns dropped/imputed/preserved), engine warnings for risky columns, and recommendations for manual review.

cells_changed property

cells_changed: int

Total affected cells/rows summed across all actions.

record_fallback

record_fallback(backend: str, step: str, reason: str) -> None

Record that backend delegated step to the pandas reference.

Source code in src/freshdata/report.py
def record_fallback(self, backend: str, step: str, reason: str) -> None:
    """Record that *backend* delegated *step* to the pandas reference."""
    self.fallback_events.append(
        {"backend": backend, "fallback_step": step, "fallback_reason": reason}
    )

record_backend_difference

record_backend_difference(backend: str, step: str, detail: str, *, column: str | None = None) -> None

Record a semantics difference between a native backend and pandas.

Source code in src/freshdata/report.py
def record_backend_difference(
    self, backend: str, step: str, detail: str, *, column: str | None = None
) -> None:
    """Record a semantics difference between a native backend and pandas."""
    self.backend_differences.append(
        {"backend": backend, "step": step, "column": column, "detail": detail}
    )

record_stage_timing

record_stage_timing(backend: str, stage: str, seconds: float) -> None

Record a backend-provided stage runtime in seconds.

Source code in src/freshdata/report.py
def record_stage_timing(self, backend: str, stage: str, seconds: float) -> None:
    """Record a backend-provided stage runtime in seconds."""
    self.stage_timings.append(
        {"backend": backend, "stage": stage, "seconds": float(seconds)}
    )

add

add(step: str, description: str, *, column: str | None = None, count: int = 0, rationale: str = '', risk: str = 'low', confidence: float = 1.0, model_id: str = '', status: str = 'automatic', reversible: bool | None = None, memory_influenced: bool = False, human_review: bool = False, metadata: dict[str, Any] | None = None) -> None

Record one action (internal; called by the pipeline).

Source code in src/freshdata/report.py
def add(
    self,
    step: str,
    description: str,
    *,
    column: str | None = None,
    count: int = 0,
    rationale: str = "",
    risk: str = "low",
    confidence: float = 1.0,
    model_id: str = "",
    status: str = "automatic",
    reversible: bool | None = None,
    memory_influenced: bool = False,
    human_review: bool = False,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Record one action (internal; called by the pipeline)."""
    self.actions.append(
        Action(
            step=step,
            column=column,
            description=description,
            count=int(count),
            rationale=rationale,
            risk=risk,
            confidence=float(confidence),
            model_id=model_id,
            status=status,
            reversible=reversible,
            memory_influenced=memory_influenced,
            human_review=human_review,
            metadata=dict(metadata) if metadata else {},
        )
    )

add_warning

add_warning(message: str) -> None

Record a warning about a risky column or decision (internal).

Source code in src/freshdata/report.py
def add_warning(self, message: str) -> None:
    """Record a warning about a risky column or decision (internal)."""
    if message not in self.warnings:
        self.warnings.append(message)

add_recommendation

add_recommendation(message: str) -> None

Record a suggestion for manual review (internal).

Source code in src/freshdata/report.py
def add_recommendation(self, message: str) -> None:
    """Record a suggestion for manual review (internal)."""
    if message not in self.recommendations:
        self.recommendations.append(message)

to_dict

to_dict() -> dict[str, Any]

Return a JSON-friendly dictionary representation of the report.

This format is ideal for writing to logs, persisting audit snapshots, or returning a stable object from service endpoints.

Examples

>>> report = CleanReport(rows_before=10, rows_after=8, cols_before=4, cols_after=3)
>>> payload = report.to_dict()
>>> 'actions' in payload
True
>>> payload['rows_before'], payload['rows_after']
(10, 8)

Source code in src/freshdata/report.py
def to_dict(self) -> dict[str, Any]:
    """Return a JSON-friendly dictionary representation of the report.

    This format is ideal for writing to logs, persisting audit snapshots,
    or returning a stable object from service endpoints.

    Examples
    --------
    >>> report = CleanReport(rows_before=10, rows_after=8, cols_before=4, cols_after=3)
    >>> payload = report.to_dict()
    >>> 'actions' in payload
    True
    >>> payload['rows_before'], payload['rows_after']
    (10, 8)
    """
    payload: dict[str, Any] = {
        "rows_before": self.rows_before,
        "rows_after": self.rows_after,
        "cols_before": self.cols_before,
        "cols_after": self.cols_after,
        "memory_before": self.memory_before,
        "memory_after": self.memory_after,
        "duration_seconds": self.duration_seconds,
        "missing_before": self.missing_before,
        "missing_after": self.missing_after,
        "duplicates_removed": self.duplicates_removed,
        "outliers_handled": self.outliers_handled,
        "columns_dropped": list(self.columns_dropped),
        "columns_imputed": list(self.columns_imputed),
        "columns_preserved": list(self.columns_preserved),
        "warnings": list(self.warnings),
        "recommendations": list(self.recommendations),
        "actions": [self._action_dict(a) for a in self.actions],
    }
    if self.domain is not None:
        payload["domain"] = self.domain
        payload["domain_trust_score"] = self.domain_trust_score
        payload["domain_findings"] = list(self.domain_findings)
        payload["domain_repairs"] = list(self.domain_repairs)
    if self.streaming is not None:
        payload["streaming"] = dict(self.streaming)
    if self.backend is not None:
        payload["backend"] = self.backend
    if not self.materialized:
        payload["materialized"] = False
    if self.fallback_events:
        payload["fallback_events"] = list(self.fallback_events)
    if self.backend_differences:
        payload["backend_differences"] = list(self.backend_differences)
    if self.stage_timings:
        payload["stage_timings"] = list(self.stage_timings)
    if self.source_provenance is not None:
        payload["source_provenance"] = self.source_provenance
    if self.contract_violations is not None:
        payload["contract_violations"] = self.contract_violations
    if self.decisions_hash is not None:
        payload["decisions_hash"] = self.decisions_hash
    return payload
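
Because the payload is built from plain Python containers, it can usually be written out as a single JSON line. The sketch below does not call freshdata: it hand-builds a small dictionary with a few of the keys shown in the source above. `to_audit_line` is a hypothetical helper, and `default=str` is a defensive assumption for values that are not natively JSON-serializable, not documented freshdata behavior.

```python
import json
from typing import Any


def to_audit_line(payload: dict[str, Any]) -> str:
    """Serialize a report payload as one compact, key-sorted JSON line."""
    # default=str is a defensive fallback (an assumption, not freshdata behavior).
    return json.dumps(payload, sort_keys=True, default=str, separators=(",", ":"))


# Hand-built stand-in for report.to_dict(); only a few of the real keys are shown.
payload = {
    "rows_before": 10,
    "rows_after": 8,
    "columns_dropped": ["notes"],
    "actions": [],
}

line = to_audit_line(payload)
restored = json.loads(line)

# Optional keys such as "domain" are omitted when unset, so read them with .get().
domain = restored.get("domain")
```

Keys like `domain`, `streaming`, and `backend` are only present when the corresponding attribute is set, so consumers should treat them as optional rather than expecting a `None` value.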

revert

revert(df: DataFrame, action_ids: list[str] | None = None) -> pd.DataFrame

Undo reversible plan actions on df, returning a new frame.

Requires this report to come from freshdata.apply_plan(..., keep_undo=True); with action_ids=None every recorded reversible action is undone. Cells whose rows no longer exist in df are skipped silently (they cannot be restored).

Source code in src/freshdata/report.py
def revert(
    self, df: pd.DataFrame, action_ids: list[str] | None = None
) -> pd.DataFrame:
    """Undo reversible plan actions on *df*, returning a new frame.

    Requires this report to come from
    ``freshdata.apply_plan(..., keep_undo=True)``; with ``action_ids=None``
    every recorded reversible action is undone. Cells whose rows no longer
    exist in *df* are skipped silently (they cannot be restored).
    """
    if not self.undo_log or not self.undo_log.get("entries"):
        raise ValueError(
            "this report holds no undo information — apply the plan with "
            "keep_undo=True to enable revert()"
        )
    wanted = None if action_ids is None else set(action_ids)
    entries = [
        e
        for e in self.undo_log["entries"]
        if wanted is None or e["action_id"] in wanted
    ]
    if wanted is not None:
        known = {e["action_id"] for e in self.undo_log["entries"]}
        missing = sorted(wanted - known)
        if missing:
            raise KeyError(
                f"no undo information for action id(s) {missing}; "
                f"reversible actions: {sorted(known)}"
            )
    out = df.copy(deep=False)
    touched: dict[str, list[dict[str, Any]]] = {}
    for entry in entries:
        touched.setdefault(entry["column"], []).append(entry)
    for column, column_entries in touched.items():
        if column not in out.columns:
            continue
        series = out[column]
        if series.dtype != object:
            series = series.astype(object)
        for entry in column_entries:
            labels = [i for i in entry["index"] if i in series.index]
            if labels:
                series.loc[labels] = entry["value"]
        original_dtype = (self.undo_log.get("column_dtypes") or {}).get(column)
        if original_dtype is not None:
            # Mixed values after a partial revert legitimately stay object.
            with contextlib.suppress(ValueError, TypeError):
                series = series.astype(original_dtype)
        out[column] = series
    return out
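
The core of `revert` is a replay of logged original values onto the labels that still exist. The sketch below illustrates that logic with plain dictionaries instead of a DataFrame, so it runs without pandas or freshdata. The undo-log shape (`entries` with `action_id`, `column`, `index`, `value`) is inferred from the source above; `revert_cells`, the `Table` alias, and the assumption of one scalar value per entry are hypothetical.

```python
from typing import Any, Optional

Table = dict[str, dict[int, Any]]  # column -> {row label -> value}


def revert_cells(
    table: Table, undo_log: dict[str, Any], action_ids: Optional[list[str]] = None
) -> Table:
    """Restore logged original values, returning a new table."""
    entries = undo_log.get("entries") or []
    if not entries:
        raise ValueError("no undo information")
    wanted = None if action_ids is None else set(action_ids)
    if wanted is not None:
        known = {e["action_id"] for e in entries}
        missing = sorted(wanted - known)
        if missing:
            raise KeyError(f"no undo information for action id(s) {missing}")
    # Copy each column so the caller's table is left untouched.
    out = {col: dict(cells) for col, cells in table.items()}
    for e in entries:
        if wanted is not None and e["action_id"] not in wanted:
            continue
        cells = out.get(e["column"])
        if cells is None:
            continue  # the column no longer exists
        for label in e["index"]:
            if label in cells:  # labels of dropped rows are skipped
                cells[label] = e["value"]
    return out


cleaned = {"age": {0: 30, 1: 41, 2: 41}, "city": {0: "Oslo", 1: "unknown", 2: "Rome"}}
undo_log = {"entries": [
    {"action_id": "a1", "column": "age", "index": [1, 7], "value": None},
    {"action_id": "a2", "column": "city", "index": [1], "value": None},
]}

only_age = revert_cells(cleaned, undo_log, ["a1"])  # undo one action
everything = revert_cells(cleaned, undo_log)        # undo all recorded actions
```

Label `7` is not in the table, so it is skipped without error, mirroring the "rows no longer exist" case described above.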

to_findings

to_findings(*, lineage_run_id: str | None = None) -> list

Project this report into normalized freshdata.QualityFinding objects.

Surfaces violated domain rules (enriched with any repair that was applied) and medium/high-risk engine actions, so the result can be exported to dbt tests, a Great Expectations suite, or an exception table. CleanReport keeps its own shape; this is a pure read-only projection.

Examples

>>> CleanReport().to_findings()
[]

Source code in src/freshdata/report.py
def to_findings(self, *, lineage_run_id: str | None = None) -> list:
    """Project this report into normalized :class:`~freshdata.QualityFinding` objects.

    Surfaces violated domain rules (enriched with any repair that was applied)
    and medium/high-risk engine actions, so the result can be exported to dbt
    tests, a Great Expectations suite, or an exception table. ``CleanReport``
    keeps its own shape; this is a pure read-only projection.

    Examples
    --------
    >>> CleanReport().to_findings()
    []
    """
    payload = {
        "domain_findings": self.domain_findings,
        "domain_repairs": self.domain_repairs,
        "actions": [
            {
                "step": a.step,
                "column": a.column,
                "description": a.description,
                "count": a.count,
                "risk": a.risk,
            }
            for a in self.actions
        ],
    }
    return findings_from_dict(payload, lineage_run_id=lineage_run_id)
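
The selection rule described above (violated domain rules plus medium/high-risk actions) can be approximated on the intermediate payload. This is only a sketch: `findings_from_dict` is not shown in this page, so `reviewable_items` is a hypothetical stand-in that returns plain dictionaries rather than `QualityFinding` objects, and the `rule` key in the sample data is invented for illustration.

```python
from typing import Any


def reviewable_items(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Collect violated domain findings and medium/high-risk actions."""
    items: list[dict[str, Any]] = []
    for finding in payload.get("domain_findings", []):
        if finding.get("status") == "violated":
            items.append({"kind": "domain", **finding})
    for action in payload.get("actions", []):
        if action.get("risk") in {"medium", "high"}:
            items.append({"kind": "action", **action})
    return items


# Made-up sample payload in the shape built by to_findings() above.
payload = {
    "domain_findings": [
        {"rule": "price_positive", "status": "violated", "severity": "error"},
        {"rule": "currency_known", "status": "passed", "severity": "warning"},
    ],
    "actions": [
        {"step": "fix_dtypes", "column": "age", "risk": "low"},
        {"step": "missing", "column": "income", "risk": "high"},
    ],
}

items = reviewable_items(payload)
```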

to_frame

to_frame() -> pd.DataFrame

Return one action per row as a pandas.DataFrame.

This representation works best when you want to inspect the report in notebooks, ad hoc dashboards, or quick filtering workflows.

Examples

>>> from freshdata import CleanReport, Action
>>> report = CleanReport(actions=[Action(step='coerce', column='age')])
>>> frame = report.to_frame()
>>> frame.loc[0, 'step']
'coerce'

Source code in src/freshdata/report.py
def to_frame(self) -> pd.DataFrame:
    """Return one action per row as a `pandas.DataFrame`.

    This representation works best when you want to inspect the report in
    notebooks, ad hoc dashboards, or quick filtering workflows.

    Examples
    --------
    >>> from freshdata import CleanReport, Action
    >>> report = CleanReport(actions=[Action(step='coerce', column='age')])
    >>> frame = report.to_frame()
    >>> frame.loc[0, 'step']
    'coerce'
    """
    return pd.DataFrame(
        [
            (
                a.step,
                a.column,
                a.description,
                a.count,
                a.rationale,
                a.risk,
                a.confidence,
                a.model_id,
                a.status,
                a.reversible,
                a.memory_influenced,
                a.human_review,
            )
            for a in self.actions
        ],
        columns=[
            "step",
            "column",
            "description",
            "count",
            "rationale",
            "risk",
            "confidence",
            "model_id",
            "status",
            "reversible",
            "memory_influenced",
            "human_review",
        ],
    )

summary

summary() -> str

Render a concise text summary for terminal or notebook output.

This method is the quickest way to create a human-readable snapshot of what happened during a clean run.

Examples

>>> report = CleanReport(rows_before=4, rows_after=4, cols_before=2, cols_after=2)
>>> text = report.summary()
>>> text.startswith('freshdata clean report')
True
>>> 'rows:' in text
True

Source code in src/freshdata/report.py
def summary(self) -> str:
    """Render a concise text summary for terminal or notebook output.

    This method is the quickest way to create a human-readable snapshot
    of what happened during a clean run.

    Examples
    --------
    >>> report = CleanReport(rows_before=4, rows_after=4, cols_before=2, cols_after=2)
    >>> text = report.summary()
    >>> text.startswith('freshdata clean report')
    True
    >>> 'rows:' in text
    True
    """
    if not self.materialized:
        lines = [
            "freshdata clean report",
            f"  rows:    {self.rows_before:,} -> (native handle — not materialized)",
            f"  columns: {self.cols_before:,} -> (native handle — not materialized)",
            "  result:  returned un-materialized; call .fetchdf()/.collect() to pull rows",
            f"  time:    {self.duration_seconds:.3f}s",
        ]
    else:
        d_rows = self.rows_after - self.rows_before
        d_cols = self.cols_after - self.cols_before
        lines = [
            "freshdata clean report",
            f"  rows:    {self.rows_before:,} -> {self.rows_after:,} ({d_rows:+,})",
            f"  columns: {self.cols_before:,} -> {self.cols_after:,} ({d_cols:+,})",
            f"  missing: {self.missing_before:,} -> {self.missing_after:,} cell(s)",
            f"  memory:  {format_bytes(self.memory_before)} -> "
            f"{format_bytes(self.memory_after)}",
            f"  time:    {self.duration_seconds:.3f}s",
        ]
    facts = []
    if self.duplicates_removed:
        facts.append(f"{self.duplicates_removed} duplicate row(s) removed")
    if self.outliers_handled:
        facts.append(f"{self.outliers_handled} outlier(s) handled")
    if self.columns_dropped:
        facts.append(f"dropped: {', '.join(self.columns_dropped)}")
    if self.columns_imputed:
        facts.append(f"imputed: {', '.join(self.columns_imputed)}")
    if self.columns_preserved:
        facts.append(f"preserved: {', '.join(self.columns_preserved)}")
    if facts:
        lines.append("  engine:  " + "; ".join(facts))
    if self.domain is not None:
        n_err = sum(
            1
            for f in self.domain_findings
            if f.get("status") == "violated" and f.get("severity") == "error"
        )
        n_warn = sum(
            1
            for f in self.domain_findings
            if f.get("status") == "violated" and f.get("severity") == "warning"
        )
        score = self.domain_trust_score if self.domain_trust_score is not None else 1.0
        applied = sum(1 for r in self.domain_repairs if r.get("status") == "applied")
        lines.append(
            f"  domain:  {self.domain} — trust {score:.2f}, "
            f"{n_err} error(s), {n_warn} warning(s), {applied} repair(s) applied"
        )
    if self.actions:
        lines.append(f"  actions ({len(self.actions)}):")
        lines.extend(f"    - {a}" for a in self.actions)
    else:
        lines.append("  actions: none — data was already clean")
    if self.warnings:
        lines.append(f"  warnings ({len(self.warnings)}):")
        lines.extend(f"    ! {w}" for w in self.warnings)
    if self.recommendations:
        lines.append(f"  review ({len(self.recommendations)}):")
        lines.extend(f"    ? {r}" for r in self.recommendations)
    cv = self.contract_violations
    if cv is not None:
        verdict = "PASS" if cv.get("passed") else "FAIL"
        n_err = cv.get("n_errors", 0)
        n_warn = cv.get("n_warnings", 0)
        lines.append(
            f"  contract '{cv.get('baseline_name')}' v{cv.get('baseline_version')}: "
            f"{verdict} ({n_err} error(s), {n_warn} warning(s))"
        )
        for f in cv.get("findings", []):
            if f.get("status") == "passed":
                continue
            marker = "✗" if f.get("status") == "failed" else "!"
            col = f" `{f['column']}`" if f.get("column") else ""
            lines.append(f"    {marker} [{f.get('check_id')}]{col}: {f.get('message')}")
    return "\n".join(lines)
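
The count lines in the summary rely on two format specifiers: `,` for thousands separators and `+,` for a signed delta. A minimal sketch of that formatting, where `delta_line` is a hypothetical helper and not part of freshdata:

```python
def delta_line(label: str, before: int, after: int) -> str:
    """Format a 'before -> after (signed delta)' line with thousands separators."""
    delta = after - before
    # ':,' groups digits; ':+,' also forces an explicit sign, including '+0'.
    return f"  {label:<8} {before:,} -> {after:,} ({delta:+,})"


print(delta_line("rows:", 10, 8))            # "  rows:    10 -> 8 (-2)"
print(delta_line("columns:", 1200, 1200))    # "  columns: 1,200 -> 1,200 (+0)"
print(delta_line("rows:", 1000000, 998500))  # "  rows:    1,000,000 -> 998,500 (-1,500)"
```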

brief

brief() -> str

Compact summary for verbose=True console output.

Source code in src/freshdata/report.py
def brief(self) -> str:
    """Compact summary for ``verbose=True`` console output."""
    line = (
        f"freshdata: rows {self.rows_before:,}->{self.rows_after:,}, "
        f"cols {self.cols_before}->{self.cols_after}, "
        f"missing {self.missing_before:,}->{self.missing_after:,}"
    )
    extras = []
    if self.duplicates_removed:
        extras.append(f"{self.duplicates_removed} dup(s) removed")
    if self.outliers_handled:
        extras.append(f"{self.outliers_handled} outlier(s) handled")
    if self.columns_dropped:
        extras.append(f"dropped {len(self.columns_dropped)} column(s)")
    if extras:
        line += " (" + ", ".join(extras) + ")"
    if self.domain is not None:
        score = self.domain_trust_score if self.domain_trust_score is not None else 1.0
        line += f"\n  domain {self.domain}: trust {score:.2f}"
    for w in self.warnings:
        line += f"\n  warning: {w}"
    return line

freshdata.Action dataclass

Action(step: str, column: str | None, description: str, count: int = 0, rationale: str = '', risk: str = 'low', confidence: float = 1.0, model_id: str = '', status: str = 'automatic', reversible: bool | None = None, memory_influenced: bool = False, human_review: bool = False, metadata: dict[str, Any] = dict())

One transformation (or deliberate non-transformation) of the data.

Attributes

step: Machine-readable step name, e.g. "fix_dtypes" or "missing".
column: Column the action applied to, or None for table-level actions.
description: Human-readable summary of what happened.
count: Number of cells or rows affected (0 for informational notes).
rationale: Why the decision engine chose this action ("" for non-engine steps).
risk: "low", "medium", or "high"; how likely the action is to need review.
confidence: Engine confidence in the decision, in [0, 1] (1.0 for non-engine steps, which are deterministic representation repairs).
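
The `risk` and `confidence` attributes lend themselves to a simple review queue. The sketch below uses `MiniAction`, a hypothetical stand-in carrying a few of the fields documented above, so that it runs without freshdata; the ordering rule (highest risk first, then lowest confidence) is one possible triage policy, not something the library prescribes, and the sample actions are made up.

```python
from dataclasses import dataclass
from typing import Optional

RISK_ORDER = {"high": 0, "medium": 1, "low": 2}


@dataclass
class MiniAction:
    """Hypothetical stand-in with a few of the fields documented above."""
    step: str
    column: Optional[str]
    description: str
    risk: str = "low"
    confidence: float = 1.0


def review_queue(actions: list[MiniAction]) -> list[MiniAction]:
    """Order actions so the riskiest, least confident ones come first."""
    return sorted(
        actions,
        key=lambda a: (RISK_ORDER.get(a.risk, len(RISK_ORDER)), a.confidence),
    )


actions = [
    MiniAction("fix_dtypes", "age", "parsed text as integers"),
    MiniAction("missing", "income", "imputed missing values", risk="medium", confidence=0.8),
    MiniAction("missing", "notes", "dropped sparse column", risk="high", confidence=0.6),
    MiniAction("missing", "score", "imputed missing values", risk="medium", confidence=0.7),
]
queue = review_queue(actions)
```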

freshdata.CleanPlan dataclass

CleanPlan(config: CleanConfig, column_plans: dict[str, ColumnPlan] = dict(), schema_diff: Any = None, repair_plan: Any = None)

Bases: HtmlReprMixin

Recommended cleaning configuration and per-column model choices.

summary

summary() -> str

Human-readable primary model per column.

Source code in src/freshdata/plan.py
def summary(self) -> str:
    """Human-readable primary model per column."""
    lines = [
        f"freshdata clean plan (strategy={self.config.strategy!r})",
        f"  columns: {len(self.column_plans)}",
    ]
    if not self.column_plans:
        lines.append("  (no engine actions — conservative or empty frame)")
        return "\n".join(lines)
    name_w = min(24, max(6, *(len(c) for c in self.column_plans)))
    lines.append(f"  {'column':<{name_w}}  missing_model    outlier_action")
    for col, plan in sorted(self.column_plans.items()):
        miss = plan.missing.model_id if plan.missing else "-"
        out = plan.outlier_action or (plan.outlier.model_id if plan.outlier else "-")
        lines.append(f"  {col:<{name_w}}  {miss:<15}  {out}")
    return "\n".join(lines)
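
The column width in the table above is clamped between 6 and 24 characters; longer names are not truncated, they simply push the rest of the row to the right. A standalone sketch of the same layout logic, where `render_plan_table` and the model ids in the sample data are hypothetical:

```python
def render_plan_table(rows: dict[str, tuple[str, str]]) -> str:
    """Render {column: (missing_model, outlier_action)} as an aligned text table."""
    if not rows:
        return "  (no engine actions)"
    # At least 6 wide (the header word "column"), at most 24.
    name_w = min(24, max(6, *(len(c) for c in rows)))
    lines = [f"  {'column':<{name_w}}  missing_model    outlier_action"]
    for col, (miss, out) in sorted(rows.items()):
        lines.append(f"  {col:<{name_w}}  {miss:<15}  {out}")
    return "\n".join(lines)


table = render_plan_table({
    "customer_lifetime_value": ("knn", "-"),
    "age": ("median", "clip"),
})
print(table)
```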

alternatives

alternatives() -> pd.DataFrame

One row per (column, model, rank) for notebook review.

Source code in src/freshdata/plan.py
def alternatives(self) -> pd.DataFrame:
    """One row per (column, model, rank) for notebook review."""
    rows: list[tuple[str, str, str, int, float, str, bool, str]] = []
    for col, plan in sorted(self.column_plans.items()):
        if plan.missing:
            rows.append(
                (
                    col,
                    "missing",
                    plan.missing.model_id,
                    0,
                    plan.missing.confidence,
                    plan.missing.rationale,
                    plan.missing.eligible,
                    plan.missing.rejection_reason,
                )
            )
            for rank, alt in enumerate(plan.missing_alternatives, start=1):
                rows.append(
                    (
                        col,
                        "missing",
                        alt.model_id,
                        rank,
                        alt.confidence,
                        alt.rationale,
                        alt.eligible,
                        alt.rejection_reason,
                    )
                )
        if plan.outlier:
            rows.append(
                (
                    col,
                    "outlier",
                    plan.outlier.model_id,
                    0,
                    plan.outlier.confidence,
                    plan.outlier.rationale,
                    plan.outlier.eligible,
                    plan.outlier.rejection_reason,
                )
            )
    if not rows:
        return pd.DataFrame(
            columns=[
                "column",
                "step",
                "model_id",
                "rank",
                "confidence",
                "rationale",
                "eligible",
                "rejection_reason",
            ]
        )
    return pd.DataFrame(
        rows,
        columns=[
            "column",
            "step",
            "model_id",
            "rank",
            "confidence",
            "rationale",
            "eligible",
            "rejection_reason",
        ],
    )
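
In the resulting frame, rank 0 is the primary choice and ranks 1, 2, ... are the alternatives in their stored order. The sketch below reproduces that ranking convention with plain tuples; `Choice` and `ranked_rows` are hypothetical stand-ins for `ModelChoice` and the method above, and the model ids are made up.

```python
from typing import NamedTuple


class Choice(NamedTuple):
    """Hypothetical stand-in for a few ModelChoice fields."""
    model_id: str
    confidence: float
    eligible: bool = True


def ranked_rows(column: str, primary: Choice, alternatives: list[Choice]) -> list[tuple]:
    """Return (column, model_id, rank, confidence, eligible) rows; rank 0 is primary."""
    rows = [(column, primary.model_id, 0, primary.confidence, primary.eligible)]
    for rank, alt in enumerate(alternatives, start=1):
        rows.append((column, alt.model_id, rank, alt.confidence, alt.eligible))
    return rows


rows = ranked_rows(
    "age",
    Choice("median", 0.9),
    [Choice("mean", 0.5, eligible=False), Choice("knn", 0.7)],
)

# First eligible alternative, e.g. as a fallback if the primary is rejected in review.
fallback = next(r for r in rows if r[2] > 0 and r[4])
```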

to_frame

to_frame() -> pd.DataFrame

One row per column with primary missing/outlier choices.

Source code in src/freshdata/plan.py
def to_frame(self) -> pd.DataFrame:
    """One row per column with primary missing/outlier choices."""
    return pd.DataFrame(
        [
            {
                "column": col,
                "missing_model": p.missing.model_id if p.missing else None,
                "missing_confidence": p.missing.confidence if p.missing else None,
                "outlier_action": p.outlier_action,
                "outlier_model": p.outlier.model_id if p.outlier else None,
                "n_outliers": p.n_outliers,
                "semantic_proposals": p.semantic_proposals,
            }
            for col, p in sorted(self.column_plans.items())
        ]
    )

freshdata.ColumnPlan dataclass

ColumnPlan(column: str, missing: ModelChoice | None = None, missing_alternatives: tuple[ModelChoice, ...] = (), outlier: ModelChoice | None = None, outlier_action: str | None = None, n_outliers: int = 0, semantic_proposals: int = 0)

Primary and alternative models for one column.

freshdata.Profile dataclass

Profile(n_rows: int, n_cols: int, memory: int, duplicate_rows: int | None, missing_cells: int, missing_pct: float, columns: list[ColumnProfile], materialization: dict[str, Any] | None = None)

Bases: HtmlReprMixin

A whole-table profile. Render with print(profile), export with to_frame() or to_dict(), or display the interactive quality cockpit with show() / _repr_html_.

to_frame

to_frame() -> pd.DataFrame

One row per column — convenient to sort/filter in a notebook.

Source code in src/freshdata/profile.py
def to_frame(self) -> pd.DataFrame:
    """One row per column — convenient to sort/filter in a notebook."""
    return pd.DataFrame(
        {
            "dtype": [c.dtype for c in self.columns],
            "non_null": [c.non_null for c in self.columns],
            "missing": [c.missing for c in self.columns],
            "missing_pct": [round(c.missing_pct, 2) for c in self.columns],
            "unique": [c.unique for c in self.columns],
            "suggested_dtype": [c.suggested_dtype for c in self.columns],
            "issues": ["; ".join(c.issues) for c in self.columns],
        },
        index=pd.Index([c.name for c in self.columns], name="column"),
    )

freshdata.ColumnProfile dataclass

ColumnProfile(name: str, dtype: str, non_null: int, missing: int, missing_pct: float, unique: int | None, sample_values: list[Any], suggested_dtype: str | None, issues: list[str] = list())

Statistics and detected issues for one column.

freshdata.ExplainReport dataclass

ExplainReport(strategy: str, rows_before: int, rows_after: int, cols_before: int, cols_after: int, before_stats: dict[str, dict[str, Any]], after_stats: dict[str, dict[str, Any]], cell_changes: dict[str, int], actions_by_step: dict[str, list[dict[str, Any]]], narratives: list[str], report: CleanReport, roles: DataFrame)

Bases: HtmlReprMixin

Structured explanation of a clean() run.

to_frame

to_frame() -> pd.DataFrame

One row per column: before/after dtype and changed-cell count.

Source code in src/freshdata/explain.py
def to_frame(self) -> pd.DataFrame:
    """One row per column: before/after dtype and changed-cell count."""
    rows = []
    for col in sorted(set(self.before_stats) | set(self.after_stats)):
        before = self.before_stats.get(col, {})
        after = self.after_stats.get(col, {})
        rows.append({
            "column": col,
            "before_dtype": before.get("dtype"),
            "after_dtype": after.get("dtype"),
            "changed_cells": self.cell_changes.get(col, 0),
        })
    return pd.DataFrame(
        rows, columns=["column", "before_dtype", "after_dtype", "changed_cells"]
    )
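
Because rows are built from the union of the before and after column names, a column present in only one of the two mappings still gets a row, with None on the other side. A pandas-free sketch of that row construction, where `dtype_changes` is a hypothetical helper and the sample statistics are made up:

```python
from typing import Any


def dtype_changes(
    before_stats: dict[str, dict[str, Any]],
    after_stats: dict[str, dict[str, Any]],
    cell_changes: dict[str, int],
) -> list[dict[str, Any]]:
    """Build one record per column seen before or after cleaning."""
    rows = []
    for col in sorted(set(before_stats) | set(after_stats)):
        rows.append({
            "column": col,
            "before_dtype": before_stats.get(col, {}).get("dtype"),
            "after_dtype": after_stats.get(col, {}).get("dtype"),
            "changed_cells": cell_changes.get(col, 0),
        })
    return rows


before = {"age": {"dtype": "object"}, "notes": {"dtype": "object"}}
after = {"age": {"dtype": "int64"}}  # "notes" is absent after cleaning
rows = dtype_changes(before, after, {"age": 3})
```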

Enterprise layer

The freshdata.enterprise subpackage is documented in the feature overview. Import its symbols lazily:

from freshdata.enterprise import clean_enterprise, EnterpriseConfig

Compliance

The freshdata.compliance subpackage maps a CleanReport onto regulatory control frameworks; it is documented in the compliance reports guide. Import its symbols lazily:

from freshdata.compliance import generate_compliance_report, ComplianceConfig

Integrations

The freshdata.integrations subpackage runs the clean + trust gate inside Dagster, Airflow, and dbt; it is documented in the orchestration integrations guide. The framework-agnostic core is always importable:

from freshdata.integrations import evaluate_trust_gate, TrustGateResult