-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_EP.py
executable file
·121 lines (114 loc) · 4.07 KB
/
test_EP.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/python
'''
Program:
This is a test program for Ensemble Photometry.
Usage:
test_EP.py
Editor:
Jacob975
20181009
#################################
update log
'''
import numpy as np
np.set_printoptions(threshold='nan')
import time
from photometry_lib import parabola, EP
import matplotlib.pyplot as plt
from uncertainties import unumpy, umath
# Convert flux to magnitude
def flux2mag(flux, err_flux):
# select the value without problem
index_proper_value = (err_flux > 0) & (flux > 0)
# find the mag in proper values.
uflux = unumpy.uarray(flux[index_proper_value], err_flux[index_proper_value])
umag = -2.5 * unumpy.log(uflux, 10)
proper_mag = unumpy.nominal_values(umag)
proper_err_mag = unumpy.std_devs(umag)
# fill up the rest with -999
mag = np.full(len(flux), -999.0)
err_mag = np.full(len(flux), -999.0)
mag[index_proper_value] = proper_mag
err_mag[index_proper_value] = proper_err_mag
return mag, err_mag
# Convert magnitude to flux
def mag2flux(mag, err_mag):
umag = unumpy.uarray(mag, err_mag)
uflux = 10 ** (-0.4*umag)
flux = unumpy.nominal_values(uflux)
err_flux = unumpy.std_devs(uflux)
return flux, err_flux
#--------------------------------------------
# Main code
if __name__ == "__main__":
# Measure time
start_time = time.time()
#----------------------------------------
t = np.arange(100)
# Period
T = 50
# standard deviation
stdev = 10
airmass = parabola(t, -0.3, 50, 1000)
mag_airmass = -2.5 * np.log10(airmass)
shifted_mag_airmass = mag_airmass - mag_airmass[0]
# Model a Target Star
'''
# Sine wave (radian)
intrinsic = np.sin(2 * np.pi * t / T) + 2
'''
# Stage func
intrinsic = np.ones(len(t)) * 2.0
intrinsic[:20] = 1.0
intrinsic[-20:] = 1.0
# Simulate the observe data.
mag_intrinsic = -2.5 * np.log10(intrinsic)
shifted_mag_intrinsic = mag_intrinsic - mag_intrinsic[0]
flux = airmass * intrinsic + np.random.normal(0, stdev, 100)
err_flux = np.sqrt(flux)
mag, err_mag = flux2mag(flux, err_flux)
target = np.transpose([ t, mag, err_mag])
# Model Comparison Stars
comparison_stars = []
for i in xrange(10):
flux_tmp = airmass + np.random.normal(0, stdev, 100)
err_flux_tmp = np.sqrt(flux_tmp)
mag_tmp, err_mag_tmp = flux2mag(flux_tmp, err_flux_tmp)
comparison_star = np.transpose([t, mag_tmp, err_mag_tmp])
comparison_stars.append(comparison_star)
comparison_stars = np.array(comparison_stars)
student = EP(target, comparison_stars)
ems, var_ems, m0s, var_m0s = student.make_airmass_model()
failure, correlated_target, matched = student.phot(target)
#----------------------------------------
# Plot the answers
# target
target_plt = plt.figure("target", figsize=(8, 6), dpi=100)
plt.subplot(2, 1, 1)
plt.title('observed_target')
plt.xlabel("time")
plt.ylabel('mag')
frame1 = plt.gca()
frame1.axes.get_xaxis().set_visible(False)
plt.errorbar(target[:,0], target[:,1] - target[0,1], yerr = target[:,2], fmt = 'ro', label = 'observed_target')
plt.legend()
plt.subplot(2, 1, 2)
plt.title('correlated_target')
plt.ylabel('flux')
plt.xlabel('time')
plt.scatter(t, shifted_mag_intrinsic, label = 'intrinsic_target', alpha = 0.5)
plt.errorbar(t, correlated_target[:,1] - correlated_target[0,1], yerr = correlated_target[:,2], fmt = 'ro', label = 'correlated_target', alpha = 0.5)
plt.legend()
# comparison_airmass
comparison_airmass_plt = plt.figure("comparison_airmass", figsize = (8, 3), dpi = 100)
plt.title('comparison_airmass')
plt.ylabel('flux')
plt.xlabel('time')
plt.plot(t, shifted_mag_airmass, label = 'intrinsic_airmass', alpha = 0.5)
plt.errorbar(t, ems, yerr = var_ems, fmt = 'ro', label = 'comparison_airmass', alpha = 0.5)
plt.legend()
plt.show()
#---------------------------------------
# Measure time
elapsed_time = time.time() - start_time
print "Exiting Main Program, spending ", elapsed_time, "seconds."