弹簧质点系统的动力学图网络建模实例

Posted by Limin on August 12, 2020

注: 本文中实例来自于Google公司DeepMind实验室在GitHub网站发布的图网络graph_nets.

  弹簧质点模型,是一种典型的图网络模型。关于图网络,见笔者之间的一篇文章《图网络与知识表示概述》。弹簧质点模型是利用牛顿运动定律来模拟物体变形的方法。实际上,模拟一个力学体系下的变形物体最简单的方法就是将其表示为弹簧质点系统,这是因为它的物理模拟和时间积分的各项技术都比较成熟。换而言之,实际应用中使用弹簧质点系统“物美价廉”,够快够简单。

弹簧质点系统理论上可以模拟任意的力学结构,因为它是非欧几里德结构数据。

  弹簧质点系统理论上可以模拟任意的力学结构,因为它是非欧几里德结构数据。数据类型可以分为两大类:欧几里德结构数据 (Euclidean Structure Data) 以及非欧几里德结构数据 (Non-Euclidean Structure Data)。结构数据的特点就是:“排列整齐”,可以用矩阵来表达其像素。常见的时间序列、图像等都属于欧几里德结构数据。

欧几里德结构数据。

  图数据例如弹簧质点系统是一种典型的非欧几里德结构的数据。其标志性特点是:对于数据中的某个点,难以定义出其邻居节点出来,或者是不同节点的邻居节点的数量是不同的。正因为如此,它才得以模拟任意的力学结构。

  在牛顿力学体系下解动力学方程组时,最简单的方式是使用显式欧拉解法,下一时刻的状态完全由当前状态决定 (例如a(n)=a(n-1)+b(n-1)), 适用于求解和时间相关的动力学问题。显式求解是对时间进行差分,不存在头疼的不收敛问题。但是显式解法只能在步长很小的时候稳定。过多和过小的时间步往往导致求解时间非常漫长,但总能给出一个计算结果。解方程所花时间成本和金钱成本非常昂贵。

显式积分。

  显式积分适用于粒子的下一时刻的状态完全由当前状态决定,称为马尔可夫过程,是不具备记忆特质的 (memorylessness)。换言之,马可夫过程的条件概率仅仅与系统的当前状态相关,而与它的过去历史或未来状态,都是独立、不相关的。这个过程完全随机,是对复杂动力学系统一种无奈的简单近似。例如,一周前的天气是对未来天气有影响的,长期尺度下“夏涝冬旱”的时间间隔相关也是常见现象。

  具备离散状态的马可夫过程,通常被称为马可夫链。举个例子,假设你有一个住得很远的朋友 (郊区天气预报常常不准确),他每天跟你打电话告诉你他那天做了什么。你的朋友仅仅对三种活动感兴趣:公园散步,购物以及清理房间。他选择做什么事情只凭天气。

states = ('Rainy', 'Sunny')
 
observations = ('walk', 'shop', 'clean')
 
# 你对于你朋友住的地方的天气情况的平均概率。

start_probability = {'Rainy': 0.6, 'Sunny': 0.4}
 
# 如果今天下雨,那么明天天晴的概率只有30%。反之概率为60%。

transition_probability = {
    'Rainy' : {'Rainy': 0.7, 'Sunny': 0.3},
    'Sunny' : {'Rainy': 0.4, 'Sunny': 0.6},
    }
 
# 如果下雨,50% 的概率在清理房间;如果天晴,则有60%的概率在散步。

emission_probability = {
    'Rainy' : {'walk': 0.1, 'shop': 0.4, 'clean': 0.5},
    'Sunny' : {'walk': 0.6, 'shop': 0.3, 'clean': 0.1},
    }

  当随机的马尔可夫过程是非离散的连续时间序列时,称为维纳过程。由于它与物理学中的布朗运动有密切关系,也常被称为“布朗运动过程”或简称为布朗运动。布朗运动的典型例子是指悬浮在液体或者气体中的微小颗粒所进行的无休止地随机运动。

  谷歌公司DeepMind实验室给出了一个弹簧质点系统的实例,介绍了如何使用Graph_Nets库来学习和预测一组由弹簧连接的质点的运动状态。训练后,通过比较图网络的输出和真实行为来说明图网络的预测能力。图网络模型更新时使用的预测算法是显式积分。

  工欲善其事,必先利其器。首先安装必须的Python工具包:

