Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

TensorFlow

import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf

print(sys.version)
print(tf.__version__)

CPU만 사용하도록 설정

tf.config.set_visible_devices([], device_type='GPU')

1Tensor

1.1Ranks

scalar = rank_0_tensor = tf.constant(1)
scalar
vector = rank_1_tensor = tf.constant([1, 2])
vector
matrix = rank_2_tensor = tf.constant([[0, 1, 2], [3, 4, 5]], dtype='float16')
matrix
rank_3_tensor = tf.constant([
    [[1, 2], 
    [3, 4]],
    [[5, 6], 
    [7, 8]]
])
rank_3_tensor.shape

1.2텐서 형상 조정

tf.reshape(matrix, shape=tf.size(matrix))
tf.reshape(matrix, (3, 2))
tf.transpose(matrix)

NumPy.ndarray와 TensorFlow.Tensor

vector = tf.constant(np.arange(10))
vector
vector.numpy()
tf.ones(shape=(2, 1))
tf.zeros(shape=(2, 1))
tf.random.normal(shape=(3, 1))

2산술 연산

a = tf.constant([[1, 2], [3, 4]])
a - a
a = tf.constant([[1, 2], [3, 4]])
b = a + 1
c = a * b
c
a = tf.constant([[1, 2], [3, 4]])
b = tf.constant([[-1, -1], [1, 1]])
tf.matmul(a, b)
try:
    print('NumPy')
    x = np.ones(shape=(2, 2))
    x[0, 0] = 0
    print(x)
except:
    pass

try:
    print('TensorFlow')
    x = tf.ones(shape=(2, 2))
    x[0, 0] = 0
except TypeError as e:
    print(e)

2.1차원 조정

a = tf.constant(tf.range(5))
A1 = tf.expand_dims(a, axis=0)
A2 = tf.expand_dims(a, axis=1)
print(a.shape, A1.shape, A2.shape)
print(tf.squeeze(A1).shape, tf.squeeze(A2).shape)
a - a
a - A1
a - A2
assert tf.tensordot(a, a, axes=1) == tf.reduce_sum(a*a)
assert tf.matmul(A1, tf.transpose(A1)) \
    == tf.matmul(A1, A2) \
    == tf.tensordot(A1, A2, axes=1)

3TensorFlow 변수(variable)

v = tf.Variable(initial_value=tf.zeros((3, 4)))
v.assign(tf.ones((3, 4)))
v[0, 0].assign(0)
v
v.assign_add(tf.ones(v.shape))
v.assign_sub(tf.ones(v.shape))
v
x = tf.Variable(np.arange(10))
y = x ** 2
y

4경사 산출

f(x)=x2fx=2xf(x) = x^2 \rightarrow \frac{\partial f}{\partial x}=2x
x = tf.Variable(1.0)
with tf.GradientTape() as tape:
    y = x ** 2

dydx = tape.gradient(y, x)
dydx.numpy()
x = tf.Variable(tf.range(-5, 5, 0.1))
f = lambda x: x ** 2

with tf.GradientTape() as tape:
    y = f(x)

dydx = tape.gradient(y, x)

def gradient_descent(f, init_x, lr=0.01, step_num=100):
    x = tf.Variable(init_x)
    x_history = [tf.constant(x)]
    for i in range(step_num):
        with tf.GradientTape() as tape:
            y = f(x)
        grad = tape.gradient(y, x)
        x.assign_sub(lr * grad)
        x_history.append(tf.constant(x))
    return tf.constant([xi.numpy() for xi in x_history])

x_history = gradient_descent(f, init_x=tf.constant(-4.0), lr=0.1, step_num=100)
plt.plot(x.numpy(), y.numpy())
plt.plot(x.numpy(), dydx.numpy())
plt.plot(x_history.numpy(), f(x_history).numpy(), 'ro')
plt.plot(0, 0, 'o')
plt.show()
p(t)=4.9t2p(t) = 4.9t ^ 2
v(t)=9.8tv(t) = 9.8t
a=9.8a = 9.8
time = tf.Variable(0.)
with tf.GradientTape() as second_tape:
    with tf.GradientTape() as first_tape:
        position = 4.9 * time ** 2
    speed = first_tape.gradient(position, time)
acceleration = second_tape.gradient(speed, time)
print(position, speed, acceleration)
time = tf.Variable(initial_value=np.arange(1, 11), dtype=tf.float32)
with tf.GradientTape() as second_tape:
    with tf.GradientTape() as first_tape:
        position = 4.9 * time ** 2
    speed = first_tape.gradient(position, time)
acceleration = second_tape.gradient(speed, time)

pd.DataFrame(np.vstack([position.numpy(), speed.numpy(), acceleration.numpy()]), index=['postion', 'speed', 'acceleration']).T

tf.Variable 과 최적화(optimizer)

