// The game where you go into mines and start crafting, but for consoles (forked directly from smartcmd's GitHub).
1#include "stdafx.h"
2#include "InputOutputStream.h"
3#include "Socket.h"
4#include "Connection.h"
5#include "ThreadName.h"
6#include "compression.h"
7#include "..\Minecraft.Client\PS3\PS3Extras\ShutdownManager.h"
8
// Timeout-driven disconnects (see Connection::tick) are compiled in for every build
// that does not define _DEBUG. They are meant to be switched off for debugging only.
#if !defined(_DEBUG)
#define CONNECTION_ENABLE_TIMEOUT_DISCONNECT 1
#endif
13
14int Connection::readThreads = 0;
15int Connection::writeThreads = 0;
16
17int Connection::readSizes[256];
18int Connection::writeSizes[256];
19
20
21
22void Connection::_init()
23{
24// printf("Con:0x%x init\n",this);
25 InitializeCriticalSection(&writeLock);
26 InitializeCriticalSection(&threadCounterLock);
27 InitializeCriticalSection(&incoming_cs);
28
29 running = true;
30 quitting = false;
31 disconnected = false;
32 disconnectReason = DisconnectPacket::eDisconnect_None;
33 noInputTicks = 0;
34 estimatedRemaining = 0;
35 fakeLag = 0;
36 slowWriteDelay = 50;
37
38 saqThreadID = 0;
39 closeThreadID = 0;
40
41 tickCount = 0;
42
43}
44
45// 4J Jev, need to delete the critical section.
46Connection::~Connection()
47{
48 // 4J Stu - Just to be sure, make sure the read and write threads terminate themselves before the connection object is destroyed
49 running = false;
50 if( dis ) dis->close(); // The input stream needs closed before the readThread, or the readThread
51 // may get stuck whilst blocking waiting on a read
52 readThread->WaitForCompletion(INFINITE);
53 writeThread->WaitForCompletion(INFINITE);
54
55 DeleteCriticalSection(&writeLock);
56 DeleteCriticalSection(&threadCounterLock);
57 DeleteCriticalSection(&incoming_cs);
58
59 delete m_hWakeReadThread;
60 delete m_hWakeWriteThread;
61
62 // These should all have been destroyed in close() but no harm in checking again
63 delete byteArrayDos;
64 byteArrayDos = NULL;
65 delete baos;
66 baos = NULL;
67 if( bufferedDos )
68 {
69 bufferedDos->deleteChildStream();
70 delete bufferedDos;
71 bufferedDos = NULL;
72 }
73 delete dis;
74 dis = NULL;
75}
76
77Connection::Connection(Socket *socket, const wstring& id, PacketListener *packetListener) // throws IOException
78{
79 _init();
80
81 this->socket = socket;
82
83 address = socket->getRemoteSocketAddress();
84
85 this->packetListener = packetListener;
86
87 //try {
88 socket->setSoTimeout(30000);
89 socket->setTrafficClass(IPTOS_THROUGHPUT | IPTOS_LOWDELAY);
90
91 /* 4J JEV no catch
92 } catch (SocketException e) {
93 // catching this exception because it (apparently?) causes problems
94 // on OSX Tiger
95 System.err.println(e.getMessage());
96 }*/
97
98 dis = new DataInputStream(socket->getInputStream(packetListener->isServerPacketListener()));
99
100 sos = socket->getOutputStream(packetListener->isServerPacketListener());
101 bufferedDos = new DataOutputStream(new BufferedOutputStream(sos, SEND_BUFFER_SIZE));
102 baos = new ByteArrayOutputStream( SEND_BUFFER_SIZE );
103 byteArrayDos = new DataOutputStream(baos);
104
105 m_hWakeReadThread = new C4JThread::Event;
106 m_hWakeWriteThread = new C4JThread::Event;
107
108 const char *szId = wstringtofilename(id);
109 char readThreadName[256];
110 char writeThreadName[256];
111 sprintf(readThreadName,"%s read\n",szId);
112 sprintf(writeThreadName,"%s write\n",szId);
113
114 readThread = new C4JThread(runRead, (void*)this, readThreadName, READ_STACK_SIZE);
115 writeThread = new C4JThread(runWrite, this, writeThreadName, WRITE_STACK_SIZE);
116 readThread->SetProcessor(CPU_CORE_CONNECTIONS);
117 writeThread->SetProcessor(CPU_CORE_CONNECTIONS );
118#ifdef __ORBIS__
119 readThread->SetPriority(THREAD_PRIORITY_BELOW_NORMAL); // On Orbis, this core is also used for Matching 2, and that priority of that seems to be always at default no matter what we set it to. Prioritise this below Matching 2.
120 writeThread->SetPriority(THREAD_PRIORITY_BELOW_NORMAL); // On Orbis, this core is also used for Matching 2, and that priority of that seems to be always at default no matter what we set it to. Prioritise this below Matching 2.
121#endif
122
123 readThread->Run();
124 writeThread->Run();
125
126
127 /* 4J JEV, java:
128 new Thread(wstring(id).append(L" read thread")) {
129
130 };
131
132 writeThread = new Thread(id + " write thread") {
133 public void run() {
134
135 };
136
137 readThread->start();
138 writeThread->start();
139 */
140}
141
142
143void Connection::setListener(PacketListener *packetListener)
144{
145 this->packetListener = packetListener;
146}
147
148void Connection::send(shared_ptr<Packet> packet)
149{
150 if (quitting) return;
151
152 MemSect(15);
153 // 4J Jev, synchronized (&writeLock)
154 EnterCriticalSection(&writeLock);
155
156 estimatedRemaining += packet->getEstimatedSize() + 1;
157 if (packet->shouldDelay)
158 {
159 // 4J We have delayed it enough by putting it in the slow queue, so don't delay when we actually send it
160 packet->shouldDelay = false;
161 outgoing_slow.push(packet);
162 }
163 else
164 {
165 outgoing.push(packet);
166 }
167
168 // 4J Jev, end synchronized.
169 LeaveCriticalSection(&writeLock);
170 MemSect(0);
171}
172
173
174void Connection::queueSend(shared_ptr<Packet> packet)
175{
176 if (quitting) return;
177 EnterCriticalSection(&writeLock);
178 estimatedRemaining += packet->getEstimatedSize() + 1;
179 outgoing_slow.push(packet);
180 LeaveCriticalSection(&writeLock);
181}
182
183bool Connection::writeTick()
184{
185 bool didSomething = false;
186
187 // 4J Stu - If the connection is closed and the output stream has been deleted
188 if(bufferedDos==NULL || byteArrayDos==NULL)
189 return didSomething;
190
191 // try {
192 if (!outgoing.empty() && (fakeLag == 0 || System::currentTimeMillis() - outgoing.front()->createTime >= fakeLag))
193 {
194 shared_ptr<Packet> packet;
195
196 EnterCriticalSection(&writeLock);
197
198 packet = outgoing.front();
199 outgoing.pop();
200 estimatedRemaining -= packet->getEstimatedSize() + 1;
201
202 LeaveCriticalSection(&writeLock);
203
204 Packet::writePacket(packet, bufferedDos);
205
206
207#ifndef _CONTENT_PACKAGE
208 // 4J Added for debugging
209 int playerId = 0;
210 if( !socket->isLocal() )
211 {
212 Socket *socket = getSocket();
213 if( socket )
214 {
215 INetworkPlayer *player = socket->getPlayer();
216 if( player )
217 {
218 playerId = player->GetSmallId();
219 }
220 }
221 Packet::recordOutgoingPacket(packet,playerId);
222 }
223#endif
224
225 // 4J Stu - Changed this so that rather than writing to the network stream through a buffered stream we want to:
226 // a) Only push whole "game" packets to QNet, rather than amalgamated chunks of data that may include many packets, and partial packets
227 // b) To be able to change the priority and queue of a packet if required
228 //sos->writeWithFlags( baos->buf, 0, baos->size(), 0 );
229 //baos->reset();
230
231 writeSizes[packet->getId()] += packet->getEstimatedSize() + 1;
232 didSomething = true;
233 }
234
235 if ((slowWriteDelay-- <= 0) && !outgoing_slow.empty() && (fakeLag == 0 || System::currentTimeMillis() - outgoing_slow.front()->createTime >= fakeLag))
236 {
237 shared_ptr<Packet> packet;
238
239 //synchronized (writeLock) {
240
241 EnterCriticalSection(&writeLock);
242
243 packet = outgoing_slow.front();
244 outgoing_slow.pop();
245 estimatedRemaining -= packet->getEstimatedSize() + 1;
246
247 LeaveCriticalSection(&writeLock);
248
249 // If the shouldDelay flag is still set at this point then we want to write it to QNet as a single packet with priority flags
250 // Otherwise just buffer the packet with other outgoing packets as the java game did
251 if(packet->shouldDelay)
252 {
253 Packet::writePacket(packet, byteArrayDos);
254
255 // 4J Stu - Changed this so that rather than writing to the network stream through a buffered stream we want to:
256 // a) Only push whole "game" packets to QNet, rather than amalgamated chunks of data that may include many packets, and partial packets
257 // b) To be able to change the priority and queue of a packet if required
258#ifdef _XBOX
259 int flags = QNET_SENDDATA_LOW_PRIORITY | QNET_SENDDATA_SECONDARY;
260#else
261 int flags = NON_QNET_SENDDATA_ACK_REQUIRED;
262#endif
263 sos->writeWithFlags( baos->buf, 0, baos->size(), flags );
264 baos->reset();
265 }
266 else
267 {
268 Packet::writePacket(packet, bufferedDos);
269 }
270
271#ifndef _CONTENT_PACKAGE
272 // 4J Added for debugging
273 if( !socket->isLocal() )
274 {
275 int playerId = 0;
276 if( !socket->isLocal() )
277 {
278 Socket *socket = getSocket();
279 if( socket )
280 {
281 INetworkPlayer *player = socket->getPlayer();
282 if( player )
283 {
284 playerId = player->GetSmallId();
285 }
286 }
287 Packet::recordOutgoingPacket(packet,playerId);
288 }
289 }
290#endif
291
292 writeSizes[packet->getId()] += packet->getEstimatedSize() + 1;
293 slowWriteDelay = 0;
294 didSomething = true;
295 }
296 /* 4J JEV, removed try/catch
297 } catch (Exception e) {
298 if (!disconnected) handleException(e);
299 return false;
300 } */
301
302 return didSomething;
303}
304
305
306void Connection::flush()
307{
308 // TODO 4J Stu - How to interrupt threads? Or do we need to change the multithreaded functions a bit more
309 //readThread.interrupt();
310 //writeThread.interrupt();
311 m_hWakeReadThread->Set();
312 m_hWakeWriteThread->Set();
313}
314
315
316bool Connection::readTick()
317{
318 bool didSomething = false;
319
320 // 4J Stu - If the connection has closed and the input stream has been deleted
321 if(dis==NULL)
322 return didSomething;
323
324 //try {
325
326 shared_ptr<Packet> packet = Packet::readPacket(dis, packetListener->isServerPacketListener());
327
328 if (packet != NULL)
329 {
330 readSizes[packet->getId()] += packet->getEstimatedSize() + 1;
331 EnterCriticalSection(&incoming_cs);
332 if(!quitting)
333 {
334 incoming.push(packet);
335 }
336 LeaveCriticalSection(&incoming_cs);
337 didSomething = true;
338 }
339 else
340 {
341// printf("Con:0x%x readTick close EOS\n",this);
342
343 // 4J Stu - Remove this line
344 // Fix for #10410 - UI: If the player is removed from a splitscreened host�s game, the next game that player joins will produce a message stating that the host has left.
345 //close(DisconnectPacket::eDisconnect_EndOfStream);
346 }
347
348
349 /* 4J JEV, removed try/catch
350 } catch (Exception e) {
351 if (!disconnected) handleException(e);
352 return false;
353 } */
354
355 return didSomething;
356}
357
358
359/* 4J JEV, removed try/catch
360void handleException(Exception e)
361{
362e.printStackTrace();
363close("disconnect.genericReason", "Internal exception: " + e.toString());
364}*/
365
366
367void Connection::close(DisconnectPacket::eDisconnectReason reason, ...)
368{
369// printf("Con:0x%x close\n",this);
370 if (!running) return;
371// printf("Con:0x%x close doing something\n",this);
372 disconnected = true;
373
374 va_list input;
375 va_start( input, reason );
376
377 disconnectReason = reason;//va_arg( input, const wstring );
378
379 vector<void *> objs = vector<void *>();
380 void *i = NULL;
381 while (i != NULL)
382 {
383 i = va_arg( input, void* );
384 objs.push_back(i);
385 }
386
387 if( objs.size() )
388 {
389 disconnectReasonObjects = &objs[0];
390 }
391 else
392 {
393 disconnectReasonObjects = NULL;
394 }
395
396 // int count = 0, sum = 0, i = first;
397 // va_list marker;
398 //
399 // va_start( marker, first );
400 // while( i != -1 )
401 // {
402 // sum += i;
403 // count++;
404 // i = va_arg( marker, int);
405 // }
406 // va_end( marker );
407 // return( sum ? (sum / count) : 0 );
408
409
410// CreateThread(NULL, 0, runClose, this, 0, &closeThreadID);
411
412 running = false;
413
414 if( dis ) dis->close(); // The input stream needs closed before the readThread, or the readThread
415 // may get stuck whilst blocking waiting on a read
416
417 // Make sure that the read & write threads are dead before we go and kill the streams that they depend on
418 readThread->WaitForCompletion(INFINITE);
419 writeThread->WaitForCompletion(INFINITE);
420
421 delete dis;
422 dis = NULL;
423 if( bufferedDos )
424 {
425 bufferedDos->close();
426 bufferedDos->deleteChildStream();
427 delete bufferedDos;
428 bufferedDos = NULL;
429 }
430 if( byteArrayDos )
431 {
432 byteArrayDos->close();
433 delete byteArrayDos;
434 byteArrayDos = NULL;
435 }
436 if( socket )
437 {
438 socket->close(packetListener->isServerPacketListener());
439 socket = NULL;
440 }
441}
442
443void Connection::tick()
444{
445 if (estimatedRemaining > 1 * 1024 * 1024)
446 {
447 close(DisconnectPacket::eDisconnect_Overflow);
448 }
449 EnterCriticalSection(&incoming_cs);
450 bool empty = incoming.empty();
451 LeaveCriticalSection(&incoming_cs);
452 if (empty)
453 {
454#if CONNECTION_ENABLE_TIMEOUT_DISCONNECT
455 if (noInputTicks++ == MAX_TICKS_WITHOUT_INPUT)
456 {
457 close(DisconnectPacket::eDisconnect_TimeOut);
458 }
459#endif
460 }
461 // 4J Stu - Moved this a bit later in the function to stop the race condition of Disconnect packets not being processed when local client leaves
462 //else if( socket && socket->isClosing() )
463 //{
464 // close(DisconnectPacket::eDisconnect_Closed);
465 //}
466 else
467 {
468 noInputTicks = 0;
469
470 }
471
472 // 4J Added - Send a KeepAlivePacket every now and then to ensure that our read and write threads don't timeout
473 tickCount++;
474 if (tickCount % 20 == 0)
475 {
476 send( shared_ptr<KeepAlivePacket>( new KeepAlivePacket() ) );
477 }
478
479 // 4J Stu - 1.8.2 changed from 100 to 1000
480 int max = 1000;
481
482 // 4J-PB - NEEDS CHANGED!!!
483 // If we can call connection.close from within a packet->handle, then we can lockup because the loop below has locked incoming_cs, and the connection.close will flag the read and write threads for the connection to close.
484 // they are running on other threads, and will try to lock incoming_cs
485 // We got this with a pre-login packet of a player who wasn't allowed to play due to parental controls, so was kicked out
486 // This has been changed to use a eAppAction_ExitPlayerPreLogin which will run in the main loop, so the connection will not be ticked at that point
487
488
489 EnterCriticalSection(&incoming_cs);
490 // 4J Stu - If disconnected, then we shouldn't process incoming packets
491 std::vector< shared_ptr<Packet> > packetsToHandle;
492 while (!disconnected && !g_NetworkManager.IsLeavingGame() && g_NetworkManager.IsInSession() && !incoming.empty() && max-- >= 0)
493 {
494 shared_ptr<Packet> packet = incoming.front();
495 packetsToHandle.push_back(packet);
496 incoming.pop();
497 }
498 LeaveCriticalSection(&incoming_cs);
499
500 // MGH - moved the packet handling outside of the incoming_cs block, as it was locking up sometimes when disconnecting
501 for(int i=0; i<packetsToHandle.size();i++)
502 {
503 PIXBeginNamedEvent(0,"Handling packet %d\n",packetsToHandle[i]->getId());
504 packetsToHandle[i]->handle(packetListener);
505 PIXEndNamedEvent();
506 }
507 flush();
508
509 // 4J Stu - Moved this a bit later in the function to stop the race condition of Disconnect packets not being processed when local client leaves
510 if( socket && socket->isClosing() )
511 {
512 close(DisconnectPacket::eDisconnect_Closed);
513 }
514
515 // 4J - split the following condition (used to be disconnect && iscoming.empty()) so we can wrap the access in a critical section
516 if (disconnected)
517 {
518 EnterCriticalSection(&incoming_cs);
519 bool empty = incoming.empty();
520 LeaveCriticalSection(&incoming_cs);
521 if( empty )
522 {
523 packetListener->onDisconnect(disconnectReason, disconnectReasonObjects);
524 disconnected = false; // 4J added - don't keep sending this every tick
525 }
526 }
527}
528
529SocketAddress *Connection::getRemoteAddress()
530{
531 return (SocketAddress *) address;
532}
533
534void Connection::sendAndQuit()
535{
536 if (quitting)
537 {
538 return;
539 }
540// printf("Con:0x%x send & quit\n",this);
541 flush();
542 quitting = true;
543 // TODO 4J Stu - How to interrupt threads? Or do we need to change the multithreaded functions a bit more
544 //readThread.interrupt();
545
546#if 1
547 // 4J - this used to be in a thread but not sure why, and is causing trouble for us if we kill the connection
548 // whilst the thread is still expecting to be able to send a packet a couple of seconds after starting it
549 if (running)
550 {
551 // 4J TODO writeThread.interrupt();
552 close(DisconnectPacket::eDisconnect_Closed);
553 }
554#else
555 CreateThread(NULL, 0, runSendAndQuit, this, 0, &saqThreadID);
556#endif
557}
558
559int Connection::countDelayedPackets()
560{
561 return (int)outgoing_slow.size();
562}
563
564
565int Connection::runRead(void* lpParam)
566{
567 ShutdownManager::HasStarted(ShutdownManager::eConnectionReadThreads);
568 Connection *con = (Connection *)lpParam;
569
570 if (con == NULL)
571 {
572#ifdef __PS3__
573 ShutdownManager::HasFinished(ShutdownManager::eConnectionReadThreads);
574#endif
575 return 0;
576 }
577
578 Compression::UseDefaultThreadStorage();
579
580 CRITICAL_SECTION *cs = &con->threadCounterLock;
581
582 EnterCriticalSection(cs);
583 con->readThreads++;
584 LeaveCriticalSection(cs);
585
586 //try {
587
588 MemSect(19);
589 while (con->running && !con->quitting && ShutdownManager::ShouldRun(ShutdownManager::eConnectionReadThreads))
590 {
591 while (con->readTick())
592 ;
593
594 // try {
595 //Sleep(100L);
596 // TODO - 4J Stu - 1.8.2 changes these sleeps to 2L, but not sure whether we should do that as well
597 con->m_hWakeReadThread->WaitForSignal(100L);
598 }
599 MemSect(0);
600
601 /* 4J JEV, removed try/catch
602 } catch (InterruptedException e) {
603 }
604 }
605 } finally {
606 synchronized (threadCounterLock) {
607 readThreads--;
608 }
609 } */
610
611 ShutdownManager::HasFinished(ShutdownManager::eConnectionReadThreads);
612 return 0;
613}
614
615int Connection::runWrite(void* lpParam)
616{
617 ShutdownManager::HasStarted(ShutdownManager::eConnectionWriteThreads);
618 Connection *con = dynamic_cast<Connection *>((Connection *) lpParam);
619
620 if (con == NULL)
621 {
622 ShutdownManager::HasFinished(ShutdownManager::eConnectionWriteThreads);
623 return 0;
624 }
625
626 Compression::UseDefaultThreadStorage();
627
628 CRITICAL_SECTION *cs = &con->threadCounterLock;
629
630 EnterCriticalSection(cs);
631 con->writeThreads++;
632 LeaveCriticalSection(cs);
633
634 // 4J Stu - Adding this to force us to run through the writeTick at least once after the event is fired
635 // Otherwise there is a race between the calling thread setting the running flag and this loop checking the condition
636 DWORD waitResult = WAIT_TIMEOUT;
637
638 while ((con->running || waitResult == WAIT_OBJECT_0 ) && ShutdownManager::ShouldRun(ShutdownManager::eConnectionWriteThreads))
639 {
640 while (con->writeTick())
641 ;
642
643 //Sleep(100L);
644 // TODO - 4J Stu - 1.8.2 changes these sleeps to 2L, but not sure whether we should do that as well
645 waitResult = con->m_hWakeWriteThread->WaitForSignal(100L);
646
647 if (con->bufferedDos != NULL) con->bufferedDos->flush();
648 //if (con->byteArrayDos != NULL) con->byteArrayDos->flush();
649 }
650
651
652 // 4J was in a finally block.
653 EnterCriticalSection(cs);
654 con->writeThreads--;
655 LeaveCriticalSection(cs);
656
657 ShutdownManager::HasFinished(ShutdownManager::eConnectionWriteThreads);
658 return 0;
659}
660
661int Connection::runClose(void* lpParam)
662{
663 Connection *con = dynamic_cast<Connection *>((Connection *) lpParam);
664
665 if (con == NULL) return 0;
666
667 //try {
668
669 Sleep(2000);
670 if (con->running)
671 {
672 // 4J TODO writeThread.interrupt();
673 con->close(DisconnectPacket::eDisconnect_Closed);
674 }
675
676 /* 4J Jev, removed try/catch
677 } catch (Exception e) {
678 e.printStackTrace();
679 } */
680
681 return 1;
682}
683
684int Connection::runSendAndQuit(void* lpParam)
685{
686 Connection *con = dynamic_cast<Connection *>((Connection *) lpParam);
687// printf("Con:0x%x runSendAndQuit\n",con);
688
689 if (con == NULL) return 0;
690
691 //try {
692
693 Sleep(2000);
694 if (con->running)
695 {
696 // 4J TODO writeThread.interrupt();
697 con->close(DisconnectPacket::eDisconnect_Closed);
698// printf("Con:0x%x runSendAndQuit close\n",con);
699 }
700
701// printf("Con:0x%x runSendAndQuit end\n",con);
702 /* 4J Jev, removed try/catch
703 } catch (Exception e) {
704 e.printStackTrace();
705 } */
706
707 return 0;
708}