pip install graph_nets matplotlib scipy "tensorflow>=1.15,<2" "dm-sonnet<2" "tensorflow_probability<0.9"

  导入工具包:

import time
from graph_nets import blocks
from graph_nets import graphs
from graph_nets import modules
from graph_nets import utils_np
import networkx as nx
from graph_nets import utils_tf
from graph_nets.demos import models
from matplotlib import pyplot as plt
import numpy as np
import sonnet as snt
import tensorflow as tf

# 设定 TensorFlow随机张量,以便产生可重复结果。
SEED = 1
np.random.seed(SEED)
tf.set_random_seed(SEED)

  定义一个基本的弹簧质点系统的图结构。图结构是个数据字典data_dict,包含全局变量、节点、边缘、接收节点和发送节点的属性信息向量。该结构由n个质量为1kg的质点被弹簧连接成的链状结构。其第一个质点和最后一个质点的位置是固定的。质点水平排列,开始时质点之间相距均为d米,这也是弹簧的休息长度。连接它们的弹簧的弹簧常数为50 N/m,重力为10 N,在负y方向。

  为所有的质点设定初始位置和速度。其中,最左边的质点在位置(0,0),其他质点 (从左到右排列) 的坐标是左邻质点的X坐标加上d米,Y坐标均为0,所有质点的初始速度均为0 m/s。

def base_graph(n, d):
#定义节点
  nodes = np.zeros((n, 5), dtype=np.float32)
  half_width = d * n / 2.0 # 原点位于链条的正中间位置
  nodes[:, 0] = np.linspace(
      -half_width, half_width, num=n, endpoint=False, dtype=np.float32)
  nodes[(0, -1), -1] = 1. #第一个点和最后一个点位置固定

  # 定义连边
  edges, senders, receivers = [], [], []
  for i in range(n - 1):
    left_node = i
    right_node = i + 1

    #双向连边排除固定的最左侧点和最右侧点,从0到1、从5到4的作用力是单向的。
    if right_node < n - 1: 
      edges.append([50., d]) # 连边的两个属性为弹簧系数和长度 
      senders.append(left_node) # 发送节点为左侧节点 
      receivers.append(right_node) # 接收节点为右侧节点
   #力的作用是相互的。
    if left_node > 0:
      edges.append([50., d])
      senders.append(right_node) # 发送节点为右侧节点 
      receivers.append(left_node) # 接收节点为左侧节点

  return {
      "globals": [0., -10.], # 全局变量重力为10 N,在负y方向。
      "nodes": nodes,
      "edges": edges,
      "receivers": receivers,
      "senders": senders
  }

  测试一下图结构数据,例如5个质点、间隔距离为10米,该字典数据中,全局变量向量初始时水平方向压力/拉力为零,重力为-10 N;节点信息向量中,第一列为节点X坐标,第二列为Y坐标,第三列和第四列为X方向和Y方向的速度,第五列为0/1布尔值,代表质点是否被固定;连边向量中,两个属性分别为弹力和重力。

自定义数据

  将图结构可视化,其自带的可视化包为了美观,可能体现不出来节点的坐标信息。

#使用utils_np.data_dicts_to_graphs_tuple将图形词典放入GraphsTuple中。
graphs_tuple = utils_np.data_dicts_to_graphs_tuple([Base_G])

# GraphsTuple可以转换为networkx图形对象的`列表’,以便于可视化。
graphs_nx = utils_np.graphs_tuple_to_networkxs(graphs_tuple)
ax = plt.figure(dpi=300, figsize=(3, 3)).gca()
nx.draw(graphs_nx[0], ax=ax)
_ = ax.set_title("Graph Test")

