Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
anasty17
GitHub Repository: anasty17/mirror-leech-telegram-bot
Path: blob/master/bot/helper/ext_utils/media_utils.py
1635 views
1
from PIL import Image
2
from aiofiles.os import remove, path as aiopath, makedirs
3
from asyncio import (
4
create_subprocess_exec,
5
gather,
6
wait_for,
7
)
8
from asyncio.subprocess import PIPE
9
from os import path as ospath
10
from re import search as re_search, escape
11
from time import time
12
from aioshutil import rmtree
13
14
from ... import LOGGER, DOWNLOAD_DIR, threads, cores
15
from .bot_utils import cmd_exec, sync_to_async
16
from .files_utils import get_mime_type, is_archive, is_archive_split
17
from .status_utils import time_to_seconds
18
19
20
async def create_thumb(msg, _id=""):
    """Download the photo attached to *msg* and store it as a JPEG thumbnail.

    When ``_id`` is empty the current timestamp is used as the file name and
    the image is written under ``{DOWNLOAD_DIR}thumbnails``; otherwise it is
    written to the relative ``thumbnails`` directory as ``{_id}.jpg``.

    Returns the path of the JPEG that was written. The downloaded source
    photo is removed afterwards.
    """
    if _id:
        path = "thumbnails"
    else:
        _id = time()
        path = f"{DOWNLOAD_DIR}thumbnails"
    await makedirs(path, exist_ok=True)
    photo_dir = await msg.download()
    output = ospath.join(path, f"{_id}.jpg")
    # Normalise to RGB first so images with alpha/palette modes can be saved as JPEG.
    rgb_image = Image.open(photo_dir).convert("RGB")
    await sync_to_async(rgb_image.save, output, "JPEG")
    await remove(photo_dir)
    return output
32
33
34
async def get_media_info(path):
    """Probe *path* with ffprobe and return ``(duration, artist, title)``.

    ``duration`` is the container duration rounded to whole seconds;
    ``artist`` and ``title`` come from the container tags (lower-, upper-
    and title-case keys are tried). ``(0, None, None)`` is returned when
    ffprobe cannot be run, exits non-zero, prints nothing, or its output
    does not contain a ``format`` section.
    """
    # Local import keeps this block self-contained; ffprobe emits JSON, so it
    # is parsed as JSON rather than evaluated as Python source (the output
    # embeds tag text taken from the media file itself).
    from json import JSONDecodeError, loads as json_loads

    try:
        result = await cmd_exec(
            [
                "ffprobe",
                "-hide_banner",
                "-loglevel",
                "error",
                "-print_format",
                "json",
                "-show_format",
                path,
            ]
        )
    except Exception as e:
        LOGGER.error(f"Get Media Info: {e}. Mostly File not found! - File: {path}")
        return 0, None, None
    # result is indexed as (stdout, stderr, return code) throughout this file.
    if not (result[0] and result[2] == 0):
        return 0, None, None
    try:
        fields = json_loads(result[0]).get("format")
    except (JSONDecodeError, AttributeError):
        # Unparseable or non-object output is treated like a missing section.
        fields = None
    if fields is None:
        LOGGER.error(f"get_media_info: {result}")
        return 0, None, None
    duration = round(float(fields.get("duration", 0)))
    tags = fields.get("tags", {})
    artist = tags.get("artist") or tags.get("ARTIST") or tags.get("Artist")
    title = tags.get("title") or tags.get("TITLE") or tags.get("Title")
    return duration, artist, title
