给定数据集的估计方法排名#
我们通过根据它们在考虑观察到的未建模混杂误差和未观察到的混杂误差的驳斥测试中的表现进行排名,来说明对给定数据集的各种估计方法的比较。
[ ]:
# Importing all the required libraries
import sys
import argparse
import xgboost
import numpy as np
import pandas as pd
import os
import pdb
import random
from xgboost import XGBRegressor
from sklearn.preprocessing import PolynomialFeatures
from sklearn.linear_model import LassoCV
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier, GradientBoostingRegressor
from sklearn.linear_model import LogisticRegressionCV
from sklearn.metrics import mean_absolute_error
from dowhy import CausalModel
from datetime import datetime
from collections import namedtuple
import statsmodels.api as sm
from sklearn import linear_model
import dowhy
from dowhy.utils import dgp
from dowhy.utils.dgps.linear_dgp import LinearDataGeneratingProcess
from dowhy import CausalModel
from datetime import datetime
from collections import namedtuple
from dowhy.causal_refuters.add_unobserved_common_cause import AddUnobservedCommonCause
import matplotlib.pyplot as plt
import matplotlib.lines as mlines
import matplotlib.transforms as mtransforms
# Config dict to set the logging level
import logging.config
DEFAULT_LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'loggers': {
'': {
'level': 'WARN',
},
}
}
logging.config.dictConfig(DEFAULT_LOGGING)
# Disabling warnings output
import warnings
from sklearn.exceptions import DataConversionWarning, ConvergenceWarning
warnings.filterwarnings(action='ignore', category=DataConversionWarning)
warnings.filterwarnings(action='ignore', category=ConvergenceWarning)
[ ]:
def convert_singleton_to_float(arr):
'''Helper function.'''
array = []
if len(arr) == 1 and type(arr[0]) != np.ndarray:
return arr[0]
for element in arr:
while type(element) == np.ndarray or isinstance(element, list) :
if len(element) > 1:
raise ValueError("This script only accepts one value for the refute")
element = element[0]
array.append(element)
return array
[ ]:
def ensure_dir(file_path):
directory = os.path.dirname(file_path)
if not os.path.exists(directory):
os.makedirs(directory)
RESULTSFOLDER = "results/"
ensure_dir(RESULTSFOLDER)
# Create the estimator named tuple to wrap the name and properties
Estimator = namedtuple('Estimator', ['name','params'])
Refuter = namedtuple('Refuter', ['name','params'])
class Experiment():
'''
Class to define the experiment setup to compare a list of estimators across a list of refuters for the given dataset.
'''
def __init__(self, **kwargs):
self.experiment_name = kwargs['experiment_name']
self.experiment_id = kwargs['experiment_id']
self.num_experiments = kwargs['num_experiments']
self.sample_sizes = kwargs['sample_sizes']
self.dgps = kwargs['dgps']
self.estimators = kwargs['estimators']
self.refuters = kwargs['refuters']
self.results = []
self.simulate_unobserved_confounding = kwargs["simulate_unobserved_confounding"]
# Handle input errors in sample_sizes
if isinstance(self.sample_sizes, list) == False:
if type(self.sample_sizes) != int:
raise ValueError('The input to "sample_sizes" should be an int or a list')
else:
self.sample_sizes = [self.sample_sizes]
# Handle input errors in DGPs
if isinstance(self.dgps, list) == False:
if isinstance(self.dgps, DataGeneratingProcess) == False:
raise ValueError('The input to "dgps" should be a list or a subclass of "DataGeneratingProcess"')
else:
self.dgps = [self.dgps]
# Handle inputs errors in estimators
if isinstance(self.estimators, list) == False:
if isinstance(self.estimators, Estimator) == False:
raise ValueError('The input to "estimators" should be a list or an Estimator namedtuple')
else:
self.estimators = [self.estimators]
# Handle input errors in refuters
if isinstance(self.refuters, list) == False:
if isinstance(self.refuters, Refuter) == False:
raise ValueError('The input to "refuters" should be a list of a Refuter namedtuple')
else:
self.refuters = [self.refuters]
def experiment(self):
print("\n\nRunning Experiment:",self.experiment_name + '_' + str(self.experiment_id) )
for exp in range(self.num_experiments):
print("\n\nRunning Experiment Number:",exp)
for sample_size in self.sample_sizes:
print("\n\nCurrent Sample Size:",sample_size)
for dgp in self.dgps:
print("\n\nThe current DGP:")
print(dgp)
estimates = []
estimate_values = []
estimated_effect = []
new_effect = []
p_value = []
data = dgp.generate_data(sample_size)
print("printing data shape")
print(data.values.shape)
print(dgp.true_value)
print("check")
if dgp.treatment_is_binary:
data[dgp.treatment] = data[dgp.treatment].astype(bool)
#k = len(dgp.confounder)-4
#confounder_list = random.sample(dgp.confounder, k)
confounder_list = ['w2','w3']
s = set(confounder_list)
unobserved_confounders = [x for x in dgp.confounder if x not in s]
df_unobserved_confounders = pd.DataFrame(data = data[[c for c in data.columns if c in unobserved_confounders]])
df_unobserved_confounders.to_csv("results/unobserved_confounders.csv")
print("printing length of confounder list:", len(confounder_list))
print("printing confounder list:", confounder_list)
print("data columns")
print("data columns", data.columns)
model = CausalModel(
data = data,
treatment = dgp.treatment,
outcome = dgp.outcome,
common_causes = confounder_list,
effect_modifiers = dgp.effect_modifier
)
model.view_model()
from IPython.display import Image, display
display(Image(filename="causal_model.png"))
identified_estimand = model.identify_effect(proceed_when_unidentifiable=True)
print("identified_estimand:", identified_estimand)
#print("identified_estimand:", identified_estimand)
print("\n\nRunning the estimators:\n")
for estimator in self.estimators:
print("The current estimator:", estimator)
print("estimator.params", estimator.params)
estimate = model.estimate_effect(
identified_estimand,
method_name = estimator.name,
method_params = estimator.params
)
print("printing estimate's type")
print(type(estimate))
estimates.append(estimate)
estimate_values.append(estimate.value)
estimate_values = convert_singleton_to_float(estimate_values)
print("estimate_values", estimate_values)
print("\n\nRunning the refuters:\n")
for refuter in self.refuters:
print("The current refuter:", refuter)
for estimate in estimates:
if self.simulate_unobserved_confounding == True:
print("********%%%%%%%%%$$$$$&&^**^^^^*^*^*")
if refuter.name == 'dummy_outcome_refuter':
add_unobserved_confounder = AddUnobservedCommonCause(data, identified_estimand, estimate)
print("add_unobserved_confounder", add_unobserved_confounder)
unobserved_confounder_values = add_unobserved_confounder.include_simulated_confounder(convergence_threshold = 0.11, c_star_max = 1500)
refuter.params['unobserved_confounder_values'] = unobserved_confounder_values
print('refuter.params', refuter.params)
refute = model.refute_estimate(
identified_estimand,
estimate,
method_name = refuter.name,
**refuter.params,
)
print("printing refute's type")
print(type(refute))
if(refuter.name == 'dummy_outcome_refuter'):
refute = refute[0]
if refute.refutation_result is not None:
p_value.append(refute.refutation_result['p_value'])
else:
p_value.append(None)
estimated_effect.append(refute.estimated_effect)
#print("refute.estimate_effect()", refute.estimate_effect())
new_effect.append(refute.new_effect)
estimated_effect = convert_singleton_to_float(estimated_effect)
new_effect = convert_singleton_to_float(new_effect)
p_value = convert_singleton_to_float(p_value)
true_value = convert_singleton_to_float(dgp.true_value)
print("estimated effect", estimated_effect)
print("new_effect", new_effect)
print("p_value", p_value)
print("true value", true_value)
self.results.append([exp, sample_size, dgp.NAME, *estimate_values, *estimated_effect, *new_effect, *p_value, true_value])
print("\n\nCompleted all experiments. Saving the data...")
COLUMNS = ['EXPERIMENT', 'SAMPLE_SIZE', 'DGP']
RESULT_CATEGORIES = ['ESTIMATED_EFFECT', 'NEW_EFFECT', 'P_VALUE']
estimator_names = [estimator.name for estimator in self.estimators]
refuter_names = [refuter.name for refuter in self.refuters]
for estimator_name in estimator_names:
COLUMNS += ['ORIGINAL_ESTIMATE'+ ':' + estimator_name]
for result_category in RESULT_CATEGORIES:
for refuter_name in refuter_names:
for estimator_name in estimator_names:
COLUMNS += [refuter_name + ':' + estimator_name + ':' + result_category]
COLUMNS += ['TRUE_VALUE']
csv_file = RESULTSFOLDER + self.experiment_name+ '_' + str(self.experiment_id) + '_' + str(datetime.utcnow().date()) + '_data.csv'
onlyres_csv_file = RESULTSFOLDER + "onlyres_"+ self.experiment_name+ '_' + str(self.experiment_id) + '_' + str(datetime.utcnow()) + '_data.csv'
self.results = pd.DataFrame(data=self.results,columns=COLUMNS)
self.results.to_csv(csv_file.replace(" ", ""), index=False)
print("Data has been saved in ",csv_file)
return csv_file
[ ]:
#Defining the Data Generating Process
ldgp = LinearDataGeneratingProcess(treatment=['t1'], outcome=['y'], confounder=['w1','w2', 'w3','w4','w5','w6'], effect_modifier=['x1','x2'], seed=None, treatment_is_binary=True)
#Defining the sample size
sample_size = 1000
dgp_dict = {'ldgp':ldgp}
dgp_list = []
dgp_list.append( dgp_dict['ldgp'] )
# Create a namedtuple to store the name of the estimator and the parameters passed
estimator_list = ["backdoor.linear_regression",
#"backdoor.propensity_score_stratification",
"backdoor.propensity_score_matching",
"backdoor.propensity_score_weighting",
"backdoor.econml.dml.DML",
"backdoor.econml.dr.LinearDRLearner",
#"backdoor.econml.metalearners.TLearner",
#"backdoor.econml.metalearners.XLearner",
#"backdoor.causalml.inference.meta.LRSRegressor",
#"backdoor.causalml.inference.meta.XGBTRegressor",
#"backdoor.causalml.inference.meta.MLPTRegressor",
#"backdoor.causalml.inference.meta.BaseXRegressor"
]
method_params= [ None,
#None,
{ "init_params":{} },
{ "init_params":{} },
{"init_params":{'model_y':GradientBoostingRegressor(),
'model_t': GradientBoostingRegressor(),
"model_final":LassoCV(fit_intercept=False),
'featurizer':PolynomialFeatures(degree=1, include_bias=True)},
"fit_params":{}},
{"init_params":{ 'model_propensity': LogisticRegressionCV(cv=3, solver='lbfgs', multi_class='auto'),
},
"fit_params":{}
},
'''{"init_params": {'models': GradientBoostingRegressor(n_estimators=100, max_depth=6, min_samples_leaf=int(sample_size/100))
},
"fit_params":{}
},
{"init_params":{'models': GradientBoostingRegressor(n_estimators=100, max_depth=6, min_samples_leaf=int(sample_size/100)),
'propensity_model': RandomForestClassifier(n_estimators=100, max_depth=6,
min_samples_leaf=int(sample_size/100))
},
"fit_params":{}
},
{"init_params":{},},
{"init_params":{
'learner':XGBRegressor()
}
}'''
]
estimator_tuples = []
refuter_tuples = []
refuter_list = ['dummy_outcome_refuter']
refuter_params = [{'num_simulations':5,'transformation_list': [('random_forest',{'n_estimators':100, 'max_depth':6})], 'true_causal_effect':(lambda x:0.5)}]
# Iterate through the names and parameters to create a list of namedtuples
for name, param in zip(estimator_list,method_params):
estimator_tuples.append(Estimator._make([name, param]))
for name, param in zip(refuter_list, refuter_params):
refuter_tuples.append(Refuter._make([name, param]))
[ ]:
def plot_MAEs(res):
true_value_column = res.columns[-1]
estimate_columns=res.columns[3:-1]
#print(estimate_columns)
#print(type(estimate_columns))
estimate_columns.append(pd.Index(res["TRUE_VALUE"]))
#print(estimate_columns)
fig, ax = plt.subplots()
MAE ={}
for colname in estimate_columns:
if colname not in ('ORIGINAL_ESTIMATE:backdoor.propensity_score_weighting',):
#'ORIGINAL_ESTIMATE:backdoor.econml.metalearners.TLearner'):
plt.plot(res[colname], res["TRUE_VALUE"], marker='o', linestyle="None", label=colname)
"Mean Absolute Error (MAE): {}".format(mean_absolute_error(res[colname], res["TRUE_VALUE"]))
MAE[colname] = mean_absolute_error(res[colname], res["TRUE_VALUE"])
fig.suptitle('Calibration plot showing the accuracy of different causal estimators [P(T=1)=0.9]')
ax.set_xlabel('Estimated effect')
ax.set_ylabel('True causal effect')
plt.legend(loc='upper center', bbox_to_anchor=(0.5, -0.20),
fancybox=True, shadow=True, ncol=2)
plt.show()
print("Printing MAE of various estimates: ")
MAE_values = {k: v for k, v in sorted(MAE.items(), key=lambda item: item[1], reverse = True)}
for k,v in MAE_values.items():
print(k, v)
[ ]:
def plot_estimators_and_refuters(refuter, estimator):
x = list(res['EXPERIMENT'])
y1 = list(res[refuter+':'+estimator+':ESTIMATED_EFFECT'])
y2 = list(res[refuter+':'+estimator+':NEW_EFFECT'])
#print(res['TRUE_VALUE'])
y3 = list(res['TRUE_VALUE'])
y4 = list(res[refuter+':'+estimator+':P_VALUE'])
plt.scatter(x, y1, c ="blue", label = "Estimated Effect")
plt.scatter(x, y2, c ="red", label = "New Effect")
plt.scatter(x, y3, c ="green", label = "True Value")
plt.scatter(x, y4, c ="yellow", label = "P Value")
plt.xlabel("EXPERIMENT")
plt.ylabel("EFFECT")
legend = plt.legend(loc=4, fontsize='small', fancybox=True)
plt.title(estimator)
plt.show()
plt.savefig(estimator+'.png')
[ ]:
def plot_deviations(estimator_list, deviation_list):
plt.scatter(estimator_list, deviation_list)
plt.xticks(estimator_list, estimator_list, rotation='vertical')
plt.show()
观察到的未建模混杂误差#
对于每个估计器,我们使用虚拟结果反驳器来检查每个估计器的未建模混杂误差。也就是说,我们仅在观察到的混杂因素上对每个估计器进行反驳测试,并分析在观察到的变量中存在多少未建模的混杂误差。
[ ]:
# Define the properties of the experiment
# The name of the experiment
# The experiment ID
# The number of experiments to be run with the SAME parameters
# The size of the samples to be run
# The list of DGPs to be run
# The list of estimators
observed_confounding_error = Experiment(
experiment_name='Test',
experiment_id='1',
num_experiments=10, # 10
sample_sizes=sample_size,
dgps=dgp_list,
estimators=estimator_tuples,
refuters=refuter_tuples,
simulate_unobserved_confounding = False
)
# Run the experiment
res = pd.read_csv(observed_confounding_error.experiment())
[ ]:
#PLOT
#This plot shows the Mean Absolute Error of the Orginal Estimate from the true value and of the New Effect from
#the expected value for each estimator.
plot_MAEs(res)
基于原始估计的排名#
原始估计值是在真实值(即地面实况)存在的情况下计算的。然而,在许多现实生活中的数据集中,地面实况可能未知。因此,我们希望我们的反驳测试产生的排名与从原始估计值中获得的排名一致。根据原始估计值,估计器的排名应如下(MAE最小的方法应获得最佳排名):1. DMLCateEstimator 2. LinearRegression 3. LinearDRLearner 4. Propensity Score Matching
[ ]:
estimator_list = ["backdoor.linear_regression",
#"backdoor.propensity_score_stratification",
"backdoor.propensity_score_matching",
"backdoor.econml.dml.DML",
"backdoor.econml.dr.LinearDRLearner",
#"backdoor.econml.metalearners.TLearner",
#"backdoor.econml.metalearners.XLearner",
#"backdoor.causalml.inference.meta.LRSRegressor",
#"backdoor.causalml.inference.meta.XGBTRegressor",
#"backdoor.causalml.inference.meta.MLPTRegressor",
#"backdoor.causalml.inference.meta.BaseXRegressor"
]
[ ]:
#This plot shows the deviation of the original estimate, the new effect and the estimated effect from the true value
refuter = 'dummy_outcome_refuter'
deviation_list = []
for estimator in estimator_list:
plot_estimators_and_refuters(refuter, estimator)
avg_deviation = ((res[refuter+':'+estimator+':NEW_EFFECT']).sum(axis=0))
print(avg_deviation)
deviation_list.append(avg_deviation)
[ ]:
plot_deviations(estimator_list, deviation_list)
for i in range(len(estimator_list)):
print(estimator_list[i] +": "+ str(deviation_list[i]))
[ ]:
{k: v for k, v in sorted(zip(estimator_list, deviation_list), key=lambda item: item[1], reverse = True)}
基于新效果的排名(反驳结果)#
基于偏差绝对值的排名是:1. 倾向得分匹配 2. 线性DR学习器 3. DML CATE估计器 4. 线性回归
显然,观察到的未建模混杂误差无法与基于原始估计的排名相匹配。它甚至无法根据真实值判断出方法中明显的赢家是DML CATE估计器。
未观察到的混淆错误#
对于每个估计器,我们现在模拟未观察到的混杂因素,并使用虚拟结果反驳器检查其效果,以检查每个估计器的未观察到的混杂误差。也就是说,我们不仅对观察到的混杂因素运行反驳测试,还对我们使用AddUnobservedCommonCause类模拟的未观察到的混杂因素运行反驳测试,并分析是否存在未观察到的(缺失的)强混杂因素需要加以考虑。
[ ]:
unobserved_confounding_error = Experiment(
experiment_name='Test',
experiment_id='2',
num_experiments=10, # 10
sample_sizes=sample_size,
dgps=dgp_list,
estimators=estimator_tuples,
refuters=refuter_tuples,
simulate_unobserved_confounding = True
)
# Run the experiment
res = pd.read_csv(unobserved_confounding_error.experiment())
[ ]:
##This plot shows the Mean Absolute Error of the Orginal Estimate from the true value and of the New Effect from
#the expected value for each estimator.
plot_MAEs(res)
基于原始估计的排名#
原始估计值是在真实值(即地面实况)存在的情况下计算的。然而,在许多现实生活中的数据集中,地面实况可能未知。因此,我们希望我们的反驳测试产生的排名与从原始估计值中获得的排名一致。根据原始估计值,估计器的排名应如下(MAE最小的方法应获得最佳排名):1. DMLCateEstimator 2. Propensity Score Matching 3. LinearRegression 4. LinearDRLearner
[ ]:
#This plot shows the deviation of the original estimate, the new effect and the estimated effect from the true value
refuter = 'dummy_outcome_refuter'
deviation_list = []
for estimator in estimator_list:
plot_estimators_and_refuters(refuter, estimator)
avg_deviation = ((res[refuter+':'+estimator+':NEW_EFFECT']).sum(axis=0))
print(avg_deviation)
deviation_list.append(avg_deviation)
[ ]:
plot_deviations(estimator_list, deviation_list)
for i in range(len(estimator_list)):
print(estimator_list[i] +": "+ str(deviation_list[i]))
[ ]:
{k: v for k, v in sorted(zip(estimator_list, deviation_list), key=lambda item: item[1], reverse = True)}
基于新效应的排名(反驳结果)#
基于偏差绝对值的排名是:1. DML 2. 线性DR学习器 3. 倾向得分匹配 4. 线性回归