图结构可视化运行结果,为了美观,没有体现出节点的坐标。

  弹簧质点系统使用的物理定律为胡克定律,应力和变形呈线性关系。由弹簧质点系统的图结构定义函数可以看出,节点属性信息是5*5的向量,分别是X坐标、Y坐标、X方向速度、Y方向速度、是否固定 (0/1),亦即[x, y, v_x, v_y, is_fixed]。k为弹簧系数, x_rest为连边也就是弹簧本来的休息长度。

胡克定律

def hookes_law(receiver_nodes, sender_nodes, k, x_rest):
  diff = receiver_nodes[..., 0:2] - sender_nodes[..., 0:2]# XY坐标差
  x = tf.norm(diff, axis=-1, keepdims=True)# 求范数也就是距离
  force_magnitude = -1 * tf.multiply(k, (x - x_rest) / x)# 应用虎克定律
  force = force_magnitude * diff
  return force

  大多数微分方程都不具有标准形式,也不能通过解析方法求解,这意味着我们找不到通用解。在这种情况下,我们需要使用数值方法来确定微分方程的解。最简单的积分方法之一是以数学家Leonhard Euler命名的Euler欧拉积分方法。欧拉方法是一阶方法,这意味着局部误差 (每步误差) 与步长的平方成正比,全局误差 (给定时间的误差) 与步长成正比。Euler积分方法也是一种显式积分方法,这意味着下一步的系统状态是根据当前时间的系统状态来计算的。定义一阶欧拉积分,返回的向量与节点向量形状相同,但是更新了距离和速度。

def euler_integration(nodes, force_per_node, step_size):
  is_fixed = nodes[..., 4:5] #第五列 
  force_per_node *= 1 - is_fixed #固定点的强迫力为0,其他点为1。
  new_vel = nodes[..., 2:4] + force_per_node * step_size   #XY速度+强迫*步长。
  return new_vel

  基于上述方法,构建物理模拟器。该模拟器所需要的输入是上文中的自定义的图结构数据,输出也是图结构数据,其中,连边向量更新了应用胡克定律后的结果,从[弹簧系数spring_constant, 休息长度rest_length] 变为X方向上的强迫力[f_x, f_y]。

class SpringMassSimulator(snt.AbstractModule):
  def __init__(self, step_size, name="SpringMassSimulator"):
    super(SpringMassSimulator, self).__init__(name=name)
    self._step_size = step_size

    with self._enter_variable_scope():
      self._aggregator = blocks.ReceivedEdgesToNodesAggregator(
          reducer=tf.unsorted_segment_sum)

  def _build(self, graph):
    receiver_nodes = blocks.broadcast_receiver_nodes_to_edges(graph)
    sender_nodes = blocks.broadcast_sender_nodes_to_edges(graph)
    spring_force_per_edge = hookes_law(receiver_nodes, sender_nodes,
                                       graph.edges[..., 0:1],
                                       graph.edges[..., 1:2])
    graph = graph.replace(edges=spring_force_per_edge)

    spring_force_per_node = self._aggregator(graph)
    gravity = blocks.broadcast_globals_to_nodes(graph)
    updated_velocities = euler_integration(
        graph.nodes, spring_force_per_node + gravity, self._step_size)
    graph = graph.replace(nodes=updated_velocities)
    return graph

  定义节点向量应用欧拉积分,更新节点的位置和速度。

def prediction_to_next_state(input_graph, predicted_graph, step_size):
  new_pos = input_graph.nodes[..., :2] + predicted_graph.nodes * step_size
  new_nodes = tf.concat(
      [new_pos, predicted_graph.nodes, input_graph.nodes[..., 4:5]], axis=-1)
  return input_graph.replace(nodes=new_nodes)

  定义图网络节点的相互作用:

