Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
shivamshrirao
GitHub Repository: shivamshrirao/diffusers
Path: blob/main/tests/schedulers/test_scheduler_ipndm.py
1448 views
1
import tempfile
2
3
import torch
4
5
from diffusers import IPNDMScheduler
6
7
from .test_schedulers import SchedulerCommonTest
8
9
10
class IPNDMSchedulerTest(SchedulerCommonTest):
    """Tests for `IPNDMScheduler` (improved pseudo numerical methods for diffusion).

    Inherits shared fixtures (``dummy_sample``, ``dummy_sample_deter``,
    ``dummy_model``, …) from ``SchedulerCommonTest`` and checks config
    round-tripping, output shapes, and a deterministic full denoising loop.
    """

    scheduler_classes = (IPNDMScheduler,)
    forward_default_kwargs = (("num_inference_steps", 50),)

    def get_scheduler_config(self, **kwargs):
        """Return the default scheduler config, with ``kwargs`` overrides applied."""
        config = {"num_train_timesteps": 1000}
        config.update(**kwargs)
        return config

    def check_over_configs(self, time_step=0, **config):
        """Save/reload a scheduler built from ``config`` and assert that `step`
        produces identical outputs before and after the round trip.

        ``time_step=None`` selects a timestep from the middle of the schedule
        (useful when the schedule length varies with the config).
        """
        kwargs = dict(self.forward_default_kwargs)
        num_inference_steps = kwargs.pop("num_inference_steps", None)
        sample = self.dummy_sample
        residual = 0.1 * sample
        dummy_past_residuals = [residual + 0.2, residual + 0.15, residual + 0.1, residual + 0.05]

        for scheduler_class in self.scheduler_classes:
            scheduler_config = self.get_scheduler_config(**config)
            scheduler = scheduler_class(**scheduler_config)
            scheduler.set_timesteps(num_inference_steps)
            # copy over dummy past residuals (must be done after set_timesteps)
            scheduler.ets = dummy_past_residuals[:]

            if time_step is None:
                time_step = scheduler.timesteps[len(scheduler.timesteps) // 2]

            with tempfile.TemporaryDirectory() as tmpdirname:
                scheduler.save_config(tmpdirname)
                new_scheduler = scheduler_class.from_pretrained(tmpdirname)
                new_scheduler.set_timesteps(num_inference_steps)
                # copy over dummy past residuals (must be done after set_timesteps)
                new_scheduler.ets = dummy_past_residuals[:]

            output = scheduler.step(residual, time_step, sample, **kwargs).prev_sample
            new_output = new_scheduler.step(residual, time_step, sample, **kwargs).prev_sample

            assert torch.sum(torch.abs(output - new_output)) < 1e-5, "Scheduler outputs are not identical"

            # Step a second time: IPNDM carries state in `ets`, so equality must
            # hold across consecutive steps too, not just the first.
            output = scheduler.step(residual, time_step, sample, **kwargs).prev_sample
            new_output = new_scheduler.step(residual, time_step, sample, **kwargs).prev_sample

            assert torch.sum(torch.abs(output - new_output)) < 1e-5, "Scheduler outputs are not identical"

    def test_from_save_pretrained(self):
        # The common-test version does not seed `ets`; save/reload equivalence
        # is instead covered by check_over_configs / check_over_forward.
        pass

    def check_over_forward(self, time_step=0, **forward_kwargs):
        """Like `check_over_configs`, but varies forward-call kwargs
        (e.g. ``num_inference_steps``) instead of the scheduler config."""
        kwargs = dict(self.forward_default_kwargs)
        kwargs.update(forward_kwargs)
        num_inference_steps = kwargs.pop("num_inference_steps", None)
        sample = self.dummy_sample
        residual = 0.1 * sample
        dummy_past_residuals = [residual + 0.2, residual + 0.15, residual + 0.1, residual + 0.05]

        for scheduler_class in self.scheduler_classes:
            scheduler_config = self.get_scheduler_config()
            scheduler = scheduler_class(**scheduler_config)
            scheduler.set_timesteps(num_inference_steps)

            # copy over dummy past residuals (must be done after set_timesteps)
            scheduler.ets = dummy_past_residuals[:]

            if time_step is None:
                time_step = scheduler.timesteps[len(scheduler.timesteps) // 2]

            with tempfile.TemporaryDirectory() as tmpdirname:
                scheduler.save_config(tmpdirname)
                new_scheduler = scheduler_class.from_pretrained(tmpdirname)
                new_scheduler.set_timesteps(num_inference_steps)

                # copy over dummy past residuals (must be done after set_timesteps)
                new_scheduler.ets = dummy_past_residuals[:]

            output = scheduler.step(residual, time_step, sample, **kwargs).prev_sample
            new_output = new_scheduler.step(residual, time_step, sample, **kwargs).prev_sample

            assert torch.sum(torch.abs(output - new_output)) < 1e-5, "Scheduler outputs are not identical"

            # Second step exercises the stateful `ets` history as well.
            output = scheduler.step(residual, time_step, sample, **kwargs).prev_sample
            new_output = new_scheduler.step(residual, time_step, sample, **kwargs).prev_sample

            assert torch.sum(torch.abs(output - new_output)) < 1e-5, "Scheduler outputs are not identical"

    def full_loop(self, **config):
        """Run the full denoising loop twice over the schedule and return the
        final sample (deterministic: uses ``dummy_sample_deter``)."""
        scheduler_class = self.scheduler_classes[0]
        scheduler_config = self.get_scheduler_config(**config)
        scheduler = scheduler_class(**scheduler_config)

        num_inference_steps = 10
        model = self.dummy_model()
        sample = self.dummy_sample_deter
        scheduler.set_timesteps(num_inference_steps)

        # The schedule is traversed twice; the second pass runs with a fully
        # warmed-up `ets` history.
        for t in scheduler.timesteps:
            residual = model(sample, t)
            sample = scheduler.step(residual, t, sample).prev_sample

        for t in scheduler.timesteps:
            residual = model(sample, t)
            sample = scheduler.step(residual, t, sample).prev_sample

        return sample

    def test_step_shape(self):
        """`step` output must match the input sample's shape at any timestep."""
        kwargs = dict(self.forward_default_kwargs)

        num_inference_steps = kwargs.pop("num_inference_steps", None)

        for scheduler_class in self.scheduler_classes:
            scheduler_config = self.get_scheduler_config()
            scheduler = scheduler_class(**scheduler_config)

            sample = self.dummy_sample
            residual = 0.1 * sample

            if num_inference_steps is not None and hasattr(scheduler, "set_timesteps"):
                scheduler.set_timesteps(num_inference_steps)
            elif num_inference_steps is not None and not hasattr(scheduler, "set_timesteps"):
                kwargs["num_inference_steps"] = num_inference_steps

            # copy over dummy past residuals (must be done after set_timesteps)
            dummy_past_residuals = [residual + 0.2, residual + 0.15, residual + 0.1, residual + 0.05]
            scheduler.ets = dummy_past_residuals[:]

            time_step_0 = scheduler.timesteps[5]
            time_step_1 = scheduler.timesteps[6]

            output_0 = scheduler.step(residual, time_step_0, sample, **kwargs).prev_sample
            output_1 = scheduler.step(residual, time_step_1, sample, **kwargs).prev_sample

            self.assertEqual(output_0.shape, sample.shape)
            self.assertEqual(output_0.shape, output_1.shape)

            # Repeat with a longer `ets` history accumulated by the calls above.
            output_0 = scheduler.step(residual, time_step_0, sample, **kwargs).prev_sample
            output_1 = scheduler.step(residual, time_step_1, sample, **kwargs).prev_sample

            self.assertEqual(output_0.shape, sample.shape)
            self.assertEqual(output_0.shape, output_1.shape)

    def test_timesteps(self):
        for timesteps in [100, 1000]:
            self.check_over_configs(num_train_timesteps=timesteps, time_step=None)

    def test_inference_steps(self):
        # NOTE: the original zipped an unused `t` with these values; only
        # num_inference_steps is actually exercised.
        for num_inference_steps in [10, 50, 100]:
            self.check_over_forward(num_inference_steps=num_inference_steps, time_step=None)

    def test_full_loop_no_noise(self):
        sample = self.full_loop()
        result_mean = torch.mean(torch.abs(sample))

        # Reference mean recorded from the original implementation; IPNDM on
        # the dummy model yields a very large magnitude here by design.
        assert abs(result_mean.item() - 2540529) < 10