Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
tensorflow
GitHub Repository: tensorflow/docs-l10n
Path: blob/master/site/ko/io/tutorials/kafka.ipynb
25118 views
Kernel: Python 3
#@title Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.

Kafka 및 Tensorflow-IO를 사용하여 스트리밍 데이터에 강력한 머신러닝 적용

개요

이 튜토리얼은 Kafka 클러스터에서 tf.data.Dataset로 데이터를 스트리밍한 다음, tf.keras와 연계하여 이 스트리밍 데이터를 훈련과 추론에 이용하는 방법을 소개합니다.

Kafka는 주로 데이터 파이프라인에서 확장 가능하고 내결함성이 있는 스트리밍 데이터를 제공하는 분산 이벤트 스트리밍 플랫폼입니다. 이 플랫폼은 중요 업무용 데이터 전송이 주요 요구 사항인 수많은 대기업의 필수 기술 구성 요소입니다.

참고: kafka 구성 요소에 대한 기본적인 이해가 있으면 튜토리얼을 진행하기가 더 쉽습니다.

참고: 이 튜토리얼을 실행하려면 Java 런타임 환경이 필요합니다.

설정

필요한 tensorflow-io 및 kafka 패키지 설치하기

!pip install tensorflow-io !pip install kafka-python

패키지 가져오기

import os from datetime import datetime import time import threading import json from kafka import KafkaProducer from kafka.errors import KafkaError from sklearn.model_selection import train_test_split import pandas as pd import tensorflow as tf import tensorflow_io as tfio

tf 및 tfio 가져오기 검증하기

print("tensorflow-io version: {}".format(tfio.__version__)) print("tensorflow version: {}".format(tf.__version__))

Kafka 및 Zookeeper 인스턴스 다운로드 및 설정하기

데모 목적으로 다음 인스턴스가 로컬에서 설정됩니다.

  • Kafka(브로커: 127.0.0.1:9092)

  • Zookeeper(노드: 127.0.0.1:2181)

!curl -sSOL https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz !tar -xzf kafka_2.13-3.1.0.tgz

인스턴스를 만들기 위해 기본 구성(Apache Kafka에서 제공)을 사용합니다.

!./kafka_2.13-3.1.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.1.0/config/zookeeper.properties !./kafka_2.13-3.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.1.0/config/server.properties !echo "Waiting for 10 secs until kafka and zookeeper services are up and running" !sleep 10

인스턴스가 데몬 프로세스로 시작되면 grep으로 프로세스 목록에서 kafka를 출력합니다. 두 개의 Java 프로세스는 zookeeper 및 kafka 인스턴스에 해당합니다.

!ps -ef | grep kafka

다음 사양으로 kafka 토픽을 만듭니다.

  • susy-train: partitions=1, replication-factor=1

  • susy-test: partitions=2, replication-factor=1

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 2 --topic susy-test

토픽의 구성에 관한 자세한 내용을 설명합니다.

!./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-train !./kafka_2.13-3.1.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic susy-test

복제 요소 1은 데이터가 복제되지 않음을 나타냅니다. 이는 kafka 설정에 하나의 브로커가 존재하기 때문입니다. 운영 시스템에서 부트스트랩 서버의 수는 수백 개 노드의 범위에 있을 수 있습니다. 여기서 복제를 사용한 내결함성의 필요성이 대두됩니다.

자세한 내용은 문서를 참조하세요.

SUSY 데이터세트

Kafka는 이벤트 스트리밍 플랫폼이므로 다양한 소스의 데이터가 여기에 기록될 수 있습니다. 예를 들면, 다음과 같습니다.

  • 웹 트래픽 로그

  • 천문학 측정

  • IoT 센서 데이터

  • 제품 리뷰 및 기타

이 튜토리얼의 목적을 위해 SUSY 데이터세트를 다운로드하고 데이터를 kafka에 수동으로 입력하겠습니다. 이 분류 문제의 목표는 초대칭 입자를 생성하는 신호 프로세스와 그렇지 않은 백그라운드 프로세스를 구분하는 것입니다.

!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

데이터세트 살펴보기

첫 번째 열은 클래스 레이블(신호는 1, 배경은 0)이고, 이어서 18개의 특성(8개의 하위 수준 특성, 그리고 10개의 상위 수준 특성)이 뒤따릅니다. 처음 8개의 특성은 가속기의 입자 검출기에 의해 측정되는 운동학적 특성입니다. 마지막 10개 특성은 처음 8개 특성의 함수입니다. 이러한 특성은 물리학자들이 두 클래스를 쉽게 구분하기 위해 유도한 높은 수준의 특성입니다.