def roll_out_physics(simulator, graph, steps, step_size):
  def body(t, graph, nodes_per_step):
    predicted_graph = simulator(graph)
    if isinstance(predicted_graph, list):
      predicted_graph = predicted_graph[-1]
    graph = prediction_to_next_state(graph, predicted_graph, step_size)
    return t + 1, graph, nodes_per_step.write(t, graph.nodes)

  nodes_per_step = tf.TensorArray(
      dtype=graph.nodes.dtype, size=steps + 1, element_shape=graph.nodes.shape)
  nodes_per_step = nodes_per_step.write(0, graph.nodes)

  _, g, nodes_per_step = tf.while_loop(
      lambda t, *unused_args: t <= steps,
      body,
      loop_vars=[1, graph, nodes_per_step])
  return g, nodes_per_step.stack()

  添加随机漫步噪声,将均匀分布的噪声应用于该系统。这一步很关键,长时间步长的模拟会导致误差累积,亦即引入了计算噪声。参照Google公司DeepMind实验室于2020年2月21日在arxiv上发布了预印版文章《Learning to Simulate Complex Physics with Graph Networks》,通过人为加入噪声破坏训练数据,以此减轻错误的积累,防止过拟合。

def apply_noise(graph, node_noise_level, edge_noise_level, global_noise_level):
  node_position_noise = tf.random_uniform(
      [graph.nodes.shape[0].value, 2],
      minval=-node_noise_level,
      maxval=node_noise_level)
  edge_spring_constant_noise = tf.random_uniform(
      [graph.edges.shape[0].value, 1],
      minval=-edge_noise_level,
      maxval=edge_noise_level)
  global_gravity_y_noise = tf.random_uniform(
      [graph.globals.shape[0].value, 1],
      minval=-global_noise_level,
      maxval=global_noise_level)

  return graph.replace(
      nodes=tf.concat(
          [graph.nodes[..., :2] + node_position_noise, graph.nodes[..., 2:]],
          axis=-1),
      edges=tf.concat(
          [
              graph.edges[..., :1] + edge_spring_constant_noise,
              graph.edges[..., 1:]
          ],
          axis=-1),
      globals=tf.concat(
          [
              graph.globals[..., :1],
              graph.globals[..., 1:] + global_gravity_y_noise
          ],
          axis=-1))

  计算物理系统中弹簧的休息长度,亦即每个边的节点之间的距离。

def set_rest_lengths(graph):
  receiver_nodes = blocks.broadcast_receiver_nodes_to_edges(graph)
  sender_nodes = blocks.broadcast_sender_nodes_to_edges(graph)
  rest_length = tf.norm(
      receiver_nodes[..., :2] - sender_nodes[..., :2], axis=-1, keepdims=True)
  return graph.replace(
      edges=tf.concat([graph.edges[..., :1], rest_length], axis=-1))

  应用噪声,然后模拟弹簧质点系统的若干步长后的状态。输出结果是添加噪声和计算出休息长度的图结构数据。

def generate_trajectory(simulator, graph, steps, step_size, node_noise_level,
                        edge_noise_level, global_noise_level):
  graph = apply_noise(graph, node_noise_level, edge_noise_level,
                      global_noise_level)
  graph = set_rest_lengths(graph)
  _, n = roll_out_physics(simulator, graph, steps, step_size)
  return graph, n

  定义预测结果评价函数,也就是监督每一步模拟的目标和产出之间的误差损失。

def create_loss_ops(target_op, output_ops):
  loss_ops = [
      tf.reduce_mean(
          tf.reduce_sum((output_op.nodes - target_op[..., 2:4])**2, axis=-1))
      for output_op in output_ops
  ]
  return loss_ops

  汇总图结构的迭代过程。

def make_all_runnable_in_session(*args):
  return [utils_tf.make_runnable_in_session(a) for a in args]

  图网络模型的三步走策略:编码、内核处理器、解码。编码器从初始输入数据中构建图结构,内核处理器计算不同步长后的状态,解码器提取中每一个步长后的节点的状态。

