如何標準化高度波動的時間序列?
我應該如何標準化顯示高波動性的時間序列數據以便在 ANN 中使用?讓我們以比特幣的價格為例。
我有以下疑問:
- 如本文所述,僅僅標準化數據集是不夠的。這是有道理的,因為比特幣的價格範圍是 0.06 美元到 19,343 美元。我認為如果將數據作為滑動窗口提供給 ANN,那麼每個窗口都必須進行標準化。
- Min-Max 和 Z-Score 等方法適用於模式比較並標準化範圍。1000 美元和 1200 美元之間的模式與 1000 美元和 1400 美元之間的模式完全相同,因此丟失了有價值的信息。當然,可以創建一個附加特徵來表示波動性。
那麼,將波動的時間序列數據(如比特幣的價格)標準化的最佳方法是什麼?
TLDR
重新閱讀鏈接到論文,非常**好!無論如何,我認為是這樣,一旦你確定了你想要解決的問題,它似乎有一些可靠的選擇……
現在我不打算把這些留給你,所以讓我們在牢記你的問題的同時剖析這篇論文的一些內容。
該論文討論了他們何時選擇對頁面上的數據進行規範化和重新規範化,
2
1.2.3 Arbitrary Query Lengths cannot be Indexed
並且在該頁面的末尾附近非常清楚,在處理如此大的數據集時,沒有****一種用於任意長度的相似性搜索的技術。…儘管如果我了解他們在這種情況下要搜索的內容,那麼只有在您輸入大量可能在相同或相鄰時間間隔內相關或不相關的序列時,這才應該是一個問題…短, DTW (Dynamic Time Warp)那裡的東西很時髦,我會重申重新閱讀這篇論文可能是一個好主意……這對於醫學相關領域來說完全有道理,例如。論文中的 EEG 示例將是
n
每個時間段從每個受試者的大腦中採樣的點數t
乘以時間片的長度。換句話說,一公噸的數據!
n
這引出了一個問題,即您希望使用加密貨幣的維度有多大?無論如何,
4.2.1
就數據預處理而言,部分事情變得有趣,它們甚至在最後提供了[43]
一些源代碼的鏈接以供使用。就預處理的最佳方式而言,這似乎仍然是一個積極研究的主題,並且取決於模型,以及正在訓練的內容和方式。儘管就希望在每個時間範圍內進行標準化或標準化而言,您似乎走在了正確的軌道上。
關於相關問題如何將無界變量表示為 0 到 1 之間的數字的一個建議(這似乎是您的問題的核心**
unbounded variable
,比特幣是一個可測量的維度),提供了一些偽代碼Using a trainable minmax
。我已使用以下最小最大化方法將其部分轉換為(部分功能)Python 的版本,來自How to normalize data between -1 and 1? ,用於演示您將在預處理中遇到的一些問題。$$ x''' = (b-a)\frac{x - \min{x}}{\max{x} - \min{x}} + a $$
#!/usr/bin/env python from __future__ import division class Scaler_MinMax(list): """ Scaler_MinMax is a `list` that scales inputted, or `append`ed, or `extend`ed values to be between chosen `scale_`s """ def __init__(self, l, scale_min = -1, scale_max = 1, auto_recalibrate = False): self.scale_min = scale_min self.scale_max = scale_max self.range_min = min(l) self.range_max = max(l) self.initial_values = l self.auto_recalibrate = auto_recalibrate super(Scaler_MinMax, self).__init__(self.scale(numbers = l)) def scale(self, numbers = [], scale_min = None, scale_max = None, range_min = None, range_max = None): """ Returns list of scaled values """ if scale_min is None: scale_min = self.scale_min if scale_max is None: scale_max = self.scale_max if range_min is None: range_min = self.range_min if range_max is None: range_max = self.range_max return [((x - range_min) / (range_max - range_min)) * (scale_max - scale_min) + scale_min for x in numbers] def unscale(self, numbers = [], scale_min = None, scale_max = None, range_min = None, range_max = None): """ Returns list of unscaled values """ if not numbers: numbers = self if scale_min is None: scale_min = self.scale_min if scale_max is None: scale_max = self.scale_max if range_min is None: range_min = self.range_min if range_max is None: range_max = self.range_max return [(((y - scale_min) / (scale_max - scale_min)) * (range_max - range_min)) + range_min for y in numbers] def re_calibrate(self): """ Re-sets `self.range_min`, `self.range_max`, and `self` """ self.range_min = min(self.initial_values) self.range_max = max(self.initial_values) super(Scaler_MinMax, self).__init__(self.scale( numbers = self.initial_values, scale_min = self.scale_min, scale_max = self.scale_max, range_min = self.range_min, range_max = self.range_max)) """ Supered class overrides """ def __add__(self, other): if not isinstance(other, list): raise TypeError("can only concatenate list (not '{0}') to list".format(type(other))) return super(Scaler_MinMax, self).__add__(self.scale(numbers = other)) def append(self, value): """ Appends to `self.initial_values` and `self.append(self.scale([value])[0])` """ self.initial_values.append(value) if isinstance(value, list): super(Scaler_MinMax, self).append([self.scale(numbers = [i])[0] for i in value]) else: super(Scaler_MinMax, self).append(self.scale(numbers = [value])[0]) if self.auto_recalibrate is True: if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values): self.re_calibrate() def clear(self): """ Clears `self.initial_values` and `self` of __ALL__ values """ self.initial_values.clear() super(Scaler_MinMax, self).clear() def extend(self, iterable): """ Extends `self.initial_values` and `self.extend(self.scale(iterable))` """ self.initial_values.extend(iterable) super(Scaler_MinMax, self).extend(self.scale(iterable)) if self.auto_recalibrate is True: if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values): self.re_calibrate() def insert(self, index, value): """ Inserts `value` into `self.initial_values` and `self.scale([value])` into `self` """ self.initial_values.insert(index, value) super(Scaler_MinMax, self).insert(index, self.scale([value])) if self.auto_recalibrate is True: if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values): self.re_calibrate() def pop(self, index): """ Returns tuple of `pop`ed `index`s values from `self.initial_values`, and `self` """ output = self.initial_values.pop(index), super(Scaler_MinMax, self).pop(index) if self.auto_recalibrate is True: if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values): self.re_calibrate() return output def remove(self, value): """ Removes value from `self.initial_values` and `self` > use of `index` calls to ensure paired values are `pop`ed/`remove`d does increase the expense of this method """ if value in self.initial_values: i = self.initial_values.index(value) self.pop(i) self.initial_values.remove(value) elif value in self: i = self.index(value) self.initial_values.pop(i) super(Scaler_MinMax, self).remove(value) else: raise ValueError("{0}.remove(x) not in list".format(self.__class__.__name__)) if self.auto_recalibrate is True: if self.range_max != max(self.initial_values) or self.range_min != min(self.initial_values): self.re_calibrate() def reverse(self): """ Reverses `self.initial_values` and `self` __IN PLACE__ """ self.initial_values.reverse() super(Scaler_MinMax, self).reverse() def sort(self): """ Sorts `self.initial_values` and `self` __IN PLACE__ """ self.initial_values.sort() super(Scaler_MinMax, self).sort()
示例用法和限制
s = Scaler_MinMax([x for x in range(9)]) s # -> [-1.0, -0.75, -0.5, -0.25, 0.0, 0.25, 0.5, 0.75, 1.0] s.unscale() # -> [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0] s.initial_values # -> [0, 1, 2, 3, 4, 5, 6, 7, 8] s.append(20) s # -> [-1.0, -0.75, -0.5, -0.25, 0.0, 0.25, 0.5, 0.75, 1.0, 4.0] s.re_calibrate() s # -> [-1.0, -0.9, -0.8, -0.7, -0.6, -0.5, -0.4, -0.30000000000000004, -0.19999999999999996, 1.0] s.unscale() # Floating point errors! # -> [0.0, 0.9999999999999998, 1.9999999999999996, 3.0000000000000004, 4.0, 5.0, 6.0, 7.0, 8.0, 20.0] s.initial_values # -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 20]
如上所示,浮點錯誤是
initial_values
保存和使用的原因,re_calibrate()
如果不這樣做並unscale()
用於re_calibrate()
浮點錯誤會復合!此外,不僅有很多地方可以改進代碼,而且在已解析數據旁邊有一個未解析數據的額外副本可以使內存使用量增加一倍以上。鏈接到論文中還討論了浮點錯誤的累加,以及它們在
4.2.1 Early Abandoning Z-Normalization
接近末尾的頁面上的緩解方法,他們建議每百萬個子序列“清除”此類錯誤。
旁注,以便每個人都在同一頁面上…
$$ \mu = \frac{1}{m} \sum {x_i} $$
…在python中看起來有點像…
#!/usr/bin/env python from __future__ import division # Thanks be to https://stackoverflow.com/a/31136897/2632107 try: # Python 2 range = xrange except NameError: # Python 3 pass def calc_mu(m_denominator=2, x_list=[1.5, 42], start=0, end=2, step=1): ''' μ = 1/m * ∑x_i ''' chosen = [] for i in range(start, end, step): chosen.append(x_list[i]) return float(1) / m_denominator * sum(chosen)
……好吧,如果我理解的話
An example of subscript and summation notation
,我必須複習一下並涉足可汗學院Sequences and Series
,以便更全面地掌握SIDKDD trillion
論文的數學內容。
希望將一些數學與執行類似功能的代碼一起揭開你如何開始嘗試數據預處理方法的神秘面紗。因為看起來會有很多試錯法來找到最好的預處理方法,不管它試圖強制饋送到神經網絡的方法是什麼。
我知道我沒有一個完整的答案,但如果有一種公開可用的最佳方法來預測
y
時間序列預測,那麼很可能會是市場的拖釣,而不僅僅是來自機器人的閃崩……哪個,提示,提示,也許是訓練模型進行預測的好主意;-)如頂部所述,我認為您需要非常精確地確定要解決的問題,例如,您可以嘗試預測價格的相對變化,而不是直接預測價格,在這種情況下,百分位數可能會起作用,有點像使用與論文中所示類似的方法將遺傳信息編碼為時間序列。
未來編輯器的代碼註釋
我沒有在代碼中添加Terrace
threshold
建議的代碼,以使事情變得更簡單,此外,我編寫代碼示例的目的不是為了提高效率或在生產中使用,而是為了便於訪問,請保留任何編輯更正並本著讓盡可能多的讀者可以訪問的精神。Scaler_MinMax
否則,我認為讀者/編輯嘗試用另一種語言編寫自己的代碼示例和/或使用不同的方法來壓縮數據會更有幫助,這很酷,看看其他人想出什麼.