# -*- coding: utf-8 -*-
import time, ntptime
import os.path
from mms_net_socket import MMS_Socket
from mms_net_define import *
from mmsconfig import *
from utils import print_hex
HTTP_POST_TEMPLATE = \
'POST %s HTTP/1.1\r\n\
Accept-Language: en\r\n\
Host: %s\r\n\
Date: %s\r\n\
%s\
Content-Length: %d\r\n\
User-Agent: %s\r\n\
Content-Type: application/vnd.wap.mms-message\r\n\r\n'
HTTP_GET_TEMPLATE = \
'GET %s HTTP/1.1\r\n\
Accept: */*\r\n\
Accept-Language: en\r\n\
Host: %s\r\n\
%s\
User-Agent: %s\r\n\r\n'
X_WAP_PROFILE = ''
#weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
weekdayname = [_('Mon'), _('Tue'), _('Wed'), _('Thu'), _('Fri'), _('Sat'), _('Sun')]
#monthname = [None,
# 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
# 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
monthname = [None,_('Jan'), _('Feb'), _('Mar'), _('Apr'), _('May'), _('Jun'),_('Jul'), _('Aug'), _('Sep'), _('Oct'), _('Nov'), _('Dec')]
def Print_MMSHeader(header, headertype=''):
print '# # # Print_MMSHeader:', headertype
print_hex(header)
print '# # # Print_MMSHeader END'
def Change_Smil(smil_name):
h = open(smil_name)
oldsmil = h.read()
h.close()
newsmil = ''
for i in oldsmil:
if i == '&':
newsmil += '&'
else:
newsmil += i
h = open(smil_name, 'wb+')
newsmil = UnicodeToUtf8(newsmil)
h.write(newsmil)
h.close()
def Parse_MMS(fname):
from mmsmanager import MMSMessageInfo, MMSMediaInfo
msg = None
mretrieve = M_Retrieve_Conf()
import os.path
from mmsmanager import mmsmgr
if DEBUG_MMS_NET:
print 'Parse_MMS', fname
if os.path.exists(fname):
hfile = open(fname)
mretrieve.m_response_buffer = hfile.read()
hfile.close()
if not mretrieve.Parse_MMSHeader(False):
msg = MMSMessageInfo()
mmsmgr.msg = msg
return False
if True == mretrieve.Parse_MMSBody()[0]:
msg = mretrieve.m_messageinfo
else:
msg = MMSMessageInfo()
mmsmgr.msg = msg
return False
else:
msg = MMSMessageInfo()
mmsmgr.msg = msg
return True
mmsmgr.msg = msg
if DEBUG_MMS_NET:
print '-msg media-'
for i in mretrieve.m_messageinfo.media:
print i
print 'text:', i.fname_text
print 'image:', i.fname_image
print 'sound:', i.fname_sound
print 'video:', i.fname_video
for j in mretrieve.m_messageinfo.slide_time:
print 'slide time', j
print 'msg insert', mmsmgr.msg
if None != msg and \
M_SEND_REQ == mretrieve.m_mmsheader_field.m_XMmsMessageType:
if [] != mretrieve.m_to_phone:
for i in mretrieve.m_to_phone:
mmsmgr.addaddress(0, 0, i)
if [] != mretrieve.m_to_email:
for i in mretrieve.m_to_email:
mmsmgr.addaddress(1, 0, i)
if [] != mretrieve.m_cc_phone:
for i in mretrieve.m_cc_phone:
mmsmgr.addaddress(0, 1, i)
if [] != mretrieve.m_cc_email:
for i in mretrieve.m_cc_email:
mmsmgr.addaddress(1, 1, i)
if [] != mretrieve.m_bcc_phone:
for i in mretrieve.m_bcc_phone:
mmsmgr.addaddress(0, 2, i)
if [] != mretrieve.m_bcc_email:
for i in mretrieve.m_bcc_phone:
mmsmgr.addaddress(1, 2, i)
if DEBUG_MMS_NET:
print mmsmgr.msg.info_received
print mmsmgr.msg.info_inbox_sender
print mmsmgr.msg.info_sender
print 'Parse_MMS end'
return True
def Parse_MMSDeliveryReport(noti_data):
if DEBUG_MMS_NET:
print 'Parse_MMSDeliveryReport'
deliveryinfo = M_Delivery_Ind(noti_data)
if deliveryinfo.m_error:
return None, None
return deliveryinfo.Get_Delivery_Info()
def Parse_MMSNoti(noti_data):
from mmsmanager import mmsmgr
mmsmgr.MMS_Directory()
def Make_MMSnotification(noti_data):
if TEST_CTR_NET:
return noti_data
if DEBUG_MMS_NET:
print 'Make_MMSnotification'
print noti_data
temp = []
for i in noti_data:
temp.append(i[1][1])
return ''.join(temp)
mms_noti = Make_MMSnotification(noti_data)
if DEBUG_MMS_NET:
print 'mms_noti'
notiinfo = M_Notification_Ind()
if not notiinfo.Parse_MMS_Noti(mms_noti):
return None, None, None, None, None, None
fromnumber = ''
sub = ''
report = False
url = ''
tranid = ''
mmsexp = ''
if notiinfo.m_mmsheader_field.m_Subject:
sub = notiinfo.m_mmsheader_field.m_Subject.textstring
if notiinfo.m_mmsheader_field.m_XMmsDeliveryReport:
report = YES_NO(notiinfo.m_mmsheader_field.m_XMmsDeliveryReport).yes_no
if notiinfo.m_mmsheader_field.m_XMmsContentLocation:
url = notiinfo.m_mmsheader_field.m_XMmsContentLocation.data
if notiinfo.m_mmsheader_field.m_XMmsTrancationId:
tranid = notiinfo.m_mmsheader_field.m_XMmsTrancationId.data
if notiinfo.m_mmsheader_field.m_XMmsExpiry:
mmsexp = notiinfo.m_mmsheader_field.m_XMmsExpiry.valuestr
if notiinfo.m_mmsheader_field.m_From:
fromnumber = notiinfo.m_mmsheader_field.m_From.encodedstring.textstring.data
idx = fromnumber.find('.')
if DEBUG_MMS_NET:
print '################################'
print fromnumber, idx
if -1 != idx:
if DEBUG_MMS_NET:
print 'newnum'
tempnumber = fromnumber[idx+1:]
tempnumber += ':'
tempnumber += fromnumber[0]
fromnumber = tempnumber
if DEBUG_MMS_NET:
print 'Parse_MMSNoti'
print 'fromnumber:',fromnumber
print 'sub:',sub
print 'report:',report
print 'url:',url
print 'tranid:',tranid
print 'mmsexp:',mmsexp
return fromnumber, sub, report, url, tranid, mmsexp
#현재 시간(년월일시분초)를 생성한다.
def Get_CurrentDate():
ttime = time.localtime(ntptime.ntime())
return time.strftime('%Y%m%d%H%M%S', ttime)
def Make_UINTVAR(intval, valuelength=True):
uintval = []
itempvar = 0
value = intval
blastbyte = True
while 1:
itempvar = 0
itempvar = (value & UINTVAR_MASK)
if (False == blastbyte):
itempvar |= UINTVAR_CONTINUE
uintval.insert(0, chr(itempvar))
value = value >> UINTVAR_PAYLOADLEN
if (0 == value):
break
blastbyte = False
if valuelength:
if (UINTVAR_BYTE <= intval):
uintval.insert(0, chr(UINTVAR_BYTE))
return uintval
def ValuelengthToInteger(valuelength):
intervalue = 0
tempvalue = 0
index = 0
tempvalue = ord(valuelength[index])
if DEBUG_MMS_NET:
print 'ValuelengthToInteger', tempvalue
if UINTVAR_BYTE == tempvalue:
index += 1
while (index < len(valuelength)):
tempvalue = ord(valuelength[index])
intervalue += tempvalue & UINTVAR_MASK
if not (tempvalue & UINTVAR_CONTINUE):
index += 1
break
index += 1
intervalue = intervalue << UINTVAR_PAYLOADLEN
return intervalue, index
def UnicodeToUtf8(unicodestring):
return unicode(unicodestring).encode('utf-8')
def LongintgerToInteger(longintegerstr):
longint = 0
for i in range(len(longintegerstr)):
longint = longint << 8
longint = longint + ord(longintegerstr[i])
return longint
def Parse_Url(mms_url):
import urlparse
return urlparse.urlparse(mms_url.strip())
def Get_Posturl(mms_url):
post = Parse_Url(mms_url)[2]
if not len(post):
post = '/'
return post
def Get_Host(mms_url):
return Parse_Url(mms_url)[1]
class MMS_Http:
def __init__(self):
self.m_HTTPheader_buffer = [] #HTTP 헤더 저장.
pass
#현재의 GTM 시간을 생성한다.
def Get_GTMtime(self):
now = ntptime.ntime()
year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now)
return "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
weekdayname[wd],
day, monthname[month],
year,
hh, mm, ss)
#HTTP POST 헤더를 생성한다.
#int content_length: [in] body의 크기.
def Make_HTTP_POST(self, content_length, connection='', mmsc_url=''):
self.m_HTTPheader_buffer = []
if DEBUG_MMS_NET:
print 'MMS_Http::Make_HTTP_POST:', content_length, connection, mmsc_url
self.m_HTTPheader_buffer = HTTP_POST_TEMPLATE % \
(Get_Posturl(mmsc_url), Get_Host(mmsc_url),
self.Get_GTMtime(), connection, content_length,
'LVP-2800_DA610')
if DEBUG_MMS_NET:
print 'HTTP_POST:',self.m_HTTPheader_buffer
#HTTP GET 헤더를 생성한다.
#sequence geturl: [in] 수신할 메시지의 url
def Make_HTTP_GET(self, url):
self.m_HTTPheader_buffer = []
if DEBUG_MMS_NET:
print 'Make_HTTP_GET url:', url
self.m_HTTPheader_buffer = HTTP_GET_TEMPLATE % \
(Get_Posturl(url), Get_Host(url), 'Connection: Keep-Alive\r\n', 'LVP-2800_DA610')
if DEBUG_MMS_NET:
print 'HTTP_GET:',self.m_HTTPheader_buffer
#HTTP 헤더를 socket을 이용해 전송한다.
#MMS_Socket mms_socket: [in] mms 전송을 위한 소켓 클래스 객체.
def Send_HTTPHeader(self, mms_socket, totmsgsize=0):
if DEBUG_MMS_NET:
print 'Send_HTTPHeader', totmsgsize
mms_socket.Send_Data(self.m_HTTPheader_buffer, totmsgsize)
class YES_NO:
def __init__(self, byte):
self.yes_no = None
self.Set_Value(byte)
def Set_Value(self, byte):
if (YES == ord(byte)):
self.yes_no = 1
else:
self.yes_no = 0
class TEXT_STRING:
def __init__(self, string = ''):
self.data = None
self.Set_Value(string)
def Set_Value(self, string):
stringlen = len(string)
if stringlen:
self.data = string[0:stringlen-1]
def Get_Value(self):
return self.data
class ENCODED_STRING:
def __init__(self, string = ''):
self.valuelength = None
self.charset = None
self.textstring = None
self.Set_Value(string)
def Set_Value(self, string):
if (UINTVAR_BYTE < ord(string[0])):
self.textstring = TEXT_STRING(string)
else:
self.valuelength, value_len = ValuelengthToInteger(string)
self.charset = ord(string[value_len])-WELL_KNOWN_VALUE
if utf_8 == self.charset:
if (0<=ord(string[value_len+1]) and \
ord(string[value_len+1])<=31) or \
ord(string[value_len+1])==127:
if DEBUG_MMS_NET:
print 'subject utf-8 +2'
self.textstring = TEXT_STRING(string[value_len+2:])
else:
self.textstring = TEXT_STRING(string[value_len+1:])
if DEBUG_MMS_NET:
print 'subject utf-8'
else:
if DEBUG_MMS_NET:
print 'subject utf-8 else'
self.textstring = TEXT_STRING(string[value_len+1:])
def Get_Value(self):
if None != self.textstring:
return self.textstring.Get_Value()
else:
return self.textstring
class FROM_VALUE:
def __init__(self, string = ''):
self.valuelength = 0
self.token = 0
self.encodedstring = None
self.Set_Value(string)
def Set_Value(self, string):
if DEBUG_MMS_NET:
print 'FROM_VALUE.set_value'
self.valuelength, value_len = ValuelengthToInteger(string)
self.token = ord(string[value_len])
if (ADDRESS_PRESENT_TOKEN == self.token):
self.encodedstring = ENCODED_STRING(string[value_len+1:])
def Get_Value(self):
if None == self.encodedstring:
return self.encodedstring
else:
return self.encodedstring.Get_Value()
class CONTENT_LOCATION(TEXT_STRING):
def __init__(self, string = ''):
TEXT_STRING.__init__(self, string)
class LONG_INTEGER:
def __init__(self, string = ''):
self.valuelength = 0
self.valuestr = ''
self.valueinteger = 0
self.Set_Value(string)
def Set_Value(self, string = ''):
self.valuelength = ord(string[0])
self.valuestr = string[1:]
self.valueinteger = LongintgerToInteger(self.valuestr)
class CONTENT_TYPE:
def __init__(self):
self.valuelength = 0
self.contenttype = 0
self.othervalue = ''
def Set_Value(self, valuelen, contenttype, other):
self.valuelength = valuelen
self.contenttype = contenttype
self.othervalue = other
class MM1_Field:
def __init__(self):
self.m_Bcc = [] #[]
self.m_Cc = [] #[]
self.m_XMmsContentLocation = None
self.m_ContentType = None
self.m_Date = None
self.m_XMmsDeliveryReport = None
self.m_XMmsDeliveryTime = None
self.m_XMmsExpiry = None
self.m_From = None
self.m_XMmsMessageClass = None
self.m_MessageID = None
self.m_XMmsMessageType = None #1byte
self.m_XMmsMMSVersion = None #1byte
self.m_XMmsMessageSize = None
self.m_XMmsPriority = None
self.m_XMmsReadReport = None
self.m_XMmsReportAllowed = None
self.m_XMmsResponseStatus = None
self.m_XMmsResponseText = None
self.m_XMmsSenderVisibility = None
self.m_XMmsStaus = None
self.m_Subject = None
self.m_To = [] #[]
self.m_XMmsTrancationId = None #Text-string
self.m_XMmsRetrieveStaus = None
self.m_XMmsRetrieveText = None
self.m_XMmsReadStatus = None
self.m_XMmsReplyCharging = None
self.m_XMmsReplyChargingDeadline = None
self.m_XMmsReplyChargingID = None
self.m_XMmsReplyChargingSize = None
self.m_XMmsPreviouslySentBy = None
self.m_XMmsPreviouslySentDate = None
class MMS_BaseResponse:
def __init__(self):
self.m_mmsheader_field = MM1_Field()#[]
self.m_response_buffer = ''
self.m_mmsheader_index = 0
def Recv_Response(self, mms_socket, cb=None, progress=False):
self.m_response_buffer = ''
self.m_mmsheader_index = 0
if cb:
if DEBUG_MMS_NET:
print 'Recv_Response.cb'
mms_socket.Recv_DataInit(cb)
else:
if DEBUG_MMS_NET:
print 'Recv_Response.else'
self.m_response_buffer = ''.join(mms_socket.Recv_Data(progress))
if DEBUG_MMS_NET:
print 'Recv_Response.end'
def Get_HTTPBodyIndex(self):
self.m_mmsheader_index = self.m_response_buffer.find('\r\n\r\n')
if (-1 != self.m_mmsheader_index):
self.m_mmsheader_index += 4
if DEBUG_MMS_NET:
print 'Get_HTTPBodyIndex', self.m_mmsheader_index
def Parse_AttrType(self, index):
attrtype = ord(self.m_response_buffer[index])
if (WELL_KNOWN_VALUE < attrtype):
attrtype -= WELL_KNOWN_VALUE
else:
attrtype = 0
if DEBUG_MMS_NET:
print 'Parse_AttrType', attrtype, ord(self.m_response_buffer[index]), index
print_hex(self.m_response_buffer[index])
return (1, attrtype)
def Parse_AttrValue(self, index, attrtype):
if DEBUG_MMS_NET:
print 'Parse_AttrValue', index, attrtype
attrvalueresult = None
if (Bcc_NUM == attrtype):
attrvalueresult = self.Get_Bcc(index)
self.m_mmsheader_field.m_Bcc.append(ENCODED_STRING(attrvalueresult[1]))
elif (Cc_NUM == attrtype):
attrvalueresult = self.Get_Cc(index)
self.m_mmsheader_field.m_Cc.append(ENCODED_STRING(attrvalueresult[1]))
elif (X_Mms_Content_Location_NUM == attrtype):
attrvalueresult = self.Get_XMMSContentLocation(index)
self.m_mmsheader_field.m_XMmsContentLocation = \
CONTENT_LOCATION(attrvalueresult[1])
elif (Content_Type_NUM == attrtype):
attrvalueresult = self.Get_ContentType(index)
self.m_mmsheader_field.m_ContentType = \
attrvalueresult[1]
elif (Date_NUM == attrtype):
attrvalueresult = self.Get_Date(index)
self.m_mmsheader_field.m_Date = \
LONG_INTEGER(attrvalueresult[1])
elif (X_Mms_Delivery_Report_NUM == attrtype):
attrvalueresult = self.Get_1Byte(index)
self.m_mmsheader_field.m_XMmsDeliveryReport = \
attrvalueresult[1]
elif (X_Mms_Delivery_Time_NUM == attrtype):
attrvalueresult = self.Get_XMMSDeliveryTime(index)
self.m_mmsheader_field.m_XMmsDeliveryTime = \
attrvalueresult[1]
elif (X_Mms_Expiry_NUM == attrtype):
attrvalueresult = self.Get_Expiry(index)
self.m_mmsheader_field.m_XMmsExpiry = \
LONG_INTEGER(attrvalueresult[1])
elif (From_NUM == attrtype):
attrvalueresult = self.Get_From(index)
self.m_mmsheader_field.m_From = \
FROM_VALUE(attrvalueresult[1])
elif (X_Mms_Message_Class_NUM == attrtype):
attrvalueresult = self.Get_1Byte(index)
self.m_mmsheader_field.m_XMmsMessageClass = \
attrvalueresult[1]
elif (Message_ID_NUM == attrtype):
attrvalueresult = self.Get_MessageID(index)
self.m_mmsheader_field.m_MessageID = \
attrvalueresult[1]
elif (X_Mms_Message_Type_NUM == attrtype):
attrvalueresult = self.Get_1Byte(index)
self.m_mmsheader_field.m_XMmsMessageType = \
ord(attrvalueresult[1])
elif (X_Mms_MMS_Version_NUM == attrtype):
attrvalueresult = self.Get_ShortInteger(index)
self.m_mmsheader_field.m_XMmsMMSVersion = \
attrvalueresult[1]
elif (X_Mms_Message_Size_NUM == attrtype):
attrvalueresult = self.Get_XMMSMessageSize(index)
self.m_mmsheader_field.m_XMmsMessageSize = \
LONG_INTEGER(attrvalueresult[1])
elif (X_Mms_Priority_NUM == attrtype):
attrvalueresult = self.Get_XMMSPriority(index)
self.m_mmsheader_field.m_XMmsPriority = \
attrvalueresult[1]
elif (X_Mms_Read_Report_NUM == attrtype):
attrvalueresult = self.Get_XMMSReadReport(index)
self.m_mmsheader_field.m_XMmsReadReport = \
attrvalueresult[1]
elif (X_Mms_Report_Allowed_NUM == attrtype):
attrvalueresult = self.Get_XMMSReportAllowed(index)
self.m_mmsheader_field.m_XMmsReportAllowed = \
attrvalueresult[1]
elif (X_Mms_Response_Status_NUM == attrtype):
attrvalueresult = self.Get_1Byte(index)
self.m_mmsheader_field.m_XMmsResponseStatus = \
attrvalueresult[1]
elif (X_Mms_Response_Text_NUM == attrtype):
attrvalueresult = self.Get_EncodedString(index)
self.m_mmsheader_field.m_XMmsResponseText = \
attrvalueresult[1]
elif (X_Mms_Sender_Visibility_NUM == attrtype):
attrvalueresult = self.Get_XMMSSenderVisibility(index)
self.m_mmsheader_field.m_XMmsSenderVisibility = \
attrvalueresult[1]
elif (X_Mms_Staus_NUM == attrtype):
attrvalueresult = self.Get_XMMSStatus(index)
self.m_mmsheader_field.m_XMmsStaus = \
attrvalueresult[1]
elif (Subject_NUM == attrtype):
attrvalueresult = self.Get_Subject(index)
self.m_mmsheader_field.m_Subject = \
ENCODED_STRING(attrvalueresult[1])
elif (To_NUM == attrtype):
attrvalueresult = self.Get_To(index)
self.m_mmsheader_field.m_To.append(ENCODED_STRING(attrvalueresult[1]))
elif (X_Mms_Trancation_Id_NUM == attrtype):
attrvalueresult = self.Get_XMMSTransactionID(index)
self.m_mmsheader_field.m_XMmsTrancationId = \
TEXT_STRING(attrvalueresult[1])
elif (X_Mms_Retrieve_Staus_NUM == attrtype):
attrvalueresult = self.Get_XMMSRetrieveStatus(index)
self.m_mmsheader_field.m_XMmsRetrieveStaus = \
attrvalueresult[1]
elif (X_Mms_Retrieve_Text_NUM == attrtype):
attrvalueresult = self.Get_XMMSRetrieveText(index)
self.m_mmsheader_field.m_XMmsRetrieveText = \
ENCODED_STRING(attrvalueresult[1])
elif (X_Mms_Read_Status_NUM == attrtype):
attrvalueresult = self.Get_XMMSReadStatus(index)
self.m_mmsheader_field.m_XMmsReadStatus = \
attrvalueresult[1]
elif (X_Mms_Reply_Charging_NUM == attrtype):
attrvalueresult = self.Get_XMMSReplyCharging(index)
self.m_mmsheader_field.m_XMmsReplyCharging = \
attrvalueresult[1]
elif (X_Mms_Reply_Charging_Deadline_NUM == attrtype):
attrvalueresult = self.Get_XMMSReplyChargingDeadline(index)
self.m_mmsheader_field.m_XMmsReplyChargingDeadline = \
attrvalueresult[1]
elif (X_Mms_Reply_Charging_ID_NUM == attrtype):
attrvalueresult = self.Get_XMMSReplyChargingID(index)
self.m_mmsheader_field.m_XMmsReplyChargingID = \
attrvalueresult[1]
elif (X_Mms_Reply_Charging_Size_NUM == attrtype):
attrvalueresult = self.Get_XMMSReplyChargindSize(index)
self.m_mmsheader_field.m_XMmsReplyChargingSize = \
LONG_INTEGER(attrvalueresult[1])
elif (X_Mms_Previously_Sent_By_NUM == attrtype):
attrvalueresult = self.Get_XMMSPreviouslySentBy(index)
self.m_mmsheader_field.m_XMmsPreviouslySentBy = \
attrvalueresult[1]
pass
elif (X_Mms_Previously_Sent_Date_NUM == attrtype):
attrvalueresult = self.Get_XMMSPreviouslySentDate(index)
self.m_mmsheader_field.m_XMmsPreviouslySentDate = \
attrvalueresult[1]
if DEBUG_MMS_NET:
print 'Parse_AttrValue.attrvalueresult', attrvalueresult
try:
print_hex(attrvalueresult[1])
except:
print 'attr except'
if attrvalueresult:
return attrvalueresult[0]
else:
return 1
def Get_ContentType(self, index):
uintval = 0
contype = 0
valuelen = 0
contenttype = CONTENT_TYPE()
if (ord(self.m_response_buffer[index]) <= UINTVAR_BYTE):
uintval, valuelen = ValuelengthToInteger(self.m_response_buffer[index:])
else:
contype = ord(self.m_response_buffer[index])
if (APPLICATION_VNDWAPMULTIPARTMIXED+WELL_KNOWN_VALUE == \
contype):
contenttype.Set_Value(1, contype, '')
return (1, contenttype)
else:
uintval, valuelen = ValuelengthToInteger(self.m_response_buffer[index:])
contype = ord(self.m_response_buffer[index+valuelen])
contenttype.Set_Value(uintval, contype,
self.m_response_buffer[index+valuelen:index+uintval+valuelen])
return (uintval+valuelen, contenttype)
def Get_XMMSDeliveryTime(self, index):
uintval = 0
uintval, valuelen = ValuelengthToInteger(self.m_response_buffer[index:])
return (uintval+valuelen, self.m_response_buffer[index:uintval+valuelen])
def Get_XMMSPreviouslySentDate(self, index):
uintval = 0
uintval, valuelen = ValuelengthToInteger(self.m_response_buffer[index:])
return (uintval+valuelen, self.m_response_buffer[index:uintval+valuelen])
def Get_XMMSPreviouslySentBy(self, index):
uintval = 0
uintval, valuelen = ValuelengthToInteger(self.m_response_buffer[index:])
return (uintval+valuelen, self.m_response_buffer[index:uintval+valuelen])
def Get_XMMSReportAllowed(self, index):
return self.Get_1Byte(index)
def Get_XMMSReadStatus(self, index):
return self.Get_1Byte(index)
def Get_XMMSStatus(self, index):
return self.Get_1Byte(index)
def Get_XMMSSenderVisibility(self, index):
return self.Get_1Byte(index)
def Get_XMMSRetrieveText(self, index):
return self.Get_EncodedString(index)
def Get_XMMSRetrieveStatus(self, index):
return self.Get_1Byte(index)
def Get_XMMSMessageSize(self, index):
return self.Get_LongInteger(index)
def Get_Date(self, index):
return self.Get_LongInteger(index)
def Get_XMMSReadReport(self, index):
return self.Get_1Byte(index)
def Get_XMMSPriority(self, index):
return self.Get_1Byte(index)
def Get_Bcc(self, index):
return self.Get_EncodedString(index)
def Get_Cc(self, index):
return self.Get_EncodedString(index)
def Get_To(self, index):
return self.Get_EncodedString(index)
def Get_XMMSContentLocation(self, index):
return self.Get_String(index)
def Get_XMMSReplyChargindSize(self, index):
return self.Get_LongInteger(index)
def Get_XMMSReplyChargingDeadline(self, index):
uintval = 0
uintval, valuelen = ValuelengthToInteger(self.m_response_buffer[index:])
return (uintval+valuelen, self.m_response_buffer[index:uintval+valuelen])
def Get_XMMSReplyChargingID(self, index):
return self.Get_String(index)
def Get_XMMSReplyCharging(self, index):
return self.Get_1Byte(index)
def Get_MessageID(self, index):
return self.Get_String(index)
def Get_XMMSTransactionID(self, index):
return self.Get_String(index)
def Get_Expiry(self, index):
uintval = 0
uintval, valuelen = ValuelengthToInteger(self.m_response_buffer[index:])
if DEBUG_MMS_NET:
print 'Get_Expiry', uintval+valuelen,\
self.m_response_buffer[index:index+uintval+valuelen]
return (uintval+valuelen, \
self.m_response_buffer[index:index+uintval+valuelen])
def Get_LongInteger(self, index):#maxlen:4byte, 8bit value
longinteger_len = ord(self.m_response_buffer[index])
return (longinteger_len+1, \
self.m_response_buffer[index:index+longinteger_len+1])
def Get_Subject(self, index):
if DEBUG_MMS_NET:
print 'Get_Subject'
return self.Get_EncodedString(index)
def Get_From(self, index):
return self.Get_String(index)
def Get_EncodedString(self, index):
return self.Get_String(index)
def Get_ShortInteger(self, index):
return self.Get_1Byte(index)
def Get_1Byte(self, index):
value = 0
if (index < len(self.m_response_buffer)):
value = self.m_response_buffer[index]
return (1, value)
def Get_String(self, index):
endindex = index
while (endindex < len(self.m_response_buffer)):
if (chr(0) == self.m_response_buffer[endindex]):
endindex += 1
break
endindex += 1
if DEBUG_MMS_NET:
string = self.m_response_buffer[index:endindex]
print string[1:]
return (endindex-index, self.m_response_buffer[index:endindex])
def Parse_MMSHeader(self, httpmsg=True):
try:
if (httpmsg):
self.Get_HTTPBodyIndex()
if DEBUG_MMS_NET:
try:
Print_MMSHeader(self.m_response_buffer[self.m_mmsheader_index:200])
except:
print 'Print_MMSHeader except'
index = self.m_mmsheader_index
body_index = 0
while (index < len(self.m_response_buffer)):
attrtype = self.Parse_AttrType(index)
index += attrtype[0]
attrvaluelen = self.Parse_AttrValue(index, attrtype[1])
index += attrvaluelen
if (Content_Type_NUM == attrtype[1]):
body_index = index
if (DEBUG_MMS_NET):
print 'content-type break'
break
if DEBUG_MMS_NET:
print 'Parse_MMSHeader'
self.m_mmsbody_index = body_index
return True
except:
return False
def Get_HTTP_Result(self):
front = self.m_response_buffer.find('HTTP/1.1 ')
if -1 == front:
if DEBUG_MMS_NET:
print 'Get_HTTP_Result', BAD_HTTP
return BAD_HTTP
front += len('HTTP/1.1 ')
rear = self.m_response_buffer[front:].find(' ')
if DEBUG_MMS_NET:
print 'Get_HTTP_Result', self.m_response_buffer[front:front+rear]
return self.m_response_buffer[front:front+rear]
class M_Delivery_Ind(MMS_BaseResponse):
def __init__(self , deliverydata):
MMS_BaseResponse.__init__(self)
self.m_error = False
self.m_response_buffer = deliverydata[self.Find_MMS_Header(deliverydata):]
if not self.Parse_MMSHeader(False):
self.m_error = True
def Find_MMS_Header(self, deliverydata):
if DEBUG_MMS_NET:
print 'Find_MMS_Header', ord(deliverydata[0]), \
ord(deliverydata[1]), ord(deliverydata[2])
return ord(deliverydata[2])+3
def Get_Delivery_Info(self):
if DEBUG_MMS_NET:
print 'Get_Delivery_Info', self.m_mmsheader_field.m_XMmsStaus
status = ''
if MMSSTATUS_Expired == ord(self.m_mmsheader_field.m_XMmsStaus):
status = MMSSTATUS_Expired_TXT
elif MMSSTATUS_Retrieved == ord(self.m_mmsheader_field.m_XMmsStaus):
status = MMSSTATUS_Retrieved_TXT
elif MMSSTATUS_Rejected == ord(self.m_mmsheader_field.m_XMmsStaus):
status = MMSSTATUS_Rejected_TXT
elif MMSSTATUS_Deferred == ord(self.m_mmsheader_field.m_XMmsStaus):
status = MMSSTATUS_Deferred_TXT
elif MMSSTATUS_Unrecognised == ord(self.m_mmsheader_field.m_XMmsStaus):
status = MMSSTATUS_Unrecognised_TXT
elif MMSSTATUS_Indeterminate == ord(self.m_mmsheader_field.m_XMmsStaus):
status = MMSSTATUS_Indeterminate_TXT
elif MMSSTATUS_Forwarded == ord(self.m_mmsheader_field.m_XMmsStaus):
status = MMSSTATUS_Forwarded_TXT
to = ''
if len(self.m_mmsheader_field.m_To) and \
self.m_mmsheader_field.m_To[0].textstring and \
self.m_mmsheader_field.m_To[0].textstring.data:
to = self.m_mmsheader_field.m_To[0].textstring.data
return to, status
class M_Notification_Ind(MMS_BaseResponse):
def __init__(self):
MMS_BaseResponse.__init__(self)
def Parse_MMS_Noti(self, noti_data):
self.m_response_buffer = noti_data[self.Find_MMS_Header(noti_data):]
if not self.Parse_MMSHeader(False):
return False
else:
return True
def Find_MMS_Header(self, noti_data):
if DEBUG_MMS_NET:
print 'Find_MMS_Header', ord(noti_data[2])
return ord(noti_data[2])+3
class M_Send_Conf(MMS_BaseResponse):
def __init__(self):
MMS_BaseResponse.__init__(self)
def Get_Result(self):
if DEBUG_MMS_NET:
print 'M_Send_Conf.Get_Result'
print self.m_response_buffer
if (HTTP_OK == self.Get_HTTP_Result()):
if DEBUG_MMS_NET:
print 'HTTP_OK'
if not self.Parse_MMSHeader():
return False
if (XMmsResponseStatus_Ok == \
ord(self.m_mmsheader_field.m_XMmsResponseStatus)):
return True
else:
return False
else:
return False
class M_Retrieve_Conf(MMS_BaseResponse):
def __init__(self):
MMS_BaseResponse.__init__(self)
self.m_mmsbody_index = 0
self.m_multipartentry = []
from mmsmanager import MMSMessageInfo
self.m_messageinfo = MMSMessageInfo()
self.m_multipartnum = 0
self.m_to_phone = []
self.m_to_email = []
self.m_cc_phone = []
self.m_cc_email = []
self.m_bcc_phone = []
self.m_bcc_email = []
def Get_EntryNumber(self):
self.m_multipartnum, val_len = \
ValuelengthToInteger(self.m_response_buffer[self.m_mmsbody_index:])
if DEBUG_MMS_NET:
print 'mms_net_pdu.M_Retrieve_Conf.Parse_MMSBody', \
self.m_multipartnum, val_len
self.m_mmsbody_index += val_len
def Get_EntryData(self, entrysave=False):
def Check_Fileext(filename, mediatype):
newfilename = ''
extcorrect = True
fileext = os.path.splitext(filename)[1]
extlist = MEDIA_TYPE_EXT.get(mediatype)
if not extlist:
return extcorrect, newfilename
if fileext:
fileext = fileext.upper()
if not extlist.count(fileext):
newfilename = os.path.splitext(filename)[0]
newfilename += extlist[0].lower()
extcorrect = False
else:
newfilename = os.path.splitext(filename)[0]
newfilename += extlist[0].lower()
extcorrect = False
return extcorrect, newfilename
def Check_ContentType(contenttype):
if APPLICATION_SMIL == contenttype or \
AUDIO_AMR == contenttype or \
AUDIO_MIDI == contenttype or \
AUDIO_WAV == contenttype or \
VIDEO_3GPP == contenttype or \
VIDEO_MP4 == contenttype or \
TEXT_PLAIN == contenttype or \
IMAGE_GIF == contenttype or \
IMAGE_JPEG == contenttype or \
APPLICATION_VNDOMADRMMESSAGE == contenttype or \
TEXT_XIMELODY == contenttype or \
AUDIO_IMELODY == contenttype or \
AUDIO_XIMELODY == contenttype or \
IMAGE_BMP == contenttype or \
AUDIO_XWAV == contenttype or \
AUDIO_XMIDI == contenttype or \
AUDIO_SPMIDI == contenttype or \
IMAGE_XBMP == contenttype or \
IMAGE_PNG == contenttype:
return contenttype
if APPLICATION_VNDOMADRMMESSAGE_STR == contenttype:
return APPLICATION_VNDOMADRMMESSAGE
return UNKNOWN_TYPE
def Get_ContentType(contenttypebuf):
def Get_HeaderName(contenttypebuffer):
headername = None
headerlen = 0
if UINTVAR_BYTE < ord(contenttypebuffer[headerlen]) and \
ord(contenttypebuffer[headerlen]) < WELL_KNOWN_VALUE:#text-string
n_len = contenttypebuffer[headerlen:].find('\0')
if -1 == n_len:
headerlen += 1
else:
headername = contenttypebuffer[headerlen:n_len]#Null 제외.
headerlen += (n_len+1)
elif WELL_KNOWN_VALUE <= ord(contenttypebuffer[headerlen]):#well-known
headername = contenttypebuffer[headerlen]
headerlen += 1
elif UINTVAR_BYTE == ord(contenttypebuffer[headerlen]):#uintval
pass
else:#value of length
pass
return (headername, headerlen)
def Get_HeaderValue(contenttypebuffer):
headervalue = None
valuelen = 0
if DEBUG_MMS_NET:
print '-Get_Headervalue', contenttypebuffer
if UINTVAR_BYTE < ord(contenttypebuffer[valuelen]) and \
ord(contenttypebuffer[valuelen]) < WELL_KNOWN_VALUE:#text-string
v_len = contenttypebuffer[valuelen:].find('\0')
if -1 == v_len:
headerlen += 1
else:
headervalue = contenttypebuffer[valuelen:v_len]#Null 제외.
valuelen += (v_len+1)
elif WELL_KNOWN_VALUE <= ord(contenttypebuffer[valuelen]):#well-known
headervalue = contenttypebuffer[valuelen]
valuelen += 1
elif UINTVAR_BYTE == ord(contenttypebuffer[valuelen]):#uintval
pass
else:#value of length
pass
if DEBUG_MMS_NET:
print '== headervalue result', headervalue, valuelen
return (headervalue, valuelen)
#-------------------------------------------------------
if DEBUG_MMS_NET:
print 'Get_EntryData.Get_ContentType'
content_type = None
char_set = None
medianame = None
contentid = None
contentlocation = None
boundary = None
n_len = 0
index = 0
nameresult = None
valueresult = None
if ord(contenttypebuf[0]) <= UINTVAR_BYTE:
contentlen, cval_len = ValuelengthToInteger(contenttypebuf)
index += cval_len
else:
contentlen = len(contenttypebuf)
if DEBUG_MMS_NET:
print 'contentlen', contentlen
print contenttypebuf
if WELL_KNOWN_VALUE <= ord(contenttypebuf[index]):
content_type = ord(contenttypebuf[index])-WELL_KNOWN_VALUE
index += 1
else:
t_len = contenttypebuf[index:].find('\0')
if -1 != t_len:
content_type = contenttypebuf[index:index+t_len]
index += (t_len+1)
if DEBUG_MMS_NET:
print 'content_type', content_type
content_type = Check_ContentType(content_type)
if DEBUG_MMS_NET:
print '== INDEX == ', index, contentlen, content_type
while index < len(contenttypebuf):
if chr(int('85', 16)) == contenttypebuf[index]:
if DEBUG_MMS_NET:
print '== 85'
index += 1
valueresult = Get_HeaderValue(contenttypebuf[index:])
medianame = valueresult[0]
index += valueresult[1]
if DEBUG_MMS_NET:
print '== 85', medianame, index
elif chr(int('81', 16)) == contenttypebuf[index]:
if DEBUG_MMS_NET:
print '== 81'
index += 1
valueresult = Get_HeaderValue(contenttypebuf[index:])
char_set = ord(valueresult[0])-WELL_KNOWN_VALUE
index += valueresult[1]
if DEBUG_MMS_NET:
print '== 81', char_set, index
elif chr(int('c0', 16)) == contenttypebuf[index]:
if DEBUG_MMS_NET:
print '== c0'
index += 1
valueresult = Get_HeaderValue(contenttypebuf[index:])
contentid = valueresult[0]
index += valueresult[1]
if DEBUG_MMS_NET:
print '== c0', contentid, index
elif chr(int('8e', 16)) == contenttypebuf[index]:
if DEBUG_MMS_NET:
print '== 8e'
index += 1
valueresult = Get_HeaderValue(contenttypebuf[index:])
contentlocation = valueresult[0]
index += valueresult[1]
if DEBUG_MMS_NET:
print '== 8e', contentlocation, index
elif UINTVAR_BYTE < ord(contenttypebuf[index]) and \
ord(contenttypebuf[index]) < WELL_KNOWN_VALUE:#문자열.
nameresult = Get_HeaderName(contenttypebuf[index:])
if -1 == nameresult[1]:
index += 1
continue
attrname = nameresult[0][0:nameresult[1]-1]#Null 제외.
if DEBUG_MMS_NET:
print '== attrname', attrname
index += nameresult[1]
valueresult = Get_HeaderValue(contenttypebuf[index:])
if MIME_BOUNDARY == attrname:
boundary = valueresult[0][0:valueresult[1]-1]#Null 제외.
if DEBUG_MMS_NET:
print '== boundray', boundary
index += valueresult[1]
else:
index += 1
if DEBUG_MMS_NET:
print 'return',content_type, char_set, medianame, contentid, \
contentlocation, boundary
return (content_type, char_set, medianame, contentid, \
contentlocation, boundary)
def EntryHeaderToInteger(valuelength):
intervalue = 0
tempvalue = 0
index = 0
tempvalue = ord(valuelength[index])
if DEBUG_MMS_NET:
print 'EntryHeaderToInteger', tempvalue
while (index < len(valuelength)):
tempvalue = ord(valuelength[index])
intervalue += tempvalue & UINTVAR_MASK
if not (tempvalue & UINTVAR_CONTINUE):
index += 1
break
index += 1
intervalue = intervalue << UINTVAR_PAYLOADLEN
return intervalue, index
#--------------------------------------------------------------------------
boundary_str = None
entry = MultipartEntry_Info()
headerlen, hval_len = \
EntryHeaderToInteger(self.m_response_buffer[self.m_mmsbody_index:])
if DEBUG_MMS_NET:
print 'header len', headerlen, hval_len, \
ord(self.m_response_buffer[self.m_mmsbody_index])
self.m_mmsbody_index += hval_len
datalen, dval_len = \
EntryHeaderToInteger(self.m_response_buffer[self.m_mmsbody_index:])
if DEBUG_MMS_NET:
print 'data len', datalen, dval_len, \
ord(self.m_response_buffer[self.m_mmsbody_index])
self.m_mmsbody_index += dval_len
#server error ????
#if UINTVAR_BYTE <= headerlen:
# tidx = self.m_response_buffer[self.m_mmsbody_index:self.m_mmsbody_index+headerlen].find('application/smil')
# if -1 != tidx:
# print 'HEADER + 1',tidx
# headerlen += 1
if DEBUG_MMS_NET:
print '<------------->', \
ord(self.m_response_buffer[self.m_mmsbody_index]),\
ord(self.m_response_buffer[self.m_mmsbody_index+headerlen-1]),\
self.m_response_buffer[self.m_mmsbody_index+headerlen-1]
if UINTVAR_BYTE == ord(self.m_response_buffer[self.m_mmsbody_index])and\
chr(0) != self.m_response_buffer[self.m_mmsbody_index+headerlen-1]:
if DEBUG_MMS_NET:
print '@#$content-type 1F'
headerlen += 1
#server error ????
entry.m_mediatype, entry.m_charset, entry.m_medianame, \
entry.m_contentid, entry.m_contentlocation, boundary_str = \
Get_ContentType(self.m_response_buffer[self.m_mmsbody_index:self.m_mmsbody_index+headerlen])
if APPLICATION_VNDOMADRMMESSAGE == entry.m_mediatype:
self.m_messageinfo.isdrm_forward = True
if entry.m_contentid:
entry.m_path = \
mms_temp_message+entry.m_contentid[2:len(entry.m_contentid)-1]
else:
entry.m_path = mms_temp_message+entry.m_medianame
correct, newpath = Check_Fileext(entry.m_path, entry.m_mediatype)
if not correct:
if DEBUG_MMS_NET:
print 'new m_path', entry.m_path, newpath
entry.m_path = newpath
if DEBUG_MMS_NET:
print 'm_path', entry.m_path
self.m_mmsbody_index += headerlen
entry.m_databeginindex = self.m_mmsbody_index
entry.m_dataendindex = self.m_mmsbody_index + datalen
if entrysave:
self.Save_TempEntryData(datalen, entry, boundary_str)
else:
if (TEXT_XVCARD == entry.m_mediatype):
vcardpath = mms_temp_message+entry.m_medianame
self.Save_TempEntryData(datalen, entry, boundary_str, vcardpath)
elif APPLICATION_SMIL == entry.m_mediatype:
self.Save_TempEntryData(datalen, entry, boundary_str)
self.m_mmsbody_index += datalen
if len(self.m_response_buffer) < self.m_mmsbody_index:
if DEBUG_MMS_NET:
print 'Get_EntryData false', \
self.m_mmsbody_index, len(self.m_response_buffer)
return False
if DEBUG_MMS_NET:
print 'media info'
print entry.m_mediatype, entry.m_charset, entry.m_medianame, \
entry.m_contentid, entry.m_contentlocation
self.m_multipartentry.append(entry)
return True
def Get_Line(self, data_index):
lineend = self.m_response_buffer[data_index:].find(CRCF)
if DEBUG_MMS_NET:
print 'Get_Line', lineend, self.m_response_buffer[data_index:data_index+lineend]
return lineend
def Get_StrAttrName(self, str_value):
return str_value.find(': ')
def Save_TempEntryData(self, datalen, entry, boundary, tmpvcard=None):
from basemodel import StorageFullError
if APPLICATION_VNDOMADRMMESSAGE == entry.m_mediatype:
currentindex = self.m_mmsbody_index
lineend = 0
encoding = ENCODING_7BIT
while currentindex < self.m_mmsbody_index+datalen:
lineend = self.Get_Line(currentindex)
if 0 == lineend:
currentindex += len(CRCF)
break
elif -1 == lineend:
break
else:
if boundary == self.m_response_buffer[currentindex:currentindex+lineend]:
currentindex += (lineend+len(CRCF))
else:
n_len = self.Get_StrAttrName(self.m_response_buffer[currentindex:currentindex+lineend])
if -1 != n_len:
if CONTENTENCODING == \
self.m_response_buffer[currentindex:currentindex+n_len]:
encoding = \
self.m_response_buffer[currentindex+n_len+2:currentindex+lineend]
currentindex += (lineend+len(CRCF))
if ENCODING_BASE64 == encoding:
import base64
decodedata = \
base64.decodestring(self.m_response_buffer[currentindex:self.m_mmsbody_index+datalen])
try:
hfile = open(entry.m_path, 'wb+')
hfile.write(decodedata)
hfile.close()
except:
hfile.close()
raise StorageFullError
return
elif ENCODING_7BIT == encoding:
import gsm7
decodedata = \
gsm7.gsm7bit_decode(self.m_response_buffer[currentindex:self.m_mmsbody_index+datalen])
try:
hfile = open(entry.m_path, 'wb+')
hfile.write(decodedata)
hfile.close()
except:
hfile.close()
raise StorageFullError
return
else:
try:
hfile = open(entry.m_path, 'wb+')
hfile.write(self.m_response_buffer[currentindex:self.m_mmsbody_index+datalen])
hfile.close()
except:
hfile.close()
raise StorageFullError
return
elif APPLICATION_SMIL == entry.m_mediatype or \
TEXT_PLAIN == entry.m_mediatype:
if utf_8 != entry.m_charset:
txtbuf = self.m_response_buffer[self.m_mmsbody_index:self.m_mmsbody_index+datalen]
if DEBUG_MMS_NET:
print 'text not utf8', entry.m_charset, datalen
if iso_10646_ucs_2 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_10646_ucs_2str)
elif iso_8859_1 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_8859_1str)
elif iso_8859_2 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_8859_2str)
elif iso_8859_3 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_8859_3str)
elif iso_8859_4 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_8859_4str)
elif iso_8859_5 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_8859_5str)
elif iso_8859_6 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_8859_6str)
elif iso_8859_7 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_8859_7str)
elif iso_8859_8 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_8859_8str)
elif iso_8859_9 == entry.m_charset:
txtbuf = unicode(txtbuf, iso_8859_9str)
elif shift_JIS == entry.m_charset:
txtbuf = unicode(txtbuf, shift_JISstr)
elif us_ascii == entry.m_charset:
txtbuf = unicode(txtbuf, us_asciistr)
utfbuf = UnicodeToUtf8(txtbuf)
if DEBUG_MMS_NET:
print 'utf8'
try:
hfile = open(entry.m_path, 'wb+')
hfile.write(utfbuf)
hfile.close()
except:
hfile.close()
if DEBUG_MMS_NET:
print 'utf_8 != entry.m_charset'
raise StorageFullError
return
else:
try:
if DEBUG_MMS_NET:
print 'text utf8'
temp = self.m_response_buffer[self.m_mmsbody_index:self.m_mmsbody_index+datalen]
latin = unicode(temp, 'utf-8')
try:
latin = unicode(temp).encode('iso8859-1')
except:
pass
try:
latin = unicode(latin, 'utf-8')
except:
latin = temp
latin = UnicodeToUtf8(latin)
hfile = open(entry.m_path, 'wb+')
hfile.write(latin)
hfile.close()
except:
hfile.close()
if DEBUG_MMS_NET:
print 'utf_8 != entry.m_charset else EXCEPT'
raise StorageFullError
return
elif TEXT_XVCARD == entry.m_mediatype:
if tmpvcard:
hfile = open(tmpvcard, 'wb+')
else:
hfile = open(entry.m_path, 'wb+')
if utf_8 != entry.m_charset:
txtbuf = self.m_response_buffer[self.m_mmsbody_index:self.m_mmsbody_index+datalen]
utfbuf = unicode(txtbuf).encode('utf-8')
try:
hfile.write(utfbuf)
hfile.close()
except:
hfile.close()
raise StorageFullError
return
else:
try:
hfile.write(self.m_response_buffer[self.m_mmsbody_index:self.m_mmsbody_index+datalen])
hfile.close()
except:
hfile.close()
raise StorageFullError
return
else:
try:
hfile = open(entry.m_path, 'wb+')
hfile.write(self.m_response_buffer[self.m_mmsbody_index:self.m_mmsbody_index+datalen])
hfile.close()
except:
hfile.close()
raise StorageFullError
return
def Parse_MMSBody(self, entrysave=True):
if DEBUG_MMS_NET:
print 'mms_net_pdu.M_Retrieve_Conf.Parse_MMSBody'
if (0 == self.m_mmsbody_index):
if DEBUG_MMS_NET:
print 'Parse_MMSBody == 0'
return False
self.Get_EntryNumber()
for i in range(self.m_multipartnum):
from basemodel import StorageFullError
try:
if False == self.Get_EntryData(entrysave):
if DEBUG_MMS_NET:
print 'Parse_MMSBody Error'
return False
except StorageFullError:
if DEBUG_MMS_NET:
print 'Parse_MMSBody Error StorageFullError'
return False, PARSING_STORAGEFULL
except:
if DEBUG_MMS_NET:
print 'Parse_MMSBody Error except'
h = open('messages/reterror', 'wb+')
h.write(self.m_response_buffer)
h.close()
return False, PARSING_FAIL
try:
msginfo_result = self.Set_MMSMessageInfo()
except:
return False, PARSING_FAIL
if msginfo_result:
return True, ''
else:
return False, PARSING_FAIL
def Set_MMSMessageInfo(self):
def Cut_Address(phone_number):
if None == phone_number:
return ''
end = phone_number.find('/')
if -1 == end:
return phone_number
else:
return phone_number[0:end]
def Trans_Time(time_str):
if str(time_str).endswith('ms'):
return int(str(time_str).replace('ms',''))
elif str(time_str).endswith('s'):
return int(str(time_str).replace('s',''))
else:
return int(time_str)* 1000
from mmsmanager import MMSMessageInfo, MMSMediaInfo
if None != self.m_mmsheader_field.m_To and \
M_SEND_REQ == self.m_mmsheader_field.m_XMmsMessageType:
for i in self.m_mmsheader_field.m_To:
if -1 != i.textstring.data.find('@'):
self.m_to_email.append(i.textstring.data)
else:
self.m_to_phone.append(Cut_Address(i.textstring.data))
if None != self.m_mmsheader_field.m_Cc and \
M_SEND_REQ == self.m_mmsheader_field.m_XMmsMessageType:
for i in self.m_mmsheader_field.m_Cc:
if -1 != i.textstring.data.find('@'):
self.m_cc_email.append(i.textstring.data)
else:
self.m_cc_phone.append(Cut_Address(i.textstring.data))
if None != self.m_mmsheader_field.m_Bcc and \
M_SEND_REQ == self.m_mmsheader_field.m_XMmsMessageType:
for i in self.m_mmsheader_field.m_Bcc:
if -1 != i.textstring.data.find('@'):
self.m_bcc_email.append(i.textstring.data)
else:
self.m_bcc_phone.append(Cut_Address(i.textstring.data))
if None != self.m_mmsheader_field.m_From:
if M_SEND_REQ == self.m_mmsheader_field.m_XMmsMessageType:
self.m_messageinfo.info_sender = \
Cut_Address(self.m_mmsheader_field.m_From.Get_Value())
else:
self.m_messageinfo.info_inbox_sender = \
Cut_Address(self.m_mmsheader_field.m_From.Get_Value())
idx = self.m_messageinfo.info_inbox_sender.find('.')
if -1 != idx:
tempnumber = self.m_messageinfo.info_inbox_sender[idx+1:]
tempnumber += ':'
tempnumber += self.m_messageinfo.info_inbox_sender[0]
self.m_messageinfo.info_inbox_sender = tempnumber
if None != self.m_mmsheader_field.m_Subject:
self.m_messageinfo.subject = self.m_mmsheader_field.m_Subject.Get_Value()
else:
self.m_messageinfo.subject = ''
from mmsplayer import Get_SlideInfo
slide = []
for i in self.m_multipartentry:
print i.m_mediatype
if i.m_mediatype and APPLICATION_SMIL == i.m_mediatype:
self.m_messageinfo.fname_smil = \
unicode(i.m_path).encode('utf-8')
slide = Get_SlideInfo(i.m_path)
if not slide:
Change_Smil(i.m_path)
slide = Get_SlideInfo(i.m_path)
if not slide:
return False
print 'slide', slide
break
for i in slide:
mindex = 0
mediainfo = MMSMediaInfo()
for j in i[1:]:
mindex += 1
if len(j):
if 1 == mindex: #image
mediainfo.fname_image = self.Find_MediaPath(j['src'])
elif 2 == mindex:#audio
mediainfo.fname_sound = self.Find_MediaPath(j['src'])
elif 3 == mindex:#text
mediainfo.fname_text = self.Find_MediaPath(j['src'])
elif 4 == mindex:#video
mediainfo.fname_video = self.Find_MediaPath(j['src'])
elif 5 == mindex:#
pass
self.m_messageinfo.media.append(mediainfo)
self.m_messageinfo.slide_time.append(Trans_Time((i[0])['dur']))
if DEBUG_MMS_NET:
print 'self.m_messageinfo.media'
print self.m_messageinfo.subject
print self.m_messageinfo.info_sender
print self.m_messageinfo.info_inbox_sender
for i in self.m_messageinfo.media:
print i
print i.fname_text
print i.fname_image
print i.fname_sound
print i.fname_video
return True
def Find_MediaPath(self, fname):
if DEBUG_MMS_NET:
print 'Find_MediaPath',fname
for i in self.m_multipartentry:
if DEBUG_MMS_NET:
print '------------'
print i.m_mediatype
print i.m_charset
print i.m_medianame
print i.m_contentid
print i.m_contentlocation
print i.m_databeginindex
print i.m_dataendindex
print i.m_path
if i.m_mediatype and None != i.m_medianame and len(i.m_medianame) and (-1 != fname.find(i.m_medianame)):
if DEBUG_MMS_NET:
print '--1', i.m_path
return i.m_path
elif i.m_mediatype and None != i.m_contentid and len(i.m_contentid):# and (-1 != fname.find(i.m_contentid)):
preidx = i.m_contentid.find('<')
endidx = i.m_contentid.find('>')
newcid = i.m_contentid[preidx+1:endidx]
if -1 != fname.find(newcid):
if DEBUG_MMS_NET:
print '--2',i.m_path
return i.m_path
if DEBUG_MMS_NET:
print 'None'
return None
def Save_Retrieve_Message(self, fname):
hrecv = open(fname, 'wb+')
hrecv.write(self.m_response_buffer[self.m_mmsheader_index:])
hrecv.close()
def Get_NotifyRespInd_Result(self):
return self.Get_HTTP_Result()
def Get_MRetrieveConf_Result(self):
if (HTTP_OK == self.Get_HTTP_Result()):
if not self.Parse_MMSHeader():
return False, PARSING_FAIL
if (len(self.m_response_buffer) == self.m_mmsheader_index):
return False, ''
else:
return self.Parse_MMSBody(False)
else:
return False, ''
def Get_MRetrieveConf_Info(self):
if not self.m_mmsheader_field.m_Subject:
if DEBUG_MMS_NET:
print 'Get_MRetrieveConf_Info no subject'
return ''
subject = self.m_mmsheader_field.m_Subject.Get_Value()
if DEBUG_MMS_NET:
print 'Get_MRetrieveConf_Info',subject
return subject
def Get_RetrieveMsg_Size(self):
return len(self.m_response_buffer[self.m_mmsheader_index:])
class M_Forward_Conf(MMS_BaseResponse):
def __init__(self):
MMS_BaseResponse.__init__(self)
def Get_MForwardConf_Result(self):
if (HTTP_OK == self.Get_HTTP_Result()):
if not self.Parse_MMSHeader():
return False
if (XMmsResponseStatus_Ok == \
ord(self.m_mmsheader_field.m_XMmsResponseStatus)):
return True
else:
return False
else:
return False
class MultipartEntry_Info:
def __init__(self):
self.m_mediatype = None
self.m_charset = None
self.m_medianame = None
self.m_contentid = None
self.m_contentlocation = None
self.m_databeginindex = 0
self.m_dataendindex = 0
self.m_path = None
class MultipartEntry:
def __init__(self, mediafilepath):
self.m_bError = False #error
self.m_entrypath = mediafilepath
self.m_entrydata_buffer = [] #파일 데이타 버퍼. 전송.
self.m_entrydatasize = 0
self.m_contenttype = []
self.m_contenttype_buffer = '' #전송
self.m_contenttypeheader = []
self.m_contenttypeheader_buffer = '' #전송
self.m_entrysize = []
self.m_entrysize_buffer = '' #전송
self.m_entrytotalsize = 0
if DEBUG_MMS_NET:
print 'mms_net_pdu.MultipartEntry.init', mediafilepath
if (False == os.path.exists(self.m_entrypath)):
self.m_bError = True
else:
self.LoadEntryData()
self.Make_ContentType()
self.m_contenttype_buffer = ''.join(self.m_contenttype)
self.Make_ContentType_Headers()
self.m_contenttypeheader_buffer = ''.join(self.m_contenttypeheader)
self.Cal_EntrySize()
self.m_entrysize_buffer = ''.join(self.m_entrysize)
self.Cal_TotalSize()
def Save_Entry(self, file_handle):
if DEBUG_MMS_NET:
print 'mms_net_pdu.MultipartEntry.Save_Entry'
file_handle.write(self.m_entrysize_buffer)
file_handle.write(self.m_contenttype_buffer)
file_handle.write(self.m_contenttypeheader_buffer)
file_handle.write(self.m_entrydata_buffer)
def Send_Entry(self, mms_socket, totmsgsize=0):
mms_socket.Send_Data(self.m_entrysize_buffer, totmsgsize)
mms_socket.Send_Data(self.m_contenttype_buffer, totmsgsize)
mms_socket.Send_Data(self.m_contenttypeheader_buffer, totmsgsize)
mms_socket.Send_Data(self.m_entrydata_buffer, totmsgsize)
def Get_TotalSize(self):
return self.m_entrytotalsize
def Cal_TotalSize(self):
self.m_entrytotalsize = len(self.m_entrysize)+ \
len(self.m_contenttype)+ \
len(self.m_contenttypeheader)+ \
self.m_entrydatasize
if DEBUG_MMS_NET:
print 'mms_net_pdu.MultipartEntry.Cal_TotalSize', self.m_entrytotalsize
#header길이와 data길이를 계산한다.
def Cal_EntrySize(self):
datalen = Make_UINTVAR(self.m_entrydatasize, False)
headerlen = Make_UINTVAR(len(self.m_contenttype)+ \
len(self.m_contenttypeheader), False)
if DEBUG_MMS_NET:
print 'Cal_EntrySize', datalen, headerlen
self.m_entrysize.extend(headerlen)
self.m_entrysize.extend(datalen)
#미디어 파일을 크기와 데이터를 읽어온다.
def LoadEntryData(self):
self.m_entrydatasize = os.path.getsize(self.m_entrypath)
self.filehandle = open(self.m_entrypath)
self.m_entrydata_buffer = self.filehandle.read()
self.filehandle.close()
if DEBUG_MMS_NET:
print 'file size', self.m_entrydatasize, len(self.m_entrydata_buffer)
#multipartentry의 content type에 1바이트 크기의 내용을 추가한다.
def ContentType_Append(self, char_value, insert=False, insert_index=0):
if (False == insert):
self.m_contenttype.append(char_value)
else:
self.m_contenttype.insert(insert_index, char_value)
#multipartentry의 content type에 문자열을 추가 시킨다.
def ContentType_Extend(self, char_array):
self.m_contenttype.extend(char_array)
def ContentType_Insert(self, insertvalue):
if not len(insertvalue):
return
for i in range(len(insertvalue)):
self.m_contenttype.insert(0, insertvalue[-(i+1)])
#MM1 multipart entry의 컨텐트 타입을 생성한다.
def Make_ContentType(self):
file_ext = os.path.splitext(self.m_entrypath)[1]
file_ext = file_ext.upper()
btext = False
if (not cmp(file_ext, '.SMIL') or not cmp(file_ext, '.SMI')):
self.ContentType_Extend(APPLICATION_SMIL)
self.ContentType_Append(chr(0))
btext = True
self.ContentType_Append(chr(int('85', 16)))
file_name = os.path.split(self.m_entrypath)[1]
utf8fname = UnicodeToUtf8(os.path.splitext(file_name)[0])
self.ContentType_Extend(utf8fname)
self.ContentType_Append(chr(0))
self.ContentType_Append(chr(int('81', 16)))
self.ContentType_Append(chr(int('ea', 16)))
elif (not cmp(file_ext, '.JPG') or not cmp(file_ext, '.JPEG')):
self.ContentType_Append(chr(IMAGE_JPEG+WELL_KNOWN_VALUE))
elif (not cmp(file_ext, '.GIF')):
self.ContentType_Append(chr(IMAGE_GIF+WELL_KNOWN_VALUE))
elif (not cmp(file_ext, '.TXT')):
self.ContentType_Append(chr(TEXT_PLAIN+WELL_KNOWN_VALUE))
btext = True
self.ContentType_Append(chr(int('85', 16)))
file_name = os.path.split(self.m_entrypath)[1]
utf8fname = UnicodeToUtf8(os.path.splitext(file_name)[0])
self.ContentType_Extend(utf8fname)
self.ContentType_Append(chr(0))
self.ContentType_Append(chr(int('81', 16)))
self.ContentType_Append(chr(utf_8+WELL_KNOWN_VALUE))
elif not cmp(file_ext, '.PNG'):
self.ContentType_Append(chr(IMAGE_PNG+WELL_KNOWN_VALUE))
elif not cmp(file_ext, '.WAV'):
self.ContentType_Extend(AUDIO_XWAV)
self.ContentType_Append(chr(0))
elif not cmp(file_ext, '.MID'):
self.ContentType_Extend(AUDIO_MIDI)
self.ContentType_Append(chr(0))
elif not cmp(file_ext, '.AMR'):
self.ContentType_Extend(AUDIO_AMR)
self.ContentType_Append(chr(0))
elif not cmp(file_ext, '.3GP'):
self.ContentType_Extend(VIDEO_3GPP)
self.ContentType_Append(chr(0))
elif not cmp(file_ext, '.MP4'):
self.ContentType_Extend(VIDEO_MP4)
self.ContentType_Append(chr(0))
elif not cmp(file_ext, '.VCF'):
self.ContentType_Extend(chr(TEXT_XVCARD+WELL_KNOWN_VALUE))
elif not cmp(file_ext, '.IMY'):
self.ContentType_Extend(TEXT_XIMELODY)
self.ContentType_Append(chr(0))
elif not cmp(file_ext, '.BMP'):
self.ContentType_Extend(IMAGE_BMP)
self.ContentType_Append(chr(0))
else:
self.ContentType_Append(chr(STAR_STAR+WELL_KNOWN_VALUE))
if not btext:
self.ContentType_Append(chr(int('85', 16)))
file_name = os.path.split(self.m_entrypath)[1]
utf8fname = UnicodeToUtf8(os.path.splitext(file_name)[0])
self.ContentType_Extend(utf8fname)
self.ContentType_Append(chr(0))
#content-type의 길이.
contentlen = Make_UINTVAR(len(self.m_contenttype))
self.ContentType_Insert(contentlen)
#multipartentry의 헤더에 1바이트 크기의 내용을 추가한다.
def ContentTypeHeader_Append(self, char_value):
self.m_contenttypeheader.append(char_value)
#multipartentry의 헤더에 문자열을 추가 시킨다.
def ContentTypeHeader_Extend(self, char_array):
self.m_contenttypeheader.extend(char_array)
#MM1 mutipartentry의 헤더를 생성한다.
def Make_ContentType_Headers(self):
self.ContentTypeHeader_Append(chr(int('8e', 16))) #content-location
file_name = os.path.split(self.m_entrypath)[1]
utf8fname = UnicodeToUtf8(os.path.splitext(file_name)[0])
self.ContentTypeHeader_Extend(utf8fname)
self.ContentTypeHeader_Append(chr(0))
self.ContentTypeHeader_Append(chr(int('c0', 16))) #content-id
self.ContentTypeHeader_Append(chr(int('22', 16)))
self.ContentTypeHeader_Append(chr(int('3c', 16)))
self.ContentTypeHeader_Extend(utf8fname)
self.ContentTypeHeader_Append(chr(int('3e', 16)))
self.ContentTypeHeader_Append(chr(0))
class MMS_Header:
def __init__(self, msginfo=None):
self.m_mmsheader = []
self.m_mmsheader_buffer = ''
self.m_sendmsginfo = msginfo
self.m_mmsheader_size = 0
def Save_MMSHeader(self, file_handle):
file_handle.write(self.m_mmsheader_buffer)
def Get_MMSHeader_Size(self):
return self.m_mmsheader_size
#mms header buffer(m_mmsheader_buffer)에 내용을 추가한다.
#char_value: [in] 추가할 데이터.
#insert=False: [in] 버퍼의 뒤에 추가할 경우
#insert_index=0: [in]
def MMSHeader_Append(self, char_value, insert=False, insert_index=0):
if (False == insert):
self.m_mmsheader.append(char_value)
else:
self.m_mmsheader.insert(insert_index, char_value)
#mms header buffer(m_mmsheader_buffer)에 내용을 추가한다.
#char_array: [in]
#array_len: [in]
def MMSHeader_Extend(self, char_array):
self.m_mmsheader.extend(char_array)
def Append_MMS_Type(self, message_type):
self.MMSHeader_Append(chr(X_Mms_Message_Type_NUM+WELL_KNOWN_VALUE))
self.MMSHeader_Append(chr(message_type))
def Append_MMS_TransactionID(self, info_sender, bmake=False):
def Create_TransctionID(info_sender):
if not info_sender:
from setting import setting
info_sender = setting.tel_number
transactionID = []
utf8sender = UnicodeToUtf8(info_sender)
if DEBUG_MMS_NET:
print 'TransactiondID', utf8sender, utf8sender[-1]
transactionID.extend(Get_CurrentDate())
transactionID.extend(utf8sender)
if chr(0) != utf8sender[-1]:
transactionID.append(chr(0))
return transactionID
self.MMSHeader_Append(chr(X_Mms_Trancation_Id_NUM+WELL_KNOWN_VALUE))
if DEBUG_MMS_NET:
print 'Append_MMS_TransactionID', info_sender, bmake
if bmake:
self.MMSHeader_Extend(Create_TransctionID(info_sender))
else:
utf8sender = UnicodeToUtf8(info_sender)
self.MMSHeader_Extend(utf8sender)
if chr(0) != utf8sender[-1]:
self.MMSHeader_Append(chr(0))
def Append_MMS_Version(self):
self.MMSHeader_Append(chr(X_Mms_MMS_Version_NUM+WELL_KNOWN_VALUE)) #X-Mms-MMS-Version
self.MMSHeader_Append(chr(MMS_VERSION_VALUE)) #MMS-Version value 1.1
def Append_MMS_From(self, info_sender): #text-string
def Create_From(info_sender):
from_value = []
utf8from = UnicodeToUtf8(info_sender)
from_value.append(chr(ADDRESS_PRESENT_TOKEN)) #Address-present-token 0x80
from_value.extend(utf8from)
from_value.extend(ADDRESSTYPE_PHONE)
from_value.append(chr(0))
senderlen = Make_UINTVAR(len(utf8from)+2+len(ADDRESSTYPE_PHONE))
if not len(senderlen):
return
for i in range(len(senderlen)):
from_value.insert(0, senderlen[-(i+1)])
return from_value
if DEBUG_MMS_NET:
print 'Append_MMS_From', info_sender
if None == info_sender:
return
self.MMSHeader_Append(chr(From_NUM+WELL_KNOWN_VALUE)) #From
self.MMSHeader_Extend(Create_From(info_sender))
def Add_ReceiveAddress(self, addr, addtype):
if DEBUG_MMS_NET:
print 'Add_ReceiveAddress', addr
address = ''
addrlen = 0
uiaddrlen =[]
self.MMSHeader_Append(chr(addtype+WELL_KNOWN_VALUE)) #To
address = UnicodeToUtf8(addr)
addrlen = len(address) + 2
if (-1 != addr.find('@')):
uiaddrlen = Make_UINTVAR(addrlen)
self.MMSHeader_Extend(uiaddrlen)
self.MMSHeader_Append(chr(utf_8+WELL_KNOWN_VALUE))
self.MMSHeader_Extend(address)
else:
addrlen += len(ADDRESSTYPE_PHONE)
uiaddrlen = Make_UINTVAR(addrlen)
self.MMSHeader_Extend(uiaddrlen)
self.MMSHeader_Append(chr(utf_8+WELL_KNOWN_VALUE))
self.MMSHeader_Extend(address)
self.MMSHeader_Extend(ADDRESSTYPE_PHONE)
self.MMSHeader_Append(chr(0))
def Append_MMS_Receiver(self, To, Cc, Bcc):
for t in To:
self.Add_ReceiveAddress(t, To_NUM)
for c in Cc:
self.Add_ReceiveAddress(c, Cc_NUM)
for b in Bcc:
self.Add_ReceiveAddress(b, Bcc_NUM)
def Append_MMS_Subject(self, msg_subject):
utf8subject = UnicodeToUtf8(msg_subject)
subjectlen = len(utf8subject)
subjectlen += 2
self.MMSHeader_Append(chr(Subject_NUM+WELL_KNOWN_VALUE)) #Subject
valuelen = Make_UINTVAR(subjectlen)
self.MMSHeader_Extend(valuelen) #value-lenght
self.MMSHeader_Append(chr(utf_8+WELL_KNOWN_VALUE)) #char-set
self.MMSHeader_Extend(utf8subject) #text-string
self.MMSHeader_Append(chr(0)) #NULL
def Append_MMS_Class(self):
self.MMSHeader_Append(chr(X_Mms_Message_Class_NUM+WELL_KNOWN_VALUE))#X-Mms-Message-Class
self.MMSHeader_Append(chr(PERSONAL_CLASS)) #Personal
def Append_MMS_Visibility(self):
self.MMSHeader_Append(chr(X_Mms_Sender_Visibility_NUM+WELL_KNOWN_VALUE))#X-Mms-Sender-Visibility
self.MMSHeader_Append(chr(SENDER_SHOW)) #
def Append_MMS_Delivery(self, delivery=True):
self.MMSHeader_Append(chr(X_Mms_Delivery_Report_NUM+WELL_KNOWN_VALUE))#X-Mms-Delivery-Report
if True == delivery:
self.MMSHeader_Append(chr(YES))
else:
self.MMSHeader_Append(chr(NO))
def Append_MMS_Read(self):
self.MMSHeader_Append(chr(X_Mms_Read_Report_NUM+WELL_KNOWN_VALUE))#X-Mms-Read-Report
self.MMSHeader_Append(chr(NO)) #NO:81
def Append_MMS_ContentType(self, fname_smil):
def Create_ContentType(fname_smil):
contenttype_value = []
fsmil = os.path.split(fname_smil)[1]
fsmil = os.path.splitext(fsmil)[0]
utf8fname = UnicodeToUtf8(fsmil)
contenttype_value.append(chr(APPLICATION_VNDWAPMULTIPARTRELATED+WELL_KNOWN_VALUE)) #content-type value
contenttype_value.append(chr(int('89', 16))) #type parameter
contenttype_value.extend('application/smil') #type parameter value
contenttype_value.append(chr(0))
contenttype_value.append(chr(int('8a', 16))) #start parameter
contenttype_value.append('<')
contenttype_value.extend(utf8fname)
contenttype_value.append('>')
contenttype_value.append(chr(0))
typelen = len(contenttype_value)
contenttypelen = Make_UINTVAR(typelen)
if not len(contenttypelen):
return
for i in range(len(contenttypelen)):
contenttype_value.insert(0, contenttypelen[-(i+1)])
return contenttype_value
if DEBUG_MMS_NET:
print 'fname_smil', fname_smil
self.MMSHeader_Append(chr(Content_Type_NUM+WELL_KNOWN_VALUE)) #Content-Type
self.MMSHeader_Extend(Create_ContentType(fname_smil))
def Append_MMS_Status(self, mmsstatus):
self.MMSHeader_Append(chr(X_Mms_Staus_NUM+WELL_KNOWN_VALUE)) #X-Mms-Status
self.MMSHeader_Append(chr(mmsstatus))
def Append_MMS_ReportAllowed(self, reportallowed):
self.MMSHeader_Append(chr(X_Mms_Report_Allowed_NUM+WELL_KNOWN_VALUE))#X-Mms-Status
self.MMSHeader_Append(chr(reportallowed))
def Append_MMS_ContentLocation(contentlocation):
self.MMSHeader_Append(chr(X_Mms_Content_Location_NUM+WELL_KNOWN_VALUE)) #X-Mms-Content-Location
self.MMSHeader_Extend(Ccontentlocation)
if chr(0) != contentlocation[-1]:
self.MMSHeader_Append(chr(0))
#M-Send.Req 헤더를 생성한다.
#sendinfo: [in] MMSMessageInfo
def Make_MSendReq_Header(self, sendinfo):
self.m_mmsheader = []
self.Append_MMS_Type(M_SEND_REQ)
self.Append_MMS_TransactionID(sendinfo.info_sender, True)
self.Append_MMS_Version()
self.Append_MMS_From(sendinfo.info_sender) #unicode, text-string
from mmsmanager import mmsmgr
to,cc,bcc = mmsmgr.getaddress(-1, True)
self.Append_MMS_Receiver(to, cc, bcc)
self.Append_MMS_Subject(sendinfo.subject)
self.Append_MMS_Class()
self.Append_MMS_Visibility()
self.Append_MMS_Delivery(sendinfo.delivery_report)
self.Append_MMS_Read()
self.Append_MMS_ContentType(sendinfo.fname_smil)
self.m_mmsheader_size = len(self.m_mmsheader)
if DEBUG_MMS_NET:
print 'Make_MSendReq_Header'
self.m_mmsheader_buffer = ''.join(self.m_mmsheader)
def Send_MMS_Header(self, mms_socket, totmsgsize=0):
mms_socket.Send_Data(self.m_mmsheader_buffer, totmsgsize)
def Make_MNotifyRespInd_Header(self, transactionid):
self.m_mmsheader = []
self.Append_MMS_Type(M_NOTIFYRESP_IND)
self.Append_MMS_TransactionID(transactionid, transactionid==None)
self.Append_MMS_Version()
self.Append_MMS_Status(MMSSTATUS_Deferred)
self.Append_MMS_ReportAllowed(YES)
if TEST_CTR_NET:
self.MMSHeader_Extend('\r\n\r\n')
self.m_mmsheader_size = len(self.m_mmsheader)
self.m_mmsheader_buffer = ''.join(self.m_mmsheader)
def Make_MAcknowledgeInd_Header(self, transactionid):
self.m_mmsheader = []
self.Append_MMS_Type(M_ACKNOWLEDGE_IND)
self.Append_MMS_TransactionID(transactionid, transactionid==None)
self.Append_MMS_Version()
self.Append_MMS_ReportAllowed(YES)
self.m_mmsheader_size = len(self.m_mmsheader)
self.m_mmsheader_buffer = ''.join(self.m_mmsheader)
def Make_MForwardReq_Header(self, fowardinfo, transactionid, contentlocation):
self.m_mmsheader = []
self.Append_MMS_Type(M_FORWARD_REQ)
self.Append_MMS_TransactionID(transactionid, transactionid==None)
self.Append_MMS_Version()
self.Append_MMS_From(fowardinfo.info_sender)
from mmsmanager import mmsmgr
to,cc,bcc = mmsmgr.getaddress(-1, True)
self.Append_MMS_Receiver(to, cc, bcc)
self.Append_MMS_Delivery()
self.Append_MMS_Read()
self.Append_MMS_ContentLocation(contentlocation)
class MMS_Body:
def __init__(self):
self.m_multipartentry = []
self.m_mmsbodysize = 0
self.m_entrynumber = []
self.m_entrynumber_buffer = ''
def Save_MMSBody(self, file_handle):
self.Save_Entrynumber(file_handle)
if DEBUG_MMS_NET:
print 'mms_net_pdu.mms_body.Save_MMSBody', \
len(self.m_multipartentry)
for i in self.m_multipartentry:
i.Save_Entry(file_handle)
def Save_Entrynumber(self, file_handle):
if DEBUG_MMS_NET:
print 'Save_Entrynumber', ord(self.m_entrynumber_buffer)
file_handle.write(self.m_entrynumber_buffer)
def Get_MMSBody_Size(self):
if DEBUG_MMS_NET:
print 'mms_net_pdu.MMS_Body.Get_MMSBody_Size', self.m_mmsbodysize
return self.m_mmsbodysize
def Make_MMS_Body(self, msginfo):
if DEBUG_MMS_NET:
print 'Make_MMS_Body'
for i in msginfo.media:
if (None != i.fname_text):
if DEBUG_MMS_NET:
print 'text :', i.fname_text
self.m_multipartentry.append(MultipartEntry(i.fname_text))
if (None != i.fname_sound):
if DEBUG_MMS_NET:
print 'soound :', i.fname_sound
self.m_multipartentry.append(MultipartEntry(i.fname_sound))
if (None != i.fname_image):
if DEBUG_MMS_NET:
print 'image :', i.fname_image
self.m_multipartentry.append(MultipartEntry(i.fname_image))
if (None != i.fname_video):
if DEBUG_MMS_NET:
print 'video :', i.fname_video
self.m_multipartentry.append(MultipartEntry(i.fname_video))
self.m_multipartentry.append(MultipartEntry(msginfo.fname_smil))
media_num = 0
for i in self.m_multipartentry:
self.m_mmsbodysize += i.Get_TotalSize()
media_num += 1
if DEBUG_MMS_NET:
print 'Make_MMS_Body', self.m_mmsbodysize, media_num
self.m_entrynumber = Make_UINTVAR(media_num, False)
self.m_mmsbodysize += len(self.m_entrynumber)
self.m_entrynumber_buffer = ''.join(self.m_entrynumber)
pass
def Send_MMSBody(self, mms_socket, totmsgsize=0):
self.Send_MMSBody_EntryNumber(mms_socket, totmsgsize)
self.Send_MMSBody_Entry(mms_socket, totmsgsize)
def Send_MMSBody_EntryNumber(self, mms_socket, totmsgsize=0):
mms_socket.Send_Data(self.m_entrynumber_buffer, totmsgsize)
def Send_MMSBody_Entry(self, mms_socket, totmsgsize=0):
for i in self.m_multipartentry:
i.Send_Entry(mms_socket, totmsgsize)
class M_NotifyResp_Ind_PDU:
def __init__(self):
self.m_mmsheader = MMS_Header()
def Make_M_NotifyResp_Ind_PDU(self, transactionid=None):
if DEBUG_MMS_NET:
print 'Make_M_NotifyResp_Ind_PDU', transactionid
self.m_mmsheader.Make_MNotifyRespInd_Header(transactionid)
def Send_M_NotifyResp_Ind_PDU(self, mms_socket):
self.m_mmsheader.Send_MMS_Header(mms_socket)
def Get_PDU_Size(self):
return self.m_mmsheader.Get_MMSHeader_Size()
class M_Acknowledge_Ind_PDU:
def __init__(self, transcationid):
self.m_mtransactionid = transcationid
self.m_mmsheader = MMS_Header()
def Send_M_Acknowledge_Ind_PDU(self, mms_socket):
self.m_mmsheader.Send_MMS_Header(mms_socket)
def Make_M_Acknowledge_Ind_PDU(self, transactionid=None):
if transactionid:
self.m_mtransactionid = transactionid
self.m_mmsheader.Make_MAcknowledgeInd_Header(self.m_mtransactionid)
def Get_PDU_Size(self):
return self.m_mmsheader.Get_MMSHeader_Size()
class M_Send_Req_PDU:
def __init__(self, msginfo):
self.m_sendmsginfo = msginfo
self.m_mmsheader = MMS_Header()
self.m_mmsbody = MMS_Body()
if DEBUG_MMS_NET:
print 'mms_net_pdu.M_Send_Req_PDU.m_sendmsginfo'
print self.m_sendmsginfo.info_sender
print self.m_sendmsginfo.fname_smil
def Send_M_Send_Req_PDU(self, mms_socket, totmsgsize=0):
self.m_mmsheader.Send_MMS_Header(mms_socket, totmsgsize)
self.m_mmsbody.Send_MMSBody(mms_socket, totmsgsize)
def Get_PDU_Size(self):
if DEBUG_MMS_NET:
print 'mms_net_pdu.M_Send_Req_PDU.Get_PDU_Size', \
self.m_mmsheader.Get_MMSHeader_Size(), self.m_mmsbody.Get_MMSBody_Size()
return self.m_mmsheader.Get_MMSHeader_Size()+self.m_mmsbody.Get_MMSBody_Size()
#M-Send.req의 내용을 생성한다.
def Make_M_Send_Req_PDU(self):
self.m_mmsheader.Make_MSendReq_Header(self.m_sendmsginfo)
self.m_mmsbody.Make_MMS_Body(self.m_sendmsginfo)
#전송에 사용한 메시지(m_mmsheader, m_multipartentry)를 저장한다.
def Save_MMS_PDU(self, filepath):
if DEBUG_MMS_NET:
print 'mms_net_pdu.M_Send_Req_PDU.Save_MMS_PDU'
sendmsgfile = open(filepath, 'wb+')
self.m_mmsheader.Save_MMSHeader(sendmsgfile)
self.m_mmsbody.Save_MMSBody(sendmsgfile)
sendmsgfile.close()
class M_Forward_Req:
def __init__(self, msginfo):
self.m_sendmsginfo = msginfo
self.m_transactionid = None
self.m_contentlocation = None
self.m_mmsheader = MMS_Header()
def Make_M_Forward_Req_PDU(self):
self.m_mmsheader.Make_MForwardReq_Header(self.m_sendmsginfo, \
self.m_transactionid, self.m_contentlocation)
def Send_M_Forward_Req_PDU(self, mms_socket):
self.m_mmsheader.Send_MMS_Header(mms_socket)
def Get_PDU_Size(self):
return self.m_mmsheader.Get_MMSHeader_Size()