编码、处理、解码

  采用监督学习的方式进行训练。训练损失是根据每个处理步骤的输出计算的。这样做的原因是鼓励模型尽量在尽可能少的步骤中解决问题。这也有助于使中间步骤的输出更容易解释。由于输入从不重复,所以不需要单独的评估整个数据集,训练损失已经对每一步的预测误差做出衡量。结果显示,经过超过10000次的迭代后,模型仍然能产生较好的预测结果。

  与常规的机器学习方法一样,图网络也可以自定义训练集和测试 (泛化) 集。

tf.reset_default_graph()
rand = np.random.RandomState(SEED)
num_processing_steps_tr = 1 # 训练集
num_processing_steps_ge = 1 # 测试泛化集

# 设置参数,生成训练集数据。
num_training_iterations = 100000 #迭代一万次 
batch_size_tr = 256 #训练集大小
batch_size_ge = 100 #测试集大小
num_time_steps = 50 # 时间步数
step_size = 0.1 # 时间步长
num_masses_min_max_tr = (5, 9) #5-8个弹簧质点
dist_between_masses_min_max_tr = (0.2, 1.0) # 弹簧距离0.2-1 米

model = models.EncodeProcessDecode(node_output_size=2)

# 训练集图结构数据,根据上面参数随机生成。
num_masses_tr = rand.randint(*num_masses_min_max_tr, size=batch_size_tr)
dist_between_masses_tr = rand.uniform(
    *dist_between_masses_min_max_tr, size=batch_size_tr)
static_graph_tr = [
    base_graph(n, d) for n, d in zip(num_masses_tr, dist_between_masses_tr)
]
base_graph_tr = utils_tf.data_dicts_to_graphs_tuple(static_graph_tr)

# 测试集图结构数据,4个质点、弹簧长度1米,和9个质点、弹簧长度为0.5米。
base_graph_4_ge = utils_tf.data_dicts_to_graphs_tuple(
    [base_graph(4, 0.5)] * batch_size_ge)

base_graph_9_ge = utils_tf.data_dicts_to_graphs_tuple(
    [base_graph(9, 0.5)] * batch_size_ge)

  准备好数据后,就可以构建物理模拟器了。

simulator = SpringMassSimulator(step_size=step_size)

# 在初始状态的弹簧系数、质点位置、重力场中引入随机噪声。
initial_conditions_tr, true_trajectory_tr = generate_trajectory(
    simulator,
    base_graph_tr,
    num_time_steps,
    step_size,
    node_noise_level=0.04,
    edge_noise_level=5.0,
    global_noise_level=1.0)

# 随机选择迭代的时间步长
t = tf.random_uniform([], minval=0, maxval=num_time_steps - 1, dtype=tf.int32)
input_graph_tr = initial_conditions_tr.replace(nodes=true_trajectory_tr[t])
target_nodes_tr = true_trajectory_tr[t + 1]
output_ops_tr = model(input_graph_tr, num_processing_steps_tr)

  在两个不同的测试集上应训练好的模拟器。

# 4 个质点的系统
initial_conditions_4_ge, true_trajectory_4_ge = generate_trajectory(
    lambda x: model(x, num_processing_steps_ge),
    base_graph_4_ge,
    num_time_steps,
    step_size,
    node_noise_level=0.04,
    edge_noise_level=5.0,
    global_noise_level=1.0)
_, true_nodes_rollout_4_ge = roll_out_physics(
    simulator, initial_conditions_4_ge, num_time_steps, step_size)
_, predicted_nodes_rollout_4_ge = roll_out_physics(
    lambda x: model(x, num_processing_steps_ge), initial_conditions_4_ge,
    num_time_steps, step_size)

# 9 个质点的系统
initial_conditions_9_ge, true_trajectory_9_ge = generate_trajectory(
    lambda x: model(x, num_processing_steps_ge),
    base_graph_9_ge,
    num_time_steps,
    step_size,
    node_noise_level=0.04,
    edge_noise_level=5.0,
    global_noise_level=1.0)