class TFLinearRegression:
    def __init__(self, w=None, b=None):
        self.w = tf.Variable(w, dtype=tf.float32) if w is not None else None
        self.b = tf.Variable(b, dtype=tf.float32) if b is not None else None

    def __call__(self, Xs):
        Xs = tf.constant(Xs, dtype=tf.float32)
        return tf.matmul(Xs, tf.expand_dims(self.w, axis=-1)) + self.b
    
    def fit(self, Xs, ys, 학습횟수, 학습률):
        # 데이터셋을 텐서플로우 자료형으로 변환
        Xs = tf.constant(Xs, dtype=tf.float32)
        ys = tf.constant(ys, dtype=tf.float32)
        # 1. 매개변수 초기화
        표본수, 특성차원 = Xs.shape
        self.w = tf.Variable(tf.zeros(특성차원), dtype=tf.float32)
        self.b = tf.Variable(0.0, dtype=tf.float32)
        # 2. 최적화
        # 1) 방정식 (소규모; 저차원 데이터셋)
        # 2) 경사하강법 (근사적 최적해. 대규모; 고차원 데이터셋)
        loss_history = []
        for i in range(학습횟수):
            print(f'[학습 {i}] w={self.w.numpy()[0]:.3f}, b={self.b.numpy():.3f}', end=' ')
            # 손실에 대한 경사 산출
            with tf.GradientTape() as tape:
                Ys_pred = self(Xs)
                y_pred = tf.squeeze(Ys_pred) # (표본수, 1) -> (표본수,)
                loss = tf.reduce_mean(tf.square(y_pred - ys))            
            dw, db = tape.gradient(loss, [self.w, self.b])
            # 매개변수 갱신: 경사 하강
            self.w.assign_sub(dw * 학습률)
            self.b.assign_sub(db * 학습률)
            
            loss_history.append(loss.numpy())
            print(f'손실={loss.numpy():.3f}')
        return loss_history
from sklearn.linear_model import LinearRegression # 선형회귀 알고리즘

random = np.random.RandomState(1)
xs = np.linspace(-10, 10, 100)
noise = random.randn(len(xs))
ys = xs + noise

Xs = xs.reshape(-1, 1) # 배치 입력으로 형상
model = LinearRegression()
model.fit(Xs, ys)
w = model.coef_
b = model.intercept_

가내수공업_모델 = TFLinearRegression()
# assert np.allclose(model.predict(Xs), 가내수공업_모델(Xs).numpy().flatten())
loss_history = 가내수공업_모델.fit(Xs, ys, 학습횟수=10, 학습률=0.01)

plt.scatter(xs, ys)
plt.plot(xs, ys - noise, c='red', linestyle='--')
plt.plot(xs, model.predict(Xs), c='black')
plt.plot(xs, 가내수공업_모델(Xs), c='blue')
plt.show()

5실행 방식

  1. 즉시 실행 (Eager Execution)

  2. 그래프 실행 (Graph Execution)

def affine(x, w, b):
    return tf.matmul(x, w) + b

graph_affine = tf.function(affine)

X = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
w = tf.random.normal((X.shape[-1], 1))
b = tf.random.normal((1,))

z1 = affine(X, w, b)
z2 = graph_affine(X, w, b)

assert np.all(z1 == z2)
print(tf.autograph.to_code(affine))

그래프 함수(tf.function)는 호출하는 모든 하위 함수에 적용됩니다.

def sigmoid(x):
    return 1 / (1 + tf.exp(-x))

@tf.function
def dense(x, w, b):
    return sigmoid(affine(x, w, b))

X = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
w = tf.random.normal((X.shape[-1], 1))
b = tf.random.normal((1,))

dense(X, w, b)
print(tf.autograph.to_code(dense.python_function))
@tf.function
def mean_squared_error(y_true, y_pred):
    print('한 번만 출력')
    tf.print('매번 출력')
    return tf.reduce_mean((y_true - y_pred) ** 2)

for _ in range(3):
    y_true = tf.random.uniform([5], maxval=10, dtype=tf.int32)
    y_pred = tf.random.uniform([5], maxval=10, dtype=tf.int32)
    loss = mean_squared_error(y_true, y_pred)
    print(f'Loss={loss:.2f}')

5.1즉시 실행과 그래프 실행 실행 속도 비교

그래프 실행은 간단한 작은 연산들이 많은 경우, 실행 속도 향상에 도움이 될 수 있습니다.

import timeit

def forward(x, layers):
    for layer in layers:
        x = layer(x)
    return x

forward_graph = tf.function(forward)

dense = tf.keras.layers.Dense(100, activation='relu')

x = tf.random.normal((1, 100))
assert np.all(forward(x, [dense] * 2) == forward_graph(x, [dense] * 2))

