proio
event.cc
1 #include <stdarg.h>
2 #include <algorithm>
3 #include <sstream>
4 
5 #include "event.h"
6 #include "reader.h"
7 
8 using namespace proio;
9 using namespace google::protobuf;
10 
12  eventProto = new proto::Event();
13  SetDescriptorPool();
14  UseGeneratedPool();
15  dirtyTags = false;
16 }
17 
18 Event::Event(const std::string &data) : Event() { eventProto->ParseFromString(data); }
19 
20 Event::~Event() {
21  delete eventProto;
22  for (auto idEntryPair : entryCache) delete idEntryPair.second;
23  for (auto descVectorPair : store)
24  for (auto entry : descVectorPair.second) delete entry;
25 }
26 
27 uint64_t Event::AddEntry(Message *entry, std::string tag) {
28  uint64_t typeID = getTypeID(entry);
29  proto::Any entryProto;
30  entryProto.set_type(typeID);
31 
32  eventProto->set_nentries(eventProto->nentries() + 1);
33  uint64_t id = eventProto->nentries();
34  (*eventProto->mutable_entry())[id] = entryProto;
35 
36  entryCache[id] = entry;
37 
38  if (tag.size() > 0) TagEntry(id, tag);
39 
40  return id;
41 }
42 
43 Message *Event::GetEntry(uint64_t id) {
44  if (entryCache.count(id)) return entryCache[id];
45 
46  if (!eventProto->entry().count(id)) return NULL;
47  const proto::Any entryProto = eventProto->entry().at(id);
48 
49  const Descriptor *desc = getDescriptor(entryProto.type());
50  if (!desc) throw unknownMessageTypeError;
51  Message *entry;
52  std::vector<Message *> &storeEntries = store[desc];
53  if (storeEntries.size() > 0) {
54  entry = storeEntries.back();
55  storeEntries.pop_back();
56  } else {
57  const Message *prototype = NULL;
58  if (useGenPool) prototype = MessageFactory::generated_factory()->GetPrototype(desc);
59  if (!prototype && messageFactory) prototype = messageFactory->GetPrototype(desc);
60  if (prototype)
61  entry = prototype->New();
62  else
63  throw unknownMessageTypeError;
64  }
65  if (!entry->ParseFromString(entryProto.payload())) {
66  entry->Clear();
67  store[entry->GetDescriptor()].push_back(entry);
68  throw deserializationError;
69  }
70  entryCache[id] = entry;
71 
72  return entry;
73 }
74 
75 void Event::TagEntry(uint64_t id, std::string tag) { (*eventProto->mutable_tag())[tag].add_entry(id); }
76 
77 void Event::UntagEntry(uint64_t id, std::string tag) {
78  if (!eventProto->tag().count(tag)) return;
79 
80  auto entries = eventProto->mutable_tag()->at(tag).mutable_entry();
81  for (auto iter = entries->begin(); iter != entries->end(); iter++) {
82  if ((*iter) == id) {
83  entries->erase(iter);
84  break;
85  }
86  }
87 }
88 
89 void Event::RemoveEntry(uint64_t id) {
90  if (entryCache.count(id)) {
91  Message *entry = entryCache[id];
92  entryCache.erase(id);
93  entry->Clear();
94  store[entry->GetDescriptor()].push_back(entry);
95  }
96  eventProto->mutable_entry()->erase(id);
97  dirtyTags = true;
98 }
99 
100 std::vector<std::string> Event::Tags() {
101  std::vector<std::string> tags;
102  for (auto stringTagPair : eventProto->tag()) {
103  tags.push_back(stringTagPair.first);
104  }
105  std::sort(tags.begin(), tags.end());
106  return tags;
107 }
108 
109 std::vector<uint64_t> Event::TaggedEntries(std::string tag) {
110  if (eventProto->tag().count(tag)) {
111  tagCleanup();
112  auto entries = eventProto->tag().at(tag).entry();
113  std::vector<uint64_t> returnEntries;
114  for (uint64_t entry : entries) returnEntries.push_back(entry);
115  return returnEntries;
116  }
117  return std::vector<uint64_t>();
118 }
119 
120 std::vector<uint64_t> Event::AllEntries() {
121  auto entries = eventProto->entry();
122  std::vector<uint64_t> returnEntries;
123  for (auto idEntryPair : entries) returnEntries.push_back(idEntryPair.first);
124  return returnEntries;
125 }
126 
127 std::vector<std::string> Event::EntryTags(uint64_t id) {
128  std::vector<std::string> tags;
129  for (auto stringTagPair : eventProto->tag()) {
130  for (uint64_t entry : stringTagPair.second.entry())
131  if (entry == id) {
132  tags.push_back(stringTagPair.first);
133  break;
134  }
135  }
136  std::sort(tags.begin(), tags.end());
137  return tags;
138 }
139 
140 void Event::DeleteTag(std::string tag) { eventProto->mutable_tag()->erase(tag); }
141 
142 Message *Event::Free(const Descriptor *desc) {
143  std::vector<Message *> &storeEntries = store[desc];
144  if (storeEntries.size() > 0) {
145  Message *entry = storeEntries.back();
146  storeEntries.pop_back();
147  return entry;
148  } else
149  return NULL;
150 }
151 
152 std::string Event::String() {
153  std::string printString;
154  for (auto tag : Tags()) {
155  printString += "---------- TAG: " + tag + " ----------\n";
156  auto entries = TaggedEntries(tag);
157  for (uint64_t entryID : entries) {
158  std::stringstream ss;
159  ss << "ID: " << entryID << "\n";
160  Message *entry = GetEntry(entryID);
161  if (entry) {
162  ss << "Entry type: " << entry->GetTypeName() << "\n";
163  ss << entry->DebugString() << "\n";
164  } else
165  ss << "not found\n";
166  printString += ss.str();
167  }
168  }
169  return printString;
170 }
171 
173  for (auto idEntryPair : entryCache) {
174  int64 id = idEntryPair.first;
175  Message *entry = idEntryPair.second;
176 
177  size_t byteSize = entry->ByteSizeLong();
178  uint8_t *buffer = new uint8_t[byteSize];
179  entry->SerializeToArray(buffer, byteSize);
180  entry->Clear();
181  store[entry->GetDescriptor()].push_back(entry);
182 
183  (*eventProto->mutable_entry())[id].set_payload(buffer, byteSize);
184  delete[] buffer;
185  }
186  entryCache.clear();
187 
188  tagCleanup();
189 }
190 
191 void Event::Clear() {
192  eventProto->Clear();
193  revTypeLookup.clear();
194  for (auto idEntryPair : entryCache) {
195  Message *entry = idEntryPair.second;
196  entry->Clear();
197  store[entry->GetDescriptor()].push_back(entry);
198  }
199  entryCache.clear();
200  descriptorCache.clear();
201  metadata.clear();
202  dirtyTags = false;
203 }
204 
205 void Event::SetDescriptorPool(const DescriptorPool *pool) {
206  if (pool != descriptorPool) {
207  clearDescriptors();
208  descriptorPool = pool;
209  }
210 }
211 
212 void Event::UseGeneratedPool(bool useGenPool) {
213  if (this->useGenPool != useGenPool) {
214  clearDescriptors();
215  this->useGenPool = useGenPool;
216  }
217 }
218 
219 Event &Event::operator=(const Event &event) {
220  if (&event == this) return *this;
221  Clear();
222  *this->eventProto = *event.eventProto;
223  this->revTypeLookup = event.revTypeLookup;
224  this->descriptorCache = event.descriptorCache;
225  for (auto idEntryPair : event.entryCache) {
226  auto entry = idEntryPair.second;
227  const Descriptor *desc = getDescriptor(getTypeID(entry));
228  if (!desc) throw unknownMessageTypeError;
229  Message *newEntry;
230  std::vector<Message *> &storeEntries = store[desc];
231  if (storeEntries.size() > 0) {
232  newEntry = storeEntries.back();
233  storeEntries.pop_back();
234  } else {
235  const Message *prototype = NULL;
236  if (useGenPool) prototype = MessageFactory::generated_factory()->GetPrototype(desc);
237  if (!prototype && messageFactory) prototype = messageFactory->GetPrototype(desc);
238  if (prototype)
239  newEntry = prototype->New();
240  else
241  throw unknownMessageTypeError;
242  }
243  newEntry->MergeFrom(*entry);
244  this->entryCache[idEntryPair.first] = newEntry;
245  }
246  this->metadata = event.metadata;
247  this->dirtyTags = event.dirtyTags;
248  return *this;
249 }
250 
251 proto::Event *Event::getProto() { return eventProto; }
252 
253 uint64_t Event::getTypeID(Message *entry) {
254  std::string typeName = entry->GetTypeName();
255  if (revTypeLookup.count(typeName)) {
256  return revTypeLookup[typeName];
257  }
258 
259  for (auto typePair : eventProto->type()) {
260  if (typePair.second.compare(typeName) == 0) {
261  revTypeLookup[typeName] = typePair.first;
262  return typePair.first;
263  }
264  }
265 
266  eventProto->set_ntypes(eventProto->ntypes() + 1);
267  uint64_t typeID = eventProto->ntypes();
268  (*eventProto->mutable_type())[typeID] = typeName;
269  revTypeLookup[typeName] = typeID;
270  return typeID;
271 }
272 
273 const Descriptor *Event::getDescriptor(uint64_t typeID) {
274  if (!descriptorCache.count(typeID)) {
275  const std::string typeName = eventProto->type().at(typeID);
276  descriptorCache[typeID] = NULL;
277  if (useGenPool)
278  descriptorCache[typeID] = DescriptorPool::generated_pool()->FindMessageTypeByName(typeName);
279  if (!descriptorCache[typeID] && descriptorPool)
280  descriptorCache[typeID] = descriptorPool->FindMessageTypeByName(typeName);
281  }
282  return descriptorCache[typeID];
283 }
284 
285 void Event::tagCleanup() {
286  if (!dirtyTags) return;
287  auto tags = eventProto->mutable_tag();
288  for (auto iter = tags->begin(); iter != tags->end(); iter++) {
289  RepeatedField<uint64_t> *entryList = iter->second.mutable_entry();
290  for (int i = entryList->size() - 1; i >= 0; i--) {
291  if (!eventProto->entry().count((*entryList)[i])) {
292  for (int j = i; j < entryList->size() - 1; j++) entryList->Set(j, entryList->Get(j + 1));
293  entryList->RemoveLast();
294  }
295  }
296  }
297  dirtyTags = false;
298 }
299 void Event::clearDescriptors() {
300  FlushCache();
301  for (auto descVectorPair : store)
302  for (auto entry : descVectorPair.second) delete entry;
303  store.clear();
304  descriptorCache.clear();
305  messageFactory.reset(new DynamicMessageFactory());
306 }
google::protobuf::Message * Free(const google::protobuf::Descriptor *desc)
Definition: event.cc:142
void DeleteTag(std::string tag)
Definition: event.cc:140
std::vector< std::string > Tags()
Definition: event.cc:100
std::vector< std::string > EntryTags(uint64_t id)
Definition: event.cc:127
Definition: event.h:11
uint64_t AddEntry(google::protobuf::Message *entry, std::string tag="")
Definition: event.cc:27
google::protobuf::Message * GetEntry(uint64_t id)
Definition: event.cc:43
void TagEntry(uint64_t id, std::string tag)
Definition: event.cc:75
void FlushCache()
Definition: event.cc:172
void SetDescriptorPool(const google::protobuf::DescriptorPool *pool=NULL)
Definition: event.cc:205
void Clear()
Definition: event.cc:191
void RemoveEntry(uint64_t id)
Definition: event.cc:89
std::string String()
Definition: event.cc:152
void UntagEntry(uint64_t id, std::string tag)
Definition: event.cc:77
void UseGeneratedPool(bool useGenPool=true)
Definition: event.cc:212
std::vector< uint64_t > AllEntries()
Definition: event.cc:120
std::vector< uint64_t > TaggedEntries(std::string tag)
Definition: event.cc:109