流数据的理想Java数据结构(Ideal Java Data Structure for Streaming data) java

我有一个特定的用例,但无法找出正确的数据结构来使用。

我有一个线程将流对象保存到HashMap中。 与市场数据类似的东西,你有很高的未知频率。

另一个不断读取此地图以更新Price对象并通过按键查询的线程,而不按特定顺序。 对于给定周期中的相同密钥,查询可能会多次。 读取和写入非常频繁,但读取线程只对最新可用数据感兴趣,这些数据已完全更新,并且在写入完成之前不一定会阻塞。

我希望您对这种用例的理想数据结构有所了解。 有比可用的ConcurrentHashMap更好的实现吗?

谢谢

I had a specific use case in mind but was not able to figure out the right data structure to use.

I have one thread which keeps streaming objects into a HashMap. Something similar to market data where you have a high and unknown frequency of ticks.

Another thread which constantly reads this map for updated Price objects and queries by key in no particular order. The queries may be multiple times for the same key in a given cycle. The reads and writes are very frequent but the read thread is only interested in the latest available data that is fully updated and doesn't necessarily block till write is complete.

I wanted your thoughts on an ideal data structure for such use cases. Are there better implementations than ConcurrentHashMap that is available?

Thanks

最满意答案

一种方法是复制写入方案,如下所示:

public class Prices { private volatile Map<String, Integer> prices = Collections.emptyMap(); public void putPrice(String ticker, int price) { HashMap<String, Integer> newPrices = new HashMap<String, Integer>(prices); newPrices.put(ticker, price); prices = newPrices; } public Integer getPrice(String ticker) { return prices.get(ticker); } }

这对获取的开销很小 - 一个从volatile读取,然后是一个正常的hash查找。 然而,它对投入有很大的开销 - 创建一个全新的地图,再加上一个写入volatile的地方。 如果读写比高,这可能仍然是一个很好的折衷。

只有在实际需要添加新条目时才更改地图,而不是更新现有地图; 你可以通过使用可变值来实现这一点:

public class Prices { private volatile Map<String, AtomicInteger> prices = Collections.emptyMap(); public void putPrice(String ticker, int price) { AtomicInteger priceHolder = prices.get(ticker); if (priceHolder != null) { priceHolder.set(price); } else { HashMap<String, AtomicInteger> newPrices = new HashMap<String, AtomicInteger>(prices); newPrices.put(ticker, new AtomicInteger(price)); prices = newPrices; } } public Integer getPrice(String ticker) { AtomicInteger priceHolder = prices.get(ticker); if (priceHolder != null) return priceHolder.get(); else return null; } }

我不确定AtomicInteger的性能特点是什么; 这可能比看起来慢。 假设AtomicInteger不是不合理的缓慢,这应该是非常快的 - 它涉及从volatile中加两次读取,加上每次get的正常哈希查找,以及从volatile,hash查找中读取,以及对volatile中的单次写入以更新现有价格。 它仍然涉及复制地图以增加新的价格。 但是,在典型的市场中,这种情况并不常见。

One approach would be a copy-on-write scheme, something like this:

public class Prices { private volatile Map<String, Integer> prices = Collections.emptyMap(); public void putPrice(String ticker, int price) { HashMap<String, Integer> newPrices = new HashMap<String, Integer>(prices); newPrices.put(ticker, price); prices = newPrices; } public Integer getPrice(String ticker) { return prices.get(ticker); } }

This has a minimal overhead for gets - one read from a volatile, and then a normal hash lookup. However, it has a substantial overhead for puts - the creation of a whole new map, plus a write to a volatile. If your ratio of reads to writes was high, this might still be a good tradeoff.

You can improve this by only mutating the map when you actually need to add a new entry, rather than updating an existing one; you can achieve that by using mutable values:

public class Prices { private volatile Map<String, AtomicInteger> prices = Collections.emptyMap(); public void putPrice(String ticker, int price) { AtomicInteger priceHolder = prices.get(ticker); if (priceHolder != null) { priceHolder.set(price); } else { HashMap<String, AtomicInteger> newPrices = new HashMap<String, AtomicInteger>(prices); newPrices.put(ticker, new AtomicInteger(price)); prices = newPrices; } } public Integer getPrice(String ticker) { AtomicInteger priceHolder = prices.get(ticker); if (priceHolder != null) return priceHolder.get(); else return null; } }

I'm not sure what the performance characteristics of an AtomicInteger are; it's possible this is slower than it looks. Assuming AtomicInteger is not unreasonably slow, this should be pretty fast - it involves two reads from a volatile plus a normal hash lookup for each get, and a read from a volatile, a hash lookup, and a single write to a volatile for updates to existing prices. It still involves duplicating the map for addition of new prices. However, in a typical market, that doesn't happen often.