layers = [dense] * 100
eager_time = timeit.timeit(lambda: forward(x, layers), number=1000)
graph_time = timeit.timeit(lambda: forward_graph(x, layers), number=1000)
    
print(f'Eager: {eager_time:.2f} Vs. Graph: {graph_time:.2f}')

합성곱 (convolution) 연산처럼 각 연산의 비용이 높지만 연산의 개수 자체가 적은 경우는 그래프 실행으로 연산 속도가 크게 개선되지 않을 수 있습니다.

import timeit

def forward(x, layers):
    for layer in layers:
        x = layer(x)
    return x

forward_graph = tf.function(forward)

x = tf.random.normal((1, 100, 100, 100))
conv_layer = tf.keras.layers.Conv2D(100, (3, 3), activation='relu')
assert np.all(forward(x, [conv_layer]) == forward_graph(x, [conv_layer]))

results = {}
for n_layers in range(1, 11):    
    layers = [conv_layer] * n_layers
    print(f'#layers: {len(layers)}')
    
    eager_time = timeit.timeit(lambda: forward(x, layers), number=100)
    graph_time = timeit.timeit(lambda: forward_graph(x, layers), number=100)
    
    print(f'Eager: {eager_time:.2f} Vs. Graph: {graph_time:.2f}')
    results[n_layers] = {
        'Eager': eager_time,
        'Graph': graph_time
    }

pd.DataFrame(results)

6MLP 구현

class TF완전연결:
    def __init__(self, 입력수, 출력수, 활성화=None):
        self.W = tf.Variable(tf.random.normal((입력수, 출력수)))
        self.b = tf.Variable(tf.zeros(출력수))
        self.activation = 활성화

    def __call__(self, Xs):
        z = tf.matmul(Xs, self.W) + self.b
        if self.activation:
            return self.activation(z)
        return z
    
class TF신경망:
    def __init__(self, 손실함수):
        self.layers = []
        self.loss_func = 손실함수

    def add(self, layer):
        self.layers.append(layer)

    def __call__(self, Xs):
        Xs = tf.convert_to_tensor(Xs, dtype=tf.float32)
        outputs = Xs
        for layer in self.layers:
            outputs = layer(outputs)
        return outputs
    
    def fit(self, data, target, 배치크기, 에폭수, 학습률):
        # 학습횟수
        표본수 = len(data)
        에폭당_배치수 = 표본수 // 배치크기 # 소수점 이하 버림
        학습횟수 = 에폭당_배치수 * 에폭수
        print(f'배치크기={배치크기}, 에폭수={에폭수}, 학습횟수={학습횟수}({에폭당_배치수}/에폭)')
        loss_history = []
        for 학습 in range(학습횟수):
            # 1. 미니 배치
            배치색인 = tf.random.shuffle(tf.range(표본수))[:배치크기]
            X_batch = tf.gather(data, 배치색인)
            y_batch = tf.gather(target, 배치색인)
            # 2. 경사 산출 (자동 미분)
            with tf.GradientTape() as tape:
                outputs = self(X_batch)
                losses = self.loss_func(y_batch, outputs)
                mean_loss = tf.reduce_mean(losses)
            loss_history.append(mean_loss.numpy())
            # 역전파 대상 변수
            params = [(layer.W, layer.b) for layer in self.layers]
            # 자동 미분 (역전파)
            grads = tape.gradient(mean_loss, params)
            # 3. 매개변수 갱신 (경사 하강)
            for layer, (W, b) in zip(self.layers, grads):
                # W -= dW * learning_rate
                layer.W.assign_sub(W * 학습률)
                layer.b.assign_sub(b * 학습률)

            if 학습 == 0 or (학습 + 1) % 100 == 0:
                print(f'[학습 {학습 + 1}] Loss: {mean_loss:.3f}')
def 교차엔트로피오차(y, y_pred):
    delta = 1e-7
    return -tf.reduce_sum(y * tf.math.log(y_pred + delta), axis=1)

y = tf.constant([[0, 1, 0], [1, 0, 0]], dtype=tf.float32)
y_pred = tf.constant([[0.1, 0.7, 0.2], [0.9, 0.1, 0.0]], dtype=tf.float32)

교차엔트로피오차(y, y_pred).numpy()
FC = TF완전연

model = TF신경망(교차엔트로피오차)
model.add(FC(784, 50, tf.nn.sigmoid))
model.add(FC(50, 100, tf.nn.sigmoid))
model.add(FC(100, 10, tf.nn.softmax))

X_train = 전처리(train_images)
Y_train = np.eye(10)[train_labels]
X_train = tf.convert_to_tensor(X_train, dtype=tf.float32)
Y_train = tf.convert_to_tensor(Y_train, dtype=tf.float32)

loss_history = model.fit(X_train, Y_train, 배치크기=100, 에폭수=10, 학습률=0.1)