62
63
64
async def get_document_type(path):
    """Classify *path* and return ``(is_video, is_audio, is_image)``.

    Archives (including split archives and ``rar/7z/zip/bin`` style names)
    are never treated as media. Files whose MIME type starts with ``image``
    are images. Everything else is probed with ffprobe: a video stream whose
    codec is not a still-image codec (mjpeg/png/bmp) marks the file as video,
    and any audio stream marks it as audio. If ffprobe cannot be run, the
    MIME type alone decides.
    """
    # Local import keeps this block self-contained; ffprobe output is JSON and
    # is parsed as such instead of being evaluated as Python source.
    from json import JSONDecodeError, loads as json_loads

    is_video, is_audio, is_image = False, False, False
    if (
        is_archive(path)
        or is_archive_split(path)
        or re_search(r".+(\.|_)(rar|7z|zip|bin)(\.0*\d+)?$", path)
    ):
        return is_video, is_audio, is_image
    mime_type = await sync_to_async(get_mime_type, path)
    if mime_type.startswith("image"):
        return False, False, True
    try:
        result = await cmd_exec(
            [
                "ffprobe",
                "-hide_banner",
                "-loglevel",
                "error",
                "-print_format",
                "json",
                "-show_streams",
                path,
            ]
        )
        # ffprobe printed errors: fall back to trusting a video MIME type
        # unless the stream list below says otherwise.
        if result[1] and mime_type.startswith("video"):
            is_video = True
    except Exception as e:
        LOGGER.error(f"Get Document Type: {e}. Mostly File not found! - File: {path}")
        # No probe result available: decide from the MIME type only.
        if mime_type.startswith("audio"):
            return False, True, False
        return mime_type.startswith("video"), False, False
    if result[0] and result[2] == 0:
        try:
            fields = json_loads(result[0]).get("streams")
        except (JSONDecodeError, AttributeError):
            fields = None
        if fields is None:
            LOGGER.error(f"get_document_type: {result}")
            return is_video, is_audio, is_image
        # The stream list is authoritative; discard the MIME-based guess.
        is_video = False
        for stream in fields:
            codec_type = stream.get("codec_type")
            if codec_type == "video":
                # Cover art is exposed as a video stream with a still-image codec.
                codec_name = stream.get("codec_name", "").lower()
                if codec_name not in {"mjpeg", "png", "bmp"}:
                    is_video = True
            elif codec_type == "audio":
                is_audio = True
    return is_video, is_audio, is_image
113
114
115
async def take_ss(video_file, ss_nb) -> str | bool:
    """Capture *ss_nb* evenly spaced PNG screenshots from *video_file*.

    The screenshots are written to a ``<name>_mltbss`` directory next to the
    video, one ffmpeg process per frame, all run concurrently with a shared
    60 second timeout.

    Returns the screenshot directory path on success, or ``False`` when the
    duration cannot be determined, the first ffmpeg process fails, or the
    batch errors out / times out (the directory is removed in those cases).
    """
    duration = (await get_media_info(video_file))[0]
    if duration == 0:
        LOGGER.error("take_ss: Can't get the duration of video")
        return False

    # ospath.split also copes with a bare file name that has no directory part.
    parent, file_name = ospath.split(video_file)
    name = ospath.splitext(file_name)[0]
    dirpath = ospath.join(parent, f"{name}_mltbss")
    await makedirs(dirpath, exist_ok=True)

    # Split the timeline into ss_nb + 1 slices and capture at each inner boundary.
    interval = duration // (ss_nb + 1)
    cmds = []
    for i in range(ss_nb):
        output = f"{dirpath}/SS.{name}_{i:02}.png"
        cmd = [
            "taskset",
            "-c",
            f"{cores}",
            "ffmpeg",
            "-hide_banner",
            "-loglevel",
            "error",
            "-ss",
            f"{interval * (i + 1)}",
            "-i",
            video_file,
            "-q:v",
            "1",
            "-frames:v",
            "1",
            "-threads",
            f"{threads}",
            output,
        ]
        cmds.append(cmd_exec(cmd))
    try:
        results = await wait_for(gather(*cmds), timeout=60)
        if results[0][2] != 0:
            LOGGER.error(
                f"Error while creating screenshots from video. Path: {video_file}. stderr: {results[0][1]}"
            )
            await rmtree(dirpath, ignore_errors=True)
            return False
    except Exception:
        # Exception (not a bare except) so task cancellation still propagates.
        LOGGER.error(
            f"Error while creating screenshots from video. Path: {video_file}. Error: Timeout some issues with ffmpeg with specific arch!"
        )
        await rmtree(dirpath, ignore_errors=True)
        return False
    return dirpath