流数据的理想Java数据结构(Ideal Java Data Structure for Streaming data) java

我有一个特定的用例,但无法找出正确的数据结构来使用。

我有一个线程将流对象保存到HashMap中。 与市场数据类似的东西,你有很高的未知频率。

另一个不断读取此地图以更新Price对象并通过按键查询的线程,而不按特定顺序。 对于给定周期中的相同密钥,查询可能会多次。 读取和写入非常频繁,但读取线程只对最新可用数据感兴趣,这些数据已完全更新,并且在写入完成之前不一定会阻塞。

我希望您对这种用例的理想数据结构有所了解。 有比可用的ConcurrentHashMap更好的实现吗?

谢谢

I had a specific use case in mind but was not able to figure out the right data structure to use.

I have one thread which keeps streaming objects into a HashMap. Something similar to market data where you have a high and unknown frequency of ticks.

Another thread which constantly reads this map for updated Price objects and queries by key in no particular order. The queries may be multiple times for the same key in a given cycle. The reads and writes are very frequent but the read thread is only interested in the latest available data that is fully updated and doesn't necessarily block till write is complete.

I wanted your thoughts on an ideal data structure for such use cases. Are there better implementations than ConcurrentHashMap that is available?

Thanks

最满意答案

一种方法是复制写入方案,如下所示:

public class Prices { private volatile Map<String, Integer> prices = Collections.emptyMap(); public void putPrice(String ticker, int price) { HashMap<String, Integer> newPrices = new HashMap<String, Integer>(prices); newPrices.put(ticker, price); prices = newPrices; } public Integer getPrice(String ticker) { return prices.get(ticker); } }

这对获取的开销很小 - 一个从volatile读取,然后是一个正常的hash查找。 然而,它对投入有很大的开销 - 创建一个全新的地图,再加上一个写入volatile的地方。 如果读写比高,这可能仍然是一个很好的折衷。

只有在实际需要添加新条目时才更改地图,而不是更新现有地图; 你可以通过使用可变值来实现这一点:

public class Prices { private volatile Map<String, AtomicInteger> prices = Collections.emptyMap(); public void putPrice(String ticker, int price) { AtomicInteger priceHolder = prices.get(ticker); if (priceHolder != null) { priceHolder.set(price); } else { HashMap<String, AtomicInteger> newPrices = new HashMap<String, AtomicInteger>(prices); newPrices.put(ticker, new AtomicInteger(price)); prices = newPrices; } } public Integer getPrice(String ticker) { AtomicInteger priceHolder = prices.get(ticker); if (priceHolder != null) return priceHolder.get(); else return null; } }

我不确定AtomicInteger的性能特点是什么; 这可能比看起来慢。 假设AtomicInteger不是不合理的缓慢,这应该是非常快的 - 它涉及从volatile中加两次读取,加上每次get的正常哈希查找,以及从volatile,hash查找中读取,以及对volatile中的单次写入以更新现有价格。 它仍然涉及复制地图以增加新的价格。 但是,在典型的市场中,这种情况并不常见。

One approach would be a copy-on-write scheme, something like this:

public class Prices { private volatile Map<String, Integer> prices = Collections.emptyMap(); public void putPrice(String ticker, int price) { HashMap<String, Integer> newPrices = new HashMap<String, Integer>(prices); newPrices.put(ticker, price); prices = newPrices; } public Integer getPrice(String ticker) { return prices.get(ticker); } }

This has a minimal overhead for gets - one read from a volatile, and then a normal hash lookup. However, it has a substantial overhead for puts - the creation of a whole new map, plus a write to a volatile. If your ratio of reads to writes was high, this might still be a good tradeoff.

You can improve this by only mutating the map when you actually need to add a new entry, rather than updating an existing one; you can achieve that by using mutable values:

public class Prices { private volatile Map<String, AtomicInteger> prices = Collections.emptyMap(); public void putPrice(String ticker, int price) { AtomicInteger priceHolder = prices.get(ticker); if (priceHolder != null) { priceHolder.set(price); } else { HashMap<String, AtomicInteger> newPrices = new HashMap<String, AtomicInteger>(prices); newPrices.put(ticker, new AtomicInteger(price)); prices = newPrices; } } public Integer getPrice(String ticker) { AtomicInteger priceHolder = prices.get(ticker); if (priceHolder != null) return priceHolder.get(); else return null; } }

I'm not sure what the performance characteristics of an AtomicInteger are; it's possible this is slower than it looks. Assuming AtomicInteger is not unreasonably slow, this should be pretty fast - it involves two reads from a volatile plus a normal hash lookup for each get, and a read from a volatile, a hash lookup, and a single write to a volatile for updates to existing prices. It still involves duplicating the map for addition of new prices. However, in a typical market, that doesn't happen often.