COLUMNS = [ # labels 'class', # low-level features 'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi', 'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi', 'missing_energy_magnitude', 'missing_energy_phi', # high-level derived features 'MET_rel', 'axial_MET', 'M_R', 'M_TR_2', 'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)' ]

전체 데이터세트는 5백만 개의 행으로 구성됩니다. 그러나 이 튜토리얼에서는 데이터 이동에 소요되는 시간을 줄이고 API의 기능을 이해하는 데 더 많은 시간을 할애하기 위해 데이터세트의 일부(100,000행)만 고려하겠습니다.

susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000) susy_df = next(susy_iterator) susy_df.head()
# Number of datapoints and columns len(susy_df), len(susy_df.columns)
# Number of datapoints belonging to each class (0: background noise, 1: signal) len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])

데이터세트 분할하기

train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True) print("Number of training samples: ",len(train_df)) print("Number of testing sample: ",len(test_df)) x_train_df = train_df.drop(["class"], axis=1) y_train_df = train_df["class"] x_test_df = test_df.drop(["class"], axis=1) y_test_df = test_df["class"] # The labels are set as the kafka message keys so as to store data # in multiple-partitions. Thus, enabling efficient data retrieval # using the consumer groups. x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:])) y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:])) x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:])) y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
NUM_COLUMNS = len(x_train_df.columns) len(x_train), len(y_train), len(x_test), len(y_test)

kafka에 훈련 및 테스트 데이터 저장하기

kafka에 데이터를 저장하면 훈련 및 추론 목적으로 데이터를 지속해서 원격으로 검색하기 위한 환경이 시뮬레이션됩니다.

def error_callback(exc): raise Exception('Error while sendig data to kafka: {0}'.format(str(exc))) def write_to_kafka(topic_name, items): count=0 producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092']) for message, key in items: producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback) count+=1 producer.flush() print("Wrote {0} messages into topic: {1}".format(count, topic_name)) write_to_kafka("susy-train", zip(x_train, y_train)) write_to_kafka("susy-test", zip(x_test, y_test))

tfio 훈련 데이터세트 정의하기

IODataset 클래스는 kafka에서 tensorflow로 데이터를 스트리밍하는 데 사용됩니다. 이 클래스는 tf.data.Dataset에서 상속되므로 tf.data.Dataset의 모든 유용한 기능을 즉시 사용할 수 있습니다.

def decode_kafka_item(item): message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(item.key) return (message, key) BATCH_SIZE=64 SHUFFLE_BUFFER_SIZE=64 train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0) train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE) train_ds = train_ds.map(decode_kafka_item) train_ds = train_ds.batch(BATCH_SIZE)

모델 구축 및 훈련하기

# Set the parameters OPTIMIZER="adam" LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=True) METRICS=['accuracy'] EPOCHS=10
# design/build the model model = tf.keras.Sequential([ tf.keras.layers.Input(shape=(NUM_COLUMNS,)), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(256, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(128, activation='relu'), tf.keras.layers.Dropout(0.4), tf.keras.layers.Dense(1, activation='sigmoid') ]) print(model.summary())
# compile the model model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model model.fit(train_ds, epochs=EPOCHS)

참고: 훈련 단계를 온라인 훈련과 혼동하지 마세요. 온라인 훈련은 완전히 다른 패러다임이며 이후 섹션에서 다룹니다.

데이터세트의 일부만 사용되기 때문에 훈련 단계에서 정확성이 ~78%로 제한됩니다. 그러나 모델의 성능 향상을 위해 kafka에 추가 데이터를 자유롭게 저장하기 바랍니다. 또한 tfio kafka 데이터세트의 기능을 보여주는 것이 목표였기 때문에 더 작고 덜 복잡한 신경망이 사용되었습니다. 그러나 탐구 목적으로 모델의 복잡성을 증가시키고, 학습 전략을 수정하고, 하이퍼 매개변수를 조정하는 등의 시도를 해볼 수 있습니다. 기본 접근 방식에 대해서는 이 문서를 참조하세요.

테스트 데이터 추론하기

내결함성과 함께 '정확히 한 번'의 의미 체계를 지켜 테스트 데이터를 추론하기 위해 streaming.KafkaGroupIODataset를 이용할 수 있습니다.

tfio 테스트 데이터세트 정의하기

stream_timeout 매개변수는 새 데이터 포인트가 토픽으로 스트리밍될 수 있도록 지정된 기간 동안 차단합니다. 이렇게 하면 데이터가 간헐적으로 토픽으로 스트리밍되는 경우, 새 데이터세트를 만들 필요가 없습니다.

test_ds = tfio.experimental.streaming.KafkaGroupIODataset( topics=["susy-test"], group_id="testcg", servers="127.0.0.1:9092", stream_timeout=10000, configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], ) def decode_kafka_test_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) test_ds = test_ds.map(decode_kafka_test_item) test_ds = test_ds.batch(BATCH_SIZE)