167
168
169
async def get_audio_thumbnail(audio_file):
    """Extract the embedded cover art of *audio_file* into a JPEG thumbnail.

    The image stream is copied without re-encoding (``-an -vcodec copy``)
    into ``{DOWNLOAD_DIR}thumbnails/<timestamp>.jpg``.

    Returns the thumbnail path, or ``None`` when ffmpeg fails, produces no
    file, or does not finish within 60 seconds.
    """
    output_dir = f"{DOWNLOAD_DIR}thumbnails"
    await makedirs(output_dir, exist_ok=True)
    output = ospath.join(output_dir, f"{time()}.jpg")
    cmd = [
        "taskset",
        "-c",
        f"{cores}",
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "error",
        "-i",
        audio_file,
        "-an",
        "-vcodec",
        "copy",
        "-threads",
        f"{threads}",
        output,
    ]
    try:
        _, err, code = await wait_for(cmd_exec(cmd), timeout=60)
        if code != 0 or not await aiopath.exists(output):
            LOGGER.error(
                f"Error while extracting thumbnail from audio. Name: {audio_file} stderr: {err}"
            )
            return None
    except Exception:
        # Exception (not a bare except) so task cancellation still propagates.
        LOGGER.error(
            f"Error while extracting thumbnail from audio. Name: {audio_file}. Error: Timeout some issues with ffmpeg with specific arch!"
        )
        return None
    return output
203
204
205
async def get_video_thumbnail(video_file, duration):
    """Create a JPEG thumbnail from a frame near the middle of *video_file*.

    ``duration`` is the video length in seconds; pass ``None`` to have it
    probed with ``get_media_info``. A duration of 0 is treated as 3 seconds
    so the seek position is still valid.

    Returns the thumbnail path under ``{DOWNLOAD_DIR}thumbnails``, or
    ``None`` when ffmpeg fails, produces no file, or does not finish within
    60 seconds.
    """
    output_dir = f"{DOWNLOAD_DIR}thumbnails"
    await makedirs(output_dir, exist_ok=True)
    output = ospath.join(output_dir, f"{time()}.jpg")
    if duration is None:
        duration = (await get_media_info(video_file))[0]
    if duration == 0:
        duration = 3
    # Seek to the midpoint of the video.
    duration = duration // 2
    cmd = [
        "taskset",
        "-c",
        f"{cores}",
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "error",
        "-ss",
        f"{duration}",
        "-i",
        video_file,
        "-vf",
        "thumbnail",
        "-q:v",
        "1",
        "-frames:v",
        "1",
        "-threads",
        f"{threads}",
        output,
    ]
    try:
        _, err, code = await wait_for(cmd_exec(cmd), timeout=60)
        if code != 0 or not await aiopath.exists(output):
            LOGGER.error(
                f"Error while extracting thumbnail from video. Name: {video_file} stderr: {err}"
            )
            return None
    except Exception:
        # Exception (not a bare except) so task cancellation still propagates.
        LOGGER.error(
            f"Error while extracting thumbnail from video. Name: {video_file}. Error: Timeout some issues with ffmpeg with specific arch!"
        )
        return None
    return output
