脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Python - 用Python监控NASA TV直播画面的实现步骤

用Python监控NASA TV直播画面的实现步骤

2021-11-08 10:27Python中文社区 Python

本文分享一个名为"Spacestills"的开源程序,它可以用于查看 NASA TV 的直播画面(静止帧)

演示地址:

https://replit.com/@PaoloAmoroso/spacestills

用Python监控NASA TV直播画面的实现步骤

这是一个具有GUI的简单系统,它访问feed流并从Web下载数据。该程序仅需350行代码,并依赖于一些开源的Python库。

关于程序

Spacestills会定期从feed流中下载NASA TV静止帧并将其显示在GUI中。
该程序可以校正帧的纵横比,并将其保存为PNG格式。它会自动下载最新的帧,并提供手动重新加载,禁用自动重新加载或更改下载频率的选项。
Spacestillsis是一个比较初级的版本,但是它可以做一些有用的事情:捕获并保存NASA TV直播的太空事件图像。太空爱好者经常在社交网络或论坛共享他们从NASA TV手动获取的屏幕截图。Spacestills节省了使用屏幕捕获工具的时间,并保存了可供共享的图像文件。您可以在Replit上在线运行Spacestills。

开发环境

笔者用Replit开发了Spacestills。Replit是云上的开发,部署和协作环境,它支持包括Python在内的数十种编程语言和框架。作为Chrome操作系统和云计算爱好者,笔者非常喜欢Replit,因为它可以在浏览器中完全正常运行,无需下载或安装任何内容。

资源和依赖包

Spacestills依赖于一些外部资源和Python库。

NASA TV feed 流

肯尼迪航天中心的网站上有一个页面,其中包含精选的NASA视频流,包括NASA电视公共频道。feed流显示最新的静止帧并自动更新。
每个feed都带有三种尺寸的帧,Spacestills依赖于具有704x408像素帧的最大NASA TV feed流。最大更新频率为每45秒一次。因此,检索最新的静止帧就像从feed流的URL下载JPEG图像一样简单。
原始图像被垂直拉伸,看起来很奇怪。因此,该程序可以通过压缩图像并生成未失真的16:9版本来校正纵横比。

Python

因PySimpleGUI的原因需要安装 Python 3.6 版本。

第三方库

  • Pillow:图像处理
  • PySimpleGUI:GUI框架(Spacestills使用Tkinter后端)
  • Request:HTTP请求

完整代码

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
from io import BytesIO
from datetime import datetime, timedelta
from pathlib import Path
import requests
from requests.exceptions import Timeout
from PIL import Image
import PySimpleGUI as sg
 
 
FEED_URL = 'https://science.ksc.nasa.gov/shuttle/countdown/video/chan2large.jpg'
 
# Frame size without and with 16:9 aspect ratio correction
WIDTH = 704
HEIGHT = 480
HEIGHT_16_9 = 396
 
# Minimum, default, and maximum autoreload interval in seconds
MIN_DELTA = 45
DELTA = MIN_DELTA
MAX_DELTA = 300
 
 
class StillFrame():
    """Holds a still frame.
    
    The image is stored as a PNG PIL.Image and kept in PNG format.
    Attributes
    ----------
        image : PIL.Image
            A still frame
        original : PIL.Image
            Original frame with wchich the instance is initialized, cached in case of
            resizing to the original size
    
    Methods
    -------
        bytes : Return the raw bytes
        resize : Resize the screenshot
        new_size : Calculate new aspect ratio
    """
 
    def __init__(self, image):
        """Convert the image to PNG and cache the converted original.
        Parameters
        ----------
            image : PIL.Image
                Image to store
        """
        self.image = image
        self._topng()
        self.original = self.image
 
    def _topng(self):
        """Convert image format of frame to PNG.
        Returns
        -------
            StillFrame
                Frame with image in PNG format
        """
        if not self.image.format == 'PNG':
            png_file = BytesIO()
            self.image.save(png_file, 'png')
            png_file.seek(0)
            png_image = Image.open(png_file)
            self.image = png_image
        return self
 
    def bytes(self):
        """Return raw bytes of a frame image.
        
        Returns
        -------
            bytes
                Byte stream of the frame image
        """
        file = BytesIO()
        self.image.save(file'png')
        file.seek(0)
        return file.read()
 
    def new_size(self):
        """Return image size toggled between original and 16:9.
        
        Returns
        -------
            2-tuple
                New size
        """
        size = self.image.size
        original_size = self.original.size
        new_size = (WIDTH, HEIGHT_16_9) if size == original_size else (WIDTH, HEIGHT)
        return new_size
 
    def resize(self, new_size):
        """Resize frame image.
        
        Parameters
        ----------
            new_size : 2-tuple
                New size
        Returns
        -------
            StillFrame
                Frame with image resized
        """
        if not(self.image.size == new_size):
            self.image = self.image.resize(new_size)
        return self
    
 
def make_blank_image(size=(WIDTH, HEIGHT)):
    """Create a blank image with a blue background.
    
    Parameters
    ----------
        size : 2-tuple
            Image size
    
    Returns
    -------
        PIL.Image
            Blank image
    """
    image = Image.new('RGB', size=size, color='blue')
    return image
 
 