_, true_nodes_rollout_9_ge = roll_out_physics(
    simulator, initial_conditions_9_ge, num_time_steps, step_size)
_, predicted_nodes_rollout_9_ge = roll_out_physics(
    lambda x: model(x, num_processing_steps_ge), initial_conditions_9_ge,
    num_time_steps, step_size)

  评估模拟器在训练集和测试集上的性能,给出损失误差和进行优化。

# 模拟器在训练集上的损失
loss_ops_tr = create_loss_ops(target_nodes_tr, output_ops_tr)
# Training loss across processing steps.
loss_op_tr = sum(loss_ops_tr) / num_processing_steps_tr

# 4个质点的测试集损失
loss_op_4_ge = tf.reduce_mean(
    tf.reduce_sum(
        (predicted_nodes_rollout_4_ge[..., 2:4] -
         true_nodes_rollout_4_ge[..., 2:4])**2,
        axis=-1))

# 9个质点的测试集损失
loss_op_9_ge = tf.reduce_mean(
    tf.reduce_sum(
        (predicted_nodes_rollout_9_ge[..., 2:4] -
         true_nodes_rollout_9_ge[..., 2:4])**2,
        axis=-1))

# 优化器
learning_rate = 1e-3
optimizer = tf.train.AdamOptimizer(learning_rate)
step_op = optimizer.minimize(loss_op_tr)

input_graph_tr = make_all_runnable_in_session(input_graph_tr)
initial_conditions_4_ge = make_all_runnable_in_session(initial_conditions_4_ge)
initial_conditions_9_ge = make_all_runnable_in_session(initial_conditions_9_ge)

  重置tf.session, 但是保留Tensorflow的计算图。

try:
  sess.close()
except NameError:
  pass
sess = tf.Session()
sess.run(tf.global_variables_initializer())

  输出训练好的模拟器在测试集上的预测损失:

last_iteration = 0
logged_iterations = []
losses_tr = []
losses_4_ge = []
losses_9_ge = []

log_every_seconds = 20

print("# (iteration number), T (elapsed seconds), "
      "Ltr (training 1-step loss), "
      "Lge4 (test/generalization rollout loss for 4-mass strings), "
      "Lge9 (test/generalization rollout loss for 9-mass strings)")

start_time = time.time()
last_log_time = start_time
for iteration in range(last_iteration, num_training_iterations):
  last_iteration = iteration
  train_values = sess.run({
      "step": step_op,
      "loss": loss_op_tr,
      "input_graph": input_graph_tr,
      "target_nodes": target_nodes_tr,
      "outputs": output_ops_tr
  })
  the_time = time.time()
  elapsed_since_last_log = the_time - last_log_time
  if elapsed_since_last_log > log_every_seconds:
    last_log_time = the_time
    test_values = sess.run({
        "loss_4": loss_op_4_ge,
        "true_rollout_4": true_nodes_rollout_4_ge,
        "predicted_rollout_4": predicted_nodes_rollout_4_ge,
        "loss_9": loss_op_9_ge,
        "true_rollout_9": true_nodes_rollout_9_ge,
        "predicted_rollout_9": predicted_nodes_rollout_9_ge
    })
    elapsed = time.time() - start_time
    losses_tr.append(train_values["loss"])
    losses_4_ge.append(test_values["loss_4"])
    losses_9_ge.append(test_values["loss_9"])
    logged_iterations.append(iteration)
    print("# {:05d}, T {:.1f}, Ltr {:.4f}, Lge4 {:.4f}, Lge9 {:.4f}".format(
        iteration, elapsed, train_values["loss"], test_values["loss_4"],
        test_values["loss_9"]))

运行结果

  最后,将模拟结果可视化。

def get_node_trajectories(rollout_array, batch_size):  
  return np.split(rollout_array[..., :2], batch_size, axis=1)