249
250
251
async def get_multiple_frames_thumbnail(video_file, layout, keep_screenshots):
    """Build a tiled JPEG thumbnail from several screenshots of *video_file*.

    ``layout`` is a ``"<cols>x<rows>"`` string; ``cols * rows`` screenshots
    are taken with ``take_ss`` and combined with ffmpeg's ``tile`` filter.
    The screenshot directory is removed afterwards unless
    ``keep_screenshots`` is true.

    Returns the thumbnail path under ``{DOWNLOAD_DIR}thumbnails``, or
    ``None`` when the screenshots or the tiling step fail or time out.
    """
    columns, rows = layout.split("x")[:2]
    ss_nb = int(columns) * int(rows)
    dirpath = await take_ss(video_file, ss_nb)
    if not dirpath:
        return None
    output_dir = f"{DOWNLOAD_DIR}thumbnails"
    await makedirs(output_dir, exist_ok=True)
    output = ospath.join(output_dir, f"{time()}.jpg")
    cmd = [
        "taskset",
        "-c",
        f"{cores}",
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "error",
        "-pattern_type",
        "glob",
        "-i",
        # Backslash-escape the directory so its characters are not read as glob syntax.
        f"{escape(dirpath)}/*.png",
        "-vf",
        f"tile={layout}, thumbnail",
        "-q:v",
        "1",
        "-frames:v",
        "1",
        "-f",
        "mjpeg",
        "-threads",
        f"{threads}",
        output,
    ]
    try:
        _, err, code = await wait_for(cmd_exec(cmd), timeout=60)
        if code != 0 or not await aiopath.exists(output):
            LOGGER.error(
                f"Error while combining thumbnails for video. Name: {video_file} stderr: {err}"
            )
            return None
    except Exception:
        # Exception (not a bare except) so task cancellation still propagates.
        LOGGER.error(
            f"Error while combining thumbnails from video. Name: {video_file}. Error: Timeout some issues with ffmpeg with specific arch!"
        )
        return None
    finally:
        if not keep_screenshots:
            await rmtree(dirpath, ignore_errors=True)
    return output
