# -*- coding: utf-8 -*-
import struct
from collections import deque
from functools import partial
from irc3.compat import asyncio
class DCCBase(asyncio.Protocol):
idle_handle = None
idle_timeout = None
fd = None
def __init__(self, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
self.ready = asyncio.Future(loop=self.loop)
self.started = asyncio.Future(loop=self.loop)
self.closed = asyncio.Future(loop=self.loop)
self.timeout_handle = None
def factory(self):
return self
def connection_made(self, transport):
self.transport = transport
self.started.set_result(self)
def connection_lost(self, exc):
self.close(exc)
def close(self, result=None):
if self.idle_handle is not None:
self.idle_handle.cancel()
if self.transport:
self.transport.close()
info = self.bot.dcc.connections[self.type]
if self.port in info['masks'][self.mask]:
info['total'] -= 1
del info['masks'][self.mask][self.port]
if not self.closed.done():
self.closed.set_result(result)
self.bot.log.debug('%s closed', self)
def set_timeout(self):
if self.idle_handle is not None:
self.idle_handle.cancel()
if self.idle_timeout:
self.idle_handle = self.loop.call_later(
self.idle_timeout, self.idle_timeout_reached)
def idle_timeout_reached(self, *args):
if self.type == 'chat':
msg = "Your idle is too high. Closing connection."
self.send_line(msg)
self.close()
def __str__(self):
return '%s with %s' % (self.__class__.__name__, self.mask)
def __repr__(self):
return '<%s with %s>' % (self.__class__.__name__, self.mask)
[docs]class DCCChat(DCCBase):
"""DCC CHAT implementation"""
type = 'chat'
ctcp = 'DCC CHAT chat {0.ip} {0.port}'
[docs] def connection_made(self, transport):
super(DCCChat, self).connection_made(transport)
self.encoding = getattr(self.bot, 'encoding', 'ascii')
self.set_timeout()
self.queue = deque()
[docs] def decode(self, data):
"""Decode data with bot's encoding"""
return data.decode(self.encoding, 'ignore')
[docs] def data_received(self, data):
"""data received"""
self.set_timeout()
data = self.decode(data)
if self.queue:
data = self.queue.popleft() + data
lines = data.replace('\r', '').split('\n')
self.queue.append(lines.pop(-1))
for line in lines:
self.bot.dispatch(line, iotype='dcc_in', client=self)
def write(self, data):
if data is not None:
data = data.encode(self.encoding)
if not data.endswith(b'\r\n'):
data = data + b'\r\n'
self.transport.write(data)
def send_line(self, message):
self.write(message)
self.bot.dispatch(message, iotype='dcc_out', client=self)
def send(self, *messages):
for message in messages:
self.send_line(message)
def action(self, message):
message = '\x01ACTION ' + message + '\x01'
self.send_line(message)
def actions(self, *messages):
for message in messages:
self.action(message)
[docs]class DCCGet(DCCBase):
"""DCC GET implementation"""
type = 'get'
ctcp = None
[docs] def connection_made(self, transport):
super(DCCGet, self).connection_made(transport)
if self.resume:
self.bytes_received = self.offset
else:
self.bytes_received = 0
self.fd = open(self.filepath, 'ab')
[docs] def data_received(self, data):
self.set_timeout()
self.fd.write(data)
self.bytes_received += len(data)
self.transport.write(struct.pack('!I', self.bytes_received))
def close(self, *args, **kwargs):
if self.fd:
self.fd.close()
self.fd = None
super(DCCGet, self).close(*args, **kwargs)
[docs]class DCCSend(DCCBase):
"""DCC SEND implementation"""
type = 'send'
ctcp = 'DCC SEND {0.filename_safe} {0.ip} {0.port} {0.filesize}'
block_size = 1024 * 64
limit_rate = None
filepath = None
[docs] def connection_made(self, transport):
super(DCCSend, self).connection_made(transport)
self.delay = 1. / (self.limit_rate / 64.) if self.limit_rate else None
socket = self.transport.get_extra_info('socket')
self.socket = socket
self.sendfile = getattr(self.socket, 'sendfile', None)
if self.sendfile:
self.socket.setblocking(1)
self.fd = open(self.filepath, 'rb')
self.fd_fileno = self.fd.fileno()
# remove existing transport ref. It shouldn't read/write anything
transports = getattr(self.loop, '_transports', None)
if transports is not None:
del self.loop._transports[transport._sock_fd]
self.loop.remove_writer(socket)
self.loop.add_writer(socket, self.next_chunk)
def write(self, *args): # pragma: no cover
raise NotImplementedError('write is not available during DCCSend')
def send_chunk(self):
if self.sendfile:
sent = self.sendfile(self.fd, self.offset, self.block_size)
else:
self.fd.seek(self.offset)
sent = self.socket.send(self.fd.read(self.block_size))
return sent
def next_chunk(self):
try:
sent = self.send_chunk()
except Exception as e: # pragma: no cover
self.bot.log.exception(e)
self.fd.close()
sent = 0
if sent != 0:
self.offset += sent
cb = partial(self.loop.add_writer, self.socket, self.next_chunk)
if self.delay is not None:
self.loop.call_later(self.delay, cb)
else:
cb()
else:
self.loop.remove_writer(self.socket)
[docs] def data_received(self, data):
self.set_timeout()
bytes_received = (
struct.unpack('!I', data[i:i + 4])[0]
for i in range(0, len(data), 4))
for recv in bytes_received:
if recv == self.filesize:
self.transport.close()
def close(self, *args, **kwargs):
if self.fd:
self.fd.close()
self.fd = None
super(DCCSend, self).close(*args, **kwargs)