fig = plt.figure(1, dpi=300, figsize=(18, 3))
fig.clf()
x = np.array(logged_iterations)
# Next-step Loss.
y = losses_tr
ax = fig.add_subplot(1, 3, 1)
ax.plot(x, y, "k")
ax.set_title("Next step loss")
# Rollout 5 loss.
y = losses_4_ge
ax = fig.add_subplot(1, 3, 2)
ax.plot(x, y, "k")
ax.set_title("Rollout loss: 5-mass string")
# Rollout 9 loss.
y = losses_9_ge
ax = fig.add_subplot(1, 3, 3)
ax.plot(x, y, "k")
ax.set_title("Rollout loss: 9-mass string")

# 可视化轨迹
true_rollouts_4 = get_node_trajectories(test_values["true_rollout_4"],
                                        batch_size_ge)
predicted_rollouts_4 = get_node_trajectories(test_values["predicted_rollout_4"],
                                             batch_size_ge)
true_rollouts_9 = get_node_trajectories(test_values["true_rollout_9"],
                                        batch_size_ge)
predicted_rollouts_9 = get_node_trajectories(test_values["predicted_rollout_9"],
                                             batch_size_ge)

true_rollouts = true_rollouts_4
predicted_rollouts = predicted_rollouts_4
true_rollouts = true_rollouts_9
predicted_rollouts = predicted_rollouts_9

num_graphs = len(true_rollouts)
num_time_steps = true_rollouts[0].shape[0]

# 绘制弹簧质点系统的状态随着时间的变化。
max_graphs_to_plot = 1
num_graphs_to_plot = min(num_graphs, max_graphs_to_plot)
num_steps_to_plot = 24
max_time_step = num_time_steps - 1
step_indices = np.floor(np.linspace(0, max_time_step,
                                    num_steps_to_plot)).astype(int).tolist()
w = 6
h = int(np.ceil(num_steps_to_plot / w))
fig = plt.figure(101, figsize=(18, 8))
fig.clf()
for i, (true_rollout, predicted_rollout) in enumerate(
    zip(true_rollouts, predicted_rollouts)):
  xys = np.hstack([predicted_rollout, true_rollout]).reshape([-1, 2])
  xs = xys[:, 0]
  ys = xys[:, 1]
  b = 0.05
  xmin = xs.min() - b * xs.ptp()
  xmax = xs.max() + b * xs.ptp()
  ymin = ys.min() - b * ys.ptp()
  ymax = ys.max() + b * ys.ptp()
  if i >= num_graphs_to_plot:
    break
  for j, step_index in enumerate(step_indices):
    iax = i * w + j + 1
    ax = fig.add_subplot(h, w, iax)
    ax.plot(
        true_rollout[step_index, :, 0],
        true_rollout[step_index, :, 1],
        "k",
        label="True")
    ax.plot(
        predicted_rollout[step_index, :, 0],
        predicted_rollout[step_index, :, 1],
        "r",
        label="Predicted")
    ax.set_title("Example {:02d}: frame {:03d}".format(i, step_index))
    ax.set_xlim(xmin, xmax)
    ax.set_ylim(ymin, ymax)
    ax.set_xticks([])
    ax.set_yticks([])
    if j == 0:
      ax.legend(loc=3)

  可视化运行结果:

模拟器的预测损失。

每个步长预测值和实际值对比

  从图中可以看出,在50个时间步长内,模拟的弹簧质点系统状态和能够很好的接近真实状态。模拟结果较好的原因第一个是因为这个物理系统简单,第二个是因为引入了随机误差,从而抵消了计算误差和过拟合现象,第三个是加入了优化器,在每一步的预测中都计算误差损失,从而不断迭代优化。

  图网络模型的出现将重塑各个学科的生态。只有同时对物理机制和数据结构有着深入的理解,才能发挥图网络的作用,从而一举超越学科内传统的模型。这是对学术圈浮躁学风的鞭策。