def download_image(url):
    """Download current NASA TV image.
    Parameters
    ----------
        url : str
            URL to download the image from
    
    Returns
    -------
        PIL.Image
            Downloaded image if no errors, otherwise blank image
    """
    try:
        response = requests.get(url, timeout=(0.50.5))
        if response.status_code == 200:
            image = Image.open(BytesIO(response.content))
        else:
            image = make_blank_image()
    except Timeout:
        image = make_blank_image()
    return image
 
 
def refresh(window, resize=False, feed=FEED_URL):
    """Display the latest still frame in window.
    
    Parameters
    ----------
        window : sg.Window
            Window to display the still to
        feed : string
            Feed URL
    
    Returns
    -------
        StillFrame
            Refreshed screenshot
    """
    still = StillFrame(download_image(feed))
    if resize:
        still = change_aspect_ratio(window, still, new_size=(WIDTH, HEIGHT_16_9))
    else:
        window['-IMAGE-'].update(data=still.bytes())
    return still
 
 
def change_aspect_ratio(window, still, new_size=(WIDTH, HEIGHT_16_9)):
    """Change the aspect ratio of the still displayed in window.
    
    Parameters
    ----------
        window : sg.Window
            Window containing the still
        new_size : 2-tuple
            New size of the still
    
    Returns
    -------
        StillFrame
            Frame containing the resized image
    """
    resized_still = still.resize(new_size)
    window['-IMAGE-'].update(data=resized_still.bytes())
    return resized_still
 
 
def save(still, path):
    """Save still to a file.
    Parameters
    ----------
        still : StillFrame
            Still to save
        path : string
            File name
    
    Returns
    -------
        Boolean
            True if file saved with no errors
    """
    filename = Path(path)
    try:
        with open(filename, 'wb') as file:
            file.write(still.bytes())
        saved = True
    except OSError:
        saved = False
    return saved
 
 
def next_timeout(delta):
    """Return the moment in time right now + delta seconds from now.
    Parameters
    ----------
        delta : int
            Time in seconds until the next timeout
    
    Returns
    -------
        datetime.datetime
            Moment in time of the next timeout
    """
    rightnow = datetime.now()
    return rightnow + timedelta(seconds=delta)
 
 
def timeout_due(next_timeout):
    """Return True if the next timeout is due.
    Parameters
    ----------
        next_timeout : datetime.datetime
    
    Returns
    -------
        bool
            True if the next timeout is due
    """
    rightnow = datetime.now()
    return rightnow >= next_timeout
 
 
def validate_delta(value):
    """Check if value is an int within the proper range for a time delta.
    Parameters
    ----------
        value : int
            Time in seconds until the next timeout
    
    Returns
    -------
        int
            Time in seconds until the next timeout
        bool
            True if the argument is a valid time delta
    """
    isinteger = False
    try:
        isinteger = type(int(value)) is int
    except Exception:
        delta = DELTA
    delta = int(value) if isinteger else delta
    isvalid = MIN_DELTA <= delta <= MAX_DELTA
    delta = delta if isvalid else DELTA
    return delta, isinteger and isvalid
 
 
LAYOUT = [[sg.Image(key='-IMAGE-')],
          [sg.Checkbox('Correct aspect ratio', key='-RESIZE-', enable_events=True),
           sg.Button('Reload', key='-RELOAD-'),
           sg.Button('Save', key='-SAVE-'),
           sg.Exit()],
          [sg.Checkbox('Auto-reload every (seconds):', key='-AUTORELOAD-',
                       default=True),
           sg.Input(DELTA, key='-DELTA-', size=(31), justification='right'),
           sg.Button('Set', key='-UPDATE_DELTA-')]]
 
 
def main(layout):
    """Run event loop."""
    window = sg.Window('Spacestills', layout, finalize=True)
    current_still = refresh(window)
 
    delta = DELTA
    next_reload_time = datetime.now() + timedelta(seconds=delta)
 
    while True:
        event, values = window.read(timeout=100)
        if event in (sg.WIN_CLOSED, 'Exit'):
            break
        elif ((event == '-RELOAD-'or
                (values['-AUTORELOAD-'and timeout_due(next_reload_time))):
            current_still = refresh(window, values['-RESIZE-'])
            if values['-AUTORELOAD-']:
                next_reload_time = next_timeout(delta)
        elif event == '-RESIZE-':
            current_still = change_aspect_ratio(
                window, current_still, current_still.new_size())
        elif event == '-SAVE-':
            filename = sg.popup_get_file(
                'File name', file_types=[('PNG''*.png')], save_as=True,
                title='Save image', default_extension='.png')
            if filename:
                saved = save(current_still, filename)
                if not saved:
                    sg.popup_ok('Error while saving file:', filename, title='Error')
        elif event == '-UPDATE_DELTA-':
            # The current cycle should complete at the already scheduled time. So
            # don't update next_reload_time yet because it'll be taken care of at the
            # next -AUTORELOAD- or -RELOAD- event.
            delta, valid = validate_delta(values['-DELTA-'])
            if not valid:
                window['-DELTA-'].update(str(DELTA))
 
    window.close()
    del window
 
 
if __name__ == '__main__':
    main(LAYOUT)

以上就是用 Python 监控 NASA TV 直播画面的实现步骤的详细内容,更多关于Python 监控 NASA TV 直播画面的资料请关注服务器之家其它相关文章!

原文链接:https://mp.weixin.qq.com/s/E6zS1tMHEq_cA5rELh1F0A

延伸 · 阅读

精彩推荐