Skip to content

Commit

Permalink
add SelectionSummary task
Browse files Browse the repository at this point in the history
  • Loading branch information
mafrahm committed Aug 16, 2024
1 parent 897f6bc commit 2cbcdf3
Showing 1 changed file with 118 additions and 1 deletion.
119 changes: 118 additions & 1 deletion hbw/tasks/inspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
Custom tasks for inspecting the configuration or certain task outputs.
"""

# from functools import cached_property

import law
import luigi

from columnflow.tasks.framework.mixins import (
ProducersMixin, MLModelsMixin,
)
from columnflow.tasks.framework.base import ConfigTask
from columnflow.tasks.framework.base import ConfigTask, Requirements
from columnflow.tasks.framework.mixins import DatasetsProcessesMixin, SelectorMixin, CalibratorsMixin
from columnflow.tasks.framework.parameters import SettingsParameter
from columnflow.tasks.reduction import ReducedEventsUser
from columnflow.tasks.selection import MergeSelectionStats
from columnflow.util import maybe_import, dev_sandbox
from columnflow.columnar_util import get_ak_routes, update_ak_array

Expand All @@ -21,6 +25,119 @@

ak = maybe_import("awkward")

logger = law.logger.get_logger(__name__)


class SelectionSummary(
    HBWTask,
    DatasetsProcessesMixin,
    SelectorMixin,
    CalibratorsMixin,
):
    """
    Task that writes a per-dataset summary of the merged selection statistics.

    For every dataset in ``self.datasets`` the "stats" output of :py:class:`MergeSelectionStats` is
    loaded and one CSV-formatted row is written to ``selection_summary.txt``. Each row contains the
    dataset name followed by the columns requested via *keys_of_interest*.
    """

    reqs = Requirements(MergeSelectionStats=MergeSelectionStats)

    sandbox = dev_sandbox(law.config.get("analysis", "default_columnar_sandbox"))

    # Columns to write per dataset. Besides the derived quantities "xsec", "empty", "selection_eff"
    # and "expected_yield", any key of the selection stats can be requested; keys that are not
    # available result in an empty cell. An empty value falls back to the defaults chosen in
    # write_selection_summary.
    # NOTE(review): keys_repr is not used in output(), so different key choices may resolve to the
    # same target unless the parameter enters the output path elsewhere - confirm.
    keys_of_interest = law.CSVParameter(
        default=tuple(),
    )

    def requires(self):
        """
        Requires the merged selection stats (``tree_index=0``, ``branch=-1``) of each dataset,
        returned as a dictionary keyed by dataset name.
        """
        return {
            dataset: self.reqs.MergeSelectionStats.req(
                self,
                dataset=dataset,
                tree_index=0,
                branch=-1,
                _exclude=self.reqs.MergeSelectionStats.exclude_params_forest_merge,
            )
            for dataset in self.datasets
        }

    @property
    def keys_repr(self):
        """
        String representation of the requested keys: sorted and joined with underscores.
        """
        return "_".join(sorted(self.keys_of_interest))

    def output(self):
        """
        Returns a dictionary with the single target "selection_summary".
        """
        return {
            "selection_summary": self.target("selection_summary.txt"),
        }

    def write_selection_summary(self, outp):
        """
        Writes the selection summary of all datasets as CSV rows into the file target *outp*.

        Datasets whose stats lack one of the keys needed for the derived quantities are skipped
        with a warning. For datasets whose first process has no cross section registered at the
        campaign's center-of-mass energy, the "xsec" and "expected_yield" cells are left empty.
        The names of all datasets without any selected event are published as a message at the end.
        """
        import csv

        # kept from the original implementation: create the target file before opening it by path
        # (presumably this also creates missing parent directories - TODO confirm)
        outp.touch()
        lumi = self.config_inst.x.luminosity
        inputs = self.input()

        keys_of_interest = self.keys_of_interest or ["selection_eff", "expected_yield", "num_events_selected"]
        header_map = {
            "xsec": "CrossSection [pb]",
            "empty": "Empty?",
            "selection_eff": "Efficiency",
            "expected_yield": "Yields",
            "num_events_selected": "NSelected",
        }

        # all stats entries that are accessed unconditionally below
        required_keys = {"sum_mc_weight", "sum_mc_weight_selected", "num_events_selected"}

        def safe_div(num, den):
            # avoid ZeroDivisionError for datasets without any (weighted) events
            return num / den if den != 0 else 0

        empty_datasets = []

        # newline="" is the mode recommended by the csv module to avoid newline translation
        with open(outp.path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["Dataset"] + [header_map.get(key, key) for key in keys_of_interest])

            for dataset in self.datasets:
                stats = inputs[dataset]["collection"][0]["stats"].load(formatter="json")

                xsec = self.config_inst.get_dataset(dataset).processes.get_first().xsecs.get(
                    self.config_inst.campaign.ecm, None,
                )

                missing_keys = required_keys - set(stats.keys())
                if missing_keys:
                    logger.warning(f"Missing keys in stats in dataset {dataset}: {missing_keys}")
                    continue

                selection_eff = safe_div(stats["sum_mc_weight_selected"], stats["sum_mc_weight"])

                # without a cross section no yield can be computed; write empty cells instead of
                # failing on the missing value or reusing the yield of a previous dataset
                if xsec is None:
                    xsec_nominal = ""
                    expected_yield_nominal = ""
                else:
                    xsec_nominal = xsec.nominal
                    expected_yield_nominal = (xsec * selection_eff * lumi).nominal

                is_empty = stats["num_events_selected"] == 0
                if is_empty:
                    empty_datasets.append(dataset)

                selection_summary = {
                    "xsec": xsec_nominal,
                    "empty": is_empty,
                    "selection_eff": selection_eff,
                    "expected_yield": expected_yield_nominal,
                }
                # remaining columns are taken directly from the stats, defaulting to an empty string
                for key in keys_of_interest:
                    if key not in selection_summary:
                        selection_summary[key] = stats.get(key, "")

                row = [dataset] + [selection_summary[key] for key in keys_of_interest]
                logger.debug(f"selection summary row: {row}")
                writer.writerow(row)

        self.publish_message(f"Empty datasets: {empty_datasets}")

    def run(self):
        """
        Writes the selection summary into the "selection_summary" output target.
        """
        output = self.output()
        self.write_selection_summary(output["selection_summary"])


class DumpAnalysisSummary(
HBWTask,
Expand Down

0 comments on commit 2cbcdf3

Please sign in to comment.