이 클래스는 훈련 목적으로 사용할 수 있지만 주의해야 할 사항들이 있습니다. 모든 메시지를 kafka에서 읽고 최신 오프셋이 streaming.KafkaGroupIODataset를 사용하여 커밋되면 소비자가 처음부터 메시지 읽기를 다시 시작하지 않습니다. 따라서, 훈련하는 동안 데이터가 지속적으로 유입되는 상태에서 한 번의 epoch 동안에만 훈련할 수 있습니다. 이러한 종류의 기능은 훈련 단계에서 사용되는 사례가 제한적이며, 일단 데이터 포인트가 모델에 의해 소비되면 더 이상 필요하지 않아 폐기할 수 있습니다.

그러나 이 기능은 정확히 한 번의 의미 체계를 가지고 강력한 추론이 필요한 경우에 그 가치를 드러냅니다.

테스트 데이터에 대한 성능 평가하기

res = model.evaluate(test_ds) print("test loss, test acc:", res)

추론은 '정확히 한 번'의 의미 체계를 기반으로 하므로, 테스트 세트에 대한 평가는 한 번만 실행할 수 있습니다. 테스트 데이터에서 추론을 다시 실행하려면 새 소비자 그룹을 사용해야 합니다.

testcg 소비자 그룹의 오프셋 지연 추적하기

!./kafka_2.13-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group testcg

current-offset이 모든 파티션의 log-end-offset과 일치하면, 소비자가 kafka 토픽에서 모든 메시지를 가져왔음을 나타냅니다.

온라인 학습

온라인 머신러닝 패러다임은 머신러닝 모델을 훈련하기 위한 전통적/기존 방식과는 약간 다릅니다. 전자의 경우, 모델은 새로운 데이터 포인트를 사용할 수 있게 되는 즉시 해당 매개변수를 점진적으로 학습/업데이트하며, 이 프로세스는 무기한 계속됩니다. 반면, 후자의 경우에는 데이터세트가 고정되고 모델이 n회 반복됩니다. 온라인 학습에서는 모델이 사용한 데이터를 훈련에 다시 사용할 수 없습니다.

streaming.KafkaBatchIODataset를 사용하면 이제 이러한 방식으로 모델을 훈련할 수 있습니다. SUSY 데이터세트를 계속해서 사용하여 이 기능을 시연하겠습니다.

온라인 학습을 위한 tfio 훈련 데이터세트

streaming.KafkaBatchIODataset는 API의 streaming.KafkaGroupIODataset와 유사합니다. 또한, stream_timeout 매개변수를 사용하여 시간 초과 전에 데이터세트가 새 메시지를 차단하는 기간을 구성하는 것이 좋습니다. 아래 인스턴스에서 데이터세트는 30000밀리초의 stream_timeout으로 구성됩니다. 이는 토픽의 모든 메시지가 소비된 후, 데이터세트가 시간 초과되고 kafka 클러스터에서 연결 해제되기 전에 추가로 30초 동안 대기함을 의미합니다. 시간이 초과되기 전에 새 메시지가 토픽으로 스트리밍되면 새로 소비된 데이터 포인트에 대해 데이터 소비 및 모델 훈련이 재개됩니다. 무기한 차단하려면 -1을 설정합니다.

online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset( topics=["susy-train"], group_id="cgonline", servers="127.0.0.1:9092", stream_timeout=10000, # in milliseconds, to block indefinitely, set it to -1. configuration=[ "session.timeout.ms=7000", "max.poll.interval.ms=8000", "auto.offset.reset=earliest" ], )

online_train_ds가 생성하는 모든 항목은 그 자체로 tf.data.Dataset입니다. 따라서, 모든 표준 변환을 평소와 같이 적용할 수 있습니다.

def decode_kafka_online_item(raw_message, raw_key): message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)]) key = tf.strings.to_number(raw_key) return (message, key) for mini_ds in online_train_ds: mini_ds = mini_ds.shuffle(buffer_size=32) mini_ds = mini_ds.map(decode_kafka_online_item) mini_ds = mini_ds.batch(32) if len(mini_ds) > 0: model.fit(mini_ds, epochs=3)

점진적으로 훈련된 모델은 사용 사례에 따라 주기적으로 저장할 수 있으며, 온라인 또는 오프라인 모드에서 테스트 데이터를 추론하는 데 이용할 수 있습니다.

참고: streaming.KafkaBatchIODatasetstreaming.KafkaGroupIODataset은 아직 실험 단계에 있으며 사용자 피드백을 기반으로 개선할 여지가 남아 있습니다.

참고 자료:

  • Baldi, P., P. Sadowski, and D. Whiteson. “Searching for Exotic Particles in High-energy Physics with Deep Learning.” Nature Communications 5 (July 2, 2014)

  • SUSY 데이터세트: https://archive.ics.uci.edu/ml/datasets/SUSY#