300
301
302
class FFMpeg:
    """Run ffmpeg jobs on behalf of a listener and expose their progress.

    The listener object must provide ``is_cancelled`` and ``subproc``
    attributes (``split`` additionally reads ``max_split_size``). Each job
    stores the spawned process in ``listener.subproc`` and parses ffmpeg's
    ``-progress pipe:1`` output to update the ``*_raw`` counters exposed as
    read-only properties.
    """

    def __init__(self, listener):
        self._listener = listener
        self._processed_bytes = 0
        self._last_processed_bytes = 0
        self._processed_time = 0
        self._last_processed_time = 0
        self._speed_raw = 0
        self._progress_raw = 0
        self._total_time = 0
        self._eta_raw = 0
        self._time_rate = 0.1
        self._start_time = 0

    @property
    def processed_bytes(self):
        return self._processed_bytes

    @property
    def speed_raw(self):
        return self._speed_raw

    @property
    def progress_raw(self):
        return self._progress_raw

    @property
    def eta_raw(self):
        return self._eta_raw

    def clear(self):
        """Reset every progress counter and restart the job clock."""
        self._start_time = time()
        self._processed_bytes = 0
        self._processed_time = 0
        self._speed_raw = 0
        self._progress_raw = 0
        self._eta_raw = 0
        self._time_rate = 0.1
        self._last_processed_time = 0
        self._last_processed_bytes = 0

    async def _ffmpeg_progress(self):
        """Consume ``key=value`` progress lines from the running process.

        Stops when the process has exited, the listener is cancelled, stdout
        reaches EOF, an empty line is read, or no line arrives for 60 seconds.
        """
        while not (
            self._listener.subproc.returncode is not None
            or self._listener.is_cancelled
            or self._listener.subproc.stdout.at_eof()
        ):
            try:
                line = await wait_for(self._listener.subproc.stdout.readline(), 60)
            except Exception:
                # Timeout or broken pipe; cancellation is allowed to propagate.
                break
            line = line.decode().strip()
            if not line:
                break
            if "=" not in line:
                continue
            key, value = line.split("=", 1)
            if value == "N/A":
                continue
            try:
                if key == "total_size":
                    self._processed_bytes = int(value) + self._last_processed_bytes
                    elapsed = time() - self._start_time
                    if elapsed > 0:
                        self._speed_raw = self._processed_bytes / elapsed
                elif key == "speed":
                    # Clamp so the ETA division below never sees a zero rate.
                    self._time_rate = max(0.1, float(value.strip("x")))
                elif key == "out_time":
                    self._processed_time = (
                        time_to_seconds(value) + self._last_processed_time
                    )
                    self._refresh_progress()
            except ValueError:
                # A malformed number in one line must not abort progress tracking.
                continue

    def _refresh_progress(self):
        """Recompute percentage and ETA from the processed/total time."""
        try:
            self._progress_raw = (self._processed_time * 100) / self._total_time
            self._eta_raw = (
                self._total_time - self._processed_time
            ) / self._time_rate
        except (ZeroDivisionError, TypeError):
            # Unknown total duration: report no progress instead of failing.
            self._progress_raw = 0
            self._eta_raw = 0

    async def _run(self, cmd):
        """Spawn *cmd*, track its progress and return ``(returncode, stderr)``."""
        self._listener.subproc = await create_subprocess_exec(
            *cmd, stdout=PIPE, stderr=PIPE
        )
        await self._ffmpeg_progress()
        _, stderr = await self._listener.subproc.communicate()
        return self._listener.subproc.returncode, stderr

    @staticmethod
    def _decode_stderr(stderr):
        """Return *stderr* bytes as stripped text, or a placeholder on failure."""
        try:
            return stderr.decode().strip()
        except Exception:
            return "Unable to decode the error!"

    @staticmethod
    def _base_cmd():
        """Return the common ffmpeg prefix with machine-readable progress on stdout."""
        return [
            "taskset",
            "-c",
            f"{cores}",
            "ffmpeg",
            "-hide_banner",
            "-loglevel",
            "error",
            "-progress",
            "pipe:1",
        ]

    @staticmethod
    def _resolve_outputs(ffmpeg, f_path):
        """Replace ``mltb`` placeholders in *ffmpeg* with real output paths.

        Every argument starting with ``mltb`` is rewritten in place to a path
        in the directory of *f_path*, with ``mltb`` replaced by the input's
        base name. A placeholder without its own extension inherits the
        input's extension. An ``ffmpeg<index>.`` prefix is added whenever the
        result would otherwise equal the input path. Returns the list of
        generated output paths.
        """
        base_name, ext = ospath.splitext(f_path)
        dir_path, base_name = base_name.rsplit("/", 1)
        indices = [
            index for index, item in enumerate(ffmpeg) if item.startswith("mltb")
        ]
        outputs = []
        for index in indices:
            item = ffmpeg[index]
            prefix = f"ffmpeg{index}."
            # Per-output extension: one placeholder carrying its own extension
            # must not strip the extension from the placeholders after it.
            out_ext = ext
            if item != "mltb":
                stem, item_ext = ospath.splitext(item)
                if item_ext:
                    out_ext = ""
                    if ext != item_ext or stem != "mltb":
                        prefix = ""
                else:
                    prefix = ""
            renamed = item.replace("mltb", base_name)
            output = f"{dir_path}/{prefix}{renamed}{out_ext}"
            outputs.append(output)
            ffmpeg[index] = output
        return outputs

    @staticmethod
    def _sample_segments(duration, sample_duration, part_duration):
        """Return ``(start, end)`` pairs making up a sample of the video.

        The first and last *part_duration* seconds are always included;
        evenly spaced middle parts are added only when *sample_duration*
        leaves room for at least one.
        """
        segments = [(0, part_duration)]
        parts = (sample_duration - (part_duration * 2)) // part_duration
        if parts > 0:  # guard: zero middle parts used to divide by zero
            time_interval = (duration - (part_duration * 2)) // parts
            segments.extend(
                (time_interval * k, time_interval * k + part_duration)
                for k in range(1, parts + 1)
            )
        segments.append((duration - part_duration, duration))
        return segments

    async def ffmpeg_cmds(self, ffmpeg, f_path):
        """Run a user supplied ffmpeg argument list against *f_path*.

        ``mltb`` placeholders in *ffmpeg* are rewritten in place (see
        ``_resolve_outputs``). Returns the list of output paths on success,
        otherwise ``False``; partial outputs are removed on failure.
        """
        self.clear()
        self._total_time = (await get_media_info(f_path))[0]
        outputs = self._resolve_outputs(ffmpeg, f_path)
        if self._listener.is_cancelled:
            return False
        code, stderr = await self._run(ffmpeg)
        if self._listener.is_cancelled:
            return False
        if code == 0:
            return outputs
        if code == -9:
            # Killed by SIGKILL: treated as a cancellation.
            self._listener.is_cancelled = True
            return False
        stderr = self._decode_stderr(stderr)
        LOGGER.error(
            f"{stderr}. Something went wrong while running ffmpeg cmd, mostly file requires different/specific arguments. Path: {f_path}"
        )
        for op in outputs:
            if await aiopath.exists(op):
                await remove(op)
        return False

    async def convert_video(self, video_file, ext, retry=False):
        """Convert *video_file* to the container *ext*.

        The first attempt copies all streams; if that fails the method calls
        itself once with ``retry=True`` to re-encode (libx264/aac, with a
        subtitle codec chosen for the container). Returns the output path on
        success, otherwise ``False``.
        """
        self.clear()
        self._total_time = (await get_media_info(video_file))[0]
        base_name = ospath.splitext(video_file)[0]
        output = f"{base_name}.{ext}"
        cmd = [*self._base_cmd(), "-i", video_file, "-map", "0"]
        if retry:
            subtitle_codec = {"mp4": "mov_text", "mkv": "ass"}.get(ext, "copy")
            cmd += [
                "-c:v",
                "libx264",
                "-c:a",
                "aac",
                "-c:s",
                subtitle_codec,
            ]
        else:
            cmd += ["-c", "copy"]
        cmd += ["-threads", f"{threads}", output]
        if self._listener.is_cancelled:
            return False
        code, stderr = await self._run(cmd)
        if self._listener.is_cancelled:
            return False
        if code == 0:
            return output
        if code == -9:
            # Killed by SIGKILL: treated as a cancellation.
            self._listener.is_cancelled = True
            return False
        if await aiopath.exists(output):
            await remove(output)
        if not retry:
            return await self.convert_video(video_file, ext, True)
        stderr = self._decode_stderr(stderr)
        LOGGER.error(
            f"{stderr}. Something went wrong while converting video, mostly file need specific codec. Path: {video_file}"
        )
        return False

    async def convert_audio(self, audio_file, ext):
        """Convert *audio_file* to the format implied by the extension *ext*.

        Returns the output path on success, otherwise ``False`` (a partial
        output file is removed).
        """
        self.clear()
        self._total_time = (await get_media_info(audio_file))[0]
        base_name = ospath.splitext(audio_file)[0]
        output = f"{base_name}.{ext}"
        cmd = [
            *self._base_cmd(),
            "-i",
            audio_file,
            "-threads",
            f"{threads}",
            output,
        ]
        if self._listener.is_cancelled:
            return False
        code, stderr = await self._run(cmd)
        if self._listener.is_cancelled:
            return False
        if code == 0:
            return output
        if code == -9:
            # Killed by SIGKILL: treated as a cancellation.
            self._listener.is_cancelled = True
            return False
        stderr = self._decode_stderr(stderr)
        LOGGER.error(
            f"{stderr}. Something went wrong while converting audio, mostly file need specific codec. Path: {audio_file}"
        )
        if await aiopath.exists(output):
            await remove(output)
        return False

    async def sample_video(self, video_file, sample_duration, part_duration):
        """Create ``SAMPLE.<name>`` next to *video_file* from several short cuts.

        The cuts (see ``_sample_segments``) are trimmed and concatenated with
        a ``filter_complex`` graph and re-encoded with libx264/aac. Returns
        the sample path on success, otherwise ``False``.
        """
        self.clear()
        self._total_time = sample_duration
        dir_path, name = video_file.rsplit("/", 1)
        output_file = f"{dir_path}/SAMPLE.{name}"
        duration = (await get_media_info(video_file))[0]
        segments = self._sample_segments(duration, sample_duration, part_duration)

        # One trimmed video/audio pair per segment, then a single concat node.
        trims = "".join(
            f"[0:v]trim=start={start}:end={end},setpts=PTS-STARTPTS[v{i}]; "
            f"[0:a]atrim=start={start}:end={end},asetpts=PTS-STARTPTS[a{i}]; "
            for i, (start, end) in enumerate(segments)
        )
        labels = "".join(f"[v{i}][a{i}]" for i in range(len(segments)))
        filter_complex = (
            f"{trims}{labels}concat=n={len(segments)}:v=1:a=1[vout][aout]"
        )

        cmd = [
            *self._base_cmd(),
            "-i",
            video_file,
            "-filter_complex",
            filter_complex,
            "-map",
            "[vout]",
            "-map",
            "[aout]",
            "-c:v",
            "libx264",
            "-c:a",
            "aac",
            "-threads",
            f"{threads}",
            output_file,
        ]

        if self._listener.is_cancelled:
            return False
        code, stderr = await self._run(cmd)
        if self._listener.is_cancelled:
            return False
        if code == -9:
            # Killed by SIGKILL: treated as a cancellation.
            self._listener.is_cancelled = True
            return False
        if code == 0:
            return output_file
        stderr = self._decode_stderr(stderr)
        LOGGER.error(
            f"{stderr}. Something went wrong while creating sample video, mostly file is corrupted. Path: {video_file}"
        )
        if await aiopath.exists(output_file):
            await remove(output_file)
        return False

    async def split(self, f_path, file_, parts, split_size):
        """Split *f_path* into ``<base>.partNNN<ext>`` files by size.

        ``file_`` is the file-name portion of *f_path* that gets replaced by
        the part name. Each part is cut with ``-fs`` starting 3 seconds
        before the end of the previous part. A failing part is retried once
        without ``-map 0``; an oversized part is retried with a smaller size
        limit. Returns ``True`` when splitting finished (or stopped early on
        a non-fatal condition) and ``False`` on cancellation or failure.
        """
        self.clear()
        multi_streams = True
        self._total_time = duration = (await get_media_info(f_path))[0]
        base_name, extension = ospath.splitext(file_)
        # Keep headroom below the requested size for container overhead.
        split_size -= 3000000
        start_time = 0
        i = 1
        while i <= parts or start_time < duration - 4:
            out_path = f_path.replace(file_, f"{base_name}.part{i:03}{extension}")
            cmd = [
                *self._base_cmd(),
                "-ss",
                str(start_time),
                "-i",
                f_path,
                "-fs",
                str(split_size),
            ]
            if multi_streams:
                cmd += ["-map", "0"]
            cmd += [
                "-map_chapters",
                "-1",
                "-async",
                "1",
                "-strict",
                "-2",
                "-c",
                "copy",
                "-threads",
                f"{threads}",
                out_path,
            ]
            if self._listener.is_cancelled:
                return False
            code, stderr = await self._run(cmd)
            if self._listener.is_cancelled:
                return False
            if code == -9:
                # Killed by SIGKILL: treated as a cancellation.
                self._listener.is_cancelled = True
                return False
            if code != 0:
                stderr = self._decode_stderr(stderr)
                try:
                    await remove(out_path)
                except OSError:
                    # Best effort: the part may never have been created.
                    pass
                if multi_streams:
                    LOGGER.warning(
                        f"{stderr}. Retrying without map, -map 0 not working in all situations. Path: {f_path}"
                    )
                    multi_streams = False
                    continue
                LOGGER.warning(
                    f"{stderr}. Unable to split this video, if it's size less than {self._listener.max_split_size} will be uploaded as it is. Path: {f_path}"
                )
                return False
            out_size = await aiopath.getsize(out_path)
            if out_size > self._listener.max_split_size:
                # Part overshot the limit: shrink the target and redo this part.
                split_size -= (out_size - self._listener.max_split_size) + 5000000
                LOGGER.warning(
                    f"Part size is {out_size}. Trying again with lower split size!. Path: {f_path}"
                )
                await remove(out_path)
                continue
            lpd = (await get_media_info(out_path))[0]
            if lpd == 0:
                LOGGER.error(
                    f"Something went wrong while splitting, mostly file is corrupted. Path: {f_path}"
                )
                break
            elif duration == lpd:
                LOGGER.warning(
                    f"This file has been splitted with default stream and audio, so you will only see one part with less size from original one because it doesn't have all streams and audios. This happens mostly with MKV videos. Path: {f_path}"
                )
                break
            elif lpd <= 3:
                # Only the 3 second overlap is left: drop the redundant part.
                await remove(out_path)
                break
            self._last_processed_time += lpd
            self._last_processed_bytes += out_size
            # Overlap consecutive parts by 3 seconds.
            start_time += lpd - 3
            i += 1
        return True
757
758
759